1use crate::histograms::Histograms;
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use concepts::{
5 ComponentId, ComponentRetryConfig, ExecutionId, FunctionFqn, JoinSetId, StrVariant,
6 SupportedFunctionReturnValue,
7 component_id::InputContentDigest,
8 prefixed_ulid::{DelayId, ExecutionIdDerived, ExecutorId, RunId},
9 storage::{
10 AppendBatchResponse, AppendDelayResponseOutcome, AppendEventsToExecution, AppendRequest,
11 AppendResponse, AppendResponseToExecution, BacktraceFilter, BacktraceInfo, CreateRequest,
12 DUMMY_CREATED, DUMMY_HISTORY_EVENT, DbConnection, DbErrorGeneric, DbErrorRead,
13 DbErrorReadWithTimeout, DbErrorWrite, DbErrorWriteNonRetriable, DbExecutor, DbExternalApi,
14 DbPool, DbPoolCloseable, ExecutionEvent, ExecutionListPagination, ExecutionRequest,
15 ExecutionWithState, ExpiredDelay, ExpiredLock, ExpiredTimer, HISTORY_EVENT_TYPE_JOIN_NEXT,
16 HistoryEvent, JoinSetRequest, JoinSetResponse, JoinSetResponseEvent,
17 JoinSetResponseEventOuter, LockPendingResponse, Locked, LockedBy, LockedExecution,
18 Pagination, PendingState, PendingStateFinished, PendingStateFinishedResultKind,
19 PendingStateLocked, ResponseWithCursor, STATE_BLOCKED_BY_JOIN_SET, STATE_FINISHED,
20 STATE_LOCKED, STATE_PENDING_AT, TimeoutOutcome, Version, VersionType,
21 },
22};
23use conversions::{JsonWrapper, consistency_db_err, consistency_rusqlite};
24use hashbrown::HashMap;
25use rusqlite::{
26 CachedStatement, Connection, OpenFlags, OptionalExtension, Params, ToSql, Transaction,
27 TransactionBehavior, named_params, types::ToSqlOutput,
28};
29use std::{
30 cmp::max,
31 collections::VecDeque,
32 fmt::Debug,
33 ops::DerefMut,
34 path::Path,
35 str::FromStr,
36 sync::{
37 Arc, Mutex,
38 atomic::{AtomicBool, Ordering},
39 },
40 time::Duration,
41};
42use std::{fmt::Write as _, pin::Pin};
43use tokio::{
44 sync::{mpsc, oneshot},
45 time::Instant,
46};
47use tracing::{Level, debug, error, info, instrument, trace, warn};
48
/// Opaque error returned when the database cannot be opened or initialized.
///
/// It carries no payload; the initialization code in this file logs the cause
/// via `error!` before returning it.
#[derive(Debug, thiserror::Error)]
#[error("initialization error")]
pub struct InitializationError;
52
/// A delay request: the join set and delay identifiers together with the
/// expiration timestamp.
#[derive(Debug, Clone)]
struct DelayReq {
    join_set_id: JoinSetId,
    delay_id: DelayId,
    expires_at: DateTime<Utc>,
}
/// Default `[name, value]` PRAGMA pairs applied, in this order, while the
/// connection is initialized.
///
/// Each default can be replaced by an entry of the same name in the pragma
/// override map. An empty value means the pragma is only queried (and its rows
/// debug-logged), not set.
// NOTE(review): `page_size` is listed after `journal_mode=wal`; SQLite documents that the page
// size cannot be changed once a database is in WAL mode - confirm this ordering is intentional.
const PRAGMA: [[&str; 2]; 10] = [
    ["journal_mode", "wal"],
    ["synchronous", "FULL"],
    ["foreign_keys", "true"],
    ["busy_timeout", "1000"],
    ["cache_size", "10000"],
    ["temp_store", "MEMORY"],
    ["page_size", "8192"],
    ["mmap_size", "134217728"],
    ["journal_size_limit", "67108864"],
    ["integrity_check", ""],
];
82
/// DDL of `t_metadata`, the table holding the schema version rows.
/// The row with the highest `id` is the one read during initialization.
const CREATE_TABLE_T_METADATA: &str = r"
CREATE TABLE IF NOT EXISTS t_metadata (
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 schema_version INTEGER NOT NULL,
 created_at TEXT NOT NULL
) STRICT
";
/// Schema version this code expects. It is inserted into an empty `t_metadata`;
/// initialization fails when a different version is already stored.
const T_METADATA_EXPECTED_SCHEMA_VERSION: u32 = 5;
92
/// DDL of `t_execution_log`: one row per `(execution_id, version)` holding the
/// serialized event in `json_value`. `history_event_type` is a stored generated
/// column extracted from the JSON path `$.HistoryEvent.event.type`.
const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_log (
 execution_id TEXT NOT NULL,
 created_at TEXT NOT NULL,
 json_value TEXT NOT NULL,
 version INTEGER NOT NULL,
 variant TEXT NOT NULL,
 join_set_id TEXT,
 history_event_type TEXT GENERATED ALWAYS AS (json_value->>'$.HistoryEvent.event.type') STORED,
 PRIMARY KEY (execution_id, version)
) STRICT
";
/// Index on `(execution_id, version)`.
// NOTE(review): these are the same columns as the table's PRIMARY KEY - presumably redundant,
// confirm before relying on it.
const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version ON t_execution_log (execution_id, version);
";
/// Index on `(execution_id, variant)`.
const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant ON t_execution_log (execution_id, variant);
";

/// Partial index restricted to rows whose `history_event_type` equals
/// `HISTORY_EVENT_TYPE_JOIN_NEXT`; the value is spliced in at compile time.
// NOTE(review): the value is wrapped in double quotes, which standard SQL reserves for
// identifiers; SQLite accepts it as a string literal only as a compatibility fallback.
const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET: &str = const_format::formatcp!(
    "CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_join_set ON t_execution_log (execution_id, join_set_id, history_event_type) WHERE history_event_type=\"{}\";",
    HISTORY_EVENT_TYPE_JOIN_NEXT
);
120
/// DDL of `t_join_set_response`. A row carries either the delay columns
/// (`delay_id`, `delay_success`) or the child-execution columns
/// (`child_execution_id`, `finished_version`); all four are nullable.
// NOTE(review): which column pair must be set is not enforced by the schema - presumably
// guaranteed by the writer; verify against the insert code.
const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
CREATE TABLE IF NOT EXISTS t_join_set_response (
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 created_at TEXT NOT NULL,
 execution_id TEXT NOT NULL,
 join_set_id TEXT NOT NULL,

 delay_id TEXT,
 delay_success INTEGER,

 child_execution_id TEXT,
 finished_version INTEGER,

 UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
) STRICT
";
/// Index on `(execution_id, id)`.
const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
";
/// Unique partial index: a non-NULL `child_execution_id` may appear only once.
const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_child_id
ON t_join_set_response (child_execution_id) WHERE child_execution_id IS NOT NULL;
";
/// Unique partial index: a non-NULL `delay_id` may appear only once.
const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_delay_id
ON t_join_set_response (delay_id) WHERE delay_id IS NOT NULL;
";
155
/// DDL of `t_state`: one row per execution (primary key `execution_id`).
///
/// The nullable columns (`last_lock_version`, `executor_id`, `run_id`,
/// `join_set_id`, `join_set_closing`, `result_kind`) are populated depending on
/// the value of `state`; `CombinedState::new` validates the allowed combinations.
const CREATE_TABLE_T_STATE: &str = r"
CREATE TABLE IF NOT EXISTS t_state (
 execution_id TEXT NOT NULL,
 is_top_level INTEGER NOT NULL,
 corresponding_version INTEGER NOT NULL,
 ffqn TEXT NOT NULL,
 created_at TEXT NOT NULL,
 component_id_input_digest BLOB NOT NULL,
 first_scheduled_at TEXT NOT NULL,

 pending_expires_finished TEXT NOT NULL,
 state TEXT NOT NULL,
 updated_at TEXT NOT NULL,
 intermittent_event_count INTEGER NOT NULL,

 max_retries INTEGER,
 retry_exp_backoff_millis INTEGER,
 last_lock_version INTEGER,
 executor_id TEXT,
 run_id TEXT,

 join_set_id TEXT,
 join_set_closing INTEGER,

 result_kind TEXT,

 PRIMARY KEY (execution_id)
) STRICT
";

/// Index on `(state, pending_expires_finished, ffqn)`.
const IDX_T_STATE_LOCK_PENDING: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending ON t_state (state, pending_expires_finished, ffqn);
";
/// Partial index on `pending_expires_finished` for rows that have an `executor_id`.
const IDX_T_STATE_EXPIRED_TIMERS: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_expired_timers ON t_state (pending_expires_finished) WHERE executor_id IS NOT NULL;
";
/// Index on `(execution_id, is_top_level)`.
/// Note that the SQL index name ends with `is_root`, unlike the constant name.
const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
";
/// Index on `ffqn`.
const IDX_T_STATE_FFQN: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
";
/// Index on `created_at`.
const IDX_T_STATE_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
";
213
/// DDL of `t_delay`: delays keyed by `(execution_id, join_set_id, delay_id)`
/// together with their `expires_at` timestamp.
const CREATE_TABLE_T_DELAY: &str = r"
CREATE TABLE IF NOT EXISTS t_delay (
 execution_id TEXT NOT NULL,
 join_set_id TEXT NOT NULL,
 delay_id TEXT NOT NULL,
 expires_at TEXT NOT NULL,
 PRIMARY KEY (execution_id, join_set_id, delay_id)
) STRICT
";
224
/// DDL of `t_backtrace`: a `wasm_backtrace` stored per execution for the version
/// range given by `version_min_including` and `version_max_excluding`.
const CREATE_TABLE_T_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_backtrace (
 execution_id TEXT NOT NULL,
 component_id TEXT NOT NULL,
 version_min_including INTEGER NOT NULL,
 version_max_excluding INTEGER NOT NULL,
 wasm_backtrace TEXT NOT NULL,
 PRIMARY KEY (execution_id, version_min_including, version_max_excluding)
) STRICT
";
/// Index on `(execution_id, version_min_including, version_max_excluding)`.
// NOTE(review): same columns as the table's PRIMARY KEY - presumably redundant, confirm.
const IDX_T_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_backtrace_execution_id_version ON t_backtrace (execution_id, version_min_including, version_max_excluding);
";
240
/// Cloneable, simplified form of `rusqlite::Error` used inside this file.
///
/// Being `Clone` lets one commit result be sent to every logical transaction
/// of a batch.
#[derive(Debug, thiserror::Error, Clone)]
enum RusqliteError {
    /// The query returned no rows.
    #[error("not found")]
    NotFound,
    /// Any other sqlite failure, kept only as its message.
    #[error("generic: {0}")]
    Generic(StrVariant),
}
248
249mod conversions {
250
251 use super::RusqliteError;
252 use concepts::storage::{DbErrorGeneric, DbErrorRead, DbErrorReadWithTimeout, DbErrorWrite};
253 use rusqlite::{
254 ToSql,
255 types::{FromSql, FromSqlError},
256 };
257 use std::fmt::Debug;
258 use tracing::error;
259
260 impl From<rusqlite::Error> for RusqliteError {
261 fn from(err: rusqlite::Error) -> Self {
262 if matches!(err, rusqlite::Error::QueryReturnedNoRows) {
263 RusqliteError::NotFound
264 } else {
265 error!(backtrace = %std::backtrace::Backtrace::capture(), "Sqlite error {err:?}");
266 RusqliteError::Generic(err.to_string().into())
267 }
268 }
269 }
270
271 impl From<RusqliteError> for DbErrorGeneric {
272 fn from(err: RusqliteError) -> DbErrorGeneric {
273 match err {
274 RusqliteError::NotFound => DbErrorGeneric::Uncategorized("not found".into()),
275 RusqliteError::Generic(str) => DbErrorGeneric::Uncategorized(str),
276 }
277 }
278 }
279 impl From<RusqliteError> for DbErrorRead {
280 fn from(err: RusqliteError) -> Self {
281 if matches!(err, RusqliteError::NotFound) {
282 Self::NotFound
283 } else {
284 Self::from(DbErrorGeneric::from(err))
285 }
286 }
287 }
288 impl From<RusqliteError> for DbErrorReadWithTimeout {
289 fn from(err: RusqliteError) -> Self {
290 Self::from(DbErrorRead::from(err))
291 }
292 }
293 impl From<RusqliteError> for DbErrorWrite {
294 fn from(err: RusqliteError) -> Self {
295 if matches!(err, RusqliteError::NotFound) {
296 Self::NotFound
297 } else {
298 Self::from(DbErrorGeneric::from(err))
299 }
300 }
301 }
302
303 pub(crate) struct JsonWrapper<T>(pub(crate) T);
304 impl<T: serde::de::DeserializeOwned + 'static + Debug> FromSql for JsonWrapper<T> {
305 fn column_result(
306 value: rusqlite::types::ValueRef<'_>,
307 ) -> rusqlite::types::FromSqlResult<Self> {
308 let value = match value {
309 rusqlite::types::ValueRef::Text(value) | rusqlite::types::ValueRef::Blob(value) => {
310 Ok(value)
311 }
312 other => {
313 error!(
314 backtrace = %std::backtrace::Backtrace::capture(),
315 "Unexpected type when conveting to JSON - expected Text or Blob, got type `{other:?}`",
316 );
317 Err(FromSqlError::InvalidType)
318 }
319 }?;
320 let value = serde_json::from_slice::<T>(value).map_err(|err| {
321 error!(
322 backtrace = %std::backtrace::Backtrace::capture(),
323 "Cannot convert JSON value `{value:?}` to type:`{type}` - {err:?}",
324 r#type = std::any::type_name::<T>()
325 );
326 FromSqlError::InvalidType
327 })?;
328 Ok(Self(value))
329 }
330 }
331 impl<T: serde::ser::Serialize + Debug> ToSql for JsonWrapper<T> {
332 fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
333 let string = serde_json::to_string(&self.0).map_err(|err| {
334 error!(
335 "Cannot serialize {value:?} of type `{type}` - {err:?}",
336 value = self.0,
337 r#type = std::any::type_name::<T>()
338 );
339 rusqlite::Error::ToSqlConversionFailure(Box::new(err))
340 })?;
341 Ok(rusqlite::types::ToSqlOutput::Owned(
342 rusqlite::types::Value::Text(string),
343 ))
344 }
345 }
346
347 #[derive(Debug, thiserror::Error)]
349 #[error("{0}")]
350 pub(crate) struct OtherError(&'static str);
351 pub(crate) fn consistency_rusqlite(input: &'static str) -> rusqlite::Error {
352 FromSqlError::other(OtherError(input)).into()
353 }
354
355 pub(crate) fn consistency_db_err(input: &'static str) -> DbErrorGeneric {
356 DbErrorGeneric::Uncategorized(input.into())
357 }
358}
359
/// Unvalidated state values; the field names match columns of `t_state`.
/// `CombinedState::new` turns it into a typed `PendingState`, or rejects it
/// when the combination of set and unset fields is not valid for `state`.
#[derive(Debug)]
struct CombinedStateDTO {
    state: String,
    ffqn: String,
    component_id_input_digest: InputContentDigest,
    // Interpreted by `CombinedState::new` as scheduled-at, lock-expires-at or finished-at,
    // depending on `state`.
    pending_expires_finished: DateTime<Utc>,
    last_lock_version: Option<Version>,
    executor_id: Option<ExecutorId>,
    run_id: Option<RunId>,
    join_set_id: Option<JoinSetId>,
    join_set_closing: Option<bool>,
    result_kind: Option<PendingStateFinishedResultKind>,
}
/// Validated state of an execution: its function name, typed pending state and
/// the version that state corresponds to.
#[derive(Debug)]
struct CombinedState {
    ffqn: FunctionFqn,
    pending_state: PendingState,
    corresponding_version: Version,
}
382impl CombinedState {
383 fn get_next_version_assert_not_finished(&self) -> Version {
384 assert!(!self.pending_state.is_finished());
385 self.corresponding_version.increment()
386 }
387
388 #[cfg(feature = "test")]
389 fn get_next_version_or_finished(&self) -> Version {
390 if self.pending_state.is_finished() {
391 self.corresponding_version.clone()
392 } else {
393 self.corresponding_version.increment()
394 }
395 }
396}
397
/// Data used to wake pending-execution subscribers: the function name and the
/// component digest select the subscriber in `PendingFfqnSubscribersHolder::notify`.
#[derive(Debug)]
struct NotifierPendingAt {
    scheduled_at: DateTime<Utc>,
    ffqn: FunctionFqn,
    component_input_digest: InputContentDigest,
}
404
/// An execution id paired with a return value.
// NOTE(review): presumably delivered to the execution-finished subscribers - confirm at the use site.
#[derive(Debug)]
struct NotifierExecutionFinished {
    execution_id: ExecutionId,
    retval: SupportedFunctionReturnValue,
}
410
/// Bundle of optional notifications; the `Default` value carries none.
// NOTE(review): presumably collected while appending and dispatched after a commit - confirm at
// the use site.
#[derive(Debug, Default)]
struct AppendNotifier {
    pending_at: Option<NotifierPendingAt>,
    execution_finished: Option<NotifierExecutionFinished>,
    response: Option<(ExecutionId, JoinSetResponseEventOuter)>,
}
417
418impl CombinedState {
419 fn new(dto: CombinedStateDTO, corresponding_version: Version) -> Result<Self, rusqlite::Error> {
420 let ffqn = FunctionFqn::from_str(&dto.ffqn).map_err(|parse_err| {
421 error!("Error parsing ffqn of {dto:?} - {parse_err:?}");
422 consistency_rusqlite("invalid ffqn value in `t_state`")
423 })?;
424 let pending_state = match dto {
425 CombinedStateDTO {
427 state,
428 ffqn: _,
429 component_id_input_digest,
430 pending_expires_finished: scheduled_at,
431 last_lock_version: None,
432 executor_id: None,
433 run_id: None,
434 join_set_id: None,
435 join_set_closing: None,
436 result_kind: None,
437 } if state == STATE_PENDING_AT => PendingState::PendingAt {
438 scheduled_at,
439 last_lock: None,
440 component_id_input_digest,
441 },
442 CombinedStateDTO {
444 state,
445 ffqn: _,
446 component_id_input_digest,
447 pending_expires_finished: scheduled_at,
448 last_lock_version: None,
449 executor_id: Some(executor_id),
450 run_id: Some(run_id),
451 join_set_id: None,
452 join_set_closing: None,
453 result_kind: None,
454 } if state == STATE_PENDING_AT => PendingState::PendingAt {
455 scheduled_at,
456 last_lock: Some(LockedBy {
457 executor_id,
458 run_id,
459 }),
460 component_id_input_digest,
461 },
462 CombinedStateDTO {
463 state,
464 ffqn: _,
465 component_id_input_digest,
466 pending_expires_finished: lock_expires_at,
467 last_lock_version: Some(_),
468 executor_id: Some(executor_id),
469 run_id: Some(run_id),
470 join_set_id: None,
471 join_set_closing: None,
472 result_kind: None,
473 } if state == STATE_LOCKED => PendingState::Locked(PendingStateLocked {
474 locked_by: LockedBy {
475 executor_id,
476 run_id,
477 },
478 lock_expires_at,
479 component_id_input_digest,
480 }),
481 CombinedStateDTO {
482 state,
483 ffqn: _,
484 component_id_input_digest,
485 pending_expires_finished: lock_expires_at,
486 last_lock_version: None,
487 executor_id: _,
488 run_id: _,
489 join_set_id: Some(join_set_id),
490 join_set_closing: Some(join_set_closing),
491 result_kind: None,
492 } if state == STATE_BLOCKED_BY_JOIN_SET => PendingState::BlockedByJoinSet {
493 join_set_id: join_set_id.clone(),
494 closing: join_set_closing,
495 lock_expires_at,
496 component_id_input_digest,
497 },
498 CombinedStateDTO {
499 state,
500 ffqn: _,
501 component_id_input_digest,
502 pending_expires_finished: finished_at,
503 last_lock_version: None,
504 executor_id: None,
505 run_id: None,
506 join_set_id: None,
507 join_set_closing: None,
508 result_kind: Some(result_kind),
509 } if state == STATE_FINISHED => PendingState::Finished {
510 finished: PendingStateFinished {
511 finished_at,
512 version: corresponding_version.0,
513 result_kind,
514 },
515 component_id_input_digest,
516 },
517
518 _ => {
519 error!("Cannot deserialize pending state from {dto:?}");
520 return Err(consistency_rusqlite("invalid `t_state`"));
521 }
522 };
523 Ok(Self {
524 ffqn,
525 pending_state,
526 corresponding_version,
527 })
528 }
529}
530
/// One unit of work sent to the sqlite thread.
///
/// Several logical transactions may be applied to a single physical sqlite
/// transaction; when the batch commit fails, each one is re-run in its own
/// transaction, which is why `func` is `FnMut`.
#[derive(derive_more::Debug)]
struct LogicalTx {
    /// Work to perform against the open physical transaction.
    #[debug(skip)]
    func: Box<dyn FnMut(&mut Transaction) + Send>,
    /// When the command was created; used to measure the queueing latency.
    sent_at: Instant,
    /// Label passed to the histograms.
    func_name: &'static str,
    /// Receives the result of the commit that included this logical transaction.
    #[debug(skip)]
    commit_ack_sender: oneshot::Sender<Result<(), RusqliteError>>,
}
540
/// Message accepted by the sqlite thread.
#[derive(derive_more::Debug)]
enum ThreadCommand {
    /// Run the contained logical transaction.
    LogicalTx(LogicalTx),
    /// Stop the thread's processing loop.
    Shutdown,
}
546
/// Cloneable handle to the sqlite thread. All clones share one command channel
/// and the same subscriber maps.
#[derive(Clone)]
pub struct SqlitePool(SqlitePoolInner);
549
/// At most one response subscriber per execution: a oneshot sender plus a `u64` tag.
// NOTE(review): the `u64` presumably identifies the subscription so that stale entries can be
// told apart - confirm at the use site.
type ResponseSubscribers =
    Arc<Mutex<HashMap<ExecutionId, (oneshot::Sender<JoinSetResponseEventOuter>, u64)>>>;
