// triblespace_core/repo/objectstore.rs

1use std::array::TryFromSliceError;
2use std::convert::Infallible;
3use std::convert::TryInto;
4use std::error::Error;
5use std::fmt;
6use std::marker::PhantomData;
7use std::sync::Arc;
8
9use anybytes::Bytes;
10use crossbeam_channel::{bounded, Receiver};
11use futures::Stream;
12use futures::StreamExt;
13use tokio::runtime::Runtime;
14
15use object_store::parse_url;
16use object_store::path::Path;
17use object_store::ObjectStore;
18use object_store::PutMode;
19use object_store::UpdateVersion;
20use object_store::{self};
21use url::Url;
22
23use hex::FromHex;
24
25use crate::blob::schemas::UnknownBlob;
26use crate::blob::Blob;
27use crate::blob::BlobSchema;
28use crate::blob::ToBlob;
29use crate::blob::TryFromBlob;
30use crate::id::Id;
31use crate::id::RawId;
32use crate::prelude::blobschemas::SimpleArchive;
33use crate::value::schemas::hash::Handle;
34use crate::value::schemas::hash::HashProtocol;
35use crate::value::RawValue;
36use crate::value::Value;
37use crate::value::ValueSchema;
38
39use super::BlobStore;
40use super::BlobStoreGet;
41use super::BlobStoreList;
42use super::BlobStorePut;
43use super::BranchStore;
44use super::PushResult;
45
/// Path segment, directly below the repository prefix, that holds branch heads.
const BRANCH_INFIX: &'static str = "branches";
/// Path segment, directly below the repository prefix, that holds content-addressed blobs.
const BLOB_INFIX: &'static str = "blobs";
48
49/// Repository backed by an [`object_store`] compatible storage backend.
50///
51/// All data is stored in an external service (e.g. S3, local filesystem) via
52/// the `object_store` crate.
53pub struct ObjectStoreRemote<H> {
54    store: Arc<dyn ObjectStore>,
55    prefix: Path,
56    rt: Arc<Runtime>,
57    _hasher: PhantomData<H>,
58}
59
60impl<H> fmt::Debug for ObjectStoreRemote<H> {
61    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62        f.debug_struct("ObjectStoreRemote")
63            .field("prefix", &self.prefix)
64            .finish()
65    }
66}
67
68impl<H> fmt::Debug for ObjectStoreReader<H> {
69    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70        f.debug_struct("ObjectStoreReader")
71            .field("prefix", &self.prefix)
72            .finish()
73    }
74}
75
76/// Read-only handle into an [`ObjectStoreRemote`] that can be cloned and shared.
77#[derive(Clone)]
78pub struct ObjectStoreReader<H> {
79    store: Arc<dyn ObjectStore>,
80    prefix: Path,
81    rt: Arc<Runtime>,
82    _hasher: PhantomData<H>,
83}
84
85/// Iterator that bridges an async [`Stream`] into blocking iteration via a bounded channel.
86pub struct BlockingIter<T> {
87    rx: Receiver<T>,
88}
89
90impl<T> BlockingIter<T> {
91    fn from_stream<S>(handle: tokio::runtime::Handle, stream: S, capacity: usize) -> Self
92    where
93        S: Stream<Item = T> + Send + 'static,
94        T: Send + 'static,
95    {
96        let (tx, rx) = bounded(capacity);
97        let handle_for_spawn = handle.clone();
98        let handle_for_task = handle.clone();
99        handle_for_spawn.spawn(async move {
100            let mut s = Box::pin(stream);
101            let rt = handle_for_task;
102            while let Some(item) = s.next().await {
103                let tx_clone = tx.clone();
104                let bh = rt.clone();
105                // send on blocking pool to avoid blocking a runtime worker
106                match bh.spawn_blocking(move || tx_clone.send(item)).await {
107                    Ok(Ok(())) => {}
108                    _ => break,
109                }
110            }
111            // tx dropped here -> closes channel
112        });
113        BlockingIter { rx }
114    }
115}
116
117impl<T> Iterator for BlockingIter<T> {
118    type Item = T;
119    fn next(&mut self) -> Option<Self::Item> {
120        self.rx.recv().ok()
121    }
122}
123
124impl<H> PartialEq for ObjectStoreReader<H> {
125    fn eq(&self, other: &Self) -> bool {
126        Arc::ptr_eq(&self.store, &other.store) && self.prefix == other.prefix
127    }
128}
129
130impl<H> Eq for ObjectStoreReader<H> {}
131
132impl<H> ObjectStoreRemote<H> {
133    /// Creates a repository pointing at the object store described by `url`.
134    pub fn with_url(url: &Url) -> Result<ObjectStoreRemote<H>, object_store::Error> {
135        let (store, path) = parse_url(url)?;
136        Ok(ObjectStoreRemote {
137            store: Arc::from(store),
138            prefix: path,
139            rt: Arc::new(
140                tokio::runtime::Builder::new_multi_thread()
141                    .enable_all()
142                    .worker_threads(2)
143                    .build()
144                    .expect("build runtime"),
145            ),
146            _hasher: PhantomData,
147        })
148    }
149}
150
151impl<H> BlobStorePut<H> for ObjectStoreRemote<H>
152where
153    H: HashProtocol,
154{
155    type PutError = object_store::Error;
156
157    fn put<S, T>(&mut self, item: T) -> Result<Value<Handle<H, S>>, Self::PutError>
158    where
159        S: BlobSchema + 'static,
160        T: ToBlob<S>,
161        Handle<H, S>: ValueSchema,
162    {
163        let blob = item.to_blob();
164        let handle = blob.get_handle();
165        let path = self.prefix.child(BLOB_INFIX).child(hex::encode(handle.raw));
166        let bytes: bytes::Bytes = blob.bytes.into();
167        let result = self.rt.block_on(async {
168            self.store
169                .put_opts(&path, bytes.into(), PutMode::Create.into())
170                .await
171        });
172        match result {
173            Ok(_) | Err(object_store::Error::AlreadyExists { .. }) => Ok(handle),
174            Err(e) => Err(e),
175        }
176    }
177}
178
179impl<H> BlobStore<H> for ObjectStoreRemote<H>
180where
181    H: HashProtocol,
182{
183    type Reader = ObjectStoreReader<H>;
184    type ReaderError = Infallible;
185
186    fn reader(&mut self) -> Result<Self::Reader, Self::ReaderError> {
187        Ok(ObjectStoreReader {
188            store: self.store.clone(),
189            prefix: self.prefix.clone(),
190            rt: self.rt.clone(),
191            _hasher: PhantomData,
192        })
193    }
194}
195
196impl<H> BranchStore<H> for ObjectStoreRemote<H>
197where
198    H: HashProtocol,
199{
200    type BranchesError = ListBranchesErr;
201    type HeadError = PullBranchErr;
202    type UpdateError = PushBranchErr;
203
204    type ListIter<'a> = BlockingIter<Result<Id, Self::BranchesError>>;
205
206    fn branches<'a>(&'a mut self) -> Result<Self::ListIter<'a>, Self::BranchesError> {
207        let prefix = self.prefix.child(BRANCH_INFIX);
208        let stream = self.store.list(Some(&prefix)).filter_map(|r| async move {
209            match r {
210                Ok(meta) if meta.size == 0 => None, // tombstoned branch (0-byte object)
211                Ok(meta) => {
212                    let name = match meta.location.filename() {
213                        Some(name) => name,
214                        None => return Some(Err(ListBranchesErr::NotAFile("no filename"))),
215                    };
216                    let digest = match RawId::from_hex(name) {
217                        Ok(digest) => digest,
218                        Err(e) => return Some(Err(ListBranchesErr::BadNameHex(e))),
219                    };
220                    let Some(id) = Id::new(digest) else {
221                        return Some(Err(ListBranchesErr::BadId));
222                    };
223                    Some(Ok(id))
224                }
225                Err(e) => Some(Err(ListBranchesErr::List(e))),
226            }
227        });
228        Ok(BlockingIter::from_stream(
229            self.rt.handle().clone(),
230            stream,
231            16,
232        ))
233    }
234
235    fn head(&mut self, id: Id) -> Result<Option<Value<Handle<H, SimpleArchive>>>, Self::HeadError> {
236        let path = self.prefix.child(BRANCH_INFIX).child(hex::encode(id));
237        let result = self.rt.block_on(async { self.store.get(&path).await });
238        match result {
239            Ok(object) => {
240                let bytes = self.rt.block_on(object.bytes())?;
241                if bytes.is_empty() {
242                    return Ok(None);
243                }
244                let value = (&bytes[..]).try_into()?;
245                Ok(Some(Value::new(value)))
246            }
247            Err(object_store::Error::NotFound { .. }) => Ok(None),
248            Err(e) => Err(PullBranchErr::StoreErr(e)),
249        }
250    }
251
252    fn update(
253        &mut self,
254        id: Id,
255        old: Option<Value<Handle<H, SimpleArchive>>>,
256        new: Option<Value<Handle<H, SimpleArchive>>>,
257    ) -> Result<PushResult<H>, Self::UpdateError> {
258        let path = self.prefix.child(BRANCH_INFIX).child(hex::encode(id));
259        // We encode "deleted branch" as an empty object. This lets us preserve
260        // CAS semantics for delete via conditional PUT (PutMode::Update), since
261        // `object_store` does not currently expose conditional delete.
262        //
263        // TODO: Once `object_store` supports conditional delete, migrate away
264        // from 0-byte tombstones and treat empty objects as corruption.
265        let new_bytes = match new {
266            Some(new) => bytes::Bytes::copy_from_slice(&new.raw),
267            None => bytes::Bytes::new(),
268        };
269
270        let parse_branch = |bytes: &bytes::Bytes| -> Result<
271            Option<Value<Handle<H, SimpleArchive>>>,
272            TryFromSliceError,
273        > {
274            if bytes.is_empty() {
275                return Ok(None);
276            }
277            let value = (&bytes[..]).try_into()?;
278            Ok(Some(Value::new(value)))
279        };
280
281        if let Some(old_hash) = old {
282            let mut result = self.rt.block_on(async { self.store.get(&path).await });
283            loop {
284                match result {
285                    Ok(obj) => {
286                        let version = UpdateVersion {
287                            e_tag: obj.meta.e_tag.clone(),
288                            version: obj.meta.version.clone(),
289                        };
290                        let stored_bytes = self.rt.block_on(obj.bytes())?;
291                        let stored_hash = parse_branch(&stored_bytes)?;
292                        if stored_hash != Some(old_hash) {
293                            return Ok(PushResult::Conflict(stored_hash));
294                        }
295                        match self.rt.block_on(async {
296                            self.store
297                                .put_opts(
298                                    &path,
299                                    new_bytes.clone().into(),
300                                    PutMode::Update(version).into(),
301                                )
302                                .await
303                        }) {
304                            Ok(_) => return Ok(PushResult::Success()),
305                            Err(object_store::Error::Precondition { .. }) => {
306                                result = self.rt.block_on(async { self.store.get(&path).await });
307                                continue;
308                            }
309                            Err(e) => return Err(PushBranchErr::StoreErr(e)),
310                        }
311                    }
312                    Err(object_store::Error::NotFound { .. }) => {
313                        return Ok(PushResult::Conflict(None))
314                    }
315                    Err(e) => return Err(PushBranchErr::StoreErr(e)),
316                }
317            }
318        } else {
319            loop {
320                match self.rt.block_on(async {
321                    self.store
322                        .put_opts(&path, new_bytes.clone().into(), PutMode::Create.into())
323                        .await
324                }) {
325                    Ok(_) => return Ok(PushResult::Success()),
326                    Err(object_store::Error::AlreadyExists { .. }) => {
327                        let mut result = self.rt.block_on(async { self.store.get(&path).await });
328                        loop {
329                            match result {
330                                Ok(obj) => {
331                                    let version = UpdateVersion {
332                                        e_tag: obj.meta.e_tag.clone(),
333                                        version: obj.meta.version.clone(),
334                                    };
335                                    let stored_bytes = self.rt.block_on(obj.bytes())?;
336                                    let stored_hash = parse_branch(&stored_bytes)?;
337                                    if stored_hash.is_some() {
338                                        return Ok(PushResult::Conflict(stored_hash));
339                                    }
340                                    match self.rt.block_on(async {
341                                        self.store
342                                            .put_opts(
343                                                &path,
344                                                new_bytes.clone().into(),
345                                                PutMode::Update(version).into(),
346                                            )
347                                            .await
348                                    }) {
349                                        Ok(_) => return Ok(PushResult::Success()),
350                                        Err(object_store::Error::Precondition { .. }) => {
351                                            result = self
352                                                .rt
353                                                .block_on(async { self.store.get(&path).await });
354                                            continue;
355                                        }
356                                        Err(e) => return Err(PushBranchErr::StoreErr(e)),
357                                    }
358                                }
359                                Err(object_store::Error::NotFound { .. }) => break, // raced with delete; retry create
360                                Err(e) => return Err(PushBranchErr::StoreErr(e)),
361                            }
362                        }
363                        continue;
364                    }
365                    Err(e) => return Err(PushBranchErr::StoreErr(e)),
366                }
367            }
368        }
369    }
370}
371
372impl<H> crate::repo::StorageClose for ObjectStoreRemote<H> {
373    type Error = Infallible;
374
375    fn close(self) -> Result<(), Self::Error> {
376        // No explicit close necessary for the remote object store adapter.
377        Ok(())
378    }
379}
380
381impl<H> ObjectStoreReader<H> {
382    fn blob_path(&self, handle_hex: String) -> Path {
383        self.prefix.child(BLOB_INFIX).child(handle_hex)
384    }
385}
386
387impl<H> BlobStoreList<H> for ObjectStoreReader<H>
388where
389    H: HashProtocol,
390{
391    type Err = ListBlobsErr;
392    type Iter<'a> = BlockingIter<Result<Value<Handle<H, UnknownBlob>>, Self::Err>>;
393
394    fn blobs<'a>(&'a self) -> Self::Iter<'a> {
395        let prefix = self.prefix.child(BLOB_INFIX);
396        let stream = self.store.list(Some(&prefix)).map(|r| match r {
397            Ok(meta) => {
398                let blob_name = meta
399                    .location
400                    .filename()
401                    .ok_or(ListBlobsErr::NotAFile("no filename"))?;
402                let digest = RawValue::from_hex(blob_name).map_err(ListBlobsErr::BadNameHex)?;
403                Ok(Value::new(digest))
404            }
405            Err(e) => Err(ListBlobsErr::List(e)),
406        });
407        BlockingIter::from_stream(self.rt.handle().clone(), stream, 16)
408    }
409}
410
411/// Error returned when retrieving a blob from the object store.
412#[derive(Debug)]
413pub enum GetBlobErr<E: Error> {
414    /// The underlying object store operation failed.
415    Store(object_store::Error),
416    /// The blob bytes could not be converted to the requested type.
417    Conversion(E),
418}
419
420impl<E: Error> fmt::Display for GetBlobErr<E> {
421    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
422        match self {
423            Self::Store(e) => write!(f, "object store error: {e}"),
424            Self::Conversion(e) => write!(f, "conversion error: {e}"),
425        }
426    }
427}
428
429impl<E: Error> Error for GetBlobErr<E> {
430    fn source(&self) -> Option<&(dyn Error + 'static)> {
431        match self {
432            Self::Store(e) => Some(e),
433            Self::Conversion(_) => None,
434        }
435    }
436}
437
438impl<E: Error> From<object_store::Error> for GetBlobErr<E> {
439    fn from(e: object_store::Error) -> Self {
440        Self::Store(e)
441    }
442}
443
444impl<H> BlobStoreGet<H> for ObjectStoreReader<H>
445where
446    H: HashProtocol,
447{
448    type GetError<E: Error> = GetBlobErr<E>;
449
450    fn get<T, S>(
451        &self,
452        handle: Value<Handle<H, S>>,
453    ) -> Result<T, Self::GetError<<T as TryFromBlob<S>>::Error>>
454    where
455        S: BlobSchema + 'static,
456        T: TryFromBlob<S>,
457        Handle<H, S>: ValueSchema,
458    {
459        let path = self.blob_path(hex::encode(handle.raw));
460        let object = self.rt.block_on(async { self.store.get(&path).await })?;
461        let bytes = self.rt.block_on(object.bytes())?;
462        let bytes: Bytes = bytes.into();
463        let blob: Blob<S> = Blob::new(bytes);
464        blob.try_from_blob().map_err(GetBlobErr::Conversion)
465    }
466}
467
468/// Error returned when listing blobs from the object store.
469#[derive(Debug)]
470pub enum ListBlobsErr {
471    /// The underlying list operation failed.
472    List(object_store::Error),
473    /// A listed object had no filename component.
474    NotAFile(&'static str),
475    /// A listed object's filename was not valid hexadecimal.
476    BadNameHex(<RawValue as FromHex>::Error),
477}
478
479impl fmt::Display for ListBlobsErr {
480    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
481        match self {
482            Self::List(e) => write!(f, "list failed: {e}"),
483            Self::NotAFile(e) => write!(f, "list failed: {e}"),
484            Self::BadNameHex(e) => write!(f, "list failed: {e}"),
485        }
486    }
487}
488impl Error for ListBlobsErr {}
489
490/// Error returned when listing branches from the object store.
491#[derive(Debug)]
492pub enum ListBranchesErr {
493    /// The underlying list operation failed.
494    List(object_store::Error),
495    /// A listed object had no filename component.
496    NotAFile(&'static str),
497    /// A listed object's filename was not valid hexadecimal.
498    BadNameHex(<RawId as FromHex>::Error),
499    /// The decoded bytes represent the nil identifier.
500    BadId,
501}
502
503impl fmt::Display for ListBranchesErr {
504    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
505        match self {
506            Self::List(e) => write!(f, "list failed: {e}"),
507            Self::NotAFile(e) => write!(f, "list failed: {e}"),
508            Self::BadNameHex(e) => write!(f, "list failed: {e}"),
509            Self::BadId => write!(f, "list failed: bad id"),
510        }
511    }
512}
513impl Error for ListBranchesErr {}
514
515/// Error returned when reading a branch head from the object store.
516#[derive(Debug)]
517pub enum PullBranchErr {
518    /// The stored bytes could not be parsed as a valid handle.
519    ValidationErr(TryFromSliceError),
520    /// The underlying object store operation failed.
521    StoreErr(object_store::Error),
522}
523
524impl fmt::Display for PullBranchErr {
525    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
526        match self {
527            Self::StoreErr(e) => write!(f, "pull failed: {e}"),
528            Self::ValidationErr(e) => write!(f, "pull failed: {e}"),
529        }
530    }
531}
532
533impl Error for PullBranchErr {}
534
535impl From<object_store::Error> for PullBranchErr {
536    fn from(err: object_store::Error) -> Self {
537        Self::StoreErr(err)
538    }
539}
540
541impl From<TryFromSliceError> for PullBranchErr {
542    fn from(err: TryFromSliceError) -> Self {
543        Self::ValidationErr(err)
544    }
545}
546
547/// Error returned when updating a branch head in the object store.
548#[derive(Debug)]
549pub enum PushBranchErr {
550    /// The stored bytes could not be parsed as a valid handle during a compare-and-swap.
551    ValidationErr(TryFromSliceError),
552    /// The underlying object store operation failed.
553    StoreErr(object_store::Error),
554}
555
556impl fmt::Display for PushBranchErr {
557    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
558        match self {
559            Self::ValidationErr(e) => write!(f, "commit failed: {e}"),
560            Self::StoreErr(e) => write!(f, "commit failed: {e}"),
561        }
562    }
563}
564
565impl Error for PushBranchErr {}
566
567impl From<object_store::Error> for PushBranchErr {
568    fn from(err: object_store::Error) -> Self {
569        Self::StoreErr(err)
570    }
571}
572
573impl From<TryFromSliceError> for PushBranchErr {
574    fn from(err: TryFromSliceError) -> Self {
575        Self::ValidationErr(err)
576    }
577}
578
579impl<H> crate::repo::BlobStoreMeta<H> for ObjectStoreReader<H>
580where
581    H: HashProtocol,
582{
583    type MetaError = object_store::Error;
584
585    fn metadata<S>(
586        &self,
587        handle: Value<Handle<H, S>>,
588    ) -> Result<Option<crate::repo::BlobMetadata>, Self::MetaError>
589    where
590        S: BlobSchema + 'static,
591        Handle<H, S>: ValueSchema,
592    {
593        let handle_hex = hex::encode(handle.raw);
594        let path = self.prefix.child(BLOB_INFIX).child(handle_hex);
595        match self.rt.block_on(async { self.store.head(&path).await }) {
596            Ok(meta) => {
597                let ts = meta.last_modified.timestamp_millis() as u64;
598                let len = meta.size;
599                Ok(Some(crate::repo::BlobMetadata {
600                    timestamp: ts,
601                    length: len,
602                }))
603            }
604            Err(object_store::Error::NotFound { .. }) => Ok(None),
605            Err(e) => Err(e),
606        }
607    }
608}
609
610impl<H> crate::repo::BlobStoreForget<H> for ObjectStoreRemote<H>
611where
612    H: HashProtocol,
613{
614    type ForgetError = object_store::Error;
615
616    fn forget<S>(&mut self, handle: Value<Handle<H, S>>) -> Result<(), Self::ForgetError>
617    where
618        S: BlobSchema + 'static,
619        Handle<H, S>: ValueSchema,
620    {
621        let handle_hex = hex::encode(handle.raw);
622        let path = self.prefix.child(BLOB_INFIX).child(handle_hex);
623        match self.rt.block_on(async { self.store.delete(&path).await }) {
624            Ok(_) => Ok(()),
625            Err(object_store::Error::NotFound { .. }) => Ok(()),
626            Err(e) => Err(e),
627        }
628    }
629}