// uni_store/storage/resilient_store.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team

use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::{BoxStream, StreamExt};
use object_store::path::Path;
use object_store::{
    GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
    PutMultipartOptions, PutOptions, PutPayload, PutResult, Result as StoreResult,
};
use std::fmt::{Debug, Display};
use std::ops::Range;
use std::sync::Arc;
use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tracing::warn;
use uni_common::config::ObjectStoreConfig;
/// A minimal lock-free circuit breaker.
///
/// Consecutive failures accumulate in an atomic counter. Once `threshold`
/// failures are recorded the breaker "opens" and `allow_request` returns
/// `false` until `reset_timeout` has elapsed since the last failure, at
/// which point a single half-open probe is admitted. A reported success
/// fully closes the breaker again.
#[derive(Debug)]
struct CircuitBreaker {
    /// Consecutive failure count; reset to 0 on any reported success.
    failures: AtomicU32,
    /// Timestamp of the most recent failure (or half-open probe grant),
    /// in milliseconds since the Unix epoch. i64 so it fits an atomic.
    last_failure: AtomicI64,
    /// Number of consecutive failures that opens the breaker.
    threshold: u32,
    /// How long the breaker stays open before admitting a probe.
    reset_timeout: Duration,
}

impl CircuitBreaker {
    /// Creates a closed breaker that opens after `threshold` consecutive
    /// failures and admits a probe after `reset_timeout`.
    fn new(threshold: u32, reset_timeout: Duration) -> Self {
        Self {
            failures: AtomicU32::new(0),
            last_failure: AtomicI64::new(0),
            threshold,
            reset_timeout,
        }
    }

