1use crate::error::ExecutionGuardError;
2use crate::observer::{NoopObserver, SchedulerEvent, SchedulerObserver};
3use crate::{ExecutionGuard, ExecutionGuardAcquire, ExecutionGuardRenewal, ExecutionSlot};
4use std::future::Future;
5use std::sync::Arc;
6use tokio::time::{Instant, interval_at};
7
/// Outcome of a guarded run attempt.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GuardedRunResult<T> {
    /// The guard was acquired and the task ran to completion; carries the task's output.
    Completed(T),
    /// The guard was not acquired, so the task was never started.
    Contended,
}
13
14#[derive(Clone)]
15pub struct GuardedSession<G> {
16 guard: Arc<G>,
17 observer: Arc<dyn SchedulerObserver>,
18 lease: crate::ExecutionLease,
19 catch_up: bool,
20 trigger_count: u32,
21}
22
23#[derive(Clone)]
24pub struct GuardedRunner<G> {
25 guard: Arc<G>,
26 observer: Arc<dyn SchedulerObserver>,
27}
28
29impl<G> GuardedRunner<G>
30where
31 G: ExecutionGuard + Send + Sync + 'static,
32{
33 pub fn new(guard: G) -> Self {
34 Self::with_observer(guard, NoopObserver)
35 }
36
37 pub fn with_observer<O>(guard: G, observer: O) -> Self
38 where
39 O: SchedulerObserver,
40 {
41 Self {
42 guard: Arc::new(guard),
43 observer: Arc::new(observer),
44 }
45 }
46
47 pub async fn run<F, Fut, T>(
48 &self,
49 slot: ExecutionSlot,
50 run: F,
51 ) -> Result<GuardedRunResult<T>, ExecutionGuardError>
52 where
53 F: FnOnce() -> Fut,
54 Fut: Future<Output = T>,
55 {
56 self.run_with_metadata(slot, false, 0, run).await
57 }
58
59 pub async fn run_with_metadata<F, Fut, T>(
60 &self,
61 slot: ExecutionSlot,
62 catch_up: bool,
63 trigger_count: u32,
64 run: F,
65 ) -> Result<GuardedRunResult<T>, ExecutionGuardError>
66 where
67 F: FnOnce() -> Fut,
68 Fut: Future<Output = T>,
69 {
70 let Some(session) = self
71 .acquire_with_metadata(slot, catch_up, trigger_count)
72 .await?
73 else {
74 return Ok(GuardedRunResult::Contended);
75 };
76
77 Ok(GuardedRunResult::Completed(session.run(run).await))
78 }
79
80 pub async fn acquire(
81 &self,
82 slot: ExecutionSlot,
83 ) -> Result<Option<GuardedSession<G>>, ExecutionGuardError> {
84 self.acquire_with_metadata(slot, false, 0).await
85 }
86
87 async fn acquire_with_metadata(
88 &self,
89 slot: ExecutionSlot,
90 catch_up: bool,
91 trigger_count: u32,
92 ) -> Result<Option<GuardedSession<G>>, ExecutionGuardError> {
93 let acquired = self.guard.acquire(slot.clone()).await.map_err(|error| {
94 let kind = G::classify_error(&error);
95 ExecutionGuardError::new(error, kind)
96 })?;
97
98 let ExecutionGuardAcquire::Acquired(lease) = acquired else {
99 self.observer
100 .on_event(&SchedulerEvent::ExecutionGuardContended {
101 job_id: slot.job_id,
102 resource_id: slot.resource_id,
103 scope: slot.scope,
104 scheduled_at: slot.scheduled_at,
105 catch_up,
106 trigger_count,
107 });
108 return Ok(None);
109 };
110
111 self.observer
112 .on_event(&SchedulerEvent::ExecutionGuardAcquired {
113 job_id: lease.job_id.clone(),
114 resource_id: lease.resource_id.clone(),
115 scope: lease.scope,
116 lease_key: lease.lease_key.clone(),
117 scheduled_at: lease.scheduled_at,
118 catch_up,
119 trigger_count,
120 });
121
122 Ok(Some(GuardedSession {
123 guard: self.guard.clone(),
124 observer: self.observer.clone(),
125 lease,
126 catch_up,
127 trigger_count,
128 }))
129 }
130}
131
132impl<G> GuardedSession<G>
133where
134 G: ExecutionGuard + Send + Sync + 'static,
135{
136 pub async fn run<F, Fut, T>(self, run: F) -> T
137 where
138 F: FnOnce() -> Fut,
139 Fut: Future<Output = T>,
140 {
141 let mut renewal_count = 0u32;
142 let mut failed_renewal_count = 0u32;
143 let mut lost_reported = false;
144 let task = run();
145 tokio::pin!(task);
146 let mut renewal = self
147 .guard
148 .renew_interval(&self.lease)
149 .map(|duration| interval_at(Instant::now() + duration, duration));
150
151 let output = loop {
152 if let Some(ticker) = renewal.as_mut() {
153 let mut disable_renewal = false;
154 let outcome = tokio::select! {
155 result = &mut task => Some(result),
156 _ = ticker.tick() => {
157 match self.guard.renew(&self.lease).await {
158 Ok(ExecutionGuardRenewal::Renewed) => {
159 renewal_count += 1;
160 self.observer.on_event(&SchedulerEvent::ExecutionGuardRenewed {
161 job_id: self.lease.job_id.clone(),
162 resource_id: self.lease.resource_id.clone(),
163 scope: self.lease.scope,
164 lease_key: self.lease.lease_key.clone(),
165 scheduled_at: self.lease.scheduled_at,
166 catch_up: self.catch_up,
167 trigger_count: self.trigger_count,
168 renewal_count,
169 });
170 }
171 Ok(ExecutionGuardRenewal::Lost) => {
172 if !lost_reported {
173 self.observer.on_event(&SchedulerEvent::ExecutionGuardLost {
174 job_id: self.lease.job_id.clone(),
175 resource_id: self.lease.resource_id.clone(),
176 scope: self.lease.scope,
177 lease_key: self.lease.lease_key.clone(),
178 scheduled_at: self.lease.scheduled_at,
179 catch_up: self.catch_up,
180 trigger_count: self.trigger_count,
181 renewal_count,
182 failed_renewal_count,
183 });
184 lost_reported = true;
185 }
186 disable_renewal = true;
187 }
188 Err(error) => {
189 failed_renewal_count += 1;
190 self.observer.on_event(&SchedulerEvent::ExecutionGuardRenewFailed {
191 job_id: self.lease.job_id.clone(),
192 resource_id: self.lease.resource_id.clone(),
193 scope: self.lease.scope,
194 lease_key: self.lease.lease_key.clone(),
195 scheduled_at: self.lease.scheduled_at,
196 catch_up: self.catch_up,
197 trigger_count: self.trigger_count,
198 renewal_count,
199 failed_renewal_count,
200 error: error.to_string(),
201 });
202 if !lost_reported {
203 self.observer.on_event(&SchedulerEvent::ExecutionGuardLost {
204 job_id: self.lease.job_id.clone(),
205 resource_id: self.lease.resource_id.clone(),
206 scope: self.lease.scope,
207 lease_key: self.lease.lease_key.clone(),
208 scheduled_at: self.lease.scheduled_at,
209 catch_up: self.catch_up,
210 trigger_count: self.trigger_count,
211 renewal_count,
212 failed_renewal_count,
213 });
214 lost_reported = true;
215 }
216 disable_renewal = true;
217 }
218 }
219 None
220 }
221 };
222
223 if disable_renewal {
224 renewal = None;
225 }
226
227 if let Some(result) = outcome {
228 break result;
229 }
230 } else {
231 break task.await;
232 }
233 };
234
235 if let Err(error) = self.guard.release(&self.lease).await {
236 self.observer
237 .on_event(&SchedulerEvent::ExecutionGuardReleaseFailed {
238 job_id: self.lease.job_id.clone(),
239 resource_id: self.lease.resource_id.clone(),
240 scope: self.lease.scope,
241 lease_key: self.lease.lease_key.clone(),
242 scheduled_at: self.lease.scheduled_at,
243 catch_up: self.catch_up,
244 trigger_count: self.trigger_count,
245 error: error.to_string(),
246 });
247 } else {
248 self.observer
249 .on_event(&SchedulerEvent::ExecutionGuardReleased {
250 job_id: self.lease.job_id.clone(),
251 resource_id: self.lease.resource_id.clone(),
252 scope: self.lease.scope,
253 lease_key: self.lease.lease_key.clone(),
254 scheduled_at: self.lease.scheduled_at,
255 catch_up: self.catch_up,
256 trigger_count: self.trigger_count,
257 });
258 }
259
260 output
261 }
262}