1use crate::histograms::Histograms;
2use assert_matches::assert_matches;
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use concepts::{
6 ComponentId, ExecutionId, FunctionFqn, JoinSetId, StrVariant, SupportedFunctionReturnValue,
7 prefixed_ulid::{DelayId, ExecutionIdDerived, ExecutorId, RunId},
8 storage::{
9 AppendBatchResponse, AppendEventsToExecution, AppendRequest, AppendResponse,
10 AppendResponseToExecution, BacktraceFilter, BacktraceInfo, CreateRequest, DUMMY_CREATED,
11 DUMMY_HISTORY_EVENT, DbConnection, DbErrorGeneric, DbErrorRead, DbErrorReadWithTimeout,
12 DbErrorWrite, DbErrorWritePermanent, DbExecutor, DbPool, DbPoolCloseable, ExecutionEvent,
13 ExecutionEventInner, ExecutionListPagination, ExecutionWithState, ExpiredDelay,
14 ExpiredLock, ExpiredTimer, HistoryEvent, JoinSetRequest, JoinSetResponse,
15 JoinSetResponseEvent, JoinSetResponseEventOuter, LockPendingResponse, LockedExecution,
16 Pagination, PendingState, PendingStateFinished, PendingStateFinishedResultKind,
17 ResponseWithCursor, Version, VersionType,
18 },
19};
20use conversions::{FromStrWrapper, JsonWrapper, consistency_db_err, consistency_rusqlite};
21use hashbrown::HashMap;
22use rusqlite::{
23 CachedStatement, Connection, OpenFlags, OptionalExtension, Params, ToSql, Transaction,
24 TransactionBehavior, named_params, types::ToSqlOutput,
25};
26use std::{
27 cmp::max,
28 collections::VecDeque,
29 fmt::Debug,
30 ops::DerefMut,
31 path::Path,
32 str::FromStr,
33 sync::{
34 Arc, Mutex,
35 atomic::{AtomicBool, Ordering},
36 },
37 time::Duration,
38};
39use std::{fmt::Write as _, pin::Pin};
40use tokio::{
41 sync::{mpsc, oneshot},
42 time::Instant,
43};
44use tracing::{Level, debug, error, info, instrument, trace, warn};
45
/// Opaque error returned when the SQLite database cannot be opened or initialized.
///
/// The error carries no payload; the failure sites log the underlying cause
/// with `tracing::error!` before returning it.
#[derive(Debug, thiserror::Error)]
#[error("initialization error")]
pub struct InitializationError;
49
/// A delay request identified by its join set and delay id, together with its expiry time.
// NOTE(review): the fields mirror the columns of `t_delay` (minus `execution_id`);
// the usages are outside of this part of the file - confirm.
#[derive(Debug, Clone)]
struct DelayReq {
    join_set_id: JoinSetId,
    delay_id: DelayId,
    expires_at: DateTime<Utc>,
}
/// Default `[name, value]` PRAGMA pairs applied, in this order, when the connection is initialized.
///
/// `init_thread` replaces a value with the matching entry of the pragma override map
/// when one is present. A pair with an empty value (`integrity_check`) is *queried*
/// instead of being set; the returned row is only logged.
const PRAGMA: [[&str; 2]; 10] = [
    ["journal_mode", "wal"],
    ["synchronous", "FULL"],
    ["foreign_keys", "true"],
    ["busy_timeout", "1000"],
    ["cache_size", "10000"], ["temp_store", "MEMORY"],
    ["page_size", "8192"], ["mmap_size", "134217728"],
    ["journal_size_limit", "67108864"],
    ["integrity_check", ""],
];
79
/// DDL of `t_metadata`, the table holding the schema version of the database.
///
/// `init_thread` reads the `schema_version` of the row with the highest `id`
/// and compares it with [`T_METADATA_EXPECTED_SCHEMA_VERSION`].
const CREATE_TABLE_T_METADATA: &str = r"
CREATE TABLE IF NOT EXISTS t_metadata (
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 schema_version INTEGER NOT NULL,
 created_at TEXT NOT NULL
) STRICT
";
/// Schema version this build expects to find in `t_metadata`.
const T_METADATA_EXPECTED_SCHEMA_VERSION: u32 = 2;
89
/// DDL of `t_execution_log`: one row per `(execution_id, version)` holding the
/// serialized event in `json_value`.
///
/// `history_event_type` is a stored generated column extracted from
/// `json_value` at the JSON path `$.HistoryEvent.event.type`.
const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_log (
 execution_id TEXT NOT NULL,
 created_at TEXT NOT NULL,
 json_value TEXT NOT NULL,
 version INTEGER NOT NULL,
 variant TEXT NOT NULL,
 join_set_id TEXT,
 history_event_type TEXT GENERATED ALWAYS AS (json_value->>'$.HistoryEvent.event.type') STORED,
 PRIMARY KEY (execution_id, version)
) STRICT
";
/// Index on `t_execution_log (execution_id, version)`.
// NOTE(review): covers the same columns as the table's primary key - possibly redundant, confirm.
const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version ON t_execution_log (execution_id, version);
";
/// Index on `t_execution_log (execution_id, variant)`.
const CREATE_INDEX_IDX_T_EXECUTION_ID_EXECUTION_ID_VARIANT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant ON t_execution_log (execution_id, variant);
";
111
/// DDL of `t_join_set_response`: responses delivered to a join set of an execution.
///
/// A row references either a delay (`delay_id`) or a child execution
/// (`child_execution_id` with `finished_version`); all of these columns are nullable.
/// Rows are unique per `(execution_id, join_set_id, delay_id, child_execution_id)`.
const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
CREATE TABLE IF NOT EXISTS t_join_set_response (
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 created_at TEXT NOT NULL,
 execution_id TEXT NOT NULL,
 join_set_id TEXT NOT NULL,

 delay_id TEXT,

 child_execution_id TEXT,
 finished_version INTEGER,

 UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
) STRICT
";
/// Index on `t_join_set_response (execution_id, id)`.
const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
";
135
/// DDL of `t_state`: one row per execution describing its current state.
///
/// `state` holds one of the `STATE_*` names. `CombinedState::new` interprets
/// `pending_expires_finished` according to it: the scheduled time for `PendingAt`,
/// the lock expiry for `Locked` and `BlockedByJoinSet`, the finish time for `Finished`.
/// The nullable column groups (`last_lock_version`/`executor_id`/`run_id`,
/// `join_set_id`/`join_set_closing`, `result_kind`) are only populated for the
/// state they belong to.
const CREATE_TABLE_T_STATE: &str = r"
CREATE TABLE IF NOT EXISTS t_state (
 execution_id TEXT NOT NULL,
 is_top_level INTEGER NOT NULL,
 corresponding_version INTEGER NOT NULL,
 pending_expires_finished TEXT NOT NULL,
 ffqn TEXT NOT NULL,
 state TEXT NOT NULL,
 created_at TEXT NOT NULL,
 updated_at TEXT NOT NULL,
 scheduled_at TEXT NOT NULL,
 intermittent_event_count INTEGER NOT NULL,
 max_retries INTEGER NOT NULL,
 retry_exp_backoff_millis INTEGER NOT NULL,

 last_lock_version INTEGER,
 executor_id TEXT,
 run_id TEXT,

 join_set_id TEXT,
 join_set_closing INTEGER,

 result_kind TEXT,

 PRIMARY KEY (execution_id)
) STRICT
";
/// Value of `t_state.state` for an execution waiting to be scheduled.
const STATE_PENDING_AT: &str = "PendingAt";
/// Value of `t_state.state` for an execution blocked on a join set.
const STATE_BLOCKED_BY_JOIN_SET: &str = "BlockedByJoinSet";
/// Value of `t_state.state` for an execution locked by an executor.
const STATE_LOCKED: &str = "Locked";
/// Value of `t_state.state` for a finished execution.
const STATE_FINISHED: &str = "Finished";
// NOTE(review): presumably compared against `t_execution_log.history_event_type`;
// the usage is outside of this part of the file - confirm.
const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "JoinNext";
174
/// Index on `t_state (state, pending_expires_finished, ffqn)`.
const IDX_T_STATE_LOCK_PENDING: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending ON t_state (state, pending_expires_finished, ffqn);
";
/// Partial index on `t_state (pending_expires_finished)` limited to rows with an `executor_id`.
const IDX_T_STATE_EXPIRED_TIMERS: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_expired_timers ON t_state (pending_expires_finished) WHERE executor_id IS NOT NULL;
";
/// Index on `t_state (execution_id, is_top_level)`; note that the SQL name ends with `is_root`.
const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
";
/// Index on `t_state (ffqn)`.
const IDX_T_STATE_FFQN: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
";
/// Index on `t_state (created_at)`.
const IDX_T_STATE_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
";
193
/// DDL of `t_delay`: delays keyed by `(execution_id, join_set_id, delay_id)`
/// together with their expiry time.
const CREATE_TABLE_T_DELAY: &str = r"
CREATE TABLE IF NOT EXISTS t_delay (
 execution_id TEXT NOT NULL,
 join_set_id TEXT NOT NULL,
 delay_id TEXT NOT NULL,
 expires_at TEXT NOT NULL,
 PRIMARY KEY (execution_id, join_set_id, delay_id)
) STRICT
";
204
/// DDL of `t_backtrace`: a WASM backtrace attached to a version range of an execution.
///
/// As the column names state, the range is `[version_min_including, version_max_excluding)`.
const CREATE_TABLE_T_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_backtrace (
 execution_id TEXT NOT NULL,
 component_id TEXT NOT NULL,
 version_min_including INTEGER NOT NULL,
 version_max_excluding INTEGER NOT NULL,
 wasm_backtrace TEXT NOT NULL,
 PRIMARY KEY (execution_id, version_min_including, version_max_excluding)
) STRICT
";
/// Index on `t_backtrace (execution_id, version_min_including, version_max_excluding)`.
// NOTE(review): covers the same columns as the table's primary key - possibly redundant, confirm.
const IDX_T_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_backtrace_execution_id_version ON t_backtrace (execution_id, version_min_including, version_max_excluding);
";
220
/// Cloneable, file-private summary of a `rusqlite::Error`.
///
/// `Clone` is needed because one commit result is sent to every logical
/// transaction of a batch. The conversions live in the `conversions` module.
#[derive(Debug, thiserror::Error, Clone)]
enum RusqliteError {
    /// The query returned no rows (`rusqlite::Error::QueryReturnedNoRows`).
    #[error("not found")]
    NotFound,
    /// Any other error, reduced to its display string.
    #[error("generic: {0}")]
    Generic(StrVariant),
}
228
229mod conversions {
230
231 use super::RusqliteError;
232 use concepts::storage::{DbErrorGeneric, DbErrorRead, DbErrorReadWithTimeout, DbErrorWrite};
233 use rusqlite::types::{FromSql, FromSqlError};
234 use std::{fmt::Debug, str::FromStr};
235 use tracing::error;
236
237 impl From<rusqlite::Error> for RusqliteError {
238 fn from(err: rusqlite::Error) -> Self {
239 if matches!(err, rusqlite::Error::QueryReturnedNoRows) {
240 RusqliteError::NotFound
241 } else {
242 error!(backtrace = %std::backtrace::Backtrace::capture(), "Sqlite error {err:?}");
243 RusqliteError::Generic(err.to_string().into())
244 }
245 }
246 }
247
248 impl From<RusqliteError> for DbErrorGeneric {
249 fn from(err: RusqliteError) -> DbErrorGeneric {
250 match err {
251 RusqliteError::NotFound => DbErrorGeneric::Uncategorized("not found".into()),
252 RusqliteError::Generic(str) => DbErrorGeneric::Uncategorized(str),
253 }
254 }
255 }
256 impl From<RusqliteError> for DbErrorRead {
257 fn from(err: RusqliteError) -> Self {
258 if matches!(err, RusqliteError::NotFound) {
259 Self::NotFound
260 } else {
261 Self::from(DbErrorGeneric::from(err))
262 }
263 }
264 }
265 impl From<RusqliteError> for DbErrorReadWithTimeout {
266 fn from(err: RusqliteError) -> Self {
267 Self::from(DbErrorRead::from(err))
268 }
269 }
270 impl From<RusqliteError> for DbErrorWrite {
271 fn from(err: RusqliteError) -> Self {
272 if matches!(err, RusqliteError::NotFound) {
273 Self::NotFound
274 } else {
275 Self::from(DbErrorGeneric::from(err))
276 }
277 }
278 }
279
280 pub(crate) struct JsonWrapper<T>(pub(crate) T);
281 impl<T: serde::de::DeserializeOwned + 'static + Debug> FromSql for JsonWrapper<T> {
282 fn column_result(
283 value: rusqlite::types::ValueRef<'_>,
284 ) -> rusqlite::types::FromSqlResult<Self> {
285 let value = match value {
286 rusqlite::types::ValueRef::Text(value) | rusqlite::types::ValueRef::Blob(value) => {
287 Ok(value)
288 }
289 other => {
290 error!(
291 backtrace = %std::backtrace::Backtrace::capture(),
292 "Unexpected type when conveting to JSON - expected Text or Blob, got type `{other:?}`",
293 );
294 Err(FromSqlError::InvalidType)
295 }
296 }?;
297 let value = serde_json::from_slice::<T>(value).map_err(|err| {
298 error!(
299 backtrace = %std::backtrace::Backtrace::capture(),
300 "Cannot convert JSON value `{value:?}` to type:`{type}` - {err:?}",
301 r#type = std::any::type_name::<T>()
302 );
303 FromSqlError::InvalidType
304 })?;
305 Ok(Self(value))
306 }
307 }
308
309 pub(crate) struct FromStrWrapper<T: FromStr>(pub(crate) T);
310 impl<T: FromStr<Err = D>, D: Debug> FromSql for FromStrWrapper<T> {
311 fn column_result(
312 value: rusqlite::types::ValueRef<'_>,
313 ) -> rusqlite::types::FromSqlResult<Self> {
314 let value = String::column_result(value)?;
315 let value = T::from_str(&value).map_err(|err| {
316 error!(
317 backtrace = %std::backtrace::Backtrace::capture(),
318 "Cannot convert string `{value}` to type:`{type}` - {err:?}",
319 r#type = std::any::type_name::<T>()
320 );
321 FromSqlError::InvalidType
322 })?;
323 Ok(Self(value))
324 }
325 }
326
327 #[derive(Debug, thiserror::Error)]
329 #[error("{0}")]
330 pub(crate) struct OtherError(&'static str);
331 pub(crate) fn consistency_rusqlite(input: &'static str) -> rusqlite::Error {
332 FromSqlError::other(OtherError(input)).into()
333 }
334
335 pub(crate) fn consistency_db_err(input: &'static str) -> DbErrorGeneric {
336 DbErrorGeneric::Uncategorized(input.into())
337 }
338}
339
/// Raw, untyped view of the state-related columns of a `t_state` row.
///
/// `CombinedState::new` validates that the combination of `state` and the
/// optional fields is consistent and turns it into a `PendingState`.
#[derive(Debug)]
struct CombinedStateDTO {
    // One of the `STATE_*` names.
    state: String,
    // Unparsed function FQN.
    ffqn: String,
    // Scheduled time, lock expiry or finish time, depending on `state`.
    pending_expires_finished: DateTime<Utc>,
    // Expected only for `Locked`.
    last_lock_version: Option<Version>,
    // Expected only for `Locked`.
    executor_id: Option<ExecutorId>,
    // Expected only for `Locked`.
    run_id: Option<RunId>,
    // Expected only for `BlockedByJoinSet`.
    join_set_id: Option<JoinSetId>,
    // Expected only for `BlockedByJoinSet`.
    join_set_closing: Option<bool>,
    // Expected only for `Finished`.
    result_kind: Option<PendingStateFinishedResultKind>,
}
355#[derive(Debug)]
356struct CombinedState {
357 ffqn: FunctionFqn,
358 pending_state: PendingState,
359 corresponding_version: Version,
360}
361impl CombinedState {
362 fn get_next_version_assert_not_finished(&self) -> Version {
363 assert!(!self.pending_state.is_finished());
364 self.corresponding_version.increment()
365 }
366
367 #[cfg(feature = "test")]
368 fn get_next_version_or_finished(&self) -> Version {
369 if self.pending_state.is_finished() {
370 self.corresponding_version.clone()
371 } else {
372 self.corresponding_version.increment()
373 }
374 }
375}
376
/// Notification payload: an execution with function `ffqn` became pending at `scheduled_at`.
#[derive(Debug)]
struct NotifierPendingAt {
    scheduled_at: DateTime<Utc>,
    ffqn: FunctionFqn,
}

