1use std::{
68 collections::{BTreeMap, BTreeSet},
69 io::{self, BufReader, Read},
70 path::{Path, PathBuf},
71 sync::{Arc, RwLock},
72 time::{Duration, SystemTime},
73};
74
75use bao_tree::io::{
76 fsm::Outboard,
77 outboard::PreOrderOutboard,
78 sync::{ReadAt, Size},
79};
80use bytes::Bytes;
81use futures_lite::{Stream, StreamExt};
82
83use iroh_base::hash::{BlobFormat, Hash, HashAndFormat};
84use iroh_io::AsyncSliceReader;
85use redb::{AccessGuard, DatabaseError, ReadableTable, StorageError};
86use serde::{Deserialize, Serialize};
87use smallvec::SmallVec;
88use tokio::{io::AsyncWriteExt, sync::oneshot};
89use tracing::trace_span;
90
91mod import_flat_store;
92mod migrate_redb_v1_v2;
93mod tables;
94#[doc(hidden)]
95pub mod test_support;
96#[cfg(test)]
97mod tests;
98mod util;
99mod validate;
100
101use crate::{
102 store::{
103 bao_file::{BaoFileStorage, CompleteStorage},
104 fs::{
105 tables::BaoFilePart,
106 util::{overwrite_and_sync, read_and_remove, ProgressReader},
107 },
108 },
109 util::{
110 progress::{
111 BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSendError,
112 ProgressSender,
113 },
114 raw_outboard_size, LivenessTracker, MemOrFile,
115 },
116 Tag, TempTag, IROH_BLOCK_SIZE,
117};
118use tables::{ReadOnlyTables, ReadableTables, Tables};
119
120use self::{tables::DeleteSet, util::PeekableFlumeReceiver};
121
122use self::test_support::EntryData;
123
124use super::{
125 bao_file::{BaoFileConfig, BaoFileHandle, BaoFileHandleWeak, CreateCb},
126 temp_name, BaoBatchWriter, BaoBlobSize, ConsistencyCheckProgress, EntryStatus, ExportMode,
127 ExportProgressCb, ImportMode, ImportProgress, Map, TempCounterMap,
128};
129
/// Location of the data for a complete entry.
///
/// Generic over the inline payload `I` and the extra owned-file metadata `E`,
/// so the same shape can be stored with or without the payload attached.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum DataLocation<I = (), E = ()> {
    /// Data is stored inline, as the payload `I`.
    Inline(I),
    /// Data is stored in a file owned by this store.
    Owned(E),
    /// Data lives in one or more external files not owned by the store.
    External(Vec<PathBuf>, E),
}
145
146impl<X> DataLocation<X, u64> {
147 fn union(self, that: DataLocation<X, u64>) -> ActorResult<Self> {
148 Ok(match (self, that) {
149 (
150 DataLocation::External(mut paths, a_size),
151 DataLocation::External(b_paths, b_size),
152 ) => {
153 if a_size != b_size {
154 return Err(ActorError::Inconsistent(format!(
155 "complete size mismatch {} {}",
156 a_size, b_size
157 )));
158 }
159 paths.extend(b_paths);
160 paths.sort();
161 paths.dedup();
162 DataLocation::External(paths, a_size)
163 }
164 (_, b @ DataLocation::Owned(_)) => {
165 b
168 }
169 (a @ DataLocation::Owned(_), _) => {
170 a
173 }
174 (_, b @ DataLocation::Inline(_)) => {
175 b
178 }
179 (a @ DataLocation::Inline(_), _) => {
180 a
183 }
184 })
185 }
186}
187
188impl<I, E> DataLocation<I, E> {
189 fn discard_inline_data(self) -> DataLocation<(), E> {
190 match self {
191 DataLocation::Inline(_) => DataLocation::Inline(()),
192 DataLocation::Owned(x) => DataLocation::Owned(x),
193 DataLocation::External(paths, x) => DataLocation::External(paths, x),
194 }
195 }
196}
197
/// Location of the outboard for a complete entry.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum OutboardLocation<I = ()> {
    /// Outboard is stored inline, as the payload `I`.
    Inline(I),
    /// Outboard is stored in a file owned by this store.
    Owned,
    /// No outboard is needed for this entry.
    NotNeeded,
}
213
214impl<I> OutboardLocation<I> {
215 fn discard_extra_data(self) -> OutboardLocation<()> {
216 match self {
217 Self::Inline(_) => OutboardLocation::Inline(()),
218 Self::Owned => OutboardLocation::Owned,
219 Self::NotNeeded => OutboardLocation::NotNeeded,
220 }
221 }
222}
223
/// Persisted state of a single entry, as stored in the metadata table.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum EntryState<I = ()> {
    /// The entry is fully available.
    Complete {
        /// Where the data lives.
        data_location: DataLocation<I, u64>,
        /// Where the outboard lives.
        outboard_location: OutboardLocation<I>,
    },
    /// The entry is only partially available.
    Partial {
        /// The validated size of the entry, if known.
        size: Option<u64>,
    },
}
251
252impl Default for EntryState {
253 fn default() -> Self {
254 Self::Partial { size: None }
255 }
256}
257
258impl EntryState {
259 fn union(self, that: Self) -> ActorResult<Self> {
260 match (self, that) {
261 (
262 Self::Complete {
263 data_location,
264 outboard_location,
265 },
266 Self::Complete {
267 data_location: b_data_location,
268 ..
269 },
270 ) => Ok(Self::Complete {
271 data_location: data_location.union(b_data_location)?,
273 outboard_location,
274 }),
275 (a @ Self::Complete { .. }, Self::Partial { .. }) =>
276 {
278 Ok(a)
279 }
280 (Self::Partial { .. }, b @ Self::Complete { .. }) =>
281 {
283 Ok(b)
284 }
285 (Self::Partial { size: a_size }, Self::Partial { size: b_size }) =>
286 {
288 let size = match (a_size, b_size) {
289 (Some(a_size), Some(b_size)) => {
290 if a_size != b_size {
294 return Err(ActorError::Inconsistent(format!(
295 "validated size mismatch {} {}",
296 a_size, b_size
297 )));
298 }
299 Some(a_size)
300 }
301 (Some(a_size), None) => Some(a_size),
302 (None, Some(b_size)) => Some(b_size),
303 (None, None) => None,
304 };
305 Ok(Self::Partial { size })
306 }
307 }
308 }
309}
310
// Entries are stored in redb as variable-width, postcard-serialized bytes.
impl redb::Value for EntryState {
    type SelfType<'a> = EntryState;

    type AsBytes<'a> = SmallVec<[u8; 128]>;

    fn fixed_width() -> Option<usize> {
        // Variable-length encoding.
        None
    }

    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
    where
        Self: 'a,
    {
        // NOTE(review): deserialization failure panics here; the redb trait
        // offers no error path, so a corrupt record aborts the actor thread.
        postcard::from_bytes(data).unwrap()
    }

    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
    where
        Self: 'a,
        Self: 'b,
    {
        // Serializing into an in-memory SmallVec is expected to be infallible.
        postcard::to_extend(value, SmallVec::new()).unwrap()
    }

    fn type_name() -> redb::TypeName {
        redb::TypeName::new("EntryState")
    }
}
339
/// Thresholds controlling when data and outboards are stored inline in the
/// metadata database rather than in separate files.
#[derive(Debug, Clone)]
pub struct InlineOptions {
    /// Maximum data size, in bytes, that is stored inline.
    pub max_data_inlined: u64,
    /// Maximum outboard size, in bytes, that is stored inline.
    pub max_outboard_inlined: u64,
}
348
impl InlineOptions {
    /// Never store data or outboards inline.
    pub const NO_INLINE: Self = Self {
        max_data_inlined: 0,
        max_outboard_inlined: 0,
    };
    /// Always store data and outboards inline.
    pub const ALWAYS_INLINE: Self = Self {
        max_data_inlined: u64::MAX,
        max_outboard_inlined: u64::MAX,
    };
}
361
362impl Default for InlineOptions {
363 fn default() -> Self {
364 Self {
365 max_data_inlined: 1024 * 16,
366 max_outboard_inlined: 1024 * 16,
367 }
368 }
369}
370
/// Filesystem layout used by the store.
#[derive(Debug, Clone)]
pub struct PathOptions {
    /// Directory holding owned data, outboard and sizes files.
    pub data_path: PathBuf,
    /// Directory holding temporary files.
    pub temp_path: PathBuf,
}
381
382impl PathOptions {
383 fn new(root: &Path) -> Self {
384 Self {
385 data_path: root.join("data"),
386 temp_path: root.join("temp"),
387 }
388 }
389
390 fn owned_data_path(&self, hash: &Hash) -> PathBuf {
391 self.data_path.join(format!("{}.data", hash.to_hex()))
392 }
393
394 fn owned_outboard_path(&self, hash: &Hash) -> PathBuf {
395 self.data_path.join(format!("{}.obao4", hash.to_hex()))
396 }
397
398 fn owned_sizes_path(&self, hash: &Hash) -> PathBuf {
399 self.data_path.join(format!("{}.sizes4", hash.to_hex()))
400 }
401
402 fn temp_file_name(&self) -> PathBuf {
403 self.temp_path.join(temp_name())
404 }
405}
406
/// Limits on how many actor messages are handled within a single database
/// transaction, and how long a transaction may stay open.
#[derive(Debug, Clone)]
pub struct BatchOptions {
    /// Maximum number of messages handled in one read transaction.
    pub max_read_batch: usize,
    /// Maximum time a read transaction stays open.
    pub max_read_duration: Duration,
    /// Maximum number of messages handled in one write transaction.
    pub max_write_batch: usize,
    /// Maximum time a write transaction stays open.
    pub max_write_duration: Duration,
}
419
420impl Default for BatchOptions {
421 fn default() -> Self {
422 Self {
423 max_read_batch: 10000,
424 max_read_duration: Duration::from_secs(1),
425 max_write_batch: 1000,
426 max_write_duration: Duration::from_millis(500),
427 }
428 }
429}
430
/// All configuration options for the file-backed store.
#[derive(Debug, Clone)]
pub struct Options {
    /// Filesystem paths.
    pub path: PathOptions,
    /// Inlining thresholds.
    pub inline: InlineOptions,
    /// Transaction batching limits.
    pub batch: BatchOptions,
}
441
/// Where the payload of an import operation comes from.
#[derive(derive_more::Debug)]
pub(crate) enum ImportSource {
    /// A temporary file created by the store during the import.
    TempFile(PathBuf),
    /// An external file that is referenced in place.
    External(PathBuf),
    /// In-memory bytes.
    Memory(#[debug(skip)] Bytes),
}
448
449impl ImportSource {
450 fn content(&self) -> MemOrFile<&[u8], &Path> {
451 match self {
452 Self::TempFile(path) => MemOrFile::File(path.as_path()),
453 Self::External(path) => MemOrFile::File(path.as_path()),
454 Self::Memory(data) => MemOrFile::Mem(data.as_ref()),
455 }
456 }
457
458 fn len(&self) -> io::Result<u64> {
459 match self {
460 Self::TempFile(path) => std::fs::metadata(path).map(|m| m.len()),
461 Self::External(path) => std::fs::metadata(path).map(|m| m.len()),
462 Self::Memory(data) => Ok(data.len() as u64),
463 }
464 }
465}
466
/// The store uses the bao file handle directly as its entry type.
pub type Entry = BaoFileHandle;
469
// Forward the MapEntry interface to the inherent methods of BaoFileHandle.
impl super::MapEntry for Entry {
    fn hash(&self) -> Hash {
        self.hash()
    }

