1use std::array::TryFromSliceError;
2use std::convert::Infallible;
3use std::convert::TryInto;
4use std::error::Error;
5use std::fmt;
6use std::marker::PhantomData;
7use std::sync::Arc;
8
9use anybytes::Bytes;
10use crossbeam_channel::{bounded, Receiver};
11use futures::Stream;
12use futures::StreamExt;
13use tokio::runtime::Runtime;
14
15use object_store::parse_url;
16use object_store::path::Path;
17use object_store::ObjectStore;
18use object_store::PutMode;
19use object_store::UpdateVersion;
20use object_store::{self};
21use url::Url;
22
23use hex::FromHex;
24
25use crate::blob::schemas::UnknownBlob;
26use crate::blob::Blob;
27use crate::blob::BlobSchema;
28use crate::blob::ToBlob;
29use crate::blob::TryFromBlob;
30use crate::id::Id;
31use crate::id::RawId;
32use crate::prelude::blobschemas::SimpleArchive;
33use crate::value::schemas::hash::Handle;
34use crate::value::schemas::hash::HashProtocol;
35use crate::value::RawValue;
36use crate::value::Value;
37use crate::value::ValueSchema;
38
39use super::BlobStore;
40use super::BlobStoreGet;
41use super::BlobStoreList;
42use super::BlobStorePut;
43use super::BranchStore;
44use super::PushResult;
45
46const BRANCH_INFIX: &str = "branches";
47const BLOB_INFIX: &str = "blobs";
48
49pub struct ObjectStoreRemote<H> {
54 store: Arc<dyn ObjectStore>,
55 prefix: Path,
56 rt: Arc<Runtime>,
57 _hasher: PhantomData<H>,
58}
59
60impl<H> fmt::Debug for ObjectStoreRemote<H> {
61 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62 f.debug_struct("ObjectStoreRemote")
63 .field("prefix", &self.prefix)
64 .finish()
65 }
66}
67
68impl<H> fmt::Debug for ObjectStoreReader<H> {
69 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70 f.debug_struct("ObjectStoreReader")
71 .field("prefix", &self.prefix)
72 .finish()
73 }
74}
75
76#[derive(Clone)]
78pub struct ObjectStoreReader<H> {
79 store: Arc<dyn ObjectStore>,
80 prefix: Path,
81 rt: Arc<Runtime>,
82 _hasher: PhantomData<H>,
83}
84
85pub struct BlockingIter<T> {
87 rx: Receiver<T>,
88}
89
90impl<T> BlockingIter<T> {
91 fn from_stream<S>(handle: tokio::runtime::Handle, stream: S, capacity: usize) -> Self
92 where
93 S: Stream<Item = T> + Send + 'static,
94 T: Send + 'static,
95 {
96 let (tx, rx) = bounded(capacity);
97 let handle_for_spawn = handle.clone();
98 let handle_for_task = handle.clone();
99 handle_for_spawn.spawn(async move {
100 let mut s = Box::pin(stream);
101 let rt = handle_for_task;
102 while let Some(item) = s.next().await {
103 let tx_clone = tx.clone();
104 let bh = rt.clone();
105 match bh.spawn_blocking(move || tx_clone.send(item)).await {
107 Ok(Ok(())) => {}
108 _ => break,
109 }
110 }
111 });
113 BlockingIter { rx }
114 }
115}
116
117impl<T> Iterator for BlockingIter<T> {
118 type Item = T;
119 fn next(&mut self) -> Option<Self::Item> {
120 self.rx.recv().ok()
121 }
122}
123
124impl<H> PartialEq for ObjectStoreReader<H> {
125 fn eq(&self, other: &Self) -> bool {
126 Arc::ptr_eq(&self.store, &other.store) && self.prefix == other.prefix
127 }
128}
129
130impl<H> Eq for ObjectStoreReader<H> {}
131
132impl<H> ObjectStoreRemote<H> {
133 pub fn with_url(url: &Url) -> Result<ObjectStoreRemote<H>, object_store::Error> {
135 let (store, path) = parse_url(url)?;
136 Ok(ObjectStoreRemote {
137 store: Arc::from(store),
138 prefix: path,
139 rt: Arc::new(
140 tokio::runtime::Builder::new_multi_thread()
141 .enable_all()
142 .worker_threads(2)
143 .build()
144 .expect("build runtime"),
145 ),
146 _hasher: PhantomData,
147 })
148 }
149}
150
151impl<H> BlobStorePut<H> for ObjectStoreRemote<H>
152where
153 H: HashProtocol,
154{
155 type PutError = object_store::Error;
156
157 fn put<S, T>(&mut self, item: T) -> Result<Value<Handle<H, S>>, Self::PutError>
158 where
159 S: BlobSchema + 'static,
160 T: ToBlob<S>,
161 Handle<H, S>: ValueSchema,
162 {
163 let blob = item.to_blob();
164 let handle = blob.get_handle();
165 let path = self.prefix.child(BLOB_INFIX).child(hex::encode(handle.raw));
166 let bytes: bytes::Bytes = blob.bytes.into();
167 let result = self.rt.block_on(async {
168 self.store
169 .put_opts(&path, bytes.into(), PutMode::Create.into())
170 .await
171 });
172 match result {
173 Ok(_) | Err(object_store::Error::AlreadyExists { .. }) => Ok(handle),
174 Err(e) => Err(e),
175 }
176 }
177}
178
179impl<H> BlobStore<H> for ObjectStoreRemote<H>
180where
181 H: HashProtocol,
182{
183 type Reader = ObjectStoreReader<H>;
184 type ReaderError = Infallible;
185
186 fn reader(&mut self) -> Result<Self::Reader, Self::ReaderError> {
187 Ok(ObjectStoreReader {
188 store: self.store.clone(),
189 prefix: self.prefix.clone(),
190 rt: self.rt.clone(),
191 _hasher: PhantomData,
192 })
193 }
194}
195
196impl<H> BranchStore<H> for ObjectStoreRemote<H>
197where
198 H: HashProtocol,
199{
200 type BranchesError = ListBranchesErr;
201 type HeadError = PullBranchErr;
202 type UpdateError = PushBranchErr;
203
204 type ListIter<'a> = BlockingIter<Result<Id, Self::BranchesError>>;
205
206 fn branches<'a>(&'a mut self) -> Result<Self::ListIter<'a>, Self::BranchesError> {
207 let prefix = self.prefix.child(BRANCH_INFIX);
208 let stream = self.store.list(Some(&prefix)).filter_map(|r| async move {
209 match r {
210 Ok(meta) if meta.size == 0 => None, Ok(meta) => {
212 let name = match meta.location.filename() {
213 Some(name) => name,
214 None => return Some(Err(ListBranchesErr::NotAFile("no filename"))),
215 };
216 let digest = match RawId::from_hex(name) {
217 Ok(digest) => digest,
218 Err(e) => return Some(Err(ListBranchesErr::BadNameHex(e))),
219 };
220 let Some(id) = Id::new(digest) else {
221 return Some(Err(ListBranchesErr::BadId));
222 };
223 Some(Ok(id))
224 }
225 Err(e) => Some(Err(ListBranchesErr::List(e))),
226 }
227 });
228 Ok(BlockingIter::from_stream(
229 self.rt.handle().clone(),
230 stream,
231 16,
232 ))
233 }
234
235 fn head(&mut self, id: Id) -> Result<Option<Value<Handle<H, SimpleArchive>>>, Self::HeadError> {
236 let path = self.prefix.child(BRANCH_INFIX).child(hex::encode(id));
237 let result = self.rt.block_on(async { self.store.get(&path).await });
238 match result {
239 Ok(object) => {
240 let bytes = self.rt.block_on(object.bytes())?;
241 if bytes.is_empty() {
242 return Ok(None);
243 }
244 let value = (&bytes[..]).try_into()?;
245 Ok(Some(Value::new(value)))
246 }
247 Err(object_store::Error::NotFound { .. }) => Ok(None),
248 Err(e) => Err(PullBranchErr::StoreErr(e)),
249 }
250 }
251
252 fn update(
253 &mut self,
254 id: Id,
255 old: Option<Value<Handle<H, SimpleArchive>>>,
256 new: Option<Value<Handle<H, SimpleArchive>>>,
257 ) -> Result<PushResult<H>, Self::UpdateError> {
258 let path = self.prefix.child(BRANCH_INFIX).child(hex::encode(id));
259 let new_bytes = match new {
266 Some(new) => bytes::Bytes::copy_from_slice(&new.raw),
267 None => bytes::Bytes::new(),
268 };
269
270 let parse_branch = |bytes: &bytes::Bytes| -> Result<
271 Option<Value<Handle<H, SimpleArchive>>>,
272 TryFromSliceError,
273 > {
274 if bytes.is_empty() {
275 return Ok(None);
276 }
277 let value = (&bytes[..]).try_into()?;
278 Ok(Some(Value::new(value)))
279 };
280
281 if let Some(old_hash) = old {
282 let mut result = self.rt.block_on(async { self.store.get(&path).await });
283 loop {
284 match result {
285 Ok(obj) => {
286 let version = UpdateVersion {
287 e_tag: obj.meta.e_tag.clone(),
288 version: obj.meta.version.clone(),
289 };
290 let stored_bytes = self.rt.block_on(obj.bytes())?;
291 let stored_hash = parse_branch(&stored_bytes)?;
292 if stored_hash != Some(old_hash) {
293 return Ok(PushResult::Conflict(stored_hash));
294 }
295 match self.rt.block_on(async {
296 self.store
297 .put_opts(
298 &path,
299 new_bytes.clone().into(),
300 PutMode::Update(version).into(),
301 )
302 .await
303 }) {
304 Ok(_) => return Ok(PushResult::Success()),
305 Err(object_store::Error::Precondition { .. }) => {
306 result = self.rt.block_on(async { self.store.get(&path).await });
307 continue;
308 }
309 Err(e) => return Err(PushBranchErr::StoreErr(e)),
310 }
311 }
312 Err(object_store::Error::NotFound { .. }) => {
313 return Ok(PushResult::Conflict(None))
314 }
315 Err(e) => return Err(PushBranchErr::StoreErr(e)),
316 }
317 }
318 } else {
319 loop {
320 match self.rt.block_on(async {
321 self.store
322 .put_opts(&path, new_bytes.clone().into(), PutMode::Create.into())
323 .await
324 }) {
325 Ok(_) => return Ok(PushResult::Success()),
326 Err(object_store::Error::AlreadyExists { .. }) => {
327 let mut result = self.rt.block_on(async { self.store.get(&path).await });
328 loop {
329 match result {
330 Ok(obj) => {
331 let version = UpdateVersion {
332 e_tag: obj.meta.e_tag.clone(),
333 version: obj.meta.version.clone(),
334 };
335 let stored_bytes = self.rt.block_on(obj.bytes())?;
336 let stored_hash = parse_branch(&stored_bytes)?;
337 if stored_hash.is_some() {
338 return Ok(PushResult::Conflict(stored_hash));
339 }
340 match self.rt.block_on(async {
341 self.store
342 .put_opts(
343 &path,
344 new_bytes.clone().into(),
345 PutMode::Update(version).into(),
346 )
347 .await
348 }) {
349 Ok(_) => return Ok(PushResult::Success()),
350 Err(object_store::Error::Precondition { .. }) => {
351 result = self
352 .rt
353 .block_on(async { self.store.get(&path).await });
354 continue;
355 }
356 Err(e) => return Err(PushBranchErr::StoreErr(e)),
357 }
358 }
359 Err(object_store::Error::NotFound { .. }) => break, Err(e) => return Err(PushBranchErr::StoreErr(e)),
361 }
362 }
363 continue;
364 }
365 Err(e) => return Err(PushBranchErr::StoreErr(e)),
366 }
367 }
368 }
369 }
370}
371
372impl<H> crate::repo::StorageClose for ObjectStoreRemote<H> {
373 type Error = Infallible;
374
375 fn close(self) -> Result<(), Self::Error> {
376 Ok(())
378 }
379}
380
381impl<H> ObjectStoreReader<H> {
382 fn blob_path(&self, handle_hex: String) -> Path {
383 self.prefix.child(BLOB_INFIX).child(handle_hex)
384 }
385}
386
387impl<H> BlobStoreList<H> for ObjectStoreReader<H>
388where
389 H: HashProtocol,
390{
391 type Err = ListBlobsErr;
392 type Iter<'a> = BlockingIter<Result<Value<Handle<H, UnknownBlob>>, Self::Err>>;
393
394 fn blobs<'a>(&'a self) -> Self::Iter<'a> {
395 let prefix = self.prefix.child(BLOB_INFIX);
396 let stream = self.store.list(Some(&prefix)).map(|r| match r {
397 Ok(meta) => {
398 let blob_name = meta
399 .location
400 .filename()
401 .ok_or(ListBlobsErr::NotAFile("no filename"))?;
402 let digest = RawValue::from_hex(blob_name).map_err(ListBlobsErr::BadNameHex)?;
403 Ok(Value::new(digest))
404 }
405 Err(e) => Err(ListBlobsErr::List(e)),
406 });
407 BlockingIter::from_stream(self.rt.handle().clone(), stream, 16)
408 }
409}
410
411#[derive(Debug)]
413pub enum GetBlobErr<E: Error> {
414 Store(object_store::Error),
416 Conversion(E),
418}
419
420impl<E: Error> fmt::Display for GetBlobErr<E> {
421 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
422 match self {
423 Self::Store(e) => write!(f, "object store error: {e}"),
424 Self::Conversion(e) => write!(f, "conversion error: {e}"),
425 }
426 }
427}
428
429impl<E: Error> Error for GetBlobErr<E> {
430 fn source(&self) -> Option<&(dyn Error + 'static)> {
431 match self {
432 Self::Store(e) => Some(e),
433 Self::Conversion(_) => None,
434 }
435 }
436}
437
438impl<E: Error> From<object_store::Error> for GetBlobErr<E> {
439 fn from(e: object_store::Error) -> Self {
440 Self::Store(e)
441 }
442}
443
444impl<H> BlobStoreGet<H> for ObjectStoreReader<H>
445where
446 H: HashProtocol,
447{
448 type GetError<E: Error + Send + Sync + 'static> = GetBlobErr<E>;
449
450 fn get<T, S>(
451 &self,
452 handle: Value<Handle<H, S>>,
453 ) -> Result<T, Self::GetError<<T as TryFromBlob<S>>::Error>>
454 where
455 S: BlobSchema + 'static,
456 T: TryFromBlob<S>,
457 Handle<H, S>: ValueSchema,
458 {
459 let path = self.blob_path(hex::encode(handle.raw));
460 let object = self.rt.block_on(async { self.store.get(&path).await })?;
461 let bytes = self.rt.block_on(object.bytes())?;
462 let bytes: Bytes = bytes.into();
463 let blob: Blob<S> = Blob::new(bytes);
464 blob.try_from_blob().map_err(GetBlobErr::Conversion)
465 }
466}
467
468#[derive(Debug)]
470pub enum ListBlobsErr {
471 List(object_store::Error),
473 NotAFile(&'static str),
475 BadNameHex(<RawValue as FromHex>::Error),
477}
478
479impl fmt::Display for ListBlobsErr {
480 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
481 match self {
482 Self::List(e) => write!(f, "list failed: {e}"),
483 Self::NotAFile(e) => write!(f, "list failed: {e}"),
484 Self::BadNameHex(e) => write!(f, "list failed: {e}"),
485 }
486 }
487}
488impl Error for ListBlobsErr {}
489
490#[derive(Debug)]
492pub enum ListBranchesErr {
493 List(object_store::Error),
495 NotAFile(&'static str),
497 BadNameHex(<RawId as FromHex>::Error),
499 BadId,
501}
502
503impl fmt::Display for ListBranchesErr {
504 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
505 match self {
506 Self::List(e) => write!(f, "list failed: {e}"),
507 Self::NotAFile(e) => write!(f, "list failed: {e}"),
508 Self::BadNameHex(e) => write!(f, "list failed: {e}"),
509 Self::BadId => write!(f, "list failed: bad id"),
510 }
511 }
512}
513impl Error for ListBranchesErr {}
514
515#[derive(Debug)]
517pub enum PullBranchErr {
518 ValidationErr(TryFromSliceError),
520 StoreErr(object_store::Error),
522}
523
524impl fmt::Display for PullBranchErr {
525 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
526 match self {
527 Self::StoreErr(e) => write!(f, "pull failed: {e}"),
528 Self::ValidationErr(e) => write!(f, "pull failed: {e}"),
529 }
530 }
531}
532
533impl Error for PullBranchErr {}
534
535impl From<object_store::Error> for PullBranchErr {
536 fn from(err: object_store::Error) -> Self {
537 Self::StoreErr(err)
538 }
539}
540
541impl From<TryFromSliceError> for PullBranchErr {
542 fn from(err: TryFromSliceError) -> Self {
543 Self::ValidationErr(err)
544 }
545}
546
547#[derive(Debug)]
549pub enum PushBranchErr {
550 ValidationErr(TryFromSliceError),
552 StoreErr(object_store::Error),
554}
555
556impl fmt::Display for PushBranchErr {
557 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
558 match self {
559 Self::ValidationErr(e) => write!(f, "commit failed: {e}"),
560 Self::StoreErr(e) => write!(f, "commit failed: {e}"),
561 }
562 }
563}
564
565impl Error for PushBranchErr {}
566
567impl From<object_store::Error> for PushBranchErr {
568 fn from(err: object_store::Error) -> Self {
569 Self::StoreErr(err)
570 }
571}
572
573impl From<TryFromSliceError> for PushBranchErr {
574 fn from(err: TryFromSliceError) -> Self {
575 Self::ValidationErr(err)
576 }
577}
578
579impl<H> crate::repo::BlobStoreMeta<H> for ObjectStoreReader<H>
580where
581 H: HashProtocol,
582{
583 type MetaError = object_store::Error;
584
585 fn metadata<S>(
586 &self,
587 handle: Value<Handle<H, S>>,
588 ) -> Result<Option<crate::repo::BlobMetadata>, Self::MetaError>
589 where
590 S: BlobSchema + 'static,
591 Handle<H, S>: ValueSchema,
592 {
593 let handle_hex = hex::encode(handle.raw);
594 let path = self.prefix.child(BLOB_INFIX).child(handle_hex);
595 match self.rt.block_on(async { self.store.head(&path).await }) {
596 Ok(meta) => {
597 let ts = meta.last_modified.timestamp_millis() as u64;
598 let len = meta.size;
599 Ok(Some(crate::repo::BlobMetadata {
600 timestamp: ts,
601 length: len,
602 }))
603 }
604 Err(object_store::Error::NotFound { .. }) => Ok(None),
605 Err(e) => Err(e),
606 }
607 }
608}
609
610impl<H> crate::repo::BlobStoreForget<H> for ObjectStoreRemote<H>
611where
612 H: HashProtocol,
613{
614 type ForgetError = object_store::Error;
615
616 fn forget<S>(&mut self, handle: Value<Handle<H, S>>) -> Result<(), Self::ForgetError>
617 where
618 S: BlobSchema + 'static,
619 Handle<H, S>: ValueSchema,
620 {
621 let handle_hex = hex::encode(handle.raw);
622 let path = self.prefix.child(BLOB_INFIX).child(handle_hex);
623 match self.rt.block_on(async { self.store.delete(&path).await }) {
624 Ok(_) => Ok(()),
625 Err(object_store::Error::NotFound { .. }) => Ok(()),
626 Err(e) => Err(e),
627 }
628 }
629}