1use crate::histograms::Histograms;
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use concepts::{
5 ComponentId, ComponentRetryConfig, ExecutionId, FunctionFqn, JoinSetId, StrVariant,
6 SupportedFunctionReturnValue,
7 prefixed_ulid::{DelayId, ExecutionIdDerived, ExecutorId, RunId},
8 storage::{
9 AppendBatchResponse, AppendEventsToExecution, AppendRequest, AppendResponse,
10 AppendResponseToExecution, BacktraceFilter, BacktraceInfo, CreateRequest, DUMMY_CREATED,
11 DUMMY_HISTORY_EVENT, DbConnection, DbErrorGeneric, DbErrorRead, DbErrorReadWithTimeout,
12 DbErrorWrite, DbErrorWriteNonRetriable, DbExecutor, DbPool, DbPoolCloseable,
13 ExecutionEvent, ExecutionEventInner, ExecutionListPagination, ExecutionWithState,
14 ExpiredDelay, ExpiredLock, ExpiredTimer, HistoryEvent, JoinSetRequest, JoinSetResponse,
15 JoinSetResponseEvent, JoinSetResponseEventOuter, LockPendingResponse, Locked, LockedBy,
16 LockedExecution, Pagination, PendingState, PendingStateFinished,
17 PendingStateFinishedResultKind, PendingStateLocked, ResponseWithCursor, Version,
18 VersionType,
19 },
20};
21use conversions::{FromStrWrapper, JsonWrapper, consistency_db_err, consistency_rusqlite};
22use hashbrown::HashMap;
23use rusqlite::{
24 CachedStatement, Connection, ErrorCode, OpenFlags, OptionalExtension, Params, ToSql,
25 Transaction, TransactionBehavior, named_params, types::ToSqlOutput,
26};
27use std::{
28 cmp::max,
29 collections::VecDeque,
30 fmt::Debug,
31 ops::DerefMut,
32 path::Path,
33 str::FromStr,
34 sync::{
35 Arc, Mutex,
36 atomic::{AtomicBool, Ordering},
37 },
38 time::Duration,
39};
40use std::{fmt::Write as _, pin::Pin};
41use tokio::{
42 sync::{mpsc, oneshot},
43 time::Instant,
44};
45use tracing::{Level, debug, error, info, instrument, trace, warn};
46
/// Error returned when the sqlite database cannot be opened or initialized.
///
/// It carries no detail; the underlying cause is logged where the failure happens.
#[derive(Debug, thiserror::Error)]
#[error("initialization error")]
pub struct InitializationError;
50
/// Identifies a delay inside a join set together with its expiration time.
// NOTE(review): presumably mirrors a row of `t_delay` (without `execution_id`) - confirm at the use sites.
#[derive(Debug, Clone)]
struct DelayReq {
    join_set_id: JoinSetId,
    delay_id: DelayId,
    expires_at: DateTime<Utc>,
}
/// Default PRAGMA statements applied to a freshly opened connection, in this order.
///
/// Each entry is `[name, value]`. An empty value means the pragma is only queried
/// (its rows are logged) instead of being set.
///
/// `page_size` must stay ahead of `journal_mode`: a new page size only takes effect
/// when the database file is first created (or on `VACUUM` outside of WAL mode), and
/// switching a fresh database to WAL already initializes the file with the default
/// page size. With the previous order the configured page size was silently ignored.
const PRAGMA: [[&str; 2]; 10] = [
    ["page_size", "8192"],
    ["journal_mode", "wal"],
    ["synchronous", "FULL"],
    ["foreign_keys", "true"],
    ["busy_timeout", "1000"],
    ["cache_size", "10000"],
    ["temp_store", "MEMORY"],
    ["mmap_size", "134217728"],
    ["journal_size_limit", "67108864"],
    ["integrity_check", ""],
];
80
/// DDL of `t_metadata`, the table holding the schema version of the database file.
const CREATE_TABLE_T_METADATA: &str = r"
CREATE TABLE IF NOT EXISTS t_metadata (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    schema_version INTEGER NOT NULL,
    created_at TEXT NOT NULL
) STRICT
";
/// Schema version expected by this code; initialization compares it with the
/// `schema_version` of the `t_metadata` row having the highest `id`.
const T_METADATA_EXPECTED_SCHEMA_VERSION: u32 = 2;
90
/// DDL of `t_execution_log`: one row per (`execution_id`, `version`) with the event
/// serialized as JSON in `json_value`.
///
/// `history_event_type` is a stored generated column extracted from the JSON path
/// `$.HistoryEvent.event.type`.
const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_log (
    execution_id TEXT NOT NULL,
    created_at TEXT NOT NULL,
    json_value TEXT NOT NULL,
    version INTEGER NOT NULL,
    variant TEXT NOT NULL,
    join_set_id TEXT,
    history_event_type TEXT GENERATED ALWAYS AS (json_value->>'$.HistoryEvent.event.type') STORED,
    PRIMARY KEY (execution_id, version)
) STRICT
";
/// Index on `t_execution_log (execution_id, version)`.
// NOTE(review): covers the same columns as the primary key - confirm it is still needed.
const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version  ON t_execution_log (execution_id, version);
";
/// Index on `t_execution_log (execution_id, variant)`.
const CREATE_INDEX_IDX_T_EXECUTION_ID_EXECUTION_ID_VARIANT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant  ON t_execution_log (execution_id, variant);
";
112
/// DDL of `t_join_set_response`: responses delivered to a join set of an execution.
///
/// The nullable column pairs (`delay_id`, `delay_success`) and
/// (`child_execution_id`, `finished_version`) describe the two kinds of response rows
/// the schema can hold.
// NOTE(review): presumably exactly one of the two pairs is filled per row - confirm at the insert sites.
const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
CREATE TABLE IF NOT EXISTS t_join_set_response (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    created_at TEXT NOT NULL,
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,

    delay_id TEXT,
    delay_success INTEGER,

    child_execution_id TEXT,
    finished_version INTEGER,

    UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
) STRICT
";
/// Index on `t_join_set_response (execution_id, id)`.
const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
";
/// Partial unique index: a non-NULL `child_execution_id` may appear in at most one row.
const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_child_id
ON t_join_set_response (child_execution_id) WHERE child_execution_id IS NOT NULL;
";
/// Partial unique index: a non-NULL `delay_id` may appear in at most one row.
const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_delay_id
ON t_join_set_response (delay_id) WHERE delay_id IS NOT NULL;
";
147
/// DDL of `t_state`: one row per execution holding its current pending state.
///
/// `state` holds one of the `STATE_*` constants. The meaning of
/// `pending_expires_finished` depends on it: it is read as the scheduled time, the
/// lock expiration or the finish time when the row is converted to a `PendingState`.
const CREATE_TABLE_T_STATE: &str = r"
CREATE TABLE IF NOT EXISTS t_state (
    execution_id TEXT NOT NULL,
    is_top_level INTEGER NOT NULL,
    corresponding_version INTEGER NOT NULL,
    pending_expires_finished TEXT NOT NULL,
    ffqn TEXT NOT NULL,
    state TEXT NOT NULL,
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL,
    scheduled_at TEXT NOT NULL,
    intermittent_event_count INTEGER NOT NULL,

    max_retries INTEGER,
    retry_exp_backoff_millis INTEGER,
    last_lock_version INTEGER,
    executor_id TEXT,
    run_id TEXT,

    join_set_id TEXT,
    join_set_closing INTEGER,

    result_kind TEXT,

    PRIMARY KEY (execution_id)
) STRICT
";
/// Value of `t_state.state` for an execution waiting to be picked up.
const STATE_PENDING_AT: &str = "PendingAt";
/// Value of `t_state.state` for an execution blocked on a join set.
const STATE_BLOCKED_BY_JOIN_SET: &str = "BlockedByJoinSet";
/// Value of `t_state.state` for an execution locked by an executor.
const STATE_LOCKED: &str = "Locked";
/// Value of `t_state.state` for a finished execution.
const STATE_FINISHED: &str = "Finished";
// NOTE(review): presumably compared against `t_execution_log.history_event_type` - confirm at the use sites.
const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "JoinNext";

/// Index on `t_state (state, pending_expires_finished, ffqn)`.
const IDX_T_STATE_LOCK_PENDING: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending ON t_state (state, pending_expires_finished, ffqn);
";
/// Partial index on `t_state (pending_expires_finished)` limited to rows with an `executor_id`.
const IDX_T_STATE_EXPIRED_TIMERS: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_expired_timers ON t_state (pending_expires_finished) WHERE executor_id IS NOT NULL;
";
/// Index on `t_state (execution_id, is_top_level)`.
const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
";
/// Index on `t_state (ffqn)`.
const IDX_T_STATE_FFQN: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
";
/// Index on `t_state (created_at)`.
const IDX_T_STATE_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
";
205
/// DDL of `t_delay`: delays keyed by (`execution_id`, `join_set_id`, `delay_id`)
/// together with their expiration time.
const CREATE_TABLE_T_DELAY: &str = r"
CREATE TABLE IF NOT EXISTS t_delay (
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,
    delay_id TEXT NOT NULL,
    expires_at TEXT NOT NULL,
    PRIMARY KEY (execution_id, join_set_id, delay_id)
) STRICT
";
216
/// DDL of `t_backtrace`: a WASM backtrace stored for a version range
/// (`version_min_including` up to, but excluding, `version_max_excluding`) of an execution.
const CREATE_TABLE_T_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_backtrace (
    execution_id TEXT NOT NULL,
    component_id TEXT NOT NULL,
    version_min_including INTEGER NOT NULL,
    version_max_excluding INTEGER NOT NULL,
    wasm_backtrace TEXT NOT NULL,
    PRIMARY KEY (execution_id, version_min_including, version_max_excluding)
) STRICT
";
/// Index on `t_backtrace (execution_id, version_min_including, version_max_excluding)`.
// NOTE(review): covers the same columns as the primary key - confirm it is still needed.
const IDX_T_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_backtrace_execution_id_version ON t_backtrace (execution_id, version_min_including, version_max_excluding);
";
232
/// Cloneable summary of a `rusqlite::Error`.
///
/// `QueryReturnedNoRows` is kept as a distinct variant; every other error is reduced
/// to its string representation.
#[derive(Debug, thiserror::Error, Clone)]
enum RusqliteError {
    /// The query returned no rows.
    #[error("not found")]
    NotFound,
    /// Any other sqlite error, carried as text.
    #[error("generic: {0}")]
    Generic(StrVariant),
}
240
241mod conversions {
242
243 use super::RusqliteError;
244 use concepts::storage::{DbErrorGeneric, DbErrorRead, DbErrorReadWithTimeout, DbErrorWrite};
245 use rusqlite::{
246 ToSql,
247 types::{FromSql, FromSqlError},
248 };
249 use std::{fmt::Debug, str::FromStr};
250 use tracing::error;
251
252 impl From<rusqlite::Error> for RusqliteError {
253 fn from(err: rusqlite::Error) -> Self {
254 if matches!(err, rusqlite::Error::QueryReturnedNoRows) {
255 RusqliteError::NotFound
256 } else {
257 error!(backtrace = %std::backtrace::Backtrace::capture(), "Sqlite error {err:?}");
258 RusqliteError::Generic(err.to_string().into())
259 }
260 }
261 }
262
263 impl From<RusqliteError> for DbErrorGeneric {
264 fn from(err: RusqliteError) -> DbErrorGeneric {
265 match err {
266 RusqliteError::NotFound => DbErrorGeneric::Uncategorized("not found".into()),
267 RusqliteError::Generic(str) => DbErrorGeneric::Uncategorized(str),
268 }
269 }
270 }
271 impl From<RusqliteError> for DbErrorRead {
272 fn from(err: RusqliteError) -> Self {
273 if matches!(err, RusqliteError::NotFound) {
274 Self::NotFound
275 } else {
276 Self::from(DbErrorGeneric::from(err))
277 }
278 }
279 }
280 impl From<RusqliteError> for DbErrorReadWithTimeout {
281 fn from(err: RusqliteError) -> Self {
282 Self::from(DbErrorRead::from(err))
283 }
284 }
285 impl From<RusqliteError> for DbErrorWrite {
286 fn from(err: RusqliteError) -> Self {
287 if matches!(err, RusqliteError::NotFound) {
288 Self::NotFound
289 } else {
290 Self::from(DbErrorGeneric::from(err))
291 }
292 }
293 }
294
295 pub(crate) struct JsonWrapper<T>(pub(crate) T);
296 impl<T: serde::de::DeserializeOwned + 'static + Debug> FromSql for JsonWrapper<T> {
297 fn column_result(
298 value: rusqlite::types::ValueRef<'_>,
299 ) -> rusqlite::types::FromSqlResult<Self> {
300 let value = match value {
301 rusqlite::types::ValueRef::Text(value) | rusqlite::types::ValueRef::Blob(value) => {
302 Ok(value)
303 }
304 other => {
305 error!(
306 backtrace = %std::backtrace::Backtrace::capture(),
307 "Unexpected type when conveting to JSON - expected Text or Blob, got type `{other:?}`",
308 );
309 Err(FromSqlError::InvalidType)
310 }
311 }?;
312 let value = serde_json::from_slice::<T>(value).map_err(|err| {
313 error!(
314 backtrace = %std::backtrace::Backtrace::capture(),
315 "Cannot convert JSON value `{value:?}` to type:`{type}` - {err:?}",
316 r#type = std::any::type_name::<T>()
317 );
318 FromSqlError::InvalidType
319 })?;
320 Ok(Self(value))
321 }
322 }
323 impl<T: serde::ser::Serialize + Debug> ToSql for JsonWrapper<T> {
324 fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
325 let string = serde_json::to_string(&self.0).map_err(|err| {
326 error!(
327 "Cannot serialize {value:?} of type `{type}` - {err:?}",
328 value = self.0,
329 r#type = std::any::type_name::<T>()
330 );
331 rusqlite::Error::ToSqlConversionFailure(Box::new(err))
332 })?;
333 Ok(rusqlite::types::ToSqlOutput::Owned(
334 rusqlite::types::Value::Text(string),
335 ))
336 }
337 }
338
339 pub(crate) struct FromStrWrapper<T: FromStr>(pub(crate) T);
340 impl<T: FromStr<Err = D>, D: Debug> FromSql for FromStrWrapper<T> {
341 fn column_result(
342 value: rusqlite::types::ValueRef<'_>,
343 ) -> rusqlite::types::FromSqlResult<Self> {
344 let value = String::column_result(value)?;
345 let value = T::from_str(&value).map_err(|err| {
346 error!(
347 backtrace = %std::backtrace::Backtrace::capture(),
348 "Cannot convert string `{value}` to type:`{type}` - {err:?}",
349 r#type = std::any::type_name::<T>()
350 );
351 FromSqlError::InvalidType
352 })?;
353 Ok(Self(value))
354 }
355 }
356
357 #[derive(Debug, thiserror::Error)]
359 #[error("{0}")]
360 pub(crate) struct OtherError(&'static str);
361 pub(crate) fn consistency_rusqlite(input: &'static str) -> rusqlite::Error {
362 FromSqlError::other(OtherError(input)).into()
363 }
364
365 pub(crate) fn consistency_db_err(input: &'static str) -> DbErrorGeneric {
366 DbErrorGeneric::Uncategorized(input.into())
367 }
368}
369
/// Raw column values of a `t_state` row before validation; converted to a
/// `CombinedState` by `CombinedState::new`.
#[derive(Debug)]
struct CombinedStateDTO {
    /// One of the `STATE_*` constants.
    state: String,
    /// Unparsed function FQN.
    ffqn: String,
    /// Interpreted according to `state`: scheduled time, lock expiration or finish time.
    pending_expires_finished: DateTime<Utc>,
    last_lock_version: Option<Version>,
    executor_id: Option<ExecutorId>,
    run_id: Option<RunId>,
    join_set_id: Option<JoinSetId>,
    join_set_closing: Option<bool>,
    result_kind: Option<PendingStateFinishedResultKind>,
}
/// Validated view of a `t_state` row: the parsed function name, the pending state
/// and the version the row corresponds to.
#[derive(Debug)]
struct CombinedState {
    ffqn: FunctionFqn,
    pending_state: PendingState,
    corresponding_version: Version,
}
391impl CombinedState {
392 fn get_next_version_assert_not_finished(&self) -> Version {
393 assert!(!self.pending_state.is_finished());
394 self.corresponding_version.increment()
395 }
396
397 #[cfg(feature = "test")]
398 fn get_next_version_or_finished(&self) -> Version {
399 if self.pending_state.is_finished() {
400 self.corresponding_version.clone()
401 } else {
402 self.corresponding_version.increment()
403 }
404 }
405}
406
/// Notification payload: an execution of `ffqn` became pending at `scheduled_at`.
#[derive(Debug)]
struct NotifierPendingAt {
    scheduled_at: DateTime<Utc>,
    ffqn: FunctionFqn,
}