    fn size(&self) -> BaoBlobSize {
        // NOTE(review): current_size() is unwrapped here; if it can fail for a
        // live handle this panics — confirm the invariant in bao_file.
        let size = self.current_size().unwrap();
        tracing::trace!("redb::Entry::size() = {}", size);
        BaoBlobSize::new(size, self.is_complete())
    }

    fn is_complete(&self) -> bool {
        self.is_complete()
    }

    async fn outboard(&self) -> io::Result<impl Outboard> {
        self.outboard()
    }

    async fn data_reader(&self) -> io::Result<impl AsyncSliceReader> {
        Ok(self.data_reader())
    }
}
493
impl super::MapEntryMut for Entry {
    /// Writing goes through the handle's batch writer.
    async fn batch_writer(&self) -> io::Result<impl BaoBatchWriter> {
        Ok(self.writer())
    }
}
499
/// An import request, as sent to the actor.
#[derive(derive_more::Debug)]
pub(crate) struct Import {
    /// Hash and format of the content being imported.
    content_id: HashAndFormat,
    /// Where the bytes come from.
    source: ImportSource,
    /// Size of the data, in bytes.
    data_size: u64,
    /// Precomputed outboard, if any (debug-printed as its length only).
    #[debug("{:?}", outboard.as_ref().map(|x| x.len()))]
    outboard: Option<Vec<u8>>,
}
512
/// An export request, as sent to the actor.
#[derive(derive_more::Debug)]
pub(crate) struct Export {
    /// Temp tag keeping the entry alive while the export runs.
    temp_tag: TempTag,
    /// Absolute target path to export to.
    target: PathBuf,
    /// Export mode.
    mode: ExportMode,
    /// Progress callback.
    #[debug(skip)]
    progress: ExportProgressCb,
}
526
/// Messages handled by the database actor.
#[derive(derive_more::Debug)]
pub(crate) enum ActorMessage {
    /// Get the handle for an entry, if it exists.
    Get {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<Option<BaoFileHandle>>>,
    },
    /// Query the status of an entry.
    EntryStatus {
        hash: Hash,
        tx: flume::Sender<ActorResult<EntryStatus>>,
    },
    /// Test only: query the detailed state of an entry.
    #[cfg(test)]
    EntryState {
        hash: Hash,
        tx: flume::Sender<ActorResult<test_support::EntryStateResponse>>,
    },
    /// Get the full entry data, if present.
    GetFullEntryState {
        hash: Hash,
        tx: flume::Sender<ActorResult<Option<EntryData>>>,
    },
    /// Overwrite (or clear, when `entry` is `None`) the full entry data.
    SetFullEntryState {
        hash: Hash,
        entry: Option<EntryData>,
        tx: flume::Sender<ActorResult<()>>,
    },
    /// Get the handle for an entry, creating it if necessary.
    GetOrCreate {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<BaoFileHandle>>,
    },
    /// Notification that an in-memory entry exceeded its memory limit.
    OnMemSizeExceeded { hash: Hash },
    /// Notification that an entry became complete.
    OnComplete { handle: BaoFileHandle },
    /// Import data into the store; replies with the temp tag and size.
    Import {
        cmd: Import,
        tx: flume::Sender<ActorResult<(TempTag, u64)>>,
    },
    /// Export an entry to the filesystem.
    Export {
        cmd: Export,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Import entries from a legacy flat store.
    ImportFlatStore {
        paths: FlatStorePaths,
        tx: oneshot::Sender<bool>,
    },
    /// Update the inline options, optionally reapplying them to existing
    /// entries.
    UpdateInlineOptions {
        inline_options: InlineOptions,
        reapply: bool,
        tx: oneshot::Sender<()>,
    },
    /// Scan the blobs table, returning entries accepted by `filter`.
    Blobs {
        #[debug(skip)]
        filter: FilterPredicate<Hash, EntryState>,
        #[allow(clippy::type_complexity)]
        tx: oneshot::Sender<
            ActorResult<Vec<std::result::Result<(Hash, EntryState), StorageError>>>,
        >,
    },
    /// Scan the tags table, returning entries accepted by `filter`.
    Tags {
        #[debug(skip)]
        filter: FilterPredicate<Tag, HashAndFormat>,
        #[allow(clippy::type_complexity)]
        tx: oneshot::Sender<
            ActorResult<Vec<std::result::Result<(Tag, HashAndFormat), StorageError>>>,
        >,
    },
    /// Set a tag to a value, or delete it when `value` is `None`.
    SetTag {
        tag: Tag,
        value: Option<HashAndFormat>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Create a new tag pointing at the given content.
    CreateTag {
        hash: HashAndFormat,
        tx: oneshot::Sender<ActorResult<Tag>>,
    },
    /// Delete the entries for the given hashes.
    Delete {
        hashes: Vec<Hash>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Sync; the reply is sent once the actor has processed the message.
    Sync { tx: oneshot::Sender<()> },
    /// Dump internal state for debugging; fire-and-forget.
    Dump,
    /// Run a consistency check, optionally repairing problems.
    Fsck {
        repair: bool,
        progress: BoxedProgressSender<ConsistencyCheckProgress>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Notification that garbage collection is starting; acknowledged via `tx`.
    GcStart { tx: oneshot::Sender<()> },
    /// Shut the actor down, optionally acknowledging via `tx`.
    Shutdown { tx: Option<oneshot::Sender<()>> },
}
661
662impl ActorMessage {
663 fn category(&self) -> MessageCategory {
664 match self {
665 Self::Get { .. }
666 | Self::GetOrCreate { .. }
667 | Self::EntryStatus { .. }
668 | Self::Blobs { .. }
669 | Self::Tags { .. }
670 | Self::GcStart { .. }
671 | Self::GetFullEntryState { .. }
672 | Self::Dump => MessageCategory::ReadOnly,
673 Self::Import { .. }
674 | Self::Export { .. }
675 | Self::OnMemSizeExceeded { .. }
676 | Self::OnComplete { .. }
677 | Self::SetTag { .. }
678 | Self::CreateTag { .. }
679 | Self::SetFullEntryState { .. }
680 | Self::Delete { .. } => MessageCategory::ReadWrite,
681 Self::UpdateInlineOptions { .. }
682 | Self::Sync { .. }
683 | Self::Shutdown { .. }
684 | Self::Fsck { .. }
685 | Self::ImportFlatStore { .. } => MessageCategory::TopLevel,
686 #[cfg(test)]
687 Self::EntryState { .. } => MessageCategory::ReadOnly,
688 }
689 }
690}
691
/// Coarse classification of actor messages by database access pattern.
enum MessageCategory {
    /// Can be handled within a read transaction (see `run_batched`).
    ReadOnly,
    /// Requires read-write access.
    ReadWrite,
    /// Handled outside of batched transactions.
    TopLevel,
}
697
/// Filter callback for bulk table scans: receives the element index and the
/// raw key/value access guards, and returns the decoded pair to keep, or
/// `None` to skip the row.
pub(crate) type FilterPredicate<K, V> =
    Box<dyn Fn(u64, AccessGuard<K>, AccessGuard<V>) -> Option<(K, V)> + Send + Sync>;
701
/// Locations of the parts of a legacy flat store to import from.
#[derive(Debug)]
pub struct FlatStorePaths {
    /// Location of complete entries.
    pub complete: PathBuf,
    /// Location of partial entries.
    pub partial: PathBuf,
    /// Location of metadata.
    pub meta: PathBuf,
}
712
/// Handle to the redb-backed file store. Cloning is cheap; all clones share
/// the same inner state and talk to the same actor thread.
#[derive(Debug, Clone)]
pub struct Store(Arc<StoreInner>);
717
impl Store {
    /// Load or create a store under `root`, with default options.
    ///
    /// The database file is `root`/blobs.db; data and temp directories are
    /// laid out per [`PathOptions::new`].
    pub async fn load(root: impl AsRef<Path>) -> io::Result<Self> {
        let path = root.as_ref();
        let db_path = path.join("blobs.db");
        let options = Options {
            path: PathOptions::new(path),
            inline: Default::default(),
            batch: Default::default(),
        };
        Self::new(db_path, options).await
    }

    /// Create a store with explicit options.
    ///
    /// Must be called from within a tokio runtime; the blocking setup work is
    /// dispatched via `spawn_blocking`.
    pub async fn new(path: PathBuf, options: Options) -> io::Result<Self> {
        let rt = tokio::runtime::Handle::try_current()
            .map_err(|_| io::Error::new(io::ErrorKind::Other, "no tokio runtime"))?;
        let inner =
            tokio::task::spawn_blocking(move || StoreInner::new_sync(path, options, rt)).await??;
        Ok(Self(Arc::new(inner)))
    }

    /// Update the inline options, optionally reapplying them to existing
    /// entries.
    pub async fn update_inline_options(
        &self,
        inline_options: InlineOptions,
        reapply: bool,
    ) -> io::Result<()> {
        Ok(self
            .0
            .update_inline_options(inline_options, reapply)
            .await?)
    }

    /// Ask the actor to dump its internal state (for debugging).
    pub async fn dump(&self) -> io::Result<()> {
        Ok(self.0.dump().await?)
    }

    /// Sync, waiting until the actor has processed all prior messages.
    pub async fn sync(&self) -> io::Result<()> {
        Ok(self.0.sync().await?)
    }

    /// Import entries from a legacy flat store.
    pub async fn import_flat_store(&self, paths: FlatStorePaths) -> io::Result<bool> {
        Ok(self.0.import_flat_store(paths).await?)
    }
}
773
/// Shared state behind [`Store`].
#[derive(Debug)]
struct StoreInner {
    /// Channel for sending messages to the actor thread.
    tx: flume::Sender<ActorMessage>,
    /// Counters of live temp tags, shared with the actor.
    temp: Arc<RwLock<TempCounterMap>>,
    /// Join handle of the actor thread; taken on drop.
    handle: Option<std::thread::JoinHandle<()>>,
    /// Path configuration, so temp file names can be computed without
    /// involving the actor.
    path_options: Arc<PathOptions>,
}
781
782impl LivenessTracker for RwLock<TempCounterMap> {
783 fn on_clone(&self, content: &HashAndFormat) {
784 self.write().unwrap().inc(content);
785 }
786
787 fn on_drop(&self, content: &HashAndFormat) {
788 self.write().unwrap().dec(content);
789 }
790}
791
impl StoreInner {
    /// Synchronous constructor: creates the data/temp/db-parent directories,
    /// spawns the database actor on a dedicated thread and returns the
    /// front-end state.
    fn new_sync(path: PathBuf, options: Options, rt: tokio::runtime::Handle) -> io::Result<Self> {
        tracing::trace!(
            "creating data directory: {}",
            options.path.data_path.display()
        );
        std::fs::create_dir_all(&options.path.data_path)?;
        tracing::trace!(
            "creating temp directory: {}",
            options.path.temp_path.display()
        );
        std::fs::create_dir_all(&options.path.temp_path)?;
        tracing::trace!(
            "creating parent directory for db file{}",
            path.parent().unwrap().display()
        );
        std::fs::create_dir_all(path.parent().unwrap())?;
        let temp: Arc<RwLock<TempCounterMap>> = Default::default();
        let (actor, tx) = Actor::new(&path, options.clone(), temp.clone(), rt)?;
        let handle = std::thread::Builder::new()
            .name("redb-actor".to_string())
            .spawn(move || {
                // The actor runs until it receives Shutdown or fails.
                if let Err(cause) = actor.run_batched() {
                    tracing::error!("redb actor failed: {}", cause);
                }
            })
            .expect("failed to spawn thread");
        Ok(Self {
            tx,
            temp,
            handle: Some(handle),
            path_options: Arc::new(options.path),
        })
    }

    /// Get the handle for `hash`, if the entry exists.
    pub async fn get(&self, hash: Hash) -> OuterResult<Option<BaoFileHandle>> {
        let (tx, rx) = oneshot::channel();
        self.tx.send_async(ActorMessage::Get { hash, tx }).await?;
        Ok(rx.await??)
    }

    /// Get the handle for `hash`, creating the entry if necessary.
    async fn get_or_create(&self, hash: Hash) -> OuterResult<BaoFileHandle> {
        let (tx, rx) = oneshot::channel();
        self.tx
            .send_async(ActorMessage::GetOrCreate { hash, tx })
            .await?;
        Ok(rx.await??)
    }

    /// List the hashes of all complete entries.
    async fn blobs(&self) -> OuterResult<Vec<io::Result<Hash>>> {
        let (tx, rx) = oneshot::channel();
        // Keep only entries in the Complete state.
        let filter: FilterPredicate<Hash, EntryState> = Box::new(|_i, k, v| {
            let v = v.value();
            if let EntryState::Complete { .. } = &v {
                Some((k.value(), v))
            } else {
                None
            }
        });
        self.tx
            .send_async(ActorMessage::Blobs { filter, tx })
            .await?;
        let blobs = rx.await?;
        let res = blobs?
            .into_iter()
            .map(|r| {
                r.map(|(hash, _)| hash)
                    .map_err(|e| ActorError::from(e).into())
            })
            .collect::<Vec<_>>();
        Ok(res)
    }

    /// List the hashes of all partial entries.
    async fn partial_blobs(&self) -> OuterResult<Vec<io::Result<Hash>>> {
        let (tx, rx) = oneshot::channel();
        // Keep only entries in the Partial state.
        let filter: FilterPredicate<Hash, EntryState> = Box::new(|_i, k, v| {
            let v = v.value();
            if let EntryState::Partial { .. } = &v {
                Some((k.value(), v))
            } else {
                None
            }
        });
        self.tx
            .send_async(ActorMessage::Blobs { filter, tx })
            .await?;
        let blobs = rx.await?;
        let res = blobs?
            .into_iter()
            .map(|r| {
                r.map(|(hash, _)| hash)
                    .map_err(|e| ActorError::from(e).into())
            })
            .collect::<Vec<_>>();
        Ok(res)
    }

    /// List all tags with their targets.
    async fn tags(&self) -> OuterResult<Vec<io::Result<(Tag, HashAndFormat)>>> {
        let (tx, rx) = oneshot::channel();
        // No filtering — return every tag.
        let filter: FilterPredicate<Tag, HashAndFormat> =
            Box::new(|_i, k, v| Some((k.value(), v.value())));
        self.tx
            .send_async(ActorMessage::Tags { filter, tx })
            .await?;
        let tags = rx.await?;
        let tags = tags?
            .into_iter()
            .map(|r| r.map_err(|e| ActorError::from(e).into()))
            .collect();
        Ok(tags)
    }

    /// Set `tag` to `value`, or delete it when `value` is `None`.
    async fn set_tag(&self, tag: Tag, value: Option<HashAndFormat>) -> OuterResult<()> {
        let (tx, rx) = oneshot::channel();
        self.tx
            .send_async(ActorMessage::SetTag { tag, value, tx })
            .await?;
        Ok(rx.await??)
    }

    /// Create a new tag pointing at `hash`.
    async fn create_tag(&self, hash: HashAndFormat) -> OuterResult<Tag> {
        let (tx, rx) = oneshot::channel();
        self.tx
            .send_async(ActorMessage::CreateTag { hash, tx })
            .await?;
        Ok(rx.await??)
    }

    /// Delete the entries for `hashes`.
    async fn delete(&self, hashes: Vec<Hash>) -> OuterResult<()> {
        let (tx, rx) = oneshot::channel();
        self.tx
            .send_async(ActorMessage::Delete { hashes, tx })
            .await?;
        Ok(rx.await??)
    }

    /// Notify the actor that garbage collection is starting; waits for the
    /// acknowledgement.
    async fn gc_start(&self) -> OuterResult<()> {
        let (tx, rx) = oneshot::channel();
        self.tx.send_async(ActorMessage::GcStart { tx }).await?;
        Ok(rx.await?)
    }

    /// Query the status of `hash`.
    async fn entry_status(&self, hash: &Hash) -> OuterResult<EntryStatus> {
        let (tx, rx) = flume::bounded(1);
        self.tx
            .send_async(ActorMessage::EntryStatus { hash: *hash, tx })
            .await?;
        Ok(rx.into_recv_async().await??)
    }

    /// Query the status of `hash`, blocking the calling thread.
    fn entry_status_sync(&self, hash: &Hash) -> OuterResult<EntryStatus> {
        let (tx, rx) = flume::bounded(1);
        self.tx
            .send(ActorMessage::EntryStatus { hash: *hash, tx })?;
        Ok(rx.recv()??)
    }

    /// Notify the actor that `entry` is complete; fire-and-forget.
    async fn complete(&self, entry: Entry) -> OuterResult<()> {
        self.tx
            .send_async(ActorMessage::OnComplete { handle: entry })
            .await?;
        Ok(())
    }

    /// Export `hash` to the absolute path `target`.
    ///
    /// Validates the target, creates its parent directory, protects the entry
    /// with a temp tag for the duration of the request, and delegates the
    /// actual export to the actor.
    async fn export(
        &self,
        hash: Hash,
        target: PathBuf,
        mode: ExportMode,
        progress: ExportProgressCb,
    ) -> OuterResult<()> {
        tracing::debug!(
            "exporting {} to {} using mode {:?}",
            hash.to_hex(),
            target.display(),
            mode
        );
        if !target.is_absolute() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "target path must be absolute",
            )
            .into());
        }
        let parent = target.parent().ok_or_else(|| {
            OuterError::from(io::Error::new(
                io::ErrorKind::InvalidInput,
                "target path has no parent directory",
            ))
        })?;
        std::fs::create_dir_all(parent)?;
        let temp_tag = self.temp_tag(HashAndFormat::raw(hash));
        let (tx, rx) = oneshot::channel();
        self.tx
            .send_async(ActorMessage::Export {
                cmd: Export {
                    temp_tag,
                    target,
                    mode,
                    progress,
                },
                tx,
            })
            .await?;
        Ok(rx.await??)
    }

    /// Run a consistency check, optionally repairing problems, reporting via
    /// `progress`.
    async fn consistency_check(
        &self,
        repair: bool,
        progress: BoxedProgressSender<ConsistencyCheckProgress>,
    ) -> OuterResult<()> {
        let (tx, rx) = oneshot::channel();
        self.tx
            .send_async(ActorMessage::Fsck {
                repair,
                progress,
                tx,
            })
            .await?;
        Ok(rx.await??)
    }

    /// Import entries from a legacy flat store.
    async fn import_flat_store(&self, paths: FlatStorePaths) -> OuterResult<bool> {
        let (tx, rx) = oneshot::channel();
        self.tx
            .send_async(ActorMessage::ImportFlatStore { paths, tx })
            .await?;
        Ok(rx.await?)
    }

    /// Update the inline options, optionally reapplying them to existing
    /// entries.
    async fn update_inline_options(
        &self,
        inline_options: InlineOptions,
        reapply: bool,
    ) -> OuterResult<()> {
        let (tx, rx) = oneshot::channel();
        self.tx
            .send_async(ActorMessage::UpdateInlineOptions {
                inline_options,
                reapply,
                tx,
            })
            .await?;
        Ok(rx.await?)
    }

    /// Ask the actor to dump its internal state; fire-and-forget.
    async fn dump(&self) -> OuterResult<()> {
        self.tx.send_async(ActorMessage::Dump).await?;
        Ok(())
    }

    /// Sync, waiting until the actor acknowledges.
    async fn sync(&self) -> OuterResult<()> {
        let (tx, rx) = oneshot::channel();
        self.tx.send_async(ActorMessage::Sync { tx }).await?;
        Ok(rx.await?)
    }

    /// Create a temp tag for `content`, tracked in the shared counter map.
    fn temp_tag(&self, content: HashAndFormat) -> TempTag {
        TempTag::new(content, Some(self.temp.clone()))
    }

    /// Import a file from `path`, blocking the calling thread.
    ///
    /// `path` must be absolute and point to a file or symlink. In Copy mode,
    /// small files are read into memory; larger files are reflinked or copied
    /// into a temp file first. TryReference keeps the file in place.
    fn import_file_sync(
        &self,
        path: PathBuf,
        mode: ImportMode,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> OuterResult<(TempTag, u64)> {
        if !path.is_absolute() {
            return Err(
                io::Error::new(io::ErrorKind::InvalidInput, "path must be absolute").into(),
            );
        }
        if !path.is_file() && !path.is_symlink() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "path is not a file or symlink",
            )
            .into());
        }
        let id = progress.new_id();
        progress.blocking_send(ImportProgress::Found {
            id,
            name: path.to_string_lossy().to_string(),
        })?;
        let file = match mode {
            ImportMode::TryReference => ImportSource::External(path),
            ImportMode::Copy => {
                if std::fs::metadata(&path)?.len() < 16 * 1024 {
                    // Files below 16 KiB are loaded into memory directly.
                    let data = std::fs::read(&path)?;
                    ImportSource::Memory(data.into())
                } else {
                    let temp_path = self.temp_file_name();
                    progress.try_send(ImportProgress::CopyProgress { id, offset: 0 })?;
                    // A `None` return means the reflink succeeded.
                    if reflink_copy::reflink_or_copy(&path, &temp_path)?.is_none() {
                        tracing::debug!("reflinked {} to {}", path.display(), temp_path.display());
                    } else {
                        tracing::debug!("copied {} to {}", path.display(), temp_path.display());
                    }
                    ImportSource::TempFile(temp_path)
                }
            }
        };
        let (tag, size) = self.finalize_import_sync(file, format, id, progress)?;
        Ok((tag, size))
    }