/// Shared, mutex-guarded holder of the pending-execution subscribers.
type PendingSubscribers = Arc<Mutex<PendingFfqnSubscribersHolder>>;
/// Any number of execution-finished subscribers per execution, keyed by a `u64`.
type ExecutionFinishedSubscribers =
    Mutex<HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>>;
555
/// Subscribers waiting for pending executions, looked up either by function
/// name or by component digest. Each entry is a wake-up sender plus a `u64` tag.
#[derive(Default)]
struct PendingFfqnSubscribersHolder {
    by_ffqns: HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>,
    by_component: HashMap<InputContentDigest , (mpsc::Sender<()>, u64)>,
}
561impl PendingFfqnSubscribersHolder {
562 fn notify(&self, notifier: &NotifierPendingAt) {
563 if let Some((subscription, _)) = self.by_ffqns.get(¬ifier.ffqn) {
564 debug!("Notifying pending subscriber by ffqn");
565 let _ = subscription.try_send(());
567 }
568 if let Some((subscription, _)) = self.by_component.get(¬ifier.component_input_digest) {
569 debug!("Notifying pending subscriber by component");
570 let _ = subscription.try_send(());
572 }
573 }
574
575 fn insert_ffqn(&mut self, ffqn: FunctionFqn, value: (mpsc::Sender<()>, u64)) {
576 self.by_ffqns.insert(ffqn, value);
577 }
578
579 fn remove_ffqn(&mut self, ffqn: &FunctionFqn) -> Option<(mpsc::Sender<()>, u64)> {
580 self.by_ffqns.remove(ffqn)
581 }
582
583 fn insert_by_component(
584 &mut self,
585 input_content_digest: InputContentDigest,
586 value: (mpsc::Sender<()>, u64),
587 ) {
588 self.by_component.insert(input_content_digest, value);
589 }
590
591 fn remove_by_component(
592 &mut self,
593 input_content_digest: &InputContentDigest,
594 ) -> Option<(mpsc::Sender<()>, u64)> {
595 self.by_component.remove(input_content_digest)
596 }
597}
598
/// Shared state behind [`SqlitePool`]; every field is cheap to clone.
#[derive(Clone)]
struct SqlitePoolInner {
    /// Set once closing starts; new connections are then refused.
    shutdown_requested: Arc<AtomicBool>,
    /// Set by the sqlite thread when its processing loop has ended.
    shutdown_finished: Arc<AtomicBool>,
    /// Channel carrying commands to the sqlite thread.
    command_tx: tokio::sync::mpsc::Sender<ThreadCommand>,
    response_subscribers: ResponseSubscribers,
    pending_subscribers: PendingSubscribers,
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
    /// Handle of the sqlite thread; an `Option` so that `Drop` can take it.
    join_handle: Option<Arc<std::thread::JoinHandle<()>>>,
}
609
610#[async_trait]
611impl DbPoolCloseable for SqlitePool {
612 async fn close(&self) {
613 debug!("Sqlite is closing");
614 self.0.shutdown_requested.store(true, Ordering::Release);
615 let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
617 while !self.0.shutdown_finished.load(Ordering::Acquire) {
618 tokio::time::sleep(Duration::from_millis(1)).await;
619 }
620 debug!("Sqlite was closed");
621 }
622}
623
624#[async_trait]
625impl DbPool for SqlitePool {
626 async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric> {
627 if self.0.shutdown_requested.load(Ordering::Acquire) {
628 return Err(DbErrorGeneric::Close);
629 }
630 Ok(Box::new(self.clone()))
631 }
632
633 async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric> {
634 if self.0.shutdown_requested.load(Ordering::Acquire) {
635 return Err(DbErrorGeneric::Close);
636 }
637 Ok(Box::new(self.clone()))
638 }
639 async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric> {
640 if self.0.shutdown_requested.load(Ordering::Acquire) {
641 return Err(DbErrorGeneric::Close);
642 }
643 Ok(Box::new(self.clone()))
644 }
645 #[cfg(feature = "test")]
646 async fn connection_test(
647 &self,
648 ) -> Result<Box<dyn concepts::storage::DbConnectionTest>, DbErrorGeneric> {
649 if self.0.shutdown_requested.load(Ordering::Acquire) {
650 return Err(DbErrorGeneric::Close);
651 }
652 Ok(Box::new(self.clone()))
653 }
654}
655
656impl Drop for SqlitePool {
657 fn drop(&mut self) {
658 let arc = self.0.join_handle.take().expect("join_handle was set");
659 if let Ok(join_handle) = Arc::try_unwrap(arc) {
660 if !join_handle.is_finished() {
662 if !self.0.shutdown_finished.load(Ordering::Acquire) {
663 let backtrace = std::backtrace::Backtrace::capture();
665 warn!("SqlitePool was not closed properly - {backtrace}");
666 self.0.shutdown_requested.store(true, Ordering::Release);
667 let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
669 } else {
672 }
674 }
675 }
676 }
677}
678
/// Configuration of [`SqlitePool`].
#[derive(Debug, Clone)]
pub struct SqliteConfig {
    /// Capacity of the command channel feeding the sqlite thread.
    pub queue_capacity: usize,
    /// PRAGMA values that replace the defaults or add further pragmas.
    pub pragma_override: Option<HashMap<String, String>>,
    /// Passed to the histograms of the sqlite thread.
    // NOTE(review): presumably the reporting threshold; `None` presumably disables it - confirm
    // in `Histograms`.
    pub metrics_threshold: Option<Duration>,
}
685impl Default for SqliteConfig {
686 fn default() -> Self {
687 Self {
688 queue_capacity: 100,
689 pragma_override: None,
690 metrics_threshold: None,
691 }
692 }
693}
694
695impl SqlitePool {
696 fn init_thread(
697 path: &Path,
698 mut pragma_override: HashMap<String, String>,
699 ) -> Result<Connection, InitializationError> {
700 fn conn_execute<P: Params>(
701 conn: &Connection,
702 sql: &str,
703 params: P,
704 ) -> Result<(), InitializationError> {
705 conn.execute(sql, params).map(|_| ()).map_err(|err| {
706 error!("Cannot run `{sql}` - {err:?}");
707 InitializationError
708 })
709 }
710 fn pragma_update(
711 conn: &Connection,
712 name: &str,
713 value: &str,
714 ) -> Result<(), InitializationError> {
715 if value.is_empty() {
716 debug!("Querying PRAGMA {name}");
717 conn.pragma_query(None, name, |row| {
718 debug!("{row:?}");
719 Ok(())
720 })
721 .map_err(|err| {
722 error!("cannot update pragma `{name}`=`{value}` - {err:?}");
723 InitializationError
724 })
725 } else {
726 debug!("Setting PRAGMA {name}={value}");
727 conn.pragma_update(None, name, value).map_err(|err| {
728 error!("cannot update pragma `{name}`=`{value}` - {err:?}");
729 InitializationError
730 })
731 }
732 }
733
734 let conn = Connection::open_with_flags(path, OpenFlags::default()).map_err(|err| {
735 error!("cannot open the connection - {err:?}");
736 InitializationError
737 })?;
738
739 for [pragma_name, default_value] in PRAGMA {
740 let pragma_value = pragma_override
741 .remove(pragma_name)
742 .unwrap_or_else(|| default_value.to_string());
743 pragma_update(&conn, pragma_name, &pragma_value)?;
744 }
745 for (pragma_name, pragma_value) in pragma_override.drain() {
747 pragma_update(&conn, &pragma_name, &pragma_value)?;
748 }
749
750 {
752 conn_execute(&conn, CREATE_TABLE_T_METADATA, [])?;
753 let actual_version = conn
756 .prepare("SELECT schema_version FROM t_metadata ORDER BY id DESC LIMIT 1")
757 .map_err(|err| {
758 error!("cannot select schema version - {err:?}");
759 InitializationError
760 })?
761 .query_row([], |row| row.get::<_, u32>("schema_version"))
762 .optional()
763 .map_err(|err| {
764 error!("Cannot read the schema version - {err:?}");
765 InitializationError
766 })?;
767
768 match actual_version {
769 None => conn_execute(
770 &conn,
771 &format!(
772 "INSERT INTO t_metadata (schema_version, created_at) VALUES
773 ({T_METADATA_EXPECTED_SCHEMA_VERSION}, ?) ON CONFLICT DO NOTHING"
774 ),
775 [Utc::now()],
776 )?,
777 Some(actual_version) => {
778 if actual_version != T_METADATA_EXPECTED_SCHEMA_VERSION {
780 error!(
781 "wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
782 );
783 return Err(InitializationError);
784 }
785 }
786 }
787 }
788
789 conn_execute(&conn, CREATE_TABLE_T_EXECUTION_LOG, [])?;
791 conn_execute(
792 &conn,
793 CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
794 [],
795 )?;
796 conn_execute(
797 &conn,
798 CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT,
799 [],
800 )?;
801 conn_execute(
802 &conn,
803 CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET,
804 [],
805 )?;
806 conn_execute(&conn, CREATE_TABLE_T_JOIN_SET_RESPONSE, [])?;
808 conn_execute(
809 &conn,
810 CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
811 [],
812 )?;
813 conn_execute(
814 &conn,
815 CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID,
816 [],
817 )?;
818 conn_execute(
819 &conn,
820 CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID,
821 [],
822 )?;
823 conn_execute(&conn, CREATE_TABLE_T_STATE, [])?;
825 conn_execute(&conn, IDX_T_STATE_LOCK_PENDING, [])?;
826 conn_execute(&conn, IDX_T_STATE_EXPIRED_TIMERS, [])?;
827 conn_execute(&conn, IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL, [])?;
828 conn_execute(&conn, IDX_T_STATE_FFQN, [])?;
829 conn_execute(&conn, IDX_T_STATE_CREATED_AT, [])?;
830 conn_execute(&conn, CREATE_TABLE_T_DELAY, [])?;
832 conn_execute(&conn, CREATE_TABLE_T_BACKTRACE, [])?;
834 conn_execute(&conn, IDX_T_BACKTRACE_EXECUTION_ID_VERSION, [])?;
835 Ok(conn)
836 }
837
838 fn connection_rpc(
839 mut conn: Connection,
840 shutdown_requested: &AtomicBool,
841 shutdown_finished: &AtomicBool,
842 mut command_rx: mpsc::Receiver<ThreadCommand>,
843 metrics_threshold: Option<Duration>,
844 ) {
845 let mut histograms = Histograms::new(metrics_threshold);
846 while Self::tick(
847 &mut conn,
848 shutdown_requested,
849 &mut command_rx,
850 &mut histograms,
851 )
852 .is_ok()
853 {
854 }
856 debug!("Closing command thread");
857 shutdown_finished.store(true, Ordering::Release);
858 }
859
860 fn tick(
861 conn: &mut Connection,
862 shutdown_requested: &AtomicBool,
863 command_rx: &mut mpsc::Receiver<ThreadCommand>,
864 histograms: &mut Histograms,
865 ) -> Result<(), ()> {
866 let mut ltx_list = Vec::new();
867 let mut ltx = match command_rx.blocking_recv() {
869 Some(ThreadCommand::LogicalTx(ltx)) => ltx,
870 Some(ThreadCommand::Shutdown) => {
871 debug!("Shutdown message received");
872 return Err(());
873 }
874 None => {
875 debug!("command_rx was closed");
876 return Err(());
877 }
878 };
879 let all_fns_start = std::time::Instant::now();
880 let mut physical_tx = conn
881 .transaction_with_behavior(TransactionBehavior::Immediate)
882 .map_err(|begin_err| {
883 error!("Cannot open transaction, closing sqlite - {begin_err:?}");
884 })?;
885 Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
886 ltx_list.push(ltx);
887
888 while let Ok(more) = command_rx.try_recv() {
889 let mut ltx = match more {
890 ThreadCommand::Shutdown => {
891 debug!("Shutdown message received");
892 return Err(());
893 }
894 ThreadCommand::LogicalTx(ltx) => ltx,
895 };
896
897 Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
898 ltx_list.push(ltx);
899 }
900 histograms.record_all_fns(all_fns_start.elapsed());
901
902 {
903 if shutdown_requested.load(Ordering::Relaxed) {
905 debug!("Recveived shutdown during processing of the batch");
906 return Err(());
907 }
908 let commit_result = {
909 let now = std::time::Instant::now();
910 let commit_result = physical_tx.commit().map_err(RusqliteError::from);
911 histograms.record_commit(now.elapsed());
912 commit_result
913 };
914 if commit_result.is_ok() || ltx_list.len() == 1 {
915 for ltx in ltx_list {
917 let _ = ltx.commit_ack_sender.send(commit_result.clone());
919 }
920 } else {
921 warn!(
922 "Bulk transaction failed, committing {} transactions serially",
923 ltx_list.len()
924 );
925 for ltx in ltx_list {
928 Self::ltx_commit_single(ltx, conn, shutdown_requested, histograms)?;
929 }
930 }
931 }
932 histograms.print_if_elapsed();
933 Ok(())
934 }
935
936 fn ltx_commit_single(
937 mut ltx: LogicalTx,
938 conn: &mut Connection,
939 shutdown_requested: &AtomicBool,
940 histograms: &mut Histograms,
941 ) -> Result<(), ()> {
942 let mut physical_tx = conn
943 .transaction_with_behavior(TransactionBehavior::Immediate)
944 .map_err(|begin_err| {
945 error!("Cannot open transaction, closing sqlite - {begin_err:?}");
946 })?;
947 Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
948 if shutdown_requested.load(Ordering::Relaxed) {
949 debug!("Recveived shutdown during processing of the batch");
950 return Err(());
951 }
952 let commit_result = {
953 let now = std::time::Instant::now();
954 let commit_result = physical_tx.commit().map_err(RusqliteError::from);
955 histograms.record_commit(now.elapsed());
956 commit_result
957 };
958 let _ = ltx.commit_ack_sender.send(commit_result);
960 Ok(())
961 }
962
963 fn ltx_apply_to_tx(
964 ltx: &mut LogicalTx,
965 physical_tx: &mut Transaction,
966 histograms: &mut Histograms,
967 ) {
968 let sent_latency = ltx.sent_at.elapsed();
969 let started_at = Instant::now();
970 (ltx.func)(physical_tx);
971 histograms.record_command(sent_latency, ltx.func_name, started_at.elapsed());
972 }
973
974 #[instrument(skip_all, name = "sqlite_new")]
975 pub async fn new<P: AsRef<Path>>(
976 path: P,
977 config: SqliteConfig,
978 ) -> Result<Self, InitializationError> {
979 let path = path.as_ref().to_owned();
980
981 let shutdown_requested = Arc::new(AtomicBool::new(false));
982 let shutdown_finished = Arc::new(AtomicBool::new(false));
983
984 let (command_tx, command_rx) = tokio::sync::mpsc::channel(config.queue_capacity);
985 info!("Sqlite database location: {path:?}");
986 let join_handle = {
987 let init_task = {
989 tokio::task::spawn_blocking(move || {
990 Self::init_thread(&path, config.pragma_override.unwrap_or_default())
991 })
992 .await
993 };
994 let conn = match init_task {
995 Ok(res) => res?,
996 Err(join_err) => {
997 error!("Initialization panic - {join_err:?}");
998 return Err(InitializationError);
999 }
1000 };
1001 let shutdown_requested = shutdown_requested.clone();
1002 let shutdown_finished = shutdown_finished.clone();
1003 std::thread::spawn(move || {
1005 Self::connection_rpc(
1006 conn,
1007 &shutdown_requested,
1008 &shutdown_finished,
1009 command_rx,
1010 config.metrics_threshold,
1011 );
1012 })
1013 };
1014 Ok(SqlitePool(SqlitePoolInner {
1015 shutdown_requested,
1016 shutdown_finished,
1017 command_tx,
1018 response_subscribers: Arc::default(),
1019 pending_subscribers: Arc::default(),
1020 join_handle: Some(Arc::new(join_handle)),
1021 execution_finished_subscribers: Arc::default(),
1022 }))
1023 }
1024
1025 async fn transaction<F, T, E>(&self, mut func: F, func_name: &'static str) -> Result<T, E>
1027 where
1028 F: FnMut(&mut rusqlite::Transaction) -> Result<T, E> + Send + 'static,
1029 T: Send + 'static,
1030 E: From<DbErrorGeneric> + From<RusqliteError> + Send + 'static,
1031 {
1032 let fn_res: Arc<std::sync::Mutex<Option<_>>> = Arc::default();
1033 let (commit_ack_sender, commit_ack_receiver) = oneshot::channel();
1034 let thread_command_func = {
1035 let fn_res = fn_res.clone();
1036 ThreadCommand::LogicalTx(LogicalTx {
1037 func: Box::new(move |tx| {
1038 let func_res = func(tx);
1039 *fn_res.lock().unwrap() = Some(func_res);
1040 }),
1041 sent_at: Instant::now(),
1042 func_name,
1043 commit_ack_sender,
1044 })
1045 };
1046 self.0
1047 .command_tx
1048 .send(thread_command_func)
1049 .await
1050 .map_err(|_send_err| DbErrorGeneric::Close)?;
1051
1052 match commit_ack_receiver.await {
1054 Ok(Ok(())) => {
1055 let mut guard = fn_res.lock().unwrap();
1056 std::mem::take(guard.deref_mut()).expect("ltx must have been run at least once")
1057 }
1058 Ok(Err(rusqlite_err)) => Err(E::from(rusqlite_err)),
1059 Err(_) => Err(E::from(DbErrorGeneric::Close)),
1060 }
1061 }
1062
1063 fn fetch_created_event(
1064 conn: &Connection,
1065 execution_id: &ExecutionId,
1066 ) -> Result<CreateRequest, DbErrorRead> {
1067 let mut stmt = conn.prepare(
1068 "SELECT created_at, json_value FROM t_execution_log WHERE \
1069 execution_id = :execution_id AND version = 0",
1070 )?;
1071 let (created_at, event) = stmt.query_row(
1072 named_params! {
1073 ":execution_id": execution_id.to_string(),
1074 },
1075 |row| {
1076 let created_at = row.get("created_at")?;
1077 let event = row
1078 .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
1079 .map_err(|serde| {
1080 error!("cannot deserialize `Created` event: {row:?} - `{serde:?}`");
1081 consistency_rusqlite("cannot deserialize `Created` event")
1082 })?;
1083 Ok((created_at, event.0))
1084 },
1085 )?;
1086 if let ExecutionRequest::Created {
1087 ffqn,
1088 params,
1089 parent,
1090 scheduled_at,
1091 component_id,
1092 metadata,
1093 scheduled_by,
1094 } = event
1095 {
1096 Ok(CreateRequest {
1097 created_at,
1098 execution_id: execution_id.clone(),
1099 ffqn,
1100 params,
1101 parent,
1102 scheduled_at,
1103 component_id,
1104 metadata,
1105 scheduled_by,
1106 })
1107 } else {
1108 error!("Row with version=0 must be a `Created` event - {event:?}");
1109 Err(consistency_db_err("expected `Created` event").into())
1110 }
1111 }
1112
1113 fn check_expected_next_and_appending_version(
1114 expected_version: &Version,
1115 appending_version: &Version,
1116 ) -> Result<(), DbErrorWrite> {
1117 if *expected_version != *appending_version {
1118 debug!(
1119 "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
1120 );
1121 return Err(DbErrorWrite::NonRetriable(
1122 DbErrorWriteNonRetriable::VersionConflict {
1123 expected: expected_version.clone(),
1124 requested: appending_version.clone(),
1125 },
1126 ));
1127 }
1128 Ok(())
1129 }
1130
1131 #[instrument(level = Level::TRACE, skip_all, fields(execution_id = %req.execution_id))]
1132 fn create_inner(
1133 tx: &Transaction,
1134 req: CreateRequest,
1135 ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1136 trace!("create_inner");
1137
1138 let version = Version::default();
1139 let execution_id = req.execution_id.clone();
1140 let execution_id_str = execution_id.to_string();
1141 let ffqn = req.ffqn.clone();
1142 let created_at = req.created_at;
1143 let scheduled_at = req.scheduled_at;
1144 let component_id = req.component_id.clone();
1145 let event = ExecutionRequest::from(req);
1146 let event_ser = serde_json::to_string(&event).map_err(|err| {
1147 error!("Cannot serialize {event:?} - {err:?}");
1148 DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
1149 })?;
1150 tx.prepare(
1151 "INSERT INTO t_execution_log (execution_id, created_at, version, json_value, variant, join_set_id ) \
1152 VALUES (:execution_id, :created_at, :version, :json_value, :variant, :join_set_id)")
1153 ?
1154 .execute(named_params! {
1155 ":execution_id": &execution_id_str,
1156 ":created_at": created_at,
1157 ":version": version.0,
1158 ":json_value": event_ser,
1159 ":variant": event.variant(),
1160 ":join_set_id": event.join_set_id().map(std::string::ToString::to_string),
1161 })
1162 ?;
1163 let pending_at = {
1164 debug!("Creating with `Pending(`{scheduled_at:?}`)");
1165 tx.prepare(
1166 r"
1167 INSERT INTO t_state (
1168 execution_id,
1169 is_top_level,
1170 corresponding_version,
1171 pending_expires_finished,
1172 ffqn,
1173 state,
1174 created_at,
1175 component_id_input_digest,
1176 updated_at,
1177 first_scheduled_at,
1178 intermittent_event_count
1179 )
1180 VALUES (
1181 :execution_id,
1182 :is_top_level,
1183 :corresponding_version,
1184 :pending_expires_finished,
1185 :ffqn,
1186 :state,
1187 :created_at,
1188 :component_id_input_digest,
1189 CURRENT_TIMESTAMP,
1190 :first_scheduled_at,
1191 0
1192 )
1193 ",
1194 )?
1195 .execute(named_params! {
1196 ":execution_id": execution_id.to_string(),
1197 ":is_top_level": execution_id.is_top_level(),
1198 ":corresponding_version": version.0,
1199 ":pending_expires_finished": scheduled_at,
1200 ":ffqn": ffqn.to_string(),
1201 ":state": STATE_PENDING_AT,
1202 ":created_at": created_at,
1203 ":component_id_input_digest": component_id.input_digest,
1204 ":first_scheduled_at": scheduled_at,
1205 })?;
1206 AppendNotifier {
1207 pending_at: Some(NotifierPendingAt {
1208 scheduled_at,
1209 ffqn,
1210 component_input_digest: component_id.input_digest,
1211 }),
1212 execution_finished: None,
1213 response: None,
1214 }
1215 };
1216 let next_version = Version::new(version.0 + 1);
1217 Ok((next_version, pending_at))
1218 }
1219
1220 #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %corresponding_version))]
1221 fn update_state_pending_after_response_appended(
1222 tx: &Transaction,
1223 execution_id: &ExecutionId,
1224 scheduled_at: DateTime<Utc>, corresponding_version: &Version, component_input_digest: InputContentDigest,
1227 ) -> Result<AppendNotifier, DbErrorWrite> {
1228 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
1229 let mut stmt = tx
1230 .prepare_cached(
1231 r"
1232 UPDATE t_state
1233 SET
1234 corresponding_version = :corresponding_version,
1235 pending_expires_finished = :pending_expires_finished,
1236 state = :state,
1237 updated_at = CURRENT_TIMESTAMP,
1238
1239 last_lock_version = NULL,
1240
1241 join_set_id = NULL,
1242 join_set_closing = NULL,
1243
1244 result_kind = NULL
1245 WHERE execution_id = :execution_id
1246 ",
1247 )
1248 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1249 let updated = stmt
1250 .execute(named_params! {
1251 ":execution_id": execution_id,
1252 ":corresponding_version": corresponding_version.0,
1253 ":pending_expires_finished": scheduled_at,
1254 ":state": STATE_PENDING_AT,
1255 })
1256 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1257 if updated != 1 {
1258 return Err(DbErrorWrite::NotFound);
1259 }
1260 Ok(AppendNotifier {
1261 pending_at: Some(NotifierPendingAt {
1262 scheduled_at,
1263 ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1264 component_input_digest,
1265 }),
1266 execution_finished: None,
1267 response: None,
1268 })
1269 }
1270
1271 #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
1272 fn update_state_pending_after_event_appended(
1273 tx: &Transaction,
1274 execution_id: &ExecutionId,
1275 appending_version: &Version,
1276 scheduled_at: DateTime<Utc>, intermittent_failure: bool,
1278 component_input_digest: InputContentDigest,
1279 ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1280 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
1281 let mut stmt = tx
1284 .prepare_cached(
1285 r"
1286 UPDATE t_state
1287 SET
1288 corresponding_version = :appending_version,
1289 pending_expires_finished = :pending_expires_finished,
1290 state = :state,
1291 updated_at = CURRENT_TIMESTAMP,
1292 intermittent_event_count = intermittent_event_count + :intermittent_delta,
1293
1294 last_lock_version = NULL,
1295
1296 join_set_id = NULL,
1297 join_set_closing = NULL,
1298
1299 result_kind = NULL
1300 WHERE execution_id = :execution_id;
1301 ",
1302 )
1303 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1304 let updated = stmt
1305 .execute(named_params! {
1306 ":execution_id": execution_id.to_string(),
1307 ":appending_version": appending_version.0,
1308 ":pending_expires_finished": scheduled_at,
1309 ":state": STATE_PENDING_AT,
1310 ":intermittent_delta": i32::from(intermittent_failure) })
1312 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1313 if updated != 1 {
1314 return Err(DbErrorWrite::NotFound);
1315 }
1316 Ok((
1317 appending_version.increment(),
1318 AppendNotifier {
1319 pending_at: Some(NotifierPendingAt {
1320 scheduled_at,
1321 ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1322 component_input_digest,
1323 }),
1324 execution_finished: None,
1325 response: None,
1326 },
1327 ))
1328 }
1329
1330 fn update_state_locked_get_intermittent_event_count(
1331 tx: &Transaction,
1332 execution_id: &ExecutionId,
1333 executor_id: ExecutorId,
1334 run_id: RunId,
1335 lock_expires_at: DateTime<Utc>,
1336 appending_version: &Version,
1337 retry_config: ComponentRetryConfig,
1338 ) -> Result<u32, DbErrorWrite> {
1339 debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
1340 let backoff_millis = i64::try_from(retry_config.retry_exp_backoff.as_millis())
1341 .map_err(|_| DbErrorGeneric::Uncategorized("backoff too big".into()))?; let execution_id_str = execution_id.to_string();
1343 let mut stmt = tx
1344 .prepare_cached(
1345 r"
1346 UPDATE t_state
1347 SET
1348 corresponding_version = :appending_version,
1349 pending_expires_finished = :pending_expires_finished,
1350 state = :state,
1351 updated_at = CURRENT_TIMESTAMP,
1352
1353 max_retries = :max_retries,
1354 retry_exp_backoff_millis = :retry_exp_backoff_millis,
1355 last_lock_version = :appending_version,
1356 executor_id = :executor_id,
1357 run_id = :run_id,
1358
1359 join_set_id = NULL,
1360 join_set_closing = NULL,
1361
1362 result_kind = NULL
1363 WHERE execution_id = :execution_id
1364 ",
1365 )
1366 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1367 let updated = stmt
1368 .execute(named_params! {
1369 ":execution_id": execution_id_str,
1370 ":appending_version": appending_version.0,
1371 ":pending_expires_finished": lock_expires_at,
1372 ":state": STATE_LOCKED,
1373 ":max_retries": retry_config.max_retries,
1374 ":retry_exp_backoff_millis": backoff_millis,
1375 ":executor_id": executor_id.to_string(),
1376 ":run_id": run_id.to_string(),
1377 })
1378 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1379 if updated != 1 {
1380 return Err(DbErrorWrite::NotFound);
1381 }
1382
1383 let intermittent_event_count = tx
1385 .prepare(
1386 "SELECT intermittent_event_count FROM t_state WHERE execution_id = :execution_id",
1387 )?
1388 .query_row(
1389 named_params! {
1390 ":execution_id": execution_id_str,
1391 },
1392 |row| {
1393 let intermittent_event_count = row.get("intermittent_event_count")?;
1394 Ok(intermittent_event_count)
1395 },
1396 )
1397 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1398
1399 Ok(intermittent_event_count)
1400 }
1401
1402 fn update_state_blocked(
1403 tx: &Transaction,
1404 execution_id: &ExecutionId,
1405 appending_version: &Version,
1406 join_set_id: &JoinSetId,
1408 lock_expires_at: DateTime<Utc>,
1409 join_set_closing: bool,
1410 ) -> Result<
1411 AppendResponse, DbErrorWrite,
1413 > {
1414 debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
1415 let execution_id_str = execution_id.to_string();
1416 let mut stmt = tx.prepare_cached(
1417 r"
1418 UPDATE t_state
1419 SET
1420 corresponding_version = :appending_version,
1421 pending_expires_finished = :pending_expires_finished,
1422 state = :state,
1423 updated_at = CURRENT_TIMESTAMP,
1424
1425 last_lock_version = NULL,
1426
1427 join_set_id = :join_set_id,
1428 join_set_closing = :join_set_closing,
1429
1430 result_kind = NULL
1431 WHERE execution_id = :execution_id
1432 ",
1433 )?;
1434 let updated = stmt.execute(named_params! {
1435 ":execution_id": execution_id_str,
1436 ":appending_version": appending_version.0,
1437 ":pending_expires_finished": lock_expires_at,
1438 ":state": STATE_BLOCKED_BY_JOIN_SET,
1439 ":join_set_id": join_set_id,
1440 ":join_set_closing": join_set_closing,
1441 })?;
1442 if updated != 1 {
1443 return Err(DbErrorWrite::NotFound);
1444 }
1445 Ok(appending_version.increment())
1446 }
1447
1448 fn update_state_finished(
1449 tx: &Transaction,
1450 execution_id: &ExecutionId,
1451 appending_version: &Version,
1452 finished_at: DateTime<Utc>,
1454 result_kind: PendingStateFinishedResultKind,
1455 ) -> Result<(), DbErrorWrite> {
1456 debug!("Setting t_state to Finished");
1457 let execution_id_str = execution_id.to_string();
1458 let mut stmt = tx.prepare_cached(
1459 r"
1460 UPDATE t_state
1461 SET
1462 corresponding_version = :appending_version,
1463 pending_expires_finished = :pending_expires_finished,
1464 state = :state,
1465 updated_at = CURRENT_TIMESTAMP,
1466
1467 last_lock_version = NULL,
1468 executor_id = NULL,
1469 run_id = NULL,
1470
1471 join_set_id = NULL,
1472 join_set_closing = NULL,
1473
1474 result_kind = :result_kind
1475 WHERE execution_id = :execution_id
1476 ",
1477 )?;
1478
1479 let updated = stmt.execute(named_params! {
1480 ":execution_id": execution_id_str,
1481 ":appending_version": appending_version.0,
1482 ":pending_expires_finished": finished_at,
1483 ":state": STATE_FINISHED,
1484 ":result_kind": JsonWrapper(result_kind),
1485 })?;
1486 if updated != 1 {
1487 return Err(DbErrorWrite::NotFound);
1488 }
1489 Ok(())
1490 }
1491
1492 #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %appending_version))]
1494 fn bump_state_next_version(
1495 tx: &Transaction,
1496 execution_id: &ExecutionId,
1497 appending_version: &Version,
1498 delay_req: Option<DelayReq>,
1499 ) -> Result<AppendResponse , DbErrorWrite> {
1500 debug!("update_index_version");
1501 let execution_id_str = execution_id.to_string();
1502 let mut stmt = tx.prepare_cached(
1503 r"
1504 UPDATE t_state
1505 SET
1506 corresponding_version = :appending_version,
1507 updated_at = CURRENT_TIMESTAMP
1508 WHERE execution_id = :execution_id
1509 ",
1510 )?;
1511 let updated = stmt.execute(named_params! {
1512 ":execution_id": execution_id_str,
1513 ":appending_version": appending_version.0,
1514 })?;
1515 if updated != 1 {
1516 return Err(DbErrorWrite::NotFound);
1517 }
1518 if let Some(DelayReq {
1519 join_set_id,
1520 delay_id,
1521 expires_at,
1522 }) = delay_req
1523 {
1524 debug!("Inserting delay to `t_delay`");
1525 let mut stmt = tx.prepare_cached(
1526 "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) \
1527 VALUES \
1528 (:execution_id, :join_set_id, :delay_id, :expires_at)",
1529 )?;
1530 stmt.execute(named_params! {
1531 ":execution_id": execution_id_str,
1532 ":join_set_id": join_set_id.to_string(),
1533 ":delay_id": delay_id.to_string(),
1534 ":expires_at": expires_at,
1535 })?;
1536 }
1537 Ok(appending_version.increment())
1538 }
1539
1540 fn get_combined_state(
1541 tx: &Transaction,
1542 execution_id: &ExecutionId,
1543 ) -> Result<CombinedState, DbErrorRead> {
1544 let mut stmt = tx.prepare(
1545 r"
1546 SELECT
1547 state, ffqn, component_id_input_digest, corresponding_version, pending_expires_finished,
1548 last_lock_version, executor_id, run_id,
1549 join_set_id, join_set_closing,
1550 result_kind
1551 FROM t_state
1552 WHERE
1553 execution_id = :execution_id
1554 ",
1555 )?;
1556 stmt.query_row(
1557 named_params! {
1558 ":execution_id": execution_id.to_string(),
1559 },
1560 |row| {
1561 CombinedState::new(
1562 CombinedStateDTO {
1563 component_id_input_digest: row.get("component_id_input_digest")?,
1564 state: row.get("state")?,
1565 ffqn: row.get("ffqn")?,
1566 pending_expires_finished: row
1567 .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1568 last_lock_version: row
1569 .get::<_, Option<VersionType>>("last_lock_version")?
1570 .map(Version::new),
1571 executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1572 run_id: row.get::<_, Option<RunId>>("run_id")?,
1573 join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1574 join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1575 result_kind: row
1576 .get::<_, Option<JsonWrapper<PendingStateFinishedResultKind>>>(
1577 "result_kind",
1578 )?
1579 .map(|wrapper| wrapper.0),
1580 },
1581 Version::new(row.get("corresponding_version")?),
1582 )
1583 },
1584 )
1585 .map_err(DbErrorRead::from)
1586 }
1587
1588 fn list_executions(
1589 read_tx: &Transaction,
1590 ffqn: Option<&FunctionFqn>,
1591 top_level_only: bool,
1592 pagination: &ExecutionListPagination,
1593 ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
1594 #[derive(Debug)]
1595 struct StatementModifier<'a> {
1596 where_vec: Vec<String>,
1597 params: Vec<(&'static str, ToSqlOutput<'a>)>,
1598 limit: u32,
1599 limit_desc: bool,
1600 }
1601
1602 fn paginate<'a, T: rusqlite::ToSql + 'static>(
1603 pagination: &'a Pagination<Option<T>>,
1604 column: &str,
1605 top_level_only: bool,
1606 ) -> Result<StatementModifier<'a>, DbErrorGeneric> {
1607 let mut where_vec: Vec<String> = vec![];
1608 let mut params: Vec<(&'static str, ToSqlOutput<'a>)> = vec![];
1609 let limit = pagination.length();
1610 let limit_desc = pagination.is_desc();
1611 match pagination {
1612 Pagination::NewerThan {
1613 cursor: Some(cursor),
1614 ..
1615 }
1616 | Pagination::OlderThan {
1617 cursor: Some(cursor),
1618 ..
1619 } => {
1620 where_vec.push(format!("{column} {rel} :cursor", rel = pagination.rel()));
1621 let cursor = cursor.to_sql().map_err(|err| {
1622 error!("Possible program error - cannot convert cursor to sql - {err:?}");
1623 DbErrorGeneric::Uncategorized("cannot convert cursor to sql".into())
1624 })?;
1625 params.push((":cursor", cursor));
1626 }
1627 _ => {}
1628 }
1629 if top_level_only {
1630 where_vec.push("is_top_level=true".to_string());
1631 }
1632 Ok(StatementModifier {
1633 where_vec,
1634 params,
1635 limit: u32::from(limit),
1636 limit_desc,
1637 })
1638 }
1639
1640 let mut statement_mod = match pagination {
1641 ExecutionListPagination::CreatedBy(pagination) => {
1642 paginate(pagination, "created_at", top_level_only)?
1643 }
1644 ExecutionListPagination::ExecutionId(pagination) => {
1645 paginate(pagination, "execution_id", top_level_only)?
1646 }
1647 };
1648
1649 let ffqn_temporary;
1650 if let Some(ffqn) = ffqn {
1651 statement_mod.where_vec.push("ffqn = :ffqn".to_string());
1652 ffqn_temporary = ffqn.to_string();
1653 let ffqn = ffqn_temporary
1654 .to_sql()
1655 .expect("string conversion never fails");
1656
1657 statement_mod.params.push((":ffqn", ffqn));
1658 }
1659
1660 let where_str = if statement_mod.where_vec.is_empty() {
1661 String::new()
1662 } else {
1663 format!("WHERE {}", statement_mod.where_vec.join(" AND "))
1664 };
1665 let sql = format!(
1666 r"
1667 SELECT created_at, first_scheduled_at, component_id_input_digest,
1668 state, execution_id, ffqn, corresponding_version, pending_expires_finished,
1669 last_lock_version, executor_id, run_id,
1670 join_set_id, join_set_closing,
1671 result_kind
1672 FROM t_state {where_str} ORDER BY created_at {desc} LIMIT {limit}
1673 ",
1674 desc = if statement_mod.limit_desc { "DESC" } else { "" },
1675 limit = statement_mod.limit,
1676 );
1677 let mut vec: Vec<_> = read_tx
1678 .prepare(&sql)?
1679 .query_map::<_, &[(&'static str, ToSqlOutput)], _>(
1680 statement_mod
1681 .params
1682 .into_iter()
1683 .collect::<Vec<_>>()
1684 .as_ref(),
1685 |row| {
1686 let execution_id = row.get::<_, ExecutionId>("execution_id")?;
1687 let component_id_input_digest: InputContentDigest =
1688 row.get("component_id_input_digest")?;
1689 let created_at = row.get("created_at")?;
1690 let first_scheduled_at = row.get("first_scheduled_at")?;
1691 let combined_state = CombinedState::new(
1692 CombinedStateDTO {
1693 component_id_input_digest: component_id_input_digest.clone(),
1694 state: row.get("state")?,
1695 ffqn: row.get("ffqn")?,
1696 pending_expires_finished: row
1697 .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1698 executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1699
1700 last_lock_version: row
1701 .get::<_, Option<VersionType>>("last_lock_version")?
1702 .map(Version::new),
1703 run_id: row.get::<_, Option<RunId>>("run_id")?,
1704 join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1705 join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1706 result_kind: row
1707 .get::<_, Option<JsonWrapper<PendingStateFinishedResultKind>>>(
1708 "result_kind",
1709 )?
1710 .map(|wrapper| wrapper.0),
1711 },
1712 Version::new(row.get("corresponding_version")?),
1713 )?;
1714 Ok(ExecutionWithState {
1715 execution_id,
1716 ffqn: combined_state.ffqn,
1717 pending_state: combined_state.pending_state,
1718 created_at,
1719 first_scheduled_at,
1720 component_digest: component_id_input_digest,
1721 })
1722 },
1723 )?
1724 .collect::<Vec<Result<_, _>>>()
1725 .into_iter()
1726 .filter_map(|row| match row {
1727 Ok(row) => Some(row),
1728 Err(err) => {
1729 warn!("Skipping row - {err:?}");
1730 None
1731 }
1732 })
1733 .collect();
1734
1735 if !statement_mod.limit_desc {
1736 vec.reverse();
1738 }
1739 Ok(vec)
1740 }
1741
1742 fn list_responses(
1743 tx: &Transaction,
1744 execution_id: &ExecutionId,
1745 pagination: Option<Pagination<u32>>,
1746 ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
1747 let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![];
1749 let mut sql = "SELECT \
1750 r.id, r.created_at, r.join_set_id, r.delay_id, r.delay_success, r.child_execution_id, r.finished_version, l.json_value \
1751 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
1752 WHERE \
1753 r.execution_id = :execution_id \
1754 AND ( r.finished_version = l.version OR r.child_execution_id IS NULL ) \
1755 "
1756 .to_string();
1757 let limit = match &pagination {
1758 Some(
1759 pagination @ (Pagination::NewerThan { cursor, .. }
1760 | Pagination::OlderThan { cursor, .. }),
1761 ) => {
1762 params.push((":cursor", Box::new(cursor)));
1763 write!(sql, " AND r.id {rel} :cursor", rel = pagination.rel()).unwrap();
1764 Some(pagination.length())
1765 }
1766 None => None,
1767 };
1768 sql.push_str(" ORDER BY id");
1769 if pagination.as_ref().is_some_and(Pagination::is_desc) {
1770 sql.push_str(" DESC");
1771 }
1772 if let Some(limit) = limit {
1773 write!(sql, " LIMIT {limit}").unwrap();
1774 }
1775 params.push((":execution_id", Box::new(execution_id.to_string())));
1776 tx.prepare(&sql)?
1777 .query_map::<_, &[(&'static str, &dyn ToSql)], _>(
1778 params
1779 .iter()
1780 .map(|(key, value)| (*key, value.as_ref()))
1781 .collect::<Vec<_>>()
1782 .as_ref(),
1783 Self::parse_response_with_cursor,
1784 )?
1785 .collect::<Result<Vec<_>, rusqlite::Error>>()
1786 .map_err(DbErrorRead::from)
1787 }
1788
1789 fn parse_response_with_cursor(
1790 row: &rusqlite::Row<'_>,
1791 ) -> Result<ResponseWithCursor, rusqlite::Error> {
1792 let id = row.get("id")?;
1793 let created_at: DateTime<Utc> = row.get("created_at")?;
1794 let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
1795 let event = match (
1796 row.get::<_, Option<DelayId>>("delay_id")?,
1797 row.get::<_, Option<bool>>("delay_success")?,
1798 row.get::<_, Option<ExecutionIdDerived>>("child_execution_id")?,
1799 row.get::<_, Option<VersionType>>("finished_version")?,
1800 row.get::<_, Option<JsonWrapper<ExecutionRequest>>>("json_value")?,
1801 ) {
1802 (Some(delay_id), Some(delay_success), None, None, None) => {
1803 JoinSetResponse::DelayFinished {
1804 delay_id,
1805 result: delay_success.then_some(()).ok_or(()),
1806 }
1807 }
1808 (
1809 None,
1810 None,
1811 Some(child_execution_id),
1812 Some(finished_version),
1813 Some(JsonWrapper(ExecutionRequest::Finished { result, .. })),
1814 ) => JoinSetResponse::ChildExecutionFinished {
1815 child_execution_id,
1816 finished_version: Version(finished_version),
1817 result,
1818 },
1819 (delay, delay_success, child, finished, result) => {
1820 error!(
1821 "Invalid row in t_join_set_response {id} - {delay:?} {delay_success:?} {child:?} {finished:?} {:?}",
1822 result.map(|it| it.0)
1823 );
1824 return Err(consistency_rusqlite("invalid row in t_join_set_response"));
1825 }
1826 };
1827 Ok(ResponseWithCursor {
1828 cursor: id,
1829 event: JoinSetResponseEventOuter {
1830 event: JoinSetResponseEvent { join_set_id, event },
1831 created_at,
1832 },
1833 })
1834 }
1835
1836 #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
1837 #[expect(clippy::too_many_arguments)]
1838 fn lock_single_execution(
1839 tx: &Transaction,
1840 created_at: DateTime<Utc>,
1841 component_id: ComponentId,
1842 execution_id: &ExecutionId,
1843 run_id: RunId,
1844 appending_version: &Version,
1845 executor_id: ExecutorId,
1846 lock_expires_at: DateTime<Utc>,
1847 retry_config: ComponentRetryConfig,
1848 ) -> Result<LockedExecution, DbErrorWrite> {
1849 debug!("lock_single_execution");
1850 let combined_state = Self::get_combined_state(tx, execution_id)?;
1851 combined_state.pending_state.can_append_lock(
1852 created_at,
1853 executor_id,
1854 run_id,
1855 lock_expires_at,
1856 )?;
1857 let expected_version = combined_state.get_next_version_assert_not_finished();
1858 Self::check_expected_next_and_appending_version(&expected_version, appending_version)?;
1859
1860 let locked_event = Locked {
1862 component_id,
1863 executor_id,
1864 lock_expires_at,
1865 run_id,
1866 retry_config,
1867 };
1868 let event = ExecutionRequest::Locked(locked_event.clone());
1869 let event_ser = serde_json::to_string(&event).map_err(|err| {
1870 warn!("Cannot serialize {event:?} - {err:?}");
1871 DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
1872 })?;
1873 let mut stmt = tx
1874 .prepare_cached(
1875 "INSERT INTO t_execution_log \
1876 (execution_id, created_at, json_value, version, variant) \
1877 VALUES \
1878 (:execution_id, :created_at, :json_value, :version, :variant)",
1879 )
1880 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1881 stmt.execute(named_params! {
1882 ":execution_id": execution_id.to_string(),
1883 ":created_at": created_at,
1884 ":json_value": event_ser,
1885 ":version": appending_version.0,
1886 ":variant": event.variant(),
1887 })
1888 .map_err(|err| {
1889 warn!("Cannot lock execution - {err:?}");
1890 DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::IllegalState("cannot lock".into()))
1891 })?;
1892
1893 let responses = Self::list_responses(tx, execution_id, None)?;
1895 let responses = responses.into_iter().map(|resp| resp.event).collect();
1896 trace!("Responses: {responses:?}");
1897
1898 let intermittent_event_count = Self::update_state_locked_get_intermittent_event_count(
1899 tx,
1900 execution_id,
1901 executor_id,
1902 run_id,
1903 lock_expires_at,
1904 appending_version,
1905 retry_config,
1906 )?;
1907 let mut events = tx
1909 .prepare(
1910 "SELECT json_value, version FROM t_execution_log WHERE \
1911 execution_id = :execution_id AND (variant = :variant1 OR variant = :variant2) \
1912 ORDER BY version",
1913 )?
1914 .query_map(
1915 named_params! {
1916 ":execution_id": execution_id.to_string(),
1917 ":variant1": DUMMY_CREATED.variant(),
1918 ":variant2": DUMMY_HISTORY_EVENT.variant(),
1919 },
1920 |row| {
1921 let created_at_fake = DateTime::from_timestamp_nanos(0); let event = row
1923 .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
1924 .map_err(|serde| {
1925 error!("Cannot deserialize {row:?} - {serde:?}");
1926 consistency_rusqlite("cannot deserialize event")
1927 })?
1928 .0;
1929 let version = Version(row.get("version")?);
1930
1931 Ok(ExecutionEvent {
1932 created_at: created_at_fake,
1933 event,
1934 backtrace_id: None,
1935 version,
1936 })
1937 },
1938 )?
1939 .collect::<Result<Vec<_>, _>>()?
1940 .into_iter()
1941 .collect::<VecDeque<_>>();
1942 let Some(ExecutionRequest::Created {
1943 ffqn,
1944 params,
1945 parent,
1946 metadata,
1947 ..
1948 }) = events.pop_front().map(|outer| outer.event)
1949 else {
1950 error!("Execution log must contain at least `Created` event");
1951 return Err(consistency_db_err("execution log must contain `Created` event").into());
1952 };
1953
1954 let event_history = events
1955 .into_iter()
1956 .map(|ExecutionEvent { event, version, .. }| {
1957 if let ExecutionRequest::HistoryEvent { event } = event {
1958 Ok((event, version))
1959 } else {
1960 error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
1961 Err(consistency_db_err(
1962 "rows can only contain `Created` and `HistoryEvent` event kinds",
1963 ))
1964 }
1965 })
1966 .collect::<Result<Vec<_>, _>>()?;
1967
1968 Ok(LockedExecution {
1969 execution_id: execution_id.clone(),
1970 metadata,
1971 next_version: appending_version.increment(),
1972 ffqn,
1973 params,
1974 event_history,
1975 responses,
1976 parent,
1977 intermittent_event_count,
1978 locked_event,
1979 })
1980 }
1981
1982 fn count_join_next(
1983 tx: &Transaction,
1984 execution_id: &ExecutionId,
1985 join_set_id: &JoinSetId,
1986 ) -> Result<u32, DbErrorRead> {
1987 let mut stmt = tx.prepare(
1988 "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = :execution_id AND join_set_id = :join_set_id \
1989 AND history_event_type = :join_next",
1990 )?;
1991 Ok(stmt.query_row(
1992 named_params! {
1993 ":execution_id": execution_id.to_string(),
1994 ":join_set_id": join_set_id.to_string(),
1995 ":join_next": HISTORY_EVENT_TYPE_JOIN_NEXT,
1996 },
1997 |row| row.get::<_, u32>("count"),
1998 )?)
1999 }
2000
2001 fn nth_response(
2002 tx: &Transaction,
2003 execution_id: &ExecutionId,
2004 join_set_id: &JoinSetId,
2005 skip_rows: u32,
2006 ) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
2007 tx
2009 .prepare(
2010 "SELECT r.id, r.created_at, r.join_set_id, \
2011 r.delay_id, r.delay_success, \
2012 r.child_execution_id, r.finished_version, l.json_value \
2013 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2014 WHERE \
2015 r.execution_id = :execution_id AND r.join_set_id = :join_set_id AND \
2016 (
2017 r.finished_version = l.version \
2018 OR \
2019 r.child_execution_id IS NULL \
2020 ) \
2021 ORDER BY id \
2022 LIMIT 1 OFFSET :offset",
2023 )
2024 ?
2025 .query_row(
2026 named_params! {
2027 ":execution_id": execution_id.to_string(),
2028 ":join_set_id": join_set_id.to_string(),
2029 ":offset": skip_rows,
2030 },
2031 Self::parse_response_with_cursor,
2032 )
2033 .optional()
2034 .map_err(DbErrorRead::from)
2035 }
2036
2037 #[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
2038 #[expect(clippy::needless_return)]
2039 fn append(
2040 tx: &Transaction,
2041 execution_id: &ExecutionId,
2042 req: AppendRequest,
2043 appending_version: Version,
2044 ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
2045 if matches!(req.event, ExecutionRequest::Created { .. }) {
2046 return Err(DbErrorWrite::NonRetriable(
2047 DbErrorWriteNonRetriable::ValidationFailed(
2048 "cannot append `Created` event - use `create` instead".into(),
2049 ),
2050 ));
2051 }
2052 if let AppendRequest {
2053 event:
2054 ExecutionRequest::Locked(Locked {
2055 component_id,
2056 executor_id,
2057 run_id,
2058 lock_expires_at,
2059 retry_config,
2060 }),
2061 created_at,
2062 } = req
2063 {
2064 return Self::lock_single_execution(
2065 tx,
2066 created_at,
2067 component_id,
2068 execution_id,
2069 run_id,
2070 &appending_version,
2071 executor_id,
2072 lock_expires_at,
2073 retry_config,
2074 )
2075 .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
2076 }
2077
2078 let combined_state = Self::get_combined_state(tx, execution_id)?;
2079 if combined_state.pending_state.is_finished() {
2080 debug!("Execution is already finished");
2081 return Err(DbErrorWrite::NonRetriable(
2082 DbErrorWriteNonRetriable::IllegalState("already finished".into()),
2083 ));
2084 }
2085
2086 Self::check_expected_next_and_appending_version(
2087 &combined_state.get_next_version_assert_not_finished(),
2088 &appending_version,
2089 )?;
2090 let event_ser = serde_json::to_string(&req.event).map_err(|err| {
2091 error!("Cannot serialize {:?} - {err:?}", req.event);
2092 DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
2093 })?;
2094
2095 let mut stmt = tx.prepare(
2096 "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
2097 VALUES (:execution_id, :created_at, :json_value, :version, :variant, :join_set_id)")
2098 ?;
2099 stmt.execute(named_params! {
2100 ":execution_id": execution_id.to_string(),
2101 ":created_at": req.created_at,
2102 ":json_value": event_ser,
2103 ":version": appending_version.0,
2104 ":variant": req.event.variant(),
2105 ":join_set_id": req.event.join_set_id().map(std::string::ToString::to_string),
2106 })?;
2107 match &req.event {
2110 ExecutionRequest::Created { .. } => {
2111 unreachable!("handled in the caller")
2112 }
2113
2114 ExecutionRequest::Locked { .. } => {
2115 unreachable!("handled above")
2116 }
2117
2118 ExecutionRequest::TemporarilyFailed {
2119 backoff_expires_at, ..
2120 }
2121 | ExecutionRequest::TemporarilyTimedOut {
2122 backoff_expires_at, ..
2123 } => {
2124 let (next_version, notifier) = Self::update_state_pending_after_event_appended(
2125 tx,
2126 execution_id,
2127 &appending_version,
2128 *backoff_expires_at,
2129 true, combined_state
2131 .pending_state
2132 .get_component_id_input_digest()
2133 .clone(),
2134 )?;
2135 return Ok((next_version, notifier));
2136 }
2137
2138 ExecutionRequest::Unlocked {
2139 backoff_expires_at, ..
2140 } => {
2141 let (next_version, notifier) = Self::update_state_pending_after_event_appended(
2142 tx,
2143 execution_id,
2144 &appending_version,
2145 *backoff_expires_at,
2146 false, combined_state
2148 .pending_state
2149 .get_component_id_input_digest()
2150 .clone(),
2151 )?;
2152 return Ok((next_version, notifier));
2153 }
2154
2155 ExecutionRequest::Finished { result, .. } => {
2156 Self::update_state_finished(
2157 tx,
2158 execution_id,
2159 &appending_version,
2160 req.created_at,
2161 PendingStateFinishedResultKind::from(result),
2162 )?;
2163 return Ok((
2164 appending_version,
2165 AppendNotifier {
2166 pending_at: None,
2167 execution_finished: Some(NotifierExecutionFinished {
2168 execution_id: execution_id.clone(),
2169 retval: result.clone(),
2170 }),
2171 response: None,
2172 },
2173 ));
2174 }
2175
2176 ExecutionRequest::HistoryEvent {
2177 event:
2178 HistoryEvent::JoinSetCreate { .. }
2179 | HistoryEvent::JoinSetRequest {
2180 request: JoinSetRequest::ChildExecutionRequest { .. },
2181 ..
2182 }
2183 | HistoryEvent::Persist { .. }
2184 | HistoryEvent::Schedule { .. }
2185 | HistoryEvent::Stub { .. }
2186 | HistoryEvent::JoinNextTooMany { .. },
2187 } => {
2188 return Ok((
2189 Self::bump_state_next_version(tx, execution_id, &appending_version, None)?,
2190 AppendNotifier::default(),
2191 ));
2192 }
2193
2194 ExecutionRequest::HistoryEvent {
2195 event:
2196 HistoryEvent::JoinSetRequest {
2197 join_set_id,
2198 request:
2199 JoinSetRequest::DelayRequest {
2200 delay_id,
2201 expires_at,
2202 ..
2203 },
2204 },
2205 } => {
2206 return Ok((
2207 Self::bump_state_next_version(
2208 tx,
2209 execution_id,
2210 &appending_version,
2211 Some(DelayReq {
2212 join_set_id: join_set_id.clone(),
2213 delay_id: delay_id.clone(),
2214 expires_at: *expires_at,
2215 }),
2216 )?,
2217 AppendNotifier::default(),
2218 ));
2219 }
2220
2221 ExecutionRequest::HistoryEvent {
2222 event:
2223 HistoryEvent::JoinNext {
2224 join_set_id,
2225 run_expires_at,
2226 closing,
2227 requested_ffqn: _,
2228 },
2229 } => {
2230 let join_next_count = Self::count_join_next(tx, execution_id, join_set_id)?;
2232 let nth_response =
2233 Self::nth_response(tx, execution_id, join_set_id, join_next_count - 1)?; trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
2235 assert!(join_next_count > 0);
2236 if let Some(ResponseWithCursor {
2237 event:
2238 JoinSetResponseEventOuter {
2239 created_at: nth_created_at,
2240 ..
2241 },
2242 cursor: _,
2243 }) = nth_response
2244 {
2245 let scheduled_at = max(*run_expires_at, nth_created_at); let (next_version, notifier) = Self::update_state_pending_after_event_appended(
2247 tx,
2248 execution_id,
2249 &appending_version,
2250 scheduled_at,
2251 false, combined_state
2253 .pending_state
2254 .get_component_id_input_digest()
2255 .clone(),
2256 )?;
2257 return Ok((next_version, notifier));
2258 }
2259 return Ok((
2260 Self::update_state_blocked(
2261 tx,
2262 execution_id,
2263 &appending_version,
2264 join_set_id,
2265 *run_expires_at,
2266 *closing,
2267 )?,
2268 AppendNotifier::default(),
2269 ));
2270 }
2271 }
2272 }
2273
2274 fn append_response(
2275 tx: &Transaction,
2276 execution_id: &ExecutionId,
2277 response_outer: JoinSetResponseEventOuter,
2278 ) -> Result<AppendNotifier, DbErrorWrite> {
2279 let mut stmt = tx.prepare(
2280 "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, delay_success, child_execution_id, finished_version) \
2281 VALUES (:execution_id, :created_at, :join_set_id, :delay_id, :delay_success, :child_execution_id, :finished_version)",
2282 )?;
2283 let join_set_id = &response_outer.event.join_set_id;
2284 let (delay_id, delay_success) = match &response_outer.event.event {
2285 JoinSetResponse::DelayFinished { delay_id, result } => {
2286 (Some(delay_id.to_string()), Some(result.is_ok()))
2287 }
2288 JoinSetResponse::ChildExecutionFinished { .. } => (None, None),
2289 };
2290 let (child_execution_id, finished_version) = match &response_outer.event.event {
2291 JoinSetResponse::ChildExecutionFinished {
2292 child_execution_id,
2293 finished_version,
2294 result: _,
2295 } => (
2296 Some(child_execution_id.to_string()),
2297 Some(finished_version.0),
2298 ),
2299 JoinSetResponse::DelayFinished { .. } => (None, None),
2300 };
2301
2302 stmt.execute(named_params! {
2303 ":execution_id": execution_id.to_string(),
2304 ":created_at": response_outer.created_at,
2305 ":join_set_id": join_set_id.to_string(),
2306 ":delay_id": delay_id,
2307 ":delay_success": delay_success,
2308 ":child_execution_id": child_execution_id,
2309 ":finished_version": finished_version,
2310 })?;
2311
2312 let combined_state = Self::get_combined_state(tx, execution_id)?;
2314 debug!("previous_pending_state: {combined_state:?}");
2315 let mut notifier = if let PendingState::BlockedByJoinSet {
2316 join_set_id: found_join_set_id,
2317 lock_expires_at, closing: _,
2319 component_id_input_digest,
2320 } = combined_state.pending_state
2321 && *join_set_id == found_join_set_id
2322 {
2323 let scheduled_at = max(lock_expires_at, response_outer.created_at);
2326 Self::update_state_pending_after_response_appended(
2329 tx,
2330 execution_id,
2331 scheduled_at,
2332 &combined_state.corresponding_version, component_id_input_digest,
2334 )?
2335 } else {
2336 AppendNotifier::default()
2337 };
2338 if let JoinSetResponseEvent {
2339 join_set_id,
2340 event:
2341 JoinSetResponse::DelayFinished {
2342 delay_id,
2343 result: _,
2344 },
2345 } = &response_outer.event
2346 {
2347 debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
2348 let mut stmt =
2349 tx.prepare_cached("DELETE FROM t_delay WHERE execution_id = :execution_id AND join_set_id = :join_set_id AND delay_id = :delay_id")
2350 ?;
2351 stmt.execute(named_params! {
2352 ":execution_id": execution_id.to_string(),
2353 ":join_set_id": join_set_id.to_string(),
2354 ":delay_id": delay_id.to_string(),
2355 })?;
2356 }
2357 notifier.response = Some((execution_id.clone(), response_outer));
2358 Ok(notifier)
2359 }
2360
2361 fn append_backtrace(
2362 tx: &Transaction,
2363 backtrace_info: &BacktraceInfo,
2364 ) -> Result<(), DbErrorWrite> {
2365 let backtrace = serde_json::to_string(&backtrace_info.wasm_backtrace).map_err(|err| {
2366 warn!(
2367 "Cannot serialize backtrace {:?} - {err:?}",
2368 backtrace_info.wasm_backtrace
2369 );
2370 DbErrorWriteNonRetriable::ValidationFailed("cannot serialize backtrace".into())
2371 })?;
2372 let mut stmt = tx
2373 .prepare(
2374 "INSERT INTO t_backtrace (execution_id, component_id, version_min_including, version_max_excluding, wasm_backtrace) \
2375 VALUES (:execution_id, :component_id, :version_min_including, :version_max_excluding, :wasm_backtrace)",
2376 )
2377 ?;
2378 stmt.execute(named_params! {
2379 ":execution_id": backtrace_info.execution_id.to_string(),
2380 ":component_id": backtrace_info.component_id.to_string(),
2381 ":version_min_including": backtrace_info.version_min_including.0,
2382 ":version_max_excluding": backtrace_info.version_max_excluding.0,
2383 ":wasm_backtrace": backtrace,
2384 })?;
2385 Ok(())
2386 }
2387
/// Test-only: loads the whole execution log of `execution_id`.
///
/// Reads all events ordered by version, the combined state and all responses.
/// Returns `DbErrorRead::NotFound` when the execution has no events.
#[cfg(feature = "test")]
fn get(
    tx: &Transaction,
    execution_id: &ExecutionId,
) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
    let mut stmt = tx.prepare(
        "SELECT created_at, json_value, version FROM t_execution_log WHERE \
        execution_id = :execution_id ORDER BY version",
    )?;
    let rows = stmt.query_map(
        named_params! {
            ":execution_id": execution_id.to_string(),
        },
        |row| {
            let created_at = row.get("created_at")?;
            let JsonWrapper(event) = row
                .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
                .map_err(|serde| {
                    error!("Cannot deserialize {row:?} - {serde:?}");
                    consistency_rusqlite("cannot deserialize event")
                })?;
            let version = Version(row.get("version")?);

            Ok(ExecutionEvent {
                created_at,
                event,
                backtrace_id: None,
                version,
            })
        },
    )?;
    let events = rows.collect::<Result<Vec<_>, _>>()?;
    if events.is_empty() {
        return Err(DbErrorRead::NotFound);
    }

    let combined_state = Self::get_combined_state(tx, execution_id)?;
    let mut responses = Vec::new();
    for response in Self::list_responses(tx, execution_id, None)? {
        responses.push(response.event);
    }
    // Read the next version before `pending_state` is moved out of `combined_state`.
    let next_version = combined_state.get_next_version_or_finished();
    Ok(concepts::storage::ExecutionLog {
        execution_id: execution_id.clone(),
        events,
        responses,
        next_version,
        pending_state: combined_state.pending_state,
    })
}
2438
2439 fn list_execution_events(
2440 tx: &Transaction,
2441 execution_id: &ExecutionId,
2442 version_min: VersionType,
2443 version_max_excluding: VersionType,
2444 include_backtrace_id: bool,
2445 ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2446 let select = if include_backtrace_id {
2447 "SELECT
2448 log.created_at,
2449 log.json_value,
2450 log.version as version,
2451 -- Select version_min_including from backtrace if a match is found, otherwise NULL
2452 bt.version_min_including AS backtrace_id
2453 FROM
2454 t_execution_log AS log
2455 LEFT OUTER JOIN -- Use LEFT JOIN to keep all logs even if no backtrace matches
2456 t_backtrace AS bt ON log.execution_id = bt.execution_id
2457 -- Check if the log's version falls within the backtrace's range
2458 AND log.version >= bt.version_min_including
2459 AND log.version < bt.version_max_excluding
2460 WHERE
2461 log.execution_id = :execution_id
2462 AND log.version >= :version_min
2463 AND log.version < :version_max_excluding
2464 ORDER BY
2465 log.version;"
2466 } else {
2467 "SELECT
2468 created_at, json_value, NULL as backtrace_id, version
2469 FROM t_execution_log WHERE
2470 execution_id = :execution_id AND version >= :version_min AND version < :version_max_excluding
2471 ORDER BY version"
2472 };
2473 tx.prepare(select)?
2474 .query_map(
2475 named_params! {
2476 ":execution_id": execution_id.to_string(),
2477 ":version_min": version_min,
2478 ":version_max_excluding": version_max_excluding
2479 },
2480 |row| {
2481 let created_at = row.get("created_at")?;
2482 let backtrace_id = row
2483 .get::<_, Option<VersionType>>("backtrace_id")?
2484 .map(Version::new);
2485 let version = Version(row.get("version")?);
2486
2487 let event = row
2488 .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
2489 .map(|event| ExecutionEvent {
2490 created_at,
2491 event: event.0,
2492 backtrace_id,
2493 version,
2494 })
2495 .map_err(|serde| {
2496 error!("Cannot deserialize {row:?} - {serde:?}");
2497 consistency_rusqlite("cannot deserialize")
2498 })?;
2499 Ok(event)
2500 },
2501 )?
2502 .collect::<Result<Vec<_>, _>>()
2503 .map_err(DbErrorRead::from)
2504 }
2505
2506 fn get_execution_event(
2507 tx: &Transaction,
2508 execution_id: &ExecutionId,
2509 version: VersionType,
2510 ) -> Result<ExecutionEvent, DbErrorRead> {
2511 let mut stmt = tx.prepare(
2512 "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2513 execution_id = :execution_id AND version = :version",
2514 )?;
2515 stmt.query_row(
2516 named_params! {
2517 ":execution_id": execution_id.to_string(),
2518 ":version": version,
2519 },
2520 |row| {
2521 let created_at = row.get("created_at")?;
2522 let event = row
2523 .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
2524 .map_err(|serde| {
2525 error!("Cannot deserialize {row:?} - {serde:?}");
2526 consistency_rusqlite("cannot deserialize event")
2527 })?;
2528 let version = Version(row.get("version")?);
2529
2530 Ok(ExecutionEvent {
2531 created_at,
2532 event: event.0,
2533 backtrace_id: None,
2534 version,
2535 })
2536 },
2537 )
2538 .map_err(DbErrorRead::from)
2539 }
2540
2541 fn get_last_execution_event(
2542 tx: &Transaction,
2543 execution_id: &ExecutionId,
2544 ) -> Result<ExecutionEvent, DbErrorRead> {
2545 let mut stmt = tx.prepare(
2546 "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2547 execution_id = :execution_id ORDER BY version DESC",
2548 )?;
2549 stmt.query_row(
2550 named_params! {
2551 ":execution_id": execution_id.to_string(),
2552 },
2553 |row| {
2554 let created_at = row.get("created_at")?;
2555 let event = row
2556 .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
2557 .map_err(|serde| {
2558 error!("Cannot deserialize {row:?} - {serde:?}");
2559 consistency_rusqlite("cannot deserialize event")
2560 })?;
2561 let version = Version(row.get("version")?);
2562 Ok(ExecutionEvent {
2563 created_at,
2564 event: event.0,
2565 backtrace_id: None,
2566 version: version.clone(),
2567 })
2568 },
2569 )
2570 .map_err(DbErrorRead::from)
2571 }
2572
2573 fn delay_response(
2574 tx: &Transaction,
2575 execution_id: &ExecutionId,
2576 delay_id: &DelayId,
2577 ) -> Result<Option<bool>, DbErrorRead> {
2578 tx.prepare(
2580 "SELECT delay_success \
2581 FROM t_join_set_response \
2582 WHERE \
2583 execution_id = :execution_id AND delay_id = :delay_id
2584 ",
2585 )?
2586 .query_row(
2587 named_params! {
2588 ":execution_id": execution_id.to_string(),
2589 ":delay_id": delay_id.to_string(),
2590 },
2591 |row| {
2592 let delay_success = row.get::<_, bool>("delay_success")?;
2593 Ok(delay_success)
2594 },
2595 )
2596 .optional()
2597 .map_err(DbErrorRead::from)
2598 }
2599
2600 #[instrument(level = Level::TRACE, skip_all)]
2602 fn get_responses_with_offset(
2603 tx: &Transaction,
2604 execution_id: &ExecutionId,
2605 skip_rows: u32,
2606 ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorRead> {
2607 tx.prepare(
2609 "SELECT r.id, r.created_at, r.join_set_id, \
2610 r.delay_id, r.delay_success, \
2611 r.child_execution_id, r.finished_version, l.json_value \
2612 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2613 WHERE \
2614 r.execution_id = :execution_id AND \
2615 ( \
2616 r.finished_version = l.version \
2617 OR r.child_execution_id IS NULL \
2618 ) \
2619 ORDER BY id \
2620 LIMIT -1 OFFSET :offset",
2621 )
2622 ?
2623 .query_map(
2624 named_params! {
2625 ":execution_id": execution_id.to_string(),
2626 ":offset": skip_rows,
2627 },
2628 Self::parse_response_with_cursor,
2629 )
2630 ?
2631 .collect::<Result<Vec<_>, _>>()
2632 .map(|resp| resp.into_iter().map(|vec| vec.event).collect())
2633 .map_err(DbErrorRead::from)
2634 }
2635
2636 fn get_pending_of_single_ffqn(
2637 mut stmt: CachedStatement,
2638 batch_size: u32,
2639 pending_at_or_sooner: DateTime<Utc>,
2640 ffqn: &FunctionFqn,
2641 ) -> Result<Vec<(ExecutionId, Version)>, ()> {
2642 stmt.query_map(
2643 named_params! {
2644 ":pending_expires_finished": pending_at_or_sooner,
2645 ":ffqn": ffqn.to_string(),
2646 ":batch_size": batch_size,
2647 },
2648 |row| {
2649 let execution_id = row.get::<_, ExecutionId>("execution_id")?;
2650 let next_version =
2651 Version::new(row.get::<_, VersionType>("corresponding_version")?).increment();
2652 Ok((execution_id, next_version))
2653 },
2654 )
2655 .map_err(|err| {
2656 warn!("Ignoring consistency error {err:?}");
2657 })?
2658 .collect::<Result<Vec<_>, _>>()
2659 .map_err(|err| {
2660 warn!("Ignoring consistency error {err:?}");
2661 })
2662 }
2663
2664 fn get_pending_by_ffqns(
2666 conn: &Connection,
2667 batch_size: u32,
2668 pending_at_or_sooner: DateTime<Utc>,
2669 ffqns: &[FunctionFqn],
2670 ) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2671 let batch_size = usize::try_from(batch_size).expect("16 bit systems are unsupported");
2672 let mut execution_ids_versions = Vec::with_capacity(batch_size);
2673 for ffqn in ffqns {
2674 let stmt = conn
2676 .prepare_cached(&format!(
2677 r#"
2678 SELECT execution_id, corresponding_version FROM t_state WHERE
2679 state = "{STATE_PENDING_AT}" AND
2680 pending_expires_finished <= :pending_expires_finished AND ffqn = :ffqn
2681 ORDER BY pending_expires_finished LIMIT :batch_size
2682 "#
2683 ))
2684 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2685
2686 if let Ok(execs_and_versions) = Self::get_pending_of_single_ffqn(
2687 stmt,
2688 u32::try_from(batch_size - execution_ids_versions.len())
2689 .expect("u32 - anything must fit to u32"),
2690 pending_at_or_sooner,
2691 ffqn,
2692 ) {
2693 execution_ids_versions.extend(execs_and_versions);
2694 if execution_ids_versions.len() == batch_size {
2695 break;
2697 }
2698 }
2700 }
2701 Ok(execution_ids_versions)
2702 }
2703
2704 fn get_pending_by_component_input_digest(
2705 conn: &Connection,
2706 batch_size: u32,
2707 pending_at_or_sooner: DateTime<Utc>,
2708 input_digest: &InputContentDigest,
2709 ) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2710 let mut stmt = conn
2711 .prepare_cached(&format!(
2712 r#"
2713 SELECT execution_id, corresponding_version FROM t_state WHERE
2714 state = "{STATE_PENDING_AT}" AND
2715 pending_expires_finished <= :pending_expires_finished AND
2716 component_id_input_digest = :component_id_input_digest
2717 ORDER BY pending_expires_finished LIMIT :batch_size
2718 "#
2719 ))
2720 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2721
2722 stmt.query_map(
2723 named_params! {
2724 ":pending_expires_finished": pending_at_or_sooner,
2725 ":component_id_input_digest": input_digest,
2726 ":batch_size": batch_size,
2727 },
2728 |row| {
2729 let execution_id = row.get::<_, ExecutionId>("execution_id")?;
2730 let next_version =
2731 Version::new(row.get::<_, VersionType>("corresponding_version")?).increment();
2732 Ok((execution_id, next_version))
2733 },
2734 )?
2735 .collect::<Result<Vec<_>, _>>()
2736 .map_err(DbErrorGeneric::from)
2737 }
2738
2739 #[instrument(level = Level::TRACE, skip_all)]
2741 fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
2742 let (pending_ats, finished_execs, responses) = {
2743 let (mut pending_ats, mut finished_execs, mut responses) =
2744 (Vec::new(), Vec::new(), Vec::new());
2745 for notifier in notifiers {
2746 if let Some(pending_at) = notifier.pending_at {
2747 pending_ats.push(pending_at);
2748 }
2749 if let Some(finished) = notifier.execution_finished {
2750 finished_execs.push(finished);
2751 }
2752 if let Some(response) = notifier.response {
2753 responses.push(response);
2754 }
2755 }
2756 (pending_ats, finished_execs, responses)
2757 };
2758
2759 if !pending_ats.is_empty() {
2761 let guard = self.0.pending_subscribers.lock().unwrap();
2762 for pending_at in pending_ats {
2763 Self::notify_pending_locked(&pending_at, current_time, &guard);
2764 }
2765 }
2766 if !finished_execs.is_empty() {
2769 let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
2770 for finished in finished_execs {
2771 if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
2772 for (_tag, sender) in listeners_of_exe_id {
2773 let _ = sender.send(finished.retval.clone());
2776 }
2777 }
2778 }
2779 }
2780 if !responses.is_empty() {
2782 let mut guard = self.0.response_subscribers.lock().unwrap();
2783 for (execution_id, response) in responses {
2784 if let Some((sender, _)) = guard.remove(&execution_id) {
2785 let _ = sender.send(response);
2786 }
2787 }
2788 }
2789 }
2790
2791 fn notify_pending_locked(
2792 notifier: &NotifierPendingAt,
2793 current_time: DateTime<Utc>,
2794 ffqn_to_pending_subscription: &std::sync::MutexGuard<PendingFfqnSubscribersHolder>,
2795 ) {
2796 if notifier.scheduled_at <= current_time {
2798 ffqn_to_pending_subscription.notify(notifier);
2799 }
2800 }
2801
2802 fn upgrade_execution_component(
2803 tx: &Transaction,
2804 execution_id: &ExecutionId,
2805 old: &InputContentDigest,
2806 new: &InputContentDigest,
2807 ) -> Result<(), DbErrorWrite> {
2808 debug!("Updating t_state to component {new}");
2809 let mut stmt = tx
2810 .prepare_cached(
2811 r"
2812 UPDATE t_state
2813 SET
2814 updated_at = CURRENT_TIMESTAMP,
2815 component_id_input_digest = :new
2816 WHERE
2817 execution_id = :execution_id AND
2818 component_id_input_digest = :old
2819 ",
2820 )
2821 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2822 let updated = stmt
2823 .execute(named_params! {
2824 ":execution_id": execution_id,
2825 ":old": old,
2826 ":new": new,
2827 })
2828 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2829 if updated != 1 {
2830 return Err(DbErrorWrite::NotFound);
2831 }
2832 Ok(())
2833 }
2834}
2835
2836#[async_trait]
2837impl DbExecutor for SqlitePool {
2838 #[instrument(level = Level::TRACE, skip(self))]
2839 async fn lock_pending_by_ffqns(
2840 &self,
2841 batch_size: u32,
2842 pending_at_or_sooner: DateTime<Utc>,
2843 ffqns: Arc<[FunctionFqn]>,
2844 created_at: DateTime<Utc>,
2845 component_id: ComponentId,
2846 executor_id: ExecutorId,
2847 lock_expires_at: DateTime<Utc>,
2848 run_id: RunId,
2849 retry_config: ComponentRetryConfig,
2850 ) -> Result<LockPendingResponse, DbErrorGeneric> {
2851 let execution_ids_versions = self
2852 .transaction(
2853 move |conn| {
2854 Self::get_pending_by_ffqns(conn, batch_size, pending_at_or_sooner, &ffqns)
2855 },
2856 "lock_pending_by_ffqns_get",
2857 )
2858 .await?;
2859 if execution_ids_versions.is_empty() {
2860 Ok(vec![])
2861 } else {
2862 debug!("Locking {execution_ids_versions:?}");
2863 self.transaction(
2864 move |tx| {
2865 let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
2866 for (execution_id, version) in &execution_ids_versions {
2868 match Self::lock_single_execution(
2869 tx,
2870 created_at,
2871 component_id.clone(),
2872 execution_id,
2873 run_id,
2874 version,
2875 executor_id,
2876 lock_expires_at,
2877 retry_config,
2878 ) {
2879 Ok(locked) => locked_execs.push(locked),
2880 Err(err) => {
2881 warn!("Locking row {execution_id} failed - {err:?}");
2882 }
2883 }
2884 }
2885 Ok(locked_execs)
2886 },
2887 "lock_pending_by_ffqns_one",
2888 )
2889 .await
2890 }
2891 }
2892
2893 #[instrument(level = Level::TRACE, skip(self))]
2894 async fn lock_pending_by_component_id(
2895 &self,
2896 batch_size: u32,
2897 pending_at_or_sooner: DateTime<Utc>,
2898 component_id: &ComponentId,
2899 created_at: DateTime<Utc>,
2900 executor_id: ExecutorId,
2901 lock_expires_at: DateTime<Utc>,
2902 run_id: RunId,
2903 retry_config: ComponentRetryConfig,
2904 ) -> Result<LockPendingResponse, DbErrorGeneric> {
2905 let component_id = component_id.clone();
2906 let execution_ids_versions = self
2907 .transaction(
2908 {
2909 let component_id = component_id.clone();
2910 move |conn| {
2911 Self::get_pending_by_component_input_digest(
2912 conn,
2913 batch_size,
2914 pending_at_or_sooner,
2915 &component_id.input_digest,
2916 )
2917 }
2918 },
2919 "lock_pending_by_component_id_get",
2920 )
2921 .await?;
2922 if execution_ids_versions.is_empty() {
2923 Ok(vec![])
2924 } else {
2925 debug!("Locking {execution_ids_versions:?}");
2926 self.transaction(
2927 move |tx| {
2928 let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
2929 for (execution_id, version) in &execution_ids_versions {
2931 match Self::lock_single_execution(
2932 tx,
2933 created_at,
2934 component_id.clone(),
2935 execution_id,
2936 run_id,
2937 version,
2938 executor_id,
2939 lock_expires_at,
2940 retry_config,
2941 ) {
2942 Ok(locked) => locked_execs.push(locked),
2943 Err(err) => {
2944 warn!("Locking row {execution_id} failed - {err:?}");
2945 }
2946 }
2947 }
2948 Ok(locked_execs)
2949 },
2950 "lock_pending_by_component_id_one",
2951 )
2952 .await
2953 }
2954 }
2955
2956 #[instrument(level = Level::DEBUG, skip(self))]
2957 async fn lock_one(
2958 &self,
2959 created_at: DateTime<Utc>,
2960 component_id: ComponentId,
2961 execution_id: &ExecutionId,
2962 run_id: RunId,
2963 version: Version,
2964 executor_id: ExecutorId,
2965 lock_expires_at: DateTime<Utc>,
2966 retry_config: ComponentRetryConfig,
2967 ) -> Result<LockedExecution, DbErrorWrite> {
2968 debug!(%execution_id, "lock_one");
2969 let execution_id = execution_id.clone();
2970 self.transaction(
2971 move |tx| {
2972 Self::lock_single_execution(
2973 tx,
2974 created_at,
2975 component_id.clone(),
2976 &execution_id,
2977 run_id,
2978 &version,
2979 executor_id,
2980 lock_expires_at,
2981 retry_config,
2982 )
2983 },
2984 "lock_inner",
2985 )
2986 .await
2987 }
2988
2989 #[instrument(level = Level::DEBUG, skip(self, req))]
2990 async fn append(
2991 &self,
2992 execution_id: ExecutionId,
2993 version: Version,
2994 req: AppendRequest,
2995 ) -> Result<AppendResponse, DbErrorWrite> {
2996 debug!(%req, "append");
2997 trace!(?req, "append");
2998 let created_at = req.created_at;
2999 let (version, notifier) = self
3000 .transaction(
3001 move |tx| Self::append(tx, &execution_id, req.clone(), version.clone()),
3002 "append",
3003 )
3004 .await?;
3005 self.notify_all(vec![notifier], created_at);
3006 Ok(version)
3007 }
3008
3009 #[instrument(level = Level::DEBUG, skip_all)]
3010 async fn append_batch_respond_to_parent(
3011 &self,
3012 events: AppendEventsToExecution,
3013 response: AppendResponseToExecution,
3014 current_time: DateTime<Utc>,
3015 ) -> Result<AppendBatchResponse, DbErrorWrite> {
3016 debug!("append_batch_respond_to_parent");
3017 if events.execution_id == response.parent_execution_id {
3018 return Err(DbErrorWrite::NonRetriable(
3021 DbErrorWriteNonRetriable::ValidationFailed(
3022 "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
3023 ),
3024 ));
3025 }
3026 if events.batch.is_empty() {
3027 error!("Batch cannot be empty");
3028 return Err(DbErrorWrite::NonRetriable(
3029 DbErrorWriteNonRetriable::ValidationFailed("batch cannot be empty".into()),
3030 ));
3031 }
3032 let (version, notifiers) = {
3033 self.transaction(
3034 move |tx| {
3035 let mut version = events.version.clone();
3036 let mut notifier_of_child = None;
3037 for append_request in &events.batch {
3038 let (v, n) = Self::append(
3039 tx,
3040 &events.execution_id,
3041 append_request.clone(),
3042 version,
3043 )?;
3044 version = v;
3045 notifier_of_child = Some(n);
3046 }
3047
3048 let pending_at_parent = Self::append_response(
3049 tx,
3050 &response.parent_execution_id,
3051 JoinSetResponseEventOuter {
3052 created_at: response.created_at,
3053 event: JoinSetResponseEvent {
3054 join_set_id: response.join_set_id.clone(),
3055 event: JoinSetResponse::ChildExecutionFinished {
3056 child_execution_id: response.child_execution_id.clone(),
3057 finished_version: response.finished_version.clone(),
3058 result: response.result.clone(),
3059 },
3060 },
3061 },
3062 )?;
3063 Ok::<_, DbErrorWrite>((
3064 version,
3065 vec![
3066 notifier_of_child.expect("checked that the batch is not empty"),
3067 pending_at_parent,
3068 ],
3069 ))
3070 },
3071 "append_batch_respond_to_parent",
3072 )
3073 .await?
3074 };
3075 self.notify_all(notifiers, current_time);
3076 Ok(version)
3077 }
3078
3079 #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
3082 async fn wait_for_pending_by_ffqn(
3083 &self,
3084 pending_at_or_sooner: DateTime<Utc>,
3085 ffqns: Arc<[FunctionFqn]>,
3086 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
3087 ) {
3088 let unique_tag: u64 = rand::random();
3089 let (sender, mut receiver) = mpsc::channel(1); {
3091 let mut pending_subscribers = self.0.pending_subscribers.lock().unwrap();
3092 for ffqn in ffqns.as_ref() {
3093 pending_subscribers.insert_ffqn(ffqn.clone(), (sender.clone(), unique_tag));
3094 }
3095 }
3096 async {
3097 let Ok(execution_ids_versions) = self
3098 .transaction(
3099 {
3100 let ffqns = ffqns.clone();
3101 move |conn| Self::get_pending_by_ffqns(conn, 1, pending_at_or_sooner, ffqns.as_ref())
3102 },
3103 "get_pending_by_ffqns",
3104 )
3105 .await
3106 else {
3107 trace!(
3108 "Ignoring get_pending error and waiting in for timeout to avoid executor repolling too soon"
3109 );
3110 timeout_fut.await;
3111 return;
3112 };
3113 if !execution_ids_versions.is_empty() {
3114 trace!("Not waiting, database already contains new pending executions");
3115 return;
3116 }
3117 tokio::select! { _ = receiver.recv() => {
3119 trace!("Received a notification");
3120 }
3121 () = timeout_fut => {
3122 }
3123 }
3124 }.await;
3125 {
3127 let mut pending_subscribers = self.0.pending_subscribers.lock().unwrap();
3128 for ffqn in ffqns.as_ref() {
3129 match pending_subscribers.remove_ffqn(ffqn) {
3130 Some((_, tag)) if tag == unique_tag => {
3131 }
3133 Some(other) => {
3134 pending_subscribers.insert_ffqn(ffqn.clone(), other);
3136 }
3137 None => {
3138 }
3140 }
3141 }
3142 }
3143 }
3144
3145 async fn wait_for_pending_by_component_id(
3148 &self,
3149 pending_at_or_sooner: DateTime<Utc>,
3150 component_id: &ComponentId,
3151 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
3152 ) {
3153 let unique_tag: u64 = rand::random();
3154 let (sender, mut receiver) = mpsc::channel(1); {
3156 let mut pending_subscribers = self.0.pending_subscribers.lock().unwrap();
3157 pending_subscribers.insert_by_component(
3158 component_id.input_digest.clone(),
3159 (sender.clone(), unique_tag),
3160 );
3161 }
3162 async {
3163 let Ok(execution_ids_versions) = self
3164 .transaction(
3165 {
3166 let input_digest =component_id.input_digest.clone();
3167 move |conn| Self::get_pending_by_component_input_digest(conn, 1, pending_at_or_sooner, &input_digest)
3168 },
3169 "get_pending_by_component_input_digest",
3170 )
3171 .await
3172 else {
3173 trace!(
3174 "Ignoring get_pending error and waiting in for timeout to avoid executor repolling too soon"
3175 );
3176 timeout_fut.await;
3177 return;
3178 };
3179 if !execution_ids_versions.is_empty() {
3180 trace!("Not waiting, database already contains new pending executions");
3181 return;
3182 }
3183 tokio::select! { _ = receiver.recv() => {
3185 trace!("Received a notification");
3186 }
3187 () = timeout_fut => {
3188 }
3189 }
3190 }.await;
3191 {
3193 let mut pending_subscribers = self.0.pending_subscribers.lock().unwrap();
3194
3195 match pending_subscribers.remove_by_component(&component_id.input_digest) {
3196 Some((_, tag)) if tag == unique_tag => {
3197 }
3199 Some(other) => {
3200 pending_subscribers
3202 .insert_by_component(component_id.input_digest.clone(), other);
3203 }
3204 None => {
3205 }
3207 }
3208 }
3209 }
3210
3211 async fn get_last_execution_event(
3212 &self,
3213 execution_id: &ExecutionId,
3214 ) -> Result<ExecutionEvent, DbErrorRead> {
3215 let execution_id = execution_id.clone();
3216 self.transaction(
3217 move |tx| Self::get_last_execution_event(tx, &execution_id),
3218 "get_last_execution_event",
3219 )
3220 .await
3221 }
3222}
3223
3224#[async_trait]
3225impl DbExternalApi for SqlitePool {
3226 #[instrument(level = Level::DEBUG, skip_all)]
3227 async fn get_backtrace(
3228 &self,
3229 execution_id: &ExecutionId,
3230 filter: BacktraceFilter,
3231 ) -> Result<BacktraceInfo, DbErrorRead> {
3232 debug!("get_backtrace");
3233 let execution_id = execution_id.clone();
3234
3235 self.transaction(
3236 move |tx| {
3237 let select = "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace FROM t_backtrace \
3238 WHERE execution_id = :execution_id";
3239 let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![(":execution_id", Box::new(execution_id.to_string()))];
3240 let select = match &filter {
3241 BacktraceFilter::Specific(version) =>{
3242 params.push((":version", Box::new(version.0)));
3243 format!("{select} AND version_min_including <= :version AND version_max_excluding > :version")
3244 },
3245 BacktraceFilter::First => format!("{select} ORDER BY version_min_including LIMIT 1"),
3246 BacktraceFilter::Last => format!("{select} ORDER BY version_min_including DESC LIMIT 1")
3247 };
3248 tx
3249 .prepare(&select)
3250 ?
3251 .query_row::<_, &[(&'static str, &dyn ToSql)], _>(
3252 params
3253 .iter()
3254 .map(|(key, value)| (*key, value.as_ref()))
3255 .collect::<Vec<_>>()
3256 .as_ref(),
3257 |row| {
3258 Ok(BacktraceInfo {
3259 execution_id: execution_id.clone(),
3260 component_id: row.get::<_, JsonWrapper<_> >("component_id")?.0,
3261 version_min_including: Version::new(row.get::<_, VersionType>("version_min_including")?),
3262 version_max_excluding: Version::new(row.get::<_, VersionType>("version_max_excluding")?),
3263 wasm_backtrace: row.get::<_, JsonWrapper<_>>("wasm_backtrace")?.0,
3264 })
3265 },
3266 ).map_err(DbErrorRead::from)
3267 },
3268 "get_last_backtrace",
3269 ).await
3270 }
3271
3272 async fn list_executions(
3273 &self,
3274 ffqn: Option<FunctionFqn>,
3275 top_level_only: bool,
3276 pagination: ExecutionListPagination,
3277 ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
3278 self.transaction(
3279 move |tx| Self::list_executions(tx, ffqn.as_ref(), top_level_only, &pagination),
3280 "list_executions",
3281 )
3282 .await
3283 }
3284
3285 #[instrument(level = Level::DEBUG, skip(self))]
3286 async fn list_execution_events(
3287 &self,
3288 execution_id: &ExecutionId,
3289 since: &Version,
3290 max_length: VersionType,
3291 include_backtrace_id: bool,
3292 ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
3293 let execution_id = execution_id.clone();
3294 let since = since.0;
3295 self.transaction(
3296 move |tx| {
3297 Self::list_execution_events(
3298 tx,
3299 &execution_id,
3300 since,
3301 since + max_length,
3302 include_backtrace_id,
3303 )
3304 },
3305 "get",
3306 )
3307 .await
3308 }
3309
3310 async fn list_responses(
3311 &self,
3312 execution_id: &ExecutionId,
3313 pagination: Pagination<u32>,
3314 ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
3315 let execution_id = execution_id.clone();
3316 self.transaction(
3317 move |tx| Self::list_responses(tx, &execution_id, Some(pagination)),
3318 "list_responses",
3319 )
3320 .await
3321 }
3322
3323 async fn upgrade_execution_component(
3324 &self,
3325 execution_id: &ExecutionId,
3326 old: &InputContentDigest,
3327 new: &InputContentDigest,
3328 ) -> Result<(), DbErrorWrite> {
3329 let execution_id = execution_id.clone();
3330 let old = old.clone();
3331 let new = new.clone();
3332 self.transaction(
3333 move |tx| Self::upgrade_execution_component(tx, &execution_id, &old, &new),
3334 "upgrade_execution_component",
3335 )
3336 .await
3337 }
3338}
3339
3340#[async_trait]
3341impl DbConnection for SqlitePool {
3342 #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
3343 async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
3344 debug!("create");
3345 trace!(?req, "create");
3346 let created_at = req.created_at;
3347 let (version, notifier) = self
3348 .transaction(move |tx| Self::create_inner(tx, req.clone()), "create")
3349 .await?;
3350 self.notify_all(vec![notifier], created_at);
3351 Ok(version)
3352 }
3353
3354 #[instrument(level = Level::DEBUG, skip(self, batch))]
3355 async fn append_batch(
3356 &self,
3357 current_time: DateTime<Utc>,
3358 batch: Vec<AppendRequest>,
3359 execution_id: ExecutionId,
3360 version: Version,
3361 ) -> Result<AppendBatchResponse, DbErrorWrite> {
3362 debug!("append_batch");
3363 trace!(?batch, "append_batch");
3364 assert!(!batch.is_empty(), "Empty batch request");
3365
3366 let (version, notifier) = self
3367 .transaction(
3368 move |tx| {
3369 let mut version = version.clone();
3370 let mut notifier = None;
3371 for append_request in &batch {
3372 let (v, n) =
3373 Self::append(tx, &execution_id, append_request.clone(), version)?;
3374 version = v;
3375 notifier = Some(n);
3376 }
3377 Ok::<_, DbErrorWrite>((
3378 version,
3379 notifier.expect("checked that the batch is not empty"),
3380 ))
3381 },
3382 "append_batch",
3383 )
3384 .await?;
3385
3386 self.notify_all(vec![notifier], current_time);
3387 Ok(version)
3388 }
3389
3390 #[instrument(level = Level::DEBUG, skip(self, batch, child_req))]
3391 async fn append_batch_create_new_execution(
3392 &self,
3393 current_time: DateTime<Utc>,
3394 batch: Vec<AppendRequest>,
3395 execution_id: ExecutionId,
3396 version: Version,
3397 child_req: Vec<CreateRequest>,
3398 ) -> Result<AppendBatchResponse, DbErrorWrite> {
3399 debug!("append_batch_create_new_execution");
3400 trace!(?batch, ?child_req, "append_batch_create_new_execution");
3401 assert!(!batch.is_empty(), "Empty batch request");
3402
3403 let (version, notifiers) = self
3404 .transaction(
3405 move |tx| {
3406 let mut notifier = None;
3407 let mut version = version.clone();
3408 for append_request in &batch {
3409 let (v, n) =
3410 Self::append(tx, &execution_id, append_request.clone(), version)?;
3411 version = v;
3412 notifier = Some(n);
3413 }
3414 let mut notifiers = Vec::new();
3415 notifiers.push(notifier.expect("checked that the batch is not empty"));
3416
3417 for child_req in &child_req {
3418 let (_, notifier) = Self::create_inner(tx, child_req.clone())?;
3419 notifiers.push(notifier);
3420 }
3421 Ok::<_, DbErrorWrite>((version, notifiers))
3422 },
3423 "append_batch_create_new_execution_inner",
3424 )
3425 .await?;
3426 self.notify_all(notifiers, current_time);
3427 Ok(version)
3428 }
3429
3430 #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3434 async fn subscribe_to_next_responses(
3435 &self,
3436 execution_id: &ExecutionId,
3437 start_idx: u32,
3438 timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
3439 ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout> {
3440 debug!("next_responses");
3441 let unique_tag: u64 = rand::random();
3442 let execution_id = execution_id.clone();
3443
3444 let cleanup = || {
3445 let mut guard = self.0.response_subscribers.lock().unwrap();
3446 match guard.remove(&execution_id) {
3447 Some((_, tag)) if tag == unique_tag => {} Some(other) => {
3449 guard.insert(execution_id.clone(), other);
3451 }
3452 None => {} }
3454 };
3455
3456 let response_subscribers = self.0.response_subscribers.clone();
3457 let resp_or_receiver = {
3458 let execution_id = execution_id.clone();
3459 self.transaction(
3460 move |tx| {
3461 let responses = Self::get_responses_with_offset(tx, &execution_id, start_idx)?;
3462 if responses.is_empty() {
3463 let (sender, receiver) = oneshot::channel();
3465 response_subscribers
3466 .lock()
3467 .unwrap()
3468 .insert(execution_id.clone(), (sender, unique_tag));
3469 Ok::<_, DbErrorReadWithTimeout>(itertools::Either::Right(receiver))
3470 } else {
3471 Ok(itertools::Either::Left(responses))
3472 }
3473 },
3474 "subscribe_to_next_responses",
3475 )
3476 .await
3477 }
3478 .inspect_err(|_| {
3479 cleanup();
3480 })?;
3481 match resp_or_receiver {
3482 itertools::Either::Left(resp) => Ok(resp), itertools::Either::Right(receiver) => {
3484 let res = tokio::select! {
3485 resp = receiver => {
3486 match resp {
3487 Ok(resp) => Ok(vec![resp]),
3488 Err(_) => Err(DbErrorReadWithTimeout::from(DbErrorGeneric::Close)),
3489 }
3490 }
3491 outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
3492 };
3493 cleanup();
3494 res
3495 }
3496 }
3497 }
3498
3499 async fn wait_for_finished_result(
3501 &self,
3502 execution_id: &ExecutionId,
3503 timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
3504 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
3505 let unique_tag: u64 = rand::random();
3506 let execution_id = execution_id.clone();
3507 let execution_finished_subscription = self.0.execution_finished_subscribers.clone();
3508
3509 let cleanup = || {
3510 let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
3511 if let Some(subscribers) = guard.get_mut(&execution_id) {
3512 subscribers.remove(&unique_tag);
3513 }
3514 };
3515
3516 let resp_or_receiver = {
3517 let execution_id = execution_id.clone();
3518 self.transaction(move |tx| {
3519 let pending_state =
3520 Self::get_combined_state(tx, &execution_id)?.pending_state;
3521 if let PendingState::Finished { finished, .. } = pending_state {
3522 let event =
3523 Self::get_execution_event(tx, &execution_id, finished.version)?;
3524 if let ExecutionRequest::Finished { result, ..} = event.event {
3525 Ok(itertools::Either::Left(result))
3526 } else {
3527 error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
3528 Err(DbErrorReadWithTimeout::from(consistency_db_err(
3529 "cannot get finished event based on t_state version"
3530 )))
3531 }
3532 } else {
3533 let (sender, receiver) = oneshot::channel();
3537 let mut guard = execution_finished_subscription.lock().unwrap();
3538 guard.entry(execution_id.clone()).or_default().insert(unique_tag, sender);
3539 Ok(itertools::Either::Right(receiver))
3540 }
3541 }, "wait_for_finished_result")
3542 .await
3543 }
3544 .inspect_err(|_| {
3545 cleanup();
3549 })?;
3550
3551 let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
3552 match resp_or_receiver {
3553 itertools::Either::Left(resp) => Ok(resp), itertools::Either::Right(receiver) => {
3555 let res = tokio::select! {
3556 resp = receiver => {
3557 match resp {
3558 Ok(retval) => Ok(retval),
3559 Err(_recv_err) => Err(DbErrorGeneric::Close.into())
3560 }
3561 }
3562 outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
3563 };
3564 cleanup();
3565 res
3566 }
3567 }
3568 }
3569
3570 #[instrument(level = Level::DEBUG, skip_all, fields(%join_set_id, %execution_id))]
3571 async fn append_delay_response(
3572 &self,
3573 created_at: DateTime<Utc>,
3574 execution_id: ExecutionId,
3575 join_set_id: JoinSetId,
3576 delay_id: DelayId,
3577 result: Result<(), ()>,
3578 ) -> Result<AppendDelayResponseOutcome, DbErrorWrite> {
3579 debug!("append_delay_response");
3580 let event = JoinSetResponseEventOuter {
3581 created_at,
3582 event: JoinSetResponseEvent {
3583 join_set_id,
3584 event: JoinSetResponse::DelayFinished {
3585 delay_id: delay_id.clone(),
3586 result,
3587 },
3588 },
3589 };
3590 let res = self
3591 .transaction(
3592 {
3593 let execution_id = execution_id.clone();
3594 move |tx| Self::append_response(tx, &execution_id, event.clone())
3595 },
3596 "append_delay_response",
3597 )
3598 .await;
3599 match res {
3600 Ok(notifier) => {
3601 self.notify_all(vec![notifier], created_at);
3602 Ok(AppendDelayResponseOutcome::Success)
3603 }
3604 Err(DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::Conflict)) => {
3605 let delay_success = self
3606 .transaction(
3607 move |tx| Self::delay_response(tx, &execution_id, &delay_id),
3608 "delay_response",
3609 )
3610 .await?;
3611 match delay_success {
3612 Some(true) => Ok(AppendDelayResponseOutcome::AlreadyFinished),
3613 Some(false) => Ok(AppendDelayResponseOutcome::AlreadyCancelled),
3614 None => Err(DbErrorWrite::Generic(DbErrorGeneric::Uncategorized(
3615 "insert failed yet select did not find the response".into(),
3616 ))),
3617 }
3618 }
3619 Err(err) => Err(err),
3620 }
3621 }
3622
3623 #[instrument(level = Level::DEBUG, skip_all)]
3624 async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
3625 debug!("append_backtrace");
3626 self.transaction(
3627 move |tx| Self::append_backtrace(tx, &append),
3628 "append_backtrace",
3629 )
3630 .await
3631 }
3632
3633 #[instrument(level = Level::DEBUG, skip_all)]
3634 async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
3635 debug!("append_backtrace_batch");
3636 self.transaction(
3637 move |tx| {
3638 for append in &batch {
3639 Self::append_backtrace(tx, append)?;
3640 }
3641 Ok(())
3642 },
3643 "append_backtrace",
3644 )
3645 .await
3646 }
3647
3648 #[instrument(level = Level::TRACE, skip(self))]
3650 async fn get_expired_timers(
3651 &self,
3652 at: DateTime<Utc>,
3653 ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
3654 self.transaction(
3655 move |conn| {
3656 let mut expired_timers = conn.prepare(
3657 "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= :at",
3658 )?
3659 .query_map(
3660 named_params! {
3661 ":at": at,
3662 },
3663 |row| {
3664 let execution_id = row.get("execution_id")?;
3665 let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
3666 let delay_id = row.get::<_, DelayId>("delay_id")?;
3667 let delay = ExpiredDelay { execution_id, join_set_id, delay_id };
3668 Ok(ExpiredTimer::Delay(delay))
3669 },
3670 )?
3671 .collect::<Result<Vec<_>, _>>()?;
3672 let expired = conn.prepare(&format!(r#"
3674 SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis,
3675 executor_id, run_id
3676 FROM t_state
3677 WHERE pending_expires_finished <= :at AND state = "{STATE_LOCKED}"
3678 "#
3679 )
3680 )?
3681 .query_map(
3682 named_params! {
3683 ":at": at,
3684 },
3685 |row| {
3686 let execution_id = row.get("execution_id")?;
3687 let locked_at_version = Version::new(row.get("last_lock_version")?);
3688 let next_version = Version::new(row.get("corresponding_version")?).increment();
3689 let intermittent_event_count = row.get("intermittent_event_count")?;
3690 let max_retries = row.get("max_retries")?;
3691 let retry_exp_backoff_millis = u64::from(row.get::<_, u32>("retry_exp_backoff_millis")?);
3692 let executor_id = row.get("executor_id")?;
3693 let run_id = row.get("run_id")?;
3694 let lock = ExpiredLock {
3695 execution_id,
3696 locked_at_version,
3697 next_version,
3698 intermittent_event_count,
3699 max_retries,
3700 retry_exp_backoff: Duration::from_millis(retry_exp_backoff_millis),
3701 locked_by: LockedBy { executor_id, run_id },
3702 };
3703 Ok(ExpiredTimer::Lock(lock))
3704 }
3705 )?
3706 .collect::<Result<Vec<_>, _>>()?;
3707 expired_timers.extend(expired);
3708 if !expired_timers.is_empty() {
3709 debug!("get_expired_timers found {expired_timers:?}");
3710 }
3711 Ok(expired_timers)
3712 }, "get_expired_timers"
3713 )
3714 .await
3715 }
3716
3717 async fn get_execution_event(
3718 &self,
3719 execution_id: &ExecutionId,
3720 version: &Version,
3721 ) -> Result<ExecutionEvent, DbErrorRead> {
3722 let version = version.0;
3723 let execution_id = execution_id.clone();
3724 self.transaction(
3725 move |tx| Self::get_execution_event(tx, &execution_id, version),
3726 "get_execution_event",
3727 )
3728 .await
3729 }
3730
3731 async fn get_pending_state(
3732 &self,
3733 execution_id: &ExecutionId,
3734 ) -> Result<PendingState, DbErrorRead> {
3735 let execution_id = execution_id.clone();
3736 Ok(self
3737 .transaction(
3738 move |tx| Self::get_combined_state(tx, &execution_id),
3739 "get_pending_state",
3740 )
3741 .await?
3742 .pending_state)
3743 }
3744}
3745
3746#[cfg(feature = "test")]
3747#[async_trait]
3748impl concepts::storage::DbConnectionTest for SqlitePool {
3749 #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
3750 async fn append_response(
3751 &self,
3752 created_at: DateTime<Utc>,
3753 execution_id: ExecutionId,
3754 response_event: JoinSetResponseEvent,
3755 ) -> Result<(), DbErrorWrite> {
3756 debug!("append_response");
3757 let event = JoinSetResponseEventOuter {
3758 created_at,
3759 event: response_event,
3760 };
3761 let notifier = self
3762 .transaction(
3763 move |tx| Self::append_response(tx, &execution_id, event.clone()),
3764 "append_response",
3765 )
3766 .await?;
3767 self.notify_all(vec![notifier], created_at);
3768 Ok(())
3769 }
3770
3771 #[instrument(level = Level::DEBUG, skip(self))]
3772 async fn get(
3773 &self,
3774 execution_id: &ExecutionId,
3775 ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
3776 trace!("get");
3777 let execution_id = execution_id.clone();
3778 self.transaction(move |tx| Self::get(tx, &execution_id), "get")
3779 .await
3780 }
3781}
3782
#[cfg(any(test, feature = "tempfile"))]
pub mod tempfile {
    use super::{SqliteConfig, SqlitePool};
    use tempfile::NamedTempFile;

    /// Opens a [`SqlitePool`] with the default [`SqliteConfig`].
    ///
    /// When the `SQLITE_FILE` environment variable is set, the pool is opened at that path
    /// and `None` is returned as the second element. Otherwise a new [`NamedTempFile`] is
    /// created, the pool is opened at its path, and the file handle is returned alongside
    /// the pool.
    ///
    /// # Panics
    /// Panics when the temporary file cannot be created or the pool cannot be opened.
    pub async fn sqlite_pool() -> (SqlitePool, Option<NamedTempFile>) {
        match std::env::var("SQLITE_FILE") {
            Ok(path) => {
                let pool = SqlitePool::new(path, SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, None)
            }
            Err(_) => {
                let file = NamedTempFile::new().unwrap();
                let pool = SqlitePool::new(file.path(), SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, Some(file))
            }
        }
    }
}