/// Notification payload: an execution finished with the given return value.
#[derive(Debug)]
struct NotifierExecutionFinished {
    execution_id: ExecutionId,
    retval: SupportedFunctionReturnValue,
}

/// Notifications collected while writing to the database; every field is optional
/// and the default value carries no notification.
// NOTE(review): presumably dispatched to the pool's subscribers after the commit - confirm at the use sites.
#[derive(Debug, Default)]
struct AppendNotifier {
    pending_at: Option<NotifierPendingAt>,
    execution_finished: Option<NotifierExecutionFinished>,
    response: Option<(ExecutionId, JoinSetResponseEventOuter)>,
}
425
426impl CombinedState {
427 fn new(
428 dto: &CombinedStateDTO,
429 corresponding_version: Version,
430 ) -> Result<Self, rusqlite::Error> {
431 let pending_state = match dto {
432 CombinedStateDTO {
434 state,
435 ffqn: _,
436 pending_expires_finished: scheduled_at,
437 last_lock_version: None,
438 executor_id: None,
439 run_id: None,
440 join_set_id: None,
441 join_set_closing: None,
442 result_kind: None,
443 } if state == STATE_PENDING_AT => Ok(PendingState::PendingAt {
444 scheduled_at: *scheduled_at,
445 last_lock: None,
446 }),
447 CombinedStateDTO {
449 state,
450 ffqn: _,
451 pending_expires_finished: scheduled_at,
452 last_lock_version: None,
453 executor_id: Some(executor_id),
454 run_id: Some(run_id),
455 join_set_id: None,
456 join_set_closing: None,
457 result_kind: None,
458 } if state == STATE_PENDING_AT => Ok(PendingState::PendingAt {
459 scheduled_at: *scheduled_at,
460 last_lock: Some(LockedBy {
461 executor_id: *executor_id,
462 run_id: *run_id,
463 }),
464 }),
465 CombinedStateDTO {
466 state,
467 ffqn: _,
468 pending_expires_finished: lock_expires_at,
469 last_lock_version: Some(_),
470 executor_id: Some(executor_id),
471 run_id: Some(run_id),
472 join_set_id: None,
473 join_set_closing: None,
474 result_kind: None,
475 } if state == STATE_LOCKED => Ok(PendingState::Locked(PendingStateLocked {
476 locked_by: LockedBy {
477 executor_id: *executor_id,
478 run_id: *run_id,
479 },
480 lock_expires_at: *lock_expires_at,
481 })),
482 CombinedStateDTO {
483 state,
484 ffqn: _,
485 pending_expires_finished: lock_expires_at,
486 last_lock_version: None,
487 executor_id: _,
488 run_id: _,
489 join_set_id: Some(join_set_id),
490 join_set_closing: Some(join_set_closing),
491 result_kind: None,
492 } if state == STATE_BLOCKED_BY_JOIN_SET => Ok(PendingState::BlockedByJoinSet {
493 join_set_id: join_set_id.clone(),
494 closing: *join_set_closing,
495 lock_expires_at: *lock_expires_at,
496 }),
497 CombinedStateDTO {
498 state,
499 ffqn: _,
500 pending_expires_finished: finished_at,
501 last_lock_version: None,
502 executor_id: None,
503 run_id: None,
504 join_set_id: None,
505 join_set_closing: None,
506 result_kind: Some(result_kind),
507 } if state == STATE_FINISHED => Ok(PendingState::Finished {
508 finished: PendingStateFinished {
509 finished_at: *finished_at,
510 version: corresponding_version.0,
511 result_kind: *result_kind,
512 },
513 }),
514 _ => {
515 error!("Cannot deserialize pending state from {dto:?}");
516 Err(consistency_rusqlite("invalid `t_state`"))
517 }
518 }?;
519 Ok(Self {
520 ffqn: FunctionFqn::from_str(&dto.ffqn).map_err(|parse_err| {
521 error!("Error parsing ffqn of {dto:?} - {parse_err:?}");
522 consistency_rusqlite("invalid ffqn value in `t_state`")
523 })?,
524 pending_state,
525 corresponding_version,
526 })
527 }
528}
529
/// A unit of work sent to the connection thread.
///
/// Several logical transactions may be applied to one physical sqlite transaction.
#[derive(derive_more::Debug)]
struct LogicalTx {
    /// The work itself. `FnMut` because it is run again when a batched commit fails
    /// and the batch is retried one logical transaction at a time.
    #[debug(skip)]
    func: Box<dyn FnMut(&mut Transaction) + Send>,
    /// When the command was created; used to measure the queueing latency.
    sent_at: Instant,
    /// Label under which timings are recorded in the histograms.
    func_name: &'static str,
    /// Receives the result of the physical commit.
    #[debug(skip)]
    commit_ack_sender: oneshot::Sender<Result<(), RusqliteError>>,
}
539
/// Message processed by the connection thread.
#[derive(derive_more::Debug)]
enum ThreadCommand {
    /// Run the logical transaction and acknowledge its commit.
    LogicalTx(LogicalTx),
    /// Stop processing commands; the thread leaves its loop.
    Shutdown,
}
545
/// Cloneable handle to the sqlite connection thread.
///
/// All clones share the same command channel; call `close` to shut the thread down.
#[derive(Clone)]
pub struct SqlitePool(SqlitePoolInner);
548
/// Per-execution subscriber waiting for a join set response.
// NOTE(review): the `u64` presumably identifies the subscription - confirm at the use sites.
type ResponseSubscribers =
    Arc<Mutex<HashMap<ExecutionId, (oneshot::Sender<JoinSetResponseEventOuter>, u64)>>>;
/// Per-function subscriber notified through an `mpsc` channel.
// NOTE(review): presumably signalled when an execution of the function becomes pending - confirm.
type PendingFfqnSubscribers = Arc<Mutex<HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>>>;
/// Per-execution subscribers (keyed by a `u64`) waiting for the execution's return value.
type ExecutionFinishedSubscribers =
    Mutex<HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>>;
