1use crate::histograms::Histograms;
2use assert_matches::assert_matches;
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use concepts::{
6 ComponentId, ExecutionId, FunctionFqn, JoinSetId, SupportedFunctionReturnValue,
7 prefixed_ulid::{DelayId, ExecutionIdDerived, ExecutorId, RunId},
8 storage::{
9 AppendBatchResponse, AppendRequest, AppendResponse, BacktraceFilter, BacktraceInfo,
10 CreateRequest, DUMMY_CREATED, DUMMY_HISTORY_EVENT, DbConnection, DbErrorGeneric,
11 DbErrorRead, DbErrorReadWithTimeout, DbErrorWrite, DbErrorWritePermanent, DbExecutor,
12 DbPool, DbPoolCloseable, ExecutionEvent, ExecutionEventInner, ExecutionListPagination,
13 ExecutionWithState, ExpiredDelay, ExpiredLock, ExpiredTimer, HistoryEvent, JoinSetRequest,
14 JoinSetResponse, JoinSetResponseEvent, JoinSetResponseEventOuter, LockPendingResponse,
15 LockedExecution, Pagination, PendingState, PendingStateFinished,
16 PendingStateFinishedResultKind, ResponseWithCursor, Version, VersionType,
17 },
18};
19use conversions::{FromStrWrapper, JsonWrapper, consistency_db_err, consistency_rusqlite};
20use conversions::{
21 intermittent_err_generic, prepare_err_generic, result_err_generic, result_err_read,
22};
23use hashbrown::HashMap;
24use rusqlite::{
25 CachedStatement, Connection, OpenFlags, OptionalExtension, Params, ToSql, Transaction,
26 named_params,
27};
28use std::{
29 cmp::max,
30 collections::VecDeque,
31 fmt::Debug,
32 path::Path,
33 str::FromStr,
34 sync::{
35 Arc, Mutex,
36 atomic::{AtomicBool, Ordering},
37 },
38 time::Duration,
39};
40use std::{fmt::Write as _, pin::Pin};
41use tokio::{
42 sync::{mpsc, oneshot},
43 time::Instant,
44};
45use tracing::{Level, Span, debug, debug_span, error, info, instrument, trace, warn};
46
/// Error returned when the SQLite database cannot be opened or initialized.
///
/// It carries no details: the code that produces it logs the underlying cause
/// before returning this value.
#[derive(Debug, thiserror::Error)]
#[error("initialization error")]
pub struct InitializationError;
50
/// Groups a join set id, a delay id and the expiration timestamp of that delay.
///
/// NOTE(review): presumably mirrors a row of `t_delay` without the `execution_id`
/// column - confirm against the code that constructs it.
#[derive(Debug, Clone)]
struct DelayReq {
    join_set_id: JoinSetId,
    delay_id: DelayId,
    expires_at: DateTime<Utc>,
}
/// Default SQLite pragmas as `[name, value]` pairs, applied in this order when the
/// connection is initialized.
///
/// An empty value means that the pragma is only queried (and its output logged)
/// instead of being set. Values may be replaced through `SqliteConfig::pragma_override`.
const PRAGMA: [[&str; 2]; 10] = [
    // Must come before `journal_mode`: the page size of a database that is already
    // in WAL mode cannot be changed, so setting it afterwards is silently ignored.
    ["page_size", "8192"],
    ["journal_mode", "wal"],
    ["synchronous", "FULL"],
    ["foreign_keys", "true"],
    // Milliseconds.
    ["busy_timeout", "1000"],
    // Positive value: number of pages.
    ["cache_size", "10000"],
    ["temp_store", "MEMORY"],
    // 128 MiB.
    ["mmap_size", "134217728"],
    // 64 MiB.
    ["journal_size_limit", "67108864"],
    // Query only.
    ["integrity_check", ""],
];
80
/// DDL of `t_metadata`, the table that records the schema version of the database.
///
/// During initialization the row with the highest `id` is read and its
/// `schema_version` is compared with [`T_METADATA_EXPECTED_SCHEMA_VERSION`].
const CREATE_TABLE_T_METADATA: &str = r"
CREATE TABLE IF NOT EXISTS t_metadata (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    schema_version INTEGER NOT NULL,
    created_at TEXT NOT NULL
) STRICT
";
/// Schema version this build works with; initialization fails on any other value.
const T_METADATA_EXPECTED_SCHEMA_VERSION: u32 = 2;
90
/// DDL of `t_execution_log`, holding one row per execution event.
///
/// `json_value` stores the serialized event, `variant` its variant name and
/// `history_event_type` is a stored generated column extracted from
/// `$.HistoryEvent.event.type` of the JSON. Rows are keyed by `(execution_id, version)`;
/// the row with `version = 0` is read back as the `Created` event.
const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_log (
    execution_id TEXT NOT NULL,
    created_at TEXT NOT NULL,
    json_value TEXT NOT NULL,
    version INTEGER NOT NULL,
    variant TEXT NOT NULL,
    join_set_id TEXT,
    history_event_type TEXT GENERATED ALWAYS AS (json_value->>'$.HistoryEvent.event.type') STORED,
    PRIMARY KEY (execution_id, version)
) STRICT
";
/// Index on `(execution_id, version)`.
///
/// NOTE(review): covers the same columns as the primary key, so it is likely
/// redundant - confirm before removing, as that would be a schema change.
const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version  ON t_execution_log (execution_id, version);
";
/// Index on `(execution_id, variant)`.
const CREATE_INDEX_IDX_T_EXECUTION_ID_EXECUTION_ID_VARIANT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant  ON t_execution_log (execution_id, variant);
";
112
/// DDL of `t_join_set_response`, holding responses delivered to join sets.
///
/// `delay_id` and the pair `child_execution_id` / `finished_version` are nullable.
/// NOTE(review): presumably a row describes either an expired delay or a finished
/// child execution, never both - the schema itself does not enforce that.
const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
CREATE TABLE IF NOT EXISTS t_join_set_response (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    created_at TEXT NOT NULL,
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,

    delay_id TEXT,

    child_execution_id TEXT,
    finished_version INTEGER,

    UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
) STRICT
";
/// Index on `(execution_id, id)`.
const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
";
136
/// DDL of `t_state`, holding the current state of every execution (one row each).
///
/// `state` contains one of the `STATE_*` names. The meaning of
/// `pending_expires_finished` depends on it: the scheduled time for `PendingAt`,
/// the lock expiration for `Locked` and `BlockedByJoinSet`, and the finish time
/// for `Finished`. The nullable column groups are filled only for the states that
/// use them (lock columns, join set columns, `result_kind`).
const CREATE_TABLE_T_STATE: &str = r"
CREATE TABLE IF NOT EXISTS t_state (
    execution_id TEXT NOT NULL,
    is_top_level INTEGER NOT NULL,
    corresponding_version INTEGER NOT NULL,
    pending_expires_finished TEXT NOT NULL,
    ffqn TEXT NOT NULL,
    state TEXT NOT NULL,
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL,
    scheduled_at TEXT NOT NULL,
    intermittent_event_count INTEGER NOT NULL,
    max_retries INTEGER NOT NULL,
    retry_exp_backoff_millis INTEGER NOT NULL,

    last_lock_version INTEGER,
    executor_id TEXT,
    run_id TEXT,

    join_set_id TEXT,
    join_set_closing INTEGER,

    result_kind TEXT,

    PRIMARY KEY (execution_id)
) STRICT
";
// Values stored in the `t_state.state` column.
/// The execution is waiting to be picked up at its scheduled time.
const STATE_PENDING_AT: &str = "PendingAt";
/// The execution is blocked on a join set.
const STATE_BLOCKED_BY_JOIN_SET: &str = "BlockedByJoinSet";
/// The execution is locked by an executor.
const STATE_LOCKED: &str = "Locked";
/// The execution has finished.
const STATE_FINISHED: &str = "Finished";
/// NOTE(review): presumably compared with `t_execution_log.history_event_type` -
/// confirm against the queries that use it.
const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "JoinNext";
175
/// Index on `(state, pending_expires_finished, ffqn)`.
const IDX_T_STATE_LOCK_PENDING: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending ON t_state (state, pending_expires_finished, ffqn);
";
/// Partial index on `pending_expires_finished` limited to rows with an `executor_id`.
const IDX_T_STATE_EXPIRED_TIMERS: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_expired_timers ON t_state (pending_expires_finished) WHERE executor_id IS NOT NULL;
";
/// Index on `(execution_id, is_top_level)`.
///
/// The index itself is named `idx_t_state_execution_id_is_root`; the name is part
/// of the on-disk schema.
const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
";
/// Index on `ffqn`.
const IDX_T_STATE_FFQN: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
";
/// Index on `created_at`.
const IDX_T_STATE_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
";
194
/// DDL of `t_delay`: one row per delay, keyed by `(execution_id, join_set_id, delay_id)`,
/// with the time at which the delay expires.
const CREATE_TABLE_T_DELAY: &str = r"
CREATE TABLE IF NOT EXISTS t_delay (
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,
    delay_id TEXT NOT NULL,
    expires_at TEXT NOT NULL,
    PRIMARY KEY (execution_id, join_set_id, delay_id)
) STRICT
";
205
/// DDL of `t_backtrace`: a WASM backtrace attached to a version range
/// `[version_min_including, version_max_excluding)` of an execution.
const CREATE_TABLE_T_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_backtrace (
    execution_id TEXT NOT NULL,
    component_id TEXT NOT NULL,
    version_min_including INTEGER NOT NULL,
    version_max_excluding INTEGER NOT NULL,
    wasm_backtrace TEXT NOT NULL,
    PRIMARY KEY (execution_id, version_min_including, version_max_excluding)
) STRICT
";
/// Index on `(execution_id, version_min_including, version_max_excluding)`.
///
/// NOTE(review): covers the same columns as the primary key, so it is likely
/// redundant - confirm before removing, as that would be a schema change.
const IDX_T_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_backtrace_execution_id_version ON t_backtrace (execution_id, version_min_including, version_max_excluding);
";
221mod conversions {
222
223 use concepts::storage::{DbErrorGeneric, DbErrorRead};
224 use rusqlite::types::{FromSql, FromSqlError};
225 use std::{fmt::Debug, str::FromStr};
226 use tracing::error;
227
228 #[expect(clippy::needless_pass_by_value)]
229 pub(crate) fn prepare_err_generic(err: rusqlite::Error) -> DbErrorGeneric {
230 error!(backtrace = %std::backtrace::Backtrace::capture(), "Sqlite error {err:?}");
231 DbErrorGeneric::Uncategorized(err.to_string().into())
232 }
233 #[expect(clippy::needless_pass_by_value)]
234 pub(crate) fn intermittent_err_generic(err: rusqlite::Error) -> DbErrorGeneric {
235 error!(backtrace = %std::backtrace::Backtrace::capture(), "Sqlite error {err:?}");
236 DbErrorGeneric::Uncategorized(err.to_string().into())
237 }
238 #[expect(clippy::needless_pass_by_value)]
239 pub(crate) fn result_err_generic(err: rusqlite::Error) -> DbErrorGeneric {
240 error!(backtrace = %std::backtrace::Backtrace::capture(), "Sqlite error {err:?}");
241 DbErrorGeneric::Uncategorized(err.to_string().into())
242 }
243 pub(crate) fn result_err_read(err: rusqlite::Error) -> DbErrorRead {
244 if matches!(err, rusqlite::Error::QueryReturnedNoRows) {
245 DbErrorRead::NotFound
246 } else {
247 result_err_generic(err).into()
248 }
249 }
250
251 pub(crate) struct JsonWrapper<T>(pub(crate) T);
252 impl<T: serde::de::DeserializeOwned + 'static + Debug> FromSql for JsonWrapper<T> {
253 fn column_result(
254 value: rusqlite::types::ValueRef<'_>,
255 ) -> rusqlite::types::FromSqlResult<Self> {
256 let value = match value {
257 rusqlite::types::ValueRef::Text(value) | rusqlite::types::ValueRef::Blob(value) => {
258 Ok(value)
259 }
260 other => {
261 error!(
262 backtrace = %std::backtrace::Backtrace::capture(),
263 "Unexpected type when conveting to JSON - expected Text or Blob, got type `{other:?}`",
264 );
265 Err(FromSqlError::InvalidType)
266 }
267 }?;
268 let value = serde_json::from_slice::<T>(value).map_err(|err| {
269 error!(
270 backtrace = %std::backtrace::Backtrace::capture(),
271 "Cannot convert JSON value `{value:?}` to type:`{type}` - {err:?}",
272 r#type = std::any::type_name::<T>()
273 );
274 FromSqlError::InvalidType
275 })?;
276 Ok(Self(value))
277 }
278 }
279
280 pub(crate) struct FromStrWrapper<T: FromStr>(pub(crate) T);
281 impl<T: FromStr<Err = D>, D: Debug> FromSql for FromStrWrapper<T> {
282 fn column_result(
283 value: rusqlite::types::ValueRef<'_>,
284 ) -> rusqlite::types::FromSqlResult<Self> {
285 let value = String::column_result(value)?;
286 let value = T::from_str(&value).map_err(|err| {
287 error!(
288 backtrace = %std::backtrace::Backtrace::capture(),
289 "Cannot convert string `{value}` to type:`{type}` - {err:?}",
290 r#type = std::any::type_name::<T>()
291 );
292 FromSqlError::InvalidType
293 })?;
294 Ok(Self(value))
295 }
296 }
297
298 #[derive(Debug, thiserror::Error)]
300 #[error("{0}")]
301 pub(crate) struct OtherError(&'static str);
302 pub(crate) fn consistency_rusqlite(input: &'static str) -> rusqlite::Error {
303 FromSqlError::other(OtherError(input)).into()
304 }
305
306 pub(crate) fn consistency_db_err(input: &'static str) -> DbErrorGeneric {
307 DbErrorGeneric::Uncategorized(input.into())
308 }
309}
/// Raw, loosely typed image of a `t_state` row.
///
/// `CombinedState::new` validates that the combination of optional fields matches
/// `state` and turns it into a `PendingState`.
#[derive(Debug)]
struct CombinedStateDTO {
    /// One of the `STATE_*` names.
    state: String,
    /// Unparsed function name; parsed into `FunctionFqn` by `CombinedState::new`.
    ffqn: String,
    /// Scheduled time, lock expiration or finish time, depending on `state`.
    pending_expires_finished: DateTime<Utc>,
    /// Expected to be set only for the `Locked` state.
    last_lock_version: Option<Version>,
    /// Expected to be set only for the `Locked` state.
    executor_id: Option<ExecutorId>,
    /// Expected to be set only for the `Locked` state.
    run_id: Option<RunId>,
    /// Expected to be set only for the `BlockedByJoinSet` state.
    join_set_id: Option<JoinSetId>,
    /// Expected to be set only for the `BlockedByJoinSet` state.
    join_set_closing: Option<bool>,
    /// Expected to be set only for the `Finished` state.
    result_kind: Option<PendingStateFinishedResultKind>,
}
/// Validated state of an execution as read from `t_state`.
#[derive(Debug)]
struct CombinedState {
    /// Parsed function name of the execution.
    ffqn: FunctionFqn,
    /// Current pending state.
    pending_state: PendingState,
    /// Value of the `corresponding_version` column of the `t_state` row.
    corresponding_version: Version,
}
331impl CombinedState {
332 fn get_next_version_assert_not_finished(&self) -> Version {
333 assert!(!self.pending_state.is_finished());
334 self.corresponding_version.increment()
335 }
336
337 #[cfg(feature = "test")]
338 fn get_next_version_or_finished(&self) -> Version {
339 if self.pending_state.is_finished() {
340 self.corresponding_version.clone()
341 } else {
342 self.corresponding_version.increment()
343 }
344 }
345}
346
/// Notification payload: an execution of `ffqn` became pending at `scheduled_at`.
#[derive(Debug)]
struct NotifierPendingAt {
    scheduled_at: DateTime<Utc>,
    ffqn: FunctionFqn,
}
352
/// Notification payload: the execution `execution_id` finished with `retval`.
#[derive(Debug)]
struct NotifierExecutionFinished {
    execution_id: ExecutionId,
    retval: SupportedFunctionReturnValue,
}
358
/// Notifications collected while writing to the database; every field is optional
/// and the default value carries no notification.
///
/// NOTE(review): presumably dispatched to the subscriber maps of the pool after the
/// transaction commits - confirm against the code that consumes it.
#[derive(Debug, Default)]
struct AppendNotifier {
    /// An execution became pending.
    pending_at: Option<NotifierPendingAt>,
    /// An execution finished.
    execution_finished: Option<NotifierExecutionFinished>,
    /// A join set response was recorded for the given execution.
    response: Option<(ExecutionId, JoinSetResponseEventOuter)>,
}
365
366impl CombinedState {
367 fn new(
368 dto: &CombinedStateDTO,
369 corresponding_version: Version,
370 ) -> Result<Self, rusqlite::Error> {
371 let pending_state = match dto {
372 CombinedStateDTO {
373 state,
374 ffqn: _,
375 pending_expires_finished: scheduled_at,
376 last_lock_version: None,
377 executor_id: None,
378 run_id: None,
379 join_set_id: None,
380 join_set_closing: None,
381 result_kind: None,
382 } if state == STATE_PENDING_AT => Ok(PendingState::PendingAt {
383 scheduled_at: *scheduled_at,
384 }),
385 CombinedStateDTO {
386 state,
387 ffqn: _,
388 pending_expires_finished: lock_expires_at,
389 last_lock_version: Some(_),
390 executor_id: Some(executor_id),
391 run_id: Some(run_id),
392 join_set_id: None,
393 join_set_closing: None,
394 result_kind: None,
395 } if state == STATE_LOCKED => Ok(PendingState::Locked {
396 executor_id: *executor_id,
397 run_id: *run_id,
398 lock_expires_at: *lock_expires_at,
399 }),
400 CombinedStateDTO {
401 state,
402 ffqn: _,
403 pending_expires_finished: lock_expires_at,
404 last_lock_version: None,
405 executor_id: None,
406 run_id: None,
407 join_set_id: Some(join_set_id),
408 join_set_closing: Some(join_set_closing),
409 result_kind: None,
410 } if state == STATE_BLOCKED_BY_JOIN_SET => Ok(PendingState::BlockedByJoinSet {
411 join_set_id: join_set_id.clone(),
412 closing: *join_set_closing,
413 lock_expires_at: *lock_expires_at,
414 }),
415 CombinedStateDTO {
416 state,
417 ffqn: _,
418 pending_expires_finished: finished_at,
419 last_lock_version: None,
420 executor_id: None,
421 run_id: None,
422 join_set_id: None,
423 join_set_closing: None,
424 result_kind: Some(result_kind),
425 } if state == STATE_FINISHED => Ok(PendingState::Finished {
426 finished: PendingStateFinished {
427 finished_at: *finished_at,
428 version: corresponding_version.0,
429 result_kind: *result_kind,
430 },
431 }),
432 _ => {
433 error!("Cannot deserialize pending state from {dto:?}");
434 Err(consistency_rusqlite("invalid `t_state`"))
435 }
436 }?;
437 Ok(Self {
438 ffqn: FunctionFqn::from_str(&dto.ffqn).map_err(|parse_err| {
439 error!("Error parsing ffqn of {dto:?} - {parse_err:?}");
440 consistency_rusqlite("invalid ffqn value in `t_state`")
441 })?,
442 pending_state,
443 corresponding_version,
444 })
445 }
446}
447
/// Priority of a command sent to the database thread.
///
/// Within one batch all `High` commands run first, then all `Medium` ones; `Low`
/// commands run only when fewer than `low_prio_threshold` commands were processed
/// before them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CommandPriority {
    /// Used for write transactions.
    High,
    /// Used for read transactions.
    Medium,
    /// Used by `conn_low_prio`.
    Low,
}
457
/// Message processed by the database thread.
#[derive(derive_more::Debug)]
enum ThreadCommand {
    /// Run `func` with exclusive access to the connection.
    Func {
        #[debug(skip)]
        func: Box<dyn FnOnce(&mut Connection) + Send>,
        /// Decides in which pass of a batch the command runs.
        priority: CommandPriority,
        /// Time of enqueueing, used to measure the queueing latency.
        sent_at: Instant,
        /// Static name under which durations are recorded in the histograms.
        name: &'static str,
    },
    /// Ask the database thread to stop.
    Shutdown,
    /// Placeholder left in a batch slot after its `Func` has been taken out and executed.
    Dummy,
}
470
/// Cloneable handle to the single SQLite connection, which is owned by a dedicated
/// thread and driven through a command channel.
///
/// Call `close` before dropping the last clone; otherwise `Drop` logs a warning
/// and requests the shutdown on a best-effort basis.
#[derive(Clone)]
pub struct SqlitePool(SqlitePoolInner);
473
/// One-shot subscribers waiting for a join set response, keyed by execution id.
/// NOTE(review): the `u64` presumably identifies the subscription - confirm.
type ResponseSubscribers =
    Arc<Mutex<HashMap<ExecutionId, (oneshot::Sender<JoinSetResponseEventOuter>, u64)>>>;
