// uni_store/storage/resilient_store.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use async_trait::async_trait;
5use bytes::Bytes;
6use futures::stream::{BoxStream, StreamExt};
7use object_store::path::Path;
8use object_store::{
9    GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts,
10    PutOptions, PutPayload, PutResult, Result as StoreResult,
11};
12use std::fmt::{Debug, Display};
13use std::ops::Range;
14use std::sync::Arc;
15use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};
16use std::time::{Duration, SystemTime, UNIX_EPOCH};
17use tracing::warn;
18use uni_common::config::ObjectStoreConfig;
19
/// Lock-free circuit breaker tracking consecutive failures.
///
/// Opens (rejects requests) once `threshold` consecutive failures have been
/// reported, and transitions to half-open after `reset_timeout`, letting a
/// single probe request through to test whether the backend has recovered.
#[derive(Debug)]
struct CircuitBreaker {
    /// Consecutive failure count; reset to 0 on any reported success.
    failures: AtomicU32,
    /// Wall-clock timestamp (millis since Unix epoch) of the last failure;
    /// also bumped when a half-open probe slot is claimed.
    last_failure: AtomicI64,
    /// Number of consecutive failures that opens the breaker.
    threshold: u32,
    /// How long the breaker stays open before admitting a probe.
    reset_timeout: Duration,
}

impl CircuitBreaker {
    fn new(threshold: u32, reset_timeout: Duration) -> Self {
        Self {
            failures: AtomicU32::new(0),
            last_failure: AtomicI64::new(0),
            threshold,
            reset_timeout,
        }
    }

    /// Current wall-clock time in milliseconds since the Unix epoch.
    /// Returns 0 instead of panicking if the system clock predates the epoch.
    fn now_millis() -> i64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_millis() as i64)
            .unwrap_or(0)
    }

    /// Returns `true` if a request may proceed.
    ///
    /// Closed (failures below threshold): always allowed. Open: denied until
    /// `reset_timeout` has elapsed since the last failure; after that exactly
    /// one caller wins the CAS below and is admitted as a half-open probe,
    /// while concurrent callers keep being rejected until the probe's outcome
    /// is reported via `report_success` / `report_failure`.
    fn allow_request(&self) -> bool {
        if self.failures.load(Ordering::Relaxed) < self.threshold {
            return true;
        }

        let last = self.last_failure.load(Ordering::Relaxed);
        let now = Self::now_millis();

        if now.saturating_sub(last) > self.reset_timeout.as_millis() as i64 {
            // Half-open: claim the probe slot by bumping `last_failure` to
            // `now`. Only the CAS winner proceeds; losers stay rejected so a
            // still-broken backend is not flooded with traffic.
            return self
                .last_failure
                .compare_exchange(last, now, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok();
        }
        false
    }

    /// Record a successful request, closing the breaker.
    fn report_success(&self) {
        // Unconditional Relaxed store is cheap and keeps this lock-free.
        self.failures.store(0, Ordering::Relaxed);
    }

    /// Record a failed request, possibly opening the breaker.
    fn report_failure(&self) {
        self.failures.fetch_add(1, Ordering::Relaxed);
        self.last_failure.store(Self::now_millis(), Ordering::Relaxed);
    }
}
73
/// An [`ObjectStore`] decorator adding retries with exponential backoff,
/// per-operation timeouts, and a circuit breaker around an inner store.
#[derive(Debug)]
pub struct ResilientObjectStore {
    // The wrapped store every operation delegates to.
    inner: Arc<dyn ObjectStore>,
    // Supplies max_retries, backoff bounds, and read/write timeouts.
    config: ObjectStoreConfig,
    // Trips open after repeated failures to shed load from the inner store.
    cb: CircuitBreaker,
}
80
81impl ResilientObjectStore {
82    pub fn new(inner: Arc<dyn ObjectStore>, config: ObjectStoreConfig) -> Self {
83        let cb = CircuitBreaker::new(5, Duration::from_secs(30)); // Hardcoded defaults for CB for now or use config?
84        // We can use config.max_retries * 2 as threshold?
85        // Let's use 5 failures and 30s reset.
86        Self { inner, config, cb }
87    }
88
89    async fn retry<F, Fut, T>(&self, mut f: F, op_name: &str) -> StoreResult<T>
90    where
91        F: FnMut() -> Fut,
92        Fut: std::future::Future<Output = StoreResult<T>>,
93    {
94        if !self.cb.allow_request() {
95            return Err(object_store::Error::Generic {
96                store: "ResilientObjectStore",
97                source: Box::new(std::io::Error::other("Circuit breaker open")),
98            });
99        }
100
101        let mut attempt = 0;
102        let mut backoff = self.config.retry_backoff_base;
103
104        loop {
105            match f().await {
106                Ok(val) => {
107                    self.cb.report_success();
108                    return Ok(val);
109                }
110                Err(e) => {
111                    attempt += 1;
112                    if attempt > self.config.max_retries {
113                        self.cb.report_failure();
114                        return Err(e);
115                    }
116
117                    // Check for non-retryable errors
118                    let msg = e.to_string().to_lowercase();
119                    if msg.contains("not found") || msg.contains("already exists") {
120                        // Don't count 404 as failure for CB?
121                        // Usually 404 is application level logic, not system failure.
122                        // So we report success to CB? Or just ignore?
123                        // Let's ignore it (don't report failure).
124                        return Err(e);
125                    }
126
127                    warn!(
128                        error = %e,
129                        attempt,
130                        operation = op_name,
131                        "ObjectStore operation failed, retrying"
132                    );
133
134                    tokio::time::sleep(backoff).await;
135                    backoff = std::cmp::min(backoff * 2, self.config.retry_backoff_max);
136                }
137            }
138        }
139    }
140
141    async fn timeout<F, Fut, T>(&self, f: F, duration: std::time::Duration) -> StoreResult<T>
142    where
143        F: FnOnce() -> Fut,
144        Fut: std::future::Future<Output = StoreResult<T>>,
145    {
146        tokio::time::timeout(duration, f())
147            .await
148            .map_err(|_| object_store::Error::Generic {
149                store: "ResilientObjectStore",
150                source: Box::new(std::io::Error::new(
151                    std::io::ErrorKind::TimedOut,
152                    "operation timed out",
153                )),
154            })?
155    }
156}
157
158impl Display for ResilientObjectStore {
159    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160        write!(f, "ResilientObjectStore({})", self.inner)
161    }
162}
163
#[async_trait]
impl ObjectStore for ResilientObjectStore {
    /// Upload `payload` to `location`.
    ///
    /// Not retried: the payload is moved into the single attempt. Still
    /// guarded by the write timeout and the circuit breaker.
    async fn put(&self, location: &Path, payload: PutPayload) -> StoreResult<PutResult> {
        let timeout = self.config.write_timeout;
        // Even without retries, reject fast while the breaker is open.
        if !self.cb.allow_request() {
            return Err(object_store::Error::Generic {
                store: "ResilientObjectStore",
                source: Box::new(std::io::Error::other("Circuit breaker open")),
            });
        }

        let res = self
            .timeout(|| self.inner.put(location, payload), timeout)
            .await;
        // NOTE(review): every error here (including not-found/already-exists)
        // counts as a breaker failure, whereas retry() exempts those
        // application-level errors — consider aligning the two paths.
        match res {
            Ok(_) => self.cb.report_success(),
            Err(_) => self.cb.report_failure(), // Count timeout/error as failure
        }
        res
    }