/// Notification payload: an execution finished with the given return value.
#[derive(Debug)]
struct NotifierExecutionFinished {
    execution_id: ExecutionId,
    retval: SupportedFunctionReturnValue,
}

/// Notifications collected while appending to the database.
///
/// Every field is optional; `Default` yields "nothing to notify".
// NOTE(review): presumably dispatched to the subscriber maps of the pool after
// the transaction commits - the consumer is outside of this part of the file, confirm.
#[derive(Debug, Default)]
struct AppendNotifier {
    pending_at: Option<NotifierPendingAt>,
    execution_finished: Option<NotifierExecutionFinished>,
    response: Option<(ExecutionId, JoinSetResponseEventOuter)>,
}
395
396impl CombinedState {
397 fn new(
398 dto: &CombinedStateDTO,
399 corresponding_version: Version,
400 ) -> Result<Self, rusqlite::Error> {
401 let pending_state = match dto {
402 CombinedStateDTO {
403 state,
404 ffqn: _,
405 pending_expires_finished: scheduled_at,
406 last_lock_version: None,
407 executor_id: None,
408 run_id: None,
409 join_set_id: None,
410 join_set_closing: None,
411 result_kind: None,
412 } if state == STATE_PENDING_AT => Ok(PendingState::PendingAt {
413 scheduled_at: *scheduled_at,
414 }),
415 CombinedStateDTO {
416 state,
417 ffqn: _,
418 pending_expires_finished: lock_expires_at,
419 last_lock_version: Some(_),
420 executor_id: Some(executor_id),
421 run_id: Some(run_id),
422 join_set_id: None,
423 join_set_closing: None,
424 result_kind: None,
425 } if state == STATE_LOCKED => Ok(PendingState::Locked {
426 executor_id: *executor_id,
427 run_id: *run_id,
428 lock_expires_at: *lock_expires_at,
429 }),
430 CombinedStateDTO {
431 state,
432 ffqn: _,
433 pending_expires_finished: lock_expires_at,
434 last_lock_version: None,
435 executor_id: None,
436 run_id: None,
437 join_set_id: Some(join_set_id),
438 join_set_closing: Some(join_set_closing),
439 result_kind: None,
440 } if state == STATE_BLOCKED_BY_JOIN_SET => Ok(PendingState::BlockedByJoinSet {
441 join_set_id: join_set_id.clone(),
442 closing: *join_set_closing,
443 lock_expires_at: *lock_expires_at,
444 }),
445 CombinedStateDTO {
446 state,
447 ffqn: _,
448 pending_expires_finished: finished_at,
449 last_lock_version: None,
450 executor_id: None,
451 run_id: None,
452 join_set_id: None,
453 join_set_closing: None,
454 result_kind: Some(result_kind),
455 } if state == STATE_FINISHED => Ok(PendingState::Finished {
456 finished: PendingStateFinished {
457 finished_at: *finished_at,
458 version: corresponding_version.0,
459 result_kind: *result_kind,
460 },
461 }),
462 _ => {
463 error!("Cannot deserialize pending state from {dto:?}");
464 Err(consistency_rusqlite("invalid `t_state`"))
465 }
466 }?;
467 Ok(Self {
468 ffqn: FunctionFqn::from_str(&dto.ffqn).map_err(|parse_err| {
469 error!("Error parsing ffqn of {dto:?} - {parse_err:?}");
470 consistency_rusqlite("invalid ffqn value in `t_state`")
471 })?,
472 pending_state,
473 corresponding_version,
474 })
475 }
476}
477
/// A unit of work sent to the database thread.
///
/// Several logical transactions may be executed inside one physical SQLite
/// transaction (see `SqlitePool::tick`).
#[derive(derive_more::Debug)]
struct LogicalTx {
    // The work itself. `FnMut` because it may run more than once: when a batched
    // commit fails, every logical transaction is re-run in its own physical transaction.
    #[debug(skip)]
    func: Box<dyn FnMut(&mut Transaction) + Send>,
    // When the command was created; used for measuring the queueing latency.
    sent_at: Instant,
    // Label used when recording metrics.
    func_name: &'static str,
    // Receives the result of the physical commit.
    #[debug(skip)]
    commit_ack_sender: oneshot::Sender<Result<(), RusqliteError>>,
}
487
/// Message processed by the database thread.
#[derive(derive_more::Debug)]
enum ThreadCommand {
    /// Run the logical transaction and acknowledge the commit.
    LogicalTx(LogicalTx),
    /// Stop processing commands and exit the thread loop.
    Shutdown,
}
493
/// Cloneable handle to the thread that owns the SQLite connection.
///
/// Created with `SqlitePool::new`. The `DbPoolCloseable::close` implementation
/// requests a shutdown and waits for the thread; dropping the last handle
/// without closing logs a warning and only requests the shutdown.
#[derive(Clone)]
pub struct SqlitePool(SqlitePoolInner);
496
/// Per-execution subscriber waiting for a join set response.
// NOTE(review): the `u64` looks like a subscription token - confirm at the usage sites.
type ResponseSubscribers =
    Arc<Mutex<HashMap<ExecutionId, (oneshot::Sender<JoinSetResponseEventOuter>, u64)>>>;
/// Per-function subscriber notified with `()`; presumably when an execution of that
/// function becomes pending - TODO confirm at the usage sites.
type PendingFfqnSubscribers = Arc<Mutex<HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>>>;
/// Per-execution subscribers, keyed by a `u64`, each waiting for the return value.
type ExecutionFinishedSubscribers =
    Mutex<HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>>;
