obeli_sk_executor/
expired_timers_watcher.rs

1use crate::executor::Append;
2use crate::executor::ChildFinishedResponse;
3use chrono::{DateTime, Utc};
4use concepts::storage::AppendRequest;
5use concepts::storage::DbConnection;
6use concepts::storage::DbError;
7use concepts::storage::DbPool;
8use concepts::storage::ExecutionLog;
9use concepts::storage::JoinSetResponseEvent;
10use concepts::time::ClockFn;
11use concepts::{
12    FinishedExecutionError,
13    storage::{ExecutionEventInner, ExpiredTimer, JoinSetResponse},
14};
15use std::{
16    sync::{
17        Arc,
18        atomic::{AtomicBool, Ordering},
19    },
20    time::Duration,
21};
22use tokio::task::AbortHandle;
23use tracing::Level;
24use tracing::{debug, error, info, instrument, trace, warn};
25
26#[derive(Debug, Clone)]
27pub struct TimersWatcherConfig<C: ClockFn> {
28    pub tick_sleep: Duration,
29    pub clock_fn: C,
30    pub leeway: Duration, // A short duration that will be subtracted from now() so that a hot workflow can win.
31}
32
/// Summary of a single [`tick`]: how many expired timers were processed
/// successfully, broken down by timer kind.
#[derive(Debug)]
pub struct TickProgress {
    /// Number of expired locks for which the follow-up event was appended.
    pub expired_locks: usize,
    /// Number of expired async delays for which the response was appended.
    pub expired_async_timers: usize,
}
38
39pub struct TaskHandle {
40    is_closing: Arc<AtomicBool>,
41    abort_handle: AbortHandle,
42}
43
44impl TaskHandle {
45    #[instrument(level = Level::DEBUG, skip_all, name = "expired_timers_watcher.close")]
46    pub async fn close(&self) {
47        trace!("Gracefully closing");
48        self.is_closing.store(true, Ordering::Relaxed);
49        while !self.abort_handle.is_finished() {
50            tokio::time::sleep(Duration::from_millis(1)).await;
51        }
52        debug!("Gracefully closed expired_timers_watcher");
53    }
54}
55
56impl Drop for TaskHandle {
57    fn drop(&mut self) {
58        if self.abort_handle.is_finished() {
59            return;
60        }
61        warn!("Aborting the expired_timers_watcher");
62        self.abort_handle.abort();
63    }
64}
65
66pub fn spawn_new<C: ClockFn + 'static, DB: DbConnection + 'static, P: DbPool<DB> + 'static>(
67    db_pool: P,
68    config: TimersWatcherConfig<C>,
69) -> TaskHandle {
70    let is_closing = Arc::new(AtomicBool::default());
71    let tick_sleep = config.tick_sleep;
72    let abort_handle = tokio::spawn({
73        let is_closing = is_closing.clone();
74        async move {
75            debug!("Spawned expired_timers_watcher");
76            let mut old_err = None;
77            while !is_closing.load(Ordering::Relaxed) {
78                let executed_at = config.clock_fn.now() - config.leeway;
79                let res = tick(&db_pool.connection(), executed_at).await;
80                log_err_if_new(res, &mut old_err);
81                tokio::time::sleep(tick_sleep).await;
82            }
83        }
84    })
85    .abort_handle();
86    TaskHandle {
87        is_closing,
88        abort_handle,
89    }
90}
91
92fn log_err_if_new(res: Result<TickProgress, DbError>, old_err: &mut Option<DbError>) {
93    match (res, &old_err) {
94        (Ok(_), _) => {
95            *old_err = None;
96        }
97        (Err(err), Some(old)) if err == *old => {}
98        (Err(err), _) => {
99            error!("Tick failed: {err:?}");
100            *old_err = Some(err);
101        }
102    }
103}
104
/// Public entry point to [`tick`], compiled only with the `test` feature.
///
/// Forwards both arguments unchanged and returns the tick's result as is.
#[cfg(feature = "test")]
pub async fn tick_test<DB: DbConnection + 'static>(
    db_connection: &DB,
    executed_at: DateTime<Utc>,
) -> Result<TickProgress, DbError> {
    tick::<DB>(db_connection, executed_at).await
}
112
113#[instrument(level = Level::TRACE, skip_all)]
114pub(crate) async fn tick<DB: DbConnection + 'static>(
115    db_connection: &DB,
116    executed_at: DateTime<Utc>,
117) -> Result<TickProgress, DbError> {
118    let mut expired_locks = 0;
119    let mut expired_async_timers = 0;
120    for expired_timer in db_connection.get_expired_timers(executed_at).await? {
121        match expired_timer {
122            ExpiredTimer::Lock {
123                execution_id,
124                version,
125                temporary_event_count,
126                max_retries,
127                retry_exp_backoff,
128                parent,
129            } => {
130                let append = if let Some(duration) = ExecutionLog::can_be_retried_after(
131                    temporary_event_count + 1,
132                    max_retries,
133                    retry_exp_backoff,
134                ) {
135                    let backoff_expires_at = executed_at + duration;
136                    debug!(%execution_id, "Retrying execution with expired lock after {duration:?} at {backoff_expires_at}");
137                    Append {
138                        created_at: executed_at,
139                        primary_event: AppendRequest {
140                            created_at: executed_at,
141                            event: ExecutionEventInner::TemporarilyTimedOut {
142                                backoff_expires_at,
143                                http_client_traces: None,
144                            },
145                        },
146                        execution_id: execution_id.clone(),
147                        version,
148                        child_finished: None,
149                    }
150                } else {
151                    info!(%execution_id, "Marking execution with expired lock as permanently timed out");
152                    let finished_exec_result = Err(FinishedExecutionError::PermanentTimeout);
153                    let child_finished = parent.map(|(parent_execution_id, parent_join_set)| {
154                        ChildFinishedResponse {
155                            parent_execution_id,
156                            parent_join_set,
157                            result: finished_exec_result.clone(),
158                        }
159                    });
160                    Append {
161                        created_at: executed_at,
162                        primary_event: AppendRequest {
163                            created_at: executed_at,
164                            event: ExecutionEventInner::Finished {
165                                result: finished_exec_result,
166                                http_client_traces: None,
167                            },
168                        },
169                        execution_id: execution_id.clone(),
170                        version,
171                        child_finished,
172                    }
173                };
174                let res = append.append(db_connection).await;
175                if let Err(err) = res {
176                    debug!(%execution_id, "Failed to update expired lock - {err:?}");
177                } else {
178                    expired_locks += 1;
179                }
180            }
181            ExpiredTimer::AsyncDelay {
182                execution_id,
183                join_set_id,
184                delay_id,
185            } => {
186                let event = JoinSetResponse::DelayFinished { delay_id };
187                debug!(%execution_id, %join_set_id, %delay_id, "Appending DelayFinishedAsyncResponse");
188                let res = db_connection
189                    .append_response(
190                        executed_at,
191                        execution_id.clone(),
192                        JoinSetResponseEvent { join_set_id, event },
193                    )
194                    .await;
195                if let Err(err) = res {
196                    debug!(%execution_id, %delay_id, "Failed to update expired async timer - {err:?}");
197                } else {
198                    expired_async_timers += 1;
199                }
200            }
201        }
202    }
203    Ok(TickProgress {
204        expired_locks,
205        expired_async_timers,
206    })
207}