1use crate::executor::Append;
2use crate::executor::ChildFinishedResponse;
3use chrono::{DateTime, Utc};
4use concepts::StrVariant;
5use concepts::storage::AppendRequest;
6use concepts::storage::DbConnection;
7use concepts::storage::DbError;
8use concepts::storage::DbPool;
9use concepts::storage::ExecutionLog;
10use concepts::storage::JoinSetResponseEvent;
11use concepts::time::ClockFn;
12use concepts::{
13 FinishedExecutionError,
14 storage::{ExecutionEventInner, ExpiredTimer, JoinSetResponse},
15};
16use std::{
17 sync::{
18 Arc,
19 atomic::{AtomicBool, Ordering},
20 },
21 time::Duration,
22};
23use tokio::task::AbortHandle;
24use tracing::Level;
25use tracing::{debug, error, info, instrument, trace, warn};
26
27#[derive(Debug, Clone)]
28pub struct TimersWatcherConfig<C: ClockFn> {
29 pub tick_sleep: Duration,
30 pub clock_fn: C,
31 pub leeway: Duration, }
33
/// Summary of the work a single `tick` completed successfully.
#[derive(Debug)]
pub struct TickProgress {
    /// Count of expired lock timers whose follow-up event was appended
    /// without error.
    pub expired_locks: usize,
    /// Count of expired delay timers whose join set response was appended
    /// without error.
    pub expired_async_timers: usize,
}
39
40pub struct TaskHandle {
41 is_closing: Arc<AtomicBool>,
42 abort_handle: AbortHandle,
43}
44
45impl TaskHandle {
46 #[instrument(level = Level::DEBUG, skip_all, name = "expired_timers_watcher.close")]
47 pub async fn close(&self) {
48 trace!("Gracefully closing");
49 self.is_closing.store(true, Ordering::Relaxed);
50 while !self.abort_handle.is_finished() {
51 tokio::time::sleep(Duration::from_millis(1)).await;
52 }
53 debug!("Gracefully closed expired_timers_watcher");
54 }
55}
56
57impl Drop for TaskHandle {
58 fn drop(&mut self) {
59 if self.abort_handle.is_finished() {
60 return;
61 }
62 warn!("Aborting the expired_timers_watcher");
63 self.abort_handle.abort();
64 }
65}
66
67pub fn spawn_new<C: ClockFn + 'static, DB: DbConnection + 'static, P: DbPool<DB> + 'static>(
68 db_pool: P,
69 config: TimersWatcherConfig<C>,
70) -> TaskHandle {
71 let is_closing = Arc::new(AtomicBool::default());
72 let tick_sleep = config.tick_sleep;
73 let abort_handle = tokio::spawn({
74 let is_closing = is_closing.clone();
75 async move {
76 debug!("Spawned expired_timers_watcher");
77 let mut old_err = None;
78 while !is_closing.load(Ordering::Relaxed) {
79 let executed_at = config.clock_fn.now() - config.leeway;
80 let res = tick(&db_pool.connection(), executed_at).await;
81 log_err_if_new(res, &mut old_err);
82 tokio::time::sleep(tick_sleep).await;
83 }
84 }
85 })
86 .abort_handle();
87 TaskHandle {
88 is_closing,
89 abort_handle,
90 }
91}
92
93fn log_err_if_new(res: Result<TickProgress, DbError>, old_err: &mut Option<DbError>) {
94 match (res, &old_err) {
95 (Ok(_), _) => {
96 *old_err = None;
97 }
98 (Err(err), Some(old)) if err == *old => {}
99 (Err(err), _) => {
100 error!("Tick failed: {err:?}");
101 *old_err = Some(err);
102 }
103 }
104}
105
/// Public entry point to `tick`, compiled only when the `test` feature is on.
///
/// Forwards both arguments unchanged and returns whatever `tick` returns.
#[cfg(feature = "test")]
pub async fn tick_test<DB>(
    db_connection: &DB,
    executed_at: DateTime<Utc>,
) -> Result<TickProgress, DbError>
where
    DB: DbConnection + 'static,
{
    tick(db_connection, executed_at).await
}
113
114#[instrument(level = Level::TRACE, skip_all)]
115pub(crate) async fn tick<DB: DbConnection + 'static>(
116 db_connection: &DB,
117 executed_at: DateTime<Utc>,
118) -> Result<TickProgress, DbError> {
119 let mut expired_locks = 0;
120 let mut expired_async_timers = 0;
121 for expired_timer in db_connection.get_expired_timers(executed_at).await? {
122 match expired_timer {
123 ExpiredTimer::Lock {
124 execution_id,
125 locked_at_version,
126 version,
127 temporary_event_count,
128 max_retries,
129 retry_exp_backoff,
130 parent,
131 } => {
132 let append = if max_retries == u32::MAX && locked_at_version.0 + 1 < version.0 {
133 debug!(%execution_id, "Unlocking workflow execution");
135 Append {
136 created_at: executed_at,
137 primary_event: AppendRequest {
138 created_at: executed_at,
139 event: ExecutionEventInner::Unlocked {
140 backoff_expires_at: executed_at,
141 reason: StrVariant::Static("made progress"),
142 },
143 },
144 execution_id: execution_id.clone(),
145 version,
146 child_finished: None,
147 }
148 } else if let Some(duration) = ExecutionLog::can_be_retried_after(
149 temporary_event_count + 1,
150 max_retries,
151 retry_exp_backoff,
152 ) {
153 let backoff_expires_at = executed_at + duration;
154 debug!(%execution_id, "Retrying execution with expired lock after {duration:?} at {backoff_expires_at}");
155 Append {
156 created_at: executed_at,
157 primary_event: AppendRequest {
158 created_at: executed_at,
159 event: ExecutionEventInner::TemporarilyTimedOut {
160 backoff_expires_at,
161 http_client_traces: None,
162 },
163 },
164 execution_id: execution_id.clone(),
165 version,
166 child_finished: None,
167 }
168 } else {
169 info!(%execution_id, "Marking execution with expired lock as permanently timed out");
170 let finished_exec_result = Err(FinishedExecutionError::PermanentTimeout);
171 let child_finished = parent.map(|(parent_execution_id, parent_join_set)| {
172 ChildFinishedResponse {
173 parent_execution_id,
174 parent_join_set,
175 result: finished_exec_result.clone(),
176 }
177 });
178 Append {
179 created_at: executed_at,
180 primary_event: AppendRequest {
181 created_at: executed_at,
182 event: ExecutionEventInner::Finished {
183 result: finished_exec_result,
184 http_client_traces: None,
185 },
186 },
187 execution_id: execution_id.clone(),
188 version,
189 child_finished,
190 }
191 };
192 let res = append.append(db_connection).await;
193 if let Err(err) = res {
194 debug!(%execution_id, "Failed to update expired lock - {err:?}");
195 } else {
196 expired_locks += 1;
197 }
198 }
199 ExpiredTimer::Delay {
200 execution_id,
201 join_set_id,
202 delay_id,
203 } => {
204 let event = JoinSetResponse::DelayFinished { delay_id };
205 debug!(%execution_id, %join_set_id, %delay_id, "Appending DelayFinishedAsyncResponse");
206 let res = db_connection
207 .append_response(
208 executed_at,
209 execution_id.clone(),
210 JoinSetResponseEvent { join_set_id, event },
211 )
212 .await;
213 if let Err(err) = res {
214 debug!(%execution_id, %delay_id, "Failed to update expired async timer - {err:?}");
215 } else {
216 expired_async_timers += 1;
217 }
218 }
219 }
220 }
221 Ok(TickProgress {
222 expired_locks,
223 expired_async_timers,
224 })
225}