obeli_sk_executor/
expired_timers_watcher.rs

1use crate::executor::Append;
2use crate::executor::ChildFinishedResponse;
3use chrono::{DateTime, Utc};
4use concepts::storage::AppendRequest;
5use concepts::storage::DbConnection;
6use concepts::storage::DbError;
7use concepts::storage::DbPool;
8use concepts::storage::ExecutionLog;
9use concepts::storage::JoinSetResponseEvent;
10use concepts::time::ClockFn;
11use concepts::{
12    storage::{ExecutionEventInner, ExpiredTimer, JoinSetResponse},
13    FinishedExecutionError,
14};
15use std::{
16    sync::{
17        atomic::{AtomicBool, Ordering},
18        Arc,
19    },
20    time::Duration,
21};
22use tokio::task::AbortHandle;
23use tracing::Level;
24use tracing::{debug, error, info, instrument, trace, warn};
25
26#[derive(Debug, Clone)]
27pub struct TimersWatcherConfig<C: ClockFn> {
28    pub tick_sleep: Duration,
29    pub clock_fn: C,
30    pub leeway: Duration, // A short duration that will be subtracted from now() so that a hot workflow can win.
31}
32
/// Summary of a single `tick`: how many expired timers were handled successfully.
///
/// A plain pair of counters, so it derives the common value traits. That lets callers
/// copy, compare, and default-construct it (for example, when asserting on the
/// result of a tick).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct TickProgress {
    /// Number of expired execution locks for which an event was appended successfully.
    pub expired_locks: usize,
    /// Number of expired async delays for which a `DelayFinished` response was appended successfully.
    pub expired_async_timers: usize,
}
38
39pub struct TaskHandle {
40    is_closing: Arc<AtomicBool>,
41    abort_handle: AbortHandle,
42}
43
44impl TaskHandle {
45    #[instrument(level = Level::DEBUG, skip_all, name = "expired_timers_watcher.close")]
46    pub async fn close(&self) {
47        trace!("Gracefully closing");
48        self.is_closing.store(true, Ordering::Relaxed);
49        while !self.abort_handle.is_finished() {
50            tokio::time::sleep(Duration::from_millis(1)).await;
51        }
52        debug!("Gracefully closed expired_timers_watcher");
53    }
54}
55
56impl Drop for TaskHandle {
57    fn drop(&mut self) {
58        if self.abort_handle.is_finished() {
59            return;
60        }
61        warn!("Aborting the expired_timers_watcher");
62        self.abort_handle.abort();
63    }
64}
65
66pub fn spawn_new<C: ClockFn + 'static, DB: DbConnection + 'static, P: DbPool<DB> + 'static>(
67    db_pool: P,
68    config: TimersWatcherConfig<C>,
69) -> TaskHandle {
70    let is_closing = Arc::new(AtomicBool::default());
71    let tick_sleep = config.tick_sleep;
72    let abort_handle = tokio::spawn({
73        let is_closing = is_closing.clone();
74        async move {
75            debug!("Spawned expired_timers_watcher");
76            let mut old_err = None;
77            while !is_closing.load(Ordering::Relaxed) {
78                let executed_at = config.clock_fn.now() - config.leeway;
79                let res = tick(&db_pool.connection(), executed_at).await;
80                log_err_if_new(res, &mut old_err);
81                tokio::time::sleep(tick_sleep).await;
82            }
83        }
84    })
85    .abort_handle();
86    TaskHandle {
87        is_closing,
88        abort_handle,
89    }
90}
91
92fn log_err_if_new(res: Result<TickProgress, DbError>, old_err: &mut Option<DbError>) {
93    match (res, &old_err) {
94        (Ok(_), _) => {
95            *old_err = None;
96        }
97        (Err(err), Some(old)) if err == *old => {}
98        (Err(err), _) => {
99            error!("Tick failed: {err:?}");
100            *old_err = Some(err);
101        }
102    }
103}
104
/// Test-only entry point that exposes the crate-private `tick`.
///
/// Runs exactly one tick against `db_connection` as of `executed_at` and returns its
/// result unchanged.
#[cfg(feature = "test")]
pub async fn tick_test<DB>(
    db_connection: &DB,
    executed_at: DateTime<Utc>,
) -> Result<TickProgress, DbError>
where
    DB: DbConnection + 'static,
{
    tick(db_connection, executed_at).await
}
112
113#[instrument(level = Level::TRACE, skip_all)]
114pub(crate) async fn tick<DB: DbConnection + 'static>(
115    db_connection: &DB,
116    executed_at: DateTime<Utc>,
117) -> Result<TickProgress, DbError> {
118    let mut expired_locks = 0;
119    let mut expired_async_timers = 0;
120    for expired_timer in db_connection.get_expired_timers(executed_at).await? {
121        match expired_timer {
122            ExpiredTimer::Lock {
123                execution_id,
124                version,
125                temporary_event_count,
126                max_retries,
127                retry_exp_backoff,
128                parent,
129            } => {
130                let append = if let Some(duration) = ExecutionLog::can_be_retried_after(
131                    temporary_event_count + 1,
132                    max_retries,
133                    retry_exp_backoff,
134                ) {
135                    let backoff_expires_at = executed_at + duration;
136                    debug!(%execution_id, "Retrying execution with expired lock after {duration:?} at {backoff_expires_at}");
137                    Append {
138                        created_at: executed_at,
139                        primary_event: AppendRequest {
140                            created_at: executed_at,
141                            event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at },
142                        },
143                        execution_id: execution_id.clone(),
144                        version,
145                        child_finished: None,
146                    }
147                } else {
148                    info!(%execution_id, "Marking execution with expired lock as permanently timed out");
149                    let finished_exec_result = Err(FinishedExecutionError::PermanentTimeout);
150                    let child_finished = parent.map(|(parent_execution_id, parent_join_set)| {
151                        ChildFinishedResponse {
152                            parent_execution_id,
153                            parent_join_set,
154                            result: finished_exec_result.clone(),
155                        }
156                    });
157                    Append {
158                        created_at: executed_at,
159                        primary_event: AppendRequest {
160                            created_at: executed_at,
161                            event: ExecutionEventInner::Finished {
162                                result: finished_exec_result,
163                            },
164                        },
165                        execution_id: execution_id.clone(),
166                        version,
167                        child_finished,
168                    }
169                };
170                let res = append.append(db_connection).await;
171                if let Err(err) = res {
172                    debug!(%execution_id, "Failed to update expired lock - {err:?}");
173                } else {
174                    expired_locks += 1;
175                }
176            }
177            ExpiredTimer::AsyncDelay {
178                execution_id,
179                join_set_id,
180                delay_id,
181            } => {
182                let event = JoinSetResponse::DelayFinished { delay_id };
183                debug!(%execution_id, %join_set_id, %delay_id, "Appending DelayFinishedAsyncResponse");
184                let res = db_connection
185                    .append_response(
186                        executed_at,
187                        execution_id.clone(),
188                        JoinSetResponseEvent { join_set_id, event },
189                    )
190                    .await;
191                if let Err(err) = res {
192                    debug!(%execution_id, %delay_id, "Failed to update expired async timer - {err:?}");
193                } else {
194                    expired_async_timers += 1;
195                }
196            }
197        }
198    }
199    Ok(TickProgress {
200        expired_locks,
201        expired_async_timers,
202    })
203}