/// Subscribers notified with `()` per function name.
/// NOTE(review): the `u64` presumably identifies the subscription - confirm.
type PendingFfqnSubscribers = Arc<Mutex<HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>>>;
/// One-shot subscribers waiting for the return value of a finished execution;
/// several subscribers per execution are kept in the inner map under `u64` keys.
type ExecutionFinishedSubscribers =
    Mutex<HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>>;
/// Shared state behind [`SqlitePool`]; every clone refers to the same database thread.
#[derive(Clone)]
struct SqlitePoolInner {
    /// Set to ask the database thread to stop; checked by the thread between commands.
    shutdown_requested: Arc<AtomicBool>,
    /// Set by the database thread right before it exits its command loop.
    shutdown_finished: Arc<AtomicBool>,
    /// Sending half of the command queue consumed by the database thread.
    command_tx: tokio::sync::mpsc::Sender<ThreadCommand>,
    response_subscribers: ResponseSubscribers,
    pending_ffqn_subscribers: PendingFfqnSubscribers,
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
    /// Handle of the database thread. Wrapped in `Option` so that `Drop` can take it.
    join_handle: Option<Arc<std::thread::JoinHandle<()>>>,
}
489
490#[async_trait]
491impl DbPoolCloseable for SqlitePool {
492 async fn close(self) {
493 self.0.shutdown_requested.store(true, Ordering::Release);
494 let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
496 while !self.0.shutdown_finished.load(Ordering::Acquire) {
497 tokio::time::sleep(Duration::from_millis(1)).await;
498 }
499 }
500}
501
502#[async_trait]
503impl DbPool for SqlitePool {
504 fn connection(&self) -> Box<dyn DbConnection> {
505 Box::new(self.clone())
506 }
507}
508impl Drop for SqlitePool {
509 fn drop(&mut self) {
510 let arc = self.0.join_handle.take().expect("join_handle was set");
511 if let Ok(join_handle) = Arc::try_unwrap(arc) {
512 if !join_handle.is_finished() {
514 if !self.0.shutdown_finished.load(Ordering::Acquire) {
515 let backtrace = std::backtrace::Backtrace::capture();
517 warn!("SqlitePool was not closed properly - {backtrace}");
518 self.0.shutdown_requested.store(true, Ordering::Release);
519 let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
521 } else {
524 }
526 }
527 }
528 }
529}
530
/// Configuration of [`SqlitePool`].
#[derive(Debug, Clone)]
pub struct SqliteConfig {
    /// Capacity of the command queue between callers and the database thread.
    pub queue_capacity: usize,
    /// Low-priority commands of a batch run only if fewer than this many
    /// higher-priority commands were processed in that batch.
    pub low_prio_threshold: usize,
    /// Pragma values replacing or extending the built-in defaults.
    pub pragma_override: Option<HashMap<String, String>>,
    /// Passed to the histograms of the database thread.
    /// NOTE(review): exact meaning is defined by `Histograms::new` - confirm there.
    pub metrics_threshold: Option<Duration>,
}
538impl Default for SqliteConfig {
539 fn default() -> Self {
540 Self {
541 queue_capacity: 100,
542 low_prio_threshold: 100,
543 pragma_override: None,
544 metrics_threshold: None,
545 }
546 }
547}
548
549impl SqlitePool {
550 fn init_thread(
551 path: &Path,
552 mut pragma_override: HashMap<String, String>,
553 ) -> Result<Connection, InitializationError> {
554 fn execute<P: Params>(
555 conn: &Connection,
556 sql: &str,
557 params: P,
558 ) -> Result<(), InitializationError> {
559 conn.execute(sql, params).map(|_| ()).map_err(|err| {
560 error!("Cannot run `{sql}` - {err:?}");
561 InitializationError
562 })
563 }
564 fn pragma_update(
565 conn: &Connection,
566 name: &str,
567 value: &str,
568 ) -> Result<(), InitializationError> {
569 if value.is_empty() {
570 debug!("Querying PRAGMA {name}");
571 conn.pragma_query(None, name, |row| {
572 debug!("{row:?}");
573 Ok(())
574 })
575 .map_err(|err| {
576 error!("cannot update pragma `{name}`=`{value}` - {err:?}");
577 InitializationError
578 })
579 } else {
580 debug!("Setting PRAGMA {name}={value}");
581 conn.pragma_update(None, name, value).map_err(|err| {
582 error!("cannot update pragma `{name}`=`{value}` - {err:?}");
583 InitializationError
584 })
585 }
586 }
587
588 let conn = Connection::open_with_flags(path, OpenFlags::default()).map_err(|err| {
589 error!("cannot open the connection - {err:?}");
590 InitializationError
591 })?;
592
593 for [pragma_name, default_value] in PRAGMA {
594 let pragma_value = pragma_override
595 .remove(pragma_name)
596 .unwrap_or_else(|| default_value.to_string());
597 pragma_update(&conn, pragma_name, &pragma_value)?;
598 }
599 for (pragma_name, pragma_value) in pragma_override.drain() {
601 pragma_update(&conn, &pragma_name, &pragma_value)?;
602 }
603
604 execute(&conn, CREATE_TABLE_T_METADATA, [])?;
606 execute(
608 &conn,
609 &format!(
610 "INSERT INTO t_metadata (schema_version, created_at) VALUES
611 ({T_METADATA_EXPECTED_SCHEMA_VERSION}, ?) ON CONFLICT DO NOTHING"
612 ),
613 [Utc::now()],
614 )?;
615 let actual_version = conn
617 .prepare("SELECT schema_version FROM t_metadata ORDER BY id DESC LIMIT 1")
618 .map_err(|err| {
619 error!("cannot select schema version - {err:?}");
620 InitializationError
621 })?
622 .query_row([], |row| row.get::<_, u32>("schema_version"));
623
624 let actual_version = actual_version.map_err(|err| {
625 error!("Cannot read the schema version - {err:?}");
626 InitializationError
627 })?;
628 if actual_version != T_METADATA_EXPECTED_SCHEMA_VERSION {
629 error!(
630 "wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
631 );
632 return Err(InitializationError);
633 }
634
635 execute(&conn, CREATE_TABLE_T_EXECUTION_LOG, [])?;
637 execute(
638 &conn,
639 CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
640 [],
641 )?;
642 execute(
643 &conn,
644 CREATE_INDEX_IDX_T_EXECUTION_ID_EXECUTION_ID_VARIANT,
645 [],
646 )?;
647 execute(&conn, CREATE_TABLE_T_JOIN_SET_RESPONSE, [])?;
649 execute(
650 &conn,
651 CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
652 [],
653 )?;
654 execute(&conn, CREATE_TABLE_T_STATE, [])?;
656 execute(&conn, IDX_T_STATE_LOCK_PENDING, [])?;
657 execute(&conn, IDX_T_STATE_EXPIRED_TIMERS, [])?;
658 execute(&conn, IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL, [])?;
659 execute(&conn, IDX_T_STATE_FFQN, [])?;
660 execute(&conn, IDX_T_STATE_CREATED_AT, [])?;
661 execute(&conn, CREATE_TABLE_T_DELAY, [])?;
663 execute(&conn, CREATE_TABLE_T_BACKTRACE, [])?;
665 execute(&conn, IDX_T_BACKTRACE_EXECUTION_ID_VERSION, [])?;
666 Ok(conn)
667 }
668
669 fn execute(
670 priority_filter: CommandPriority,
671 vec: &mut Vec<ThreadCommand>,
672 conn: &mut Connection,
673 shutdown_requested: &AtomicBool,
674 histograms: &mut Histograms,
675 ) -> Result<usize, ()> {
676 let mut processed = 0;
677 for item in vec {
678 if matches!(item, ThreadCommand::Func { priority, .. } if *priority == priority_filter )
679 {
680 let item = std::mem::replace(item, ThreadCommand::Dummy); let (func, sent_at, func_name) = assert_matches!(item, ThreadCommand::Func{func, sent_at, name, ..} => (func, sent_at, name));
682 let sent_latency = sent_at.elapsed();
683 let started_at = Instant::now();
684 func(conn);
685 let func_duration = started_at.elapsed();
686 processed += 1;
687 histograms.record(sent_latency, func_name, func_duration);
688 }
689 if shutdown_requested.load(Ordering::Acquire) {
690 debug!("Recveived shutdown during processing of the batch");
692 return Err(());
693 }
694 }
695 Ok(processed)
696 }
697
698 fn connection_rpc(
699 mut conn: Connection,
700 shutdown_requested: &AtomicBool,
701 shutdown_finished: &AtomicBool,
702 mut command_rx: mpsc::Receiver<ThreadCommand>,
703 queue_capacity: usize,
704 low_prio_threshold: usize,
705 metrics_threshold: Option<Duration>,
706 ) {
707 let mut vec: Vec<ThreadCommand> = Vec::with_capacity(queue_capacity);
708
709 let mut histograms = Histograms::new(metrics_threshold);
710
711 let mut metrics_instant = std::time::Instant::now();
712 while Self::tick(
713 &mut vec,
714 &mut conn,
715 shutdown_requested,
716 &mut command_rx,
717 low_prio_threshold,
718 &mut histograms,
719 &mut metrics_instant,
720 )
721 .is_ok()
722 {
723 }
725 debug!("Closing command thread");
726 shutdown_finished.store(true, Ordering::Release);
727 }
728
729 fn tick(
730 vec: &mut Vec<ThreadCommand>,
731 conn: &mut Connection,
732 shutdown_requested: &AtomicBool,
733 command_rx: &mut mpsc::Receiver<ThreadCommand>,
734 low_prio_threshold: usize,
735 histograms: &mut Histograms,
736 metrics_instant: &mut std::time::Instant,
737 ) -> Result<(), ()> {
738 vec.clear();
739 if let Some(item) = command_rx.blocking_recv() {
740 vec.push(item);
741 } else {
742 debug!("command_rx was closed");
743 return Err(());
744 }
745 while let Ok(more) = command_rx.try_recv() {
746 vec.push(more);
747 }
748 let shutdown_found = vec
750 .iter()
751 .any(|item| matches!(item, ThreadCommand::Shutdown));
752 if shutdown_found || shutdown_requested.load(Ordering::Acquire) {
753 debug!("Recveived shutdown before processing the batch");
754 return Err(());
755 }
756
757 let processed = Self::execute(
758 CommandPriority::High,
759 vec,
760 conn,
761 shutdown_requested,
762 histograms,
763 )? + Self::execute(
764 CommandPriority::Medium,
765 vec,
766 conn,
767 shutdown_requested,
768 histograms,
769 )?;
770 if processed < low_prio_threshold {
771 Self::execute(
772 CommandPriority::Low,
773 vec,
774 conn,
775 shutdown_requested,
776 histograms,
777 )?;
778 }
779
780 histograms.print_if_elapsed(metrics_instant);
781 Ok(())
782 }
783
784 #[instrument(level = Level::DEBUG, skip_all, name = "sqlite_new")]
785 pub async fn new<P: AsRef<Path>>(
786 path: P,
787 config: SqliteConfig,
788 ) -> Result<Self, InitializationError> {
789 let path = path.as_ref().to_owned();
790
791 let shutdown_requested = Arc::new(AtomicBool::new(false));
792 let shutdown_finished = Arc::new(AtomicBool::new(false));
793
794 let (command_tx, command_rx) = tokio::sync::mpsc::channel(config.queue_capacity);
795 info!("Sqlite database location: {path:?}");
796 let join_handle = {
797 let init_task = {
799 tokio::task::spawn_blocking(move || {
800 Self::init_thread(&path, config.pragma_override.unwrap_or_default())
801 })
802 .await
803 };
804 let conn = match init_task {
805 Ok(res) => res?,
806 Err(join_err) => {
807 error!("Initialization panic - {join_err:?}");
808 return Err(InitializationError);
809 }
810 };
811 let shutdown_requested = shutdown_requested.clone();
812 let shutdown_finished = shutdown_finished.clone();
813 std::thread::spawn(move || {
815 Self::connection_rpc(
816 conn,
817 &shutdown_requested,
818 &shutdown_finished,
819 command_rx,
820 config.queue_capacity,
821 config.low_prio_threshold,
822 config.metrics_threshold,
823 );
824 })
825 };
826 Ok(SqlitePool(SqlitePoolInner {
827 shutdown_requested,
828 shutdown_finished,
829 command_tx,
830 response_subscribers: Arc::default(),
831 pending_ffqn_subscribers: Arc::default(),
832 join_handle: Some(Arc::new(join_handle)),
833 execution_finished_subscribers: Arc::default(),
834 }))
835 }
836
837 #[instrument(level = Level::TRACE, skip_all, fields(name))]
838 pub async fn transaction_write<F, T, E>(
839 &self,
840 func: F,
841 name: &'static str,
842 ) -> Result<Result<T, E>, DbErrorGeneric >
843 where
844 F: FnOnce(&mut rusqlite::Transaction) -> Result<T, E> + Send + 'static,
845 T: Send + 'static,
846 E: From<DbErrorGeneric> + Send + 'static,
847 {
848 self.transaction(func, true, name).await
849 }
850
851 #[instrument(level = Level::TRACE, skip_all, fields(name))]
852 pub async fn transaction_read<F, T, E>(
853 &self,
854 func: F,
855 name: &'static str,
856 ) -> Result<Result<T, E>, DbErrorGeneric >
857 where
858 F: FnOnce(&mut rusqlite::Transaction) -> Result<T, E> + Send + 'static,
859 T: Send + 'static,
860 E: From<DbErrorGeneric> + Send + 'static,
861 {
862 self.transaction(func, false, name).await
863 }
864
865 async fn transaction<F, T, E>(
867 &self,
868 func: F,
869 write: bool,
870 name: &'static str,
871 ) -> Result<Result<T, E>, DbErrorGeneric >
872 where
873 F: FnOnce(&mut rusqlite::Transaction) -> Result<T, E> + Send + 'static,
874 T: Send + 'static,
875 E: From<DbErrorGeneric> + Send + 'static,
876 {
877 let (tx, rx) = oneshot::channel();
878 let parent_span = Span::current();
879
880 self.0
881 .command_tx
882 .send(ThreadCommand::Func {
883 priority: if write {
884 CommandPriority::High
885 } else {
886 CommandPriority::Medium
887 },
888 func: Box::new(move |conn| {
889 let tx_begin =
890 debug_span!(parent: &parent_span, "tx_begin", name).in_scope(|| {
891 conn.transaction_with_behavior(if write {
892 rusqlite::TransactionBehavior::Immediate
893 } else {
894 rusqlite::TransactionBehavior::Deferred
895 })
896 .map_err(result_err_generic)
897 .map_err(E::from)
898 });
899 let tx_apply = tx_begin.and_then(|mut transaction| {
900 parent_span.in_scope(|| func(&mut transaction).map(|ok| (ok, transaction)))
901 });
902 let tx_commit = tx_apply.and_then(|(ok, transaction)| {
903 debug_span!(parent: &parent_span, "tx_commit", name).in_scope(|| {
904 transaction
905 .commit()
906 .map(|()| ok)
907 .map_err(result_err_generic)
908 .map_err(E::from)
909 })
910 });
911 tx.send(tx_commit)
912 .unwrap_or_else(|_| unreachable!("outer fn waits forever for the result"));
913 }),
914 sent_at: Instant::now(),
915 name,
916 })
917 .await
918 .map_err(|_send_err| DbErrorGeneric::Close)?;
919 rx.await.map_err(|_recv_err| DbErrorGeneric::Close)
920 }
921
922 #[instrument(level = Level::TRACE, skip_all, fields(name))]
923 pub async fn conn_low_prio<F, T>(
924 &self,
925 func: F,
926 name: &'static str,
927 ) -> Result<T, DbErrorGeneric>
928 where
929 F: FnOnce(&Connection) -> Result<T, DbErrorGeneric> + Send + 'static,
930 T: Send + 'static + Default,
931 {
932 let (tx, rx) = oneshot::channel();
933 let span = tracing::trace_span!("tx_function");
934 self.0
935 .command_tx
936 .send(ThreadCommand::Func {
937 priority: CommandPriority::Low,
938 func: Box::new(move |conn| {
939 tx.send(span.in_scope(|| func(conn)))
940 .unwrap_or_else(|_| unreachable!("outer fn waits forever for the result"));
941 }),
942 sent_at: Instant::now(),
943 name,
944 })
945 .await
946 .map_err(|_send_err| DbErrorGeneric::Close)?;
947 match rx.await {
948 Ok(res) => res,
949 Err(_recv_err) => Ok(T::default()), }
951 }
952
953 fn fetch_created_event(
954 conn: &Connection,
955 execution_id: &ExecutionId,
956 ) -> Result<CreateRequest, DbErrorRead> {
957 let mut stmt = conn
958 .prepare(
959 "SELECT created_at, json_value FROM t_execution_log WHERE \
960 execution_id = :execution_id AND version = 0",
961 )
962 .map_err(prepare_err_generic)?;
963 let (created_at, event) = stmt
964 .query_row(
965 named_params! {
966 ":execution_id": execution_id.to_string(),
967 },
968 |row| {
969 let created_at = row.get("created_at")?;
970 let event = row
971 .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
972 .map_err(|serde| {
973 error!("cannot deserialize `Created` event: {row:?} - `{serde:?}`");
974 consistency_rusqlite("cannot deserialize `Created` event")
975 })?;
976 Ok((created_at, event.0))
977 },
978 )
979 .map_err(result_err_generic)?;
980 if let ExecutionEventInner::Created {
981 ffqn,
982 params,
983 parent,
984 scheduled_at,
985 retry_exp_backoff,
986 max_retries,
987 component_id,
988 metadata,
989 scheduled_by,
990 } = event
991 {
992 Ok(CreateRequest {
993 created_at,
994 execution_id: execution_id.clone(),
995 ffqn,
996 params,
997 parent,
998 scheduled_at,
999 retry_exp_backoff,
1000 max_retries,
1001 component_id,
1002 metadata,
1003 scheduled_by,
1004 })
1005 } else {
1006 error!("Row with version=0 must be a `Created` event - {event:?}");
1007 Err(consistency_db_err("expected `Created` event").into())
1008 }
1009 }
1010
1011 fn check_expected_next_and_appending_version(
1012 expected_version: &Version,
1013 appending_version: &Version,
1014 ) -> Result<(), DbErrorWrite> {
1015 if *expected_version != *appending_version {
1016 debug!(
1017 "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
1018 );
1019 return Err(DbErrorWrite::Permanent(
1020 DbErrorWritePermanent::CannotWrite {
1021 reason: "version conflict".into(),
1022 expected_version: Some(expected_version.clone()),
1023 },
1024 ));
1025 }
1026 Ok(())
1027 }
1028
1029 #[instrument(level = Level::TRACE, skip_all, fields(execution_id = %req.execution_id))]
1030 fn create_inner(
1031 tx: &Transaction,
1032 req: CreateRequest,
1033 ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1034 debug!("create_inner");
1035
1036 let version = Version::default();
1037 let execution_id = req.execution_id.clone();
1038 let execution_id_str = execution_id.to_string();
1039 let ffqn = req.ffqn.clone();
1040 let created_at = req.created_at;
1041 let scheduled_at = req.scheduled_at;
1042 let max_retries = req.max_retries;
1043 let backoff_millis =
1044 u64::try_from(req.retry_exp_backoff.as_millis()).expect("backoff too big");
1045 let event = ExecutionEventInner::from(req);
1046 let event_ser = serde_json::to_string(&event).map_err(|err| {
1047 error!("Cannot serialize {event:?} - {err:?}");
1048 DbErrorWritePermanent::ValidationFailed("parameter serialization error".into())
1049 })?;
1050 tx.prepare(
1051 "INSERT INTO t_execution_log (execution_id, created_at, version, json_value, variant, join_set_id ) \
1052 VALUES (:execution_id, :created_at, :version, :json_value, :variant, :join_set_id)")
1053 .map_err(prepare_err_generic)?
1054 .execute(named_params! {
1055 ":execution_id": &execution_id_str,
1056 ":created_at": created_at,
1057 ":version": version.0,
1058 ":json_value": event_ser,
1059 ":variant": event.variant(),
1060 ":join_set_id": event.join_set_id().map(std::string::ToString::to_string),
1061 })
1062 .map_err(result_err_generic)?;
1063 let pending_state = PendingState::PendingAt { scheduled_at };
1064 let pending_at = {
1065 let scheduled_at = assert_matches!(pending_state, PendingState::PendingAt { scheduled_at } => scheduled_at);
1066 debug!("Creating with `Pending(`{scheduled_at:?}`)");
1067 tx.prepare(
1068 r"
1069 INSERT INTO t_state (
1070 execution_id,
1071 is_top_level,
1072 corresponding_version,
1073 pending_expires_finished,
1074 ffqn,
1075 state,
1076 created_at,
1077 updated_at,
1078 scheduled_at,
1079 intermittent_event_count,
1080 max_retries,
1081 retry_exp_backoff_millis
1082 )
1083 VALUES (
1084 :execution_id,
1085 :is_top_level,
1086 :corresponding_version,
1087 :pending_expires_finished,
1088 :ffqn,
1089 :state,
1090 :created_at,
1091 CURRENT_TIMESTAMP,
1092 :scheduled_at,
1093 0,
1094 :max_retries,
1095 :retry_exp_backoff_millis
1096 )
1097 ",
1098 )
1099 .map_err(prepare_err_generic)?
1100 .execute(named_params! {
1101 ":execution_id": execution_id.to_string(),
1102 ":is_top_level": execution_id.is_top_level(),
1103 ":corresponding_version": version.0,
1104 ":pending_expires_finished": scheduled_at,
1105 ":ffqn": ffqn.to_string(),
1106 ":state": STATE_PENDING_AT,
1107 ":created_at": created_at,
1108 ":scheduled_at": scheduled_at,
1109 ":max_retries": max_retries,
1110 ":retry_exp_backoff_millis": backoff_millis,
1111 })
1112 .map_err(result_err_generic)?;
1113 AppendNotifier {
1114 pending_at: Some(NotifierPendingAt { scheduled_at, ffqn }),
1115 execution_finished: None,
1116 response: None,
1117 }
1118 };
1119 let next_version = Version::new(version.0 + 1);
1120 Ok((next_version, pending_at))
1121 }
1122
1123 #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %corresponding_version))]
1124 fn update_state_pending_after_response_appended(
1125 tx: &Transaction,
1126 execution_id: &ExecutionId,
1127 scheduled_at: DateTime<Utc>, corresponding_version: &Version, ) -> Result<AppendNotifier, DbErrorWrite> {
1130 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
1131 let execution_id_str = execution_id.to_string();
1132 let mut stmt = tx
1133 .prepare_cached(
1134 r"
1135 UPDATE t_state
1136 SET
1137 corresponding_version = :corresponding_version,
1138 pending_expires_finished = :pending_expires_finished,
1139 state = :state,
1140 updated_at = CURRENT_TIMESTAMP,
1141
1142 last_lock_version = NULL,
1143 executor_id = NULL,
1144 run_id = NULL,
1145
1146 join_set_id = NULL,
1147 join_set_closing = NULL,
1148
1149 result_kind = NULL
1150 WHERE execution_id = :execution_id
1151 ",
1152 )
1153 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1154 let updated = stmt
1155 .execute(named_params! {
1156 ":execution_id": execution_id_str,
1157 ":corresponding_version": corresponding_version.0,
1158 ":pending_expires_finished": scheduled_at,
1159 ":state": STATE_PENDING_AT,
1160 })
1161 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1162 if updated != 1 {
1163 return Err(DbErrorWrite::NotFound);
1164 }
1165 Ok(AppendNotifier {
1166 pending_at: Some(NotifierPendingAt {
1167 scheduled_at,
1168 ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1169 }),
1170 execution_finished: None,
1171 response: None,
1172 })
1173 }
1174
1175 #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
1176 fn update_state_pending_after_event_appended(
1177 tx: &Transaction,
1178 execution_id: &ExecutionId,
1179 appending_version: &Version,
1180 scheduled_at: DateTime<Utc>, intermittent_failure: bool,
1182 ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1183 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
1184 let mut stmt = tx
1187 .prepare_cached(
1188 r"
1189 UPDATE t_state
1190 SET
1191 corresponding_version = :appending_version,
1192 pending_expires_finished = :pending_expires_finished,
1193 state = :state,
1194 updated_at = CURRENT_TIMESTAMP,
1195 intermittent_event_count = intermittent_event_count + :intermittent_delta,
1196
1197 last_lock_version = NULL,
1198 executor_id = NULL,
1199 run_id = NULL,
1200
1201 join_set_id = NULL,
1202 join_set_closing = NULL,
1203
1204 result_kind = NULL
1205 WHERE execution_id = :execution_id;
1206 ",
1207 )
1208 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1209 let updated = stmt
1210 .execute(named_params! {
1211 ":execution_id": execution_id.to_string(),
1212 ":appending_version": appending_version.0,
1213 ":pending_expires_finished": scheduled_at,
1214 ":state": STATE_PENDING_AT,
1215 ":intermittent_delta": i32::from(intermittent_failure) })
1217 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1218 if updated != 1 {
1219 return Err(DbErrorWrite::NotFound);
1220 }
1221 Ok((
1222 appending_version.increment(),
1223 AppendNotifier {
1224 pending_at: Some(NotifierPendingAt {
1225 scheduled_at,
1226 ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1227 }),
1228 execution_finished: None,
1229 response: None,
1230 },
1231 ))
1232 }
1233
1234 fn update_state_locked_get_intermittent_event_count(
1235 tx: &Transaction,
1236 execution_id: &ExecutionId,
1237 executor_id: ExecutorId,
1238 run_id: RunId,
1239 lock_expires_at: DateTime<Utc>,
1240 appending_version: &Version,
1241 ) -> Result<u32, DbErrorWrite> {
1242 debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
1243 let execution_id_str = execution_id.to_string();
1244 let mut stmt = tx
1245 .prepare_cached(
1246 r"
1247 UPDATE t_state
1248 SET
1249 corresponding_version = :appending_version,
1250 pending_expires_finished = :pending_expires_finished,
1251 state = :state,
1252 updated_at = CURRENT_TIMESTAMP,
1253
1254 last_lock_version = :appending_version,
1255 executor_id = :executor_id,
1256 run_id = :run_id,
1257
1258 join_set_id = NULL,
1259 join_set_closing = NULL,
1260
1261 result_kind = NULL
1262 WHERE execution_id = :execution_id
1263 ",
1264 )
1265 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1266 let updated = stmt
1267 .execute(named_params! {
1268 ":execution_id": execution_id_str,
1269 ":appending_version": appending_version.0,
1270 ":pending_expires_finished": lock_expires_at,
1271 ":state": STATE_LOCKED,
1272 ":executor_id": executor_id.to_string(),
1273 ":run_id": run_id.to_string(),
1274 })
1275 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1276 if updated != 1 {
1277 return Err(DbErrorWrite::NotFound);
1278 }
1279
1280 let intermittent_event_count = tx
1282 .prepare(
1283 "SELECT intermittent_event_count FROM t_state WHERE execution_id = :execution_id",
1284 )
1285 .map_err(prepare_err_generic)?
1286 .query_row(
1287 named_params! {
1288 ":execution_id": execution_id_str,
1289 },
1290 |row| {
1291 let intermittent_event_count = row.get("intermittent_event_count")?;
1292 Ok(intermittent_event_count)
1293 },
1294 )
1295 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1296
1297 Ok(intermittent_event_count)
1298 }
1299
1300 fn update_state_blocked(
1301 tx: &Transaction,
1302 execution_id: &ExecutionId,
1303 appending_version: &Version,
1304 join_set_id: &JoinSetId,
1306 lock_expires_at: DateTime<Utc>,
1307 join_set_closing: bool,
1308 ) -> Result<
1309 AppendResponse, DbErrorWrite,
1311 > {
1312 debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
1313 let execution_id_str = execution_id.to_string();
1314 let mut stmt = tx
1315 .prepare_cached(
1316 r"
1317 UPDATE t_state
1318 SET
1319 corresponding_version = :appending_version,
1320 pending_expires_finished = :pending_expires_finished,
1321 state = :state,
1322 updated_at = CURRENT_TIMESTAMP,
1323
1324 last_lock_version = NULL,
1325 executor_id = NULL,
1326 run_id = NULL,
1327
1328 join_set_id = :join_set_id,
1329 join_set_closing = :join_set_closing,
1330
1331 result_kind = NULL
1332 WHERE execution_id = :execution_id
1333 ",
1334 )
1335 .map_err(prepare_err_generic)?;
1336 let updated = stmt
1337 .execute(named_params! {
1338 ":execution_id": execution_id_str,
1339 ":appending_version": appending_version.0,
1340 ":pending_expires_finished": lock_expires_at,
1341 ":state": STATE_BLOCKED_BY_JOIN_SET,
1342 ":join_set_id": join_set_id,
1343 ":join_set_closing": join_set_closing,
1344 })
1345 .map_err(result_err_generic)?;
1346 if updated != 1 {
1347 return Err(DbErrorWrite::NotFound);
1348 }
1349 Ok(appending_version.increment())
1350 }
1351
1352 fn update_state_finished(
1353 tx: &Transaction,
1354 execution_id: &ExecutionId,
1355 appending_version: &Version,
1356 finished_at: DateTime<Utc>,
1358 result_kind: PendingStateFinishedResultKind,
1359 ) -> Result<(), DbErrorWrite> {
1360 debug!("Setting t_state to Finished");
1361 let execution_id_str = execution_id.to_string();
1362 let mut stmt = tx
1363 .prepare_cached(
1364 r"
1365 UPDATE t_state
1366 SET
1367 corresponding_version = :appending_version,
1368 pending_expires_finished = :pending_expires_finished,
1369 state = :state,
1370 updated_at = CURRENT_TIMESTAMP,
1371
1372 last_lock_version = NULL,
1373 executor_id = NULL,
1374 run_id = NULL,
1375
1376 join_set_id = NULL,
1377 join_set_closing = NULL,
1378
1379 result_kind = :result_kind
1380 WHERE execution_id = :execution_id
1381 ",
1382 )
1383 .map_err(prepare_err_generic)?;
1384 let updated = stmt
1385 .execute(named_params! {
1386 ":execution_id": execution_id_str,
1387 ":appending_version": appending_version.0,
1388 ":pending_expires_finished": finished_at,
1389 ":state": STATE_FINISHED,
1390 ":result_kind": result_kind.to_string(),
1391 })
1392 .map_err(result_err_generic)?;
1393 if updated != 1 {
1394 return Err(DbErrorWrite::NotFound);
1395 }
1396 Ok(())
1397 }
1398
1399 #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %appending_version))]
1401 fn bump_state_next_version(
1402 tx: &Transaction,
1403 execution_id: &ExecutionId,
1404 appending_version: &Version,
1405 delay_req: Option<DelayReq>,
1406 ) -> Result<AppendResponse , DbErrorWrite> {
1407 debug!("update_index_version");
1408 let execution_id_str = execution_id.to_string();
1409 let mut stmt = tx
1410 .prepare_cached(
1411 r"
1412 UPDATE t_state
1413 SET
1414 corresponding_version = :appending_version,
1415 updated_at = CURRENT_TIMESTAMP
1416 WHERE execution_id = :execution_id
1417 ",
1418 )
1419 .map_err(prepare_err_generic)?;
1420 let updated = stmt
1421 .execute(named_params! {
1422 ":execution_id": execution_id_str,
1423 ":appending_version": appending_version.0,
1424 })
1425 .map_err(result_err_generic)?;
1426 if updated != 1 {
1427 return Err(DbErrorWrite::NotFound);
1428 }
1429 if let Some(DelayReq {
1430 join_set_id,
1431 delay_id,
1432 expires_at,
1433 }) = delay_req
1434 {
1435 debug!("Inserting delay to `t_delay`");
1436 let mut stmt = tx
1437 .prepare_cached(
1438 "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) \
1439 VALUES \
1440 (:execution_id, :join_set_id, :delay_id, :expires_at)",
1441 )
1442 .map_err(prepare_err_generic)?;
1443 stmt.execute(named_params! {
1444 ":execution_id": execution_id_str,
1445 ":join_set_id": join_set_id.to_string(),
1446 ":delay_id": delay_id.to_string(),
1447 ":expires_at": expires_at,
1448 })
1449 .map_err(result_err_generic)?;
1450 }
1451 Ok(appending_version.increment())
1452 }
1453
1454 fn get_combined_state(
1455 tx: &Transaction,
1456 execution_id: &ExecutionId,
1457 ) -> Result<CombinedState, DbErrorRead> {
1458 let mut stmt = tx
1459 .prepare(
1460 r"
1461 SELECT
1462 state, ffqn, corresponding_version, pending_expires_finished,
1463 last_lock_version, executor_id, run_id,
1464 join_set_id, join_set_closing,
1465 result_kind
1466 FROM t_state
1467 WHERE
1468 execution_id = :execution_id
1469 ",
1470 )
1471 .map_err(prepare_err_generic)?;
1472 stmt.query_row(
1473 named_params! {
1474 ":execution_id": execution_id.to_string(),
1475 },
1476 |row| {
1477 CombinedState::new(
1478 &CombinedStateDTO {
1479 state: row.get("state")?,
1480 ffqn: row.get("ffqn")?,
1481 pending_expires_finished: row
1482 .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1483 last_lock_version: row
1484 .get::<_, Option<VersionType>>("last_lock_version")?
1485 .map(Version::new),
1486 executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1487 run_id: row.get::<_, Option<RunId>>("run_id")?,
1488 join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1489 join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1490 result_kind: row
1491 .get::<_, Option<FromStrWrapper<PendingStateFinishedResultKind>>>(
1492 "result_kind",
1493 )?
1494 .map(|wrapper| wrapper.0),
1495 },
1496 Version::new(row.get("corresponding_version")?),
1497 )
1498 },
1499 )
1500 .map_err(result_err_read)
1501 }
1502
1503 fn list_executions(
1504 read_tx: &Transaction,
1505 ffqn: Option<FunctionFqn>,
1506 top_level_only: bool,
1507 pagination: ExecutionListPagination,
1508 ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
1509 struct StatementModifier {
1510 where_vec: Vec<String>,
1511 params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)>,
1512 limit: u32,
1513 limit_desc: bool,
1514 }
1515 fn paginate<T: rusqlite::ToSql + 'static>(
1516 pagination: Pagination<Option<T>>,
1517 column: &str,
1518 top_level_only: bool,
1519 ) -> StatementModifier {
1520 let mut where_vec: Vec<String> = vec![];
1521 let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![];
1522 let limit = pagination.length();
1523 let limit_desc = pagination.is_desc();
1524 let rel = pagination.rel();
1525 match pagination {
1526 Pagination::NewerThan {
1527 cursor: Some(cursor),
1528 ..
1529 }
1530 | Pagination::OlderThan {
1531 cursor: Some(cursor),
1532 ..
1533 } => {
1534 where_vec.push(format!("{column} {rel} :cursor"));
1535 params.push((":cursor", Box::new(cursor)));
1536 }
1537 _ => {}
1538 }
1539 if top_level_only {
1540 where_vec.push("is_top_level=true".to_string());
1541 }
1542 StatementModifier {
1543 where_vec,
1544 params,
1545 limit,
1546 limit_desc,
1547 }
1548 }
1549 let mut statement_mod = match pagination {
1550 ExecutionListPagination::CreatedBy(pagination) => {
1551 paginate(pagination, "created_at", top_level_only)
1552 }
1553 ExecutionListPagination::ExecutionId(pagination) => {
1554 paginate(pagination, "execution_id", top_level_only)
1555 }
1556 };
1557
1558 if let Some(ffqn) = ffqn {
1559 statement_mod.where_vec.push("ffqn = :ffqn".to_string());
1560 statement_mod
1561 .params
1562 .push((":ffqn", Box::new(ffqn.to_string())));
1563 }
1564
1565 let where_str = if statement_mod.where_vec.is_empty() {
1566 String::new()
1567 } else {
1568 format!("WHERE {}", statement_mod.where_vec.join(" AND "))
1569 };
1570 let sql = format!(
1571 r"
1572 SELECT created_at, scheduled_at, state, execution_id, ffqn, corresponding_version, pending_expires_finished,
1573 last_lock_version, executor_id, run_id,
1574 join_set_id, join_set_closing,
1575 result_kind
1576 FROM t_state {where_str} ORDER BY created_at {desc} LIMIT {limit}
1577 ",
1578 desc = if statement_mod.limit_desc { "DESC" } else { "" },
1579 limit = statement_mod.limit
1580 );
1581 let mut vec: Vec<_> = read_tx
1582 .prepare(&sql)
1583 .map_err(prepare_err_generic)?
1584 .query_map::<_, &[(&'static str, &dyn rusqlite::ToSql)], _>(
1585 statement_mod
1586 .params
1587 .iter()
1588 .map(|(key, value)| (*key, value.as_ref()))
1589 .collect::<Vec<_>>()
1590 .as_ref(),
1591 |row| {
1592 let execution_id = row.get::<_, ExecutionId>("execution_id")?;
1593 let created_at = row.get("created_at")?;
1594 let scheduled_at = row.get("scheduled_at")?;
1595 let combined_state = CombinedState::new(
1596 &CombinedStateDTO {
1597 state: row.get("state")?,
1598 ffqn: row.get("ffqn")?,
1599 pending_expires_finished: row
1600 .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1601 executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1602
1603 last_lock_version: row
1604 .get::<_, Option<VersionType>>("last_lock_version")?
1605 .map(Version::new),
1606 run_id: row.get::<_, Option<RunId>>("run_id")?,
1607 join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1608 join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1609 result_kind: row
1610 .get::<_, Option<FromStrWrapper<PendingStateFinishedResultKind>>>(
1611 "result_kind",
1612 )?
1613 .map(|wrapper| wrapper.0),
1614 },
1615 Version::new(row.get("corresponding_version")?),
1616 )?;
1617 Ok(ExecutionWithState {
1618 execution_id,
1619 ffqn: combined_state.ffqn,
1620 pending_state: combined_state.pending_state,
1621 created_at,
1622 scheduled_at,
1623 })
1624 },
1625 )
1626 .map_err(intermittent_err_generic)?
1627 .collect::<Vec<Result<_, _>>>()
1628 .into_iter()
1629 .filter_map(|row| match row {
1630 Ok(row) => Some(row),
1631 Err(err) => {
1632 warn!("Skipping row - {err:?}");
1633 None
1634 }
1635 })
1636 .collect();
1637
1638 if !statement_mod.limit_desc {
1639 vec.reverse();
1641 }
1642 Ok(vec)
1643 }
1644
1645 #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
1646 #[expect(clippy::too_many_arguments)]
1647 fn lock_single_execution(
1648 tx: &Transaction,
1649 created_at: DateTime<Utc>,
1650 component_id: ComponentId,
1651 execution_id: &ExecutionId,
1652 run_id: RunId,
1653 appending_version: &Version,
1654 executor_id: ExecutorId,
1655 lock_expires_at: DateTime<Utc>,
1656 ) -> Result<LockedExecution, DbErrorWrite> {
1657 debug!("lock_inner");
1658 let combined_state = Self::get_combined_state(tx, execution_id)?;
1659 combined_state.pending_state.can_append_lock(
1660 created_at,
1661 executor_id,
1662 run_id,
1663 lock_expires_at,
1664 )?;
1665 let expected_version = combined_state.get_next_version_assert_not_finished();
1666 Self::check_expected_next_and_appending_version(&expected_version, appending_version)?;
1667
1668 let event = ExecutionEventInner::Locked {
1670 component_id,
1671 executor_id,
1672 lock_expires_at,
1673 run_id,
1674 };
1675 let event_ser = serde_json::to_string(&event).map_err(|err| {
1676 warn!("Cannot serialize {event:?} - {err:?}");
1677 DbErrorWritePermanent::ValidationFailed("parameter serialization error".into())
1678 })?;
1679 let mut stmt = tx
1680 .prepare_cached(
1681 "INSERT INTO t_execution_log \
1682 (execution_id, created_at, json_value, version, variant) \
1683 VALUES \
1684 (:execution_id, :created_at, :json_value, :version, :variant)",
1685 )
1686 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1687 stmt.execute(named_params! {
1688 ":execution_id": execution_id.to_string(),
1689 ":created_at": created_at,
1690 ":json_value": event_ser,
1691 ":version": appending_version.0,
1692 ":variant": event.variant(),
1693 })
1694 .map_err(|err| {
1695 warn!("Cannot lock execution - {err:?}");
1696 DbErrorWritePermanent::CannotWrite {
1697 reason: "cannot lock execution".into(),
1698 expected_version: None,
1699 }
1700 })?;
1701
1702 let responses = Self::list_responses(tx, execution_id, None)?;
1704 let responses = responses.into_iter().map(|resp| resp.event).collect();
1705 trace!("Responses: {responses:?}");
1706
1707 let intermittent_event_count = Self::update_state_locked_get_intermittent_event_count(
1708 tx,
1709 execution_id,
1710 executor_id,
1711 run_id,
1712 lock_expires_at,
1713 appending_version,
1714 )?;
1715 let mut events = tx
1717 .prepare(
1718 "SELECT json_value FROM t_execution_log WHERE \
1719 execution_id = :execution_id AND (variant = :v1 OR variant = :v2) \
1720 ORDER BY version",
1721 )
1722 .map_err(prepare_err_generic)?
1723 .query_map(
1724 named_params! {
1725 ":execution_id": execution_id.to_string(),
1726 ":v1": DUMMY_CREATED.variant(),
1727 ":v2": DUMMY_HISTORY_EVENT.variant(),
1728 },
1729 |row| {
1730 row.get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
1731 .map(|wrapper| wrapper.0)
1732 .map_err(|serde| {
1733 error!("Cannot deserialize {row:?} - {serde:?}");
1734 consistency_rusqlite("cannot deserialize json value")
1735 })
1736 },
1737 )
1738 .map_err(result_err_generic)?
1739 .collect::<Result<Vec<_>, _>>()
1740 .map_err(result_err_generic)?
1741 .into_iter()
1742 .collect::<VecDeque<_>>();
1743 let Some(ExecutionEventInner::Created {
1744 ffqn,
1745 params,
1746 retry_exp_backoff,
1747 max_retries,
1748 parent,
1749 metadata,
1750 ..
1751 }) = events.pop_front()
1752 else {
1753 error!("Execution log must contain at least `Created` event");
1754 return Err(consistency_db_err("execution log must contain `Created` event").into());
1755 };
1756
1757 let event_history = events
1758 .into_iter()
1759 .map(|event| {
1760 if let ExecutionEventInner::HistoryEvent { event } = event {
1761 Ok(event)
1762 } else {
1763 error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
1764 Err(consistency_db_err(
1765 "rows can only contain `Created` and `HistoryEvent` event kinds",
1766 ))
1767 }
1768 })
1769 .collect::<Result<Vec<_>, _>>()?;
1770 Ok(LockedExecution {
1771 execution_id: execution_id.clone(),
1772 metadata,
1773 run_id,
1774 next_version: appending_version.increment(),
1775 ffqn,
1776 params,
1777 event_history,
1778 responses,
1779 retry_exp_backoff,
1780 max_retries,
1781 parent,
1782 intermittent_event_count,
1783 })
1784 }
1785
1786 fn count_join_next(
1787 tx: &Transaction,
1788 execution_id: &ExecutionId,
1789 join_set_id: &JoinSetId,
1790 ) -> Result<u64, DbErrorRead> {
1791 let mut stmt = tx.prepare(
1792 "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = :execution_id AND join_set_id = :join_set_id \
1793 AND history_event_type = :join_next",
1794 ).map_err(prepare_err_generic)?;
1795 Ok(stmt
1796 .query_row(
1797 named_params! {
1798 ":execution_id": execution_id.to_string(),
1799 ":join_set_id": join_set_id.to_string(),
1800 ":join_next": HISTORY_EVENT_TYPE_JOIN_NEXT,
1801 },
1802 |row| row.get("count"),
1803 )
1804 .map_err(result_err_generic)?)
1805 }
1806
1807 #[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
1808 #[expect(clippy::needless_return)]
1809 fn append(
1810 tx: &Transaction,
1811 execution_id: &ExecutionId,
1812 req: AppendRequest,
1813 appending_version: Version,
1814 ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1815 if matches!(req.event, ExecutionEventInner::Created { .. }) {
1816 return Err(DbErrorWrite::Permanent(
1817 DbErrorWritePermanent::ValidationFailed(
1818 "cannot append `Created` event - use `create` instead".into(),
1819 ),
1820 ));
1821 }
1822 if let AppendRequest {
1823 event:
1824 ExecutionEventInner::Locked {
1825 component_id,
1826 executor_id,
1827 run_id,
1828 lock_expires_at,
1829 },
1830 created_at,
1831 } = req
1832 {
1833 return Self::lock_single_execution(
1834 tx,
1835 created_at,
1836 component_id,
1837 execution_id,
1838 run_id,
1839 &appending_version,
1840 executor_id,
1841 lock_expires_at,
1842 )
1843 .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
1844 }
1845
1846 let combined_state = Self::get_combined_state(tx, execution_id)?;
1847 if combined_state.pending_state.is_finished() {
1848 debug!("Execution is already finished");
1849 return Err(DbErrorWrite::Permanent(
1850 DbErrorWritePermanent::CannotWrite {
1851 reason: "already finished".into(),
1852 expected_version: None,
1853 },
1854 ));
1855 }
1856
1857 Self::check_expected_next_and_appending_version(
1858 &combined_state.get_next_version_assert_not_finished(),
1859 &appending_version,
1860 )?;
1861 let event_ser = serde_json::to_string(&req.event).map_err(|err| {
1862 error!("Cannot serialize {:?} - {err:?}", req.event);
1863 DbErrorWritePermanent::ValidationFailed("parameter serialization error".into())
1864 })?;
1865
1866 let mut stmt = tx.prepare(
1867 "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
1868 VALUES (:execution_id, :created_at, :json_value, :version, :variant, :join_set_id)")
1869 .map_err(prepare_err_generic)?;
1870 stmt.execute(named_params! {
1871 ":execution_id": execution_id.to_string(),
1872 ":created_at": req.created_at,
1873 ":json_value": event_ser,
1874 ":version": appending_version.0,
1875 ":variant": req.event.variant(),
1876 ":join_set_id": req.event.join_set_id().map(std::string::ToString::to_string),
1877 })
1878 .map_err(result_err_generic)?;
1879 match &req.event {
1882 ExecutionEventInner::Created { .. } => {
1883 unreachable!("handled in the caller")
1884 }
1885
1886 ExecutionEventInner::Locked { .. } => {
1887 unreachable!("handled above")
1888 }
1889
1890 ExecutionEventInner::TemporarilyFailed {
1891 backoff_expires_at, ..
1892 }
1893 | ExecutionEventInner::TemporarilyTimedOut {
1894 backoff_expires_at, ..
1895 } => {
1896 let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1897 tx,
1898 execution_id,
1899 &appending_version,
1900 *backoff_expires_at,
1901 true, )?;
1903 return Ok((next_version, notifier));
1904 }
1905
1906 ExecutionEventInner::Unlocked {
1907 backoff_expires_at, ..
1908 } => {
1909 let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1910 tx,
1911 execution_id,
1912 &appending_version,
1913 *backoff_expires_at,
1914 false, )?;
1916 return Ok((next_version, notifier));
1917 }
1918
1919 ExecutionEventInner::Finished { result, .. } => {
1920 Self::update_state_finished(
1921 tx,
1922 execution_id,
1923 &appending_version,
1924 req.created_at,
1925 PendingStateFinishedResultKind::from(result),
1926 )?;
1927 return Ok((
1928 appending_version,
1929 AppendNotifier {
1930 pending_at: None,
1931 execution_finished: Some(NotifierExecutionFinished {
1932 execution_id: execution_id.clone(),
1933 retval: result.clone(),
1934 }),
1935 response: None,
1936 },
1937 ));
1938 }
1939
1940 ExecutionEventInner::HistoryEvent {
1941 event:
1942 HistoryEvent::JoinSetCreate { .. }
1943 | HistoryEvent::JoinSetRequest {
1944 request: JoinSetRequest::ChildExecutionRequest { .. },
1945 ..
1946 }
1947 | HistoryEvent::Persist { .. }
1948 | HistoryEvent::Schedule { .. }
1949 | HistoryEvent::Stub { .. }
1950 | HistoryEvent::JoinNextTooMany { .. },
1951 } => {
1952 return Ok((
1953 Self::bump_state_next_version(tx, execution_id, &appending_version, None)?,
1954 AppendNotifier::default(),
1955 ));
1956 }
1957
1958 ExecutionEventInner::HistoryEvent {
1959 event:
1960 HistoryEvent::JoinSetRequest {
1961 join_set_id,
1962 request:
1963 JoinSetRequest::DelayRequest {
1964 delay_id,
1965 expires_at,
1966 ..
1967 },
1968 },
1969 } => {
1970 return Ok((
1971 Self::bump_state_next_version(
1972 tx,
1973 execution_id,
1974 &appending_version,
1975 Some(DelayReq {
1976 join_set_id: join_set_id.clone(),
1977 delay_id: delay_id.clone(),
1978 expires_at: *expires_at,
1979 }),
1980 )?,
1981 AppendNotifier::default(),
1982 ));
1983 }
1984
1985 ExecutionEventInner::HistoryEvent {
1986 event:
1987 HistoryEvent::JoinNext {
1988 join_set_id,
1989 run_expires_at,
1990 closing,
1991 requested_ffqn: _,
1992 },
1993 } => {
1994 let join_next_count = Self::count_join_next(tx, execution_id, join_set_id)?;
1996 let nth_response =
1997 Self::nth_response(tx, execution_id, join_set_id, join_next_count - 1)?; trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
1999 assert!(join_next_count > 0);
2000 if let Some(ResponseWithCursor {
2001 event:
2002 JoinSetResponseEventOuter {
2003 created_at: nth_created_at,
2004 ..
2005 },
2006 cursor: _,
2007 }) = nth_response
2008 {
2009 let scheduled_at = max(*run_expires_at, nth_created_at); let (next_version, notifier) = Self::update_state_pending_after_event_appended(
2011 tx,
2012 execution_id,
2013 &appending_version,
2014 scheduled_at,
2015 false, )?;
2017 return Ok((next_version, notifier));
2018 }
2019 return Ok((
2020 Self::update_state_blocked(
2021 tx,
2022 execution_id,
2023 &appending_version,
2024 join_set_id,
2025 *run_expires_at,
2026 *closing,
2027 )?,
2028 AppendNotifier::default(),
2029 ));
2030 }
2031 }
2032 }
2033
2034 fn append_response(
2035 tx: &Transaction,
2036 execution_id: &ExecutionId,
2037 response_outer: JoinSetResponseEventOuter,
2038 ) -> Result<AppendNotifier, DbErrorWrite> {
2039 let mut stmt = tx.prepare(
2040 "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, child_execution_id, finished_version) \
2041 VALUES (:execution_id, :created_at, :join_set_id, :delay_id, :child_execution_id, :finished_version)",
2042 ).map_err(prepare_err_generic)?;
2043 let join_set_id = &response_outer.event.join_set_id;
2044 let delay_id = match &response_outer.event.event {
2045 JoinSetResponse::DelayFinished { delay_id } => Some(delay_id.to_string()),
2046 JoinSetResponse::ChildExecutionFinished { .. } => None,
2047 };
2048 let (child_execution_id, finished_version) = match &response_outer.event.event {
2049 JoinSetResponse::ChildExecutionFinished {
2050 child_execution_id,
2051 finished_version,
2052 result: _,
2053 } => (
2054 Some(child_execution_id.to_string()),
2055 Some(finished_version.0),
2056 ),
2057 JoinSetResponse::DelayFinished { .. } => (None, None),
2058 };
2059
2060 stmt.execute(named_params! {
2061 ":execution_id": execution_id.to_string(),
2062 ":created_at": response_outer.created_at,
2063 ":join_set_id": join_set_id.to_string(),
2064 ":delay_id": delay_id,
2065 ":child_execution_id": child_execution_id,
2066 ":finished_version": finished_version,
2067 })
2068 .map_err(result_err_generic)?;
2069
2070 let combined_state = Self::get_combined_state(tx, execution_id)?;
2072 debug!("previous_pending_state: {combined_state:?}");
2073 let mut notifier = if let PendingState::BlockedByJoinSet {
2074 join_set_id: found_join_set_id,
2075 lock_expires_at, closing: _,
2077 } = combined_state.pending_state
2078 && *join_set_id == found_join_set_id
2079 {
2080 let scheduled_at = max(lock_expires_at, response_outer.created_at);
2083 Self::update_state_pending_after_response_appended(
2086 tx,
2087 execution_id,
2088 scheduled_at,
2089 &combined_state.corresponding_version, )?
2091 } else {
2092 AppendNotifier::default()
2093 };
2094 if let JoinSetResponseEvent {
2095 join_set_id,
2096 event: JoinSetResponse::DelayFinished { delay_id },
2097 } = &response_outer.event
2098 {
2099 debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
2100 let mut stmt =
2101 tx.prepare_cached("DELETE FROM t_delay WHERE execution_id = :execution_id AND join_set_id = :join_set_id AND delay_id = :delay_id")
2102 .map_err(prepare_err_generic)?;
2103 stmt.execute(named_params! {
2104 ":execution_id": execution_id.to_string(),
2105 ":join_set_id": join_set_id.to_string(),
2106 ":delay_id": delay_id.to_string(),
2107 })
2108 .map_err(result_err_generic)?;
2109 }
2110 notifier.response = Some((execution_id.clone(), response_outer));
2111 Ok(notifier)
2112 }
2113
2114 fn append_backtrace(
2115 tx: &Transaction,
2116 backtrace_info: &BacktraceInfo,
2117 ) -> Result<(), DbErrorWrite> {
2118 let backtrace = serde_json::to_string(&backtrace_info.wasm_backtrace).map_err(|err| {
2119 warn!(
2120 "Cannot serialize backtrace {:?} - {err:?}",
2121 backtrace_info.wasm_backtrace
2122 );
2123 DbErrorWritePermanent::ValidationFailed("cannot serialize backtrace".into())
2124 })?;
2125 let mut stmt = tx
2126 .prepare(
2127 "INSERT INTO t_backtrace (execution_id, component_id, version_min_including, version_max_excluding, wasm_backtrace) \
2128 VALUES (:execution_id, :component_id, :version_min_including, :version_max_excluding, :wasm_backtrace)",
2129 )
2130 .map_err(prepare_err_generic)?;
2131 stmt.execute(named_params! {
2132 ":execution_id": backtrace_info.execution_id.to_string(),
2133 ":component_id": backtrace_info.component_id.to_string(),
2134 ":version_min_including": backtrace_info.version_min_including.0,
2135 ":version_max_excluding": backtrace_info.version_max_excluding.0,
2136 ":wasm_backtrace": backtrace,
2137 })
2138 .map_err(result_err_generic)?;
2139 Ok(())
2140 }
2141
2142 #[cfg(feature = "test")]
2143 fn get(
2144 tx: &Transaction,
2145 execution_id: &ExecutionId,
2146 ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2147 let mut stmt = tx
2148 .prepare(
2149 "SELECT created_at, json_value FROM t_execution_log WHERE \
2150 execution_id = :execution_id ORDER BY version",
2151 )
2152 .map_err(prepare_err_generic)?;
2153 let events = stmt
2154 .query_map(
2155 named_params! {
2156 ":execution_id": execution_id.to_string(),
2157 },
2158 |row| {
2159 let created_at = row.get("created_at")?;
2160 let event = row
2161 .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2162 .map_err(|serde| {
2163 error!("Cannot deserialize {row:?} - {serde:?}");
2164 consistency_rusqlite("cannot deserialize event")
2165 })?
2166 .0;
2167
2168 Ok(ExecutionEvent {
2169 created_at,
2170 event,
2171 backtrace_id: None,
2172 })
2173 },
2174 )
2175 .map_err(result_err_read)?
2176 .collect::<Result<Vec<_>, _>>()
2177 .map_err(result_err_read)?;
2178 if events.is_empty() {
2179 return Err(DbErrorRead::NotFound);
2180 }
2181 let combined_state = Self::get_combined_state(tx, execution_id)?;
2182 let responses = Self::list_responses(tx, execution_id, None)?
2183 .into_iter()
2184 .map(|resp| resp.event)
2185 .collect();
2186 Ok(concepts::storage::ExecutionLog {
2187 execution_id: execution_id.clone(),
2188 events,
2189 responses,
2190 next_version: combined_state.get_next_version_or_finished(), pending_state: combined_state.pending_state,
2192 })
2193 }
2194
    /// Lists the events of `execution_id` whose version lies in
    /// `[version_min, version_max_excluding)`, ordered by version.
    ///
    /// When `include_backtrace_id` is set, each log row is left-joined with `t_backtrace` and
    /// `backtrace_id` carries the `version_min_including` of the backtrace row whose version
    /// range covers the event. Otherwise the query selects `NULL` and `backtrace_id` is `None`.
    fn list_execution_events(
        tx: &Transaction,
        execution_id: &ExecutionId,
        version_min: VersionType,
        version_max_excluding: VersionType,
        include_backtrace_id: bool,
    ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
        // Both variants expose the same three columns so that one row mapper can serve both.
        let select = if include_backtrace_id {
            "SELECT
                log.created_at,
                log.json_value,
                -- Select version_min_including from backtrace if a match is found, otherwise NULL
                bt.version_min_including AS backtrace_id
            FROM
                t_execution_log AS log
            LEFT OUTER JOIN -- Use LEFT JOIN to keep all logs even if no backtrace matches
                t_backtrace AS bt ON log.execution_id = bt.execution_id
                -- Check if the log's version falls within the backtrace's range
                AND log.version >= bt.version_min_including
                AND log.version < bt.version_max_excluding
            WHERE
                log.execution_id = :execution_id
                AND log.version >= :version_min
                AND log.version < :version_max_excluding
            ORDER BY
                log.version;"
        } else {
            "SELECT
                created_at, json_value, NULL as backtrace_id
            FROM t_execution_log WHERE
                execution_id = :execution_id AND version >= :version_min AND version < :version_max_excluding
            ORDER BY version"
        };
        tx.prepare(select)
            .map_err(prepare_err_generic)?
            .query_map(
                named_params! {
                    ":execution_id": execution_id.to_string(),
                    ":version_min": version_min,
                    ":version_max_excluding": version_max_excluding
                },
                |row| {
                    let created_at = row.get("created_at")?;
                    // NULL (no matching backtrace, or backtraces not requested) maps to `None`.
                    let backtrace_id = row
                        .get::<_, Option<VersionType>>("backtrace_id")?
                        .map(Version::new);
                    let event = row
                        .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
                        .map(|event| ExecutionEvent {
                            created_at,
                            event: event.0,
                            backtrace_id,
                        })
                        .map_err(|serde| {
                            // Log the offending row, then report a consistency error.
                            error!("Cannot deserialize {row:?} - {serde:?}");
                            consistency_rusqlite("cannot deserialize")
                        })?;
                    Ok(event)
                },
            )
            .map_err(result_err_read)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(result_err_read)
    }
