1use std::{
67 collections::{BTreeMap, BTreeSet},
68 future::Future,
69 io,
70 path::{Path, PathBuf},
71 sync::{Arc, RwLock},
72 time::{Duration, SystemTime},
73};
74
75use bao_tree::io::{
76 fsm::Outboard,
77 sync::{ReadAt, Size},
78};
79use bytes::Bytes;
80use futures_lite::{Stream, StreamExt};
81use genawaiter::rc::{Co, Gen};
82use iroh_io::AsyncSliceReader;
83use redb::{AccessGuard, DatabaseError, ReadableTable, StorageError};
84use serde::{Deserialize, Serialize};
85use smallvec::SmallVec;
86use tokio::io::AsyncWriteExt;
87use tracing::trace_span;
88
89mod tables;
90#[doc(hidden)]
91pub mod test_support;
92#[cfg(test)]
93mod tests;
94mod util;
95mod validate;
96
97use tables::{ReadOnlyTables, ReadableTables, Tables};
98
99use self::{tables::DeleteSet, test_support::EntryData, util::PeekableFlumeReceiver};
100use super::{
101 bao_file::{BaoFileConfig, BaoFileHandle, BaoFileHandleWeak, CreateCb},
102 temp_name, BaoBatchWriter, BaoBlobSize, ConsistencyCheckProgress, EntryStatus, ExportMode,
103 ExportProgressCb, ImportMode, ImportProgress, Map, ReadableStore, TempCounterMap,
104};
105use crate::{
106 store::{
107 bao_file::{BaoFileStorage, CompleteStorage},
108 fs::{
109 tables::BaoFilePart,
110 util::{overwrite_and_sync, read_and_remove},
111 },
112 GcMarkEvent, GcSweepEvent,
113 },
114 util::{
115 compute_outboard,
116 progress::{
117 BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSendError,
118 ProgressSender,
119 },
120 raw_outboard_size, MemOrFile, TagCounter, TagDrop,
121 },
122 BlobFormat, Hash, HashAndFormat, Tag, TempTag,
123};
124
/// Where the data of a complete entry is stored.
///
/// `I` is the payload carried by the inline variant (`()` once discarded, see
/// `discard_inline_data`); `E` is extra information carried by the file-based variants
/// (`DataLocation<_, u64>` is used for complete entries, where `union` compares it as the size).
///
/// NOTE: this type is serialized (serde/postcard), so variant order is part of the
/// persisted format — do not reorder variants.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum DataLocation<I = (), E = ()> {
    /// Data is stored inline (in the database).
    Inline(I),
    /// Data is stored in a file owned by the store.
    Owned(E),
    /// Data is stored in one or more external files the store does not own.
    External(Vec<PathBuf>, E),
}
140
141impl<X> DataLocation<X, u64> {
142 fn union(self, that: DataLocation<X, u64>) -> ActorResult<Self> {
143 Ok(match (self, that) {
144 (
145 DataLocation::External(mut paths, a_size),
146 DataLocation::External(b_paths, b_size),
147 ) => {
148 if a_size != b_size {
149 return Err(ActorError::Inconsistent(format!(
150 "complete size mismatch {} {}",
151 a_size, b_size
152 )));
153 }
154 paths.extend(b_paths);
155 paths.sort();
156 paths.dedup();
157 DataLocation::External(paths, a_size)
158 }
159 (_, b @ DataLocation::Owned(_)) => {
160 b
163 }
164 (a @ DataLocation::Owned(_), _) => {
165 a
168 }
169 (_, b @ DataLocation::Inline(_)) => {
170 b
173 }
174 (a @ DataLocation::Inline(_), _) => {
175 a
178 }
179 })
180 }
181}
182
183impl<I, E> DataLocation<I, E> {
184 fn discard_inline_data(self) -> DataLocation<(), E> {
185 match self {
186 DataLocation::Inline(_) => DataLocation::Inline(()),
187 DataLocation::Owned(x) => DataLocation::Owned(x),
188 DataLocation::External(paths, x) => DataLocation::External(paths, x),
189 }
190 }
191}
192
/// Where the outboard of a complete entry is stored.
///
/// `I` is the payload carried by the inline variant (`()` once discarded, see
/// `discard_extra_data`).
///
/// NOTE: this type is serialized (serde/postcard), so variant order is part of the
/// persisted format — do not reorder variants.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum OutboardLocation<I = ()> {
    /// Outboard is stored inline (in the database).
    Inline(I),
    /// Outboard is stored in a file owned by the store.
    Owned,
    /// No outboard is stored — presumably because the blob is small enough not to
    /// need one; TODO confirm.
    NotNeeded,
}
208
209impl<I> OutboardLocation<I> {
210 fn discard_extra_data(self) -> OutboardLocation<()> {
211 match self {
212 Self::Inline(_) => OutboardLocation::Inline(()),
213 Self::Owned => OutboardLocation::Owned,
214 Self::NotNeeded => OutboardLocation::NotNeeded,
215 }
216 }
217}
218
/// Persistent per-hash state of an entry.
///
/// `EntryState` (with `I = ()`) is the value type stored in redb; see its `redb::Value`
/// impl, which encodes it with postcard. Variant and field order are therefore part of
/// the on-disk format — do not reorder.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum EntryState<I = ()> {
    /// The entry is complete.
    Complete {
        /// Where the data is stored.
        data_location: DataLocation<I, u64>,
        /// Where the outboard is stored.
        outboard_location: OutboardLocation<I>,
    },
    /// The entry is partial.
    Partial {
        /// The validated size, if known. `union` rejects two known sizes that differ
        /// ("validated size mismatch").
        size: Option<u64>,
    },
}
246
247impl Default for EntryState {
248 fn default() -> Self {
249 Self::Partial { size: None }
250 }
251}
252
253impl EntryState {
254 fn union(self, that: Self) -> ActorResult<Self> {
255 match (self, that) {
256 (
257 Self::Complete {
258 data_location,
259 outboard_location,
260 },
261 Self::Complete {
262 data_location: b_data_location,
263 ..
264 },
265 ) => Ok(Self::Complete {
266 data_location: data_location.union(b_data_location)?,
268 outboard_location,
269 }),
270 (a @ Self::Complete { .. }, Self::Partial { .. }) =>
271 {
273 Ok(a)
274 }
275 (Self::Partial { .. }, b @ Self::Complete { .. }) =>
276 {
278 Ok(b)
279 }
280 (Self::Partial { size: a_size }, Self::Partial { size: b_size }) =>
281 {
283 let size = match (a_size, b_size) {
284 (Some(a_size), Some(b_size)) => {
285 if a_size != b_size {
289 return Err(ActorError::Inconsistent(format!(
290 "validated size mismatch {} {}",
291 a_size, b_size
292 )));
293 }
294 Some(a_size)
295 }
296 (Some(a_size), None) => Some(a_size),
297 (None, Some(b_size)) => Some(b_size),
298 (None, None) => None,
299 };
300 Ok(Self::Partial { size })
301 }
302 }
303 }
304}
305
/// Stores [`EntryState`] in redb as a variable-width, postcard-encoded value.
impl redb::Value for EntryState {
    type SelfType<'a> = EntryState;

    // Encodings of up to 128 bytes are held inline in the SmallVec (no heap allocation).
    type AsBytes<'a> = SmallVec<[u8; 128]>;

    /// Encoded entry states have no fixed width.
    fn fixed_width() -> Option<usize> {
        None
    }

    /// Decodes a stored value with postcard.
    ///
    /// # Panics
    ///
    /// Panics if `data` is not a valid postcard encoding of `EntryState`; this trait
    /// method returns the value directly, so there is no way to report a decode error.
    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
    where
        Self: 'a,
    {
        postcard::from_bytes(data).unwrap()
    }

    /// Encodes a value with postcard.
    ///
    /// # Panics
    ///
    /// Panics if postcard serialization fails.
    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
    where
        Self: 'a,
        Self: 'b,
    {
        postcard::to_extend(value, SmallVec::new()).unwrap()
    }

    /// Type name recorded by redb for this value type.
    fn type_name() -> redb::TypeName {
        redb::TypeName::new("EntryState")
    }
}
334
/// Size thresholds that decide whether data and outboards are stored inline.
#[derive(Debug, Clone)]
pub struct InlineOptions {
    /// Maximum data size, in bytes, to store inline — TODO confirm whether the bound
    /// is inclusive at the use site.
    pub max_data_inlined: u64,
    /// Maximum outboard size, in bytes, to store inline — TODO confirm whether the
    /// bound is inclusive at the use site.
    pub max_outboard_inlined: u64,
}

impl InlineOptions {
    /// Thresholds of zero for both data and outboard.
    pub const NO_INLINE: Self = InlineOptions {
        max_outboard_inlined: 0,
        max_data_inlined: 0,
    };
    /// Thresholds of `u64::MAX` for both data and outboard.
    pub const ALWAYS_INLINE: Self = InlineOptions {
        max_outboard_inlined: u64::MAX,
        max_data_inlined: u64::MAX,
    };
}

impl Default for InlineOptions {
    /// 16 KiB for both data and outboard.
    fn default() -> Self {
        const SIXTEEN_KIB: u64 = 16 * 1024;
        InlineOptions {
            max_data_inlined: SIXTEEN_KIB,
            max_outboard_inlined: SIXTEEN_KIB,
        }
    }
}
365
366#[derive(Debug, Clone)]
368pub struct PathOptions {
369 pub data_path: PathBuf,
371 pub temp_path: PathBuf,
375}
376
377impl PathOptions {
378 fn new(root: &Path) -> Self {
379 Self {
380 data_path: root.join("data"),
381 temp_path: root.join("temp"),
382 }
383 }
384
385 fn owned_data_path(&self, hash: &Hash) -> PathBuf {
386 self.data_path.join(format!("{}.data", hash.to_hex()))
387 }
388
389 fn owned_outboard_path(&self, hash: &Hash) -> PathBuf {
390 self.data_path.join(format!("{}.obao4", hash.to_hex()))
391 }
392
393 fn owned_sizes_path(&self, hash: &Hash) -> PathBuf {
394 self.data_path.join(format!("{}.sizes4", hash.to_hex()))
395 }
396
397 fn temp_file_name(&self) -> PathBuf {
398 self.temp_path.join(temp_name())
399 }
400}
401
/// Limits on how much work is grouped into a single database transaction.
///
/// NOTE(review): presumably consumed by the actor's batched run loop; TODO confirm the
/// exact semantics there.
#[derive(Debug, Clone)]
pub struct BatchOptions {
    /// Maximum number of read-only messages handled in one read transaction.
    pub max_read_batch: usize,
    /// Maximum duration of one read transaction.
    pub max_read_duration: Duration,
    /// Maximum number of read/write messages handled in one write transaction.
    pub max_write_batch: usize,
    /// Maximum duration of one write transaction.
    pub max_write_duration: Duration,
}

