1use async_trait::async_trait;
5use bytes::Bytes;
6use futures::stream::{BoxStream, StreamExt};
7use object_store::path::Path;
8use object_store::{
9 GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts,
10 PutOptions, PutPayload, PutResult, Result as StoreResult,
11};
12use std::fmt::{Debug, Display};
13use std::ops::Range;
14use std::sync::Arc;
15use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};
16use std::time::{Duration, SystemTime, UNIX_EPOCH};
17use tracing::warn;
18use uni_common::config::ObjectStoreConfig;
19
/// Minimal lock-free circuit breaker.
///
/// Requests are allowed while fewer than `threshold` failures have been
/// reported since the last success. Once the threshold is reached, requests
/// are rejected until more than `reset_timeout` has passed since the most
/// recent failure; after that, requests are let through again and the next
/// reported success resets the counter.
///
/// Timestamps come from the wall clock (`SystemTime`), so a large clock
/// adjustment can lengthen or shorten the open period.
#[derive(Debug)]
struct CircuitBreaker {
    /// Failures reported since the last success (saturates at `u32::MAX`).
    failures: AtomicU32,
    /// Time of the most recent failure, in milliseconds since the Unix epoch.
    last_failure: AtomicI64,
    /// Failure count at which requests start being rejected.
    threshold: u32,
    /// Time that must pass after the last failure before requests are allowed again.
    reset_timeout: Duration,
}

impl CircuitBreaker {
    /// Creates a closed breaker that opens after `threshold` failures and
    /// lets requests through again once `reset_timeout` has elapsed.
    fn new(threshold: u32, reset_timeout: Duration) -> Self {
        Self {
            failures: AtomicU32::new(0),
            last_failure: AtomicI64::new(0),
            threshold,
            reset_timeout,
        }
    }