    /// Upload `payload` to `location` with explicit [`PutOptions`].
    ///
    /// Same single-attempt semantics as `put`: timeout plus circuit breaker,
    /// no retries (payload and opts are consumed by the one attempt).
    async fn put_opts(
        &self,
        location: &Path,
        payload: PutPayload,
        opts: PutOptions,
    ) -> StoreResult<PutResult> {
        let timeout = self.config.write_timeout;
        if !self.cb.allow_request() {
            return Err(object_store::Error::Generic {
                store: "ResilientObjectStore",
                source: Box::new(std::io::Error::other("Circuit breaker open")),
            });
        }
        let res = self
            .timeout(|| self.inner.put_opts(location, payload, opts), timeout)
            .await;
        match res {
            Ok(_) => self.cb.report_success(),
            Err(_) => self.cb.report_failure(),
        }
        res
    }

    /// Begin a multipart upload with default options.
    async fn put_multipart(&self, location: &Path) -> StoreResult<Box<dyn MultipartUpload>> {
        self.put_multipart_opts(location, PutMultipartOpts::default())
            .await
    }

    /// Begin a multipart upload; the initiation call is retried and timed
    /// out. `opts` is cloned per attempt so the closure can be re-invoked.
    /// Note only the initiation is protected — the returned upload handle's
    /// subsequent part uploads are not wrapped here.
    async fn put_multipart_opts(
        &self,
        location: &Path,
        opts: PutMultipartOpts,
    ) -> StoreResult<Box<dyn MultipartUpload>> {
        let timeout = self.config.write_timeout;
        self.retry(
            || async {
                self.timeout(
                    || self.inner.put_multipart_opts(location, opts.clone()),
                    timeout,
                )
                .await
            },
            "put_multipart_opts",
        )
        .await
    }

    /// Fetch the object at `location` with default options.
    async fn get(&self, location: &Path) -> StoreResult<GetResult> {
        self.get_opts(location, GetOptions::default()).await
    }