2259
2260 fn get_execution_event(
2261 tx: &Transaction,
2262 execution_id: &ExecutionId,
2263 version: VersionType,
2264 ) -> Result<ExecutionEvent, DbErrorRead> {
2265 let mut stmt = tx
2266 .prepare(
2267 "SELECT created_at, json_value FROM t_execution_log WHERE \
2268 execution_id = :execution_id AND version = :version",
2269 )
2270 .map_err(prepare_err_generic)?;
2271 stmt.query_row(
2272 named_params! {
2273 ":execution_id": execution_id.to_string(),
2274 ":version": version,
2275 },
2276 |row| {
2277 let created_at = row.get("created_at")?;
2278 let event = row
2279 .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2280 .map_err(|serde| {
2281 error!("Cannot deserialize {row:?} - {serde:?}");
2282 consistency_rusqlite("cannot deserialize event")
2283 })?;
2284
2285 Ok(ExecutionEvent {
2286 created_at,
2287 event: event.0,
2288 backtrace_id: None,
2289 })
2290 },
2291 )
2292 .map_err(result_err_read)
2293 }
2294
    /// Lists join set responses of `execution_id`, optionally restricted by `pagination`.
    ///
    /// Each response row is left-joined with `t_execution_log`: rows with a child execution are
    /// matched with the child's log row at `finished_version`, rows without a child execution id
    /// are kept as they are. Without pagination all responses are returned ordered by `id`.
    fn list_responses(
        tx: &Transaction,
        execution_id: &ExecutionId,
        pagination: Option<Pagination<u32>>,
    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
        // Named parameters are collected dynamically because `:cursor` is only bound when
        // pagination is requested.
        let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![];
        let mut sql = "SELECT \
            r.id, r.created_at, r.join_set_id, r.delay_id, r.child_execution_id, r.finished_version, l.json_value \
            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
            WHERE \
            r.execution_id = :execution_id \
            AND ( r.finished_version = l.version OR r.child_execution_id IS NULL ) \
            "
        .to_string();
        let limit = match &pagination {
            Some(
                pagination @ (Pagination::NewerThan { cursor, .. }
                | Pagination::OlderThan { cursor, .. }),
            ) => {
                params.push((":cursor", Box::new(cursor)));
                // `rel()` supplies the comparison placed between `r.id` and the cursor.
                write!(sql, " AND r.id {rel} :cursor", rel = pagination.rel()).unwrap();
                Some(pagination.length())
            }
            None => None,
        };
        sql.push_str(" ORDER BY id");
        if pagination.as_ref().is_some_and(Pagination::is_desc) {
            sql.push_str(" DESC");
        }
        if let Some(limit) = limit {
            write!(sql, " LIMIT {limit}").unwrap();
        }
        // Parameters are bound by name, so the push order is irrelevant.
        params.push((":execution_id", Box::new(execution_id.to_string())));
        tx.prepare(&sql)
            .map_err(prepare_err_generic)?
            .query_map::<_, &[(&'static str, &dyn ToSql)], _>(
                // Borrow the boxed values as the `(&str, &dyn ToSql)` slice rusqlite expects.
                params
                    .iter()
                    .map(|(key, value)| (*key, value.as_ref()))
                    .collect::<Vec<_>>()
                    .as_ref(),
                Self::parse_response_with_cursor,
            )
            .map_err(intermittent_err_generic)?
            .collect::<Result<Vec<_>, rusqlite::Error>>()
            .map_err(result_err_read)
    }
2343
2344 fn parse_response_with_cursor(
2345 row: &rusqlite::Row<'_>,
2346 ) -> Result<ResponseWithCursor, rusqlite::Error> {
2347 let id = row.get("id")?;
2348 let created_at: DateTime<Utc> = row.get("created_at")?;
2349 let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
2350 let event = match (
2351 row.get::<_, Option<DelayId>>("delay_id")?,
2352 row.get::<_, Option<ExecutionIdDerived>>("child_execution_id")?,
2353 row.get::<_, Option<VersionType>>("finished_version")?,
2354 row.get::<_, Option<JsonWrapper<ExecutionEventInner>>>("json_value")?,
2355 ) {
2356 (Some(delay_id), None, None, None) => JoinSetResponse::DelayFinished { delay_id },
2357 (
2358 None,
2359 Some(child_execution_id),
2360 Some(finished_version),
2361 Some(JsonWrapper(ExecutionEventInner::Finished { result, .. })),
2362 ) => JoinSetResponse::ChildExecutionFinished {
2363 child_execution_id,
2364 finished_version: Version(finished_version),
2365 result,
2366 },
2367 (delay, child, finished, result) => {
2368 error!(
2369 "Invalid row in t_join_set_response {id} - {:?} {child:?} {finished:?} {:?}",
2370 delay,
2371 result.map(|it| it.0)
2372 );
2373 return Err(consistency_rusqlite("invalid row in t_join_set_response"));
2374 }
2375 };
2376 Ok(ResponseWithCursor {
2377 cursor: id,
2378 event: JoinSetResponseEventOuter {
2379 event: JoinSetResponseEvent { join_set_id, event },
2380 created_at,
2381 },
2382 })
2383 }
2384
    /// Returns the response at zero-based position `skip_rows` among the responses of the given
    /// join set of `execution_id`, ordered by `id`.
    ///
    /// Yields `Ok(None)` when the join set has no response at that position.
    fn nth_response(
        tx: &Transaction,
        execution_id: &ExecutionId,
        join_set_id: &JoinSetId,
        skip_rows: u64,
    ) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
        tx
            .prepare(
                // Child responses are joined with the child's log row at `finished_version`;
                // rows without a child execution id pass through the join unchanged.
                "SELECT r.id, r.created_at, r.join_set_id, \
                    r.delay_id, \
                    r.child_execution_id, r.finished_version, l.json_value \
                    FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
                    WHERE \
                    r.execution_id = :execution_id AND r.join_set_id = :join_set_id AND \
                    (
                    r.finished_version = l.version \
                    OR \
                    r.child_execution_id IS NULL \
                    ) \
                    ORDER BY id \
                    LIMIT 1 OFFSET :offset",
            )
            .map_err(prepare_err_generic)?
            .query_row(
                named_params! {
                    ":execution_id": execution_id.to_string(),
                    ":join_set_id": join_set_id.to_string(),
                    ":offset": skip_rows,
                },
                Self::parse_response_with_cursor,
            )
            // "No rows" becomes `None` instead of an error.
            .optional()
            .map_err(result_err_read)
    }
