1use crate::histograms::Histograms;
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use concepts::{
5 ComponentId, ComponentRetryConfig, ExecutionId, FunctionFqn, JoinSetId, StrVariant,
6 SupportedFunctionReturnValue,
7 prefixed_ulid::{DelayId, ExecutionIdDerived, ExecutorId, RunId},
8 storage::{
9 AppendBatchResponse, AppendEventsToExecution, AppendRequest, AppendResponse,
10 AppendResponseToExecution, BacktraceFilter, BacktraceInfo, CreateRequest, DUMMY_CREATED,
11 DUMMY_HISTORY_EVENT, DbConnection, DbErrorGeneric, DbErrorRead, DbErrorReadWithTimeout,
12 DbErrorWrite, DbErrorWriteNonRetriable, DbExecutor, DbPool, DbPoolCloseable,
13 ExecutionEvent, ExecutionEventInner, ExecutionListPagination, ExecutionWithState,
14 ExpiredDelay, ExpiredLock, ExpiredTimer, HistoryEvent, JoinSetRequest, JoinSetResponse,
15 JoinSetResponseEvent, JoinSetResponseEventOuter, LockPendingResponse, Locked, LockedBy,
16 LockedExecution, Pagination, PendingState, PendingStateFinished,
17 PendingStateFinishedResultKind, PendingStateLocked, ResponseWithCursor, Version,
18 VersionType,
19 },
20};
21use conversions::{FromStrWrapper, JsonWrapper, consistency_db_err, consistency_rusqlite};
22use hashbrown::HashMap;
23use rusqlite::{
24 CachedStatement, Connection, OpenFlags, OptionalExtension, Params, ToSql, Transaction,
25 TransactionBehavior, named_params, types::ToSqlOutput,
26};
27use std::{
28 cmp::max,
29 collections::VecDeque,
30 fmt::Debug,
31 ops::DerefMut,
32 path::Path,
33 str::FromStr,
34 sync::{
35 Arc, Mutex,
36 atomic::{AtomicBool, Ordering},
37 },
38 time::Duration,
39};
40use std::{fmt::Write as _, pin::Pin};
41use tokio::{
42 sync::{mpsc, oneshot},
43 time::Instant,
44};
45use tracing::{Level, debug, error, info, instrument, trace, warn};
46
/// Error returned when the SQLite database cannot be opened or prepared.
///
/// The value carries no details; the underlying cause is logged where the failure happens.
#[derive(Debug, thiserror::Error)]
#[error("initialization error")]
pub struct InitializationError;
50
/// Identifies a single delay and its expiration.
// NOTE(review): presumably mirrors a `t_delay` row (minus `execution_id`); not used in the
// visible part of the file — confirm against the callers.
#[derive(Debug, Clone)]
struct DelayReq {
    /// Join set the delay belongs to.
    join_set_id: JoinSetId,
    /// Identifier of the delay.
    delay_id: DelayId,
    /// Time at which the delay expires.
    expires_at: DateTime<Utc>,
}
/// Default PRAGMA statements applied by `init_thread`, in this order, as `[name, value]` pairs.
///
/// A pair with an empty value is only queried, not set. Each default can be replaced via
/// `SqliteConfig::pragma_override`, which is looked up by pragma name.
// NOTE(review): `page_size` is applied after `journal_mode=wal`; SQLite may ignore a page size
// change at that point - confirm against the SQLite PRAGMA documentation.
const PRAGMA: [[&str; 2]; 10] = [
    ["journal_mode", "wal"],
    ["synchronous", "FULL"],
    ["foreign_keys", "true"],
    ["busy_timeout", "1000"],
    ["cache_size", "10000"],
    ["temp_store", "MEMORY"],
    ["page_size", "8192"],
    ["mmap_size", "134217728"],
    ["journal_size_limit", "67108864"],
    ["integrity_check", ""],
];
80
/// DDL of `t_metadata`, which stores the schema version together with a creation timestamp.
/// Rows are identified by an auto-incremented `id`.
const CREATE_TABLE_T_METADATA: &str = r"
CREATE TABLE IF NOT EXISTS t_metadata (
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 schema_version INTEGER NOT NULL,
 created_at TEXT NOT NULL
) STRICT
";
/// Schema version that `init_thread` expects in the most recent `t_metadata` row.
const T_METADATA_EXPECTED_SCHEMA_VERSION: u32 = 2;
90
/// DDL of `t_execution_log`: one row per execution event, keyed by `(execution_id, version)`.
///
/// The event itself is stored as JSON in `json_value`; `history_event_type` is a stored
/// generated column extracted from that JSON.
const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_log (
 execution_id TEXT NOT NULL,
 created_at TEXT NOT NULL,
 json_value TEXT NOT NULL,
 version INTEGER NOT NULL,
 variant TEXT NOT NULL,
 join_set_id TEXT,
 history_event_type TEXT GENERATED ALWAYS AS (json_value->>'$.HistoryEvent.event.type') STORED,
 PRIMARY KEY (execution_id, version)
) STRICT
";
/// Index on `t_execution_log (execution_id, version)`.
// NOTE(review): covers the same columns as the table's primary key - verify it is still needed.
const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version ON t_execution_log (execution_id, version);
";
/// Index on `t_execution_log (execution_id, variant)`.
/// The created index is named `idx_t_execution_log_execution_id_variant`, which differs slightly
/// from this constant's name.
const CREATE_INDEX_IDX_T_EXECUTION_ID_EXECUTION_ID_VARIANT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant ON t_execution_log (execution_id, variant);
";
112
/// DDL of `t_join_set_response`: responses delivered to a join set of an execution.
///
/// A row carries either a `delay_id` or a `child_execution_id` (with `finished_version`); all of
/// these columns are nullable. The `UNIQUE` constraint spans
/// `(execution_id, join_set_id, delay_id, child_execution_id)`.
const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
CREATE TABLE IF NOT EXISTS t_join_set_response (
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 created_at TEXT NOT NULL,
 execution_id TEXT NOT NULL,
 join_set_id TEXT NOT NULL,

 delay_id TEXT,

 child_execution_id TEXT,
 finished_version INTEGER,

 UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
) STRICT
";
/// Index on `t_join_set_response (execution_id, id)`.
const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
";
136
/// DDL of `t_state`: the current state of each execution, one row per `execution_id`.
///
/// `state` holds one of the `STATE_*` constants below. `pending_expires_finished` is a single
/// timestamp column whose meaning depends on `state` (see `CombinedState::new`). The nullable
/// columns are only populated for some states.
const CREATE_TABLE_T_STATE: &str = r"
CREATE TABLE IF NOT EXISTS t_state (
 execution_id TEXT NOT NULL,
 is_top_level INTEGER NOT NULL,
 corresponding_version INTEGER NOT NULL,
 pending_expires_finished TEXT NOT NULL,
 ffqn TEXT NOT NULL,
 state TEXT NOT NULL,
 created_at TEXT NOT NULL,
 updated_at TEXT NOT NULL,
 scheduled_at TEXT NOT NULL,
 intermittent_event_count INTEGER NOT NULL,

 max_retries INTEGER,
 retry_exp_backoff_millis INTEGER,
 last_lock_version INTEGER,
 executor_id TEXT,
 run_id TEXT,

 join_set_id TEXT,
 join_set_closing INTEGER,

 result_kind TEXT,

 PRIMARY KEY (execution_id)
) STRICT
";
/// `t_state.state` value of an execution that is pending.
const STATE_PENDING_AT: &str = "PendingAt";
/// `t_state.state` value of an execution blocked by a join set.
const STATE_BLOCKED_BY_JOIN_SET: &str = "BlockedByJoinSet";
/// `t_state.state` value of a locked execution.
const STATE_LOCKED: &str = "Locked";
/// `t_state.state` value of a finished execution.
const STATE_FINISHED: &str = "Finished";
// NOTE(review): presumably compared against `t_execution_log.history_event_type`; not used in
// the visible part of the file - confirm.
const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "JoinNext";

/// Index on `t_state (state, pending_expires_finished, ffqn)`.
const IDX_T_STATE_LOCK_PENDING: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending ON t_state (state, pending_expires_finished, ffqn);
";
/// Partial index on `t_state (pending_expires_finished)` restricted to rows with an `executor_id`.
const IDX_T_STATE_EXPIRED_TIMERS: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_expired_timers ON t_state (pending_expires_finished) WHERE executor_id IS NOT NULL;
";
/// Index on `t_state (execution_id, is_top_level)`; the created index is named
/// `idx_t_state_execution_id_is_root`.
const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
";
/// Index on `t_state (ffqn)`.
const IDX_T_STATE_FFQN: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
";
/// Index on `t_state (created_at)`.
const IDX_T_STATE_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
";
194
/// DDL of `t_delay`: delays with their expiration, keyed by
/// `(execution_id, join_set_id, delay_id)`.
const CREATE_TABLE_T_DELAY: &str = r"
CREATE TABLE IF NOT EXISTS t_delay (
 execution_id TEXT NOT NULL,
 join_set_id TEXT NOT NULL,
 delay_id TEXT NOT NULL,
 expires_at TEXT NOT NULL,
 PRIMARY KEY (execution_id, join_set_id, delay_id)
) STRICT
";

/// DDL of `t_backtrace`: a WASM backtrace stored for a half-open version range
/// (`version_min_including` .. `version_max_excluding`) of an execution.
const CREATE_TABLE_T_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_backtrace (
 execution_id TEXT NOT NULL,
 component_id TEXT NOT NULL,
 version_min_including INTEGER NOT NULL,
 version_max_excluding INTEGER NOT NULL,
 wasm_backtrace TEXT NOT NULL,
 PRIMARY KEY (execution_id, version_min_including, version_max_excluding)
) STRICT
";
/// Index on `t_backtrace (execution_id, version_min_including, version_max_excluding)`.
// NOTE(review): covers the same columns as the table's primary key - verify it is still needed.
const IDX_T_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_backtrace_execution_id_version ON t_backtrace (execution_id, version_min_including, version_max_excluding);
";
221
/// Internal, cloneable error produced from `rusqlite::Error`.
///
/// It is converted to the `DbError*` types by the `conversions` module. `Clone` is needed because
/// one commit result may be sent to several waiting logical transactions.
#[derive(Debug, thiserror::Error, Clone)]
enum RusqliteError {
    /// The query returned no rows.
    #[error("not found")]
    NotFound,
    /// Any other SQLite error, carried as its string representation.
    #[error("generic: {0}")]
    Generic(StrVariant),
}
229
230mod conversions {
231
232 use super::RusqliteError;
233 use concepts::storage::{DbErrorGeneric, DbErrorRead, DbErrorReadWithTimeout, DbErrorWrite};
234 use rusqlite::types::{FromSql, FromSqlError};
235 use std::{fmt::Debug, str::FromStr};
236 use tracing::error;
237
238 impl From<rusqlite::Error> for RusqliteError {
239 fn from(err: rusqlite::Error) -> Self {
240 if matches!(err, rusqlite::Error::QueryReturnedNoRows) {
241 RusqliteError::NotFound
242 } else {
243 error!(backtrace = %std::backtrace::Backtrace::capture(), "Sqlite error {err:?}");
244 RusqliteError::Generic(err.to_string().into())
245 }
246 }
247 }
248
249 impl From<RusqliteError> for DbErrorGeneric {
250 fn from(err: RusqliteError) -> DbErrorGeneric {
251 match err {
252 RusqliteError::NotFound => DbErrorGeneric::Uncategorized("not found".into()),
253 RusqliteError::Generic(str) => DbErrorGeneric::Uncategorized(str),
254 }
255 }
256 }
257 impl From<RusqliteError> for DbErrorRead {
258 fn from(err: RusqliteError) -> Self {
259 if matches!(err, RusqliteError::NotFound) {
260 Self::NotFound
261 } else {
262 Self::from(DbErrorGeneric::from(err))
263 }
264 }
265 }
266 impl From<RusqliteError> for DbErrorReadWithTimeout {
267 fn from(err: RusqliteError) -> Self {
268 Self::from(DbErrorRead::from(err))
269 }
270 }
271 impl From<RusqliteError> for DbErrorWrite {
272 fn from(err: RusqliteError) -> Self {
273 if matches!(err, RusqliteError::NotFound) {
274 Self::NotFound
275 } else {
276 Self::from(DbErrorGeneric::from(err))
277 }
278 }
279 }
280
281 pub(crate) struct JsonWrapper<T>(pub(crate) T);
282 impl<T: serde::de::DeserializeOwned + 'static + Debug> FromSql for JsonWrapper<T> {
283 fn column_result(
284 value: rusqlite::types::ValueRef<'_>,
285 ) -> rusqlite::types::FromSqlResult<Self> {
286 let value = match value {
287 rusqlite::types::ValueRef::Text(value) | rusqlite::types::ValueRef::Blob(value) => {
288 Ok(value)
289 }
290 other => {
291 error!(
292 backtrace = %std::backtrace::Backtrace::capture(),
293 "Unexpected type when conveting to JSON - expected Text or Blob, got type `{other:?}`",
294 );
295 Err(FromSqlError::InvalidType)
296 }
297 }?;
298 let value = serde_json::from_slice::<T>(value).map_err(|err| {
299 error!(
300 backtrace = %std::backtrace::Backtrace::capture(),
301 "Cannot convert JSON value `{value:?}` to type:`{type}` - {err:?}",
302 r#type = std::any::type_name::<T>()
303 );
304 FromSqlError::InvalidType
305 })?;
306 Ok(Self(value))
307 }
308 }
309
310 pub(crate) struct FromStrWrapper<T: FromStr>(pub(crate) T);
311 impl<T: FromStr<Err = D>, D: Debug> FromSql for FromStrWrapper<T> {
312 fn column_result(
313 value: rusqlite::types::ValueRef<'_>,
314 ) -> rusqlite::types::FromSqlResult<Self> {
315 let value = String::column_result(value)?;
316 let value = T::from_str(&value).map_err(|err| {
317 error!(
318 backtrace = %std::backtrace::Backtrace::capture(),
319 "Cannot convert string `{value}` to type:`{type}` - {err:?}",
320 r#type = std::any::type_name::<T>()
321 );
322 FromSqlError::InvalidType
323 })?;
324 Ok(Self(value))
325 }
326 }
327
328 #[derive(Debug, thiserror::Error)]
330 #[error("{0}")]
331 pub(crate) struct OtherError(&'static str);
332 pub(crate) fn consistency_rusqlite(input: &'static str) -> rusqlite::Error {
333 FromSqlError::other(OtherError(input)).into()
334 }
335
336 pub(crate) fn consistency_db_err(input: &'static str) -> DbErrorGeneric {
337 DbErrorGeneric::Uncategorized(input.into())
338 }
339}
340
/// Raw values of the state-related `t_state` columns, before validation.
///
/// `CombinedState::new` turns this into a typed `PendingState`, rejecting combinations of
/// `state` and optional columns that it does not recognize.
#[derive(Debug)]
struct CombinedStateDTO {
    /// One of the `STATE_*` constants.
    state: String,
    /// Unparsed function name; parsed with `FunctionFqn::from_str` in `CombinedState::new`.
    ffqn: String,
    /// Read as "scheduled at", "lock expires at" or "finished at", depending on `state`.
    pending_expires_finished: DateTime<Utc>,
    /// Must be set for the `Locked` state and unset for all others.
    last_lock_version: Option<Version>,
    executor_id: Option<ExecutorId>,
    run_id: Option<RunId>,
    /// Must be set, together with `join_set_closing`, for the `BlockedByJoinSet` state.
    join_set_id: Option<JoinSetId>,
    join_set_closing: Option<bool>,
    /// Must be set for the `Finished` state and unset for all others.
    result_kind: Option<PendingStateFinishedResultKind>,
}
/// Validated, typed view of a `t_state` row.
#[derive(Debug)]
struct CombinedState {
    /// Parsed function name of the execution.
    ffqn: FunctionFqn,
    /// State reconstructed from the row.
    pending_state: PendingState,
    /// Version passed to `CombinedState::new` alongside the row.
    // NOTE(review): presumably the `corresponding_version` column of `t_state` - confirm.
    corresponding_version: Version,
}
362impl CombinedState {
363 fn get_next_version_assert_not_finished(&self) -> Version {
364 assert!(!self.pending_state.is_finished());
365 self.corresponding_version.increment()
366 }
367
368 #[cfg(feature = "test")]
369 fn get_next_version_or_finished(&self) -> Version {
370 if self.pending_state.is_finished() {
371 self.corresponding_version.clone()
372 } else {
373 self.corresponding_version.increment()
374 }
375 }
376}
377
/// Notification payload: an execution of `ffqn` became pending at `scheduled_at`.
#[derive(Debug)]
struct NotifierPendingAt {
    scheduled_at: DateTime<Utc>,
    ffqn: FunctionFqn,
}