    /// Fetch the object at `location`; each attempt is bounded by the read
    /// timeout and retried per the configured policy.
    async fn get_opts(&self, location: &Path, options: GetOptions) -> StoreResult<GetResult> {
        let timeout = self.config.read_timeout;
        self.retry(
            || async {
                self.timeout(|| self.inner.get_opts(location, options.clone()), timeout)
                    .await
            },
            "get_opts",
        )
        .await
    }

    /// Fetch a byte range of the object, with retry and read timeout.
    async fn get_range(&self, location: &Path, range: Range<usize>) -> StoreResult<Bytes> {
        let timeout = self.config.read_timeout;
        self.retry(
            || async {
                self.timeout(|| self.inner.get_range(location, range.clone()), timeout)
                    .await
            },
            "get_range",
        )
        .await
    }

    /// Fetch multiple byte ranges of the object, with retry and read timeout.
    async fn get_ranges(
        &self,
        location: &Path,
        ranges: &[Range<usize>],
    ) -> StoreResult<Vec<Bytes>> {
        let timeout = self.config.read_timeout;
        self.retry(
            || async {
                self.timeout(|| self.inner.get_ranges(location, ranges), timeout)
                    .await
            },
            "get_ranges",
        )
        .await
    }

    /// Fetch the object's metadata, with retry and read timeout.
    async fn head(&self, location: &Path) -> StoreResult<ObjectMeta> {
        let timeout = self.config.read_timeout;
        self.retry(
            || async { self.timeout(|| self.inner.head(location), timeout).await },
            "head",
        )
        .await
    }

    /// Delete the object at `location`, with retry and write timeout.
    async fn delete(&self, location: &Path) -> StoreResult<()> {
        let timeout = self.config.write_timeout;
        self.retry(
            || async { self.timeout(|| self.inner.delete(location), timeout).await },
            "delete",
        )
        .await
    }

    /// List objects under `prefix` as a stream.
    ///
    /// The breaker is only consulted when the stream is created; individual
    /// stream items are passed through with no retry or failure accounting.
    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, StoreResult<ObjectMeta>> {
        if !self.cb.allow_request() {
            // A stream-returning API cannot return Err directly, so surface
            // the breaker-open condition as a single-item error stream.
            return futures::stream::once(async {
                Err(object_store::Error::Generic {
                    store: "ResilientObjectStore",
                    source: Box::new(std::io::Error::other("Circuit breaker open")),
                })
            })
            .boxed();
        }

        // Pass-through: wrapping the stream to feed item results back into
        // the breaker was judged too complex for now.
        self.inner.list(prefix)
    }

    /// List objects under `prefix` starting after `offset`, as a stream.
    /// Same breaker-at-creation / pass-through semantics as `list`.
    fn list_with_offset(
        &self,
        prefix: Option<&Path>,
        offset: &Path,
    ) -> BoxStream<'_, StoreResult<ObjectMeta>> {
        if !self.cb.allow_request() {
            return futures::stream::once(async {
                Err(object_store::Error::Generic {
                    store: "ResilientObjectStore",
                    source: Box::new(std::io::Error::other("Circuit breaker open")),
                })
            })
            .boxed();
        }
        self.inner.list_with_offset(prefix, offset)
    }

    /// Delimiter-grouped listing; returns a full result, so unlike the
    /// stream variants it gets the standard retry + read-timeout treatment.
    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> StoreResult<ListResult> {
        let timeout = self.config.read_timeout;
        self.retry(
            || async {
                self.timeout(|| self.inner.list_with_delimiter(prefix), timeout)
                    .await
            },
            "list_with_delimiter",
        )
        .await
    }

    /// Copy `from` to `to` (overwriting), with retry and write timeout.
    async fn copy(&self, from: &Path, to: &Path) -> StoreResult<()> {
        let timeout = self.config.write_timeout;
        self.retry(
            || async { self.timeout(|| self.inner.copy(from, to), timeout).await },
            "copy",
        )
        .await
    }

    /// Rename `from` to `to`, with retry and write timeout.
    async fn rename(&self, from: &Path, to: &Path) -> StoreResult<()> {
        let timeout = self.config.write_timeout;
        self.retry(
            || async { self.timeout(|| self.inner.rename(from, to), timeout).await },
            "rename",
        )
        .await
    }

    /// Copy `from` to `to` only if `to` does not exist, with retry and
    /// write timeout. An already-exists outcome surfaces as an error from
    /// the inner store.
    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> StoreResult<()> {
        let timeout = self.config.write_timeout;
        self.retry(
            || async {
                self.timeout(|| self.inner.copy_if_not_exists(from, to), timeout)
                    .await
            },
            "copy_if_not_exists",
        )
        .await
    }
}
375}