1use crate::histograms::Histograms;
2use assert_matches::assert_matches;
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use concepts::{
6 ComponentId, ComponentRetryConfig, ExecutionId, FunctionFqn, JoinSetId, StrVariant,
7 SupportedFunctionReturnValue,
8 prefixed_ulid::{DelayId, ExecutionIdDerived, ExecutorId, RunId},
9 storage::{
10 AppendBatchResponse, AppendEventsToExecution, AppendRequest, AppendResponse,
11 AppendResponseToExecution, BacktraceFilter, BacktraceInfo, CreateRequest, DUMMY_CREATED,
12 DUMMY_HISTORY_EVENT, DbConnection, DbErrorGeneric, DbErrorRead, DbErrorReadWithTimeout,
13 DbErrorWrite, DbErrorWritePermanent, DbExecutor, DbPool, DbPoolCloseable, ExecutionEvent,
14 ExecutionEventInner, ExecutionListPagination, ExecutionWithState, ExpiredDelay,
15 ExpiredLock, ExpiredTimer, HistoryEvent, JoinSetRequest, JoinSetResponse,
16 JoinSetResponseEvent, JoinSetResponseEventOuter, LockPendingResponse, LockedExecution,
17 Pagination, PendingState, PendingStateFinished, PendingStateFinishedResultKind,
18 ResponseWithCursor, Version, VersionType,
19 },
20};
21use conversions::{FromStrWrapper, JsonWrapper, consistency_db_err, consistency_rusqlite};
22use hashbrown::HashMap;
23use rusqlite::{
24 CachedStatement, Connection, OpenFlags, OptionalExtension, Params, ToSql, Transaction,
25 TransactionBehavior, named_params, types::ToSqlOutput,
26};
27use std::{
28 cmp::max,
29 collections::VecDeque,
30 fmt::Debug,
31 ops::DerefMut,
32 path::Path,
33 str::FromStr,
34 sync::{
35 Arc, Mutex,
36 atomic::{AtomicBool, Ordering},
37 },
38 time::Duration,
39};
40use std::{fmt::Write as _, pin::Pin};
41use tokio::{
42 sync::{mpsc, oneshot},
43 time::Instant,
44};
45use tracing::{Level, debug, error, info, instrument, trace, warn};
46
47#[derive(Debug, thiserror::Error)]
48#[error("initialization error")]
49pub struct InitializationError;
50
51#[derive(Debug, Clone)]
52struct DelayReq {
53 join_set_id: JoinSetId,
54 delay_id: DelayId,
55 expires_at: DateTime<Utc>,
56}
/// Default PRAGMAs applied, in this order, to every connection opened by `init_thread`.
///
/// Each entry is `[name, value]`. A value supplied for the same name in
/// `SqliteConfig::pragma_override` replaces the default. An empty value means the PRAGMA is
/// only queried and its result logged, instead of being set.
///
/// `page_size` must precede `journal_mode = wal`. SQLite applies a new page size only when the
/// database file is created (or on a `VACUUM` outside WAL mode). Switching a fresh database to
/// WAL writes the file header with the default page size, so a later `page_size` has no effect.
const PRAGMA: [[&str; 2]; 10] = [
    ["page_size", "8192"],
    ["journal_mode", "wal"],
    ["synchronous", "FULL"],
    ["foreign_keys", "true"],
    // Milliseconds to wait on a locked database before failing.
    ["busy_timeout", "1000"],
    ["cache_size", "10000"],
    ["temp_store", "MEMORY"],
    ["mmap_size", "134217728"],
    ["journal_size_limit", "67108864"],
    // Empty value: queried (and logged) on every start rather than set.
    ["integrity_check", ""],
];
80
/// DDL for `t_metadata`, which holds schema-version records.
///
/// `init_thread` inserts into it and compares the `schema_version` of the newest row
/// (highest `id`) with [`T_METADATA_EXPECTED_SCHEMA_VERSION`].
const CREATE_TABLE_T_METADATA: &str = r"
CREATE TABLE IF NOT EXISTS t_metadata (
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 schema_version INTEGER NOT NULL,
 created_at TEXT NOT NULL
) STRICT
";
/// Schema version this code works with; `init_thread` refuses to use a database whose
/// newest `t_metadata` row holds a different value.
const T_METADATA_EXPECTED_SCHEMA_VERSION: u32 = 2;
90
/// DDL for `t_execution_log`: serialized execution events keyed by `(execution_id, version)`.
///
/// `json_value` holds the serialized event (the row with `version = 0` is read back as the
/// `Created` event). `history_event_type` is a stored generated column extracted from the JSON
/// path `$.HistoryEvent.event.type`, so it can be filtered without parsing JSON in Rust.
const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_log (
 execution_id TEXT NOT NULL,
 created_at TEXT NOT NULL,
 json_value TEXT NOT NULL,
 version INTEGER NOT NULL,
 variant TEXT NOT NULL,
 join_set_id TEXT,
 history_event_type TEXT GENERATED ALWAYS AS (json_value->>'$.HistoryEvent.event.type') STORED,
 PRIMARY KEY (execution_id, version)
) STRICT
";
/// Index on `(execution_id, version)`.
///
/// NOTE(review): same columns as the table's PRIMARY KEY, whose implicit index may already
/// serve these lookups — confirm before removing (existing databases already contain it).
const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version ON t_execution_log (execution_id, version);
";
/// Index for looking up events of one execution by `variant`.
const CREATE_INDEX_IDX_T_EXECUTION_ID_EXECUTION_ID_VARIANT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant ON t_execution_log (execution_id, variant);
";
112
/// DDL for `t_join_set_response`: responses delivered to a join set of an execution.
///
/// A row carries either a `delay_id`, or a `child_execution_id` with its `finished_version`
/// (presumably never both — TODO confirm against the writers). `id` orders responses
/// per execution.
///
/// NOTE(review): SQLite treats NULLs as distinct in UNIQUE constraints, so rows in which
/// `delay_id` or `child_execution_id` is NULL are never considered duplicates by this
/// constraint — confirm whether duplicate detection relies on it.
const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
CREATE TABLE IF NOT EXISTS t_join_set_response (
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 created_at TEXT NOT NULL,
 execution_id TEXT NOT NULL,
 join_set_id TEXT NOT NULL,

 delay_id TEXT,

 child_execution_id TEXT,
 finished_version INTEGER,

 UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
) STRICT
";
/// Index for reading responses of one execution in `id` order.
const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
";
136
/// DDL for `t_state`: one row per execution describing its current pending state.
///
/// As decoded by `CombinedState::new`, the nullable columns depend on `state`:
/// * `PendingAt` — all nullable columns NULL; `pending_expires_finished` is the scheduled time.
/// * `Locked` — `last_lock_version`, `executor_id`, `run_id` set;
///   `pending_expires_finished` is the lock expiry.
/// * `BlockedByJoinSet` — `join_set_id`, `join_set_closing` set;
///   `pending_expires_finished` is the lock expiry.
/// * `Finished` — `result_kind` set; `pending_expires_finished` is the finish time.
const CREATE_TABLE_T_STATE: &str = r"
CREATE TABLE IF NOT EXISTS t_state (
 execution_id TEXT NOT NULL,
 is_top_level INTEGER NOT NULL,
 corresponding_version INTEGER NOT NULL,
 pending_expires_finished TEXT NOT NULL,
 ffqn TEXT NOT NULL,
 state TEXT NOT NULL,
 created_at TEXT NOT NULL,
 updated_at TEXT NOT NULL,
 scheduled_at TEXT NOT NULL,
 intermittent_event_count INTEGER NOT NULL,

 max_retries INTEGER,
 retry_exp_backoff_millis INTEGER,
 last_lock_version INTEGER,
 executor_id TEXT,
 run_id TEXT,

 join_set_id TEXT,
 join_set_closing INTEGER,

 result_kind TEXT,

 PRIMARY KEY (execution_id)
) STRICT
";
// Values stored in `t_state.state`.
const STATE_PENDING_AT: &str = "PendingAt";
const STATE_BLOCKED_BY_JOIN_SET: &str = "BlockedByJoinSet";
const STATE_LOCKED: &str = "Locked";
const STATE_FINISHED: &str = "Finished";
/// Presumably compared with `t_execution_log.history_event_type` — TODO confirm at use site.
const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "JoinNext";

/// Index for finding executions in a given state ordered by time, filtered by function.
const IDX_T_STATE_LOCK_PENDING: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending ON t_state (state, pending_expires_finished, ffqn);
";
/// Partial index over rows with `executor_id` set (only `Locked` rows, per `CombinedState::new`),
/// ordered by `pending_expires_finished` (the lock expiry for those rows).
const IDX_T_STATE_EXPIRED_TIMERS: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_expired_timers ON t_state (pending_expires_finished) WHERE executor_id IS NOT NULL;
";
/// Index on `(execution_id, is_top_level)`; note the SQL index name ends in `_is_root`.
const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
";
const IDX_T_STATE_FFQN: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
";
const IDX_T_STATE_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
";
194
/// DDL for `t_delay`: delays registered in a join set, with their expiry time.
const CREATE_TABLE_T_DELAY: &str = r"
CREATE TABLE IF NOT EXISTS t_delay (
 execution_id TEXT NOT NULL,
 join_set_id TEXT NOT NULL,
 delay_id TEXT NOT NULL,
 expires_at TEXT NOT NULL,
 PRIMARY KEY (execution_id, join_set_id, delay_id)
) STRICT
";

/// DDL for `t_backtrace`: a WASM backtrace of `component_id`, stored for the version range
/// `[version_min_including, version_max_excluding)` of an execution.
const CREATE_TABLE_T_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_backtrace (
 execution_id TEXT NOT NULL,
 component_id TEXT NOT NULL,
 version_min_including INTEGER NOT NULL,
 version_max_excluding INTEGER NOT NULL,
 wasm_backtrace TEXT NOT NULL,
 PRIMARY KEY (execution_id, version_min_including, version_max_excluding)
) STRICT
";
/// NOTE(review): same columns as `t_backtrace`'s PRIMARY KEY, whose implicit index may already
/// serve these lookups — confirm before removing.
const IDX_T_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_backtrace_execution_id_version ON t_backtrace (execution_id, version_min_including, version_max_excluding);
";
221
222#[derive(Debug, thiserror::Error, Clone)]
223enum RusqliteError {
224 #[error("not found")]
225 NotFound,
226 #[error("generic: {0}")]
227 Generic(StrVariant),
228}
229
230mod conversions {
231
232 use super::RusqliteError;
233 use concepts::storage::{DbErrorGeneric, DbErrorRead, DbErrorReadWithTimeout, DbErrorWrite};
234 use rusqlite::types::{FromSql, FromSqlError};
235 use std::{fmt::Debug, str::FromStr};
236 use tracing::error;
237
238 impl From<rusqlite::Error> for RusqliteError {
239 fn from(err: rusqlite::Error) -> Self {
240 if matches!(err, rusqlite::Error::QueryReturnedNoRows) {
241 RusqliteError::NotFound
242 } else {
243 error!(backtrace = %std::backtrace::Backtrace::capture(), "Sqlite error {err:?}");
244 RusqliteError::Generic(err.to_string().into())
245 }
246 }
247 }
248
249 impl From<RusqliteError> for DbErrorGeneric {
250 fn from(err: RusqliteError) -> DbErrorGeneric {
251 match err {
252 RusqliteError::NotFound => DbErrorGeneric::Uncategorized("not found".into()),
253 RusqliteError::Generic(str) => DbErrorGeneric::Uncategorized(str),
254 }
255 }
256 }
257 impl From<RusqliteError> for DbErrorRead {
258 fn from(err: RusqliteError) -> Self {
259 if matches!(err, RusqliteError::NotFound) {
260 Self::NotFound
261 } else {
262 Self::from(DbErrorGeneric::from(err))
263 }
264 }
265 }
266 impl From<RusqliteError> for DbErrorReadWithTimeout {
267 fn from(err: RusqliteError) -> Self {
268 Self::from(DbErrorRead::from(err))
269 }
270 }
271 impl From<RusqliteError> for DbErrorWrite {
272 fn from(err: RusqliteError) -> Self {
273 if matches!(err, RusqliteError::NotFound) {
274 Self::NotFound
275 } else {
276 Self::from(DbErrorGeneric::from(err))
277 }
278 }
279 }
280
281 pub(crate) struct JsonWrapper<T>(pub(crate) T);
282 impl<T: serde::de::DeserializeOwned + 'static + Debug> FromSql for JsonWrapper<T> {
283 fn column_result(
284 value: rusqlite::types::ValueRef<'_>,
285 ) -> rusqlite::types::FromSqlResult<Self> {
286 let value = match value {
287 rusqlite::types::ValueRef::Text(value) | rusqlite::types::ValueRef::Blob(value) => {
288 Ok(value)
289 }
290 other => {
291 error!(
292 backtrace = %std::backtrace::Backtrace::capture(),
293 "Unexpected type when conveting to JSON - expected Text or Blob, got type `{other:?}`",
294 );
295 Err(FromSqlError::InvalidType)
296 }
297 }?;
298 let value = serde_json::from_slice::<T>(value).map_err(|err| {
299 error!(
300 backtrace = %std::backtrace::Backtrace::capture(),
301 "Cannot convert JSON value `{value:?}` to type:`{type}` - {err:?}",
302 r#type = std::any::type_name::<T>()
303 );
304 FromSqlError::InvalidType
305 })?;
306 Ok(Self(value))
307 }
308 }
309
310 pub(crate) struct FromStrWrapper<T: FromStr>(pub(crate) T);
311 impl<T: FromStr<Err = D>, D: Debug> FromSql for FromStrWrapper<T> {
312 fn column_result(
313 value: rusqlite::types::ValueRef<'_>,
314 ) -> rusqlite::types::FromSqlResult<Self> {
315 let value = String::column_result(value)?;
316 let value = T::from_str(&value).map_err(|err| {
317 error!(
318 backtrace = %std::backtrace::Backtrace::capture(),
319 "Cannot convert string `{value}` to type:`{type}` - {err:?}",
320 r#type = std::any::type_name::<T>()
321 );
322 FromSqlError::InvalidType
323 })?;
324 Ok(Self(value))
325 }
326 }
327
328 #[derive(Debug, thiserror::Error)]
330 #[error("{0}")]
331 pub(crate) struct OtherError(&'static str);
332 pub(crate) fn consistency_rusqlite(input: &'static str) -> rusqlite::Error {
333 FromSqlError::other(OtherError(input)).into()
334 }
335
336 pub(crate) fn consistency_db_err(input: &'static str) -> DbErrorGeneric {
337 DbErrorGeneric::Uncategorized(input.into())
338 }
339}
340
/// Column values mirroring a `t_state` row, before validation.
///
/// `CombinedState::new` turns it into a [`PendingState`]; which optional fields must be
/// `Some` depends on `state`, and any other combination is reported as a consistency error.
#[derive(Debug)]
struct CombinedStateDTO {
    /// One of the `STATE_*` constants.
    state: String,
    /// Function name, parsed with `FunctionFqn::from_str`.
    ffqn: String,
    /// Scheduled time, lock expiry or finish time, depending on `state`.
    pending_expires_finished: DateTime<Utc>,
    last_lock_version: Option<Version>,
    executor_id: Option<ExecutorId>,
    run_id: Option<RunId>,
    join_set_id: Option<JoinSetId>,
    join_set_closing: Option<bool>,
    result_kind: Option<PendingStateFinishedResultKind>,
}
356#[derive(Debug)]
357struct CombinedState {
358 ffqn: FunctionFqn,
359 pending_state: PendingState,
360 corresponding_version: Version,
361}
362impl CombinedState {
363 fn get_next_version_assert_not_finished(&self) -> Version {
364 assert!(!self.pending_state.is_finished());
365 self.corresponding_version.increment()
366 }
367
368 #[cfg(feature = "test")]
369 fn get_next_version_or_finished(&self) -> Version {
370 if self.pending_state.is_finished() {
371 self.corresponding_version.clone()
372 } else {
373 self.corresponding_version.increment()
374 }
375 }
376}
377
/// An execution became pending at `scheduled_at`; built by the writers that move an
/// execution to `PendingAt`.
#[derive(Debug)]
struct NotifierPendingAt {
    scheduled_at: DateTime<Utc>,
    ffqn: FunctionFqn,
}