/// State shared by all clones of a `SqlitePool`.
#[derive(Clone)]
struct SqlitePoolInner {
    // Set by `close` and `drop`; checked by the database thread before committing.
    shutdown_requested: Arc<AtomicBool>,
    // Set by the database thread right before it exits its loop function.
    shutdown_finished: Arc<AtomicBool>,
    // Queue of commands for the database thread.
    command_tx: tokio::sync::mpsc::Sender<ThreadCommand>,
    response_subscribers: ResponseSubscribers,
    pending_ffqn_subscribers: PendingFfqnSubscribers,
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
    // Wrapped in `Option` so that `Drop` can take it; always `Some` after construction.
    join_handle: Option<Arc<std::thread::JoinHandle<()>>>,
}
512
513#[async_trait]
514impl DbPoolCloseable for SqlitePool {
515 async fn close(self) {
516 debug!("Sqlite is closing");
517 self.0.shutdown_requested.store(true, Ordering::Release);
518 let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
520 while !self.0.shutdown_finished.load(Ordering::Acquire) {
521 tokio::time::sleep(Duration::from_millis(1)).await;
522 }
523 debug!("Sqlite was closed");
524 }
525}
526
527#[async_trait]
528impl DbPool for SqlitePool {
529 fn connection(&self) -> Box<dyn DbConnection> {
530 Box::new(self.clone())
531 }
532}
533impl Drop for SqlitePool {
534 fn drop(&mut self) {
535 let arc = self.0.join_handle.take().expect("join_handle was set");
536 if let Ok(join_handle) = Arc::try_unwrap(arc) {
537 if !join_handle.is_finished() {
539 if !self.0.shutdown_finished.load(Ordering::Acquire) {
540 let backtrace = std::backtrace::Backtrace::capture();
542 warn!("SqlitePool was not closed properly - {backtrace}");
543 self.0.shutdown_requested.store(true, Ordering::Release);
544 let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
546 } else {
549 }
551 }
552 }
553 }
554}
555
/// Configuration of a `SqlitePool`.
#[derive(Debug, Clone)]
pub struct SqliteConfig {
    /// Capacity of the command queue between the callers and the database thread.
    pub queue_capacity: usize,
    /// PRAGMA name/value pairs that replace or extend the built-in defaults.
    /// An empty value makes the pragma queried instead of set.
    pub pragma_override: Option<HashMap<String, String>>,
    /// Passed to `Histograms::new`.
    // NOTE(review): presumably the interval for printing metrics, `None` disabling it - confirm.
    pub metrics_threshold: Option<Duration>,
}
impl Default for SqliteConfig {
    /// Queue capacity of 100, no pragma overrides, no metrics threshold.
    fn default() -> Self {
        Self {
            queue_capacity: 100,
            pragma_override: None,
            metrics_threshold: None,
        }
    }
}
571
572impl SqlitePool {
573 fn init_thread(
574 path: &Path,
575 mut pragma_override: HashMap<String, String>,
576 ) -> Result<Connection, InitializationError> {
577 fn conn_execute<P: Params>(
578 conn: &Connection,
579 sql: &str,
580 params: P,
581 ) -> Result<(), InitializationError> {
582 conn.execute(sql, params).map(|_| ()).map_err(|err| {
583 error!("Cannot run `{sql}` - {err:?}");
584 InitializationError
585 })
586 }
587 fn pragma_update(
588 conn: &Connection,
589 name: &str,
590 value: &str,
591 ) -> Result<(), InitializationError> {
592 if value.is_empty() {
593 debug!("Querying PRAGMA {name}");
594 conn.pragma_query(None, name, |row| {
595 debug!("{row:?}");
596 Ok(())
597 })
598 .map_err(|err| {
599 error!("cannot update pragma `{name}`=`{value}` - {err:?}");
600 InitializationError
601 })
602 } else {
603 debug!("Setting PRAGMA {name}={value}");
604 conn.pragma_update(None, name, value).map_err(|err| {
605 error!("cannot update pragma `{name}`=`{value}` - {err:?}");
606 InitializationError
607 })
608 }
609 }
610
611 let conn = Connection::open_with_flags(path, OpenFlags::default()).map_err(|err| {
612 error!("cannot open the connection - {err:?}");
613 InitializationError
614 })?;
615
616 for [pragma_name, default_value] in PRAGMA {
617 let pragma_value = pragma_override
618 .remove(pragma_name)
619 .unwrap_or_else(|| default_value.to_string());
620 pragma_update(&conn, pragma_name, &pragma_value)?;
621 }
622 for (pragma_name, pragma_value) in pragma_override.drain() {
624 pragma_update(&conn, &pragma_name, &pragma_value)?;
625 }
626
627 conn_execute(&conn, CREATE_TABLE_T_METADATA, [])?;
629 conn_execute(
631 &conn,
632 &format!(
633 "INSERT INTO t_metadata (schema_version, created_at) VALUES
634 ({T_METADATA_EXPECTED_SCHEMA_VERSION}, ?) ON CONFLICT DO NOTHING"
635 ),
636 [Utc::now()],
637 )?;
638 let actual_version = conn
640 .prepare("SELECT schema_version FROM t_metadata ORDER BY id DESC LIMIT 1")
641 .map_err(|err| {
642 error!("cannot select schema version - {err:?}");
643 InitializationError
644 })?
645 .query_row([], |row| row.get::<_, u32>("schema_version"));
646
647 let actual_version = actual_version.map_err(|err| {
648 error!("Cannot read the schema version - {err:?}");
649 InitializationError
650 })?;
651 if actual_version != T_METADATA_EXPECTED_SCHEMA_VERSION {
652 error!(
653 "wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
654 );
655 return Err(InitializationError);
656 }
657
658 conn_execute(&conn, CREATE_TABLE_T_EXECUTION_LOG, [])?;
660 conn_execute(
661 &conn,
662 CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
663 [],
664 )?;
665 conn_execute(
666 &conn,
667 CREATE_INDEX_IDX_T_EXECUTION_ID_EXECUTION_ID_VARIANT,
668 [],
669 )?;
670 conn_execute(&conn, CREATE_TABLE_T_JOIN_SET_RESPONSE, [])?;
672 conn_execute(
673 &conn,
674 CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
675 [],
676 )?;
677 conn_execute(&conn, CREATE_TABLE_T_STATE, [])?;
679 conn_execute(&conn, IDX_T_STATE_LOCK_PENDING, [])?;
680 conn_execute(&conn, IDX_T_STATE_EXPIRED_TIMERS, [])?;
681 conn_execute(&conn, IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL, [])?;
682 conn_execute(&conn, IDX_T_STATE_FFQN, [])?;
683 conn_execute(&conn, IDX_T_STATE_CREATED_AT, [])?;
684 conn_execute(&conn, CREATE_TABLE_T_DELAY, [])?;
686 conn_execute(&conn, CREATE_TABLE_T_BACKTRACE, [])?;
688 conn_execute(&conn, IDX_T_BACKTRACE_EXECUTION_ID_VERSION, [])?;
689 Ok(conn)
690 }
691
692 fn connection_rpc(
693 mut conn: Connection,
694 shutdown_requested: &AtomicBool,
695 shutdown_finished: &AtomicBool,
696 mut command_rx: mpsc::Receiver<ThreadCommand>,
697 metrics_threshold: Option<Duration>,
698 ) {
699 let mut histograms = Histograms::new(metrics_threshold);
700 while Self::tick(
701 &mut conn,
702 shutdown_requested,
703 &mut command_rx,
704 &mut histograms,
705 )
706 .is_ok()
707 {
708 }
710 debug!("Closing command thread");
711 shutdown_finished.store(true, Ordering::Release);
712 }
713
    /// Processes one batch of commands inside a single physical transaction.
    ///
    /// Blocks until a command arrives, opens an `IMMEDIATE` transaction, runs
    /// the received logical transaction and every other one that is already
    /// queued, and commits once. The commit result is sent to every logical
    /// transaction of the batch when the commit succeeded or the batch had a
    /// single member; otherwise each logical transaction is re-run and
    /// committed on its own via `ltx_commit_single`.
    ///
    /// Returns `Err(())` when the thread loop must end: a `Shutdown` command
    /// was received, the channel was closed, the shutdown flag was set, or a
    /// transaction could not be opened. On these early returns nothing is
    /// committed and the collected `commit_ack_sender`s are dropped without
    /// sending a result.
    fn tick(
        conn: &mut Connection,
        shutdown_requested: &AtomicBool,
        command_rx: &mut mpsc::Receiver<ThreadCommand>,
        histograms: &mut Histograms,
    ) -> Result<(), ()> {
        let mut ltx_list = Vec::new();
        // Wait for the first command of the batch.
        let mut ltx = match command_rx.blocking_recv() {
            Some(ThreadCommand::LogicalTx(ltx)) => ltx,
            Some(ThreadCommand::Shutdown) => {
                debug!("shutdown message received");
                return Err(());
            }
            None => {
                debug!("command_rx was closed");
                return Err(());
            }
        };
        let all_fns_start = std::time::Instant::now();
        let mut physical_tx = conn
            .transaction_with_behavior(TransactionBehavior::Immediate)
            .map_err(|begin_err| {
                error!("Cannot open transaction, closing sqlite - {begin_err:?}");
            })?;
        Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
        ltx_list.push(ltx);

        // Drain whatever is already queued into the same physical transaction.
        while let Ok(more) = command_rx.try_recv() {
            let mut ltx = match more {
                ThreadCommand::Shutdown => {
                    debug!("shutdown message received");
                    return Err(());
                }
                ThreadCommand::LogicalTx(ltx) => ltx,
            };

            Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
            ltx_list.push(ltx);
        }
        histograms.record_all_fns(all_fns_start.elapsed());

        {
            // Do not commit when a shutdown was requested while the batch was running.
            if shutdown_requested.load(Ordering::Relaxed) {
                debug!("Recveived shutdown during processing of the batch");
                return Err(());
            }
            let commit_result = {
                let now = std::time::Instant::now();
                let commit_result = physical_tx.commit().map_err(RusqliteError::from);
                histograms.record_commit(now.elapsed());
                commit_result
            };
            if commit_result.is_ok() || ltx_list.len() == 1 {
                // Every member of the batch shares the result of the single commit.
                for ltx in ltx_list {
                    // The receiver may be gone already; nothing to do in that case.
                    let _ = ltx.commit_ack_sender.send(commit_result.clone());
                }
            } else {
                // The batched commit failed: retry each logical transaction on its
                // own so that every caller gets a result of its own commit.
                for ltx in ltx_list {
                    Self::ltx_commit_single(ltx, conn, shutdown_requested, histograms)?;
                }
            }
        }
        histograms.print_if_elapsed();
        Ok(())
    }
784
785 fn ltx_commit_single(
786 mut ltx: LogicalTx,
787 conn: &mut Connection,
788 shutdown_requested: &AtomicBool,
789 histograms: &mut Histograms,
790 ) -> Result<(), ()> {
791 let mut physical_tx = conn
792 .transaction_with_behavior(TransactionBehavior::Immediate)
793 .map_err(|begin_err| {
794 error!("Cannot open transaction, closing sqlite - {begin_err:?}");
795 })?;
796 Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
797 if shutdown_requested.load(Ordering::Relaxed) {
798 debug!("Recveived shutdown during processing of the batch");
799 return Err(());
800 }
801 let commit_result = {
802 let now = std::time::Instant::now();
803 let commit_result = physical_tx.commit().map_err(RusqliteError::from);
804 histograms.record_commit(now.elapsed());
805 commit_result
806 };
807 let _ = ltx.commit_ack_sender.send(commit_result);
809 Ok(())
810 }
811
812 fn ltx_apply_to_tx(
813 ltx: &mut LogicalTx,
814 physical_tx: &mut Transaction,
815 histograms: &mut Histograms,
816 ) {
817 let sent_latency = ltx.sent_at.elapsed();
818 let started_at = Instant::now();
819 (ltx.func)(physical_tx);
820 histograms.record_command(sent_latency, ltx.func_name, started_at.elapsed());
821 }
822
823 #[instrument(level = Level::DEBUG, skip_all, name = "sqlite_new")]
824 pub async fn new<P: AsRef<Path>>(
825 path: P,
826 config: SqliteConfig,
827 ) -> Result<Self, InitializationError> {
828 let path = path.as_ref().to_owned();
829
830 let shutdown_requested = Arc::new(AtomicBool::new(false));
831 let shutdown_finished = Arc::new(AtomicBool::new(false));
832
833 let (command_tx, command_rx) = tokio::sync::mpsc::channel(config.queue_capacity);
834 info!("Sqlite database location: {path:?}");
835 let join_handle = {
836 let init_task = {
838 tokio::task::spawn_blocking(move || {
839 Self::init_thread(&path, config.pragma_override.unwrap_or_default())
840 })
841 .await
842 };
843 let conn = match init_task {
844 Ok(res) => res?,
845 Err(join_err) => {
846 error!("Initialization panic - {join_err:?}");
847 return Err(InitializationError);
848 }
849 };
850 let shutdown_requested = shutdown_requested.clone();
851 let shutdown_finished = shutdown_finished.clone();
852 std::thread::spawn(move || {
854 Self::connection_rpc(
855 conn,
856 &shutdown_requested,
857 &shutdown_finished,
858 command_rx,
859 config.metrics_threshold,
860 );
861 })
862 };
863 Ok(SqlitePool(SqlitePoolInner {
864 shutdown_requested,
865 shutdown_finished,
866 command_tx,
867 response_subscribers: Arc::default(),
868 pending_ffqn_subscribers: Arc::default(),
869 join_handle: Some(Arc::new(join_handle)),
870 execution_finished_subscribers: Arc::default(),
871 }))
872 }
873
874 async fn transaction<F, T, E>(&self, mut func: F, func_name: &'static str) -> Result<T, E>
876 where
877 F: FnMut(&mut rusqlite::Transaction) -> Result<T, E> + Send + 'static,
878 T: Send + 'static,
879 E: From<DbErrorGeneric> + From<RusqliteError> + Send + 'static,
880 {
881 let fn_res: Arc<std::sync::Mutex<Option<_>>> = Arc::default();
882 let (commit_ack_sender, commit_ack_receiver) = oneshot::channel();
883 let thread_command_func = {
884 let fn_res = fn_res.clone();
885 ThreadCommand::LogicalTx(LogicalTx {
886 func: Box::new(move |tx| {
887 let func_res = func(tx);
888 *fn_res.lock().unwrap() = Some(func_res);
889 }),
890 sent_at: Instant::now(),
891 func_name,
892 commit_ack_sender,
893 })
894 };
895 self.0
896 .command_tx
897 .send(thread_command_func)
898 .await
899 .map_err(|_send_err| DbErrorGeneric::Close)?;
900
901 match commit_ack_receiver.await {
903 Ok(Ok(())) => {
904 let mut guard = fn_res.lock().unwrap();
905 std::mem::take(guard.deref_mut()).expect("ltx must have been run at least once")
906 }
907 Ok(Err(rusqlite_err)) => Err(E::from(rusqlite_err)),
908 Err(_) => Err(E::from(DbErrorGeneric::Close)),
909 }
910 }
911
912 fn fetch_created_event(
913 conn: &Connection,
914 execution_id: &ExecutionId,
915 ) -> Result<CreateRequest, DbErrorRead> {
916 let mut stmt = conn.prepare(
917 "SELECT created_at, json_value FROM t_execution_log WHERE \
918 execution_id = :execution_id AND version = 0",
919 )?;
920 let (created_at, event) = stmt.query_row(
921 named_params! {
922 ":execution_id": execution_id.to_string(),
923 },
924 |row| {
925 let created_at = row.get("created_at")?;
926 let event = row
927 .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
928 .map_err(|serde| {
929 error!("cannot deserialize `Created` event: {row:?} - `{serde:?}`");
930 consistency_rusqlite("cannot deserialize `Created` event")
931 })?;
932 Ok((created_at, event.0))
933 },
934 )?;
935 if let ExecutionEventInner::Created {
936 ffqn,
937 params,
938 parent,
939 scheduled_at,
940 retry_exp_backoff,
941 max_retries,
942 component_id,
943 metadata,
944 scheduled_by,
945 } = event
946 {
947 Ok(CreateRequest {
948 created_at,
949 execution_id: execution_id.clone(),
950 ffqn,
951 params,
952 parent,
953 scheduled_at,
954 retry_exp_backoff,
955 max_retries,
956 component_id,
957 metadata,
958 scheduled_by,
959 })
960 } else {
961 error!("Row with version=0 must be a `Created` event - {event:?}");
962 Err(consistency_db_err("expected `Created` event").into())
963 }
964 }
965
966 fn check_expected_next_and_appending_version(
967 expected_version: &Version,
968 appending_version: &Version,
969 ) -> Result<(), DbErrorWrite> {
970 if *expected_version != *appending_version {
971 debug!(
972 "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
973 );
974 return Err(DbErrorWrite::Permanent(
975 DbErrorWritePermanent::CannotWrite {
976 reason: "version conflict".into(),
977 expected_version: Some(expected_version.clone()),
978 },
979 ));
980 }
981 Ok(())
982 }
983
984 #[instrument(level = Level::TRACE, skip_all, fields(execution_id = %req.execution_id))]
985 fn create_inner(
986 tx: &Transaction,
987 req: CreateRequest,
988 ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
989 debug!("create_inner");
990
991 let version = Version::default();
992 let execution_id = req.execution_id.clone();
993 let execution_id_str = execution_id.to_string();
994 let ffqn = req.ffqn.clone();
995 let created_at = req.created_at;
996 let scheduled_at = req.scheduled_at;
997 let max_retries = req.max_retries;
998 let backoff_millis =
999 u64::try_from(req.retry_exp_backoff.as_millis()).expect("backoff too big");
1000 let event = ExecutionEventInner::from(req);
1001 let event_ser = serde_json::to_string(&event).map_err(|err| {
1002 error!("Cannot serialize {event:?} - {err:?}");
1003 DbErrorWritePermanent::ValidationFailed("parameter serialization error".into())
1004 })?;
1005 tx.prepare(
1006 "INSERT INTO t_execution_log (execution_id, created_at, version, json_value, variant, join_set_id ) \
1007 VALUES (:execution_id, :created_at, :version, :json_value, :variant, :join_set_id)")
1008 ?
1009 .execute(named_params! {
1010 ":execution_id": &execution_id_str,
1011 ":created_at": created_at,
1012 ":version": version.0,
1013 ":json_value": event_ser,
1014 ":variant": event.variant(),
1015 ":join_set_id": event.join_set_id().map(std::string::ToString::to_string),
1016 })
1017 ?;
1018 let pending_state = PendingState::PendingAt { scheduled_at };
1019 let pending_at = {
1020 let scheduled_at = assert_matches!(pending_state, PendingState::PendingAt { scheduled_at } => scheduled_at);
1021 debug!("Creating with `Pending(`{scheduled_at:?}`)");
1022 tx.prepare(
1023 r"
1024 INSERT INTO t_state (
1025 execution_id,
1026 is_top_level,
1027 corresponding_version,
1028 pending_expires_finished,
1029 ffqn,
1030 state,
1031 created_at,
1032 updated_at,
1033 scheduled_at,
1034 intermittent_event_count,
1035 max_retries,
1036 retry_exp_backoff_millis
1037 )
1038 VALUES (
1039 :execution_id,
1040 :is_top_level,
1041 :corresponding_version,
1042 :pending_expires_finished,
1043 :ffqn,
1044 :state,
1045 :created_at,
1046 CURRENT_TIMESTAMP,
1047 :scheduled_at,
1048 0,
1049 :max_retries,
1050 :retry_exp_backoff_millis
1051 )
1052 ",
1053 )?
1054 .execute(named_params! {
1055 ":execution_id": execution_id.to_string(),
1056 ":is_top_level": execution_id.is_top_level(),
1057 ":corresponding_version": version.0,
1058 ":pending_expires_finished": scheduled_at,
1059 ":ffqn": ffqn.to_string(),
1060 ":state": STATE_PENDING_AT,
1061 ":created_at": created_at,
1062 ":scheduled_at": scheduled_at,
1063 ":max_retries": max_retries,
1064 ":retry_exp_backoff_millis": backoff_millis,
1065 })?;
1066 AppendNotifier {
1067 pending_at: Some(NotifierPendingAt { scheduled_at, ffqn }),
1068 execution_finished: None,
1069 response: None,
1070 }
1071 };
1072 let next_version = Version::new(version.0 + 1);
1073 Ok((next_version, pending_at))
1074 }
1075
1076 #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %corresponding_version))]
1077 fn update_state_pending_after_response_appended(
1078 tx: &Transaction,
1079 execution_id: &ExecutionId,
1080 scheduled_at: DateTime<Utc>, corresponding_version: &Version, ) -> Result<AppendNotifier, DbErrorWrite> {
1083 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
1084 let execution_id_str = execution_id.to_string();
1085 let mut stmt = tx
1086 .prepare_cached(
1087 r"
1088 UPDATE t_state
1089 SET
1090 corresponding_version = :corresponding_version,
1091 pending_expires_finished = :pending_expires_finished,
1092 state = :state,
1093 updated_at = CURRENT_TIMESTAMP,
1094
1095 last_lock_version = NULL,
1096 executor_id = NULL,
1097 run_id = NULL,
1098
1099 join_set_id = NULL,
1100 join_set_closing = NULL,
1101
1102 result_kind = NULL
1103 WHERE execution_id = :execution_id
1104 ",
1105 )
1106 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1107 let updated = stmt
1108 .execute(named_params! {
1109 ":execution_id": execution_id_str,
1110 ":corresponding_version": corresponding_version.0,
1111 ":pending_expires_finished": scheduled_at,
1112 ":state": STATE_PENDING_AT,
1113 })
1114 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1115 if updated != 1 {
1116 return Err(DbErrorWrite::NotFound);
1117 }
1118 Ok(AppendNotifier {
1119 pending_at: Some(NotifierPendingAt {
1120 scheduled_at,
1121 ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1122 }),
1123 execution_finished: None,
1124 response: None,
1125 })
1126 }
1127
1128 #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
1129 fn update_state_pending_after_event_appended(
1130 tx: &Transaction,
1131 execution_id: &ExecutionId,
1132 appending_version: &Version,
1133 scheduled_at: DateTime<Utc>, intermittent_failure: bool,
1135 ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1136 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
1137 let mut stmt = tx
1140 .prepare_cached(
1141 r"
1142 UPDATE t_state
1143 SET
1144 corresponding_version = :appending_version,
1145 pending_expires_finished = :pending_expires_finished,
1146 state = :state,
1147 updated_at = CURRENT_TIMESTAMP,
1148 intermittent_event_count = intermittent_event_count + :intermittent_delta,
1149
1150 last_lock_version = NULL,
1151 executor_id = NULL,
1152 run_id = NULL,
1153
1154 join_set_id = NULL,
1155 join_set_closing = NULL,
1156
1157 result_kind = NULL
1158 WHERE execution_id = :execution_id;
1159 ",
1160 )
1161 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1162 let updated = stmt
1163 .execute(named_params! {
1164 ":execution_id": execution_id.to_string(),
1165 ":appending_version": appending_version.0,
1166 ":pending_expires_finished": scheduled_at,
1167 ":state": STATE_PENDING_AT,
1168 ":intermittent_delta": i32::from(intermittent_failure) })
1170 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1171 if updated != 1 {
1172 return Err(DbErrorWrite::NotFound);
1173 }
1174 Ok((
1175 appending_version.increment(),
1176 AppendNotifier {
1177 pending_at: Some(NotifierPendingAt {
1178 scheduled_at,
1179 ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1180 }),
1181 execution_finished: None,
1182 response: None,
1183 },
1184 ))
1185 }
1186
1187 fn update_state_locked_get_intermittent_event_count(
1188 tx: &Transaction,
1189 execution_id: &ExecutionId,
1190 executor_id: ExecutorId,
1191 run_id: RunId,
1192 lock_expires_at: DateTime<Utc>,
1193 appending_version: &Version,
1194 ) -> Result<u32, DbErrorWrite> {
1195 debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
1196 let execution_id_str = execution_id.to_string();
1197 let mut stmt = tx
1198 .prepare_cached(
1199 r"
1200 UPDATE t_state
1201 SET
1202 corresponding_version = :appending_version,
1203 pending_expires_finished = :pending_expires_finished,
1204 state = :state,
1205 updated_at = CURRENT_TIMESTAMP,
1206
1207 last_lock_version = :appending_version,
1208 executor_id = :executor_id,
1209 run_id = :run_id,
1210
1211 join_set_id = NULL,
1212 join_set_closing = NULL,
1213
1214 result_kind = NULL
1215 WHERE execution_id = :execution_id
1216 ",
1217 )
1218 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1219 let updated = stmt
1220 .execute(named_params! {
1221 ":execution_id": execution_id_str,
1222 ":appending_version": appending_version.0,
1223 ":pending_expires_finished": lock_expires_at,
1224 ":state": STATE_LOCKED,
1225 ":executor_id": executor_id.to_string(),
1226 ":run_id": run_id.to_string(),
1227 })
1228 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1229 if updated != 1 {
1230 return Err(DbErrorWrite::NotFound);
1231 }
1232
1233 let intermittent_event_count = tx
1235 .prepare(
1236 "SELECT intermittent_event_count FROM t_state WHERE execution_id = :execution_id",
1237 )?
1238 .query_row(
1239 named_params! {
1240 ":execution_id": execution_id_str,
1241 },
1242 |row| {
1243 let intermittent_event_count = row.get("intermittent_event_count")?;
1244 Ok(intermittent_event_count)
1245 },
1246 )
1247 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1248
1249 Ok(intermittent_event_count)
1250 }
1251
1252 fn update_state_blocked(
1253 tx: &Transaction,
1254 execution_id: &ExecutionId,
1255 appending_version: &Version,
1256 join_set_id: &JoinSetId,
1258 lock_expires_at: DateTime<Utc>,
1259 join_set_closing: bool,
1260 ) -> Result<
1261 AppendResponse, DbErrorWrite,
1263 > {
1264 debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
1265 let execution_id_str = execution_id.to_string();
1266 let mut stmt = tx.prepare_cached(
1267 r"
1268 UPDATE t_state
1269 SET
1270 corresponding_version = :appending_version,
1271 pending_expires_finished = :pending_expires_finished,
1272 state = :state,
1273 updated_at = CURRENT_TIMESTAMP,
1274
1275 last_lock_version = NULL,
1276 executor_id = NULL,
1277 run_id = NULL,
1278
1279 join_set_id = :join_set_id,
1280 join_set_closing = :join_set_closing,
1281
1282 result_kind = NULL
1283 WHERE execution_id = :execution_id
1284 ",
1285 )?;
1286 let updated = stmt.execute(named_params! {
1287 ":execution_id": execution_id_str,
1288 ":appending_version": appending_version.0,
1289 ":pending_expires_finished": lock_expires_at,
1290 ":state": STATE_BLOCKED_BY_JOIN_SET,
1291 ":join_set_id": join_set_id,
1292 ":join_set_closing": join_set_closing,
1293 })?;
1294 if updated != 1 {
1295 return Err(DbErrorWrite::NotFound);
1296 }
1297 Ok(appending_version.increment())
1298 }
1299
1300 fn update_state_finished(
1301 tx: &Transaction,
1302 execution_id: &ExecutionId,
1303 appending_version: &Version,
1304 finished_at: DateTime<Utc>,
1306 result_kind: PendingStateFinishedResultKind,
1307 ) -> Result<(), DbErrorWrite> {
1308 debug!("Setting t_state to Finished");
1309 let execution_id_str = execution_id.to_string();
1310 let mut stmt = tx.prepare_cached(
1311 r"
1312 UPDATE t_state
1313 SET
1314 corresponding_version = :appending_version,
1315 pending_expires_finished = :pending_expires_finished,
1316 state = :state,
1317 updated_at = CURRENT_TIMESTAMP,
1318
1319 last_lock_version = NULL,
1320 executor_id = NULL,
1321 run_id = NULL,
1322
1323 join_set_id = NULL,
1324 join_set_closing = NULL,
1325
1326 result_kind = :result_kind
1327 WHERE execution_id = :execution_id
1328 ",
1329 )?;
1330 let updated = stmt.execute(named_params! {
1331 ":execution_id": execution_id_str,
1332 ":appending_version": appending_version.0,
1333 ":pending_expires_finished": finished_at,
1334 ":state": STATE_FINISHED,
1335 ":result_kind": result_kind.to_string(),
1336 })?;
1337 if updated != 1 {
1338 return Err(DbErrorWrite::NotFound);
1339 }
1340 Ok(())
1341 }
1342
1343 #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %appending_version))]
1345 fn bump_state_next_version(
1346 tx: &Transaction,
1347 execution_id: &ExecutionId,
1348 appending_version: &Version,
1349 delay_req: Option<DelayReq>,
1350 ) -> Result<AppendResponse , DbErrorWrite> {
1351 debug!("update_index_version");
1352 let execution_id_str = execution_id.to_string();
1353 let mut stmt = tx.prepare_cached(
1354 r"
1355 UPDATE t_state
1356 SET
1357 corresponding_version = :appending_version,
1358 updated_at = CURRENT_TIMESTAMP
1359 WHERE execution_id = :execution_id
1360 ",
1361 )?;
1362 let updated = stmt.execute(named_params! {
1363 ":execution_id": execution_id_str,
1364 ":appending_version": appending_version.0,
1365 })?;
1366 if updated != 1 {
1367 return Err(DbErrorWrite::NotFound);
1368 }
1369 if let Some(DelayReq {
1370 join_set_id,
1371 delay_id,
1372 expires_at,
1373 }) = delay_req
1374 {
1375 debug!("Inserting delay to `t_delay`");
1376 let mut stmt = tx.prepare_cached(
1377 "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) \
1378 VALUES \
1379 (:execution_id, :join_set_id, :delay_id, :expires_at)",
1380 )?;
1381 stmt.execute(named_params! {
1382 ":execution_id": execution_id_str,
1383 ":join_set_id": join_set_id.to_string(),
1384 ":delay_id": delay_id.to_string(),
1385 ":expires_at": expires_at,
1386 })?;
1387 }
1388 Ok(appending_version.increment())
1389 }
1390
1391 fn get_combined_state(
1392 tx: &Transaction,
1393 execution_id: &ExecutionId,
1394 ) -> Result<CombinedState, DbErrorRead> {
1395 let mut stmt = tx.prepare(
1396 r"
1397 SELECT
1398 state, ffqn, corresponding_version, pending_expires_finished,
1399 last_lock_version, executor_id, run_id,
1400 join_set_id, join_set_closing,
1401 result_kind
1402 FROM t_state
1403 WHERE
1404 execution_id = :execution_id
1405 ",
1406 )?;
1407 stmt.query_row(
1408 named_params! {
1409 ":execution_id": execution_id.to_string(),
1410 },
1411 |row| {
1412 CombinedState::new(
1413 &CombinedStateDTO {
1414 state: row.get("state")?,
1415 ffqn: row.get("ffqn")?,
1416 pending_expires_finished: row
1417 .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1418 last_lock_version: row
1419 .get::<_, Option<VersionType>>("last_lock_version")?
1420 .map(Version::new),
1421 executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1422 run_id: row.get::<_, Option<RunId>>("run_id")?,
1423 join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1424 join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1425 result_kind: row
1426 .get::<_, Option<FromStrWrapper<PendingStateFinishedResultKind>>>(
1427 "result_kind",
1428 )?
1429 .map(|wrapper| wrapper.0),
1430 },
1431 Version::new(row.get("corresponding_version")?),
1432 )
1433 },
1434 )
1435 .map_err(DbErrorRead::from)
1436 }
1437
1438 fn list_executions(
1439 read_tx: &Transaction,
1440 ffqn: Option<&FunctionFqn>,
1441 top_level_only: bool,
1442 pagination: &ExecutionListPagination,
1443 ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
1444 struct StatementModifier<'a> {
1445 where_vec: Vec<String>,
1446 params: Vec<(&'static str, ToSqlOutput<'a>)>,
1447 limit: u32,
1448 limit_desc: bool,
1449 }
1450
1451 fn paginate<'a, T: rusqlite::ToSql + 'static>(
1452 pagination: &'a Pagination<Option<T>>,
1453 column: &str,
1454 top_level_only: bool,
1455 ) -> StatementModifier<'a> {
1456 let mut where_vec: Vec<String> = vec![];
1457 let mut params: Vec<(&'static str, ToSqlOutput<'a>)> = vec![];
1458 let limit = pagination.length();
1459 let limit_desc = pagination.is_desc();
1460 let rel = pagination.rel();
1461 match pagination {
1462 Pagination::NewerThan {
1463 cursor: Some(cursor),
1464 ..
1465 }
1466 | Pagination::OlderThan {
1467 cursor: Some(cursor),
1468 ..
1469 } => {
1470 where_vec.push(format!("{column} {rel} :cursor"));
1471 params.push((":cursor", cursor.to_sql().expect("FIXME")));
1472 }
1473 _ => {}
1474 }
1475 if top_level_only {
1476 where_vec.push("is_top_level=true".to_string());
1477 }
1478 StatementModifier {
1479 where_vec,
1480 params,
1481 limit,
1482 limit_desc,
1483 }
1484 }
1485
1486 let mut statement_mod = match pagination {
1487 ExecutionListPagination::CreatedBy(pagination) => {
1488 paginate(pagination, "created_at", top_level_only)
1489 }
1490 ExecutionListPagination::ExecutionId(pagination) => {
1491 paginate(pagination, "execution_id", top_level_only)
1492 }
1493 };
1494
1495 let ffqn_temporary;
1496 if let Some(ffqn) = ffqn {
1497 statement_mod.where_vec.push("ffqn = :ffqn".to_string());
1498 ffqn_temporary = ffqn.to_string();
1499 let ffqn = ffqn_temporary
1500 .to_sql()
1501 .expect("string conversion never fails");
1502
1503 statement_mod.params.push((":ffqn", ffqn));
1504 }
1505
1506 let where_str = if statement_mod.where_vec.is_empty() {
1507 String::new()
1508 } else {
1509 format!("WHERE {}", statement_mod.where_vec.join(" AND "))
1510 };
1511 let sql = format!(
1512 r"
1513 SELECT created_at, scheduled_at, state, execution_id, ffqn, corresponding_version, pending_expires_finished,
1514 last_lock_version, executor_id, run_id,
1515 join_set_id, join_set_closing,
1516 result_kind
1517 FROM t_state {where_str} ORDER BY created_at {desc} LIMIT {limit}
1518 ",
1519 desc = if statement_mod.limit_desc { "DESC" } else { "" },
1520 limit = statement_mod.limit,
1521 );
1522 let mut vec: Vec<_> = read_tx
1523 .prepare(&sql)?
1524 .query_map::<_, &[(&'static str, ToSqlOutput)], _>(
1525 statement_mod
1526 .params
1527 .into_iter()
1528 .collect::<Vec<_>>()
1529 .as_ref(),
1530 |row| {
1531 let execution_id = row.get::<_, ExecutionId>("execution_id")?;
1532 let created_at = row.get("created_at")?;
1533 let scheduled_at = row.get("scheduled_at")?;
1534 let combined_state = CombinedState::new(
1535 &CombinedStateDTO {
1536 state: row.get("state")?,
1537 ffqn: row.get("ffqn")?,
1538 pending_expires_finished: row
1539 .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1540 executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1541
1542 last_lock_version: row
1543 .get::<_, Option<VersionType>>("last_lock_version")?
1544 .map(Version::new),
1545 run_id: row.get::<_, Option<RunId>>("run_id")?,
1546 join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1547 join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1548 result_kind: row
1549 .get::<_, Option<FromStrWrapper<PendingStateFinishedResultKind>>>(
1550 "result_kind",
1551 )?
1552 .map(|wrapper| wrapper.0),
1553 },
1554 Version::new(row.get("corresponding_version")?),
1555 )?;
1556 Ok(ExecutionWithState {
1557 execution_id,
1558 ffqn: combined_state.ffqn,
1559 pending_state: combined_state.pending_state,
1560 created_at,
1561 scheduled_at,
1562 })
1563 },
1564 )?
1565 .collect::<Vec<Result<_, _>>>()
1566 .into_iter()
1567 .filter_map(|row| match row {
1568 Ok(row) => Some(row),
1569 Err(err) => {
1570 warn!("Skipping row - {err:?}");
1571 None
1572 }
1573 })
1574 .collect();
1575
1576 if !statement_mod.limit_desc {
1577 vec.reverse();
1579 }
1580 Ok(vec)
1581 }
1582
1583 #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
1584 #[expect(clippy::too_many_arguments)]
1585 fn lock_single_execution(
1586 tx: &Transaction,
1587 created_at: DateTime<Utc>,
1588 component_id: &ComponentId,
1589 execution_id: &ExecutionId,
1590 run_id: RunId,
1591 appending_version: &Version,
1592 executor_id: ExecutorId,
1593 lock_expires_at: DateTime<Utc>,
1594 ) -> Result<LockedExecution, DbErrorWrite> {
1595 debug!("lock_inner");
1596 let combined_state = Self::get_combined_state(tx, execution_id)?;
1597 combined_state.pending_state.can_append_lock(
1598 created_at,
1599 executor_id,
1600 run_id,
1601 lock_expires_at,
1602 )?;
1603 let expected_version = combined_state.get_next_version_assert_not_finished();
1604 Self::check_expected_next_and_appending_version(&expected_version, appending_version)?;
1605
1606 let event = ExecutionEventInner::Locked {
1608 component_id: component_id.clone(),
1609 executor_id,
1610 lock_expires_at,
1611 run_id,
1612 };
1613 let event_ser = serde_json::to_string(&event).map_err(|err| {
1614 warn!("Cannot serialize {event:?} - {err:?}");
1615 DbErrorWritePermanent::ValidationFailed("parameter serialization error".into())
1616 })?;
1617 let mut stmt = tx
1618 .prepare_cached(
1619 "INSERT INTO t_execution_log \
1620 (execution_id, created_at, json_value, version, variant) \
1621 VALUES \
1622 (:execution_id, :created_at, :json_value, :version, :variant)",
1623 )
1624 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1625 stmt.execute(named_params! {
1626 ":execution_id": execution_id.to_string(),
1627 ":created_at": created_at,
1628 ":json_value": event_ser,
1629 ":version": appending_version.0,
1630 ":variant": event.variant(),
1631 })
1632 .map_err(|err| {
1633 warn!("Cannot lock execution - {err:?}");
1634 DbErrorWritePermanent::CannotWrite {
1635 reason: "cannot lock execution".into(),
1636 expected_version: None,
1637 }
1638 })?;
1639
1640 let responses = Self::list_responses(tx, execution_id, None)?;
1642 let responses = responses.into_iter().map(|resp| resp.event).collect();
1643 trace!("Responses: {responses:?}");
1644
1645 let intermittent_event_count = Self::update_state_locked_get_intermittent_event_count(
1646 tx,
1647 execution_id,
1648 executor_id,
1649 run_id,
1650 lock_expires_at,
1651 appending_version,
1652 )?;
1653 let mut events = tx
1655 .prepare(
1656 "SELECT json_value FROM t_execution_log WHERE \
1657 execution_id = :execution_id AND (variant = :v1 OR variant = :v2) \
1658 ORDER BY version",
1659 )?
1660 .query_map(
1661 named_params! {
1662 ":execution_id": execution_id.to_string(),
1663 ":v1": DUMMY_CREATED.variant(),
1664 ":v2": DUMMY_HISTORY_EVENT.variant(),
1665 },
1666 |row| {
1667 row.get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
1668 .map(|wrapper| wrapper.0)
1669 .map_err(|serde| {
1670 error!("Cannot deserialize {row:?} - {serde:?}");
1671 consistency_rusqlite("cannot deserialize json value")
1672 })
1673 },
1674 )?
1675 .collect::<Result<Vec<_>, _>>()?
1676 .into_iter()
1677 .collect::<VecDeque<_>>();
1678 let Some(ExecutionEventInner::Created {
1679 ffqn,
1680 params,
1681 retry_exp_backoff,
1682 max_retries,
1683 parent,
1684 metadata,
1685 ..
1686 }) = events.pop_front()
1687 else {
1688 error!("Execution log must contain at least `Created` event");
1689 return Err(consistency_db_err("execution log must contain `Created` event").into());
1690 };
1691
1692 let event_history = events
1693 .into_iter()
1694 .map(|event| {
1695 if let ExecutionEventInner::HistoryEvent { event } = event {
1696 Ok(event)
1697 } else {
1698 error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
1699 Err(consistency_db_err(
1700 "rows can only contain `Created` and `HistoryEvent` event kinds",
1701 ))
1702 }
1703 })
1704 .collect::<Result<Vec<_>, _>>()?;
1705 Ok(LockedExecution {
1706 execution_id: execution_id.clone(),
1707 metadata,
1708 run_id,
1709 next_version: appending_version.increment(),
1710 ffqn,
1711 params,
1712 event_history,
1713 responses,
1714 retry_exp_backoff,
1715 max_retries,
1716 parent,
1717 intermittent_event_count,
1718 })
1719 }
1720
1721 fn count_join_next(
1722 tx: &Transaction,
1723 execution_id: &ExecutionId,
1724 join_set_id: &JoinSetId,
1725 ) -> Result<u64, DbErrorRead> {
1726 let mut stmt = tx.prepare(
1727 "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = :execution_id AND join_set_id = :join_set_id \
1728 AND history_event_type = :join_next",
1729 )?;
1730 Ok(stmt.query_row(
1731 named_params! {
1732 ":execution_id": execution_id.to_string(),
1733 ":join_set_id": join_set_id.to_string(),
1734 ":join_next": HISTORY_EVENT_TYPE_JOIN_NEXT,
1735 },
1736 |row| row.get("count"),
1737 )?)
1738 }
1739
1740 #[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
1741 #[expect(clippy::needless_return)]
1742 fn append(
1743 tx: &Transaction,
1744 execution_id: &ExecutionId,
1745 req: AppendRequest,
1746 appending_version: Version,
1747 ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1748 if matches!(req.event, ExecutionEventInner::Created { .. }) {
1749 return Err(DbErrorWrite::Permanent(
1750 DbErrorWritePermanent::ValidationFailed(
1751 "cannot append `Created` event - use `create` instead".into(),
1752 ),
1753 ));
1754 }
1755 if let AppendRequest {
1756 event:
1757 ExecutionEventInner::Locked {
1758 component_id,
1759 executor_id,
1760 run_id,
1761 lock_expires_at,
1762 },
1763 created_at,
1764 } = req
1765 {
1766 return Self::lock_single_execution(
1767 tx,
1768 created_at,
1769 &component_id,
1770 execution_id,
1771 run_id,
1772 &appending_version,
1773 executor_id,
1774 lock_expires_at,
1775 )
1776 .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
1777 }
1778
1779 let combined_state = Self::get_combined_state(tx, execution_id)?;
1780 if combined_state.pending_state.is_finished() {
1781 debug!("Execution is already finished");
1782 return Err(DbErrorWrite::Permanent(
1783 DbErrorWritePermanent::CannotWrite {
1784 reason: "already finished".into(),
1785 expected_version: None,
1786 },
1787 ));
1788 }
1789
1790 Self::check_expected_next_and_appending_version(
1791 &combined_state.get_next_version_assert_not_finished(),
1792 &appending_version,
1793 )?;
1794 let event_ser = serde_json::to_string(&req.event).map_err(|err| {
1795 error!("Cannot serialize {:?} - {err:?}", req.event);
1796 DbErrorWritePermanent::ValidationFailed("parameter serialization error".into())
1797 })?;
1798
1799 let mut stmt = tx.prepare(
1800 "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
1801 VALUES (:execution_id, :created_at, :json_value, :version, :variant, :join_set_id)")
1802 ?;
1803 stmt.execute(named_params! {
1804 ":execution_id": execution_id.to_string(),
1805 ":created_at": req.created_at,
1806 ":json_value": event_ser,
1807 ":version": appending_version.0,
1808 ":variant": req.event.variant(),
1809 ":join_set_id": req.event.join_set_id().map(std::string::ToString::to_string),
1810 })?;
1811 match &req.event {
1814 ExecutionEventInner::Created { .. } => {
1815 unreachable!("handled in the caller")
1816 }
1817
1818 ExecutionEventInner::Locked { .. } => {
1819 unreachable!("handled above")
1820 }
1821
1822 ExecutionEventInner::TemporarilyFailed {
1823 backoff_expires_at, ..
1824 }
1825 | ExecutionEventInner::TemporarilyTimedOut {
1826 backoff_expires_at, ..
1827 } => {
1828 let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1829 tx,
1830 execution_id,
1831 &appending_version,
1832 *backoff_expires_at,
1833 true, )?;
1835 return Ok((next_version, notifier));
1836 }
1837
1838 ExecutionEventInner::Unlocked {
1839 backoff_expires_at, ..
1840 } => {
1841 let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1842 tx,
1843 execution_id,
1844 &appending_version,
1845 *backoff_expires_at,
1846 false, )?;
1848 return Ok((next_version, notifier));
1849 }
1850
1851 ExecutionEventInner::Finished { result, .. } => {
1852 Self::update_state_finished(
1853 tx,
1854 execution_id,
1855 &appending_version,
1856 req.created_at,
1857 PendingStateFinishedResultKind::from(result),
1858 )?;
1859 return Ok((
1860 appending_version,
1861 AppendNotifier {
1862 pending_at: None,
1863 execution_finished: Some(NotifierExecutionFinished {
1864 execution_id: execution_id.clone(),
1865 retval: result.clone(),
1866 }),
1867 response: None,
1868 },
1869 ));
1870 }
1871
1872 ExecutionEventInner::HistoryEvent {
1873 event:
1874 HistoryEvent::JoinSetCreate { .. }
1875 | HistoryEvent::JoinSetRequest {
1876 request: JoinSetRequest::ChildExecutionRequest { .. },
1877 ..
1878 }
1879 | HistoryEvent::Persist { .. }
1880 | HistoryEvent::Schedule { .. }
1881 | HistoryEvent::Stub { .. }
1882 | HistoryEvent::JoinNextTooMany { .. },
1883 } => {
1884 return Ok((
1885 Self::bump_state_next_version(tx, execution_id, &appending_version, None)?,
1886 AppendNotifier::default(),
1887 ));
1888 }
1889
1890 ExecutionEventInner::HistoryEvent {
1891 event:
1892 HistoryEvent::JoinSetRequest {
1893 join_set_id,
1894 request:
1895 JoinSetRequest::DelayRequest {
1896 delay_id,
1897 expires_at,
1898 ..
1899 },
1900 },
1901 } => {
1902 return Ok((
1903 Self::bump_state_next_version(
1904 tx,
1905 execution_id,
1906 &appending_version,
1907 Some(DelayReq {
1908 join_set_id: join_set_id.clone(),
1909 delay_id: delay_id.clone(),
1910 expires_at: *expires_at,
1911 }),
1912 )?,
1913 AppendNotifier::default(),
1914 ));
1915 }
1916
1917 ExecutionEventInner::HistoryEvent {
1918 event:
1919 HistoryEvent::JoinNext {
1920 join_set_id,
1921 run_expires_at,
1922 closing,
1923 requested_ffqn: _,
1924 },
1925 } => {
1926 let join_next_count = Self::count_join_next(tx, execution_id, join_set_id)?;
1928 let nth_response =
1929 Self::nth_response(tx, execution_id, join_set_id, join_next_count - 1)?; trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
1931 assert!(join_next_count > 0);
1932 if let Some(ResponseWithCursor {
1933 event:
1934 JoinSetResponseEventOuter {
1935 created_at: nth_created_at,
1936 ..
1937 },
1938 cursor: _,
1939 }) = nth_response
1940 {
1941 let scheduled_at = max(*run_expires_at, nth_created_at); let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1943 tx,
1944 execution_id,
1945 &appending_version,
1946 scheduled_at,
1947 false, )?;
1949 return Ok((next_version, notifier));
1950 }
1951 return Ok((
1952 Self::update_state_blocked(
1953 tx,
1954 execution_id,
1955 &appending_version,
1956 join_set_id,
1957 *run_expires_at,
1958 *closing,
1959 )?,
1960 AppendNotifier::default(),
1961 ));
1962 }
1963 }
1964 }
1965
1966 fn append_response(
1967 tx: &Transaction,
1968 execution_id: &ExecutionId,
1969 response_outer: JoinSetResponseEventOuter,
1970 ) -> Result<AppendNotifier, DbErrorWrite> {
1971 let mut stmt = tx.prepare(
1972 "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, child_execution_id, finished_version) \
1973 VALUES (:execution_id, :created_at, :join_set_id, :delay_id, :child_execution_id, :finished_version)",
1974 )?;
1975 let join_set_id = &response_outer.event.join_set_id;
1976 let delay_id = match &response_outer.event.event {
1977 JoinSetResponse::DelayFinished { delay_id } => Some(delay_id.to_string()),
1978 JoinSetResponse::ChildExecutionFinished { .. } => None,
1979 };
1980 let (child_execution_id, finished_version) = match &response_outer.event.event {
1981 JoinSetResponse::ChildExecutionFinished {
1982 child_execution_id,
1983 finished_version,
1984 result: _,
1985 } => (
1986 Some(child_execution_id.to_string()),
1987 Some(finished_version.0),
1988 ),
1989 JoinSetResponse::DelayFinished { .. } => (None, None),
1990 };
1991
1992 stmt.execute(named_params! {
1993 ":execution_id": execution_id.to_string(),
1994 ":created_at": response_outer.created_at,
1995 ":join_set_id": join_set_id.to_string(),
1996 ":delay_id": delay_id,
1997 ":child_execution_id": child_execution_id,
1998 ":finished_version": finished_version,
1999 })?;
2000
2001 let combined_state = Self::get_combined_state(tx, execution_id)?;
2003 debug!("previous_pending_state: {combined_state:?}");
2004 let mut notifier = if let PendingState::BlockedByJoinSet {
2005 join_set_id: found_join_set_id,
2006 lock_expires_at, closing: _,
2008 } = combined_state.pending_state
2009 && *join_set_id == found_join_set_id
2010 {
2011 let scheduled_at = max(lock_expires_at, response_outer.created_at);
2014 Self::update_state_pending_after_response_appended(
2017 tx,
2018 execution_id,
2019 scheduled_at,
2020 &combined_state.corresponding_version, )?
2022 } else {
2023 AppendNotifier::default()
2024 };
2025 if let JoinSetResponseEvent {
2026 join_set_id,
2027 event: JoinSetResponse::DelayFinished { delay_id },
2028 } = &response_outer.event
2029 {
2030 debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
2031 let mut stmt =
2032 tx.prepare_cached("DELETE FROM t_delay WHERE execution_id = :execution_id AND join_set_id = :join_set_id AND delay_id = :delay_id")
2033 ?;
2034 stmt.execute(named_params! {
2035 ":execution_id": execution_id.to_string(),
2036 ":join_set_id": join_set_id.to_string(),
2037 ":delay_id": delay_id.to_string(),
2038 })?;
2039 }
2040 notifier.response = Some((execution_id.clone(), response_outer));
2041 Ok(notifier)
2042 }
2043
2044 fn append_backtrace(
2045 tx: &Transaction,
2046 backtrace_info: &BacktraceInfo,
2047 ) -> Result<(), DbErrorWrite> {
2048 let backtrace = serde_json::to_string(&backtrace_info.wasm_backtrace).map_err(|err| {
2049 warn!(
2050 "Cannot serialize backtrace {:?} - {err:?}",
2051 backtrace_info.wasm_backtrace
2052 );
2053 DbErrorWritePermanent::ValidationFailed("cannot serialize backtrace".into())
2054 })?;
2055 let mut stmt = tx
2056 .prepare(
2057 "INSERT INTO t_backtrace (execution_id, component_id, version_min_including, version_max_excluding, wasm_backtrace) \
2058 VALUES (:execution_id, :component_id, :version_min_including, :version_max_excluding, :wasm_backtrace)",
2059 )
2060 ?;
2061 stmt.execute(named_params! {
2062 ":execution_id": backtrace_info.execution_id.to_string(),
2063 ":component_id": backtrace_info.component_id.to_string(),
2064 ":version_min_including": backtrace_info.version_min_including.0,
2065 ":version_max_excluding": backtrace_info.version_max_excluding.0,
2066 ":wasm_backtrace": backtrace,
2067 })?;
2068 Ok(())
2069 }
2070
/// Loads the whole execution log of `execution_id`: all events ordered by version, all
/// join set responses, the next version and the pending state.
///
/// Only compiled with the `test` feature.
///
/// # Errors
/// [`DbErrorRead::NotFound`] when no event exists for the execution; other failures are
/// converted into [`DbErrorRead`].
#[cfg(feature = "test")]
fn get(
    tx: &Transaction,
    execution_id: &ExecutionId,
) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
    const SQL: &str = "SELECT created_at, json_value FROM t_execution_log WHERE \
        execution_id = :execution_id ORDER BY version";

    let mut select = tx.prepare(SQL)?;
    let events = select
        .query_map(
            named_params! {
                ":execution_id": execution_id.to_string(),
            },
            |row| {
                let created_at = row.get("created_at")?;
                let JsonWrapper(event) = row
                    .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
                    .map_err(|serde| {
                        error!("Cannot deserialize {row:?} - {serde:?}");
                        consistency_rusqlite("cannot deserialize event")
                    })?;
                Ok(ExecutionEvent {
                    created_at,
                    event,
                    backtrace_id: None,
                })
            },
        )?
        .collect::<Result<Vec<_>, _>>()?;
    // An execution without any logged event is reported as not found.
    if events.is_empty() {
        return Err(DbErrorRead::NotFound);
    }

    let combined_state = Self::get_combined_state(tx, execution_id)?;
    let responses = Self::list_responses(tx, execution_id, None)?
        .into_iter()
        .map(|resp| resp.event)
        .collect();
    let next_version = combined_state.get_next_version_or_finished();
    Ok(concepts::storage::ExecutionLog {
        execution_id: execution_id.clone(),
        events,
        responses,
        next_version,
        pending_state: combined_state.pending_state,
    })
}
2119
2120 fn list_execution_events(
2121 tx: &Transaction,
2122 execution_id: &ExecutionId,
2123 version_min: VersionType,
2124 version_max_excluding: VersionType,
2125 include_backtrace_id: bool,
2126 ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2127 let select = if include_backtrace_id {
2128 "SELECT
2129 log.created_at,
2130 log.json_value,
2131 -- Select version_min_including from backtrace if a match is found, otherwise NULL
2132 bt.version_min_including AS backtrace_id
2133 FROM
2134 t_execution_log AS log
2135 LEFT OUTER JOIN -- Use LEFT JOIN to keep all logs even if no backtrace matches
2136 t_backtrace AS bt ON log.execution_id = bt.execution_id
2137 -- Check if the log's version falls within the backtrace's range
2138 AND log.version >= bt.version_min_including
2139 AND log.version < bt.version_max_excluding
2140 WHERE
2141 log.execution_id = :execution_id
2142 AND log.version >= :version_min
2143 AND log.version < :version_max_excluding
2144 ORDER BY
2145 log.version;"
2146 } else {
2147 "SELECT
2148 created_at, json_value, NULL as backtrace_id
2149 FROM t_execution_log WHERE
2150 execution_id = :execution_id AND version >= :version_min AND version < :version_max_excluding
2151 ORDER BY version"
2152 };
2153 tx.prepare(select)?
2154 .query_map(
2155 named_params! {
2156 ":execution_id": execution_id.to_string(),
2157 ":version_min": version_min,
2158 ":version_max_excluding": version_max_excluding
2159 },
2160 |row| {
2161 let created_at = row.get("created_at")?;
2162 let backtrace_id = row
2163 .get::<_, Option<VersionType>>("backtrace_id")?
2164 .map(Version::new);
2165 let event = row
2166 .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2167 .map(|event| ExecutionEvent {
2168 created_at,
2169 event: event.0,
2170 backtrace_id,
2171 })
2172 .map_err(|serde| {
2173 error!("Cannot deserialize {row:?} - {serde:?}");
2174 consistency_rusqlite("cannot deserialize")
2175 })?;
2176 Ok(event)
2177 },
2178 )?
2179 .collect::<Result<Vec<_>, _>>()
2180 .map_err(DbErrorRead::from)
2181 }
2182
2183 fn get_execution_event(
2184 tx: &Transaction,
2185 execution_id: &ExecutionId,
2186 version: VersionType,
2187 ) -> Result<ExecutionEvent, DbErrorRead> {
2188 let mut stmt = tx.prepare(
2189 "SELECT created_at, json_value FROM t_execution_log WHERE \
2190 execution_id = :execution_id AND version = :version",
2191 )?;
2192 stmt.query_row(
2193 named_params! {
2194 ":execution_id": execution_id.to_string(),
2195 ":version": version,
2196 },
2197 |row| {
2198 let created_at = row.get("created_at")?;
2199 let event = row
2200 .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2201 .map_err(|serde| {
2202 error!("Cannot deserialize {row:?} - {serde:?}");
2203 consistency_rusqlite("cannot deserialize event")
2204 })?;
2205
2206 Ok(ExecutionEvent {
2207 created_at,
2208 event: event.0,
2209 backtrace_id: None,
2210 })
2211 },
2212 )
2213 .map_err(DbErrorRead::from)
2214 }
2215
2216 fn list_responses(
2217 tx: &Transaction,
2218 execution_id: &ExecutionId,
2219 pagination: Option<Pagination<u32>>,
2220 ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
2221 let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![];
2223 let mut sql = "SELECT \
2224 r.id, r.created_at, r.join_set_id, r.delay_id, r.child_execution_id, r.finished_version, l.json_value \
2225 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2226 WHERE \
2227 r.execution_id = :execution_id \
2228 AND ( r.finished_version = l.version OR r.child_execution_id IS NULL ) \
2229 "
2230 .to_string();
2231 let limit = match &pagination {
2232 Some(
2233 pagination @ (Pagination::NewerThan { cursor, .. }
2234 | Pagination::OlderThan { cursor, .. }),
2235 ) => {
2236 params.push((":cursor", Box::new(cursor)));
2237 write!(sql, " AND r.id {rel} :cursor", rel = pagination.rel()).unwrap();
2238 Some(pagination.length())
2239 }
2240 None => None,
2241 };
2242 sql.push_str(" ORDER BY id");
2243 if pagination.as_ref().is_some_and(Pagination::is_desc) {
2244 sql.push_str(" DESC");
2245 }
2246 if let Some(limit) = limit {
2247 write!(sql, " LIMIT {limit}").unwrap();
2248 }
2249 params.push((":execution_id", Box::new(execution_id.to_string())));
2250 tx.prepare(&sql)?
2251 .query_map::<_, &[(&'static str, &dyn ToSql)], _>(
2252 params
2253 .iter()
2254 .map(|(key, value)| (*key, value.as_ref()))
2255 .collect::<Vec<_>>()
2256 .as_ref(),
2257 Self::parse_response_with_cursor,
2258 )?
2259 .collect::<Result<Vec<_>, rusqlite::Error>>()
2260 .map_err(DbErrorRead::from)
2261 }
2262
2263 fn parse_response_with_cursor(
2264 row: &rusqlite::Row<'_>,
2265 ) -> Result<ResponseWithCursor, rusqlite::Error> {
2266 let id = row.get("id")?;
2267 let created_at: DateTime<Utc> = row.get("created_at")?;
2268 let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
2269 let event = match (
2270 row.get::<_, Option<DelayId>>("delay_id")?,
2271 row.get::<_, Option<ExecutionIdDerived>>("child_execution_id")?,
2272 row.get::<_, Option<VersionType>>("finished_version")?,
2273 row.get::<_, Option<JsonWrapper<ExecutionEventInner>>>("json_value")?,
2274 ) {
2275 (Some(delay_id), None, None, None) => JoinSetResponse::DelayFinished { delay_id },
2276 (
2277 None,
2278 Some(child_execution_id),
2279 Some(finished_version),
2280 Some(JsonWrapper(ExecutionEventInner::Finished { result, .. })),
2281 ) => JoinSetResponse::ChildExecutionFinished {
2282 child_execution_id,
2283 finished_version: Version(finished_version),
2284 result,
2285 },
2286 (delay, child, finished, result) => {
2287 error!(
2288 "Invalid row in t_join_set_response {id} - {:?} {child:?} {finished:?} {:?}",
2289 delay,
2290 result.map(|it| it.0)
2291 );
2292 return Err(consistency_rusqlite("invalid row in t_join_set_response"));
2293 }
2294 };
2295 Ok(ResponseWithCursor {
2296 cursor: id,
2297 event: JoinSetResponseEventOuter {
2298 event: JoinSetResponseEvent { join_set_id, event },
2299 created_at,
2300 },
2301 })
2302 }
2303
2304 fn nth_response(
2305 tx: &Transaction,
2306 execution_id: &ExecutionId,
2307 join_set_id: &JoinSetId,
2308 skip_rows: u64,
2309 ) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
2310 tx
2312 .prepare(
2313 "SELECT r.id, r.created_at, r.join_set_id, \
2314 r.delay_id, \
2315 r.child_execution_id, r.finished_version, l.json_value \
2316 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2317 WHERE \
2318 r.execution_id = :execution_id AND r.join_set_id = :join_set_id AND \
2319 (
2320 r.finished_version = l.version \
2321 OR \
2322 r.child_execution_id IS NULL \
2323 ) \
2324 ORDER BY id \
2325 LIMIT 1 OFFSET :offset",
2326 )
2327 ?
2328 .query_row(
2329 named_params! {
2330 ":execution_id": execution_id.to_string(),
2331 ":join_set_id": join_set_id.to_string(),
2332 ":offset": skip_rows,
2333 },
2334 Self::parse_response_with_cursor,
2335 )
2336 .optional()
2337 .map_err(DbErrorRead::from)
2338 }
2339
2340 #[instrument(level = Level::TRACE, skip_all)]
2342 fn get_responses_with_offset(
2343 tx: &Transaction,
2344 execution_id: &ExecutionId,
2345 skip_rows: usize,
2346 ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorRead> {
2347 tx.prepare(
2349 "SELECT r.id, r.created_at, r.join_set_id, \
2350 r.delay_id, \
2351 r.child_execution_id, r.finished_version, l.json_value \
2352 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2353 WHERE \
2354 r.execution_id = :execution_id AND \
2355 ( \
2356 r.finished_version = l.version \
2357 OR r.child_execution_id IS NULL \
2358 ) \
2359 ORDER BY id \
2360 LIMIT -1 OFFSET :offset",
2361 )
2362 ?
2363 .query_map(
2364 named_params! {
2365 ":execution_id": execution_id.to_string(),
2366 ":offset": skip_rows,
2367 },
2368 Self::parse_response_with_cursor,
2369 )
2370 ?
2371 .collect::<Result<Vec<_>, _>>()
2372 .map(|resp| resp.into_iter().map(|vec| vec.event).collect())
2373 .map_err(DbErrorRead::from)
2374 }
2375
2376 fn get_pending_of_single_ffqn(
2377 mut stmt: CachedStatement,
2378 batch_size: usize,
2379 pending_at_or_sooner: DateTime<Utc>,
2380 ffqn: &FunctionFqn,
2381 ) -> Result<Vec<(ExecutionId, Version)>, ()> {
2382 stmt.query_map(
2383 named_params! {
2384 ":pending_expires_finished": pending_at_or_sooner,
2385 ":ffqn": ffqn.to_string(),
2386 ":batch_size": batch_size,
2387 },
2388 |row| {
2389 let execution_id = row.get::<_, String>("execution_id")?.parse::<ExecutionId>();
2390 let next_version =
2391 Version::new(row.get::<_, VersionType>("corresponding_version")?).increment();
2392 Ok(execution_id.map(|exe| (exe, next_version)))
2393 },
2394 )
2395 .map_err(|err| {
2396 warn!("Ignoring consistency error {err:?}");
2397 })?
2398 .collect::<Result<Vec<_>, _>>()
2399 .map_err(|err| {
2400 warn!("Ignoring consistency error {err:?}");
2401 })?
2402 .into_iter()
2403 .collect::<Result<Vec<_>, _>>()
2404 .map_err(|err| {
2405 warn!("Ignoring consistency error {err:?}");
2406 })
2407 }
2408
2409 fn get_pending(
2411 conn: &Connection,
2412 batch_size: usize,
2413 pending_at_or_sooner: DateTime<Utc>,
2414 ffqns: &[FunctionFqn],
2415 ) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2416 let mut execution_ids_versions = Vec::with_capacity(batch_size);
2417 for ffqn in ffqns {
2418 let stmt = conn
2420 .prepare_cached(&format!(
2421 r#"
2422 SELECT execution_id, corresponding_version FROM t_state WHERE
2423 state = "{STATE_PENDING_AT}" AND
2424 pending_expires_finished <= :pending_expires_finished AND ffqn = :ffqn
2425 ORDER BY pending_expires_finished LIMIT :batch_size
2426 "#
2427 ))
2428 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2429
2430 if let Ok(execs_and_versions) = Self::get_pending_of_single_ffqn(
2431 stmt,
2432 batch_size - execution_ids_versions.len(),
2433 pending_at_or_sooner,
2434 ffqn,
2435 ) {
2436 execution_ids_versions.extend(execs_and_versions);
2437 if execution_ids_versions.len() == batch_size {
2438 break;
2440 }
2441 }
2443 }
2444 Ok(execution_ids_versions)
2445 }
2446
2447 #[instrument(level = Level::TRACE, skip_all)]
2449 fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
2450 let (pending_ats, finished_execs, responses) = {
2451 let (mut pending_ats, mut finished_execs, mut responses) =
2452 (Vec::new(), Vec::new(), Vec::new());
2453 for notifier in notifiers {
2454 if let Some(pending_at) = notifier.pending_at {
2455 pending_ats.push(pending_at);
2456 }
2457 if let Some(finished) = notifier.execution_finished {
2458 finished_execs.push(finished);
2459 }
2460 if let Some(response) = notifier.response {
2461 responses.push(response);
2462 }
2463 }
2464 (pending_ats, finished_execs, responses)
2465 };
2466
2467 if !pending_ats.is_empty() {
2469 let guard = self.0.pending_ffqn_subscribers.lock().unwrap();
2470 for pending_at in pending_ats {
2471 Self::notify_pending_locked(&pending_at, current_time, &guard);
2472 }
2473 }
2474 if !finished_execs.is_empty() {
2477 let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
2478 for finished in finished_execs {
2479 if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
2480 for (_tag, sender) in listeners_of_exe_id {
2481 let _ = sender.send(finished.retval.clone());
2484 }
2485 }
2486 }
2487 }
2488 if !responses.is_empty() {
2490 let mut guard = self.0.response_subscribers.lock().unwrap();
2491 for (execution_id, response) in responses {
2492 if let Some((sender, _)) = guard.remove(&execution_id) {
2493 let _ = sender.send(response);
2494 }
2495 }
2496 }
2497 }
2498
2499 fn notify_pending_locked(
2500 notifier: &NotifierPendingAt,
2501 current_time: DateTime<Utc>,
2502 ffqn_to_pending_subscription: &std::sync::MutexGuard<
2503 HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>,
2504 >,
2505 ) {
2506 if notifier.scheduled_at <= current_time
2508 && let Some((subscription, _)) = ffqn_to_pending_subscription.get(¬ifier.ffqn)
2509 {
2510 debug!("Notifying pending subscriber");
2511 let _ = subscription.try_send(());
2513 }
2514 }
2515}
2516
2517#[async_trait]
2518impl DbExecutor for SqlitePool {
2519 #[instrument(level = Level::TRACE, skip(self))]
2520 async fn lock_pending(
2521 &self,
2522 batch_size: usize,
2523 pending_at_or_sooner: DateTime<Utc>,
2524 ffqns: Arc<[FunctionFqn]>,
2525 created_at: DateTime<Utc>,
2526 component_id: ComponentId,
2527 executor_id: ExecutorId,
2528 lock_expires_at: DateTime<Utc>,
2529 run_id: RunId,
2530 ) -> Result<LockPendingResponse, DbErrorGeneric> {
2531 let execution_ids_versions = self
2532 .transaction(
2533 move |conn| Self::get_pending(conn, batch_size, pending_at_or_sooner, &ffqns),
2534 "get_pending",
2535 )
2536 .await?;
2537 if execution_ids_versions.is_empty() {
2538 Ok(vec![])
2539 } else {
2540 debug!("Locking {execution_ids_versions:?}");
2541 self.transaction(
2542 move |tx| {
2543 let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
2544 for (execution_id, version) in &execution_ids_versions {
2546 match Self::lock_single_execution(
2547 tx,
2548 created_at,
2549 &component_id,
2550 execution_id,
2551 run_id,
2552 version,
2553 executor_id,
2554 lock_expires_at,
2555 ) {
2556 Ok(locked) => locked_execs.push(locked),
2557 Err(err) => {
2558 warn!("Locking row {execution_id} failed - {err:?}");
2559 }
2560 }
2561 }
2562 Ok(locked_execs)
2563 },
2564 "lock_pending",
2565 )
2566 .await
2567 }
2568 }
2569
2570 #[instrument(level = Level::DEBUG, skip(self))]
2571 async fn lock_one(
2572 &self,
2573 created_at: DateTime<Utc>,
2574 component_id: ComponentId,
2575 execution_id: &ExecutionId,
2576 run_id: RunId,
2577 version: Version,
2578 executor_id: ExecutorId,
2579 lock_expires_at: DateTime<Utc>,
2580 ) -> Result<LockedExecution, DbErrorWrite> {
2581 debug!(%execution_id, "lock_extend");
2582 let execution_id = execution_id.clone();
2583 self.transaction(
2584 move |tx| {
2585 Self::lock_single_execution(
2586 tx,
2587 created_at,
2588 &component_id,
2589 &execution_id,
2590 run_id,
2591 &version,
2592 executor_id,
2593 lock_expires_at,
2594 )
2595 },
2596 "lock_inner",
2597 )
2598 .await
2599 }
2600
2601 #[instrument(level = Level::DEBUG, skip(self, req))]
2602 async fn append(
2603 &self,
2604 execution_id: ExecutionId,
2605 version: Version,
2606 req: AppendRequest,
2607 ) -> Result<AppendResponse, DbErrorWrite> {
2608 debug!(%req, "append");
2609 trace!(?req, "append");
2610 let created_at = req.created_at;
2611 let (version, notifier) = self
2612 .transaction(
2613 move |tx| Self::append(tx, &execution_id, req.clone(), version.clone()),
2614 "append",
2615 )
2616 .await?;
2617 self.notify_all(vec![notifier], created_at);
2618 Ok(version)
2619 }
2620
2621 #[instrument(level = Level::DEBUG, skip_all)]
2622 async fn append_batch_respond_to_parent(
2623 &self,
2624 events: AppendEventsToExecution,
2625 response: AppendResponseToExecution,
2626 current_time: DateTime<Utc>,
2627 ) -> Result<AppendBatchResponse, DbErrorWrite> {
2628 debug!("append_batch_respond_to_parent");
2629 if events.execution_id == response.parent_execution_id {
2630 return Err(DbErrorWrite::Permanent(
2633 DbErrorWritePermanent::ValidationFailed(
2634 "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
2635 ),
2636 ));
2637 }
2638 assert!(!events.batch.is_empty(), "Empty batch request"); let (version, notifiers) = {
2640 self.transaction(
2641 move |tx| {
2642 let mut version = events.version.clone();
2643 let mut notifier_of_child = None;
2644 for append_request in &events.batch {
2645 let (v, n) = Self::append(
2646 tx,
2647 &events.execution_id,
2648 append_request.clone(),
2649 version,
2650 )?;
2651 version = v;
2652 notifier_of_child = Some(n);
2653 }
2654
2655 let pending_at_parent = Self::append_response(
2656 tx,
2657 &response.parent_execution_id,
2658 response.parent_response_event.clone(),
2659 )?;
2660 Ok::<_, DbErrorWrite>((
2661 version,
2662 vec![
2663 notifier_of_child.expect("checked that the batch is not empty"),
2664 pending_at_parent,
2665 ],
2666 ))
2667 },
2668 "append_batch_respond_to_parent",
2669 )
2670 .await?
2671 };
2672 self.notify_all(notifiers, current_time);
2673 Ok(version)
2674 }
2675
    /// Waits until a pending execution of one of `ffqns` may be available, or until
    /// `timeout_fut` completes, whichever happens first.
    ///
    /// The function subscribes to pending notifications for every `ffqn`, then checks the
    /// database once, and only waits when nothing is pending yet. It returns nothing: the caller
    /// cannot tell from this function which of the two outcomes occurred.
    #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
    async fn wait_for_pending(
        &self,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) {
        // The random tag identifies this call's subscriptions during the cleanup below.
        let unique_tag: u64 = rand::random();
        // Subscribe BEFORE querying the database, so that an execution which becomes pending
        // between the query and the wait still produces a notification.
        // Inserting under an `ffqn` key replaces whatever entry was registered there before.
        let (sender, mut receiver) = mpsc::channel(1); {
            let mut ffqn_to_pending_subscription = self.0.pending_ffqn_subscribers.lock().unwrap();
            for ffqn in ffqns.as_ref() {
                ffqn_to_pending_subscription.insert(ffqn.clone(), (sender.clone(), unique_tag));
            }
        }
        // The waiting logic lives in its own async block so that its early `return`s still fall
        // through to the unsubscribe step at the end of the function.
        async {
            // A batch size of 1 is enough: only the existence of a pending execution matters.
            let Ok(execution_ids_versions) = self
                .transaction(
                    {
                        let ffqns = ffqns.clone();
                        move |conn| Self::get_pending(conn, 1, pending_at_or_sooner, ffqns.as_ref())
                    },
                    "subscribe_to_pending",
                )
                .await
            else {
                trace!(
                    "Ignoring get_pending error and waiting in for timeout to avoid executor repolling too soon"
                );
                timeout_fut.await;
                return;
            };
            if !execution_ids_versions.is_empty() {
                trace!("Not waiting, database already contains new pending executions");
                return;
            }
            // Nothing is pending yet: wait for a notification or for the timeout.
            tokio::select! { _ = receiver.recv() => {
                    trace!("Received a notification");
                }
                () = timeout_fut => {
                }
            }
        }.await;
        // Unsubscribe. Only entries carrying this call's tag are dropped; an entry that another
        // caller registered in the meantime is put back unchanged.
        {
            let mut ffqn_to_pending_subscription = self.0.pending_ffqn_subscribers.lock().unwrap();
            for ffqn in ffqns.as_ref() {
                match ffqn_to_pending_subscription.remove(ffqn) {
                    Some((_, tag)) if tag == unique_tag => {
                    }
                    Some(other) => {
                        ffqn_to_pending_subscription.insert(ffqn.clone(), other);
                    }
                    None => {
                    }
                }
            }
        }
    }
2741}
2742
2743#[async_trait]
2744impl DbConnection for SqlitePool {
2745 #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
2746 async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
2747 debug!("create");
2748 trace!(?req, "create");
2749 let created_at = req.created_at;
2750 let (version, notifier) = self
2751 .transaction(move |tx| Self::create_inner(tx, req.clone()), "create")
2752 .await?;
2753 self.notify_all(vec![notifier], created_at);
2754 Ok(version)
2755 }
2756
2757 #[instrument(level = Level::DEBUG, skip(self, batch))]
2758 async fn append_batch(
2759 &self,
2760 current_time: DateTime<Utc>,
2761 batch: Vec<AppendRequest>,
2762 execution_id: ExecutionId,
2763 version: Version,
2764 ) -> Result<AppendBatchResponse, DbErrorWrite> {
2765 debug!("append_batch");
2766 trace!(?batch, "append_batch");
2767 assert!(!batch.is_empty(), "Empty batch request");
2768
2769 let (version, notifier) = self
2770 .transaction(
2771 move |tx| {
2772 let mut version = version.clone();
2773 let mut notifier = None;
2774 for append_request in &batch {
2775 let (v, n) =
2776 Self::append(tx, &execution_id, append_request.clone(), version)?;
2777 version = v;
2778 notifier = Some(n);
2779 }
2780 Ok::<_, DbErrorWrite>((
2781 version,
2782 notifier.expect("checked that the batch is not empty"),
2783 ))
2784 },
2785 "append_batch",
2786 )
2787 .await?;
2788
2789 self.notify_all(vec![notifier], current_time);
2790 Ok(version)
2791 }
2792
2793 #[instrument(level = Level::DEBUG, skip(self, batch, child_req))]
2794 async fn append_batch_create_new_execution(
2795 &self,
2796 current_time: DateTime<Utc>,
2797 batch: Vec<AppendRequest>,
2798 execution_id: ExecutionId,
2799 version: Version,
2800 child_req: Vec<CreateRequest>,
2801 ) -> Result<AppendBatchResponse, DbErrorWrite> {
2802 debug!("append_batch_create_new_execution");
2803 trace!(?batch, ?child_req, "append_batch_create_new_execution");
2804 assert!(!batch.is_empty(), "Empty batch request");
2805
2806 let (version, notifiers) = self
2807 .transaction(
2808 move |tx| {
2809 let mut notifier = None;
2810 let mut version = version.clone();
2811 for append_request in &batch {
2812 let (v, n) =
2813 Self::append(tx, &execution_id, append_request.clone(), version)?;
2814 version = v;
2815 notifier = Some(n);
2816 }
2817 let mut notifiers = Vec::new();
2818 notifiers.push(notifier.expect("checked that the batch is not empty"));
2819
2820 for child_req in &child_req {
2821 let (_, notifier) = Self::create_inner(tx, child_req.clone())?;
2822 notifiers.push(notifier);
2823 }
2824 Ok::<_, DbErrorWrite>((version, notifiers))
2825 },
2826 "append_batch_create_new_execution_inner",
2827 )
2828 .await?;
2829 self.notify_all(notifiers, current_time);
2830 Ok(version)
2831 }
2832
2833 #[cfg(feature = "test")]
2834 #[instrument(level = Level::DEBUG, skip(self))]
2835 async fn get(
2836 &self,
2837 execution_id: &ExecutionId,
2838 ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2839 trace!("get");
2840 let execution_id = execution_id.clone();
2841 self.transaction(move |tx| Self::get(tx, &execution_id), "get")
2842 .await
2843 }
2844
2845 #[instrument(level = Level::DEBUG, skip(self))]
2846 async fn list_execution_events(
2847 &self,
2848 execution_id: &ExecutionId,
2849 since: &Version,
2850 max_length: VersionType,
2851 include_backtrace_id: bool,
2852 ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2853 let execution_id = execution_id.clone();
2854 let since = since.0;
2855 self.transaction(
2856 move |tx| {
2857 Self::list_execution_events(
2858 tx,
2859 &execution_id,
2860 since,
2861 since + max_length,
2862 include_backtrace_id,
2863 )
2864 },
2865 "get",
2866 )
2867 .await
2868 }
2869
    /// Returns the responses of `execution_id` starting at index `start_idx`. When none exist
    /// yet, waits for the next response notification or for `timeout_fut`.
    ///
    /// # Errors
    /// * `DbErrorReadWithTimeout::Timeout` when `timeout_fut` completes first.
    /// * An error converted from `DbErrorGeneric::Close` when the subscription's sender is
    ///   dropped without sending a response.
    /// * Any error of the read transaction.
    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
    async fn subscribe_to_next_responses(
        &self,
        execution_id: &ExecutionId,
        start_idx: usize,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout> {
        debug!("next_responses");
        // The random tag identifies this call's subscription during cleanup.
        let unique_tag: u64 = rand::random();
        let execution_id = execution_id.clone();

        // Removes this call's subscription. An entry carrying a different tag belongs to another
        // caller and is put back unchanged.
        let cleanup = || {
            let mut guard = self.0.response_subscribers.lock().unwrap();
            match guard.remove(&execution_id) {
                Some((_, tag)) if tag == unique_tag => {} Some(other) => {
                    guard.insert(execution_id.clone(), other);
                }
                None => {} }
        };

        let response_subscribers = self.0.response_subscribers.clone();
        let resp_or_receiver = {
            let execution_id = execution_id.clone();
            self.transaction(
                move |tx| {
                    let responses = Self::get_responses_with_offset(tx, &execution_id, start_idx)?;
                    if responses.is_empty() {
                        // Register the subscriber inside the transaction closure, right after
                        // the read found no responses.
                        let (sender, receiver) = oneshot::channel();
                        response_subscribers
                            .lock()
                            .unwrap()
                            .insert(execution_id.clone(), (sender, unique_tag));
                        Ok::<_, DbErrorReadWithTimeout>(itertools::Either::Right(receiver))
                    } else {
                        Ok(itertools::Either::Left(responses))
                    }
                },
                "subscribe_to_next_responses",
            )
            .await
        }
        // The closure may have subscribed before the transaction failed.
        .inspect_err(|_| {
            cleanup();
        })?;
        match resp_or_receiver {
            itertools::Either::Left(resp) => Ok(resp), itertools::Either::Right(receiver) => {
                let res = async move {
                    tokio::select! {
                        resp = receiver => {
                            let resp = resp.map_err(|_| DbErrorGeneric::Close)?;
                            Ok(vec![resp])
                        }
                        () = timeout_fut => Err(DbErrorReadWithTimeout::Timeout),
                    }
                }
                .await;
                // Runs after both outcomes; after a timeout the subscription is still registered.
                cleanup();
                res
            }
        }
    }
2939
2940 async fn wait_for_finished_result(
2942 &self,
2943 execution_id: &ExecutionId,
2944 timeout_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
2945 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
2946 let unique_tag: u64 = rand::random();
2947 let execution_id = execution_id.clone();
2948 let execution_finished_subscription = self.0.execution_finished_subscribers.clone();
2949
2950 let cleanup = || {
2951 let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
2952 if let Some(subscribers) = guard.get_mut(&execution_id) {
2953 subscribers.remove(&unique_tag);
2954 }
2955 };
2956
2957 let resp_or_receiver = {
2958 let execution_id = execution_id.clone();
2959 self.transaction(move |tx| {
2960 let pending_state =
2961 Self::get_combined_state(tx, &execution_id)?.pending_state;
2962 if let PendingState::Finished { finished } = pending_state {
2963 let event =
2964 Self::get_execution_event(tx, &execution_id, finished.version)?;
2965 if let ExecutionEventInner::Finished { result, ..} = event.event {
2966 Ok(itertools::Either::Left(result))
2967 } else {
2968 error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
2969 Err(DbErrorReadWithTimeout::from(consistency_db_err(
2970 "cannot get finished event based on t_state version"
2971 )))
2972 }
2973 } else {
2974 let (sender, receiver) = oneshot::channel();
2978 let mut guard = execution_finished_subscription.lock().unwrap();
2979 guard.entry(execution_id.clone()).or_default().insert(unique_tag, sender);
2980 Ok(itertools::Either::Right(receiver))
2981 }
2982 }, "wait_for_finished_result")
2983 .await
2984 }
2985 .inspect_err(|_| {
2986 cleanup();
2990 })?;
2991
2992 let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
2993 match resp_or_receiver {
2994 itertools::Either::Left(resp) => Ok(resp), itertools::Either::Right(receiver) => {
2996 let res = async move {
2997 tokio::select! {
2998 resp = receiver => {
2999 Ok(resp.expect("the notifier sends to all listeners, cannot race with cleanup"))
3000 }
3001 () = timeout_fut => Err(DbErrorReadWithTimeout::Timeout),
3002 }
3003 }
3004 .await;
3005 cleanup();
3006 res
3007 }
3008 }
3009 }
3010
3011 #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
3012 async fn append_response(
3013 &self,
3014 created_at: DateTime<Utc>,
3015 execution_id: ExecutionId,
3016 response_event: JoinSetResponseEvent,
3017 ) -> Result<(), DbErrorWrite> {
3018 debug!("append_response");
3019 let event = JoinSetResponseEventOuter {
3020 created_at,
3021 event: response_event,
3022 };
3023 let notifier = self
3024 .transaction(
3025 move |tx| Self::append_response(tx, &execution_id, event.clone()),
3026 "append_response",
3027 )
3028 .await?;
3029 self.notify_all(vec![notifier], created_at);
3030 Ok(())
3031 }
3032
3033 #[instrument(level = Level::DEBUG, skip_all)]
3034 async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
3035 debug!("append_backtrace");
3036 self.transaction(
3037 move |tx| Self::append_backtrace(tx, &append),
3038 "append_backtrace",
3039 )
3040 .await
3041 }
3042
3043 #[instrument(level = Level::DEBUG, skip_all)]
3044 async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
3045 debug!("append_backtrace_batch");
3046 self.transaction(
3047 move |tx| {
3048 for append in &batch {
3049 Self::append_backtrace(tx, append)?;
3050 }
3051 Ok(())
3052 },
3053 "append_backtrace",
3054 )
3055 .await
3056 }
3057
3058 #[instrument(level = Level::DEBUG, skip_all)]
3059 async fn get_backtrace(
3060 &self,
3061 execution_id: &ExecutionId,
3062 filter: BacktraceFilter,
3063 ) -> Result<BacktraceInfo, DbErrorRead> {
3064 debug!("get_last_backtrace");
3065 let execution_id = execution_id.clone();
3066
3067 self.transaction(
3068 move |tx| {
3069 let select = "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace FROM t_backtrace \
3070 WHERE execution_id = :execution_id";
3071 let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![(":execution_id", Box::new(execution_id.to_string()))];
3072 let select = match &filter {
3073 BacktraceFilter::Specific(version) =>{
3074 params.push((":version", Box::new(version.0)));
3075 format!("{select} AND version_min_including <= :version AND version_max_excluding > :version")
3076 },
3077 BacktraceFilter::First => format!("{select} ORDER BY version_min_including LIMIT 1"),
3078 BacktraceFilter::Last => format!("{select} ORDER BY version_min_including DESC LIMIT 1")
3079 };
3080 tx
3081 .prepare(&select)
3082 ?
3083 .query_row::<_, &[(&'static str, &dyn ToSql)], _>(
3084 params
3085 .iter()
3086 .map(|(key, value)| (*key, value.as_ref()))
3087 .collect::<Vec<_>>()
3088 .as_ref(),
3089 |row| {
3090 Ok(BacktraceInfo {
3091 execution_id: execution_id.clone(),
3092 component_id: row.get::<_, FromStrWrapper<_> >("component_id")?.0,
3093 version_min_including: Version::new(row.get::<_, VersionType>("version_min_including")?),
3094 version_max_excluding: Version::new(row.get::<_, VersionType>("version_max_excluding")?),
3095 wasm_backtrace: row.get::<_, JsonWrapper<_>>("wasm_backtrace")?.0,
3096 })
3097 },
3098 ).map_err(DbErrorRead::from)
3099 },
3100 "get_last_backtrace",
3101 ).await
3102 }
3103
3104 #[instrument(level = Level::TRACE, skip(self))]
3106 async fn get_expired_timers(
3107 &self,
3108 at: DateTime<Utc>,
3109 ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
3110 self.transaction(
3111 move |conn| {
3112 let mut expired_timers = conn.prepare(
3113 "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= :at",
3114 )?
3115 .query_map(
3116 named_params! {
3117 ":at": at,
3118 },
3119 |row| {
3120 let execution_id = row.get("execution_id")?;
3121 let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
3122 let delay_id = row.get::<_, DelayId>("delay_id")?;
3123 let delay = ExpiredDelay { execution_id, join_set_id, delay_id };
3124 Ok(ExpiredTimer::Delay(delay))
3125 },
3126 )?
3127 .collect::<Result<Vec<_>, _>>()?;
3128 let expired = conn.prepare(&format!(r#"
3130 SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis
3131 FROM t_state
3132 WHERE pending_expires_finished <= :at AND state = "{STATE_LOCKED}"
3133 "#
3134 )
3135 )?
3136 .query_map(
3137 named_params! {
3138 ":at": at,
3139 },
3140 |row| {
3141 let execution_id = row.get("execution_id")?;
3142 let locked_at_version = Version::new(row.get("last_lock_version")?);
3143 let next_version = Version::new(row.get("corresponding_version")?).increment();
3144 let intermittent_event_count = row.get("intermittent_event_count")?;
3145 let max_retries = row.get("max_retries")?;
3146 let retry_exp_backoff_millis = row.get("retry_exp_backoff_millis")?;
3147 let parent = if let ExecutionId::Derived(derived) = &execution_id {
3148 derived.split_to_parts().inspect_err(|err| error!("cannot split execution {execution_id} to parts: {err:?}")).ok()
3149 } else {
3150 None
3151 };
3152 let lock = ExpiredLock {
3153 execution_id,
3154 locked_at_version,
3155 next_version,
3156 intermittent_event_count,
3157 max_retries,
3158 retry_exp_backoff: Duration::from_millis(retry_exp_backoff_millis),
3159 parent
3160 };
3161 Ok(ExpiredTimer::Lock(lock))
3162 }
3163 )?
3164 .collect::<Result<Vec<_>, _>>()?;
3165 expired_timers.extend(expired);
3166 if !expired_timers.is_empty() {
3167 debug!("get_expired_timers found {expired_timers:?}");
3168 }
3169 Ok(expired_timers)
3170 }, "get_expired_timers"
3171 )
3172 .await
3173 }
3174
3175 async fn get_execution_event(
3176 &self,
3177 execution_id: &ExecutionId,
3178 version: &Version,
3179 ) -> Result<ExecutionEvent, DbErrorRead> {
3180 let version = version.0;
3181 let execution_id = execution_id.clone();
3182 self.transaction(
3183 move |tx| Self::get_execution_event(tx, &execution_id, version),
3184 "get_execution_event",
3185 )
3186 .await
3187 }
3188
3189 async fn get_pending_state(
3190 &self,
3191 execution_id: &ExecutionId,
3192 ) -> Result<PendingState, DbErrorRead> {
3193 let execution_id = execution_id.clone();
3194 Ok(self
3195 .transaction(
3196 move |tx| Self::get_combined_state(tx, &execution_id),
3197 "get_pending_state",
3198 )
3199 .await?
3200 .pending_state)
3201 }
3202
3203 async fn list_executions(
3204 &self,
3205 ffqn: Option<FunctionFqn>,
3206 top_level_only: bool,
3207 pagination: ExecutionListPagination,
3208 ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
3209 self.transaction(
3210 move |tx| Self::list_executions(tx, ffqn.as_ref(), top_level_only, &pagination),
3211 "list_executions",
3212 )
3213 .await
3214 }
3215
3216 async fn list_responses(
3217 &self,
3218 execution_id: &ExecutionId,
3219 pagination: Pagination<u32>,
3220 ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
3221 let execution_id = execution_id.clone();
3222 self.transaction(
3223 move |tx| Self::list_responses(tx, &execution_id, Some(pagination)),
3224 "list_executions",
3225 )
3226 .await
3227 }
3228}
3229
/// Test support: creates a [`SqlitePool`] backed either by a user-supplied file or by a
/// temporary file.
#[cfg(any(test, feature = "tempfile"))]
pub mod tempfile {
    use super::{SqliteConfig, SqlitePool};
    use tempfile::NamedTempFile;

    /// Opens a pool with the default configuration.
    ///
    /// When the `SQLITE_FILE` environment variable is set, the database at that path is used and
    /// no temporary file is returned. Otherwise a fresh [`NamedTempFile`] is created and returned
    /// alongside the pool.
    ///
    /// # Panics
    /// Panics when the temporary file or the pool cannot be created.
    pub async fn sqlite_pool() -> (SqlitePool, Option<NamedTempFile>) {
        match std::env::var("SQLITE_FILE") {
            Ok(path) => {
                let pool = SqlitePool::new(path, SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, None)
            }
            Err(_) => {
                let file = NamedTempFile::new().unwrap();
                let pool = SqlitePool::new(file.path(), SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, Some(file))
            }
        }
    }
}