1use crate::executor::Append;
2use crate::executor::ChildFinishedResponse;
3use chrono::{DateTime, Utc};
4use concepts::storage::AppendRequest;
5use concepts::storage::DbConnection;
6use concepts::storage::DbError;
7use concepts::storage::DbPool;
8use concepts::storage::ExecutionLog;
9use concepts::storage::JoinSetResponseEvent;
10use concepts::time::ClockFn;
11use concepts::{
12 FinishedExecutionError,
13 storage::{ExecutionEventInner, ExpiredTimer, JoinSetResponse},
14};
15use std::{
16 sync::{
17 Arc,
18 atomic::{AtomicBool, Ordering},
19 },
20 time::Duration,
21};
22use tokio::task::AbortHandle;
23use tracing::Level;
24use tracing::{debug, error, info, instrument, trace, warn};
25
/// Configuration of the expired timers watcher task.
#[derive(Debug, Clone)]
pub struct TimersWatcherConfig<C: ClockFn> {
    /// How long the watcher sleeps after each tick.
    pub tick_sleep: Duration,
    /// Clock queried for the current time at the start of each tick.
    pub clock_fn: C,
    /// Duration subtracted from the clock's current time to obtain the
    /// timestamp that is passed to each tick.
    pub leeway: Duration,
}
32
/// Counters describing what a single tick accomplished.
#[derive(Debug)]
pub struct TickProgress {
    /// Number of expired locks for which the follow-up event was appended
    /// successfully.
    pub expired_locks: usize,
    /// Number of expired async timers for which the response was appended
    /// successfully.
    pub expired_async_timers: usize,
}
38
39pub struct TaskHandle {
40 is_closing: Arc<AtomicBool>,
41 abort_handle: AbortHandle,
42}
43
44impl TaskHandle {
45 #[instrument(level = Level::DEBUG, skip_all, name = "expired_timers_watcher.close")]
46 pub async fn close(&self) {
47 trace!("Gracefully closing");
48 self.is_closing.store(true, Ordering::Relaxed);
49 while !self.abort_handle.is_finished() {
50 tokio::time::sleep(Duration::from_millis(1)).await;
51 }
52 debug!("Gracefully closed expired_timers_watcher");
53 }
54}
55
56impl Drop for TaskHandle {
57 fn drop(&mut self) {
58 if self.abort_handle.is_finished() {
59 return;
60 }
61 warn!("Aborting the expired_timers_watcher");
62 self.abort_handle.abort();
63 }
64}
65
66pub fn spawn_new<C: ClockFn + 'static, DB: DbConnection + 'static, P: DbPool<DB> + 'static>(
67 db_pool: P,
68 config: TimersWatcherConfig<C>,
69) -> TaskHandle {
70 let is_closing = Arc::new(AtomicBool::default());
71 let tick_sleep = config.tick_sleep;
72 let abort_handle = tokio::spawn({
73 let is_closing = is_closing.clone();
74 async move {
75 debug!("Spawned expired_timers_watcher");
76 let mut old_err = None;
77 while !is_closing.load(Ordering::Relaxed) {
78 let executed_at = config.clock_fn.now() - config.leeway;
79 let res = tick(&db_pool.connection(), executed_at).await;
80 log_err_if_new(res, &mut old_err);
81 tokio::time::sleep(tick_sleep).await;
82 }
83 }
84 })
85 .abort_handle();
86 TaskHandle {
87 is_closing,
88 abort_handle,
89 }
90}
91
92fn log_err_if_new(res: Result<TickProgress, DbError>, old_err: &mut Option<DbError>) {
93 match (res, &old_err) {
94 (Ok(_), _) => {
95 *old_err = None;
96 }
97 (Err(err), Some(old)) if err == *old => {}
98 (Err(err), _) => {
99 error!("Tick failed: {err:?}");
100 *old_err = Some(err);
101 }
102 }
103}
104
/// Public entry point to the crate-private `tick`, compiled only when the
/// `test` feature is enabled.
///
/// Forwards both arguments to `tick` and returns its result unchanged.
#[cfg(feature = "test")]
pub async fn tick_test<DB: DbConnection + 'static>(
    db_connection: &DB,
    executed_at: DateTime<Utc>,
) -> Result<TickProgress, DbError> {
    tick(db_connection, executed_at).await
}
112
113#[instrument(level = Level::TRACE, skip_all)]
114pub(crate) async fn tick<DB: DbConnection + 'static>(
115 db_connection: &DB,
116 executed_at: DateTime<Utc>,
117) -> Result<TickProgress, DbError> {
118 let mut expired_locks = 0;
119 let mut expired_async_timers = 0;
120 for expired_timer in db_connection.get_expired_timers(executed_at).await? {
121 match expired_timer {
122 ExpiredTimer::Lock {
123 execution_id,
124 version,
125 temporary_event_count,
126 max_retries,
127 retry_exp_backoff,
128 parent,
129 } => {
130 let append = if let Some(duration) = ExecutionLog::can_be_retried_after(
131 temporary_event_count + 1,
132 max_retries,
133 retry_exp_backoff,
134 ) {
135 let backoff_expires_at = executed_at + duration;
136 debug!(%execution_id, "Retrying execution with expired lock after {duration:?} at {backoff_expires_at}");
137 Append {
138 created_at: executed_at,
139 primary_event: AppendRequest {
140 created_at: executed_at,
141 event: ExecutionEventInner::TemporarilyTimedOut {
142 backoff_expires_at,
143 http_client_traces: None,
144 },
145 },
146 execution_id: execution_id.clone(),
147 version,
148 child_finished: None,
149 }
150 } else {
151 info!(%execution_id, "Marking execution with expired lock as permanently timed out");
152 let finished_exec_result = Err(FinishedExecutionError::PermanentTimeout);
153 let child_finished = parent.map(|(parent_execution_id, parent_join_set)| {
154 ChildFinishedResponse {
155 parent_execution_id,
156 parent_join_set,
157 result: finished_exec_result.clone(),
158 }
159 });
160 Append {
161 created_at: executed_at,
162 primary_event: AppendRequest {
163 created_at: executed_at,
164 event: ExecutionEventInner::Finished {
165 result: finished_exec_result,
166 http_client_traces: None,
167 },
168 },
169 execution_id: execution_id.clone(),
170 version,
171 child_finished,
172 }
173 };
174 let res = append.append(db_connection).await;
175 if let Err(err) = res {
176 debug!(%execution_id, "Failed to update expired lock - {err:?}");
177 } else {
178 expired_locks += 1;
179 }
180 }
181 ExpiredTimer::AsyncDelay {
182 execution_id,
183 join_set_id,
184 delay_id,
185 } => {
186 let event = JoinSetResponse::DelayFinished { delay_id };
187 debug!(%execution_id, %join_set_id, %delay_id, "Appending DelayFinishedAsyncResponse");
188 let res = db_connection
189 .append_response(
190 executed_at,
191 execution_id.clone(),
192 JoinSetResponseEvent { join_set_id, event },
193 )
194 .await;
195 if let Err(err) = res {
196 debug!(%execution_id, %delay_id, "Failed to update expired async timer - {err:?}");
197 } else {
198 expired_async_timers += 1;
199 }
200 }
201 }
202 }
203 Ok(TickProgress {
204 expired_locks,
205 expired_async_timers,
206 })
207}