impl Default for BatchOptions {
    /// Reads: 10 000 messages / 1 s. Writes: 1 000 messages / 500 ms.
    fn default() -> Self {
        let max_read_duration = Duration::from_secs(1);
        let max_write_duration = Duration::from_millis(500);
        BatchOptions {
            max_read_batch: 10_000,
            max_read_duration,
            max_write_batch: 1_000,
            max_write_duration,
        }
    }
}
425
/// Complete configuration of a [`Store`].
#[derive(Debug, Clone)]
pub struct Options {
    /// Directory layout for owned and temporary files.
    pub path: PathOptions,
    /// Inlining thresholds.
    pub inline: InlineOptions,
    /// Transaction batching limits.
    pub batch: BatchOptions,
}
436
/// Where the data of an in-progress import comes from.
#[derive(derive_more::Debug)]
pub(crate) enum ImportSource {
    /// A temporary file created by the store (copied or streamed into the temp directory).
    TempFile(PathBuf),
    /// A file outside the store, referenced in place (`ImportMode::TryReference`).
    External(PathBuf),
    /// Data held in memory. Its contents are omitted from debug output.
    Memory(#[debug(skip)] Bytes),
}
443
444impl ImportSource {
445 fn content(&self) -> MemOrFile<&[u8], &Path> {
446 match self {
447 Self::TempFile(path) => MemOrFile::File(path.as_path()),
448 Self::External(path) => MemOrFile::File(path.as_path()),
449 Self::Memory(data) => MemOrFile::Mem(data.as_ref()),
450 }
451 }
452
453 fn len(&self) -> io::Result<u64> {
454 match self {
455 Self::TempFile(path) => std::fs::metadata(path).map(|m| m.len()),
456 Self::External(path) => std::fs::metadata(path).map(|m| m.len()),
457 Self::Memory(data) => Ok(data.len() as u64),
458 }
459 }
460}
461
/// The entry type of this store: a handle to a bao file.
pub type Entry = BaoFileHandle;
464
/// Exposes [`BaoFileHandle`] through the generic `MapEntry` trait.
///
/// NOTE: each method calls the same-named method on `self`. This relies on method
/// resolution preferring `BaoFileHandle`'s inherent methods over this trait's, so these
/// are delegations, not recursive calls. Keep that in mind when editing.
impl super::MapEntry for Entry {
    fn hash(&self) -> Hash {
        self.hash()
    }

    /// Current size of the entry, with the entry's completeness passed to
    /// `BaoBlobSize::new`.
    ///
    /// # Panics
    ///
    /// Panics if `current_size()` fails; this trait method cannot return an error.
    fn size(&self) -> BaoBlobSize {
        let size = self.current_size().unwrap();
        tracing::trace!("redb::Entry::size() = {}", size);
        BaoBlobSize::new(size, self.is_complete())
    }

    fn is_complete(&self) -> bool {
        self.is_complete()
    }

    async fn outboard(&self) -> io::Result<impl Outboard> {
        self.outboard()
    }