    /// Current wall-clock time as milliseconds since the Unix epoch.
    fn now_millis() -> i64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock set before the Unix epoch")
            .as_millis() as i64
    }

    /// Returns `true` if a request may proceed.
    ///
    /// Closed (failures below threshold): always allowed.
    /// Open: denied until `reset_timeout` has passed since the last
    /// failure. After that, exactly one caller wins the CAS below and is
    /// admitted as the half-open probe; concurrent callers keep being
    /// denied for another timeout window. (The previous implementation
    /// admitted *every* caller once the timeout elapsed, flooding a
    /// backend that may still be unhealthy.)
    fn allow_request(&self) -> bool {
        if self.failures.load(Ordering::Relaxed) < self.threshold {
            return true;
        }

        let last = self.last_failure.load(Ordering::Relaxed);
        let now = Self::now_millis();

        if (now - last) > self.reset_timeout.as_millis() as i64 {
            // Half-open: push `last_failure` forward so only the CAS
            // winner probes the backend.
            return self
                .last_failure
                .compare_exchange(last, now, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok();
        }
        false
    }

    /// Records a success, fully closing the breaker.
    fn report_success(&self) {
        // Unconditional Relaxed store is cheap; no need to read first.
        self.failures.store(0, Ordering::Relaxed);
    }

    /// Records a failure and stamps the failure time.
    fn report_failure(&self) {
        self.failures.fetch_add(1, Ordering::Relaxed);
        self.last_failure.store(Self::now_millis(), Ordering::Relaxed);
    }
}
73
/// An [`ObjectStore`] decorator adding retries with exponential backoff,
/// per-operation timeouts, and a circuit breaker on top of an inner store.
#[derive(Debug)]
pub struct ResilientObjectStore {
    // The wrapped store that performs the actual I/O.
    inner: Arc<dyn ObjectStore>,
    // Retry count, backoff base/cap, and read/write timeouts.
    config: ObjectStoreConfig,
    // Trips after repeated failures to shed load from a failing backend.
    cb: CircuitBreaker,
}
80
81impl ResilientObjectStore {
82    pub fn new(inner: Arc<dyn ObjectStore>, config: ObjectStoreConfig) -> Self {
83        let cb = CircuitBreaker::new(5, Duration::from_secs(30)); // Hardcoded defaults for CB for now or use config?
84        // We can use config.max_retries * 2 as threshold?
85        // Let's use 5 failures and 30s reset.
86        Self { inner, config, cb }
87    }
88
89    async fn retry<F, Fut, T>(&self, mut f: F, op_name: &str) -> StoreResult<T>
90    where
91        F: FnMut() -> Fut,
92        Fut: std::future::Future<Output = StoreResult<T>>,
93    {
94        if !self.cb.allow_request() {
95            return Err(object_store::Error::Generic {
96                store: "ResilientObjectStore",
97                source: Box::new(std::io::Error::other("Circuit breaker open")),
98            });
99        }
100
101        let mut attempt = 0;
102        let mut backoff = self.config.retry_backoff_base;
103
104        loop {
105            match f().await {
106                Ok(val) => {
107                    self.cb.report_success();
108                    return Ok(val);
109                }
110                Err(e) => {
111                    attempt += 1;
112                    if attempt > self.config.max_retries {
113                        self.cb.report_failure();
114                        return Err(e);
115                    }
116
117                    // Check for non-retryable errors
118                    let msg = e.to_string().to_lowercase();
119                    if msg.contains("not found") || msg.contains("already exists") {
120                        // Don't count 404 as failure for CB?
121                        // Usually 404 is application level logic, not system failure.
122                        // So we report success to CB? Or just ignore?
123                        // Let's ignore it (don't report failure).
124                        return Err(e);
125                    }
126
127                    warn!(
128                        error = %e,
129                        attempt,
130                        operation = op_name,
131                        "ObjectStore operation failed, retrying"
132                    );
133
134                    tokio::time::sleep(backoff).await;
135                    backoff = std::cmp::min(backoff * 2, self.config.retry_backoff_max);
136                }
137            }
138        }
139    }
140
141    async fn timeout<F, Fut, T>(&self, f: F, duration: std::time::Duration) -> StoreResult<T>
142    where
143        F: FnOnce() -> Fut,
144        Fut: std::future::Future<Output = StoreResult<T>>,
145    {
146        tokio::time::timeout(duration, f())
147            .await
148            .map_err(|_| object_store::Error::Generic {
149                store: "ResilientObjectStore",
150                source: Box::new(std::io::Error::new(
151                    std::io::ErrorKind::TimedOut,
152                    "operation timed out",
153                )),
154            })?
155    }
156}
157
158impl Display for ResilientObjectStore {
159    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160        write!(f, "ResilientObjectStore({})", self.inner)
161    }
162}
163
164#[async_trait]
165impl ObjectStore for ResilientObjectStore {
166    async fn put(&self, location: &Path, payload: PutPayload) -> StoreResult<PutResult> {
167        let timeout = self.config.write_timeout;
168        // Non-retryable
169        // We still check CB logic?
170        if !self.cb.allow_request() {
171            return Err(object_store::Error::Generic {
172                store: "ResilientObjectStore",
173                source: Box::new(std::io::Error::other("Circuit breaker open")),
174            });
175        }
176
177        let res = self
178            .timeout(|| self.inner.put(location, payload), timeout)
179            .await;
180        match res {
181            Ok(_) => self.cb.report_success(),
182            Err(_) => self.cb.report_failure(), // Count timeout/error as failure
183        }
184        res
185    }
186
187    async fn put_opts(
188        &self,
189        location: &Path,
190        payload: PutPayload,
191        opts: PutOptions,
192    ) -> StoreResult<PutResult> {
193        let timeout = self.config.write_timeout;
194        if !self.cb.allow_request() {
195            return Err(object_store::Error::Generic {
196                store: "ResilientObjectStore",
197                source: Box::new(std::io::Error::other("Circuit breaker open")),
198            });
199        }
200        let res = self
201            .timeout(|| self.inner.put_opts(location, payload, opts), timeout)
202            .await;
203        match res {
204            Ok(_) => self.cb.report_success(),
205            Err(_) => self.cb.report_failure(),
206        }
207        res
208    }
209
210    async fn put_multipart(&self, location: &Path) -> StoreResult<Box<dyn MultipartUpload>> {
211        self.put_multipart_opts(location, PutMultipartOptions::default())
212            .await
213    }
214
215    async fn put_multipart_opts(
216        &self,
217        location: &Path,
218        opts: PutMultipartOptions,
219    ) -> StoreResult<Box<dyn MultipartUpload>> {
220        let timeout = self.config.write_timeout;
221        self.retry(
222            || async {
223                self.timeout(
224                    || self.inner.put_multipart_opts(location, opts.clone()),
225                    timeout,
226                )
227                .await
228            },
229            "put_multipart_opts",
230        )
231        .await
232    }
233
234    async fn get(&self, location: &Path) -> StoreResult<GetResult> {
235        self.get_opts(location, GetOptions::default()).await
236    }
237
238    async fn get_opts(&self, location: &Path, options: GetOptions) -> StoreResult<GetResult> {
239        let timeout = self.config.read_timeout;
240        self.retry(
241            || async {
242                self.timeout(|| self.inner.get_opts(location, options.clone()), timeout)
243                    .await
244            },
245            "get_opts",
246        )
247        .await
248    }
249
250    async fn get_range(&self, location: &Path, range: Range<u64>) -> StoreResult<Bytes> {
251        let timeout = self.config.read_timeout;
252        self.retry(
253            || async {
254                self.timeout(|| self.inner.get_range(location, range.clone()), timeout)
255                    .await
256            },
257            "get_range",
258        )
259        .await
260    }
261
262    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> StoreResult<Vec<Bytes>> {
263        let timeout = self.config.read_timeout;
264        self.retry(
265            || async {
266                self.timeout(|| self.inner.get_ranges(location, ranges), timeout)
267                    .await
268            },
269            "get_ranges",
270        )
271        .await
272    }
273
274    async fn head(&self, location: &Path) -> StoreResult<ObjectMeta> {
275        let timeout = self.config.read_timeout;
276        self.retry(
277            || async { self.timeout(|| self.inner.head(location), timeout).await },
278            "head",
279        )
280        .await
281    }
282
283    async fn delete(&self, location: &Path) -> StoreResult<()> {
284        let timeout = self.config.write_timeout;
285        self.retry(
286            || async { self.timeout(|| self.inner.delete(location), timeout).await },
287            "delete",
288        )
289        .await
290    }
291
292    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, StoreResult<ObjectMeta>> {
293        // Check CB? List returns stream.
294        // We can check CB on initial call.
295        if !self.cb.allow_request() {
296            // How to return error stream?
297            // We can return a stream that yields an error immediately.
298            // But BoxStream signature expects Stream of Results.
299            return futures::stream::once(async {
300                Err(object_store::Error::Generic {
301                    store: "ResilientObjectStore",
302                    source: Box::new(std::io::Error::other("Circuit breaker open")),
303                })
304            })
305            .boxed();
306        }
307
308        // We wrap stream to report failure/success?
309        // Too complex for P2. Just pass through.
310        self.inner.list(prefix)
311    }
312
313    fn list_with_offset(
314        &self,
315        prefix: Option<&Path>,
316        offset: &Path,
317    ) -> BoxStream<'static, StoreResult<ObjectMeta>> {
318        if !self.cb.allow_request() {
319            return futures::stream::once(async {
320                Err(object_store::Error::Generic {
321                    store: "ResilientObjectStore",
322                    source: Box::new(std::io::Error::other("Circuit breaker open")),
323                })
324            })
325            .boxed();
326        }
327        self.inner.list_with_offset(prefix, offset)
328    }
329
330    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> StoreResult<ListResult> {
331        let timeout = self.config.read_timeout;
332        self.retry(
333            || async {
334                self.timeout(|| self.inner.list_with_delimiter(prefix), timeout)
335                    .await
336            },
337            "list_with_delimiter",
338        )
339        .await
340    }
341
342    async fn copy(&self, from: &Path, to: &Path) -> StoreResult<()> {
343        let timeout = self.config.write_timeout;
344        self.retry(
345            || async { self.timeout(|| self.inner.copy(from, to), timeout).await },
346            "copy",
347        )
348        .await
349    }
350
351    async fn rename(&self, from: &Path, to: &Path) -> StoreResult<()> {
352        let timeout = self.config.write_timeout;
353        self.retry(
354            || async { self.timeout(|| self.inner.rename(from, to), timeout).await },
355            "rename",
356        )
357        .await
358    }
359
360    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> StoreResult<()> {
361        let timeout = self.config.write_timeout;
362        self.retry(
363            || async {
364                self.timeout(|| self.inner.copy_if_not_exists(from, to), timeout)
365                    .await
366            },
367            "copy_if_not_exists",
368        )
369        .await
370    }
371}