/// An execution finished with `retval`.
#[derive(Debug)]
struct NotifierExecutionFinished {
    execution_id: ExecutionId,
    retval: SupportedFunctionReturnValue,
}

/// Notifications produced by one write; a field is `None` when that kind of notification
/// was not produced.
///
/// NOTE(review): presumably delivered to the subscriber maps in `SqlitePoolInner` after the
/// commit succeeds — confirm at the call site.
#[derive(Debug, Default)]
struct AppendNotifier {
    pending_at: Option<NotifierPendingAt>,
    execution_finished: Option<NotifierExecutionFinished>,
    response: Option<(ExecutionId, JoinSetResponseEventOuter)>,
}
396
impl CombinedState {
    /// Validates a [`CombinedStateDTO`] and converts it into a [`CombinedState`].
    ///
    /// Each `state` value accepts exactly one combination of optional columns; any other
    /// combination, or an unparsable `ffqn`, is logged and returned as a consistency error
    /// (a `rusqlite::Error`, so this can be called from row-mapping closures).
    fn new(
        dto: &CombinedStateDTO,
        corresponding_version: Version,
    ) -> Result<Self, rusqlite::Error> {
        let pending_state = match dto {
            // PendingAt: no lock, join set or result columns.
            CombinedStateDTO {
                state,
                ffqn: _,
                pending_expires_finished: scheduled_at,
                last_lock_version: None,
                executor_id: None,
                run_id: None,
                join_set_id: None,
                join_set_closing: None,
                result_kind: None,
            } if state == STATE_PENDING_AT => Ok(PendingState::PendingAt {
                scheduled_at: *scheduled_at,
            }),
            // Locked: lock columns set, `pending_expires_finished` is the lock expiry.
            CombinedStateDTO {
                state,
                ffqn: _,
                pending_expires_finished: lock_expires_at,
                last_lock_version: Some(_),
                executor_id: Some(executor_id),
                run_id: Some(run_id),
                join_set_id: None,
                join_set_closing: None,
                result_kind: None,
            } if state == STATE_LOCKED => Ok(PendingState::Locked {
                executor_id: *executor_id,
                run_id: *run_id,
                lock_expires_at: *lock_expires_at,
            }),
            // BlockedByJoinSet: join set columns set, no lock columns.
            CombinedStateDTO {
                state,
                ffqn: _,
                pending_expires_finished: lock_expires_at,
                last_lock_version: None,
                executor_id: None,
                run_id: None,
                join_set_id: Some(join_set_id),
                join_set_closing: Some(join_set_closing),
                result_kind: None,
            } if state == STATE_BLOCKED_BY_JOIN_SET => Ok(PendingState::BlockedByJoinSet {
                join_set_id: join_set_id.clone(),
                closing: *join_set_closing,
                lock_expires_at: *lock_expires_at,
            }),
            // Finished: only `result_kind` set; the finished version is the corresponding one.
            CombinedStateDTO {
                state,
                ffqn: _,
                pending_expires_finished: finished_at,
                last_lock_version: None,
                executor_id: None,
                run_id: None,
                join_set_id: None,
                join_set_closing: None,
                result_kind: Some(result_kind),
            } if state == STATE_FINISHED => Ok(PendingState::Finished {
                finished: PendingStateFinished {
                    finished_at: *finished_at,
                    version: corresponding_version.0,
                    result_kind: *result_kind,
                },
            }),
            // Unknown state or inconsistent column combination.
            _ => {
                error!("Cannot deserialize pending state from {dto:?}");
                Err(consistency_rusqlite("invalid `t_state`"))
            }
        }?;
        Ok(Self {
            ffqn: FunctionFqn::from_str(&dto.ffqn).map_err(|parse_err| {
                error!("Error parsing ffqn of {dto:?} - {parse_err:?}");
                consistency_rusqlite("invalid ffqn value in `t_state`")
            })?,
            pending_state,
            corresponding_version,
        })
    }
}
478
/// A unit of work executed by the SQLite worker thread inside its physical transaction.
#[derive(derive_more::Debug)]
struct LogicalTx {
    /// The caller's statements. It may be invoked more than once: when a batched commit
    /// fails, `tick` re-applies each command on its own via `ltx_commit_single`.
    #[debug(skip)]
    func: Box<dyn FnMut(&mut Transaction) + Send>,
    /// When the command was created; used to measure queueing latency.
    sent_at: Instant,
    /// Label under which the command's metrics are recorded.
    func_name: &'static str,
    /// Receives the result of the commit that contained `func`'s writes.
    #[debug(skip)]
    commit_ack_sender: oneshot::Sender<Result<(), RusqliteError>>,
}

/// Messages consumed by the worker thread's `tick` loop.
#[derive(derive_more::Debug)]
enum ThreadCommand {
    /// Apply and commit a logical transaction.
    LogicalTx(LogicalTx),
    /// Stop the worker thread.
    Shutdown,
}
494
/// Handle to a SQLite database served by one dedicated worker thread.
///
/// Clones share the worker thread, its command channel and the subscriber maps.
#[derive(Clone)]
pub struct SqlitePool(SqlitePoolInner);

/// Waiters for a join-set response, keyed by execution.
/// NOTE(review): the `u64` looks like a subscription tag — confirm at the use site.
type ResponseSubscribers =
    Arc<Mutex<HashMap<ExecutionId, (oneshot::Sender<JoinSetResponseEventOuter>, u64)>>>;
/// Waiters for pending executions of a function.
/// NOTE(review): the `u64` looks like a subscription tag — confirm at the use site.
type PendingFfqnSubscribers = Arc<Mutex<HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>>>;
/// Waiters for the result of an execution, several per execution, keyed by a `u64`.
type ExecutionFinishedSubscribers =
    Mutex<HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>>;
