1use crate::histograms::Histograms;
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use concepts::{
5 ComponentId, ComponentRetryConfig, ExecutionId, FunctionFqn, JoinSetId, StrVariant,
6 SupportedFunctionReturnValue,
7 prefixed_ulid::{DelayId, ExecutionIdDerived, ExecutorId, RunId},
8 storage::{
9 AppendBatchResponse, AppendEventsToExecution, AppendRequest, AppendResponse,
10 AppendResponseToExecution, BacktraceFilter, BacktraceInfo, CreateRequest, DUMMY_CREATED,
11 DUMMY_HISTORY_EVENT, DbConnection, DbErrorGeneric, DbErrorRead, DbErrorReadWithTimeout,
12 DbErrorWrite, DbErrorWriteNonRetriable, DbExecutor, DbPool, DbPoolCloseable,
13 ExecutionEvent, ExecutionEventInner, ExecutionListPagination, ExecutionWithState,
14 ExpiredDelay, ExpiredLock, ExpiredTimer, HistoryEvent, JoinSetRequest, JoinSetResponse,
15 JoinSetResponseEvent, JoinSetResponseEventOuter, LockPendingResponse, Locked, LockedBy,
16 LockedExecution, Pagination, PendingState, PendingStateFinished,
17 PendingStateFinishedResultKind, PendingStateLocked, ResponseWithCursor, Version,
18 VersionType,
19 },
20};
21use conversions::{FromStrWrapper, JsonWrapper, consistency_db_err, consistency_rusqlite};
22use hashbrown::HashMap;
23use rusqlite::{
24 CachedStatement, Connection, OpenFlags, OptionalExtension, Params, ToSql, Transaction,
25 TransactionBehavior, named_params, types::ToSqlOutput,
26};
27use std::{
28 cmp::max,
29 collections::VecDeque,
30 fmt::Debug,
31 ops::DerefMut,
32 path::Path,
33 str::FromStr,
34 sync::{
35 Arc, Mutex,
36 atomic::{AtomicBool, Ordering},
37 },
38 time::Duration,
39};
40use std::{fmt::Write as _, pin::Pin};
41use tokio::{
42 sync::{mpsc, oneshot},
43 time::Instant,
44};
45use tracing::{Level, debug, error, info, instrument, trace, warn};
46
47#[derive(Debug, thiserror::Error)]
48#[error("initialization error")]
49pub struct InitializationError;
50
51#[derive(Debug, Clone)]
52struct DelayReq {
53 join_set_id: JoinSetId,
54 delay_id: DelayId,
55 expires_at: DateTime<Utc>,
56}
/// Default PRAGMA settings as `[name, value]` pairs, applied in this order
/// when the connection is initialized.
///
/// An empty value means the pragma is queried instead of being set
/// (see `pragma_update` inside `SqlitePool::init_thread`).
const PRAGMA: [[&str; 2]; 10] = [
    ["journal_mode", "wal"],
    ["synchronous", "FULL"],
    ["foreign_keys", "true"],
    ["busy_timeout", "1000"],
    ["cache_size", "10000"],
    ["temp_store", "MEMORY"],
    ["page_size", "8192"],
    ["mmap_size", "134217728"],
    ["journal_size_limit", "67108864"],
    ["integrity_check", ""],
];
80
/// DDL for `t_metadata`, the table holding the schema version of the
/// database file together with the time the row was written.
const CREATE_TABLE_T_METADATA: &str = r"
CREATE TABLE IF NOT EXISTS t_metadata (
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 schema_version INTEGER NOT NULL,
 created_at TEXT NOT NULL
) STRICT
";
/// Schema version this build expects to find in the newest `t_metadata` row.
const T_METADATA_EXPECTED_SCHEMA_VERSION: u32 = 2;
90
/// DDL for `t_execution_log`: one row per `(execution_id, version)` with the
/// event serialized as JSON in `json_value`.
///
/// `history_event_type` is a stored generated column extracted from the JSON
/// path `$.HistoryEvent.event.type`.
const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_log (
 execution_id TEXT NOT NULL,
 created_at TEXT NOT NULL,
 json_value TEXT NOT NULL,
 version INTEGER NOT NULL,
 variant TEXT NOT NULL,
 join_set_id TEXT,
 history_event_type TEXT GENERATED ALWAYS AS (json_value->>'$.HistoryEvent.event.type') STORED,
 PRIMARY KEY (execution_id, version)
) STRICT
";
/// Index on `t_execution_log (execution_id, version)`.
const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version ON t_execution_log (execution_id, version);
";
/// Index on `t_execution_log (execution_id, variant)`.
const CREATE_INDEX_IDX_T_EXECUTION_ID_EXECUTION_ID_VARIANT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant ON t_execution_log (execution_id, variant);
";
112
/// DDL for `t_join_set_response`: responses recorded for a join set of an
/// execution.
///
/// A row references either a delay (`delay_id`) or a child execution
/// (`child_execution_id`, `finished_version`); all three columns are nullable.
const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
CREATE TABLE IF NOT EXISTS t_join_set_response (
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 created_at TEXT NOT NULL,
 execution_id TEXT NOT NULL,
 join_set_id TEXT NOT NULL,

 delay_id TEXT,

 child_execution_id TEXT,
 finished_version INTEGER,

 UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
) STRICT
";
/// Index on `t_join_set_response (execution_id, id)`.
const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
";
136
/// DDL for `t_state`: one row per execution describing its current state.
///
/// `pending_expires_finished` is a single timestamp column whose meaning
/// depends on `state` (see `CombinedState::new`). The nullable columns are
/// filled only for the states that use them.
const CREATE_TABLE_T_STATE: &str = r"
CREATE TABLE IF NOT EXISTS t_state (
 execution_id TEXT NOT NULL,
 is_top_level INTEGER NOT NULL,
 corresponding_version INTEGER NOT NULL,
 pending_expires_finished TEXT NOT NULL,
 ffqn TEXT NOT NULL,
 state TEXT NOT NULL,
 created_at TEXT NOT NULL,
 updated_at TEXT NOT NULL,
 scheduled_at TEXT NOT NULL,
 intermittent_event_count INTEGER NOT NULL,

 max_retries INTEGER,
 retry_exp_backoff_millis INTEGER,
 last_lock_version INTEGER,
 executor_id TEXT,
 run_id TEXT,

 join_set_id TEXT,
 join_set_closing INTEGER,

 result_kind TEXT,

 PRIMARY KEY (execution_id)
) STRICT
";
/// Value of `t_state.state` for an execution waiting to be run.
const STATE_PENDING_AT: &str = "PendingAt";
/// Value of `t_state.state` for an execution blocked by a join set.
const STATE_BLOCKED_BY_JOIN_SET: &str = "BlockedByJoinSet";
/// Value of `t_state.state` for an execution locked by an executor.
const STATE_LOCKED: &str = "Locked";
/// Value of `t_state.state` for a finished execution.
const STATE_FINISHED: &str = "Finished";
/// Name of the `JoinNext` history event type.
// NOTE(review): presumably compared with `t_execution_log.history_event_type`;
// the use site is outside this part of the file - confirm.
const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "JoinNext";
175
/// Index on `t_state (state, pending_expires_finished, ffqn)`.
const IDX_T_STATE_LOCK_PENDING: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending ON t_state (state, pending_expires_finished, ffqn);
";
/// Partial index on `t_state (pending_expires_finished)` restricted to rows
/// that have an `executor_id`.
const IDX_T_STATE_EXPIRED_TIMERS: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_expired_timers ON t_state (pending_expires_finished) WHERE executor_id IS NOT NULL;
";
/// Index on `t_state (execution_id, is_top_level)`.
const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
";
/// Index on `t_state (ffqn)`.
const IDX_T_STATE_FFQN: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
";
/// Index on `t_state (created_at)`.
const IDX_T_STATE_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
";
194
/// DDL for `t_delay`: delays keyed by `(execution_id, join_set_id, delay_id)`
/// together with their expiry time.
const CREATE_TABLE_T_DELAY: &str = r"
CREATE TABLE IF NOT EXISTS t_delay (
 execution_id TEXT NOT NULL,
 join_set_id TEXT NOT NULL,
 delay_id TEXT NOT NULL,
 expires_at TEXT NOT NULL,
 PRIMARY KEY (execution_id, join_set_id, delay_id)
) STRICT
";
205
/// DDL for `t_backtrace`: a WASM backtrace stored for a version range
/// `[version_min_including, version_max_excluding)` of an execution.
const CREATE_TABLE_T_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_backtrace (
 execution_id TEXT NOT NULL,
 component_id TEXT NOT NULL,
 version_min_including INTEGER NOT NULL,
 version_max_excluding INTEGER NOT NULL,
 wasm_backtrace TEXT NOT NULL,
 PRIMARY KEY (execution_id, version_min_including, version_max_excluding)
) STRICT
";
/// Index on `t_backtrace (execution_id, version_min_including, version_max_excluding)`.
const IDX_T_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_backtrace_execution_id_version ON t_backtrace (execution_id, version_min_including, version_max_excluding);
";
221
222#[derive(Debug, thiserror::Error, Clone)]
223enum RusqliteError {
224 #[error("not found")]
225 NotFound,
226 #[error("generic: {0}")]
227 Generic(StrVariant),
228}
229
230mod conversions {
231
232 use super::RusqliteError;
233 use concepts::storage::{DbErrorGeneric, DbErrorRead, DbErrorReadWithTimeout, DbErrorWrite};
234 use rusqlite::types::{FromSql, FromSqlError};
235 use std::{fmt::Debug, str::FromStr};
236 use tracing::error;
237
238 impl From<rusqlite::Error> for RusqliteError {
239 fn from(err: rusqlite::Error) -> Self {
240 if matches!(err, rusqlite::Error::QueryReturnedNoRows) {
241 RusqliteError::NotFound
242 } else {
243 error!(backtrace = %std::backtrace::Backtrace::capture(), "Sqlite error {err:?}");
244 RusqliteError::Generic(err.to_string().into())
245 }
246 }
247 }
248
249 impl From<RusqliteError> for DbErrorGeneric {
250 fn from(err: RusqliteError) -> DbErrorGeneric {
251 match err {
252 RusqliteError::NotFound => DbErrorGeneric::Uncategorized("not found".into()),
253 RusqliteError::Generic(str) => DbErrorGeneric::Uncategorized(str),
254 }
255 }
256 }
257 impl From<RusqliteError> for DbErrorRead {
258 fn from(err: RusqliteError) -> Self {
259 if matches!(err, RusqliteError::NotFound) {
260 Self::NotFound
261 } else {
262 Self::from(DbErrorGeneric::from(err))
263 }
264 }
265 }
266 impl From<RusqliteError> for DbErrorReadWithTimeout {
267 fn from(err: RusqliteError) -> Self {
268 Self::from(DbErrorRead::from(err))
269 }
270 }
271 impl From<RusqliteError> for DbErrorWrite {
272 fn from(err: RusqliteError) -> Self {
273 if matches!(err, RusqliteError::NotFound) {
274 Self::NotFound
275 } else {
276 Self::from(DbErrorGeneric::from(err))
277 }
278 }
279 }
280
281 pub(crate) struct JsonWrapper<T>(pub(crate) T);
282 impl<T: serde::de::DeserializeOwned + 'static + Debug> FromSql for JsonWrapper<T> {
283 fn column_result(
284 value: rusqlite::types::ValueRef<'_>,
285 ) -> rusqlite::types::FromSqlResult<Self> {
286 let value = match value {
287 rusqlite::types::ValueRef::Text(value) | rusqlite::types::ValueRef::Blob(value) => {
288 Ok(value)
289 }
290 other => {
291 error!(
292 backtrace = %std::backtrace::Backtrace::capture(),
293 "Unexpected type when conveting to JSON - expected Text or Blob, got type `{other:?}`",
294 );
295 Err(FromSqlError::InvalidType)
296 }
297 }?;
298 let value = serde_json::from_slice::<T>(value).map_err(|err| {
299 error!(
300 backtrace = %std::backtrace::Backtrace::capture(),
301 "Cannot convert JSON value `{value:?}` to type:`{type}` - {err:?}",
302 r#type = std::any::type_name::<T>()
303 );
304 FromSqlError::InvalidType
305 })?;
306 Ok(Self(value))
307 }
308 }
309
310 pub(crate) struct FromStrWrapper<T: FromStr>(pub(crate) T);
311 impl<T: FromStr<Err = D>, D: Debug> FromSql for FromStrWrapper<T> {
312 fn column_result(
313 value: rusqlite::types::ValueRef<'_>,
314 ) -> rusqlite::types::FromSqlResult<Self> {
315 let value = String::column_result(value)?;
316 let value = T::from_str(&value).map_err(|err| {
317 error!(
318 backtrace = %std::backtrace::Backtrace::capture(),
319 "Cannot convert string `{value}` to type:`{type}` - {err:?}",
320 r#type = std::any::type_name::<T>()
321 );
322 FromSqlError::InvalidType
323 })?;
324 Ok(Self(value))
325 }
326 }
327
328 #[derive(Debug, thiserror::Error)]
330 #[error("{0}")]
331 pub(crate) struct OtherError(&'static str);
332 pub(crate) fn consistency_rusqlite(input: &'static str) -> rusqlite::Error {
333 FromSqlError::other(OtherError(input)).into()
334 }
335
336 pub(crate) fn consistency_db_err(input: &'static str) -> DbErrorGeneric {
337 DbErrorGeneric::Uncategorized(input.into())
338 }
339}
340
341#[derive(Debug)]
342struct CombinedStateDTO {
343 state: String,
344 ffqn: String,
345 pending_expires_finished: DateTime<Utc>,
346 last_lock_version: Option<Version>,
348 executor_id: Option<ExecutorId>,
349 run_id: Option<RunId>,
350 join_set_id: Option<JoinSetId>,
352 join_set_closing: Option<bool>,
353 result_kind: Option<PendingStateFinishedResultKind>,
355}
356#[derive(Debug)]
357struct CombinedState {
358 ffqn: FunctionFqn,
359 pending_state: PendingState,
360 corresponding_version: Version,
361}
362impl CombinedState {
363 fn get_next_version_assert_not_finished(&self) -> Version {
364 assert!(!self.pending_state.is_finished());
365 self.corresponding_version.increment()
366 }
367
368 #[cfg(feature = "test")]
369 fn get_next_version_or_finished(&self) -> Version {
370 if self.pending_state.is_finished() {
371 self.corresponding_version.clone()
372 } else {
373 self.corresponding_version.increment()
374 }
375 }
376}
377
378#[derive(Debug)]
379struct NotifierPendingAt {
380 scheduled_at: DateTime<Utc>,
381 ffqn: FunctionFqn,
382}
383
384#[derive(Debug)]
385struct NotifierExecutionFinished {
386 execution_id: ExecutionId,
387 retval: SupportedFunctionReturnValue,
388}
389
390#[derive(Debug, Default)]
391struct AppendNotifier {
392 pending_at: Option<NotifierPendingAt>,
393 execution_finished: Option<NotifierExecutionFinished>,
394 response: Option<(ExecutionId, JoinSetResponseEventOuter)>,
395}
396
397impl CombinedState {
398 fn new(
399 dto: &CombinedStateDTO,
400 corresponding_version: Version,
401 ) -> Result<Self, rusqlite::Error> {
402 let pending_state = match dto {
403 CombinedStateDTO {
405 state,
406 ffqn: _,
407 pending_expires_finished: scheduled_at,
408 last_lock_version: None,
409 executor_id: None,
410 run_id: None,
411 join_set_id: None,
412 join_set_closing: None,
413 result_kind: None,
414 } if state == STATE_PENDING_AT => Ok(PendingState::PendingAt {
415 scheduled_at: *scheduled_at,
416 last_lock: None,
417 }),
418 CombinedStateDTO {
420 state,
421 ffqn: _,
422 pending_expires_finished: scheduled_at,
423 last_lock_version: None,
424 executor_id: Some(executor_id),
425 run_id: Some(run_id),
426 join_set_id: None,
427 join_set_closing: None,
428 result_kind: None,
429 } if state == STATE_PENDING_AT => Ok(PendingState::PendingAt {
430 scheduled_at: *scheduled_at,
431 last_lock: Some(LockedBy {
432 executor_id: *executor_id,
433 run_id: *run_id,
434 }),
435 }),
436 CombinedStateDTO {
437 state,
438 ffqn: _,
439 pending_expires_finished: lock_expires_at,
440 last_lock_version: Some(_),
441 executor_id: Some(executor_id),
442 run_id: Some(run_id),
443 join_set_id: None,
444 join_set_closing: None,
445 result_kind: None,
446 } if state == STATE_LOCKED => Ok(PendingState::Locked(PendingStateLocked {
447 locked_by: LockedBy {
448 executor_id: *executor_id,
449 run_id: *run_id,
450 },
451 lock_expires_at: *lock_expires_at,
452 })),
453 CombinedStateDTO {
454 state,
455 ffqn: _,
456 pending_expires_finished: lock_expires_at,
457 last_lock_version: None,
458 executor_id: _,
459 run_id: _,
460 join_set_id: Some(join_set_id),
461 join_set_closing: Some(join_set_closing),
462 result_kind: None,
463 } if state == STATE_BLOCKED_BY_JOIN_SET => Ok(PendingState::BlockedByJoinSet {
464 join_set_id: join_set_id.clone(),
465 closing: *join_set_closing,
466 lock_expires_at: *lock_expires_at,
467 }),
468 CombinedStateDTO {
469 state,
470 ffqn: _,
471 pending_expires_finished: finished_at,
472 last_lock_version: None,
473 executor_id: None,
474 run_id: None,
475 join_set_id: None,
476 join_set_closing: None,
477 result_kind: Some(result_kind),
478 } if state == STATE_FINISHED => Ok(PendingState::Finished {
479 finished: PendingStateFinished {
480 finished_at: *finished_at,
481 version: corresponding_version.0,
482 result_kind: *result_kind,
483 },
484 }),
485 _ => {
486 error!("Cannot deserialize pending state from {dto:?}");
487 Err(consistency_rusqlite("invalid `t_state`"))
488 }
489 }?;
490 Ok(Self {
491 ffqn: FunctionFqn::from_str(&dto.ffqn).map_err(|parse_err| {
492 error!("Error parsing ffqn of {dto:?} - {parse_err:?}");
493 consistency_rusqlite("invalid ffqn value in `t_state`")
494 })?,
495 pending_state,
496 corresponding_version,
497 })
498 }
499}
500
501#[derive(derive_more::Debug)]
502struct LogicalTx {
503 #[debug(skip)]
504 func: Box<dyn FnMut(&mut Transaction) + Send>,
505 sent_at: Instant,
506 func_name: &'static str,
507 #[debug(skip)]
508 commit_ack_sender: oneshot::Sender<Result<(), RusqliteError>>,
509}
510
511#[derive(derive_more::Debug)]
512enum ThreadCommand {
513 LogicalTx(LogicalTx),
514 Shutdown,
515}
516
517#[derive(Clone)]
518pub struct SqlitePool(SqlitePoolInner);
519
520type ResponseSubscribers =
521 Arc<Mutex<HashMap<ExecutionId, (oneshot::Sender<JoinSetResponseEventOuter>, u64)>>>;
522type PendingFfqnSubscribers = Arc<Mutex<HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>>>;
523type ExecutionFinishedSubscribers =
524 Mutex<HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>>;
525#[derive(Clone)]
526struct SqlitePoolInner {
527 shutdown_requested: Arc<AtomicBool>,
528 shutdown_finished: Arc<AtomicBool>,
529 command_tx: tokio::sync::mpsc::Sender<ThreadCommand>,
530 response_subscribers: ResponseSubscribers,
531 pending_ffqn_subscribers: PendingFfqnSubscribers,
532 execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
533 join_handle: Option<Arc<std::thread::JoinHandle<()>>>, }
535
536#[async_trait]
537impl DbPoolCloseable for SqlitePool {
538 async fn close(self) {
539 debug!("Sqlite is closing");
540 self.0.shutdown_requested.store(true, Ordering::Release);
541 let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
543 while !self.0.shutdown_finished.load(Ordering::Acquire) {
544 tokio::time::sleep(Duration::from_millis(1)).await;
545 }
546 debug!("Sqlite was closed");
547 }
548}
549
550#[async_trait]
551impl DbPool for SqlitePool {
552 fn connection(&self) -> Box<dyn DbConnection> {
553 Box::new(self.clone())
554 }
555}
556impl Drop for SqlitePool {
557 fn drop(&mut self) {
558 let arc = self.0.join_handle.take().expect("join_handle was set");
559 if let Ok(join_handle) = Arc::try_unwrap(arc) {
560 if !join_handle.is_finished() {
562 if !self.0.shutdown_finished.load(Ordering::Acquire) {
563 let backtrace = std::backtrace::Backtrace::capture();
565 warn!("SqlitePool was not closed properly - {backtrace}");
566 self.0.shutdown_requested.store(true, Ordering::Release);
567 let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
569 } else {
572 }
574 }
575 }
576 }
577}
578
579#[derive(Debug, Clone)]
580pub struct SqliteConfig {
581 pub queue_capacity: usize,
582 pub pragma_override: Option<HashMap<String, String>>,
583 pub metrics_threshold: Option<Duration>,
584}
585impl Default for SqliteConfig {
586 fn default() -> Self {
587 Self {
588 queue_capacity: 100,
589 pragma_override: None,
590 metrics_threshold: None,
591 }
592 }
593}
594
595impl SqlitePool {
596 fn init_thread(
597 path: &Path,
598 mut pragma_override: HashMap<String, String>,
599 ) -> Result<Connection, InitializationError> {
600 fn conn_execute<P: Params>(
601 conn: &Connection,
602 sql: &str,
603 params: P,
604 ) -> Result<(), InitializationError> {
605 conn.execute(sql, params).map(|_| ()).map_err(|err| {
606 error!("Cannot run `{sql}` - {err:?}");
607 InitializationError
608 })
609 }
610 fn pragma_update(
611 conn: &Connection,
612 name: &str,
613 value: &str,
614 ) -> Result<(), InitializationError> {
615 if value.is_empty() {
616 debug!("Querying PRAGMA {name}");
617 conn.pragma_query(None, name, |row| {
618 debug!("{row:?}");
619 Ok(())
620 })
621 .map_err(|err| {
622 error!("cannot update pragma `{name}`=`{value}` - {err:?}");
623 InitializationError
624 })
625 } else {
626 debug!("Setting PRAGMA {name}={value}");
627 conn.pragma_update(None, name, value).map_err(|err| {
628 error!("cannot update pragma `{name}`=`{value}` - {err:?}");
629 InitializationError
630 })
631 }
632 }
633
634 let conn = Connection::open_with_flags(path, OpenFlags::default()).map_err(|err| {
635 error!("cannot open the connection - {err:?}");
636 InitializationError
637 })?;
638
639 for [pragma_name, default_value] in PRAGMA {
640 let pragma_value = pragma_override
641 .remove(pragma_name)
642 .unwrap_or_else(|| default_value.to_string());
643 pragma_update(&conn, pragma_name, &pragma_value)?;
644 }
645 for (pragma_name, pragma_value) in pragma_override.drain() {
647 pragma_update(&conn, &pragma_name, &pragma_value)?;
648 }
649
650 conn_execute(&conn, CREATE_TABLE_T_METADATA, [])?;
652 conn_execute(
654 &conn,
655 &format!(
656 "INSERT INTO t_metadata (schema_version, created_at) VALUES
657 ({T_METADATA_EXPECTED_SCHEMA_VERSION}, ?) ON CONFLICT DO NOTHING"
658 ),
659 [Utc::now()],
660 )?;
661 let actual_version = conn
663 .prepare("SELECT schema_version FROM t_metadata ORDER BY id DESC LIMIT 1")
664 .map_err(|err| {
665 error!("cannot select schema version - {err:?}");
666 InitializationError
667 })?
668 .query_row([], |row| row.get::<_, u32>("schema_version"));
669
670 let actual_version = actual_version.map_err(|err| {
671 error!("Cannot read the schema version - {err:?}");
672 InitializationError
673 })?;
674 if actual_version != T_METADATA_EXPECTED_SCHEMA_VERSION {
675 error!(
676 "wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
677 );
678 return Err(InitializationError);
679 }
680
681 conn_execute(&conn, CREATE_TABLE_T_EXECUTION_LOG, [])?;
683 conn_execute(
684 &conn,
685 CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
686 [],
687 )?;
688 conn_execute(
689 &conn,
690 CREATE_INDEX_IDX_T_EXECUTION_ID_EXECUTION_ID_VARIANT,
691 [],
692 )?;
693 conn_execute(&conn, CREATE_TABLE_T_JOIN_SET_RESPONSE, [])?;
695 conn_execute(
696 &conn,
697 CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
698 [],
699 )?;
700 conn_execute(&conn, CREATE_TABLE_T_STATE, [])?;
702 conn_execute(&conn, IDX_T_STATE_LOCK_PENDING, [])?;
703 conn_execute(&conn, IDX_T_STATE_EXPIRED_TIMERS, [])?;
704 conn_execute(&conn, IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL, [])?;
705 conn_execute(&conn, IDX_T_STATE_FFQN, [])?;
706 conn_execute(&conn, IDX_T_STATE_CREATED_AT, [])?;
707 conn_execute(&conn, CREATE_TABLE_T_DELAY, [])?;
709 conn_execute(&conn, CREATE_TABLE_T_BACKTRACE, [])?;
711 conn_execute(&conn, IDX_T_BACKTRACE_EXECUTION_ID_VERSION, [])?;
712 Ok(conn)
713 }
714
715 fn connection_rpc(
716 mut conn: Connection,
717 shutdown_requested: &AtomicBool,
718 shutdown_finished: &AtomicBool,
719 mut command_rx: mpsc::Receiver<ThreadCommand>,
720 metrics_threshold: Option<Duration>,
721 ) {
722 let mut histograms = Histograms::new(metrics_threshold);
723 while Self::tick(
724 &mut conn,
725 shutdown_requested,
726 &mut command_rx,
727 &mut histograms,
728 )
729 .is_ok()
730 {
731 }
733 debug!("Closing command thread");
734 shutdown_finished.store(true, Ordering::Release);
735 }
736
737 fn tick(
738 conn: &mut Connection,
739 shutdown_requested: &AtomicBool,
740 command_rx: &mut mpsc::Receiver<ThreadCommand>,
741 histograms: &mut Histograms,
742 ) -> Result<(), ()> {
743 let mut ltx_list = Vec::new();
744 let mut ltx = match command_rx.blocking_recv() {
746 Some(ThreadCommand::LogicalTx(ltx)) => ltx,
747 Some(ThreadCommand::Shutdown) => {
748 debug!("shutdown message received");
749 return Err(());
750 }
751 None => {
752 debug!("command_rx was closed");
753 return Err(());
754 }
755 };
756 let all_fns_start = std::time::Instant::now();
757 let mut physical_tx = conn
758 .transaction_with_behavior(TransactionBehavior::Immediate)
759 .map_err(|begin_err| {
760 error!("Cannot open transaction, closing sqlite - {begin_err:?}");
761 })?;
762 Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
763 ltx_list.push(ltx);
764
765 while let Ok(more) = command_rx.try_recv() {
766 let mut ltx = match more {
767 ThreadCommand::Shutdown => {
768 debug!("shutdown message received");
769 return Err(());
770 }
771 ThreadCommand::LogicalTx(ltx) => ltx,
772 };
773
774 Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
775 ltx_list.push(ltx);
776 }
777 histograms.record_all_fns(all_fns_start.elapsed());
778
779 {
780 if shutdown_requested.load(Ordering::Relaxed) {
782 debug!("Recveived shutdown during processing of the batch");
783 return Err(());
784 }
785 let commit_result = {
786 let now = std::time::Instant::now();
787 let commit_result = physical_tx.commit().map_err(RusqliteError::from);
788 histograms.record_commit(now.elapsed());
789 commit_result
790 };
791 if commit_result.is_ok() || ltx_list.len() == 1 {
792 for ltx in ltx_list {
794 let _ = ltx.commit_ack_sender.send(commit_result.clone());
796 }
797 } else {
798 warn!(
799 "Bulk transaction failed, committing {} transactions serially",
800 ltx_list.len()
801 );
802 for ltx in ltx_list {
804 Self::ltx_commit_single(ltx, conn, shutdown_requested, histograms)?;
805 }
806 }
807 }
808 histograms.print_if_elapsed();
809 Ok(())
810 }
811
812 fn ltx_commit_single(
813 mut ltx: LogicalTx,
814 conn: &mut Connection,
815 shutdown_requested: &AtomicBool,
816 histograms: &mut Histograms,
817 ) -> Result<(), ()> {
818 let mut physical_tx = conn
819 .transaction_with_behavior(TransactionBehavior::Immediate)
820 .map_err(|begin_err| {
821 error!("Cannot open transaction, closing sqlite - {begin_err:?}");
822 })?;
823 Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
824 if shutdown_requested.load(Ordering::Relaxed) {
825 debug!("Recveived shutdown during processing of the batch");
826 return Err(());
827 }
828 let commit_result = {
829 let now = std::time::Instant::now();
830 let commit_result = physical_tx.commit().map_err(RusqliteError::from);
831 histograms.record_commit(now.elapsed());
832 commit_result
833 };
834 let _ = ltx.commit_ack_sender.send(commit_result);
836 Ok(())
837 }
838
839 fn ltx_apply_to_tx(
840 ltx: &mut LogicalTx,
841 physical_tx: &mut Transaction,
842 histograms: &mut Histograms,
843 ) {
844 let sent_latency = ltx.sent_at.elapsed();
845 let started_at = Instant::now();
846 (ltx.func)(physical_tx);
847 histograms.record_command(sent_latency, ltx.func_name, started_at.elapsed());
848 }
849
850 #[instrument(level = Level::DEBUG, skip_all, name = "sqlite_new")]
851 pub async fn new<P: AsRef<Path>>(
852 path: P,
853 config: SqliteConfig,
854 ) -> Result<Self, InitializationError> {
855 let path = path.as_ref().to_owned();
856
857 let shutdown_requested = Arc::new(AtomicBool::new(false));
858 let shutdown_finished = Arc::new(AtomicBool::new(false));
859
860 let (command_tx, command_rx) = tokio::sync::mpsc::channel(config.queue_capacity);
861 info!("Sqlite database location: {path:?}");
862 let join_handle = {
863 let init_task = {
865 tokio::task::spawn_blocking(move || {
866 Self::init_thread(&path, config.pragma_override.unwrap_or_default())
867 })
868 .await
869 };
870 let conn = match init_task {
871 Ok(res) => res?,
872 Err(join_err) => {
873 error!("Initialization panic - {join_err:?}");
874 return Err(InitializationError);
875 }
876 };
877 let shutdown_requested = shutdown_requested.clone();
878 let shutdown_finished = shutdown_finished.clone();
879 std::thread::spawn(move || {
881 Self::connection_rpc(
882 conn,
883 &shutdown_requested,
884 &shutdown_finished,
885 command_rx,
886 config.metrics_threshold,
887 );
888 })
889 };
890 Ok(SqlitePool(SqlitePoolInner {
891 shutdown_requested,
892 shutdown_finished,
893 command_tx,
894 response_subscribers: Arc::default(),
895 pending_ffqn_subscribers: Arc::default(),
896 join_handle: Some(Arc::new(join_handle)),
897 execution_finished_subscribers: Arc::default(),
898 }))
899 }
900
901 async fn transaction<F, T, E>(&self, mut func: F, func_name: &'static str) -> Result<T, E>
903 where
904 F: FnMut(&mut rusqlite::Transaction) -> Result<T, E> + Send + 'static,
905 T: Send + 'static,
906 E: From<DbErrorGeneric> + From<RusqliteError> + Send + 'static,
907 {
908 let fn_res: Arc<std::sync::Mutex<Option<_>>> = Arc::default();
909 let (commit_ack_sender, commit_ack_receiver) = oneshot::channel();
910 let thread_command_func = {
911 let fn_res = fn_res.clone();
912 ThreadCommand::LogicalTx(LogicalTx {
913 func: Box::new(move |tx| {
914 let func_res = func(tx);
915 *fn_res.lock().unwrap() = Some(func_res);
916 }),
917 sent_at: Instant::now(),
918 func_name,
919 commit_ack_sender,
920 })
921 };
922 self.0
923 .command_tx
924 .send(thread_command_func)
925 .await
926 .map_err(|_send_err| DbErrorGeneric::Close)?;
927
928 match commit_ack_receiver.await {
930 Ok(Ok(())) => {
931 let mut guard = fn_res.lock().unwrap();
932 std::mem::take(guard.deref_mut()).expect("ltx must have been run at least once")
933 }
934 Ok(Err(rusqlite_err)) => Err(E::from(rusqlite_err)),
935 Err(_) => Err(E::from(DbErrorGeneric::Close)),
936 }
937 }
938
939 fn fetch_created_event(
940 conn: &Connection,
941 execution_id: &ExecutionId,
942 ) -> Result<CreateRequest, DbErrorRead> {
943 let mut stmt = conn.prepare(
944 "SELECT created_at, json_value FROM t_execution_log WHERE \
945 execution_id = :execution_id AND version = 0",
946 )?;
947 let (created_at, event) = stmt.query_row(
948 named_params! {
949 ":execution_id": execution_id.to_string(),
950 },
951 |row| {
952 let created_at = row.get("created_at")?;
953 let event = row
954 .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
955 .map_err(|serde| {
956 error!("cannot deserialize `Created` event: {row:?} - `{serde:?}`");
957 consistency_rusqlite("cannot deserialize `Created` event")
958 })?;
959 Ok((created_at, event.0))
960 },
961 )?;
962 if let ExecutionEventInner::Created {
963 ffqn,
964 params,
965 parent,
966 scheduled_at,
967 component_id,
968 metadata,
969 scheduled_by,
970 } = event
971 {
972 Ok(CreateRequest {
973 created_at,
974 execution_id: execution_id.clone(),
975 ffqn,
976 params,
977 parent,
978 scheduled_at,
979 component_id,
980 metadata,
981 scheduled_by,
982 })
983 } else {
984 error!("Row with version=0 must be a `Created` event - {event:?}");
985 Err(consistency_db_err("expected `Created` event").into())
986 }
987 }
988
989 fn check_expected_next_and_appending_version(
990 expected_version: &Version,
991 appending_version: &Version,
992 ) -> Result<(), DbErrorWrite> {
993 if *expected_version != *appending_version {
994 debug!(
995 "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
996 );
997 return Err(DbErrorWrite::NonRetriable(
998 DbErrorWriteNonRetriable::VersionConflict {
999 expected: expected_version.clone(),
1000 requested: appending_version.clone(),
1001 },
1002 ));
1003 }
1004 Ok(())
1005 }
1006
1007 #[instrument(level = Level::TRACE, skip_all, fields(execution_id = %req.execution_id))]
1008 fn create_inner(
1009 tx: &Transaction,
1010 req: CreateRequest,
1011 ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1012 debug!("create_inner");
1013
1014 let version = Version::default();
1015 let execution_id = req.execution_id.clone();
1016 let execution_id_str = execution_id.to_string();
1017 let ffqn = req.ffqn.clone();
1018 let created_at = req.created_at;
1019 let scheduled_at = req.scheduled_at;
1020 let event = ExecutionEventInner::from(req);
1021 let event_ser = serde_json::to_string(&event).map_err(|err| {
1022 error!("Cannot serialize {event:?} - {err:?}");
1023 DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
1024 })?;
1025 tx.prepare(
1026 "INSERT INTO t_execution_log (execution_id, created_at, version, json_value, variant, join_set_id ) \
1027 VALUES (:execution_id, :created_at, :version, :json_value, :variant, :join_set_id)")
1028 ?
1029 .execute(named_params! {
1030 ":execution_id": &execution_id_str,
1031 ":created_at": created_at,
1032 ":version": version.0,
1033 ":json_value": event_ser,
1034 ":variant": event.variant(),
1035 ":join_set_id": event.join_set_id().map(std::string::ToString::to_string),
1036 })
1037 ?;
1038 let pending_at = {
1039 debug!("Creating with `Pending(`{scheduled_at:?}`)");
1040 tx.prepare(
1041 r"
1042 INSERT INTO t_state (
1043 execution_id,
1044 is_top_level,
1045 corresponding_version,
1046 pending_expires_finished,
1047 ffqn,
1048 state,
1049 created_at,
1050 updated_at,
1051 scheduled_at,
1052 intermittent_event_count
1053 )
1054 VALUES (
1055 :execution_id,
1056 :is_top_level,
1057 :corresponding_version,
1058 :pending_expires_finished,
1059 :ffqn,
1060 :state,
1061 :created_at,
1062 CURRENT_TIMESTAMP,
1063 :scheduled_at,
1064 0
1065 )
1066 ",
1067 )?
1068 .execute(named_params! {
1069 ":execution_id": execution_id.to_string(),
1070 ":is_top_level": execution_id.is_top_level(),
1071 ":corresponding_version": version.0,
1072 ":pending_expires_finished": scheduled_at,
1073 ":ffqn": ffqn.to_string(),
1074 ":state": STATE_PENDING_AT,
1075 ":created_at": created_at,
1076 ":scheduled_at": scheduled_at,
1077 })?;
1078 AppendNotifier {
1079 pending_at: Some(NotifierPendingAt { scheduled_at, ffqn }),
1080 execution_finished: None,
1081 response: None,
1082 }
1083 };
1084 let next_version = Version::new(version.0 + 1);
1085 Ok((next_version, pending_at))
1086 }
1087
1088 #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %corresponding_version))]
1089 fn update_state_pending_after_response_appended(
1090 tx: &Transaction,
1091 execution_id: &ExecutionId,
1092 scheduled_at: DateTime<Utc>, corresponding_version: &Version, ) -> Result<AppendNotifier, DbErrorWrite> {
1095 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
1096 let execution_id_str = execution_id.to_string();
1097 let mut stmt = tx
1098 .prepare_cached(
1099 r"
1100 UPDATE t_state
1101 SET
1102 corresponding_version = :corresponding_version,
1103 pending_expires_finished = :pending_expires_finished,
1104 state = :state,
1105 updated_at = CURRENT_TIMESTAMP,
1106
1107 last_lock_version = NULL,
1108
1109 join_set_id = NULL,
1110 join_set_closing = NULL,
1111
1112 result_kind = NULL
1113 WHERE execution_id = :execution_id
1114 ",
1115 )
1116 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1117 let updated = stmt
1118 .execute(named_params! {
1119 ":execution_id": execution_id_str,
1120 ":corresponding_version": corresponding_version.0,
1121 ":pending_expires_finished": scheduled_at,
1122 ":state": STATE_PENDING_AT,
1123 })
1124 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1125 if updated != 1 {
1126 return Err(DbErrorWrite::NotFound);
1127 }
1128 Ok(AppendNotifier {
1129 pending_at: Some(NotifierPendingAt {
1130 scheduled_at,
1131 ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1132 }),
1133 execution_finished: None,
1134 response: None,
1135 })
1136 }
1137
1138 #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
1139 fn update_state_pending_after_event_appended(
1140 tx: &Transaction,
1141 execution_id: &ExecutionId,
1142 appending_version: &Version,
1143 scheduled_at: DateTime<Utc>, intermittent_failure: bool,
1145 ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1146 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
1147 let mut stmt = tx
1150 .prepare_cached(
1151 r"
1152 UPDATE t_state
1153 SET
1154 corresponding_version = :appending_version,
1155 pending_expires_finished = :pending_expires_finished,
1156 state = :state,
1157 updated_at = CURRENT_TIMESTAMP,
1158 intermittent_event_count = intermittent_event_count + :intermittent_delta,
1159
1160 last_lock_version = NULL,
1161
1162 join_set_id = NULL,
1163 join_set_closing = NULL,
1164
1165 result_kind = NULL
1166 WHERE execution_id = :execution_id;
1167 ",
1168 )
1169 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1170 let updated = stmt
1171 .execute(named_params! {
1172 ":execution_id": execution_id.to_string(),
1173 ":appending_version": appending_version.0,
1174 ":pending_expires_finished": scheduled_at,
1175 ":state": STATE_PENDING_AT,
1176 ":intermittent_delta": i32::from(intermittent_failure) })
1178 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1179 if updated != 1 {
1180 return Err(DbErrorWrite::NotFound);
1181 }
1182 Ok((
1183 appending_version.increment(),
1184 AppendNotifier {
1185 pending_at: Some(NotifierPendingAt {
1186 scheduled_at,
1187 ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1188 }),
1189 execution_finished: None,
1190 response: None,
1191 },
1192 ))
1193 }
1194
1195 fn update_state_locked_get_intermittent_event_count(
1196 tx: &Transaction,
1197 execution_id: &ExecutionId,
1198 executor_id: ExecutorId,
1199 run_id: RunId,
1200 lock_expires_at: DateTime<Utc>,
1201 appending_version: &Version,
1202 retry_config: ComponentRetryConfig,
1203 ) -> Result<u32, DbErrorWrite> {
1204 debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
1205 let backoff_millis =
1206 u64::try_from(retry_config.retry_exp_backoff.as_millis()).expect("backoff too big");
1207 let execution_id_str = execution_id.to_string();
1208 let mut stmt = tx
1209 .prepare_cached(
1210 r"
1211 UPDATE t_state
1212 SET
1213 corresponding_version = :appending_version,
1214 pending_expires_finished = :pending_expires_finished,
1215 state = :state,
1216 updated_at = CURRENT_TIMESTAMP,
1217
1218 max_retries = :max_retries,
1219 retry_exp_backoff_millis = :retry_exp_backoff_millis,
1220 last_lock_version = :appending_version,
1221 executor_id = :executor_id,
1222 run_id = :run_id,
1223
1224 join_set_id = NULL,
1225 join_set_closing = NULL,
1226
1227 result_kind = NULL
1228 WHERE execution_id = :execution_id
1229 ",
1230 )
1231 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1232 let updated = stmt
1233 .execute(named_params! {
1234 ":execution_id": execution_id_str,
1235 ":appending_version": appending_version.0,
1236 ":pending_expires_finished": lock_expires_at,
1237 ":state": STATE_LOCKED,
1238 ":max_retries": retry_config.max_retries,
1239 ":retry_exp_backoff_millis": backoff_millis,
1240 ":executor_id": executor_id.to_string(),
1241 ":run_id": run_id.to_string(),
1242 })
1243 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1244 if updated != 1 {
1245 return Err(DbErrorWrite::NotFound);
1246 }
1247
1248 let intermittent_event_count = tx
1250 .prepare(
1251 "SELECT intermittent_event_count FROM t_state WHERE execution_id = :execution_id",
1252 )?
1253 .query_row(
1254 named_params! {
1255 ":execution_id": execution_id_str,
1256 },
1257 |row| {
1258 let intermittent_event_count = row.get("intermittent_event_count")?;
1259 Ok(intermittent_event_count)
1260 },
1261 )
1262 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1263
1264 Ok(intermittent_event_count)
1265 }
1266
1267 fn update_state_blocked(
1268 tx: &Transaction,
1269 execution_id: &ExecutionId,
1270 appending_version: &Version,
1271 join_set_id: &JoinSetId,
1273 lock_expires_at: DateTime<Utc>,
1274 join_set_closing: bool,
1275 ) -> Result<
1276 AppendResponse, DbErrorWrite,
1278 > {
1279 debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
1280 let execution_id_str = execution_id.to_string();
1281 let mut stmt = tx.prepare_cached(
1282 r"
1283 UPDATE t_state
1284 SET
1285 corresponding_version = :appending_version,
1286 pending_expires_finished = :pending_expires_finished,
1287 state = :state,
1288 updated_at = CURRENT_TIMESTAMP,
1289
1290 last_lock_version = NULL,
1291
1292 join_set_id = :join_set_id,
1293 join_set_closing = :join_set_closing,
1294
1295 result_kind = NULL
1296 WHERE execution_id = :execution_id
1297 ",
1298 )?;
1299 let updated = stmt.execute(named_params! {
1300 ":execution_id": execution_id_str,
1301 ":appending_version": appending_version.0,
1302 ":pending_expires_finished": lock_expires_at,
1303 ":state": STATE_BLOCKED_BY_JOIN_SET,
1304 ":join_set_id": join_set_id,
1305 ":join_set_closing": join_set_closing,
1306 })?;
1307 if updated != 1 {
1308 return Err(DbErrorWrite::NotFound);
1309 }
1310 Ok(appending_version.increment())
1311 }
1312
1313 fn update_state_finished(
1314 tx: &Transaction,
1315 execution_id: &ExecutionId,
1316 appending_version: &Version,
1317 finished_at: DateTime<Utc>,
1319 result_kind: PendingStateFinishedResultKind,
1320 ) -> Result<(), DbErrorWrite> {
1321 debug!("Setting t_state to Finished");
1322 let execution_id_str = execution_id.to_string();
1323 let mut stmt = tx.prepare_cached(
1324 r"
1325 UPDATE t_state
1326 SET
1327 corresponding_version = :appending_version,
1328 pending_expires_finished = :pending_expires_finished,
1329 state = :state,
1330 updated_at = CURRENT_TIMESTAMP,
1331
1332 last_lock_version = NULL,
1333 executor_id = NULL,
1334 run_id = NULL,
1335
1336 join_set_id = NULL,
1337 join_set_closing = NULL,
1338
1339 result_kind = :result_kind
1340 WHERE execution_id = :execution_id
1341 ",
1342 )?;
1343 let updated = stmt.execute(named_params! {
1344 ":execution_id": execution_id_str,
1345 ":appending_version": appending_version.0,
1346 ":pending_expires_finished": finished_at,
1347 ":state": STATE_FINISHED,
1348 ":result_kind": result_kind.to_string(),
1349 })?;
1350 if updated != 1 {
1351 return Err(DbErrorWrite::NotFound);
1352 }
1353 Ok(())
1354 }
1355
1356 #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %appending_version))]
1358 fn bump_state_next_version(
1359 tx: &Transaction,
1360 execution_id: &ExecutionId,
1361 appending_version: &Version,
1362 delay_req: Option<DelayReq>,
1363 ) -> Result<AppendResponse , DbErrorWrite> {
1364 debug!("update_index_version");
1365 let execution_id_str = execution_id.to_string();
1366 let mut stmt = tx.prepare_cached(
1367 r"
1368 UPDATE t_state
1369 SET
1370 corresponding_version = :appending_version,
1371 updated_at = CURRENT_TIMESTAMP
1372 WHERE execution_id = :execution_id
1373 ",
1374 )?;
1375 let updated = stmt.execute(named_params! {
1376 ":execution_id": execution_id_str,
1377 ":appending_version": appending_version.0,
1378 })?;
1379 if updated != 1 {
1380 return Err(DbErrorWrite::NotFound);
1381 }
1382 if let Some(DelayReq {
1383 join_set_id,
1384 delay_id,
1385 expires_at,
1386 }) = delay_req
1387 {
1388 debug!("Inserting delay to `t_delay`");
1389 let mut stmt = tx.prepare_cached(
1390 "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) \
1391 VALUES \
1392 (:execution_id, :join_set_id, :delay_id, :expires_at)",
1393 )?;
1394 stmt.execute(named_params! {
1395 ":execution_id": execution_id_str,
1396 ":join_set_id": join_set_id.to_string(),
1397 ":delay_id": delay_id.to_string(),
1398 ":expires_at": expires_at,
1399 })?;
1400 }
1401 Ok(appending_version.increment())
1402 }
1403
1404 fn get_combined_state(
1405 tx: &Transaction,
1406 execution_id: &ExecutionId,
1407 ) -> Result<CombinedState, DbErrorRead> {
1408 let mut stmt = tx.prepare(
1409 r"
1410 SELECT
1411 state, ffqn, corresponding_version, pending_expires_finished,
1412 last_lock_version, executor_id, run_id,
1413 join_set_id, join_set_closing,
1414 result_kind
1415 FROM t_state
1416 WHERE
1417 execution_id = :execution_id
1418 ",
1419 )?;
1420 stmt.query_row(
1421 named_params! {
1422 ":execution_id": execution_id.to_string(),
1423 },
1424 |row| {
1425 CombinedState::new(
1426 &CombinedStateDTO {
1427 state: row.get("state")?,
1428 ffqn: row.get("ffqn")?,
1429 pending_expires_finished: row
1430 .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1431 last_lock_version: row
1432 .get::<_, Option<VersionType>>("last_lock_version")?
1433 .map(Version::new),
1434 executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1435 run_id: row.get::<_, Option<RunId>>("run_id")?,
1436 join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1437 join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1438 result_kind: row
1439 .get::<_, Option<FromStrWrapper<PendingStateFinishedResultKind>>>(
1440 "result_kind",
1441 )?
1442 .map(|wrapper| wrapper.0),
1443 },
1444 Version::new(row.get("corresponding_version")?),
1445 )
1446 },
1447 )
1448 .map_err(DbErrorRead::from)
1449 }
1450
/// Lists executions stored in `t_state`, optionally filtered and paginated.
///
/// * `ffqn` - when `Some`, only rows whose `ffqn` column equals its string form are returned.
/// * `top_level_only` - when `true`, adds the `is_top_level=true` condition.
/// * `pagination` - selects the cursor column (`created_at` or `execution_id`), the cursor
///   relation, the row limit and the query direction.
///
/// Rows are always ordered by `created_at` in the query. A result fetched in ascending order
/// is reversed before returning, so the returned vector is ordered by `created_at`
/// descending in both cases.
///
/// Rows that fail to decode are logged with `warn!` and skipped instead of failing the call.
///
/// # Errors
/// Returns [`DbErrorGeneric`] when the cursor cannot be converted to an SQL value or when
/// the statement cannot be prepared or queried.
fn list_executions(
    read_tx: &Transaction,
    ffqn: Option<&FunctionFqn>,
    top_level_only: bool,
    pagination: &ExecutionListPagination,
) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
    // Pieces of the final statement collected while interpreting the arguments.
    struct StatementModifier<'a> {
        // Conditions joined with `AND` into the WHERE clause.
        where_vec: Vec<String>,
        // Named parameters referenced by the conditions.
        params: Vec<(&'static str, ToSqlOutput<'a>)>,
        limit: u32,
        limit_desc: bool,
    }

    // Translates a pagination request over `column` into WHERE conditions and parameters.
    fn paginate<'a, T: rusqlite::ToSql + 'static>(
        pagination: &'a Pagination<Option<T>>,
        column: &str,
        top_level_only: bool,
    ) -> Result<StatementModifier<'a>, DbErrorGeneric> {
        let mut where_vec: Vec<String> = vec![];
        let mut params: Vec<(&'static str, ToSqlOutput<'a>)> = vec![];
        let limit = pagination.length();
        let limit_desc = pagination.is_desc();
        let rel = pagination.rel();
        match pagination {
            // A cursor condition is only added when a cursor was supplied.
            Pagination::NewerThan {
                cursor: Some(cursor),
                ..
            }
            | Pagination::OlderThan {
                cursor: Some(cursor),
                ..
            } => {
                where_vec.push(format!("{column} {rel} :cursor"));
                let cursor = cursor.to_sql().map_err(|err| {
                    error!("Possible program error - cannot convert cursor to sql - {err:?}");
                    DbErrorGeneric::Uncategorized("cannot convert cursor to sql".into())
                })?;
                params.push((":cursor", cursor));
            }
            _ => {}
        }
        if top_level_only {
            where_vec.push("is_top_level=true".to_string());
        }
        Ok(StatementModifier {
            where_vec,
            params,
            limit,
            limit_desc,
        })
    }

    let mut statement_mod = match pagination {
        ExecutionListPagination::CreatedBy(pagination) => {
            paginate(pagination, "created_at", top_level_only)?
        }
        ExecutionListPagination::ExecutionId(pagination) => {
            paginate(pagination, "execution_id", top_level_only)?
        }
    };

    // Declared outside of the `if` so that the string outlives the borrowed SQL value
    // pushed into `statement_mod.params`.
    let ffqn_temporary;
    if let Some(ffqn) = ffqn {
        statement_mod.where_vec.push("ffqn = :ffqn".to_string());
        ffqn_temporary = ffqn.to_string();
        let ffqn = ffqn_temporary
            .to_sql()
            .expect("string conversion never fails");

        statement_mod.params.push((":ffqn", ffqn));
    }

    let where_str = if statement_mod.where_vec.is_empty() {
        String::new()
    } else {
        format!("WHERE {}", statement_mod.where_vec.join(" AND "))
    };
    // NOTE(review): the cursor column depends on the pagination kind, but the ordering is
    // always by `created_at` - confirm this is intended for `ExecutionId` pagination.
    let sql = format!(
        r"
        SELECT created_at, scheduled_at, state, execution_id, ffqn, corresponding_version, pending_expires_finished,
        last_lock_version, executor_id, run_id,
        join_set_id, join_set_closing,
        result_kind
        FROM t_state {where_str} ORDER BY created_at {desc} LIMIT {limit}
        ",
        desc = if statement_mod.limit_desc { "DESC" } else { "" },
        limit = statement_mod.limit,
    );
    let mut vec: Vec<_> = read_tx
        .prepare(&sql)?
        .query_map::<_, &[(&'static str, ToSqlOutput)], _>(
            statement_mod
                .params
                .into_iter()
                .collect::<Vec<_>>()
                .as_ref(),
            |row| {
                let execution_id = row.get::<_, ExecutionId>("execution_id")?;
                let created_at = row.get("created_at")?;
                let scheduled_at = row.get("scheduled_at")?;
                let combined_state = CombinedState::new(
                    &CombinedStateDTO {
                        state: row.get("state")?,
                        ffqn: row.get("ffqn")?,
                        pending_expires_finished: row
                            .get::<_, DateTime<Utc>>("pending_expires_finished")?,
                        executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,

                        last_lock_version: row
                            .get::<_, Option<VersionType>>("last_lock_version")?
                            .map(Version::new),
                        run_id: row.get::<_, Option<RunId>>("run_id")?,
                        join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
                        join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
                        result_kind: row
                            .get::<_, Option<FromStrWrapper<PendingStateFinishedResultKind>>>(
                                "result_kind",
                            )?
                            .map(|wrapper| wrapper.0),
                    },
                    Version::new(row.get("corresponding_version")?),
                )?;
                Ok(ExecutionWithState {
                    execution_id,
                    ffqn: combined_state.ffqn,
                    pending_state: combined_state.pending_state,
                    created_at,
                    scheduled_at,
                })
            },
        )?
        .collect::<Vec<Result<_, _>>>()
        .into_iter()
        // Best effort: a row that cannot be decoded is logged and dropped.
        .filter_map(|row| match row {
            Ok(row) => Some(row),
            Err(err) => {
                warn!("Skipping row - {err:?}");
                None
            }
        })
        .collect();

    // Rows fetched in ascending order are flipped so the result is always descending.
    if !statement_mod.limit_desc {
        vec.reverse();
    }
    Ok(vec)
}
1599
1600 #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
1601 #[expect(clippy::too_many_arguments)]
1602 fn lock_single_execution(
1603 tx: &Transaction,
1604 created_at: DateTime<Utc>,
1605 component_id: &ComponentId,
1606 execution_id: &ExecutionId,
1607 run_id: RunId,
1608 appending_version: &Version,
1609 executor_id: ExecutorId,
1610 lock_expires_at: DateTime<Utc>,
1611 retry_config: ComponentRetryConfig,
1612 ) -> Result<LockedExecution, DbErrorWrite> {
1613 debug!("lock_single_execution");
1614 let combined_state = Self::get_combined_state(tx, execution_id)?;
1615 combined_state.pending_state.can_append_lock(
1616 created_at,
1617 executor_id,
1618 run_id,
1619 lock_expires_at,
1620 )?;
1621 let expected_version = combined_state.get_next_version_assert_not_finished();
1622 Self::check_expected_next_and_appending_version(&expected_version, appending_version)?;
1623
1624 let locked_event = Locked {
1626 component_id: component_id.clone(),
1627 executor_id,
1628 lock_expires_at,
1629 run_id,
1630 retry_config,
1631 };
1632 let event = ExecutionEventInner::Locked(locked_event.clone());
1633 let event_ser = serde_json::to_string(&event).map_err(|err| {
1634 warn!("Cannot serialize {event:?} - {err:?}");
1635 DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
1636 })?;
1637 let mut stmt = tx
1638 .prepare_cached(
1639 "INSERT INTO t_execution_log \
1640 (execution_id, created_at, json_value, version, variant) \
1641 VALUES \
1642 (:execution_id, :created_at, :json_value, :version, :variant)",
1643 )
1644 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1645 stmt.execute(named_params! {
1646 ":execution_id": execution_id.to_string(),
1647 ":created_at": created_at,
1648 ":json_value": event_ser,
1649 ":version": appending_version.0,
1650 ":variant": event.variant(),
1651 })
1652 .map_err(|err| {
1653 warn!("Cannot lock execution - {err:?}");
1654 DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::IllegalState("cannot lock".into()))
1655 })?;
1656
1657 let responses = Self::list_responses(tx, execution_id, None)?;
1659 let responses = responses.into_iter().map(|resp| resp.event).collect();
1660 trace!("Responses: {responses:?}");
1661
1662 let intermittent_event_count = Self::update_state_locked_get_intermittent_event_count(
1663 tx,
1664 execution_id,
1665 executor_id,
1666 run_id,
1667 lock_expires_at,
1668 appending_version,
1669 retry_config,
1670 )?;
1671 let mut events = tx
1673 .prepare(
1674 "SELECT json_value FROM t_execution_log WHERE \
1675 execution_id = :execution_id AND (variant = :v1 OR variant = :v2) \
1676 ORDER BY version",
1677 )?
1678 .query_map(
1679 named_params! {
1680 ":execution_id": execution_id.to_string(),
1681 ":v1": DUMMY_CREATED.variant(),
1682 ":v2": DUMMY_HISTORY_EVENT.variant(),
1683 },
1684 |row| {
1685 row.get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
1686 .map(|wrapper| wrapper.0)
1687 .map_err(|serde| {
1688 error!("Cannot deserialize {row:?} - {serde:?}");
1689 consistency_rusqlite("cannot deserialize json value")
1690 })
1691 },
1692 )?
1693 .collect::<Result<Vec<_>, _>>()?
1694 .into_iter()
1695 .collect::<VecDeque<_>>();
1696 let Some(ExecutionEventInner::Created {
1697 ffqn,
1698 params,
1699 parent,
1700 metadata,
1701 ..
1702 }) = events.pop_front()
1703 else {
1704 error!("Execution log must contain at least `Created` event");
1705 return Err(consistency_db_err("execution log must contain `Created` event").into());
1706 };
1707
1708 let event_history = events
1709 .into_iter()
1710 .map(|event| {
1711 if let ExecutionEventInner::HistoryEvent { event } = event {
1712 Ok(event)
1713 } else {
1714 error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
1715 Err(consistency_db_err(
1716 "rows can only contain `Created` and `HistoryEvent` event kinds",
1717 ))
1718 }
1719 })
1720 .collect::<Result<Vec<_>, _>>()?;
1721
1722 Ok(LockedExecution {
1723 execution_id: execution_id.clone(),
1724 metadata,
1725 next_version: appending_version.increment(),
1726 ffqn,
1727 params,
1728 event_history,
1729 responses,
1730 parent,
1731 intermittent_event_count,
1732 locked_event,
1733 })
1734 }
1735
1736 fn count_join_next(
1737 tx: &Transaction,
1738 execution_id: &ExecutionId,
1739 join_set_id: &JoinSetId,
1740 ) -> Result<u64, DbErrorRead> {
1741 let mut stmt = tx.prepare(
1742 "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = :execution_id AND join_set_id = :join_set_id \
1743 AND history_event_type = :join_next",
1744 )?;
1745 Ok(stmt.query_row(
1746 named_params! {
1747 ":execution_id": execution_id.to_string(),
1748 ":join_set_id": join_set_id.to_string(),
1749 ":join_next": HISTORY_EVENT_TYPE_JOIN_NEXT,
1750 },
1751 |row| row.get("count"),
1752 )?)
1753 }
1754
1755 #[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
1756 #[expect(clippy::needless_return)]
1757 fn append(
1758 tx: &Transaction,
1759 execution_id: &ExecutionId,
1760 req: AppendRequest,
1761 appending_version: Version,
1762 ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1763 if matches!(req.event, ExecutionEventInner::Created { .. }) {
1764 return Err(DbErrorWrite::NonRetriable(
1765 DbErrorWriteNonRetriable::ValidationFailed(
1766 "cannot append `Created` event - use `create` instead".into(),
1767 ),
1768 ));
1769 }
1770 if let AppendRequest {
1771 event:
1772 ExecutionEventInner::Locked(Locked {
1773 component_id,
1774 executor_id,
1775 run_id,
1776 lock_expires_at,
1777 retry_config,
1778 }),
1779 created_at,
1780 } = req
1781 {
1782 return Self::lock_single_execution(
1783 tx,
1784 created_at,
1785 &component_id,
1786 execution_id,
1787 run_id,
1788 &appending_version,
1789 executor_id,
1790 lock_expires_at,
1791 retry_config,
1792 )
1793 .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
1794 }
1795
1796 let combined_state = Self::get_combined_state(tx, execution_id)?;
1797 if combined_state.pending_state.is_finished() {
1798 debug!("Execution is already finished");
1799 return Err(DbErrorWrite::NonRetriable(
1800 DbErrorWriteNonRetriable::IllegalState("already finished".into()),
1801 ));
1802 }
1803
1804 Self::check_expected_next_and_appending_version(
1805 &combined_state.get_next_version_assert_not_finished(),
1806 &appending_version,
1807 )?;
1808 let event_ser = serde_json::to_string(&req.event).map_err(|err| {
1809 error!("Cannot serialize {:?} - {err:?}", req.event);
1810 DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
1811 })?;
1812
1813 let mut stmt = tx.prepare(
1814 "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
1815 VALUES (:execution_id, :created_at, :json_value, :version, :variant, :join_set_id)")
1816 ?;
1817 stmt.execute(named_params! {
1818 ":execution_id": execution_id.to_string(),
1819 ":created_at": req.created_at,
1820 ":json_value": event_ser,
1821 ":version": appending_version.0,
1822 ":variant": req.event.variant(),
1823 ":join_set_id": req.event.join_set_id().map(std::string::ToString::to_string),
1824 })?;
1825 match &req.event {
1828 ExecutionEventInner::Created { .. } => {
1829 unreachable!("handled in the caller")
1830 }
1831
1832 ExecutionEventInner::Locked { .. } => {
1833 unreachable!("handled above")
1834 }
1835
1836 ExecutionEventInner::TemporarilyFailed {
1837 backoff_expires_at, ..
1838 }
1839 | ExecutionEventInner::TemporarilyTimedOut {
1840 backoff_expires_at, ..
1841 } => {
1842 let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1843 tx,
1844 execution_id,
1845 &appending_version,
1846 *backoff_expires_at,
1847 true, )?;
1849 return Ok((next_version, notifier));
1850 }
1851
1852 ExecutionEventInner::Unlocked {
1853 backoff_expires_at, ..
1854 } => {
1855 let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1856 tx,
1857 execution_id,
1858 &appending_version,
1859 *backoff_expires_at,
1860 false, )?;
1862 return Ok((next_version, notifier));
1863 }
1864
1865 ExecutionEventInner::Finished { result, .. } => {
1866 Self::update_state_finished(
1867 tx,
1868 execution_id,
1869 &appending_version,
1870 req.created_at,
1871 PendingStateFinishedResultKind::from(result),
1872 )?;
1873 return Ok((
1874 appending_version,
1875 AppendNotifier {
1876 pending_at: None,
1877 execution_finished: Some(NotifierExecutionFinished {
1878 execution_id: execution_id.clone(),
1879 retval: result.clone(),
1880 }),
1881 response: None,
1882 },
1883 ));
1884 }
1885
1886 ExecutionEventInner::HistoryEvent {
1887 event:
1888 HistoryEvent::JoinSetCreate { .. }
1889 | HistoryEvent::JoinSetRequest {
1890 request: JoinSetRequest::ChildExecutionRequest { .. },
1891 ..
1892 }
1893 | HistoryEvent::Persist { .. }
1894 | HistoryEvent::Schedule { .. }
1895 | HistoryEvent::Stub { .. }
1896 | HistoryEvent::JoinNextTooMany { .. },
1897 } => {
1898 return Ok((
1899 Self::bump_state_next_version(tx, execution_id, &appending_version, None)?,
1900 AppendNotifier::default(),
1901 ));
1902 }
1903
1904 ExecutionEventInner::HistoryEvent {
1905 event:
1906 HistoryEvent::JoinSetRequest {
1907 join_set_id,
1908 request:
1909 JoinSetRequest::DelayRequest {
1910 delay_id,
1911 expires_at,
1912 ..
1913 },
1914 },
1915 } => {
1916 return Ok((
1917 Self::bump_state_next_version(
1918 tx,
1919 execution_id,
1920 &appending_version,
1921 Some(DelayReq {
1922 join_set_id: join_set_id.clone(),
1923 delay_id: delay_id.clone(),
1924 expires_at: *expires_at,
1925 }),
1926 )?,
1927 AppendNotifier::default(),
1928 ));
1929 }
1930
1931 ExecutionEventInner::HistoryEvent {
1932 event:
1933 HistoryEvent::JoinNext {
1934 join_set_id,
1935 run_expires_at,
1936 closing,
1937 requested_ffqn: _,
1938 },
1939 } => {
1940 let join_next_count = Self::count_join_next(tx, execution_id, join_set_id)?;
1942 let nth_response =
1943 Self::nth_response(tx, execution_id, join_set_id, join_next_count - 1)?; trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
1945 assert!(join_next_count > 0);
1946 if let Some(ResponseWithCursor {
1947 event:
1948 JoinSetResponseEventOuter {
1949 created_at: nth_created_at,
1950 ..
1951 },
1952 cursor: _,
1953 }) = nth_response
1954 {
1955 let scheduled_at = max(*run_expires_at, nth_created_at); let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1957 tx,
1958 execution_id,
1959 &appending_version,
1960 scheduled_at,
1961 false, )?;
1963 return Ok((next_version, notifier));
1964 }
1965 return Ok((
1966 Self::update_state_blocked(
1967 tx,
1968 execution_id,
1969 &appending_version,
1970 join_set_id,
1971 *run_expires_at,
1972 *closing,
1973 )?,
1974 AppendNotifier::default(),
1975 ));
1976 }
1977 }
1978 }
1979
1980 fn append_response(
1981 tx: &Transaction,
1982 execution_id: &ExecutionId,
1983 response_outer: JoinSetResponseEventOuter,
1984 ) -> Result<AppendNotifier, DbErrorWrite> {
1985 let mut stmt = tx.prepare(
1986 "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, child_execution_id, finished_version) \
1987 VALUES (:execution_id, :created_at, :join_set_id, :delay_id, :child_execution_id, :finished_version)",
1988 )?;
1989 let join_set_id = &response_outer.event.join_set_id;
1990 let delay_id = match &response_outer.event.event {
1991 JoinSetResponse::DelayFinished { delay_id } => Some(delay_id.to_string()),
1992 JoinSetResponse::ChildExecutionFinished { .. } => None,
1993 };
1994 let (child_execution_id, finished_version) = match &response_outer.event.event {
1995 JoinSetResponse::ChildExecutionFinished {
1996 child_execution_id,
1997 finished_version,
1998 result: _,
1999 } => (
2000 Some(child_execution_id.to_string()),
2001 Some(finished_version.0),
2002 ),
2003 JoinSetResponse::DelayFinished { .. } => (None, None),
2004 };
2005
2006 stmt.execute(named_params! {
2007 ":execution_id": execution_id.to_string(),
2008 ":created_at": response_outer.created_at,
2009 ":join_set_id": join_set_id.to_string(),
2010 ":delay_id": delay_id,
2011 ":child_execution_id": child_execution_id,
2012 ":finished_version": finished_version,
2013 })?;
2014
2015 let combined_state = Self::get_combined_state(tx, execution_id)?;
2017 debug!("previous_pending_state: {combined_state:?}");
2018 let mut notifier = if let PendingState::BlockedByJoinSet {
2019 join_set_id: found_join_set_id,
2020 lock_expires_at, closing: _,
2022 } = combined_state.pending_state
2023 && *join_set_id == found_join_set_id
2024 {
2025 let scheduled_at = max(lock_expires_at, response_outer.created_at);
2028 Self::update_state_pending_after_response_appended(
2031 tx,
2032 execution_id,
2033 scheduled_at,
2034 &combined_state.corresponding_version, )?
2036 } else {
2037 AppendNotifier::default()
2038 };
2039 if let JoinSetResponseEvent {
2040 join_set_id,
2041 event: JoinSetResponse::DelayFinished { delay_id },
2042 } = &response_outer.event
2043 {
2044 debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
2045 let mut stmt =
2046 tx.prepare_cached("DELETE FROM t_delay WHERE execution_id = :execution_id AND join_set_id = :join_set_id AND delay_id = :delay_id")
2047 ?;
2048 stmt.execute(named_params! {
2049 ":execution_id": execution_id.to_string(),
2050 ":join_set_id": join_set_id.to_string(),
2051 ":delay_id": delay_id.to_string(),
2052 })?;
2053 }
2054 notifier.response = Some((execution_id.clone(), response_outer));
2055 Ok(notifier)
2056 }
2057
2058 fn append_backtrace(
2059 tx: &Transaction,
2060 backtrace_info: &BacktraceInfo,
2061 ) -> Result<(), DbErrorWrite> {
2062 let backtrace = serde_json::to_string(&backtrace_info.wasm_backtrace).map_err(|err| {
2063 warn!(
2064 "Cannot serialize backtrace {:?} - {err:?}",
2065 backtrace_info.wasm_backtrace
2066 );
2067 DbErrorWriteNonRetriable::ValidationFailed("cannot serialize backtrace".into())
2068 })?;
2069 let mut stmt = tx
2070 .prepare(
2071 "INSERT INTO t_backtrace (execution_id, component_id, version_min_including, version_max_excluding, wasm_backtrace) \
2072 VALUES (:execution_id, :component_id, :version_min_including, :version_max_excluding, :wasm_backtrace)",
2073 )
2074 ?;
2075 stmt.execute(named_params! {
2076 ":execution_id": backtrace_info.execution_id.to_string(),
2077 ":component_id": backtrace_info.component_id.to_string(),
2078 ":version_min_including": backtrace_info.version_min_including.0,
2079 ":version_max_excluding": backtrace_info.version_max_excluding.0,
2080 ":wasm_backtrace": backtrace,
2081 })?;
2082 Ok(())
2083 }
2084
/// Loads the complete execution log of `execution_id`: all events ordered by version, all
/// responses, the pending state and the next version.
///
/// Events are returned without backtrace ids.
///
/// # Errors
/// * [`DbErrorRead::NotFound`] when the execution has no events.
/// * A consistency error when an event cannot be deserialized.
/// * Any error reported by the helper functions or the database.
#[cfg(feature = "test")]
fn get(
    tx: &Transaction,
    execution_id: &ExecutionId,
) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
    const SQL: &str = "SELECT created_at, json_value FROM t_execution_log WHERE \
        execution_id = :execution_id ORDER BY version";

    let mut select_stmt = tx.prepare(SQL)?;
    let rows = select_stmt.query_map(
        named_params! {
            ":execution_id": execution_id.to_string(),
        },
        |row| {
            let created_at = row.get("created_at")?;
            let JsonWrapper(event) = row
                .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
                .map_err(|serde| {
                    error!("Cannot deserialize {row:?} - {serde:?}");
                    consistency_rusqlite("cannot deserialize event")
                })?;
            Ok(ExecutionEvent {
                created_at,
                event,
                backtrace_id: None,
            })
        },
    )?;
    let events = rows.collect::<Result<Vec<_>, _>>()?;
    // An execution without any event does not exist.
    if events.is_empty() {
        return Err(DbErrorRead::NotFound);
    }

    let combined_state = Self::get_combined_state(tx, execution_id)?;
    let responses = Self::list_responses(tx, execution_id, None)?
        .into_iter()
        .map(|resp| resp.event)
        .collect();
    let next_version = combined_state.get_next_version_or_finished();
    Ok(concepts::storage::ExecutionLog {
        execution_id: execution_id.clone(),
        events,
        responses,
        next_version,
        pending_state: combined_state.pending_state,
    })
}
2133
2134 fn list_execution_events(
2135 tx: &Transaction,
2136 execution_id: &ExecutionId,
2137 version_min: VersionType,
2138 version_max_excluding: VersionType,
2139 include_backtrace_id: bool,
2140 ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2141 let select = if include_backtrace_id {
2142 "SELECT
2143 log.created_at,
2144 log.json_value,
2145 -- Select version_min_including from backtrace if a match is found, otherwise NULL
2146 bt.version_min_including AS backtrace_id
2147 FROM
2148 t_execution_log AS log
2149 LEFT OUTER JOIN -- Use LEFT JOIN to keep all logs even if no backtrace matches
2150 t_backtrace AS bt ON log.execution_id = bt.execution_id
2151 -- Check if the log's version falls within the backtrace's range
2152 AND log.version >= bt.version_min_including
2153 AND log.version < bt.version_max_excluding
2154 WHERE
2155 log.execution_id = :execution_id
2156 AND log.version >= :version_min
2157 AND log.version < :version_max_excluding
2158 ORDER BY
2159 log.version;"
2160 } else {
2161 "SELECT
2162 created_at, json_value, NULL as backtrace_id
2163 FROM t_execution_log WHERE
2164 execution_id = :execution_id AND version >= :version_min AND version < :version_max_excluding
2165 ORDER BY version"
2166 };
2167 tx.prepare(select)?
2168 .query_map(
2169 named_params! {
2170 ":execution_id": execution_id.to_string(),
2171 ":version_min": version_min,
2172 ":version_max_excluding": version_max_excluding
2173 },
2174 |row| {
2175 let created_at = row.get("created_at")?;
2176 let backtrace_id = row
2177 .get::<_, Option<VersionType>>("backtrace_id")?
2178 .map(Version::new);
2179 let event = row
2180 .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2181 .map(|event| ExecutionEvent {
2182 created_at,
2183 event: event.0,
2184 backtrace_id,
2185 })
2186 .map_err(|serde| {
2187 error!("Cannot deserialize {row:?} - {serde:?}");
2188 consistency_rusqlite("cannot deserialize")
2189 })?;
2190 Ok(event)
2191 },
2192 )?
2193 .collect::<Result<Vec<_>, _>>()
2194 .map_err(DbErrorRead::from)
2195 }
2196
2197 fn get_execution_event(
2198 tx: &Transaction,
2199 execution_id: &ExecutionId,
2200 version: VersionType,
2201 ) -> Result<ExecutionEvent, DbErrorRead> {
2202 let mut stmt = tx.prepare(
2203 "SELECT created_at, json_value FROM t_execution_log WHERE \
2204 execution_id = :execution_id AND version = :version",
2205 )?;
2206 stmt.query_row(
2207 named_params! {
2208 ":execution_id": execution_id.to_string(),
2209 ":version": version,
2210 },
2211 |row| {
2212 let created_at = row.get("created_at")?;
2213 let event = row
2214 .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2215 .map_err(|serde| {
2216 error!("Cannot deserialize {row:?} - {serde:?}");
2217 consistency_rusqlite("cannot deserialize event")
2218 })?;
2219
2220 Ok(ExecutionEvent {
2221 created_at,
2222 event: event.0,
2223 backtrace_id: None,
2224 })
2225 },
2226 )
2227 .map_err(DbErrorRead::from)
2228 }
2229
2230 fn list_responses(
2231 tx: &Transaction,
2232 execution_id: &ExecutionId,
2233 pagination: Option<Pagination<u32>>,
2234 ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
2235 let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![];
2237 let mut sql = "SELECT \
2238 r.id, r.created_at, r.join_set_id, r.delay_id, r.child_execution_id, r.finished_version, l.json_value \
2239 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2240 WHERE \
2241 r.execution_id = :execution_id \
2242 AND ( r.finished_version = l.version OR r.child_execution_id IS NULL ) \
2243 "
2244 .to_string();
2245 let limit = match &pagination {
2246 Some(
2247 pagination @ (Pagination::NewerThan { cursor, .. }
2248 | Pagination::OlderThan { cursor, .. }),
2249 ) => {
2250 params.push((":cursor", Box::new(cursor)));
2251 write!(sql, " AND r.id {rel} :cursor", rel = pagination.rel()).unwrap();
2252 Some(pagination.length())
2253 }
2254 None => None,
2255 };
2256 sql.push_str(" ORDER BY id");
2257 if pagination.as_ref().is_some_and(Pagination::is_desc) {
2258 sql.push_str(" DESC");
2259 }
2260 if let Some(limit) = limit {
2261 write!(sql, " LIMIT {limit}").unwrap();
2262 }
2263 params.push((":execution_id", Box::new(execution_id.to_string())));
2264 tx.prepare(&sql)?
2265 .query_map::<_, &[(&'static str, &dyn ToSql)], _>(
2266 params
2267 .iter()
2268 .map(|(key, value)| (*key, value.as_ref()))
2269 .collect::<Vec<_>>()
2270 .as_ref(),
2271 Self::parse_response_with_cursor,
2272 )?
2273 .collect::<Result<Vec<_>, rusqlite::Error>>()
2274 .map_err(DbErrorRead::from)
2275 }
2276
2277 fn parse_response_with_cursor(
2278 row: &rusqlite::Row<'_>,
2279 ) -> Result<ResponseWithCursor, rusqlite::Error> {
2280 let id = row.get("id")?;
2281 let created_at: DateTime<Utc> = row.get("created_at")?;
2282 let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
2283 let event = match (
2284 row.get::<_, Option<DelayId>>("delay_id")?,
2285 row.get::<_, Option<ExecutionIdDerived>>("child_execution_id")?,
2286 row.get::<_, Option<VersionType>>("finished_version")?,
2287 row.get::<_, Option<JsonWrapper<ExecutionEventInner>>>("json_value")?,
2288 ) {
2289 (Some(delay_id), None, None, None) => JoinSetResponse::DelayFinished { delay_id },
2290 (
2291 None,
2292 Some(child_execution_id),
2293 Some(finished_version),
2294 Some(JsonWrapper(ExecutionEventInner::Finished { result, .. })),
2295 ) => JoinSetResponse::ChildExecutionFinished {
2296 child_execution_id,
2297 finished_version: Version(finished_version),
2298 result,
2299 },
2300 (delay, child, finished, result) => {
2301 error!(
2302 "Invalid row in t_join_set_response {id} - {:?} {child:?} {finished:?} {:?}",
2303 delay,
2304 result.map(|it| it.0)
2305 );
2306 return Err(consistency_rusqlite("invalid row in t_join_set_response"));
2307 }
2308 };
2309 Ok(ResponseWithCursor {
2310 cursor: id,
2311 event: JoinSetResponseEventOuter {
2312 event: JoinSetResponseEvent { join_set_id, event },
2313 created_at,
2314 },
2315 })
2316 }
2317
2318 fn nth_response(
2319 tx: &Transaction,
2320 execution_id: &ExecutionId,
2321 join_set_id: &JoinSetId,
2322 skip_rows: u64,
2323 ) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
2324 tx
2326 .prepare(
2327 "SELECT r.id, r.created_at, r.join_set_id, \
2328 r.delay_id, \
2329 r.child_execution_id, r.finished_version, l.json_value \
2330 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2331 WHERE \
2332 r.execution_id = :execution_id AND r.join_set_id = :join_set_id AND \
2333 (
2334 r.finished_version = l.version \
2335 OR \
2336 r.child_execution_id IS NULL \
2337 ) \
2338 ORDER BY id \
2339 LIMIT 1 OFFSET :offset",
2340 )
2341 ?
2342 .query_row(
2343 named_params! {
2344 ":execution_id": execution_id.to_string(),
2345 ":join_set_id": join_set_id.to_string(),
2346 ":offset": skip_rows,
2347 },
2348 Self::parse_response_with_cursor,
2349 )
2350 .optional()
2351 .map_err(DbErrorRead::from)
2352 }
2353
2354 #[instrument(level = Level::TRACE, skip_all)]
2356 fn get_responses_with_offset(
2357 tx: &Transaction,
2358 execution_id: &ExecutionId,
2359 skip_rows: usize,
2360 ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorRead> {
2361 tx.prepare(
2363 "SELECT r.id, r.created_at, r.join_set_id, \
2364 r.delay_id, \
2365 r.child_execution_id, r.finished_version, l.json_value \
2366 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2367 WHERE \
2368 r.execution_id = :execution_id AND \
2369 ( \
2370 r.finished_version = l.version \
2371 OR r.child_execution_id IS NULL \
2372 ) \
2373 ORDER BY id \
2374 LIMIT -1 OFFSET :offset",
2375 )
2376 ?
2377 .query_map(
2378 named_params! {
2379 ":execution_id": execution_id.to_string(),
2380 ":offset": skip_rows,
2381 },
2382 Self::parse_response_with_cursor,
2383 )
2384 ?
2385 .collect::<Result<Vec<_>, _>>()
2386 .map(|resp| resp.into_iter().map(|vec| vec.event).collect())
2387 .map_err(DbErrorRead::from)
2388 }
2389
2390 fn get_pending_of_single_ffqn(
2391 mut stmt: CachedStatement,
2392 batch_size: usize,
2393 pending_at_or_sooner: DateTime<Utc>,
2394 ffqn: &FunctionFqn,
2395 ) -> Result<Vec<(ExecutionId, Version)>, ()> {
2396 stmt.query_map(
2397 named_params! {
2398 ":pending_expires_finished": pending_at_or_sooner,
2399 ":ffqn": ffqn.to_string(),
2400 ":batch_size": batch_size,
2401 },
2402 |row| {
2403 let execution_id = row.get::<_, String>("execution_id")?.parse::<ExecutionId>();
2404 let next_version =
2405 Version::new(row.get::<_, VersionType>("corresponding_version")?).increment();
2406 Ok(execution_id.map(|exe| (exe, next_version)))
2407 },
2408 )
2409 .map_err(|err| {
2410 warn!("Ignoring consistency error {err:?}");
2411 })?
2412 .collect::<Result<Vec<_>, _>>()
2413 .map_err(|err| {
2414 warn!("Ignoring consistency error {err:?}");
2415 })?
2416 .into_iter()
2417 .collect::<Result<Vec<_>, _>>()
2418 .map_err(|err| {
2419 warn!("Ignoring consistency error {err:?}");
2420 })
2421 }
2422
2423 fn get_pending(
2425 conn: &Connection,
2426 batch_size: usize,
2427 pending_at_or_sooner: DateTime<Utc>,
2428 ffqns: &[FunctionFqn],
2429 ) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2430 let mut execution_ids_versions = Vec::with_capacity(batch_size);
2431 for ffqn in ffqns {
2432 let stmt = conn
2434 .prepare_cached(&format!(
2435 r#"
2436 SELECT execution_id, corresponding_version FROM t_state WHERE
2437 state = "{STATE_PENDING_AT}" AND
2438 pending_expires_finished <= :pending_expires_finished AND ffqn = :ffqn
2439 ORDER BY pending_expires_finished LIMIT :batch_size
2440 "#
2441 ))
2442 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2443
2444 if let Ok(execs_and_versions) = Self::get_pending_of_single_ffqn(
2445 stmt,
2446 batch_size - execution_ids_versions.len(),
2447 pending_at_or_sooner,
2448 ffqn,
2449 ) {
2450 execution_ids_versions.extend(execs_and_versions);
2451 if execution_ids_versions.len() == batch_size {
2452 break;
2454 }
2455 }
2457 }
2458 Ok(execution_ids_versions)
2459 }
2460
2461 #[instrument(level = Level::TRACE, skip_all)]
2463 fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
2464 let (pending_ats, finished_execs, responses) = {
2465 let (mut pending_ats, mut finished_execs, mut responses) =
2466 (Vec::new(), Vec::new(), Vec::new());
2467 for notifier in notifiers {
2468 if let Some(pending_at) = notifier.pending_at {
2469 pending_ats.push(pending_at);
2470 }
2471 if let Some(finished) = notifier.execution_finished {
2472 finished_execs.push(finished);
2473 }
2474 if let Some(response) = notifier.response {
2475 responses.push(response);
2476 }
2477 }
2478 (pending_ats, finished_execs, responses)
2479 };
2480
2481 if !pending_ats.is_empty() {
2483 let guard = self.0.pending_ffqn_subscribers.lock().unwrap();
2484 for pending_at in pending_ats {
2485 Self::notify_pending_locked(&pending_at, current_time, &guard);
2486 }
2487 }
2488 if !finished_execs.is_empty() {
2491 let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
2492 for finished in finished_execs {
2493 if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
2494 for (_tag, sender) in listeners_of_exe_id {
2495 let _ = sender.send(finished.retval.clone());
2498 }
2499 }
2500 }
2501 }
2502 if !responses.is_empty() {
2504 let mut guard = self.0.response_subscribers.lock().unwrap();
2505 for (execution_id, response) in responses {
2506 if let Some((sender, _)) = guard.remove(&execution_id) {
2507 let _ = sender.send(response);
2508 }
2509 }
2510 }
2511 }
2512
2513 fn notify_pending_locked(
2514 notifier: &NotifierPendingAt,
2515 current_time: DateTime<Utc>,
2516 ffqn_to_pending_subscription: &std::sync::MutexGuard<
2517 HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>,
2518 >,
2519 ) {
2520 if notifier.scheduled_at <= current_time
2522 && let Some((subscription, _)) = ffqn_to_pending_subscription.get(¬ifier.ffqn)
2523 {
2524 debug!("Notifying pending subscriber");
2525 let _ = subscription.try_send(());
2527 }
2528 }
2529}
2530
2531#[async_trait]
2532impl DbExecutor for SqlitePool {
2533 #[instrument(level = Level::TRACE, skip(self))]
2534 async fn lock_pending(
2535 &self,
2536 batch_size: usize,
2537 pending_at_or_sooner: DateTime<Utc>,
2538 ffqns: Arc<[FunctionFqn]>,
2539 created_at: DateTime<Utc>,
2540 component_id: ComponentId,
2541 executor_id: ExecutorId,
2542 lock_expires_at: DateTime<Utc>,
2543 run_id: RunId,
2544 retry_config: ComponentRetryConfig,
2545 ) -> Result<LockPendingResponse, DbErrorGeneric> {
2546 let execution_ids_versions = self
2547 .transaction(
2548 move |conn| Self::get_pending(conn, batch_size, pending_at_or_sooner, &ffqns),
2549 "get_pending",
2550 )
2551 .await?;
2552 if execution_ids_versions.is_empty() {
2553 Ok(vec![])
2554 } else {
2555 debug!("Locking {execution_ids_versions:?}");
2556 self.transaction(
2557 move |tx| {
2558 let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
2559 for (execution_id, version) in &execution_ids_versions {
2561 match Self::lock_single_execution(
2562 tx,
2563 created_at,
2564 &component_id,
2565 execution_id,
2566 run_id,
2567 version,
2568 executor_id,
2569 lock_expires_at,
2570 retry_config,
2571 ) {
2572 Ok(locked) => locked_execs.push(locked),
2573 Err(err) => {
2574 warn!("Locking row {execution_id} failed - {err:?}");
2575 }
2576 }
2577 }
2578 Ok(locked_execs)
2579 },
2580 "lock_pending",
2581 )
2582 .await
2583 }
2584 }
2585
2586 #[instrument(level = Level::DEBUG, skip(self))]
2587 async fn lock_one(
2588 &self,
2589 created_at: DateTime<Utc>,
2590 component_id: ComponentId,
2591 execution_id: &ExecutionId,
2592 run_id: RunId,
2593 version: Version,
2594 executor_id: ExecutorId,
2595 lock_expires_at: DateTime<Utc>,
2596 retry_config: ComponentRetryConfig,
2597 ) -> Result<LockedExecution, DbErrorWrite> {
2598 debug!(%execution_id, "lock_one");
2599 let execution_id = execution_id.clone();
2600 self.transaction(
2601 move |tx| {
2602 Self::lock_single_execution(
2603 tx,
2604 created_at,
2605 &component_id,
2606 &execution_id,
2607 run_id,
2608 &version,
2609 executor_id,
2610 lock_expires_at,
2611 retry_config,
2612 )
2613 },
2614 "lock_inner",
2615 )
2616 .await
2617 }
2618
2619 #[instrument(level = Level::DEBUG, skip(self, req))]
2620 async fn append(
2621 &self,
2622 execution_id: ExecutionId,
2623 version: Version,
2624 req: AppendRequest,
2625 ) -> Result<AppendResponse, DbErrorWrite> {
2626 debug!(%req, "append");
2627 trace!(?req, "append");
2628 let created_at = req.created_at;
2629 let (version, notifier) = self
2630 .transaction(
2631 move |tx| Self::append(tx, &execution_id, req.clone(), version.clone()),
2632 "append",
2633 )
2634 .await?;
2635 self.notify_all(vec![notifier], created_at);
2636 Ok(version)
2637 }
2638
2639 #[instrument(level = Level::DEBUG, skip_all)]
2640 async fn append_batch_respond_to_parent(
2641 &self,
2642 events: AppendEventsToExecution,
2643 response: AppendResponseToExecution,
2644 current_time: DateTime<Utc>,
2645 ) -> Result<AppendBatchResponse, DbErrorWrite> {
2646 debug!("append_batch_respond_to_parent");
2647 if events.execution_id == response.parent_execution_id {
2648 return Err(DbErrorWrite::NonRetriable(
2651 DbErrorWriteNonRetriable::ValidationFailed(
2652 "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
2653 ),
2654 ));
2655 }
2656 if events.batch.is_empty() {
2657 error!("Batch cannot be empty");
2658 return Err(DbErrorWrite::NonRetriable(
2659 DbErrorWriteNonRetriable::ValidationFailed("batch cannot be empty".into()),
2660 ));
2661 }
2662 let (version, notifiers) = {
2663 self.transaction(
2664 move |tx| {
2665 let mut version = events.version.clone();
2666 let mut notifier_of_child = None;
2667 for append_request in &events.batch {
2668 let (v, n) = Self::append(
2669 tx,
2670 &events.execution_id,
2671 append_request.clone(),
2672 version,
2673 )?;
2674 version = v;
2675 notifier_of_child = Some(n);
2676 }
2677
2678 let pending_at_parent = Self::append_response(
2679 tx,
2680 &response.parent_execution_id,
2681 response.parent_response_event.clone(),
2682 )?;
2683 Ok::<_, DbErrorWrite>((
2684 version,
2685 vec![
2686 notifier_of_child.expect("checked that the batch is not empty"),
2687 pending_at_parent,
2688 ],
2689 ))
2690 },
2691 "append_batch_respond_to_parent",
2692 )
2693 .await?
2694 };
2695 self.notify_all(notifiers, current_time);
2696 Ok(version)
2697 }
2698
2699 #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
2702 async fn wait_for_pending(
2703 &self,
2704 pending_at_or_sooner: DateTime<Utc>,
2705 ffqns: Arc<[FunctionFqn]>,
2706 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
2707 ) {
2708 let unique_tag: u64 = rand::random();
2709 let (sender, mut receiver) = mpsc::channel(1); {
2711 let mut ffqn_to_pending_subscription = self.0.pending_ffqn_subscribers.lock().unwrap();
2712 for ffqn in ffqns.as_ref() {
2713 ffqn_to_pending_subscription.insert(ffqn.clone(), (sender.clone(), unique_tag));
2714 }
2715 }
2716 async {
2717 let Ok(execution_ids_versions) = self
2718 .transaction(
2719 {
2720 let ffqns = ffqns.clone();
2721 move |conn| Self::get_pending(conn, 1, pending_at_or_sooner, ffqns.as_ref())
2722 },
2723 "subscribe_to_pending",
2724 )
2725 .await
2726 else {
2727 trace!(
2728 "Ignoring get_pending error and waiting in for timeout to avoid executor repolling too soon"
2729 );
2730 timeout_fut.await;
2731 return;
2732 };
2733 if !execution_ids_versions.is_empty() {
2734 trace!("Not waiting, database already contains new pending executions");
2735 return;
2736 }
2737 tokio::select! { _ = receiver.recv() => {
2739 trace!("Received a notification");
2740 }
2741 () = timeout_fut => {
2742 }
2743 }
2744 }.await;
2745 {
2747 let mut ffqn_to_pending_subscription = self.0.pending_ffqn_subscribers.lock().unwrap();
2748 for ffqn in ffqns.as_ref() {
2749 match ffqn_to_pending_subscription.remove(ffqn) {
2750 Some((_, tag)) if tag == unique_tag => {
2751 }
2753 Some(other) => {
2754 ffqn_to_pending_subscription.insert(ffqn.clone(), other);
2756 }
2757 None => {
2758 }
2760 }
2761 }
2762 }
2763 }
2764}
2765
2766#[async_trait]
2767impl DbConnection for SqlitePool {
2768 #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
2769 async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
2770 debug!("create");
2771 trace!(?req, "create");
2772 let created_at = req.created_at;
2773 let (version, notifier) = self
2774 .transaction(move |tx| Self::create_inner(tx, req.clone()), "create")
2775 .await?;
2776 self.notify_all(vec![notifier], created_at);
2777 Ok(version)
2778 }
2779
2780 #[instrument(level = Level::DEBUG, skip(self, batch))]
2781 async fn append_batch(
2782 &self,
2783 current_time: DateTime<Utc>,
2784 batch: Vec<AppendRequest>,
2785 execution_id: ExecutionId,
2786 version: Version,
2787 ) -> Result<AppendBatchResponse, DbErrorWrite> {
2788 debug!("append_batch");
2789 trace!(?batch, "append_batch");
2790 assert!(!batch.is_empty(), "Empty batch request");
2791
2792 let (version, notifier) = self
2793 .transaction(
2794 move |tx| {
2795 let mut version = version.clone();
2796 let mut notifier = None;
2797 for append_request in &batch {
2798 let (v, n) =
2799 Self::append(tx, &execution_id, append_request.clone(), version)?;
2800 version = v;
2801 notifier = Some(n);
2802 }
2803 Ok::<_, DbErrorWrite>((
2804 version,
2805 notifier.expect("checked that the batch is not empty"),
2806 ))
2807 },
2808 "append_batch",
2809 )
2810 .await?;
2811
2812 self.notify_all(vec![notifier], current_time);
2813 Ok(version)
2814 }
2815
2816 #[instrument(level = Level::DEBUG, skip(self, batch, child_req))]
2817 async fn append_batch_create_new_execution(
2818 &self,
2819 current_time: DateTime<Utc>,
2820 batch: Vec<AppendRequest>,
2821 execution_id: ExecutionId,
2822 version: Version,
2823 child_req: Vec<CreateRequest>,
2824 ) -> Result<AppendBatchResponse, DbErrorWrite> {
2825 debug!("append_batch_create_new_execution");
2826 trace!(?batch, ?child_req, "append_batch_create_new_execution");
2827 assert!(!batch.is_empty(), "Empty batch request");
2828
2829 let (version, notifiers) = self
2830 .transaction(
2831 move |tx| {
2832 let mut notifier = None;
2833 let mut version = version.clone();
2834 for append_request in &batch {
2835 let (v, n) =
2836 Self::append(tx, &execution_id, append_request.clone(), version)?;
2837 version = v;
2838 notifier = Some(n);
2839 }
2840 let mut notifiers = Vec::new();
2841 notifiers.push(notifier.expect("checked that the batch is not empty"));
2842
2843 for child_req in &child_req {
2844 let (_, notifier) = Self::create_inner(tx, child_req.clone())?;
2845 notifiers.push(notifier);
2846 }
2847 Ok::<_, DbErrorWrite>((version, notifiers))
2848 },
2849 "append_batch_create_new_execution_inner",
2850 )
2851 .await?;
2852 self.notify_all(notifiers, current_time);
2853 Ok(version)
2854 }
2855
/// Loads the complete execution log of `execution_id` in one transaction.
/// Only compiled with the `test` feature.
#[cfg(feature = "test")]
#[instrument(level = Level::DEBUG, skip(self))]
async fn get(
    &self,
    execution_id: &ExecutionId,
) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
    trace!("get");
    let owned_execution_id = execution_id.clone();
    let log = self
        .transaction(move |tx| Self::get(tx, &owned_execution_id), "get")
        .await;
    log
}
2867
2868 #[instrument(level = Level::DEBUG, skip(self))]
2869 async fn list_execution_events(
2870 &self,
2871 execution_id: &ExecutionId,
2872 since: &Version,
2873 max_length: VersionType,
2874 include_backtrace_id: bool,
2875 ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2876 let execution_id = execution_id.clone();
2877 let since = since.0;
2878 self.transaction(
2879 move |tx| {
2880 Self::list_execution_events(
2881 tx,
2882 &execution_id,
2883 since,
2884 since + max_length,
2885 include_backtrace_id,
2886 )
2887 },
2888 "get",
2889 )
2890 .await
2891 }
2892
2893 #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
2897 async fn subscribe_to_next_responses(
2898 &self,
2899 execution_id: &ExecutionId,
2900 start_idx: usize,
2901 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
2902 ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout> {
2903 debug!("next_responses");
2904 let unique_tag: u64 = rand::random();
2905 let execution_id = execution_id.clone();
2906
2907 let cleanup = || {
2908 let mut guard = self.0.response_subscribers.lock().unwrap();
2909 match guard.remove(&execution_id) {
2910 Some((_, tag)) if tag == unique_tag => {} Some(other) => {
2912 guard.insert(execution_id.clone(), other);
2914 }
2915 None => {} }
2917 };
2918
2919 let response_subscribers = self.0.response_subscribers.clone();
2920 let resp_or_receiver = {
2921 let execution_id = execution_id.clone();
2922 self.transaction(
2923 move |tx| {
2924 let responses = Self::get_responses_with_offset(tx, &execution_id, start_idx)?;
2925 if responses.is_empty() {
2926 let (sender, receiver) = oneshot::channel();
2928 response_subscribers
2929 .lock()
2930 .unwrap()
2931 .insert(execution_id.clone(), (sender, unique_tag));
2932 Ok::<_, DbErrorReadWithTimeout>(itertools::Either::Right(receiver))
2933 } else {
2934 Ok(itertools::Either::Left(responses))
2935 }
2936 },
2937 "subscribe_to_next_responses",
2938 )
2939 .await
2940 }
2941 .inspect_err(|_| {
2942 cleanup();
2943 })?;
2944 match resp_or_receiver {
2945 itertools::Either::Left(resp) => Ok(resp), itertools::Either::Right(receiver) => {
2947 let res = async move {
2948 tokio::select! {
2949 resp = receiver => {
2950 let resp = resp.map_err(|_| DbErrorGeneric::Close)?;
2951 Ok(vec![resp])
2952 }
2953 () = timeout_fut => Err(DbErrorReadWithTimeout::Timeout),
2954 }
2955 }
2956 .await;
2957 cleanup();
2958 res
2959 }
2960 }
2961 }
2962
2963 async fn wait_for_finished_result(
2965 &self,
2966 execution_id: &ExecutionId,
2967 timeout_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
2968 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
2969 let unique_tag: u64 = rand::random();
2970 let execution_id = execution_id.clone();
2971 let execution_finished_subscription = self.0.execution_finished_subscribers.clone();
2972
2973 let cleanup = || {
2974 let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
2975 if let Some(subscribers) = guard.get_mut(&execution_id) {
2976 subscribers.remove(&unique_tag);
2977 }
2978 };
2979
2980 let resp_or_receiver = {
2981 let execution_id = execution_id.clone();
2982 self.transaction(move |tx| {
2983 let pending_state =
2984 Self::get_combined_state(tx, &execution_id)?.pending_state;
2985 if let PendingState::Finished { finished } = pending_state {
2986 let event =
2987 Self::get_execution_event(tx, &execution_id, finished.version)?;
2988 if let ExecutionEventInner::Finished { result, ..} = event.event {
2989 Ok(itertools::Either::Left(result))
2990 } else {
2991 error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
2992 Err(DbErrorReadWithTimeout::from(consistency_db_err(
2993 "cannot get finished event based on t_state version"
2994 )))
2995 }
2996 } else {
2997 let (sender, receiver) = oneshot::channel();
3001 let mut guard = execution_finished_subscription.lock().unwrap();
3002 guard.entry(execution_id.clone()).or_default().insert(unique_tag, sender);
3003 Ok(itertools::Either::Right(receiver))
3004 }
3005 }, "wait_for_finished_result")
3006 .await
3007 }
3008 .inspect_err(|_| {
3009 cleanup();
3013 })?;
3014
3015 let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
3016 match resp_or_receiver {
3017 itertools::Either::Left(resp) => Ok(resp), itertools::Either::Right(receiver) => {
3019 let res = async move {
3020 tokio::select! {
3021 resp = receiver => {
3022 Ok(resp.expect("the notifier sends to all listeners, cannot race with cleanup"))
3023 }
3024 () = timeout_fut => Err(DbErrorReadWithTimeout::Timeout),
3025 }
3026 }
3027 .await;
3028 cleanup();
3029 res
3030 }
3031 }
3032 }
3033
3034 #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
3035 async fn append_response(
3036 &self,
3037 created_at: DateTime<Utc>,
3038 execution_id: ExecutionId,
3039 response_event: JoinSetResponseEvent,
3040 ) -> Result<(), DbErrorWrite> {
3041 debug!("append_response");
3042 let event = JoinSetResponseEventOuter {
3043 created_at,
3044 event: response_event,
3045 };
3046 let notifier = self
3047 .transaction(
3048 move |tx| Self::append_response(tx, &execution_id, event.clone()),
3049 "append_response",
3050 )
3051 .await?;
3052 self.notify_all(vec![notifier], created_at);
3053 Ok(())
3054 }
3055
3056 #[instrument(level = Level::DEBUG, skip_all)]
3057 async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
3058 debug!("append_backtrace");
3059 self.transaction(
3060 move |tx| Self::append_backtrace(tx, &append),
3061 "append_backtrace",
3062 )
3063 .await
3064 }
3065
3066 #[instrument(level = Level::DEBUG, skip_all)]
3067 async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
3068 debug!("append_backtrace_batch");
3069 self.transaction(
3070 move |tx| {
3071 for append in &batch {
3072 Self::append_backtrace(tx, append)?;
3073 }
3074 Ok(())
3075 },
3076 "append_backtrace",
3077 )
3078 .await
3079 }
3080
3081 #[instrument(level = Level::DEBUG, skip_all)]
3082 async fn get_backtrace(
3083 &self,
3084 execution_id: &ExecutionId,
3085 filter: BacktraceFilter,
3086 ) -> Result<BacktraceInfo, DbErrorRead> {
3087 debug!("get_last_backtrace");
3088 let execution_id = execution_id.clone();
3089
3090 self.transaction(
3091 move |tx| {
3092 let select = "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace FROM t_backtrace \
3093 WHERE execution_id = :execution_id";
3094 let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![(":execution_id", Box::new(execution_id.to_string()))];
3095 let select = match &filter {
3096 BacktraceFilter::Specific(version) =>{
3097 params.push((":version", Box::new(version.0)));
3098 format!("{select} AND version_min_including <= :version AND version_max_excluding > :version")
3099 },
3100 BacktraceFilter::First => format!("{select} ORDER BY version_min_including LIMIT 1"),
3101 BacktraceFilter::Last => format!("{select} ORDER BY version_min_including DESC LIMIT 1")
3102 };
3103 tx
3104 .prepare(&select)
3105 ?
3106 .query_row::<_, &[(&'static str, &dyn ToSql)], _>(
3107 params
3108 .iter()
3109 .map(|(key, value)| (*key, value.as_ref()))
3110 .collect::<Vec<_>>()
3111 .as_ref(),
3112 |row| {
3113 Ok(BacktraceInfo {
3114 execution_id: execution_id.clone(),
3115 component_id: row.get::<_, FromStrWrapper<_> >("component_id")?.0,
3116 version_min_including: Version::new(row.get::<_, VersionType>("version_min_including")?),
3117 version_max_excluding: Version::new(row.get::<_, VersionType>("version_max_excluding")?),
3118 wasm_backtrace: row.get::<_, JsonWrapper<_>>("wasm_backtrace")?.0,
3119 })
3120 },
3121 ).map_err(DbErrorRead::from)
3122 },
3123 "get_last_backtrace",
3124 ).await
3125 }
3126
3127 #[instrument(level = Level::TRACE, skip(self))]
3129 async fn get_expired_timers(
3130 &self,
3131 at: DateTime<Utc>,
3132 ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
3133 self.transaction(
3134 move |conn| {
3135 let mut expired_timers = conn.prepare(
3136 "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= :at",
3137 )?
3138 .query_map(
3139 named_params! {
3140 ":at": at,
3141 },
3142 |row| {
3143 let execution_id = row.get("execution_id")?;
3144 let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
3145 let delay_id = row.get::<_, DelayId>("delay_id")?;
3146 let delay = ExpiredDelay { execution_id, join_set_id, delay_id };
3147 Ok(ExpiredTimer::Delay(delay))
3148 },
3149 )?
3150 .collect::<Result<Vec<_>, _>>()?;
3151 let expired = conn.prepare(&format!(r#"
3153 SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis,
3154 executor_id, run_id
3155 FROM t_state
3156 WHERE pending_expires_finished <= :at AND state = "{STATE_LOCKED}"
3157 "#
3158 )
3159 )?
3160 .query_map(
3161 named_params! {
3162 ":at": at,
3163 },
3164 |row| {
3165 let execution_id = row.get("execution_id")?;
3166 let locked_at_version = Version::new(row.get("last_lock_version")?);
3167 let next_version = Version::new(row.get("corresponding_version")?).increment();
3168 let intermittent_event_count = row.get("intermittent_event_count")?;
3169 let max_retries = row.get("max_retries")?;
3170 let retry_exp_backoff_millis = row.get("retry_exp_backoff_millis")?;
3171 let executor_id = row.get("executor_id")?;
3172 let run_id = row.get("run_id")?;
3173 let lock = ExpiredLock {
3174 execution_id,
3175 locked_at_version,
3176 next_version,
3177 intermittent_event_count,
3178 max_retries,
3179 retry_exp_backoff: Duration::from_millis(retry_exp_backoff_millis),
3180 locked_by: LockedBy { executor_id, run_id },
3181 };
3182 Ok(ExpiredTimer::Lock(lock))
3183 }
3184 )?
3185 .collect::<Result<Vec<_>, _>>()?;
3186 expired_timers.extend(expired);
3187 if !expired_timers.is_empty() {
3188 debug!("get_expired_timers found {expired_timers:?}");
3189 }
3190 Ok(expired_timers)
3191 }, "get_expired_timers"
3192 )
3193 .await
3194 }
3195
3196 async fn get_execution_event(
3197 &self,
3198 execution_id: &ExecutionId,
3199 version: &Version,
3200 ) -> Result<ExecutionEvent, DbErrorRead> {
3201 let version = version.0;
3202 let execution_id = execution_id.clone();
3203 self.transaction(
3204 move |tx| Self::get_execution_event(tx, &execution_id, version),
3205 "get_execution_event",
3206 )
3207 .await
3208 }
3209
3210 async fn get_pending_state(
3211 &self,
3212 execution_id: &ExecutionId,
3213 ) -> Result<PendingState, DbErrorRead> {
3214 let execution_id = execution_id.clone();
3215 Ok(self
3216 .transaction(
3217 move |tx| Self::get_combined_state(tx, &execution_id),
3218 "get_pending_state",
3219 )
3220 .await?
3221 .pending_state)
3222 }
3223
3224 async fn list_executions(
3225 &self,
3226 ffqn: Option<FunctionFqn>,
3227 top_level_only: bool,
3228 pagination: ExecutionListPagination,
3229 ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
3230 self.transaction(
3231 move |tx| Self::list_executions(tx, ffqn.as_ref(), top_level_only, &pagination),
3232 "list_executions",
3233 )
3234 .await
3235 }
3236
3237 async fn list_responses(
3238 &self,
3239 execution_id: &ExecutionId,
3240 pagination: Pagination<u32>,
3241 ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
3242 let execution_id = execution_id.clone();
3243 self.transaction(
3244 move |tx| Self::list_responses(tx, &execution_id, Some(pagination)),
3245 "list_executions",
3246 )
3247 .await
3248 }
3249}
3250
/// Helpers for creating a throwaway `SqlitePool`.
#[cfg(any(test, feature = "tempfile"))]
pub mod tempfile {
    use super::{SqliteConfig, SqlitePool};
    use tempfile::NamedTempFile;

    /// Opens a pool with the default configuration.
    ///
    /// If the `SQLITE_FILE` environment variable is set, the pool is opened at that path and no
    /// temporary file is returned. Otherwise a fresh `NamedTempFile` is created, the pool is
    /// opened at its path, and the file handle is returned alongside the pool.
    ///
    /// # Panics
    /// Panics if the temporary file cannot be created or the pool cannot be opened.
    pub async fn sqlite_pool() -> (SqlitePool, Option<NamedTempFile>) {
        match std::env::var("SQLITE_FILE") {
            Ok(path) => {
                let pool = SqlitePool::new(path, SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, None)
            }
            Err(_) => {
                let file = NamedTempFile::new().unwrap();
                let pool = SqlitePool::new(file.path(), SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, Some(file))
            }
        }
    }
}