2420
2421 #[instrument(level = Level::TRACE, skip_all)]
2423 fn get_responses_with_offset(
2424 tx: &Transaction,
2425 execution_id: &ExecutionId,
2426 skip_rows: usize,
2427 ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorRead> {
2428 tx.prepare(
2430 "SELECT r.id, r.created_at, r.join_set_id, \
2431 r.delay_id, \
2432 r.child_execution_id, r.finished_version, l.json_value \
2433 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2434 WHERE \
2435 r.execution_id = :execution_id AND \
2436 ( \
2437 r.finished_version = l.version \
2438 OR r.child_execution_id IS NULL \
2439 ) \
2440 ORDER BY id \
2441 LIMIT -1 OFFSET :offset",
2442 )
2443 .map_err(prepare_err_generic)?
2444 .query_map(
2445 named_params! {
2446 ":execution_id": execution_id.to_string(),
2447 ":offset": skip_rows,
2448 },
2449 Self::parse_response_with_cursor,
2450 )
2451 .map_err(result_err_read)?
2452 .collect::<Result<Vec<_>, _>>()
2453 .map_err(result_err_read)
2454 .map(|resp| resp.into_iter().map(|vec| vec.event).collect())
2455 }
2456
2457 fn get_pending_of_single_ffqn(
2458 mut stmt: CachedStatement,
2459 batch_size: usize,
2460 pending_at_or_sooner: DateTime<Utc>,
2461 ffqn: &FunctionFqn,
2462 ) -> Result<Vec<(ExecutionId, Version)>, ()> {
2463 stmt.query_map(
2464 named_params! {
2465 ":pending_expires_finished": pending_at_or_sooner,
2466 ":ffqn": ffqn.to_string(),
2467 ":batch_size": batch_size,
2468 },
2469 |row| {
2470 let execution_id = row.get::<_, String>("execution_id")?.parse::<ExecutionId>();
2471 let next_version =
2472 Version::new(row.get::<_, VersionType>("corresponding_version")?).increment();
2473 Ok(execution_id.map(|exe| (exe, next_version)))
2474 },
2475 )
2476 .map_err(|err| {
2477 warn!("Ignoring consistency error {err:?}");
2478 })?
2479 .collect::<Result<Vec<_>, _>>()
2480 .map_err(|err| {
2481 warn!("Ignoring consistency error {err:?}");
2482 })?
2483 .into_iter()
2484 .collect::<Result<Vec<_>, _>>()
2485 .map_err(|err| {
2486 warn!("Ignoring consistency error {err:?}");
2487 })
2488 }
2489
    /// Collects up to `batch_size` executions in the `STATE_PENDING_AT` state whose
    /// `pending_expires_finished` is at or before `pending_at_or_sooner`.
    ///
    /// The `ffqns` are queried one by one in the given order; the remaining capacity of the
    /// batch is passed as the limit for each query and the iteration stops once the batch is
    /// full. A failure while reading rows of one FFQN is skipped, a failure to prepare the
    /// statement is returned as `DbErrorGeneric::Uncategorized`.
    fn get_pending(
        conn: &Connection,
        batch_size: usize,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: &[FunctionFqn],
    ) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
        let mut execution_ids_versions = Vec::with_capacity(batch_size);
        for ffqn in ffqns {
            // The SQL text is the same for every FFQN, so the cached statement is reused.
            let stmt = conn
                .prepare_cached(&format!(
                    r#"
                    SELECT execution_id, corresponding_version FROM t_state WHERE
                    state = "{STATE_PENDING_AT}" AND
                    pending_expires_finished <= :pending_expires_finished AND ffqn = :ffqn
                    ORDER BY pending_expires_finished LIMIT :batch_size
                    "#
                ))
                .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;

            // `Err(())` of a single FFQN is ignored so that rows of other FFQNs are still returned.
            if let Ok(execs_and_versions) = Self::get_pending_of_single_ffqn(
                stmt,
                // Only ask for as many rows as still fit into the batch.
                batch_size - execution_ids_versions.len(),
                pending_at_or_sooner,
                ffqn,
            ) {
                execution_ids_versions.extend(execs_and_versions);
                if execution_ids_versions.len() == batch_size {
                    // The batch is full, later FFQNs are not queried.
                    break;
                }
            }
        }
        Ok(execution_ids_versions)
    }
