1use assert_matches::assert_matches;
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use concepts::{
5 ComponentId, ExecutionId, FunctionFqn, JoinSetId, StrVariant, SupportedFunctionReturnValue,
6 prefixed_ulid::{DelayId, ExecutionIdDerived, ExecutorId, PrefixedUlid, RunId},
7 storage::{
8 AppendBatchResponse, AppendRequest, AppendResponse, BacktraceFilter, BacktraceInfo,
9 ClientError, CreateRequest, DUMMY_CREATED, DUMMY_HISTORY_EVENT, DUMMY_TEMPORARILY_FAILED,
10 DUMMY_TEMPORARILY_TIMED_OUT, DbConnection, DbConnectionError, DbError, DbPool,
11 ExecutionEvent, ExecutionEventInner, ExecutionListPagination, ExecutionWithState,
12 ExpiredTimer, HistoryEvent, JoinSetRequest, JoinSetResponse, JoinSetResponseEvent,
13 JoinSetResponseEventOuter, LockPendingResponse, LockResponse, LockedExecution, Pagination,
14 PendingState, PendingStateFinished, PendingStateFinishedResultKind, ResponseWithCursor,
15 SpecificError, SubscribeError, Version, VersionType,
16 },
17 time::Sleep,
18};
19use hdrhistogram::{Counter, Histogram};
20use rusqlite::{
21 Connection, OpenFlags, OptionalExtension, Params, ToSql, Transaction, named_params,
22 types::{FromSql, FromSqlError},
23};
24use std::{
25 cmp::max,
26 collections::VecDeque,
27 error::Error,
28 fmt::Debug,
29 path::Path,
30 str::FromStr,
31 sync::{
32 Arc, Mutex,
33 atomic::{AtomicBool, Ordering},
34 },
35 time::Duration,
36};
37use std::{fmt::Write as _, pin::Pin};
38use tokio::{
39 sync::{mpsc, oneshot},
40 time::Instant,
41};
42use tracing::{Level, Span, debug, debug_span, error, info, instrument, trace, warn};
43
/// A pending delay request belonging to a join set, due to fire at
/// `expires_at` (mirrors a row of the `t_delay` table).
#[derive(Debug, Clone)]
struct DelayReq {
    join_set_id: JoinSetId,
    delay_id: DelayId,
    expires_at: DateTime<Utc>,
}
/// Default SQLite PRAGMA settings applied to the connection on open.
/// Each entry is a `[name, value]` pair; individual values can be overridden
/// via `SqliteConfig::pragma_override` (see `SqlitePool::init_thread`).
const PRAGMA: [[&str; 2]; 9] = [
    ["journal_mode", "wal"],
    ["synchronous", "FULL"],
    ["foreign_keys", "true"],
    ["busy_timeout", "1000"],
    ["cache_size", "10000"],
    ["temp_store", "MEMORY"],
    ["page_size", "8192"],
    ["mmap_size", "134217728"],
    ["journal_size_limit", "67108864"],
];
72
/// Single-row metadata table; the `CHECK (id = 1)` constraint guarantees at
/// most one row, which stores the schema version for compatibility checks.
const CREATE_TABLE_T_METADATA: &str = r"
CREATE TABLE IF NOT EXISTS t_metadata (
    id INTEGER PRIMARY KEY CHECK (id = 1),
    schema_version INTEGER NOT NULL,
    created_at TEXT NOT NULL
) STRICT
";
// Fixed `id` of the single t_metadata row.
const T_METADATA_SINGLETON: u32 = 1;
// Schema version this build expects; `init_thread` panics on mismatch.
const T_METADATA_EXPECTED_SCHEMA_VERSION: u32 = 1;

/// Append-only event log, one row per `ExecutionEvent`, keyed by
/// `(execution_id, version)`. `history_event_type` is a generated column
/// extracted from the JSON payload for querying by history event type.
const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_log (
    execution_id TEXT NOT NULL,
    created_at TEXT NOT NULL,
    json_value TEXT NOT NULL,
    version INTEGER NOT NULL,
    variant TEXT NOT NULL,
    join_set_id TEXT,
    history_event_type TEXT GENERATED ALWAYS AS (json_value->>'$.HistoryEvent.event.type') STORED,
    PRIMARY KEY (execution_id, version)
) STRICT
";
const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version ON t_execution_log (execution_id, version);
";
const CREATE_INDEX_IDX_T_EXECUTION_ID_EXECUTION_ID_VARIANT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant ON t_execution_log (execution_id, variant);
";

/// Responses arriving at a join set. A row holds either a delay response
/// (`delay_id` set) or a child-execution response (`child_execution_id` and
/// `finished_version` set); the UNIQUE constraint prevents duplicates.
const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
CREATE TABLE IF NOT EXISTS t_join_set_response (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    created_at TEXT NOT NULL,
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,

    delay_id TEXT,

    child_execution_id TEXT,
    finished_version INTEGER,

    UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
) STRICT
";
const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
";
128
/// Current state of each execution, one row per execution id. Which of the
/// nullable columns are populated depends on `state` (see `CombinedState::new`
/// for the exact shapes). `pending_expires_finished` is overloaded: it holds
/// the scheduled-at, lock-expiry, or finished-at timestamp depending on state.
const CREATE_TABLE_T_STATE: &str = r"
CREATE TABLE IF NOT EXISTS t_state (
    execution_id TEXT NOT NULL,
    is_top_level INTEGER NOT NULL,
    next_version INTEGER NOT NULL,
    pending_expires_finished TEXT NOT NULL,
    ffqn TEXT NOT NULL,
    state TEXT NOT NULL,
    created_at TEXT NOT NULL,
    scheduled_at TEXT NOT NULL,

    join_set_id TEXT,
    join_set_closing INTEGER,

    executor_id TEXT,
    run_id TEXT,
    temporary_event_count INTEGER,
    max_retries INTEGER,
    retry_exp_backoff_millis INTEGER,
    parent_execution_id TEXT,
    parent_join_set_id TEXT,
    locked_at_version INTEGER,

    result_kind TEXT,

    PRIMARY KEY (execution_id)
) STRICT
";
// Discriminator values stored in the `state` column of `t_state`.
const STATE_PENDING_AT: &str = "PendingAt";
const STATE_BLOCKED_BY_JOIN_SET: &str = "BlockedByJoinSet";
const STATE_LOCKED: &str = "Locked";
const STATE_FINISHED: &str = "Finished";

const IDX_T_STATE_LOCK_PENDING: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending ON t_state (state, pending_expires_finished, ffqn);
";
// Partial index: only rows with an executor (i.e. locked) can time out.
const IDX_T_STATE_EXPIRED_TIMERS: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_expired_timers ON t_state (pending_expires_finished) WHERE executor_id IS NOT NULL;
";
const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
";
const IDX_T_STATE_FFQN: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
";
const IDX_T_STATE_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
";

/// Active (not yet fired) delay timers, keyed by execution, join set and delay id.
const CREATE_TABLE_T_DELAY: &str = r"
CREATE TABLE IF NOT EXISTS t_delay (
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,
    delay_id TEXT NOT NULL,
    expires_at TEXT NOT NULL,
    PRIMARY KEY (execution_id, join_set_id, delay_id)
) STRICT
";

/// WASM backtraces captured for an execution, covering the half-open version
/// range `[version_min_including, version_max_excluding)`.
const CREATE_TABLE_T_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_backtrace (
    execution_id TEXT NOT NULL,
    component_id TEXT NOT NULL,
    version_min_including INTEGER NOT NULL,
    version_max_excluding INTEGER NOT NULL,
    wasm_backtrace TEXT NOT NULL,
    PRIMARY KEY (execution_id, version_min_including, version_max_excluding)
) STRICT
";
const IDX_T_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_backtrace_execution_id_version ON t_backtrace (execution_id, version_min_including, version_max_excluding);
";
211
212#[expect(clippy::needless_pass_by_value)]
213fn convert_err(err: rusqlite::Error) -> DbError {
214 if matches!(err, rusqlite::Error::QueryReturnedNoRows) {
215 DbError::Specific(SpecificError::NotFound)
216 } else {
217 error!(backtrace = %std::backtrace::Backtrace::capture(), "Sqlite error {err:?}");
218 DbError::Specific(SpecificError::GenericError(StrVariant::Static(
219 "sqlite error",
220 )))
221 }
222}
223
224fn parsing_err(err: impl Into<Box<dyn Error>>) -> DbError {
225 let err = err.into();
226 error!(backtrace = %std::backtrace::Backtrace::capture(), "Validation failed - {err:?}");
227 DbError::Specific(SpecificError::ValidationFailed(StrVariant::Arc(Arc::from(
228 err.to_string(),
229 ))))
230}
231
/// Oneshot senders waiting for the next join-set response, keyed by execution id.
type ResponseSubscribers = Arc<
    std::sync::Mutex<hashbrown::HashMap<ExecutionId, oneshot::Sender<JoinSetResponseEventOuter>>>,
>;

/// Newtype so `PrefixedUlid` columns can be read via rusqlite's `FromSql`.
struct PrefixedUlidWrapper<T: 'static>(PrefixedUlid<T>);
type ExecutorIdW = PrefixedUlidWrapper<concepts::prefixed_ulid::prefix::Exr>;
type RunIdW = PrefixedUlidWrapper<concepts::prefixed_ulid::prefix::Run>;

impl<T: 'static> FromSql for PrefixedUlidWrapper<T> {
    /// Parses a TEXT column into a `PrefixedUlid<T>`; parse failures are
    /// logged (with the target type name) and surfaced as
    /// `FromSqlError::InvalidType`.
    fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
        let str = value.as_str()?;
        let str = str.parse::<PrefixedUlid<T>>().map_err(|err| {
            error!(
                backtrace = %std::backtrace::Backtrace::capture(),
                "Cannot convert identifier of type:`{type}`, value:`{str}` - {err:?}",
                r#type = std::any::type_name::<T>()
            );
            FromSqlError::InvalidType
        })?;
        Ok(Self(str))
    }
}
254
255struct JsonWrapper<T>(T);
256impl<T: serde::de::DeserializeOwned + 'static + Debug> FromSql for JsonWrapper<T> {
257 fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
258 let value = match value {
259 rusqlite::types::ValueRef::Text(value) | rusqlite::types::ValueRef::Blob(value) => {
260 Ok(value)
261 }
262 other => {
263 error!(
264 backtrace = %std::backtrace::Backtrace::capture(),
265 "Unexpected type when conveting to JSON - expected Text or Blob, got type `{other:?}`",
266 );
267 Err(FromSqlError::InvalidType)
268 }
269 }?;
270 let value = serde_json::from_slice::<T>(value).map_err(|err| {
271 error!(
272 backtrace = %std::backtrace::Backtrace::capture(),
273 "Cannot convert JSON value `{value:?}` to type:`{type}` - {err:?}",
274 r#type = std::any::type_name::<T>()
275 );
276 FromSqlError::InvalidType
277 })?;
278 Ok(Self(value))
279 }
280}
281
/// Newtype so any `FromStr` type can be read from a TEXT column via `FromSql`.
struct FromStrWrapper<T: FromStr>(T);
impl<T: FromStr<Err = D>, D: Debug> FromSql for FromStrWrapper<T> {
    /// Reads the column as a `String` and parses it with `T::from_str`;
    /// parse failures are logged (with the target type name) and surfaced as
    /// `FromSqlError::InvalidType`.
    fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
        let value = String::column_result(value)?;
        let value = T::from_str(&value).map_err(|err| {
            error!(
                backtrace = %std::backtrace::Backtrace::capture(),
                "Cannot convert string `{value}` to type:`{type}` - {err:?}",
                r#type = std::any::type_name::<T>()
            );
            FromSqlError::InvalidType
        })?;
        Ok(Self(value))
    }
}
297
/// Raw column values read from a `t_state` row, before being interpreted as a
/// typed `PendingState` (see `CombinedState::new`).
#[derive(Debug)]
struct CombinedStateDTO {
    state: String,
    ffqn: String,
    // Overloaded by `state`: scheduled-at, lock-expires-at, or finished-at.
    pending_expires_finished: DateTime<Utc>,
    executor_id: Option<ExecutorId>,
    run_id: Option<RunId>,
    join_set_id: Option<JoinSetId>,
    join_set_closing: Option<bool>,
    result_kind: Option<PendingStateFinishedResultKind>,
}

/// Typed view of a `t_state` row.
#[derive(Debug)]
struct CombinedState {
    ffqn: FunctionFqn,
    pending_state: PendingState,
    next_version: Version,
}

/// Notification payload: an execution of `ffqn` becomes pending at `scheduled_at`.
struct PendingAt {
    scheduled_at: DateTime<Utc>,
    ffqn: FunctionFqn,
}

/// Outcome of a `t_state` update; both fields are optional side-channel data.
#[derive(Default)]
struct IndexUpdated {
    temporary_event_count: Option<u32>,
    pending_at: Option<PendingAt>,
}
327
impl CombinedState {
    /// Reconstructs the typed `PendingState` from a raw `t_state` row.
    ///
    /// Each state is encoded as a specific combination of NULL / non-NULL
    /// columns plus the `state` discriminator; any other combination is
    /// reported as a consistency error.
    fn new(dto: &CombinedStateDTO, next_version: Version) -> Result<Self, DbError> {
        let pending_state = match dto {
            // PendingAt: all optional columns NULL;
            // `pending_expires_finished` holds the scheduled-at time.
            CombinedStateDTO {
                state,
                ffqn: _,
                pending_expires_finished: scheduled_at,
                executor_id: None,
                run_id: None,
                join_set_id: None,
                join_set_closing: None,
                result_kind: None,
            } if state == STATE_PENDING_AT => Ok(PendingState::PendingAt {
                scheduled_at: *scheduled_at,
            }),
            // Locked: executor and run ids set;
            // `pending_expires_finished` holds the lock expiry.
            CombinedStateDTO {
                state,
                ffqn: _,
                pending_expires_finished: lock_expires_at,
                executor_id: Some(executor_id),
                run_id: Some(run_id),
                join_set_id: None,
                join_set_closing: None,
                result_kind: None,
            } if state == STATE_LOCKED => Ok(PendingState::Locked {
                executor_id: *executor_id,
                run_id: *run_id,
                lock_expires_at: *lock_expires_at,
            }),
            // BlockedByJoinSet: join-set columns set;
            // `pending_expires_finished` holds the lock expiry.
            CombinedStateDTO {
                state,
                ffqn: _,
                pending_expires_finished: lock_expires_at,
                executor_id: None,
                run_id: None,
                join_set_id: Some(join_set_id),
                join_set_closing: Some(join_set_closing),
                result_kind: None,
            } if state == STATE_BLOCKED_BY_JOIN_SET => Ok(PendingState::BlockedByJoinSet {
                join_set_id: join_set_id.clone(),
                closing: *join_set_closing,
                lock_expires_at: *lock_expires_at,
            }),
            // Finished: only `result_kind` set;
            // `pending_expires_finished` holds the finished-at time.
            CombinedStateDTO {
                state,
                ffqn: _,
                pending_expires_finished: finished_at,
                executor_id: None,
                run_id: None,
                join_set_id: None,
                join_set_closing: None,
                result_kind: Some(result_kind),
            } if state == STATE_FINISHED => Ok(PendingState::Finished {
                finished: PendingStateFinished {
                    finished_at: *finished_at,
                    version: next_version.0,
                    result_kind: *result_kind,
                },
            }),
            _ => {
                error!("Cannot deserialize pending state from {dto:?}");
                Err(DbError::Specific(SpecificError::ConsistencyError(
                    StrVariant::Static("invalid `t_state`"),
                )))
            }
        }?;
        Ok(Self {
            ffqn: FunctionFqn::from_str(&dto.ffqn).map_err(|parse_err| {
                error!("Error parsing ffqn of {dto:?} - {parse_err:?}");
                DbError::Specific(SpecificError::ConsistencyError(StrVariant::Static(
                    "invalid ffqn value in `t_state`",
                )))
            })?,
            pending_state,
            next_version,
        })
    }
}
406
/// Processing priority of a queued command. Writes run at `High`, reads at
/// `Medium`; `Low` commands may be skipped when a batch is busy
/// (see `connection_rpc` and `conn_low_prio`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CommandPriority {
    High,
    Medium,
    Low,
}