/// Shared state behind `SqlitePool`; every field is cheap to clone.
#[derive(Clone)]
struct SqlitePoolInner {
    /// Set when a shutdown was requested; checked by the connection thread before committing.
    shutdown_requested: Arc<AtomicBool>,
    /// Set by the connection thread once it has left its command loop.
    shutdown_finished: Arc<AtomicBool>,
    /// Sending side of the command channel consumed by the connection thread.
    command_tx: tokio::sync::mpsc::Sender<ThreadCommand>,
    response_subscribers: ResponseSubscribers,
    pending_ffqn_subscribers: PendingFfqnSubscribers,
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
    /// Handle of the connection thread; `Some` until taken in `Drop`.
    join_handle: Option<Arc<std::thread::JoinHandle<()>>>,
}
564
565#[async_trait]
566impl DbPoolCloseable for SqlitePool {
567 async fn close(self) {
568 debug!("Sqlite is closing");
569 self.0.shutdown_requested.store(true, Ordering::Release);
570 let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
572 while !self.0.shutdown_finished.load(Ordering::Acquire) {
573 tokio::time::sleep(Duration::from_millis(1)).await;
574 }
575 debug!("Sqlite was closed");
576 }
577}
578
#[async_trait]
impl DbPool for SqlitePool {
    /// Returns a connection backed by a clone of this pool; all clones share the
    /// same connection thread.
    fn connection(&self) -> Box<dyn DbConnection> {
        Box::new(self.clone())
    }
}
585impl Drop for SqlitePool {
586 fn drop(&mut self) {
587 let arc = self.0.join_handle.take().expect("join_handle was set");
588 if let Ok(join_handle) = Arc::try_unwrap(arc) {
589 if !join_handle.is_finished() {
591 if !self.0.shutdown_finished.load(Ordering::Acquire) {
592 let backtrace = std::backtrace::Backtrace::capture();
594 warn!("SqlitePool was not closed properly - {backtrace}");
595 self.0.shutdown_requested.store(true, Ordering::Release);
596 let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
598 } else {
601 }
603 }
604 }
605 }
606}
607
/// Configuration of `SqlitePool`.
#[derive(Debug, Clone)]
pub struct SqliteConfig {
    /// Capacity of the command channel between the pool handles and the connection thread.
    pub queue_capacity: usize,
    /// PRAGMA values keyed by pragma name. An entry replaces the default value of
    /// the same name; entries without a default are applied after the defaults.
    /// An empty value queries the pragma instead of setting it.
    pub pragma_override: Option<HashMap<String, String>>,
    /// Passed to the histograms of the connection thread.
    // NOTE(review): presumably the interval or threshold for printing metrics - confirm in `Histograms`.
    pub metrics_threshold: Option<Duration>,
}
impl Default for SqliteConfig {
    /// Queue capacity of 100, no pragma overrides, no metrics threshold.
    fn default() -> Self {
        Self {
            queue_capacity: 100,
            pragma_override: None,
            metrics_threshold: None,
        }
    }
}
623
624impl SqlitePool {
625 fn init_thread(
626 path: &Path,
627 mut pragma_override: HashMap<String, String>,
628 ) -> Result<Connection, InitializationError> {
629 fn conn_execute<P: Params>(
630 conn: &Connection,
631 sql: &str,
632 params: P,
633 ) -> Result<(), InitializationError> {
634 conn.execute(sql, params).map(|_| ()).map_err(|err| {
635 error!("Cannot run `{sql}` - {err:?}");
636 InitializationError
637 })
638 }
639 fn pragma_update(
640 conn: &Connection,
641 name: &str,
642 value: &str,
643 ) -> Result<(), InitializationError> {
644 if value.is_empty() {
645 debug!("Querying PRAGMA {name}");
646 conn.pragma_query(None, name, |row| {
647 debug!("{row:?}");
648 Ok(())
649 })
650 .map_err(|err| {
651 error!("cannot update pragma `{name}`=`{value}` - {err:?}");
652 InitializationError
653 })
654 } else {
655 debug!("Setting PRAGMA {name}={value}");
656 conn.pragma_update(None, name, value).map_err(|err| {
657 error!("cannot update pragma `{name}`=`{value}` - {err:?}");
658 InitializationError
659 })
660 }
661 }
662
663 let conn = Connection::open_with_flags(path, OpenFlags::default()).map_err(|err| {
664 error!("cannot open the connection - {err:?}");
665 InitializationError
666 })?;
667
668 for [pragma_name, default_value] in PRAGMA {
669 let pragma_value = pragma_override
670 .remove(pragma_name)
671 .unwrap_or_else(|| default_value.to_string());
672 pragma_update(&conn, pragma_name, &pragma_value)?;
673 }
674 for (pragma_name, pragma_value) in pragma_override.drain() {
676 pragma_update(&conn, &pragma_name, &pragma_value)?;
677 }
678
679 conn_execute(&conn, CREATE_TABLE_T_METADATA, [])?;
681 conn_execute(
683 &conn,
684 &format!(
685 "INSERT INTO t_metadata (schema_version, created_at) VALUES
686 ({T_METADATA_EXPECTED_SCHEMA_VERSION}, ?) ON CONFLICT DO NOTHING"
687 ),
688 [Utc::now()],
689 )?;
690 let actual_version = conn
692 .prepare("SELECT schema_version FROM t_metadata ORDER BY id DESC LIMIT 1")
693 .map_err(|err| {
694 error!("cannot select schema version - {err:?}");
695 InitializationError
696 })?
697 .query_row([], |row| row.get::<_, u32>("schema_version"));
698
699 let actual_version = actual_version.map_err(|err| {
700 error!("Cannot read the schema version - {err:?}");
701 InitializationError
702 })?;
703 if actual_version != T_METADATA_EXPECTED_SCHEMA_VERSION {
704 error!(
705 "wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
706 );
707 return Err(InitializationError);
708 }
709
710 conn_execute(&conn, CREATE_TABLE_T_EXECUTION_LOG, [])?;
712 conn_execute(
713 &conn,
714 CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
715 [],
716 )?;
717 conn_execute(
718 &conn,
719 CREATE_INDEX_IDX_T_EXECUTION_ID_EXECUTION_ID_VARIANT,
720 [],
721 )?;
722 conn_execute(&conn, CREATE_TABLE_T_JOIN_SET_RESPONSE, [])?;
724 conn_execute(
725 &conn,
726 CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
727 [],
728 )?;
729 conn_execute(
730 &conn,
731 CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID,
732 [],
733 )?;
734 conn_execute(
735 &conn,
736 CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID,
737 [],
738 )?;
739 conn_execute(&conn, CREATE_TABLE_T_STATE, [])?;
741 conn_execute(&conn, IDX_T_STATE_LOCK_PENDING, [])?;
742 conn_execute(&conn, IDX_T_STATE_EXPIRED_TIMERS, [])?;
743 conn_execute(&conn, IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL, [])?;
744 conn_execute(&conn, IDX_T_STATE_FFQN, [])?;
745 conn_execute(&conn, IDX_T_STATE_CREATED_AT, [])?;
746 conn_execute(&conn, CREATE_TABLE_T_DELAY, [])?;
748 conn_execute(&conn, CREATE_TABLE_T_BACKTRACE, [])?;
750 conn_execute(&conn, IDX_T_BACKTRACE_EXECUTION_ID_VERSION, [])?;
751 Ok(conn)
752 }
753
754 fn connection_rpc(
755 mut conn: Connection,
756 shutdown_requested: &AtomicBool,
757 shutdown_finished: &AtomicBool,
758 mut command_rx: mpsc::Receiver<ThreadCommand>,
759 metrics_threshold: Option<Duration>,
760 ) {
761 let mut histograms = Histograms::new(metrics_threshold);
762 while Self::tick(
763 &mut conn,
764 shutdown_requested,
765 &mut command_rx,
766 &mut histograms,
767 )
768 .is_ok()
769 {
770 }
772 debug!("Closing command thread");
773 shutdown_finished.store(true, Ordering::Release);
774 }
775
776 fn tick(
777 conn: &mut Connection,
778 shutdown_requested: &AtomicBool,
779 command_rx: &mut mpsc::Receiver<ThreadCommand>,
780 histograms: &mut Histograms,
781 ) -> Result<(), ()> {
782 let mut ltx_list = Vec::new();
783 let mut ltx = match command_rx.blocking_recv() {
785 Some(ThreadCommand::LogicalTx(ltx)) => ltx,
786 Some(ThreadCommand::Shutdown) => {
787 debug!("shutdown message received");
788 return Err(());
789 }
790 None => {
791 debug!("command_rx was closed");
792 return Err(());
793 }
794 };
795 let all_fns_start = std::time::Instant::now();
796 let mut physical_tx = conn
797 .transaction_with_behavior(TransactionBehavior::Immediate)
798 .map_err(|begin_err| {
799 error!("Cannot open transaction, closing sqlite - {begin_err:?}");
800 })?;
801 Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
802 ltx_list.push(ltx);
803
804 while let Ok(more) = command_rx.try_recv() {
805 let mut ltx = match more {
806 ThreadCommand::Shutdown => {
807 debug!("shutdown message received");
808 return Err(());
809 }
810 ThreadCommand::LogicalTx(ltx) => ltx,
811 };
812
813 Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
814 ltx_list.push(ltx);
815 }
816 histograms.record_all_fns(all_fns_start.elapsed());
817
818 {
819 if shutdown_requested.load(Ordering::Relaxed) {
821 debug!("Recveived shutdown during processing of the batch");
822 return Err(());
823 }
824 let commit_result = {
825 let now = std::time::Instant::now();
826 let commit_result = physical_tx.commit().map_err(RusqliteError::from);
827 histograms.record_commit(now.elapsed());
828 commit_result
829 };
830 if commit_result.is_ok() || ltx_list.len() == 1 {
831 for ltx in ltx_list {
833 let _ = ltx.commit_ack_sender.send(commit_result.clone());
835 }
836 } else {
837 warn!(
838 "Bulk transaction failed, committing {} transactions serially",
839 ltx_list.len()
840 );
841 for ltx in ltx_list {
844 Self::ltx_commit_single(ltx, conn, shutdown_requested, histograms)?;
845 }
846 }
847 }
848 histograms.print_if_elapsed();
849 Ok(())
850 }
851
852 fn ltx_commit_single(
853 mut ltx: LogicalTx,
854 conn: &mut Connection,
855 shutdown_requested: &AtomicBool,
856 histograms: &mut Histograms,
857 ) -> Result<(), ()> {
858 let mut physical_tx = conn
859 .transaction_with_behavior(TransactionBehavior::Immediate)
860 .map_err(|begin_err| {
861 error!("Cannot open transaction, closing sqlite - {begin_err:?}");
862 })?;
863 Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
864 if shutdown_requested.load(Ordering::Relaxed) {
865 debug!("Recveived shutdown during processing of the batch");
866 return Err(());
867 }
868 let commit_result = {
869 let now = std::time::Instant::now();
870 let commit_result = physical_tx.commit().map_err(RusqliteError::from);
871 histograms.record_commit(now.elapsed());
872 commit_result
873 };
874 let _ = ltx.commit_ack_sender.send(commit_result);
876 Ok(())
877 }
878
879 fn ltx_apply_to_tx(
880 ltx: &mut LogicalTx,
881 physical_tx: &mut Transaction,
882 histograms: &mut Histograms,
883 ) {
884 let sent_latency = ltx.sent_at.elapsed();
885 let started_at = Instant::now();
886 (ltx.func)(physical_tx);
887 histograms.record_command(sent_latency, ltx.func_name, started_at.elapsed());
888 }
889
890 #[instrument(level = Level::DEBUG, skip_all, name = "sqlite_new")]
891 pub async fn new<P: AsRef<Path>>(
892 path: P,
893 config: SqliteConfig,
894 ) -> Result<Self, InitializationError> {
895 let path = path.as_ref().to_owned();
896
897 let shutdown_requested = Arc::new(AtomicBool::new(false));
898 let shutdown_finished = Arc::new(AtomicBool::new(false));
899
900 let (command_tx, command_rx) = tokio::sync::mpsc::channel(config.queue_capacity);
901 info!("Sqlite database location: {path:?}");
902 let join_handle = {
903 let init_task = {
905 tokio::task::spawn_blocking(move || {
906 Self::init_thread(&path, config.pragma_override.unwrap_or_default())
907 })
908 .await
909 };
910 let conn = match init_task {
911 Ok(res) => res?,
912 Err(join_err) => {
913 error!("Initialization panic - {join_err:?}");
914 return Err(InitializationError);
915 }
916 };
917 let shutdown_requested = shutdown_requested.clone();
918 let shutdown_finished = shutdown_finished.clone();
919 std::thread::spawn(move || {
921 Self::connection_rpc(
922 conn,
923 &shutdown_requested,
924 &shutdown_finished,
925 command_rx,
926 config.metrics_threshold,
927 );
928 })
929 };
930 Ok(SqlitePool(SqlitePoolInner {
931 shutdown_requested,
932 shutdown_finished,
933 command_tx,
934 response_subscribers: Arc::default(),
935 pending_ffqn_subscribers: Arc::default(),
936 join_handle: Some(Arc::new(join_handle)),
937 execution_finished_subscribers: Arc::default(),
938 }))
939 }
940
941 async fn transaction<F, T, E>(&self, mut func: F, func_name: &'static str) -> Result<T, E>
943 where
944 F: FnMut(&mut rusqlite::Transaction) -> Result<T, E> + Send + 'static,
945 T: Send + 'static,
946 E: From<DbErrorGeneric> + From<RusqliteError> + Send + 'static,
947 {
948 let fn_res: Arc<std::sync::Mutex<Option<_>>> = Arc::default();
949 let (commit_ack_sender, commit_ack_receiver) = oneshot::channel();
950 let thread_command_func = {
951 let fn_res = fn_res.clone();
952 ThreadCommand::LogicalTx(LogicalTx {
953 func: Box::new(move |tx| {
954 let func_res = func(tx);
955 *fn_res.lock().unwrap() = Some(func_res);
956 }),
957 sent_at: Instant::now(),
958 func_name,
959 commit_ack_sender,
960 })
961 };
962 self.0
963 .command_tx
964 .send(thread_command_func)
965 .await
966 .map_err(|_send_err| DbErrorGeneric::Close)?;
967
968 match commit_ack_receiver.await {
970 Ok(Ok(())) => {
971 let mut guard = fn_res.lock().unwrap();
972 std::mem::take(guard.deref_mut()).expect("ltx must have been run at least once")
973 }
974 Ok(Err(rusqlite_err)) => Err(E::from(rusqlite_err)),
975 Err(_) => Err(E::from(DbErrorGeneric::Close)),
976 }
977 }
978
979 fn fetch_created_event(
980 conn: &Connection,
981 execution_id: &ExecutionId,
982 ) -> Result<CreateRequest, DbErrorRead> {
983 let mut stmt = conn.prepare(
984 "SELECT created_at, json_value FROM t_execution_log WHERE \
985 execution_id = :execution_id AND version = 0",
986 )?;
987 let (created_at, event) = stmt.query_row(
988 named_params! {
989 ":execution_id": execution_id.to_string(),
990 },
991 |row| {
992 let created_at = row.get("created_at")?;
993 let event = row
994 .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
995 .map_err(|serde| {
996 error!("cannot deserialize `Created` event: {row:?} - `{serde:?}`");
997 consistency_rusqlite("cannot deserialize `Created` event")
998 })?;
999 Ok((created_at, event.0))
1000 },
1001 )?;
1002 if let ExecutionEventInner::Created {
1003 ffqn,
1004 params,
1005 parent,
1006 scheduled_at,
1007 component_id,
1008 metadata,
1009 scheduled_by,
1010 } = event
1011 {
1012 Ok(CreateRequest {
1013 created_at,
1014 execution_id: execution_id.clone(),
1015 ffqn,
1016 params,
1017 parent,
1018 scheduled_at,
1019 component_id,
1020 metadata,
1021 scheduled_by,
1022 })
1023 } else {
1024 error!("Row with version=0 must be a `Created` event - {event:?}");
1025 Err(consistency_db_err("expected `Created` event").into())
1026 }
1027 }
1028
1029 fn check_expected_next_and_appending_version(
1030 expected_version: &Version,
1031 appending_version: &Version,
1032 ) -> Result<(), DbErrorWrite> {
1033 if *expected_version != *appending_version {
1034 debug!(
1035 "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
1036 );
1037 return Err(DbErrorWrite::NonRetriable(
1038 DbErrorWriteNonRetriable::VersionConflict {
1039 expected: expected_version.clone(),
1040 requested: appending_version.clone(),
1041 },
1042 ));
1043 }
1044 Ok(())
1045 }
1046
1047 #[instrument(level = Level::TRACE, skip_all, fields(execution_id = %req.execution_id))]
1048 fn create_inner(
1049 tx: &Transaction,
1050 req: CreateRequest,
1051 ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1052 debug!("create_inner");
1053
1054 let version = Version::default();
1055 let execution_id = req.execution_id.clone();
1056 let execution_id_str = execution_id.to_string();
1057 let ffqn = req.ffqn.clone();
1058 let created_at = req.created_at;
1059 let scheduled_at = req.scheduled_at;
1060 let event = ExecutionEventInner::from(req);
1061 let event_ser = serde_json::to_string(&event).map_err(|err| {
1062 error!("Cannot serialize {event:?} - {err:?}");
1063 DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
1064 })?;
1065 tx.prepare(
1066 "INSERT INTO t_execution_log (execution_id, created_at, version, json_value, variant, join_set_id ) \
1067 VALUES (:execution_id, :created_at, :version, :json_value, :variant, :join_set_id)")
1068 ?
1069 .execute(named_params! {
1070 ":execution_id": &execution_id_str,
1071 ":created_at": created_at,
1072 ":version": version.0,
1073 ":json_value": event_ser,
1074 ":variant": event.variant(),
1075 ":join_set_id": event.join_set_id().map(std::string::ToString::to_string),
1076 })
1077 ?;
1078 let pending_at = {
1079 debug!("Creating with `Pending(`{scheduled_at:?}`)");
1080 tx.prepare(
1081 r"
1082 INSERT INTO t_state (
1083 execution_id,
1084 is_top_level,
1085 corresponding_version,
1086 pending_expires_finished,
1087 ffqn,
1088 state,
1089 created_at,
1090 updated_at,
1091 scheduled_at,
1092 intermittent_event_count
1093 )
1094 VALUES (
1095 :execution_id,
1096 :is_top_level,
1097 :corresponding_version,
1098 :pending_expires_finished,
1099 :ffqn,
1100 :state,
1101 :created_at,
1102 CURRENT_TIMESTAMP,
1103 :scheduled_at,
1104 0
1105 )
1106 ",
1107 )?
1108 .execute(named_params! {
1109 ":execution_id": execution_id.to_string(),
1110 ":is_top_level": execution_id.is_top_level(),
1111 ":corresponding_version": version.0,
1112 ":pending_expires_finished": scheduled_at,
1113 ":ffqn": ffqn.to_string(),
1114 ":state": STATE_PENDING_AT,
1115 ":created_at": created_at,
1116 ":scheduled_at": scheduled_at,
1117 })?;
1118 AppendNotifier {
1119 pending_at: Some(NotifierPendingAt { scheduled_at, ffqn }),
1120 execution_finished: None,
1121 response: None,
1122 }
1123 };
1124 let next_version = Version::new(version.0 + 1);
1125 Ok((next_version, pending_at))
1126 }
1127
1128 #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %corresponding_version))]
1129 fn update_state_pending_after_response_appended(
1130 tx: &Transaction,
1131 execution_id: &ExecutionId,
1132 scheduled_at: DateTime<Utc>, corresponding_version: &Version, ) -> Result<AppendNotifier, DbErrorWrite> {
1135 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
1136 let execution_id_str = execution_id.to_string();
1137 let mut stmt = tx
1138 .prepare_cached(
1139 r"
1140 UPDATE t_state
1141 SET
1142 corresponding_version = :corresponding_version,
1143 pending_expires_finished = :pending_expires_finished,
1144 state = :state,
1145 updated_at = CURRENT_TIMESTAMP,
1146
1147 last_lock_version = NULL,
1148
1149 join_set_id = NULL,
1150 join_set_closing = NULL,
1151
1152 result_kind = NULL
1153 WHERE execution_id = :execution_id
1154 ",
1155 )
1156 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1157 let updated = stmt
1158 .execute(named_params! {
1159 ":execution_id": execution_id_str,
1160 ":corresponding_version": corresponding_version.0,
1161 ":pending_expires_finished": scheduled_at,
1162 ":state": STATE_PENDING_AT,
1163 })
1164 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1165 if updated != 1 {
1166 return Err(DbErrorWrite::NotFound);
1167 }
1168 Ok(AppendNotifier {
1169 pending_at: Some(NotifierPendingAt {
1170 scheduled_at,
1171 ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1172 }),
1173 execution_finished: None,
1174 response: None,
1175 })
1176 }
1177
1178 #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
1179 fn update_state_pending_after_event_appended(
1180 tx: &Transaction,
1181 execution_id: &ExecutionId,
1182 appending_version: &Version,
1183 scheduled_at: DateTime<Utc>, intermittent_failure: bool,
1185 ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1186 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
1187 let mut stmt = tx
1190 .prepare_cached(
1191 r"
1192 UPDATE t_state
1193 SET
1194 corresponding_version = :appending_version,
1195 pending_expires_finished = :pending_expires_finished,
1196 state = :state,
1197 updated_at = CURRENT_TIMESTAMP,
1198 intermittent_event_count = intermittent_event_count + :intermittent_delta,
1199
1200 last_lock_version = NULL,
1201
1202 join_set_id = NULL,
1203 join_set_closing = NULL,
1204
1205 result_kind = NULL
1206 WHERE execution_id = :execution_id;
1207 ",
1208 )
1209 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1210 let updated = stmt
1211 .execute(named_params! {
1212 ":execution_id": execution_id.to_string(),
1213 ":appending_version": appending_version.0,
1214 ":pending_expires_finished": scheduled_at,
1215 ":state": STATE_PENDING_AT,
1216 ":intermittent_delta": i32::from(intermittent_failure) })
1218 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1219 if updated != 1 {
1220 return Err(DbErrorWrite::NotFound);
1221 }
1222 Ok((
1223 appending_version.increment(),
1224 AppendNotifier {
1225 pending_at: Some(NotifierPendingAt {
1226 scheduled_at,
1227 ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1228 }),
1229 execution_finished: None,
1230 response: None,
1231 },
1232 ))
1233 }
1234
1235 fn update_state_locked_get_intermittent_event_count(
1236 tx: &Transaction,
1237 execution_id: &ExecutionId,
1238 executor_id: ExecutorId,
1239 run_id: RunId,
1240 lock_expires_at: DateTime<Utc>,
1241 appending_version: &Version,
1242 retry_config: ComponentRetryConfig,
1243 ) -> Result<u32, DbErrorWrite> {
1244 debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
1245 let backoff_millis =
1246 u64::try_from(retry_config.retry_exp_backoff.as_millis()).expect("backoff too big");
1247 let execution_id_str = execution_id.to_string();
1248 let mut stmt = tx
1249 .prepare_cached(
1250 r"
1251 UPDATE t_state
1252 SET
1253 corresponding_version = :appending_version,
1254 pending_expires_finished = :pending_expires_finished,
1255 state = :state,
1256 updated_at = CURRENT_TIMESTAMP,
1257
1258 max_retries = :max_retries,
1259 retry_exp_backoff_millis = :retry_exp_backoff_millis,
1260 last_lock_version = :appending_version,
1261 executor_id = :executor_id,
1262 run_id = :run_id,
1263
1264 join_set_id = NULL,
1265 join_set_closing = NULL,
1266
1267 result_kind = NULL
1268 WHERE execution_id = :execution_id
1269 ",
1270 )
1271 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1272 let updated = stmt
1273 .execute(named_params! {
1274 ":execution_id": execution_id_str,
1275 ":appending_version": appending_version.0,
1276 ":pending_expires_finished": lock_expires_at,
1277 ":state": STATE_LOCKED,
1278 ":max_retries": retry_config.max_retries,
1279 ":retry_exp_backoff_millis": backoff_millis,
1280 ":executor_id": executor_id.to_string(),
1281 ":run_id": run_id.to_string(),
1282 })
1283 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1284 if updated != 1 {
1285 return Err(DbErrorWrite::NotFound);
1286 }
1287
1288 let intermittent_event_count = tx
1290 .prepare(
1291 "SELECT intermittent_event_count FROM t_state WHERE execution_id = :execution_id",
1292 )?
1293 .query_row(
1294 named_params! {
1295 ":execution_id": execution_id_str,
1296 },
1297 |row| {
1298 let intermittent_event_count = row.get("intermittent_event_count")?;
1299 Ok(intermittent_event_count)
1300 },
1301 )
1302 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1303
1304 Ok(intermittent_event_count)
1305 }
1306
1307 fn update_state_blocked(
1308 tx: &Transaction,
1309 execution_id: &ExecutionId,
1310 appending_version: &Version,
1311 join_set_id: &JoinSetId,
1313 lock_expires_at: DateTime<Utc>,
1314 join_set_closing: bool,
1315 ) -> Result<
1316 AppendResponse, DbErrorWrite,
1318 > {
1319 debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
1320 let execution_id_str = execution_id.to_string();
1321 let mut stmt = tx.prepare_cached(
1322 r"
1323 UPDATE t_state
1324 SET
1325 corresponding_version = :appending_version,
1326 pending_expires_finished = :pending_expires_finished,
1327 state = :state,
1328 updated_at = CURRENT_TIMESTAMP,
1329
1330 last_lock_version = NULL,
1331
1332 join_set_id = :join_set_id,
1333 join_set_closing = :join_set_closing,
1334
1335 result_kind = NULL
1336 WHERE execution_id = :execution_id
1337 ",
1338 )?;
1339 let updated = stmt.execute(named_params! {
1340 ":execution_id": execution_id_str,
1341 ":appending_version": appending_version.0,
1342 ":pending_expires_finished": lock_expires_at,
1343 ":state": STATE_BLOCKED_BY_JOIN_SET,
1344 ":join_set_id": join_set_id,
1345 ":join_set_closing": join_set_closing,
1346 })?;
1347 if updated != 1 {
1348 return Err(DbErrorWrite::NotFound);
1349 }
1350 Ok(appending_version.increment())
1351 }
1352
1353 fn update_state_finished(
1354 tx: &Transaction,
1355 execution_id: &ExecutionId,
1356 appending_version: &Version,
1357 finished_at: DateTime<Utc>,
1359 result_kind: PendingStateFinishedResultKind,
1360 ) -> Result<(), DbErrorWrite> {
1361 debug!("Setting t_state to Finished");
1362 let execution_id_str = execution_id.to_string();
1363 let mut stmt = tx.prepare_cached(
1364 r"
1365 UPDATE t_state
1366 SET
1367 corresponding_version = :appending_version,
1368 pending_expires_finished = :pending_expires_finished,
1369 state = :state,
1370 updated_at = CURRENT_TIMESTAMP,
1371
1372 last_lock_version = NULL,
1373 executor_id = NULL,
1374 run_id = NULL,
1375
1376 join_set_id = NULL,
1377 join_set_closing = NULL,
1378
1379 result_kind = :result_kind
1380 WHERE execution_id = :execution_id
1381 ",
1382 )?;
1383
1384 let updated = stmt.execute(named_params! {
1385 ":execution_id": execution_id_str,
1386 ":appending_version": appending_version.0,
1387 ":pending_expires_finished": finished_at,
1388 ":state": STATE_FINISHED,
1389 ":result_kind": JsonWrapper(result_kind),
1390 })?;
1391 if updated != 1 {
1392 return Err(DbErrorWrite::NotFound);
1393 }
1394 Ok(())
1395 }
1396
1397 #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %appending_version))]
1399 fn bump_state_next_version(
1400 tx: &Transaction,
1401 execution_id: &ExecutionId,
1402 appending_version: &Version,
1403 delay_req: Option<DelayReq>,
1404 ) -> Result<AppendResponse , DbErrorWrite> {
1405 debug!("update_index_version");
1406 let execution_id_str = execution_id.to_string();
1407 let mut stmt = tx.prepare_cached(
1408 r"
1409 UPDATE t_state
1410 SET
1411 corresponding_version = :appending_version,
1412 updated_at = CURRENT_TIMESTAMP
1413 WHERE execution_id = :execution_id
1414 ",
1415 )?;
1416 let updated = stmt.execute(named_params! {
1417 ":execution_id": execution_id_str,
1418 ":appending_version": appending_version.0,
1419 })?;
1420 if updated != 1 {
1421 return Err(DbErrorWrite::NotFound);
1422 }
1423 if let Some(DelayReq {
1424 join_set_id,
1425 delay_id,
1426 expires_at,
1427 }) = delay_req
1428 {
1429 debug!("Inserting delay to `t_delay`");
1430 let mut stmt = tx.prepare_cached(
1431 "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) \
1432 VALUES \
1433 (:execution_id, :join_set_id, :delay_id, :expires_at)",
1434 )?;
1435 stmt.execute(named_params! {
1436 ":execution_id": execution_id_str,
1437 ":join_set_id": join_set_id.to_string(),
1438 ":delay_id": delay_id.to_string(),
1439 ":expires_at": expires_at,
1440 })
1441 .map_err(|err| match err.sqlite_error().map(|err| err.code) {
1442 Some(ErrorCode::ConstraintViolation) => DbErrorWrite::NonRetriable(
1443 DbErrorWriteNonRetriable::IllegalState("conflicting delay id".into()),
1444 ),
1445 _ => DbErrorWrite::from(err),
1446 })?;
1447 }
1448 Ok(appending_version.increment())
1449 }
1450
1451 fn get_combined_state(
1452 tx: &Transaction,
1453 execution_id: &ExecutionId,
1454 ) -> Result<CombinedState, DbErrorRead> {
1455 let mut stmt = tx.prepare(
1456 r"
1457 SELECT
1458 state, ffqn, corresponding_version, pending_expires_finished,
1459 last_lock_version, executor_id, run_id,
1460 join_set_id, join_set_closing,
1461 result_kind
1462 FROM t_state
1463 WHERE
1464 execution_id = :execution_id
1465 ",
1466 )?;
1467 stmt.query_row(
1468 named_params! {
1469 ":execution_id": execution_id.to_string(),
1470 },
1471 |row| {
1472 CombinedState::new(
1473 &CombinedStateDTO {
1474 state: row.get("state")?,
1475 ffqn: row.get("ffqn")?,
1476 pending_expires_finished: row
1477 .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1478 last_lock_version: row
1479 .get::<_, Option<VersionType>>("last_lock_version")?
1480 .map(Version::new),
1481 executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1482 run_id: row.get::<_, Option<RunId>>("run_id")?,
1483 join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1484 join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1485 result_kind: row
1486 .get::<_, Option<JsonWrapper<PendingStateFinishedResultKind>>>(
1487 "result_kind",
1488 )?
1489 .map(|wrapper| wrapper.0),
1490 },
1491 Version::new(row.get("corresponding_version")?),
1492 )
1493 },
1494 )
1495 .map_err(DbErrorRead::from)
1496 }
1497
1498 fn list_executions(
1499 read_tx: &Transaction,
1500 ffqn: Option<&FunctionFqn>,
1501 top_level_only: bool,
1502 pagination: &ExecutionListPagination,
1503 ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
1504 struct StatementModifier<'a> {
1505 where_vec: Vec<String>,
1506 params: Vec<(&'static str, ToSqlOutput<'a>)>,
1507 limit: u32,
1508 limit_desc: bool,
1509 }
1510
1511 fn paginate<'a, T: rusqlite::ToSql + 'static>(
1512 pagination: &'a Pagination<Option<T>>,
1513 column: &str,
1514 top_level_only: bool,
1515 ) -> Result<StatementModifier<'a>, DbErrorGeneric> {
1516 let mut where_vec: Vec<String> = vec![];
1517 let mut params: Vec<(&'static str, ToSqlOutput<'a>)> = vec![];
1518 let limit = pagination.length();
1519 let limit_desc = pagination.is_desc();
1520 let rel = pagination.rel();
1521 match pagination {
1522 Pagination::NewerThan {
1523 cursor: Some(cursor),
1524 ..
1525 }
1526 | Pagination::OlderThan {
1527 cursor: Some(cursor),
1528 ..
1529 } => {
1530 where_vec.push(format!("{column} {rel} :cursor"));
1531 let cursor = cursor.to_sql().map_err(|err| {
1532 error!("Possible program error - cannot convert cursor to sql - {err:?}");
1533 DbErrorGeneric::Uncategorized("cannot convert cursor to sql".into())
1534 })?;
1535 params.push((":cursor", cursor));
1536 }
1537 _ => {}
1538 }
1539 if top_level_only {
1540 where_vec.push("is_top_level=true".to_string());
1541 }
1542 Ok(StatementModifier {
1543 where_vec,
1544 params,
1545 limit,
1546 limit_desc,
1547 })
1548 }
1549
1550 let mut statement_mod = match pagination {
1551 ExecutionListPagination::CreatedBy(pagination) => {
1552 paginate(pagination, "created_at", top_level_only)?
1553 }
1554 ExecutionListPagination::ExecutionId(pagination) => {
1555 paginate(pagination, "execution_id", top_level_only)?
1556 }
1557 };
1558
1559 let ffqn_temporary;
1560 if let Some(ffqn) = ffqn {
1561 statement_mod.where_vec.push("ffqn = :ffqn".to_string());
1562 ffqn_temporary = ffqn.to_string();
1563 let ffqn = ffqn_temporary
1564 .to_sql()
1565 .expect("string conversion never fails");
1566
1567 statement_mod.params.push((":ffqn", ffqn));
1568 }
1569
1570 let where_str = if statement_mod.where_vec.is_empty() {
1571 String::new()
1572 } else {
1573 format!("WHERE {}", statement_mod.where_vec.join(" AND "))
1574 };
1575 let sql = format!(
1576 r"
1577 SELECT created_at, scheduled_at, state, execution_id, ffqn, corresponding_version, pending_expires_finished,
1578 last_lock_version, executor_id, run_id,
1579 join_set_id, join_set_closing,
1580 result_kind
1581 FROM t_state {where_str} ORDER BY created_at {desc} LIMIT {limit}
1582 ",
1583 desc = if statement_mod.limit_desc { "DESC" } else { "" },
1584 limit = statement_mod.limit,
1585 );
1586 let mut vec: Vec<_> = read_tx
1587 .prepare(&sql)?
1588 .query_map::<_, &[(&'static str, ToSqlOutput)], _>(
1589 statement_mod
1590 .params
1591 .into_iter()
1592 .collect::<Vec<_>>()
1593 .as_ref(),
1594 |row| {
1595 let execution_id = row.get::<_, ExecutionId>("execution_id")?;
1596 let created_at = row.get("created_at")?;
1597 let scheduled_at = row.get("scheduled_at")?;
1598 let combined_state = CombinedState::new(
1599 &CombinedStateDTO {
1600 state: row.get("state")?,
1601 ffqn: row.get("ffqn")?,
1602 pending_expires_finished: row
1603 .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1604 executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1605
1606 last_lock_version: row
1607 .get::<_, Option<VersionType>>("last_lock_version")?
1608 .map(Version::new),
1609 run_id: row.get::<_, Option<RunId>>("run_id")?,
1610 join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1611 join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1612 result_kind: row
1613 .get::<_, Option<JsonWrapper<PendingStateFinishedResultKind>>>(
1614 "result_kind",
1615 )?
1616 .map(|wrapper| wrapper.0),
1617 },
1618 Version::new(row.get("corresponding_version")?),
1619 )?;
1620 Ok(ExecutionWithState {
1621 execution_id,
1622 ffqn: combined_state.ffqn,
1623 pending_state: combined_state.pending_state,
1624 created_at,
1625 scheduled_at,
1626 })
1627 },
1628 )?
1629 .collect::<Vec<Result<_, _>>>()
1630 .into_iter()
1631 .filter_map(|row| match row {
1632 Ok(row) => Some(row),
1633 Err(err) => {
1634 warn!("Skipping row - {err:?}");
1635 None
1636 }
1637 })
1638 .collect();
1639
1640 if !statement_mod.limit_desc {
1641 vec.reverse();
1643 }
1644 Ok(vec)
1645 }
1646
1647 #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
1648 #[expect(clippy::too_many_arguments)]
1649 fn lock_single_execution(
1650 tx: &Transaction,
1651 created_at: DateTime<Utc>,
1652 component_id: &ComponentId,
1653 execution_id: &ExecutionId,
1654 run_id: RunId,
1655 appending_version: &Version,
1656 executor_id: ExecutorId,
1657 lock_expires_at: DateTime<Utc>,
1658 retry_config: ComponentRetryConfig,
1659 ) -> Result<LockedExecution, DbErrorWrite> {
1660 debug!("lock_single_execution");
1661 let combined_state = Self::get_combined_state(tx, execution_id)?;
1662 combined_state.pending_state.can_append_lock(
1663 created_at,
1664 executor_id,
1665 run_id,
1666 lock_expires_at,
1667 )?;
1668 let expected_version = combined_state.get_next_version_assert_not_finished();
1669 Self::check_expected_next_and_appending_version(&expected_version, appending_version)?;
1670
1671 let locked_event = Locked {
1673 component_id: component_id.clone(),
1674 executor_id,
1675 lock_expires_at,
1676 run_id,
1677 retry_config,
1678 };
1679 let event = ExecutionEventInner::Locked(locked_event.clone());
1680 let event_ser = serde_json::to_string(&event).map_err(|err| {
1681 warn!("Cannot serialize {event:?} - {err:?}");
1682 DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
1683 })?;
1684 let mut stmt = tx
1685 .prepare_cached(
1686 "INSERT INTO t_execution_log \
1687 (execution_id, created_at, json_value, version, variant) \
1688 VALUES \
1689 (:execution_id, :created_at, :json_value, :version, :variant)",
1690 )
1691 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1692 stmt.execute(named_params! {
1693 ":execution_id": execution_id.to_string(),
1694 ":created_at": created_at,
1695 ":json_value": event_ser,
1696 ":version": appending_version.0,
1697 ":variant": event.variant(),
1698 })
1699 .map_err(|err| {
1700 warn!("Cannot lock execution - {err:?}");
1701 DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::IllegalState("cannot lock".into()))
1702 })?;
1703
1704 let responses = Self::list_responses(tx, execution_id, None)?;
1706 let responses = responses.into_iter().map(|resp| resp.event).collect();
1707 trace!("Responses: {responses:?}");
1708
1709 let intermittent_event_count = Self::update_state_locked_get_intermittent_event_count(
1710 tx,
1711 execution_id,
1712 executor_id,
1713 run_id,
1714 lock_expires_at,
1715 appending_version,
1716 retry_config,
1717 )?;
1718 let mut events = tx
1720 .prepare(
1721 "SELECT json_value, version FROM t_execution_log WHERE \
1722 execution_id = :execution_id AND (variant = :variant1 OR variant = :variant2) \
1723 ORDER BY version",
1724 )?
1725 .query_map(
1726 named_params! {
1727 ":execution_id": execution_id.to_string(),
1728 ":variant1": DUMMY_CREATED.variant(),
1729 ":variant2": DUMMY_HISTORY_EVENT.variant(),
1730 },
1731 |row| {
1732 let created_at_fake = DateTime::from_timestamp_nanos(0); let event = row
1734 .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
1735 .map_err(|serde| {
1736 error!("Cannot deserialize {row:?} - {serde:?}");
1737 consistency_rusqlite("cannot deserialize event")
1738 })?
1739 .0;
1740 let version = Version(row.get("version")?);
1741
1742 Ok(ExecutionEvent {
1743 created_at: created_at_fake,
1744 event,
1745 backtrace_id: None,
1746 version,
1747 })
1748 },
1749 )?
1750 .collect::<Result<Vec<_>, _>>()?
1751 .into_iter()
1752 .collect::<VecDeque<_>>();
1753 let Some(ExecutionEventInner::Created {
1754 ffqn,
1755 params,
1756 parent,
1757 metadata,
1758 ..
1759 }) = events.pop_front().map(|outer| outer.event)
1760 else {
1761 error!("Execution log must contain at least `Created` event");
1762 return Err(consistency_db_err("execution log must contain `Created` event").into());
1763 };
1764
1765 let event_history = events
1766 .into_iter()
1767 .map(|ExecutionEvent { event, version, .. }| {
1768 if let ExecutionEventInner::HistoryEvent { event } = event {
1769 Ok((event, version))
1770 } else {
1771 error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
1772 Err(consistency_db_err(
1773 "rows can only contain `Created` and `HistoryEvent` event kinds",
1774 ))
1775 }
1776 })
1777 .collect::<Result<Vec<_>, _>>()?;
1778
1779 Ok(LockedExecution {
1780 execution_id: execution_id.clone(),
1781 metadata,
1782 next_version: appending_version.increment(),
1783 ffqn,
1784 params,
1785 event_history,
1786 responses,
1787 parent,
1788 intermittent_event_count,
1789 locked_event,
1790 })
1791 }
1792
1793 fn count_join_next(
1794 tx: &Transaction,
1795 execution_id: &ExecutionId,
1796 join_set_id: &JoinSetId,
1797 ) -> Result<u64, DbErrorRead> {
1798 let mut stmt = tx.prepare(
1799 "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = :execution_id AND join_set_id = :join_set_id \
1800 AND history_event_type = :join_next",
1801 )?;
1802 Ok(stmt.query_row(
1803 named_params! {
1804 ":execution_id": execution_id.to_string(),
1805 ":join_set_id": join_set_id.to_string(),
1806 ":join_next": HISTORY_EVENT_TYPE_JOIN_NEXT,
1807 },
1808 |row| row.get("count"),
1809 )?)
1810 }
1811
1812 #[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
1813 #[expect(clippy::needless_return)]
1814 fn append(
1815 tx: &Transaction,
1816 execution_id: &ExecutionId,
1817 req: AppendRequest,
1818 appending_version: Version,
1819 ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1820 if matches!(req.event, ExecutionEventInner::Created { .. }) {
1821 return Err(DbErrorWrite::NonRetriable(
1822 DbErrorWriteNonRetriable::ValidationFailed(
1823 "cannot append `Created` event - use `create` instead".into(),
1824 ),
1825 ));
1826 }
1827 if let AppendRequest {
1828 event:
1829 ExecutionEventInner::Locked(Locked {
1830 component_id,
1831 executor_id,
1832 run_id,
1833 lock_expires_at,
1834 retry_config,
1835 }),
1836 created_at,
1837 } = req
1838 {
1839 return Self::lock_single_execution(
1840 tx,
1841 created_at,
1842 &component_id,
1843 execution_id,
1844 run_id,
1845 &appending_version,
1846 executor_id,
1847 lock_expires_at,
1848 retry_config,
1849 )
1850 .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
1851 }
1852
1853 let combined_state = Self::get_combined_state(tx, execution_id)?;
1854 if combined_state.pending_state.is_finished() {
1855 debug!("Execution is already finished");
1856 return Err(DbErrorWrite::NonRetriable(
1857 DbErrorWriteNonRetriable::IllegalState("already finished".into()),
1858 ));
1859 }
1860
1861 Self::check_expected_next_and_appending_version(
1862 &combined_state.get_next_version_assert_not_finished(),
1863 &appending_version,
1864 )?;
1865 let event_ser = serde_json::to_string(&req.event).map_err(|err| {
1866 error!("Cannot serialize {:?} - {err:?}", req.event);
1867 DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
1868 })?;
1869
1870 let mut stmt = tx.prepare(
1871 "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
1872 VALUES (:execution_id, :created_at, :json_value, :version, :variant, :join_set_id)")
1873 ?;
1874 stmt.execute(named_params! {
1875 ":execution_id": execution_id.to_string(),
1876 ":created_at": req.created_at,
1877 ":json_value": event_ser,
1878 ":version": appending_version.0,
1879 ":variant": req.event.variant(),
1880 ":join_set_id": req.event.join_set_id().map(std::string::ToString::to_string),
1881 })?;
1882 match &req.event {
1885 ExecutionEventInner::Created { .. } => {
1886 unreachable!("handled in the caller")
1887 }
1888
1889 ExecutionEventInner::Locked { .. } => {
1890 unreachable!("handled above")
1891 }
1892
1893 ExecutionEventInner::TemporarilyFailed {
1894 backoff_expires_at, ..
1895 }
1896 | ExecutionEventInner::TemporarilyTimedOut {
1897 backoff_expires_at, ..
1898 } => {
1899 let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1900 tx,
1901 execution_id,
1902 &appending_version,
1903 *backoff_expires_at,
1904 true, )?;
1906 return Ok((next_version, notifier));
1907 }
1908
1909 ExecutionEventInner::Unlocked {
1910 backoff_expires_at, ..
1911 } => {
1912 let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1913 tx,
1914 execution_id,
1915 &appending_version,
1916 *backoff_expires_at,
1917 false, )?;
1919 return Ok((next_version, notifier));
1920 }
1921
1922 ExecutionEventInner::Finished { result, .. } => {
1923 Self::update_state_finished(
1924 tx,
1925 execution_id,
1926 &appending_version,
1927 req.created_at,
1928 PendingStateFinishedResultKind::from(result),
1929 )?;
1930 return Ok((
1931 appending_version,
1932 AppendNotifier {
1933 pending_at: None,
1934 execution_finished: Some(NotifierExecutionFinished {
1935 execution_id: execution_id.clone(),
1936 retval: result.clone(),
1937 }),
1938 response: None,
1939 },
1940 ));
1941 }
1942
1943 ExecutionEventInner::HistoryEvent {
1944 event:
1945 HistoryEvent::JoinSetCreate { .. }
1946 | HistoryEvent::JoinSetRequest {
1947 request: JoinSetRequest::ChildExecutionRequest { .. },
1948 ..
1949 }
1950 | HistoryEvent::Persist { .. }
1951 | HistoryEvent::Schedule { .. }
1952 | HistoryEvent::Stub { .. }
1953 | HistoryEvent::JoinNextTooMany { .. },
1954 } => {
1955 return Ok((
1956 Self::bump_state_next_version(tx, execution_id, &appending_version, None)?,
1957 AppendNotifier::default(),
1958 ));
1959 }
1960
1961 ExecutionEventInner::HistoryEvent {
1962 event:
1963 HistoryEvent::JoinSetRequest {
1964 join_set_id,
1965 request:
1966 JoinSetRequest::DelayRequest {
1967 delay_id,
1968 expires_at,
1969 ..
1970 },
1971 },
1972 } => {
1973 return Ok((
1974 Self::bump_state_next_version(
1975 tx,
1976 execution_id,
1977 &appending_version,
1978 Some(DelayReq {
1979 join_set_id: join_set_id.clone(),
1980 delay_id: delay_id.clone(),
1981 expires_at: *expires_at,
1982 }),
1983 )?,
1984 AppendNotifier::default(),
1985 ));
1986 }
1987
1988 ExecutionEventInner::HistoryEvent {
1989 event:
1990 HistoryEvent::JoinNext {
1991 join_set_id,
1992 run_expires_at,
1993 closing,
1994 requested_ffqn: _,
1995 },
1996 } => {
1997 let join_next_count = Self::count_join_next(tx, execution_id, join_set_id)?;
1999 let nth_response =
2000 Self::nth_response(tx, execution_id, join_set_id, join_next_count - 1)?; trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
2002 assert!(join_next_count > 0);
2003 if let Some(ResponseWithCursor {
2004 event:
2005 JoinSetResponseEventOuter {
2006 created_at: nth_created_at,
2007 ..
2008 },
2009 cursor: _,
2010 }) = nth_response
2011 {
2012 let scheduled_at = max(*run_expires_at, nth_created_at); let (next_version, notifier) = Self::update_state_pending_after_event_appended(
2014 tx,
2015 execution_id,
2016 &appending_version,
2017 scheduled_at,
2018 false, )?;
2020 return Ok((next_version, notifier));
2021 }
2022 return Ok((
2023 Self::update_state_blocked(
2024 tx,
2025 execution_id,
2026 &appending_version,
2027 join_set_id,
2028 *run_expires_at,
2029 *closing,
2030 )?,
2031 AppendNotifier::default(),
2032 ));
2033 }
2034 }
2035 }
2036
2037 fn append_response(
2038 tx: &Transaction,
2039 execution_id: &ExecutionId,
2040 response_outer: JoinSetResponseEventOuter,
2041 ) -> Result<AppendNotifier, DbErrorWrite> {
2042 let mut stmt = tx.prepare(
2043 "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, delay_success, child_execution_id, finished_version) \
2044 VALUES (:execution_id, :created_at, :join_set_id, :delay_id, :delay_success, :child_execution_id, :finished_version)",
2045 )?;
2046 let join_set_id = &response_outer.event.join_set_id;
2047 let (delay_id, delay_success) = match &response_outer.event.event {
2048 JoinSetResponse::DelayFinished { delay_id, result } => {
2049 (Some(delay_id.to_string()), Some(result.is_ok()))
2050 }
2051 JoinSetResponse::ChildExecutionFinished { .. } => (None, None),
2052 };
2053 let (child_execution_id, finished_version) = match &response_outer.event.event {
2054 JoinSetResponse::ChildExecutionFinished {
2055 child_execution_id,
2056 finished_version,
2057 result: _,
2058 } => (
2059 Some(child_execution_id.to_string()),
2060 Some(finished_version.0),
2061 ),
2062 JoinSetResponse::DelayFinished { .. } => (None, None),
2063 };
2064
2065 stmt.execute(named_params! {
2066 ":execution_id": execution_id.to_string(),
2067 ":created_at": response_outer.created_at,
2068 ":join_set_id": join_set_id.to_string(),
2069 ":delay_id": delay_id,
2070 ":delay_success": delay_success,
2071 ":child_execution_id": child_execution_id,
2072 ":finished_version": finished_version,
2073 })
2074 .map_err(|err| match err.sqlite_error().map(|err| err.code) {
2075 Some(ErrorCode::ConstraintViolation) => DbErrorWrite::NonRetriable(
2076 DbErrorWriteNonRetriable::IllegalState("conflicting response id".into()),
2077 ),
2078 _ => DbErrorWrite::from(err),
2079 })?;
2080
2081 let combined_state = Self::get_combined_state(tx, execution_id)?;
2083 debug!("previous_pending_state: {combined_state:?}");
2084 let mut notifier = if let PendingState::BlockedByJoinSet {
2085 join_set_id: found_join_set_id,
2086 lock_expires_at, closing: _,
2088 } = combined_state.pending_state
2089 && *join_set_id == found_join_set_id
2090 {
2091 let scheduled_at = max(lock_expires_at, response_outer.created_at);
2094 Self::update_state_pending_after_response_appended(
2097 tx,
2098 execution_id,
2099 scheduled_at,
2100 &combined_state.corresponding_version, )?
2102 } else {
2103 AppendNotifier::default()
2104 };
2105 if let JoinSetResponseEvent {
2106 join_set_id,
2107 event:
2108 JoinSetResponse::DelayFinished {
2109 delay_id,
2110 result: _,
2111 },
2112 } = &response_outer.event
2113 {
2114 debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
2115 let mut stmt =
2116 tx.prepare_cached("DELETE FROM t_delay WHERE execution_id = :execution_id AND join_set_id = :join_set_id AND delay_id = :delay_id")
2117 ?;
2118 stmt.execute(named_params! {
2119 ":execution_id": execution_id.to_string(),
2120 ":join_set_id": join_set_id.to_string(),
2121 ":delay_id": delay_id.to_string(),
2122 })?;
2123 }
2124 notifier.response = Some((execution_id.clone(), response_outer));
2125 Ok(notifier)
2126 }
2127
2128 fn append_backtrace(
2129 tx: &Transaction,
2130 backtrace_info: &BacktraceInfo,
2131 ) -> Result<(), DbErrorWrite> {
2132 let backtrace = serde_json::to_string(&backtrace_info.wasm_backtrace).map_err(|err| {
2133 warn!(
2134 "Cannot serialize backtrace {:?} - {err:?}",
2135 backtrace_info.wasm_backtrace
2136 );
2137 DbErrorWriteNonRetriable::ValidationFailed("cannot serialize backtrace".into())
2138 })?;
2139 let mut stmt = tx
2140 .prepare(
2141 "INSERT INTO t_backtrace (execution_id, component_id, version_min_including, version_max_excluding, wasm_backtrace) \
2142 VALUES (:execution_id, :component_id, :version_min_including, :version_max_excluding, :wasm_backtrace)",
2143 )
2144 ?;
2145 stmt.execute(named_params! {
2146 ":execution_id": backtrace_info.execution_id.to_string(),
2147 ":component_id": backtrace_info.component_id.to_string(),
2148 ":version_min_including": backtrace_info.version_min_including.0,
2149 ":version_max_excluding": backtrace_info.version_max_excluding.0,
2150 ":wasm_backtrace": backtrace,
2151 })?;
2152 Ok(())
2153 }
2154
/// Loads the complete execution log of `execution_id`: all events ordered by version,
/// all join set responses, and the current pending state.
///
/// Only compiled with the `test` feature.
///
/// # Errors
/// * `DbErrorRead::NotFound` when no event exists for `execution_id`.
/// * Errors from reading or deserializing events, the combined state, or the responses.
#[cfg(feature = "test")]
fn get(
    tx: &Transaction,
    execution_id: &ExecutionId,
) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
    let mut select_events = tx.prepare(
        "SELECT created_at, json_value, version FROM t_execution_log WHERE \
        execution_id = :execution_id ORDER BY version",
    )?;
    let rows = select_events.query_map(
        named_params! {
            ":execution_id": execution_id.to_string(),
        },
        |row| {
            let created_at = row.get("created_at")?;
            let wrapped_event = row
                .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
                .map_err(|serde| {
                    error!("Cannot deserialize {row:?} - {serde:?}");
                    consistency_rusqlite("cannot deserialize event")
                })?;
            let version = Version(row.get("version")?);

            Ok(ExecutionEvent {
                created_at,
                event: wrapped_event.0,
                backtrace_id: None,
                version,
            })
        },
    )?;
    let events = rows.collect::<Result<Vec<_>, _>>()?;
    // An execution without a single event does not exist.
    if events.is_empty() {
        return Err(DbErrorRead::NotFound);
    }

    let combined_state = Self::get_combined_state(tx, execution_id)?;
    let responses = Self::list_responses(tx, execution_id, None)?
        .into_iter()
        .map(|resp| resp.event)
        .collect();
    // Computed before `pending_state` is moved out of `combined_state`.
    let next_version = combined_state.get_next_version_or_finished();
    Ok(concepts::storage::ExecutionLog {
        execution_id: execution_id.clone(),
        events,
        responses,
        next_version,
        pending_state: combined_state.pending_state,
    })
}
2205
2206 fn list_execution_events(
2207 tx: &Transaction,
2208 execution_id: &ExecutionId,
2209 version_min: VersionType,
2210 version_max_excluding: VersionType,
2211 include_backtrace_id: bool,
2212 ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2213 let select = if include_backtrace_id {
2214 "SELECT
2215 log.created_at,
2216 log.json_value,
2217 log.version as version,
2218 -- Select version_min_including from backtrace if a match is found, otherwise NULL
2219 bt.version_min_including AS backtrace_id
2220 FROM
2221 t_execution_log AS log
2222 LEFT OUTER JOIN -- Use LEFT JOIN to keep all logs even if no backtrace matches
2223 t_backtrace AS bt ON log.execution_id = bt.execution_id
2224 -- Check if the log's version falls within the backtrace's range
2225 AND log.version >= bt.version_min_including
2226 AND log.version < bt.version_max_excluding
2227 WHERE
2228 log.execution_id = :execution_id
2229 AND log.version >= :version_min
2230 AND log.version < :version_max_excluding
2231 ORDER BY
2232 log.version;"
2233 } else {
2234 "SELECT
2235 created_at, json_value, NULL as backtrace_id, version
2236 FROM t_execution_log WHERE
2237 execution_id = :execution_id AND version >= :version_min AND version < :version_max_excluding
2238 ORDER BY version"
2239 };
2240 tx.prepare(select)?
2241 .query_map(
2242 named_params! {
2243 ":execution_id": execution_id.to_string(),
2244 ":version_min": version_min,
2245 ":version_max_excluding": version_max_excluding
2246 },
2247 |row| {
2248 let created_at = row.get("created_at")?;
2249 let backtrace_id = row
2250 .get::<_, Option<VersionType>>("backtrace_id")?
2251 .map(Version::new);
2252 let version = Version(row.get("version")?);
2253
2254 let event = row
2255 .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2256 .map(|event| ExecutionEvent {
2257 created_at,
2258 event: event.0,
2259 backtrace_id,
2260 version,
2261 })
2262 .map_err(|serde| {
2263 error!("Cannot deserialize {row:?} - {serde:?}");
2264 consistency_rusqlite("cannot deserialize")
2265 })?;
2266 Ok(event)
2267 },
2268 )?
2269 .collect::<Result<Vec<_>, _>>()
2270 .map_err(DbErrorRead::from)
2271 }
2272
2273 fn get_execution_event(
2274 tx: &Transaction,
2275 execution_id: &ExecutionId,
2276 version: VersionType,
2277 ) -> Result<ExecutionEvent, DbErrorRead> {
2278 let mut stmt = tx.prepare(
2279 "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2280 execution_id = :execution_id AND version = :version",
2281 )?;
2282 stmt.query_row(
2283 named_params! {
2284 ":execution_id": execution_id.to_string(),
2285 ":version": version,
2286 },
2287 |row| {
2288 let created_at = row.get("created_at")?;
2289 let event = row
2290 .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2291 .map_err(|serde| {
2292 error!("Cannot deserialize {row:?} - {serde:?}");
2293 consistency_rusqlite("cannot deserialize event")
2294 })?;
2295 let version = Version(row.get("version")?);
2296
2297 Ok(ExecutionEvent {
2298 created_at,
2299 event: event.0,
2300 backtrace_id: None,
2301 version,
2302 })
2303 },
2304 )
2305 .map_err(DbErrorRead::from)
2306 }
2307
2308 fn get_last_execution_event(
2309 tx: &Transaction,
2310 execution_id: &ExecutionId,
2311 ) -> Result<ExecutionEvent, DbErrorRead> {
2312 let mut stmt = tx.prepare(
2313 "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2314 execution_id = :execution_id ORDER BY version DESC",
2315 )?;
2316 stmt.query_row(
2317 named_params! {
2318 ":execution_id": execution_id.to_string(),
2319 },
2320 |row| {
2321 let created_at = row.get("created_at")?;
2322 let event = row
2323 .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2324 .map_err(|serde| {
2325 error!("Cannot deserialize {row:?} - {serde:?}");
2326 consistency_rusqlite("cannot deserialize event")
2327 })?;
2328 let version = Version(row.get("version")?);
2329 Ok(ExecutionEvent {
2330 created_at,
2331 event: event.0,
2332 backtrace_id: None,
2333 version: version.clone(),
2334 })
2335 },
2336 )
2337 .map_err(DbErrorRead::from)
2338 }
2339
2340 fn list_responses(
2341 tx: &Transaction,
2342 execution_id: &ExecutionId,
2343 pagination: Option<Pagination<u32>>,
2344 ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
2345 let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![];
2347 let mut sql = "SELECT \
2348 r.id, r.created_at, r.join_set_id, r.delay_id, r.delay_success, r.child_execution_id, r.finished_version, l.json_value \
2349 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2350 WHERE \
2351 r.execution_id = :execution_id \
2352 AND ( r.finished_version = l.version OR r.child_execution_id IS NULL ) \
2353 "
2354 .to_string();
2355 let limit = match &pagination {
2356 Some(
2357 pagination @ (Pagination::NewerThan { cursor, .. }
2358 | Pagination::OlderThan { cursor, .. }),
2359 ) => {
2360 params.push((":cursor", Box::new(cursor)));
2361 write!(sql, " AND r.id {rel} :cursor", rel = pagination.rel()).unwrap();
2362 Some(pagination.length())
2363 }
2364 None => None,
2365 };
2366 sql.push_str(" ORDER BY id");
2367 if pagination.as_ref().is_some_and(Pagination::is_desc) {
2368 sql.push_str(" DESC");
2369 }
2370 if let Some(limit) = limit {
2371 write!(sql, " LIMIT {limit}").unwrap();
2372 }
2373 params.push((":execution_id", Box::new(execution_id.to_string())));
2374 tx.prepare(&sql)?
2375 .query_map::<_, &[(&'static str, &dyn ToSql)], _>(
2376 params
2377 .iter()
2378 .map(|(key, value)| (*key, value.as_ref()))
2379 .collect::<Vec<_>>()
2380 .as_ref(),
2381 Self::parse_response_with_cursor,
2382 )?
2383 .collect::<Result<Vec<_>, rusqlite::Error>>()
2384 .map_err(DbErrorRead::from)
2385 }
2386
2387 fn parse_response_with_cursor(
2388 row: &rusqlite::Row<'_>,
2389 ) -> Result<ResponseWithCursor, rusqlite::Error> {
2390 let id = row.get("id")?;
2391 let created_at: DateTime<Utc> = row.get("created_at")?;
2392 let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
2393 let event = match (
2394 row.get::<_, Option<DelayId>>("delay_id")?,
2395 row.get::<_, Option<bool>>("delay_success")?,
2396 row.get::<_, Option<ExecutionIdDerived>>("child_execution_id")?,
2397 row.get::<_, Option<VersionType>>("finished_version")?,
2398 row.get::<_, Option<JsonWrapper<ExecutionEventInner>>>("json_value")?,
2399 ) {
2400 (Some(delay_id), Some(delay_success), None, None, None) => {
2401 JoinSetResponse::DelayFinished {
2402 delay_id,
2403 result: delay_success.then_some(()).ok_or(()),
2404 }
2405 }
2406 (
2407 None,
2408 None,
2409 Some(child_execution_id),
2410 Some(finished_version),
2411 Some(JsonWrapper(ExecutionEventInner::Finished { result, .. })),
2412 ) => JoinSetResponse::ChildExecutionFinished {
2413 child_execution_id,
2414 finished_version: Version(finished_version),
2415 result,
2416 },
2417 (delay, delay_success, child, finished, result) => {
2418 error!(
2419 "Invalid row in t_join_set_response {id} - {delay:?} {delay_success:?} {child:?} {finished:?} {:?}",
2420 result.map(|it| it.0)
2421 );
2422 return Err(consistency_rusqlite("invalid row in t_join_set_response"));
2423 }
2424 };
2425 Ok(ResponseWithCursor {
2426 cursor: id,
2427 event: JoinSetResponseEventOuter {
2428 event: JoinSetResponseEvent { join_set_id, event },
2429 created_at,
2430 },
2431 })
2432 }
2433
2434 fn nth_response(
2435 tx: &Transaction,
2436 execution_id: &ExecutionId,
2437 join_set_id: &JoinSetId,
2438 skip_rows: u64,
2439 ) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
2440 tx
2442 .prepare(
2443 "SELECT r.id, r.created_at, r.join_set_id, \
2444 r.delay_id, r.delay_success, \
2445 r.child_execution_id, r.finished_version, l.json_value \
2446 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2447 WHERE \
2448 r.execution_id = :execution_id AND r.join_set_id = :join_set_id AND \
2449 (
2450 r.finished_version = l.version \
2451 OR \
2452 r.child_execution_id IS NULL \
2453 ) \
2454 ORDER BY id \
2455 LIMIT 1 OFFSET :offset",
2456 )
2457 ?
2458 .query_row(
2459 named_params! {
2460 ":execution_id": execution_id.to_string(),
2461 ":join_set_id": join_set_id.to_string(),
2462 ":offset": skip_rows,
2463 },
2464 Self::parse_response_with_cursor,
2465 )
2466 .optional()
2467 .map_err(DbErrorRead::from)
2468 }
2469
2470 #[instrument(level = Level::TRACE, skip_all)]
2472 fn get_responses_with_offset(
2473 tx: &Transaction,
2474 execution_id: &ExecutionId,
2475 skip_rows: usize,
2476 ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorRead> {
2477 tx.prepare(
2479 "SELECT r.id, r.created_at, r.join_set_id, \
2480 r.delay_id, r.delay_success, \
2481 r.child_execution_id, r.finished_version, l.json_value \
2482 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2483 WHERE \
2484 r.execution_id = :execution_id AND \
2485 ( \
2486 r.finished_version = l.version \
2487 OR r.child_execution_id IS NULL \
2488 ) \
2489 ORDER BY id \
2490 LIMIT -1 OFFSET :offset",
2491 )
2492 ?
2493 .query_map(
2494 named_params! {
2495 ":execution_id": execution_id.to_string(),
2496 ":offset": skip_rows,
2497 },
2498 Self::parse_response_with_cursor,
2499 )
2500 ?
2501 .collect::<Result<Vec<_>, _>>()
2502 .map(|resp| resp.into_iter().map(|vec| vec.event).collect())
2503 .map_err(DbErrorRead::from)
2504 }
2505
2506 fn get_pending_of_single_ffqn(
2507 mut stmt: CachedStatement,
2508 batch_size: usize,
2509 pending_at_or_sooner: DateTime<Utc>,
2510 ffqn: &FunctionFqn,
2511 ) -> Result<Vec<(ExecutionId, Version)>, ()> {
2512 stmt.query_map(
2513 named_params! {
2514 ":pending_expires_finished": pending_at_or_sooner,
2515 ":ffqn": ffqn.to_string(),
2516 ":batch_size": batch_size,
2517 },
2518 |row| {
2519 let execution_id = row.get::<_, String>("execution_id")?.parse::<ExecutionId>();
2520 let next_version =
2521 Version::new(row.get::<_, VersionType>("corresponding_version")?).increment();
2522 Ok(execution_id.map(|exe| (exe, next_version)))
2523 },
2524 )
2525 .map_err(|err| {
2526 warn!("Ignoring consistency error {err:?}");
2527 })?
2528 .collect::<Result<Vec<_>, _>>()
2529 .map_err(|err| {
2530 warn!("Ignoring consistency error {err:?}");
2531 })?
2532 .into_iter()
2533 .collect::<Result<Vec<_>, _>>()
2534 .map_err(|err| {
2535 warn!("Ignoring consistency error {err:?}");
2536 })
2537 }
2538
2539 fn get_pending(
2541 conn: &Connection,
2542 batch_size: usize,
2543 pending_at_or_sooner: DateTime<Utc>,
2544 ffqns: &[FunctionFqn],
2545 ) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2546 let mut execution_ids_versions = Vec::with_capacity(batch_size);
2547 for ffqn in ffqns {
2548 let stmt = conn
2550 .prepare_cached(&format!(
2551 r#"
2552 SELECT execution_id, corresponding_version FROM t_state WHERE
2553 state = "{STATE_PENDING_AT}" AND
2554 pending_expires_finished <= :pending_expires_finished AND ffqn = :ffqn
2555 ORDER BY pending_expires_finished LIMIT :batch_size
2556 "#
2557 ))
2558 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2559
2560 if let Ok(execs_and_versions) = Self::get_pending_of_single_ffqn(
2561 stmt,
2562 batch_size - execution_ids_versions.len(),
2563 pending_at_or_sooner,
2564 ffqn,
2565 ) {
2566 execution_ids_versions.extend(execs_and_versions);
2567 if execution_ids_versions.len() == batch_size {
2568 break;
2570 }
2571 }
2573 }
2574 Ok(execution_ids_versions)
2575 }
2576
2577 #[instrument(level = Level::TRACE, skip_all)]
2579 fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
2580 let (pending_ats, finished_execs, responses) = {
2581 let (mut pending_ats, mut finished_execs, mut responses) =
2582 (Vec::new(), Vec::new(), Vec::new());
2583 for notifier in notifiers {
2584 if let Some(pending_at) = notifier.pending_at {
2585 pending_ats.push(pending_at);
2586 }
2587 if let Some(finished) = notifier.execution_finished {
2588 finished_execs.push(finished);
2589 }
2590 if let Some(response) = notifier.response {
2591 responses.push(response);
2592 }
2593 }
2594 (pending_ats, finished_execs, responses)
2595 };
2596
2597 if !pending_ats.is_empty() {
2599 let guard = self.0.pending_ffqn_subscribers.lock().unwrap();
2600 for pending_at in pending_ats {
2601 Self::notify_pending_locked(&pending_at, current_time, &guard);
2602 }
2603 }
2604 if !finished_execs.is_empty() {
2607 let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
2608 for finished in finished_execs {
2609 if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
2610 for (_tag, sender) in listeners_of_exe_id {
2611 let _ = sender.send(finished.retval.clone());
2614 }
2615 }
2616 }
2617 }
2618 if !responses.is_empty() {
2620 let mut guard = self.0.response_subscribers.lock().unwrap();
2621 for (execution_id, response) in responses {
2622 if let Some((sender, _)) = guard.remove(&execution_id) {
2623 let _ = sender.send(response);
2624 }
2625 }
2626 }
2627 }
2628
2629 fn notify_pending_locked(
2630 notifier: &NotifierPendingAt,
2631 current_time: DateTime<Utc>,
2632 ffqn_to_pending_subscription: &std::sync::MutexGuard<
2633 HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>,
2634 >,
2635 ) {
2636 if notifier.scheduled_at <= current_time
2638 && let Some((subscription, _)) = ffqn_to_pending_subscription.get(¬ifier.ffqn)
2639 {
2640 debug!("Notifying pending subscriber");
2641 let _ = subscription.try_send(());
2643 }
2644 }
2645}
2646
2647#[async_trait]
2648impl DbExecutor for SqlitePool {
2649 #[instrument(level = Level::TRACE, skip(self))]
2650 async fn lock_pending(
2651 &self,
2652 batch_size: usize,
2653 pending_at_or_sooner: DateTime<Utc>,
2654 ffqns: Arc<[FunctionFqn]>,
2655 created_at: DateTime<Utc>,
2656 component_id: ComponentId,
2657 executor_id: ExecutorId,
2658 lock_expires_at: DateTime<Utc>,
2659 run_id: RunId,
2660 retry_config: ComponentRetryConfig,
2661 ) -> Result<LockPendingResponse, DbErrorGeneric> {
2662 let execution_ids_versions = self
2663 .transaction(
2664 move |conn| Self::get_pending(conn, batch_size, pending_at_or_sooner, &ffqns),
2665 "get_pending",
2666 )
2667 .await?;
2668 if execution_ids_versions.is_empty() {
2669 Ok(vec![])
2670 } else {
2671 debug!("Locking {execution_ids_versions:?}");
2672 self.transaction(
2673 move |tx| {
2674 let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
2675 for (execution_id, version) in &execution_ids_versions {
2677 match Self::lock_single_execution(
2678 tx,
2679 created_at,
2680 &component_id,
2681 execution_id,
2682 run_id,
2683 version,
2684 executor_id,
2685 lock_expires_at,
2686 retry_config,
2687 ) {
2688 Ok(locked) => locked_execs.push(locked),
2689 Err(err) => {
2690 warn!("Locking row {execution_id} failed - {err:?}");
2691 }
2692 }
2693 }
2694 Ok(locked_execs)
2695 },
2696 "lock_pending",
2697 )
2698 .await
2699 }
2700 }
2701
2702 #[instrument(level = Level::DEBUG, skip(self))]
2703 async fn lock_one(
2704 &self,
2705 created_at: DateTime<Utc>,
2706 component_id: ComponentId,
2707 execution_id: &ExecutionId,
2708 run_id: RunId,
2709 version: Version,
2710 executor_id: ExecutorId,
2711 lock_expires_at: DateTime<Utc>,
2712 retry_config: ComponentRetryConfig,
2713 ) -> Result<LockedExecution, DbErrorWrite> {
2714 debug!(%execution_id, "lock_one");
2715 let execution_id = execution_id.clone();
2716 self.transaction(
2717 move |tx| {
2718 Self::lock_single_execution(
2719 tx,
2720 created_at,
2721 &component_id,
2722 &execution_id,
2723 run_id,
2724 &version,
2725 executor_id,
2726 lock_expires_at,
2727 retry_config,
2728 )
2729 },
2730 "lock_inner",
2731 )
2732 .await
2733 }
2734
2735 #[instrument(level = Level::DEBUG, skip(self, req))]
2736 async fn append(
2737 &self,
2738 execution_id: ExecutionId,
2739 version: Version,
2740 req: AppendRequest,
2741 ) -> Result<AppendResponse, DbErrorWrite> {
2742 debug!(%req, "append");
2743 trace!(?req, "append");
2744 let created_at = req.created_at;
2745 let (version, notifier) = self
2746 .transaction(
2747 move |tx| Self::append(tx, &execution_id, req.clone(), version.clone()),
2748 "append",
2749 )
2750 .await?;
2751 self.notify_all(vec![notifier], created_at);
2752 Ok(version)
2753 }
2754
2755 #[instrument(level = Level::DEBUG, skip_all)]
2756 async fn append_batch_respond_to_parent(
2757 &self,
2758 events: AppendEventsToExecution,
2759 response: AppendResponseToExecution,
2760 current_time: DateTime<Utc>,
2761 ) -> Result<AppendBatchResponse, DbErrorWrite> {
2762 debug!("append_batch_respond_to_parent");
2763 if events.execution_id == response.parent_execution_id {
2764 return Err(DbErrorWrite::NonRetriable(
2767 DbErrorWriteNonRetriable::ValidationFailed(
2768 "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
2769 ),
2770 ));
2771 }
2772 if events.batch.is_empty() {
2773 error!("Batch cannot be empty");
2774 return Err(DbErrorWrite::NonRetriable(
2775 DbErrorWriteNonRetriable::ValidationFailed("batch cannot be empty".into()),
2776 ));
2777 }
2778 let (version, notifiers) = {
2779 self.transaction(
2780 move |tx| {
2781 let mut version = events.version.clone();
2782 let mut notifier_of_child = None;
2783 for append_request in &events.batch {
2784 let (v, n) = Self::append(
2785 tx,
2786 &events.execution_id,
2787 append_request.clone(),
2788 version,
2789 )?;
2790 version = v;
2791 notifier_of_child = Some(n);
2792 }
2793
2794 let pending_at_parent = Self::append_response(
2795 tx,
2796 &response.parent_execution_id,
2797 JoinSetResponseEventOuter {
2798 created_at: response.created_at,
2799 event: JoinSetResponseEvent {
2800 join_set_id: response.join_set_id.clone(),
2801 event: JoinSetResponse::ChildExecutionFinished {
2802 child_execution_id: response.child_execution_id.clone(),
2803 finished_version: response.finished_version.clone(),
2804 result: response.result.clone(),
2805 },
2806 },
2807 },
2808 )?;
2809 Ok::<_, DbErrorWrite>((
2810 version,
2811 vec![
2812 notifier_of_child.expect("checked that the batch is not empty"),
2813 pending_at_parent,
2814 ],
2815 ))
2816 },
2817 "append_batch_respond_to_parent",
2818 )
2819 .await?
2820 };
2821 self.notify_all(notifiers, current_time);
2822 Ok(version)
2823 }
2824
2825 #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
2828 async fn wait_for_pending(
2829 &self,
2830 pending_at_or_sooner: DateTime<Utc>,
2831 ffqns: Arc<[FunctionFqn]>,
2832 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
2833 ) {
2834 let unique_tag: u64 = rand::random();
2835 let (sender, mut receiver) = mpsc::channel(1); {
2837 let mut ffqn_to_pending_subscription = self.0.pending_ffqn_subscribers.lock().unwrap();
2838 for ffqn in ffqns.as_ref() {
2839 ffqn_to_pending_subscription.insert(ffqn.clone(), (sender.clone(), unique_tag));
2840 }
2841 }
2842 async {
2843 let Ok(execution_ids_versions) = self
2844 .transaction(
2845 {
2846 let ffqns = ffqns.clone();
2847 move |conn| Self::get_pending(conn, 1, pending_at_or_sooner, ffqns.as_ref())
2848 },
2849 "subscribe_to_pending",
2850 )
2851 .await
2852 else {
2853 trace!(
2854 "Ignoring get_pending error and waiting in for timeout to avoid executor repolling too soon"
2855 );
2856 timeout_fut.await;
2857 return;
2858 };
2859 if !execution_ids_versions.is_empty() {
2860 trace!("Not waiting, database already contains new pending executions");
2861 return;
2862 }
2863 tokio::select! { _ = receiver.recv() => {
2865 trace!("Received a notification");
2866 }
2867 () = timeout_fut => {
2868 }
2869 }
2870 }.await;
2871 {
2873 let mut ffqn_to_pending_subscription = self.0.pending_ffqn_subscribers.lock().unwrap();
2874 for ffqn in ffqns.as_ref() {
2875 match ffqn_to_pending_subscription.remove(ffqn) {
2876 Some((_, tag)) if tag == unique_tag => {
2877 }
2879 Some(other) => {
2880 ffqn_to_pending_subscription.insert(ffqn.clone(), other);
2882 }
2883 None => {
2884 }
2886 }
2887 }
2888 }
2889 }
2890}
2891
2892#[async_trait]
2893impl DbConnection for SqlitePool {
2894 #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
2895 async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
2896 debug!("create");
2897 trace!(?req, "create");
2898 let created_at = req.created_at;
2899 let (version, notifier) = self
2900 .transaction(move |tx| Self::create_inner(tx, req.clone()), "create")
2901 .await?;
2902 self.notify_all(vec![notifier], created_at);
2903 Ok(version)
2904 }
2905
2906 #[instrument(level = Level::DEBUG, skip(self, batch))]
2907 async fn append_batch(
2908 &self,
2909 current_time: DateTime<Utc>,
2910 batch: Vec<AppendRequest>,
2911 execution_id: ExecutionId,
2912 version: Version,
2913 ) -> Result<AppendBatchResponse, DbErrorWrite> {
2914 debug!("append_batch");
2915 trace!(?batch, "append_batch");
2916 assert!(!batch.is_empty(), "Empty batch request");
2917
2918 let (version, notifier) = self
2919 .transaction(
2920 move |tx| {
2921 let mut version = version.clone();
2922 let mut notifier = None;
2923 for append_request in &batch {
2924 let (v, n) =
2925 Self::append(tx, &execution_id, append_request.clone(), version)?;
2926 version = v;
2927 notifier = Some(n);
2928 }
2929 Ok::<_, DbErrorWrite>((
2930 version,
2931 notifier.expect("checked that the batch is not empty"),
2932 ))
2933 },
2934 "append_batch",
2935 )
2936 .await?;
2937
2938 self.notify_all(vec![notifier], current_time);
2939 Ok(version)
2940 }
2941
2942 #[instrument(level = Level::DEBUG, skip(self, batch, child_req))]
2943 async fn append_batch_create_new_execution(
2944 &self,
2945 current_time: DateTime<Utc>,
2946 batch: Vec<AppendRequest>,
2947 execution_id: ExecutionId,
2948 version: Version,
2949 child_req: Vec<CreateRequest>,
2950 ) -> Result<AppendBatchResponse, DbErrorWrite> {
2951 debug!("append_batch_create_new_execution");
2952 trace!(?batch, ?child_req, "append_batch_create_new_execution");
2953 assert!(!batch.is_empty(), "Empty batch request");
2954
2955 let (version, notifiers) = self
2956 .transaction(
2957 move |tx| {
2958 let mut notifier = None;
2959 let mut version = version.clone();
2960 for append_request in &batch {
2961 let (v, n) =
2962 Self::append(tx, &execution_id, append_request.clone(), version)?;
2963 version = v;
2964 notifier = Some(n);
2965 }
2966 let mut notifiers = Vec::new();
2967 notifiers.push(notifier.expect("checked that the batch is not empty"));
2968
2969 for child_req in &child_req {
2970 let (_, notifier) = Self::create_inner(tx, child_req.clone())?;
2971 notifiers.push(notifier);
2972 }
2973 Ok::<_, DbErrorWrite>((version, notifiers))
2974 },
2975 "append_batch_create_new_execution_inner",
2976 )
2977 .await?;
2978 self.notify_all(notifiers, current_time);
2979 Ok(version)
2980 }
2981
2982 #[cfg(feature = "test")]
2983 #[instrument(level = Level::DEBUG, skip(self))]
2984 async fn get(
2985 &self,
2986 execution_id: &ExecutionId,
2987 ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2988 trace!("get");
2989 let execution_id = execution_id.clone();
2990 self.transaction(move |tx| Self::get(tx, &execution_id), "get")
2991 .await
2992 }
2993
2994 #[instrument(level = Level::DEBUG, skip(self))]
2995 async fn list_execution_events(
2996 &self,
2997 execution_id: &ExecutionId,
2998 since: &Version,
2999 max_length: VersionType,
3000 include_backtrace_id: bool,
3001 ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
3002 let execution_id = execution_id.clone();
3003 let since = since.0;
3004 self.transaction(
3005 move |tx| {
3006 Self::list_execution_events(
3007 tx,
3008 &execution_id,
3009 since,
3010 since + max_length,
3011 include_backtrace_id,
3012 )
3013 },
3014 "get",
3015 )
3016 .await
3017 }
3018
3019 #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3023 async fn subscribe_to_next_responses(
3024 &self,
3025 execution_id: &ExecutionId,
3026 start_idx: usize,
3027 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
3028 ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout> {
3029 debug!("next_responses");
3030 let unique_tag: u64 = rand::random();
3031 let execution_id = execution_id.clone();
3032
3033 let cleanup = || {
3034 let mut guard = self.0.response_subscribers.lock().unwrap();
3035 match guard.remove(&execution_id) {
3036 Some((_, tag)) if tag == unique_tag => {} Some(other) => {
3038 guard.insert(execution_id.clone(), other);
3040 }
3041 None => {} }
3043 };
3044
3045 let response_subscribers = self.0.response_subscribers.clone();
3046 let resp_or_receiver = {
3047 let execution_id = execution_id.clone();
3048 self.transaction(
3049 move |tx| {
3050 let responses = Self::get_responses_with_offset(tx, &execution_id, start_idx)?;
3051 if responses.is_empty() {
3052 let (sender, receiver) = oneshot::channel();
3054 response_subscribers
3055 .lock()
3056 .unwrap()
3057 .insert(execution_id.clone(), (sender, unique_tag));
3058 Ok::<_, DbErrorReadWithTimeout>(itertools::Either::Right(receiver))
3059 } else {
3060 Ok(itertools::Either::Left(responses))
3061 }
3062 },
3063 "subscribe_to_next_responses",
3064 )
3065 .await
3066 }
3067 .inspect_err(|_| {
3068 cleanup();
3069 })?;
3070 match resp_or_receiver {
3071 itertools::Either::Left(resp) => Ok(resp), itertools::Either::Right(receiver) => {
3073 let res = async move {
3074 tokio::select! {
3075 resp = receiver => {
3076 let resp = resp.map_err(|_| DbErrorGeneric::Close)?;
3077 Ok(vec![resp])
3078 }
3079 () = timeout_fut => Err(DbErrorReadWithTimeout::Timeout),
3080 }
3081 }
3082 .await;
3083 cleanup();
3084 res
3085 }
3086 }
3087 }
3088
3089 async fn wait_for_finished_result(
3091 &self,
3092 execution_id: &ExecutionId,
3093 timeout_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
3094 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
3095 let unique_tag: u64 = rand::random();
3096 let execution_id = execution_id.clone();
3097 let execution_finished_subscription = self.0.execution_finished_subscribers.clone();
3098
3099 let cleanup = || {
3100 let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
3101 if let Some(subscribers) = guard.get_mut(&execution_id) {
3102 subscribers.remove(&unique_tag);
3103 }
3104 };
3105
3106 let resp_or_receiver = {
3107 let execution_id = execution_id.clone();
3108 self.transaction(move |tx| {
3109 let pending_state =
3110 Self::get_combined_state(tx, &execution_id)?.pending_state;
3111 if let PendingState::Finished { finished } = pending_state {
3112 let event =
3113 Self::get_execution_event(tx, &execution_id, finished.version)?;
3114 if let ExecutionEventInner::Finished { result, ..} = event.event {
3115 Ok(itertools::Either::Left(result))
3116 } else {
3117 error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
3118 Err(DbErrorReadWithTimeout::from(consistency_db_err(
3119 "cannot get finished event based on t_state version"
3120 )))
3121 }
3122 } else {
3123 let (sender, receiver) = oneshot::channel();
3127 let mut guard = execution_finished_subscription.lock().unwrap();
3128 guard.entry(execution_id.clone()).or_default().insert(unique_tag, sender);
3129 Ok(itertools::Either::Right(receiver))
3130 }
3131 }, "wait_for_finished_result")
3132 .await
3133 }
3134 .inspect_err(|_| {
3135 cleanup();
3139 })?;
3140
3141 let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
3142 match resp_or_receiver {
3143 itertools::Either::Left(resp) => Ok(resp), itertools::Either::Right(receiver) => {
3145 let res = async move {
3146 tokio::select! {
3147 resp = receiver => {
3148 Ok(resp.expect("the notifier sends to all listeners, cannot race with cleanup"))
3149 }
3150 () = timeout_fut => Err(DbErrorReadWithTimeout::Timeout),
3151 }
3152 }
3153 .await;
3154 cleanup();
3155 res
3156 }
3157 }
3158 }
3159
3160 #[cfg(feature = "test")]
3161 #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
3162 async fn append_response(
3163 &self,
3164 created_at: DateTime<Utc>,
3165 execution_id: ExecutionId,
3166 response_event: JoinSetResponseEvent,
3167 ) -> Result<(), DbErrorWrite> {
3168 debug!("append_response");
3169 let event = JoinSetResponseEventOuter {
3170 created_at,
3171 event: response_event,
3172 };
3173 let notifier = self
3174 .transaction(
3175 move |tx| Self::append_response(tx, &execution_id, event.clone()),
3176 "append_response",
3177 )
3178 .await?;
3179 self.notify_all(vec![notifier], created_at);
3180 Ok(())
3181 }
3182
3183 #[instrument(level = Level::DEBUG, skip_all, fields(%join_set_id, %execution_id))]
3184 async fn append_delay_response(
3185 &self,
3186 created_at: DateTime<Utc>,
3187 execution_id: ExecutionId,
3188 join_set_id: JoinSetId,
3189 delay_id: DelayId,
3190 result: Result<(), ()>,
3191 ) -> Result<(), DbErrorWrite> {
3192 debug!("append_delay_response");
3193 let event = JoinSetResponseEventOuter {
3194 created_at,
3195 event: JoinSetResponseEvent {
3196 join_set_id,
3197 event: JoinSetResponse::DelayFinished { delay_id, result },
3198 },
3199 };
3200 let notifier = self
3201 .transaction(
3202 move |tx| Self::append_response(tx, &execution_id, event.clone()),
3203 "append_delay_response",
3204 )
3205 .await?;
3206 self.notify_all(vec![notifier], created_at);
3207 Ok(())
3208 }
3209
3210 #[instrument(level = Level::DEBUG, skip_all)]
3211 async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
3212 debug!("append_backtrace");
3213 self.transaction(
3214 move |tx| Self::append_backtrace(tx, &append),
3215 "append_backtrace",
3216 )
3217 .await
3218 }
3219
3220 #[instrument(level = Level::DEBUG, skip_all)]
3221 async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
3222 debug!("append_backtrace_batch");
3223 self.transaction(
3224 move |tx| {
3225 for append in &batch {
3226 Self::append_backtrace(tx, append)?;
3227 }
3228 Ok(())
3229 },
3230 "append_backtrace",
3231 )
3232 .await
3233 }
3234
3235 #[instrument(level = Level::DEBUG, skip_all)]
3236 async fn get_backtrace(
3237 &self,
3238 execution_id: &ExecutionId,
3239 filter: BacktraceFilter,
3240 ) -> Result<BacktraceInfo, DbErrorRead> {
3241 debug!("get_last_backtrace");
3242 let execution_id = execution_id.clone();
3243
3244 self.transaction(
3245 move |tx| {
3246 let select = "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace FROM t_backtrace \
3247 WHERE execution_id = :execution_id";
3248 let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![(":execution_id", Box::new(execution_id.to_string()))];
3249 let select = match &filter {
3250 BacktraceFilter::Specific(version) =>{
3251 params.push((":version", Box::new(version.0)));
3252 format!("{select} AND version_min_including <= :version AND version_max_excluding > :version")
3253 },
3254 BacktraceFilter::First => format!("{select} ORDER BY version_min_including LIMIT 1"),
3255 BacktraceFilter::Last => format!("{select} ORDER BY version_min_including DESC LIMIT 1")
3256 };
3257 tx
3258 .prepare(&select)
3259 ?
3260 .query_row::<_, &[(&'static str, &dyn ToSql)], _>(
3261 params
3262 .iter()
3263 .map(|(key, value)| (*key, value.as_ref()))
3264 .collect::<Vec<_>>()
3265 .as_ref(),
3266 |row| {
3267 Ok(BacktraceInfo {
3268 execution_id: execution_id.clone(),
3269 component_id: row.get::<_, FromStrWrapper<_> >("component_id")?.0,
3270 version_min_including: Version::new(row.get::<_, VersionType>("version_min_including")?),
3271 version_max_excluding: Version::new(row.get::<_, VersionType>("version_max_excluding")?),
3272 wasm_backtrace: row.get::<_, JsonWrapper<_>>("wasm_backtrace")?.0,
3273 })
3274 },
3275 ).map_err(DbErrorRead::from)
3276 },
3277 "get_last_backtrace",
3278 ).await
3279 }
3280
3281 #[instrument(level = Level::TRACE, skip(self))]
3283 async fn get_expired_timers(
3284 &self,
3285 at: DateTime<Utc>,
3286 ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
3287 self.transaction(
3288 move |conn| {
3289 let mut expired_timers = conn.prepare(
3290 "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= :at",
3291 )?
3292 .query_map(
3293 named_params! {
3294 ":at": at,
3295 },
3296 |row| {
3297 let execution_id = row.get("execution_id")?;
3298 let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
3299 let delay_id = row.get::<_, DelayId>("delay_id")?;
3300 let delay = ExpiredDelay { execution_id, join_set_id, delay_id };
3301 Ok(ExpiredTimer::Delay(delay))
3302 },
3303 )?
3304 .collect::<Result<Vec<_>, _>>()?;
3305 let expired = conn.prepare(&format!(r#"
3307 SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis,
3308 executor_id, run_id
3309 FROM t_state
3310 WHERE pending_expires_finished <= :at AND state = "{STATE_LOCKED}"
3311 "#
3312 )
3313 )?
3314 .query_map(
3315 named_params! {
3316 ":at": at,
3317 },
3318 |row| {
3319 let execution_id = row.get("execution_id")?;
3320 let locked_at_version = Version::new(row.get("last_lock_version")?);
3321 let next_version = Version::new(row.get("corresponding_version")?).increment();
3322 let intermittent_event_count = row.get("intermittent_event_count")?;
3323 let max_retries = row.get("max_retries")?;
3324 let retry_exp_backoff_millis = row.get("retry_exp_backoff_millis")?;
3325 let executor_id = row.get("executor_id")?;
3326 let run_id = row.get("run_id")?;
3327 let lock = ExpiredLock {
3328 execution_id,
3329 locked_at_version,
3330 next_version,
3331 intermittent_event_count,
3332 max_retries,
3333 retry_exp_backoff: Duration::from_millis(retry_exp_backoff_millis),
3334 locked_by: LockedBy { executor_id, run_id },
3335 };
3336 Ok(ExpiredTimer::Lock(lock))
3337 }
3338 )?
3339 .collect::<Result<Vec<_>, _>>()?;
3340 expired_timers.extend(expired);
3341 if !expired_timers.is_empty() {
3342 debug!("get_expired_timers found {expired_timers:?}");
3343 }
3344 Ok(expired_timers)
3345 }, "get_expired_timers"
3346 )
3347 .await
3348 }
3349
3350 async fn get_execution_event(
3351 &self,
3352 execution_id: &ExecutionId,
3353 version: &Version,
3354 ) -> Result<ExecutionEvent, DbErrorRead> {
3355 let version = version.0;
3356 let execution_id = execution_id.clone();
3357 self.transaction(
3358 move |tx| Self::get_execution_event(tx, &execution_id, version),
3359 "get_execution_event",
3360 )
3361 .await
3362 }
3363
3364 async fn get_last_execution_event(
3365 &self,
3366 execution_id: &ExecutionId,
3367 ) -> Result<ExecutionEvent, DbErrorRead> {
3368 let execution_id = execution_id.clone();
3369 self.transaction(
3370 move |tx| Self::get_last_execution_event(tx, &execution_id),
3371 "get_last_execution_event",
3372 )
3373 .await
3374 }
3375
3376 async fn get_pending_state(
3377 &self,
3378 execution_id: &ExecutionId,
3379 ) -> Result<PendingState, DbErrorRead> {
3380 let execution_id = execution_id.clone();
3381 Ok(self
3382 .transaction(
3383 move |tx| Self::get_combined_state(tx, &execution_id),
3384 "get_pending_state",
3385 )
3386 .await?
3387 .pending_state)
3388 }
3389
3390 async fn list_executions(
3391 &self,
3392 ffqn: Option<FunctionFqn>,
3393 top_level_only: bool,
3394 pagination: ExecutionListPagination,
3395 ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
3396 self.transaction(
3397 move |tx| Self::list_executions(tx, ffqn.as_ref(), top_level_only, &pagination),
3398 "list_executions",
3399 )
3400 .await
3401 }
3402
3403 async fn list_responses(
3404 &self,
3405 execution_id: &ExecutionId,
3406 pagination: Pagination<u32>,
3407 ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
3408 let execution_id = execution_id.clone();
3409 self.transaction(
3410 move |tx| Self::list_responses(tx, &execution_id, Some(pagination)),
3411 "list_executions",
3412 )
3413 .await
3414 }
3415}
3416
#[cfg(any(test, feature = "tempfile"))]
pub mod tempfile {
    use super::{SqliteConfig, SqlitePool};
    use tempfile::NamedTempFile;

    /// Creates a `SqlitePool` with the default `SqliteConfig` for tests.
    ///
    /// When the `SQLITE_FILE` environment variable is set, its value is used as the
    /// database path and no temporary file is returned. Otherwise a fresh
    /// `NamedTempFile` backs the database and is returned alongside the pool so the
    /// caller controls how long it is kept.
    ///
    /// # Panics
    /// Panics if the temporary file cannot be created or the pool fails to open.
    pub async fn sqlite_pool() -> (SqlitePool, Option<NamedTempFile>) {
        match std::env::var("SQLITE_FILE") {
            Ok(path) => {
                let pool = SqlitePool::new(path, SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, None)
            }
            Err(_) => {
                let file = NamedTempFile::new().unwrap();
                let pool = SqlitePool::new(file.path(), SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, Some(file))
            }
        }
    }
}