1use crate::histograms::Histograms;
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use concepts::{
5 ComponentId, ComponentRetryConfig, ExecutionId, FunctionFqn, JoinSetId, StrVariant,
6 SupportedFunctionReturnValue,
7 component_id::InputContentDigest,
8 prefixed_ulid::{DelayId, ExecutionIdDerived, ExecutorId, RunId},
9 storage::{
10 AppendBatchResponse, AppendDelayResponseOutcome, AppendEventsToExecution, AppendRequest,
11 AppendResponse, AppendResponseToExecution, BacktraceFilter, BacktraceInfo, CreateRequest,
12 DUMMY_CREATED, DUMMY_HISTORY_EVENT, DbConnection, DbErrorGeneric, DbErrorRead,
13 DbErrorReadWithTimeout, DbErrorWrite, DbErrorWriteNonRetriable, DbExecutor, DbPool,
14 DbPoolCloseable, ExecutionEvent, ExecutionListPagination, ExecutionRequest,
15 ExecutionWithState, ExpiredDelay, ExpiredLock, ExpiredTimer, HistoryEvent, JoinSetRequest,
16 JoinSetResponse, JoinSetResponseEvent, JoinSetResponseEventOuter, LockPendingResponse,
17 Locked, LockedBy, LockedExecution, Pagination, PendingState, PendingStateFinished,
18 PendingStateFinishedResultKind, PendingStateLocked, ResponseWithCursor, Version,
19 VersionType,
20 },
21};
22use conversions::{JsonWrapper, consistency_db_err, consistency_rusqlite};
23use hashbrown::HashMap;
24use rusqlite::{
25 CachedStatement, Connection, OpenFlags, OptionalExtension, Params, ToSql, Transaction,
26 TransactionBehavior, named_params, types::ToSqlOutput,
27};
28use std::{
29 cmp::max,
30 collections::VecDeque,
31 fmt::Debug,
32 ops::DerefMut,
33 path::Path,
34 str::FromStr,
35 sync::{
36 Arc, Mutex,
37 atomic::{AtomicBool, Ordering},
38 },
39 time::Duration,
40};
41use std::{fmt::Write as _, pin::Pin};
42use tokio::{
43 sync::{mpsc, oneshot},
44 time::Instant,
45};
46use tracing::{Level, debug, error, info, instrument, trace, warn};
47
/// Error returned when the SQLite database cannot be opened or initialized.
///
/// It carries no details; the underlying cause is logged at the place of failure.
#[derive(Debug, thiserror::Error)]
#[error("initialization error")]
pub struct InitializationError;
51
/// A delay request described by its join set, delay id and expiration time.
#[derive(Debug, Clone)]
struct DelayReq {
    /// Join set the delay belongs to.
    join_set_id: JoinSetId,
    /// Identifier of the delay.
    delay_id: DelayId,
    /// Point in time at which the delay expires.
    expires_at: DateTime<Utc>,
}
/// Default `PRAGMA` name/value pairs, applied in this order to a freshly opened connection.
///
/// A pair with an empty value is only queried, not set (see `integrity_check`).
///
/// `page_size` is listed first on purpose: SQLite only honors it before the database file is
/// first initialized, and switching to `journal_mode = wal` initializes a new file. Setting it
/// after the journal mode would silently leave new databases at the default page size.
const PRAGMA: [[&str; 2]; 10] = [
    ["page_size", "8192"],
    ["journal_mode", "wal"],
    ["synchronous", "FULL"],
    ["foreign_keys", "true"],
    ["busy_timeout", "1000"],
    ["cache_size", "10000"],
    ["temp_store", "MEMORY"],
    ["mmap_size", "134217728"],
    ["journal_size_limit", "67108864"],
    ["integrity_check", ""],
];
81
/// DDL of `t_metadata`, the table holding one row per recorded schema version.
const CREATE_TABLE_T_METADATA: &str = r"
CREATE TABLE IF NOT EXISTS t_metadata (
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 schema_version INTEGER NOT NULL,
 created_at TEXT NOT NULL
) STRICT
";
/// Schema version this code expects; initialization fails when the latest stored version differs.
const T_METADATA_EXPECTED_SCHEMA_VERSION: u32 = 4;
91
/// DDL of `t_execution_log`: one row per `(execution_id, version)` holding the JSON-serialized
/// event. `history_event_type` is a stored generated column extracted from `json_value`.
const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_log (
 execution_id TEXT NOT NULL,
 created_at TEXT NOT NULL,
 json_value TEXT NOT NULL,
 version INTEGER NOT NULL,
 variant TEXT NOT NULL,
 join_set_id TEXT,
 history_event_type TEXT GENERATED ALWAYS AS (json_value->>'$.HistoryEvent.event.type') STORED,
 PRIMARY KEY (execution_id, version)
) STRICT
";
/// Index on `t_execution_log (execution_id, version)`.
const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version ON t_execution_log (execution_id, version);
";
/// Index on `t_execution_log (execution_id, variant)`.
const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant ON t_execution_log (execution_id, variant);
";

/// Partial index on `t_execution_log (execution_id, join_set_id, history_event_type)` restricted
/// to rows whose `history_event_type` equals `HISTORY_EVENT_TYPE_JOIN_NEXT`.
const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET: &str = const_format::formatcp!(
    "CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_join_set ON t_execution_log (execution_id, join_set_id, history_event_type) WHERE history_event_type=\"{}\";",
    HISTORY_EVENT_TYPE_JOIN_NEXT
);
119
/// DDL of `t_join_set_response`. A row references either a delay (`delay_id`, `delay_success`)
/// or a child execution (`child_execution_id`, `finished_version`); both groups are nullable.
const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
CREATE TABLE IF NOT EXISTS t_join_set_response (
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 created_at TEXT NOT NULL,
 execution_id TEXT NOT NULL,
 join_set_id TEXT NOT NULL,

 delay_id TEXT,
 delay_success INTEGER,

 child_execution_id TEXT,
 finished_version INTEGER,

 UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
) STRICT
";
/// Index on `t_join_set_response (execution_id, id)`.
const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
";
/// Unique partial index: a non-null `child_execution_id` may appear in at most one response row.
const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_child_id
ON t_join_set_response (child_execution_id) WHERE child_execution_id IS NOT NULL;
";
/// Unique partial index: a non-null `delay_id` may appear in at most one response row.
const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_delay_id
ON t_join_set_response (delay_id) WHERE delay_id IS NOT NULL;
";
154
/// DDL of `t_state`: one row per execution describing its current state.
///
/// `pending_expires_finished` is a single timestamp column whose meaning depends on `state`
/// (scheduled-at, lock expiry or finished-at, as read by `CombinedState::new`).
const CREATE_TABLE_T_STATE: &str = r"
CREATE TABLE IF NOT EXISTS t_state (
 execution_id TEXT NOT NULL,
 is_top_level INTEGER NOT NULL,
 corresponding_version INTEGER NOT NULL,
 ffqn TEXT NOT NULL,
 created_at TEXT NOT NULL,
 component_id_input_digest BLOB NOT NULL,
 first_scheduled_at TEXT NOT NULL,

 pending_expires_finished TEXT NOT NULL,
 state TEXT NOT NULL,
 updated_at TEXT NOT NULL,
 intermittent_event_count INTEGER NOT NULL,

 max_retries INTEGER,
 retry_exp_backoff_millis INTEGER,
 last_lock_version INTEGER,
 executor_id TEXT,
 run_id TEXT,

 join_set_id TEXT,
 join_set_closing INTEGER,

 result_kind TEXT,

 PRIMARY KEY (execution_id)
) STRICT
";
/// Value of `t_state.state` for an execution that is pending.
const STATE_PENDING_AT: &str = "PendingAt";
/// Value of `t_state.state` for an execution blocked by a join set.
const STATE_BLOCKED_BY_JOIN_SET: &str = "BlockedByJoinSet";
/// Value of `t_state.state` for a locked execution.
const STATE_LOCKED: &str = "Locked";
/// Value of `t_state.state` for a finished execution.
const STATE_FINISHED: &str = "Finished";
/// Value of `t_execution_log.history_event_type` used by the join-set partial index.
const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "JoinNext";

/// Index on `t_state (state, pending_expires_finished, ffqn)`.
const IDX_T_STATE_LOCK_PENDING: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending ON t_state (state, pending_expires_finished, ffqn);
";
/// Partial index on `t_state (pending_expires_finished)` for rows with a non-null `executor_id`.
const IDX_T_STATE_EXPIRED_TIMERS: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_expired_timers ON t_state (pending_expires_finished) WHERE executor_id IS NOT NULL;
";
/// Index on `t_state (execution_id, is_top_level)`.
const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
";
/// Index on `t_state (ffqn)`.
const IDX_T_STATE_FFQN: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
";
/// Index on `t_state (created_at)`.
const IDX_T_STATE_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
";
217
/// DDL of `t_delay`: one row per `(execution_id, join_set_id, delay_id)` with its expiry time.
const CREATE_TABLE_T_DELAY: &str = r"
CREATE TABLE IF NOT EXISTS t_delay (
 execution_id TEXT NOT NULL,
 join_set_id TEXT NOT NULL,
 delay_id TEXT NOT NULL,
 expires_at TEXT NOT NULL,
 PRIMARY KEY (execution_id, join_set_id, delay_id)
) STRICT
";
228
/// DDL of `t_backtrace`: a WASM backtrace stored per execution and version range, where the
/// column names mark the lower bound as inclusive and the upper bound as exclusive.
const CREATE_TABLE_T_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_backtrace (
 execution_id TEXT NOT NULL,
 component_id TEXT NOT NULL,
 version_min_including INTEGER NOT NULL,
 version_max_excluding INTEGER NOT NULL,
 wasm_backtrace TEXT NOT NULL,
 PRIMARY KEY (execution_id, version_min_including, version_max_excluding)
) STRICT
";
/// Index on `t_backtrace (execution_id, version_min_including, version_max_excluding)`.
const IDX_T_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_backtrace_execution_id_version ON t_backtrace (execution_id, version_min_including, version_max_excluding);
";
244
/// Cloneable summary of a `rusqlite::Error`, used to report a commit result to every caller
/// whose logical transaction was part of the same physical transaction.
#[derive(Debug, thiserror::Error, Clone)]
enum RusqliteError {
    /// The query returned no rows.
    #[error("not found")]
    NotFound,
    /// Any other SQLite error, kept only as its message.
    #[error("generic: {0}")]
    Generic(StrVariant),
}
252
253mod conversions {
254
255 use super::RusqliteError;
256 use concepts::storage::{DbErrorGeneric, DbErrorRead, DbErrorReadWithTimeout, DbErrorWrite};
257 use rusqlite::{
258 ToSql,
259 types::{FromSql, FromSqlError},
260 };
261 use std::fmt::Debug;
262 use tracing::error;
263
264 impl From<rusqlite::Error> for RusqliteError {
265 fn from(err: rusqlite::Error) -> Self {
266 if matches!(err, rusqlite::Error::QueryReturnedNoRows) {
267 RusqliteError::NotFound
268 } else {
269 error!(backtrace = %std::backtrace::Backtrace::capture(), "Sqlite error {err:?}");
270 RusqliteError::Generic(err.to_string().into())
271 }
272 }
273 }
274
275 impl From<RusqliteError> for DbErrorGeneric {
276 fn from(err: RusqliteError) -> DbErrorGeneric {
277 match err {
278 RusqliteError::NotFound => DbErrorGeneric::Uncategorized("not found".into()),
279 RusqliteError::Generic(str) => DbErrorGeneric::Uncategorized(str),
280 }
281 }
282 }
283 impl From<RusqliteError> for DbErrorRead {
284 fn from(err: RusqliteError) -> Self {
285 if matches!(err, RusqliteError::NotFound) {
286 Self::NotFound
287 } else {
288 Self::from(DbErrorGeneric::from(err))
289 }
290 }
291 }
292 impl From<RusqliteError> for DbErrorReadWithTimeout {
293 fn from(err: RusqliteError) -> Self {
294 Self::from(DbErrorRead::from(err))
295 }
296 }
297 impl From<RusqliteError> for DbErrorWrite {
298 fn from(err: RusqliteError) -> Self {
299 if matches!(err, RusqliteError::NotFound) {
300 Self::NotFound
301 } else {
302 Self::from(DbErrorGeneric::from(err))
303 }
304 }
305 }
306
307 pub(crate) struct JsonWrapper<T>(pub(crate) T);
308 impl<T: serde::de::DeserializeOwned + 'static + Debug> FromSql for JsonWrapper<T> {
309 fn column_result(
310 value: rusqlite::types::ValueRef<'_>,
311 ) -> rusqlite::types::FromSqlResult<Self> {
312 let value = match value {
313 rusqlite::types::ValueRef::Text(value) | rusqlite::types::ValueRef::Blob(value) => {
314 Ok(value)
315 }
316 other => {
317 error!(
318 backtrace = %std::backtrace::Backtrace::capture(),
319 "Unexpected type when conveting to JSON - expected Text or Blob, got type `{other:?}`",
320 );
321 Err(FromSqlError::InvalidType)
322 }
323 }?;
324 let value = serde_json::from_slice::<T>(value).map_err(|err| {
325 error!(
326 backtrace = %std::backtrace::Backtrace::capture(),
327 "Cannot convert JSON value `{value:?}` to type:`{type}` - {err:?}",
328 r#type = std::any::type_name::<T>()
329 );
330 FromSqlError::InvalidType
331 })?;
332 Ok(Self(value))
333 }
334 }
335 impl<T: serde::ser::Serialize + Debug> ToSql for JsonWrapper<T> {
336 fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
337 let string = serde_json::to_string(&self.0).map_err(|err| {
338 error!(
339 "Cannot serialize {value:?} of type `{type}` - {err:?}",
340 value = self.0,
341 r#type = std::any::type_name::<T>()
342 );
343 rusqlite::Error::ToSqlConversionFailure(Box::new(err))
344 })?;
345 Ok(rusqlite::types::ToSqlOutput::Owned(
346 rusqlite::types::Value::Text(string),
347 ))
348 }
349 }
350
351 #[derive(Debug, thiserror::Error)]
353 #[error("{0}")]
354 pub(crate) struct OtherError(&'static str);
355 pub(crate) fn consistency_rusqlite(input: &'static str) -> rusqlite::Error {
356 FromSqlError::other(OtherError(input)).into()
357 }
358
359 pub(crate) fn consistency_db_err(input: &'static str) -> DbErrorGeneric {
360 DbErrorGeneric::Uncategorized(input.into())
361 }
362}
363
/// Raw state values as read from storage, before `CombinedState::new` validates which
/// combination of optional fields is legal for the given `state`.
#[derive(Debug)]
struct CombinedStateDTO {
    /// State name; compared against the `STATE_*` constants.
    state: String,
    /// Unparsed function name; parsed into `FunctionFqn` by `CombinedState::new`.
    ffqn: String,
    component_id_input_digest: InputContentDigest,
    /// Scheduled-at, lock expiry or finished-at timestamp, depending on `state`.
    pending_expires_finished: DateTime<Utc>,
    /// Expected to be set only for the `Locked` state.
    last_lock_version: Option<Version>,
    executor_id: Option<ExecutorId>,
    run_id: Option<RunId>,
    /// Expected to be set only for the `BlockedByJoinSet` state.
    join_set_id: Option<JoinSetId>,
    /// Expected to be set only for the `BlockedByJoinSet` state.
    join_set_closing: Option<bool>,
    /// Expected to be set only for the `Finished` state.
    result_kind: Option<PendingStateFinishedResultKind>,
}
380#[derive(Debug)]
381struct CombinedState {
382 ffqn: FunctionFqn,
383 pending_state: PendingState,
384 corresponding_version: Version,
385}
386impl CombinedState {
387 fn get_next_version_assert_not_finished(&self) -> Version {
388 assert!(!self.pending_state.is_finished());
389 self.corresponding_version.increment()
390 }
391
392 #[cfg(feature = "test")]
393 fn get_next_version_or_finished(&self) -> Version {
394 if self.pending_state.is_finished() {
395 self.corresponding_version.clone()
396 } else {
397 self.corresponding_version.increment()
398 }
399 }
400}
401
/// Data used to wake subscribers waiting for a pending execution; matched by `ffqn` and by
/// `component_input_digest` in `PendingFfqnSubscribersHolder::notify`.
#[derive(Debug)]
struct NotifierPendingAt {
    scheduled_at: DateTime<Utc>,
    ffqn: FunctionFqn,
    component_input_digest: InputContentDigest,
}