2527
2528 #[instrument(level = Level::TRACE, skip_all)]
2530 fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
2531 let (pending_ats, finished_execs, responses) = {
2532 let (mut pending_ats, mut finished_execs, mut responses) =
2533 (Vec::new(), Vec::new(), Vec::new());
2534 for notifier in notifiers {
2535 if let Some(pending_at) = notifier.pending_at {
2536 pending_ats.push(pending_at);
2537 }
2538 if let Some(finished) = notifier.execution_finished {
2539 finished_execs.push(finished);
2540 }
2541 if let Some(response) = notifier.response {
2542 responses.push(response);
2543 }
2544 }
2545 (pending_ats, finished_execs, responses)
2546 };
2547
2548 if !pending_ats.is_empty() {
2550 let guard = self.0.pending_ffqn_subscribers.lock().unwrap();
2551 for pending_at in pending_ats {
2552 Self::notify_pending_locked(&pending_at, current_time, &guard);
2553 }
2554 }
2555 if !finished_execs.is_empty() {
2558 let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
2559 for finished in finished_execs {
2560 if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
2561 for (_tag, sender) in listeners_of_exe_id {
2562 let _ = sender.send(finished.retval.clone());
2565 }
2566 }
2567 }
2568 }
2569 if !responses.is_empty() {
2571 let mut guard = self.0.response_subscribers.lock().unwrap();
2572 for (execution_id, response) in responses {
2573 if let Some((sender, _)) = guard.remove(&execution_id) {
2574 let _ = sender.send(response);
2575 }
2576 }
2577 }
2578 }
2579
2580 fn notify_pending_locked(
2581 notifier: &NotifierPendingAt,
2582 current_time: DateTime<Utc>,
2583 ffqn_to_pending_subscription: &std::sync::MutexGuard<
2584 HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>,
2585 >,
2586 ) {
2587 if notifier.scheduled_at <= current_time
2589 && let Some((subscription, _)) = ffqn_to_pending_subscription.get(¬ifier.ffqn)
2590 {
2591 debug!("Notifying pending subscriber");
2592 let _ = subscription.try_send(());
2594 }
2595 }
2596}
2597
2598#[async_trait]
2599impl DbExecutor for SqlitePool {
2600 #[instrument(level = Level::TRACE, skip(self))]
2601 async fn lock_pending(
2602 &self,
2603 batch_size: usize,
2604 pending_at_or_sooner: DateTime<Utc>,
2605 ffqns: Arc<[FunctionFqn]>,
2606 created_at: DateTime<Utc>,
2607 component_id: ComponentId,
2608 executor_id: ExecutorId,
2609 lock_expires_at: DateTime<Utc>,
2610 run_id: RunId,
2611 ) -> Result<LockPendingResponse, DbErrorGeneric> {
2612 let execution_ids_versions = self
2613 .conn_low_prio(
2614 move |conn| Self::get_pending(conn, batch_size, pending_at_or_sooner, &ffqns),
2615 "get_pending",
2616 )
2617 .await?;
2618 if execution_ids_versions.is_empty() {
2619 Ok(vec![])
2620 } else {
2621 debug!("Locking {execution_ids_versions:?}");
2622 self.transaction_write(
2623 move |tx| {
2624 let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
2625 for (execution_id, version) in execution_ids_versions {
2627 match Self::lock_single_execution(
2628 tx,
2629 created_at,
2630 component_id.clone(),
2631 &execution_id,
2632 run_id,
2633 &version,
2634 executor_id,
2635 lock_expires_at,
2636 ) {
2637 Ok(locked) => locked_execs.push(locked),
2638 Err(err) => {
2639 warn!("Locking row {execution_id} failed - {err:?}");
2640 }
2641 }
2642 }
2643 Ok(locked_execs)
2644 },
2645 "lock_pending",
2646 )
2647 .await?
2648 }
2649 }
2650
2651 #[instrument(level = Level::DEBUG, skip(self))]
2652 async fn lock_one(
2653 &self,
2654 created_at: DateTime<Utc>,
2655 component_id: ComponentId,
2656 execution_id: &ExecutionId,
2657 run_id: RunId,
2658 version: Version,
2659 executor_id: ExecutorId,
2660 lock_expires_at: DateTime<Utc>,
2661 ) -> Result<LockedExecution, DbErrorWrite> {
2662 debug!(%execution_id, "lock_extend");
2663 let execution_id = execution_id.clone();
2664 self.transaction_write(
2665 move |tx| {
2666 Self::lock_single_execution(
2667 tx,
2668 created_at,
2669 component_id,
2670 &execution_id,
2671 run_id,
2672 &version,
2673 executor_id,
2674 lock_expires_at,
2675 )
2676 },
2677 "lock_inner",
2678 )
2679 .await?
2680 }
2681
2682 #[instrument(level = Level::DEBUG, skip(self, req))]
2683 async fn append(
2684 &self,
2685 execution_id: ExecutionId,
2686 version: Version,
2687 req: AppendRequest,
2688 ) -> Result<AppendResponse, DbErrorWrite> {
2689 debug!(%req, "append");
2690 trace!(?req, "append");
2691 let created_at = req.created_at;
2692 let (version, notifier) = self
2693 .transaction_write(
2694 move |tx| Self::append(tx, &execution_id, req, version),
2695 "append",
2696 )
2697 .await??;
2698 self.notify_all(vec![notifier], created_at);
2699 Ok(version)
2700 }
2701
2702 #[instrument(level = Level::DEBUG, skip(self, batch, parent_response_event))]
2703 async fn append_batch_respond_to_parent(
2704 &self,
2705 execution_id: ExecutionIdDerived,
2706 current_time: DateTime<Utc>,
2707 batch: Vec<AppendRequest>,
2708 version: Version,
2709 parent_execution_id: ExecutionId,
2710 parent_response_event: JoinSetResponseEventOuter,
2711 ) -> Result<AppendBatchResponse, DbErrorWrite> {
2712 debug!("append_batch_respond_to_parent");
2713 let execution_id = ExecutionId::Derived(execution_id);
2714 if execution_id == parent_execution_id {
2715 return Err(DbErrorWrite::Permanent(
2718 DbErrorWritePermanent::ValidationFailed(
2719 "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
2720 ),
2721 ));
2722 }
2723 trace!(
2724 ?batch,
2725 ?parent_response_event,
2726 "append_batch_respond_to_parent"
2727 );
2728 assert!(!batch.is_empty(), "Empty batch request");
2729 let (version, notifiers) = {
2730 self.transaction_write(
2731 move |tx| {
2732 let mut version = version;
2733 let mut notifier_of_child = None;
2734 for append_request in batch {
2735 let (v, n) = Self::append(tx, &execution_id, append_request, version)?;
2736 version = v;
2737 notifier_of_child = Some(n);
2738 }
2739
2740 let pending_at_parent =
2741 Self::append_response(tx, &parent_execution_id, parent_response_event)?;
2742 Ok::<_, DbErrorWrite>((
2743 version,
2744 vec![
2745 notifier_of_child.expect("checked that the batch is not empty"),
2746 pending_at_parent,
2747 ],
2748 ))
2749 },
2750 "append_batch_respond_to_parent",
2751 )
2752 .await??
2753 };
2754 self.notify_all(notifiers, current_time);
2755 Ok(version)
2756 }
2757
    /// Returns when an execution of one of `ffqns` might be pending, or when `timeout_fut`
    /// completes, whichever comes first.
    ///
    /// A subscription is registered for every FFQN before the database is queried. If the
    /// query already finds a pending execution, the function returns immediately. If the query
    /// fails, the function waits for the timeout. On every path the subscriptions registered
    /// by this call are removed before returning.
    #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
    async fn wait_for_pending(
        &self,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) {
        // The random tag identifies the subscriptions created by this call during cleanup.
        let unique_tag: u64 = rand::random();
        let (sender, mut receiver) = mpsc::channel(1);
        {
            // Subscribe first; `insert` replaces any subscription stored for the same FFQN.
            let mut ffqn_to_pending_subscription = self.0.pending_ffqn_subscribers.lock().unwrap();
            for ffqn in ffqns.as_ref() {
                ffqn_to_pending_subscription.insert(ffqn.clone(), (sender.clone(), unique_tag));
            }
        }
        // The waiting logic lives in its own async block so that its early `return`s fall
        // through to the cleanup below.
        async {
            let Ok(execution_ids_versions) = self
                .conn_low_prio(
                    {
                        let ffqns = ffqns.clone();
                        // A single row is enough to know that something is pending.
                        move |conn| Self::get_pending(conn, 1, pending_at_or_sooner, ffqns.as_ref())
                    },
                    "subscribe_to_pending",
                )
                .await
            else {
                trace!(
                    "Ignoring get_pending error and waiting in for timeout to avoid executor repolling too soon"
                );
                timeout_fut.await;
                return;
            };
            if !execution_ids_versions.is_empty() {
                trace!("Not waiting, database already contains new pending executions");
                return;
            }
            // Nothing pending yet: wait for a notification or for the timeout.
            tokio::select! {
                _ = receiver.recv() => {
                    trace!("Received a notification");
                }
                () = timeout_fut => {
                }
            }
        }.await;
        {
            // Cleanup: drop only subscriptions carrying our tag.
            let mut ffqn_to_pending_subscription = self.0.pending_ffqn_subscribers.lock().unwrap();
            for ffqn in ffqns.as_ref() {
                match ffqn_to_pending_subscription.remove(ffqn) {
                    Some((_, tag)) if tag == unique_tag => {
                    }
                    Some(other) => {
                        // The entry carries a different tag, put it back.
                        ffqn_to_pending_subscription.insert(ffqn.clone(), other);
                    }
                    None => {
                    }
                }
            }
        }
    }
2823}
2824
2825#[async_trait]
2826impl DbConnection for SqlitePool {
2827 #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
2828 async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
2829 debug!("create");
2830 trace!(?req, "create");
2831 let created_at = req.created_at;
2832 let (version, notifier) = self
2833 .transaction_write(move |tx| Self::create_inner(tx, req), "create")
2834 .await??;
2835 self.notify_all(vec![notifier], created_at);
2836 Ok(version)
2837 }
2838
2839 #[instrument(level = Level::DEBUG, skip(self, batch))]
2840 async fn append_batch(
2841 &self,
2842 current_time: DateTime<Utc>,
2843 batch: Vec<AppendRequest>,
2844 execution_id: ExecutionId,
2845 version: Version,
2846 ) -> Result<AppendBatchResponse, DbErrorWrite> {
2847 debug!("append_batch");
2848 trace!(?batch, "append_batch");
2849 assert!(!batch.is_empty(), "Empty batch request");
2850
2851 let (version, notifier) = self
2852 .transaction_write(
2853 move |tx| {
2854 let mut version = version;
2855 let mut notifier = None;
2856 for append_request in batch {
2857 let (v, n) = Self::append(tx, &execution_id, append_request, version)?;
2858 version = v;
2859 notifier = Some(n);
2860 }
2861 Ok::<_, DbErrorWrite>((
2862 version,
2863 notifier.expect("checked that the batch is not empty"),
2864 ))
2865 },
2866 "append_batch",
2867 )
2868 .await??;
2869
2870 self.notify_all(vec![notifier], current_time);
2871 Ok(version)
2872 }
2873
2874 #[instrument(level = Level::DEBUG, skip(self, batch, child_req))]
2875 async fn append_batch_create_new_execution(
2876 &self,
2877 current_time: DateTime<Utc>,
2878 batch: Vec<AppendRequest>,
2879 execution_id: ExecutionId,
2880 version: Version,
2881 child_req: Vec<CreateRequest>,
2882 ) -> Result<AppendBatchResponse, DbErrorWrite> {
2883 debug!("append_batch_create_new_execution");
2884 trace!(?batch, ?child_req, "append_batch_create_new_execution");
2885 assert!(!batch.is_empty(), "Empty batch request");
2886
2887 let (version, notifiers) = self
2888 .transaction_write(
2889 move |tx| {
2890 let mut notifier = None;
2891 let mut version = version;
2892 for append_request in batch {
2893 let (v, n) = Self::append(tx, &execution_id, append_request, version)?;
2894 version = v;
2895 notifier = Some(n);
2896 }
2897 let mut notifiers = Vec::new();
2898 notifiers.push(notifier.expect("checked that the batch is not empty"));
2899
2900 for child_req in child_req {
2901 let (_, notifier) = Self::create_inner(tx, child_req)?;
2902 notifiers.push(notifier);
2903 }
2904 Ok::<_, DbErrorWrite>((version, notifiers))
2905 },
2906 "append_batch_create_new_execution_inner",
2907 )
2908 .await??;
2909 self.notify_all(notifiers, current_time);
2910 Ok(version)
2911 }
2912
2913 #[cfg(feature = "test")]
2914 #[instrument(level = Level::DEBUG, skip(self))]
2915 async fn get(
2916 &self,
2917 execution_id: &ExecutionId,
2918 ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2919 trace!("get");
2920 let execution_id = execution_id.clone();
2921 self.transaction_read(move |tx| Self::get(tx, &execution_id), "get")
2922 .await?
2923 }
2924
2925 #[instrument(level = Level::DEBUG, skip(self))]
2926 async fn list_execution_events(
2927 &self,
2928 execution_id: &ExecutionId,
2929 since: &Version,
2930 max_length: VersionType,
2931 include_backtrace_id: bool,
2932 ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2933 let execution_id = execution_id.clone();
2934 let since = since.0;
2935 self.transaction_read(
2936 move |tx| {
2937 Self::list_execution_events(
2938 tx,
2939 &execution_id,
2940 since,
2941 since + max_length,
2942 include_backtrace_id,
2943 )
2944 },
2945 "get",
2946 )
2947 .await?
2948 }
2949
    /// Returns the responses of `execution_id` from position `start_idx` on. If there are none
    /// yet, waits for the next response or for `timeout_fut`.
    ///
    /// When responses already exist they are returned right away. Otherwise a one-shot
    /// subscription is registered inside the same transaction that found no responses, and the
    /// function resolves with the single notified response, with `Timeout`, or with an error
    /// when the sender is dropped without sending.
    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
    async fn subscribe_to_next_responses(
        &self,
        execution_id: &ExecutionId,
        start_idx: usize,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout> {
        debug!("next_responses");
        // The random tag identifies the subscription created by this call during cleanup.
        let unique_tag: u64 = rand::random();
        let execution_id = execution_id.clone();

        // Removes the subscription of this call; an entry with a different tag is put back.
        let cleanup = || {
            let mut guard = self.0.response_subscribers.lock().unwrap();
            match guard.remove(&execution_id) {
                Some((_, tag)) if tag == unique_tag => {}
                Some(other) => {
                    guard.insert(execution_id.clone(), other);
                }
                None => {}
            }
        };

        let response_subscribers = self.0.response_subscribers.clone();
        let resp_or_receiver = {
            let execution_id = execution_id.clone();
            self.transaction_write(
                move |tx| {
                    let responses = Self::get_responses_with_offset(tx, &execution_id, start_idx)?;
                    if responses.is_empty() {
                        // Subscribe while still inside the transaction that saw no responses.
                        let (sender, receiver) = oneshot::channel();
                        response_subscribers
                            .lock()
                            .unwrap()
                            .insert(execution_id, (sender, unique_tag));
                        Ok::<_, DbErrorReadWithTimeout>(itertools::Either::Right(receiver))
                    } else {
                        Ok(itertools::Either::Left(responses))
                    }
                },
                "subscribe_to_next_responses",
            )
            .await
        }
        .map_err(DbErrorReadWithTimeout::from)
        .flatten()
        .inspect_err(|_| {
            // The closure may have subscribed before the failure was reported.
            cleanup();
        })?;
        match resp_or_receiver {
            itertools::Either::Left(resp) => Ok(resp),
            itertools::Either::Right(receiver) => {
                let res = async move {
                    tokio::select! {
                        resp = receiver => {
                            // A dropped sender is reported as `DbErrorGeneric::Close`.
                            let resp = resp.map_err(|_| DbErrorGeneric::Close)?;
                            Ok(vec![resp])
                        }
                        () = timeout_fut => Err(DbErrorReadWithTimeout::Timeout),
                    }
                }
                .await;
                cleanup();
                res
            }
        }
    }
3021
3022 async fn wait_for_finished_result(
3024 &self,
3025 execution_id: &ExecutionId,
3026 timeout_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
3027 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
3028 let unique_tag: u64 = rand::random();
3029 let execution_id = execution_id.clone();
3030 let execution_finished_subscription = self.0.execution_finished_subscribers.clone();
3031
3032 let cleanup = || {
3033 let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
3034 if let Some(subscribers) = guard.get_mut(&execution_id) {
3035 subscribers.remove(&unique_tag);
3036 }
3037 };
3038
3039 let resp_or_receiver = {
3040 let execution_id = execution_id.clone();
3041 self.transaction_read(move |tx| {
3042 let pending_state =
3043 Self::get_combined_state(tx, &execution_id)?.pending_state;
3044 if let PendingState::Finished { finished } = pending_state {
3045 let event =
3046 Self::get_execution_event(tx, &execution_id, finished.version)?;
3047 if let ExecutionEventInner::Finished { result, ..} = event.event {
3048 Ok(itertools::Either::Left(result))
3049 } else {
3050 error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
3051 Err(DbErrorReadWithTimeout::from(consistency_db_err(
3052 "cannot get finished event based on t_state version"
3053 )))
3054 }
3055 } else {
3056 let (sender, receiver) = oneshot::channel();
3060 let mut guard = execution_finished_subscription.lock().unwrap();
3061 guard.entry(execution_id).or_default().insert(unique_tag, sender);
3062 Ok(itertools::Either::Right(receiver))
3063 }
3064 }, "wait_for_finished_result")
3065 .await
3066 }.map_err(DbErrorReadWithTimeout::from)
3067 .flatten()
3068 .inspect_err(|_| {
3069 cleanup();
3072 })?;
3073
3074 let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
3075 match resp_or_receiver {
3076 itertools::Either::Left(resp) => Ok(resp), itertools::Either::Right(receiver) => {
3078 let res = async move {
3079 tokio::select! {
3080 resp = receiver => {
3081 Ok(resp.expect("the notifier sends to all listeners, cannot race with cleanup"))
3082 }
3083 () = timeout_fut => Err(DbErrorReadWithTimeout::Timeout),
3084 }
3085 }
3086 .await;
3087 cleanup();
3088 res
3089 }
3090 }
3091 }
3092
3093 #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
3094 async fn append_response(
3095 &self,
3096 created_at: DateTime<Utc>,
3097 execution_id: ExecutionId,
3098 response_event: JoinSetResponseEvent,
3099 ) -> Result<(), DbErrorWrite> {
3100 debug!("append_response");
3101 let event = JoinSetResponseEventOuter {
3102 created_at,
3103 event: response_event,
3104 };
3105 let notifier = self
3106 .transaction_write(
3107 move |tx| Self::append_response(tx, &execution_id, event),
3108 "append_response",
3109 )
3110 .await??;
3111 self.notify_all(vec![notifier], created_at);
3112 Ok(())
3113 }
3114
3115 #[instrument(level = Level::DEBUG, skip_all)]
3116 async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
3117 debug!("append_backtrace");
3118 self.transaction_write(
3119 move |tx| Self::append_backtrace(tx, &append),
3120 "append_backtrace",
3121 )
3122 .await?
3123 }
3124
3125 #[instrument(level = Level::DEBUG, skip_all)]
3126 async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
3127 debug!("append_backtrace_batch");
3128 self.transaction_write(
3129 move |tx| {
3130 for append in batch {
3131 Self::append_backtrace(tx, &append)?;
3132 }
3133 Ok(())
3134 },
3135 "append_backtrace",
3136 )
3137 .await?
3138 }
3139
    /// Reads one backtrace of `execution_id` selected by `filter`.
    ///
    /// * `Specific(version)` - the backtrace whose version range contains `version`.
    /// * `First` / `Last` - the backtrace with the lowest / highest `version_min_including`.
    ///
    /// Errors of the query, including a missing row, are converted by `result_err_read`.
    #[instrument(level = Level::DEBUG, skip_all)]
    async fn get_backtrace(
        &self,
        execution_id: &ExecutionId,
        filter: BacktraceFilter,
    ) -> Result<BacktraceInfo, DbErrorRead> {
        debug!("get_last_backtrace");
        let execution_id = execution_id.clone();

        self.transaction_read(
            move |tx| {
                // Common part of the query; the filter appends either a range condition or
                // an ordering with `LIMIT 1`.
                let select = "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace FROM t_backtrace \
                                WHERE execution_id = :execution_id";
                let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![(":execution_id", Box::new(execution_id.to_string()))];
                let select = match filter {
                    BacktraceFilter::Specific(version) =>{
                        // `:version` is only bound for this variant.
                        params.push((":version", Box::new(version.0)));
                        format!("{select} AND version_min_including <= :version AND version_max_excluding > :version")
                    },
                    BacktraceFilter::First => format!("{select} ORDER BY version_min_including LIMIT 1"),
                    BacktraceFilter::Last => format!("{select} ORDER BY version_min_including DESC LIMIT 1")
                };
                tx
                    .prepare(&select)
                    .map_err(prepare_err_generic)?
                    .query_row::<_, &[(&'static str, &dyn ToSql)], _>(
                        // Borrow the boxed values as the `(&str, &dyn ToSql)` slice rusqlite expects.
                        params
                            .iter()
                            .map(|(key, value)| (*key, value.as_ref()))
                            .collect::<Vec<_>>()
                            .as_ref(),
                        |row| {
                            Ok(BacktraceInfo {
                                // The execution id is not selected, it is the one queried for.
                                execution_id: execution_id.clone(),
                                component_id: row.get::<_, FromStrWrapper<_> >("component_id")?.0,
                                version_min_including: Version::new(row.get::<_, VersionType>("version_min_including")?),
                                version_max_excluding: Version::new(row.get::<_, VersionType>("version_max_excluding")?),
                                wasm_backtrace: row.get::<_, JsonWrapper<_>>("wasm_backtrace")?.0,
                            })
                        },
                    )
                    .map_err(result_err_read)
            },
            "get_last_backtrace",
        ).await?
    }
3186
    /// Returns all timers that expired at or before `at`.
    ///
    /// Two kinds are collected on a low priority connection: delays from `t_delay` whose
    /// `expires_at` has passed, followed by executions in the `STATE_LOCKED` state whose
    /// `pending_expires_finished` has passed.
    #[instrument(level = Level::TRACE, skip(self))]
    async fn get_expired_timers(
        &self,
        at: DateTime<Utc>,
    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
        self.conn_low_prio(
            move |conn| {
                // 1. Expired delays.
                let mut expired_timers = conn.prepare(
                    "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= :at",
                ).map_err(prepare_err_generic)?
                .query_map(
                        named_params! {
                            ":at": at,
                        },
                        |row| {
                            let execution_id = row.get("execution_id")?;
                            let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
                            let delay_id = row.get::<_, DelayId>("delay_id")?;
                            let delay = ExpiredDelay { execution_id, join_set_id, delay_id };
                            Ok(ExpiredTimer::Delay(delay))
                        },
                    ).map_err(result_err_generic)?
                    .collect::<Result<Vec<_>, _>>().map_err(result_err_generic)?;
                // 2. Expired locks.
                let expired = conn.prepare(&format!(r#"
                    SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis
                    FROM t_state
                    WHERE pending_expires_finished <= :at AND state = "{STATE_LOCKED}"
                    "#
                )
                ).map_err(prepare_err_generic)?
                .query_map(
                        named_params! {
                            ":at": at,
                        },
                        |row| {
                            let execution_id = row.get("execution_id")?;
                            let locked_at_version = Version::new(row.get("last_lock_version")?);
                            // The stored version is the last written one, the next write uses its successor.
                            let next_version = Version::new(row.get("corresponding_version")?).increment();
                            let intermittent_event_count = row.get("intermittent_event_count")?;
                            let max_retries = row.get("max_retries")?;
                            let retry_exp_backoff_millis = row.get("retry_exp_backoff_millis")?;
                            // Only derived executions have a parent; a failure to split the id
                            // is logged and treated as "no parent".
                            let parent = if let ExecutionId::Derived(derived) = &execution_id {
                                derived.split_to_parts().inspect_err(|err| error!("cannot split execution {execution_id} to parts: {err:?}")).ok()
                            } else {
                                None
                            };
                            let lock = ExpiredLock {
                                execution_id,
                                locked_at_version,
                                next_version,
                                intermittent_event_count,
                                max_retries,
                                retry_exp_backoff: Duration::from_millis(retry_exp_backoff_millis),
                                parent
                            };
                            Ok(ExpiredTimer::Lock(lock))
                        }
                    ).map_err(result_err_generic)?
                    .collect::<Result<Vec<_>, _>>().map_err(result_err_generic)?;
                // Delays come first, expired locks are appended after them.
                expired_timers.extend(expired);
                if !expired_timers.is_empty() {
                    debug!("get_expired_timers found {expired_timers:?}");
                }
                Ok(expired_timers)
            }, "get_expired_timers"
        )
        .await
    }
3257
3258 async fn get_execution_event(
3259 &self,
3260 execution_id: &ExecutionId,
3261 version: &Version,
3262 ) -> Result<ExecutionEvent, DbErrorRead> {
3263 let version = version.0;
3264 let execution_id = execution_id.clone();
3265 self.transaction_read(
3266 move |tx| Self::get_execution_event(tx, &execution_id, version),
3267 "get_execution_event",
3268 )
3269 .await?
3270 }
3271
3272 async fn get_pending_state(
3273 &self,
3274 execution_id: &ExecutionId,
3275 ) -> Result<PendingState, DbErrorRead> {
3276 let execution_id = execution_id.clone();
3277 Ok(self
3278 .transaction_read(
3279 move |tx| Self::get_combined_state(tx, &execution_id),
3280 "get_pending_state",
3281 )
3282 .await??
3283 .pending_state)
3284 }
3285
3286 async fn list_executions(
3287 &self,
3288 ffqn: Option<FunctionFqn>,
3289 top_level_only: bool,
3290 pagination: ExecutionListPagination,
3291 ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
3292 self.transaction_read(
3293 move |tx| Self::list_executions(tx, ffqn, top_level_only, pagination),
3294 "list_executions",
3295 )
3296 .await?
3297 }
3298
3299 async fn list_responses(
3300 &self,
3301 execution_id: &ExecutionId,
3302 pagination: Pagination<u32>,
3303 ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
3304 let execution_id = execution_id.clone();
3305 self.transaction_read(
3306 move |tx| Self::list_responses(tx, &execution_id, Some(pagination)),
3307 "list_executions",
3308 )
3309 .await?
3310 }
3311}
3312
/// Helpers for obtaining a [`SqlitePool`] in tests.
#[cfg(any(test, feature = "tempfile"))]
pub mod tempfile {
    use super::{SqliteConfig, SqlitePool};
    use tempfile::NamedTempFile;

    /// Opens a [`SqlitePool`] with the default [`SqliteConfig`].
    ///
    /// When the `SQLITE_FILE` environment variable can be read, its value is
    /// used as the database path and no temporary file is created (`None`).
    /// Otherwise a fresh [`NamedTempFile`] backs the database and is returned
    /// as `Some(..)`; per the `tempfile` crate the file is removed when that
    /// handle is dropped, so callers should keep it alive while using the pool.
    ///
    /// # Panics
    ///
    /// Panics if the temporary file cannot be created or the pool fails to open.
    pub async fn sqlite_pool() -> (SqlitePool, Option<NamedTempFile>) {
        match std::env::var("SQLITE_FILE") {
            Ok(db_path) => {
                let pool = SqlitePool::new(db_path, SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, None)
            }
            // Unset or non-unicode variable: fall back to a temporary file.
            Err(_) => {
                let temp_file = NamedTempFile::new().unwrap();
                let pool = SqlitePool::new(temp_file.path(), SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, Some(temp_file))
            }
        }
    }
}