1use crate::executor::Append;
2use crate::executor::ChildFinishedResponse;
3use chrono::{DateTime, Utc};
4use concepts::storage::AppendRequest;
5use concepts::storage::DbConnection;
6use concepts::storage::DbError;
7use concepts::storage::DbPool;
8use concepts::storage::ExecutionLog;
9use concepts::storage::JoinSetResponseEvent;
10use concepts::time::ClockFn;
11use concepts::{
12 storage::{ExecutionEventInner, ExpiredTimer, JoinSetResponse},
13 FinishedExecutionError,
14};
15use std::{
16 sync::{
17 atomic::{AtomicBool, Ordering},
18 Arc,
19 },
20 time::Duration,
21};
22use tokio::task::AbortHandle;
23use tracing::Level;
24use tracing::{debug, error, info, instrument, trace, warn};
25
26#[derive(Debug, Clone)]
27pub struct TimersWatcherConfig<C: ClockFn> {
28 pub tick_sleep: Duration,
29 pub clock_fn: C,
30 pub leeway: Duration, }
32
/// Counters describing what a single `tick` managed to process.
///
/// Only timers whose database append succeeded are counted.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TickProgress {
    /// Number of expired locks for which an event was appended successfully.
    pub expired_locks: usize,
    /// Number of expired async delays for which a response was appended successfully.
    pub expired_async_timers: usize,
}
38
39pub struct TaskHandle {
40 is_closing: Arc<AtomicBool>,
41 abort_handle: AbortHandle,
42}
43
44impl TaskHandle {
45 #[instrument(level = Level::DEBUG, skip_all, name = "expired_timers_watcher.close")]
46 pub async fn close(&self) {
47 trace!("Gracefully closing");
48 self.is_closing.store(true, Ordering::Relaxed);
49 while !self.abort_handle.is_finished() {
50 tokio::time::sleep(Duration::from_millis(1)).await;
51 }
52 debug!("Gracefully closed expired_timers_watcher");
53 }
54}
55
56impl Drop for TaskHandle {
57 fn drop(&mut self) {
58 if self.abort_handle.is_finished() {
59 return;
60 }
61 warn!("Aborting the expired_timers_watcher");
62 self.abort_handle.abort();
63 }
64}
65
66pub fn spawn_new<C: ClockFn + 'static, DB: DbConnection + 'static, P: DbPool<DB> + 'static>(
67 db_pool: P,
68 config: TimersWatcherConfig<C>,
69) -> TaskHandle {
70 let is_closing = Arc::new(AtomicBool::default());
71 let tick_sleep = config.tick_sleep;
72 let abort_handle = tokio::spawn({
73 let is_closing = is_closing.clone();
74 async move {
75 debug!("Spawned expired_timers_watcher");
76 let mut old_err = None;
77 while !is_closing.load(Ordering::Relaxed) {
78 let executed_at = config.clock_fn.now() - config.leeway;
79 let res = tick(&db_pool.connection(), executed_at).await;
80 log_err_if_new(res, &mut old_err);
81 tokio::time::sleep(tick_sleep).await;
82 }
83 }
84 })
85 .abort_handle();
86 TaskHandle {
87 is_closing,
88 abort_handle,
89 }
90}
91
92fn log_err_if_new(res: Result<TickProgress, DbError>, old_err: &mut Option<DbError>) {
93 match (res, &old_err) {
94 (Ok(_), _) => {
95 *old_err = None;
96 }
97 (Err(err), Some(old)) if err == *old => {}
98 (Err(err), _) => {
99 error!("Tick failed: {err:?}");
100 *old_err = Some(err);
101 }
102 }
103}
104
/// Public wrapper around the crate-private [`tick`], compiled only with the `test` feature.
///
/// Forwards both arguments unchanged and returns whatever `tick` returns.
#[cfg(feature = "test")]
pub async fn tick_test<DB>(
    db_connection: &DB,
    executed_at: DateTime<Utc>,
) -> Result<TickProgress, DbError>
where
    DB: DbConnection + 'static,
{
    tick(db_connection, executed_at).await
}
112
113#[instrument(level = Level::TRACE, skip_all)]
114pub(crate) async fn tick<DB: DbConnection + 'static>(
115 db_connection: &DB,
116 executed_at: DateTime<Utc>,
117) -> Result<TickProgress, DbError> {
118 let mut expired_locks = 0;
119 let mut expired_async_timers = 0;
120 for expired_timer in db_connection.get_expired_timers(executed_at).await? {
121 match expired_timer {
122 ExpiredTimer::Lock {
123 execution_id,
124 version,
125 temporary_event_count,
126 max_retries,
127 retry_exp_backoff,
128 parent,
129 } => {
130 let append = if let Some(duration) = ExecutionLog::can_be_retried_after(
131 temporary_event_count + 1,
132 max_retries,
133 retry_exp_backoff,
134 ) {
135 let backoff_expires_at = executed_at + duration;
136 debug!(%execution_id, "Retrying execution with expired lock after {duration:?} at {backoff_expires_at}");
137 Append {
138 created_at: executed_at,
139 primary_event: AppendRequest {
140 created_at: executed_at,
141 event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at },
142 },
143 execution_id: execution_id.clone(),
144 version,
145 child_finished: None,
146 }
147 } else {
148 info!(%execution_id, "Marking execution with expired lock as permanently timed out");
149 let finished_exec_result = Err(FinishedExecutionError::PermanentTimeout);
150 let child_finished = parent.map(|(parent_execution_id, parent_join_set)| {
151 ChildFinishedResponse {
152 parent_execution_id,
153 parent_join_set,
154 result: finished_exec_result.clone(),
155 }
156 });
157 Append {
158 created_at: executed_at,
159 primary_event: AppendRequest {
160 created_at: executed_at,
161 event: ExecutionEventInner::Finished {
162 result: finished_exec_result,
163 },
164 },
165 execution_id: execution_id.clone(),
166 version,
167 child_finished,
168 }
169 };
170 let res = append.append(db_connection).await;
171 if let Err(err) = res {
172 debug!(%execution_id, "Failed to update expired lock - {err:?}");
173 } else {
174 expired_locks += 1;
175 }
176 }
177 ExpiredTimer::AsyncDelay {
178 execution_id,
179 join_set_id,
180 delay_id,
181 } => {
182 let event = JoinSetResponse::DelayFinished { delay_id };
183 debug!(%execution_id, %join_set_id, %delay_id, "Appending DelayFinishedAsyncResponse");
184 let res = db_connection
185 .append_response(
186 executed_at,
187 execution_id.clone(),
188 JoinSetResponseEvent { join_set_id, event },
189 )
190 .await;
191 if let Err(err) = res {
192 debug!(%execution_id, %delay_id, "Failed to update expired async timer - {err:?}");
193 } else {
194 expired_async_timers += 1;
195 }
196 }
197 }
198 }
199 Ok(TickProgress {
200 expired_locks,
201 expired_async_timers,
202 })
203}