/// Identifies a finished execution together with its return value.
#[derive(Debug)]
struct NotifierExecutionFinished {
    execution_id: ExecutionId,
    retval: SupportedFunctionReturnValue,
}

/// Notifications collected while appending; every kind is optional.
// NOTE(review): presumably dispatched to subscribers after the transaction commits - confirm
// against the callers, which are outside this part of the file.
#[derive(Debug, Default)]
struct AppendNotifier {
    pending_at: Option<NotifierPendingAt>,
    execution_finished: Option<NotifierExecutionFinished>,
    response: Option<(ExecutionId, JoinSetResponseEventOuter)>,
}
421
422impl CombinedState {
423 fn new(dto: CombinedStateDTO, corresponding_version: Version) -> Result<Self, rusqlite::Error> {
424 let ffqn = FunctionFqn::from_str(&dto.ffqn).map_err(|parse_err| {
425 error!("Error parsing ffqn of {dto:?} - {parse_err:?}");
426 consistency_rusqlite("invalid ffqn value in `t_state`")
427 })?;
428 let pending_state = match dto {
429 CombinedStateDTO {
431 state,
432 ffqn: _,
433 component_id_input_digest,
434 pending_expires_finished: scheduled_at,
435 last_lock_version: None,
436 executor_id: None,
437 run_id: None,
438 join_set_id: None,
439 join_set_closing: None,
440 result_kind: None,
441 } if state == STATE_PENDING_AT => PendingState::PendingAt {
442 scheduled_at,
443 last_lock: None,
444 component_id_input_digest,
445 },
446 CombinedStateDTO {
448 state,
449 ffqn: _,
450 component_id_input_digest,
451 pending_expires_finished: scheduled_at,
452 last_lock_version: None,
453 executor_id: Some(executor_id),
454 run_id: Some(run_id),
455 join_set_id: None,
456 join_set_closing: None,
457 result_kind: None,
458 } if state == STATE_PENDING_AT => PendingState::PendingAt {
459 scheduled_at,
460 last_lock: Some(LockedBy {
461 executor_id,
462 run_id,
463 }),
464 component_id_input_digest,
465 },
466 CombinedStateDTO {
467 state,
468 ffqn: _,
469 component_id_input_digest,
470 pending_expires_finished: lock_expires_at,
471 last_lock_version: Some(_),
472 executor_id: Some(executor_id),
473 run_id: Some(run_id),
474 join_set_id: None,
475 join_set_closing: None,
476 result_kind: None,
477 } if state == STATE_LOCKED => PendingState::Locked(PendingStateLocked {
478 locked_by: LockedBy {
479 executor_id,
480 run_id,
481 },
482 lock_expires_at,
483 component_id_input_digest,
484 }),
485 CombinedStateDTO {
486 state,
487 ffqn: _,
488 component_id_input_digest,
489 pending_expires_finished: lock_expires_at,
490 last_lock_version: None,
491 executor_id: _,
492 run_id: _,
493 join_set_id: Some(join_set_id),
494 join_set_closing: Some(join_set_closing),
495 result_kind: None,
496 } if state == STATE_BLOCKED_BY_JOIN_SET => PendingState::BlockedByJoinSet {
497 join_set_id: join_set_id.clone(),
498 closing: join_set_closing,
499 lock_expires_at,
500 component_id_input_digest,
501 },
502 CombinedStateDTO {
503 state,
504 ffqn: _,
505 component_id_input_digest,
506 pending_expires_finished: finished_at,
507 last_lock_version: None,
508 executor_id: None,
509 run_id: None,
510 join_set_id: None,
511 join_set_closing: None,
512 result_kind: Some(result_kind),
513 } if state == STATE_FINISHED => PendingState::Finished {
514 finished: PendingStateFinished {
515 finished_at,
516 version: corresponding_version.0,
517 result_kind,
518 },
519 component_id_input_digest,
520 },
521
522 _ => {
523 error!("Cannot deserialize pending state from {dto:?}");
524 return Err(consistency_rusqlite("invalid `t_state`"));
525 }
526 };
527 Ok(Self {
528 ffqn,
529 pending_state,
530 corresponding_version,
531 })
532 }
533}
534
/// A unit of work sent to the database thread and executed inside a physical transaction.
#[derive(derive_more::Debug)]
struct LogicalTx {
    /// Closure applied to the physical transaction. It is `FnMut` because it is applied again
    /// when a failed bulk commit is retried one logical transaction at a time.
    #[debug(skip)]
    func: Box<dyn FnMut(&mut Transaction) + Send>,
    /// Creation time, used to measure how long the command waited before being applied.
    sent_at: Instant,
    /// Name under which the command is recorded in the histograms.
    func_name: &'static str,
    /// Receives the result of the commit that included this logical transaction.
    #[debug(skip)]
    commit_ack_sender: oneshot::Sender<Result<(), RusqliteError>>,
}

/// Message accepted by the database thread.
#[derive(derive_more::Debug)]
enum ThreadCommand {
    /// Apply the logical transaction.
    LogicalTx(LogicalTx),
    /// Stop the thread.
    Shutdown,
}
550
/// Cloneable handle to a SQLite database that is served by a dedicated thread.
#[derive(Clone)]
pub struct SqlitePool(SqlitePoolInner);

/// Response subscribers keyed by execution id.
// NOTE(review): the `u64` looks like a subscription tag used to tell subscribers apart -
// confirm against the code that registers and removes them.
type ResponseSubscribers =
    Arc<Mutex<HashMap<ExecutionId, (oneshot::Sender<JoinSetResponseEventOuter>, u64)>>>;
/// Shared holder of subscribers waiting for pending executions.
type PendingSubscribers = Arc<Mutex<PendingFfqnSubscribersHolder>>;
/// Subscribers waiting for an execution to finish: execution id, then a `u64` key per sender.
type ExecutionFinishedSubscribers =
    Mutex<HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>>;