/// Notification payload: an execution finished with the given return value.
#[derive(Debug)]
struct NotifierExecutionFinished {
    execution_id: ExecutionId,
    retval: SupportedFunctionReturnValue,
}

/// Notifications gathered while appending; each field is `None` when there is nothing to report.
// NOTE(review): presumably consumed after the commit to wake up subscribers - the consumer is
// outside the visible part of the file.
#[derive(Debug, Default)]
struct AppendNotifier {
    pending_at: Option<NotifierPendingAt>,
    execution_finished: Option<NotifierExecutionFinished>,
    response: Option<(ExecutionId, JoinSetResponseEventOuter)>,
}
396
397impl CombinedState {
398 fn new(
399 dto: &CombinedStateDTO,
400 corresponding_version: Version,
401 ) -> Result<Self, rusqlite::Error> {
402 let pending_state = match dto {
403 CombinedStateDTO {
405 state,
406 ffqn: _,
407 pending_expires_finished: scheduled_at,
408 last_lock_version: None,
409 executor_id: None,
410 run_id: None,
411 join_set_id: None,
412 join_set_closing: None,
413 result_kind: None,
414 } if state == STATE_PENDING_AT => Ok(PendingState::PendingAt {
415 scheduled_at: *scheduled_at,
416 last_lock: None,
417 }),
418 CombinedStateDTO {
420 state,
421 ffqn: _,
422 pending_expires_finished: scheduled_at,
423 last_lock_version: None,
424 executor_id: Some(executor_id),
425 run_id: Some(run_id),
426 join_set_id: None,
427 join_set_closing: None,
428 result_kind: None,
429 } if state == STATE_PENDING_AT => Ok(PendingState::PendingAt {
430 scheduled_at: *scheduled_at,
431 last_lock: Some(LockedBy {
432 executor_id: *executor_id,
433 run_id: *run_id,
434 }),
435 }),
436 CombinedStateDTO {
437 state,
438 ffqn: _,
439 pending_expires_finished: lock_expires_at,
440 last_lock_version: Some(_),
441 executor_id: Some(executor_id),
442 run_id: Some(run_id),
443 join_set_id: None,
444 join_set_closing: None,
445 result_kind: None,
446 } if state == STATE_LOCKED => Ok(PendingState::Locked(PendingStateLocked {
447 locked_by: LockedBy {
448 executor_id: *executor_id,
449 run_id: *run_id,
450 },
451 lock_expires_at: *lock_expires_at,
452 })),
453 CombinedStateDTO {
454 state,
455 ffqn: _,
456 pending_expires_finished: lock_expires_at,
457 last_lock_version: None,
458 executor_id: _,
459 run_id: _,
460 join_set_id: Some(join_set_id),
461 join_set_closing: Some(join_set_closing),
462 result_kind: None,
463 } if state == STATE_BLOCKED_BY_JOIN_SET => Ok(PendingState::BlockedByJoinSet {
464 join_set_id: join_set_id.clone(),
465 closing: *join_set_closing,
466 lock_expires_at: *lock_expires_at,
467 }),
468 CombinedStateDTO {
469 state,
470 ffqn: _,
471 pending_expires_finished: finished_at,
472 last_lock_version: None,
473 executor_id: None,
474 run_id: None,
475 join_set_id: None,
476 join_set_closing: None,
477 result_kind: Some(result_kind),
478 } if state == STATE_FINISHED => Ok(PendingState::Finished {
479 finished: PendingStateFinished {
480 finished_at: *finished_at,
481 version: corresponding_version.0,
482 result_kind: *result_kind,
483 },
484 }),
485 _ => {
486 error!("Cannot deserialize pending state from {dto:?}");
487 Err(consistency_rusqlite("invalid `t_state`"))
488 }
489 }?;
490 Ok(Self {
491 ffqn: FunctionFqn::from_str(&dto.ffqn).map_err(|parse_err| {
492 error!("Error parsing ffqn of {dto:?} - {parse_err:?}");
493 consistency_rusqlite("invalid ffqn value in `t_state`")
494 })?,
495 pending_state,
496 corresponding_version,
497 })
498 }
499}
500
/// A unit of work sent to the command thread.
///
/// Several logical transactions may be applied to one physical SQLite transaction.
#[derive(derive_more::Debug)]
struct LogicalTx {
    /// The work itself. It is `FnMut` because it can be applied a second time when a batched
    /// commit fails and the batch is retried one transaction at a time.
    #[debug(skip)]
    func: Box<dyn FnMut(&mut Transaction) + Send>,
    /// Creation time, used to measure how long the command waited before being applied.
    sent_at: Instant,
    /// Label passed to the histograms.
    func_name: &'static str,
    /// Receives the result of the physical commit.
    #[debug(skip)]
    commit_ack_sender: oneshot::Sender<Result<(), RusqliteError>>,
}

/// Message processed by the command thread.
#[derive(derive_more::Debug)]
enum ThreadCommand {
    /// Apply the logical transaction.
    LogicalTx(LogicalTx),
    /// Stop the command thread.
    Shutdown,
}
516
/// Cloneable handle to the SQLite command thread.
///
/// All clones share the same command channel, shutdown flags and subscriber maps.
#[derive(Clone)]
pub struct SqlitePool(SqlitePoolInner);

// NOTE(review): the `u64` stored next to the senders presumably identifies the subscription -
// confirm where the maps are used (outside the visible part of the file).
/// Subscribers waiting for a join set response, keyed by execution.
type ResponseSubscribers =
    Arc<Mutex<HashMap<ExecutionId, (oneshot::Sender<JoinSetResponseEventOuter>, u64)>>>;
/// Subscribers waiting for pending executions, keyed by function.
type PendingFfqnSubscribers = Arc<Mutex<HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>>>;
/// Subscribers waiting for an execution to finish. Unlike the aliases above, this one does not
/// include the `Arc`; it is added at the field.
type ExecutionFinishedSubscribers =
    Mutex<HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>>;
#[derive(Clone)]
struct SqlitePoolInner {
    /// Set by `close` and `drop`; read by the command thread before committing.
    shutdown_requested: Arc<AtomicBool>,
    /// Set by the command thread right before it exits.
    shutdown_finished: Arc<AtomicBool>,
    /// Sending side of the command channel.
    command_tx: tokio::sync::mpsc::Sender<ThreadCommand>,
    response_subscribers: ResponseSubscribers,
    pending_ffqn_subscribers: PendingFfqnSubscribers,
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
    /// Handle of the command thread; an `Option` so that `drop` can take it.
    join_handle: Option<Arc<std::thread::JoinHandle<()>>>,
}
535
536#[async_trait]
537impl DbPoolCloseable for SqlitePool {
538 async fn close(self) {
539 debug!("Sqlite is closing");
540 self.0.shutdown_requested.store(true, Ordering::Release);
541 let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
543 while !self.0.shutdown_finished.load(Ordering::Acquire) {
544 tokio::time::sleep(Duration::from_millis(1)).await;
545 }
546 debug!("Sqlite was closed");
547 }
548}
549
550#[async_trait]
551impl DbPool for SqlitePool {
552 fn connection(&self) -> Box<dyn DbConnection> {
553 Box::new(self.clone())
554 }
555}
556impl Drop for SqlitePool {
557 fn drop(&mut self) {
558 let arc = self.0.join_handle.take().expect("join_handle was set");
559 if let Ok(join_handle) = Arc::try_unwrap(arc) {
560 if !join_handle.is_finished() {
562 if !self.0.shutdown_finished.load(Ordering::Acquire) {
563 let backtrace = std::backtrace::Backtrace::capture();
565 warn!("SqlitePool was not closed properly - {backtrace}");
566 self.0.shutdown_requested.store(true, Ordering::Release);
567 let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
569 } else {
572 }
574 }
575 }
576 }
577}
578
/// Settings accepted by `SqlitePool::new`.
#[derive(Debug, Clone)]
pub struct SqliteConfig {
    /// Capacity of the channel that carries commands to the command thread.
    pub queue_capacity: usize,
    /// PRAGMA values, keyed by pragma name, that replace or extend the built-in defaults.
    pub pragma_override: Option<HashMap<String, String>>,
    /// Passed to the histograms of the command thread.
    pub metrics_threshold: Option<Duration>,
}