/// Message sent to the dedicated sqlite thread.
#[derive(derive_more::Debug)]
enum ThreadCommand {
    /// A closure to run against the connection, plus metadata for metrics.
    Func {
        #[debug(skip)]
        func: Box<dyn FnOnce(&mut Connection) + Send>,
        priority: CommandPriority,
        sent_at: Instant,
        name: &'static str,
    },
    /// Request the command thread to stop.
    Shutdown,
    /// Placeholder left in the batch after a `Func` has been taken out.
    Dummy,
}

/// Cloneable handle to a single sqlite connection owned by a dedicated thread.
#[derive(Clone)]
pub struct SqlitePool<S: Sleep>(SqlitePoolInner<S>);

#[derive(Clone)]
struct SqlitePoolInner<S: Sleep> {
    shutdown_requested: Arc<AtomicBool>,
    shutdown_finished: Arc<AtomicBool>,
    // Commands are drained and executed sequentially by the sqlite thread.
    command_tx: tokio::sync::mpsc::Sender<ThreadCommand>,
    response_subscribers: ResponseSubscribers,
    ffqn_to_pending_subscription: Arc<Mutex<hashbrown::HashMap<FunctionFqn, mpsc::Sender<()>>>>,
    sleep: S,
    // `Some` for the pool's lifetime; taken in `Drop` for the shutdown check.
    join_handle: Option<Arc<std::thread::JoinHandle<()>>>,
}
443
#[async_trait]
impl<S: Sleep + 'static> DbPool for SqlitePool<S> {
    /// The pool itself acts as a `DbConnection`; a connection is a boxed clone.
    fn connection(&self) -> Box<dyn DbConnection> {
        Box::new(self.clone())
    }

    fn is_closing(&self) -> bool {
        self.0.shutdown_requested.load(Ordering::Acquire)
    }

    /// Requests shutdown of the sqlite thread and waits until it confirms
    /// via the `shutdown_finished` flag.
    async fn close(&self) -> Result<(), DbError> {
        self.0.shutdown_requested.store(true, Ordering::Release);
        // Wake the command thread so it notices the flag; a send error means
        // the channel is already closed, which is fine here.
        let _ = self.0.command_tx.send(ThreadCommand::Shutdown).await;
        while !self.0.shutdown_finished.load(Ordering::Acquire) {
            self.0.sleep.sleep(Duration::from_millis(1)).await;
        }
        Ok(())
    }
}
463impl<S: Sleep> Drop for SqlitePool<S> {
464 fn drop(&mut self) {
465 let arc = self.0.join_handle.take().expect("join_handle was set");
466 if let Ok(join_handle) = Arc::try_unwrap(arc) {
467 if !join_handle.is_finished() {
469 if !self.0.shutdown_finished.load(Ordering::Acquire) {
470 let backtrace = std::backtrace::Backtrace::capture();
472 warn!("SqlitePool was not closed properly - {backtrace}");
473 self.0.shutdown_requested.store(true, Ordering::Release);
474 let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
475 } else {
478 }
480 }
481 }
482 }
483}
484
/// Configuration of the sqlite pool.
#[derive(Debug, Clone)]
pub struct SqliteConfig {
    /// Capacity of the command channel feeding the sqlite thread.
    pub queue_capacity: usize,
    /// If a batch already processed at least this many high/medium commands,
    /// low-priority commands are skipped for that batch (see `connection_rpc`).
    pub low_prio_threshold: usize,
    /// Per-name overrides/additions to the default `PRAGMA` settings.
    pub pragma_override: Option<hashbrown::HashMap<String, String>>,
}
impl Default for SqliteConfig {
    fn default() -> Self {
        Self {
            queue_capacity: 100,
            low_prio_threshold: 100,
            pragma_override: None,
        }
    }
}
500
501impl<S: Sleep> SqlitePool<S> {
    /// Opens the database at `path`, applies PRAGMA settings and creates or
    /// verifies the schema. Runs once during pool construction; panics on any
    /// failure, since the pool cannot operate without a valid database.
    fn init_thread(
        path: &Path,
        mut pragma_override: hashbrown::HashMap<String, String>,
    ) -> Connection {
        // Helper: run a statement, panicking on failure.
        fn execute<P: Params>(conn: &Connection, sql: &str, params: P) {
            conn.execute(sql, params)
                .unwrap_or_else(|err| panic!("cannot run `{sql}` - {err:?}"));
        }
        // Helper: set a PRAGMA, panicking on failure.
        fn pragma_update(conn: &Connection, name: &str, value: &str) {
            debug!("Setting PRAGMA {name}={value}");
            conn.pragma_update(None, name, value)
                .unwrap_or_else(|err| panic!("cannot update pragma `{name}`=`{value}` - {err:?}"));
        }

        let conn = Connection::open_with_flags(path, OpenFlags::default())
            .unwrap_or_else(|err| panic!("cannot open the connection - {err:?}"));

        // Apply defaults, with per-name overrides taking precedence.
        for [pragma_name, pragma_value] in PRAGMA {
            let pragma_value = pragma_override
                .remove(pragma_name)
                .unwrap_or_else(|| pragma_value.to_string());
            pragma_update(&conn, pragma_name, &pragma_value);
        }
        // Apply remaining overrides that are not part of the defaults.
        for (pragma_name, pragma_value) in pragma_override.drain() {
            pragma_update(&conn, &pragma_name, &pragma_value);
        }

        // Insert the metadata singleton on first run, then verify that the
        // stored schema version matches what this build expects.
        execute(&conn, CREATE_TABLE_T_METADATA, []);
        execute(
            &conn,
            &format!(
                "INSERT INTO t_metadata (id, schema_version, created_at) VALUES
                ({T_METADATA_SINGLETON}, {T_METADATA_EXPECTED_SCHEMA_VERSION}, ?) ON CONFLICT DO NOTHING"
            ),
            [Utc::now()],
        );
        let actual_version = conn
            .prepare("SELECT schema_version FROM t_metadata")
            .unwrap_or_else(|err| panic!("cannot select schema version - {err:?}"))
            .query_row([], |row| row.get::<_, u32>("schema_version"));

        let actual_version = actual_version.unwrap_or_else(|err| {
            panic!("Cannot read the schema version - {err:?}");
        });
        assert!(
            actual_version == T_METADATA_EXPECTED_SCHEMA_VERSION,
            "wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
        );

        // Create all tables and indexes (idempotent: IF NOT EXISTS).
        execute(&conn, CREATE_TABLE_T_EXECUTION_LOG, []);
        execute(
            &conn,
            CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
            [],
        );
        execute(
            &conn,
            CREATE_INDEX_IDX_T_EXECUTION_ID_EXECUTION_ID_VARIANT,
            [],
        );
        execute(&conn, CREATE_TABLE_T_JOIN_SET_RESPONSE, []);
        execute(
            &conn,
            CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
            [],
        );
        execute(&conn, CREATE_TABLE_T_STATE, []);
        execute(&conn, IDX_T_STATE_LOCK_PENDING, []);
        execute(&conn, IDX_T_STATE_EXPIRED_TIMERS, []);
        execute(&conn, IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL, []);
        execute(&conn, IDX_T_STATE_FFQN, []);
        execute(&conn, IDX_T_STATE_CREATED_AT, []);
        execute(&conn, CREATE_TABLE_T_DELAY, []);
        execute(&conn, CREATE_TABLE_T_BACKTRACE, []);
        execute(&conn, IDX_T_BACKTRACE_EXECUTION_ID_VERSION, []);
        conn
    }
590
591 fn connection_rpc(
592 mut conn: Connection,
593 shutdown_requested: &AtomicBool,
594 shutdown_finished: &AtomicBool,
595 mut command_rx: mpsc::Receiver<ThreadCommand>,
596 queue_capacity: usize,
597 low_prio_threshold: usize,
598 ) {
599 const METRIC_DUMPING_TRESHOLD: usize = 0; let mut vec: Vec<ThreadCommand> = Vec::with_capacity(queue_capacity);
601 let mut send_hist = Histogram::<u32>::new_with_bounds(1, 1_000_000, 3).unwrap();
603 let mut func_histograms = hashbrown::HashMap::new();
604 let mut metric_dumping_counter = 0;
605 let print_histogram = |name, histogram: &Histogram<u32>, trailing_coma| {
606 print!(
607 "\"{name}\": {mean}, \"{name}_len\": {len}, \"{name}_meanlen\": {meanlen} {coma}",
608 mean = histogram.mean(),
609 len = histogram.len(),
610 meanlen = histogram.mean() * histogram.len().as_f64(),
611 coma = if trailing_coma { "," } else { "" }
612 );
613 };
614 loop {
615 vec.clear();
616 metric_dumping_counter += 1;
617 if let Some(item) = command_rx.blocking_recv() {
618 vec.push(item);
619 } else {
620 debug!("command_rx was closed");
621 break;
622 }
623 while let Ok(more) = command_rx.try_recv() {
624 vec.push(more);
625 }
626 let shutdown_found = vec
628 .iter()
629 .any(|item| matches!(item, ThreadCommand::Shutdown));
630 if shutdown_found || shutdown_requested.load(Ordering::Acquire) {
631 debug!("Recveived shutdown before processing the batch");
632 break;
633 }
634
635 let mut execute = |expected: CommandPriority| {
636 let mut processed = 0;
637 for item in &mut vec {
638 if matches!(item, ThreadCommand::Func { priority, .. } if *priority == expected )
639 {
640 let item = std::mem::replace(item, ThreadCommand::Dummy); let (func, sent_at, name) = assert_matches!(item, ThreadCommand::Func{func, sent_at, name, ..} => (func, sent_at, name));
642 let sent_at = sent_at.elapsed();
643 let started_at = Instant::now();
644 func(&mut conn);
645 let started_at = started_at.elapsed();
646 processed += 1;
647 send_hist
649 .record(u64::from(sent_at.subsec_micros()))
650 .unwrap();
651 func_histograms
652 .entry(name)
653 .or_insert_with(|| {
654 Histogram::<u32>::new_with_bounds(1, 1_000_000, 3).unwrap()
655 })
656 .record(u64::from(started_at.subsec_micros()))
657 .unwrap();
658 }
659 if shutdown_requested.load(Ordering::Acquire) {
660 debug!("Recveived shutdown during processing of the batch");
662 break;
663 }
664 }
665 processed
666 };
667 let processed = execute(CommandPriority::High) + execute(CommandPriority::Medium);
668 if processed < low_prio_threshold {
669 execute(CommandPriority::Low);
670 }
671
672 if metric_dumping_counter == METRIC_DUMPING_TRESHOLD {
673 print!("{{");
674 metric_dumping_counter = 0;
675 func_histograms.iter_mut().for_each(|(name, h)| {
676 print_histogram(*name, h, true);
677 h.clear();
678 });
679 print_histogram("send", &send_hist, false);
680 send_hist.clear();
681 println!("}}");
682 }
683 } debug!("Closing command thread");
685 shutdown_finished.store(true, Ordering::Release);
686 }
687
    /// Creates the pool: initializes the database on a blocking tokio task,
    /// then spawns the dedicated OS thread that owns the connection and serves
    /// commands until shutdown (see `connection_rpc`).
    ///
    /// # Errors
    /// Returns a generic `DbError` if the blocking initialization task fails
    /// (initialization itself panics on database errors).
    #[instrument(level = Level::DEBUG, skip_all, name = "sqlite_new")]
    pub async fn new<P: AsRef<Path>>(
        path: P,
        config: SqliteConfig,
        sleep: S,
    ) -> Result<Self, DbError> {
        let path = path.as_ref().to_owned();

        let shutdown_requested = Arc::new(AtomicBool::new(false));
        let shutdown_finished = Arc::new(AtomicBool::new(false));

        let (command_tx, command_rx) = tokio::sync::mpsc::channel(config.queue_capacity);
        info!("Sqlite database location: {path:?}");
        let join_handle = {
            // Run the (blocking) schema setup off the async runtime.
            let init_task = {
                tokio::task::spawn_blocking(move || {
                    Self::init_thread(&path, config.pragma_override.unwrap_or_default())
                })
                .await
            };
            let conn = match init_task {
                Ok(conn) => conn,
                Err(err) => {
                    error!("Initialization error - {err:?}");
                    return Err(DbError::Specific(SpecificError::GenericError(
                        StrVariant::Static("initialization error"),
                    )));
                }
            };
            let shutdown_requested = shutdown_requested.clone();
            let shutdown_finished = shutdown_finished.clone();
            // Dedicated OS thread: rusqlite connections are used from one
            // thread; all database work is funneled through `command_rx`.
            std::thread::spawn(move || {
                Self::connection_rpc(
                    conn,
                    &shutdown_requested,
                    &shutdown_finished,
                    command_rx,
                    config.queue_capacity,
                    config.low_prio_threshold,
                );
            })
        };
        Ok(SqlitePool(SqlitePoolInner {
            shutdown_requested,
            shutdown_finished,
            command_tx,
            response_subscribers: Arc::default(),
            ffqn_to_pending_subscription: Arc::default(),
            sleep,
            join_handle: Some(Arc::new(join_handle)),
        }))
    }
742
    /// Runs `func` in a write transaction (`Immediate` behavior, `High`
    /// priority on the command thread). See `transaction`.
    #[instrument(level = Level::TRACE, skip_all, fields(name))]
    pub async fn transaction_write<F, T>(&self, func: F, name: &'static str) -> Result<T, DbError>
    where
        F: FnOnce(&mut rusqlite::Transaction) -> Result<T, DbError> + Send + 'static,
        T: Send + 'static,
    {
        self.transaction(func, true, name).await
    }
751
    /// Runs `func` in a read transaction (`Deferred` behavior, `Medium`
    /// priority on the command thread). See `transaction`.
    #[instrument(level = Level::TRACE, skip_all, fields(name))]
    pub async fn transaction_read<F, T>(&self, func: F, name: &'static str) -> Result<T, DbError>
    where
        F: FnOnce(&mut rusqlite::Transaction) -> Result<T, DbError> + Send + 'static,
        T: Send + 'static,
    {
        self.transaction(func, false, name).await
    }
760
    /// Enqueues `func` to run inside a sqlite transaction on the command
    /// thread and awaits its result over a oneshot channel.
    ///
    /// `write == true` maps to `High` priority and `Immediate` transaction
    /// behavior (write lock taken up front); `write == false` maps to
    /// `Medium` priority and `Deferred` behavior. Begin/commit are wrapped in
    /// dedicated debug spans parented to the caller's span.
    ///
    /// # Errors
    /// `DbConnectionError::SendError`/`RecvError` if the command thread has
    /// shut down; otherwise whatever `func` or commit returns.
    async fn transaction<F, T>(
        &self,
        func: F,
        write: bool,
        name: &'static str,
    ) -> Result<T, DbError>
    where
        F: FnOnce(&mut rusqlite::Transaction) -> Result<T, DbError> + Send + 'static,
        T: Send + 'static,
    {
        let (tx, rx) = oneshot::channel();
        let parent_span = Span::current();

        self.0
            .command_tx
            .send(ThreadCommand::Func {
                priority: if write {
                    CommandPriority::High
                } else {
                    CommandPriority::Medium
                },
                func: Box::new(move |conn| {
                    let res = debug_span!(parent: &parent_span, "tx_begin", name).in_scope(|| {
                        conn.transaction_with_behavior(if write {
                            rusqlite::TransactionBehavior::Immediate
                        } else {
                            rusqlite::TransactionBehavior::Deferred
                        })
                        .map_err(convert_err)
                    });
                    let res = res.and_then(|mut transaction| {
                        parent_span.in_scope(|| func(&mut transaction).map(|ok| (ok, transaction)))
                    });
                    let res = res.and_then(|(ok, transaction)| {
                        debug_span!(parent: &parent_span, "tx_commit", name)
                            .in_scope(|| transaction.commit().map(|()| ok).map_err(convert_err))
                    });
                    // Receiver may have been dropped; ignore the send error.
                    _ = tx.send(res);
                }),
                sent_at: Instant::now(),
                name,
            })
            .await
            .map_err(|_send_err| DbError::Connection(DbConnectionError::SendError))?;
        rx.await
            .map_err(|_recv_err| DbError::Connection(DbConnectionError::RecvError))?
    }
809
    /// Enqueues `func` at `Low` priority; the command thread may defer it
    /// while busy (see `connection_rpc`).
    ///
    /// If the response channel closes without a reply, `Ok(T::default())` is
    /// returned instead of an error — presumably deliberate best-effort
    /// semantics for shutdown; callers get an empty result rather than a
    /// failure. NOTE(review): confirm callers tolerate a default value here.
    #[instrument(level = Level::TRACE, skip_all, fields(name))]
    pub async fn conn_low_prio<F, T>(&self, func: F, name: &'static str) -> Result<T, DbError>
    where
        F: FnOnce(&Connection) -> Result<T, DbError> + Send + 'static,
        T: Send + 'static + Default,
    {
        let (tx, rx) = oneshot::channel();
        let span = tracing::trace_span!("tx_function");
        self.0
            .command_tx
            .send(ThreadCommand::Func {
                priority: CommandPriority::Low,
                func: Box::new(move |conn| {
                    _ = tx.send(span.in_scope(|| func(conn)));
                }),
                sent_at: Instant::now(),
                name,
            })
            .await
            .map_err(|_send_err| DbError::Connection(DbConnectionError::SendError))?;
        match rx.await {
            Ok(res) => res,
            Err(_recv_err) => Ok(T::default()),
        }
    }
835
836 fn fetch_created_event(
837 conn: &Connection,
838 execution_id: &ExecutionId,
839 ) -> Result<CreateRequest, DbError> {
840 let mut stmt = conn
841 .prepare(
842 "SELECT created_at, json_value FROM t_execution_log WHERE \
843 execution_id = :execution_id AND version = 0",
844 )
845 .map_err(convert_err)?;
846 let (created_at, event) = stmt
847 .query_row(
848 named_params! {
849 ":execution_id": execution_id.to_string(),
850 },
851 |row| {
852 let created_at = row.get("created_at")?;
853 let event = row
854 .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
855 .map(|event| (created_at, event.0))
856 .map_err(|serde| {
857 error!("cannot deserialize `Created` event: {row:?} - `{serde:?}`");
858 parsing_err(serde)
859 });
860 Ok(event)
861 },
862 )
863 .map_err(convert_err)??;
864 if let ExecutionEventInner::Created {
865 ffqn,
866 params,
867 parent,
868 scheduled_at,
869 retry_exp_backoff,
870 max_retries,
871 component_id,
872 metadata,
873 scheduled_by,
874 } = event
875 {
876 Ok(CreateRequest {
877 created_at,
878 execution_id: execution_id.clone(),
879 ffqn,
880 params,
881 parent,
882 scheduled_at,
883 retry_exp_backoff,
884 max_retries,
885 component_id,
886 metadata,
887 scheduled_by,
888 })
889 } else {
890 error!("Cannt match `Created` event - {event:?}");
891 Err(DbError::Specific(SpecificError::ConsistencyError(
892 StrVariant::Static("Cannot deserialize `Created` event"),
893 )))
894 }
895 }
896
    /// Counts the temporary-failure events (`TemporarilyFailed` and
    /// `TemporarilyTimedOut` variants) logged for an execution, used to track
    /// retry progress.
    fn count_temporary_events(
        conn: &Connection,
        execution_id: &ExecutionId,
    ) -> Result<u32, DbError> {
        let mut stmt = conn.prepare(
            "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = :execution_id AND (variant = :v1 OR variant = :v2)",
        ).map_err(convert_err)?;
        stmt.query_row(
            named_params! {
                ":execution_id": execution_id.to_string(),
                ":v1": DUMMY_TEMPORARILY_TIMED_OUT.variant(),
                ":v2": DUMMY_TEMPORARILY_FAILED.variant(),
            },
            |row| row.get("count"),
        )
        .map_err(convert_err)
    }
914
    /// Returns the highest `version` stored in `t_execution_log` for the
    /// given execution (`NotFound` if the execution has no events).
    fn get_current_version(
        tx: &Transaction,
        execution_id: &ExecutionId,
    ) -> Result<Version, DbError> {
        let mut stmt = tx.prepare(
            "SELECT version FROM t_execution_log WHERE execution_id = :execution_id ORDER BY version DESC LIMIT 1",
        ).map_err(convert_err)?;
        stmt.query_row(
            named_params! {
                ":execution_id": execution_id.to_string(),
            },
            |row| {
                let version: VersionType = row.get("version")?;
                Ok(Version::new(version))
            },
        )
        .map_err(convert_err)
    }
933
934 fn get_next_version(tx: &Transaction, execution_id: &ExecutionId) -> Result<Version, DbError> {
935 Self::get_current_version(tx, execution_id).map(|ver| Version::new(ver.0 + 1))
936 }
937
938 fn check_expected_next_and_appending_version(
939 expected_version: &Version,
940 appending_version: &Version,
941 ) -> Result<(), DbError> {
942 if *expected_version != *appending_version {
943 debug!(
944 "Version mismatch - expected: {expected_version:?}, appending: {appending_version:?}"
945 );
946 return Err(DbError::Specific(SpecificError::VersionMismatch {
947 appending_version: appending_version.clone(),
948 expected_version: expected_version.clone(),
949 }));
950 }
951 Ok(())
952 }
953
    /// Inserts the `Created` event as version 0 into `t_execution_log` and the
    /// initial `PendingAt` row into `t_state`.
    ///
    /// Returns the next version (1) and the `PendingAt` payload used to notify
    /// pending-execution subscribers.
    #[instrument(level = Level::TRACE, skip_all, fields(execution_id = %req.execution_id))]
    fn create_inner(
        tx: &Transaction,
        req: CreateRequest,
    ) -> Result<(AppendResponse, PendingAt), DbError> {
        debug!("create_inner");

        let version = Version::new(0);
        let execution_id = req.execution_id.clone();
        let execution_id_str = execution_id.to_string();
        let ffqn = req.ffqn.clone();
        let created_at = req.created_at;
        let scheduled_at = req.scheduled_at;
        // The whole request is stored as the serialized `Created` event.
        let event = ExecutionEventInner::from(req);
        let event_ser = serde_json::to_string(&event)
            .map_err(|err| {
                error!("Cannot serialize {event:?} - {err:?}");
                rusqlite::Error::ToSqlConversionFailure(err.into())
            })
            .map_err(convert_err)?;
        tx.prepare(
            "INSERT INTO t_execution_log (execution_id, created_at, version, json_value, variant, join_set_id ) \
            VALUES (:execution_id, :created_at, :version, :json_value, :variant, :join_set_id)")
            .map_err(convert_err)?
            .execute(named_params! {
                ":execution_id": &execution_id_str,
                ":created_at": created_at,
                ":version": version.0,
                ":json_value": event_ser,
                ":variant": event.variant(),
                ":join_set_id": event.join_set_id().map(std::string::ToString::to_string),
            })
            .map_err(convert_err)?;
        let next_version = Version::new(version.0 + 1);
        let pending_state = PendingState::PendingAt { scheduled_at };
        let pending_at = {
            let scheduled_at = assert_matches!(pending_state, PendingState::PendingAt { scheduled_at } => scheduled_at);
            debug!("Creating with `Pending(`{scheduled_at:?}`)");
            tx.prepare(
                "INSERT INTO t_state (created_at, scheduled_at, state, execution_id, is_top_level, next_version, pending_expires_finished, ffqn) \
                VALUES (:created_at, :scheduled_at, :state, :execution_id, :is_top_level, :next_version, :pending_expires_finished, :ffqn)",
            )
            .map_err(convert_err)?
            .execute(named_params! {
                ":created_at": created_at,
                ":scheduled_at": scheduled_at,
                ":state": STATE_PENDING_AT,
                ":execution_id": execution_id.to_string(),
                ":is_top_level": execution_id.is_top_level(),
                ":next_version": next_version.0,
                ":pending_expires_finished": scheduled_at,
                ":ffqn": ffqn.to_string(),
            })
            .map_err(convert_err)?;
            PendingAt { scheduled_at, ffqn }
        };
        Ok((next_version, pending_at))
    }
1012
    /// Rewrites the derived `t_state` row of `execution_id` according to `pending_state`.
    ///
    /// Every branch issues a single UPDATE guarded by
    /// `next_version = :expected_current_version`; if a concurrent writer moved the
    /// row, zero rows match and a `ConsistencyError` is returned (optimistic
    /// concurrency check) instead of silently clobbering state.
    ///
    /// `ffqn_when_creating`: when the caller already knows the ffqn it is passed in
    /// to avoid re-reading the `Created` event; only the `PendingAt` branch uses it.
    ///
    /// Returns which derived data was refreshed: the new `PendingAt` marker
    /// (for `PendingAt`) or the temporary-event count (for `Locked`).
    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %pending_state, %next_version, purge, ffqn = ?ffqn_when_creating))]
    fn update_state(
        tx: &Transaction,
        execution_id: &ExecutionId,
        pending_state: &PendingState,
        expected_current_version: &Version,
        next_version: &Version,
        ffqn_when_creating: Option<FunctionFqn>, ) -> Result<IndexUpdated, DbError> {
        debug!("update_index");

        match &pending_state {
            // Pending: clear all lock/join-set/retry columns; only the schedule
            // time survives in `pending_expires_finished`.
            PendingState::PendingAt { scheduled_at } => {
                debug!("Setting state `Pending(`{scheduled_at:?}`)");
                let updated = tx
                    .prepare(
                        "UPDATE t_state SET \
                        state=:state, next_version = :next_version, pending_expires_finished = :pending_expires_finished, \
                        join_set_id = NULL,join_set_closing=NULL, \
                        executor_id = NULL,run_id = NULL,temporary_event_count = NULL, \
                        max_retries = NULL,retry_exp_backoff_millis = NULL,parent_execution_id = NULL,parent_join_set_id = NULL, locked_at_version = NULL, \
                        result_kind = NULL \
                        WHERE execution_id = :execution_id AND next_version = :expected_current_version",
                    )
                    .map_err(convert_err)?
                    .execute(named_params! {
                        ":next_version": next_version.0,
                        ":pending_expires_finished": scheduled_at,
                        ":state": STATE_PENDING_AT,

                        ":execution_id": execution_id.to_string(),
                        ":expected_current_version": expected_current_version.0,

                    })
                    .map_err(convert_err)?;
                if updated != 1 {
                    error!(backtrace = %std::backtrace::Backtrace::capture(), "Failed updating state to {pending_state:?}");
                    return Err(DbError::Specific(SpecificError::ConsistencyError(
                        "updating state failed".into(),
                    )));
                }
                // Prefer the caller-supplied ffqn; otherwise fall back to reading
                // the `Created` event from the execution log.
                let ffqn = if let Some(ffqn) = ffqn_when_creating {
                    ffqn
                } else {
                    Self::fetch_created_event(tx, execution_id)?.ffqn
                };
                Ok(IndexUpdated {
                    temporary_event_count: None,
                    pending_at: Some(PendingAt {
                        scheduled_at: *scheduled_at,
                        ffqn,
                    }),
                })
            }
            // Locked: store executor/run identity, the retry configuration taken
            // from the `Created` event, and remember at which version the lock
            // was taken (`locked_at_version`).
            PendingState::Locked {
                lock_expires_at,
                executor_id,
                run_id,
            } => {
                debug!(
                    %executor_id,
                    %run_id, "Setting state `Locked({next_version}, {lock_expires_at})`"
                );
                let temporary_event_count = Self::count_temporary_events(tx, execution_id)?;
                let create_req = Self::fetch_created_event(tx, execution_id)?;

                let updated = tx
                    .prepare(
                        "UPDATE t_state SET \
                        state=:state, next_version = :next_version, pending_expires_finished = :pending_expires_finished, \
                        join_set_id = NULL,join_set_closing=NULL, \
                        executor_id = :executor_id, run_id = :run_id, temporary_event_count = :temporary_event_count, \
                        max_retries = :max_retries, retry_exp_backoff_millis = :retry_exp_backoff_millis, \
                        parent_execution_id = :parent_execution_id, parent_join_set_id = :parent_join_set_id, locked_at_version = :locked_at_version, \
                        result_kind = NULL \
                        WHERE execution_id = :execution_id AND next_version = :expected_current_version",
                    )
                    .map_err(convert_err)?
                    .execute(named_params! {
                        ":next_version": next_version.0,
                        ":pending_expires_finished": lock_expires_at,
                        ":state": STATE_LOCKED,

                        ":executor_id": executor_id.to_string(),
                        ":run_id": run_id.to_string(),
                        ":temporary_event_count": temporary_event_count,
                        ":max_retries": create_req.max_retries,
                        ":retry_exp_backoff_millis": u64::try_from(create_req.retry_exp_backoff.as_millis()).unwrap(),
                        ":parent_execution_id": create_req.parent.as_ref().map(|(pid, _) | pid.to_string()),
                        ":parent_join_set_id": create_req.parent.as_ref().map(|(_, join_set_id)| join_set_id.to_string()),
                        ":locked_at_version": expected_current_version.0,

                        ":execution_id": execution_id.to_string(),
                        ":expected_current_version": expected_current_version.0,

                    })
                    .map_err(convert_err)?;
                if updated != 1 {
                    error!(backtrace = %std::backtrace::Backtrace::capture(), "Failed updating state to {pending_state:?}");
                    return Err(DbError::Specific(SpecificError::ConsistencyError(
                        "updating state failed".into(),
                    )));
                }

                Ok(IndexUpdated {
                    temporary_event_count: Some(temporary_event_count),
                    pending_at: None,
                })
            }
            // Blocked: only the join-set columns are populated;
            // `pending_expires_finished` holds the lock expiry.
            PendingState::BlockedByJoinSet {
                join_set_id,
                lock_expires_at,
                closing: join_set_closing,
            } => {
                debug!(%join_set_id, "Setting state `BlockedByJoinSet({next_version},{lock_expires_at})`");
                let updated = tx
                    .prepare(
                        "UPDATE t_state SET \
                        state=:state, next_version = :next_version, pending_expires_finished = :pending_expires_finished, \
                        join_set_id = :join_set_id, join_set_closing=:join_set_closing, \
                        executor_id = NULL,run_id = NULL,temporary_event_count = NULL, \
                        max_retries = NULL,retry_exp_backoff_millis = NULL,parent_execution_id = NULL,parent_join_set_id = NULL, locked_at_version = NULL, \
                        result_kind = NULL \
                        WHERE execution_id = :execution_id AND next_version = :expected_current_version",
                    )
                    .map_err(convert_err)?
                    .execute(named_params! {
                        ":next_version": next_version.0,
                        ":pending_expires_finished": lock_expires_at,
                        ":state": STATE_BLOCKED_BY_JOIN_SET,

                        ":join_set_id": join_set_id.to_string(),
                        ":join_set_closing": join_set_closing,

                        ":execution_id": execution_id.to_string(),
                        ":expected_current_version": expected_current_version.0,

                    })
                    .map_err(convert_err)?;
                if updated != 1 {
                    error!(backtrace = %std::backtrace::Backtrace::capture(), "Failed updating state to {pending_state:?}");
                    return Err(DbError::Specific(SpecificError::ConsistencyError(
                        "updating state failed".into(),
                    )));
                }

                Ok(IndexUpdated::default())
            }
            // Finished: the version no longer advances — the stored
            // `next_version` is the finished version itself, and
            // `pending_expires_finished` holds the finish timestamp.
            PendingState::Finished {
                finished:
                    PendingStateFinished {
                        version: finished_version,
                        finished_at,
                        result_kind,
                    },
            } => {
                debug!("Setting state `Finished`");
                // Invariant: the caller must pass the finished version as the
                // expected current version.
                assert_eq!(*finished_version, expected_current_version.0);
                let updated = tx
                    .prepare(
                        "UPDATE t_state SET \
                        state=:state, next_version = :next_version, pending_expires_finished = :pending_expires_finished, \
                        join_set_id = NULL,join_set_closing=NULL, \
                        executor_id = NULL,run_id = NULL,temporary_event_count = NULL, \
                        max_retries = NULL,retry_exp_backoff_millis = NULL,parent_execution_id = NULL,parent_join_set_id = NULL, locked_at_version = NULL, \
                        result_kind = :result_kind \
                        WHERE execution_id = :execution_id AND next_version = :expected_current_version",
                    )
                    .map_err(convert_err)?
                    .execute(named_params! {
                        ":next_version": finished_version,
                        ":pending_expires_finished": finished_at,
                        ":state": STATE_FINISHED,

                        ":result_kind": result_kind.to_string(),

                        ":execution_id": execution_id.to_string(),
                        ":expected_current_version": expected_current_version.0,

                    })
                    .map_err(convert_err)?;
                if updated != 1 {
                    error!(backtrace = %std::backtrace::Backtrace::capture(), "Failed updating state to {pending_state:?}");
                    return Err(DbError::Specific(SpecificError::ConsistencyError(
                        "updating state failed".into(),
                    )));
                }

                Ok(IndexUpdated::default())
            }
        }
    }
1205
1206 #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %next_version, purge))]
1207 fn bump_state_next_version(
1208 tx: &Transaction,
1209 execution_id: &ExecutionId,
1210 next_version: &Version,
1211 delay_req: Option<DelayReq>,
1212 ) -> Result<(), DbError> {
1213 debug!("update_index_version");
1214 let execution_id_str = execution_id.to_string();
1215 let mut stmt = tx.prepare_cached(
1216 "UPDATE t_state SET next_version = :next_version WHERE execution_id = :execution_id AND next_version = :next_version - 1",
1217 ).map_err(convert_err)?;
1218 let updated = stmt
1219 .execute(named_params! {
1220 ":execution_id": execution_id_str,
1221 ":next_version": next_version.0,
1222 })
1223 .map_err(convert_err)?;
1224 if updated != 1 {
1225 return Err(DbError::Specific(SpecificError::NotFound));
1226 }
1227 if let Some(DelayReq {
1228 join_set_id,
1229 delay_id,
1230 expires_at,
1231 }) = delay_req
1232 {
1233 debug!("Inserting delay to `t_delay`");
1234 let mut stmt = tx
1235 .prepare_cached(
1236 "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) \
1237 VALUES \
1238 (:execution_id, :join_set_id, :delay_id, :expires_at)",
1239 )
1240 .map_err(convert_err)?;
1241 stmt.execute(named_params! {
1242 ":execution_id": execution_id_str,
1243 ":join_set_id": join_set_id.to_string(),
1244 ":delay_id": delay_id.to_string(),
1245 ":expires_at": expires_at,
1246 })
1247 .map_err(convert_err)?;
1248 }
1249 Ok(())
1250 }
1251
1252 fn get_combined_state(
1253 tx: &Transaction,
1254 execution_id: &ExecutionId,
1255 ) -> Result<CombinedState, DbError> {
1256 let mut stmt = tx.prepare(
1257 "SELECT state, ffqn, next_version, pending_expires_finished, executor_id, run_id, join_set_id, join_set_closing, result_kind FROM t_state WHERE \
1258 execution_id = :execution_id",
1259 ).map_err(convert_err)?;
1260 stmt.query_row(
1261 named_params! {
1262 ":execution_id": execution_id.to_string(),
1263 },
1264 |row| {
1265 Ok(CombinedState::new(
1266 &CombinedStateDTO {
1267 state: row.get("state")?,
1268 ffqn: row.get("ffqn")?,
1269 pending_expires_finished: row
1270 .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1271 executor_id: row
1272 .get::<_, Option<ExecutorIdW>>("executor_id")?
1273 .map(|w| w.0),
1274 run_id: row.get::<_, Option<RunIdW>>("run_id")?.map(|w| w.0),
1275 join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1276 join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1277 result_kind: row
1278 .get::<_, Option<FromStrWrapper<PendingStateFinishedResultKind>>>(
1279 "result_kind",
1280 )?
1281 .map(|wrapper| wrapper.0),
1282 },
1283 Version::new(row.get("next_version")?),
1284 ))
1285 },
1286 )
1287 .map_err(convert_err)?
1288 }
1289
    /// Lists executions from `t_state`, optionally filtered by `ffqn` and/or
    /// restricted to top-level executions, with cursor-based pagination.
    ///
    /// Note on ordering: rows are fetched in ascending `created_at` order unless
    /// the pagination direction is descending; the final `vec.reverse()` for the
    /// ascending case means the returned list is always newest-first — the SQL
    /// direction only controls which end of the range LIMIT trims.
    fn list_executions(
        read_tx: &Transaction,
        ffqn: Option<FunctionFqn>,
        top_level_only: bool,
        pagination: ExecutionListPagination,
    ) -> Result<Vec<ExecutionWithState>, DbError> {
        // Accumulated WHERE clauses, bound parameters, LIMIT and SQL sort
        // direction derived from the pagination request.
        struct StatementModifier {
            where_vec: Vec<String>,
            params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)>,
            limit: u32,
            limit_desc: bool,
        }
        // Translates a cursor-based pagination request into a WHERE clause on
        // `column` (`<` or `>` relative to the cursor, per `pagination.rel()`).
        fn paginate<T: rusqlite::ToSql + 'static>(
            pagination: Pagination<Option<T>>,
            column: &str,
            top_level_only: bool,
        ) -> StatementModifier {
            let mut where_vec: Vec<String> = vec![];
            let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![];
            let limit = pagination.length();
            let limit_desc = pagination.is_desc();
            let rel = pagination.rel();
            match pagination {
                // Only add a cursor predicate when a cursor was supplied.
                Pagination::NewerThan {
                    cursor: Some(cursor),
                    ..
                }
                | Pagination::OlderThan {
                    cursor: Some(cursor),
                    ..
                } => {
                    where_vec.push(format!("{column} {rel} :cursor"));
                    params.push((":cursor", Box::new(cursor)));
                }
                _ => {}
            }
            if top_level_only {
                where_vec.push("is_top_level=true".to_string());
            }
            StatementModifier {
                where_vec,
                params,
                limit,
                limit_desc,
            }
        }
        // The cursor column depends on the pagination flavor.
        let mut statement_mod = match pagination {
            ExecutionListPagination::CreatedBy(pagination) => {
                paginate(pagination, "created_at", top_level_only)
            }
            ExecutionListPagination::ExecutionId(pagination) => {
                paginate(pagination, "execution_id", top_level_only)
            }
        };

        if let Some(ffqn) = ffqn {
            statement_mod.where_vec.push("ffqn = :ffqn".to_string());
            statement_mod
                .params
                .push((":ffqn", Box::new(ffqn.to_string())));
        }

        let where_str = if statement_mod.where_vec.is_empty() {
            String::new()
        } else {
            format!("WHERE {}", statement_mod.where_vec.join(" AND "))
        };
        let sql = format!(
            "SELECT created_at, scheduled_at, state, execution_id, ffqn, next_version, \
            pending_expires_finished, executor_id, run_id, join_set_id, join_set_closing, result_kind \
            FROM t_state {where_str} ORDER BY created_at {desc} LIMIT {limit}",
            desc = if statement_mod.limit_desc { "DESC" } else { "" },
            limit = statement_mod.limit
        );
        let mut vec: Vec<_> = read_tx
            .prepare(&sql)
            .map_err(convert_err)?
            .query_map::<_, &[(&'static str, &dyn rusqlite::ToSql)], _>(
                statement_mod
                    .params
                    .iter()
                    .map(|(key, value)| (*key, value.as_ref()))
                    .collect::<Vec<_>>()
                    .as_ref(),
                |row| {
                    // The row mapper returns `Ok(Result<_, DbError>)` so that
                    // domain parsing errors survive rusqlite's own error channel;
                    // hence the double `collect` below.
                    let execution_id_res = row
                        .get::<_, String>("execution_id")?
                        .parse::<ExecutionId>()
                        .map_err(parsing_err);
                    let created_at = row.get("created_at")?;
                    let scheduled_at = row.get("scheduled_at")?;
                    let combined_state_res = CombinedState::new(
                        &CombinedStateDTO {
                            state: row.get("state")?,
                            ffqn: row.get("ffqn")?,
                            pending_expires_finished: row
                                .get::<_, DateTime<Utc>>("pending_expires_finished")?,
                            executor_id: row
                                .get::<_, Option<ExecutorIdW>>("executor_id")?
                                .map(|w| w.0),
                            run_id: row.get::<_, Option<RunIdW>>("run_id")?.map(|w| w.0),
                            join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
                            join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
                            result_kind: row
                                .get::<_, Option<FromStrWrapper<PendingStateFinishedResultKind>>>(
                                    "result_kind",
                                )?
                                .map(|wrapper| wrapper.0),
                        },
                        Version::new(row.get("next_version")?),
                    );
                    Ok(combined_state_res.and_then(|combined_state| {
                        execution_id_res.map(|execution_id| ExecutionWithState {
                            execution_id,
                            ffqn: combined_state.ffqn,
                            pending_state: combined_state.pending_state,
                            created_at,
                            scheduled_at,
                        })
                    }))
                },
            )
            .map_err(convert_err)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(convert_err)?
            .into_iter()
            .collect::<Result<_, _>>()?;

        // Ascending fetches are flipped so the caller always gets newest-first.
        if !statement_mod.limit_desc {
            vec.reverse();
        }
        Ok(vec)
    }