    /// Current wall-clock time in milliseconds since the Unix epoch.
    ///
    /// Never panics: a clock set before the epoch yields `0`, and a value
    /// too large for `i64` saturates to `i64::MAX`.
    fn now_millis() -> i64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |since_epoch| {
                i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX)
            })
    }

    /// Returns `true` when a request may be attempted.
    fn allow_request(&self) -> bool {
        // Acquire pairs with the Release increment in `report_failure`, so a
        // tripped counter is always observed together with its timestamp.
        if self.failures.load(Ordering::Acquire) < self.threshold {
            return true;
        }

        let last = self.last_failure.load(Ordering::Relaxed);
        let elapsed = Self::now_millis().saturating_sub(last);
        // Saturate instead of truncating with `as`: a huge timeout such as
        // `Duration::MAX` must mean "stay open", not wrap to a negative value.
        let reset_after = i64::try_from(self.reset_timeout.as_millis()).unwrap_or(i64::MAX);

        elapsed > reset_after
    }

    /// Records a successful operation and closes the breaker.
    fn report_success(&self) {
        self.failures.store(0, Ordering::Relaxed);
    }

    /// Records a failed operation and stamps the time of the failure.
    fn report_failure(&self) {
        // Publish the timestamp before the counter so a reader that sees the
        // new count never pairs it with a stale timestamp.
        self.last_failure
            .store(Self::now_millis(), Ordering::Relaxed);
        // Saturating increment: wrapping back to 0 would silently close the breaker.
        let _ = self
            .failures
            .fetch_update(Ordering::Release, Ordering::Relaxed, |count| {
                count.checked_add(1)
            });
    }
}
73
74#[derive(Debug)]
75pub struct ResilientObjectStore {
76 inner: Arc<dyn ObjectStore>,
77 config: ObjectStoreConfig,
78 cb: CircuitBreaker,
79}
80
81impl ResilientObjectStore {
82 pub fn new(inner: Arc<dyn ObjectStore>, config: ObjectStoreConfig) -> Self {
83 let cb = CircuitBreaker::new(5, Duration::from_secs(30)); Self { inner, config, cb }
87 }
88
89 async fn retry<F, Fut, T>(&self, mut f: F, op_name: &str) -> StoreResult<T>
90 where
91 F: FnMut() -> Fut,
92 Fut: std::future::Future<Output = StoreResult<T>>,
93 {
94 if !self.cb.allow_request() {
95 return Err(object_store::Error::Generic {
96 store: "ResilientObjectStore",
97 source: Box::new(std::io::Error::other("Circuit breaker open")),
98 });
99 }
100
101 let mut attempt = 0;
102 let mut backoff = self.config.retry_backoff_base;
103
104 loop {
105 match f().await {
106 Ok(val) => {
107 self.cb.report_success();
108 return Ok(val);
109 }
110 Err(e) => {
111 attempt += 1;
112 if attempt > self.config.max_retries {
113 self.cb.report_failure();
114 return Err(e);
115 }
116
117 let msg = e.to_string().to_lowercase();
119 if msg.contains("not found") || msg.contains("already exists") {
120 return Err(e);
125 }
126
127 warn!(
128 error = %e,
129 attempt,
130 operation = op_name,
131 "ObjectStore operation failed, retrying"
132 );
133
134 tokio::time::sleep(backoff).await;
135 backoff = std::cmp::min(backoff * 2, self.config.retry_backoff_max);
136 }
137 }
138 }
139 }
140
141 async fn timeout<F, Fut, T>(&self, f: F, duration: std::time::Duration) -> StoreResult<T>
142 where
143 F: FnOnce() -> Fut,
144 Fut: std::future::Future<Output = StoreResult<T>>,
145 {
146 tokio::time::timeout(duration, f())
147 .await
148 .map_err(|_| object_store::Error::Generic {
149 store: "ResilientObjectStore",
150 source: Box::new(std::io::Error::new(
151 std::io::ErrorKind::TimedOut,
152 "operation timed out",
153 )),
154 })?
155 }
156}
157
158impl Display for ResilientObjectStore {
159 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160 write!(f, "ResilientObjectStore({})", self.inner)
161 }
162}
163
164#[async_trait]
165impl ObjectStore for ResilientObjectStore {
166 async fn put(&self, location: &Path, payload: PutPayload) -> StoreResult<PutResult> {
167 let timeout = self.config.write_timeout;
168 if !self.cb.allow_request() {
171 return Err(object_store::Error::Generic {
172 store: "ResilientObjectStore",
173 source: Box::new(std::io::Error::other("Circuit breaker open")),
174 });
175 }
176
177 let res = self
178 .timeout(|| self.inner.put(location, payload), timeout)
179 .await;
180 match res {
181 Ok(_) => self.cb.report_success(),
182 Err(_) => self.cb.report_failure(), }
184 res
185 }
186
187 async fn put_opts(
188 &self,
189 location: &Path,
190 payload: PutPayload,
191 opts: PutOptions,
192 ) -> StoreResult<PutResult> {
193 let timeout = self.config.write_timeout;
194 if !self.cb.allow_request() {
195 return Err(object_store::Error::Generic {
196 store: "ResilientObjectStore",
197 source: Box::new(std::io::Error::other("Circuit breaker open")),
198 });
199 }
200 let res = self
201 .timeout(|| self.inner.put_opts(location, payload, opts), timeout)
202 .await;
203 match res {
204 Ok(_) => self.cb.report_success(),
205 Err(_) => self.cb.report_failure(),
206 }
207 res
208 }
209
210 async fn put_multipart(&self, location: &Path) -> StoreResult<Box<dyn MultipartUpload>> {
211 self.put_multipart_opts(location, PutMultipartOpts::default())
212 .await
213 }
214
215 async fn put_multipart_opts(
216 &self,
217 location: &Path,
218 opts: PutMultipartOpts,
219 ) -> StoreResult<Box<dyn MultipartUpload>> {
220 let timeout = self.config.write_timeout;
221 self.retry(
222 || async {
223 self.timeout(
224 || self.inner.put_multipart_opts(location, opts.clone()),
225 timeout,
226 )
227 .await
228 },
229 "put_multipart_opts",
230 )
231 .await
232 }
233
234 async fn get(&self, location: &Path) -> StoreResult<GetResult> {
235 self.get_opts(location, GetOptions::default()).await
236 }
237
238 async fn get_opts(&self, location: &Path, options: GetOptions) -> StoreResult<GetResult> {
239 let timeout = self.config.read_timeout;
240 self.retry(
241 || async {
242 self.timeout(|| self.inner.get_opts(location, options.clone()), timeout)
243 .await
244 },
245 "get_opts",
246 )
247 .await
248 }
249
250 async fn get_range(&self, location: &Path, range: Range<usize>) -> StoreResult<Bytes> {
251 let timeout = self.config.read_timeout;
252 self.retry(
253 || async {
254 self.timeout(|| self.inner.get_range(location, range.clone()), timeout)
255 .await
256 },
257 "get_range",
258 )
259 .await
260 }
261
262 async fn get_ranges(
263 &self,
264 location: &Path,
265 ranges: &[Range<usize>],
266 ) -> StoreResult<Vec<Bytes>> {
267 let timeout = self.config.read_timeout;
268 self.retry(
269 || async {
270 self.timeout(|| self.inner.get_ranges(location, ranges), timeout)
271 .await
272 },
273 "get_ranges",
274 )
275 .await
276 }
277
278 async fn head(&self, location: &Path) -> StoreResult<ObjectMeta> {
279 let timeout = self.config.read_timeout;
280 self.retry(
281 || async { self.timeout(|| self.inner.head(location), timeout).await },
282 "head",
283 )
284 .await
285 }
286
287 async fn delete(&self, location: &Path) -> StoreResult<()> {
288 let timeout = self.config.write_timeout;
289 self.retry(
290 || async { self.timeout(|| self.inner.delete(location), timeout).await },
291 "delete",
292 )
293 .await
294 }
295
296 fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, StoreResult<ObjectMeta>> {
297 if !self.cb.allow_request() {
300 return futures::stream::once(async {
304 Err(object_store::Error::Generic {
305 store: "ResilientObjectStore",
306 source: Box::new(std::io::Error::other("Circuit breaker open")),
307 })
308 })
309 .boxed();
310 }
311
312 self.inner.list(prefix)
315 }
316
317 fn list_with_offset(
318 &self,
319 prefix: Option<&Path>,
320 offset: &Path,
321 ) -> BoxStream<'_, StoreResult<ObjectMeta>> {
322 if !self.cb.allow_request() {
323 return futures::stream::once(async {
324 Err(object_store::Error::Generic {
325 store: "ResilientObjectStore",
326 source: Box::new(std::io::Error::other("Circuit breaker open")),
327 })
328 })
329 .boxed();
330 }
331 self.inner.list_with_offset(prefix, offset)
332 }
333
334 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> StoreResult<ListResult> {
335 let timeout = self.config.read_timeout;
336 self.retry(
337 || async {
338 self.timeout(|| self.inner.list_with_delimiter(prefix), timeout)
339 .await
340 },
341 "list_with_delimiter",
342 )
343 .await
344 }
345
346 async fn copy(&self, from: &Path, to: &Path) -> StoreResult<()> {
347 let timeout = self.config.write_timeout;
348 self.retry(
349 || async { self.timeout(|| self.inner.copy(from, to), timeout).await },
350 "copy",
351 )
352 .await
353 }
354
355 async fn rename(&self, from: &Path, to: &Path) -> StoreResult<()> {
356 let timeout = self.config.write_timeout;
357 self.retry(
358 || async { self.timeout(|| self.inner.rename(from, to), timeout).await },
359 "rename",
360 )
361 .await
362 }
363
364 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> StoreResult<()> {
365 let timeout = self.config.write_timeout;
366 self.retry(
367 || async {
368 self.timeout(|| self.inner.copy_if_not_exists(from, to), timeout)
369 .await
370 },
371 "copy_if_not_exists",
372 )
373 .await
374 }
375}