impl Default for SqliteConfig {
    /// Queue capacity of 100, no PRAGMA overrides, no metrics threshold.
    fn default() -> Self {
        const DEFAULT_QUEUE_CAPACITY: usize = 100;
        SqliteConfig {
            metrics_threshold: None,
            pragma_override: None,
            queue_capacity: DEFAULT_QUEUE_CAPACITY,
        }
    }
}
594
595impl SqlitePool {
596 fn init_thread(
597 path: &Path,
598 mut pragma_override: HashMap<String, String>,
599 ) -> Result<Connection, InitializationError> {
600 fn conn_execute<P: Params>(
601 conn: &Connection,
602 sql: &str,
603 params: P,
604 ) -> Result<(), InitializationError> {
605 conn.execute(sql, params).map(|_| ()).map_err(|err| {
606 error!("Cannot run `{sql}` - {err:?}");
607 InitializationError
608 })
609 }
610 fn pragma_update(
611 conn: &Connection,
612 name: &str,
613 value: &str,
614 ) -> Result<(), InitializationError> {
615 if value.is_empty() {
616 debug!("Querying PRAGMA {name}");
617 conn.pragma_query(None, name, |row| {
618 debug!("{row:?}");
619 Ok(())
620 })
621 .map_err(|err| {
622 error!("cannot update pragma `{name}`=`{value}` - {err:?}");
623 InitializationError
624 })
625 } else {
626 debug!("Setting PRAGMA {name}={value}");
627 conn.pragma_update(None, name, value).map_err(|err| {
628 error!("cannot update pragma `{name}`=`{value}` - {err:?}");
629 InitializationError
630 })
631 }
632 }
633
634 let conn = Connection::open_with_flags(path, OpenFlags::default()).map_err(|err| {
635 error!("cannot open the connection - {err:?}");
636 InitializationError
637 })?;
638
639 for [pragma_name, default_value] in PRAGMA {
640 let pragma_value = pragma_override
641 .remove(pragma_name)
642 .unwrap_or_else(|| default_value.to_string());
643 pragma_update(&conn, pragma_name, &pragma_value)?;
644 }
645 for (pragma_name, pragma_value) in pragma_override.drain() {
647 pragma_update(&conn, &pragma_name, &pragma_value)?;
648 }
649
650 conn_execute(&conn, CREATE_TABLE_T_METADATA, [])?;
652 conn_execute(
654 &conn,
655 &format!(
656 "INSERT INTO t_metadata (schema_version, created_at) VALUES
657 ({T_METADATA_EXPECTED_SCHEMA_VERSION}, ?) ON CONFLICT DO NOTHING"
658 ),
659 [Utc::now()],
660 )?;
661 let actual_version = conn
663 .prepare("SELECT schema_version FROM t_metadata ORDER BY id DESC LIMIT 1")
664 .map_err(|err| {
665 error!("cannot select schema version - {err:?}");
666 InitializationError
667 })?
668 .query_row([], |row| row.get::<_, u32>("schema_version"));
669
670 let actual_version = actual_version.map_err(|err| {
671 error!("Cannot read the schema version - {err:?}");
672 InitializationError
673 })?;
674 if actual_version != T_METADATA_EXPECTED_SCHEMA_VERSION {
675 error!(
676 "wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
677 );
678 return Err(InitializationError);
679 }
680
681 conn_execute(&conn, CREATE_TABLE_T_EXECUTION_LOG, [])?;
683 conn_execute(
684 &conn,
685 CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
686 [],
687 )?;
688 conn_execute(
689 &conn,
690 CREATE_INDEX_IDX_T_EXECUTION_ID_EXECUTION_ID_VARIANT,
691 [],
692 )?;
693 conn_execute(&conn, CREATE_TABLE_T_JOIN_SET_RESPONSE, [])?;
695 conn_execute(
696 &conn,
697 CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
698 [],
699 )?;
700 conn_execute(&conn, CREATE_TABLE_T_STATE, [])?;
702 conn_execute(&conn, IDX_T_STATE_LOCK_PENDING, [])?;
703 conn_execute(&conn, IDX_T_STATE_EXPIRED_TIMERS, [])?;
704 conn_execute(&conn, IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL, [])?;
705 conn_execute(&conn, IDX_T_STATE_FFQN, [])?;
706 conn_execute(&conn, IDX_T_STATE_CREATED_AT, [])?;
707 conn_execute(&conn, CREATE_TABLE_T_DELAY, [])?;
709 conn_execute(&conn, CREATE_TABLE_T_BACKTRACE, [])?;
711 conn_execute(&conn, IDX_T_BACKTRACE_EXECUTION_ID_VERSION, [])?;
712 Ok(conn)
713 }
714
715 fn connection_rpc(
716 mut conn: Connection,
717 shutdown_requested: &AtomicBool,
718 shutdown_finished: &AtomicBool,
719 mut command_rx: mpsc::Receiver<ThreadCommand>,
720 metrics_threshold: Option<Duration>,
721 ) {
722 let mut histograms = Histograms::new(metrics_threshold);
723 while Self::tick(
724 &mut conn,
725 shutdown_requested,
726 &mut command_rx,
727 &mut histograms,
728 )
729 .is_ok()
730 {
731 }
733 debug!("Closing command thread");
734 shutdown_finished.store(true, Ordering::Release);
735 }
736
/// Processes one batch of commands inside a single physical transaction.
///
/// Blocks until the first command arrives, then also applies every command that is already
/// queued. All of them are committed together. If the combined commit fails and the batch
/// holds more than one logical transaction, each one is re-run and committed on its own.
///
/// Returns `Err(())` when the command thread must stop: a `Shutdown` command was received, the
/// channel was closed, shutdown was requested, or a transaction could not be opened. Logical
/// transactions dropped on such a return never receive a commit acknowledgement, because
/// their senders are dropped.
fn tick(
    conn: &mut Connection,
    shutdown_requested: &AtomicBool,
    command_rx: &mut mpsc::Receiver<ThreadCommand>,
    histograms: &mut Histograms,
) -> Result<(), ()> {
    let mut ltx_list = Vec::new();
    // Wait for the first command of the batch.
    let mut ltx = match command_rx.blocking_recv() {
        Some(ThreadCommand::LogicalTx(ltx)) => ltx,
        Some(ThreadCommand::Shutdown) => {
            debug!("shutdown message received");
            return Err(());
        }
        None => {
            debug!("command_rx was closed");
            return Err(());
        }
    };
    let all_fns_start = std::time::Instant::now();
    let mut physical_tx = conn
        .transaction_with_behavior(TransactionBehavior::Immediate)
        .map_err(|begin_err| {
            error!("Cannot open transaction, closing sqlite - {begin_err:?}");
        })?;
    Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
    ltx_list.push(ltx);

    // Drain whatever is already queued, without blocking, into the same physical transaction.
    while let Ok(more) = command_rx.try_recv() {
        let mut ltx = match more {
            ThreadCommand::Shutdown => {
                debug!("shutdown message received");
                // Returning here drops `physical_tx` without committing it.
                return Err(());
            }
            ThreadCommand::LogicalTx(ltx) => ltx,
        };

        Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
        ltx_list.push(ltx);
    }
    histograms.record_all_fns(all_fns_start.elapsed());

    {
        // Do not commit once shutdown has been requested.
        if shutdown_requested.load(Ordering::Relaxed) {
            debug!("Recveived shutdown during processing of the batch");
            return Err(());
        }
        let commit_result = {
            let now = std::time::Instant::now();
            let commit_result = physical_tx.commit().map_err(RusqliteError::from);
            histograms.record_commit(now.elapsed());
            commit_result
        };
        if commit_result.is_ok() || ltx_list.len() == 1 {
            // Either everything was committed, or the only transaction failed; in both cases the
            // result is forwarded as is.
            for ltx in ltx_list {
                let _ = ltx.commit_ack_sender.send(commit_result.clone());
            }
        } else {
            warn!(
                "Bulk transaction failed, committing {} transactions serially",
                ltx_list.len()
            );
            // Re-run every logical transaction in its own physical transaction.
            for ltx in ltx_list {
                Self::ltx_commit_single(ltx, conn, shutdown_requested, histograms)?;
            }
        }
    }
    histograms.print_if_elapsed();
    Ok(())
}
811
812 fn ltx_commit_single(
813 mut ltx: LogicalTx,
814 conn: &mut Connection,
815 shutdown_requested: &AtomicBool,
816 histograms: &mut Histograms,
817 ) -> Result<(), ()> {
818 let mut physical_tx = conn
819 .transaction_with_behavior(TransactionBehavior::Immediate)
820 .map_err(|begin_err| {
821 error!("Cannot open transaction, closing sqlite - {begin_err:?}");
822 })?;
823 Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
824 if shutdown_requested.load(Ordering::Relaxed) {
825 debug!("Recveived shutdown during processing of the batch");
826 return Err(());
827 }
828 let commit_result = {
829 let now = std::time::Instant::now();
830 let commit_result = physical_tx.commit().map_err(RusqliteError::from);
831 histograms.record_commit(now.elapsed());
832 commit_result
833 };
834 let _ = ltx.commit_ack_sender.send(commit_result);
836 Ok(())
837 }
838
839 fn ltx_apply_to_tx(
840 ltx: &mut LogicalTx,
841 physical_tx: &mut Transaction,
842 histograms: &mut Histograms,
843 ) {
844 let sent_latency = ltx.sent_at.elapsed();
845 let started_at = Instant::now();
846 (ltx.func)(physical_tx);
847 histograms.record_command(sent_latency, ltx.func_name, started_at.elapsed());
848 }
849
850 #[instrument(level = Level::DEBUG, skip_all, name = "sqlite_new")]
851 pub async fn new<P: AsRef<Path>>(
852 path: P,
853 config: SqliteConfig,
854 ) -> Result<Self, InitializationError> {
855 let path = path.as_ref().to_owned();
856
857 let shutdown_requested = Arc::new(AtomicBool::new(false));
858 let shutdown_finished = Arc::new(AtomicBool::new(false));
859
860 let (command_tx, command_rx) = tokio::sync::mpsc::channel(config.queue_capacity);
861 info!("Sqlite database location: {path:?}");
862 let join_handle = {
863 let init_task = {
865 tokio::task::spawn_blocking(move || {
866 Self::init_thread(&path, config.pragma_override.unwrap_or_default())
867 })
868 .await
869 };
870 let conn = match init_task {
871 Ok(res) => res?,
872 Err(join_err) => {
873 error!("Initialization panic - {join_err:?}");
874 return Err(InitializationError);
875 }
876 };
877 let shutdown_requested = shutdown_requested.clone();
878 let shutdown_finished = shutdown_finished.clone();
879 std::thread::spawn(move || {
881 Self::connection_rpc(
882 conn,
883 &shutdown_requested,
884 &shutdown_finished,
885 command_rx,
886 config.metrics_threshold,
887 );
888 })
889 };
890 Ok(SqlitePool(SqlitePoolInner {
891 shutdown_requested,
892 shutdown_finished,
893 command_tx,
894 response_subscribers: Arc::default(),
895 pending_ffqn_subscribers: Arc::default(),
896 join_handle: Some(Arc::new(join_handle)),
897 execution_finished_subscribers: Arc::default(),
898 }))
899 }
900
901 async fn transaction<F, T, E>(&self, mut func: F, func_name: &'static str) -> Result<T, E>
903 where
904 F: FnMut(&mut rusqlite::Transaction) -> Result<T, E> + Send + 'static,
905 T: Send + 'static,
906 E: From<DbErrorGeneric> + From<RusqliteError> + Send + 'static,
907 {
908 let fn_res: Arc<std::sync::Mutex<Option<_>>> = Arc::default();
909 let (commit_ack_sender, commit_ack_receiver) = oneshot::channel();
910 let thread_command_func = {
911 let fn_res = fn_res.clone();
912 ThreadCommand::LogicalTx(LogicalTx {
913 func: Box::new(move |tx| {
914 let func_res = func(tx);
915 *fn_res.lock().unwrap() = Some(func_res);
916 }),
917 sent_at: Instant::now(),
918 func_name,
919 commit_ack_sender,
920 })
921 };
922 self.0
923 .command_tx
924 .send(thread_command_func)
925 .await
926 .map_err(|_send_err| DbErrorGeneric::Close)?;
927
928 match commit_ack_receiver.await {
930 Ok(Ok(())) => {
931 let mut guard = fn_res.lock().unwrap();
932 std::mem::take(guard.deref_mut()).expect("ltx must have been run at least once")
933 }
934 Ok(Err(rusqlite_err)) => Err(E::from(rusqlite_err)),
935 Err(_) => Err(E::from(DbErrorGeneric::Close)),
936 }
937 }
938
939 fn fetch_created_event(
940 conn: &Connection,
941 execution_id: &ExecutionId,
942 ) -> Result<CreateRequest, DbErrorRead> {
943 let mut stmt = conn.prepare(
944 "SELECT created_at, json_value FROM t_execution_log WHERE \
945 execution_id = :execution_id AND version = 0",
946 )?;
947 let (created_at, event) = stmt.query_row(
948 named_params! {
949 ":execution_id": execution_id.to_string(),
950 },
951 |row| {
952 let created_at = row.get("created_at")?;
953 let event = row
954 .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
955 .map_err(|serde| {
956 error!("cannot deserialize `Created` event: {row:?} - `{serde:?}`");
957 consistency_rusqlite("cannot deserialize `Created` event")
958 })?;
959 Ok((created_at, event.0))
960 },
961 )?;
962 if let ExecutionEventInner::Created {
963 ffqn,
964 params,
965 parent,
966 scheduled_at,
967 component_id,
968 metadata,
969 scheduled_by,
970 } = event
971 {
972 Ok(CreateRequest {
973 created_at,
974 execution_id: execution_id.clone(),
975 ffqn,
976 params,
977 parent,
978 scheduled_at,
979 component_id,
980 metadata,
981 scheduled_by,
982 })
983 } else {
984 error!("Row with version=0 must be a `Created` event - {event:?}");
985 Err(consistency_db_err("expected `Created` event").into())
986 }
987 }
988
989 fn check_expected_next_and_appending_version(
990 expected_version: &Version,
991 appending_version: &Version,
992 ) -> Result<(), DbErrorWrite> {
993 if *expected_version != *appending_version {
994 debug!(
995 "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
996 );
997 return Err(DbErrorWrite::NonRetriable(
998 DbErrorWriteNonRetriable::VersionConflict(expected_version.clone()),
999 ));
1000 }
1001 Ok(())
1002 }
1003
1004 #[instrument(level = Level::TRACE, skip_all, fields(execution_id = %req.execution_id))]
1005 fn create_inner(
1006 tx: &Transaction,
1007 req: CreateRequest,
1008 ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1009 debug!("create_inner");
1010
1011 let version = Version::default();
1012 let execution_id = req.execution_id.clone();
1013 let execution_id_str = execution_id.to_string();
1014 let ffqn = req.ffqn.clone();
1015 let created_at = req.created_at;
1016 let scheduled_at = req.scheduled_at;
1017 let event = ExecutionEventInner::from(req);
1018 let event_ser = serde_json::to_string(&event).map_err(|err| {
1019 error!("Cannot serialize {event:?} - {err:?}");
1020 DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
1021 })?;
1022 tx.prepare(
1023 "INSERT INTO t_execution_log (execution_id, created_at, version, json_value, variant, join_set_id ) \
1024 VALUES (:execution_id, :created_at, :version, :json_value, :variant, :join_set_id)")
1025 ?
1026 .execute(named_params! {
1027 ":execution_id": &execution_id_str,
1028 ":created_at": created_at,
1029 ":version": version.0,
1030 ":json_value": event_ser,
1031 ":variant": event.variant(),
1032 ":join_set_id": event.join_set_id().map(std::string::ToString::to_string),
1033 })
1034 ?;
1035 let pending_at = {
1036 debug!("Creating with `Pending(`{scheduled_at:?}`)");
1037 tx.prepare(
1038 r"
1039 INSERT INTO t_state (
1040 execution_id,
1041 is_top_level,
1042 corresponding_version,
1043 pending_expires_finished,
1044 ffqn,
1045 state,
1046 created_at,
1047 updated_at,
1048 scheduled_at,
1049 intermittent_event_count
1050 )
1051 VALUES (
1052 :execution_id,
1053 :is_top_level,
1054 :corresponding_version,
1055 :pending_expires_finished,
1056 :ffqn,
1057 :state,
1058 :created_at,
1059 CURRENT_TIMESTAMP,
1060 :scheduled_at,
1061 0
1062 )
1063 ",
1064 )?
1065 .execute(named_params! {
1066 ":execution_id": execution_id.to_string(),
1067 ":is_top_level": execution_id.is_top_level(),
1068 ":corresponding_version": version.0,
1069 ":pending_expires_finished": scheduled_at,
1070 ":ffqn": ffqn.to_string(),
1071 ":state": STATE_PENDING_AT,
1072 ":created_at": created_at,
1073 ":scheduled_at": scheduled_at,
1074 })?;
1075 AppendNotifier {
1076 pending_at: Some(NotifierPendingAt { scheduled_at, ffqn }),
1077 execution_finished: None,
1078 response: None,
1079 }
1080 };
1081 let next_version = Version::new(version.0 + 1);
1082 Ok((next_version, pending_at))
1083 }
1084
1085 #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %corresponding_version))]
1086 fn update_state_pending_after_response_appended(
1087 tx: &Transaction,
1088 execution_id: &ExecutionId,
1089 scheduled_at: DateTime<Utc>, corresponding_version: &Version, ) -> Result<AppendNotifier, DbErrorWrite> {
1092 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
1093 let execution_id_str = execution_id.to_string();
1094 let mut stmt = tx
1095 .prepare_cached(
1096 r"
1097 UPDATE t_state
1098 SET
1099 corresponding_version = :corresponding_version,
1100 pending_expires_finished = :pending_expires_finished,
1101 state = :state,
1102 updated_at = CURRENT_TIMESTAMP,
1103
1104 last_lock_version = NULL,
1105
1106 join_set_id = NULL,
1107 join_set_closing = NULL,
1108
1109 result_kind = NULL
1110 WHERE execution_id = :execution_id
1111 ",
1112 )
1113 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1114 let updated = stmt
1115 .execute(named_params! {
1116 ":execution_id": execution_id_str,
1117 ":corresponding_version": corresponding_version.0,
1118 ":pending_expires_finished": scheduled_at,
1119 ":state": STATE_PENDING_AT,
1120 })
1121 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1122 if updated != 1 {
1123 return Err(DbErrorWrite::NotFound);
1124 }
1125 Ok(AppendNotifier {
1126 pending_at: Some(NotifierPendingAt {
1127 scheduled_at,
1128 ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1129 }),
1130 execution_finished: None,
1131 response: None,
1132 })
1133 }
1134
1135 #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
1136 fn update_state_pending_after_event_appended(
1137 tx: &Transaction,
1138 execution_id: &ExecutionId,
1139 appending_version: &Version,
1140 scheduled_at: DateTime<Utc>, intermittent_failure: bool,
1142 ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1143 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
1144 let mut stmt = tx
1147 .prepare_cached(
1148 r"
1149 UPDATE t_state
1150 SET
1151 corresponding_version = :appending_version,
1152 pending_expires_finished = :pending_expires_finished,
1153 state = :state,
1154 updated_at = CURRENT_TIMESTAMP,
1155 intermittent_event_count = intermittent_event_count + :intermittent_delta,
1156
1157 last_lock_version = NULL,
1158
1159 join_set_id = NULL,
1160 join_set_closing = NULL,
1161
1162 result_kind = NULL
1163 WHERE execution_id = :execution_id;
1164 ",
1165 )
1166 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1167 let updated = stmt
1168 .execute(named_params! {
1169 ":execution_id": execution_id.to_string(),
1170 ":appending_version": appending_version.0,
1171 ":pending_expires_finished": scheduled_at,
1172 ":state": STATE_PENDING_AT,
1173 ":intermittent_delta": i32::from(intermittent_failure) })
1175 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1176 if updated != 1 {
1177 return Err(DbErrorWrite::NotFound);
1178 }
1179 Ok((
1180 appending_version.increment(),
1181 AppendNotifier {
1182 pending_at: Some(NotifierPendingAt {
1183 scheduled_at,
1184 ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1185 }),
1186 execution_finished: None,
1187 response: None,
1188 },
1189 ))
1190 }
1191
1192 fn update_state_locked_get_intermittent_event_count(
1193 tx: &Transaction,
1194 execution_id: &ExecutionId,
1195 executor_id: ExecutorId,
1196 run_id: RunId,
1197 lock_expires_at: DateTime<Utc>,
1198 appending_version: &Version,
1199 retry_config: ComponentRetryConfig,
1200 ) -> Result<u32, DbErrorWrite> {
1201 debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
1202 let backoff_millis =
1203 u64::try_from(retry_config.retry_exp_backoff.as_millis()).expect("backoff too big");
1204 let execution_id_str = execution_id.to_string();
1205 let mut stmt = tx
1206 .prepare_cached(
1207 r"
1208 UPDATE t_state
1209 SET
1210 corresponding_version = :appending_version,
1211 pending_expires_finished = :pending_expires_finished,
1212 state = :state,
1213 updated_at = CURRENT_TIMESTAMP,
1214
1215 max_retries = :max_retries,
1216 retry_exp_backoff_millis = :retry_exp_backoff_millis,
1217 last_lock_version = :appending_version,
1218 executor_id = :executor_id,
1219 run_id = :run_id,
1220
1221 join_set_id = NULL,
1222 join_set_closing = NULL,
1223
1224 result_kind = NULL
1225 WHERE execution_id = :execution_id
1226 ",
1227 )
1228 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1229 let updated = stmt
1230 .execute(named_params! {
1231 ":execution_id": execution_id_str,
1232 ":appending_version": appending_version.0,
1233 ":pending_expires_finished": lock_expires_at,
1234 ":state": STATE_LOCKED,
1235 ":max_retries": retry_config.max_retries,
1236 ":retry_exp_backoff_millis": backoff_millis,
1237 ":executor_id": executor_id.to_string(),
1238 ":run_id": run_id.to_string(),
1239 })
1240 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1241 if updated != 1 {
1242 return Err(DbErrorWrite::NotFound);
1243 }
1244
1245 let intermittent_event_count = tx
1247 .prepare(
1248 "SELECT intermittent_event_count FROM t_state WHERE execution_id = :execution_id",
1249 )?
1250 .query_row(
1251 named_params! {
1252 ":execution_id": execution_id_str,
1253 },
1254 |row| {
1255 let intermittent_event_count = row.get("intermittent_event_count")?;
1256 Ok(intermittent_event_count)
1257 },
1258 )
1259 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1260
1261 Ok(intermittent_event_count)
1262 }
1263
1264 fn update_state_blocked(
1265 tx: &Transaction,
1266 execution_id: &ExecutionId,
1267 appending_version: &Version,
1268 join_set_id: &JoinSetId,
1270 lock_expires_at: DateTime<Utc>,
1271 join_set_closing: bool,
1272 ) -> Result<
1273 AppendResponse, DbErrorWrite,
1275 > {
1276 debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
1277 let execution_id_str = execution_id.to_string();
1278 let mut stmt = tx.prepare_cached(
1279 r"
1280 UPDATE t_state
1281 SET
1282 corresponding_version = :appending_version,
1283 pending_expires_finished = :pending_expires_finished,
1284 state = :state,
1285 updated_at = CURRENT_TIMESTAMP,
1286
1287 last_lock_version = NULL,
1288
1289 join_set_id = :join_set_id,
1290 join_set_closing = :join_set_closing,
1291
1292 result_kind = NULL
1293 WHERE execution_id = :execution_id
1294 ",
1295 )?;
1296 let updated = stmt.execute(named_params! {
1297 ":execution_id": execution_id_str,
1298 ":appending_version": appending_version.0,
1299 ":pending_expires_finished": lock_expires_at,
1300 ":state": STATE_BLOCKED_BY_JOIN_SET,
1301 ":join_set_id": join_set_id,
1302 ":join_set_closing": join_set_closing,
1303 })?;
1304 if updated != 1 {
1305 return Err(DbErrorWrite::NotFound);
1306 }
1307 Ok(appending_version.increment())
1308 }
1309
1310 fn update_state_finished(
1311 tx: &Transaction,
1312 execution_id: &ExecutionId,
1313 appending_version: &Version,
1314 finished_at: DateTime<Utc>,
1316 result_kind: PendingStateFinishedResultKind,
1317 ) -> Result<(), DbErrorWrite> {
1318 debug!("Setting t_state to Finished");
1319 let execution_id_str = execution_id.to_string();
1320 let mut stmt = tx.prepare_cached(
1321 r"
1322 UPDATE t_state
1323 SET
1324 corresponding_version = :appending_version,
1325 pending_expires_finished = :pending_expires_finished,
1326 state = :state,
1327 updated_at = CURRENT_TIMESTAMP,
1328
1329 last_lock_version = NULL,
1330 executor_id = NULL,
1331 run_id = NULL,
1332
1333 join_set_id = NULL,
1334 join_set_closing = NULL,
1335
1336 result_kind = :result_kind
1337 WHERE execution_id = :execution_id
1338 ",
1339 )?;
1340 let updated = stmt.execute(named_params! {
1341 ":execution_id": execution_id_str,
1342 ":appending_version": appending_version.0,
1343 ":pending_expires_finished": finished_at,
1344 ":state": STATE_FINISHED,
1345 ":result_kind": result_kind.to_string(),
1346 })?;
1347 if updated != 1 {
1348 return Err(DbErrorWrite::NotFound);
1349 }
1350 Ok(())
1351 }
1352
1353 #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %appending_version))]
1355 fn bump_state_next_version(
1356 tx: &Transaction,
1357 execution_id: &ExecutionId,
1358 appending_version: &Version,
1359 delay_req: Option<DelayReq>,
1360 ) -> Result<AppendResponse , DbErrorWrite> {
1361 debug!("update_index_version");
1362 let execution_id_str = execution_id.to_string();
1363 let mut stmt = tx.prepare_cached(
1364 r"
1365 UPDATE t_state
1366 SET
1367 corresponding_version = :appending_version,
1368 updated_at = CURRENT_TIMESTAMP
1369 WHERE execution_id = :execution_id
1370 ",
1371 )?;
1372 let updated = stmt.execute(named_params! {
1373 ":execution_id": execution_id_str,
1374 ":appending_version": appending_version.0,
1375 })?;
1376 if updated != 1 {
1377 return Err(DbErrorWrite::NotFound);
1378 }
1379 if let Some(DelayReq {
1380 join_set_id,
1381 delay_id,
1382 expires_at,
1383 }) = delay_req
1384 {
1385 debug!("Inserting delay to `t_delay`");
1386 let mut stmt = tx.prepare_cached(
1387 "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) \
1388 VALUES \
1389 (:execution_id, :join_set_id, :delay_id, :expires_at)",
1390 )?;
1391 stmt.execute(named_params! {
1392 ":execution_id": execution_id_str,
1393 ":join_set_id": join_set_id.to_string(),
1394 ":delay_id": delay_id.to_string(),
1395 ":expires_at": expires_at,
1396 })?;
1397 }
1398 Ok(appending_version.increment())
1399 }
1400
1401 fn get_combined_state(
1402 tx: &Transaction,
1403 execution_id: &ExecutionId,
1404 ) -> Result<CombinedState, DbErrorRead> {
1405 let mut stmt = tx.prepare(
1406 r"
1407 SELECT
1408 state, ffqn, corresponding_version, pending_expires_finished,
1409 last_lock_version, executor_id, run_id,
1410 join_set_id, join_set_closing,
1411 result_kind
1412 FROM t_state
1413 WHERE
1414 execution_id = :execution_id
1415 ",
1416 )?;
1417 stmt.query_row(
1418 named_params! {
1419 ":execution_id": execution_id.to_string(),
1420 },
1421 |row| {
1422 CombinedState::new(
1423 &CombinedStateDTO {
1424 state: row.get("state")?,
1425 ffqn: row.get("ffqn")?,
1426 pending_expires_finished: row
1427 .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1428 last_lock_version: row
1429 .get::<_, Option<VersionType>>("last_lock_version")?
1430 .map(Version::new),
1431 executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1432 run_id: row.get::<_, Option<RunId>>("run_id")?,
1433 join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1434 join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1435 result_kind: row
1436 .get::<_, Option<FromStrWrapper<PendingStateFinishedResultKind>>>(
1437 "result_kind",
1438 )?
1439 .map(|wrapper| wrapper.0),
1440 },
1441 Version::new(row.get("corresponding_version")?),
1442 )
1443 },
1444 )
1445 .map_err(DbErrorRead::from)
1446 }
1447
1448 fn list_executions(
1449 read_tx: &Transaction,
1450 ffqn: Option<&FunctionFqn>,
1451 top_level_only: bool,
1452 pagination: &ExecutionListPagination,
1453 ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
1454 struct StatementModifier<'a> {
1455 where_vec: Vec<String>,
1456 params: Vec<(&'static str, ToSqlOutput<'a>)>,
1457 limit: u32,
1458 limit_desc: bool,
1459 }
1460
1461 fn paginate<'a, T: rusqlite::ToSql + 'static>(
1462 pagination: &'a Pagination<Option<T>>,
1463 column: &str,
1464 top_level_only: bool,
1465 ) -> Result<StatementModifier<'a>, DbErrorGeneric> {
1466 let mut where_vec: Vec<String> = vec![];
1467 let mut params: Vec<(&'static str, ToSqlOutput<'a>)> = vec![];
1468 let limit = pagination.length();
1469 let limit_desc = pagination.is_desc();
1470 let rel = pagination.rel();
1471 match pagination {
1472 Pagination::NewerThan {
1473 cursor: Some(cursor),
1474 ..
1475 }
1476 | Pagination::OlderThan {
1477 cursor: Some(cursor),
1478 ..
1479 } => {
1480 where_vec.push(format!("{column} {rel} :cursor"));
1481 let cursor = cursor.to_sql().map_err(|err| {
1482 error!("Possible program error - cannot convert cursor to sql - {err:?}");
1483 DbErrorGeneric::Uncategorized("cannot convert cursor to sql".into())
1484 })?;
1485 params.push((":cursor", cursor));
1486 }
1487 _ => {}
1488 }
1489 if top_level_only {
1490 where_vec.push("is_top_level=true".to_string());
1491 }
1492 Ok(StatementModifier {
1493 where_vec,
1494 params,
1495 limit,
1496 limit_desc,
1497 })
1498 }
1499
1500 let mut statement_mod = match pagination {
1501 ExecutionListPagination::CreatedBy(pagination) => {
1502 paginate(pagination, "created_at", top_level_only)?
1503 }
1504 ExecutionListPagination::ExecutionId(pagination) => {
1505 paginate(pagination, "execution_id", top_level_only)?
1506 }
1507 };
1508
1509 let ffqn_temporary;
1510 if let Some(ffqn) = ffqn {
1511 statement_mod.where_vec.push("ffqn = :ffqn".to_string());
1512 ffqn_temporary = ffqn.to_string();
1513 let ffqn = ffqn_temporary
1514 .to_sql()
1515 .expect("string conversion never fails");
1516
1517 statement_mod.params.push((":ffqn", ffqn));
1518 }
1519
1520 let where_str = if statement_mod.where_vec.is_empty() {
1521 String::new()
1522 } else {
1523 format!("WHERE {}", statement_mod.where_vec.join(" AND "))
1524 };
1525 let sql = format!(
1526 r"
1527 SELECT created_at, scheduled_at, state, execution_id, ffqn, corresponding_version, pending_expires_finished,
1528 last_lock_version, executor_id, run_id,
1529 join_set_id, join_set_closing,
1530 result_kind
1531 FROM t_state {where_str} ORDER BY created_at {desc} LIMIT {limit}
1532 ",
1533 desc = if statement_mod.limit_desc { "DESC" } else { "" },
1534 limit = statement_mod.limit,
1535 );
1536 let mut vec: Vec<_> = read_tx
1537 .prepare(&sql)?
1538 .query_map::<_, &[(&'static str, ToSqlOutput)], _>(
1539 statement_mod
1540 .params
1541 .into_iter()
1542 .collect::<Vec<_>>()
1543 .as_ref(),
1544 |row| {
1545 let execution_id = row.get::<_, ExecutionId>("execution_id")?;
1546 let created_at = row.get("created_at")?;
1547 let scheduled_at = row.get("scheduled_at")?;
1548 let combined_state = CombinedState::new(
1549 &CombinedStateDTO {
1550 state: row.get("state")?,
1551 ffqn: row.get("ffqn")?,
1552 pending_expires_finished: row
1553 .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1554 executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1555
1556 last_lock_version: row
1557 .get::<_, Option<VersionType>>("last_lock_version")?
1558 .map(Version::new),
1559 run_id: row.get::<_, Option<RunId>>("run_id")?,
1560 join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1561 join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1562 result_kind: row
1563 .get::<_, Option<FromStrWrapper<PendingStateFinishedResultKind>>>(
1564 "result_kind",
1565 )?
1566 .map(|wrapper| wrapper.0),
1567 },
1568 Version::new(row.get("corresponding_version")?),
1569 )?;
1570 Ok(ExecutionWithState {
1571 execution_id,
1572 ffqn: combined_state.ffqn,
1573 pending_state: combined_state.pending_state,
1574 created_at,
1575 scheduled_at,
1576 })
1577 },
1578 )?
1579 .collect::<Vec<Result<_, _>>>()
1580 .into_iter()
1581 .filter_map(|row| match row {
1582 Ok(row) => Some(row),
1583 Err(err) => {
1584 warn!("Skipping row - {err:?}");
1585 None
1586 }
1587 })
1588 .collect();
1589
1590 if !statement_mod.limit_desc {
1591 vec.reverse();
1593 }
1594 Ok(vec)
1595 }
1596
1597 #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
1598 #[expect(clippy::too_many_arguments)]
1599 fn lock_single_execution(
1600 tx: &Transaction,
1601 created_at: DateTime<Utc>,
1602 component_id: &ComponentId,
1603 execution_id: &ExecutionId,
1604 run_id: RunId,
1605 appending_version: &Version,
1606 executor_id: ExecutorId,
1607 lock_expires_at: DateTime<Utc>,
1608 retry_config: ComponentRetryConfig,
1609 ) -> Result<LockedExecution, DbErrorWrite> {
1610 debug!("lock_single_execution");
1611 let combined_state = Self::get_combined_state(tx, execution_id)?;
1612 combined_state.pending_state.can_append_lock(
1613 created_at,
1614 executor_id,
1615 run_id,
1616 lock_expires_at,
1617 )?;
1618 let expected_version = combined_state.get_next_version_assert_not_finished();
1619 Self::check_expected_next_and_appending_version(&expected_version, appending_version)?;
1620
1621 let locked_event = Locked {
1623 component_id: component_id.clone(),
1624 executor_id,
1625 lock_expires_at,
1626 run_id,
1627 retry_config,
1628 };
1629 let event = ExecutionEventInner::Locked(locked_event.clone());
1630 let event_ser = serde_json::to_string(&event).map_err(|err| {
1631 warn!("Cannot serialize {event:?} - {err:?}");
1632 DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
1633 })?;
1634 let mut stmt = tx
1635 .prepare_cached(
1636 "INSERT INTO t_execution_log \
1637 (execution_id, created_at, json_value, version, variant) \
1638 VALUES \
1639 (:execution_id, :created_at, :json_value, :version, :variant)",
1640 )
1641 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1642 stmt.execute(named_params! {
1643 ":execution_id": execution_id.to_string(),
1644 ":created_at": created_at,
1645 ":json_value": event_ser,
1646 ":version": appending_version.0,
1647 ":variant": event.variant(),
1648 })
1649 .map_err(|err| {
1650 warn!("Cannot lock execution - {err:?}");
1651 DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::IllegalState("cannot lock".into()))
1652 })?;
1653
1654 let responses = Self::list_responses(tx, execution_id, None)?;
1656 let responses = responses.into_iter().map(|resp| resp.event).collect();
1657 trace!("Responses: {responses:?}");
1658
1659 let intermittent_event_count = Self::update_state_locked_get_intermittent_event_count(
1660 tx,
1661 execution_id,
1662 executor_id,
1663 run_id,
1664 lock_expires_at,
1665 appending_version,
1666 retry_config,
1667 )?;
1668 let mut events = tx
1670 .prepare(
1671 "SELECT json_value FROM t_execution_log WHERE \
1672 execution_id = :execution_id AND (variant = :v1 OR variant = :v2) \
1673 ORDER BY version",
1674 )?
1675 .query_map(
1676 named_params! {
1677 ":execution_id": execution_id.to_string(),
1678 ":v1": DUMMY_CREATED.variant(),
1679 ":v2": DUMMY_HISTORY_EVENT.variant(),
1680 },
1681 |row| {
1682 row.get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
1683 .map(|wrapper| wrapper.0)
1684 .map_err(|serde| {
1685 error!("Cannot deserialize {row:?} - {serde:?}");
1686 consistency_rusqlite("cannot deserialize json value")
1687 })
1688 },
1689 )?
1690 .collect::<Result<Vec<_>, _>>()?
1691 .into_iter()
1692 .collect::<VecDeque<_>>();
1693 let Some(ExecutionEventInner::Created {
1694 ffqn,
1695 params,
1696 parent,
1697 metadata,
1698 ..
1699 }) = events.pop_front()
1700 else {
1701 error!("Execution log must contain at least `Created` event");
1702 return Err(consistency_db_err("execution log must contain `Created` event").into());
1703 };
1704
1705 let event_history = events
1706 .into_iter()
1707 .map(|event| {
1708 if let ExecutionEventInner::HistoryEvent { event } = event {
1709 Ok(event)
1710 } else {
1711 error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
1712 Err(consistency_db_err(
1713 "rows can only contain `Created` and `HistoryEvent` event kinds",
1714 ))
1715 }
1716 })
1717 .collect::<Result<Vec<_>, _>>()?;
1718
1719 Ok(LockedExecution {
1720 execution_id: execution_id.clone(),
1721 metadata,
1722 next_version: appending_version.increment(),
1723 ffqn,
1724 params,
1725 event_history,
1726 responses,
1727 parent,
1728 intermittent_event_count,
1729 locked_event,
1730 })
1731 }
1732
1733 fn count_join_next(
1734 tx: &Transaction,
1735 execution_id: &ExecutionId,
1736 join_set_id: &JoinSetId,
1737 ) -> Result<u64, DbErrorRead> {
1738 let mut stmt = tx.prepare(
1739 "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = :execution_id AND join_set_id = :join_set_id \
1740 AND history_event_type = :join_next",
1741 )?;
1742 Ok(stmt.query_row(
1743 named_params! {
1744 ":execution_id": execution_id.to_string(),
1745 ":join_set_id": join_set_id.to_string(),
1746 ":join_next": HISTORY_EVENT_TYPE_JOIN_NEXT,
1747 },
1748 |row| row.get("count"),
1749 )?)
1750 }
1751
1752 #[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
1753 #[expect(clippy::needless_return)]
1754 fn append(
1755 tx: &Transaction,
1756 execution_id: &ExecutionId,
1757 req: AppendRequest,
1758 appending_version: Version,
1759 ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1760 if matches!(req.event, ExecutionEventInner::Created { .. }) {
1761 return Err(DbErrorWrite::NonRetriable(
1762 DbErrorWriteNonRetriable::ValidationFailed(
1763 "cannot append `Created` event - use `create` instead".into(),
1764 ),
1765 ));
1766 }
1767 if let AppendRequest {
1768 event:
1769 ExecutionEventInner::Locked(Locked {
1770 component_id,
1771 executor_id,
1772 run_id,
1773 lock_expires_at,
1774 retry_config,
1775 }),
1776 created_at,
1777 } = req
1778 {
1779 return Self::lock_single_execution(
1780 tx,
1781 created_at,
1782 &component_id,
1783 execution_id,
1784 run_id,
1785 &appending_version,
1786 executor_id,
1787 lock_expires_at,
1788 retry_config,
1789 )
1790 .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
1791 }
1792
1793 let combined_state = Self::get_combined_state(tx, execution_id)?;
1794 if combined_state.pending_state.is_finished() {
1795 debug!("Execution is already finished");
1796 return Err(DbErrorWrite::NonRetriable(
1797 DbErrorWriteNonRetriable::IllegalState("already finished".into()),
1798 ));
1799 }
1800
1801 Self::check_expected_next_and_appending_version(
1802 &combined_state.get_next_version_assert_not_finished(),
1803 &appending_version,
1804 )?;
1805 let event_ser = serde_json::to_string(&req.event).map_err(|err| {
1806 error!("Cannot serialize {:?} - {err:?}", req.event);
1807 DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
1808 })?;
1809
1810 let mut stmt = tx.prepare(
1811 "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
1812 VALUES (:execution_id, :created_at, :json_value, :version, :variant, :join_set_id)")
1813 ?;
1814 stmt.execute(named_params! {
1815 ":execution_id": execution_id.to_string(),
1816 ":created_at": req.created_at,
1817 ":json_value": event_ser,
1818 ":version": appending_version.0,
1819 ":variant": req.event.variant(),
1820 ":join_set_id": req.event.join_set_id().map(std::string::ToString::to_string),
1821 })?;
1822 match &req.event {
1825 ExecutionEventInner::Created { .. } => {
1826 unreachable!("handled in the caller")
1827 }
1828
1829 ExecutionEventInner::Locked { .. } => {
1830 unreachable!("handled above")
1831 }
1832
1833 ExecutionEventInner::TemporarilyFailed {
1834 backoff_expires_at, ..
1835 }
1836 | ExecutionEventInner::TemporarilyTimedOut {
1837 backoff_expires_at, ..
1838 } => {
1839 let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1840 tx,
1841 execution_id,
1842 &appending_version,
1843 *backoff_expires_at,
1844 true, )?;
1846 return Ok((next_version, notifier));
1847 }
1848
1849 ExecutionEventInner::Unlocked {
1850 backoff_expires_at, ..
1851 } => {
1852 let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1853 tx,
1854 execution_id,
1855 &appending_version,
1856 *backoff_expires_at,
1857 false, )?;
1859 return Ok((next_version, notifier));
1860 }
1861
1862 ExecutionEventInner::Finished { result, .. } => {
1863 Self::update_state_finished(
1864 tx,
1865 execution_id,
1866 &appending_version,
1867 req.created_at,
1868 PendingStateFinishedResultKind::from(result),
1869 )?;
1870 return Ok((
1871 appending_version,
1872 AppendNotifier {
1873 pending_at: None,
1874 execution_finished: Some(NotifierExecutionFinished {
1875 execution_id: execution_id.clone(),
1876 retval: result.clone(),
1877 }),
1878 response: None,
1879 },
1880 ));
1881 }
1882
1883 ExecutionEventInner::HistoryEvent {
1884 event:
1885 HistoryEvent::JoinSetCreate { .. }
1886 | HistoryEvent::JoinSetRequest {
1887 request: JoinSetRequest::ChildExecutionRequest { .. },
1888 ..
1889 }
1890 | HistoryEvent::Persist { .. }
1891 | HistoryEvent::Schedule { .. }
1892 | HistoryEvent::Stub { .. }
1893 | HistoryEvent::JoinNextTooMany { .. },
1894 } => {
1895 return Ok((
1896 Self::bump_state_next_version(tx, execution_id, &appending_version, None)?,
1897 AppendNotifier::default(),
1898 ));
1899 }
1900
1901 ExecutionEventInner::HistoryEvent {
1902 event:
1903 HistoryEvent::JoinSetRequest {
1904 join_set_id,
1905 request:
1906 JoinSetRequest::DelayRequest {
1907 delay_id,
1908 expires_at,
1909 ..
1910 },
1911 },
1912 } => {
1913 return Ok((
1914 Self::bump_state_next_version(
1915 tx,
1916 execution_id,
1917 &appending_version,
1918 Some(DelayReq {
1919 join_set_id: join_set_id.clone(),
1920 delay_id: delay_id.clone(),
1921 expires_at: *expires_at,
1922 }),
1923 )?,
1924 AppendNotifier::default(),
1925 ));
1926 }
1927
1928 ExecutionEventInner::HistoryEvent {
1929 event:
1930 HistoryEvent::JoinNext {
1931 join_set_id,
1932 run_expires_at,
1933 closing,
1934 requested_ffqn: _,
1935 },
1936 } => {
1937 let join_next_count = Self::count_join_next(tx, execution_id, join_set_id)?;
1939 let nth_response =
1940 Self::nth_response(tx, execution_id, join_set_id, join_next_count - 1)?; trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
1942 assert!(join_next_count > 0);
1943 if let Some(ResponseWithCursor {
1944 event:
1945 JoinSetResponseEventOuter {
1946 created_at: nth_created_at,
1947 ..
1948 },
1949 cursor: _,
1950 }) = nth_response
1951 {
1952 let scheduled_at = max(*run_expires_at, nth_created_at); let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1954 tx,
1955 execution_id,
1956 &appending_version,
1957 scheduled_at,
1958 false, )?;
1960 return Ok((next_version, notifier));
1961 }
1962 return Ok((
1963 Self::update_state_blocked(
1964 tx,
1965 execution_id,
1966 &appending_version,
1967 join_set_id,
1968 *run_expires_at,
1969 *closing,
1970 )?,
1971 AppendNotifier::default(),
1972 ));
1973 }
1974 }
1975 }
1976
1977 fn append_response(
1978 tx: &Transaction,
1979 execution_id: &ExecutionId,
1980 response_outer: JoinSetResponseEventOuter,
1981 ) -> Result<AppendNotifier, DbErrorWrite> {
1982 let mut stmt = tx.prepare(
1983 "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, child_execution_id, finished_version) \
1984 VALUES (:execution_id, :created_at, :join_set_id, :delay_id, :child_execution_id, :finished_version)",
1985 )?;
1986 let join_set_id = &response_outer.event.join_set_id;
1987 let delay_id = match &response_outer.event.event {
1988 JoinSetResponse::DelayFinished { delay_id } => Some(delay_id.to_string()),
1989 JoinSetResponse::ChildExecutionFinished { .. } => None,
1990 };
1991 let (child_execution_id, finished_version) = match &response_outer.event.event {
1992 JoinSetResponse::ChildExecutionFinished {
1993 child_execution_id,
1994 finished_version,
1995 result: _,
1996 } => (
1997 Some(child_execution_id.to_string()),
1998 Some(finished_version.0),
1999 ),
2000 JoinSetResponse::DelayFinished { .. } => (None, None),
2001 };
2002
2003 stmt.execute(named_params! {
2004 ":execution_id": execution_id.to_string(),
2005 ":created_at": response_outer.created_at,
2006 ":join_set_id": join_set_id.to_string(),
2007 ":delay_id": delay_id,
2008 ":child_execution_id": child_execution_id,
2009 ":finished_version": finished_version,
2010 })?;
2011
2012 let combined_state = Self::get_combined_state(tx, execution_id)?;
2014 debug!("previous_pending_state: {combined_state:?}");
2015 let mut notifier = if let PendingState::BlockedByJoinSet {
2016 join_set_id: found_join_set_id,
2017 lock_expires_at, closing: _,
2019 } = combined_state.pending_state
2020 && *join_set_id == found_join_set_id
2021 {
2022 let scheduled_at = max(lock_expires_at, response_outer.created_at);
2025 Self::update_state_pending_after_response_appended(
2028 tx,
2029 execution_id,
2030 scheduled_at,
2031 &combined_state.corresponding_version, )?
2033 } else {
2034 AppendNotifier::default()
2035 };
2036 if let JoinSetResponseEvent {
2037 join_set_id,
2038 event: JoinSetResponse::DelayFinished { delay_id },
2039 } = &response_outer.event
2040 {
2041 debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
2042 let mut stmt =
2043 tx.prepare_cached("DELETE FROM t_delay WHERE execution_id = :execution_id AND join_set_id = :join_set_id AND delay_id = :delay_id")
2044 ?;
2045 stmt.execute(named_params! {
2046 ":execution_id": execution_id.to_string(),
2047 ":join_set_id": join_set_id.to_string(),
2048 ":delay_id": delay_id.to_string(),
2049 })?;
2050 }
2051 notifier.response = Some((execution_id.clone(), response_outer));
2052 Ok(notifier)
2053 }
2054
2055 fn append_backtrace(
2056 tx: &Transaction,
2057 backtrace_info: &BacktraceInfo,
2058 ) -> Result<(), DbErrorWrite> {
2059 let backtrace = serde_json::to_string(&backtrace_info.wasm_backtrace).map_err(|err| {
2060 warn!(
2061 "Cannot serialize backtrace {:?} - {err:?}",
2062 backtrace_info.wasm_backtrace
2063 );
2064 DbErrorWriteNonRetriable::ValidationFailed("cannot serialize backtrace".into())
2065 })?;
2066 let mut stmt = tx
2067 .prepare(
2068 "INSERT INTO t_backtrace (execution_id, component_id, version_min_including, version_max_excluding, wasm_backtrace) \
2069 VALUES (:execution_id, :component_id, :version_min_including, :version_max_excluding, :wasm_backtrace)",
2070 )
2071 ?;
2072 stmt.execute(named_params! {
2073 ":execution_id": backtrace_info.execution_id.to_string(),
2074 ":component_id": backtrace_info.component_id.to_string(),
2075 ":version_min_including": backtrace_info.version_min_including.0,
2076 ":version_max_excluding": backtrace_info.version_max_excluding.0,
2077 ":wasm_backtrace": backtrace,
2078 })?;
2079 Ok(())
2080 }
2081
/// Loads the complete execution log of `execution_id`: all events ordered by version, all
/// responses, the next version and the current pending state.
///
/// Events are returned without a backtrace id.
///
/// # Errors
/// * [`DbErrorRead::NotFound`] when the log contains no events.
/// * A consistency error when an event cannot be deserialized.
/// * Any error reported by the database or by the helper functions.
#[cfg(feature = "test")]
fn get(
    tx: &Transaction,
    execution_id: &ExecutionId,
) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
    let events = tx
        .prepare(
            "SELECT created_at, json_value FROM t_execution_log WHERE \
            execution_id = :execution_id ORDER BY version",
        )?
        .query_map(
            named_params! {
                ":execution_id": execution_id.to_string(),
            },
            |row| {
                let created_at = row.get("created_at")?;
                match row.get::<_, JsonWrapper<ExecutionEventInner>>("json_value") {
                    Ok(wrapper) => Ok(ExecutionEvent {
                        created_at,
                        event: wrapper.0,
                        backtrace_id: None,
                    }),
                    Err(serde) => {
                        error!("Cannot deserialize {row:?} - {serde:?}");
                        Err(consistency_rusqlite("cannot deserialize event"))
                    }
                }
            },
        )?
        .collect::<Result<Vec<_>, _>>()?;
    if events.is_empty() {
        return Err(DbErrorRead::NotFound);
    }
    let combined_state = Self::get_combined_state(tx, execution_id)?;
    let responses = Self::list_responses(tx, execution_id, None)?
        .into_iter()
        .map(|resp| resp.event)
        .collect();
    // Read the version before `pending_state` is moved out of `combined_state`.
    let next_version = combined_state.get_next_version_or_finished();
    Ok(concepts::storage::ExecutionLog {
        execution_id: execution_id.clone(),
        events,
        responses,
        next_version,
        pending_state: combined_state.pending_state,
    })
}
2130
2131 fn list_execution_events(
2132 tx: &Transaction,
2133 execution_id: &ExecutionId,
2134 version_min: VersionType,
2135 version_max_excluding: VersionType,
2136 include_backtrace_id: bool,
2137 ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2138 let select = if include_backtrace_id {
2139 "SELECT
2140 log.created_at,
2141 log.json_value,
2142 -- Select version_min_including from backtrace if a match is found, otherwise NULL
2143 bt.version_min_including AS backtrace_id
2144 FROM
2145 t_execution_log AS log
2146 LEFT OUTER JOIN -- Use LEFT JOIN to keep all logs even if no backtrace matches
2147 t_backtrace AS bt ON log.execution_id = bt.execution_id
2148 -- Check if the log's version falls within the backtrace's range
2149 AND log.version >= bt.version_min_including
2150 AND log.version < bt.version_max_excluding
2151 WHERE
2152 log.execution_id = :execution_id
2153 AND log.version >= :version_min
2154 AND log.version < :version_max_excluding
2155 ORDER BY
2156 log.version;"
2157 } else {
2158 "SELECT
2159 created_at, json_value, NULL as backtrace_id
2160 FROM t_execution_log WHERE
2161 execution_id = :execution_id AND version >= :version_min AND version < :version_max_excluding
2162 ORDER BY version"
2163 };
2164 tx.prepare(select)?
2165 .query_map(
2166 named_params! {
2167 ":execution_id": execution_id.to_string(),
2168 ":version_min": version_min,
2169 ":version_max_excluding": version_max_excluding
2170 },
2171 |row| {
2172 let created_at = row.get("created_at")?;
2173 let backtrace_id = row
2174 .get::<_, Option<VersionType>>("backtrace_id")?
2175 .map(Version::new);
2176 let event = row
2177 .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2178 .map(|event| ExecutionEvent {
2179 created_at,
2180 event: event.0,
2181 backtrace_id,
2182 })
2183 .map_err(|serde| {
2184 error!("Cannot deserialize {row:?} - {serde:?}");
2185 consistency_rusqlite("cannot deserialize")
2186 })?;
2187 Ok(event)
2188 },
2189 )?
2190 .collect::<Result<Vec<_>, _>>()
2191 .map_err(DbErrorRead::from)
2192 }
2193
2194 fn get_execution_event(
2195 tx: &Transaction,
2196 execution_id: &ExecutionId,
2197 version: VersionType,
2198 ) -> Result<ExecutionEvent, DbErrorRead> {
2199 let mut stmt = tx.prepare(
2200 "SELECT created_at, json_value FROM t_execution_log WHERE \
2201 execution_id = :execution_id AND version = :version",
2202 )?;
2203 stmt.query_row(
2204 named_params! {
2205 ":execution_id": execution_id.to_string(),
2206 ":version": version,
2207 },
2208 |row| {
2209 let created_at = row.get("created_at")?;
2210 let event = row
2211 .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2212 .map_err(|serde| {
2213 error!("Cannot deserialize {row:?} - {serde:?}");
2214 consistency_rusqlite("cannot deserialize event")
2215 })?;
2216
2217 Ok(ExecutionEvent {
2218 created_at,
2219 event: event.0,
2220 backtrace_id: None,
2221 })
2222 },
2223 )
2224 .map_err(DbErrorRead::from)
2225 }
2226
2227 fn list_responses(
2228 tx: &Transaction,
2229 execution_id: &ExecutionId,
2230 pagination: Option<Pagination<u32>>,
2231 ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
2232 let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![];
2234 let mut sql = "SELECT \
2235 r.id, r.created_at, r.join_set_id, r.delay_id, r.child_execution_id, r.finished_version, l.json_value \
2236 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2237 WHERE \
2238 r.execution_id = :execution_id \
2239 AND ( r.finished_version = l.version OR r.child_execution_id IS NULL ) \
2240 "
2241 .to_string();
2242 let limit = match &pagination {
2243 Some(
2244 pagination @ (Pagination::NewerThan { cursor, .. }
2245 | Pagination::OlderThan { cursor, .. }),
2246 ) => {
2247 params.push((":cursor", Box::new(cursor)));
2248 write!(sql, " AND r.id {rel} :cursor", rel = pagination.rel()).unwrap();
2249 Some(pagination.length())
2250 }
2251 None => None,
2252 };
2253 sql.push_str(" ORDER BY id");
2254 if pagination.as_ref().is_some_and(Pagination::is_desc) {
2255 sql.push_str(" DESC");
2256 }
2257 if let Some(limit) = limit {
2258 write!(sql, " LIMIT {limit}").unwrap();
2259 }
2260 params.push((":execution_id", Box::new(execution_id.to_string())));
2261 tx.prepare(&sql)?
2262 .query_map::<_, &[(&'static str, &dyn ToSql)], _>(
2263 params
2264 .iter()
2265 .map(|(key, value)| (*key, value.as_ref()))
2266 .collect::<Vec<_>>()
2267 .as_ref(),
2268 Self::parse_response_with_cursor,
2269 )?
2270 .collect::<Result<Vec<_>, rusqlite::Error>>()
2271 .map_err(DbErrorRead::from)
2272 }
2273
2274 fn parse_response_with_cursor(
2275 row: &rusqlite::Row<'_>,
2276 ) -> Result<ResponseWithCursor, rusqlite::Error> {
2277 let id = row.get("id")?;
2278 let created_at: DateTime<Utc> = row.get("created_at")?;
2279 let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
2280 let event = match (
2281 row.get::<_, Option<DelayId>>("delay_id")?,
2282 row.get::<_, Option<ExecutionIdDerived>>("child_execution_id")?,
2283 row.get::<_, Option<VersionType>>("finished_version")?,
2284 row.get::<_, Option<JsonWrapper<ExecutionEventInner>>>("json_value")?,
2285 ) {
2286 (Some(delay_id), None, None, None) => JoinSetResponse::DelayFinished { delay_id },
2287 (
2288 None,
2289 Some(child_execution_id),
2290 Some(finished_version),
2291 Some(JsonWrapper(ExecutionEventInner::Finished { result, .. })),
2292 ) => JoinSetResponse::ChildExecutionFinished {
2293 child_execution_id,
2294 finished_version: Version(finished_version),
2295 result,
2296 },
2297 (delay, child, finished, result) => {
2298 error!(
2299 "Invalid row in t_join_set_response {id} - {:?} {child:?} {finished:?} {:?}",
2300 delay,
2301 result.map(|it| it.0)
2302 );
2303 return Err(consistency_rusqlite("invalid row in t_join_set_response"));
2304 }
2305 };
2306 Ok(ResponseWithCursor {
2307 cursor: id,
2308 event: JoinSetResponseEventOuter {
2309 event: JoinSetResponseEvent { join_set_id, event },
2310 created_at,
2311 },
2312 })
2313 }
2314
2315 fn nth_response(
2316 tx: &Transaction,
2317 execution_id: &ExecutionId,
2318 join_set_id: &JoinSetId,
2319 skip_rows: u64,
2320 ) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
2321 tx
2323 .prepare(
2324 "SELECT r.id, r.created_at, r.join_set_id, \
2325 r.delay_id, \
2326 r.child_execution_id, r.finished_version, l.json_value \
2327 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2328 WHERE \
2329 r.execution_id = :execution_id AND r.join_set_id = :join_set_id AND \
2330 (
2331 r.finished_version = l.version \
2332 OR \
2333 r.child_execution_id IS NULL \
2334 ) \
2335 ORDER BY id \
2336 LIMIT 1 OFFSET :offset",
2337 )
2338 ?
2339 .query_row(
2340 named_params! {
2341 ":execution_id": execution_id.to_string(),
2342 ":join_set_id": join_set_id.to_string(),
2343 ":offset": skip_rows,
2344 },
2345 Self::parse_response_with_cursor,
2346 )
2347 .optional()
2348 .map_err(DbErrorRead::from)
2349 }
2350
2351 #[instrument(level = Level::TRACE, skip_all)]
2353 fn get_responses_with_offset(
2354 tx: &Transaction,
2355 execution_id: &ExecutionId,
2356 skip_rows: usize,
2357 ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorRead> {
2358 tx.prepare(
2360 "SELECT r.id, r.created_at, r.join_set_id, \
2361 r.delay_id, \
2362 r.child_execution_id, r.finished_version, l.json_value \
2363 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2364 WHERE \
2365 r.execution_id = :execution_id AND \
2366 ( \
2367 r.finished_version = l.version \
2368 OR r.child_execution_id IS NULL \
2369 ) \
2370 ORDER BY id \
2371 LIMIT -1 OFFSET :offset",
2372 )
2373 ?
2374 .query_map(
2375 named_params! {
2376 ":execution_id": execution_id.to_string(),
2377 ":offset": skip_rows,
2378 },
2379 Self::parse_response_with_cursor,
2380 )
2381 ?
2382 .collect::<Result<Vec<_>, _>>()
2383 .map(|resp| resp.into_iter().map(|vec| vec.event).collect())
2384 .map_err(DbErrorRead::from)
2385 }
2386
2387 fn get_pending_of_single_ffqn(
2388 mut stmt: CachedStatement,
2389 batch_size: usize,
2390 pending_at_or_sooner: DateTime<Utc>,
2391 ffqn: &FunctionFqn,
2392 ) -> Result<Vec<(ExecutionId, Version)>, ()> {
2393 stmt.query_map(
2394 named_params! {
2395 ":pending_expires_finished": pending_at_or_sooner,
2396 ":ffqn": ffqn.to_string(),
2397 ":batch_size": batch_size,
2398 },
2399 |row| {
2400 let execution_id = row.get::<_, String>("execution_id")?.parse::<ExecutionId>();
2401 let next_version =
2402 Version::new(row.get::<_, VersionType>("corresponding_version")?).increment();
2403 Ok(execution_id.map(|exe| (exe, next_version)))
2404 },
2405 )
2406 .map_err(|err| {
2407 warn!("Ignoring consistency error {err:?}");
2408 })?
2409 .collect::<Result<Vec<_>, _>>()
2410 .map_err(|err| {
2411 warn!("Ignoring consistency error {err:?}");
2412 })?
2413 .into_iter()
2414 .collect::<Result<Vec<_>, _>>()
2415 .map_err(|err| {
2416 warn!("Ignoring consistency error {err:?}");
2417 })
2418 }
2419
2420 fn get_pending(
2422 conn: &Connection,
2423 batch_size: usize,
2424 pending_at_or_sooner: DateTime<Utc>,
2425 ffqns: &[FunctionFqn],
2426 ) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2427 let mut execution_ids_versions = Vec::with_capacity(batch_size);
2428 for ffqn in ffqns {
2429 let stmt = conn
2431 .prepare_cached(&format!(
2432 r#"
2433 SELECT execution_id, corresponding_version FROM t_state WHERE
2434 state = "{STATE_PENDING_AT}" AND
2435 pending_expires_finished <= :pending_expires_finished AND ffqn = :ffqn
2436 ORDER BY pending_expires_finished LIMIT :batch_size
2437 "#
2438 ))
2439 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2440
2441 if let Ok(execs_and_versions) = Self::get_pending_of_single_ffqn(
2442 stmt,
2443 batch_size - execution_ids_versions.len(),
2444 pending_at_or_sooner,
2445 ffqn,
2446 ) {
2447 execution_ids_versions.extend(execs_and_versions);
2448 if execution_ids_versions.len() == batch_size {
2449 break;
2451 }
2452 }
2454 }
2455 Ok(execution_ids_versions)
2456 }
2457
2458 #[instrument(level = Level::TRACE, skip_all)]
2460 fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
2461 let (pending_ats, finished_execs, responses) = {
2462 let (mut pending_ats, mut finished_execs, mut responses) =
2463 (Vec::new(), Vec::new(), Vec::new());
2464 for notifier in notifiers {
2465 if let Some(pending_at) = notifier.pending_at {
2466 pending_ats.push(pending_at);
2467 }
2468 if let Some(finished) = notifier.execution_finished {
2469 finished_execs.push(finished);
2470 }
2471 if let Some(response) = notifier.response {
2472 responses.push(response);
2473 }
2474 }
2475 (pending_ats, finished_execs, responses)
2476 };
2477
2478 if !pending_ats.is_empty() {
2480 let guard = self.0.pending_ffqn_subscribers.lock().unwrap();
2481 for pending_at in pending_ats {
2482 Self::notify_pending_locked(&pending_at, current_time, &guard);
2483 }
2484 }
2485 if !finished_execs.is_empty() {
2488 let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
2489 for finished in finished_execs {
2490 if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
2491 for (_tag, sender) in listeners_of_exe_id {
2492 let _ = sender.send(finished.retval.clone());
2495 }
2496 }
2497 }
2498 }
2499 if !responses.is_empty() {
2501 let mut guard = self.0.response_subscribers.lock().unwrap();
2502 for (execution_id, response) in responses {
2503 if let Some((sender, _)) = guard.remove(&execution_id) {
2504 let _ = sender.send(response);
2505 }
2506 }
2507 }
2508 }
2509
2510 fn notify_pending_locked(
2511 notifier: &NotifierPendingAt,
2512 current_time: DateTime<Utc>,
2513 ffqn_to_pending_subscription: &std::sync::MutexGuard<
2514 HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>,
2515 >,
2516 ) {
2517 if notifier.scheduled_at <= current_time
2519 && let Some((subscription, _)) = ffqn_to_pending_subscription.get(¬ifier.ffqn)
2520 {
2521 debug!("Notifying pending subscriber");
2522 let _ = subscription.try_send(());
2524 }
2525 }
2526}
2527
2528#[async_trait]
2529impl DbExecutor for SqlitePool {
2530 #[instrument(level = Level::TRACE, skip(self))]
2531 async fn lock_pending(
2532 &self,
2533 batch_size: usize,
2534 pending_at_or_sooner: DateTime<Utc>,
2535 ffqns: Arc<[FunctionFqn]>,
2536 created_at: DateTime<Utc>,
2537 component_id: ComponentId,
2538 executor_id: ExecutorId,
2539 lock_expires_at: DateTime<Utc>,
2540 run_id: RunId,
2541 retry_config: ComponentRetryConfig,
2542 ) -> Result<LockPendingResponse, DbErrorGeneric> {
2543 let execution_ids_versions = self
2544 .transaction(
2545 move |conn| Self::get_pending(conn, batch_size, pending_at_or_sooner, &ffqns),
2546 "get_pending",
2547 )
2548 .await?;
2549 if execution_ids_versions.is_empty() {
2550 Ok(vec![])
2551 } else {
2552 debug!("Locking {execution_ids_versions:?}");
2553 self.transaction(
2554 move |tx| {
2555 let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
2556 for (execution_id, version) in &execution_ids_versions {
2558 match Self::lock_single_execution(
2559 tx,
2560 created_at,
2561 &component_id,
2562 execution_id,
2563 run_id,
2564 version,
2565 executor_id,
2566 lock_expires_at,
2567 retry_config,
2568 ) {
2569 Ok(locked) => locked_execs.push(locked),
2570 Err(err) => {
2571 warn!("Locking row {execution_id} failed - {err:?}");
2572 }
2573 }
2574 }
2575 Ok(locked_execs)
2576 },
2577 "lock_pending",
2578 )
2579 .await
2580 }
2581 }
2582
2583 #[instrument(level = Level::DEBUG, skip(self))]
2584 async fn lock_one(
2585 &self,
2586 created_at: DateTime<Utc>,
2587 component_id: ComponentId,
2588 execution_id: &ExecutionId,
2589 run_id: RunId,
2590 version: Version,
2591 executor_id: ExecutorId,
2592 lock_expires_at: DateTime<Utc>,
2593 retry_config: ComponentRetryConfig,
2594 ) -> Result<LockedExecution, DbErrorWrite> {
2595 debug!(%execution_id, "lock_one");
2596 let execution_id = execution_id.clone();
2597 self.transaction(
2598 move |tx| {
2599 Self::lock_single_execution(
2600 tx,
2601 created_at,
2602 &component_id,
2603 &execution_id,
2604 run_id,
2605 &version,
2606 executor_id,
2607 lock_expires_at,
2608 retry_config,
2609 )
2610 },
2611 "lock_inner",
2612 )
2613 .await
2614 }
2615
2616 #[instrument(level = Level::DEBUG, skip(self, req))]
2617 async fn append(
2618 &self,
2619 execution_id: ExecutionId,
2620 version: Version,
2621 req: AppendRequest,
2622 ) -> Result<AppendResponse, DbErrorWrite> {
2623 debug!(%req, "append");
2624 trace!(?req, "append");
2625 let created_at = req.created_at;
2626 let (version, notifier) = self
2627 .transaction(
2628 move |tx| Self::append(tx, &execution_id, req.clone(), version.clone()),
2629 "append",
2630 )
2631 .await?;
2632 self.notify_all(vec![notifier], created_at);
2633 Ok(version)
2634 }
2635
2636 #[instrument(level = Level::DEBUG, skip_all)]
2637 async fn append_batch_respond_to_parent(
2638 &self,
2639 events: AppendEventsToExecution,
2640 response: AppendResponseToExecution,
2641 current_time: DateTime<Utc>,
2642 ) -> Result<AppendBatchResponse, DbErrorWrite> {
2643 debug!("append_batch_respond_to_parent");
2644 if events.execution_id == response.parent_execution_id {
2645 return Err(DbErrorWrite::NonRetriable(
2648 DbErrorWriteNonRetriable::ValidationFailed(
2649 "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
2650 ),
2651 ));
2652 }
2653 if events.batch.is_empty() {
2654 error!("Batch cannot be empty");
2655 return Err(DbErrorWrite::NonRetriable(
2656 DbErrorWriteNonRetriable::ValidationFailed("batch cannot be empty".into()),
2657 ));
2658 }
2659 let (version, notifiers) = {
2660 self.transaction(
2661 move |tx| {
2662 let mut version = events.version.clone();
2663 let mut notifier_of_child = None;
2664 for append_request in &events.batch {
2665 let (v, n) = Self::append(
2666 tx,
2667 &events.execution_id,
2668 append_request.clone(),
2669 version,
2670 )?;
2671 version = v;
2672 notifier_of_child = Some(n);
2673 }
2674
2675 let pending_at_parent = Self::append_response(
2676 tx,
2677 &response.parent_execution_id,
2678 response.parent_response_event.clone(),
2679 )?;
2680 Ok::<_, DbErrorWrite>((
2681 version,
2682 vec![
2683 notifier_of_child.expect("checked that the batch is not empty"),
2684 pending_at_parent,
2685 ],
2686 ))
2687 },
2688 "append_batch_respond_to_parent",
2689 )
2690 .await?
2691 };
2692 self.notify_all(notifiers, current_time);
2693 Ok(version)
2694 }
2695
2696 #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
2699 async fn wait_for_pending(
2700 &self,
2701 pending_at_or_sooner: DateTime<Utc>,
2702 ffqns: Arc<[FunctionFqn]>,
2703 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
2704 ) {
2705 let unique_tag: u64 = rand::random();
2706 let (sender, mut receiver) = mpsc::channel(1); {
2708 let mut ffqn_to_pending_subscription = self.0.pending_ffqn_subscribers.lock().unwrap();
2709 for ffqn in ffqns.as_ref() {
2710 ffqn_to_pending_subscription.insert(ffqn.clone(), (sender.clone(), unique_tag));
2711 }
2712 }
2713 async {
2714 let Ok(execution_ids_versions) = self
2715 .transaction(
2716 {
2717 let ffqns = ffqns.clone();
2718 move |conn| Self::get_pending(conn, 1, pending_at_or_sooner, ffqns.as_ref())
2719 },
2720 "subscribe_to_pending",
2721 )
2722 .await
2723 else {
2724 trace!(
2725 "Ignoring get_pending error and waiting in for timeout to avoid executor repolling too soon"
2726 );
2727 timeout_fut.await;
2728 return;
2729 };
2730 if !execution_ids_versions.is_empty() {
2731 trace!("Not waiting, database already contains new pending executions");
2732 return;
2733 }
2734 tokio::select! { _ = receiver.recv() => {
2736 trace!("Received a notification");
2737 }
2738 () = timeout_fut => {
2739 }
2740 }
2741 }.await;
2742 {
2744 let mut ffqn_to_pending_subscription = self.0.pending_ffqn_subscribers.lock().unwrap();
2745 for ffqn in ffqns.as_ref() {
2746 match ffqn_to_pending_subscription.remove(ffqn) {
2747 Some((_, tag)) if tag == unique_tag => {
2748 }
2750 Some(other) => {
2751 ffqn_to_pending_subscription.insert(ffqn.clone(), other);
2753 }
2754 None => {
2755 }
2757 }
2758 }
2759 }
2760 }
2761}
2762
2763#[async_trait]
2764impl DbConnection for SqlitePool {
2765 #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
2766 async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
2767 debug!("create");
2768 trace!(?req, "create");
2769 let created_at = req.created_at;
2770 let (version, notifier) = self
2771 .transaction(move |tx| Self::create_inner(tx, req.clone()), "create")
2772 .await?;
2773 self.notify_all(vec![notifier], created_at);
2774 Ok(version)
2775 }
2776
2777 #[instrument(level = Level::DEBUG, skip(self, batch))]
2778 async fn append_batch(
2779 &self,
2780 current_time: DateTime<Utc>,
2781 batch: Vec<AppendRequest>,
2782 execution_id: ExecutionId,
2783 version: Version,
2784 ) -> Result<AppendBatchResponse, DbErrorWrite> {
2785 debug!("append_batch");
2786 trace!(?batch, "append_batch");
2787 assert!(!batch.is_empty(), "Empty batch request");
2788
2789 let (version, notifier) = self
2790 .transaction(
2791 move |tx| {
2792 let mut version = version.clone();
2793 let mut notifier = None;
2794 for append_request in &batch {
2795 let (v, n) =
2796 Self::append(tx, &execution_id, append_request.clone(), version)?;
2797 version = v;
2798 notifier = Some(n);
2799 }
2800 Ok::<_, DbErrorWrite>((
2801 version,
2802 notifier.expect("checked that the batch is not empty"),
2803 ))
2804 },
2805 "append_batch",
2806 )
2807 .await?;
2808
2809 self.notify_all(vec![notifier], current_time);
2810 Ok(version)
2811 }
2812
2813 #[instrument(level = Level::DEBUG, skip(self, batch, child_req))]
2814 async fn append_batch_create_new_execution(
2815 &self,
2816 current_time: DateTime<Utc>,
2817 batch: Vec<AppendRequest>,
2818 execution_id: ExecutionId,
2819 version: Version,
2820 child_req: Vec<CreateRequest>,
2821 ) -> Result<AppendBatchResponse, DbErrorWrite> {
2822 debug!("append_batch_create_new_execution");
2823 trace!(?batch, ?child_req, "append_batch_create_new_execution");
2824 assert!(!batch.is_empty(), "Empty batch request");
2825
2826 let (version, notifiers) = self
2827 .transaction(
2828 move |tx| {
2829 let mut notifier = None;
2830 let mut version = version.clone();
2831 for append_request in &batch {
2832 let (v, n) =
2833 Self::append(tx, &execution_id, append_request.clone(), version)?;
2834 version = v;
2835 notifier = Some(n);
2836 }
2837 let mut notifiers = Vec::new();
2838 notifiers.push(notifier.expect("checked that the batch is not empty"));
2839
2840 for child_req in &child_req {
2841 let (_, notifier) = Self::create_inner(tx, child_req.clone())?;
2842 notifiers.push(notifier);
2843 }
2844 Ok::<_, DbErrorWrite>((version, notifiers))
2845 },
2846 "append_batch_create_new_execution_inner",
2847 )
2848 .await?;
2849 self.notify_all(notifiers, current_time);
2850 Ok(version)
2851 }
2852
/// Loads the execution log of `execution_id` in one transaction, delegating to the inherent
/// `get`. Only compiled with the `test` feature.
#[cfg(feature = "test")]
#[instrument(level = Level::DEBUG, skip(self))]
async fn get(
    &self,
    execution_id: &ExecutionId,
) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
    trace!("get");
    // The transaction closure has to own the id.
    let owned_execution_id = execution_id.clone();
    self.transaction(move |tx| Self::get(tx, &owned_execution_id), "get")
        .await
}
2864
2865 #[instrument(level = Level::DEBUG, skip(self))]
2866 async fn list_execution_events(
2867 &self,
2868 execution_id: &ExecutionId,
2869 since: &Version,
2870 max_length: VersionType,
2871 include_backtrace_id: bool,
2872 ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2873 let execution_id = execution_id.clone();
2874 let since = since.0;
2875 self.transaction(
2876 move |tx| {
2877 Self::list_execution_events(
2878 tx,
2879 &execution_id,
2880 since,
2881 since + max_length,
2882 include_backtrace_id,
2883 )
2884 },
2885 "get",
2886 )
2887 .await
2888 }
2889
2890 #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
2894 async fn subscribe_to_next_responses(
2895 &self,
2896 execution_id: &ExecutionId,
2897 start_idx: usize,
2898 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
2899 ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout> {
2900 debug!("next_responses");
2901 let unique_tag: u64 = rand::random();
2902 let execution_id = execution_id.clone();
2903
2904 let cleanup = || {
2905 let mut guard = self.0.response_subscribers.lock().unwrap();
2906 match guard.remove(&execution_id) {
2907 Some((_, tag)) if tag == unique_tag => {} Some(other) => {
2909 guard.insert(execution_id.clone(), other);
2911 }
2912 None => {} }
2914 };
2915
2916 let response_subscribers = self.0.response_subscribers.clone();
2917 let resp_or_receiver = {
2918 let execution_id = execution_id.clone();
2919 self.transaction(
2920 move |tx| {
2921 let responses = Self::get_responses_with_offset(tx, &execution_id, start_idx)?;
2922 if responses.is_empty() {
2923 let (sender, receiver) = oneshot::channel();
2925 response_subscribers
2926 .lock()
2927 .unwrap()
2928 .insert(execution_id.clone(), (sender, unique_tag));
2929 Ok::<_, DbErrorReadWithTimeout>(itertools::Either::Right(receiver))
2930 } else {
2931 Ok(itertools::Either::Left(responses))
2932 }
2933 },
2934 "subscribe_to_next_responses",
2935 )
2936 .await
2937 }
2938 .inspect_err(|_| {
2939 cleanup();
2940 })?;
2941 match resp_or_receiver {
2942 itertools::Either::Left(resp) => Ok(resp), itertools::Either::Right(receiver) => {
2944 let res = async move {
2945 tokio::select! {
2946 resp = receiver => {
2947 let resp = resp.map_err(|_| DbErrorGeneric::Close)?;
2948 Ok(vec![resp])
2949 }
2950 () = timeout_fut => Err(DbErrorReadWithTimeout::Timeout),
2951 }
2952 }
2953 .await;
2954 cleanup();
2955 res
2956 }
2957 }
2958 }
2959
2960 async fn wait_for_finished_result(
2962 &self,
2963 execution_id: &ExecutionId,
2964 timeout_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
2965 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
2966 let unique_tag: u64 = rand::random();
2967 let execution_id = execution_id.clone();
2968 let execution_finished_subscription = self.0.execution_finished_subscribers.clone();
2969
2970 let cleanup = || {
2971 let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
2972 if let Some(subscribers) = guard.get_mut(&execution_id) {
2973 subscribers.remove(&unique_tag);
2974 }
2975 };
2976
2977 let resp_or_receiver = {
2978 let execution_id = execution_id.clone();
2979 self.transaction(move |tx| {
2980 let pending_state =
2981 Self::get_combined_state(tx, &execution_id)?.pending_state;
2982 if let PendingState::Finished { finished } = pending_state {
2983 let event =
2984 Self::get_execution_event(tx, &execution_id, finished.version)?;
2985 if let ExecutionEventInner::Finished { result, ..} = event.event {
2986 Ok(itertools::Either::Left(result))
2987 } else {
2988 error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
2989 Err(DbErrorReadWithTimeout::from(consistency_db_err(
2990 "cannot get finished event based on t_state version"
2991 )))
2992 }
2993 } else {
2994 let (sender, receiver) = oneshot::channel();
2998 let mut guard = execution_finished_subscription.lock().unwrap();
2999 guard.entry(execution_id.clone()).or_default().insert(unique_tag, sender);
3000 Ok(itertools::Either::Right(receiver))
3001 }
3002 }, "wait_for_finished_result")
3003 .await
3004 }
3005 .inspect_err(|_| {
3006 cleanup();
3010 })?;
3011
3012 let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
3013 match resp_or_receiver {
3014 itertools::Either::Left(resp) => Ok(resp), itertools::Either::Right(receiver) => {
3016 let res = async move {
3017 tokio::select! {
3018 resp = receiver => {
3019 Ok(resp.expect("the notifier sends to all listeners, cannot race with cleanup"))
3020 }
3021 () = timeout_fut => Err(DbErrorReadWithTimeout::Timeout),
3022 }
3023 }
3024 .await;
3025 cleanup();
3026 res
3027 }
3028 }
3029 }
3030
3031 #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
3032 async fn append_response(
3033 &self,
3034 created_at: DateTime<Utc>,
3035 execution_id: ExecutionId,
3036 response_event: JoinSetResponseEvent,
3037 ) -> Result<(), DbErrorWrite> {
3038 debug!("append_response");
3039 let event = JoinSetResponseEventOuter {
3040 created_at,
3041 event: response_event,
3042 };
3043 let notifier = self
3044 .transaction(
3045 move |tx| Self::append_response(tx, &execution_id, event.clone()),
3046 "append_response",
3047 )
3048 .await?;
3049 self.notify_all(vec![notifier], created_at);
3050 Ok(())
3051 }
3052
3053 #[instrument(level = Level::DEBUG, skip_all)]
3054 async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
3055 debug!("append_backtrace");
3056 self.transaction(
3057 move |tx| Self::append_backtrace(tx, &append),
3058 "append_backtrace",
3059 )
3060 .await
3061 }
3062
3063 #[instrument(level = Level::DEBUG, skip_all)]
3064 async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
3065 debug!("append_backtrace_batch");
3066 self.transaction(
3067 move |tx| {
3068 for append in &batch {
3069 Self::append_backtrace(tx, append)?;
3070 }
3071 Ok(())
3072 },
3073 "append_backtrace",
3074 )
3075 .await
3076 }
3077
3078 #[instrument(level = Level::DEBUG, skip_all)]
3079 async fn get_backtrace(
3080 &self,
3081 execution_id: &ExecutionId,
3082 filter: BacktraceFilter,
3083 ) -> Result<BacktraceInfo, DbErrorRead> {
3084 debug!("get_last_backtrace");
3085 let execution_id = execution_id.clone();
3086
3087 self.transaction(
3088 move |tx| {
3089 let select = "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace FROM t_backtrace \
3090 WHERE execution_id = :execution_id";
3091 let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![(":execution_id", Box::new(execution_id.to_string()))];
3092 let select = match &filter {
3093 BacktraceFilter::Specific(version) =>{
3094 params.push((":version", Box::new(version.0)));
3095 format!("{select} AND version_min_including <= :version AND version_max_excluding > :version")
3096 },
3097 BacktraceFilter::First => format!("{select} ORDER BY version_min_including LIMIT 1"),
3098 BacktraceFilter::Last => format!("{select} ORDER BY version_min_including DESC LIMIT 1")
3099 };
3100 tx
3101 .prepare(&select)
3102 ?
3103 .query_row::<_, &[(&'static str, &dyn ToSql)], _>(
3104 params
3105 .iter()
3106 .map(|(key, value)| (*key, value.as_ref()))
3107 .collect::<Vec<_>>()
3108 .as_ref(),
3109 |row| {
3110 Ok(BacktraceInfo {
3111 execution_id: execution_id.clone(),
3112 component_id: row.get::<_, FromStrWrapper<_> >("component_id")?.0,
3113 version_min_including: Version::new(row.get::<_, VersionType>("version_min_including")?),
3114 version_max_excluding: Version::new(row.get::<_, VersionType>("version_max_excluding")?),
3115 wasm_backtrace: row.get::<_, JsonWrapper<_>>("wasm_backtrace")?.0,
3116 })
3117 },
3118 ).map_err(DbErrorRead::from)
3119 },
3120 "get_last_backtrace",
3121 ).await
3122 }
3123
3124 #[instrument(level = Level::TRACE, skip(self))]
3126 async fn get_expired_timers(
3127 &self,
3128 at: DateTime<Utc>,
3129 ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
3130 self.transaction(
3131 move |conn| {
3132 let mut expired_timers = conn.prepare(
3133 "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= :at",
3134 )?
3135 .query_map(
3136 named_params! {
3137 ":at": at,
3138 },
3139 |row| {
3140 let execution_id = row.get("execution_id")?;
3141 let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
3142 let delay_id = row.get::<_, DelayId>("delay_id")?;
3143 let delay = ExpiredDelay { execution_id, join_set_id, delay_id };
3144 Ok(ExpiredTimer::Delay(delay))
3145 },
3146 )?
3147 .collect::<Result<Vec<_>, _>>()?;
3148 let expired = conn.prepare(&format!(r#"
3150 SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis,
3151 executor_id, run_id
3152 FROM t_state
3153 WHERE pending_expires_finished <= :at AND state = "{STATE_LOCKED}"
3154 "#
3155 )
3156 )?
3157 .query_map(
3158 named_params! {
3159 ":at": at,
3160 },
3161 |row| {
3162 let execution_id = row.get("execution_id")?;
3163 let locked_at_version = Version::new(row.get("last_lock_version")?);
3164 let next_version = Version::new(row.get("corresponding_version")?).increment();
3165 let intermittent_event_count = row.get("intermittent_event_count")?;
3166 let max_retries = row.get("max_retries")?;
3167 let retry_exp_backoff_millis = row.get("retry_exp_backoff_millis")?;
3168 let executor_id = row.get("executor_id")?;
3169 let run_id = row.get("run_id")?;
3170 let lock = ExpiredLock {
3171 execution_id,
3172 locked_at_version,
3173 next_version,
3174 intermittent_event_count,
3175 max_retries,
3176 retry_exp_backoff: Duration::from_millis(retry_exp_backoff_millis),
3177 locked_by: LockedBy { executor_id, run_id },
3178 };
3179 Ok(ExpiredTimer::Lock(lock))
3180 }
3181 )?
3182 .collect::<Result<Vec<_>, _>>()?;
3183 expired_timers.extend(expired);
3184 if !expired_timers.is_empty() {
3185 debug!("get_expired_timers found {expired_timers:?}");
3186 }
3187 Ok(expired_timers)
3188 }, "get_expired_timers"
3189 )
3190 .await
3191 }
3192
3193 async fn get_execution_event(
3194 &self,
3195 execution_id: &ExecutionId,
3196 version: &Version,
3197 ) -> Result<ExecutionEvent, DbErrorRead> {
3198 let version = version.0;
3199 let execution_id = execution_id.clone();
3200 self.transaction(
3201 move |tx| Self::get_execution_event(tx, &execution_id, version),
3202 "get_execution_event",
3203 )
3204 .await
3205 }
3206
3207 async fn get_pending_state(
3208 &self,
3209 execution_id: &ExecutionId,
3210 ) -> Result<PendingState, DbErrorRead> {
3211 let execution_id = execution_id.clone();
3212 Ok(self
3213 .transaction(
3214 move |tx| Self::get_combined_state(tx, &execution_id),
3215 "get_pending_state",
3216 )
3217 .await?
3218 .pending_state)
3219 }
3220
3221 async fn list_executions(
3222 &self,
3223 ffqn: Option<FunctionFqn>,
3224 top_level_only: bool,
3225 pagination: ExecutionListPagination,
3226 ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
3227 self.transaction(
3228 move |tx| Self::list_executions(tx, ffqn.as_ref(), top_level_only, &pagination),
3229 "list_executions",
3230 )
3231 .await
3232 }
3233
3234 async fn list_responses(
3235 &self,
3236 execution_id: &ExecutionId,
3237 pagination: Pagination<u32>,
3238 ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
3239 let execution_id = execution_id.clone();
3240 self.transaction(
3241 move |tx| Self::list_responses(tx, &execution_id, Some(pagination)),
3242 "list_executions",
3243 )
3244 .await
3245 }
3246}
3247
/// Test helper for creating a `SqlitePool` with the default configuration.
#[cfg(any(test, feature = "tempfile"))]
pub mod tempfile {
    use super::{SqliteConfig, SqlitePool};
    use tempfile::NamedTempFile;

    /// Opens a pool on the file named by the `SQLITE_FILE` environment variable, or on a
    /// fresh temporary file when the variable cannot be read.
    ///
    /// In the latter case the `NamedTempFile` handle is returned as well, so the caller
    /// decides how long it is kept around.
    ///
    /// # Panics
    /// Panics if the temporary file or the pool cannot be created.
    pub async fn sqlite_pool() -> (SqlitePool, Option<NamedTempFile>) {
        match std::env::var("SQLITE_FILE") {
            Ok(path) => {
                let pool = SqlitePool::new(path, SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, None)
            }
            Err(_) => {
                let file = NamedTempFile::new().unwrap();
                let pool = SqlitePool::new(file.path(), SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, Some(file))
            }
        }
    }
}