obeli_sk_executor/
expired_timers_watcher.rs

1use crate::executor::Append;
2use crate::executor::ChildFinishedResponse;
3use chrono::{DateTime, Utc};
4use concepts::StrVariant;
5use concepts::storage::AppendRequest;
6use concepts::storage::DbConnection;
7use concepts::storage::DbError;
8use concepts::storage::DbPool;
9use concepts::storage::ExecutionLog;
10use concepts::storage::JoinSetResponseEvent;
11use concepts::time::ClockFn;
12use concepts::{
13    FinishedExecutionError,
14    storage::{ExecutionEventInner, ExpiredTimer, JoinSetResponse},
15};
16use std::{
17    sync::{
18        Arc,
19        atomic::{AtomicBool, Ordering},
20    },
21    time::Duration,
22};
23use tokio::task::AbortHandle;
24use tracing::Level;
25use tracing::{debug, error, info, instrument, trace, warn};
26
27#[derive(Debug, Clone)]
28pub struct TimersWatcherConfig<C: ClockFn> {
29    pub tick_sleep: Duration,
30    pub clock_fn: C,
31    pub leeway: Duration, // A short duration that will be subtracted from now() so that a hot workflow can win.
32}
33
/// Summary of one watcher tick: how many expired timers were handled successfully.
#[derive(Debug)]
pub struct TickProgress {
    /// Expired locks for which an event was appended without error.
    pub expired_locks: usize,
    /// Expired delays for which a `DelayFinished` response was appended without error.
    pub expired_async_timers: usize,
}
39
40pub struct TaskHandle {
41    is_closing: Arc<AtomicBool>,
42    abort_handle: AbortHandle,
43}
44
45impl TaskHandle {
46    #[instrument(level = Level::DEBUG, skip_all, name = "expired_timers_watcher.close")]
47    pub async fn close(&self) {
48        trace!("Gracefully closing");
49        self.is_closing.store(true, Ordering::Relaxed);
50        while !self.abort_handle.is_finished() {
51            tokio::time::sleep(Duration::from_millis(1)).await;
52        }
53        debug!("Gracefully closed expired_timers_watcher");
54    }
55}
56
57impl Drop for TaskHandle {
58    fn drop(&mut self) {
59        if self.abort_handle.is_finished() {
60            return;
61        }
62        warn!("Aborting the expired_timers_watcher");
63        self.abort_handle.abort();
64    }
65}
66
67pub fn spawn_new<C: ClockFn + 'static, DB: DbConnection + 'static, P: DbPool<DB> + 'static>(
68    db_pool: P,
69    config: TimersWatcherConfig<C>,
70) -> TaskHandle {
71    let is_closing = Arc::new(AtomicBool::default());
72    let tick_sleep = config.tick_sleep;
73    let abort_handle = tokio::spawn({
74        let is_closing = is_closing.clone();
75        async move {
76            debug!("Spawned expired_timers_watcher");
77            let mut old_err = None;
78            while !is_closing.load(Ordering::Relaxed) {
79                let executed_at = config.clock_fn.now() - config.leeway;
80                let res = tick(&db_pool.connection(), executed_at).await;
81                log_err_if_new(res, &mut old_err);
82                tokio::time::sleep(tick_sleep).await;
83            }
84        }
85    })
86    .abort_handle();
87    TaskHandle {
88        is_closing,
89        abort_handle,
90    }
91}
92
93fn log_err_if_new(res: Result<TickProgress, DbError>, old_err: &mut Option<DbError>) {
94    match (res, &old_err) {
95        (Ok(_), _) => {
96            *old_err = None;
97        }
98        (Err(err), Some(old)) if err == *old => {}
99        (Err(err), _) => {
100            error!("Tick failed: {err:?}");
101            *old_err = Some(err);
102        }
103    }
104}
105
/// Runs a single [`tick`] at `executed_at`; compiled only with the `test` feature.
#[cfg(feature = "test")]
pub async fn tick_test<DB: DbConnection + 'static>(
    db_connection: &DB,
    executed_at: DateTime<Utc>,
) -> Result<TickProgress, DbError> {
    let progress = tick(db_connection, executed_at).await?;
    Ok(progress)
}
113
114#[instrument(level = Level::TRACE, skip_all)]
115pub(crate) async fn tick<DB: DbConnection + 'static>(
116    db_connection: &DB,
117    executed_at: DateTime<Utc>,
118) -> Result<TickProgress, DbError> {
119    let mut expired_locks = 0;
120    let mut expired_async_timers = 0;
121    for expired_timer in db_connection.get_expired_timers(executed_at).await? {
122        match expired_timer {
123            ExpiredTimer::Lock {
124                execution_id,
125                locked_at_version,
126                version,
127                temporary_event_count,
128                max_retries,
129                retry_exp_backoff,
130                parent,
131            } => {
132                let append = if max_retries == u32::MAX && locked_at_version.0 + 1 < version.0 {
133                    // Workflow that made progress is unlocked and immediately available for locking.
134                    debug!(%execution_id, "Unlocking workflow execution");
135                    Append {
136                        created_at: executed_at,
137                        primary_event: AppendRequest {
138                            created_at: executed_at,
139                            event: ExecutionEventInner::Unlocked {
140                                backoff_expires_at: executed_at,
141                                reason: StrVariant::Static("made progress"),
142                            },
143                        },
144                        execution_id: execution_id.clone(),
145                        version,
146                        child_finished: None,
147                    }
148                } else if let Some(duration) = ExecutionLog::can_be_retried_after(
149                    temporary_event_count + 1,
150                    max_retries,
151                    retry_exp_backoff,
152                ) {
153                    let backoff_expires_at = executed_at + duration;
154                    debug!(%execution_id, "Retrying execution with expired lock after {duration:?} at {backoff_expires_at}");
155                    Append {
156                        created_at: executed_at,
157                        primary_event: AppendRequest {
158                            created_at: executed_at,
159                            event: ExecutionEventInner::TemporarilyTimedOut {
160                                backoff_expires_at,
161                                http_client_traces: None,
162                            },
163                        },
164                        execution_id: execution_id.clone(),
165                        version,
166                        child_finished: None,
167                    }
168                } else {
169                    info!(%execution_id, "Marking execution with expired lock as permanently timed out");
170                    let finished_exec_result = Err(FinishedExecutionError::PermanentTimeout);
171                    let child_finished = parent.map(|(parent_execution_id, parent_join_set)| {
172                        ChildFinishedResponse {
173                            parent_execution_id,
174                            parent_join_set,
175                            result: finished_exec_result.clone(),
176                        }
177                    });
178                    Append {
179                        created_at: executed_at,
180                        primary_event: AppendRequest {
181                            created_at: executed_at,
182                            event: ExecutionEventInner::Finished {
183                                result: finished_exec_result,
184                                http_client_traces: None,
185                            },
186                        },
187                        execution_id: execution_id.clone(),
188                        version,
189                        child_finished,
190                    }
191                };
192                let res = append.append(db_connection).await;
193                if let Err(err) = res {
194                    debug!(%execution_id, "Failed to update expired lock - {err:?}");
195                } else {
196                    expired_locks += 1;
197                }
198            }
199            ExpiredTimer::Delay {
200                execution_id,
201                join_set_id,
202                delay_id,
203            } => {
204                let event = JoinSetResponse::DelayFinished { delay_id };
205                debug!(%execution_id, %join_set_id, %delay_id, "Appending DelayFinishedAsyncResponse");
206                let res = db_connection
207                    .append_response(
208                        executed_at,
209                        execution_id.clone(),
210                        JoinSetResponseEvent { join_set_id, event },
211                    )
212                    .await;
213                if let Err(err) = res {
214                    debug!(%execution_id, %delay_id, "Failed to update expired async timer - {err:?}");
215                } else {
216                    expired_async_timers += 1;
217                }
218            }
219        }
220    }
221    Ok(TickProgress {
222        expired_locks,
223        expired_async_timers,
224    })
225}