#[derive(Clone)]
struct SqlitePoolInner {
    /// Set by `close` / `Drop` to ask the worker thread to stop.
    shutdown_requested: Arc<AtomicBool>,
    /// Set by the worker thread when its loop has ended.
    shutdown_finished: Arc<AtomicBool>,
    /// Bounded channel to the worker thread.
    command_tx: tokio::sync::mpsc::Sender<ThreadCommand>,
    response_subscribers: ResponseSubscribers,
    pending_ffqn_subscribers: PendingFfqnSubscribers,
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
    /// Worker thread handle shared by all clones; an `Option` so `Drop` can `take` it.
    join_handle: Option<Arc<std::thread::JoinHandle<()>>>,
}
513
514#[async_trait]
515impl DbPoolCloseable for SqlitePool {
516 async fn close(self) {
517 debug!("Sqlite is closing");
518 self.0.shutdown_requested.store(true, Ordering::Release);
519 let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
521 while !self.0.shutdown_finished.load(Ordering::Acquire) {
522 tokio::time::sleep(Duration::from_millis(1)).await;
523 }
524 debug!("Sqlite was closed");
525 }
526}
527
528#[async_trait]
529impl DbPool for SqlitePool {
530 fn connection(&self) -> Box<dyn DbConnection> {
531 Box::new(self.clone())
532 }
533}
534impl Drop for SqlitePool {
535 fn drop(&mut self) {
536 let arc = self.0.join_handle.take().expect("join_handle was set");
537 if let Ok(join_handle) = Arc::try_unwrap(arc) {
538 if !join_handle.is_finished() {
540 if !self.0.shutdown_finished.load(Ordering::Acquire) {
541 let backtrace = std::backtrace::Backtrace::capture();
543 warn!("SqlitePool was not closed properly - {backtrace}");
544 self.0.shutdown_requested.store(true, Ordering::Release);
545 let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
547 } else {
550 }
552 }
553 }
554 }
555}
556
557#[derive(Debug, Clone)]
558pub struct SqliteConfig {
559 pub queue_capacity: usize,
560 pub pragma_override: Option<HashMap<String, String>>,
561 pub metrics_threshold: Option<Duration>,
562}
563impl Default for SqliteConfig {
564 fn default() -> Self {
565 Self {
566 queue_capacity: 100,
567 pragma_override: None,
568 metrics_threshold: None,
569 }
570 }
571}
572
573impl SqlitePool {
574 fn init_thread(
575 path: &Path,
576 mut pragma_override: HashMap<String, String>,
577 ) -> Result<Connection, InitializationError> {
578 fn conn_execute<P: Params>(
579 conn: &Connection,
580 sql: &str,
581 params: P,
582 ) -> Result<(), InitializationError> {
583 conn.execute(sql, params).map(|_| ()).map_err(|err| {
584 error!("Cannot run `{sql}` - {err:?}");
585 InitializationError
586 })
587 }
588 fn pragma_update(
589 conn: &Connection,
590 name: &str,
591 value: &str,
592 ) -> Result<(), InitializationError> {
593 if value.is_empty() {
594 debug!("Querying PRAGMA {name}");
595 conn.pragma_query(None, name, |row| {
596 debug!("{row:?}");
597 Ok(())
598 })
599 .map_err(|err| {
600 error!("cannot update pragma `{name}`=`{value}` - {err:?}");
601 InitializationError
602 })
603 } else {
604 debug!("Setting PRAGMA {name}={value}");
605 conn.pragma_update(None, name, value).map_err(|err| {
606 error!("cannot update pragma `{name}`=`{value}` - {err:?}");
607 InitializationError
608 })
609 }
610 }
611
612 let conn = Connection::open_with_flags(path, OpenFlags::default()).map_err(|err| {
613 error!("cannot open the connection - {err:?}");
614 InitializationError
615 })?;
616
617 for [pragma_name, default_value] in PRAGMA {
618 let pragma_value = pragma_override
619 .remove(pragma_name)
620 .unwrap_or_else(|| default_value.to_string());
621 pragma_update(&conn, pragma_name, &pragma_value)?;
622 }
623 for (pragma_name, pragma_value) in pragma_override.drain() {
625 pragma_update(&conn, &pragma_name, &pragma_value)?;
626 }
627
628 conn_execute(&conn, CREATE_TABLE_T_METADATA, [])?;
630 conn_execute(
632 &conn,
633 &format!(
634 "INSERT INTO t_metadata (schema_version, created_at) VALUES
635 ({T_METADATA_EXPECTED_SCHEMA_VERSION}, ?) ON CONFLICT DO NOTHING"
636 ),
637 [Utc::now()],
638 )?;
639 let actual_version = conn
641 .prepare("SELECT schema_version FROM t_metadata ORDER BY id DESC LIMIT 1")
642 .map_err(|err| {
643 error!("cannot select schema version - {err:?}");
644 InitializationError
645 })?
646 .query_row([], |row| row.get::<_, u32>("schema_version"));
647
648 let actual_version = actual_version.map_err(|err| {
649 error!("Cannot read the schema version - {err:?}");
650 InitializationError
651 })?;
652 if actual_version != T_METADATA_EXPECTED_SCHEMA_VERSION {
653 error!(
654 "wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
655 );
656 return Err(InitializationError);
657 }
658
659 conn_execute(&conn, CREATE_TABLE_T_EXECUTION_LOG, [])?;
661 conn_execute(
662 &conn,
663 CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
664 [],
665 )?;
666 conn_execute(
667 &conn,
668 CREATE_INDEX_IDX_T_EXECUTION_ID_EXECUTION_ID_VARIANT,
669 [],
670 )?;
671 conn_execute(&conn, CREATE_TABLE_T_JOIN_SET_RESPONSE, [])?;
673 conn_execute(
674 &conn,
675 CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
676 [],
677 )?;
678 conn_execute(&conn, CREATE_TABLE_T_STATE, [])?;
680 conn_execute(&conn, IDX_T_STATE_LOCK_PENDING, [])?;
681 conn_execute(&conn, IDX_T_STATE_EXPIRED_TIMERS, [])?;
682 conn_execute(&conn, IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL, [])?;
683 conn_execute(&conn, IDX_T_STATE_FFQN, [])?;
684 conn_execute(&conn, IDX_T_STATE_CREATED_AT, [])?;
685 conn_execute(&conn, CREATE_TABLE_T_DELAY, [])?;
687 conn_execute(&conn, CREATE_TABLE_T_BACKTRACE, [])?;
689 conn_execute(&conn, IDX_T_BACKTRACE_EXECUTION_ID_VERSION, [])?;
690 Ok(conn)
691 }
692
693 fn connection_rpc(
694 mut conn: Connection,
695 shutdown_requested: &AtomicBool,
696 shutdown_finished: &AtomicBool,
697 mut command_rx: mpsc::Receiver<ThreadCommand>,
698 metrics_threshold: Option<Duration>,
699 ) {
700 let mut histograms = Histograms::new(metrics_threshold);
701 while Self::tick(
702 &mut conn,
703 shutdown_requested,
704 &mut command_rx,
705 &mut histograms,
706 )
707 .is_ok()
708 {
709 }
711 debug!("Closing command thread");
712 shutdown_finished.store(true, Ordering::Release);
713 }
714
    /// Processes one batch of logical transactions on the worker thread.
    ///
    /// Blocks until the first command arrives, then opens an `IMMEDIATE` transaction. It
    /// applies that command, plus every command already queued (drained with `try_recv`), to
    /// that one transaction and commits once. If the commit succeeds, or the batch holds a
    /// single command, every sender receives the commit result. If a multi-command batch fails
    /// to commit, each command is re-applied and committed on its own via `ltx_commit_single`.
    ///
    /// Returns `Err(())` when the worker should stop:
    /// * a `Shutdown` command arrives;
    /// * the channel is closed;
    /// * a transaction cannot be opened;
    /// * `shutdown_requested` is observed before the commit.
    ///
    /// In those cases the uncommitted transaction is dropped (rusqlite rolls back by default).
    /// The batch's `commit_ack_sender`s are also dropped without an answer.
    ///
    /// NOTE(review): the `Result` produced by each `func` is not consulted here. Writes made
    /// by a command that then returned `Err` are still committed with the batch.
    fn tick(
        conn: &mut Connection,
        shutdown_requested: &AtomicBool,
        command_rx: &mut mpsc::Receiver<ThreadCommand>,
        histograms: &mut Histograms,
    ) -> Result<(), ()> {
        let mut ltx_list = Vec::new();
        // Block until the first command of the batch arrives.
        let mut ltx = match command_rx.blocking_recv() {
            Some(ThreadCommand::LogicalTx(ltx)) => ltx,
            Some(ThreadCommand::Shutdown) => {
                debug!("shutdown message received");
                return Err(());
            }
            None => {
                debug!("command_rx was closed");
                return Err(());
            }
        };
        let all_fns_start = std::time::Instant::now();
        let mut physical_tx = conn
            .transaction_with_behavior(TransactionBehavior::Immediate)
            .map_err(|begin_err| {
                error!("Cannot open transaction, closing sqlite - {begin_err:?}");
            })?;
        Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
        ltx_list.push(ltx);

        // Fold every command that is already queued into the same physical transaction.
        while let Ok(more) = command_rx.try_recv() {
            let mut ltx = match more {
                ThreadCommand::Shutdown => {
                    debug!("shutdown message received");
                    return Err(());
                }
                ThreadCommand::LogicalTx(ltx) => ltx,
            };

            Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
            ltx_list.push(ltx);
        }
        histograms.record_all_fns(all_fns_start.elapsed());

        {
            // Do not commit once shutdown has been requested.
            if shutdown_requested.load(Ordering::Relaxed) {
                debug!("Recveived shutdown during processing of the batch");
                return Err(());
            }
            let commit_result = {
                let now = std::time::Instant::now();
                let commit_result = physical_tx.commit().map_err(RusqliteError::from);
                histograms.record_commit(now.elapsed());
                commit_result
            };
            if commit_result.is_ok() || ltx_list.len() == 1 {
                for ltx in ltx_list {
                    // The caller may have gone away; ignore a closed receiver.
                    let _ = ltx.commit_ack_sender.send(commit_result.clone());
                }
            } else {
                // The batch failed as a whole: retry each command in isolation so one bad
                // command does not fail the others.
                for ltx in ltx_list {
                    Self::ltx_commit_single(ltx, conn, shutdown_requested, histograms)?;
                }
            }
        }
        histograms.print_if_elapsed();
        Ok(())
    }
785
786 fn ltx_commit_single(
787 mut ltx: LogicalTx,
788 conn: &mut Connection,
789 shutdown_requested: &AtomicBool,
790 histograms: &mut Histograms,
791 ) -> Result<(), ()> {
792 let mut physical_tx = conn
793 .transaction_with_behavior(TransactionBehavior::Immediate)
794 .map_err(|begin_err| {
795 error!("Cannot open transaction, closing sqlite - {begin_err:?}");
796 })?;
797 Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
798 if shutdown_requested.load(Ordering::Relaxed) {
799 debug!("Recveived shutdown during processing of the batch");
800 return Err(());
801 }
802 let commit_result = {
803 let now = std::time::Instant::now();
804 let commit_result = physical_tx.commit().map_err(RusqliteError::from);
805 histograms.record_commit(now.elapsed());
806 commit_result
807 };
808 let _ = ltx.commit_ack_sender.send(commit_result);
810 Ok(())
811 }
812
813 fn ltx_apply_to_tx(
814 ltx: &mut LogicalTx,
815 physical_tx: &mut Transaction,
816 histograms: &mut Histograms,
817 ) {
818 let sent_latency = ltx.sent_at.elapsed();
819 let started_at = Instant::now();
820 (ltx.func)(physical_tx);
821 histograms.record_command(sent_latency, ltx.func_name, started_at.elapsed());
822 }
823
824 #[instrument(level = Level::DEBUG, skip_all, name = "sqlite_new")]
825 pub async fn new<P: AsRef<Path>>(
826 path: P,
827 config: SqliteConfig,
828 ) -> Result<Self, InitializationError> {
829 let path = path.as_ref().to_owned();
830
831 let shutdown_requested = Arc::new(AtomicBool::new(false));
832 let shutdown_finished = Arc::new(AtomicBool::new(false));
833
834 let (command_tx, command_rx) = tokio::sync::mpsc::channel(config.queue_capacity);
835 info!("Sqlite database location: {path:?}");
836 let join_handle = {
837 let init_task = {
839 tokio::task::spawn_blocking(move || {
840 Self::init_thread(&path, config.pragma_override.unwrap_or_default())
841 })
842 .await
843 };
844 let conn = match init_task {
845 Ok(res) => res?,
846 Err(join_err) => {
847 error!("Initialization panic - {join_err:?}");
848 return Err(InitializationError);
849 }
850 };
851 let shutdown_requested = shutdown_requested.clone();
852 let shutdown_finished = shutdown_finished.clone();
853 std::thread::spawn(move || {
855 Self::connection_rpc(
856 conn,
857 &shutdown_requested,
858 &shutdown_finished,
859 command_rx,
860 config.metrics_threshold,
861 );
862 })
863 };
864 Ok(SqlitePool(SqlitePoolInner {
865 shutdown_requested,
866 shutdown_finished,
867 command_tx,
868 response_subscribers: Arc::default(),
869 pending_ffqn_subscribers: Arc::default(),
870 join_handle: Some(Arc::new(join_handle)),
871 execution_finished_subscribers: Arc::default(),
872 }))
873 }
874
875 async fn transaction<F, T, E>(&self, mut func: F, func_name: &'static str) -> Result<T, E>
877 where
878 F: FnMut(&mut rusqlite::Transaction) -> Result<T, E> + Send + 'static,
879 T: Send + 'static,
880 E: From<DbErrorGeneric> + From<RusqliteError> + Send + 'static,
881 {
882 let fn_res: Arc<std::sync::Mutex<Option<_>>> = Arc::default();
883 let (commit_ack_sender, commit_ack_receiver) = oneshot::channel();
884 let thread_command_func = {
885 let fn_res = fn_res.clone();
886 ThreadCommand::LogicalTx(LogicalTx {
887 func: Box::new(move |tx| {
888 let func_res = func(tx);
889 *fn_res.lock().unwrap() = Some(func_res);
890 }),
891 sent_at: Instant::now(),
892 func_name,
893 commit_ack_sender,
894 })
895 };
896 self.0
897 .command_tx
898 .send(thread_command_func)
899 .await
900 .map_err(|_send_err| DbErrorGeneric::Close)?;
901
902 match commit_ack_receiver.await {
904 Ok(Ok(())) => {
905 let mut guard = fn_res.lock().unwrap();
906 std::mem::take(guard.deref_mut()).expect("ltx must have been run at least once")
907 }
908 Ok(Err(rusqlite_err)) => Err(E::from(rusqlite_err)),
909 Err(_) => Err(E::from(DbErrorGeneric::Close)),
910 }
911 }
912
913 fn fetch_created_event(
914 conn: &Connection,
915 execution_id: &ExecutionId,
916 ) -> Result<CreateRequest, DbErrorRead> {
917 let mut stmt = conn.prepare(
918 "SELECT created_at, json_value FROM t_execution_log WHERE \
919 execution_id = :execution_id AND version = 0",
920 )?;
921 let (created_at, event) = stmt.query_row(
922 named_params! {
923 ":execution_id": execution_id.to_string(),
924 },
925 |row| {
926 let created_at = row.get("created_at")?;
927 let event = row
928 .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
929 .map_err(|serde| {
930 error!("cannot deserialize `Created` event: {row:?} - `{serde:?}`");
931 consistency_rusqlite("cannot deserialize `Created` event")
932 })?;
933 Ok((created_at, event.0))
934 },
935 )?;
936 if let ExecutionEventInner::Created {
937 ffqn,
938 params,
939 parent,
940 scheduled_at,
941 component_id,
942 metadata,
943 scheduled_by,
944 } = event
945 {
946 Ok(CreateRequest {
947 created_at,
948 execution_id: execution_id.clone(),
949 ffqn,
950 params,
951 parent,
952 scheduled_at,
953 component_id,
954 metadata,
955 scheduled_by,
956 })
957 } else {
958 error!("Row with version=0 must be a `Created` event - {event:?}");
959 Err(consistency_db_err("expected `Created` event").into())
960 }
961 }
962
963 fn check_expected_next_and_appending_version(
964 expected_version: &Version,
965 appending_version: &Version,
966 ) -> Result<(), DbErrorWrite> {
967 if *expected_version != *appending_version {
968 debug!(
969 "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
970 );
971 return Err(DbErrorWrite::Permanent(
972 DbErrorWritePermanent::CannotWrite {
973 reason: "version conflict".into(),
974 expected_version: Some(expected_version.clone()),
975 },
976 ));
977 }
978 Ok(())
979 }
980
981 #[instrument(level = Level::TRACE, skip_all, fields(execution_id = %req.execution_id))]
982 fn create_inner(
983 tx: &Transaction,
984 req: CreateRequest,
985 ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
986 debug!("create_inner");
987
988 let version = Version::default();
989 let execution_id = req.execution_id.clone();
990 let execution_id_str = execution_id.to_string();
991 let ffqn = req.ffqn.clone();
992 let created_at = req.created_at;
993 let scheduled_at = req.scheduled_at;
994 let event = ExecutionEventInner::from(req);
995 let event_ser = serde_json::to_string(&event).map_err(|err| {
996 error!("Cannot serialize {event:?} - {err:?}");
997 DbErrorWritePermanent::ValidationFailed("parameter serialization error".into())
998 })?;
999 tx.prepare(
1000 "INSERT INTO t_execution_log (execution_id, created_at, version, json_value, variant, join_set_id ) \
1001 VALUES (:execution_id, :created_at, :version, :json_value, :variant, :join_set_id)")
1002 ?
1003 .execute(named_params! {
1004 ":execution_id": &execution_id_str,
1005 ":created_at": created_at,
1006 ":version": version.0,
1007 ":json_value": event_ser,
1008 ":variant": event.variant(),
1009 ":join_set_id": event.join_set_id().map(std::string::ToString::to_string),
1010 })
1011 ?;
1012 let pending_state = PendingState::PendingAt { scheduled_at };
1013 let pending_at = {
1014 let scheduled_at = assert_matches!(pending_state, PendingState::PendingAt { scheduled_at } => scheduled_at);
1015 debug!("Creating with `Pending(`{scheduled_at:?}`)");
1016 tx.prepare(
1017 r"
1018 INSERT INTO t_state (
1019 execution_id,
1020 is_top_level,
1021 corresponding_version,
1022 pending_expires_finished,
1023 ffqn,
1024 state,
1025 created_at,
1026 updated_at,
1027 scheduled_at,
1028 intermittent_event_count
1029 )
1030 VALUES (
1031 :execution_id,
1032 :is_top_level,
1033 :corresponding_version,
1034 :pending_expires_finished,
1035 :ffqn,
1036 :state,
1037 :created_at,
1038 CURRENT_TIMESTAMP,
1039 :scheduled_at,
1040 0
1041 )
1042 ",
1043 )?
1044 .execute(named_params! {
1045 ":execution_id": execution_id.to_string(),
1046 ":is_top_level": execution_id.is_top_level(),
1047 ":corresponding_version": version.0,
1048 ":pending_expires_finished": scheduled_at,
1049 ":ffqn": ffqn.to_string(),
1050 ":state": STATE_PENDING_AT,
1051 ":created_at": created_at,
1052 ":scheduled_at": scheduled_at,
1053 })?;
1054 AppendNotifier {
1055 pending_at: Some(NotifierPendingAt { scheduled_at, ffqn }),
1056 execution_finished: None,
1057 response: None,
1058 }
1059 };
1060 let next_version = Version::new(version.0 + 1);
1061 Ok((next_version, pending_at))
1062 }
1063
1064 #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %corresponding_version))]
1065 fn update_state_pending_after_response_appended(
1066 tx: &Transaction,
1067 execution_id: &ExecutionId,
1068 scheduled_at: DateTime<Utc>, corresponding_version: &Version, ) -> Result<AppendNotifier, DbErrorWrite> {
1071 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
1072 let execution_id_str = execution_id.to_string();
1073 let mut stmt = tx
1074 .prepare_cached(
1075 r"
1076 UPDATE t_state
1077 SET
1078 corresponding_version = :corresponding_version,
1079 pending_expires_finished = :pending_expires_finished,
1080 state = :state,
1081 updated_at = CURRENT_TIMESTAMP,
1082
1083 last_lock_version = NULL,
1084 executor_id = NULL,
1085 run_id = NULL,
1086
1087 join_set_id = NULL,
1088 join_set_closing = NULL,
1089
1090 result_kind = NULL
1091 WHERE execution_id = :execution_id
1092 ",
1093 )
1094 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1095 let updated = stmt
1096 .execute(named_params! {
1097 ":execution_id": execution_id_str,
1098 ":corresponding_version": corresponding_version.0,
1099 ":pending_expires_finished": scheduled_at,
1100 ":state": STATE_PENDING_AT,
1101 })
1102 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1103 if updated != 1 {
1104 return Err(DbErrorWrite::NotFound);
1105 }
1106 Ok(AppendNotifier {
1107 pending_at: Some(NotifierPendingAt {
1108 scheduled_at,
1109 ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1110 }),
1111 execution_finished: None,
1112 response: None,
1113 })
1114 }
1115
1116 #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
1117 fn update_state_pending_after_event_appended(
1118 tx: &Transaction,
1119 execution_id: &ExecutionId,
1120 appending_version: &Version,
1121 scheduled_at: DateTime<Utc>, intermittent_failure: bool,
1123 ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1124 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
1125 let mut stmt = tx
1128 .prepare_cached(
1129 r"
1130 UPDATE t_state
1131 SET
1132 corresponding_version = :appending_version,
1133 pending_expires_finished = :pending_expires_finished,
1134 state = :state,
1135 updated_at = CURRENT_TIMESTAMP,
1136 intermittent_event_count = intermittent_event_count + :intermittent_delta,
1137
1138 last_lock_version = NULL,
1139 executor_id = NULL,
1140 run_id = NULL,
1141
1142 join_set_id = NULL,
1143 join_set_closing = NULL,
1144
1145 result_kind = NULL
1146 WHERE execution_id = :execution_id;
1147 ",
1148 )
1149 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1150 let updated = stmt
1151 .execute(named_params! {
1152 ":execution_id": execution_id.to_string(),
1153 ":appending_version": appending_version.0,
1154 ":pending_expires_finished": scheduled_at,
1155 ":state": STATE_PENDING_AT,
1156 ":intermittent_delta": i32::from(intermittent_failure) })
1158 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1159 if updated != 1 {
1160 return Err(DbErrorWrite::NotFound);
1161 }
1162 Ok((
1163 appending_version.increment(),
1164 AppendNotifier {
1165 pending_at: Some(NotifierPendingAt {
1166 scheduled_at,
1167 ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1168 }),
1169 execution_finished: None,
1170 response: None,
1171 },
1172 ))
1173 }
1174
1175 fn update_state_locked_get_intermittent_event_count(
1176 tx: &Transaction,
1177 execution_id: &ExecutionId,
1178 executor_id: ExecutorId,
1179 run_id: RunId,
1180 lock_expires_at: DateTime<Utc>,
1181 appending_version: &Version,
1182 retry_config: ComponentRetryConfig,
1183 ) -> Result<u32, DbErrorWrite> {
1184 debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
1185 let backoff_millis =
1186 u64::try_from(retry_config.retry_exp_backoff.as_millis()).expect("backoff too big");
1187 let execution_id_str = execution_id.to_string();
1188 let mut stmt = tx
1189 .prepare_cached(
1190 r"
1191 UPDATE t_state
1192 SET
1193 corresponding_version = :appending_version,
1194 pending_expires_finished = :pending_expires_finished,
1195 state = :state,
1196 updated_at = CURRENT_TIMESTAMP,
1197
1198 max_retries = :max_retries,
1199 retry_exp_backoff_millis = :retry_exp_backoff_millis,
1200 last_lock_version = :appending_version,
1201 executor_id = :executor_id,
1202 run_id = :run_id,
1203
1204 join_set_id = NULL,
1205 join_set_closing = NULL,
1206
1207 result_kind = NULL
1208 WHERE execution_id = :execution_id
1209 ",
1210 )
1211 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1212 let updated = stmt
1213 .execute(named_params! {
1214 ":execution_id": execution_id_str,
1215 ":appending_version": appending_version.0,
1216 ":pending_expires_finished": lock_expires_at,
1217 ":state": STATE_LOCKED,
1218 ":max_retries": retry_config.max_retries,
1219 ":retry_exp_backoff_millis": backoff_millis,
1220 ":executor_id": executor_id.to_string(),
1221 ":run_id": run_id.to_string(),
1222 })
1223 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1224 if updated != 1 {
1225 return Err(DbErrorWrite::NotFound);
1226 }
1227
1228 let intermittent_event_count = tx
1230 .prepare(
1231 "SELECT intermittent_event_count FROM t_state WHERE execution_id = :execution_id",
1232 )?
1233 .query_row(
1234 named_params! {
1235 ":execution_id": execution_id_str,
1236 },
1237 |row| {
1238 let intermittent_event_count = row.get("intermittent_event_count")?;
1239 Ok(intermittent_event_count)
1240 },
1241 )
1242 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1243
1244 Ok(intermittent_event_count)
1245 }
1246
1247 fn update_state_blocked(
1248 tx: &Transaction,
1249 execution_id: &ExecutionId,
1250 appending_version: &Version,
1251 join_set_id: &JoinSetId,
1253 lock_expires_at: DateTime<Utc>,
1254 join_set_closing: bool,
1255 ) -> Result<
1256 AppendResponse, DbErrorWrite,
1258 > {
1259 debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
1260 let execution_id_str = execution_id.to_string();
1261 let mut stmt = tx.prepare_cached(
1262 r"
1263 UPDATE t_state
1264 SET
1265 corresponding_version = :appending_version,
1266 pending_expires_finished = :pending_expires_finished,
1267 state = :state,
1268 updated_at = CURRENT_TIMESTAMP,
1269
1270 last_lock_version = NULL,
1271 executor_id = NULL,
1272 run_id = NULL,
1273
1274 join_set_id = :join_set_id,
1275 join_set_closing = :join_set_closing,
1276
1277 result_kind = NULL
1278 WHERE execution_id = :execution_id
1279 ",
1280 )?;
1281 let updated = stmt.execute(named_params! {
1282 ":execution_id": execution_id_str,
1283 ":appending_version": appending_version.0,
1284 ":pending_expires_finished": lock_expires_at,
1285 ":state": STATE_BLOCKED_BY_JOIN_SET,
1286 ":join_set_id": join_set_id,
1287 ":join_set_closing": join_set_closing,
1288 })?;
1289 if updated != 1 {
1290 return Err(DbErrorWrite::NotFound);
1291 }
1292 Ok(appending_version.increment())
1293 }
1294
1295 fn update_state_finished(
1296 tx: &Transaction,
1297 execution_id: &ExecutionId,
1298 appending_version: &Version,
1299 finished_at: DateTime<Utc>,
1301 result_kind: PendingStateFinishedResultKind,
1302 ) -> Result<(), DbErrorWrite> {
1303 debug!("Setting t_state to Finished");
1304 let execution_id_str = execution_id.to_string();
1305 let mut stmt = tx.prepare_cached(
1306 r"
1307 UPDATE t_state
1308 SET
1309 corresponding_version = :appending_version,
1310 pending_expires_finished = :pending_expires_finished,
1311 state = :state,
1312 updated_at = CURRENT_TIMESTAMP,
1313
1314 last_lock_version = NULL,
1315 executor_id = NULL,
1316 run_id = NULL,
1317
1318 join_set_id = NULL,
1319 join_set_closing = NULL,
1320
1321 result_kind = :result_kind
1322 WHERE execution_id = :execution_id
1323 ",
1324 )?;
1325 let updated = stmt.execute(named_params! {
1326 ":execution_id": execution_id_str,
1327 ":appending_version": appending_version.0,
1328 ":pending_expires_finished": finished_at,
1329 ":state": STATE_FINISHED,
1330 ":result_kind": result_kind.to_string(),
1331 })?;
1332 if updated != 1 {
1333 return Err(DbErrorWrite::NotFound);
1334 }
1335 Ok(())
1336 }
1337
1338 #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %appending_version))]
1340 fn bump_state_next_version(
1341 tx: &Transaction,
1342 execution_id: &ExecutionId,
1343 appending_version: &Version,
1344 delay_req: Option<DelayReq>,
1345 ) -> Result<AppendResponse , DbErrorWrite> {
1346 debug!("update_index_version");
1347 let execution_id_str = execution_id.to_string();
1348 let mut stmt = tx.prepare_cached(
1349 r"
1350 UPDATE t_state
1351 SET
1352 corresponding_version = :appending_version,
1353 updated_at = CURRENT_TIMESTAMP
1354 WHERE execution_id = :execution_id
1355 ",
1356 )?;
1357 let updated = stmt.execute(named_params! {
1358 ":execution_id": execution_id_str,
1359 ":appending_version": appending_version.0,
1360 })?;
1361 if updated != 1 {
1362 return Err(DbErrorWrite::NotFound);
1363 }
1364 if let Some(DelayReq {
1365 join_set_id,
1366 delay_id,
1367 expires_at,
1368 }) = delay_req
1369 {
1370 debug!("Inserting delay to `t_delay`");
1371 let mut stmt = tx.prepare_cached(
1372 "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) \
1373 VALUES \
1374 (:execution_id, :join_set_id, :delay_id, :expires_at)",
1375 )?;
1376 stmt.execute(named_params! {
1377 ":execution_id": execution_id_str,
1378 ":join_set_id": join_set_id.to_string(),
1379 ":delay_id": delay_id.to_string(),
1380 ":expires_at": expires_at,
1381 })?;
1382 }
1383 Ok(appending_version.increment())
1384 }
1385
1386 fn get_combined_state(
1387 tx: &Transaction,
1388 execution_id: &ExecutionId,
1389 ) -> Result<CombinedState, DbErrorRead> {
1390 let mut stmt = tx.prepare(
1391 r"
1392 SELECT
1393 state, ffqn, corresponding_version, pending_expires_finished,
1394 last_lock_version, executor_id, run_id,
1395 join_set_id, join_set_closing,
1396 result_kind
1397 FROM t_state
1398 WHERE
1399 execution_id = :execution_id
1400 ",
1401 )?;
1402 stmt.query_row(
1403 named_params! {
1404 ":execution_id": execution_id.to_string(),
1405 },
1406 |row| {
1407 CombinedState::new(
1408 &CombinedStateDTO {
1409 state: row.get("state")?,
1410 ffqn: row.get("ffqn")?,
1411 pending_expires_finished: row
1412 .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1413 last_lock_version: row
1414 .get::<_, Option<VersionType>>("last_lock_version")?
1415 .map(Version::new),
1416 executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1417 run_id: row.get::<_, Option<RunId>>("run_id")?,
1418 join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1419 join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1420 result_kind: row
1421 .get::<_, Option<FromStrWrapper<PendingStateFinishedResultKind>>>(
1422 "result_kind",
1423 )?
1424 .map(|wrapper| wrapper.0),
1425 },
1426 Version::new(row.get("corresponding_version")?),
1427 )
1428 },
1429 )
1430 .map_err(DbErrorRead::from)
1431 }
1432
1433 fn list_executions(
1434 read_tx: &Transaction,
1435 ffqn: Option<&FunctionFqn>,
1436 top_level_only: bool,
1437 pagination: &ExecutionListPagination,
1438 ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
1439 struct StatementModifier<'a> {
1440 where_vec: Vec<String>,
1441 params: Vec<(&'static str, ToSqlOutput<'a>)>,
1442 limit: u32,
1443 limit_desc: bool,
1444 }
1445
1446 fn paginate<'a, T: rusqlite::ToSql + 'static>(
1447 pagination: &'a Pagination<Option<T>>,
1448 column: &str,
1449 top_level_only: bool,
1450 ) -> Result<StatementModifier<'a>, DbErrorGeneric> {
1451 let mut where_vec: Vec<String> = vec![];
1452 let mut params: Vec<(&'static str, ToSqlOutput<'a>)> = vec![];
1453 let limit = pagination.length();
1454 let limit_desc = pagination.is_desc();
1455 let rel = pagination.rel();
1456 match pagination {
1457 Pagination::NewerThan {
1458 cursor: Some(cursor),
1459 ..
1460 }
1461 | Pagination::OlderThan {
1462 cursor: Some(cursor),
1463 ..
1464 } => {
1465 where_vec.push(format!("{column} {rel} :cursor"));
1466 let cursor = cursor.to_sql().map_err(|err| {
1467 error!("Possible program error - cannot convert cursor to sql - {err:?}");
1468 DbErrorGeneric::Uncategorized("cannot convert cursor to sql".into())
1469 })?;
1470 params.push((":cursor", cursor));
1471 }
1472 _ => {}
1473 }
1474 if top_level_only {
1475 where_vec.push("is_top_level=true".to_string());
1476 }
1477 Ok(StatementModifier {
1478 where_vec,
1479 params,
1480 limit,
1481 limit_desc,
1482 })
1483 }
1484
1485 let mut statement_mod = match pagination {
1486 ExecutionListPagination::CreatedBy(pagination) => {
1487 paginate(pagination, "created_at", top_level_only)?
1488 }
1489 ExecutionListPagination::ExecutionId(pagination) => {
1490 paginate(pagination, "execution_id", top_level_only)?
1491 }
1492 };
1493
1494 let ffqn_temporary;
1495 if let Some(ffqn) = ffqn {
1496 statement_mod.where_vec.push("ffqn = :ffqn".to_string());
1497 ffqn_temporary = ffqn.to_string();
1498 let ffqn = ffqn_temporary
1499 .to_sql()
1500 .expect("string conversion never fails");
1501
1502 statement_mod.params.push((":ffqn", ffqn));
1503 }
1504
1505 let where_str = if statement_mod.where_vec.is_empty() {
1506 String::new()
1507 } else {
1508 format!("WHERE {}", statement_mod.where_vec.join(" AND "))
1509 };
1510 let sql = format!(
1511 r"
1512 SELECT created_at, scheduled_at, state, execution_id, ffqn, corresponding_version, pending_expires_finished,
1513 last_lock_version, executor_id, run_id,
1514 join_set_id, join_set_closing,
1515 result_kind
1516 FROM t_state {where_str} ORDER BY created_at {desc} LIMIT {limit}
1517 ",
1518 desc = if statement_mod.limit_desc { "DESC" } else { "" },
1519 limit = statement_mod.limit,
1520 );
1521 let mut vec: Vec<_> = read_tx
1522 .prepare(&sql)?
1523 .query_map::<_, &[(&'static str, ToSqlOutput)], _>(
1524 statement_mod
1525 .params
1526 .into_iter()
1527 .collect::<Vec<_>>()
1528 .as_ref(),
1529 |row| {
1530 let execution_id = row.get::<_, ExecutionId>("execution_id")?;
1531 let created_at = row.get("created_at")?;
1532 let scheduled_at = row.get("scheduled_at")?;
1533 let combined_state = CombinedState::new(
1534 &CombinedStateDTO {
1535 state: row.get("state")?,
1536 ffqn: row.get("ffqn")?,
1537 pending_expires_finished: row
1538 .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1539 executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1540
1541 last_lock_version: row
1542 .get::<_, Option<VersionType>>("last_lock_version")?
1543 .map(Version::new),
1544 run_id: row.get::<_, Option<RunId>>("run_id")?,
1545 join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1546 join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1547 result_kind: row
1548 .get::<_, Option<FromStrWrapper<PendingStateFinishedResultKind>>>(
1549 "result_kind",
1550 )?
1551 .map(|wrapper| wrapper.0),
1552 },
1553 Version::new(row.get("corresponding_version")?),
1554 )?;
1555 Ok(ExecutionWithState {
1556 execution_id,
1557 ffqn: combined_state.ffqn,
1558 pending_state: combined_state.pending_state,
1559 created_at,
1560 scheduled_at,
1561 })
1562 },
1563 )?
1564 .collect::<Vec<Result<_, _>>>()
1565 .into_iter()
1566 .filter_map(|row| match row {
1567 Ok(row) => Some(row),
1568 Err(err) => {
1569 warn!("Skipping row - {err:?}");
1570 None
1571 }
1572 })
1573 .collect();
1574
1575 if !statement_mod.limit_desc {
1576 vec.reverse();
1578 }
1579 Ok(vec)
1580 }
1581
1582 #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
1583 #[expect(clippy::too_many_arguments)]
1584 fn lock_single_execution(
1585 tx: &Transaction,
1586 created_at: DateTime<Utc>,
1587 component_id: &ComponentId,
1588 execution_id: &ExecutionId,
1589 run_id: RunId,
1590 appending_version: &Version,
1591 executor_id: ExecutorId,
1592 lock_expires_at: DateTime<Utc>,
1593 retry_config: ComponentRetryConfig,
1594 ) -> Result<LockedExecution, DbErrorWrite> {
1595 debug!("lock_single_execution");
1596 let combined_state = Self::get_combined_state(tx, execution_id)?;
1597 combined_state.pending_state.can_append_lock(
1598 created_at,
1599 executor_id,
1600 run_id,
1601 lock_expires_at,
1602 )?;
1603 let expected_version = combined_state.get_next_version_assert_not_finished();
1604 Self::check_expected_next_and_appending_version(&expected_version, appending_version)?;
1605
1606 let event = ExecutionEventInner::Locked {
1608 component_id: component_id.clone(),
1609 executor_id,
1610 lock_expires_at,
1611 run_id,
1612 retry_config,
1613 };
1614 let event_ser = serde_json::to_string(&event).map_err(|err| {
1615 warn!("Cannot serialize {event:?} - {err:?}");
1616 DbErrorWritePermanent::ValidationFailed("parameter serialization error".into())
1617 })?;
1618 let mut stmt = tx
1619 .prepare_cached(
1620 "INSERT INTO t_execution_log \
1621 (execution_id, created_at, json_value, version, variant) \
1622 VALUES \
1623 (:execution_id, :created_at, :json_value, :version, :variant)",
1624 )
1625 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1626 stmt.execute(named_params! {
1627 ":execution_id": execution_id.to_string(),
1628 ":created_at": created_at,
1629 ":json_value": event_ser,
1630 ":version": appending_version.0,
1631 ":variant": event.variant(),
1632 })
1633 .map_err(|err| {
1634 warn!("Cannot lock execution - {err:?}");
1635 DbErrorWritePermanent::CannotWrite {
1636 reason: "cannot lock execution".into(),
1637 expected_version: None,
1638 }
1639 })?;
1640
1641 let responses = Self::list_responses(tx, execution_id, None)?;
1643 let responses = responses.into_iter().map(|resp| resp.event).collect();
1644 trace!("Responses: {responses:?}");
1645
1646 let intermittent_event_count = Self::update_state_locked_get_intermittent_event_count(
1647 tx,
1648 execution_id,
1649 executor_id,
1650 run_id,
1651 lock_expires_at,
1652 appending_version,
1653 retry_config,
1654 )?;
1655 let mut events = tx
1657 .prepare(
1658 "SELECT json_value FROM t_execution_log WHERE \
1659 execution_id = :execution_id AND (variant = :v1 OR variant = :v2) \
1660 ORDER BY version",
1661 )?
1662 .query_map(
1663 named_params! {
1664 ":execution_id": execution_id.to_string(),
1665 ":v1": DUMMY_CREATED.variant(),
1666 ":v2": DUMMY_HISTORY_EVENT.variant(),
1667 },
1668 |row| {
1669 row.get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
1670 .map(|wrapper| wrapper.0)
1671 .map_err(|serde| {
1672 error!("Cannot deserialize {row:?} - {serde:?}");
1673 consistency_rusqlite("cannot deserialize json value")
1674 })
1675 },
1676 )?
1677 .collect::<Result<Vec<_>, _>>()?
1678 .into_iter()
1679 .collect::<VecDeque<_>>();
1680 let Some(ExecutionEventInner::Created {
1681 ffqn,
1682 params,
1683 parent,
1684 metadata,
1685 ..
1686 }) = events.pop_front()
1687 else {
1688 error!("Execution log must contain at least `Created` event");
1689 return Err(consistency_db_err("execution log must contain `Created` event").into());
1690 };
1691
1692 let event_history = events
1693 .into_iter()
1694 .map(|event| {
1695 if let ExecutionEventInner::HistoryEvent { event } = event {
1696 Ok(event)
1697 } else {
1698 error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
1699 Err(consistency_db_err(
1700 "rows can only contain `Created` and `HistoryEvent` event kinds",
1701 ))
1702 }
1703 })
1704 .collect::<Result<Vec<_>, _>>()?;
1705 Ok(LockedExecution {
1706 execution_id: execution_id.clone(),
1707 metadata,
1708 run_id,
1709 next_version: appending_version.increment(),
1710 ffqn,
1711 params,
1712 event_history,
1713 responses,
1714 parent,
1715 intermittent_event_count,
1716 })
1717 }
1718
1719 fn count_join_next(
1720 tx: &Transaction,
1721 execution_id: &ExecutionId,
1722 join_set_id: &JoinSetId,
1723 ) -> Result<u64, DbErrorRead> {
1724 let mut stmt = tx.prepare(
1725 "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = :execution_id AND join_set_id = :join_set_id \
1726 AND history_event_type = :join_next",
1727 )?;
1728 Ok(stmt.query_row(
1729 named_params! {
1730 ":execution_id": execution_id.to_string(),
1731 ":join_set_id": join_set_id.to_string(),
1732 ":join_next": HISTORY_EVENT_TYPE_JOIN_NEXT,
1733 },
1734 |row| row.get("count"),
1735 )?)
1736 }
1737
1738 #[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
1739 #[expect(clippy::needless_return)]
1740 fn append(
1741 tx: &Transaction,
1742 execution_id: &ExecutionId,
1743 req: AppendRequest,
1744 appending_version: Version,
1745 ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1746 if matches!(req.event, ExecutionEventInner::Created { .. }) {
1747 return Err(DbErrorWrite::Permanent(
1748 DbErrorWritePermanent::ValidationFailed(
1749 "cannot append `Created` event - use `create` instead".into(),
1750 ),
1751 ));
1752 }
1753 if let AppendRequest {
1754 event:
1755 ExecutionEventInner::Locked {
1756 component_id,
1757 executor_id,
1758 run_id,
1759 lock_expires_at,
1760 retry_config,
1761 },
1762 created_at,
1763 } = req
1764 {
1765 return Self::lock_single_execution(
1766 tx,
1767 created_at,
1768 &component_id,
1769 execution_id,
1770 run_id,
1771 &appending_version,
1772 executor_id,
1773 lock_expires_at,
1774 retry_config,
1775 )
1776 .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
1777 }
1778
1779 let combined_state = Self::get_combined_state(tx, execution_id)?;
1780 if combined_state.pending_state.is_finished() {
1781 debug!("Execution is already finished");
1782 return Err(DbErrorWrite::Permanent(
1783 DbErrorWritePermanent::CannotWrite {
1784 reason: "already finished".into(),
1785 expected_version: None,
1786 },
1787 ));
1788 }
1789
1790 Self::check_expected_next_and_appending_version(
1791 &combined_state.get_next_version_assert_not_finished(),
1792 &appending_version,
1793 )?;
1794 let event_ser = serde_json::to_string(&req.event).map_err(|err| {
1795 error!("Cannot serialize {:?} - {err:?}", req.event);
1796 DbErrorWritePermanent::ValidationFailed("parameter serialization error".into())
1797 })?;
1798
1799 let mut stmt = tx.prepare(
1800 "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
1801 VALUES (:execution_id, :created_at, :json_value, :version, :variant, :join_set_id)")
1802 ?;
1803 stmt.execute(named_params! {
1804 ":execution_id": execution_id.to_string(),
1805 ":created_at": req.created_at,
1806 ":json_value": event_ser,
1807 ":version": appending_version.0,
1808 ":variant": req.event.variant(),
1809 ":join_set_id": req.event.join_set_id().map(std::string::ToString::to_string),
1810 })?;
1811 match &req.event {
1814 ExecutionEventInner::Created { .. } => {
1815 unreachable!("handled in the caller")
1816 }
1817
1818 ExecutionEventInner::Locked { .. } => {
1819 unreachable!("handled above")
1820 }
1821
1822 ExecutionEventInner::TemporarilyFailed {
1823 backoff_expires_at, ..
1824 }
1825 | ExecutionEventInner::TemporarilyTimedOut {
1826 backoff_expires_at, ..
1827 } => {
1828 let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1829 tx,
1830 execution_id,
1831 &appending_version,
1832 *backoff_expires_at,
1833 true, )?;
1835 return Ok((next_version, notifier));
1836 }
1837
1838 ExecutionEventInner::Unlocked {
1839 backoff_expires_at, ..
1840 } => {
1841 let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1842 tx,
1843 execution_id,
1844 &appending_version,
1845 *backoff_expires_at,
1846 false, )?;
1848 return Ok((next_version, notifier));
1849 }
1850
1851 ExecutionEventInner::Finished { result, .. } => {
1852 Self::update_state_finished(
1853 tx,
1854 execution_id,
1855 &appending_version,
1856 req.created_at,
1857 PendingStateFinishedResultKind::from(result),
1858 )?;
1859 return Ok((
1860 appending_version,
1861 AppendNotifier {
1862 pending_at: None,
1863 execution_finished: Some(NotifierExecutionFinished {
1864 execution_id: execution_id.clone(),
1865 retval: result.clone(),
1866 }),
1867 response: None,
1868 },
1869 ));
1870 }
1871
1872 ExecutionEventInner::HistoryEvent {
1873 event:
1874 HistoryEvent::JoinSetCreate { .. }
1875 | HistoryEvent::JoinSetRequest {
1876 request: JoinSetRequest::ChildExecutionRequest { .. },
1877 ..
1878 }
1879 | HistoryEvent::Persist { .. }
1880 | HistoryEvent::Schedule { .. }
1881 | HistoryEvent::Stub { .. }
1882 | HistoryEvent::JoinNextTooMany { .. },
1883 } => {
1884 return Ok((
1885 Self::bump_state_next_version(tx, execution_id, &appending_version, None)?,
1886 AppendNotifier::default(),
1887 ));
1888 }
1889
1890 ExecutionEventInner::HistoryEvent {
1891 event:
1892 HistoryEvent::JoinSetRequest {
1893 join_set_id,
1894 request:
1895 JoinSetRequest::DelayRequest {
1896 delay_id,
1897 expires_at,
1898 ..
1899 },
1900 },
1901 } => {
1902 return Ok((
1903 Self::bump_state_next_version(
1904 tx,
1905 execution_id,
1906 &appending_version,
1907 Some(DelayReq {
1908 join_set_id: join_set_id.clone(),
1909 delay_id: delay_id.clone(),
1910 expires_at: *expires_at,
1911 }),
1912 )?,
1913 AppendNotifier::default(),
1914 ));
1915 }
1916
1917 ExecutionEventInner::HistoryEvent {
1918 event:
1919 HistoryEvent::JoinNext {
1920 join_set_id,
1921 run_expires_at,
1922 closing,
1923 requested_ffqn: _,
1924 },
1925 } => {
1926 let join_next_count = Self::count_join_next(tx, execution_id, join_set_id)?;
1928 let nth_response =
1929 Self::nth_response(tx, execution_id, join_set_id, join_next_count - 1)?; trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
1931 assert!(join_next_count > 0);
1932 if let Some(ResponseWithCursor {
1933 event:
1934 JoinSetResponseEventOuter {
1935 created_at: nth_created_at,
1936 ..
1937 },
1938 cursor: _,
1939 }) = nth_response
1940 {
1941 let scheduled_at = max(*run_expires_at, nth_created_at); let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1943 tx,
1944 execution_id,
1945 &appending_version,
1946 scheduled_at,
1947 false, )?;
1949 return Ok((next_version, notifier));
1950 }
1951 return Ok((
1952 Self::update_state_blocked(
1953 tx,
1954 execution_id,
1955 &appending_version,
1956 join_set_id,
1957 *run_expires_at,
1958 *closing,
1959 )?,
1960 AppendNotifier::default(),
1961 ));
1962 }
1963 }
1964 }
1965
1966 fn append_response(
1967 tx: &Transaction,
1968 execution_id: &ExecutionId,
1969 response_outer: JoinSetResponseEventOuter,
1970 ) -> Result<AppendNotifier, DbErrorWrite> {
1971 let mut stmt = tx.prepare(
1972 "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, child_execution_id, finished_version) \
1973 VALUES (:execution_id, :created_at, :join_set_id, :delay_id, :child_execution_id, :finished_version)",
1974 )?;
1975 let join_set_id = &response_outer.event.join_set_id;
1976 let delay_id = match &response_outer.event.event {
1977 JoinSetResponse::DelayFinished { delay_id } => Some(delay_id.to_string()),
1978 JoinSetResponse::ChildExecutionFinished { .. } => None,
1979 };
1980 let (child_execution_id, finished_version) = match &response_outer.event.event {
1981 JoinSetResponse::ChildExecutionFinished {
1982 child_execution_id,
1983 finished_version,
1984 result: _,
1985 } => (
1986 Some(child_execution_id.to_string()),
1987 Some(finished_version.0),
1988 ),
1989 JoinSetResponse::DelayFinished { .. } => (None, None),
1990 };
1991
1992 stmt.execute(named_params! {
1993 ":execution_id": execution_id.to_string(),
1994 ":created_at": response_outer.created_at,
1995 ":join_set_id": join_set_id.to_string(),
1996 ":delay_id": delay_id,
1997 ":child_execution_id": child_execution_id,
1998 ":finished_version": finished_version,
1999 })?;
2000
2001 let combined_state = Self::get_combined_state(tx, execution_id)?;
2003 debug!("previous_pending_state: {combined_state:?}");
2004 let mut notifier = if let PendingState::BlockedByJoinSet {
2005 join_set_id: found_join_set_id,
2006 lock_expires_at, closing: _,
2008 } = combined_state.pending_state
2009 && *join_set_id == found_join_set_id
2010 {
2011 let scheduled_at = max(lock_expires_at, response_outer.created_at);
2014 Self::update_state_pending_after_response_appended(
2017 tx,
2018 execution_id,
2019 scheduled_at,
2020 &combined_state.corresponding_version, )?
2022 } else {
2023 AppendNotifier::default()
2024 };
2025 if let JoinSetResponseEvent {
2026 join_set_id,
2027 event: JoinSetResponse::DelayFinished { delay_id },
2028 } = &response_outer.event
2029 {
2030 debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
2031 let mut stmt =
2032 tx.prepare_cached("DELETE FROM t_delay WHERE execution_id = :execution_id AND join_set_id = :join_set_id AND delay_id = :delay_id")
2033 ?;
2034 stmt.execute(named_params! {
2035 ":execution_id": execution_id.to_string(),
2036 ":join_set_id": join_set_id.to_string(),
2037 ":delay_id": delay_id.to_string(),
2038 })?;
2039 }
2040 notifier.response = Some((execution_id.clone(), response_outer));
2041 Ok(notifier)
2042 }
2043
2044 fn append_backtrace(
2045 tx: &Transaction,
2046 backtrace_info: &BacktraceInfo,
2047 ) -> Result<(), DbErrorWrite> {
2048 let backtrace = serde_json::to_string(&backtrace_info.wasm_backtrace).map_err(|err| {
2049 warn!(
2050 "Cannot serialize backtrace {:?} - {err:?}",
2051 backtrace_info.wasm_backtrace
2052 );
2053 DbErrorWritePermanent::ValidationFailed("cannot serialize backtrace".into())
2054 })?;
2055 let mut stmt = tx
2056 .prepare(
2057 "INSERT INTO t_backtrace (execution_id, component_id, version_min_including, version_max_excluding, wasm_backtrace) \
2058 VALUES (:execution_id, :component_id, :version_min_including, :version_max_excluding, :wasm_backtrace)",
2059 )
2060 ?;
2061 stmt.execute(named_params! {
2062 ":execution_id": backtrace_info.execution_id.to_string(),
2063 ":component_id": backtrace_info.component_id.to_string(),
2064 ":version_min_including": backtrace_info.version_min_including.0,
2065 ":version_max_excluding": backtrace_info.version_max_excluding.0,
2066 ":wasm_backtrace": backtrace,
2067 })?;
2068 Ok(())
2069 }
2070
/// Test-only: loads the full execution log, responses and current state of `execution_id`.
///
/// Returns `DbErrorRead::NotFound` when the execution has no logged events.
#[cfg(feature = "test")]
fn get(
    tx: &Transaction,
    execution_id: &ExecutionId,
) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
    let events = tx
        .prepare(
            "SELECT created_at, json_value FROM t_execution_log WHERE \
            execution_id = :execution_id ORDER BY version",
        )?
        .query_map(
            named_params! {
                ":execution_id": execution_id.to_string(),
            },
            |row| {
                let created_at = row.get("created_at")?;
                let wrapper = row
                    .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
                    .map_err(|serde| {
                        error!("Cannot deserialize {row:?} - {serde:?}");
                        consistency_rusqlite("cannot deserialize event")
                    })?;
                Ok(ExecutionEvent {
                    created_at,
                    event: wrapper.0,
                    backtrace_id: None,
                })
            },
        )?
        .collect::<Result<Vec<_>, _>>()?;
    if events.is_empty() {
        return Err(DbErrorRead::NotFound);
    }
    let combined_state = Self::get_combined_state(tx, execution_id)?;
    let responses = Self::list_responses(tx, execution_id, None)?
        .into_iter()
        .map(|resp| resp.event)
        .collect();
    // Read the version before `pending_state` is moved out of `combined_state`.
    let next_version = combined_state.get_next_version_or_finished();
    Ok(concepts::storage::ExecutionLog {
        execution_id: execution_id.clone(),
        events,
        responses,
        next_version,
        pending_state: combined_state.pending_state,
    })
}
2119
2120 fn list_execution_events(
2121 tx: &Transaction,
2122 execution_id: &ExecutionId,
2123 version_min: VersionType,
2124 version_max_excluding: VersionType,
2125 include_backtrace_id: bool,
2126 ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2127 let select = if include_backtrace_id {
2128 "SELECT
2129 log.created_at,
2130 log.json_value,
2131 -- Select version_min_including from backtrace if a match is found, otherwise NULL
2132 bt.version_min_including AS backtrace_id
2133 FROM
2134 t_execution_log AS log
2135 LEFT OUTER JOIN -- Use LEFT JOIN to keep all logs even if no backtrace matches
2136 t_backtrace AS bt ON log.execution_id = bt.execution_id
2137 -- Check if the log's version falls within the backtrace's range
2138 AND log.version >= bt.version_min_including
2139 AND log.version < bt.version_max_excluding
2140 WHERE
2141 log.execution_id = :execution_id
2142 AND log.version >= :version_min
2143 AND log.version < :version_max_excluding
2144 ORDER BY
2145 log.version;"
2146 } else {
2147 "SELECT
2148 created_at, json_value, NULL as backtrace_id
2149 FROM t_execution_log WHERE
2150 execution_id = :execution_id AND version >= :version_min AND version < :version_max_excluding
2151 ORDER BY version"
2152 };
2153 tx.prepare(select)?
2154 .query_map(
2155 named_params! {
2156 ":execution_id": execution_id.to_string(),
2157 ":version_min": version_min,
2158 ":version_max_excluding": version_max_excluding
2159 },
2160 |row| {
2161 let created_at = row.get("created_at")?;
2162 let backtrace_id = row
2163 .get::<_, Option<VersionType>>("backtrace_id")?
2164 .map(Version::new);
2165 let event = row
2166 .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2167 .map(|event| ExecutionEvent {
2168 created_at,
2169 event: event.0,
2170 backtrace_id,
2171 })
2172 .map_err(|serde| {
2173 error!("Cannot deserialize {row:?} - {serde:?}");
2174 consistency_rusqlite("cannot deserialize")
2175 })?;
2176 Ok(event)
2177 },
2178 )?
2179 .collect::<Result<Vec<_>, _>>()
2180 .map_err(DbErrorRead::from)
2181 }
2182
2183 fn get_execution_event(
2184 tx: &Transaction,
2185 execution_id: &ExecutionId,
2186 version: VersionType,
2187 ) -> Result<ExecutionEvent, DbErrorRead> {
2188 let mut stmt = tx.prepare(
2189 "SELECT created_at, json_value FROM t_execution_log WHERE \
2190 execution_id = :execution_id AND version = :version",
2191 )?;
2192 stmt.query_row(
2193 named_params! {
2194 ":execution_id": execution_id.to_string(),
2195 ":version": version,
2196 },
2197 |row| {
2198 let created_at = row.get("created_at")?;
2199 let event = row
2200 .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2201 .map_err(|serde| {
2202 error!("Cannot deserialize {row:?} - {serde:?}");
2203 consistency_rusqlite("cannot deserialize event")
2204 })?;
2205
2206 Ok(ExecutionEvent {
2207 created_at,
2208 event: event.0,
2209 backtrace_id: None,
2210 })
2211 },
2212 )
2213 .map_err(DbErrorRead::from)
2214 }
2215
2216 fn list_responses(
2217 tx: &Transaction,
2218 execution_id: &ExecutionId,
2219 pagination: Option<Pagination<u32>>,
2220 ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
2221 let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![];
2223 let mut sql = "SELECT \
2224 r.id, r.created_at, r.join_set_id, r.delay_id, r.child_execution_id, r.finished_version, l.json_value \
2225 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2226 WHERE \
2227 r.execution_id = :execution_id \
2228 AND ( r.finished_version = l.version OR r.child_execution_id IS NULL ) \
2229 "
2230 .to_string();
2231 let limit = match &pagination {
2232 Some(
2233 pagination @ (Pagination::NewerThan { cursor, .. }
2234 | Pagination::OlderThan { cursor, .. }),
2235 ) => {
2236 params.push((":cursor", Box::new(cursor)));
2237 write!(sql, " AND r.id {rel} :cursor", rel = pagination.rel()).unwrap();
2238 Some(pagination.length())
2239 }
2240 None => None,
2241 };
2242 sql.push_str(" ORDER BY id");
2243 if pagination.as_ref().is_some_and(Pagination::is_desc) {
2244 sql.push_str(" DESC");
2245 }
2246 if let Some(limit) = limit {
2247 write!(sql, " LIMIT {limit}").unwrap();
2248 }
2249 params.push((":execution_id", Box::new(execution_id.to_string())));
2250 tx.prepare(&sql)?
2251 .query_map::<_, &[(&'static str, &dyn ToSql)], _>(
2252 params
2253 .iter()
2254 .map(|(key, value)| (*key, value.as_ref()))
2255 .collect::<Vec<_>>()
2256 .as_ref(),
2257 Self::parse_response_with_cursor,
2258 )?
2259 .collect::<Result<Vec<_>, rusqlite::Error>>()
2260 .map_err(DbErrorRead::from)
2261 }
2262
2263 fn parse_response_with_cursor(
2264 row: &rusqlite::Row<'_>,
2265 ) -> Result<ResponseWithCursor, rusqlite::Error> {
2266 let id = row.get("id")?;
2267 let created_at: DateTime<Utc> = row.get("created_at")?;
2268 let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
2269 let event = match (
2270 row.get::<_, Option<DelayId>>("delay_id")?,
2271 row.get::<_, Option<ExecutionIdDerived>>("child_execution_id")?,
2272 row.get::<_, Option<VersionType>>("finished_version")?,
2273 row.get::<_, Option<JsonWrapper<ExecutionEventInner>>>("json_value")?,
2274 ) {
2275 (Some(delay_id), None, None, None) => JoinSetResponse::DelayFinished { delay_id },
2276 (
2277 None,
2278 Some(child_execution_id),
2279 Some(finished_version),
2280 Some(JsonWrapper(ExecutionEventInner::Finished { result, .. })),
2281 ) => JoinSetResponse::ChildExecutionFinished {
2282 child_execution_id,
2283 finished_version: Version(finished_version),
2284 result,
2285 },
2286 (delay, child, finished, result) => {
2287 error!(
2288 "Invalid row in t_join_set_response {id} - {:?} {child:?} {finished:?} {:?}",
2289 delay,
2290 result.map(|it| it.0)
2291 );
2292 return Err(consistency_rusqlite("invalid row in t_join_set_response"));
2293 }
2294 };
2295 Ok(ResponseWithCursor {
2296 cursor: id,
2297 event: JoinSetResponseEventOuter {
2298 event: JoinSetResponseEvent { join_set_id, event },
2299 created_at,
2300 },
2301 })
2302 }
2303
2304 fn nth_response(
2305 tx: &Transaction,
2306 execution_id: &ExecutionId,
2307 join_set_id: &JoinSetId,
2308 skip_rows: u64,
2309 ) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
2310 tx
2312 .prepare(
2313 "SELECT r.id, r.created_at, r.join_set_id, \
2314 r.delay_id, \
2315 r.child_execution_id, r.finished_version, l.json_value \
2316 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2317 WHERE \
2318 r.execution_id = :execution_id AND r.join_set_id = :join_set_id AND \
2319 (
2320 r.finished_version = l.version \
2321 OR \
2322 r.child_execution_id IS NULL \
2323 ) \
2324 ORDER BY id \
2325 LIMIT 1 OFFSET :offset",
2326 )
2327 ?
2328 .query_row(
2329 named_params! {
2330 ":execution_id": execution_id.to_string(),
2331 ":join_set_id": join_set_id.to_string(),
2332 ":offset": skip_rows,
2333 },
2334 Self::parse_response_with_cursor,
2335 )
2336 .optional()
2337 .map_err(DbErrorRead::from)
2338 }
2339
2340 #[instrument(level = Level::TRACE, skip_all)]
2342 fn get_responses_with_offset(
2343 tx: &Transaction,
2344 execution_id: &ExecutionId,
2345 skip_rows: usize,
2346 ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorRead> {
2347 tx.prepare(
2349 "SELECT r.id, r.created_at, r.join_set_id, \
2350 r.delay_id, \
2351 r.child_execution_id, r.finished_version, l.json_value \
2352 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2353 WHERE \
2354 r.execution_id = :execution_id AND \
2355 ( \
2356 r.finished_version = l.version \
2357 OR r.child_execution_id IS NULL \
2358 ) \
2359 ORDER BY id \
2360 LIMIT -1 OFFSET :offset",
2361 )
2362 ?
2363 .query_map(
2364 named_params! {
2365 ":execution_id": execution_id.to_string(),
2366 ":offset": skip_rows,
2367 },
2368 Self::parse_response_with_cursor,
2369 )
2370 ?
2371 .collect::<Result<Vec<_>, _>>()
2372 .map(|resp| resp.into_iter().map(|vec| vec.event).collect())
2373 .map_err(DbErrorRead::from)
2374 }
2375
2376 fn get_pending_of_single_ffqn(
2377 mut stmt: CachedStatement,
2378 batch_size: usize,
2379 pending_at_or_sooner: DateTime<Utc>,
2380 ffqn: &FunctionFqn,
2381 ) -> Result<Vec<(ExecutionId, Version)>, ()> {
2382 stmt.query_map(
2383 named_params! {
2384 ":pending_expires_finished": pending_at_or_sooner,
2385 ":ffqn": ffqn.to_string(),
2386 ":batch_size": batch_size,
2387 },
2388 |row| {
2389 let execution_id = row.get::<_, String>("execution_id")?.parse::<ExecutionId>();
2390 let next_version =
2391 Version::new(row.get::<_, VersionType>("corresponding_version")?).increment();
2392 Ok(execution_id.map(|exe| (exe, next_version)))
2393 },
2394 )
2395 .map_err(|err| {
2396 warn!("Ignoring consistency error {err:?}");
2397 })?
2398 .collect::<Result<Vec<_>, _>>()
2399 .map_err(|err| {
2400 warn!("Ignoring consistency error {err:?}");
2401 })?
2402 .into_iter()
2403 .collect::<Result<Vec<_>, _>>()
2404 .map_err(|err| {
2405 warn!("Ignoring consistency error {err:?}");
2406 })
2407 }
2408
2409 fn get_pending(
2411 conn: &Connection,
2412 batch_size: usize,
2413 pending_at_or_sooner: DateTime<Utc>,
2414 ffqns: &[FunctionFqn],
2415 ) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2416 let mut execution_ids_versions = Vec::with_capacity(batch_size);
2417 for ffqn in ffqns {
2418 let stmt = conn
2420 .prepare_cached(&format!(
2421 r#"
2422 SELECT execution_id, corresponding_version FROM t_state WHERE
2423 state = "{STATE_PENDING_AT}" AND
2424 pending_expires_finished <= :pending_expires_finished AND ffqn = :ffqn
2425 ORDER BY pending_expires_finished LIMIT :batch_size
2426 "#
2427 ))
2428 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2429
2430 if let Ok(execs_and_versions) = Self::get_pending_of_single_ffqn(
2431 stmt,
2432 batch_size - execution_ids_versions.len(),
2433 pending_at_or_sooner,
2434 ffqn,
2435 ) {
2436 execution_ids_versions.extend(execs_and_versions);
2437 if execution_ids_versions.len() == batch_size {
2438 break;
2440 }
2441 }
2443 }
2444 Ok(execution_ids_versions)
2445 }
2446
2447 #[instrument(level = Level::TRACE, skip_all)]
2449 fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
2450 let (pending_ats, finished_execs, responses) = {
2451 let (mut pending_ats, mut finished_execs, mut responses) =
2452 (Vec::new(), Vec::new(), Vec::new());
2453 for notifier in notifiers {
2454 if let Some(pending_at) = notifier.pending_at {
2455 pending_ats.push(pending_at);
2456 }
2457 if let Some(finished) = notifier.execution_finished {
2458 finished_execs.push(finished);
2459 }
2460 if let Some(response) = notifier.response {
2461 responses.push(response);
2462 }
2463 }
2464 (pending_ats, finished_execs, responses)
2465 };
2466
2467 if !pending_ats.is_empty() {
2469 let guard = self.0.pending_ffqn_subscribers.lock().unwrap();
2470 for pending_at in pending_ats {
2471 Self::notify_pending_locked(&pending_at, current_time, &guard);
2472 }
2473 }
2474 if !finished_execs.is_empty() {
2477 let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
2478 for finished in finished_execs {
2479 if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
2480 for (_tag, sender) in listeners_of_exe_id {
2481 let _ = sender.send(finished.retval.clone());
2484 }
2485 }
2486 }
2487 }
2488 if !responses.is_empty() {
2490 let mut guard = self.0.response_subscribers.lock().unwrap();
2491 for (execution_id, response) in responses {
2492 if let Some((sender, _)) = guard.remove(&execution_id) {
2493 let _ = sender.send(response);
2494 }
2495 }
2496 }
2497 }
2498
2499 fn notify_pending_locked(
2500 notifier: &NotifierPendingAt,
2501 current_time: DateTime<Utc>,
2502 ffqn_to_pending_subscription: &std::sync::MutexGuard<
2503 HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>,
2504 >,
2505 ) {
2506 if notifier.scheduled_at <= current_time
2508 && let Some((subscription, _)) = ffqn_to_pending_subscription.get(¬ifier.ffqn)
2509 {
2510 debug!("Notifying pending subscriber");
2511 let _ = subscription.try_send(());
2513 }
2514 }
2515}
2516
2517#[async_trait]
2518impl DbExecutor for SqlitePool {
2519 #[instrument(level = Level::TRACE, skip(self))]
2520 async fn lock_pending(
2521 &self,
2522 batch_size: usize,
2523 pending_at_or_sooner: DateTime<Utc>,
2524 ffqns: Arc<[FunctionFqn]>,
2525 created_at: DateTime<Utc>,
2526 component_id: ComponentId,
2527 executor_id: ExecutorId,
2528 lock_expires_at: DateTime<Utc>,
2529 run_id: RunId,
2530 retry_config: ComponentRetryConfig,
2531 ) -> Result<LockPendingResponse, DbErrorGeneric> {
2532 let execution_ids_versions = self
2533 .transaction(
2534 move |conn| Self::get_pending(conn, batch_size, pending_at_or_sooner, &ffqns),
2535 "get_pending",
2536 )
2537 .await?;
2538 if execution_ids_versions.is_empty() {
2539 Ok(vec![])
2540 } else {
2541 debug!("Locking {execution_ids_versions:?}");
2542 self.transaction(
2543 move |tx| {
2544 let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
2545 for (execution_id, version) in &execution_ids_versions {
2547 match Self::lock_single_execution(
2548 tx,
2549 created_at,
2550 &component_id,
2551 execution_id,
2552 run_id,
2553 version,
2554 executor_id,
2555 lock_expires_at,
2556 retry_config,
2557 ) {
2558 Ok(locked) => locked_execs.push(locked),
2559 Err(err) => {
2560 warn!("Locking row {execution_id} failed - {err:?}");
2561 }
2562 }
2563 }
2564 Ok(locked_execs)
2565 },
2566 "lock_pending",
2567 )
2568 .await
2569 }
2570 }
2571
2572 #[instrument(level = Level::DEBUG, skip(self))]
2573 async fn lock_one(
2574 &self,
2575 created_at: DateTime<Utc>,
2576 component_id: ComponentId,
2577 execution_id: &ExecutionId,
2578 run_id: RunId,
2579 version: Version,
2580 executor_id: ExecutorId,
2581 lock_expires_at: DateTime<Utc>,
2582 retry_config: ComponentRetryConfig,
2583 ) -> Result<LockedExecution, DbErrorWrite> {
2584 debug!(%execution_id, "lock_one");
2585 let execution_id = execution_id.clone();
2586 self.transaction(
2587 move |tx| {
2588 Self::lock_single_execution(
2589 tx,
2590 created_at,
2591 &component_id,
2592 &execution_id,
2593 run_id,
2594 &version,
2595 executor_id,
2596 lock_expires_at,
2597 retry_config,
2598 )
2599 },
2600 "lock_inner",
2601 )
2602 .await
2603 }
2604
2605 #[instrument(level = Level::DEBUG, skip(self, req))]
2606 async fn append(
2607 &self,
2608 execution_id: ExecutionId,
2609 version: Version,
2610 req: AppendRequest,
2611 ) -> Result<AppendResponse, DbErrorWrite> {
2612 debug!(%req, "append");
2613 trace!(?req, "append");
2614 let created_at = req.created_at;
2615 let (version, notifier) = self
2616 .transaction(
2617 move |tx| Self::append(tx, &execution_id, req.clone(), version.clone()),
2618 "append",
2619 )
2620 .await?;
2621 self.notify_all(vec![notifier], created_at);
2622 Ok(version)
2623 }
2624
2625 #[instrument(level = Level::DEBUG, skip_all)]
2626 async fn append_batch_respond_to_parent(
2627 &self,
2628 events: AppendEventsToExecution,
2629 response: AppendResponseToExecution,
2630 current_time: DateTime<Utc>,
2631 ) -> Result<AppendBatchResponse, DbErrorWrite> {
2632 debug!("append_batch_respond_to_parent");
2633 if events.execution_id == response.parent_execution_id {
2634 return Err(DbErrorWrite::Permanent(
2637 DbErrorWritePermanent::ValidationFailed(
2638 "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
2639 ),
2640 ));
2641 }
2642 if events.batch.is_empty() {
2643 error!("Batch cannot be empty");
2644 return Err(DbErrorWrite::Permanent(
2645 DbErrorWritePermanent::ValidationFailed("batch cannot be empty".into()),
2646 ));
2647 }
2648 let (version, notifiers) = {
2649 self.transaction(
2650 move |tx| {
2651 let mut version = events.version.clone();
2652 let mut notifier_of_child = None;
2653 for append_request in &events.batch {
2654 let (v, n) = Self::append(
2655 tx,
2656 &events.execution_id,
2657 append_request.clone(),
2658 version,
2659 )?;
2660 version = v;
2661 notifier_of_child = Some(n);
2662 }
2663
2664 let pending_at_parent = Self::append_response(
2665 tx,
2666 &response.parent_execution_id,
2667 response.parent_response_event.clone(),
2668 )?;
2669 Ok::<_, DbErrorWrite>((
2670 version,
2671 vec![
2672 notifier_of_child.expect("checked that the batch is not empty"),
2673 pending_at_parent,
2674 ],
2675 ))
2676 },
2677 "append_batch_respond_to_parent",
2678 )
2679 .await?
2680 };
2681 self.notify_all(notifiers, current_time);
2682 Ok(version)
2683 }
2684
2685 #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
2688 async fn wait_for_pending(
2689 &self,
2690 pending_at_or_sooner: DateTime<Utc>,
2691 ffqns: Arc<[FunctionFqn]>,
2692 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
2693 ) {
2694 let unique_tag: u64 = rand::random();
2695 let (sender, mut receiver) = mpsc::channel(1); {
2697 let mut ffqn_to_pending_subscription = self.0.pending_ffqn_subscribers.lock().unwrap();
2698 for ffqn in ffqns.as_ref() {
2699 ffqn_to_pending_subscription.insert(ffqn.clone(), (sender.clone(), unique_tag));
2700 }
2701 }
2702 async {
2703 let Ok(execution_ids_versions) = self
2704 .transaction(
2705 {
2706 let ffqns = ffqns.clone();
2707 move |conn| Self::get_pending(conn, 1, pending_at_or_sooner, ffqns.as_ref())
2708 },
2709 "subscribe_to_pending",
2710 )
2711 .await
2712 else {
2713 trace!(
2714 "Ignoring get_pending error and waiting in for timeout to avoid executor repolling too soon"
2715 );
2716 timeout_fut.await;
2717 return;
2718 };
2719 if !execution_ids_versions.is_empty() {
2720 trace!("Not waiting, database already contains new pending executions");
2721 return;
2722 }
2723 tokio::select! { _ = receiver.recv() => {
2725 trace!("Received a notification");
2726 }
2727 () = timeout_fut => {
2728 }
2729 }
2730 }.await;
2731 {
2733 let mut ffqn_to_pending_subscription = self.0.pending_ffqn_subscribers.lock().unwrap();
2734 for ffqn in ffqns.as_ref() {
2735 match ffqn_to_pending_subscription.remove(ffqn) {
2736 Some((_, tag)) if tag == unique_tag => {
2737 }
2739 Some(other) => {
2740 ffqn_to_pending_subscription.insert(ffqn.clone(), other);
2742 }
2743 None => {
2744 }
2746 }
2747 }
2748 }
2749 }
2750}
2751
2752#[async_trait]
2753impl DbConnection for SqlitePool {
2754 #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
2755 async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
2756 debug!("create");
2757 trace!(?req, "create");
2758 let created_at = req.created_at;
2759 let (version, notifier) = self
2760 .transaction(move |tx| Self::create_inner(tx, req.clone()), "create")
2761 .await?;
2762 self.notify_all(vec![notifier], created_at);
2763 Ok(version)
2764 }
2765
2766 #[instrument(level = Level::DEBUG, skip(self, batch))]
2767 async fn append_batch(
2768 &self,
2769 current_time: DateTime<Utc>,
2770 batch: Vec<AppendRequest>,
2771 execution_id: ExecutionId,
2772 version: Version,
2773 ) -> Result<AppendBatchResponse, DbErrorWrite> {
2774 debug!("append_batch");
2775 trace!(?batch, "append_batch");
2776 assert!(!batch.is_empty(), "Empty batch request");
2777
2778 let (version, notifier) = self
2779 .transaction(
2780 move |tx| {
2781 let mut version = version.clone();
2782 let mut notifier = None;
2783 for append_request in &batch {
2784 let (v, n) =
2785 Self::append(tx, &execution_id, append_request.clone(), version)?;
2786 version = v;
2787 notifier = Some(n);
2788 }
2789 Ok::<_, DbErrorWrite>((
2790 version,
2791 notifier.expect("checked that the batch is not empty"),
2792 ))
2793 },
2794 "append_batch",
2795 )
2796 .await?;
2797
2798 self.notify_all(vec![notifier], current_time);
2799 Ok(version)
2800 }
2801
2802 #[instrument(level = Level::DEBUG, skip(self, batch, child_req))]
2803 async fn append_batch_create_new_execution(
2804 &self,
2805 current_time: DateTime<Utc>,
2806 batch: Vec<AppendRequest>,
2807 execution_id: ExecutionId,
2808 version: Version,
2809 child_req: Vec<CreateRequest>,
2810 ) -> Result<AppendBatchResponse, DbErrorWrite> {
2811 debug!("append_batch_create_new_execution");
2812 trace!(?batch, ?child_req, "append_batch_create_new_execution");
2813 assert!(!batch.is_empty(), "Empty batch request");
2814
2815 let (version, notifiers) = self
2816 .transaction(
2817 move |tx| {
2818 let mut notifier = None;
2819 let mut version = version.clone();
2820 for append_request in &batch {
2821 let (v, n) =
2822 Self::append(tx, &execution_id, append_request.clone(), version)?;
2823 version = v;
2824 notifier = Some(n);
2825 }
2826 let mut notifiers = Vec::new();
2827 notifiers.push(notifier.expect("checked that the batch is not empty"));
2828
2829 for child_req in &child_req {
2830 let (_, notifier) = Self::create_inner(tx, child_req.clone())?;
2831 notifiers.push(notifier);
2832 }
2833 Ok::<_, DbErrorWrite>((version, notifiers))
2834 },
2835 "append_batch_create_new_execution_inner",
2836 )
2837 .await?;
2838 self.notify_all(notifiers, current_time);
2839 Ok(version)
2840 }
2841
2842 #[cfg(feature = "test")]
2843 #[instrument(level = Level::DEBUG, skip(self))]
2844 async fn get(
2845 &self,
2846 execution_id: &ExecutionId,
2847 ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2848 trace!("get");
2849 let execution_id = execution_id.clone();
2850 self.transaction(move |tx| Self::get(tx, &execution_id), "get")
2851 .await
2852 }
2853
2854 #[instrument(level = Level::DEBUG, skip(self))]
2855 async fn list_execution_events(
2856 &self,
2857 execution_id: &ExecutionId,
2858 since: &Version,
2859 max_length: VersionType,
2860 include_backtrace_id: bool,
2861 ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2862 let execution_id = execution_id.clone();
2863 let since = since.0;
2864 self.transaction(
2865 move |tx| {
2866 Self::list_execution_events(
2867 tx,
2868 &execution_id,
2869 since,
2870 since + max_length,
2871 include_backtrace_id,
2872 )
2873 },
2874 "get",
2875 )
2876 .await
2877 }
2878
2879 #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
2883 async fn subscribe_to_next_responses(
2884 &self,
2885 execution_id: &ExecutionId,
2886 start_idx: usize,
2887 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
2888 ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout> {
2889 debug!("next_responses");
2890 let unique_tag: u64 = rand::random();
2891 let execution_id = execution_id.clone();
2892
2893 let cleanup = || {
2894 let mut guard = self.0.response_subscribers.lock().unwrap();
2895 match guard.remove(&execution_id) {
2896 Some((_, tag)) if tag == unique_tag => {} Some(other) => {
2898 guard.insert(execution_id.clone(), other);
2900 }
2901 None => {} }
2903 };
2904
2905 let response_subscribers = self.0.response_subscribers.clone();
2906 let resp_or_receiver = {
2907 let execution_id = execution_id.clone();
2908 self.transaction(
2909 move |tx| {
2910 let responses = Self::get_responses_with_offset(tx, &execution_id, start_idx)?;
2911 if responses.is_empty() {
2912 let (sender, receiver) = oneshot::channel();
2914 response_subscribers
2915 .lock()
2916 .unwrap()
2917 .insert(execution_id.clone(), (sender, unique_tag));
2918 Ok::<_, DbErrorReadWithTimeout>(itertools::Either::Right(receiver))
2919 } else {
2920 Ok(itertools::Either::Left(responses))
2921 }
2922 },
2923 "subscribe_to_next_responses",
2924 )
2925 .await
2926 }
2927 .inspect_err(|_| {
2928 cleanup();
2929 })?;
2930 match resp_or_receiver {
2931 itertools::Either::Left(resp) => Ok(resp), itertools::Either::Right(receiver) => {
2933 let res = async move {
2934 tokio::select! {
2935 resp = receiver => {
2936 let resp = resp.map_err(|_| DbErrorGeneric::Close)?;
2937 Ok(vec![resp])
2938 }
2939 () = timeout_fut => Err(DbErrorReadWithTimeout::Timeout),
2940 }
2941 }
2942 .await;
2943 cleanup();
2944 res
2945 }
2946 }
2947 }
2948
2949 async fn wait_for_finished_result(
2951 &self,
2952 execution_id: &ExecutionId,
2953 timeout_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
2954 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
2955 let unique_tag: u64 = rand::random();
2956 let execution_id = execution_id.clone();
2957 let execution_finished_subscription = self.0.execution_finished_subscribers.clone();
2958
2959 let cleanup = || {
2960 let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
2961 if let Some(subscribers) = guard.get_mut(&execution_id) {
2962 subscribers.remove(&unique_tag);
2963 }
2964 };
2965
2966 let resp_or_receiver = {
2967 let execution_id = execution_id.clone();
2968 self.transaction(move |tx| {
2969 let pending_state =
2970 Self::get_combined_state(tx, &execution_id)?.pending_state;
2971 if let PendingState::Finished { finished } = pending_state {
2972 let event =
2973 Self::get_execution_event(tx, &execution_id, finished.version)?;
2974 if let ExecutionEventInner::Finished { result, ..} = event.event {
2975 Ok(itertools::Either::Left(result))
2976 } else {
2977 error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
2978 Err(DbErrorReadWithTimeout::from(consistency_db_err(
2979 "cannot get finished event based on t_state version"
2980 )))
2981 }
2982 } else {
2983 let (sender, receiver) = oneshot::channel();
2987 let mut guard = execution_finished_subscription.lock().unwrap();
2988 guard.entry(execution_id.clone()).or_default().insert(unique_tag, sender);
2989 Ok(itertools::Either::Right(receiver))
2990 }
2991 }, "wait_for_finished_result")
2992 .await
2993 }
2994 .inspect_err(|_| {
2995 cleanup();
2999 })?;
3000
3001 let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
3002 match resp_or_receiver {
3003 itertools::Either::Left(resp) => Ok(resp), itertools::Either::Right(receiver) => {
3005 let res = async move {
3006 tokio::select! {
3007 resp = receiver => {
3008 Ok(resp.expect("the notifier sends to all listeners, cannot race with cleanup"))
3009 }
3010 () = timeout_fut => Err(DbErrorReadWithTimeout::Timeout),
3011 }
3012 }
3013 .await;
3014 cleanup();
3015 res
3016 }
3017 }
3018 }
3019
3020 #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
3021 async fn append_response(
3022 &self,
3023 created_at: DateTime<Utc>,
3024 execution_id: ExecutionId,
3025 response_event: JoinSetResponseEvent,
3026 ) -> Result<(), DbErrorWrite> {
3027 debug!("append_response");
3028 let event = JoinSetResponseEventOuter {
3029 created_at,
3030 event: response_event,
3031 };
3032 let notifier = self
3033 .transaction(
3034 move |tx| Self::append_response(tx, &execution_id, event.clone()),
3035 "append_response",
3036 )
3037 .await?;
3038 self.notify_all(vec![notifier], created_at);
3039 Ok(())
3040 }
3041
3042 #[instrument(level = Level::DEBUG, skip_all)]
3043 async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
3044 debug!("append_backtrace");
3045 self.transaction(
3046 move |tx| Self::append_backtrace(tx, &append),
3047 "append_backtrace",
3048 )
3049 .await
3050 }
3051
3052 #[instrument(level = Level::DEBUG, skip_all)]
3053 async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
3054 debug!("append_backtrace_batch");
3055 self.transaction(
3056 move |tx| {
3057 for append in &batch {
3058 Self::append_backtrace(tx, append)?;
3059 }
3060 Ok(())
3061 },
3062 "append_backtrace",
3063 )
3064 .await
3065 }
3066
3067 #[instrument(level = Level::DEBUG, skip_all)]
3068 async fn get_backtrace(
3069 &self,
3070 execution_id: &ExecutionId,
3071 filter: BacktraceFilter,
3072 ) -> Result<BacktraceInfo, DbErrorRead> {
3073 debug!("get_last_backtrace");
3074 let execution_id = execution_id.clone();
3075
3076 self.transaction(
3077 move |tx| {
3078 let select = "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace FROM t_backtrace \
3079 WHERE execution_id = :execution_id";
3080 let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![(":execution_id", Box::new(execution_id.to_string()))];
3081 let select = match &filter {
3082 BacktraceFilter::Specific(version) =>{
3083 params.push((":version", Box::new(version.0)));
3084 format!("{select} AND version_min_including <= :version AND version_max_excluding > :version")
3085 },
3086 BacktraceFilter::First => format!("{select} ORDER BY version_min_including LIMIT 1"),
3087 BacktraceFilter::Last => format!("{select} ORDER BY version_min_including DESC LIMIT 1")
3088 };
3089 tx
3090 .prepare(&select)
3091 ?
3092 .query_row::<_, &[(&'static str, &dyn ToSql)], _>(
3093 params
3094 .iter()
3095 .map(|(key, value)| (*key, value.as_ref()))
3096 .collect::<Vec<_>>()
3097 .as_ref(),
3098 |row| {
3099 Ok(BacktraceInfo {
3100 execution_id: execution_id.clone(),
3101 component_id: row.get::<_, FromStrWrapper<_> >("component_id")?.0,
3102 version_min_including: Version::new(row.get::<_, VersionType>("version_min_including")?),
3103 version_max_excluding: Version::new(row.get::<_, VersionType>("version_max_excluding")?),
3104 wasm_backtrace: row.get::<_, JsonWrapper<_>>("wasm_backtrace")?.0,
3105 })
3106 },
3107 ).map_err(DbErrorRead::from)
3108 },
3109 "get_last_backtrace",
3110 ).await
3111 }
3112
3113 #[instrument(level = Level::TRACE, skip(self))]
3115 async fn get_expired_timers(
3116 &self,
3117 at: DateTime<Utc>,
3118 ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
3119 self.transaction(
3120 move |conn| {
3121 let mut expired_timers = conn.prepare(
3122 "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= :at",
3123 )?
3124 .query_map(
3125 named_params! {
3126 ":at": at,
3127 },
3128 |row| {
3129 let execution_id = row.get("execution_id")?;
3130 let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
3131 let delay_id = row.get::<_, DelayId>("delay_id")?;
3132 let delay = ExpiredDelay { execution_id, join_set_id, delay_id };
3133 Ok(ExpiredTimer::Delay(delay))
3134 },
3135 )?
3136 .collect::<Result<Vec<_>, _>>()?;
3137 let expired = conn.prepare(&format!(r#"
3139 SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis
3140 FROM t_state
3141 WHERE pending_expires_finished <= :at AND state = "{STATE_LOCKED}"
3142 "#
3143 )
3144 )?
3145 .query_map(
3146 named_params! {
3147 ":at": at,
3148 },
3149 |row| {
3150 let execution_id = row.get("execution_id")?;
3151 let locked_at_version = Version::new(row.get("last_lock_version")?);
3152 let next_version = Version::new(row.get("corresponding_version")?).increment();
3153 let intermittent_event_count = row.get("intermittent_event_count")?;
3154 let max_retries = row.get("max_retries")?;
3155 let retry_exp_backoff_millis = row.get("retry_exp_backoff_millis")?;
3156 let lock = ExpiredLock {
3157 execution_id,
3158 locked_at_version,
3159 next_version,
3160 intermittent_event_count,
3161 max_retries,
3162 retry_exp_backoff: Duration::from_millis(retry_exp_backoff_millis),
3163 };
3164 Ok(ExpiredTimer::Lock(lock))
3165 }
3166 )?
3167 .collect::<Result<Vec<_>, _>>()?;
3168 expired_timers.extend(expired);
3169 if !expired_timers.is_empty() {
3170 debug!("get_expired_timers found {expired_timers:?}");
3171 }
3172 Ok(expired_timers)
3173 }, "get_expired_timers"
3174 )
3175 .await
3176 }
3177
3178 async fn get_execution_event(
3179 &self,
3180 execution_id: &ExecutionId,
3181 version: &Version,
3182 ) -> Result<ExecutionEvent, DbErrorRead> {
3183 let version = version.0;
3184 let execution_id = execution_id.clone();
3185 self.transaction(
3186 move |tx| Self::get_execution_event(tx, &execution_id, version),
3187 "get_execution_event",
3188 )
3189 .await
3190 }
3191
3192 async fn get_pending_state(
3193 &self,
3194 execution_id: &ExecutionId,
3195 ) -> Result<PendingState, DbErrorRead> {
3196 let execution_id = execution_id.clone();
3197 Ok(self
3198 .transaction(
3199 move |tx| Self::get_combined_state(tx, &execution_id),
3200 "get_pending_state",
3201 )
3202 .await?
3203 .pending_state)
3204 }
3205
3206 async fn list_executions(
3207 &self,
3208 ffqn: Option<FunctionFqn>,
3209 top_level_only: bool,
3210 pagination: ExecutionListPagination,
3211 ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
3212 self.transaction(
3213 move |tx| Self::list_executions(tx, ffqn.as_ref(), top_level_only, &pagination),
3214 "list_executions",
3215 )
3216 .await
3217 }
3218
3219 async fn list_responses(
3220 &self,
3221 execution_id: &ExecutionId,
3222 pagination: Pagination<u32>,
3223 ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
3224 let execution_id = execution_id.clone();
3225 self.transaction(
3226 move |tx| Self::list_responses(tx, &execution_id, Some(pagination)),
3227 "list_executions",
3228 )
3229 .await
3230 }
3231}
3232
#[cfg(any(test, feature = "tempfile"))]
pub mod tempfile {
    use super::{SqliteConfig, SqlitePool};
    use tempfile::NamedTempFile;

    /// Opens a `SqlitePool` for tests.
    ///
    /// When the `SQLITE_FILE` environment variable is set, the database at that path is used
    /// and `None` is returned as the second element. Otherwise a fresh `NamedTempFile` backs
    /// the database and is returned, so the caller can keep it alive while the pool is in use.
    ///
    /// # Panics
    /// Panics if the temporary file cannot be created or the pool cannot be opened.
    pub async fn sqlite_pool() -> (SqlitePool, Option<NamedTempFile>) {
        let (db_path, temp_file) = match std::env::var("SQLITE_FILE") {
            Ok(from_env) => (std::path::PathBuf::from(from_env), None),
            Err(_) => {
                let temp_file = NamedTempFile::new().unwrap();
                (temp_file.path().to_path_buf(), Some(temp_file))
            }
        };
        let pool = SqlitePool::new(db_path, SqliteConfig::default())
            .await
            .unwrap();
        (pool, temp_file)
    }
}