    async fn data_reader(&self) -> io::Result<impl AsyncSliceReader> {
        Ok(self.data_reader())
    }
}
488
489impl super::MapEntryMut for Entry {
490 async fn batch_writer(&self) -> io::Result<impl BaoBatchWriter> {
491 Ok(self.writer())
492 }
493}
494
/// An import request handed to the actor in [`ActorMessage::Import`] once hashing is done.
#[derive(derive_more::Debug)]
pub(crate) struct Import {
    /// Hash and format of the content being imported.
    content_id: HashAndFormat,
    /// Where the data currently lives.
    source: ImportSource,
    /// Size of the data in bytes, as measured before hashing.
    data_size: u64,
    /// The outboard computed during hashing, if one was produced. Only its length is
    /// shown in debug output.
    #[debug("{:?}", outboard.as_ref().map(|x| x.len()))]
    outboard: Option<Vec<u8>>,
}
507
/// An export request handed to the actor in [`ActorMessage::Export`].
#[derive(derive_more::Debug)]
pub(crate) struct Export {
    /// Temp tag protecting the exported hash while the export is running.
    temp_tag: TempTag,
    /// Absolute target path (checked in `StoreInner::export`).
    target: PathBuf,
    /// How to export (e.g. copy vs. other modes of `ExportMode`).
    mode: ExportMode,
    /// Progress callback. Omitted from debug output.
    #[debug(skip)]
    progress: ExportProgressCb,
}
521
/// Messages sent from [`StoreInner`] to the database actor.
///
/// Most variants carry a oneshot `tx` on which the actor sends its reply.
#[derive(derive_more::Debug)]
pub(crate) enum ActorMessage {
    /// Look up the handle for `hash` (sent by `StoreInner::get`).
    Get {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<Option<BaoFileHandle>>>,
    },
    /// Query the status of `hash` (sent by `entry_status` / `entry_status_sync`).
    EntryStatus {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<EntryStatus>>,
    },
    /// Test-only: fetch the raw entry state of `hash`.
    #[cfg(test)]
    EntryState {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<test_support::EntryStateResponse>>,
    },
    /// Fetch the full entry data for `hash` (`EntryData` comes from `test_support`).
    GetFullEntryState {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<Option<EntryData>>>,
    },
    /// Replace the full entry data for `hash`. `None` presumably removes the entry —
    /// TODO confirm in the actor.
    SetFullEntryState {
        hash: Hash,
        entry: Option<EntryData>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Get the handle for `hash`, creating it if needed (sent by `get_or_create`).
    GetOrCreate {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<BaoFileHandle>>,
    },
    /// Not sent from the visible code; presumably a notification that an in-memory
    /// bao file for `hash` grew past its memory limit. TODO confirm. No reply.
    OnMemSizeExceeded { hash: Hash },
    /// Notification that `handle` is complete (sent by `StoreInner::complete`). No reply.
    OnComplete { handle: BaoFileHandle },
    /// Import already-hashed data. Replies with a temp tag and the data size.
    Import {
        cmd: Import,
        tx: oneshot::Sender<ActorResult<(TempTag, u64)>>,
    },
    /// Export a blob to a target path.
    Export {
        cmd: Export,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Replace the inline options. `reapply` presumably re-applies them to existing
    /// entries — TODO confirm.
    UpdateInlineOptions {
        inline_options: InlineOptions,
        reapply: bool,
        tx: oneshot::Sender<()>,
    },
    /// List blob entries accepted by `filter`.
    Blobs {
        #[debug(skip)]
        filter: FilterPredicate<Hash, EntryState>,
        #[allow(clippy::type_complexity)]
        tx: oneshot::Sender<
            ActorResult<Vec<std::result::Result<(Hash, EntryState), StorageError>>>,
        >,
    },
    /// List tags accepted by `filter`.
    Tags {
        #[debug(skip)]
        filter: FilterPredicate<Tag, HashAndFormat>,
        #[allow(clippy::type_complexity)]
        tx: oneshot::Sender<
            ActorResult<Vec<std::result::Result<(Tag, HashAndFormat), StorageError>>>,
        >,
    },
    /// Set `tag` to `value`. `None` presumably deletes the tag — TODO confirm.
    SetTag {
        tag: Tag,
        value: Option<HashAndFormat>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Create a new tag pointing to `hash`. Replies with the created tag.
    CreateTag {
        hash: HashAndFormat,
        tx: oneshot::Sender<ActorResult<Tag>>,
    },
    /// Delete the given hashes (user-initiated).
    Delete {
        hashes: Vec<Hash>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Delete the given hashes (sent by the GC sweep phase).
    GcDelete {
        hashes: Vec<Hash>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Sync the database; the reply signals completion.
    Sync { tx: oneshot::Sender<()> },
    /// Dump the database state (sent by `StoreInner::dump`). No reply.
    Dump,
    /// Run a consistency check, optionally repairing, reporting via `progress`.
    Fsck {
        repair: bool,
        progress: BoxedProgressSender<ConsistencyCheckProgress>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Sent at the start of every GC cycle, before the delay and the mark phase.
    GcStart { tx: oneshot::Sender<()> },
    /// Stop the actor. `tx` is `Some` from `StoreInner::shutdown` (acknowledged) and
    /// `None` from `Drop`.
    Shutdown { tx: Option<oneshot::Sender<()>> },
}
656
657impl ActorMessage {
658 fn category(&self) -> MessageCategory {
659 match self {
660 Self::Get { .. }
661 | Self::GetOrCreate { .. }
662 | Self::EntryStatus { .. }
663 | Self::Blobs { .. }
664 | Self::Tags { .. }
665 | Self::GcStart { .. }
666 | Self::GetFullEntryState { .. }
667 | Self::Dump => MessageCategory::ReadOnly,
668 Self::Import { .. }
669 | Self::Export { .. }
670 | Self::OnMemSizeExceeded { .. }
671 | Self::OnComplete { .. }
672 | Self::SetTag { .. }
673 | Self::CreateTag { .. }
674 | Self::SetFullEntryState { .. }
675 | Self::Delete { .. }
676 | Self::GcDelete { .. } => MessageCategory::ReadWrite,
677 Self::UpdateInlineOptions { .. }
678 | Self::Sync { .. }
679 | Self::Shutdown { .. }
680 | Self::Fsck { .. } => MessageCategory::TopLevel,
681 #[cfg(test)]
682 Self::EntryState { .. } => MessageCategory::ReadOnly,
683 }
684 }
685}
686
/// Kind of database access a message needs (see `ActorMessage::category`).
enum MessageCategory {
    /// Needs only read access.
    ReadOnly,
    /// Needs write access.
    ReadWrite,
    /// Control messages (sync, shutdown, fsck, option updates).
    TopLevel,
}
692
/// Filter applied by the actor to table entries; returning `Some((k, v))` keeps the entry.
///
/// NOTE(review): the leading `u64` is ignored by all visible callers; presumably an
/// entry index — TODO confirm in the actor.
pub(crate) type FilterPredicate<K, V> =
    Box<dyn Fn(u64, AccessGuard<K>, AccessGuard<V>) -> Option<(K, V)> + Send + Sync>;
696
/// File-system backed blob store, with metadata kept in a redb database.
///
/// Cloning is cheap: clones share the same [`StoreInner`] through an `Arc`.
#[derive(Debug, Clone)]
pub struct Store(Arc<StoreInner>);
701
702impl Store {
703 pub async fn load(root: impl AsRef<Path>) -> io::Result<Self> {
705 let path = root.as_ref();
706 let db_path = path.join("blobs.db");
707 let options = Options {
708 path: PathOptions::new(path),
709 inline: Default::default(),
710 batch: Default::default(),
711 };
712 Self::new(db_path, options).await
713 }
714
715 pub async fn new(path: PathBuf, options: Options) -> io::Result<Self> {
717 let rt = tokio::runtime::Handle::try_current()
719 .map_err(|_| io::Error::new(io::ErrorKind::Other, "no tokio runtime"))?;
720 let inner =
721 tokio::task::spawn_blocking(move || StoreInner::new_sync(path, options, rt)).await??;
722 Ok(Self(Arc::new(inner)))
723 }
724
725 pub async fn update_inline_options(
730 &self,
731 inline_options: InlineOptions,
732 reapply: bool,
733 ) -> io::Result<()> {
734 Ok(self
735 .0
736 .update_inline_options(inline_options, reapply)
737 .await?)
738 }
739
740 pub async fn dump(&self) -> io::Result<()> {
742 Ok(self.0.dump().await?)
743 }
744}
745
/// Shared state behind a [`Store`]: the channel to the actor plus bookkeeping.
#[derive(Debug)]
struct StoreInner {
    /// Sender for messages to the actor thread.
    tx: async_channel::Sender<ActorMessage>,
    /// Counters for live temp tags (shared with the actor).
    temp: Arc<RwLock<TempCounterMap>>,
    /// Join handle of the `redb-actor` thread; taken (and joined) in `Drop`.
    handle: Option<std::thread::JoinHandle<()>>,
    /// Directory layout, used here to create temp file names.
    path_options: Arc<PathOptions>,
}
753
754impl TagDrop for RwLock<TempCounterMap> {
755 fn on_drop(&self, content: &HashAndFormat) {
756 self.write().unwrap().dec(content);
757 }
758}
759
760impl TagCounter for RwLock<TempCounterMap> {
761 fn on_create(&self, content: &HashAndFormat) {
762 self.write().unwrap().inc(content);
763 }
764}
765
766impl StoreInner {
767 fn new_sync(path: PathBuf, options: Options, rt: tokio::runtime::Handle) -> io::Result<Self> {
768 tracing::trace!(
769 "creating data directory: {}",
770 options.path.data_path.display()
771 );
772 std::fs::create_dir_all(&options.path.data_path)?;
773 tracing::trace!(
774 "creating temp directory: {}",
775 options.path.temp_path.display()
776 );
777 std::fs::create_dir_all(&options.path.temp_path)?;
778 tracing::trace!(
779 "creating parent directory for db file{}",
780 path.parent().unwrap().display()
781 );
782 std::fs::create_dir_all(path.parent().unwrap())?;
783 let temp: Arc<RwLock<TempCounterMap>> = Default::default();
784 let (actor, tx) = Actor::new(&path, options.clone(), temp.clone(), rt.clone())?;
785 let handle = std::thread::Builder::new()
786 .name("redb-actor".to_string())
787 .spawn(move || {
788 rt.block_on(async move {
789 if let Err(cause) = actor.run_batched().await {
790 tracing::error!("redb actor failed: {}", cause);
791 }
792 });
793 })
794 .expect("failed to spawn thread");
795 Ok(Self {
796 tx,
797 temp,
798 handle: Some(handle),
799 path_options: Arc::new(options.path),
800 })
801 }
802
803 pub async fn get(&self, hash: Hash) -> OuterResult<Option<BaoFileHandle>> {
804 let (tx, rx) = oneshot::channel();
805 self.tx.send(ActorMessage::Get { hash, tx }).await?;
806 Ok(rx.await??)
807 }
808
809 async fn get_or_create(&self, hash: Hash) -> OuterResult<BaoFileHandle> {
810 let (tx, rx) = oneshot::channel();
811 self.tx.send(ActorMessage::GetOrCreate { hash, tx }).await?;
812 Ok(rx.await??)
813 }
814
815 async fn blobs(&self) -> OuterResult<Vec<io::Result<Hash>>> {
816 let (tx, rx) = oneshot::channel();
817 let filter: FilterPredicate<Hash, EntryState> = Box::new(|_i, k, v| {
818 let v = v.value();
819 if let EntryState::Complete { .. } = &v {
820 Some((k.value(), v))
821 } else {
822 None
823 }
824 });
825 self.tx.send(ActorMessage::Blobs { filter, tx }).await?;
826 let blobs = rx.await?;
827 let res = blobs?
828 .into_iter()
829 .map(|r| {
830 r.map(|(hash, _)| hash)
831 .map_err(|e| ActorError::from(e).into())
832 })
833 .collect::<Vec<_>>();
834 Ok(res)
835 }
836
837 async fn partial_blobs(&self) -> OuterResult<Vec<io::Result<Hash>>> {
838 let (tx, rx) = oneshot::channel();
839 let filter: FilterPredicate<Hash, EntryState> = Box::new(|_i, k, v| {
840 let v = v.value();
841 if let EntryState::Partial { .. } = &v {
842 Some((k.value(), v))
843 } else {
844 None
845 }
846 });
847 self.tx.send(ActorMessage::Blobs { filter, tx }).await?;
848 let blobs = rx.await?;
849 let res = blobs?
850 .into_iter()
851 .map(|r| {
852 r.map(|(hash, _)| hash)
853 .map_err(|e| ActorError::from(e).into())
854 })
855 .collect::<Vec<_>>();
856 Ok(res)
857 }
858
859 async fn tags(&self) -> OuterResult<Vec<io::Result<(Tag, HashAndFormat)>>> {
860 let (tx, rx) = oneshot::channel();
861 let filter: FilterPredicate<Tag, HashAndFormat> =
862 Box::new(|_i, k, v| Some((k.value(), v.value())));
863 self.tx.send(ActorMessage::Tags { filter, tx }).await?;
864 let tags = rx.await?;
865 let tags = tags?
867 .into_iter()
868 .map(|r| r.map_err(|e| ActorError::from(e).into()))
869 .collect();
870 Ok(tags)
871 }
872
873 async fn set_tag(&self, tag: Tag, value: Option<HashAndFormat>) -> OuterResult<()> {
874 let (tx, rx) = oneshot::channel();
875 self.tx
876 .send(ActorMessage::SetTag { tag, value, tx })
877 .await?;
878 Ok(rx.await??)
879 }
880
881 async fn create_tag(&self, hash: HashAndFormat) -> OuterResult<Tag> {
882 let (tx, rx) = oneshot::channel();
883 self.tx.send(ActorMessage::CreateTag { hash, tx }).await?;
884 Ok(rx.await??)
885 }
886
887 async fn delete(&self, hashes: Vec<Hash>) -> OuterResult<()> {
888 let (tx, rx) = oneshot::channel();
889 self.tx.send(ActorMessage::Delete { hashes, tx }).await?;
890 Ok(rx.await??)
891 }
892
893 async fn gc_delete(&self, hashes: Vec<Hash>) -> OuterResult<()> {
894 let (tx, rx) = oneshot::channel();
895 self.tx.send(ActorMessage::GcDelete { hashes, tx }).await?;
896 Ok(rx.await??)
897 }
898
899 async fn gc_start(&self) -> OuterResult<()> {
900 let (tx, rx) = oneshot::channel();
901 self.tx.send(ActorMessage::GcStart { tx }).await?;
902 Ok(rx.await?)
903 }
904
905 async fn entry_status(&self, hash: &Hash) -> OuterResult<EntryStatus> {
906 let (tx, rx) = oneshot::channel();
907 self.tx
908 .send(ActorMessage::EntryStatus { hash: *hash, tx })
909 .await?;
910 Ok(rx.await??)
911 }
912
913 fn entry_status_sync(&self, hash: &Hash) -> OuterResult<EntryStatus> {
914 let (tx, rx) = oneshot::channel();
915 self.tx
916 .send_blocking(ActorMessage::EntryStatus { hash: *hash, tx })?;
917 Ok(rx.recv()??)
918 }
919
920 async fn complete(&self, entry: Entry) -> OuterResult<()> {
921 self.tx
922 .send(ActorMessage::OnComplete { handle: entry })
923 .await?;
924 Ok(())
925 }
926
927 async fn export(
928 &self,
929 hash: Hash,
930 target: PathBuf,
931 mode: ExportMode,
932 progress: ExportProgressCb,
933 ) -> OuterResult<()> {
934 tracing::debug!(
935 "exporting {} to {} using mode {:?}",
936 hash.to_hex(),
937 target.display(),
938 mode
939 );
940 if !target.is_absolute() {
941 return Err(io::Error::new(
942 io::ErrorKind::InvalidInput,
943 "target path must be absolute",
944 )
945 .into());
946 }
947 let parent = target.parent().ok_or_else(|| {
948 OuterError::from(io::Error::new(
949 io::ErrorKind::InvalidInput,
950 "target path has no parent directory",
951 ))
952 })?;
953 std::fs::create_dir_all(parent)?;
954 let temp_tag = self.temp.temp_tag(HashAndFormat::raw(hash));
955 let (tx, rx) = oneshot::channel();
956 self.tx
957 .send(ActorMessage::Export {
958 cmd: Export {
959 temp_tag,
960 target,
961 mode,
962 progress,
963 },
964 tx,
965 })
966 .await?;
967 Ok(rx.await??)
968 }
969
970 async fn consistency_check(
971 &self,
972 repair: bool,
973 progress: BoxedProgressSender<ConsistencyCheckProgress>,
974 ) -> OuterResult<()> {
975 let (tx, rx) = oneshot::channel();
976 self.tx
977 .send(ActorMessage::Fsck {
978 repair,
979 progress,
980 tx,
981 })
982 .await?;
983 Ok(rx.await??)
984 }
985
986 async fn update_inline_options(
987 &self,
988 inline_options: InlineOptions,
989 reapply: bool,
990 ) -> OuterResult<()> {
991 let (tx, rx) = oneshot::channel();
992 self.tx
993 .send(ActorMessage::UpdateInlineOptions {
994 inline_options,
995 reapply,
996 tx,
997 })
998 .await?;
999 Ok(rx.await?)
1000 }
1001
1002 async fn dump(&self) -> OuterResult<()> {
1003 self.tx.send(ActorMessage::Dump).await?;
1004 Ok(())
1005 }
1006
1007 async fn sync(&self) -> OuterResult<()> {
1008 let (tx, rx) = oneshot::channel();
1009 self.tx.send(ActorMessage::Sync { tx }).await?;
1010 Ok(rx.await?)
1011 }
1012
1013 fn import_file_sync(
1014 &self,
1015 path: PathBuf,
1016 mode: ImportMode,
1017 format: BlobFormat,
1018 progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
1019 ) -> OuterResult<(TempTag, u64)> {
1020 if !path.is_absolute() {
1021 return Err(
1022 io::Error::new(io::ErrorKind::InvalidInput, "path must be absolute").into(),
1023 );
1024 }
1025 if !path.is_file() && !path.is_symlink() {
1026 return Err(io::Error::new(
1027 io::ErrorKind::InvalidInput,
1028 "path is not a file or symlink",
1029 )
1030 .into());
1031 }
1032 let id = progress.new_id();
1033 progress.blocking_send(ImportProgress::Found {
1034 id,
1035 name: path.to_string_lossy().to_string(),
1036 })?;
1037 let file = match mode {
1038 ImportMode::TryReference => ImportSource::External(path),
1039 ImportMode::Copy => {
1040 if std::fs::metadata(&path)?.len() < 16 * 1024 {
1041 let data = std::fs::read(&path)?;
1045 ImportSource::Memory(data.into())
1046 } else {
1047 let temp_path = self.temp_file_name();
1048 progress.try_send(ImportProgress::CopyProgress { id, offset: 0 })?;
1050 if reflink_copy::reflink_or_copy(&path, &temp_path)?.is_none() {
1051 tracing::debug!("reflinked {} to {}", path.display(), temp_path.display());
1052 } else {
1053 tracing::debug!("copied {} to {}", path.display(), temp_path.display());
1054 }
1055 ImportSource::TempFile(temp_path)
1057 }
1058 }
1059 };
1060 let (tag, size) = self.finalize_import_sync(file, format, id, progress)?;
1061 Ok((tag, size))
1062 }
1063
1064 fn import_bytes_sync(&self, data: Bytes, format: BlobFormat) -> OuterResult<TempTag> {
1065 let id = 0;
1066 let file = ImportSource::Memory(data);
1067 let progress = IgnoreProgressSender::default();
1068 let (tag, _size) = self.finalize_import_sync(file, format, id, progress)?;
1069 Ok(tag)
1070 }
1071
1072 fn finalize_import_sync(
1073 &self,
1074 file: ImportSource,
1075 format: BlobFormat,
1076 id: u64,
1077 progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
1078 ) -> OuterResult<(TempTag, u64)> {
1079 let data_size = file.len()?;
1080 tracing::debug!("finalize_import_sync {:?} {}", file, data_size);
1081 progress.blocking_send(ImportProgress::Size {
1082 id,
1083 size: data_size,
1084 })?;
1085 let progress2 = progress.clone();
1086 let (hash, outboard) = match file.content() {
1087 MemOrFile::File(path) => {
1088 let span = trace_span!("outboard.compute", path = %path.display());
1089 let _guard = span.enter();
1090 let file = std::fs::File::open(path)?;
1091 compute_outboard(file, data_size, move |offset| {
1092 Ok(progress2.try_send(ImportProgress::OutboardProgress { id, offset })?)
1093 })?
1094 }
1095 MemOrFile::Mem(bytes) => {
1096 compute_outboard(bytes, data_size, |_| Ok(()))?
1098 }
1099 };
1100 progress.blocking_send(ImportProgress::OutboardDone { id, hash })?;
1101 let tag = self.temp.temp_tag(HashAndFormat { hash, format });
1103 let hash = *tag.hash();
1104 let (tx, rx) = oneshot::channel();
1106 self.tx.send_blocking(ActorMessage::Import {
1107 cmd: Import {
1108 content_id: HashAndFormat { hash, format },
1109 source: file,
1110 outboard,
1111 data_size,
1112 },
1113 tx,
1114 })?;
1115 Ok(rx.recv()??)
1116 }
1117
1118 fn temp_file_name(&self) -> PathBuf {
1119 self.path_options.temp_file_name()
1120 }
1121
1122 async fn shutdown(&self) {
1123 let (tx, rx) = oneshot::channel();
1124 self.tx
1125 .send(ActorMessage::Shutdown { tx: Some(tx) })
1126 .await
1127 .ok();
1128 rx.await.ok();
1129 }
1130}
1131
1132impl Drop for StoreInner {
1133 fn drop(&mut self) {
1134 if let Some(handle) = self.handle.take() {
1135 self.tx
1136 .send_blocking(ActorMessage::Shutdown { tx: None })
1137 .ok();
1138 handle.join().ok();
1139 }
1140 }
1141}
1142
/// Mutable state owned by the actor, separate from the database handle.
struct ActorState {
    /// Weak handles to bao files, keyed by hash — presumably so live handles can be
    /// reused; TODO confirm in the actor.
    handles: BTreeMap<Hash, BaoFileHandleWeak>,
    /// Set of protected hashes — presumably shielded from GC deletion; TODO confirm.
    protected: BTreeSet<Hash>,
    /// Temp-tag counters shared with `StoreInner`.
    temp: Arc<RwLock<TempCounterMap>>,
    /// Receiving side of the message channel from `StoreInner`.
    msgs_rx: async_channel::Receiver<ActorMessage>,
    /// Configuration used when creating bao files.
    create_options: Arc<BaoFileConfig>,
    /// Store options.
    options: Options,
    /// Handle of the tokio runtime the actor runs on.
    rt: tokio::runtime::Handle,
}
1152
/// The database actor, run on the `redb-actor` thread (see `StoreInner::new_sync`).
struct Actor {
    /// The redb database.
    db: redb::Database,
    /// Actor state other than the database.
    state: ActorState,
}
1161
/// Errors produced inside the actor.
#[derive(Debug, thiserror::Error)]
pub(crate) enum ActorError {
    /// Error opening a redb table.
    #[error("table error: {0}")]
    Table(#[from] redb::TableError),
    /// Error opening or creating the redb database.
    #[error("database error: {0}")]
    Database(#[from] redb::DatabaseError),
    /// Error starting a redb transaction.
    #[error("transaction error: {0}")]
    Transaction(#[from] redb::TransactionError),
    /// Error committing a redb transaction.
    #[error("commit error: {0}")]
    Commit(#[from] redb::CommitError),
    /// redb storage-level error.
    #[error("storage error: {0}")]
    Storage(#[from] redb::StorageError),
    /// I/O error.
    #[error("io error: {0}")]
    Io(#[from] io::Error),
    /// The stored state contradicts itself (e.g. size mismatches in `union`).
    #[error("inconsistent database state: {0}")]
    Inconsistent(String),
    /// Error while migrating the database.
    #[error("error during database migration: {0}")]
    Migration(#[source] anyhow::Error),
}
1185
1186impl From<ActorError> for io::Error {
1187 fn from(e: ActorError) -> Self {
1188 match e {
1189 ActorError::Io(e) => e,
1190 e => io::Error::new(io::ErrorKind::Other, e),
1191 }
1192 }
1193}
1194
/// Result type used inside the actor.
pub(crate) type ActorResult<T> = std::result::Result<T, ActorError>;
1199
/// Errors seen by callers outside the actor (`StoreInner` methods).
#[derive(Debug, thiserror::Error)]
pub(crate) enum OuterError {
    /// The actor replied with an error.
    #[error("inner error: {0}")]
    Inner(#[from] ActorError),
    /// Sending a message to the actor failed.
    #[error("send error")]
    Send,
    /// Sending a progress event failed.
    #[error("progress send error: {0}")]
    ProgressSend(#[from] ProgressSendError),
    /// The actor dropped the reply channel.
    #[error("recv error: {0}")]
    Recv(#[from] oneshot::RecvError),
    /// Receiving from an async channel failed.
    #[error("recv error: {0}")]
    AsyncChannelRecv(#[from] async_channel::RecvError),
    /// A spawned task failed to complete.
    #[error("join error: {0}")]
    JoinTask(#[from] tokio::task::JoinError),
}
1219
1220impl From<async_channel::SendError<ActorMessage>> for OuterError {
1221 fn from(_e: async_channel::SendError<ActorMessage>) -> Self {
1222 OuterError::Send
1223 }
1224}
1225
/// Result type used by `StoreInner` methods.
pub(crate) type OuterResult<T> = std::result::Result<T, OuterError>;
1230
1231impl From<io::Error> for OuterError {
1232 fn from(e: io::Error) -> Self {
1233 OuterError::Inner(ActorError::Io(e))
1234 }
1235}
1236
1237impl From<OuterError> for io::Error {
1238 fn from(e: OuterError) -> Self {
1239 match e {
1240 OuterError::Inner(ActorError::Io(e)) => e,
1241 e => io::Error::new(io::ErrorKind::Other, e),
1242 }
1243 }
1244}
1245
1246impl super::Map for Store {
1247 type Entry = Entry;
1248
1249 async fn get(&self, hash: &Hash) -> io::Result<Option<Self::Entry>> {
1250 Ok(self.0.get(*hash).await?)
1251 }
1252}
1253
1254impl super::MapMut for Store {
1255 type EntryMut = Entry;
1256
1257 async fn get_or_create(&self, hash: Hash, _size: u64) -> io::Result<Self::EntryMut> {
1258 Ok(self.0.get_or_create(hash).await?)
1259 }
1260
1261 async fn entry_status(&self, hash: &Hash) -> io::Result<EntryStatus> {
1262 Ok(self.0.entry_status(hash).await?)
1263 }
1264
1265 async fn get_mut(&self, hash: &Hash) -> io::Result<Option<Self::EntryMut>> {
1266 self.get(hash).await
1267 }
1268
1269 async fn insert_complete(&self, entry: Self::EntryMut) -> io::Result<()> {
1270 Ok(self.0.complete(entry).await?)
1271 }
1272
1273 fn entry_status_sync(&self, hash: &Hash) -> io::Result<EntryStatus> {
1274 Ok(self.0.entry_status_sync(hash)?)
1275 }
1276}
1277
1278impl super::ReadableStore for Store {
1279 async fn blobs(&self) -> io::Result<super::DbIter<Hash>> {
1280 Ok(Box::new(self.0.blobs().await?.into_iter()))
1281 }
1282
1283 async fn partial_blobs(&self) -> io::Result<super::DbIter<Hash>> {
1284 Ok(Box::new(self.0.partial_blobs().await?.into_iter()))
1285 }
1286
1287 async fn tags(&self) -> io::Result<super::DbIter<(Tag, HashAndFormat)>> {
1288 Ok(Box::new(self.0.tags().await?.into_iter()))
1289 }
1290
1291 fn temp_tags(&self) -> Box<dyn Iterator<Item = HashAndFormat> + Send + Sync + 'static> {
1292 Box::new(self.0.temp.read().unwrap().keys())
1293 }
1294
1295 async fn consistency_check(
1296 &self,
1297 repair: bool,
1298 tx: BoxedProgressSender<ConsistencyCheckProgress>,
1299 ) -> io::Result<()> {
1300 self.0.consistency_check(repair, tx.clone()).await?;
1301 Ok(())
1302 }
1303
1304 async fn export(
1305 &self,
1306 hash: Hash,
1307 target: PathBuf,
1308 mode: ExportMode,
1309 progress: ExportProgressCb,
1310 ) -> io::Result<()> {
1311 Ok(self.0.export(hash, target, mode, progress).await?)
1312 }
1313}
1314
1315impl super::Store for Store {
1316 async fn import_file(
1317 &self,
1318 path: PathBuf,
1319 mode: ImportMode,
1320 format: BlobFormat,
1321 progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
1322 ) -> io::Result<(crate::TempTag, u64)> {
1323 let this = self.0.clone();
1324 Ok(
1325 tokio::task::spawn_blocking(move || {
1326 this.import_file_sync(path, mode, format, progress)
1327 })
1328 .await??,
1329 )
1330 }
1331
1332 async fn import_bytes(
1333 &self,
1334 data: bytes::Bytes,
1335 format: crate::BlobFormat,
1336 ) -> io::Result<crate::TempTag> {
1337 let this = self.0.clone();
1338 Ok(tokio::task::spawn_blocking(move || this.import_bytes_sync(data, format)).await??)
1339 }
1340
1341 async fn import_stream(
1342 &self,
1343 mut data: impl Stream<Item = io::Result<Bytes>> + Unpin + Send + 'static,
1344 format: BlobFormat,
1345 progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
1346 ) -> io::Result<(TempTag, u64)> {
1347 let this = self.clone();
1348 let id = progress.new_id();
1349 let temp_data_path = this.0.temp_file_name();
1351 let name = temp_data_path
1352 .file_name()
1353 .expect("just created")
1354 .to_string_lossy()
1355 .to_string();
1356 progress.send(ImportProgress::Found { id, name }).await?;
1357 let mut writer = tokio::fs::File::create(&temp_data_path).await?;
1358 let mut offset = 0;
1359 while let Some(chunk) = data.next().await {
1360 let chunk = chunk?;
1361 writer.write_all(&chunk).await?;
1362 offset += chunk.len() as u64;
1363 progress.try_send(ImportProgress::CopyProgress { id, offset })?;
1364 }
1365 writer.flush().await?;
1366 drop(writer);
1367 let file = ImportSource::TempFile(temp_data_path);
1368 Ok(tokio::task::spawn_blocking(move || {
1369 this.0.finalize_import_sync(file, format, id, progress)
1370 })
1371 .await??)
1372 }
1373
1374 async fn set_tag(&self, name: Tag, hash: Option<HashAndFormat>) -> io::Result<()> {
1375 Ok(self.0.set_tag(name, hash).await?)
1376 }
1377
1378 async fn create_tag(&self, hash: HashAndFormat) -> io::Result<Tag> {
1379 Ok(self.0.create_tag(hash).await?)
1380 }
1381
1382 async fn delete(&self, hashes: Vec<Hash>) -> io::Result<()> {
1383 Ok(self.0.delete(hashes).await?)
1384 }
1385
1386 async fn gc_run<G, Gut>(&self, config: super::GcConfig, protected_cb: G)
1387 where
1388 G: Fn() -> Gut,
1389 Gut: Future<Output = BTreeSet<Hash>> + Send,
1390 {
1391 tracing::info!("Starting GC task with interval {:?}", config.period);
1392 let mut live = BTreeSet::new();
1393 'outer: loop {
1394 if let Err(cause) = self.0.gc_start().await {
1395 tracing::debug!(
1396 "unable to notify the db of GC start: {cause}. Shutting down GC loop."
1397 );
1398 break;
1399 }
1400 tokio::time::sleep(config.period).await;
1402 tracing::debug!("Starting GC");
1403 live.clear();
1404
1405 let p = protected_cb().await;
1406 live.extend(p);
1407
1408 tracing::debug!("Starting GC mark phase");
1409 let live_ref = &mut live;
1410 let mut stream = Gen::new(|co| async move {
1411 if let Err(e) = super::gc_mark_task(self, live_ref, &co).await {
1412 co.yield_(GcMarkEvent::Error(e)).await;
1413 }
1414 });
1415 while let Some(item) = stream.next().await {
1416 match item {
1417 GcMarkEvent::CustomDebug(text) => {
1418 tracing::debug!("{}", text);
1419 }
1420 GcMarkEvent::CustomWarning(text, _) => {
1421 tracing::warn!("{}", text);
1422 }
1423 GcMarkEvent::Error(err) => {
1424 tracing::error!("Fatal error during GC mark {}", err);
1425 continue 'outer;
1426 }
1427 }
1428 }
1429 drop(stream);
1430
1431 tracing::debug!("Starting GC sweep phase");
1432 let live_ref = &live;
1433 let mut stream = Gen::new(|co| async move {
1434 if let Err(e) = gc_sweep_task(self, live_ref, &co).await {
1435 co.yield_(GcSweepEvent::Error(e)).await;
1436 }
1437 });
1438 while let Some(item) = stream.next().await {
1439 match item {
1440 GcSweepEvent::CustomDebug(text) => {
1441 tracing::debug!("{}", text);
1442 }
1443 GcSweepEvent::CustomWarning(text, _) => {
1444 tracing::warn!("{}", text);
1445 }
1446 GcSweepEvent::Error(err) => {
1447 tracing::error!("Fatal error during GC mark {}", err);
1448 continue 'outer;
1449 }
1450 }
1451 }
1452 if let Some(ref cb) = config.done_callback {
1453 cb();
1454 }
1455 }
1456 }
1457
1458 fn temp_tag(&self, value: HashAndFormat) -> TempTag {
1459 self.0.temp.temp_tag(value)
1460 }
1461
1462 async fn sync(&self) -> io::Result<()> {
1463 Ok(self.0.sync().await?)
1464 }
1465
1466 async fn shutdown(&self) {
1467 self.0.shutdown().await;
1468 }
1469}
1470
1471pub(super) async fn gc_sweep_task(
1472 store: &Store,
1473 live: &BTreeSet<Hash>,
1474 co: &Co<GcSweepEvent>,
1475) -> anyhow::Result<()> {
1476 let blobs = store.blobs().await?.chain(store.partial_blobs().await?);
1477 let mut count = 0;
1478 let mut batch = Vec::new();
1479 for hash in blobs {
1480 let hash = hash?;
1481 if !live.contains(&hash) {
1482 batch.push(hash);
1483 count += 1;
1484 }
1485 if batch.len() >= 100 {
1486 store.0.gc_delete(batch.clone()).await?;
1487 batch.clear();
1488 }
1489 }
1490 if !batch.is_empty() {
1491 store.0.gc_delete(batch).await?;
1492 }
1493 co.yield_(GcSweepEvent::CustomDebug(format!(
1494 "deleted {} blobs",
1495 count
1496 )))
1497 .await;
1498 Ok(())
1499}
1500
impl Actor {
    /// Opens (or creates) the redb database at `path` and builds the actor.
    ///
    /// Returns the actor together with the sender half of its message channel.
    ///
    /// # Errors
    /// Fails with `ActorError::Migration` when the database reports that an
    /// upgrade from format version 1 is required. Any other database or
    /// transaction error is also returned.
    fn new(
        path: &Path,
        options: Options,
        temp: Arc<RwLock<TempCounterMap>>,
        rt: tokio::runtime::Handle,
    ) -> ActorResult<(Self, async_channel::Sender<ActorMessage>)> {
        let db = match redb::Database::create(path) {
            Ok(db) => db,
            Err(DatabaseError::UpgradeRequired(1)) => {
                return Err(ActorError::Migration(anyhow::anyhow!(
                    "migration from v1 no longer supported"
                )))
            }
            Err(err) => return Err(err.into()),
        };

        // Open all tables once in a committed write transaction. This presumably
        // creates any missing tables; the delete set `t` is not used afterwards.
        let txn = db.begin_write()?;
        let mut t = Default::default();
        let tables = Tables::new(&txn, &mut t)?;
        drop(tables);
        txn.commit()?;
        let (tx, rx) = async_channel::bounded(1024);
        let tx2 = tx.clone();
        // Callback handed to bao file handles. It notifies the actor via
        // `OnMemSizeExceeded`, blocking while the channel is full. A send error
        // (channel closed) is ignored.
        let on_file_create: CreateCb = Arc::new(move |hash| {
            tx2.send_blocking(ActorMessage::OnMemSizeExceeded { hash: *hash })
                .ok();
            Ok(())
        });
        // NOTE(review): `16 * 1024` is presumably the in-memory size limit before a
        // handle switches to files; confirm against `BaoFileConfig::new`.
        let create_options = BaoFileConfig::new(
            Arc::new(options.path.data_path.clone()),
            16 * 1024,
            Some(on_file_create),
        );
        Ok((
            Self {
                db,
                state: ActorState {
                    temp,
                    handles: BTreeMap::new(),
                    protected: BTreeSet::new(),
                    msgs_rx: rx,
                    options,
                    create_options: Arc::new(create_options),
                    rt,
                },
            },
            tx,
        ))
    }

    /// Main loop of the actor.
    ///
    /// Messages are processed by category:
    /// - top-level messages are handled one at a time;
    /// - read-only messages are grouped into one read transaction;
    /// - read-write messages are grouped into one write transaction.
    ///
    /// A batch ends when any of these happens:
    /// - `max_read_batch` / `max_write_batch` messages have been handled;
    /// - the corresponding max duration elapses;
    /// - the channel closes;
    /// - a message the batch handler does not accept arrives. That message is
    ///   pushed back and handled in the next iteration.
    ///
    /// A `Shutdown` message drops the actor first and then acknowledges on its
    /// optional reply channel. Errors from the handlers are propagated and end
    /// the loop.
    async fn run_batched(mut self) -> ActorResult<()> {
        let mut msgs = PeekableFlumeReceiver::new(self.state.msgs_rx.clone());
        while let Some(msg) = msgs.recv().await {
            if let ActorMessage::Shutdown { tx } = msg {
                // Drop the actor (and with it the database) before acknowledging.
                drop(self);
                if let Some(tx) = tx {
                    tx.send(()).ok();
                }
                break;
            }
            match msg.category() {
                MessageCategory::TopLevel => {
                    self.state.handle_toplevel(&self.db, msg)?;
                }
                MessageCategory::ReadOnly => {
                    // Put the message back; it is presumably returned by the next
                    // `recv` below and becomes the first item of the batch.
                    // NOTE(review): if `count` is 0 the loop below never consumes it,
                    // so a zero batch size would spin on the same message.
                    msgs.push_back(msg).expect("just recv'd");
                    tracing::debug!("starting read transaction");
                    let txn = self.db.begin_read()?;
                    let tables = ReadOnlyTables::new(&txn)?;
                    let count = self.state.options.batch.max_read_batch;
                    let timeout = tokio::time::sleep(self.state.options.batch.max_read_duration);
                    tokio::pin!(timeout);
                    for _ in 0..count {
                        tokio::select! {
                            msg = msgs.recv() => {
                                if let Some(msg) = msg {
                                    // A message that needs more than read access ends
                                    // the batch and is requeued.
                                    if let Err(msg) = self.state.handle_readonly(&tables, msg)? {
                                        msgs.push_back(msg).expect("just recv'd");
                                        break;
                                    }
                                } else {
                                    break;
                                }
                            }
                            _ = &mut timeout => {
                                tracing::debug!("read transaction timed out");
                                break;
                            }
                        }
                    }
                    tracing::debug!("done with read transaction");
                }
                MessageCategory::ReadWrite => {
                    // Same batching scheme as above, inside one write transaction.
                    msgs.push_back(msg).expect("just recv'd");
                    tracing::debug!("starting write transaction");
                    let txn = self.db.begin_write()?;
                    let mut delete_after_commit = Default::default();
                    let mut tables = Tables::new(&txn, &mut delete_after_commit)?;
                    let count = self.state.options.batch.max_write_batch;
                    let timeout = tokio::time::sleep(self.state.options.batch.max_write_duration);
                    tokio::pin!(timeout);
                    for _ in 0..count {
                        tokio::select! {
                            msg = msgs.recv() => {
                                if let Some(msg) = msg {
                                    if let Err(msg) = self.state.handle_readwrite(&mut tables, msg)? {
                                        msgs.push_back(msg).expect("just recv'd");
                                        break;
                                    }
                                } else {
                                    break;
                                }
                            }
                            _ = &mut timeout => {
                                tracing::debug!("write transaction timed out");
                                break;
                            }
                        }
                    }
                    drop(tables);
                    txn.commit()?;
                    // Files scheduled during the batch are removed only after the
                    // commit succeeded.
                    delete_after_commit.apply_and_clear(&self.state.options.path);
                    tracing::debug!("write transaction committed");
                }
            }
        }
        tracing::debug!("redb actor done");
        Ok(())
    }
}
1637
1638impl ActorState {
1639 fn entry_status(
1640 &mut self,
1641 tables: &impl ReadableTables,
1642 hash: Hash,
1643 ) -> ActorResult<EntryStatus> {
1644 let status = match tables.blobs().get(hash)? {
1645 Some(guard) => match guard.value() {
1646 EntryState::Complete { .. } => EntryStatus::Complete,
1647 EntryState::Partial { .. } => EntryStatus::Partial,
1648 },
1649 None => EntryStatus::NotFound,
1650 };
1651 Ok(status)
1652 }
1653
1654 fn get(
1655 &mut self,
1656 tables: &impl ReadableTables,
1657 hash: Hash,
1658 ) -> ActorResult<Option<BaoFileHandle>> {
1659 if let Some(handle) = self.handles.get(&hash).and_then(|weak| weak.upgrade()) {
1660 return Ok(Some(handle));
1661 }
1662 let Some(entry) = tables.blobs().get(hash)? else {
1663 return Ok(None);
1664 };
1665 let entry = entry.value();
1668 let config = self.create_options.clone();
1669 let handle = match entry {
1670 EntryState::Complete {
1671 data_location,
1672 outboard_location,
1673 } => {
1674 let data = load_data(tables, &self.options.path, data_location, &hash)?;
1675 let outboard = load_outboard(
1676 tables,
1677 &self.options.path,
1678 outboard_location,
1679 data.size(),
1680 &hash,
1681 )?;
1682 BaoFileHandle::new_complete(config, hash, data, outboard)
1683 }
1684 EntryState::Partial { .. } => BaoFileHandle::incomplete_file(config, hash)?,
1685 };
1686 self.handles.insert(hash, handle.downgrade());
1687 Ok(Some(handle))
1688 }
1689
1690 fn export(
1691 &mut self,
1692 tables: &mut Tables,
1693 cmd: Export,
1694 tx: oneshot::Sender<ActorResult<()>>,
1695 ) -> ActorResult<()> {
1696 let Export {
1697 temp_tag,
1698 target,
1699 mode,
1700 progress,
1701 } = cmd;
1702 let guard = tables
1703 .blobs
1704 .get(temp_tag.hash())?
1705 .ok_or_else(|| ActorError::Inconsistent("entry not found".to_owned()))?;
1706 let entry = guard.value();
1707 match entry {
1708 EntryState::Complete {
1709 data_location,
1710 outboard_location,
1711 } => match data_location {
1712 DataLocation::Inline(()) => {
1713 let data = tables.inline_data.get(temp_tag.hash())?.ok_or_else(|| {
1715 ActorError::Inconsistent("inline data not found".to_owned())
1716 })?;
1717 tracing::trace!("exporting inline data to {}", target.display());
1718 tx.send(std::fs::write(&target, data.value()).map_err(|e| e.into()))
1719 .ok();
1720 }
1721 DataLocation::Owned(size) => {
1722 let path = self.options.path.owned_data_path(temp_tag.hash());
1723 match mode {
1724 ExportMode::Copy => {
1725 self.rt.spawn_blocking(move || {
1727 tx.send(export_file_copy(temp_tag, path, size, target, progress))
1728 .ok();
1729 });
1730 }
1731 ExportMode::TryReference => match std::fs::rename(&path, &target) {
1732 Ok(()) => {
1733 let entry = EntryState::Complete {
1734 data_location: DataLocation::External(vec![target], size),
1735 outboard_location,
1736 };
1737 drop(guard);
1738 tables.blobs.insert(temp_tag.hash(), entry)?;
1739 drop(temp_tag);
1740 tx.send(Ok(())).ok();
1741 }
1742 Err(e) => {
1743 const ERR_CROSS: i32 = 18;
1744 if e.raw_os_error() == Some(ERR_CROSS) {
1745 match std::fs::copy(&path, &target) {
1747 Ok(_) => {
1748 let entry = EntryState::Complete {
1749 data_location: DataLocation::External(
1750 vec![target],
1751 size,
1752 ),
1753 outboard_location,
1754 };
1755
1756 drop(guard);
1757 tables.blobs.insert(temp_tag.hash(), entry)?;
1758 tables
1759 .delete_after_commit
1760 .insert(*temp_tag.hash(), [BaoFilePart::Data]);
1761 drop(temp_tag);
1762
1763 tx.send(Ok(())).ok();
1764 }
1765 Err(e) => {
1766 drop(temp_tag);
1767 tx.send(Err(e.into())).ok();
1768 }
1769 }
1770 } else {
1771 drop(temp_tag);
1772 tx.send(Err(e.into())).ok();
1773 }
1774 }
1775 },
1776 }
1777 }
1778 DataLocation::External(paths, size) => {
1779 let path = paths
1780 .first()
1781 .ok_or_else(|| {
1782 ActorError::Inconsistent("external path missing".to_owned())
1783 })?
1784 .to_owned();
1785 if path == target {
1787 tx.send(Ok(())).ok();
1789 } else {
1790 self.rt.spawn_blocking(move || {
1792 tx.send(export_file_copy(temp_tag, path, size, target, progress))
1793 .ok();
1794 });
1795 }
1796 }
1797 },
1798 EntryState::Partial { .. } => {
1799 return Err(io::Error::new(io::ErrorKind::Unsupported, "partial entry").into());
1800 }
1801 }
1802 Ok(())
1803 }
1804
    /// Stores imported data and outboard and records them as a complete entry.
    ///
    /// The data is inlined into the database when `data_size` does not exceed
    /// `max_data_inlined`. The outboard is inlined when it is non-empty and does
    /// not exceed `max_outboard_inlined`. Otherwise:
    /// - external data stays referenced at its path;
    /// - temp files are renamed to the owned data path;
    /// - in-memory data is written and synced to the owned data path;
    /// - a non-inlined outboard is written and synced to the owned outboard path.
    ///
    /// The new location is merged into any existing entry via `union`.
    ///
    /// Returns a temp tag for `content_id` together with `data_size`. The hash is
    /// also added to the protected set, so non-forced (GC) deletes skip it.
    fn import(&mut self, tables: &mut Tables, cmd: Import) -> ActorResult<(TempTag, u64)> {
        let Import {
            content_id,
            source: file,
            outboard,
            data_size,
        } = cmd;
        let outboard_size = outboard.as_ref().map(|x| x.len() as u64).unwrap_or(0);
        let inline_data = data_size <= self.options.inline.max_data_inlined;
        // An empty outboard is never inlined; it maps to `OutboardLocation::NotNeeded`
        // only when `outboard` is `None`.
        let inline_outboard =
            outboard_size <= self.options.inline.max_outboard_inlined && outboard_size != 0;
        let tag = self.temp.temp_tag(content_id);
        let hash = *tag.hash();
        self.protected.insert(hash);
        let data_location = match file {
            ImportSource::External(external_path) => {
                tracing::debug!("stored external reference {}", external_path.display());
                if inline_data {
                    // Small external files are copied into the database instead of
                    // being referenced.
                    tracing::debug!(
                        "reading external data to inline it: {}",
                        external_path.display()
                    );
                    let data = Bytes::from(std::fs::read(&external_path)?);
                    DataLocation::Inline(data)
                } else {
                    DataLocation::External(vec![external_path], data_size)
                }
            }
            ImportSource::TempFile(temp_data_path) => {
                if inline_data {
                    tracing::debug!(
                        "reading and deleting temp file to inline it: {}",
                        temp_data_path.display()
                    );
                    let data = Bytes::from(read_and_remove(&temp_data_path)?);
                    DataLocation::Inline(data)
                } else {
                    let data_path = self.options.path.owned_data_path(&hash);
                    std::fs::rename(&temp_data_path, &data_path)?;
                    tracing::debug!("created file {}", data_path.display());
                    DataLocation::Owned(data_size)
                }
            }
            ImportSource::Memory(data) => {
                if inline_data {
                    DataLocation::Inline(data)
                } else {
                    let data_path = self.options.path.owned_data_path(&hash);
                    overwrite_and_sync(&data_path, &data)?;
                    tracing::debug!("created file {}", data_path.display());
                    DataLocation::Owned(data_size)
                }
            }
        };
        let outboard_location = if let Some(outboard) = outboard {
            if inline_outboard {
                OutboardLocation::Inline(Bytes::from(outboard))
            } else {
                let outboard_path = self.options.path.owned_outboard_path(&hash);
                overwrite_and_sync(&outboard_path, &outboard)?;
                OutboardLocation::Owned
            }
        } else {
            OutboardLocation::NotNeeded
        };
        if let DataLocation::Inline(data) = &data_location {
            tables.inline_data.insert(hash, data.as_ref())?;
        }
        if let OutboardLocation::Inline(outboard) = &outboard_location {
            tables.inline_outboard.insert(hash, outboard.as_ref())?;
        }
        // Files just written must survive this transaction even if an earlier
        // message in the batch scheduled them for deletion.
        if let DataLocation::Owned(_) = &data_location {
            tables.delete_after_commit.remove(hash, [BaoFilePart::Data]);
        }
        if let OutboardLocation::Owned = &outboard_location {
            tables
                .delete_after_commit
                .remove(hash, [BaoFilePart::Outboard]);
        }
        let entry = tables.blobs.get(hash)?;
        let entry = entry.map(|x| x.value()).unwrap_or_default();
        // The inline bytes were stored in their own tables above; the blobs table
        // only records the location.
        let data_location = data_location.discard_inline_data();
        let outboard_location = outboard_location.discard_extra_data();
        let entry = entry.union(EntryState::Complete {
            data_location,
            outboard_location,
        })?;
        tables.blobs.insert(hash, entry)?;
        Ok((tag, data_size))
    }
1898
1899 fn get_or_create(
1900 &mut self,
1901 tables: &impl ReadableTables,
1902 hash: Hash,
1903 ) -> ActorResult<BaoFileHandle> {
1904 self.protected.insert(hash);
1905 if let Some(handle) = self.handles.get(&hash).and_then(|x| x.upgrade()) {
1906 return Ok(handle);
1907 }
1908 let entry = tables.blobs().get(hash)?;
1909 let handle = if let Some(entry) = entry {
1910 let entry = entry.value();
1911 match entry {
1912 EntryState::Complete {
1913 data_location,
1914 outboard_location,
1915 ..
1916 } => {
1917 let data = load_data(tables, &self.options.path, data_location, &hash)?;
1918 let outboard = load_outboard(
1919 tables,
1920 &self.options.path,
1921 outboard_location,
1922 data.size(),
1923 &hash,
1924 )?;
1925 tracing::debug!("creating complete entry for {}", hash.to_hex());
1926 BaoFileHandle::new_complete(self.create_options.clone(), hash, data, outboard)
1927 }
1928 EntryState::Partial { .. } => {
1929 tracing::debug!("creating partial entry for {}", hash.to_hex());
1930 BaoFileHandle::incomplete_file(self.create_options.clone(), hash)?
1931 }
1932 }
1933 } else {
1934 BaoFileHandle::incomplete_mem(self.create_options.clone(), hash)
1935 };
1936 self.handles.insert(hash, handle.downgrade());
1937 Ok(handle)
1938 }
1939
1940 fn blobs(
1942 &mut self,
1943 tables: &impl ReadableTables,
1944 filter: FilterPredicate<Hash, EntryState>,
1945 ) -> ActorResult<Vec<std::result::Result<(Hash, EntryState), StorageError>>> {
1946 let mut res = Vec::new();
1947 let mut index = 0u64;
1948 #[allow(clippy::explicit_counter_loop)]
1949 for item in tables.blobs().iter()? {
1950 match item {
1951 Ok((k, v)) => {
1952 if let Some(item) = filter(index, k, v) {
1953 res.push(Ok(item));
1954 }
1955 }
1956 Err(e) => {
1957 res.push(Err(e));
1958 }
1959 }
1960 index += 1;
1961 }
1962 Ok(res)
1963 }
1964
1965 fn tags(
1967 &mut self,
1968 tables: &impl ReadableTables,
1969 filter: FilterPredicate<Tag, HashAndFormat>,
1970 ) -> ActorResult<Vec<std::result::Result<(Tag, HashAndFormat), StorageError>>> {
1971 let mut res = Vec::new();
1972 let mut index = 0u64;
1973 #[allow(clippy::explicit_counter_loop)]
1974 for item in tables.tags().iter()? {
1975 match item {
1976 Ok((k, v)) => {
1977 if let Some(item) = filter(index, k, v) {
1978 res.push(Ok(item));
1979 }
1980 }
1981 Err(e) => {
1982 res.push(Err(e));
1983 }
1984 }
1985 index += 1;
1986 }
1987 Ok(res)
1988 }
1989
1990 fn create_tag(&mut self, tables: &mut Tables, content: HashAndFormat) -> ActorResult<Tag> {
1991 let tag = {
1992 let tag = Tag::auto(SystemTime::now(), |x| {
1993 matches!(tables.tags.get(Tag(Bytes::copy_from_slice(x))), Ok(Some(_)))
1994 });
1995 tables.tags.insert(tag.clone(), content)?;
1996 tag
1997 };
1998 Ok(tag)
1999 }
2000
2001 fn set_tag(
2002 &self,
2003 tables: &mut Tables,
2004 tag: Tag,
2005 value: Option<HashAndFormat>,
2006 ) -> ActorResult<()> {
2007 match value {
2008 Some(value) => {
2009 tables.tags.insert(tag, value)?;
2010 }
2011 None => {
2012 tables.tags.remove(tag)?;
2013 }
2014 }
2015 Ok(())
2016 }
2017
2018 fn on_mem_size_exceeded(&mut self, tables: &mut Tables, hash: Hash) -> ActorResult<()> {
2019 let entry = tables
2020 .blobs
2021 .get(hash)?
2022 .map(|x| x.value())
2023 .unwrap_or_default();
2024 let entry = entry.union(EntryState::Partial { size: None })?;
2025 tables.blobs.insert(hash, entry)?;
2026 tables.delete_after_commit.remove(
2028 hash,
2029 [BaoFilePart::Data, BaoFilePart::Outboard, BaoFilePart::Sizes],
2030 );
2031 Ok(())
2032 }
2033
2034 fn update_inline_options(
2035 &mut self,
2036 db: &redb::Database,
2037 options: InlineOptions,
2038 reapply: bool,
2039 ) -> ActorResult<()> {
2040 self.options.inline = options;
2041 if reapply {
2042 let mut delete_after_commit = Default::default();
2043 let tx = db.begin_write()?;
2044 {
2045 let mut tables = Tables::new(&tx, &mut delete_after_commit)?;
2046 let hashes = tables
2047 .blobs
2048 .iter()?
2049 .map(|x| x.map(|(k, _)| k.value()))
2050 .collect::<Result<Vec<_>, _>>()?;
2051 for hash in hashes {
2052 let guard = tables
2053 .blobs
2054 .get(hash)?
2055 .ok_or_else(|| ActorError::Inconsistent("hash not found".to_owned()))?;
2056 let entry = guard.value();
2057 if let EntryState::Complete {
2058 data_location,
2059 outboard_location,
2060 } = entry
2061 {
2062 let (data_location, data_size, data_location_changed) = match data_location
2063 {
2064 DataLocation::Owned(size) => {
2065 if size <= self.options.inline.max_data_inlined {
2067 let path = self.options.path.owned_data_path(&hash);
2068 let data = std::fs::read(&path)?;
2069 tables.delete_after_commit.insert(hash, [BaoFilePart::Data]);
2070 tables.inline_data.insert(hash, data.as_slice())?;
2071 (DataLocation::Inline(()), size, true)
2072 } else {
2073 (DataLocation::Owned(size), size, false)
2074 }
2075 }
2076 DataLocation::Inline(()) => {
2077 let guard = tables.inline_data.get(hash)?.ok_or_else(|| {
2078 ActorError::Inconsistent("inline data missing".to_owned())
2079 })?;
2080 let data = guard.value();
2081 let size = data.len() as u64;
2082 if size > self.options.inline.max_data_inlined {
2083 let path = self.options.path.owned_data_path(&hash);
2084 std::fs::write(&path, data)?;
2085 drop(guard);
2086 tables.inline_data.remove(hash)?;
2087 (DataLocation::Owned(size), size, true)
2088 } else {
2089 (DataLocation::Inline(()), size, false)
2090 }
2091 }
2092 DataLocation::External(paths, size) => {
2093 (DataLocation::External(paths, size), size, false)
2094 }
2095 };
2096 let outboard_size = raw_outboard_size(data_size);
2097 let (outboard_location, outboard_location_changed) = match outboard_location
2098 {
2099 OutboardLocation::Owned
2100 if outboard_size <= self.options.inline.max_outboard_inlined =>
2101 {
2102 let path = self.options.path.owned_outboard_path(&hash);
2103 let outboard = std::fs::read(&path)?;
2104 tables
2105 .delete_after_commit
2106 .insert(hash, [BaoFilePart::Outboard]);
2107 tables.inline_outboard.insert(hash, outboard.as_slice())?;
2108 (OutboardLocation::Inline(()), true)
2109 }
2110 OutboardLocation::Inline(())
2111 if outboard_size > self.options.inline.max_outboard_inlined =>
2112 {
2113 let guard = tables.inline_outboard.get(hash)?.ok_or_else(|| {
2114 ActorError::Inconsistent("inline outboard missing".to_owned())
2115 })?;
2116 let outboard = guard.value();
2117 let path = self.options.path.owned_outboard_path(&hash);
2118 std::fs::write(&path, outboard)?;
2119 drop(guard);
2120 tables.inline_outboard.remove(hash)?;
2121 (OutboardLocation::Owned, true)
2122 }
2123 x => (x, false),
2124 };
2125 drop(guard);
2126 if data_location_changed || outboard_location_changed {
2127 tables.blobs.insert(
2128 hash,
2129 EntryState::Complete {
2130 data_location,
2131 outboard_location,
2132 },
2133 )?;
2134 }
2135 }
2136 }
2137 }
2138 tx.commit()?;
2139 delete_after_commit.apply_and_clear(&self.options.path);
2140 }
2141 Ok(())
2142 }
2143
2144 fn delete(&mut self, tables: &mut Tables, hashes: Vec<Hash>, force: bool) -> ActorResult<()> {
2145 for hash in hashes {
2146 if self.temp.as_ref().read().unwrap().contains(&hash) {
2147 continue;
2148 }
2149 if !force && self.protected.contains(&hash) {
2150 tracing::debug!("protected hash, continuing {}", &hash.to_hex()[..8]);
2151 continue;
2152 }
2153
2154 tracing::debug!("deleting {}", &hash.to_hex()[..8]);
2155
2156 self.handles.remove(&hash);
2157 if let Some(entry) = tables.blobs.remove(hash)? {
2158 match entry.value() {
2159 EntryState::Complete {
2160 data_location,
2161 outboard_location,
2162 } => {
2163 match data_location {
2164 DataLocation::Inline(_) => {
2165 tables.inline_data.remove(hash)?;
2166 }
2167 DataLocation::Owned(_) => {
2168 tables.delete_after_commit.insert(hash, [BaoFilePart::Data]);
2170 }
2171 DataLocation::External(_, _) => {}
2172 }
2173 match outboard_location {
2174 OutboardLocation::Inline(_) => {
2175 tables.inline_outboard.remove(hash)?;
2176 }
2177 OutboardLocation::Owned => {
2178 tables
2180 .delete_after_commit
2181 .insert(hash, [BaoFilePart::Outboard]);
2182 }
2183 OutboardLocation::NotNeeded => {}
2184 }
2185 }
2186 EntryState::Partial { .. } => {
2187 tables.delete_after_commit.insert(
2189 hash,
2190 [BaoFilePart::Outboard, BaoFilePart::Data, BaoFilePart::Sizes],
2191 );
2192 }
2193 }
2194 }
2195 }
2196 Ok(())
2197 }
2198
    /// Finalizes a handle whose download has completed.
    ///
    /// The handle's storage is converted in place to `BaoFileStorage::Complete`
    /// via `complete_storage`, which decides what is inlined and which files are
    /// kept or deleted after commit. When the storage was actually converted
    /// here (not already complete), the complete entry is merged into the
    /// `blobs` table and any in-memory data/outboard is stored in the inline
    /// tables.
    fn on_complete(&mut self, tables: &mut Tables, entry: BaoFileHandle) -> ActorResult<()> {
        let hash = entry.hash();
        // Set inside the transform closure only when the storage was converted now.
        let mut info = None;
        tracing::trace!("on_complete({})", hash.to_hex());
        entry.transform(|state| {
            tracing::trace!("on_complete transform {:?}", state);
            let entry = match complete_storage(
                state,
                &hash,
                &self.options.path,
                &self.options.inline,
                tables.delete_after_commit,
            )? {
                Ok(entry) => {
                    // Remember sizes and inline bytes; the database is updated after
                    // the transform, outside this closure.
                    info = Some((
                        entry.data_size(),
                        entry.data.mem().cloned(),
                        entry.outboard_size(),
                        entry.outboard.mem().cloned(),
                    ));
                    entry
                }
                Err(entry) => {
                    // Already complete; nothing to record.
                    entry
                }
            };
            Ok(BaoFileStorage::Complete(entry))
        })?;
        if let Some((data_size, data, outboard_size, outboard)) = info {
            // In-memory parts end up inline; everything else lives in owned files.
            let data_location = if data.is_some() {
                DataLocation::Inline(())
            } else {
                DataLocation::Owned(data_size)
            };
            let outboard_location = if outboard_size == 0 {
                OutboardLocation::NotNeeded
            } else if outboard.is_some() {
                OutboardLocation::Inline(())
            } else {
                OutboardLocation::Owned
            };
            {
                tracing::debug!(
                    "inserting complete entry for {}, {} bytes",
                    hash.to_hex(),
                    data_size,
                );
                let entry = tables
                    .blobs()
                    .get(hash)?
                    .map(|x| x.value())
                    .unwrap_or_default();
                let entry = entry.union(EntryState::Complete {
                    data_location,
                    outboard_location,
                })?;
                tables.blobs.insert(hash, entry)?;
                if let Some(data) = data {
                    tables.inline_data.insert(hash, data.as_ref())?;
                }
                if let Some(outboard) = outboard {
                    tables.inline_outboard.insert(hash, outboard.as_ref())?;
                }
            }
        }
        Ok(())
    }
2268
    /// Handles messages that receive the database directly instead of a shared
    /// batch transaction.
    ///
    /// `run_batched` only routes `MessageCategory::TopLevel` messages here. Any
    /// other message kind is reported as `ActorError::Inconsistent`.
    fn handle_toplevel(&mut self, db: &redb::Database, msg: ActorMessage) -> ActorResult<()> {
        match msg {
            ActorMessage::UpdateInlineOptions {
                inline_options,
                reapply,
                tx,
            } => {
                let res = self.update_inline_options(db, inline_options, reapply);
                // NOTE(review): `res?` propagates a failed update out of the actor loop
                // (stopping the actor) and drops `tx` without a reply.
                tx.send(res?).ok();
            }
            ActorMessage::Fsck {
                repair,
                progress,
                tx,
            } => {
                let res = self.consistency_check(db, repair, progress);
                tx.send(res).ok();
            }
            ActorMessage::Sync { tx } => {
                // Reaching this point means all earlier messages were processed.
                tx.send(()).ok();
            }
            x => {
                return Err(ActorError::Inconsistent(format!(
                    "unexpected message for handle_toplevel: {:?}",
                    x
                )))
            }
        }
        Ok(())
    }
2299
    /// Handles a message that only needs read access to the tables.
    ///
    /// Results are sent on each message's reply channel. Messages this handler
    /// does not understand are returned as `Ok(Err(msg))`, so the caller can
    /// requeue them (see `run_batched`). The outer error is reserved for
    /// failures of the handler itself.
    fn handle_readonly(
        &mut self,
        tables: &impl ReadableTables,
        msg: ActorMessage,
    ) -> ActorResult<std::result::Result<(), ActorMessage>> {
        match msg {
            ActorMessage::Get { hash, tx } => {
                let res = self.get(tables, hash);
                tx.send(res).ok();
            }
            ActorMessage::GetOrCreate { hash, tx } => {
                let res = self.get_or_create(tables, hash);
                tx.send(res).ok();
            }
            ActorMessage::EntryStatus { hash, tx } => {
                let res = self.entry_status(tables, hash);
                tx.send(res).ok();
            }
            ActorMessage::Blobs { filter, tx } => {
                let res = self.blobs(tables, filter);
                tx.send(res).ok();
            }
            ActorMessage::Tags { filter, tx } => {
                let res = self.tags(tables, filter);
                tx.send(res).ok();
            }
            ActorMessage::GcStart { tx } => {
                // Start of a GC cycle: forget protections from the previous cycle and
                // prune cache entries whose handles are no longer alive.
                self.protected.clear();
                self.handles.retain(|_, weak| weak.is_live());
                tx.send(()).ok();
            }
            ActorMessage::Dump => {
                dump(tables).ok();
            }
            #[cfg(test)]
            ActorMessage::EntryState { hash, tx } => {
                tx.send(self.entry_state(tables, hash)).ok();
            }
            ActorMessage::GetFullEntryState { hash, tx } => {
                let res = self.get_full_entry_state(tables, hash);
                tx.send(res).ok();
            }
            x => return Ok(Err(x)),
        }
        Ok(Ok(()))
    }
2346
    /// Handles a message that needs write access to the tables.
    ///
    /// Messages not handled here are forwarded to `handle_readonly`. Whatever
    /// neither handler accepts is returned as `Ok(Err(msg))` for requeueing.
    /// Errors from `export` itself are propagated and end the write batch.
    fn handle_readwrite(
        &mut self,
        tables: &mut Tables,
        msg: ActorMessage,
    ) -> ActorResult<std::result::Result<(), ActorMessage>> {
        match msg {
            ActorMessage::Import { cmd, tx } => {
                let res = self.import(tables, cmd);
                tx.send(res).ok();
            }
            ActorMessage::SetTag { tag, value, tx } => {
                let res = self.set_tag(tables, tag, value);
                tx.send(res).ok();
            }
            ActorMessage::CreateTag { hash, tx } => {
                let res = self.create_tag(tables, hash);
                tx.send(res).ok();
            }
            ActorMessage::Delete { hashes, tx } => {
                // User-requested delete: ignores the GC protected set.
                let res = self.delete(tables, hashes, true);
                tx.send(res).ok();
            }
            ActorMessage::GcDelete { hashes, tx } => {
                // GC delete: respects the protected set.
                let res = self.delete(tables, hashes, false);
                tx.send(res).ok();
            }
            ActorMessage::OnComplete { handle } => {
                // NOTE(review): errors are discarded without logging.
                let res = self.on_complete(tables, handle);
                res.ok();
            }
            ActorMessage::Export { cmd, tx } => {
                self.export(tables, cmd, tx)?;
            }
            ActorMessage::OnMemSizeExceeded { hash } => {
                // NOTE(review): errors are discarded without logging.
                let res = self.on_mem_size_exceeded(tables, hash);
                res.ok();
            }
            ActorMessage::Dump => {
                let res = dump(tables);
                res.ok();
            }
            ActorMessage::SetFullEntryState { hash, entry, tx } => {
                let res = self.set_full_entry_state(tables, hash, entry);
                tx.send(res).ok();
            }
            msg => {
                // Read-only messages can be served inside the write transaction too.
                if let Err(msg) = self.handle_readonly(tables, msg)? {
                    return Ok(Err(msg));
                }
            }
        }
        Ok(Ok(()))
    }
2401}
2402
2403fn export_file_copy(
2405 temp_tag: TempTag,
2406 path: PathBuf,
2407 size: u64,
2408 target: PathBuf,
2409 progress: ExportProgressCb,
2410) -> ActorResult<()> {
2411 progress(0)?;
2412 reflink_copy::reflink_or_copy(path, target)?;
2414 progress(size)?;
2415 drop(temp_tag);
2416 Ok(())
2417}
2418
2419fn dump(tables: &impl ReadableTables) -> ActorResult<()> {
2420 for e in tables.blobs().iter()? {
2421 let (k, v) = e?;
2422 let k = k.value();
2423 let v = v.value();
2424 println!("blobs: {} -> {:?}", k.to_hex(), v);
2425 }
2426 for e in tables.tags().iter()? {
2427 let (k, v) = e?;
2428 let k = k.value();
2429 let v = v.value();
2430 println!("tags: {} -> {:?}", k, v);
2431 }
2432 for e in tables.inline_data().iter()? {
2433 let (k, v) = e?;
2434 let k = k.value();
2435 let v = v.value();
2436 println!("inline_data: {} -> {:?}", k.to_hex(), v.len());
2437 }
2438 for e in tables.inline_outboard().iter()? {
2439 let (k, v) = e?;
2440 let k = k.value();
2441 let v = v.value();
2442 println!("inline_outboard: {} -> {:?}", k.to_hex(), v.len());
2443 }
2444 Ok(())
2445}
2446
2447fn load_data(
2448 tables: &impl ReadableTables,
2449 options: &PathOptions,
2450 location: DataLocation<(), u64>,
2451 hash: &Hash,
2452) -> ActorResult<MemOrFile<Bytes, (std::fs::File, u64)>> {
2453 Ok(match location {
2454 DataLocation::Inline(()) => {
2455 let Some(data) = tables.inline_data().get(hash)? else {
2456 return Err(ActorError::Inconsistent(format!(
2457 "inconsistent database state: {} should have inline data but does not",
2458 hash.to_hex()
2459 )));
2460 };
2461 MemOrFile::Mem(Bytes::copy_from_slice(data.value()))
2462 }
2463 DataLocation::Owned(data_size) => {
2464 let path = options.owned_data_path(hash);
2465 let Ok(file) = std::fs::File::open(&path) else {
2466 return Err(io::Error::new(
2467 io::ErrorKind::NotFound,
2468 format!("file not found: {}", path.display()),
2469 )
2470 .into());
2471 };
2472 MemOrFile::File((file, data_size))
2473 }
2474 DataLocation::External(paths, data_size) => {
2475 if paths.is_empty() {
2476 return Err(ActorError::Inconsistent(
2477 "external data location must not be empty".into(),
2478 ));
2479 }
2480 let path = &paths[0];
2481 let Ok(file) = std::fs::File::open(path) else {
2482 return Err(io::Error::new(
2483 io::ErrorKind::NotFound,
2484 format!("external file not found: {}", path.display()),
2485 )
2486 .into());
2487 };
2488 MemOrFile::File((file, data_size))
2489 }
2490 })
2491}
2492
2493fn load_outboard(
2494 tables: &impl ReadableTables,
2495 options: &PathOptions,
2496 location: OutboardLocation,
2497 size: u64,
2498 hash: &Hash,
2499) -> ActorResult<MemOrFile<Bytes, (std::fs::File, u64)>> {
2500 Ok(match location {
2501 OutboardLocation::NotNeeded => MemOrFile::Mem(Bytes::new()),
2502 OutboardLocation::Inline(_) => {
2503 let Some(outboard) = tables.inline_outboard().get(hash)? else {
2504 return Err(ActorError::Inconsistent(format!(
2505 "inconsistent database state: {} should have inline outboard but does not",
2506 hash.to_hex()
2507 )));
2508 };
2509 MemOrFile::Mem(Bytes::copy_from_slice(outboard.value()))
2510 }
2511 OutboardLocation::Owned => {
2512 let outboard_size = raw_outboard_size(size);
2513 let path = options.owned_outboard_path(hash);
2514 let Ok(file) = std::fs::File::open(&path) else {
2515 return Err(io::Error::new(
2516 io::ErrorKind::NotFound,
2517 format!("file not found: {} size={}", path.display(), outboard_size),
2518 )
2519 .into());
2520 };
2521 MemOrFile::File((file, outboard_size))
2522 }
2523 })
2524}
2525
/// Converts incomplete storage into `CompleteStorage`, applying the inline
/// thresholds.
///
/// Returns `Ok(Err(c))` unchanged when the storage is already complete, and
/// `Ok(Ok(c))` when it was converted here.
///
/// Data and outboard at or below their inline limits end up in memory. If
/// they came from files, those files are scheduled for deletion after commit.
/// Larger parts end up in owned files: in-memory parts are written and synced,
/// and existing files are taken off the delete list. The sizes file is always
/// scheduled for deletion.
fn complete_storage(
    storage: BaoFileStorage,
    hash: &Hash,
    path_options: &PathOptions,
    inline_options: &InlineOptions,
    delete_after_commit: &mut DeleteSet,
) -> ActorResult<std::result::Result<CompleteStorage, CompleteStorage>> {
    let (data, outboard, _sizes) = match storage {
        BaoFileStorage::Complete(c) => return Ok(Err(c)),
        BaoFileStorage::IncompleteMem(storage) => {
            let (data, outboard, sizes) = storage.into_parts();
            (
                MemOrFile::Mem(Bytes::from(data.into_parts().0)),
                MemOrFile::Mem(Bytes::from(outboard.into_parts().0)),
                MemOrFile::Mem(Bytes::from(sizes.to_vec())),
            )
        }
        BaoFileStorage::IncompleteFile(storage) => {
            let (data, outboard, sizes) = storage.into_parts();
            (
                MemOrFile::File(data),
                MemOrFile::File(outboard),
                MemOrFile::File(sizes),
            )
        }
    };
    // NOTE(review): `unwrap` panics if `size()` yields `None`; presumably complete
    // parts always have a known size — confirm.
    let data_size = data.size()?.unwrap();
    let outboard_size = outboard.size()?.unwrap();
    debug_assert!(raw_outboard_size(data_size) == outboard_size);
    let data = if data_size <= inline_options.max_data_inlined {
        match data {
            MemOrFile::File(data) => {
                // Small enough to inline: read it into memory and drop the file
                // after commit.
                let mut buf = vec![0; data_size as usize];
                data.read_at(0, &mut buf)?;
                delete_after_commit.insert(*hash, [BaoFilePart::Data]);
                MemOrFile::Mem(Bytes::from(buf))
            }
            MemOrFile::Mem(data) => MemOrFile::Mem(data),
        }
    } else {
        // The data file must survive this commit.
        delete_after_commit.remove(*hash, [BaoFilePart::Data]);
        match data {
            MemOrFile::Mem(data) => {
                let path = path_options.owned_data_path(hash);
                let file = overwrite_and_sync(&path, &data)?;
                MemOrFile::File((file, data_size))
            }
            MemOrFile::File(data) => MemOrFile::File((data, data_size)),
        }
    };
    let outboard = if outboard_size == 0 {
        // NOTE(review): for file-backed storage an (empty) outboard file is not
        // scheduled for deletion here and appears to be left on disk.
        Default::default()
    } else if outboard_size <= inline_options.max_outboard_inlined {
        match outboard {
            MemOrFile::File(outboard) => {
                let mut buf = vec![0; outboard_size as usize];
                outboard.read_at(0, &mut buf)?;
                drop(outboard);
                delete_after_commit.insert(*hash, [BaoFilePart::Outboard]);
                MemOrFile::Mem(Bytes::from(buf))
            }
            MemOrFile::Mem(outboard) => MemOrFile::Mem(outboard),
        }
    } else {
        delete_after_commit.remove(*hash, [BaoFilePart::Outboard]);
        match outboard {
            MemOrFile::Mem(outboard) => {
                let path = path_options.owned_outboard_path(hash);
                let file = overwrite_and_sync(&path, &outboard)?;
                MemOrFile::File((file, outboard_size))
            }
            MemOrFile::File(outboard) => MemOrFile::File((outboard, outboard_size)),
        }
    };
    // Complete entries do not keep a sizes file.
    delete_after_commit.insert(*hash, [BaoFilePart::Sizes]);
    Ok(Ok(CompleteStorage { data, outboard }))
}