1424
    /// Appends a `Locked` event for `execution_id` and returns everything an
    /// executor needs to run it: creation parameters, replayed history events,
    /// join-set responses and the new version.
    ///
    /// Fails when the current pending state refuses the lock (`can_append_lock`)
    /// or when `appending_version` does not match the stored `next_version`.
    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
    #[expect(clippy::too_many_arguments)]
    fn lock_inner(
        tx: &Transaction,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        execution_id: &ExecutionId,
        run_id: RunId,
        appending_version: &Version,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
    ) -> Result<LockedExecution, DbError> {
        debug!("lock_inner");
        let CombinedState {
            ffqn,
            pending_state,
            next_version: expected_version,
        } = Self::get_combined_state(tx, execution_id)?;
        // Reject the lock unless the current pending state allows it.
        pending_state
            .can_append_lock(created_at, executor_id, run_id, lock_expires_at)
            .map_err(DbError::Specific)?;
        Self::check_expected_next_and_appending_version(&expected_version, appending_version)?;

        // Append the `Locked` event to the execution log at `appending_version`.
        let event = ExecutionEventInner::Locked {
            component_id,
            executor_id,
            lock_expires_at,
            run_id,
        };
        let event_ser = serde_json::to_string(&event)
            .map_err(|err| {
                error!("Cannot serialize {event:?} - {err:?}");
                rusqlite::Error::ToSqlConversionFailure(err.into())
            })
            .map_err(convert_err)?;
        let mut stmt = tx
            .prepare_cached(
                "INSERT INTO t_execution_log \
                (execution_id, created_at, json_value, version, variant) \
                VALUES \
                (:execution_id, :created_at, :json_value, :version, :variant)",
            )
            .map_err(convert_err)?;
        stmt.execute(named_params! {
            ":execution_id": execution_id.to_string(),
            ":created_at": created_at,
            ":json_value": event_ser,
            ":version": appending_version.0,
            ":variant": event.variant(),
        })
        .map_err(convert_err)?;
        let pending_state = PendingState::Locked {
            executor_id,
            run_id,
            lock_expires_at,
        };

        let next_version = Version::new(appending_version.0 + 1);
        // `Version(next_version.0 - 1)` equals `appending_version`: the state
        // row is expected to still be at the version we just appended at.
        // The ffqn is passed so `update_state` skips re-reading `Created`.
        let temporary_event_count = Self::update_state(
            tx,
            execution_id,
            &pending_state,
            &Version(next_version.0 - 1),
            &next_version,
            Some(ffqn),
        )?
        .temporary_event_count
        .expect("temporary_event_count must be set");
        // Replay the log: only `Created` and `HistoryEvent` rows are needed by
        // the executor, in version order.
        let mut events = tx
            .prepare(
                "SELECT json_value FROM t_execution_log WHERE \
                execution_id = :execution_id AND (variant = :v1 OR variant = :v2) \
                ORDER BY version",
            )
            .map_err(convert_err)?
            .query_map(
                named_params! {
                    ":execution_id": execution_id.to_string(),
                    ":v1": DUMMY_CREATED.variant(),
                    ":v2": DUMMY_HISTORY_EVENT.variant(),
                },
                |row| {
                    let event = row
                        .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
                        .map(|wrapper| wrapper.0)
                        .map_err(|serde| {
                            error!("Cannot deserialize {row:?} - {serde:?}");
                            parsing_err(serde)
                        });
                    Ok(event)
                },
            )
            .map_err(convert_err)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(convert_err)?
            .into_iter()
            .collect::<Result<VecDeque<_>, _>>()?;
        // The first row must be the `Created` event; it carries the parameters
        // and retry configuration.
        let Some(ExecutionEventInner::Created {
            ffqn,
            params,
            scheduled_at,
            retry_exp_backoff,
            max_retries,
            parent,
            metadata,
            ..
        }) = events.pop_front()
        else {
            error!("Execution log must contain at least `Created` event");
            return Err(DbError::Specific(SpecificError::ConsistencyError(
                StrVariant::Static("execution log must contain `Created` event"),
            )));
        };
        let responses = Self::list_responses(tx, execution_id, None)?
            .into_iter()
            .map(|resp| resp.event)
            .collect();
        trace!("Responses: {responses:?}");
        // Everything after `Created` must be a `HistoryEvent`.
        let event_history = events
            .into_iter()
            .map(|event| {
                if let ExecutionEventInner::HistoryEvent { event } = event {
                    Ok(event)
                } else {
                    error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
                    Err(DbError::Specific(SpecificError::ConsistencyError(
                        StrVariant::Static(
                            "Rows can only contain `Created` and `HistoryEvent` event kinds",
                        ),
                    )))
                }
            })
            .collect::<Result<Vec<_>, _>>()?;
        Ok(LockedExecution {
            execution_id: execution_id.clone(),
            metadata,
            run_id,
            version: next_version,
            ffqn,
            params,
            event_history,
            responses,
            scheduled_at,
            retry_exp_backoff,
            max_retries,
            parent,
            temporary_event_count,
        })
    }