559
560#[derive(Default)]
561struct PendingFfqnSubscribersHolder {
562 by_ffqns: HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>,
563 by_component: HashMap<InputContentDigest , (mpsc::Sender<()>, u64)>,
564}
565impl PendingFfqnSubscribersHolder {
566 fn notify(&self, notifier: &NotifierPendingAt) {
567 if let Some((subscription, _)) = self.by_ffqns.get(¬ifier.ffqn) {
568 debug!("Notifying pending subscriber by ffqn");
569 let _ = subscription.try_send(());
571 }
572 if let Some((subscription, _)) = self.by_component.get(¬ifier.component_input_digest) {
573 debug!("Notifying pending subscriber by component");
574 let _ = subscription.try_send(());
576 }
577 }
578
579 fn insert_ffqn(&mut self, ffqn: FunctionFqn, value: (mpsc::Sender<()>, u64)) {
580 self.by_ffqns.insert(ffqn, value);
581 }
582
583 fn remove_ffqn(&mut self, ffqn: &FunctionFqn) -> Option<(mpsc::Sender<()>, u64)> {
584 self.by_ffqns.remove(ffqn)
585 }
586
587 fn insert_by_component(
588 &mut self,
589 input_content_digest: InputContentDigest,
590 value: (mpsc::Sender<()>, u64),
591 ) {
592 self.by_component.insert(input_content_digest, value);
593 }
594
595 fn remove_by_component(
596 &mut self,
597 input_content_digest: &InputContentDigest,
598 ) -> Option<(mpsc::Sender<()>, u64)> {
599 self.by_component.remove(input_content_digest)
600 }
601}
602
/// Shared state behind every `SqlitePool` clone.
#[derive(Clone)]
struct SqlitePoolInner {
    /// Set when shutdown is requested; checked by the database thread before committing.
    shutdown_requested: Arc<AtomicBool>,
    /// Set by the database thread right before it exits its loop.
    shutdown_finished: Arc<AtomicBool>,
    /// Channel used to send commands to the database thread.
    command_tx: tokio::sync::mpsc::Sender<ThreadCommand>,
    response_subscribers: ResponseSubscribers,
    pending_subscribers: PendingSubscribers,
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
    /// Handle of the database thread; taken out in `Drop`, so it is `Some` until then.
    join_handle: Option<Arc<std::thread::JoinHandle<()>>>,
}
613
614#[async_trait]
615impl DbPoolCloseable for SqlitePool {
616 async fn close(self) {
617 debug!("Sqlite is closing");
618 self.0.shutdown_requested.store(true, Ordering::Release);
619 let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
621 while !self.0.shutdown_finished.load(Ordering::Acquire) {
622 tokio::time::sleep(Duration::from_millis(1)).await;
623 }
624 debug!("Sqlite was closed");
625 }
626}
627
628#[async_trait]
629impl DbPool for SqlitePool {
630 fn connection(&self) -> Box<dyn DbConnection> {
631 Box::new(self.clone())
632 }
633}
634impl Drop for SqlitePool {
635 fn drop(&mut self) {
636 let arc = self.0.join_handle.take().expect("join_handle was set");
637 if let Ok(join_handle) = Arc::try_unwrap(arc) {
638 if !join_handle.is_finished() {
640 if !self.0.shutdown_finished.load(Ordering::Acquire) {
641 let backtrace = std::backtrace::Backtrace::capture();
643 warn!("SqlitePool was not closed properly - {backtrace}");
644 self.0.shutdown_requested.store(true, Ordering::Release);
645 let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
647 } else {
650 }
652 }
653 }
654 }
655}
656
657#[derive(Debug, Clone)]
658pub struct SqliteConfig {
659 pub queue_capacity: usize,
660 pub pragma_override: Option<HashMap<String, String>>,
661 pub metrics_threshold: Option<Duration>,
662}
663impl Default for SqliteConfig {
664 fn default() -> Self {
665 Self {
666 queue_capacity: 100,
667 pragma_override: None,
668 metrics_threshold: None,
669 }
670 }
671}
672
673impl SqlitePool {
674 fn init_thread(
675 path: &Path,
676 mut pragma_override: HashMap<String, String>,
677 ) -> Result<Connection, InitializationError> {
678 fn conn_execute<P: Params>(
679 conn: &Connection,
680 sql: &str,
681 params: P,
682 ) -> Result<(), InitializationError> {
683 conn.execute(sql, params).map(|_| ()).map_err(|err| {
684 error!("Cannot run `{sql}` - {err:?}");
685 InitializationError
686 })
687 }
688 fn pragma_update(
689 conn: &Connection,
690 name: &str,
691 value: &str,
692 ) -> Result<(), InitializationError> {
693 if value.is_empty() {
694 debug!("Querying PRAGMA {name}");
695 conn.pragma_query(None, name, |row| {
696 debug!("{row:?}");
697 Ok(())
698 })
699 .map_err(|err| {
700 error!("cannot update pragma `{name}`=`{value}` - {err:?}");
701 InitializationError
702 })
703 } else {
704 debug!("Setting PRAGMA {name}={value}");
705 conn.pragma_update(None, name, value).map_err(|err| {
706 error!("cannot update pragma `{name}`=`{value}` - {err:?}");
707 InitializationError
708 })
709 }
710 }
711
712 let conn = Connection::open_with_flags(path, OpenFlags::default()).map_err(|err| {
713 error!("cannot open the connection - {err:?}");
714 InitializationError
715 })?;
716
717 for [pragma_name, default_value] in PRAGMA {
718 let pragma_value = pragma_override
719 .remove(pragma_name)
720 .unwrap_or_else(|| default_value.to_string());
721 pragma_update(&conn, pragma_name, &pragma_value)?;
722 }
723 for (pragma_name, pragma_value) in pragma_override.drain() {
725 pragma_update(&conn, &pragma_name, &pragma_value)?;
726 }
727
728 {
730 conn_execute(&conn, CREATE_TABLE_T_METADATA, [])?;
731 let actual_version = conn
734 .prepare("SELECT schema_version FROM t_metadata ORDER BY id DESC LIMIT 1")
735 .map_err(|err| {
736 error!("cannot select schema version - {err:?}");
737 InitializationError
738 })?
739 .query_row([], |row| row.get::<_, u32>("schema_version"))
740 .optional()
741 .map_err(|err| {
742 error!("Cannot read the schema version - {err:?}");
743 InitializationError
744 })?;
745
746 match actual_version {
747 None => conn_execute(
748 &conn,
749 &format!(
750 "INSERT INTO t_metadata (schema_version, created_at) VALUES
751 ({T_METADATA_EXPECTED_SCHEMA_VERSION}, ?) ON CONFLICT DO NOTHING"
752 ),
753 [Utc::now()],
754 )?,
755 Some(actual_version) => {
756 if actual_version != T_METADATA_EXPECTED_SCHEMA_VERSION {
758 error!(
759 "wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
760 );
761 return Err(InitializationError);
762 }
763 }
764 }
765 }
766
767 conn_execute(&conn, CREATE_TABLE_T_EXECUTION_LOG, [])?;
769 conn_execute(
770 &conn,
771 CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
772 [],
773 )?;
774 conn_execute(
775 &conn,
776 CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT,
777 [],
778 )?;
779 conn_execute(
780 &conn,
781 CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET,
782 [],
783 )?;
784 conn_execute(&conn, CREATE_TABLE_T_JOIN_SET_RESPONSE, [])?;
786 conn_execute(
787 &conn,
788 CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
789 [],
790 )?;
791 conn_execute(
792 &conn,
793 CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID,
794 [],
795 )?;
796 conn_execute(
797 &conn,
798 CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID,
799 [],
800 )?;
801 conn_execute(&conn, CREATE_TABLE_T_STATE, [])?;
803 conn_execute(&conn, IDX_T_STATE_LOCK_PENDING, [])?;
804 conn_execute(&conn, IDX_T_STATE_EXPIRED_TIMERS, [])?;
805 conn_execute(&conn, IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL, [])?;
806 conn_execute(&conn, IDX_T_STATE_FFQN, [])?;
807 conn_execute(&conn, IDX_T_STATE_CREATED_AT, [])?;
808 conn_execute(&conn, CREATE_TABLE_T_DELAY, [])?;
810 conn_execute(&conn, CREATE_TABLE_T_BACKTRACE, [])?;
812 conn_execute(&conn, IDX_T_BACKTRACE_EXECUTION_ID_VERSION, [])?;
813 Ok(conn)
814 }
815
816 fn connection_rpc(
817 mut conn: Connection,
818 shutdown_requested: &AtomicBool,
819 shutdown_finished: &AtomicBool,
820 mut command_rx: mpsc::Receiver<ThreadCommand>,
821 metrics_threshold: Option<Duration>,
822 ) {
823 let mut histograms = Histograms::new(metrics_threshold);
824 while Self::tick(
825 &mut conn,
826 shutdown_requested,
827 &mut command_rx,
828 &mut histograms,
829 )
830 .is_ok()
831 {
832 }
834 debug!("Closing command thread");
835 shutdown_finished.store(true, Ordering::Release);
836 }
837
838 fn tick(
839 conn: &mut Connection,
840 shutdown_requested: &AtomicBool,
841 command_rx: &mut mpsc::Receiver<ThreadCommand>,
842 histograms: &mut Histograms,
843 ) -> Result<(), ()> {
844 let mut ltx_list = Vec::new();
845 let mut ltx = match command_rx.blocking_recv() {
847 Some(ThreadCommand::LogicalTx(ltx)) => ltx,
848 Some(ThreadCommand::Shutdown) => {
849 debug!("Shutdown message received");
850 return Err(());
851 }
852 None => {
853 debug!("command_rx was closed");
854 return Err(());
855 }
856 };
857 let all_fns_start = std::time::Instant::now();
858 let mut physical_tx = conn
859 .transaction_with_behavior(TransactionBehavior::Immediate)
860 .map_err(|begin_err| {
861 error!("Cannot open transaction, closing sqlite - {begin_err:?}");
862 })?;
863 Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
864 ltx_list.push(ltx);
865
866 while let Ok(more) = command_rx.try_recv() {
867 let mut ltx = match more {
868 ThreadCommand::Shutdown => {
869 debug!("Shutdown message received");
870 return Err(());
871 }
872 ThreadCommand::LogicalTx(ltx) => ltx,
873 };
874
875 Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
876 ltx_list.push(ltx);
877 }
878 histograms.record_all_fns(all_fns_start.elapsed());
879
880 {
881 if shutdown_requested.load(Ordering::Relaxed) {
883 debug!("Recveived shutdown during processing of the batch");
884 return Err(());
885 }
886 let commit_result = {
887 let now = std::time::Instant::now();
888 let commit_result = physical_tx.commit().map_err(RusqliteError::from);
889 histograms.record_commit(now.elapsed());
890 commit_result
891 };
892 if commit_result.is_ok() || ltx_list.len() == 1 {
893 for ltx in ltx_list {
895 let _ = ltx.commit_ack_sender.send(commit_result.clone());
897 }
898 } else {
899 warn!(
900 "Bulk transaction failed, committing {} transactions serially",
901 ltx_list.len()
902 );
903 for ltx in ltx_list {
906 Self::ltx_commit_single(ltx, conn, shutdown_requested, histograms)?;
907 }
908 }
909 }
910 histograms.print_if_elapsed();
911 Ok(())
912 }
913
914 fn ltx_commit_single(
915 mut ltx: LogicalTx,
916 conn: &mut Connection,
917 shutdown_requested: &AtomicBool,
918 histograms: &mut Histograms,
919 ) -> Result<(), ()> {
920 let mut physical_tx = conn
921 .transaction_with_behavior(TransactionBehavior::Immediate)
922 .map_err(|begin_err| {
923 error!("Cannot open transaction, closing sqlite - {begin_err:?}");
924 })?;
925 Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
926 if shutdown_requested.load(Ordering::Relaxed) {
927 debug!("Recveived shutdown during processing of the batch");
928 return Err(());
929 }
930 let commit_result = {
931 let now = std::time::Instant::now();
932 let commit_result = physical_tx.commit().map_err(RusqliteError::from);
933 histograms.record_commit(now.elapsed());
934 commit_result
935 };
936 let _ = ltx.commit_ack_sender.send(commit_result);
938 Ok(())
939 }
940
941 fn ltx_apply_to_tx(
942 ltx: &mut LogicalTx,
943 physical_tx: &mut Transaction,
944 histograms: &mut Histograms,
945 ) {
946 let sent_latency = ltx.sent_at.elapsed();
947 let started_at = Instant::now();
948 (ltx.func)(physical_tx);
949 histograms.record_command(sent_latency, ltx.func_name, started_at.elapsed());
950 }
951
952 #[instrument(level = Level::DEBUG, skip_all, name = "sqlite_new")]
953 pub async fn new<P: AsRef<Path>>(
954 path: P,
955 config: SqliteConfig,
956 ) -> Result<Self, InitializationError> {
957 let path = path.as_ref().to_owned();
958
959 let shutdown_requested = Arc::new(AtomicBool::new(false));
960 let shutdown_finished = Arc::new(AtomicBool::new(false));
961
962 let (command_tx, command_rx) = tokio::sync::mpsc::channel(config.queue_capacity);
963 info!("Sqlite database location: {path:?}");
964 let join_handle = {
965 let init_task = {
967 tokio::task::spawn_blocking(move || {
968 Self::init_thread(&path, config.pragma_override.unwrap_or_default())
969 })
970 .await
971 };
972 let conn = match init_task {
973 Ok(res) => res?,
974 Err(join_err) => {
975 error!("Initialization panic - {join_err:?}");
976 return Err(InitializationError);
977 }
978 };
979 let shutdown_requested = shutdown_requested.clone();
980 let shutdown_finished = shutdown_finished.clone();
981 std::thread::spawn(move || {
983 Self::connection_rpc(
984 conn,
985 &shutdown_requested,
986 &shutdown_finished,
987 command_rx,
988 config.metrics_threshold,
989 );
990 })
991 };
992 Ok(SqlitePool(SqlitePoolInner {
993 shutdown_requested,
994 shutdown_finished,
995 command_tx,
996 response_subscribers: Arc::default(),
997 pending_subscribers: Arc::default(),
998 join_handle: Some(Arc::new(join_handle)),
999 execution_finished_subscribers: Arc::default(),
1000 }))
1001 }
1002
1003 async fn transaction<F, T, E>(&self, mut func: F, func_name: &'static str) -> Result<T, E>
1005 where
1006 F: FnMut(&mut rusqlite::Transaction) -> Result<T, E> + Send + 'static,
1007 T: Send + 'static,
1008 E: From<DbErrorGeneric> + From<RusqliteError> + Send + 'static,
1009 {
1010 let fn_res: Arc<std::sync::Mutex<Option<_>>> = Arc::default();
1011 let (commit_ack_sender, commit_ack_receiver) = oneshot::channel();
1012 let thread_command_func = {
1013 let fn_res = fn_res.clone();
1014 ThreadCommand::LogicalTx(LogicalTx {
1015 func: Box::new(move |tx| {
1016 let func_res = func(tx);
1017 *fn_res.lock().unwrap() = Some(func_res);
1018 }),
1019 sent_at: Instant::now(),
1020 func_name,
1021 commit_ack_sender,
1022 })
1023 };
1024 self.0
1025 .command_tx
1026 .send(thread_command_func)
1027 .await
1028 .map_err(|_send_err| DbErrorGeneric::Close)?;
1029
1030 match commit_ack_receiver.await {
1032 Ok(Ok(())) => {
1033 let mut guard = fn_res.lock().unwrap();
1034 std::mem::take(guard.deref_mut()).expect("ltx must have been run at least once")
1035 }
1036 Ok(Err(rusqlite_err)) => Err(E::from(rusqlite_err)),
1037 Err(_) => Err(E::from(DbErrorGeneric::Close)),
1038 }
1039 }
1040
1041 fn fetch_created_event(
1042 conn: &Connection,
1043 execution_id: &ExecutionId,
1044 ) -> Result<CreateRequest, DbErrorRead> {
1045 let mut stmt = conn.prepare(
1046 "SELECT created_at, json_value FROM t_execution_log WHERE \
1047 execution_id = :execution_id AND version = 0",
1048 )?;
1049 let (created_at, event) = stmt.query_row(
1050 named_params! {
1051 ":execution_id": execution_id.to_string(),
1052 },
1053 |row| {
1054 let created_at = row.get("created_at")?;
1055 let event = row
1056 .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
1057 .map_err(|serde| {
1058 error!("cannot deserialize `Created` event: {row:?} - `{serde:?}`");
1059 consistency_rusqlite("cannot deserialize `Created` event")
1060 })?;
1061 Ok((created_at, event.0))
1062 },
1063 )?;
1064 if let ExecutionRequest::Created {
1065 ffqn,
1066 params,
1067 parent,
1068 scheduled_at,
1069 component_id,
1070 metadata,
1071 scheduled_by,
1072 } = event
1073 {
1074 Ok(CreateRequest {
1075 created_at,
1076 execution_id: execution_id.clone(),
1077 ffqn,
1078 params,
1079 parent,
1080 scheduled_at,
1081 component_id,
1082 metadata,
1083 scheduled_by,
1084 })
1085 } else {
1086 error!("Row with version=0 must be a `Created` event - {event:?}");
1087 Err(consistency_db_err("expected `Created` event").into())
1088 }
1089 }
1090
1091 fn check_expected_next_and_appending_version(
1092 expected_version: &Version,
1093 appending_version: &Version,
1094 ) -> Result<(), DbErrorWrite> {
1095 if *expected_version != *appending_version {
1096 debug!(
1097 "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
1098 );
1099 return Err(DbErrorWrite::NonRetriable(
1100 DbErrorWriteNonRetriable::VersionConflict {
1101 expected: expected_version.clone(),
1102 requested: appending_version.clone(),
1103 },
1104 ));
1105 }
1106 Ok(())
1107 }
1108
1109 #[instrument(level = Level::TRACE, skip_all, fields(execution_id = %req.execution_id))]
1110 fn create_inner(
1111 tx: &Transaction,
1112 req: CreateRequest,
1113 ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1114 debug!("create_inner");
1115
1116 let version = Version::default();
1117 let execution_id = req.execution_id.clone();
1118 let execution_id_str = execution_id.to_string();
1119 let ffqn = req.ffqn.clone();
1120 let created_at = req.created_at;
1121 let scheduled_at = req.scheduled_at;
1122 let component_id = req.component_id.clone();
1123 let event = ExecutionRequest::from(req);
1124 let event_ser = serde_json::to_string(&event).map_err(|err| {
1125 error!("Cannot serialize {event:?} - {err:?}");
1126 DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
1127 })?;
1128 tx.prepare(
1129 "INSERT INTO t_execution_log (execution_id, created_at, version, json_value, variant, join_set_id ) \
1130 VALUES (:execution_id, :created_at, :version, :json_value, :variant, :join_set_id)")
1131 ?
1132 .execute(named_params! {
1133 ":execution_id": &execution_id_str,
1134 ":created_at": created_at,
1135 ":version": version.0,
1136 ":json_value": event_ser,
1137 ":variant": event.variant(),
1138 ":join_set_id": event.join_set_id().map(std::string::ToString::to_string),
1139 })
1140 ?;
1141 let pending_at = {
1142 debug!("Creating with `Pending(`{scheduled_at:?}`)");
1143 tx.prepare(
1144 r"
1145 INSERT INTO t_state (
1146 execution_id,
1147 is_top_level,
1148 corresponding_version,
1149 pending_expires_finished,
1150 ffqn,
1151 state,
1152 created_at,
1153 component_id_input_digest,
1154 updated_at,
1155 first_scheduled_at,
1156 intermittent_event_count
1157 )
1158 VALUES (
1159 :execution_id,
1160 :is_top_level,
1161 :corresponding_version,
1162 :pending_expires_finished,
1163 :ffqn,
1164 :state,
1165 :created_at,
1166 :component_id_input_digest,
1167 CURRENT_TIMESTAMP,
1168 :first_scheduled_at,
1169 0
1170 )
1171 ",
1172 )?
1173 .execute(named_params! {
1174 ":execution_id": execution_id.to_string(),
1175 ":is_top_level": execution_id.is_top_level(),
1176 ":corresponding_version": version.0,
1177 ":pending_expires_finished": scheduled_at,
1178 ":ffqn": ffqn.to_string(),
1179 ":state": STATE_PENDING_AT,
1180 ":created_at": created_at,
1181 ":component_id_input_digest": component_id.input_digest,
1182 ":first_scheduled_at": scheduled_at,
1183 })?;
1184 AppendNotifier {
1185 pending_at: Some(NotifierPendingAt {
1186 scheduled_at,
1187 ffqn,
1188 component_input_digest: component_id.input_digest,
1189 }),
1190 execution_finished: None,
1191 response: None,
1192 }
1193 };
1194 let next_version = Version::new(version.0 + 1);
1195 Ok((next_version, pending_at))
1196 }
1197
1198 #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %corresponding_version))]
1199 fn update_state_pending_after_response_appended(
1200 tx: &Transaction,
1201 execution_id: &ExecutionId,
1202 scheduled_at: DateTime<Utc>, corresponding_version: &Version, component_input_digest: InputContentDigest,
1205 ) -> Result<AppendNotifier, DbErrorWrite> {
1206 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
1207 let mut stmt = tx
1208 .prepare_cached(
1209 r"
1210 UPDATE t_state
1211 SET
1212 corresponding_version = :corresponding_version,
1213 pending_expires_finished = :pending_expires_finished,
1214 state = :state,
1215 updated_at = CURRENT_TIMESTAMP,
1216
1217 last_lock_version = NULL,
1218
1219 join_set_id = NULL,
1220 join_set_closing = NULL,
1221
1222 result_kind = NULL
1223 WHERE execution_id = :execution_id
1224 ",
1225 )
1226 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1227 let updated = stmt
1228 .execute(named_params! {
1229 ":execution_id": execution_id,
1230 ":corresponding_version": corresponding_version.0,
1231 ":pending_expires_finished": scheduled_at,
1232 ":state": STATE_PENDING_AT,
1233 })
1234 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1235 if updated != 1 {
1236 return Err(DbErrorWrite::NotFound);
1237 }
1238 Ok(AppendNotifier {
1239 pending_at: Some(NotifierPendingAt {
1240 scheduled_at,
1241 ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1242 component_input_digest,
1243 }),
1244 execution_finished: None,
1245 response: None,
1246 })
1247 }
1248
1249 #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
1250 fn update_state_pending_after_event_appended(
1251 tx: &Transaction,
1252 execution_id: &ExecutionId,
1253 appending_version: &Version,
1254 scheduled_at: DateTime<Utc>, intermittent_failure: bool,
1256 component_input_digest: InputContentDigest,
1257 ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1258 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
1259 let mut stmt = tx
1262 .prepare_cached(
1263 r"
1264 UPDATE t_state
1265 SET
1266 corresponding_version = :appending_version,
1267 pending_expires_finished = :pending_expires_finished,
1268 state = :state,
1269 updated_at = CURRENT_TIMESTAMP,
1270 intermittent_event_count = intermittent_event_count + :intermittent_delta,
1271
1272 last_lock_version = NULL,
1273
1274 join_set_id = NULL,
1275 join_set_closing = NULL,
1276
1277 result_kind = NULL
1278 WHERE execution_id = :execution_id;
1279 ",
1280 )
1281 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1282 let updated = stmt
1283 .execute(named_params! {
1284 ":execution_id": execution_id.to_string(),
1285 ":appending_version": appending_version.0,
1286 ":pending_expires_finished": scheduled_at,
1287 ":state": STATE_PENDING_AT,
1288 ":intermittent_delta": i32::from(intermittent_failure) })
1290 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1291 if updated != 1 {
1292 return Err(DbErrorWrite::NotFound);
1293 }
1294 Ok((
1295 appending_version.increment(),
1296 AppendNotifier {
1297 pending_at: Some(NotifierPendingAt {
1298 scheduled_at,
1299 ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1300 component_input_digest,
1301 }),
1302 execution_finished: None,
1303 response: None,
1304 },
1305 ))
1306 }
1307
1308 fn update_state_locked_get_intermittent_event_count(
1309 tx: &Transaction,
1310 execution_id: &ExecutionId,
1311 executor_id: ExecutorId,
1312 run_id: RunId,
1313 lock_expires_at: DateTime<Utc>,
1314 appending_version: &Version,
1315 retry_config: ComponentRetryConfig,
1316 ) -> Result<u32, DbErrorWrite> {
1317 debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
1318 let backoff_millis =
1319 u64::try_from(retry_config.retry_exp_backoff.as_millis()).expect("backoff too big");
1320 let execution_id_str = execution_id.to_string();
1321 let mut stmt = tx
1322 .prepare_cached(
1323 r"
1324 UPDATE t_state
1325 SET
1326 corresponding_version = :appending_version,
1327 pending_expires_finished = :pending_expires_finished,
1328 state = :state,
1329 updated_at = CURRENT_TIMESTAMP,
1330
1331 max_retries = :max_retries,
1332 retry_exp_backoff_millis = :retry_exp_backoff_millis,
1333 last_lock_version = :appending_version,
1334 executor_id = :executor_id,
1335 run_id = :run_id,
1336
1337 join_set_id = NULL,
1338 join_set_closing = NULL,
1339
1340 result_kind = NULL
1341 WHERE execution_id = :execution_id
1342 ",
1343 )
1344 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1345 let updated = stmt
1346 .execute(named_params! {
1347 ":execution_id": execution_id_str,
1348 ":appending_version": appending_version.0,
1349 ":pending_expires_finished": lock_expires_at,
1350 ":state": STATE_LOCKED,
1351 ":max_retries": retry_config.max_retries,
1352 ":retry_exp_backoff_millis": backoff_millis,
1353 ":executor_id": executor_id.to_string(),
1354 ":run_id": run_id.to_string(),
1355 })
1356 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1357 if updated != 1 {
1358 return Err(DbErrorWrite::NotFound);
1359 }
1360
1361 let intermittent_event_count = tx
1363 .prepare(
1364 "SELECT intermittent_event_count FROM t_state WHERE execution_id = :execution_id",
1365 )?
1366 .query_row(
1367 named_params! {
1368 ":execution_id": execution_id_str,
1369 },
1370 |row| {
1371 let intermittent_event_count = row.get("intermittent_event_count")?;
1372 Ok(intermittent_event_count)
1373 },
1374 )
1375 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1376
1377 Ok(intermittent_event_count)
1378 }
1379
1380 fn update_state_blocked(
1381 tx: &Transaction,
1382 execution_id: &ExecutionId,
1383 appending_version: &Version,
1384 join_set_id: &JoinSetId,
1386 lock_expires_at: DateTime<Utc>,
1387 join_set_closing: bool,
1388 ) -> Result<
1389 AppendResponse, DbErrorWrite,
1391 > {
1392 debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
1393 let execution_id_str = execution_id.to_string();
1394 let mut stmt = tx.prepare_cached(
1395 r"
1396 UPDATE t_state
1397 SET
1398 corresponding_version = :appending_version,
1399 pending_expires_finished = :pending_expires_finished,
1400 state = :state,
1401 updated_at = CURRENT_TIMESTAMP,
1402
1403 last_lock_version = NULL,
1404
1405 join_set_id = :join_set_id,
1406 join_set_closing = :join_set_closing,
1407
1408 result_kind = NULL
1409 WHERE execution_id = :execution_id
1410 ",
1411 )?;
1412 let updated = stmt.execute(named_params! {
1413 ":execution_id": execution_id_str,
1414 ":appending_version": appending_version.0,
1415 ":pending_expires_finished": lock_expires_at,
1416 ":state": STATE_BLOCKED_BY_JOIN_SET,
1417 ":join_set_id": join_set_id,
1418 ":join_set_closing": join_set_closing,
1419 })?;
1420 if updated != 1 {
1421 return Err(DbErrorWrite::NotFound);
1422 }
1423 Ok(appending_version.increment())
1424 }
1425
1426 fn update_state_finished(
1427 tx: &Transaction,
1428 execution_id: &ExecutionId,
1429 appending_version: &Version,
1430 finished_at: DateTime<Utc>,
1432 result_kind: PendingStateFinishedResultKind,
1433 ) -> Result<(), DbErrorWrite> {
1434 debug!("Setting t_state to Finished");
1435 let execution_id_str = execution_id.to_string();
1436 let mut stmt = tx.prepare_cached(
1437 r"
1438 UPDATE t_state
1439 SET
1440 corresponding_version = :appending_version,
1441 pending_expires_finished = :pending_expires_finished,
1442 state = :state,
1443 updated_at = CURRENT_TIMESTAMP,
1444
1445 last_lock_version = NULL,
1446 executor_id = NULL,
1447 run_id = NULL,
1448
1449 join_set_id = NULL,
1450 join_set_closing = NULL,
1451
1452 result_kind = :result_kind
1453 WHERE execution_id = :execution_id
1454 ",
1455 )?;
1456
1457 let updated = stmt.execute(named_params! {
1458 ":execution_id": execution_id_str,
1459 ":appending_version": appending_version.0,
1460 ":pending_expires_finished": finished_at,
1461 ":state": STATE_FINISHED,
1462 ":result_kind": JsonWrapper(result_kind),
1463 })?;
1464 if updated != 1 {
1465 return Err(DbErrorWrite::NotFound);
1466 }
1467 Ok(())
1468 }
1469
1470 #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %appending_version))]
1472 fn bump_state_next_version(
1473 tx: &Transaction,
1474 execution_id: &ExecutionId,
1475 appending_version: &Version,
1476 delay_req: Option<DelayReq>,
1477 ) -> Result<AppendResponse , DbErrorWrite> {
1478 debug!("update_index_version");
1479 let execution_id_str = execution_id.to_string();
1480 let mut stmt = tx.prepare_cached(
1481 r"
1482 UPDATE t_state
1483 SET
1484 corresponding_version = :appending_version,
1485 updated_at = CURRENT_TIMESTAMP
1486 WHERE execution_id = :execution_id
1487 ",
1488 )?;
1489 let updated = stmt.execute(named_params! {
1490 ":execution_id": execution_id_str,
1491 ":appending_version": appending_version.0,
1492 })?;
1493 if updated != 1 {
1494 return Err(DbErrorWrite::NotFound);
1495 }
1496 if let Some(DelayReq {
1497 join_set_id,
1498 delay_id,
1499 expires_at,
1500 }) = delay_req
1501 {
1502 debug!("Inserting delay to `t_delay`");
1503 let mut stmt = tx.prepare_cached(
1504 "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) \
1505 VALUES \
1506 (:execution_id, :join_set_id, :delay_id, :expires_at)",
1507 )?;
1508 stmt.execute(named_params! {
1509 ":execution_id": execution_id_str,
1510 ":join_set_id": join_set_id.to_string(),
1511 ":delay_id": delay_id.to_string(),
1512 ":expires_at": expires_at,
1513 })?;
1514 }
1515 Ok(appending_version.increment())
1516 }
1517
1518 fn get_combined_state(
1519 tx: &Transaction,
1520 execution_id: &ExecutionId,
1521 ) -> Result<CombinedState, DbErrorRead> {
1522 let mut stmt = tx.prepare(
1523 r"
1524 SELECT
1525 state, ffqn, component_id_input_digest, corresponding_version, pending_expires_finished,
1526 last_lock_version, executor_id, run_id,
1527 join_set_id, join_set_closing,
1528 result_kind
1529 FROM t_state
1530 WHERE
1531 execution_id = :execution_id
1532 ",
1533 )?;
1534 stmt.query_row(
1535 named_params! {
1536 ":execution_id": execution_id.to_string(),
1537 },
1538 |row| {
1539 CombinedState::new(
1540 CombinedStateDTO {
1541 component_id_input_digest: row.get("component_id_input_digest")?,
1542 state: row.get("state")?,
1543 ffqn: row.get("ffqn")?,
1544 pending_expires_finished: row
1545 .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1546 last_lock_version: row
1547 .get::<_, Option<VersionType>>("last_lock_version")?
1548 .map(Version::new),
1549 executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1550 run_id: row.get::<_, Option<RunId>>("run_id")?,
1551 join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1552 join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1553 result_kind: row
1554 .get::<_, Option<JsonWrapper<PendingStateFinishedResultKind>>>(
1555 "result_kind",
1556 )?
1557 .map(|wrapper| wrapper.0),
1558 },
1559 Version::new(row.get("corresponding_version")?),
1560 )
1561 },
1562 )
1563 .map_err(DbErrorRead::from)
1564 }
1565
    /// Lists executions from `t_state`, optionally filtered by `ffqn` and/or restricted to
    /// top-level executions, using cursor based pagination.
    ///
    /// The query is assembled dynamically: WHERE fragments and their named parameters are
    /// collected in a `StatementModifier`, while the sort direction and the limit are
    /// formatted directly into the SQL text.
    ///
    /// Rows that cannot be converted are logged and skipped instead of failing the call.
    /// The returned vector is always in descending `created_at` order: an ascending query
    /// result is reversed before returning.
    fn list_executions(
        read_tx: &Transaction,
        ffqn: Option<&FunctionFqn>,
        top_level_only: bool,
        pagination: &ExecutionListPagination,
    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
        /// Pieces of the dynamically built statement.
        #[derive(Debug)]
        struct StatementModifier<'a> {
            // WHERE fragments, later joined with ` AND `.
            where_vec: Vec<String>,
            // Named parameters referenced by the fragments in `where_vec`.
            params: Vec<(&'static str, ToSqlOutput<'a>)>,
            limit: u32,
            // Whether the query itself sorts in descending order.
            limit_desc: bool,
        }

        /// Translates a pagination over `column` into WHERE fragments and parameters.
        fn paginate<'a, T: rusqlite::ToSql + 'static>(
            pagination: &'a Pagination<Option<T>>,
            column: &str,
            top_level_only: bool,
        ) -> Result<StatementModifier<'a>, DbErrorGeneric> {
            let mut where_vec: Vec<String> = vec![];
            let mut params: Vec<(&'static str, ToSqlOutput<'a>)> = vec![];
            let limit = pagination.length();
            let limit_desc = pagination.is_desc();
            match pagination {
                // A cursor is present: restrict the column relative to it.
                // `rel()` presumably yields the SQL comparison operator - TODO confirm.
                Pagination::NewerThan {
                    cursor: Some(cursor),
                    ..
                }
                | Pagination::OlderThan {
                    cursor: Some(cursor),
                    ..
                } => {
                    where_vec.push(format!("{column} {rel} :cursor", rel = pagination.rel()));
                    let cursor = cursor.to_sql().map_err(|err| {
                        error!("Possible program error - cannot convert cursor to sql - {err:?}");
                        DbErrorGeneric::Uncategorized("cannot convert cursor to sql".into())
                    })?;
                    params.push((":cursor", cursor));
                }
                // No cursor: no restriction on the column.
                _ => {}
            }
            if top_level_only {
                where_vec.push("is_top_level=true".to_string());
            }
            Ok(StatementModifier {
                where_vec,
                params,
                limit: u32::from(limit),
                limit_desc,
            })
        }

        let mut statement_mod = match pagination {
            ExecutionListPagination::CreatedBy(pagination) => {
                paginate(pagination, "created_at", top_level_only)?
            }
            ExecutionListPagination::ExecutionId(pagination) => {
                paginate(pagination, "execution_id", top_level_only)?
            }
        };

        // Declared outside of the `if` so that the string outlives the borrow stored in
        // `statement_mod.params`.
        let ffqn_temporary;
        if let Some(ffqn) = ffqn {
            statement_mod.where_vec.push("ffqn = :ffqn".to_string());
            ffqn_temporary = ffqn.to_string();
            let ffqn = ffqn_temporary
                .to_sql()
                .expect("string conversion never fails");

            statement_mod.params.push((":ffqn", ffqn));
        }

        let where_str = if statement_mod.where_vec.is_empty() {
            String::new()
        } else {
            format!("WHERE {}", statement_mod.where_vec.join(" AND "))
        };
        // NOTE(review): rows are always ordered by `created_at`, even when the cursor
        // restricts `execution_id` - confirm this is intended for `ExecutionId` pagination.
        let sql = format!(
            r"
            SELECT created_at, first_scheduled_at, component_id_input_digest,
            state, execution_id, ffqn, corresponding_version, pending_expires_finished,
            last_lock_version, executor_id, run_id,
            join_set_id, join_set_closing,
            result_kind
            FROM t_state {where_str} ORDER BY created_at {desc} LIMIT {limit}
            ",
            desc = if statement_mod.limit_desc { "DESC" } else { "" },
            limit = statement_mod.limit,
        );
        let mut vec: Vec<_> = read_tx
            .prepare(&sql)?
            .query_map::<_, &[(&'static str, ToSqlOutput)], _>(
                statement_mod
                    .params
                    .into_iter()
                    .collect::<Vec<_>>()
                    .as_ref(),
                |row| {
                    let execution_id = row.get::<_, ExecutionId>("execution_id")?;
                    let component_id_input_digest: InputContentDigest =
                        row.get("component_id_input_digest")?;
                    let created_at = row.get("created_at")?;
                    let first_scheduled_at = row.get("first_scheduled_at")?;
                    let combined_state = CombinedState::new(
                        CombinedStateDTO {
                            component_id_input_digest: component_id_input_digest.clone(),
                            state: row.get("state")?,
                            ffqn: row.get("ffqn")?,
                            pending_expires_finished: row
                                .get::<_, DateTime<Utc>>("pending_expires_finished")?,
                            executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,

                            last_lock_version: row
                                .get::<_, Option<VersionType>>("last_lock_version")?
                                .map(Version::new),
                            run_id: row.get::<_, Option<RunId>>("run_id")?,
                            join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
                            join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
                            result_kind: row
                                .get::<_, Option<JsonWrapper<PendingStateFinishedResultKind>>>(
                                    "result_kind",
                                )?
                                .map(|wrapper| wrapper.0),
                        },
                        Version::new(row.get("corresponding_version")?),
                    )?;
                    Ok(ExecutionWithState {
                        execution_id,
                        ffqn: combined_state.ffqn,
                        pending_state: combined_state.pending_state,
                        created_at,
                        first_scheduled_at,
                        component_digest: component_id_input_digest,
                    })
                },
            )?
            .collect::<Vec<Result<_, _>>>()
            .into_iter()
            // Best effort: a row that fails to convert is logged and left out of the result.
            .filter_map(|row| match row {
                Ok(row) => Some(row),
                Err(err) => {
                    error!("Skipping row - {err:?}");
                    None
                }
            })
            .collect();

        // The query sorted ascending in this case; flip it so callers always receive the
        // rows in descending order.
        if !statement_mod.limit_desc {
            vec.reverse();
        }
        Ok(vec)
    }
1719
    /// Locks one execution for the given executor / run and returns everything needed to
    /// run it.
    ///
    /// Steps, all inside `tx`:
    /// 1. load the current state and verify the lock may be appended (`can_append_lock`)
    ///    and that `appending_version` is the expected next version,
    /// 2. insert the `Locked` event into `t_execution_log`,
    /// 3. read all join set responses of the execution,
    /// 4. switch `t_state` to locked and obtain the intermittent event count,
    /// 5. read the `Created` event and all `HistoryEvent` rows ordered by version.
    ///
    /// # Errors
    /// * version conflict when `appending_version` is not the expected next version,
    /// * `ValidationFailed` when the `Locked` event cannot be serialized,
    /// * `IllegalState("cannot lock")` when inserting the event fails,
    /// * a consistency error when the log does not start with `Created` or contains an
    ///   unexpected event kind among the selected rows.
    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
    #[expect(clippy::too_many_arguments)]
    fn lock_single_execution(
        tx: &Transaction,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        execution_id: &ExecutionId,
        run_id: RunId,
        appending_version: &Version,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockedExecution, DbErrorWrite> {
        debug!("lock_single_execution");
        let combined_state = Self::get_combined_state(tx, execution_id)?;
        combined_state.pending_state.can_append_lock(
            created_at,
            executor_id,
            run_id,
            lock_expires_at,
        )?;
        let expected_version = combined_state.get_next_version_assert_not_finished();
        Self::check_expected_next_and_appending_version(&expected_version, appending_version)?;

        // Build and persist the `Locked` event.
        let locked_event = Locked {
            component_id,
            executor_id,
            lock_expires_at,
            run_id,
            retry_config,
        };
        let event = ExecutionRequest::Locked(locked_event.clone());
        let event_ser = serde_json::to_string(&event).map_err(|err| {
            warn!("Cannot serialize {event:?} - {err:?}");
            DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
        })?;
        let mut stmt = tx
            .prepare_cached(
                "INSERT INTO t_execution_log \
            (execution_id, created_at, json_value, version, variant) \
            VALUES \
            (:execution_id, :created_at, :json_value, :version, :variant)",
            )
            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
        stmt.execute(named_params! {
            ":execution_id": execution_id.to_string(),
            ":created_at": created_at,
            ":json_value": event_ser,
            ":version": appending_version.0,
            ":variant": event.variant(),
        })
        // Any failure of the insert is reported as an illegal state.
        .map_err(|err| {
            warn!("Cannot lock execution - {err:?}");
            DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::IllegalState("cannot lock".into()))
        })?;

        // Fetch the responses; only the inner events are kept, the cursors are dropped.
        let responses = Self::list_responses(tx, execution_id, None)?;
        let responses = responses.into_iter().map(|resp| resp.event).collect();
        trace!("Responses: {responses:?}");

        let intermittent_event_count = Self::update_state_locked_get_intermittent_event_count(
            tx,
            execution_id,
            executor_id,
            run_id,
            lock_expires_at,
            appending_version,
            retry_config,
        )?;
        // Load only rows whose variant matches `Created` or `HistoryEvent`, ordered by version.
        let mut events = tx
            .prepare(
                "SELECT json_value, version FROM t_execution_log WHERE \
                execution_id = :execution_id AND (variant = :variant1 OR variant = :variant2) \
                ORDER BY version",
            )?
            .query_map(
                named_params! {
                    ":execution_id": execution_id.to_string(),
                    ":variant1": DUMMY_CREATED.variant(),
                    ":variant2": DUMMY_HISTORY_EVENT.variant(),
                },
                |row| {
                    // `created_at` is not selected by the query, a placeholder (epoch) is used.
                    let created_at_fake = DateTime::from_timestamp_nanos(0); let event = row
                        .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
                        .map_err(|serde| {
                            error!("Cannot deserialize {row:?} - {serde:?}");
                            consistency_rusqlite("cannot deserialize event")
                        })?
                        .0;
                    let version = Version(row.get("version")?);

                    Ok(ExecutionEvent {
                        created_at: created_at_fake,
                        event,
                        backtrace_id: None,
                        version,
                    })
                },
            )?
            .collect::<Result<Vec<_>, _>>()?
            .into_iter()
            .collect::<VecDeque<_>>();
        // The first selected row must be the `Created` event.
        let Some(ExecutionRequest::Created {
            ffqn,
            params,
            parent,
            metadata,
            ..
        }) = events.pop_front().map(|outer| outer.event)
        else {
            error!("Execution log must contain at least `Created` event");
            return Err(consistency_db_err("execution log must contain `Created` event").into());
        };

        // Every remaining row must be a `HistoryEvent`; keep it together with its version.
        let event_history = events
            .into_iter()
            .map(|ExecutionEvent { event, version, .. }| {
                if let ExecutionRequest::HistoryEvent { event } = event {
                    Ok((event, version))
                } else {
                    error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
                    Err(consistency_db_err(
                        "rows can only contain `Created` and `HistoryEvent` event kinds",
                    ))
                }
            })
            .collect::<Result<Vec<_>, _>>()?;

        Ok(LockedExecution {
            execution_id: execution_id.clone(),
            metadata,
            next_version: appending_version.increment(),
            ffqn,
            params,
            event_history,
            responses,
            parent,
            intermittent_event_count,
            locked_event,
        })
    }
1865
1866 fn count_join_next(
1867 tx: &Transaction,
1868 execution_id: &ExecutionId,
1869 join_set_id: &JoinSetId,
1870 ) -> Result<u64, DbErrorRead> {
1871 let mut stmt = tx.prepare(
1872 "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = :execution_id AND join_set_id = :join_set_id \
1873 AND history_event_type = :join_next",
1874 )?;
1875 Ok(stmt.query_row(
1876 named_params! {
1877 ":execution_id": execution_id.to_string(),
1878 ":join_set_id": join_set_id.to_string(),
1879 ":join_next": HISTORY_EVENT_TYPE_JOIN_NEXT,
1880 },
1881 |row| row.get("count"),
1882 )?)
1883 }
1884
1885 #[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
1886 #[expect(clippy::needless_return)]
1887 fn append(
1888 tx: &Transaction,
1889 execution_id: &ExecutionId,
1890 req: AppendRequest,
1891 appending_version: Version,
1892 ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1893 if matches!(req.event, ExecutionRequest::Created { .. }) {
1894 return Err(DbErrorWrite::NonRetriable(
1895 DbErrorWriteNonRetriable::ValidationFailed(
1896 "cannot append `Created` event - use `create` instead".into(),
1897 ),
1898 ));
1899 }
1900 if let AppendRequest {
1901 event:
1902 ExecutionRequest::Locked(Locked {
1903 component_id,
1904 executor_id,
1905 run_id,
1906 lock_expires_at,
1907 retry_config,
1908 }),
1909 created_at,
1910 } = req
1911 {
1912 return Self::lock_single_execution(
1913 tx,
1914 created_at,
1915 component_id,
1916 execution_id,
1917 run_id,
1918 &appending_version,
1919 executor_id,
1920 lock_expires_at,
1921 retry_config,
1922 )
1923 .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
1924 }
1925
1926 let combined_state = Self::get_combined_state(tx, execution_id)?;
1927 if combined_state.pending_state.is_finished() {
1928 debug!("Execution is already finished");
1929 return Err(DbErrorWrite::NonRetriable(
1930 DbErrorWriteNonRetriable::IllegalState("already finished".into()),
1931 ));
1932 }
1933
1934 Self::check_expected_next_and_appending_version(
1935 &combined_state.get_next_version_assert_not_finished(),
1936 &appending_version,
1937 )?;
1938 let event_ser = serde_json::to_string(&req.event).map_err(|err| {
1939 error!("Cannot serialize {:?} - {err:?}", req.event);
1940 DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
1941 })?;
1942
1943 let mut stmt = tx.prepare(
1944 "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
1945 VALUES (:execution_id, :created_at, :json_value, :version, :variant, :join_set_id)")
1946 ?;
1947 stmt.execute(named_params! {
1948 ":execution_id": execution_id.to_string(),
1949 ":created_at": req.created_at,
1950 ":json_value": event_ser,
1951 ":version": appending_version.0,
1952 ":variant": req.event.variant(),
1953 ":join_set_id": req.event.join_set_id().map(std::string::ToString::to_string),
1954 })?;
1955 match &req.event {
1958 ExecutionRequest::Created { .. } => {
1959 unreachable!("handled in the caller")
1960 }
1961
1962 ExecutionRequest::Locked { .. } => {
1963 unreachable!("handled above")
1964 }
1965
1966 ExecutionRequest::TemporarilyFailed {
1967 backoff_expires_at, ..
1968 }
1969 | ExecutionRequest::TemporarilyTimedOut {
1970 backoff_expires_at, ..
1971 } => {
1972 let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1973 tx,
1974 execution_id,
1975 &appending_version,
1976 *backoff_expires_at,
1977 true, combined_state
1979 .pending_state
1980 .get_component_id_input_digest()
1981 .clone(),
1982 )?;
1983 return Ok((next_version, notifier));
1984 }
1985
1986 ExecutionRequest::Unlocked {
1987 backoff_expires_at, ..
1988 } => {
1989 let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1990 tx,
1991 execution_id,
1992 &appending_version,
1993 *backoff_expires_at,
1994 false, combined_state
1996 .pending_state
1997 .get_component_id_input_digest()
1998 .clone(),
1999 )?;
2000 return Ok((next_version, notifier));
2001 }
2002
2003 ExecutionRequest::Finished { result, .. } => {
2004 Self::update_state_finished(
2005 tx,
2006 execution_id,
2007 &appending_version,
2008 req.created_at,
2009 PendingStateFinishedResultKind::from(result),
2010 )?;
2011 return Ok((
2012 appending_version,
2013 AppendNotifier {
2014 pending_at: None,
2015 execution_finished: Some(NotifierExecutionFinished {
2016 execution_id: execution_id.clone(),
2017 retval: result.clone(),
2018 }),
2019 response: None,
2020 },
2021 ));
2022 }
2023
2024 ExecutionRequest::HistoryEvent {
2025 event:
2026 HistoryEvent::JoinSetCreate { .. }
2027 | HistoryEvent::JoinSetRequest {
2028 request: JoinSetRequest::ChildExecutionRequest { .. },
2029 ..
2030 }
2031 | HistoryEvent::Persist { .. }
2032 | HistoryEvent::Schedule { .. }
2033 | HistoryEvent::Stub { .. }
2034 | HistoryEvent::JoinNextTooMany { .. },
2035 } => {
2036 return Ok((
2037 Self::bump_state_next_version(tx, execution_id, &appending_version, None)?,
2038 AppendNotifier::default(),
2039 ));
2040 }
2041
2042 ExecutionRequest::HistoryEvent {
2043 event:
2044 HistoryEvent::JoinSetRequest {
2045 join_set_id,
2046 request:
2047 JoinSetRequest::DelayRequest {
2048 delay_id,
2049 expires_at,
2050 ..
2051 },
2052 },
2053 } => {
2054 return Ok((
2055 Self::bump_state_next_version(
2056 tx,
2057 execution_id,
2058 &appending_version,
2059 Some(DelayReq {
2060 join_set_id: join_set_id.clone(),
2061 delay_id: delay_id.clone(),
2062 expires_at: *expires_at,
2063 }),
2064 )?,
2065 AppendNotifier::default(),
2066 ));
2067 }
2068
2069 ExecutionRequest::HistoryEvent {
2070 event:
2071 HistoryEvent::JoinNext {
2072 join_set_id,
2073 run_expires_at,
2074 closing,
2075 requested_ffqn: _,
2076 },
2077 } => {
2078 let join_next_count = Self::count_join_next(tx, execution_id, join_set_id)?;
2080 let nth_response =
2081 Self::nth_response(tx, execution_id, join_set_id, join_next_count - 1)?; trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
2083 assert!(join_next_count > 0);
2084 if let Some(ResponseWithCursor {
2085 event:
2086 JoinSetResponseEventOuter {
2087 created_at: nth_created_at,
2088 ..
2089 },
2090 cursor: _,
2091 }) = nth_response
2092 {
2093 let scheduled_at = max(*run_expires_at, nth_created_at); let (next_version, notifier) = Self::update_state_pending_after_event_appended(
2095 tx,
2096 execution_id,
2097 &appending_version,
2098 scheduled_at,
2099 false, combined_state
2101 .pending_state
2102 .get_component_id_input_digest()
2103 .clone(),
2104 )?;
2105 return Ok((next_version, notifier));
2106 }
2107 return Ok((
2108 Self::update_state_blocked(
2109 tx,
2110 execution_id,
2111 &appending_version,
2112 join_set_id,
2113 *run_expires_at,
2114 *closing,
2115 )?,
2116 AppendNotifier::default(),
2117 ));
2118 }
2119 }
2120 }
2121
2122 fn append_response(
2123 tx: &Transaction,
2124 execution_id: &ExecutionId,
2125 response_outer: JoinSetResponseEventOuter,
2126 ) -> Result<AppendNotifier, DbErrorWrite> {
2127 let mut stmt = tx.prepare(
2128 "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, delay_success, child_execution_id, finished_version) \
2129 VALUES (:execution_id, :created_at, :join_set_id, :delay_id, :delay_success, :child_execution_id, :finished_version)",
2130 )?;
2131 let join_set_id = &response_outer.event.join_set_id;
2132 let (delay_id, delay_success) = match &response_outer.event.event {
2133 JoinSetResponse::DelayFinished { delay_id, result } => {
2134 (Some(delay_id.to_string()), Some(result.is_ok()))
2135 }
2136 JoinSetResponse::ChildExecutionFinished { .. } => (None, None),
2137 };
2138 let (child_execution_id, finished_version) = match &response_outer.event.event {
2139 JoinSetResponse::ChildExecutionFinished {
2140 child_execution_id,
2141 finished_version,
2142 result: _,
2143 } => (
2144 Some(child_execution_id.to_string()),
2145 Some(finished_version.0),
2146 ),
2147 JoinSetResponse::DelayFinished { .. } => (None, None),
2148 };
2149
2150 stmt.execute(named_params! {
2151 ":execution_id": execution_id.to_string(),
2152 ":created_at": response_outer.created_at,
2153 ":join_set_id": join_set_id.to_string(),
2154 ":delay_id": delay_id,
2155 ":delay_success": delay_success,
2156 ":child_execution_id": child_execution_id,
2157 ":finished_version": finished_version,
2158 })?;
2159
2160 let combined_state = Self::get_combined_state(tx, execution_id)?;
2162 debug!("previous_pending_state: {combined_state:?}");
2163 let mut notifier = if let PendingState::BlockedByJoinSet {
2164 join_set_id: found_join_set_id,
2165 lock_expires_at, closing: _,
2167 component_id_input_digest,
2168 } = combined_state.pending_state
2169 && *join_set_id == found_join_set_id
2170 {
2171 let scheduled_at = max(lock_expires_at, response_outer.created_at);
2174 Self::update_state_pending_after_response_appended(
2177 tx,
2178 execution_id,
2179 scheduled_at,
2180 &combined_state.corresponding_version, component_id_input_digest,
2182 )?
2183 } else {
2184 AppendNotifier::default()
2185 };
2186 if let JoinSetResponseEvent {
2187 join_set_id,
2188 event:
2189 JoinSetResponse::DelayFinished {
2190 delay_id,
2191 result: _,
2192 },
2193 } = &response_outer.event
2194 {
2195 debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
2196 let mut stmt =
2197 tx.prepare_cached("DELETE FROM t_delay WHERE execution_id = :execution_id AND join_set_id = :join_set_id AND delay_id = :delay_id")
2198 ?;
2199 stmt.execute(named_params! {
2200 ":execution_id": execution_id.to_string(),
2201 ":join_set_id": join_set_id.to_string(),
2202 ":delay_id": delay_id.to_string(),
2203 })?;
2204 }
2205 notifier.response = Some((execution_id.clone(), response_outer));
2206 Ok(notifier)
2207 }
2208
2209 fn append_backtrace(
2210 tx: &Transaction,
2211 backtrace_info: &BacktraceInfo,
2212 ) -> Result<(), DbErrorWrite> {
2213 let backtrace = serde_json::to_string(&backtrace_info.wasm_backtrace).map_err(|err| {
2214 warn!(
2215 "Cannot serialize backtrace {:?} - {err:?}",
2216 backtrace_info.wasm_backtrace
2217 );
2218 DbErrorWriteNonRetriable::ValidationFailed("cannot serialize backtrace".into())
2219 })?;
2220 let mut stmt = tx
2221 .prepare(
2222 "INSERT INTO t_backtrace (execution_id, component_id, version_min_including, version_max_excluding, wasm_backtrace) \
2223 VALUES (:execution_id, :component_id, :version_min_including, :version_max_excluding, :wasm_backtrace)",
2224 )
2225 ?;
2226 stmt.execute(named_params! {
2227 ":execution_id": backtrace_info.execution_id.to_string(),
2228 ":component_id": backtrace_info.component_id.to_string(),
2229 ":version_min_including": backtrace_info.version_min_including.0,
2230 ":version_max_excluding": backtrace_info.version_max_excluding.0,
2231 ":wasm_backtrace": backtrace,
2232 })?;
2233 Ok(())
2234 }
2235
/// Loads the complete execution log of `execution_id`: all events ordered by version, all
/// join set responses, the next version and the current pending state.
///
/// # Errors
/// Returns [`DbErrorRead::NotFound`] when no event row exists for the execution; a row whose
/// `json_value` cannot be deserialized is reported through `consistency_rusqlite`.
#[cfg(feature = "test")]
fn get(
    tx: &Transaction,
    execution_id: &ExecutionId,
) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
    let mut select_events = tx.prepare(
        "SELECT created_at, json_value, version FROM t_execution_log WHERE \
        execution_id = :execution_id ORDER BY version",
    )?;
    let rows = select_events.query_map(
        named_params! {
            ":execution_id": execution_id.to_string(),
        },
        |row| {
            let created_at = row.get("created_at")?;
            let JsonWrapper(event) = row
                .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
                .map_err(|serde| {
                    error!("Cannot deserialize {row:?} - {serde:?}");
                    consistency_rusqlite("cannot deserialize event")
                })?;
            let version = Version(row.get("version")?);
            Ok(ExecutionEvent {
                created_at,
                event,
                backtrace_id: None,
                version,
            })
        },
    )?;
    let events = rows.collect::<Result<Vec<_>, _>>()?;
    // An execution without a single event does not exist.
    if events.is_empty() {
        return Err(DbErrorRead::NotFound);
    }

    let combined_state = Self::get_combined_state(tx, execution_id)?;
    let responses = Self::list_responses(tx, execution_id, None)?
        .into_iter()
        .map(|with_cursor| with_cursor.event)
        .collect();
    // Read the next version before `pending_state` is moved out of `combined_state`.
    let next_version = combined_state.get_next_version_or_finished();
    Ok(concepts::storage::ExecutionLog {
        execution_id: execution_id.clone(),
        events,
        responses,
        next_version,
        pending_state: combined_state.pending_state,
    })
}
2286
2287 fn list_execution_events(
2288 tx: &Transaction,
2289 execution_id: &ExecutionId,
2290 version_min: VersionType,
2291 version_max_excluding: VersionType,
2292 include_backtrace_id: bool,
2293 ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2294 let select = if include_backtrace_id {
2295 "SELECT
2296 log.created_at,
2297 log.json_value,
2298 log.version as version,
2299 -- Select version_min_including from backtrace if a match is found, otherwise NULL
2300 bt.version_min_including AS backtrace_id
2301 FROM
2302 t_execution_log AS log
2303 LEFT OUTER JOIN -- Use LEFT JOIN to keep all logs even if no backtrace matches
2304 t_backtrace AS bt ON log.execution_id = bt.execution_id
2305 -- Check if the log's version falls within the backtrace's range
2306 AND log.version >= bt.version_min_including
2307 AND log.version < bt.version_max_excluding
2308 WHERE
2309 log.execution_id = :execution_id
2310 AND log.version >= :version_min
2311 AND log.version < :version_max_excluding
2312 ORDER BY
2313 log.version;"
2314 } else {
2315 "SELECT
2316 created_at, json_value, NULL as backtrace_id, version
2317 FROM t_execution_log WHERE
2318 execution_id = :execution_id AND version >= :version_min AND version < :version_max_excluding
2319 ORDER BY version"
2320 };
2321 tx.prepare(select)?
2322 .query_map(
2323 named_params! {
2324 ":execution_id": execution_id.to_string(),
2325 ":version_min": version_min,
2326 ":version_max_excluding": version_max_excluding
2327 },
2328 |row| {
2329 let created_at = row.get("created_at")?;
2330 let backtrace_id = row
2331 .get::<_, Option<VersionType>>("backtrace_id")?
2332 .map(Version::new);
2333 let version = Version(row.get("version")?);
2334
2335 let event = row
2336 .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
2337 .map(|event| ExecutionEvent {
2338 created_at,
2339 event: event.0,
2340 backtrace_id,
2341 version,
2342 })
2343 .map_err(|serde| {
2344 error!("Cannot deserialize {row:?} - {serde:?}");
2345 consistency_rusqlite("cannot deserialize")
2346 })?;
2347 Ok(event)
2348 },
2349 )?
2350 .collect::<Result<Vec<_>, _>>()
2351 .map_err(DbErrorRead::from)
2352 }
2353
2354 fn get_execution_event(
2355 tx: &Transaction,
2356 execution_id: &ExecutionId,
2357 version: VersionType,
2358 ) -> Result<ExecutionEvent, DbErrorRead> {
2359 let mut stmt = tx.prepare(
2360 "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2361 execution_id = :execution_id AND version = :version",
2362 )?;
2363 stmt.query_row(
2364 named_params! {
2365 ":execution_id": execution_id.to_string(),
2366 ":version": version,
2367 },
2368 |row| {
2369 let created_at = row.get("created_at")?;
2370 let event = row
2371 .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
2372 .map_err(|serde| {
2373 error!("Cannot deserialize {row:?} - {serde:?}");
2374 consistency_rusqlite("cannot deserialize event")
2375 })?;
2376 let version = Version(row.get("version")?);
2377
2378 Ok(ExecutionEvent {
2379 created_at,
2380 event: event.0,
2381 backtrace_id: None,
2382 version,
2383 })
2384 },
2385 )
2386 .map_err(DbErrorRead::from)
2387 }
2388
2389 fn get_last_execution_event(
2390 tx: &Transaction,
2391 execution_id: &ExecutionId,
2392 ) -> Result<ExecutionEvent, DbErrorRead> {
2393 let mut stmt = tx.prepare(
2394 "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2395 execution_id = :execution_id ORDER BY version DESC",
2396 )?;
2397 stmt.query_row(
2398 named_params! {
2399 ":execution_id": execution_id.to_string(),
2400 },
2401 |row| {
2402 let created_at = row.get("created_at")?;
2403 let event = row
2404 .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
2405 .map_err(|serde| {
2406 error!("Cannot deserialize {row:?} - {serde:?}");
2407 consistency_rusqlite("cannot deserialize event")
2408 })?;
2409 let version = Version(row.get("version")?);
2410 Ok(ExecutionEvent {
2411 created_at,
2412 event: event.0,
2413 backtrace_id: None,
2414 version: version.clone(),
2415 })
2416 },
2417 )
2418 .map_err(DbErrorRead::from)
2419 }
2420
2421 fn list_responses(
2422 tx: &Transaction,
2423 execution_id: &ExecutionId,
2424 pagination: Option<Pagination<u32>>,
2425 ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
2426 let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![];
2428 let mut sql = "SELECT \
2429 r.id, r.created_at, r.join_set_id, r.delay_id, r.delay_success, r.child_execution_id, r.finished_version, l.json_value \
2430 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2431 WHERE \
2432 r.execution_id = :execution_id \
2433 AND ( r.finished_version = l.version OR r.child_execution_id IS NULL ) \
2434 "
2435 .to_string();
2436 let limit = match &pagination {
2437 Some(
2438 pagination @ (Pagination::NewerThan { cursor, .. }
2439 | Pagination::OlderThan { cursor, .. }),
2440 ) => {
2441 params.push((":cursor", Box::new(cursor)));
2442 write!(sql, " AND r.id {rel} :cursor", rel = pagination.rel()).unwrap();
2443 Some(pagination.length())
2444 }
2445 None => None,
2446 };
2447 sql.push_str(" ORDER BY id");
2448 if pagination.as_ref().is_some_and(Pagination::is_desc) {
2449 sql.push_str(" DESC");
2450 }
2451 if let Some(limit) = limit {
2452 write!(sql, " LIMIT {limit}").unwrap();
2453 }
2454 params.push((":execution_id", Box::new(execution_id.to_string())));
2455 tx.prepare(&sql)?
2456 .query_map::<_, &[(&'static str, &dyn ToSql)], _>(
2457 params
2458 .iter()
2459 .map(|(key, value)| (*key, value.as_ref()))
2460 .collect::<Vec<_>>()
2461 .as_ref(),
2462 Self::parse_response_with_cursor,
2463 )?
2464 .collect::<Result<Vec<_>, rusqlite::Error>>()
2465 .map_err(DbErrorRead::from)
2466 }
2467
2468 fn parse_response_with_cursor(
2469 row: &rusqlite::Row<'_>,
2470 ) -> Result<ResponseWithCursor, rusqlite::Error> {
2471 let id = row.get("id")?;
2472 let created_at: DateTime<Utc> = row.get("created_at")?;
2473 let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
2474 let event = match (
2475 row.get::<_, Option<DelayId>>("delay_id")?,
2476 row.get::<_, Option<bool>>("delay_success")?,
2477 row.get::<_, Option<ExecutionIdDerived>>("child_execution_id")?,
2478 row.get::<_, Option<VersionType>>("finished_version")?,
2479 row.get::<_, Option<JsonWrapper<ExecutionRequest>>>("json_value")?,
2480 ) {
2481 (Some(delay_id), Some(delay_success), None, None, None) => {
2482 JoinSetResponse::DelayFinished {
2483 delay_id,
2484 result: delay_success.then_some(()).ok_or(()),
2485 }
2486 }
2487 (
2488 None,
2489 None,
2490 Some(child_execution_id),
2491 Some(finished_version),
2492 Some(JsonWrapper(ExecutionRequest::Finished { result, .. })),
2493 ) => JoinSetResponse::ChildExecutionFinished {
2494 child_execution_id,
2495 finished_version: Version(finished_version),
2496 result,
2497 },
2498 (delay, delay_success, child, finished, result) => {
2499 error!(
2500 "Invalid row in t_join_set_response {id} - {delay:?} {delay_success:?} {child:?} {finished:?} {:?}",
2501 result.map(|it| it.0)
2502 );
2503 return Err(consistency_rusqlite("invalid row in t_join_set_response"));
2504 }
2505 };
2506 Ok(ResponseWithCursor {
2507 cursor: id,
2508 event: JoinSetResponseEventOuter {
2509 event: JoinSetResponseEvent { join_set_id, event },
2510 created_at,
2511 },
2512 })
2513 }
2514
2515 fn nth_response(
2516 tx: &Transaction,
2517 execution_id: &ExecutionId,
2518 join_set_id: &JoinSetId,
2519 skip_rows: u64,
2520 ) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
2521 tx
2523 .prepare(
2524 "SELECT r.id, r.created_at, r.join_set_id, \
2525 r.delay_id, r.delay_success, \
2526 r.child_execution_id, r.finished_version, l.json_value \
2527 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2528 WHERE \
2529 r.execution_id = :execution_id AND r.join_set_id = :join_set_id AND \
2530 (
2531 r.finished_version = l.version \
2532 OR \
2533 r.child_execution_id IS NULL \
2534 ) \
2535 ORDER BY id \
2536 LIMIT 1 OFFSET :offset",
2537 )
2538 ?
2539 .query_row(
2540 named_params! {
2541 ":execution_id": execution_id.to_string(),
2542 ":join_set_id": join_set_id.to_string(),
2543 ":offset": skip_rows,
2544 },
2545 Self::parse_response_with_cursor,
2546 )
2547 .optional()
2548 .map_err(DbErrorRead::from)
2549 }
2550
2551 fn delay_response(
2552 tx: &Transaction,
2553 execution_id: &ExecutionId,
2554 delay_id: &DelayId,
2555 ) -> Result<Option<bool>, DbErrorRead> {
2556 tx.prepare(
2558 "SELECT delay_success \
2559 FROM t_join_set_response \
2560 WHERE \
2561 execution_id = :execution_id AND delay_id = :delay_id
2562 ",
2563 )?
2564 .query_row(
2565 named_params! {
2566 ":execution_id": execution_id.to_string(),
2567 ":delay_id": delay_id.to_string(),
2568 },
2569 |row| {
2570 let delay_success = row.get::<_, bool>("delay_success")?;
2571 Ok(delay_success)
2572 },
2573 )
2574 .optional()
2575 .map_err(DbErrorRead::from)
2576 }
2577
2578 #[instrument(level = Level::TRACE, skip_all)]
2580 fn get_responses_with_offset(
2581 tx: &Transaction,
2582 execution_id: &ExecutionId,
2583 skip_rows: usize,
2584 ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorRead> {
2585 tx.prepare(
2587 "SELECT r.id, r.created_at, r.join_set_id, \
2588 r.delay_id, r.delay_success, \
2589 r.child_execution_id, r.finished_version, l.json_value \
2590 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2591 WHERE \
2592 r.execution_id = :execution_id AND \
2593 ( \
2594 r.finished_version = l.version \
2595 OR r.child_execution_id IS NULL \
2596 ) \
2597 ORDER BY id \
2598 LIMIT -1 OFFSET :offset",
2599 )
2600 ?
2601 .query_map(
2602 named_params! {
2603 ":execution_id": execution_id.to_string(),
2604 ":offset": skip_rows,
2605 },
2606 Self::parse_response_with_cursor,
2607 )
2608 ?
2609 .collect::<Result<Vec<_>, _>>()
2610 .map(|resp| resp.into_iter().map(|vec| vec.event).collect())
2611 .map_err(DbErrorRead::from)
2612 }
2613
2614 fn get_pending_of_single_ffqn(
2615 mut stmt: CachedStatement,
2616 batch_size: usize,
2617 pending_at_or_sooner: DateTime<Utc>,
2618 ffqn: &FunctionFqn,
2619 ) -> Result<Vec<(ExecutionId, Version)>, ()> {
2620 stmt.query_map(
2621 named_params! {
2622 ":pending_expires_finished": pending_at_or_sooner,
2623 ":ffqn": ffqn.to_string(),
2624 ":batch_size": batch_size,
2625 },
2626 |row| {
2627 let execution_id = row.get::<_, ExecutionId>("execution_id")?;
2628 let next_version =
2629 Version::new(row.get::<_, VersionType>("corresponding_version")?).increment();
2630 Ok((execution_id, next_version))
2631 },
2632 )
2633 .map_err(|err| {
2634 warn!("Ignoring consistency error {err:?}");
2635 })?
2636 .collect::<Result<Vec<_>, _>>()
2637 .map_err(|err| {
2638 warn!("Ignoring consistency error {err:?}");
2639 })
2640 }
2641
2642 fn get_pending_by_ffqns(
2644 conn: &Connection,
2645 batch_size: usize,
2646 pending_at_or_sooner: DateTime<Utc>,
2647 ffqns: &[FunctionFqn],
2648 ) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2649 let mut execution_ids_versions = Vec::with_capacity(batch_size);
2650 for ffqn in ffqns {
2651 let stmt = conn
2653 .prepare_cached(&format!(
2654 r#"
2655 SELECT execution_id, corresponding_version FROM t_state WHERE
2656 state = "{STATE_PENDING_AT}" AND
2657 pending_expires_finished <= :pending_expires_finished AND ffqn = :ffqn
2658 ORDER BY pending_expires_finished LIMIT :batch_size
2659 "#
2660 ))
2661 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2662
2663 if let Ok(execs_and_versions) = Self::get_pending_of_single_ffqn(
2664 stmt,
2665 batch_size - execution_ids_versions.len(),
2666 pending_at_or_sooner,
2667 ffqn,
2668 ) {
2669 execution_ids_versions.extend(execs_and_versions);
2670 if execution_ids_versions.len() == batch_size {
2671 break;
2673 }
2674 }
2676 }
2677 Ok(execution_ids_versions)
2678 }
2679
2680 fn get_pending_by_component_input_digest(
2681 conn: &Connection,
2682 batch_size: usize,
2683 pending_at_or_sooner: DateTime<Utc>,
2684 input_digest: &InputContentDigest,
2685 ) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2686 let mut stmt = conn
2687 .prepare_cached(&format!(
2688 r#"
2689 SELECT execution_id, corresponding_version FROM t_state WHERE
2690 state = "{STATE_PENDING_AT}" AND
2691 pending_expires_finished <= :pending_expires_finished AND
2692 component_id_input_digest = :component_id_input_digest
2693 ORDER BY pending_expires_finished LIMIT :batch_size
2694 "#
2695 ))
2696 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2697
2698 stmt.query_map(
2699 named_params! {
2700 ":pending_expires_finished": pending_at_or_sooner,
2701 ":component_id_input_digest": input_digest,
2702 ":batch_size": batch_size,
2703 },
2704 |row| {
2705 let execution_id = row.get::<_, ExecutionId>("execution_id")?;
2706 let next_version =
2707 Version::new(row.get::<_, VersionType>("corresponding_version")?).increment();
2708 Ok((execution_id, next_version))
2709 },
2710 )?
2711 .collect::<Result<Vec<_>, _>>()
2712 .map_err(DbErrorGeneric::from)
2713 }
2714
2715 #[instrument(level = Level::TRACE, skip_all)]
2717 fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
2718 let (pending_ats, finished_execs, responses) = {
2719 let (mut pending_ats, mut finished_execs, mut responses) =
2720 (Vec::new(), Vec::new(), Vec::new());
2721 for notifier in notifiers {
2722 if let Some(pending_at) = notifier.pending_at {
2723 pending_ats.push(pending_at);
2724 }
2725 if let Some(finished) = notifier.execution_finished {
2726 finished_execs.push(finished);
2727 }
2728 if let Some(response) = notifier.response {
2729 responses.push(response);
2730 }
2731 }
2732 (pending_ats, finished_execs, responses)
2733 };
2734
2735 if !pending_ats.is_empty() {
2737 let guard = self.0.pending_subscribers.lock().unwrap();
2738 for pending_at in pending_ats {
2739 Self::notify_pending_locked(&pending_at, current_time, &guard);
2740 }
2741 }
2742 if !finished_execs.is_empty() {
2745 let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
2746 for finished in finished_execs {
2747 if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
2748 for (_tag, sender) in listeners_of_exe_id {
2749 let _ = sender.send(finished.retval.clone());
2752 }
2753 }
2754 }
2755 }
2756 if !responses.is_empty() {
2758 let mut guard = self.0.response_subscribers.lock().unwrap();
2759 for (execution_id, response) in responses {
2760 if let Some((sender, _)) = guard.remove(&execution_id) {
2761 let _ = sender.send(response);
2762 }
2763 }
2764 }
2765 }
2766
2767 fn notify_pending_locked(
2768 notifier: &NotifierPendingAt,
2769 current_time: DateTime<Utc>,
2770 ffqn_to_pending_subscription: &std::sync::MutexGuard<PendingFfqnSubscribersHolder>,
2771 ) {
2772 if notifier.scheduled_at <= current_time {
2774 ffqn_to_pending_subscription.notify(notifier);
2775 }
2776 }
2777
2778 fn upgrade_execution_component(
2779 tx: &Transaction,
2780 execution_id: &ExecutionId,
2781 old: &InputContentDigest,
2782 new: &InputContentDigest,
2783 ) -> Result<(), DbErrorWrite> {
2784 debug!("Updating t_state to component {new}");
2785 let mut stmt = tx
2786 .prepare_cached(
2787 r"
2788 UPDATE t_state
2789 SET
2790 updated_at = CURRENT_TIMESTAMP,
2791 component_id_input_digest = :new
2792 WHERE
2793 execution_id = :execution_id AND
2794 component_id_input_digest = :old
2795 ",
2796 )
2797 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2798 let updated = stmt
2799 .execute(named_params! {
2800 ":execution_id": execution_id,
2801 ":old": old,
2802 ":new": new,
2803 })
2804 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2805 if updated != 1 {
2806 return Err(DbErrorWrite::NotFound);
2807 }
2808 Ok(())
2809 }
2810}
2811
2812#[async_trait]
2813impl DbExecutor for SqlitePool {
2814 #[instrument(level = Level::TRACE, skip(self))]
2815 async fn lock_pending_by_ffqns(
2816 &self,
2817 batch_size: usize,
2818 pending_at_or_sooner: DateTime<Utc>,
2819 ffqns: Arc<[FunctionFqn]>,
2820 created_at: DateTime<Utc>,
2821 component_id: ComponentId,
2822 executor_id: ExecutorId,
2823 lock_expires_at: DateTime<Utc>,
2824 run_id: RunId,
2825 retry_config: ComponentRetryConfig,
2826 ) -> Result<LockPendingResponse, DbErrorGeneric> {
2827 let execution_ids_versions = self
2828 .transaction(
2829 move |conn| {
2830 Self::get_pending_by_ffqns(conn, batch_size, pending_at_or_sooner, &ffqns)
2831 },
2832 "lock_pending_by_ffqns_get",
2833 )
2834 .await?;
2835 if execution_ids_versions.is_empty() {
2836 Ok(vec![])
2837 } else {
2838 debug!("Locking {execution_ids_versions:?}");
2839 self.transaction(
2840 move |tx| {
2841 let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
2842 for (execution_id, version) in &execution_ids_versions {
2844 match Self::lock_single_execution(
2845 tx,
2846 created_at,
2847 component_id.clone(),
2848 execution_id,
2849 run_id,
2850 version,
2851 executor_id,
2852 lock_expires_at,
2853 retry_config,
2854 ) {
2855 Ok(locked) => locked_execs.push(locked),
2856 Err(err) => {
2857 warn!("Locking row {execution_id} failed - {err:?}");
2858 }
2859 }
2860 }
2861 Ok(locked_execs)
2862 },
2863 "lock_pending_by_ffqns_one",
2864 )
2865 .await
2866 }
2867 }
2868
2869 #[instrument(level = Level::TRACE, skip(self))]
2870 async fn lock_pending_by_component_id(
2871 &self,
2872 batch_size: usize,
2873 pending_at_or_sooner: DateTime<Utc>,
2874 component_id: &ComponentId,
2875 created_at: DateTime<Utc>,
2876 executor_id: ExecutorId,
2877 lock_expires_at: DateTime<Utc>,
2878 run_id: RunId,
2879 retry_config: ComponentRetryConfig,
2880 ) -> Result<LockPendingResponse, DbErrorGeneric> {
2881 let component_id = component_id.clone();
2882 let execution_ids_versions = self
2883 .transaction(
2884 {
2885 let component_id = component_id.clone();
2886 move |conn| {
2887 Self::get_pending_by_component_input_digest(
2888 conn,
2889 batch_size,
2890 pending_at_or_sooner,
2891 &component_id.input_digest,
2892 )
2893 }
2894 },
2895 "lock_pending_by_component_id_get",
2896 )
2897 .await?;
2898 if execution_ids_versions.is_empty() {
2899 Ok(vec![])
2900 } else {
2901 debug!("Locking {execution_ids_versions:?}");
2902 self.transaction(
2903 move |tx| {
2904 let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
2905 for (execution_id, version) in &execution_ids_versions {
2907 match Self::lock_single_execution(
2908 tx,
2909 created_at,
2910 component_id.clone(),
2911 execution_id,
2912 run_id,
2913 version,
2914 executor_id,
2915 lock_expires_at,
2916 retry_config,
2917 ) {
2918 Ok(locked) => locked_execs.push(locked),
2919 Err(err) => {
2920 warn!("Locking row {execution_id} failed - {err:?}");
2921 }
2922 }
2923 }
2924 Ok(locked_execs)
2925 },
2926 "lock_pending_by_component_id_one",
2927 )
2928 .await
2929 }
2930 }
2931
2932 #[instrument(level = Level::DEBUG, skip(self))]
2933 async fn lock_one(
2934 &self,
2935 created_at: DateTime<Utc>,
2936 component_id: ComponentId,
2937 execution_id: &ExecutionId,
2938 run_id: RunId,
2939 version: Version,
2940 executor_id: ExecutorId,
2941 lock_expires_at: DateTime<Utc>,
2942 retry_config: ComponentRetryConfig,
2943 ) -> Result<LockedExecution, DbErrorWrite> {
2944 debug!(%execution_id, "lock_one");
2945 let execution_id = execution_id.clone();
2946 self.transaction(
2947 move |tx| {
2948 Self::lock_single_execution(
2949 tx,
2950 created_at,
2951 component_id.clone(),
2952 &execution_id,
2953 run_id,
2954 &version,
2955 executor_id,
2956 lock_expires_at,
2957 retry_config,
2958 )
2959 },
2960 "lock_inner",
2961 )
2962 .await
2963 }
2964
2965 #[instrument(level = Level::DEBUG, skip(self, req))]
2966 async fn append(
2967 &self,
2968 execution_id: ExecutionId,
2969 version: Version,
2970 req: AppendRequest,
2971 ) -> Result<AppendResponse, DbErrorWrite> {
2972 debug!(%req, "append");
2973 trace!(?req, "append");
2974 let created_at = req.created_at;
2975 let (version, notifier) = self
2976 .transaction(
2977 move |tx| Self::append(tx, &execution_id, req.clone(), version.clone()),
2978 "append",
2979 )
2980 .await?;
2981 self.notify_all(vec![notifier], created_at);
2982 Ok(version)
2983 }
2984
2985 #[instrument(level = Level::DEBUG, skip_all)]
2986 async fn append_batch_respond_to_parent(
2987 &self,
2988 events: AppendEventsToExecution,
2989 response: AppendResponseToExecution,
2990 current_time: DateTime<Utc>,
2991 ) -> Result<AppendBatchResponse, DbErrorWrite> {
2992 debug!("append_batch_respond_to_parent");
2993 if events.execution_id == response.parent_execution_id {
2994 return Err(DbErrorWrite::NonRetriable(
2997 DbErrorWriteNonRetriable::ValidationFailed(
2998 "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
2999 ),
3000 ));
3001 }
3002 if events.batch.is_empty() {
3003 error!("Batch cannot be empty");
3004 return Err(DbErrorWrite::NonRetriable(
3005 DbErrorWriteNonRetriable::ValidationFailed("batch cannot be empty".into()),
3006 ));
3007 }
3008 let (version, notifiers) = {
3009 self.transaction(
3010 move |tx| {
3011 let mut version = events.version.clone();
3012 let mut notifier_of_child = None;
3013 for append_request in &events.batch {
3014 let (v, n) = Self::append(
3015 tx,
3016 &events.execution_id,
3017 append_request.clone(),
3018 version,
3019 )?;
3020 version = v;
3021 notifier_of_child = Some(n);
3022 }
3023
3024 let pending_at_parent = Self::append_response(
3025 tx,
3026 &response.parent_execution_id,
3027 JoinSetResponseEventOuter {
3028 created_at: response.created_at,
3029 event: JoinSetResponseEvent {
3030 join_set_id: response.join_set_id.clone(),
3031 event: JoinSetResponse::ChildExecutionFinished {
3032 child_execution_id: response.child_execution_id.clone(),
3033 finished_version: response.finished_version.clone(),
3034 result: response.result.clone(),
3035 },
3036 },
3037 },
3038 )?;
3039 Ok::<_, DbErrorWrite>((
3040 version,
3041 vec![
3042 notifier_of_child.expect("checked that the batch is not empty"),
3043 pending_at_parent,
3044 ],
3045 ))
3046 },
3047 "append_batch_respond_to_parent",
3048 )
3049 .await?
3050 };
3051 self.notify_all(notifiers, current_time);
3052 Ok(version)
3053 }
3054
3055 #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
3058 async fn wait_for_pending_by_ffqn(
3059 &self,
3060 pending_at_or_sooner: DateTime<Utc>,
3061 ffqns: Arc<[FunctionFqn]>,
3062 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
3063 ) {
3064 let unique_tag: u64 = rand::random();
3065 let (sender, mut receiver) = mpsc::channel(1); {
3067 let mut pending_subscribers = self.0.pending_subscribers.lock().unwrap();
3068 for ffqn in ffqns.as_ref() {
3069 pending_subscribers.insert_ffqn(ffqn.clone(), (sender.clone(), unique_tag));
3070 }
3071 }
3072 async {
3073 let Ok(execution_ids_versions) = self
3074 .transaction(
3075 {
3076 let ffqns = ffqns.clone();
3077 move |conn| Self::get_pending_by_ffqns(conn, 1, pending_at_or_sooner, ffqns.as_ref())
3078 },
3079 "get_pending_by_ffqns",
3080 )
3081 .await
3082 else {
3083 trace!(
3084 "Ignoring get_pending error and waiting in for timeout to avoid executor repolling too soon"
3085 );
3086 timeout_fut.await;
3087 return;
3088 };
3089 if !execution_ids_versions.is_empty() {
3090 trace!("Not waiting, database already contains new pending executions");
3091 return;
3092 }
3093 tokio::select! { _ = receiver.recv() => {
3095 trace!("Received a notification");
3096 }
3097 () = timeout_fut => {
3098 }
3099 }
3100 }.await;
3101 {
3103 let mut pending_subscribers = self.0.pending_subscribers.lock().unwrap();
3104 for ffqn in ffqns.as_ref() {
3105 match pending_subscribers.remove_ffqn(ffqn) {
3106 Some((_, tag)) if tag == unique_tag => {
3107 }
3109 Some(other) => {
3110 pending_subscribers.insert_ffqn(ffqn.clone(), other);
3112 }
3113 None => {
3114 }
3116 }
3117 }
3118 }
3119 }
3120
3121 async fn wait_for_pending_by_component_id(
3124 &self,
3125 pending_at_or_sooner: DateTime<Utc>,
3126 component_id: &ComponentId,
3127 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
3128 ) {
3129 let unique_tag: u64 = rand::random();
3130 let (sender, mut receiver) = mpsc::channel(1); {
3132 let mut pending_subscribers = self.0.pending_subscribers.lock().unwrap();
3133 pending_subscribers.insert_by_component(
3134 component_id.input_digest.clone(),
3135 (sender.clone(), unique_tag),
3136 );
3137 }
3138 async {
3139 let Ok(execution_ids_versions) = self
3140 .transaction(
3141 {
3142 let input_digest =component_id.input_digest.clone();
3143 move |conn| Self::get_pending_by_component_input_digest(conn, 1, pending_at_or_sooner, &input_digest)
3144 },
3145 "get_pending_by_component_input_digest",
3146 )
3147 .await
3148 else {
3149 trace!(
3150 "Ignoring get_pending error and waiting in for timeout to avoid executor repolling too soon"
3151 );
3152 timeout_fut.await;
3153 return;
3154 };
3155 if !execution_ids_versions.is_empty() {
3156 trace!("Not waiting, database already contains new pending executions");
3157 return;
3158 }
3159 tokio::select! { _ = receiver.recv() => {
3161 trace!("Received a notification");
3162 }
3163 () = timeout_fut => {
3164 }
3165 }
3166 }.await;
3167 {
3169 let mut pending_subscribers = self.0.pending_subscribers.lock().unwrap();
3170
3171 match pending_subscribers.remove_by_component(&component_id.input_digest) {
3172 Some((_, tag)) if tag == unique_tag => {
3173 }
3175 Some(other) => {
3176 pending_subscribers
3178 .insert_by_component(component_id.input_digest.clone(), other);
3179 }
3180 None => {
3181 }
3183 }
3184 }
3185 }
3186
3187 async fn get_last_execution_event(
3188 &self,
3189 execution_id: &ExecutionId,
3190 ) -> Result<ExecutionEvent, DbErrorRead> {
3191 let execution_id = execution_id.clone();
3192 self.transaction(
3193 move |tx| Self::get_last_execution_event(tx, &execution_id),
3194 "get_last_execution_event",
3195 )
3196 .await
3197 }
3198}
3199
3200#[async_trait]
3201impl DbConnection for SqlitePool {
3202 #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
3203 async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
3204 debug!("create");
3205 trace!(?req, "create");
3206 let created_at = req.created_at;
3207 let (version, notifier) = self
3208 .transaction(move |tx| Self::create_inner(tx, req.clone()), "create")
3209 .await?;
3210 self.notify_all(vec![notifier], created_at);
3211 Ok(version)
3212 }
3213
3214 #[instrument(level = Level::DEBUG, skip(self, batch))]
3215 async fn append_batch(
3216 &self,
3217 current_time: DateTime<Utc>,
3218 batch: Vec<AppendRequest>,
3219 execution_id: ExecutionId,
3220 version: Version,
3221 ) -> Result<AppendBatchResponse, DbErrorWrite> {
3222 debug!("append_batch");
3223 trace!(?batch, "append_batch");
3224 assert!(!batch.is_empty(), "Empty batch request");
3225
3226 let (version, notifier) = self
3227 .transaction(
3228 move |tx| {
3229 let mut version = version.clone();
3230 let mut notifier = None;
3231 for append_request in &batch {
3232 let (v, n) =
3233 Self::append(tx, &execution_id, append_request.clone(), version)?;
3234 version = v;
3235 notifier = Some(n);
3236 }
3237 Ok::<_, DbErrorWrite>((
3238 version,
3239 notifier.expect("checked that the batch is not empty"),
3240 ))
3241 },
3242 "append_batch",
3243 )
3244 .await?;
3245
3246 self.notify_all(vec![notifier], current_time);
3247 Ok(version)
3248 }
3249
3250 #[instrument(level = Level::DEBUG, skip(self, batch, child_req))]
3251 async fn append_batch_create_new_execution(
3252 &self,
3253 current_time: DateTime<Utc>,
3254 batch: Vec<AppendRequest>,
3255 execution_id: ExecutionId,
3256 version: Version,
3257 child_req: Vec<CreateRequest>,
3258 ) -> Result<AppendBatchResponse, DbErrorWrite> {
3259 debug!("append_batch_create_new_execution");
3260 trace!(?batch, ?child_req, "append_batch_create_new_execution");
3261 assert!(!batch.is_empty(), "Empty batch request");
3262
3263 let (version, notifiers) = self
3264 .transaction(
3265 move |tx| {
3266 let mut notifier = None;
3267 let mut version = version.clone();
3268 for append_request in &batch {
3269 let (v, n) =
3270 Self::append(tx, &execution_id, append_request.clone(), version)?;
3271 version = v;
3272 notifier = Some(n);
3273 }
3274 let mut notifiers = Vec::new();
3275 notifiers.push(notifier.expect("checked that the batch is not empty"));
3276
3277 for child_req in &child_req {
3278 let (_, notifier) = Self::create_inner(tx, child_req.clone())?;
3279 notifiers.push(notifier);
3280 }
3281 Ok::<_, DbErrorWrite>((version, notifiers))
3282 },
3283 "append_batch_create_new_execution_inner",
3284 )
3285 .await?;
3286 self.notify_all(notifiers, current_time);
3287 Ok(version)
3288 }
3289
/// Loads the complete execution log of `execution_id` inside a transaction via the inherent
/// `get`. Only compiled with the `test` feature.
#[cfg(feature = "test")]
#[instrument(level = Level::DEBUG, skip(self))]
async fn get(
    &self,
    execution_id: &ExecutionId,
) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
    trace!("get");
    // The closure must own the id it uses.
    let owned_execution_id = execution_id.clone();
    let execution_log = self
        .transaction(move |tx| Self::get(tx, &owned_execution_id), "get")
        .await;
    execution_log
}
3301
3302 #[instrument(level = Level::DEBUG, skip(self))]
3303 async fn list_execution_events(
3304 &self,
3305 execution_id: &ExecutionId,
3306 since: &Version,
3307 max_length: VersionType,
3308 include_backtrace_id: bool,
3309 ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
3310 let execution_id = execution_id.clone();
3311 let since = since.0;
3312 self.transaction(
3313 move |tx| {
3314 Self::list_execution_events(
3315 tx,
3316 &execution_id,
3317 since,
3318 since + max_length,
3319 include_backtrace_id,
3320 )
3321 },
3322 "get",
3323 )
3324 .await
3325 }
3326
3327 #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3331 async fn subscribe_to_next_responses(
3332 &self,
3333 execution_id: &ExecutionId,
3334 start_idx: usize,
3335 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
3336 ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout> {
3337 debug!("next_responses");
3338 let unique_tag: u64 = rand::random();
3339 let execution_id = execution_id.clone();
3340
3341 let cleanup = || {
3342 let mut guard = self.0.response_subscribers.lock().unwrap();
3343 match guard.remove(&execution_id) {
3344 Some((_, tag)) if tag == unique_tag => {} Some(other) => {
3346 guard.insert(execution_id.clone(), other);
3348 }
3349 None => {} }
3351 };
3352
3353 let response_subscribers = self.0.response_subscribers.clone();
3354 let resp_or_receiver = {
3355 let execution_id = execution_id.clone();
3356 self.transaction(
3357 move |tx| {
3358 let responses = Self::get_responses_with_offset(tx, &execution_id, start_idx)?;
3359 if responses.is_empty() {
3360 let (sender, receiver) = oneshot::channel();
3362 response_subscribers
3363 .lock()
3364 .unwrap()
3365 .insert(execution_id.clone(), (sender, unique_tag));
3366 Ok::<_, DbErrorReadWithTimeout>(itertools::Either::Right(receiver))
3367 } else {
3368 Ok(itertools::Either::Left(responses))
3369 }
3370 },
3371 "subscribe_to_next_responses",
3372 )
3373 .await
3374 }
3375 .inspect_err(|_| {
3376 cleanup();
3377 })?;
3378 match resp_or_receiver {
3379 itertools::Either::Left(resp) => Ok(resp), itertools::Either::Right(receiver) => {
3381 let res = async move {
3382 tokio::select! {
3383 resp = receiver => {
3384 let resp = resp.map_err(|_| DbErrorGeneric::Close)?;
3385 Ok(vec![resp])
3386 }
3387 () = timeout_fut => Err(DbErrorReadWithTimeout::Timeout),
3388 }
3389 }
3390 .await;
3391 cleanup();
3392 res
3393 }
3394 }
3395 }
3396
3397 async fn wait_for_finished_result(
3399 &self,
3400 execution_id: &ExecutionId,
3401 timeout_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
3402 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
3403 let unique_tag: u64 = rand::random();
3404 let execution_id = execution_id.clone();
3405 let execution_finished_subscription = self.0.execution_finished_subscribers.clone();
3406
3407 let cleanup = || {
3408 let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
3409 if let Some(subscribers) = guard.get_mut(&execution_id) {
3410 subscribers.remove(&unique_tag);
3411 }
3412 };
3413
3414 let resp_or_receiver = {
3415 let execution_id = execution_id.clone();
3416 self.transaction(move |tx| {
3417 let pending_state =
3418 Self::get_combined_state(tx, &execution_id)?.pending_state;
3419 if let PendingState::Finished { finished, .. } = pending_state {
3420 let event =
3421 Self::get_execution_event(tx, &execution_id, finished.version)?;
3422 if let ExecutionRequest::Finished { result, ..} = event.event {
3423 Ok(itertools::Either::Left(result))
3424 } else {
3425 error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
3426 Err(DbErrorReadWithTimeout::from(consistency_db_err(
3427 "cannot get finished event based on t_state version"
3428 )))
3429 }
3430 } else {
3431 let (sender, receiver) = oneshot::channel();
3435 let mut guard = execution_finished_subscription.lock().unwrap();
3436 guard.entry(execution_id.clone()).or_default().insert(unique_tag, sender);
3437 Ok(itertools::Either::Right(receiver))
3438 }
3439 }, "wait_for_finished_result")
3440 .await
3441 }
3442 .inspect_err(|_| {
3443 cleanup();
3447 })?;
3448
3449 let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
3450 match resp_or_receiver {
3451 itertools::Either::Left(resp) => Ok(resp), itertools::Either::Right(receiver) => {
3453 let res = async move {
3454 tokio::select! {
3455 resp = receiver => {
3456 Ok(resp.expect("the notifier sends to all listeners, cannot race with cleanup"))
3457 }
3458 () = timeout_fut => Err(DbErrorReadWithTimeout::Timeout),
3459 }
3460 }
3461 .await;
3462 cleanup();
3463 res
3464 }
3465 }
3466 }
3467
3468 #[cfg(feature = "test")]
3469 #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
3470 async fn append_response(
3471 &self,
3472 created_at: DateTime<Utc>,
3473 execution_id: ExecutionId,
3474 response_event: JoinSetResponseEvent,
3475 ) -> Result<(), DbErrorWrite> {
3476 debug!("append_response");
3477 let event = JoinSetResponseEventOuter {
3478 created_at,
3479 event: response_event,
3480 };
3481 let notifier = self
3482 .transaction(
3483 move |tx| Self::append_response(tx, &execution_id, event.clone()),
3484 "append_response",
3485 )
3486 .await?;
3487 self.notify_all(vec![notifier], created_at);
3488 Ok(())
3489 }
3490
3491 #[instrument(level = Level::DEBUG, skip_all, fields(%join_set_id, %execution_id))]
3492 async fn append_delay_response(
3493 &self,
3494 created_at: DateTime<Utc>,
3495 execution_id: ExecutionId,
3496 join_set_id: JoinSetId,
3497 delay_id: DelayId,
3498 result: Result<(), ()>,
3499 ) -> Result<AppendDelayResponseOutcome, DbErrorWrite> {
3500 debug!("append_delay_response");
3501 let event = JoinSetResponseEventOuter {
3502 created_at,
3503 event: JoinSetResponseEvent {
3504 join_set_id,
3505 event: JoinSetResponse::DelayFinished {
3506 delay_id: delay_id.clone(),
3507 result,
3508 },
3509 },
3510 };
3511 let res = self
3512 .transaction(
3513 {
3514 let execution_id = execution_id.clone();
3515 move |tx| Self::append_response(tx, &execution_id, event.clone())
3516 },
3517 "append_delay_response",
3518 )
3519 .await;
3520 match res {
3521 Ok(notifier) => {
3522 self.notify_all(vec![notifier], created_at);
3523 Ok(AppendDelayResponseOutcome::Success)
3524 }
3525 Err(DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::Conflict)) => {
3526 let delay_success = self
3527 .transaction(
3528 move |tx| Self::delay_response(tx, &execution_id, &delay_id),
3529 "delay_response",
3530 )
3531 .await?;
3532 match delay_success {
3533 Some(true) => Ok(AppendDelayResponseOutcome::AlreadyFinished),
3534 Some(false) => Ok(AppendDelayResponseOutcome::AlreadyCancelled),
3535 None => Err(DbErrorWrite::Generic(DbErrorGeneric::Uncategorized(
3536 "insert failed yet select did not find the response".into(),
3537 ))),
3538 }
3539 }
3540 Err(err) => Err(err),
3541 }
3542 }
3543
3544 #[instrument(level = Level::DEBUG, skip_all)]
3545 async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
3546 debug!("append_backtrace");
3547 self.transaction(
3548 move |tx| Self::append_backtrace(tx, &append),
3549 "append_backtrace",
3550 )
3551 .await
3552 }
3553
3554 #[instrument(level = Level::DEBUG, skip_all)]
3555 async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
3556 debug!("append_backtrace_batch");
3557 self.transaction(
3558 move |tx| {
3559 for append in &batch {
3560 Self::append_backtrace(tx, append)?;
3561 }
3562 Ok(())
3563 },
3564 "append_backtrace",
3565 )
3566 .await
3567 }
3568
3569 #[instrument(level = Level::DEBUG, skip_all)]
3570 async fn get_backtrace(
3571 &self,
3572 execution_id: &ExecutionId,
3573 filter: BacktraceFilter,
3574 ) -> Result<BacktraceInfo, DbErrorRead> {
3575 debug!("get_last_backtrace");
3576 let execution_id = execution_id.clone();
3577
3578 self.transaction(
3579 move |tx| {
3580 let select = "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace FROM t_backtrace \
3581 WHERE execution_id = :execution_id";
3582 let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![(":execution_id", Box::new(execution_id.to_string()))];
3583 let select = match &filter {
3584 BacktraceFilter::Specific(version) =>{
3585 params.push((":version", Box::new(version.0)));
3586 format!("{select} AND version_min_including <= :version AND version_max_excluding > :version")
3587 },
3588 BacktraceFilter::First => format!("{select} ORDER BY version_min_including LIMIT 1"),
3589 BacktraceFilter::Last => format!("{select} ORDER BY version_min_including DESC LIMIT 1")
3590 };
3591 tx
3592 .prepare(&select)
3593 ?
3594 .query_row::<_, &[(&'static str, &dyn ToSql)], _>(
3595 params
3596 .iter()
3597 .map(|(key, value)| (*key, value.as_ref()))
3598 .collect::<Vec<_>>()
3599 .as_ref(),
3600 |row| {
3601 Ok(BacktraceInfo {
3602 execution_id: execution_id.clone(),
3603 component_id: row.get::<_, JsonWrapper<_> >("component_id")?.0,
3604 version_min_including: Version::new(row.get::<_, VersionType>("version_min_including")?),
3605 version_max_excluding: Version::new(row.get::<_, VersionType>("version_max_excluding")?),
3606 wasm_backtrace: row.get::<_, JsonWrapper<_>>("wasm_backtrace")?.0,
3607 })
3608 },
3609 ).map_err(DbErrorRead::from)
3610 },
3611 "get_last_backtrace",
3612 ).await
3613 }
3614
3615 #[instrument(level = Level::TRACE, skip(self))]
3617 async fn get_expired_timers(
3618 &self,
3619 at: DateTime<Utc>,
3620 ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
3621 self.transaction(
3622 move |conn| {
3623 let mut expired_timers = conn.prepare(
3624 "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= :at",
3625 )?
3626 .query_map(
3627 named_params! {
3628 ":at": at,
3629 },
3630 |row| {
3631 let execution_id = row.get("execution_id")?;
3632 let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
3633 let delay_id = row.get::<_, DelayId>("delay_id")?;
3634 let delay = ExpiredDelay { execution_id, join_set_id, delay_id };
3635 Ok(ExpiredTimer::Delay(delay))
3636 },
3637 )?
3638 .collect::<Result<Vec<_>, _>>()?;
3639 let expired = conn.prepare(&format!(r#"
3641 SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis,
3642 executor_id, run_id
3643 FROM t_state
3644 WHERE pending_expires_finished <= :at AND state = "{STATE_LOCKED}"
3645 "#
3646 )
3647 )?
3648 .query_map(
3649 named_params! {
3650 ":at": at,
3651 },
3652 |row| {
3653 let execution_id = row.get("execution_id")?;
3654 let locked_at_version = Version::new(row.get("last_lock_version")?);
3655 let next_version = Version::new(row.get("corresponding_version")?).increment();
3656 let intermittent_event_count = row.get("intermittent_event_count")?;
3657 let max_retries = row.get("max_retries")?;
3658 let retry_exp_backoff_millis = row.get("retry_exp_backoff_millis")?;
3659 let executor_id = row.get("executor_id")?;
3660 let run_id = row.get("run_id")?;
3661 let lock = ExpiredLock {
3662 execution_id,
3663 locked_at_version,
3664 next_version,
3665 intermittent_event_count,
3666 max_retries,
3667 retry_exp_backoff: Duration::from_millis(retry_exp_backoff_millis),
3668 locked_by: LockedBy { executor_id, run_id },
3669 };
3670 Ok(ExpiredTimer::Lock(lock))
3671 }
3672 )?
3673 .collect::<Result<Vec<_>, _>>()?;
3674 expired_timers.extend(expired);
3675 if !expired_timers.is_empty() {
3676 debug!("get_expired_timers found {expired_timers:?}");
3677 }
3678 Ok(expired_timers)
3679 }, "get_expired_timers"
3680 )
3681 .await
3682 }
3683
3684 async fn get_execution_event(
3685 &self,
3686 execution_id: &ExecutionId,
3687 version: &Version,
3688 ) -> Result<ExecutionEvent, DbErrorRead> {
3689 let version = version.0;
3690 let execution_id = execution_id.clone();
3691 self.transaction(
3692 move |tx| Self::get_execution_event(tx, &execution_id, version),
3693 "get_execution_event",
3694 )
3695 .await
3696 }
3697
3698 async fn get_pending_state(
3699 &self,
3700 execution_id: &ExecutionId,
3701 ) -> Result<PendingState, DbErrorRead> {
3702 let execution_id = execution_id.clone();
3703 Ok(self
3704 .transaction(
3705 move |tx| Self::get_combined_state(tx, &execution_id),
3706 "get_pending_state",
3707 )
3708 .await?
3709 .pending_state)
3710 }
3711
3712 async fn list_executions(
3713 &self,
3714 ffqn: Option<FunctionFqn>,
3715 top_level_only: bool,
3716 pagination: ExecutionListPagination,
3717 ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
3718 self.transaction(
3719 move |tx| Self::list_executions(tx, ffqn.as_ref(), top_level_only, &pagination),
3720 "list_executions",
3721 )
3722 .await
3723 }
3724
3725 async fn list_responses(
3726 &self,
3727 execution_id: &ExecutionId,
3728 pagination: Pagination<u32>,
3729 ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
3730 let execution_id = execution_id.clone();
3731 self.transaction(
3732 move |tx| Self::list_responses(tx, &execution_id, Some(pagination)),
3733 "list_responses",
3734 )
3735 .await
3736 }
3737
3738 async fn upgrade_execution_component(
3739 &self,
3740 execution_id: &ExecutionId,
3741 old: &InputContentDigest,
3742 new: &InputContentDigest,
3743 ) -> Result<(), DbErrorWrite> {
3744 let execution_id = execution_id.clone();
3745 let old = old.clone();
3746 let new = new.clone();
3747 self.transaction(
3748 move |tx| Self::upgrade_execution_component(tx, &execution_id, &old, &new),
3749 "upgrade_execution_component",
3750 )
3751 .await
3752 }
3753}
3754
/// Helpers for creating a SQLite pool backed by a throwaway database file.
#[cfg(any(test, feature = "tempfile"))]
pub mod tempfile {
    use super::{SqliteConfig, SqlitePool};
    use tempfile::NamedTempFile;

    /// Opens a `SqlitePool` with the default `SqliteConfig`.
    ///
    /// When the `SQLITE_FILE` environment variable can be read, its value is
    /// used as the database path and no temporary file is returned. Otherwise
    /// a fresh `NamedTempFile` is created, used as the database path, and
    /// returned alongside the pool so the caller decides how long it lives.
    ///
    /// # Panics
    ///
    /// Panics if the temporary file cannot be created or the pool cannot be
    /// opened.
    pub async fn sqlite_pool() -> (SqlitePool, Option<NamedTempFile>) {
        match std::env::var("SQLITE_FILE") {
            Ok(db_path) => {
                let pool = SqlitePool::new(db_path, SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, None)
            }
            Err(_) => {
                let temp_file = NamedTempFile::new().unwrap();
                let pool = SqlitePool::new(temp_file.path(), SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, Some(temp_file))
            }
        }
    }
}