    /// Import in-memory bytes, blocking the calling thread.
    fn import_bytes_sync(&self, data: Bytes, format: BlobFormat) -> OuterResult<TempTag> {
        let id = 0;
        let file = ImportSource::Memory(data);
        let progress = IgnoreProgressSender::default();
        let (tag, _size) = self.finalize_import_sync(file, format, id, progress)?;
        Ok(tag)
    }

    /// Common tail of all imports: compute hash and outboard, take a temp
    /// tag, then hand the result to the actor and wait for the reply.
    fn finalize_import_sync(
        &self,
        file: ImportSource,
        format: BlobFormat,
        id: u64,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> OuterResult<(TempTag, u64)> {
        let data_size = file.len()?;
        tracing::debug!("finalize_import_sync {:?} {}", file, data_size);
        progress.blocking_send(ImportProgress::Size {
            id,
            size: data_size,
        })?;
        let progress2 = progress.clone();
        let (hash, outboard) = match file.content() {
            MemOrFile::File(path) => {
                let span = trace_span!("outboard.compute", path = %path.display());
                let _guard = span.enter();
                let file = std::fs::File::open(path)?;
                compute_outboard(file, data_size, move |offset| {
                    Ok(progress2.try_send(ImportProgress::OutboardProgress { id, offset })?)
                })?
            }
            MemOrFile::Mem(bytes) => {
                // In-memory computation does not report progress.
                compute_outboard(bytes, data_size, |_| Ok(()))?
            }
        };
        progress.blocking_send(ImportProgress::OutboardDone { id, hash })?;
        // Take the temp tag before sending, so the content stays protected
        // while the actor processes the import.
        let tag = self.temp_tag(HashAndFormat { hash, format });
        let hash = *tag.hash();
        let (tx, rx) = flume::bounded(1);
        self.tx.send(ActorMessage::Import {
            cmd: Import {
                content_id: HashAndFormat { hash, format },
                source: file,
                outboard,
                data_size,
            },
            tx,
        })?;
        Ok(rx.recv()??)
    }

    /// Fresh temp file path below the configured temp directory.
    fn temp_file_name(&self) -> PathBuf {
        self.path_options.temp_file_name()
    }

    /// Ask the actor to shut down and wait for the acknowledgement,
    /// ignoring failures (the actor may already be gone).
    async fn shutdown(&self) {
        let (tx, rx) = oneshot::channel();
        self.tx
            .send_async(ActorMessage::Shutdown { tx: Some(tx) })
            .await
            .ok();
        rx.await.ok();
    }
}
1173
1174impl Drop for StoreInner {
1175 fn drop(&mut self) {
1176 if let Some(handle) = self.handle.take() {
1177 self.tx.send(ActorMessage::Shutdown { tx: None }).ok();
1178 handle.join().ok();
1179 }
1180 }
1181}
1182
/// Mutable state of the actor thread, kept separate from the database so the
/// two can be borrowed independently.
struct ActorState {
    /// Weak references to live bao file handles, by hash.
    handles: BTreeMap<Hash, BaoFileHandleWeak>,
    /// Hashes that must not be deleted — presumably GC protection; confirm
    /// against the message handlers.
    protected: BTreeSet<Hash>,
    /// Live temp tag counters, shared with the store front-end.
    temp: Arc<RwLock<TempCounterMap>>,
    /// Incoming message queue.
    msgs: flume::Receiver<ActorMessage>,
    /// Configuration for creating bao file handles.
    create_options: Arc<BaoFileConfig>,
    /// Store options.
    options: Options,
    /// Tokio runtime handle, for async work from the actor thread.
    rt: tokio::runtime::Handle,
}
1192
/// The database actor. Owns the redb database and all mutable actor state,
/// and runs on a dedicated thread (see [`StoreInner::new_sync`]).
struct Actor {
    /// The metadata database.
    db: redb::Database,
    /// Everything the message handlers mutate.
    state: ActorState,
}
1201
/// Errors that can occur inside the database actor.
#[derive(Debug, thiserror::Error)]
pub(crate) enum ActorError {
    #[error("table error: {0}")]
    Table(#[from] redb::TableError),
    #[error("database error: {0}")]
    Database(#[from] redb::DatabaseError),
    #[error("transaction error: {0}")]
    Transaction(#[from] redb::TransactionError),
    #[error("commit error: {0}")]
    Commit(#[from] redb::CommitError),
    #[error("storage error: {0}")]
    Storage(#[from] redb::StorageError),
    #[error("io error: {0}")]
    Io(#[from] io::Error),
    /// The database contains contradictory information (e.g. size mismatches
    /// detected by the `union` operations).
    #[error("inconsistent database state: {0}")]
    Inconsistent(String),
    /// A schema migration (e.g. redb v1 -> v2) failed.
    #[error("error during database migration: {0}")]
    Migration(#[source] anyhow::Error),
}
1225
1226impl From<ActorError> for io::Error {
1227 fn from(e: ActorError) -> Self {
1228 match e {
1229 ActorError::Io(e) => e,
1230 e => io::Error::new(io::ErrorKind::Other, e),
1231 }
1232 }
1233}
1234
/// Result type used inside the actor.
pub(crate) type ActorResult<T> = std::result::Result<T, ActorError>;
1239
/// Errors that can occur on the store front-end, wrapping actor errors as
/// well as channel and task failures while talking to the actor.
#[derive(Debug, thiserror::Error)]
pub(crate) enum OuterError {
    #[error("inner error: {0}")]
    Inner(#[from] ActorError),
    #[error("send error: {0}")]
    Send(#[from] flume::SendError<ActorMessage>),
    #[error("progress send error: {0}")]
    ProgressSend(#[from] ProgressSendError),
    #[error("recv error: {0}")]
    Recv(#[from] oneshot::error::RecvError),
    #[error("recv error: {0}")]
    FlumeRecv(#[from] flume::RecvError),
    #[error("join error: {0}")]
    JoinTask(#[from] tokio::task::JoinError),
}
1259
/// Result type used on the store front-end.
pub(crate) type OuterResult<T> = std::result::Result<T, OuterError>;
1264
1265impl From<io::Error> for OuterError {
1266 fn from(e: io::Error) -> Self {
1267 OuterError::Inner(ActorError::Io(e))
1268 }
1269}
1270
1271impl From<OuterError> for io::Error {
1272 fn from(e: OuterError) -> Self {
1273 match e {
1274 OuterError::Inner(ActorError::Io(e)) => e,
1275 e => io::Error::new(io::ErrorKind::Other, e),
1276 }
1277 }
1278}
1279
impl super::Map for Store {
    type Entry = Entry;

    /// Look up the entry for `hash`, delegating to the actor.
    async fn get(&self, hash: &Hash) -> io::Result<Option<Self::Entry>> {
        Ok(self.0.get(*hash).await?.map(From::from))
    }
}
1287
impl super::MapMut for Store {
    type EntryMut = Entry;

    /// Get or create the entry for `hash`.
    ///
    /// NOTE(review): `_size` is unused here — presumably the handle tracks
    /// its own size; confirm in `bao_file`.
    async fn get_or_create(&self, hash: Hash, _size: u64) -> io::Result<Self::EntryMut> {
        Ok(self.0.get_or_create(hash).await?)
    }

    /// Query the status of `hash`.
    async fn entry_status(&self, hash: &Hash) -> io::Result<EntryStatus> {
        Ok(self.0.entry_status(hash).await?)
    }

    /// Mutable lookup is the same as immutable lookup for this store.
    async fn get_mut(&self, hash: &Hash) -> io::Result<Option<Self::EntryMut>> {
        self.get(hash).await
    }

    /// Notify the store that `entry` became complete.
    async fn insert_complete(&self, entry: Self::EntryMut) -> io::Result<()> {
        Ok(self.0.complete(entry).await?)
    }

    /// Blocking variant of [`Self::entry_status`].
    fn entry_status_sync(&self, hash: &Hash) -> io::Result<EntryStatus> {
        Ok(self.0.entry_status_sync(hash)?)
    }
}
1311
impl super::ReadableStore for Store {
    /// Iterate over the hashes of all complete entries.
    async fn blobs(&self) -> io::Result<super::DbIter<Hash>> {
        Ok(Box::new(self.0.blobs().await?.into_iter()))
    }

    /// Iterate over the hashes of all partial entries.
    async fn partial_blobs(&self) -> io::Result<super::DbIter<Hash>> {
        Ok(Box::new(self.0.partial_blobs().await?.into_iter()))
    }

    /// Iterate over all tags with their targets.
    async fn tags(&self) -> io::Result<super::DbIter<(Tag, HashAndFormat)>> {
        Ok(Box::new(self.0.tags().await?.into_iter()))
    }

    /// Snapshot of the content currently protected by temp tags.
    fn temp_tags(&self) -> Box<dyn Iterator<Item = HashAndFormat> + Send + Sync + 'static> {
        Box::new(self.0.temp.read().unwrap().keys())
    }

    /// Run a consistency check, optionally repairing problems.
    async fn consistency_check(
        &self,
        repair: bool,
        tx: BoxedProgressSender<ConsistencyCheckProgress>,
    ) -> io::Result<()> {
        self.0.consistency_check(repair, tx.clone()).await?;
        Ok(())
    }

    /// Export `hash` to the filesystem at `target`.
    async fn export(
        &self,
        hash: Hash,
        target: PathBuf,
        mode: ExportMode,
        progress: ExportProgressCb,
    ) -> io::Result<()> {
        Ok(self.0.export(hash, target, mode, progress).await?)
    }
}
1348
impl super::Store for Store {
    /// Import a file; the blocking work runs on the blocking thread pool.
    async fn import_file(
        &self,
        path: PathBuf,
        mode: ImportMode,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> io::Result<(crate::TempTag, u64)> {
        let this = self.0.clone();
        Ok(
            tokio::task::spawn_blocking(move || {
                this.import_file_sync(path, mode, format, progress)
            })
            .await??,
        )
    }

    /// Import in-memory bytes via the blocking thread pool.
    async fn import_bytes(
        &self,
        data: bytes::Bytes,
        format: iroh_base::hash::BlobFormat,
    ) -> io::Result<crate::TempTag> {
        let this = self.0.clone();
        Ok(tokio::task::spawn_blocking(move || this.import_bytes_sync(data, format)).await??)
    }

    /// Import a byte stream: spool it into a temp file while reporting
    /// progress, then finalize like a file import on the blocking pool.
    async fn import_stream(
        &self,
        mut data: impl Stream<Item = io::Result<Bytes>> + Unpin + Send + 'static,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> io::Result<(TempTag, u64)> {
        let this = self.clone();
        let id = progress.new_id();
        let temp_data_path = this.0.temp_file_name();
        let name = temp_data_path
            .file_name()
            .expect("just created")
            .to_string_lossy()
            .to_string();
        progress.send(ImportProgress::Found { id, name }).await?;
        let mut writer = tokio::fs::File::create(&temp_data_path).await?;
        let mut offset = 0;
        while let Some(chunk) = data.next().await {
            let chunk = chunk?;
            writer.write_all(&chunk).await?;
            offset += chunk.len() as u64;
            progress.try_send(ImportProgress::CopyProgress { id, offset })?;
        }
        writer.flush().await?;
        drop(writer);
        let file = ImportSource::TempFile(temp_data_path);
        Ok(tokio::task::spawn_blocking(move || {
            this.0.finalize_import_sync(file, format, id, progress)
        })
        .await??)
    }

    /// Set a tag, or delete it when `hash` is `None`.
    async fn set_tag(&self, name: Tag, hash: Option<HashAndFormat>) -> io::Result<()> {
        Ok(self.0.set_tag(name, hash).await?)
    }

    /// Create a new tag pointing at `hash`.
    async fn create_tag(&self, hash: HashAndFormat) -> io::Result<Tag> {
        Ok(self.0.create_tag(hash).await?)
    }

    /// Delete the entries for `hashes`.
    async fn delete(&self, hashes: Vec<Hash>) -> io::Result<()> {
        Ok(self.0.delete(hashes).await?)
    }

    /// Notify the store that garbage collection is starting.
    async fn gc_start(&self) -> io::Result<()> {
        self.0.gc_start().await?;
        Ok(())
    }

    /// Create a temp tag protecting `value`.
    fn temp_tag(&self, value: HashAndFormat) -> TempTag {
        self.0.temp_tag(value)
    }

    /// Shut down the actor and wait for it to finish.
    async fn shutdown(&self) {
        self.0.shutdown().await;
    }
}
1433
1434impl Actor {
    /// Open (or migrate) the database at `path` and construct the actor
    /// together with the sender used to talk to it.
    fn new(
        path: &Path,
        options: Options,
        temp: Arc<RwLock<TempCounterMap>>,
        rt: tokio::runtime::Handle,
    ) -> ActorResult<(Self, flume::Sender<ActorMessage>)> {
        let db = match redb::Database::create(path) {
            Ok(db) => db,
            Err(DatabaseError::UpgradeRequired(1)) => {
                // On-disk format v1: run the v1 -> v2 migration, which
                // returns the reopened database.
                migrate_redb_v1_v2::run(path).map_err(ActorError::Migration)?
            }
            Err(err) => return Err(err.into()),
        };

        // Create all tables up front, so later transactions can rely on
        // them existing.
        let txn = db.begin_write()?;
        let mut t = Default::default();
        let tables = Tables::new(&txn, &mut t)?;
        drop(tables);
        txn.commit()?;
        let (tx, rx) = flume::bounded(1024);
        let tx2 = tx.clone();
        // Callback invoked when a bao file exceeds the in-memory size limit;
        // notifies the actor, best effort.
        let on_file_create: CreateCb = Arc::new(move |hash| {
            tx2.send(ActorMessage::OnMemSizeExceeded { hash: *hash })
                .ok();
            Ok(())
        });
        let create_options = BaoFileConfig::new(
            Arc::new(options.path.data_path.clone()),
            16 * 1024,
            Some(on_file_create),
        );
        Ok((
            Self {
                db,
                state: ActorState {
                    temp,
                    handles: BTreeMap::new(),
                    protected: BTreeSet::new(),
                    msgs: rx,
                    options,
                    create_options: Arc::new(create_options),
                    rt,
                },
            },
            tx,
        ))
    }
1486
    /// Main actor loop, batching messages by category.
    ///
    /// Messages are grouped into top-level, read-only and read-write batches.
    /// Read-only batches share a single read transaction; read-write batches
    /// share a single write transaction that is committed (and its deferred
    /// file deletions applied) when the batch ends. A batch ends when its
    /// size/duration limit is hit or a message of a different category
    /// arrives, in which case that message is pushed back for the next round.
    fn run_batched(mut self) -> ActorResult<()> {
        let mut msgs = PeekableFlumeReceiver::new(self.state.msgs.clone());
        while let Some(msg) = msgs.recv() {
            if let ActorMessage::Shutdown { tx } = msg {
                // acknowledge shutdown (if the caller wants an ack) and stop
                if let Some(tx) = tx {
                    tx.send(()).ok();
                }
                break;
            }
            match msg.category() {
                MessageCategory::TopLevel => {
                    self.state.handle_toplevel(&self.db, msg)?;
                }
                MessageCategory::ReadOnly => {
                    // put the message back so batch_iter sees it as well
                    msgs.push_back(msg).expect("just recv'd");
                    tracing::debug!("starting read transaction");
                    let txn = self.db.begin_read()?;
                    let tables = ReadOnlyTables::new(&txn)?;
                    let count = self.state.options.batch.max_read_batch;
                    let timeout = self.state.options.batch.max_read_duration;
                    for msg in msgs.batch_iter(count, timeout) {
                        // Err(msg) means the message was not read-only;
                        // requeue it and end this batch
                        if let Err(msg) = self.state.handle_readonly(&tables, msg)? {
                            msgs.push_back(msg).expect("just recv'd");
                            break;
                        }
                    }
                    tracing::debug!("done with read transaction");
                }
                MessageCategory::ReadWrite => {
                    msgs.push_back(msg).expect("just recv'd");
                    tracing::debug!("starting write transaction");
                    let txn = self.db.begin_write()?;
                    let mut delete_after_commit = Default::default();
                    let mut tables = Tables::new(&txn, &mut delete_after_commit)?;
                    let count = self.state.options.batch.max_write_batch;
                    let timeout = self.state.options.batch.max_write_duration;
                    for msg in msgs.batch_iter(count, timeout) {
                        if let Err(msg) = self.state.handle_readwrite(&mut tables, msg)? {
                            msgs.push_back(msg).expect("just recv'd");
                            break;
                        }
                    }
                    drop(tables);
                    txn.commit()?;
                    // file deletions are deferred until after a successful
                    // commit so a failed transaction never loses data
                    delete_after_commit.apply_and_clear(&self.state.options.path);
                    tracing::debug!("write transaction committed");
                }
            }
        }
        tracing::debug!("redb actor done");
        Ok(())
    }
1539}
1540
1541impl ActorState {
1542 fn entry_status(
1543 &mut self,
1544 tables: &impl ReadableTables,
1545 hash: Hash,
1546 ) -> ActorResult<EntryStatus> {
1547 let status = match tables.blobs().get(hash)? {
1548 Some(guard) => match guard.value() {
1549 EntryState::Complete { .. } => EntryStatus::Complete,
1550 EntryState::Partial { .. } => EntryStatus::Partial,
1551 },
1552 None => EntryStatus::NotFound,
1553 };
1554 Ok(status)
1555 }
1556
1557 fn get(
1558 &mut self,
1559 tables: &impl ReadableTables,
1560 hash: Hash,
1561 ) -> ActorResult<Option<BaoFileHandle>> {
1562 if let Some(handle) = self.handles.get(&hash).and_then(|weak| weak.upgrade()) {
1563 return Ok(Some(handle));
1564 }
1565 let Some(entry) = tables.blobs().get(hash)? else {
1566 return Ok(None);
1567 };
1568 let entry = entry.value();
1571 let config = self.create_options.clone();
1572 let handle = match entry {
1573 EntryState::Complete {
1574 data_location,
1575 outboard_location,
1576 } => {
1577 let data = load_data(tables, &self.options.path, data_location, &hash)?;
1578 let outboard = load_outboard(
1579 tables,
1580 &self.options.path,
1581 outboard_location,
1582 data.size(),
1583 &hash,
1584 )?;
1585 BaoFileHandle::new_complete(config, hash, data, outboard)
1586 }
1587 EntryState::Partial { .. } => BaoFileHandle::incomplete_file(config, hash)?,
1588 };
1589 self.handles.insert(hash, handle.downgrade());
1590 Ok(Some(handle))
1591 }
1592
    /// Export an entry to `target`, reporting the outcome through `tx`.
    ///
    /// Inline data is written directly. Owned data is either copied (possibly
    /// on a blocking task) or, in `TryReference` mode, renamed out of the
    /// store and re-registered as an external location. Partial entries
    /// cannot be exported.
    fn export(
        &mut self,
        tables: &mut Tables,
        cmd: Export,
        tx: oneshot::Sender<ActorResult<()>>,
    ) -> ActorResult<()> {
        let Export {
            temp_tag,
            target,
            mode,
            progress,
        } = cmd;
        let guard = tables
            .blobs
            .get(temp_tag.hash())?
            .ok_or_else(|| ActorError::Inconsistent("entry not found".to_owned()))?;
        let entry = guard.value();
        match entry {
            EntryState::Complete {
                data_location,
                outboard_location,
            } => match data_location {
                DataLocation::Inline(()) => {
                    // data lives in the inline_data table; write it straight out
                    let data = tables.inline_data.get(temp_tag.hash())?.ok_or_else(|| {
                        ActorError::Inconsistent("inline data not found".to_owned())
                    })?;
                    tracing::trace!("exporting inline data to {}", target.display());
                    tx.send(std::fs::write(&target, data.value()).map_err(|e| e.into()))
                        .ok();
                }
                DataLocation::Owned(size) => {
                    let path = self.options.path.owned_data_path(temp_tag.hash());
                    match mode {
                        ExportMode::Copy => {
                            // copy is blocking I/O, so run it off the actor thread;
                            // the result is reported via tx from the blocking task
                            self.rt.spawn_blocking(move || {
                                tx.send(export_file_copy(temp_tag, path, size, target, progress))
                                    .ok();
                            });
                        }
                        ExportMode::TryReference => match std::fs::rename(&path, &target) {
                            Ok(()) => {
                                // rename succeeded: the data now lives outside the
                                // store, so record it as an external location
                                let entry = EntryState::Complete {
                                    data_location: DataLocation::External(vec![target], size),
                                    outboard_location,
                                };
                                drop(guard);
                                tables.blobs.insert(temp_tag.hash(), entry)?;
                                drop(temp_tag);
                                tx.send(Ok(())).ok();
                            }
                            Err(e) => {
                                // NOTE(review): 18 is EXDEV (cross-device link) on
                                // Linux — assumes a Linux errno numbering; verify
                                // for other target platforms
                                const ERR_CROSS: i32 = 18;
                                if e.raw_os_error() == Some(ERR_CROSS) {
                                    // cross-device rename: fall back to a copy and
                                    // schedule the owned file for deletion on commit
                                    match std::fs::copy(&path, &target) {
                                        Ok(_) => {
                                            let entry = EntryState::Complete {
                                                data_location: DataLocation::External(
                                                    vec![target],
                                                    size,
                                                ),
                                                outboard_location,
                                            };

                                            drop(guard);
                                            tables.blobs.insert(temp_tag.hash(), entry)?;
                                            tables
                                                .delete_after_commit
                                                .insert(*temp_tag.hash(), [BaoFilePart::Data]);
                                            drop(temp_tag);

                                            tx.send(Ok(())).ok();
                                        }
                                        Err(e) => {
                                            drop(temp_tag);
                                            tx.send(Err(e.into())).ok();
                                        }
                                    }
                                } else {
                                    drop(temp_tag);
                                    tx.send(Err(e.into())).ok();
                                }
                            }
                        },
                    }
                }
                DataLocation::External(paths, size) => {
                    let path = paths
                        .first()
                        .ok_or_else(|| {
                            ActorError::Inconsistent("external path missing".to_owned())
                        })?
                        .to_owned();
                    if path == target {
                        // source and target are the same file; nothing to do
                        tx.send(Ok(())).ok();
                    } else {
                        self.rt.spawn_blocking(move || {
                            tx.send(export_file_copy(temp_tag, path, size, target, progress))
                                .ok();
                        });
                    }
                }
            },
            EntryState::Partial { .. } => {
                return Err(io::Error::new(io::ErrorKind::Unsupported, "partial entry").into());
            }
        }
        Ok(())
    }
1707
    /// Finalize an import: store data/outboard either inline in the db or as
    /// owned/external files, and upsert the blobs entry.
    ///
    /// Returns a temp tag protecting the content and the data size.
    fn import(&mut self, tables: &mut Tables, cmd: Import) -> ActorResult<(TempTag, u64)> {
        let Import {
            content_id,
            source: file,
            outboard,
            data_size,
        } = cmd;
        let outboard_size = outboard.as_ref().map(|x| x.len() as u64).unwrap_or(0);
        // decide inline vs on-disk placement based on the configured thresholds;
        // a zero-sized outboard is never inlined (it is simply not needed)
        let inline_data = data_size <= self.options.inline.max_data_inlined;
        let inline_outboard =
            outboard_size <= self.options.inline.max_outboard_inlined && outboard_size != 0;
        let tag = TempTag::new(content_id, Some(self.temp.clone()));
        let hash = *tag.hash();
        // protect the new entry from a concurrent gc run
        self.protected.insert(hash);
        let data_location = match file {
            ImportSource::External(external_path) => {
                tracing::debug!("stored external reference {}", external_path.display());
                if inline_data {
                    tracing::debug!(
                        "reading external data to inline it: {}",
                        external_path.display()
                    );
                    let data = Bytes::from(std::fs::read(&external_path)?);
                    DataLocation::Inline(data)
                } else {
                    DataLocation::External(vec![external_path], data_size)
                }
            }
            ImportSource::TempFile(temp_data_path) => {
                if inline_data {
                    tracing::debug!(
                        "reading and deleting temp file to inline it: {}",
                        temp_data_path.display()
                    );
                    let data = Bytes::from(read_and_remove(&temp_data_path)?);
                    DataLocation::Inline(data)
                } else {
                    // move the temp file into its final owned location
                    let data_path = self.options.path.owned_data_path(&hash);
                    std::fs::rename(&temp_data_path, &data_path)?;
                    tracing::debug!("created file {}", data_path.display());
                    DataLocation::Owned(data_size)
                }
            }
            ImportSource::Memory(data) => {
                if inline_data {
                    DataLocation::Inline(data)
                } else {
                    let data_path = self.options.path.owned_data_path(&hash);
                    overwrite_and_sync(&data_path, &data)?;
                    tracing::debug!("created file {}", data_path.display());
                    DataLocation::Owned(data_size)
                }
            }
        };
        let outboard_location = if let Some(outboard) = outboard {
            if inline_outboard {
                OutboardLocation::Inline(Bytes::from(outboard))
            } else {
                let outboard_path = self.options.path.owned_outboard_path(&hash);
                overwrite_and_sync(&outboard_path, &outboard)?;
                OutboardLocation::Owned
            }
        } else {
            OutboardLocation::NotNeeded
        };
        // persist inline payloads in their dedicated tables
        if let DataLocation::Inline(data) = &data_location {
            tables.inline_data.insert(hash, data.as_ref())?;
        }
        if let OutboardLocation::Inline(outboard) = &outboard_location {
            tables.inline_outboard.insert(hash, outboard.as_ref())?;
        }
        // owned files must not be deleted by a pending delete-after-commit entry
        if let DataLocation::Owned(_) = &data_location {
            tables.delete_after_commit.remove(hash, [BaoFilePart::Data]);
        }
        if let OutboardLocation::Owned = &outboard_location {
            tables
                .delete_after_commit
                .remove(hash, [BaoFilePart::Outboard]);
        }
        // merge with any pre-existing entry state for this hash
        let entry = tables.blobs.get(hash)?;
        let entry = entry.map(|x| x.value()).unwrap_or_default();
        let data_location = data_location.discard_inline_data();
        let outboard_location = outboard_location.discard_extra_data();
        let entry = entry.union(EntryState::Complete {
            data_location,
            outboard_location,
        })?;
        tables.blobs.insert(hash, entry)?;
        Ok((tag, data_size))
    }
1801
1802 fn get_or_create(
1803 &mut self,
1804 tables: &impl ReadableTables,
1805 hash: Hash,
1806 ) -> ActorResult<BaoFileHandle> {
1807 self.protected.insert(hash);
1808 if let Some(handle) = self.handles.get(&hash).and_then(|x| x.upgrade()) {
1809 return Ok(handle);
1810 }
1811 let entry = tables.blobs().get(hash)?;
1812 let handle = if let Some(entry) = entry {
1813 let entry = entry.value();
1814 match entry {
1815 EntryState::Complete {
1816 data_location,
1817 outboard_location,
1818 ..
1819 } => {
1820 let data = load_data(tables, &self.options.path, data_location, &hash)?;
1821 let outboard = load_outboard(
1822 tables,
1823 &self.options.path,
1824 outboard_location,
1825 data.size(),
1826 &hash,
1827 )?;
1828 println!("creating complete entry for {}", hash.to_hex());
1829 BaoFileHandle::new_complete(self.create_options.clone(), hash, data, outboard)
1830 }
1831 EntryState::Partial { .. } => {
1832 println!("creating partial entry for {}", hash.to_hex());
1833 BaoFileHandle::incomplete_file(self.create_options.clone(), hash)?
1834 }
1835 }
1836 } else {
1837 BaoFileHandle::incomplete_mem(self.create_options.clone(), hash)
1838 };
1839 self.handles.insert(hash, handle.downgrade());
1840 Ok(handle)
1841 }
1842
1843 fn blobs(
1845 &mut self,
1846 tables: &impl ReadableTables,
1847 filter: FilterPredicate<Hash, EntryState>,
1848 ) -> ActorResult<Vec<std::result::Result<(Hash, EntryState), StorageError>>> {
1849 let mut res = Vec::new();
1850 let mut index = 0u64;
1851 #[allow(clippy::explicit_counter_loop)]
1852 for item in tables.blobs().iter()? {
1853 match item {
1854 Ok((k, v)) => {
1855 if let Some(item) = filter(index, k, v) {
1856 res.push(Ok(item));
1857 }
1858 }
1859 Err(e) => {
1860 res.push(Err(e));
1861 }
1862 }
1863 index += 1;
1864 }
1865 Ok(res)
1866 }
1867
1868 fn tags(
1870 &mut self,
1871 tables: &impl ReadableTables,
1872 filter: FilterPredicate<Tag, HashAndFormat>,
1873 ) -> ActorResult<Vec<std::result::Result<(Tag, HashAndFormat), StorageError>>> {
1874 let mut res = Vec::new();
1875 let mut index = 0u64;
1876 #[allow(clippy::explicit_counter_loop)]
1877 for item in tables.tags().iter()? {
1878 match item {
1879 Ok((k, v)) => {
1880 if let Some(item) = filter(index, k, v) {
1881 res.push(Ok(item));
1882 }
1883 }
1884 Err(e) => {
1885 res.push(Err(e));
1886 }
1887 }
1888 index += 1;
1889 }
1890 Ok(res)
1891 }
1892
1893 fn create_tag(&mut self, tables: &mut Tables, content: HashAndFormat) -> ActorResult<Tag> {
1894 let tag = {
1895 let tag = Tag::auto(SystemTime::now(), |x| {
1896 matches!(tables.tags.get(Tag(Bytes::copy_from_slice(x))), Ok(Some(_)))
1897 });
1898 tables.tags.insert(tag.clone(), content)?;
1899 tag
1900 };
1901 Ok(tag)
1902 }
1903
1904 fn set_tag(
1905 &self,
1906 tables: &mut Tables,
1907 tag: Tag,
1908 value: Option<HashAndFormat>,
1909 ) -> ActorResult<()> {
1910 match value {
1911 Some(value) => {
1912 tables.tags.insert(tag, value)?;
1913 }
1914 None => {
1915 tables.tags.remove(tag)?;
1916 }
1917 }
1918 Ok(())
1919 }
1920
1921 fn on_mem_size_exceeded(&mut self, tables: &mut Tables, hash: Hash) -> ActorResult<()> {
1922 let entry = tables
1923 .blobs
1924 .get(hash)?
1925 .map(|x| x.value())
1926 .unwrap_or_default();
1927 let entry = entry.union(EntryState::Partial { size: None })?;
1928 tables.blobs.insert(hash, entry)?;
1929 tables.delete_after_commit.remove(
1931 hash,
1932 [BaoFilePart::Data, BaoFilePart::Outboard, BaoFilePart::Sizes],
1933 );
1934 Ok(())
1935 }
1936
    /// Replace the inline thresholds and, if `reapply` is set, migrate every
    /// existing entry between inline and owned storage to match the new
    /// thresholds.
    ///
    /// Reapplication runs in its own write transaction; file deletions are
    /// deferred until after the commit succeeds.
    fn update_inline_options(
        &mut self,
        db: &redb::Database,
        options: InlineOptions,
        reapply: bool,
    ) -> ActorResult<()> {
        self.options.inline = options;
        if reapply {
            let mut delete_after_commit = Default::default();
            let tx = db.begin_write()?;
            {
                let mut tables = Tables::new(&tx, &mut delete_after_commit)?;
                // collect all hashes up front so we can mutate tables while
                // iterating over them
                let hashes = tables
                    .blobs
                    .iter()?
                    .map(|x| x.map(|(k, _)| k.value()))
                    .collect::<Result<Vec<_>, _>>()?;
                for hash in hashes {
                    let guard = tables
                        .blobs
                        .get(hash)?
                        .ok_or_else(|| ActorError::Inconsistent("hash not found".to_owned()))?;
                    let entry = guard.value();
                    if let EntryState::Complete {
                        data_location,
                        outboard_location,
                    } = entry
                    {
                        // migrate the data part; track whether anything changed
                        // so we only rewrite the blobs entry when needed
                        let (data_location, data_size, data_location_changed) = match data_location
                        {
                            DataLocation::Owned(size) => {
                                // owned file is now small enough: inline it and
                                // schedule the file for deletion on commit
                                if size <= self.options.inline.max_data_inlined {
                                    let path = self.options.path.owned_data_path(&hash);
                                    let data = std::fs::read(&path)?;
                                    tables.delete_after_commit.insert(hash, [BaoFilePart::Data]);
                                    tables.inline_data.insert(hash, data.as_slice())?;
                                    (DataLocation::Inline(()), size, true)
                                } else {
                                    (DataLocation::Owned(size), size, false)
                                }
                            }
                            DataLocation::Inline(()) => {
                                let guard = tables.inline_data.get(hash)?.ok_or_else(|| {
                                    ActorError::Inconsistent("inline data missing".to_owned())
                                })?;
                                let data = guard.value();
                                let size = data.len() as u64;
                                // inline data is now too big: write it out as an
                                // owned file and drop the inline copy
                                if size > self.options.inline.max_data_inlined {
                                    let path = self.options.path.owned_data_path(&hash);
                                    std::fs::write(&path, data)?;
                                    drop(guard);
                                    tables.inline_data.remove(hash)?;
                                    (DataLocation::Owned(size), size, true)
                                } else {
                                    (DataLocation::Inline(()), size, false)
                                }
                            }
                            // external data is never migrated
                            DataLocation::External(paths, size) => {
                                (DataLocation::External(paths, size), size, false)
                            }
                        };
                        // outboard size is derived from the data size
                        let outboard_size = raw_outboard_size(data_size);
                        let (outboard_location, outboard_location_changed) = match outboard_location
                        {
                            OutboardLocation::Owned
                                if outboard_size <= self.options.inline.max_outboard_inlined =>
                            {
                                let path = self.options.path.owned_outboard_path(&hash);
                                let outboard = std::fs::read(&path)?;
                                tables
                                    .delete_after_commit
                                    .insert(hash, [BaoFilePart::Outboard]);
                                tables.inline_outboard.insert(hash, outboard.as_slice())?;
                                (OutboardLocation::Inline(()), true)
                            }
                            OutboardLocation::Inline(())
                                if outboard_size > self.options.inline.max_outboard_inlined =>
                            {
                                let guard = tables.inline_outboard.get(hash)?.ok_or_else(|| {
                                    ActorError::Inconsistent("inline outboard missing".to_owned())
                                })?;
                                let outboard = guard.value();
                                let path = self.options.path.owned_outboard_path(&hash);
                                std::fs::write(&path, outboard)?;
                                drop(guard);
                                tables.inline_outboard.remove(hash)?;
                                (OutboardLocation::Owned, true)
                            }
                            x => (x, false),
                        };
                        drop(guard);
                        if data_location_changed || outboard_location_changed {
                            tables.blobs.insert(
                                hash,
                                EntryState::Complete {
                                    data_location,
                                    outboard_location,
                                },
                            )?;
                        }
                    }
                }
            }
            tx.commit()?;
            // only delete files once the new state is durably committed
            delete_after_commit.apply_and_clear(&self.options.path);
        }
        Ok(())
    }
2046
    /// Delete the given hashes, skipping any that are still referenced by a
    /// temp tag or marked as protected.
    ///
    /// Inline parts are removed from their tables immediately; file-backed
    /// parts are scheduled for deletion after the transaction commits.
    /// External files are never touched.
    fn delete(&mut self, tables: &mut Tables, hashes: Vec<Hash>) -> ActorResult<()> {
        for hash in hashes {
            // still alive via a temp tag: skip
            if self.temp.as_ref().read().unwrap().contains(&hash) {
                continue;
            }
            // explicitly protected (e.g. since the last gc start): skip
            if self.protected.contains(&hash) {
                tracing::debug!("protected hash, continuing {}", &hash.to_hex()[..8]);
                continue;
            }
            tracing::debug!("deleting {}", &hash.to_hex()[..8]);
            // drop the cached handle so no new readers can be created from it
            self.handles.remove(&hash);
            if let Some(entry) = tables.blobs.remove(hash)? {
                match entry.value() {
                    EntryState::Complete {
                        data_location,
                        outboard_location,
                    } => {
                        match data_location {
                            DataLocation::Inline(_) => {
                                tables.inline_data.remove(hash)?;
                            }
                            DataLocation::Owned(_) => {
                                tables.delete_after_commit.insert(hash, [BaoFilePart::Data]);
                            }
                            // external files belong to the user, never delete
                            DataLocation::External(_, _) => {}
                        }
                        match outboard_location {
                            OutboardLocation::Inline(_) => {
                                tables.inline_outboard.remove(hash)?;
                            }
                            OutboardLocation::Owned => {
                                tables
                                    .delete_after_commit
                                    .insert(hash, [BaoFilePart::Outboard]);
                            }
                            OutboardLocation::NotNeeded => {}
                        }
                    }
                    EntryState::Partial { .. } => {
                        // a partial entry may have any of its three files on disk
                        tables.delete_after_commit.insert(
                            hash,
                            [BaoFilePart::Outboard, BaoFilePart::Data, BaoFilePart::Sizes],
                        );
                    }
                }
            }
        }
        Ok(())
    }
2099
    /// Transition a bao file handle to complete storage and persist the
    /// resulting entry state.
    ///
    /// The storage transform decides (via `complete_storage`) whether data and
    /// outboard end up inline or as owned files; the captured `info` is then
    /// used to write the matching db entry and any inline payloads.
    fn on_complete(&mut self, tables: &mut Tables, entry: BaoFileHandle) -> ActorResult<()> {
        let hash = entry.hash();
        let mut info = None;
        tracing::trace!("on_complete({})", hash.to_hex());
        entry.transform(|state| {
            tracing::trace!("on_complete transform {:?}", state);
            let entry = match complete_storage(
                state,
                &hash,
                &self.options.path,
                &self.options.inline,
                tables.delete_after_commit,
            )? {
                Ok(entry) => {
                    // freshly completed: capture sizes and any in-memory
                    // payloads so they can be written to the db below
                    info = Some((
                        entry.data_size(),
                        entry.data.mem().cloned(),
                        entry.outboard_size(),
                        entry.outboard.mem().cloned(),
                    ));
                    entry
                }
                Err(entry) => {
                    // was already complete; nothing to persist
                    entry
                }
            };
            Ok(BaoFileStorage::Complete(entry))
        })?;
        if let Some((data_size, data, outboard_size, outboard)) = info {
            // mem() returning Some means the part was kept inline
            let data_location = if data.is_some() {
                DataLocation::Inline(())
            } else {
                DataLocation::Owned(data_size)
            };
            let outboard_location = if outboard_size == 0 {
                OutboardLocation::NotNeeded
            } else if outboard.is_some() {
                OutboardLocation::Inline(())
            } else {
                OutboardLocation::Owned
            };
            {
                tracing::debug!(
                    "inserting complete entry for {}, {} bytes",
                    hash.to_hex(),
                    data_size,
                );
                // merge with any existing state for this hash
                let entry = tables
                    .blobs()
                    .get(hash)?
                    .map(|x| x.value())
                    .unwrap_or_default();
                let entry = entry.union(EntryState::Complete {
                    data_location,
                    outboard_location,
                })?;
                tables.blobs.insert(hash, entry)?;
                if let Some(data) = data {
                    tables.inline_data.insert(hash, data.as_ref())?;
                }
                if let Some(outboard) = outboard {
                    tables.inline_outboard.insert(hash, outboard.as_ref())?;
                }
            }
        }
        Ok(())
    }
2169
    /// Handle messages that need access to the database itself (to open their
    /// own transactions) rather than to an already-open transaction.
    ///
    /// Receiving any other message category here is an internal logic error.
    fn handle_toplevel(&mut self, db: &redb::Database, msg: ActorMessage) -> ActorResult<()> {
        match msg {
            ActorMessage::ImportFlatStore { paths, tx } => {
                let res = self.import_flat_store(db, paths);
                tx.send(res?).ok();
            }
            ActorMessage::UpdateInlineOptions {
                inline_options,
                reapply,
                tx,
            } => {
                let res = self.update_inline_options(db, inline_options, reapply);
                tx.send(res?).ok();
            }
            ActorMessage::Fsck {
                repair,
                progress,
                tx,
            } => {
                let res = self.consistency_check(db, repair, progress);
                tx.send(res).ok();
            }
            ActorMessage::Sync { tx } => {
                // a no-op reply: by the time this is processed, all previously
                // queued messages have been handled
                tx.send(()).ok();
            }
            x => {
                return Err(ActorError::Inconsistent(format!(
                    "unexpected message for handle_toplevel: {:?}",
                    x
                )))
            }
        }
        Ok(())
    }
2204
    /// Handle a message that only needs read access to the tables.
    ///
    /// Returns `Ok(Err(msg))` to hand back a message that is not read-only so
    /// the caller can end the read batch and reprocess it.
    fn handle_readonly(
        &mut self,
        tables: &impl ReadableTables,
        msg: ActorMessage,
    ) -> ActorResult<std::result::Result<(), ActorMessage>> {
        match msg {
            ActorMessage::Get { hash, tx } => {
                let res = self.get(tables, hash);
                tx.send(res).ok();
            }
            ActorMessage::GetOrCreate { hash, tx } => {
                let res = self.get_or_create(tables, hash);
                tx.send(res).ok();
            }
            ActorMessage::EntryStatus { hash, tx } => {
                let res = self.entry_status(tables, hash);
                tx.send(res).ok();
            }
            ActorMessage::Blobs { filter, tx } => {
                let res = self.blobs(tables, filter);
                tx.send(res).ok();
            }
            ActorMessage::Tags { filter, tx } => {
                let res = self.tags(tables, filter);
                tx.send(res).ok();
            }
            ActorMessage::GcStart { tx } => {
                // a new gc round resets protection and drops dead weak handles
                self.protected.clear();
                self.handles.retain(|_, weak| weak.is_live());
                tx.send(()).ok();
            }
            ActorMessage::Dump => {
                dump(tables).ok();
            }
            #[cfg(test)]
            ActorMessage::EntryState { hash, tx } => {
                tx.send(self.entry_state(tables, hash)).ok();
            }
            ActorMessage::GetFullEntryState { hash, tx } => {
                let res = self.get_full_entry_state(tables, hash);
                tx.send(res).ok();
            }
            // not a read-only message; give it back to the caller
            x => return Ok(Err(x)),
        }
        Ok(Ok(()))
    }
2251
    /// Handle a message that needs write access to the tables.
    ///
    /// Read-only messages are delegated to [`Self::handle_readonly`]; a
    /// message that fits neither category is handed back as `Ok(Err(msg))` so
    /// the caller can end the write batch and reprocess it.
    fn handle_readwrite(
        &mut self,
        tables: &mut Tables,
        msg: ActorMessage,
    ) -> ActorResult<std::result::Result<(), ActorMessage>> {
        match msg {
            ActorMessage::Import { cmd, tx } => {
                let res = self.import(tables, cmd);
                tx.send(res).ok();
            }
            ActorMessage::SetTag { tag, value, tx } => {
                let res = self.set_tag(tables, tag, value);
                tx.send(res).ok();
            }
            ActorMessage::CreateTag { hash, tx } => {
                let res = self.create_tag(tables, hash);
                tx.send(res).ok();
            }
            ActorMessage::Delete { hashes, tx } => {
                let res = self.delete(tables, hashes);
                tx.send(res).ok();
            }
            ActorMessage::OnComplete { handle } => {
                // fire-and-forget: there is no reply channel for this message
                let res = self.on_complete(tables, handle);
                res.ok();
            }
            ActorMessage::Export { cmd, tx } => {
                self.export(tables, cmd, tx)?;
            }
            ActorMessage::OnMemSizeExceeded { hash } => {
                let res = self.on_mem_size_exceeded(tables, hash);
                res.ok();
            }
            ActorMessage::Dump => {
                let res = dump(tables);
                res.ok();
            }
            ActorMessage::SetFullEntryState { hash, entry, tx } => {
                let res = self.set_full_entry_state(tables, hash, entry);
                tx.send(res).ok();
            }
            msg => {
                // try read-only handling before giving the message back
                if let Err(msg) = self.handle_readonly(tables, msg)? {
                    return Ok(Err(msg));
                }
            }
        }
        Ok(Ok(()))
    }
2302}
2303
/// Copy an entry's data file to `target`, preferring a filesystem reflink
/// when supported.
///
/// Progress is reported as 0 before the copy and `size` afterwards. The
/// `temp_tag` is held until the copy finishes so the entry cannot be garbage
/// collected mid-copy.
fn export_file_copy(
    temp_tag: TempTag,
    path: PathBuf,
    size: u64,
    target: PathBuf,
    progress: ExportProgressCb,
) -> ActorResult<()> {
    progress(0)?;
    reflink_copy::reflink_or_copy(path, target)?;
    progress(size)?;
    // release gc protection only after the copy succeeded
    drop(temp_tag);
    Ok(())
}
2319
2320fn compute_outboard(
2332 read: impl Read,
2333 size: u64,
2334 progress: impl Fn(u64) -> io::Result<()> + Send + Sync + 'static,
2335) -> io::Result<(Hash, Option<Vec<u8>>)> {
2336 use bao_tree::io::sync::CreateOutboard;
2337
2338 let reader = ProgressReader::new(read, progress);
2340 let buf_size = usize::try_from(size).unwrap_or(usize::MAX).min(1024 * 1024);
2343 let reader = BufReader::with_capacity(buf_size, reader);
2344
2345 let ob = PreOrderOutboard::<Vec<u8>>::create_sized(reader, size, IROH_BLOCK_SIZE)?;
2346 let root = ob.root.into();
2347 let data = ob.data;
2348 tracing::trace!(%root, "done");
2349 let data = if !data.is_empty() { Some(data) } else { None };
2350 Ok((root, data))
2351}
2352
2353fn dump(tables: &impl ReadableTables) -> ActorResult<()> {
2354 for e in tables.blobs().iter()? {
2355 let (k, v) = e?;
2356 let k = k.value();
2357 let v = v.value();
2358 println!("blobs: {} -> {:?}", k.to_hex(), v);
2359 }
2360 for e in tables.tags().iter()? {
2361 let (k, v) = e?;
2362 let k = k.value();
2363 let v = v.value();
2364 println!("tags: {} -> {:?}", k, v);
2365 }
2366 for e in tables.inline_data().iter()? {
2367 let (k, v) = e?;
2368 let k = k.value();
2369 let v = v.value();
2370 println!("inline_data: {} -> {:?}", k.to_hex(), v.len());
2371 }
2372 for e in tables.inline_outboard().iter()? {
2373 let (k, v) = e?;
2374 let k = k.value();
2375 let v = v.value();
2376 println!("inline_outboard: {} -> {:?}", k.to_hex(), v.len());
2377 }
2378 Ok(())
2379}
2380
2381fn load_data(
2382 tables: &impl ReadableTables,
2383 options: &PathOptions,
2384 location: DataLocation<(), u64>,
2385 hash: &Hash,
2386) -> ActorResult<MemOrFile<Bytes, (std::fs::File, u64)>> {
2387 Ok(match location {
2388 DataLocation::Inline(()) => {
2389 let Some(data) = tables.inline_data().get(hash)? else {
2390 return Err(ActorError::Inconsistent(format!(
2391 "inconsistent database state: {} should have inline data but does not",
2392 hash.to_hex()
2393 )));
2394 };
2395 MemOrFile::Mem(Bytes::copy_from_slice(data.value()))
2396 }
2397 DataLocation::Owned(data_size) => {
2398 let path = options.owned_data_path(hash);
2399 let Ok(file) = std::fs::File::open(&path) else {
2400 return Err(io::Error::new(
2401 io::ErrorKind::NotFound,
2402 format!("file not found: {}", path.display()),
2403 )
2404 .into());
2405 };
2406 MemOrFile::File((file, data_size))
2407 }
2408 DataLocation::External(paths, data_size) => {
2409 if paths.is_empty() {
2410 return Err(ActorError::Inconsistent(
2411 "external data location must not be empty".into(),
2412 ));
2413 }
2414 let path = &paths[0];
2415 let Ok(file) = std::fs::File::open(path) else {
2416 return Err(io::Error::new(
2417 io::ErrorKind::NotFound,
2418 format!("external file not found: {}", path.display()),
2419 )
2420 .into());
2421 };
2422 MemOrFile::File((file, data_size))
2423 }
2424 })
2425}
2426
2427fn load_outboard(
2428 tables: &impl ReadableTables,
2429 options: &PathOptions,
2430 location: OutboardLocation,
2431 size: u64,
2432 hash: &Hash,
2433) -> ActorResult<MemOrFile<Bytes, (std::fs::File, u64)>> {
2434 Ok(match location {
2435 OutboardLocation::NotNeeded => MemOrFile::Mem(Bytes::new()),
2436 OutboardLocation::Inline(_) => {
2437 let Some(outboard) = tables.inline_outboard().get(hash)? else {
2438 return Err(ActorError::Inconsistent(format!(
2439 "inconsistent database state: {} should have inline outboard but does not",
2440 hash.to_hex()
2441 )));
2442 };
2443 MemOrFile::Mem(Bytes::copy_from_slice(outboard.value()))
2444 }
2445 OutboardLocation::Owned => {
2446 let outboard_size = raw_outboard_size(size);
2447 let path = options.owned_outboard_path(hash);
2448 let Ok(file) = std::fs::File::open(&path) else {
2449 return Err(io::Error::new(
2450 io::ErrorKind::NotFound,
2451 format!("file not found: {} size={}", path.display(), outboard_size),
2452 )
2453 .into());
2454 };
2455 MemOrFile::File((file, outboard_size))
2456 }
2457 })
2458}
2459
/// Convert any bao file storage into complete storage, deciding for data and
/// outboard whether they end up inline (in memory) or as owned files.
///
/// Returns `Ok(Ok(_))` when a conversion happened and `Ok(Err(_))` when the
/// storage was already complete (so the caller knows nothing needs
/// persisting). File parts that become inline are scheduled for deletion
/// after commit; parts that stay on disk get any pending deletion cancelled.
/// The sizes file is always scheduled for deletion since a complete entry no
/// longer needs it.
fn complete_storage(
    storage: BaoFileStorage,
    hash: &Hash,
    path_options: &PathOptions,
    inline_options: &InlineOptions,
    delete_after_commit: &mut DeleteSet,
) -> ActorResult<std::result::Result<CompleteStorage, CompleteStorage>> {
    let (data, outboard, _sizes) = match storage {
        // already complete: nothing to do
        BaoFileStorage::Complete(c) => return Ok(Err(c)),
        BaoFileStorage::IncompleteMem(storage) => {
            let (data, outboard, sizes) = storage.into_parts();
            (
                MemOrFile::Mem(Bytes::from(data.into_parts().0)),
                MemOrFile::Mem(Bytes::from(outboard.into_parts().0)),
                MemOrFile::Mem(Bytes::from(sizes.to_vec())),
            )
        }
        BaoFileStorage::IncompleteFile(storage) => {
            let (data, outboard, sizes) = storage.into_parts();
            (
                MemOrFile::File(data),
                MemOrFile::File(outboard),
                MemOrFile::File(sizes),
            )
        }
    };
    // NOTE(review): assumes size() is always Some for these storages — confirm
    // MemOrFile::size() cannot return Ok(None) here
    let data_size = data.size()?.unwrap();
    let outboard_size = outboard.size()?.unwrap();
    debug_assert!(raw_outboard_size(data_size) == outboard_size);
    // small data: pull it into memory and schedule the file for deletion;
    // large data: make sure it is on disk and not scheduled for deletion
    let data = if data_size <= inline_options.max_data_inlined {
        match data {
            MemOrFile::File(data) => {
                let mut buf = vec![0; data_size as usize];
                data.read_at(0, &mut buf)?;
                delete_after_commit.insert(*hash, [BaoFilePart::Data]);
                MemOrFile::Mem(Bytes::from(buf))
            }
            MemOrFile::Mem(data) => MemOrFile::Mem(data),
        }
    } else {
        delete_after_commit.remove(*hash, [BaoFilePart::Data]);
        match data {
            MemOrFile::Mem(data) => {
                let path = path_options.owned_data_path(hash);
                let file = overwrite_and_sync(&path, &data)?;
                MemOrFile::File((file, data_size))
            }
            MemOrFile::File(data) => MemOrFile::File((data, data_size)),
        }
    };
    // same decision for the outboard; an empty outboard needs no storage at all
    let outboard = if outboard_size == 0 {
        Default::default()
    } else if outboard_size <= inline_options.max_outboard_inlined {
        match outboard {
            MemOrFile::File(outboard) => {
                let mut buf = vec![0; outboard_size as usize];
                outboard.read_at(0, &mut buf)?;
                drop(outboard);
                delete_after_commit.insert(*hash, [BaoFilePart::Outboard]);
                MemOrFile::Mem(Bytes::from(buf))
            }
            MemOrFile::Mem(outboard) => MemOrFile::Mem(outboard),
        }
    } else {
        delete_after_commit.remove(*hash, [BaoFilePart::Outboard]);
        match outboard {
            MemOrFile::Mem(outboard) => {
                let path = path_options.owned_outboard_path(hash);
                let file = overwrite_and_sync(&path, &outboard)?;
                MemOrFile::File((file, outboard_size))
            }
            MemOrFile::File(outboard) => MemOrFile::File((outboard, outboard_size)),
        }
    };
    // the sizes file is only needed while the entry is partial
    delete_after_commit.insert(*hash, [BaoFilePart::Sizes]);
    Ok(Ok(CompleteStorage { data, outboard }))
}