1577
1578 fn count_join_next(
1579 tx: &Transaction,
1580 execution_id: &ExecutionId,
1581 join_set_id: &JoinSetId,
1582 ) -> Result<u64, DbError> {
1583 let mut stmt = tx.prepare(
1584 "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = :execution_id AND join_set_id = :join_set_id \
1585 AND history_event_type = :join_next",
1586 ).map_err(convert_err)?;
1587 stmt.query_row(
1588 named_params! {
1589 ":execution_id": execution_id.to_string(),
1590 ":join_set_id": join_set_id.to_string(),
1591 ":join_next": "JoinNext",
1592 },
1593 |row| row.get("count"),
1594 )
1595 .map_err(convert_err)
1596 }
1597
    /// Appends a non-`Created` event to the execution log and updates the
    /// derived `t_state` row accordingly.
    ///
    /// Returns the next version plus, when the execution became pending, the
    /// `PendingAt` marker the caller uses to wake pending-execution watchers.
    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
    fn append(
        tx: &Transaction,
        execution_id: &ExecutionId,
        req: &AppendRequest,
        appending_version: &Version,
    ) -> Result<(AppendResponse, Option<PendingAt>), DbError> {
        // `Created` goes through the creation path, never through `append`.
        assert!(!matches!(req.event, ExecutionEventInner::Created { .. }));
        // Locking has its own insert + state-update logic; delegate and return.
        if let ExecutionEventInner::Locked {
            component_id,
            executor_id,
            run_id,
            lock_expires_at,
        } = &req.event
        {
            let locked = Self::lock_inner(
                tx,
                req.created_at,
                component_id.clone(),
                execution_id,
                *run_id,
                appending_version,
                *executor_id,
                *lock_expires_at,
            )?;
            return Ok((locked.version, None));
        }

        // Nothing may be appended after the execution finished.
        let combined_state = Self::get_combined_state(tx, execution_id)?;
        if combined_state.pending_state.is_finished() {
            debug!("Execution is already finished");
            return Err(DbError::Specific(SpecificError::ValidationFailed(
                StrVariant::Static("execution is already finished"),
            )));
        }

        Self::check_expected_next_and_appending_version(
            &combined_state.next_version,
            appending_version,
        )?;
        let event_ser = serde_json::to_string(&req.event)
            .map_err(|err| {
                error!("Cannot serialize {:?} - {err:?}", req.event);
                rusqlite::Error::ToSqlConversionFailure(err.into())
            })
            .map_err(convert_err)?;

        let mut stmt = tx.prepare(
            "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
            VALUES (:execution_id, :created_at, :json_value, :version, :variant, :join_set_id)").map_err(convert_err)?;
        stmt.execute(named_params! {
            ":execution_id": execution_id.to_string(),
            ":created_at": req.created_at,
            ":json_value": event_ser,
            ":version": appending_version.0,
            ":variant": req.event.variant(),
            ":join_set_id": req.event.join_set_id().map(std::string::ToString::to_string),
        })
        .map_err(convert_err)?;
        // Default: the version advances by one. `Finished` overrides this below.
        let mut next_version = Version(appending_version.0 + 1);
        // Decide how the event affects the derived `t_state` row.
        let index_action = match &req.event {
            ExecutionEventInner::Created { .. } => {
                unreachable!("handled in the caller")
            }

            ExecutionEventInner::Locked { .. } => {
                unreachable!("handled above")
            }
            // Temporary failures/timeouts/unlocks re-schedule the execution at
            // the backoff expiry.
            ExecutionEventInner::TemporarilyFailed {
                backoff_expires_at, ..
            }
            | ExecutionEventInner::TemporarilyTimedOut {
                backoff_expires_at, ..
            }
            | ExecutionEventInner::Unlocked {
                backoff_expires_at, ..
            } => IndexAction::PendingStateChanged(PendingState::PendingAt {
                scheduled_at: *backoff_expires_at,
            }),
            // Finished: the version stops advancing; the finished version is the
            // version of the `Finished` event itself.
            ExecutionEventInner::Finished { result, .. } => {
                next_version = appending_version.clone();
                IndexAction::PendingStateChanged(PendingState::Finished {
                    finished: PendingStateFinished {
                        version: appending_version.0,
                        finished_at: req.created_at,
                        result_kind: PendingStateFinishedResultKind::from(result),
                    },
                })
            }
            // History events that do not affect the pending state: only bump the
            // version.
            ExecutionEventInner::HistoryEvent {
                event:
                    HistoryEvent::JoinSetCreate { .. }
                    | HistoryEvent::JoinSetRequest {
                        request: JoinSetRequest::ChildExecutionRequest { .. },
                        ..
                    }
                    | HistoryEvent::Persist { .. }
                    | HistoryEvent::Schedule { .. }
                    | HistoryEvent::Stub { .. }
                    | HistoryEvent::JoinNextTooMany { .. },
            } => IndexAction::NoPendingStateChange(None),

            // Delay requests additionally register a row in `t_delay`.
            ExecutionEventInner::HistoryEvent {
                event:
                    HistoryEvent::JoinSetRequest {
                        join_set_id,
                        request:
                            JoinSetRequest::DelayRequest {
                                delay_id,
                                expires_at,
                                ..
                            },
                    },
            } => IndexAction::NoPendingStateChange(Some(DelayReq {
                join_set_id: join_set_id.clone(),
                delay_id: delay_id.clone(),
                expires_at: *expires_at,
            })),

            // JoinNext: if the matching (n-th) response already arrived, the
            // execution goes straight back to pending; otherwise it blocks on
            // the join set.
            ExecutionEventInner::HistoryEvent {
                event:
                    HistoryEvent::JoinNext {
                        join_set_id,
                        run_expires_at,
                        closing,
                        requested_ffqn: _,
                    },
            } => {
                let join_next_count = Self::count_join_next(tx, execution_id, join_set_id)?;
                // This event was just inserted, so the count includes it; the
                // matching response is the (count-1)-th.
                let nth_response =
                    Self::nth_response(tx, execution_id, join_set_id, join_next_count - 1)?;
                trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
                assert!(join_next_count > 0);
                if let Some(JoinSetResponseEventOuter {
                    created_at: nth_created_at,
                    ..
                }) = nth_response
                {
                    // Pending at the later of run expiry and response arrival.
                    let scheduled_at = max(*run_expires_at, nth_created_at);
                    IndexAction::PendingStateChanged(PendingState::PendingAt { scheduled_at })
                } else {
                    IndexAction::PendingStateChanged(PendingState::BlockedByJoinSet {
                        join_set_id: join_set_id.clone(),
                        lock_expires_at: *run_expires_at,
                        closing: *closing,
                    })
                }
            }
        };

        let pending_at = match index_action {
            IndexAction::PendingStateChanged(pending_state) => {
                // For `Finished`, next_version was not bumped, so the expected
                // current version equals it; otherwise it is one behind.
                let expected_current_version = Version(
                    if let PendingState::Finished {
                        finished:
                            PendingStateFinished {
                                version: finished_version,
                                ..
                            },
                    } = &pending_state
                    {
                        assert_eq!(*finished_version, next_version.0);
                        next_version.0
                    } else {
                        next_version.0 - 1
                    },
                );

                Self::update_state(
                    tx,
                    execution_id,
                    &pending_state,
                    &expected_current_version,
                    &next_version,
                    None,
                )?
                .pending_at
            }
            IndexAction::NoPendingStateChange(delay_req) => {
                Self::bump_state_next_version(tx, execution_id, &next_version, delay_req)?;
                None
            }
        };
        Ok((next_version, pending_at))
    }
1786
1787 fn append_response(
1788 tx: &Transaction,
1789 execution_id: &ExecutionId,
1790 req: &JoinSetResponseEventOuter,
1791 response_subscribers: &ResponseSubscribers,
1792 ) -> Result<
1793 (
1794 Option<oneshot::Sender<JoinSetResponseEventOuter>>,
1795 Option<PendingAt>,
1796 ),
1797 DbError,
1798 > {
1799 let mut stmt = tx.prepare(
1800 "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, child_execution_id, finished_version) \
1801 VALUES (:execution_id, :created_at, :join_set_id, :delay_id, :child_execution_id, :finished_version)",
1802 ).map_err(convert_err)?;
1803 let join_set_id = &req.event.join_set_id;
1804 let delay_id = match &req.event.event {
1805 JoinSetResponse::DelayFinished { delay_id } => Some(delay_id.to_string()),
1806 JoinSetResponse::ChildExecutionFinished { .. } => None,
1807 };
1808 let (child_execution_id, finished_version) = match &req.event.event {
1809 JoinSetResponse::ChildExecutionFinished {
1810 child_execution_id,
1811 finished_version,
1812 result: _,
1813 } => (
1814 Some(child_execution_id.to_string()),
1815 Some(finished_version.0),
1816 ),
1817 JoinSetResponse::DelayFinished { .. } => (None, None),
1818 };
1819
1820 stmt.execute(named_params! {
1821 ":execution_id": execution_id.to_string(),
1822 ":created_at": req.created_at,
1823 ":join_set_id": join_set_id.to_string(),
1824 ":delay_id": delay_id,
1825 ":child_execution_id": child_execution_id,
1826 ":finished_version": finished_version,
1827 })
1828 .map_err(convert_err)?;
1829
1830 let previous_pending_state = Self::get_combined_state(tx, execution_id)?.pending_state;
1832 debug!("previous_pending_state: {previous_pending_state:?}");
1833 let pendnig_at = match previous_pending_state {
1834 PendingState::BlockedByJoinSet {
1835 join_set_id: found_join_set_id,
1836 lock_expires_at, closing: _,
1838 } if *join_set_id == found_join_set_id => {
1839 let scheduled_at = max(lock_expires_at, req.created_at);
1842 let pending_state = PendingState::PendingAt { scheduled_at };
1845 let next_version = Self::get_next_version(tx, execution_id)?;
1846 Self::update_state(
1847 tx,
1848 execution_id,
1849 &pending_state,
1850 &next_version, &next_version,
1852 None,
1853 )?
1854 .pending_at
1855 }
1856 _ => None,
1857 };
1858 if let JoinSetResponseEvent {
1859 join_set_id,
1860 event: JoinSetResponse::DelayFinished { delay_id },
1861 } = &req.event
1862 {
1863 debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
1864 let mut stmt =
1865 tx.prepare_cached("DELETE FROM t_delay WHERE execution_id = :execution_id AND join_set_id = :join_set_id AND delay_id = :delay_id").map_err(convert_err)?;
1866 stmt.execute(named_params! {
1867 ":execution_id": execution_id.to_string(),
1868 ":join_set_id": join_set_id.to_string(),
1869 ":delay_id": delay_id.to_string(),
1870 })
1871 .map_err(convert_err)?;
1872 }
1873 Ok((
1874 response_subscribers.lock().unwrap().remove(execution_id),
1875 pendnig_at,
1876 ))
1877 }
1878
1879 fn append_backtrace(tx: &Transaction, backtrace_info: &BacktraceInfo) -> Result<(), DbError> {
1880 let mut stmt = tx
1881 .prepare(
1882 "INSERT INTO t_backtrace (execution_id, component_id, version_min_including, version_max_excluding, wasm_backtrace) \
1883 VALUES (:execution_id, :component_id, :version_min_including, :version_max_excluding, :wasm_backtrace)",
1884 )
1885 .map_err(convert_err)?;
1886 let backtrace = serde_json::to_string(&backtrace_info.wasm_backtrace)
1887 .map_err(|err| {
1888 error!(
1889 "Cannot serialize backtrace {:?} - {err:?}",
1890 backtrace_info.wasm_backtrace
1891 );
1892 rusqlite::Error::ToSqlConversionFailure(err.into())
1893 })
1894 .map_err(convert_err)?;
1895 stmt.execute(named_params! {
1896 ":execution_id": backtrace_info.execution_id.to_string(),
1897 ":component_id": backtrace_info.component_id.to_string(),
1898 ":version_min_including": backtrace_info.version_min_including.0,
1899 ":version_max_excluding": backtrace_info.version_max_excluding.0,
1900 ":wasm_backtrace": backtrace,
1901 })
1902 .map_err(convert_err)?;
1903 Ok(())
1904 }
1905
    /// Test-only helper: loads the full execution log (all events, all
    /// responses, current pending state) for `execution_id`.
    #[cfg(feature = "test")]
    fn get(
        tx: &Transaction,
        execution_id: &ExecutionId,
    ) -> Result<concepts::storage::ExecutionLog, DbError> {
        let mut stmt = tx
            .prepare(
                "SELECT created_at, json_value FROM t_execution_log WHERE \
                execution_id = :execution_id ORDER BY version",
            )
            .map_err(convert_err)?;
        let events = stmt
            .query_map(
                named_params! {
                    ":execution_id": execution_id.to_string(),
                },
                |row| {
                    // The mapper returns `Ok(Result<_, DbError>)` so that JSON
                    // parsing errors survive rusqlite's error channel; hence the
                    // double `collect` below.
                    let created_at = row.get("created_at")?;
                    let event = row
                        .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
                        .map(|event| ExecutionEvent {
                            created_at,
                            event: event.0,
                            backtrace_id: None,
                        })
                        .map_err(|serde| {
                            error!("Cannot deserialize {row:?} - {serde:?}");
                            parsing_err(serde)
                        });
                    Ok(event)
                },
            )
            .map_err(convert_err)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(convert_err)?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()?;
        // An execution without any log rows does not exist.
        if events.is_empty() {
            return Err(DbError::Specific(SpecificError::NotFound));
        }
        let combined_state = Self::get_combined_state(tx, execution_id)?;
        let responses = Self::list_responses(tx, execution_id, None)?
            .into_iter()
            .map(|resp| resp.event)
            .collect();
        Ok(concepts::storage::ExecutionLog {
            execution_id: execution_id.clone(),
            events,
            responses,
            next_version: combined_state.next_version,
            pending_state: combined_state.pending_state,
        })
    }
1959
    /// Lists execution-log events of `execution_id` in the version range
    /// `[version_min, version_max_excluding)`, in version order.
    ///
    /// When `include_backtrace_id` is set, each event is joined against
    /// `t_backtrace` and carries the `version_min_including` of the matching
    /// backtrace range (if any) as its `backtrace_id`.
    fn list_execution_events(
        tx: &Transaction,
        execution_id: &ExecutionId,
        version_min: VersionType,
        version_max_excluding: VersionType,
        include_backtrace_id: bool,
    ) -> Result<Vec<ExecutionEvent>, DbError> {
        let select = if include_backtrace_id {
            "SELECT
                log.created_at,
                log.json_value,
                -- Select version_min_including from backtrace if a match is found, otherwise NULL
                bt.version_min_including AS backtrace_id
            FROM
                t_execution_log AS log
            LEFT OUTER JOIN -- Use LEFT JOIN to keep all logs even if no backtrace matches
                t_backtrace AS bt ON log.execution_id = bt.execution_id
                -- Check if the log's version falls within the backtrace's range
                AND log.version >= bt.version_min_including
                AND log.version < bt.version_max_excluding
            WHERE
                log.execution_id = :execution_id
                AND log.version >= :version_min
                AND log.version < :version_max_excluding
            ORDER BY
                log.version;"
        } else {
            "SELECT
                created_at, json_value, NULL as backtrace_id
            FROM t_execution_log WHERE
                execution_id = :execution_id AND version >= :version_min AND version < :version_max_excluding
            ORDER BY version"
        };
        tx.prepare(select)
            .map_err(convert_err)?
            .query_map(
                named_params! {
                    ":execution_id": execution_id.to_string(),
                    ":version_min": version_min,
                    ":version_max_excluding": version_max_excluding
                },
                |row| {
                    // The mapper returns `Ok(Result<_, DbError>)` so JSON parsing
                    // errors survive rusqlite's error channel (double `collect`).
                    let created_at = row.get("created_at")?;
                    let backtrace_id = row
                        .get::<_, Option<VersionType>>("backtrace_id")?
                        .map(Version::new);
                    let event = row
                        .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
                        .map(|event| ExecutionEvent {
                            created_at,
                            event: event.0,
                            backtrace_id,
                        })
                        .map_err(|serde| {
                            error!("Cannot deserialize {row:?} - {serde:?}");
                            parsing_err(serde)
                        });
                    Ok(event)
                },
            )
            .map_err(convert_err)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(convert_err)?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
    }
2026
2027 fn get_execution_event(
2028 tx: &Transaction,
2029 execution_id: &ExecutionId,
2030 version: VersionType,
2031 ) -> Result<ExecutionEvent, DbError> {
2032 let mut stmt = tx
2033 .prepare(
2034 "SELECT created_at, json_value FROM t_execution_log WHERE \
2035 execution_id = :execution_id AND version = :version",
2036 )
2037 .map_err(convert_err)?;
2038 stmt.query_row(
2039 named_params! {
2040 ":execution_id": execution_id.to_string(),
2041 ":version": version,
2042 },
2043 |row| {
2044 let created_at = row.get("created_at")?;
2045 let event = row
2046 .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2047 .map(|event| ExecutionEvent {
2048 created_at,
2049 event: event.0,
2050 backtrace_id: None,
2051 })
2052 .map_err(|serde| {
2053 error!("Cannot deserialize {row:?} - {serde:?}");
2054 parsing_err(serde)
2055 });
2056 Ok(event)
2057 },
2058 )
2059 .map_err(convert_err)?
2060 }
2061
    /// Lists join-set responses of `execution_id`, optionally cursor-paginated
    /// by the `t_join_set_response.id` column.
    ///
    /// The LEFT OUTER JOIN pulls in the child execution's log event at its
    /// `finished_version` (for `ChildExecutionFinished` responses); delay
    /// responses have a NULL `child_execution_id` and join nothing.
    fn list_responses(
        tx: &Transaction,
        execution_id: &ExecutionId,
        pagination: Option<Pagination<u32>>,
    ) -> Result<Vec<ResponseWithCursor>, DbError> {
        let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![];
        let mut sql = "SELECT r.id, r.created_at, r.join_set_id, \
            r.delay_id, \
            r.child_execution_id, r.finished_version, l.json_value \
            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
            WHERE \
            r.execution_id = :execution_id AND \
            ( \
            r.finished_version = l.version \
            OR r.child_execution_id IS NULL \
            )"
        .to_string();
        // Cursor predicate and LIMIT only when pagination was requested.
        let limit = match &pagination {
            Some(
                pagination @ (Pagination::NewerThan { cursor, .. }
                | Pagination::OlderThan { cursor, .. }),
            ) => {
                params.push((":cursor", Box::new(cursor)));
                write!(sql, " AND r.id {rel} :cursor", rel = pagination.rel()).unwrap();
                Some(pagination.length())
            }
            None => None,
        };
        sql.push_str(" ORDER BY id");
        if pagination.as_ref().is_some_and(Pagination::is_desc) {
            sql.push_str(" DESC");
        }
        if let Some(limit) = limit {
            write!(sql, " LIMIT {limit}").unwrap();
        }
        params.push((":execution_id", Box::new(execution_id.to_string())));
        tx.prepare(&sql)
            .map_err(convert_err)?
            .query_map::<_, &[(&'static str, &dyn ToSql)], _>(
                params
                    .iter()
                    .map(|(key, value)| (*key, value.as_ref()))
                    .collect::<Vec<_>>()
                    .as_ref(),
                Self::parse_response_with_cursor,
            )
            .map_err(convert_err)?
            .collect::<Result<Result<Vec<_>, DbError>, rusqlite::Error>>()
            .map_err(convert_err)?
    }
2113
    /// Map one joined row of `t_join_set_response` (plus the optional
    /// `t_execution_log` event) into a [`ResponseWithCursor`].
    ///
    /// Outer `Err` = rusqlite column-access failure; inner `Err` = the row is
    /// structurally invalid (consistency error).
    fn parse_response_with_cursor(
        row: &rusqlite::Row<'_>,
    ) -> Result<Result<ResponseWithCursor, DbError>, rusqlite::Error> {
        let id = row.get("id")?;
        let created_at: DateTime<Utc> = row.get("created_at")?;
        let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
        // Exactly one of two column shapes is valid:
        //   (delay_id, NULL, NULL, NULL)                    -> DelayFinished
        //   (NULL, child_id, finished_version, event_json)  -> ChildExecutionFinished
        let inner_res = match (
            row.get::<_, Option<DelayId>>("delay_id")?,
            row.get::<_, Option<ExecutionIdDerived>>("child_execution_id")?,
            row.get::<_, Option<VersionType>>("finished_version")?,
            row.get::<_, Option<JsonWrapper<ExecutionEventInner>>>("json_value")?,
        ) {
            (Some(delay_id), None, None, None) => Ok(JoinSetResponse::DelayFinished {
                delay_id,
            }),
            (None, Some(child_execution_id), Some(finished_version), Some(result)) => {
                match result.0 {
                    ExecutionEventInner::Finished { result, .. } => {
                        Ok(JoinSetResponse::ChildExecutionFinished {
                            child_execution_id,
                            finished_version: Version(finished_version),
                            result,
                        })
                    }
                    // The joined log row at `finished_version` must be the
                    // `Finished` event; anything else is a broken invariant.
                    _ => Err(DbError::Specific(SpecificError::ConsistencyError(
                        StrVariant::from(format!("invalid row t_join_set_response.{id} - Finished event not found at finished_version")),
                    ))),
                }
            }
            (delay, child, finished, result) => {
                error!("Invalid row in t_join_set_response {id} - {:?} {child:?} {finished:?} {:?}", delay, result.map(|it| it.0));
                Err(DbError::Specific(SpecificError::ConsistencyError(
                    StrVariant::Static("invalid row in t_join_set_response"),
                )))},
        }
        .map(|event| ResponseWithCursor {
            cursor: id,
            event: JoinSetResponseEventOuter {
                event: JoinSetResponseEvent { join_set_id, event },
                created_at,
            },
        });
        Ok(inner_res)
    }
2158
    /// Return the `skip_rows`-th response (ordered by insertion id) of the
    /// given join set, or `None` when fewer responses exist.
    fn nth_response(
        tx: &Transaction,
        execution_id: &ExecutionId,
        join_set_id: &JoinSetId,
        skip_rows: u64,
    ) -> Result<Option<JoinSetResponseEventOuter>, DbError> {
        Ok(tx
            // Same join as `list_responses`: child responses pick up the
            // child's `Finished` event JSON, delay rows join nothing.
            .prepare(
                "SELECT r.id, r.created_at, r.join_set_id, \
            r.delay_id, \
            r.child_execution_id, r.finished_version, l.json_value \
            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
            WHERE \
            r.execution_id = :execution_id AND r.join_set_id = :join_set_id AND \
            (
            r.finished_version = l.version \
            OR \
            r.child_execution_id IS NULL \
            ) \
            ORDER BY id \
            LIMIT 1 OFFSET :offset",
            )
            .map_err(convert_err)?
            .query_row(
                named_params! {
                    ":execution_id": execution_id.to_string(),
                    ":join_set_id": join_set_id.to_string(),
                    ":offset": skip_rows,
                },
                Self::parse_response_with_cursor,
            )
            // `optional()` turns "no row found" into `None` instead of an error.
            .optional()
            .map_err(convert_err)?
            // The inner result carries consistency errors from row parsing.
            .transpose()?
            .map(|resp| resp.event))
    }
2196
    /// List all responses of `execution_id` after skipping the first
    /// `skip_rows` rows (ordered by insertion id).
    #[instrument(level = Level::TRACE, skip_all)]
    fn get_responses_with_offset(
        tx: &Transaction,
        execution_id: &ExecutionId,
        skip_rows: usize,
    ) -> Result<Vec<JoinSetResponseEventOuter>, DbError> {
        // `LIMIT -1` is SQLite's way of saying "no limit"; OFFSET still applies.
        tx.prepare(
            "SELECT r.id, r.created_at, r.join_set_id, \
        r.delay_id, \
        r.child_execution_id, r.finished_version, l.json_value \
        FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
        WHERE \
        r.execution_id = :execution_id AND \
        ( \
        r.finished_version = l.version \
        OR r.child_execution_id IS NULL \
        ) \
        ORDER BY id \
        LIMIT -1 OFFSET :offset",
        )
        .map_err(convert_err)?
        .query_map(
            named_params! {
                ":execution_id": execution_id.to_string(),
                ":offset": skip_rows,
            },
            Self::parse_response_with_cursor,
        )
        .map_err(convert_err)?
        // Outer error: rusqlite row mapping; inner: row-validation (`DbError`).
        .collect::<Result<Result<Vec<_>, DbError>, _>>()
        .map_err(convert_err)?
        // Drop the cursor, keep only the events.
        .map(|resp| resp.into_iter().map(|vec| vec.event).collect())
    }
2232
2233 fn get_pending(
2234 conn: &Connection,
2235 batch_size: usize,
2236 pending_at_or_sooner: DateTime<Utc>,
2237 ffqns: &[FunctionFqn],
2238 ) -> Result<Vec<(ExecutionId, Version)>, DbError> {
2239 let mut execution_ids_versions = Vec::with_capacity(batch_size);
2240
2241 for ffqn in ffqns {
2242 let mut stmt = conn
2244 .prepare(&format!(
2245 "SELECT execution_id, next_version FROM t_state WHERE \
2246 state = \"{STATE_PENDING_AT}\" AND \
2247 pending_expires_finished <= :pending_expires_finished AND ffqn = :ffqn \
2248 ORDER BY pending_expires_finished LIMIT :batch_size"
2249 ))
2250 .map_err(convert_err)?;
2251 let execs_and_versions = stmt
2252 .query_map(
2253 named_params! {
2254 ":pending_expires_finished": pending_at_or_sooner,
2255 ":ffqn": ffqn.to_string(),
2256 ":batch_size": batch_size - execution_ids_versions.len(),
2257 },
2258 |row| {
2259 let execution_id =
2260 row.get::<_, String>("execution_id")?.parse::<ExecutionId>();
2261 let version = Version::new(row.get::<_, VersionType>("next_version")?);
2262 Ok(execution_id.map(|e| (e, version)))
2263 },
2264 )
2265 .map_err(convert_err)?
2266 .collect::<Result<Vec<_>, _>>()
2267 .map_err(convert_err)?
2268 .into_iter()
2269 .collect::<Result<Vec<_>, _>>()
2270 .map_err(parsing_err)?;
2271 execution_ids_versions.extend(execs_and_versions);
2272 if execution_ids_versions.len() == batch_size {
2273 break;
2275 }
2276 }
2277 Ok(execution_ids_versions)
2278 }
2279
2280 #[instrument(level = Level::TRACE, skip_all)]
2281 fn notify_pending(&self, pending_at: PendingAt, current_time: DateTime<Utc>) {
2282 Self::notify_pending_locked(
2283 pending_at,
2284 current_time,
2285 &self.0.ffqn_to_pending_subscription.lock().unwrap(),
2286 );
2287 }
2288
2289 #[instrument(level = Level::TRACE, skip_all)]
2290 fn notify_pending_all(
2291 &self,
2292 pending_ats: impl Iterator<Item = PendingAt>,
2293 current_time: DateTime<Utc>,
2294 ) {
2295 let ffqn_to_pending_subscription = self.0.ffqn_to_pending_subscription.lock().unwrap();
2296 for pending_at in pending_ats {
2297 Self::notify_pending_locked(pending_at, current_time, &ffqn_to_pending_subscription);
2298 }
2299 }
2300
2301 fn notify_pending_locked(
2302 pending_at: PendingAt,
2303 current_time: DateTime<Utc>,
2304 ffqn_to_pending_subscription: &std::sync::MutexGuard<
2305 hashbrown::HashMap<FunctionFqn, mpsc::Sender<()>>,
2306 >,
2307 ) {
2308 match pending_at {
2309 PendingAt { scheduled_at, ffqn } if scheduled_at <= current_time => {
2310 if let Some(subscription) = ffqn_to_pending_subscription.get(&ffqn) {
2311 debug!("Notifying pending subscriber");
2312 let _ = subscription.try_send(());
2313 }
2314 }
2315 _ => {}
2316 }
2317 }
2318
2319 #[instrument(level = Level::TRACE, skip_all)]
2320 fn append_batch_create_new_execution_inner(
2321 tx: &mut rusqlite::Transaction,
2322 batch: Vec<AppendRequest>,
2323 execution_id: &ExecutionId,
2324 mut version: Version,
2325 child_req: Vec<CreateRequest>,
2326 ) -> Result<(Version, Vec<PendingAt>), DbError> {
2327 let mut pending_at = None;
2328 for append_request in batch {
2329 (version, pending_at) = Self::append(tx, execution_id, &append_request, &version)?;
2330 }
2331 let mut pending_ats = Vec::new();
2332 if let Some(pending_at) = pending_at {
2333 pending_ats.push(pending_at);
2334 }
2335 for child_req in child_req {
2336 let (_, pending_at) = Self::create_inner(tx, child_req)?;
2337 pending_ats.push(pending_at);
2338 }
2339 Ok((version, pending_ats))
2340 }
2341}
2342
/// Outcome of indexing an appended event.
enum IndexAction {
    /// The execution transitioned to the given pending state.
    PendingStateChanged(PendingState),
    /// No pending-state transition; may carry a delay request
    /// (presumably to be registered in `t_delay` - see `DelayReq`).
    NoPendingStateChange(Option<DelayReq>),
}
2347
2348#[async_trait]
2349impl<S: Sleep> DbConnection for SqlitePool<S> {
2350 #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
2351 async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbError> {
2352 debug!("create");
2353 trace!(?req, "create");
2354 let created_at = req.created_at;
2355 let (version, pending_at) = self
2356 .transaction_write(move |tx| Self::create_inner(tx, req), "create")
2357 .await?;
2358 self.notify_pending(pending_at, created_at);
2359 Ok(version)
2360 }
2361
    /// Fetch up to `batch_size` pending executions on the low-priority
    /// connection, then lock them one by one inside a single write transaction.
    ///
    /// A row that fails to lock (e.g. a competing executor grabbed it between
    /// the read and the write) is skipped with a warning instead of failing
    /// the whole batch.
    #[instrument(level = Level::TRACE, skip(self))]
    async fn lock_pending(
        &self,
        batch_size: usize,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        run_id: RunId,
    ) -> Result<LockPendingResponse, DbError> {
        let execution_ids_versions = self
            .conn_low_prio(
                move |conn| Self::get_pending(conn, batch_size, pending_at_or_sooner, &ffqns),
                "get_pending",
            )
            .await?;
        if execution_ids_versions.is_empty() {
            Ok(vec![])
        } else {
            debug!("Locking {execution_ids_versions:?}");
            self.transaction_write(
                move |tx| {
                    let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
                    for (execution_id, version) in execution_ids_versions {
                        match Self::lock_inner(
                            tx,
                            created_at,
                            component_id.clone(),
                            &execution_id,
                            run_id,
                            &version,
                            executor_id,
                            lock_expires_at,
                        ) {
                            Ok(locked) => locked_execs.push(locked),
                            Err(err) => {
                                // Best effort: losing the race for one row must
                                // not abort locking the remaining rows.
                                warn!("Locking row {execution_id} failed - {err:?}");
                            }
                        }
                    }
                    Ok(locked_execs)
                },
                "lock_pending",
            )
            .await
        }
    }
2412
2413 #[instrument(level = Level::DEBUG, skip(self))]
2415 async fn lock(
2416 &self,
2417 created_at: DateTime<Utc>,
2418 component_id: ComponentId,
2419 execution_id: &ExecutionId,
2420 run_id: RunId,
2421 version: Version,
2422 executor_id: ExecutorId,
2423 lock_expires_at: DateTime<Utc>,
2424 ) -> Result<LockResponse, DbError> {
2425 debug!("lock");
2426 let execution_id = execution_id.clone();
2427 self.transaction_write(
2428 move |tx| {
2429 let locked = Self::lock_inner(
2430 tx,
2431 created_at,
2432 component_id,
2433 &execution_id,
2434 run_id,
2435 &version,
2436 executor_id,
2437 lock_expires_at,
2438 )?;
2439 Ok((locked.event_history, locked.version))
2440 },
2441 "lock_inner",
2442 )
2443 .await
2444 }
2445
2446 #[instrument(level = Level::DEBUG, skip(self, req))]
2447 async fn append(
2448 &self,
2449 execution_id: ExecutionId,
2450 version: Version,
2451 req: AppendRequest,
2452 ) -> Result<AppendResponse, DbError> {
2453 debug!(%req, "append");
2454 trace!(?req, "append");
2455 let created_at = req.created_at;
2457 if let ExecutionEventInner::Created { .. } = req.event {
2458 panic!("Cannot append `Created` event - use `create` instead");
2459 }
2460 let (version, pending_at) = self
2461 .transaction_write(
2462 move |tx| Self::append(tx, &execution_id, &req, &version),
2463 "append",
2464 )
2465 .await?;
2466 if let Some(pending_at) = pending_at {
2467 self.notify_pending(pending_at, created_at);
2468 }
2469 Ok(version)
2470 }
2471
2472 #[instrument(level = Level::DEBUG, skip(self, batch))]
2473 async fn append_batch(
2474 &self,
2475 current_time: DateTime<Utc>,
2476 batch: Vec<AppendRequest>,
2477 execution_id: ExecutionId,
2478 version: Version,
2479 ) -> Result<AppendBatchResponse, DbError> {
2480 debug!("append_batch");
2481 trace!(?batch, "append_batch");
2482 assert!(!batch.is_empty(), "Empty batch request");
2483 if batch.iter().any(|append_request| {
2484 matches!(append_request.event, ExecutionEventInner::Created { .. })
2485 }) {
2486 panic!("Cannot append `Created` event - use `create` instead");
2487 }
2488 let (version, pending_at) = self
2489 .transaction_write(
2490 move |tx| {
2491 let mut version = version;
2492 let mut pending_at = None;
2493 for append_request in batch {
2494 (version, pending_at) =
2495 Self::append(tx, &execution_id, &append_request, &version)?;
2496 }
2497 Ok((version, pending_at))
2498 },
2499 "append_batch",
2500 )
2501 .await?;
2502 if let Some(pending_at) = pending_at {
2503 self.notify_pending(pending_at, current_time);
2504 }
2505 Ok(version)
2506 }
2507
2508 #[instrument(level = Level::DEBUG, skip(self, batch, child_req))]
2509 async fn append_batch_create_new_execution(
2510 &self,
2511 current_time: DateTime<Utc>,
2512 batch: Vec<AppendRequest>,
2513 execution_id: ExecutionId,
2514 version: Version,
2515 child_req: Vec<CreateRequest>,
2516 ) -> Result<AppendBatchResponse, DbError> {
2517 debug!("append_batch_create_new_execution");
2518 trace!(?batch, ?child_req, "append_batch_create_new_execution");
2519 assert!(!batch.is_empty(), "Empty batch request");
2520 if batch.iter().any(|append_request| {
2521 matches!(append_request.event, ExecutionEventInner::Created { .. })
2522 }) {
2523 panic!("Cannot append `Created` event - use `create` instead");
2524 }
2525
2526 let (version, pending_ats) = self
2527 .transaction_write(
2528 move |tx| {
2529 Self::append_batch_create_new_execution_inner(
2530 tx,
2531 batch,
2532 &execution_id,
2533 version,
2534 child_req,
2535 )
2536 },
2537 "append_batch_create_new_execution_inner",
2538 )
2539 .await?;
2540 self.notify_pending_all(pending_ats.into_iter(), current_time);
2541 Ok(version)
2542 }
2543
    /// Atomically append `batch` to the child execution's log AND record the
    /// child's response on the parent's join set, then deliver the response to
    /// any waiting subscriber and notify pending subscribers.
    ///
    /// # Errors
    /// Fails validation when `execution_id` equals `parent_execution_id`.
    ///
    /// # Panics
    /// Panics on an empty batch or on any `Created` event in `batch`.
    #[instrument(level = Level::DEBUG, skip(self, batch, parent_response_event))]
    async fn append_batch_respond_to_parent(
        &self,
        execution_id: ExecutionIdDerived,
        current_time: DateTime<Utc>,
        batch: Vec<AppendRequest>,
        version: Version,
        parent_execution_id: ExecutionId,
        parent_response_event: JoinSetResponseEventOuter,
    ) -> Result<AppendBatchResponse, DbError> {
        debug!("append_batch_respond_to_parent");
        let execution_id = ExecutionId::Derived(execution_id);
        if execution_id == parent_execution_id {
            return Err(DbError::Specific(SpecificError::ValidationFailed(
                StrVariant::Static(
                    "Parameters `execution_id` and `parent_execution_id` cannot be the same",
                ),
            )));
        }
        trace!(
            ?batch,
            ?parent_response_event,
            "append_batch_respond_to_parent"
        );
        assert!(!batch.is_empty(), "Empty batch request");
        if batch.iter().any(|append_request| {
            matches!(append_request.event, ExecutionEventInner::Created { .. })
        }) {
            panic!("Cannot append `Created` event - use `create` instead");
        }
        let response_subscribers = self.0.response_subscribers.clone();
        let (version, response_subscriber, pending_ats) = {
            // Clone so the original event can still be sent to the subscriber
            // after the transaction closure consumed its copy.
            let event = parent_response_event.clone();
            self.transaction_write(
                move |tx| {
                    // Child log appends; only the last pending state matters.
                    let mut version = version;
                    let mut pending_at_child = None;
                    for append_request in batch {
                        (version, pending_at_child) =
                            Self::append(tx, &execution_id, &append_request, &version)?;
                    }

                    // Record the response on the parent within the same transaction.
                    let (response_subscriber, pending_at_parent) = Self::append_response(
                        tx,
                        &parent_execution_id,
                        &event,
                        &response_subscribers,
                    )?;
                    Ok((
                        version,
                        response_subscriber,
                        vec![pending_at_child, pending_at_parent],
                    ))
                },
                "append_batch_respond_to_parent",
            )
            .await?
        };
        // Notifications happen only after the transaction committed.
        if let Some(response_subscriber) = response_subscriber {
            let notified = response_subscriber.send(parent_response_event);
            debug!("Notifying response subscriber: {notified:?}");
        }
        self.notify_pending_all(pending_ats.into_iter().flatten(), current_time);
        Ok(version)
    }
2611
2612 #[cfg(feature = "test")]
2613 #[instrument(level = Level::DEBUG, skip(self))]
2614 async fn get(
2615 &self,
2616 execution_id: &ExecutionId,
2617 ) -> Result<concepts::storage::ExecutionLog, DbError> {
2618 trace!("get");
2619 let execution_id = execution_id.clone();
2620 self.transaction_read(move |tx| Self::get(tx, &execution_id), "get")
2621 .await
2622 }
2623
2624 #[instrument(level = Level::DEBUG, skip(self))]
2625 async fn list_execution_events(
2626 &self,
2627 execution_id: &ExecutionId,
2628 since: &Version,
2629 max_length: VersionType,
2630 include_backtrace_id: bool,
2631 ) -> Result<Vec<ExecutionEvent>, DbError> {
2632 let execution_id = execution_id.clone();
2633 let since = since.0;
2634 self.transaction_read(
2635 move |tx| {
2636 Self::list_execution_events(
2637 tx,
2638 &execution_id,
2639 since,
2640 since + max_length,
2641 include_backtrace_id,
2642 )
2643 },
2644 "get",
2645 )
2646 .await
2647 }
2648
    /// Return the responses of `execution_id` starting at `start_idx`; when
    /// none exist yet, register a oneshot subscriber *inside* the same write
    /// transaction (so a concurrent append cannot slip in unobserved) and then
    /// await either the next response or `interrupt_after`.
    #[instrument(level = Level::DEBUG, skip(self, interrupt_after))]
    async fn subscribe_to_next_responses(
        &self,
        execution_id: &ExecutionId,
        start_idx: usize,
        interrupt_after: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) -> Result<Vec<JoinSetResponseEventOuter>, SubscribeError> {
        debug!("next_responses");
        let execution_id = execution_id.clone();
        let response_subscribers = self.0.response_subscribers.clone();
        let resp_or_receiver = self
            .transaction_write(
                move |tx| {
                    let responses = Self::get_responses_with_offset(tx, &execution_id, start_idx)?;
                    if responses.is_empty() {
                        // No data yet - subscribe while still holding the
                        // transaction to avoid a lost wakeup.
                        let (sender, receiver) = oneshot::channel();
                        response_subscribers
                            .lock()
                            .unwrap()
                            .insert(execution_id, sender);
                        Ok(itertools::Either::Right(receiver))
                    } else {
                        Ok(itertools::Either::Left(responses))
                    }
                },
                "subscribe_to_next_responses",
            )
            .await
            .map_err(SubscribeError::DbError)?;
        match resp_or_receiver {
            itertools::Either::Left(resp) => Ok(resp),
            itertools::Either::Right(receiver) => {
                // A dropped sender is surfaced as a connection-level error.
                tokio::select! {
                    resp = receiver => resp.map(|resp| vec![resp]).map_err(|_| SubscribeError::DbError(DbError::Connection(DbConnectionError::RecvError))),
                    () = interrupt_after => Err(SubscribeError::Interrupted),
                }
            }
        }
    }
2689
2690 #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
2691 async fn append_response(
2692 &self,
2693 created_at: DateTime<Utc>,
2694 execution_id: ExecutionId,
2695 response_event: JoinSetResponseEvent,
2696 ) -> Result<(), DbError> {
2697 debug!("append_response");
2698 let response_subscribers = self.0.response_subscribers.clone();
2699 let event = JoinSetResponseEventOuter {
2700 created_at,
2701 event: response_event,
2702 };
2703 let (response_subscriber, pending_at) = {
2704 let event = event.clone();
2705 self.transaction_write(
2706 move |tx| Self::append_response(tx, &execution_id, &event, &response_subscribers),
2707 "append_response",
2708 )
2709 .await?
2710 };
2711 if let Some(response_subscriber) = response_subscriber {
2712 debug!("Notifying response subscriber");
2713 let _ = response_subscriber.send(event);
2714 }
2715 if let Some(pending_at) = pending_at {
2716 self.notify_pending(pending_at, created_at);
2717 }
2718 Ok(())
2719 }
2720
2721 #[instrument(level = Level::DEBUG, skip_all)]
2722 async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbError> {
2723 debug!("append_backtrace");
2724 self.transaction_write(
2725 move |tx| Self::append_backtrace(tx, &append),
2726 "append_backtrace",
2727 )
2728 .await
2729 }
2730
2731 #[instrument(level = Level::DEBUG, skip_all)]
2732 async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbError> {
2733 debug!("append_backtrace_batch");
2734 self.transaction_write(
2735 move |tx| {
2736 for append in batch {
2737 Self::append_backtrace(tx, &append)?;
2738 }
2739 Ok(())
2740 },
2741 "append_backtrace",
2742 )
2743 .await
2744 }
2745
    /// Fetch one backtrace record of `execution_id` chosen by `filter`:
    /// a specific version (matched against the min/max range), the first, or
    /// the last recorded backtrace.
    ///
    /// NOTE(review): the debug message and the transaction label still say
    /// "get_last_backtrace" - presumably stale from before `BacktraceFilter`
    /// existed; confirm before renaming (dashboards may key on the label).
    #[instrument(level = Level::DEBUG, skip_all)]
    async fn get_backtrace(
        &self,
        execution_id: &ExecutionId,
        filter: BacktraceFilter,
    ) -> Result<BacktraceInfo, DbError> {
        debug!("get_last_backtrace");
        let execution_id = execution_id.clone();

        self.transaction_read(
            move |tx| {
                let select = "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace FROM t_backtrace \
                WHERE execution_id = :execution_id";
                // Parameters are boxed because `:version` only exists for the
                // `Specific` filter variant.
                let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![(":execution_id", Box::new(execution_id.to_string()))];
                let select = match filter {
                    BacktraceFilter::Specific(version) =>{
                        params.push((":version", Box::new(version.0)));
                        format!("{select} AND version_min_including <= :version AND version_max_excluding > :version")
                    },
                    BacktraceFilter::First => format!("{select} ORDER BY version_min_including LIMIT 1"),
                    BacktraceFilter::Last => format!("{select} ORDER BY version_min_including DESC LIMIT 1")
                };
                let mut stmt = tx
                    .prepare(
                        &select
                    )
                    .map_err(convert_err)?;

                stmt.query_row::<_, &[(&'static str, &dyn ToSql)], _>(
                    params
                        .iter()
                        .map(|(key, value)| (*key, value.as_ref()))
                        .collect::<Vec<_>>()
                        .as_ref(),
                    |row| {
                        Ok(BacktraceInfo {
                            execution_id: execution_id.clone(),
                            component_id: row.get::<_, FromStrWrapper<_> >("component_id")?.0,
                            version_min_including: Version::new(row.get::<_, VersionType>("version_min_including")?),
                            version_max_excluding: Version::new(row.get::<_, VersionType>("version_max_excluding")?),
                            wasm_backtrace: row.get::<_, JsonWrapper<_>>("wasm_backtrace")?.0,
                        })
                    },
                )
                .map_err(convert_err)
            },
            "get_last_backtrace",
        ).await
    }
2795
    /// Collect all timers that expired at or before `at`: delay requests from
    /// `t_delay` plus locked executions from `t_state` whose lock ran out
    /// (with everything needed to decide on a retry).
    #[instrument(level = Level::TRACE, skip(self))]
    async fn get_expired_timers(&self, at: DateTime<Utc>) -> Result<Vec<ExpiredTimer>, DbError> {
        self.conn_low_prio(
            move |conn| {
                // Expired delay requests.
                let mut expired_timers = conn.prepare(
                    "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= :at",
                ).map_err(convert_err)?.query_map(
                    named_params! {
                        ":at": at,
                    },
                    |row| {
                        let execution_id = row.get("execution_id")?;
                        let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
                        let delay_id = row.get::<_, DelayId>("delay_id")?;
                        Ok(ExpiredTimer::Delay { execution_id, join_set_id, delay_id })
                    },
                ).map_err(convert_err)?
                .collect::<Result<Vec<_>, _>>().map_err(convert_err)?;
                // Expired locks. NOTE(review): `state = "{STATE_LOCKED}"` relies
                // on SQLite's deprecated double-quoted string literals (breaks
                // under SQLITE_DQS=0); consider single quotes.
                expired_timers.extend(conn.prepare(&format!(
                    "SELECT execution_id, next_version, temporary_event_count, max_retries, retry_exp_backoff_millis, parent_execution_id, parent_join_set_id, locked_at_version FROM t_state \
                    WHERE pending_expires_finished <= :at AND state = \"{STATE_LOCKED}\"")
                ).map_err(convert_err)?
                .query_map(
                    named_params! {
                        ":at": at,
                    },
                    |row| {
                        let execution_id = row.get("execution_id")?;
                        let version = Version::new(row.get::<_, VersionType>("next_version")?);
                        let locked_at_version = Version::new(row.get::<_, VersionType>("locked_at_version")?);

                        let temporary_event_count = row.get::<_, u32>("temporary_event_count")?;
                        let max_retries = row.get::<_, u32>("max_retries")?;
                        let retry_exp_backoff = Duration::from_millis(row.get::<_, u64>("retry_exp_backoff_millis")?);
                        let parent_execution_id = row.get::<_, Option<ExecutionId>>("parent_execution_id")?;
                        let parent_join_set_id = row.get::<_, Option<JoinSetId>>("parent_join_set_id")?;

                        // `parent` is present only when both id columns are set.
                        Ok(ExpiredTimer::Lock { execution_id, locked_at_version, version, temporary_event_count, max_retries,
                            retry_exp_backoff, parent: parent_execution_id.and_then(|pexe| parent_join_set_id.map(|pjs| (pexe, pjs)))})
                    },
                ).map_err(convert_err)?
                .collect::<Result<Vec<_>, _>>().map_err(convert_err)?
                );

                if !expired_timers.is_empty() {
                    debug!("get_expired_timers found {expired_timers:?}");
                }
                Ok(expired_timers)
            }, "get_expired_timers"
        )
        .await
    }
2850
    /// Block until a pending execution for one of `ffqns` may exist, or until
    /// `max_wait` elapses. Subscribes first and double-checks the database
    /// afterwards, so a notification arriving in between is not lost.
    #[instrument(level = Level::TRACE, skip(self))]
    async fn wait_for_pending(
        &self,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        max_wait: Duration,
    ) {
        let sleep_fut = self.0.sleep.sleep(max_wait);
        let (sender, mut receiver) = mpsc::channel(1);
        {
            // Register the subscription before querying, so an execution that
            // becomes pending right after the query still wakes us.
            let mut ffqn_to_pending_subscription =
                self.0.ffqn_to_pending_subscription.lock().unwrap();
            for ffqn in ffqns.as_ref() {
                ffqn_to_pending_subscription.insert(ffqn.clone(), sender.clone());
            }
        }
        let execution_ids_versions = match self
            .conn_low_prio(
                {
                    let ffqns = ffqns.clone();
                    move |conn| Self::get_pending(conn, 1, pending_at_or_sooner, ffqns.as_ref())
                },
                "subscribe_to_pending",
            )
            .await
        {
            Ok(ok) => ok,
            Err(err) => {
                // Degrade to a plain timeout wait on database errors.
                trace!("Ignoring error and waiting in for timeout - {err:?}");
                sleep_fut.await;
                return;
            }
        };
        if !execution_ids_versions.is_empty() {
            trace!("Not waiting, database already contains new pending executions");
            return;
        }
        // Either a pending-notification arrives or the timeout fires.
        tokio::select! { _ = receiver.recv() => {
            trace!("Received a notification");
        }
        () = sleep_fut => {
        }
        }
    }
2896
    /// Poll (100 ms interval) until the execution reaches the `Finished`
    /// pending state, then return its result. With `timeout` set, gives up
    /// with [`ClientError::Timeout`].
    async fn wait_for_finished_result(
        &self,
        execution_id: &ExecutionId,
        timeout: Option<Duration>,
    ) -> Result<SupportedFunctionReturnValue, ClientError> {
        let execution_result = {
            let fut = async move {
                loop {
                    let execution_id = execution_id.clone();
                    if let Some(execution_result) = self
                        .transaction_read(move |tx| {
                            // `t_state` records the finished version; fetch
                            // exactly that event from the log.
                            let pending_state =
                                Self::get_combined_state(tx, &execution_id)?.pending_state;
                            if let PendingState::Finished { finished } = pending_state {
                                let event =
                                    Self::get_execution_event(tx, &execution_id, finished.version)?;
                                if let ExecutionEventInner::Finished { result, ..} = event.event {
                                    Ok(Some(result))
                                } else {
                                    // The log row at `finished.version` must be
                                    // the `Finished` event - anything else means
                                    // index and log disagree.
                                    error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
                                    Err(DbError::Specific(SpecificError::ConsistencyError(
                                        StrVariant::Static(
                                            "cannot get finished event based on t_state version",
                                        ),
                                    )))
                                }
                            } else {
                                Ok(None)
                            }
                        }, "wait_for_finished_result")
                        .await?
                    {
                        return Ok(execution_result);
                    }
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }
            };

            if let Some(timeout) = timeout {
                tokio::select! { res = fut => res,
                    () = tokio::time::sleep(timeout) => Err(ClientError::Timeout)
                }
            } else {
                fut.await
            }
        }?;
        Ok(execution_result)
    }
2947
2948 async fn get_execution_event(
2949 &self,
2950 execution_id: &ExecutionId,
2951 version: &Version,
2952 ) -> Result<ExecutionEvent, DbError> {
2953 let version = version.0;
2954 let execution_id = execution_id.clone();
2955 self.transaction_read(
2956 move |tx| Self::get_execution_event(tx, &execution_id, version),
2957 "get_execution_event",
2958 )
2959 .await
2960 }
2961
2962 async fn get_pending_state(&self, execution_id: &ExecutionId) -> Result<PendingState, DbError> {
2963 let execution_id = execution_id.clone();
2964 Ok(self
2965 .transaction_read(
2966 move |tx| Self::get_combined_state(tx, &execution_id),
2967 "get_pending_state",
2968 )
2969 .await?
2970 .pending_state)
2971 }
2972
2973 async fn list_executions(
2974 &self,
2975 ffqn: Option<FunctionFqn>,
2976 top_level_only: bool,
2977 pagination: ExecutionListPagination,
2978 ) -> Result<Vec<ExecutionWithState>, DbError> {
2979 Ok(self
2980 .transaction_read(
2981 move |tx| Self::list_executions(tx, ffqn, top_level_only, pagination),
2982 "list_executions",
2983 )
2984 .await?)
2985 }
2986
2987 async fn list_responses(
2988 &self,
2989 execution_id: &ExecutionId,
2990 pagination: Pagination<u32>,
2991 ) -> Result<Vec<ResponseWithCursor>, DbError> {
2992 let execution_id = execution_id.clone();
2993 Ok(self
2994 .transaction_read(
2995 move |tx| Self::list_responses(tx, &execution_id, Some(pagination)),
2996 "list_executions",
2997 )
2998 .await?)
2999 }
3000}
3001
#[cfg(any(test, feature = "tempfile"))]
pub mod tempfile {
    use super::{SqliteConfig, SqlitePool};
    use concepts::time::TokioSleep;
    use tempfile::NamedTempFile;

    /// Create a pool backed by `$SQLITE_FILE` when that env var is set,
    /// otherwise by a fresh temp file which is returned alongside the pool so
    /// it stays alive (and is deleted on drop).
    pub async fn sqlite_pool() -> (SqlitePool<TokioSleep>, Option<NamedTempFile>) {
        match std::env::var("SQLITE_FILE") {
            Ok(path) => {
                let pool = SqlitePool::new(path, SqliteConfig::default(), TokioSleep)
                    .await
                    .unwrap();
                (pool, None)
            }
            Err(_) => {
                let file = NamedTempFile::new().unwrap();
                let pool = SqlitePool::new(file.path(), SqliteConfig::default(), TokioSleep)
                    .await
                    .unwrap();
                (pool, Some(file))
            }
        }
    }
}