1use std::{
67 collections::{BTreeMap, BTreeSet},
68 future::Future,
69 io,
70 ops::Bound,
71 path::{Path, PathBuf},
72 sync::{Arc, RwLock},
73 time::{Duration, SystemTime},
74};
75
76use bao_tree::io::{
77 fsm::Outboard,
78 sync::{ReadAt, Size},
79};
80use bytes::Bytes;
81use futures_lite::{Stream, StreamExt};
82use genawaiter::rc::{Co, Gen};
83use iroh_io::AsyncSliceReader;
84use redb::{AccessGuard, DatabaseError, ReadableTable, StorageError};
85use serde::{Deserialize, Serialize};
86use smallvec::SmallVec;
87use tokio::io::AsyncWriteExt;
88use tracing::trace_span;
89mod tables;
90#[doc(hidden)]
91pub mod test_support;
92#[cfg(test)]
93mod tests;
94mod util;
95mod validate;
96
97use tables::{ReadOnlyTables, ReadableTables, Tables};
98
99use self::{tables::DeleteSet, test_support::EntryData, util::PeekableFlumeReceiver};
100use super::{
101 bao_file::{BaoFileConfig, BaoFileHandle, BaoFileHandleWeak, CreateCb},
102 temp_name, BaoBatchWriter, BaoBlobSize, ConsistencyCheckProgress, EntryStatus, ExportMode,
103 ExportProgressCb, ImportMode, ImportProgress, Map, ReadableStore, TempCounterMap,
104};
105use crate::{
106 store::{
107 bao_file::{BaoFileStorage, CompleteStorage},
108 fs::{
109 tables::BaoFilePart,
110 util::{overwrite_and_sync, read_and_remove},
111 },
112 GcMarkEvent, GcSweepEvent,
113 },
114 util::{
115 compute_outboard,
116 progress::{
117 BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSendError,
118 ProgressSender,
119 },
120 raw_outboard_size, MemOrFile, TagCounter, TagDrop,
121 },
122 BlobFormat, Hash, HashAndFormat, Tag, TempTag,
123};
124
/// Location of the data for a complete entry.
///
/// `I` carries inline payload (the bytes themselves) and `E` carries extra
/// info for non-inline data (the validated size, when known). Defaults of
/// `()` give a payload-free discriminant-only view.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum DataLocation<I = (), E = ()> {
    /// Data is stored inline (in the metadata database).
    Inline(I),
    /// Data lives in a store-owned file in the data directory.
    Owned(E),
    /// Data lives in one or more user-owned external files at these paths.
    External(Vec<PathBuf>, E),
}
140
impl<X> DataLocation<X, u64> {
    /// Merge two locations for the same content into one.
    ///
    /// Precedence is encoded by arm order: merged External < Owned < Inline.
    /// External/External merges the path sets (sorted, deduped) and requires
    /// the recorded sizes to agree; a mismatch is an inconsistency error.
    fn union(self, that: DataLocation<X, u64>) -> ActorResult<Self> {
        Ok(match (self, that) {
            (
                DataLocation::External(mut paths, a_size),
                DataLocation::External(b_paths, b_size),
            ) => {
                if a_size != b_size {
                    // Same hash must have the same complete size.
                    return Err(ActorError::Inconsistent(format!(
                        "complete size mismatch {} {}",
                        a_size, b_size
                    )));
                }
                paths.extend(b_paths);
                paths.sort();
                paths.dedup();
                DataLocation::External(paths, a_size)
            }
            // Owned wins over External (and over Inline by arm order below).
            (_, b @ DataLocation::Owned(_)) => {
                b
            }
            (a @ DataLocation::Owned(_), _) => {
                a
            }
            // Inline wins over anything that is not Owned.
            (_, b @ DataLocation::Inline(_)) => {
                b
            }
            (a @ DataLocation::Inline(_), _) => {
                a
            }
        })
    }
}
182
183impl<I, E> DataLocation<I, E> {
184 fn discard_inline_data(self) -> DataLocation<(), E> {
185 match self {
186 DataLocation::Inline(_) => DataLocation::Inline(()),
187 DataLocation::Owned(x) => DataLocation::Owned(x),
188 DataLocation::External(paths, x) => DataLocation::External(paths, x),
189 }
190 }
191}
192
/// Location of the outboard (bao hash tree) for a complete entry.
///
/// `I` carries the inline outboard payload; the default `()` gives a
/// payload-free discriminant-only view.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum OutboardLocation<I = ()> {
    /// Outboard is stored inline (in the metadata database).
    Inline(I),
    /// Outboard lives in a store-owned file in the data directory.
    Owned,
    /// No outboard needed (blob small enough that the hash is direct).
    NotNeeded,
}
208
209impl<I> OutboardLocation<I> {
210 fn discard_extra_data(self) -> OutboardLocation<()> {
211 match self {
212 Self::Inline(_) => OutboardLocation::Inline(()),
213 Self::Owned => OutboardLocation::Owned,
214 Self::NotNeeded => OutboardLocation::NotNeeded,
215 }
216 }
217}
218
/// Persistent state of a blob entry in the metadata database.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum EntryState<I = ()> {
    /// Fully downloaded and validated entry.
    Complete {
        /// Where the data bytes live.
        data_location: DataLocation<I, u64>,
        /// Where the outboard lives.
        outboard_location: OutboardLocation<I>,
    },
    /// Partially downloaded entry.
    Partial {
        /// Verified size, if we have already seen the last chunk.
        size: Option<u64>,
    },
}
246
impl Default for EntryState {
    /// A brand-new entry starts as partial with unknown size.
    fn default() -> Self {
        Self::Partial { size: None }
    }
}
252
impl EntryState {
    /// Merge two states for the same content.
    ///
    /// Complete always wins over Partial. Two Complete states merge their
    /// data locations (keeping the left outboard location); two Partial
    /// states keep whichever verified size is known and error on a mismatch.
    fn union(self, that: Self) -> ActorResult<Self> {
        match (self, that) {
            (
                Self::Complete {
                    data_location,
                    outboard_location,
                },
                Self::Complete {
                    data_location: b_data_location,
                    ..
                },
            ) => Ok(Self::Complete {
                data_location: data_location.union(b_data_location)?,
                // Outboard location of `self` is kept; for the same hash
                // both sides describe the same outboard content.
                outboard_location,
            }),
            (a @ Self::Complete { .. }, Self::Partial { .. }) =>
            {
                Ok(a)
            }
            (Self::Partial { .. }, b @ Self::Complete { .. }) =>
            {
                Ok(b)
            }
            (Self::Partial { size: a_size }, Self::Partial { size: b_size }) =>
            {
                let size = match (a_size, b_size) {
                    (Some(a_size), Some(b_size)) => {
                        // Two *verified* sizes for the same hash must agree.
                        if a_size != b_size {
                            return Err(ActorError::Inconsistent(format!(
                                "validated size mismatch {} {}",
                                a_size, b_size
                            )));
                        }
                        Some(a_size)
                    }
                    (Some(a_size), None) => Some(a_size),
                    (None, Some(b_size)) => Some(b_size),
                    (None, None) => None,
                };
                Ok(Self::Partial { size })
            }
        }
    }
}
305
/// Store [`EntryState`] in redb as a postcard-encoded byte blob.
///
/// The `redb::Value` trait is infallible, so (de)serialization failures
/// panic via `unwrap` — acceptable only because we control both encode and
/// decode and the format is stable.
impl redb::Value for EntryState {
    type SelfType<'a> = EntryState;

    // Small states fit inline in the SmallVec; larger ones spill to the heap.
    type AsBytes<'a> = SmallVec<[u8; 128]>;

    fn fixed_width() -> Option<usize> {
        // Variable-length encoding.
        None
    }

    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
    where
        Self: 'a,
    {
        // Panics on corrupt data; the trait offers no error path.
        postcard::from_bytes(data).unwrap()
    }

    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
    where
        Self: 'a,
        Self: 'b,
    {
        postcard::to_extend(value, SmallVec::new()).unwrap()
    }

    fn type_name() -> redb::TypeName {
        redb::TypeName::new("EntryState")
    }
}
334
/// Thresholds controlling what gets stored inline in the metadata database.
#[derive(Debug, Clone)]
pub struct InlineOptions {
    /// Maximum data size (bytes) that is stored inline.
    pub max_data_inlined: u64,
    /// Maximum outboard size (bytes) that is stored inline.
    pub max_outboard_inlined: u64,
}
343
impl InlineOptions {
    /// Never store data or outboards inline.
    pub const NO_INLINE: Self = Self {
        max_data_inlined: 0,
        max_outboard_inlined: 0,
    };
    /// Always store data and outboards inline, regardless of size.
    pub const ALWAYS_INLINE: Self = Self {
        max_data_inlined: u64::MAX,
        max_outboard_inlined: u64::MAX,
    };
}
356
impl Default for InlineOptions {
    /// Default: inline anything up to 16 KiB (data and outboard alike).
    fn default() -> Self {
        Self {
            max_data_inlined: 1024 * 16,
            max_outboard_inlined: 1024 * 16,
        }
    }
}
365
/// Directory layout of the file-based store.
#[derive(Debug, Clone)]
pub struct PathOptions {
    /// Directory holding store-owned data, outboard and sizes files.
    pub data_path: PathBuf,
    /// Directory for temporary files (must be on the same filesystem as
    /// `data_path` so renames are atomic — assumption, confirm with callers).
    pub temp_path: PathBuf,
}
376
impl PathOptions {
    /// Standard layout under a root: `<root>/data` and `<root>/temp`.
    fn new(root: &Path) -> Self {
        Self {
            data_path: root.join("data"),
            temp_path: root.join("temp"),
        }
    }

    /// Path of the store-owned data file for `hash`.
    fn owned_data_path(&self, hash: &Hash) -> PathBuf {
        self.data_path.join(format!("{}.data", hash.to_hex()))
    }

    /// Path of the store-owned outboard file for `hash`
    /// (`.obao4` — presumably bao outboard, block size 4; TODO confirm).
    fn owned_outboard_path(&self, hash: &Hash) -> PathBuf {
        self.data_path.join(format!("{}.obao4", hash.to_hex()))
    }

    /// Path of the store-owned sizes file for `hash`.
    fn owned_sizes_path(&self, hash: &Hash) -> PathBuf {
        self.data_path.join(format!("{}.sizes4", hash.to_hex()))
    }

    /// A fresh, unique file name in the temp directory.
    fn temp_file_name(&self) -> PathBuf {
        self.temp_path.join(temp_name())
    }
}
401
/// Limits on how many messages / how long the actor batches into a single
/// database transaction before committing.
#[derive(Debug, Clone)]
pub struct BatchOptions {
    /// Max messages handled in one read transaction.
    pub max_read_batch: usize,
    /// Max wall-clock time for one read transaction.
    pub max_read_duration: Duration,
    /// Max messages handled in one write transaction.
    pub max_write_batch: usize,
    /// Max wall-clock time for one write transaction.
    pub max_write_duration: Duration,
}
414
impl Default for BatchOptions {
    /// Reads batch larger/longer than writes, since write transactions
    /// hold the commit latency for callers.
    fn default() -> Self {
        Self {
            max_read_batch: 10000,
            max_read_duration: Duration::from_secs(1),
            max_write_batch: 1000,
            max_write_duration: Duration::from_millis(500),
        }
    }
}
425
/// All configuration of the file-based store.
#[derive(Debug, Clone)]
pub struct Options {
    /// Directory layout.
    pub path: PathOptions,
    /// Inline-storage thresholds.
    pub inline: InlineOptions,
    /// Transaction batching limits.
    pub batch: BatchOptions,
}
436
/// Where imported content comes from.
#[derive(derive_more::Debug)]
pub(crate) enum ImportSource {
    /// A temp file the store created and owns (will be moved into place).
    TempFile(PathBuf),
    /// A user-owned external file referenced in place.
    External(PathBuf),
    /// In-memory bytes.
    Memory(#[debug(skip)] Bytes),
}
443
444impl ImportSource {
445 fn content(&self) -> MemOrFile<&[u8], &Path> {
446 match self {
447 Self::TempFile(path) => MemOrFile::File(path.as_path()),
448 Self::External(path) => MemOrFile::File(path.as_path()),
449 Self::Memory(data) => MemOrFile::Mem(data.as_ref()),
450 }
451 }
452
453 fn len(&self) -> io::Result<u64> {
454 match self {
455 Self::TempFile(path) => std::fs::metadata(path).map(|m| m.len()),
456 Self::External(path) => std::fs::metadata(path).map(|m| m.len()),
457 Self::Memory(data) => Ok(data.len() as u64),
458 }
459 }
460}
461
/// The entry type of this store: a handle to a (possibly partial) bao file.
pub type Entry = BaoFileHandle;
464
impl super::MapEntry for Entry {
    fn hash(&self) -> Hash {
        self.hash()
    }

    fn size(&self) -> BaoBlobSize {
        // NOTE(review): `current_size()` presumably only fails on I/O for
        // file-backed entries; the unwrap will panic in that case — confirm.
        let size = self.current_size().unwrap();
        tracing::trace!("redb::Entry::size() = {}", size);
        // Size is "verified" exactly when the entry is complete.
        BaoBlobSize::new(size, self.is_complete())
    }

    fn is_complete(&self) -> bool {
        self.is_complete()
    }

    async fn outboard(&self) -> io::Result<impl Outboard> {
        self.outboard()
    }

    async fn data_reader(&self) -> io::Result<impl AsyncSliceReader> {
        Ok(self.data_reader())
    }
}
488
impl super::MapEntryMut for Entry {
    /// Writer for incrementally adding verified bao batches to this entry.
    async fn batch_writer(&self) -> io::Result<impl BaoBatchWriter> {
        Ok(self.writer())
    }
}
494
/// An import command for the actor: content that has already been hashed
/// and outboard-computed, ready to be recorded in the database.
#[derive(derive_more::Debug)]
pub(crate) struct Import {
    /// Hash and format of the content being imported.
    content_id: HashAndFormat,
    /// Where the bytes currently live.
    source: ImportSource,
    /// Data size in bytes.
    data_size: u64,
    /// Pre-computed outboard, if one is needed for this size.
    #[debug("{:?}", outboard.as_ref().map(|x| x.len()))]
    outboard: Option<Vec<u8>>,
}
507
/// An export command for the actor.
#[derive(derive_more::Debug)]
pub(crate) struct Export {
    /// Temp tag keeping the content alive for the duration of the export.
    temp_tag: TempTag,
    /// Absolute destination path.
    target: PathBuf,
    /// Copy vs. try-reference semantics.
    mode: ExportMode,
    /// Progress callback (skipped in Debug output).
    #[debug(skip)]
    progress: ExportProgressCb,
}
521
/// Messages sent from the [`Store`] front-end to the database actor thread.
///
/// Request/response messages carry a oneshot `tx` for the reply; fire-and-
/// forget messages (`OnMemSizeExceeded`, `OnComplete`, `Dump`) do not.
#[derive(derive_more::Debug)]
pub(crate) enum ActorMessage {
    /// Get a handle for a hash, if the entry exists.
    Get {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<Option<BaoFileHandle>>>,
    },
    /// Query the status (complete / partial / not found) of a hash.
    EntryStatus {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<EntryStatus>>,
    },
    /// Test-only: full in-memory + db state of an entry.
    #[cfg(test)]
    EntryState {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<test_support::EntryStateResponse>>,
    },
    /// Read the complete entry data (test support).
    GetFullEntryState {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<Option<EntryData>>>,
    },
    /// Overwrite the complete entry data (test support).
    SetFullEntryState {
        hash: Hash,
        entry: Option<EntryData>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Get a handle, creating an empty partial entry if none exists.
    GetOrCreate {
        hash: Hash,
        tx: oneshot::Sender<ActorResult<BaoFileHandle>>,
    },
    /// A partial in-memory entry grew past the mem threshold; persist it.
    OnMemSizeExceeded { hash: Hash },
    /// An entry finished downloading; record completion.
    OnComplete { handle: BaoFileHandle },
    /// Import pre-hashed content (see [`Import`]).
    Import {
        cmd: Import,
        tx: oneshot::Sender<ActorResult<(TempTag, u64)>>,
    },
    /// Export content to a path (see [`Export`]).
    Export {
        cmd: Export,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Change inline thresholds; optionally re-apply them to existing entries.
    UpdateInlineOptions {
        inline_options: InlineOptions,
        reapply: bool,
        tx: oneshot::Sender<()>,
    },
    /// List blobs matching a filter predicate over the blobs table.
    Blobs {
        #[debug(skip)]
        filter: FilterPredicate<Hash, EntryState>,
        #[allow(clippy::type_complexity)]
        tx: oneshot::Sender<
            ActorResult<Vec<std::result::Result<(Hash, EntryState), StorageError>>>,
        >,
    },
    /// List tags in the (optional) range [from, to).
    Tags {
        from: Option<Tag>,
        to: Option<Tag>,
        #[allow(clippy::type_complexity)]
        tx: oneshot::Sender<
            ActorResult<Vec<std::result::Result<(Tag, HashAndFormat), StorageError>>>,
        >,
    },
    /// Set a named tag to point at content.
    SetTag {
        tag: Tag,
        value: HashAndFormat,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Delete tags in the (optional) range [from, to).
    DeleteTags {
        from: Option<Tag>,
        to: Option<Tag>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Create a fresh auto-named tag pointing at content.
    CreateTag {
        hash: HashAndFormat,
        tx: oneshot::Sender<ActorResult<Tag>>,
    },
    /// Rename a tag, keeping its target.
    RenameTag {
        from: Tag,
        to: Tag,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Forcefully delete entries (user initiated).
    Delete {
        hashes: Vec<Hash>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Delete unreferenced entries (GC initiated; respects protections).
    GcDelete {
        hashes: Vec<Hash>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Barrier: reply once all previous messages are processed.
    Sync { tx: oneshot::Sender<()> },
    /// Debug-dump internal state to the log.
    Dump,
    /// Consistency check, optionally repairing what it can.
    Fsck {
        repair: bool,
        progress: BoxedProgressSender<ConsistencyCheckProgress>,
        tx: oneshot::Sender<ActorResult<()>>,
    },
    /// Notify that a GC cycle is starting.
    GcStart { tx: oneshot::Sender<()> },
    /// Shut the actor down; optional ack channel.
    Shutdown { tx: Option<oneshot::Sender<()>> },
}
668
impl ActorMessage {
    /// Classify a message by the kind of database transaction it needs,
    /// so the actor can batch compatible messages into one transaction.
    fn category(&self) -> MessageCategory {
        match self {
            Self::Get { .. }
            | Self::GetOrCreate { .. }
            | Self::EntryStatus { .. }
            | Self::Blobs { .. }
            | Self::Tags { .. }
            | Self::GcStart { .. }
            | Self::GetFullEntryState { .. }
            | Self::Dump => MessageCategory::ReadOnly,
            Self::Import { .. }
            | Self::Export { .. }
            | Self::OnMemSizeExceeded { .. }
            | Self::OnComplete { .. }
            | Self::SetTag { .. }
            | Self::CreateTag { .. }
            | Self::SetFullEntryState { .. }
            | Self::Delete { .. }
            | Self::DeleteTags { .. }
            | Self::RenameTag { .. }
            | Self::GcDelete { .. } => MessageCategory::ReadWrite,
            Self::UpdateInlineOptions { .. }
            | Self::Sync { .. }
            | Self::Shutdown { .. }
            | Self::Fsck { .. } => MessageCategory::TopLevel,
            #[cfg(test)]
            Self::EntryState { .. } => MessageCategory::ReadOnly,
        }
    }
}
700
/// Transaction kind a message needs (see [`ActorMessage::category`]).
enum MessageCategory {
    /// Can be served inside a read transaction.
    ReadOnly,
    /// Needs a write transaction.
    ReadWrite,
    /// Must run outside any open transaction (barriers, shutdown, fsck).
    TopLevel,
}
706
/// Predicate over raw table rows: given (index, key guard, value guard),
/// return the decoded pair to keep, or `None` to skip the row.
pub(crate) type FilterPredicate<K, V> =
    Box<dyn Fn(u64, AccessGuard<K>, AccessGuard<V>) -> Option<(K, V)> + Send + Sync>;
710
/// Handle to the file-based store. Cheap to clone; all clones share the
/// same actor thread and shut it down when the last clone drops.
#[derive(Debug, Clone)]
pub struct Store(Arc<StoreInner>);
715
impl Store {
    /// Load or create a store under `root`, using the default directory
    /// layout (`blobs.db`, `data/`, `temp/`) and default options.
    pub async fn load(root: impl AsRef<Path>) -> io::Result<Self> {
        let path = root.as_ref();
        let db_path = path.join("blobs.db");
        let options = Options {
            path: PathOptions::new(path),
            inline: Default::default(),
            batch: Default::default(),
        };
        Self::new(db_path, options).await
    }

    /// Load or create a store with an explicit db path and options.
    ///
    /// Must be called from within a tokio runtime; the blocking setup runs
    /// on the blocking thread pool.
    pub async fn new(path: PathBuf, options: Options) -> io::Result<Self> {
        let rt = tokio::runtime::Handle::try_current()
            .map_err(|_| io::Error::new(io::ErrorKind::Other, "no tokio runtime"))?;
        let inner =
            tokio::task::spawn_blocking(move || StoreInner::new_sync(path, options, rt)).await??;
        Ok(Self(Arc::new(inner)))
    }

    /// Change inline thresholds at runtime; with `reapply` the new limits
    /// are applied to already-stored entries as well.
    pub async fn update_inline_options(
        &self,
        inline_options: InlineOptions,
        reapply: bool,
    ) -> io::Result<()> {
        Ok(self
            .0
            .update_inline_options(inline_options, reapply)
            .await?)
    }

    /// Ask the actor to dump its internal state to the log (debugging aid).
    pub async fn dump(&self) -> io::Result<()> {
        Ok(self.0.dump().await?)
    }
}
759
/// Shared state behind [`Store`]: the channel to the actor thread plus
/// the in-process temp-tag counters.
#[derive(Debug)]
struct StoreInner {
    /// Channel to the actor thread.
    tx: async_channel::Sender<ActorMessage>,
    /// Reference counts of live temp tags, shared with the actor.
    temp: Arc<RwLock<TempCounterMap>>,
    /// Join handle of the actor thread; taken on drop for shutdown.
    handle: Option<std::thread::JoinHandle<()>>,
    /// Directory layout, needed for temp file names on the caller side.
    path_options: Arc<PathOptions>,
}
767
impl TagDrop for RwLock<TempCounterMap> {
    /// Decrement the temp-tag count for `content` when a tag is dropped.
    fn on_drop(&self, content: &HashAndFormat) {
        self.write().unwrap().dec(content);
    }
}
773
impl TagCounter for RwLock<TempCounterMap> {
    /// Increment the temp-tag count for `content` when a tag is created.
    fn on_create(&self, content: &HashAndFormat) {
        self.write().unwrap().inc(content);
    }
}
779
impl StoreInner {
    /// Create the store state and spawn the dedicated actor thread.
    ///
    /// Creates the data, temp and db-parent directories, then runs the
    /// actor loop on its own OS thread (redb operations are blocking).
    fn new_sync(path: PathBuf, options: Options, rt: tokio::runtime::Handle) -> io::Result<Self> {
        tracing::trace!(
            "creating data directory: {}",
            options.path.data_path.display()
        );
        std::fs::create_dir_all(&options.path.data_path)?;
        tracing::trace!(
            "creating temp directory: {}",
            options.path.temp_path.display()
        );
        std::fs::create_dir_all(&options.path.temp_path)?;
        // NOTE(review): this trace message is missing a space before `{}`.
        tracing::trace!(
            "creating parent directory for db file{}",
            path.parent().unwrap().display()
        );
        std::fs::create_dir_all(path.parent().unwrap())?;
        let temp: Arc<RwLock<TempCounterMap>> = Default::default();
        let (actor, tx) = Actor::new(&path, options.clone(), temp.clone(), rt.clone())?;
        let handle = std::thread::Builder::new()
            .name("redb-actor".to_string())
            .spawn(move || {
                // The actor runs async (to await channel ops) but on a
                // dedicated thread so db blocking never stalls the runtime.
                rt.block_on(async move {
                    if let Err(cause) = actor.run_batched().await {
                        tracing::error!("redb actor failed: {}", cause);
                    }
                });
            })
            .expect("failed to spawn thread");
        Ok(Self {
            tx,
            temp,
            handle: Some(handle),
            path_options: Arc::new(options.path),
        })
    }

    /// Get a handle for `hash`, if an entry exists.
    pub async fn get(&self, hash: Hash) -> OuterResult<Option<BaoFileHandle>> {
        let (tx, rx) = oneshot::channel();
        self.tx.send(ActorMessage::Get { hash, tx }).await?;
        Ok(rx.await??)
    }

    /// Get a handle for `hash`, creating an empty partial entry if needed.
    async fn get_or_create(&self, hash: Hash) -> OuterResult<BaoFileHandle> {
        let (tx, rx) = oneshot::channel();
        self.tx.send(ActorMessage::GetOrCreate { hash, tx }).await?;
        Ok(rx.await??)
    }

    /// List the hashes of all *complete* blobs.
    async fn blobs(&self) -> OuterResult<Vec<io::Result<Hash>>> {
        let (tx, rx) = oneshot::channel();
        // Filter runs inside the actor, so only matching rows are sent back.
        let filter: FilterPredicate<Hash, EntryState> = Box::new(|_i, k, v| {
            let v = v.value();
            if let EntryState::Complete { .. } = &v {
                Some((k.value(), v))
            } else {
                None
            }
        });
        self.tx.send(ActorMessage::Blobs { filter, tx }).await?;
        let blobs = rx.await?;
        let res = blobs?
            .into_iter()
            .map(|r| {
                r.map(|(hash, _)| hash)
                    .map_err(|e| ActorError::from(e).into())
            })
            .collect::<Vec<_>>();
        Ok(res)
    }

    /// List the hashes of all *partial* blobs.
    async fn partial_blobs(&self) -> OuterResult<Vec<io::Result<Hash>>> {
        let (tx, rx) = oneshot::channel();
        let filter: FilterPredicate<Hash, EntryState> = Box::new(|_i, k, v| {
            let v = v.value();
            if let EntryState::Partial { .. } = &v {
                Some((k.value(), v))
            } else {
                None
            }
        });
        self.tx.send(ActorMessage::Blobs { filter, tx }).await?;
        let blobs = rx.await?;
        let res = blobs?
            .into_iter()
            .map(|r| {
                r.map(|(hash, _)| hash)
                    .map_err(|e| ActorError::from(e).into())
            })
            .collect::<Vec<_>>();
        Ok(res)
    }

    /// List tags in the optional range [from, to).
    async fn tags(
        &self,
        from: Option<Tag>,
        to: Option<Tag>,
    ) -> OuterResult<Vec<io::Result<(Tag, HashAndFormat)>>> {
        let (tx, rx) = oneshot::channel();
        self.tx.send(ActorMessage::Tags { from, to, tx }).await?;
        let tags = rx.await?;
        // transform the internal error type into io::Error
        let tags = tags?
            .into_iter()
            .map(|r| r.map_err(|e| ActorError::from(e).into()))
            .collect();
        Ok(tags)
    }

    /// Point the named tag at `value`, creating or overwriting it.
    async fn set_tag(&self, tag: Tag, value: HashAndFormat) -> OuterResult<()> {
        let (tx, rx) = oneshot::channel();
        self.tx
            .send(ActorMessage::SetTag { tag, value, tx })
            .await?;
        Ok(rx.await??)
    }

    /// Delete tags in the optional range [from, to).
    async fn delete_tags(&self, from: Option<Tag>, to: Option<Tag>) -> OuterResult<()> {
        let (tx, rx) = oneshot::channel();
        self.tx
            .send(ActorMessage::DeleteTags { from, to, tx })
            .await?;
        Ok(rx.await??)
    }

    /// Create a fresh auto-named tag pointing at `hash`.
    async fn create_tag(&self, hash: HashAndFormat) -> OuterResult<Tag> {
        let (tx, rx) = oneshot::channel();
        self.tx.send(ActorMessage::CreateTag { hash, tx }).await?;
        Ok(rx.await??)
    }

    /// Rename a tag, keeping its target.
    async fn rename_tag(&self, from: Tag, to: Tag) -> OuterResult<()> {
        let (tx, rx) = oneshot::channel();
        self.tx
            .send(ActorMessage::RenameTag { from, to, tx })
            .await?;
        Ok(rx.await??)
    }

    /// Forcefully delete entries (user initiated).
    async fn delete(&self, hashes: Vec<Hash>) -> OuterResult<()> {
        let (tx, rx) = oneshot::channel();
        self.tx.send(ActorMessage::Delete { hashes, tx }).await?;
        Ok(rx.await??)
    }

    /// Delete unreferenced entries on behalf of the GC.
    async fn gc_delete(&self, hashes: Vec<Hash>) -> OuterResult<()> {
        let (tx, rx) = oneshot::channel();
        self.tx.send(ActorMessage::GcDelete { hashes, tx }).await?;
        Ok(rx.await??)
    }

    /// Notify the actor that a GC cycle is starting; awaits the ack.
    async fn gc_start(&self) -> OuterResult<()> {
        let (tx, rx) = oneshot::channel();
        self.tx.send(ActorMessage::GcStart { tx }).await?;
        Ok(rx.await?)
    }

    /// Query the status (complete / partial / not found) of `hash`.
    async fn entry_status(&self, hash: &Hash) -> OuterResult<EntryStatus> {
        let (tx, rx) = oneshot::channel();
        self.tx
            .send(ActorMessage::EntryStatus { hash: *hash, tx })
            .await?;
        Ok(rx.await??)
    }

    /// Blocking variant of [`Self::entry_status`], for sync callers.
    fn entry_status_sync(&self, hash: &Hash) -> OuterResult<EntryStatus> {
        let (tx, rx) = oneshot::channel();
        self.tx
            .send_blocking(ActorMessage::EntryStatus { hash: *hash, tx })?;
        Ok(rx.recv()??)
    }

    /// Notify the actor that `entry` finished downloading (fire and forget).
    async fn complete(&self, entry: Entry) -> OuterResult<()> {
        self.tx
            .send(ActorMessage::OnComplete { handle: entry })
            .await?;
        Ok(())
    }

    /// Export `hash` to the absolute path `target`.
    ///
    /// Validates the target, creates parent directories, and protects the
    /// content with a temp tag for the duration of the export.
    async fn export(
        &self,
        hash: Hash,
        target: PathBuf,
        mode: ExportMode,
        progress: ExportProgressCb,
    ) -> OuterResult<()> {
        tracing::debug!(
            "exporting {} to {} using mode {:?}",
            hash.to_hex(),
            target.display(),
            mode
        );
        if !target.is_absolute() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "target path must be absolute",
            )
            .into());
        }
        let parent = target.parent().ok_or_else(|| {
            OuterError::from(io::Error::new(
                io::ErrorKind::InvalidInput,
                "target path has no parent directory",
            ))
        })?;
        std::fs::create_dir_all(parent)?;
        // Keep the content alive while the actor performs the export.
        let temp_tag = self.temp.temp_tag(HashAndFormat::raw(hash));
        let (tx, rx) = oneshot::channel();
        self.tx
            .send(ActorMessage::Export {
                cmd: Export {
                    temp_tag,
                    target,
                    mode,
                    progress,
                },
                tx,
            })
            .await?;
        Ok(rx.await??)
    }

    /// Run a consistency check, optionally repairing what it can.
    async fn consistency_check(
        &self,
        repair: bool,
        progress: BoxedProgressSender<ConsistencyCheckProgress>,
    ) -> OuterResult<()> {
        let (tx, rx) = oneshot::channel();
        self.tx
            .send(ActorMessage::Fsck {
                repair,
                progress,
                tx,
            })
            .await?;
        Ok(rx.await??)
    }

    /// Change inline thresholds; with `reapply`, migrate existing entries.
    async fn update_inline_options(
        &self,
        inline_options: InlineOptions,
        reapply: bool,
    ) -> OuterResult<()> {
        let (tx, rx) = oneshot::channel();
        self.tx
            .send(ActorMessage::UpdateInlineOptions {
                inline_options,
                reapply,
                tx,
            })
            .await?;
        Ok(rx.await?)
    }

    /// Ask the actor to dump internal state to the log (fire and forget).
    async fn dump(&self) -> OuterResult<()> {
        self.tx.send(ActorMessage::Dump).await?;
        Ok(())
    }

    /// Barrier: resolves once all previously sent messages are processed.
    async fn sync(&self) -> OuterResult<()> {
        let (tx, rx) = oneshot::channel();
        self.tx.send(ActorMessage::Sync { tx }).await?;
        Ok(rx.await?)
    }

    /// Import a file from disk (blocking).
    ///
    /// In `TryReference` mode the file is referenced in place; in `Copy`
    /// mode small files are read into memory and larger ones are
    /// reflinked/copied to a temp file first.
    fn import_file_sync(
        &self,
        path: PathBuf,
        mode: ImportMode,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> OuterResult<(TempTag, u64)> {
        if !path.is_absolute() {
            return Err(
                io::Error::new(io::ErrorKind::InvalidInput, "path must be absolute").into(),
            );
        }
        if !path.is_file() && !path.is_symlink() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "path is not a file or symlink",
            )
            .into());
        }
        let id = progress.new_id();
        progress.blocking_send(ImportProgress::Found {
            id,
            name: path.to_string_lossy().to_string(),
        })?;
        let file = match mode {
            ImportMode::TryReference => ImportSource::External(path),
            ImportMode::Copy => {
                // Below 16 KiB, reading into memory is cheaper than a copy;
                // the data will likely end up inline anyway.
                if std::fs::metadata(&path)?.len() < 16 * 1024 {
                    let data = std::fs::read(&path)?;
                    ImportSource::Memory(data.into())
                } else {
                    let temp_path = self.temp_file_name();
                    progress.try_send(ImportProgress::CopyProgress { id, offset: 0 })?;
                    // Prefer a filesystem reflink (copy-on-write) when the
                    // filesystem supports it; fall back to a full copy.
                    if reflink_copy::reflink_or_copy(&path, &temp_path)?.is_none() {
                        tracing::debug!("reflinked {} to {}", path.display(), temp_path.display());
                    } else {
                        tracing::debug!("copied {} to {}", path.display(), temp_path.display());
                    }
                    ImportSource::TempFile(temp_path)
                }
            }
        };
        let (tag, size) = self.finalize_import_sync(file, format, id, progress)?;
        Ok((tag, size))
    }

    /// Import in-memory bytes (blocking), without progress reporting.
    fn import_bytes_sync(&self, data: Bytes, format: BlobFormat) -> OuterResult<TempTag> {
        let id = 0;
        let file = ImportSource::Memory(data);
        let progress = IgnoreProgressSender::default();
        let (tag, _size) = self.finalize_import_sync(file, format, id, progress)?;
        Ok(tag)
    }

    /// Hash the content, compute the outboard, then hand the prepared
    /// [`Import`] to the actor to record (blocking).
    fn finalize_import_sync(
        &self,
        file: ImportSource,
        format: BlobFormat,
        id: u64,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> OuterResult<(TempTag, u64)> {
        let data_size = file.len()?;
        tracing::debug!("finalize_import_sync {:?} {}", file, data_size);
        progress.blocking_send(ImportProgress::Size {
            id,
            size: data_size,
        })?;
        let progress2 = progress.clone();
        let (hash, outboard) = match file.content() {
            MemOrFile::File(path) => {
                let span = trace_span!("outboard.compute", path = %path.display());
                let _guard = span.enter();
                let file = std::fs::File::open(path)?;
                compute_outboard(file, data_size, move |offset| {
                    Ok(progress2.try_send(ImportProgress::OutboardProgress { id, offset })?)
                })?
            }
            MemOrFile::Mem(bytes) => {
                compute_outboard(bytes, data_size, |_| Ok(()))?
            }
        };
        progress.blocking_send(ImportProgress::OutboardDone { id, hash })?;
        // Protect the content from GC before telling the actor about it.
        let tag = self.temp.temp_tag(HashAndFormat { hash, format });
        let hash = *tag.hash();
        let (tx, rx) = oneshot::channel();
        self.tx.send_blocking(ActorMessage::Import {
            cmd: Import {
                content_id: HashAndFormat { hash, format },
                source: file,
                outboard,
                data_size,
            },
            tx,
        })?;
        Ok(rx.recv()??)
    }

    /// A fresh, unique file name in the temp directory.
    fn temp_file_name(&self) -> PathBuf {
        self.path_options.temp_file_name()
    }

    /// Graceful shutdown: ask the actor to stop and wait for its ack.
    async fn shutdown(&self) {
        let (tx, rx) = oneshot::channel();
        self.tx
            .send(ActorMessage::Shutdown { tx: Some(tx) })
            .await
            .ok();
        rx.await.ok();
    }
}
1163
impl Drop for StoreInner {
    /// Best-effort shutdown of the actor thread when the last [`Store`]
    /// clone drops: send Shutdown (no ack channel) and join the thread.
    fn drop(&mut self) {
        if let Some(handle) = self.handle.take() {
            self.tx
                .send_blocking(ActorMessage::Shutdown { tx: None })
                .ok();
            handle.join().ok();
        }
    }
}
1174
/// Mutable state of the database actor.
struct ActorState {
    /// Weak handles to currently open bao files, keyed by hash.
    handles: BTreeMap<Hash, BaoFileHandleWeak>,
    /// Hashes protected from deletion during the current GC cycle.
    protected: BTreeSet<Hash>,
    /// Temp-tag reference counts, shared with the store front-end.
    temp: Arc<RwLock<TempCounterMap>>,
    /// Incoming message channel from the store front-end.
    msgs_rx: async_channel::Receiver<ActorMessage>,
    /// Config used when creating new bao file handles.
    create_options: Arc<BaoFileConfig>,
    options: Options,
    /// Runtime handle for spawning async work from the actor thread.
    rt: tokio::runtime::Handle,
}
1184
/// The database actor: owns the redb database plus the mutable state,
/// and processes [`ActorMessage`]s on a dedicated thread.
struct Actor {
    db: redb::Database,
    state: ActorState,
}
1193
/// Errors that can occur inside the database actor.
#[derive(Debug, thiserror::Error)]
pub(crate) enum ActorError {
    #[error("table error: {0}")]
    Table(#[from] redb::TableError),
    #[error("database error: {0}")]
    Database(#[from] redb::DatabaseError),
    #[error("transaction error: {0}")]
    Transaction(#[from] redb::TransactionError),
    #[error("commit error: {0}")]
    Commit(#[from] redb::CommitError),
    #[error("storage error: {0}")]
    Storage(#[from] redb::StorageError),
    #[error("io error: {0}")]
    Io(#[from] io::Error),
    /// The database contains contradictory information (e.g. size mismatch).
    #[error("inconsistent database state: {0}")]
    Inconsistent(String),
    #[error("error during database migration: {0}")]
    Migration(#[source] anyhow::Error),
}
1217
1218impl From<ActorError> for io::Error {
1219 fn from(e: ActorError) -> Self {
1220 match e {
1221 ActorError::Io(e) => e,
1222 e => io::Error::new(io::ErrorKind::Other, e),
1223 }
1224 }
1225}
1226
/// Result alias for operations inside the database actor.
pub(crate) type ActorResult<T> = std::result::Result<T, ActorError>;
1231
/// Errors seen by the store front-end: actor errors plus the channel
/// failures that can occur talking to the actor thread.
#[derive(Debug, thiserror::Error)]
pub(crate) enum OuterError {
    #[error("inner error: {0}")]
    Inner(#[from] ActorError),
    /// The actor's channel is closed (actor gone); payload is dropped
    /// because `ActorMessage` is not `Display`able.
    #[error("send error")]
    Send,
    #[error("progress send error: {0}")]
    ProgressSend(#[from] ProgressSendError),
    #[error("recv error: {0}")]
    Recv(#[from] oneshot::RecvError),
    #[error("recv error: {0}")]
    AsyncChannelRecv(#[from] async_channel::RecvError),
    #[error("join error: {0}")]
    JoinTask(#[from] tokio::task::JoinError),
}
1251
impl From<async_channel::SendError<ActorMessage>> for OuterError {
    /// A send failure means the actor is gone; the message is dropped.
    fn from(_e: async_channel::SendError<ActorMessage>) -> Self {
        OuterError::Send
    }
}
1257
/// Result alias for store front-end operations.
pub(crate) type OuterResult<T> = std::result::Result<T, OuterError>;
1262
impl From<io::Error> for OuterError {
    /// io errors are routed through the actor error wrapper.
    fn from(e: io::Error) -> Self {
        OuterError::Inner(ActorError::Io(e))
    }
}
1268
1269impl From<OuterError> for io::Error {
1270 fn from(e: OuterError) -> Self {
1271 match e {
1272 OuterError::Inner(ActorError::Io(e)) => e,
1273 e => io::Error::new(io::ErrorKind::Other, e),
1274 }
1275 }
1276}
1277
impl super::Map for Store {
    type Entry = Entry;

    /// Look up an entry by hash; `None` if not present.
    async fn get(&self, hash: &Hash) -> io::Result<Option<Self::Entry>> {
        Ok(self.0.get(*hash).await?)
    }
}
1285
impl super::MapMut for Store {
    type EntryMut = Entry;

    /// Get or create an entry. The `_size` hint is unused by this backend.
    async fn get_or_create(&self, hash: Hash, _size: u64) -> io::Result<Self::EntryMut> {
        Ok(self.0.get_or_create(hash).await?)
    }

    async fn entry_status(&self, hash: &Hash) -> io::Result<EntryStatus> {
        Ok(self.0.entry_status(hash).await?)
    }

    /// Same as `get`: entry handles are mutable in this backend.
    async fn get_mut(&self, hash: &Hash) -> io::Result<Option<Self::EntryMut>> {
        self.get(hash).await
    }

    /// Mark an entry as fully downloaded.
    async fn insert_complete(&self, entry: Self::EntryMut) -> io::Result<()> {
        Ok(self.0.complete(entry).await?)
    }

    fn entry_status_sync(&self, hash: &Hash) -> io::Result<EntryStatus> {
        Ok(self.0.entry_status_sync(hash)?)
    }
}
1309
impl super::ReadableStore for Store {
    /// Iterate the hashes of all complete blobs (snapshot, not live).
    async fn blobs(&self) -> io::Result<super::DbIter<Hash>> {
        Ok(Box::new(self.0.blobs().await?.into_iter()))
    }

    /// Iterate the hashes of all partial blobs (snapshot, not live).
    async fn partial_blobs(&self) -> io::Result<super::DbIter<Hash>> {
        Ok(Box::new(self.0.partial_blobs().await?.into_iter()))
    }

    /// Iterate tags in the optional range [from, to).
    async fn tags(
        &self,
        from: Option<Tag>,
        to: Option<Tag>,
    ) -> io::Result<super::DbIter<(Tag, HashAndFormat)>> {
        Ok(Box::new(self.0.tags(from, to).await?.into_iter()))
    }

    /// Snapshot of currently live temp tags.
    fn temp_tags(&self) -> Box<dyn Iterator<Item = HashAndFormat> + Send + Sync + 'static> {
        Box::new(self.0.temp.read().unwrap().keys())
    }

    async fn consistency_check(
        &self,
        repair: bool,
        tx: BoxedProgressSender<ConsistencyCheckProgress>,
    ) -> io::Result<()> {
        self.0.consistency_check(repair, tx.clone()).await?;
        Ok(())
    }

    async fn export(
        &self,
        hash: Hash,
        target: PathBuf,
        mode: ExportMode,
        progress: ExportProgressCb,
    ) -> io::Result<()> {
        Ok(self.0.export(hash, target, mode, progress).await?)
    }
}
1350
1351impl super::Store for Store {
    /// Import a file from disk; the blocking work runs on the tokio
    /// blocking pool (hashing and copying can be slow).
    async fn import_file(
        &self,
        path: PathBuf,
        mode: ImportMode,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> io::Result<(crate::TempTag, u64)> {
        let this = self.0.clone();
        Ok(
            tokio::task::spawn_blocking(move || {
                this.import_file_sync(path, mode, format, progress)
            })
            .await??,
        )
    }
1367
    /// Import in-memory bytes; hashing runs on the blocking pool.
    async fn import_bytes(
        &self,
        data: bytes::Bytes,
        format: crate::BlobFormat,
    ) -> io::Result<crate::TempTag> {
        let this = self.0.clone();
        Ok(tokio::task::spawn_blocking(move || this.import_bytes_sync(data, format)).await??)
    }
1376
    /// Import a byte stream: spool it to a temp file with copy progress,
    /// then finalize (hash + outboard) on the blocking pool.
    async fn import_stream(
        &self,
        mut data: impl Stream<Item = io::Result<Bytes>> + Unpin + Send + 'static,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> io::Result<(TempTag, u64)> {
        let this = self.clone();
        let id = progress.new_id();
        let temp_data_path = this.0.temp_file_name();
        let name = temp_data_path
            .file_name()
            .expect("just created")
            .to_string_lossy()
            .to_string();
        progress.send(ImportProgress::Found { id, name }).await?;
        let mut writer = tokio::fs::File::create(&temp_data_path).await?;
        let mut offset = 0;
        while let Some(chunk) = data.next().await {
            let chunk = chunk?;
            writer.write_all(&chunk).await?;
            offset += chunk.len() as u64;
            // try_send: progress is best-effort, never blocks the copy.
            progress.try_send(ImportProgress::CopyProgress { id, offset })?;
        }
        writer.flush().await?;
        drop(writer);
        let file = ImportSource::TempFile(temp_data_path);
        Ok(tokio::task::spawn_blocking(move || {
            this.0.finalize_import_sync(file, format, id, progress)
        })
        .await??)
    }
1409
    /// Point the named tag at `hash`, creating or overwriting it.
    async fn set_tag(&self, name: Tag, hash: HashAndFormat) -> io::Result<()> {
        Ok(self.0.set_tag(name, hash).await?)
    }
1413
    /// Delete tags in the optional range [from, to).
    async fn delete_tags(&self, from: Option<Tag>, to: Option<Tag>) -> io::Result<()> {
        Ok(self.0.delete_tags(from, to).await?)
    }
1417
    /// Create a fresh auto-named tag pointing at `hash`.
    async fn create_tag(&self, hash: HashAndFormat) -> io::Result<Tag> {
        Ok(self.0.create_tag(hash).await?)
    }
1421
    /// Rename a tag, keeping its target.
    async fn rename_tag(&self, from: Tag, to: Tag) -> io::Result<()> {
        Ok(self.0.rename_tag(from, to).await?)
    }
1425
    /// Forcefully delete entries (user initiated, bypasses GC protection).
    async fn delete(&self, hashes: Vec<Hash>) -> io::Result<()> {
        Ok(self.0.delete(hashes).await?)
    }
1429
1430 async fn gc_run<G, Gut>(&self, config: super::GcConfig, protected_cb: G)
1431 where
1432 G: Fn() -> Gut,
1433 Gut: Future<Output = BTreeSet<Hash>> + Send,
1434 {
1435 tracing::info!("Starting GC task with interval {:?}", config.period);
1436 let mut live = BTreeSet::new();
1437 'outer: loop {
1438 if let Err(cause) = self.0.gc_start().await {
1439 tracing::debug!(
1440 "unable to notify the db of GC start: {cause}. Shutting down GC loop."
1441 );
1442 break;
1443 }
1444 tokio::time::sleep(config.period).await;
1446 tracing::debug!("Starting GC");
1447 live.clear();
1448
1449 let p = protected_cb().await;
1450 live.extend(p);
1451
1452 tracing::debug!("Starting GC mark phase");
1453 let live_ref = &mut live;
1454 let mut stream = Gen::new(|co| async move {
1455 if let Err(e) = super::gc_mark_task(self, live_ref, &co).await {
1456 co.yield_(GcMarkEvent::Error(e)).await;
1457 }
1458 });
1459 while let Some(item) = stream.next().await {
1460 match item {
1461 GcMarkEvent::CustomDebug(text) => {
1462 tracing::debug!("{}", text);
1463 }
1464 GcMarkEvent::CustomWarning(text, _) => {
1465 tracing::warn!("{}", text);
1466 }
1467 GcMarkEvent::Error(err) => {
1468 tracing::error!("Fatal error during GC mark {}", err);
1469 continue 'outer;
1470 }
1471 }
1472 }
1473 drop(stream);
1474
1475 tracing::debug!("Starting GC sweep phase");
1476 let live_ref = &live;
1477 let mut stream = Gen::new(|co| async move {
1478 if let Err(e) = gc_sweep_task(self, live_ref, &co).await {
1479 co.yield_(GcSweepEvent::Error(e)).await;
1480 }
1481 });
1482 while let Some(item) = stream.next().await {
1483 match item {
1484 GcSweepEvent::CustomDebug(text) => {
1485 tracing::debug!("{}", text);
1486 }
1487 GcSweepEvent::CustomWarning(text, _) => {
1488 tracing::warn!("{}", text);
1489 }
1490 GcSweepEvent::Error(err) => {
1491 tracing::error!("Fatal error during GC mark {}", err);
1492 continue 'outer;
1493 }
1494 }
1495 }
1496 if let Some(ref cb) = config.done_callback {
1497 cb();
1498 }
1499 }
1500 }
1501
    /// Create a temp tag for `value`, pinning it against garbage collection
    /// for as long as the tag is alive.
    fn temp_tag(&self, value: HashAndFormat) -> TempTag {
        self.0.temp.temp_tag(value)
    }
1505
1506 async fn sync(&self) -> io::Result<()> {
1507 Ok(self.0.sync().await?)
1508 }
1509
    /// Gracefully shut down the store actor and wait for it to finish.
    async fn shutdown(&self) {
        self.0.shutdown().await;
    }
1513}
1514
1515pub(super) async fn gc_sweep_task(
1516 store: &Store,
1517 live: &BTreeSet<Hash>,
1518 co: &Co<GcSweepEvent>,
1519) -> anyhow::Result<()> {
1520 let blobs = store.blobs().await?.chain(store.partial_blobs().await?);
1521 let mut count = 0;
1522 let mut batch = Vec::new();
1523 for hash in blobs {
1524 let hash = hash?;
1525 if !live.contains(&hash) {
1526 batch.push(hash);
1527 count += 1;
1528 }
1529 if batch.len() >= 100 {
1530 store.0.gc_delete(batch.clone()).await?;
1531 batch.clear();
1532 }
1533 }
1534 if !batch.is_empty() {
1535 store.0.gc_delete(batch).await?;
1536 }
1537 co.yield_(GcSweepEvent::CustomDebug(format!(
1538 "deleted {} blobs",
1539 count
1540 )))
1541 .await;
1542 Ok(())
1543}
1544
1545impl Actor {
1546 fn new(
1547 path: &Path,
1548 options: Options,
1549 temp: Arc<RwLock<TempCounterMap>>,
1550 rt: tokio::runtime::Handle,
1551 ) -> ActorResult<(Self, async_channel::Sender<ActorMessage>)> {
1552 let db = match redb::Database::create(path) {
1553 Ok(db) => db,
1554 Err(DatabaseError::UpgradeRequired(1)) => {
1555 return Err(ActorError::Migration(anyhow::anyhow!(
1556 "migration from v1 no longer supported"
1557 )))
1558 }
1559 Err(err) => return Err(err.into()),
1560 };
1561
1562 let txn = db.begin_write()?;
1563 let mut t = Default::default();
1565 let tables = Tables::new(&txn, &mut t)?;
1566 drop(tables);
1567 txn.commit()?;
1568 let (tx, rx) = async_channel::bounded(1024);
1571 let tx2 = tx.clone();
1572 let on_file_create: CreateCb = Arc::new(move |hash| {
1573 tx2.send_blocking(ActorMessage::OnMemSizeExceeded { hash: *hash })
1575 .ok();
1576 Ok(())
1577 });
1578 let create_options = BaoFileConfig::new(
1579 Arc::new(options.path.data_path.clone()),
1580 16 * 1024,
1581 Some(on_file_create),
1582 );
1583 Ok((
1584 Self {
1585 db,
1586 state: ActorState {
1587 temp,
1588 handles: BTreeMap::new(),
1589 protected: BTreeSet::new(),
1590 msgs_rx: rx,
1591 options,
1592 create_options: Arc::new(create_options),
1593 rt,
1594 },
1595 },
1596 tx,
1597 ))
1598 }
1599
    /// Main actor loop: drains messages from the channel, grouping consecutive
    /// read-only and read-write messages into shared redb transactions to
    /// amortize transaction overhead. Runs until a `Shutdown` message arrives
    /// or the channel closes.
    async fn run_batched(mut self) -> ActorResult<()> {
        let mut msgs = PeekableFlumeReceiver::new(self.state.msgs_rx.clone());
        while let Some(msg) = msgs.recv().await {
            if let ActorMessage::Shutdown { tx } = msg {
                // Drop self first so the database is closed before we confirm
                // shutdown to the caller.
                drop(self);
                if let Some(tx) = tx {
                    tx.send(()).ok();
                }
                break;
            }
            match msg.category() {
                MessageCategory::TopLevel => {
                    self.state.handle_toplevel(&self.db, msg)?;
                }
                MessageCategory::ReadOnly => {
                    // Requeue the message so the batch loop below sees it too.
                    msgs.push_back(msg).expect("just recv'd");
                    tracing::debug!("starting read transaction");
                    let txn = self.db.begin_read()?;
                    let tables = ReadOnlyTables::new(&txn)?;
                    let count = self.state.options.batch.max_read_batch;
                    let timeout = tokio::time::sleep(self.state.options.batch.max_read_duration);
                    tokio::pin!(timeout);
                    // Handle up to `count` read-only messages in this one
                    // transaction, or stop early when the timeout fires.
                    for _ in 0..count {
                        tokio::select! {
                            msg = msgs.recv() => {
                                if let Some(msg) = msg {
                                    // A non-read-only message ends the batch;
                                    // push it back for the outer loop.
                                    if let Err(msg) = self.state.handle_readonly(&tables, msg)? {
                                        msgs.push_back(msg).expect("just recv'd");
                                        break;
                                    }
                                } else {
                                    break;
                                }
                            }
                            _ = &mut timeout => {
                                tracing::debug!("read transaction timed out");
                                break;
                            }
                        }
                    }
                    tracing::debug!("done with read transaction");
                }
                MessageCategory::ReadWrite => {
                    msgs.push_back(msg).expect("just recv'd");
                    tracing::debug!("starting write transaction");
                    let txn = self.db.begin_write()?;
                    let mut delete_after_commit = Default::default();
                    let mut tables = Tables::new(&txn, &mut delete_after_commit)?;
                    let count = self.state.options.batch.max_write_batch;
                    let timeout = tokio::time::sleep(self.state.options.batch.max_write_duration);
                    tokio::pin!(timeout);
                    for _ in 0..count {
                        tokio::select! {
                            msg = msgs.recv() => {
                                if let Some(msg) = msg {
                                    if let Err(msg) = self.state.handle_readwrite(&mut tables, msg)? {
                                        msgs.push_back(msg).expect("just recv'd");
                                        break;
                                    }
                                } else {
                                    break;
                                }
                            }
                            _ = &mut timeout => {
                                tracing::debug!("write transaction timed out");
                                break;
                            }
                        }
                    }
                    drop(tables);
                    txn.commit()?;
                    // On-disk files are only deleted after a successful commit,
                    // so a failed transaction never loses data.
                    delete_after_commit.apply_and_clear(&self.state.options.path);
                    tracing::debug!("write transaction committed");
                }
            }
        }
        tracing::debug!("redb actor done");
        Ok(())
    }
1680}
1681
1682impl ActorState {
1683 fn entry_status(
1684 &mut self,
1685 tables: &impl ReadableTables,
1686 hash: Hash,
1687 ) -> ActorResult<EntryStatus> {
1688 let status = match tables.blobs().get(hash)? {
1689 Some(guard) => match guard.value() {
1690 EntryState::Complete { .. } => EntryStatus::Complete,
1691 EntryState::Partial { .. } => EntryStatus::Partial,
1692 },
1693 None => EntryStatus::NotFound,
1694 };
1695 Ok(status)
1696 }
1697
1698 fn get(
1699 &mut self,
1700 tables: &impl ReadableTables,
1701 hash: Hash,
1702 ) -> ActorResult<Option<BaoFileHandle>> {
1703 if let Some(handle) = self.handles.get(&hash).and_then(|weak| weak.upgrade()) {
1704 return Ok(Some(handle));
1705 }
1706 let Some(entry) = tables.blobs().get(hash)? else {
1707 return Ok(None);
1708 };
1709 let entry = entry.value();
1712 let config = self.create_options.clone();
1713 let handle = match entry {
1714 EntryState::Complete {
1715 data_location,
1716 outboard_location,
1717 } => {
1718 let data = load_data(tables, &self.options.path, data_location, &hash)?;
1719 let outboard = load_outboard(
1720 tables,
1721 &self.options.path,
1722 outboard_location,
1723 data.size(),
1724 &hash,
1725 )?;
1726 BaoFileHandle::new_complete(config, hash, data, outboard)
1727 }
1728 EntryState::Partial { .. } => BaoFileHandle::incomplete_file(config, hash)?,
1729 };
1730 self.handles.insert(hash, handle.downgrade());
1731 Ok(Some(handle))
1732 }
1733
    /// Export the blob behind `cmd.temp_tag` to `cmd.target`, reporting the
    /// result through `tx`. Inline data is written directly; owned data is
    /// either copied (possibly on a blocking thread) or, in `TryReference`
    /// mode, moved out and re-registered as an external location.
    fn export(
        &mut self,
        tables: &mut Tables,
        cmd: Export,
        tx: oneshot::Sender<ActorResult<()>>,
    ) -> ActorResult<()> {
        let Export {
            temp_tag,
            target,
            mode,
            progress,
        } = cmd;
        let guard = tables
            .blobs
            .get(temp_tag.hash())?
            .ok_or_else(|| ActorError::Inconsistent("entry not found".to_owned()))?;
        let entry = guard.value();
        match entry {
            EntryState::Complete {
                data_location,
                outboard_location,
            } => match data_location {
                DataLocation::Inline(()) => {
                    let data = tables.inline_data.get(temp_tag.hash())?.ok_or_else(|| {
                        ActorError::Inconsistent("inline data not found".to_owned())
                    })?;
                    tracing::trace!("exporting inline data to {}", target.display());
                    tx.send(std::fs::write(&target, data.value()).map_err(|e| e.into()))
                        .ok();
                }
                DataLocation::Owned(size) => {
                    let path = self.options.path.owned_data_path(temp_tag.hash());
                    match mode {
                        ExportMode::Copy => {
                            // Copy on a blocking thread; temp_tag is moved into
                            // the closure so the blob stays pinned until done.
                            self.rt.spawn_blocking(move || {
                                tx.send(export_file_copy(temp_tag, path, size, target, progress))
                                    .ok();
                            });
                        }
                        ExportMode::TryReference => match std::fs::rename(&path, &target) {
                            Ok(()) => {
                                let entry = EntryState::Complete {
                                    data_location: DataLocation::External(vec![target], size),
                                    outboard_location,
                                };
                                // The read guard must be dropped before writing
                                // to the same table.
                                drop(guard);
                                tables.blobs.insert(temp_tag.hash(), entry)?;
                                drop(temp_tag);
                                tx.send(Ok(())).ok();
                            }
                            Err(e) => {
                                // EXDEV: rename across filesystems is not
                                // possible, fall back to copy + delete.
                                const ERR_CROSS: i32 = 18;
                                if e.raw_os_error() == Some(ERR_CROSS) {
                                    match std::fs::copy(&path, &target) {
                                        Ok(_) => {
                                            let entry = EntryState::Complete {
                                                data_location: DataLocation::External(
                                                    vec![target],
                                                    size,
                                                ),
                                                outboard_location,
                                            };

                                            drop(guard);
                                            tables.blobs.insert(temp_tag.hash(), entry)?;
                                            // The owned copy is now redundant;
                                            // delete it once the txn commits.
                                            tables
                                                .delete_after_commit
                                                .insert(*temp_tag.hash(), [BaoFilePart::Data]);
                                            drop(temp_tag);

                                            tx.send(Ok(())).ok();
                                        }
                                        Err(e) => {
                                            drop(temp_tag);
                                            tx.send(Err(e.into())).ok();
                                        }
                                    }
                                } else {
                                    drop(temp_tag);
                                    tx.send(Err(e.into())).ok();
                                }
                            }
                        },
                    }
                }
                DataLocation::External(paths, size) => {
                    let path = paths
                        .first()
                        .ok_or_else(|| {
                            ActorError::Inconsistent("external path missing".to_owned())
                        })?
                        .to_owned();
                    // Exporting to the file we already reference is a no-op.
                    if path == target {
                        tx.send(Ok(())).ok();
                    } else {
                        self.rt.spawn_blocking(move || {
                            tx.send(export_file_copy(temp_tag, path, size, target, progress))
                                .ok();
                        });
                    }
                }
            },
            EntryState::Partial { .. } => {
                return Err(io::Error::new(io::ErrorKind::Unsupported, "partial entry").into());
            }
        }
        Ok(())
    }
1848
    /// Register an imported blob in the database: decide inline vs owned vs
    /// external storage for data and outboard based on the configured size
    /// limits, move/inline the payload accordingly, and upsert the blobs entry.
    /// Returns the temp tag pinning the blob and its data size.
    fn import(&mut self, tables: &mut Tables, cmd: Import) -> ActorResult<(TempTag, u64)> {
        let Import {
            content_id,
            source: file,
            outboard,
            data_size,
        } = cmd;
        let outboard_size = outboard.as_ref().map(|x| x.len() as u64).unwrap_or(0);
        let inline_data = data_size <= self.options.inline.max_data_inlined;
        // An outboard of size 0 is stored as NotNeeded, never inlined.
        let inline_outboard =
            outboard_size <= self.options.inline.max_outboard_inlined && outboard_size != 0;
        let tag = self.temp.temp_tag(content_id);
        let hash = *tag.hash();
        // Protect the hash from GC while the import is in flight.
        self.protected.insert(hash);
        let data_location = match file {
            ImportSource::External(external_path) => {
                tracing::debug!("stored external reference {}", external_path.display());
                if inline_data {
                    tracing::debug!(
                        "reading external data to inline it: {}",
                        external_path.display()
                    );
                    let data = Bytes::from(std::fs::read(&external_path)?);
                    DataLocation::Inline(data)
                } else {
                    DataLocation::External(vec![external_path], data_size)
                }
            }
            ImportSource::TempFile(temp_data_path) => {
                if inline_data {
                    tracing::debug!(
                        "reading and deleting temp file to inline it: {}",
                        temp_data_path.display()
                    );
                    let data = Bytes::from(read_and_remove(&temp_data_path)?);
                    DataLocation::Inline(data)
                } else {
                    // Rename into place: the temp file lives on the same fs.
                    let data_path = self.options.path.owned_data_path(&hash);
                    std::fs::rename(&temp_data_path, &data_path)?;
                    tracing::debug!("created file {}", data_path.display());
                    DataLocation::Owned(data_size)
                }
            }
            ImportSource::Memory(data) => {
                if inline_data {
                    DataLocation::Inline(data)
                } else {
                    let data_path = self.options.path.owned_data_path(&hash);
                    overwrite_and_sync(&data_path, &data)?;
                    tracing::debug!("created file {}", data_path.display());
                    DataLocation::Owned(data_size)
                }
            }
        };
        let outboard_location = if let Some(outboard) = outboard {
            if inline_outboard {
                OutboardLocation::Inline(Bytes::from(outboard))
            } else {
                let outboard_path = self.options.path.owned_outboard_path(&hash);
                overwrite_and_sync(&outboard_path, &outboard)?;
                OutboardLocation::Owned
            }
        } else {
            OutboardLocation::NotNeeded
        };
        if let DataLocation::Inline(data) = &data_location {
            tables.inline_data.insert(hash, data.as_ref())?;
        }
        if let OutboardLocation::Inline(outboard) = &outboard_location {
            tables.inline_outboard.insert(hash, outboard.as_ref())?;
        }
        // Files created above must survive the commit: cancel any pending
        // deletion scheduled for these parts.
        if let DataLocation::Owned(_) = &data_location {
            tables.delete_after_commit.remove(hash, [BaoFilePart::Data]);
        }
        if let OutboardLocation::Owned = &outboard_location {
            tables
                .delete_after_commit
                .remove(hash, [BaoFilePart::Outboard]);
        }
        let entry = tables.blobs.get(hash)?;
        let entry = entry.map(|x| x.value()).unwrap_or_default();
        // The inline payloads are stored in their own tables; the blobs entry
        // only records that the data/outboard are inline.
        let data_location = data_location.discard_inline_data();
        let outboard_location = outboard_location.discard_extra_data();
        let entry = entry.union(EntryState::Complete {
            data_location,
            outboard_location,
        })?;
        tables.blobs.insert(hash, entry)?;
        Ok((tag, data_size))
    }
1942
    /// Like `get`, but always returns a handle: unknown hashes get a fresh
    /// in-memory incomplete entry. The hash is also added to the protected
    /// set so GC does not remove it while a handle exists.
    fn get_or_create(
        &mut self,
        tables: &impl ReadableTables,
        hash: Hash,
    ) -> ActorResult<BaoFileHandle> {
        self.protected.insert(hash);
        // Fast path: reuse a live cached handle.
        if let Some(handle) = self.handles.get(&hash).and_then(|x| x.upgrade()) {
            return Ok(handle);
        }
        let entry = tables.blobs().get(hash)?;
        let handle = if let Some(entry) = entry {
            let entry = entry.value();
            match entry {
                EntryState::Complete {
                    data_location,
                    outboard_location,
                    ..
                } => {
                    let data = load_data(tables, &self.options.path, data_location, &hash)?;
                    let outboard = load_outboard(
                        tables,
                        &self.options.path,
                        outboard_location,
                        data.size(),
                        &hash,
                    )?;
                    tracing::debug!("creating complete entry for {}", hash.to_hex());
                    BaoFileHandle::new_complete(self.create_options.clone(), hash, data, outboard)
                }
                EntryState::Partial { .. } => {
                    tracing::debug!("creating partial entry for {}", hash.to_hex());
                    BaoFileHandle::incomplete_file(self.create_options.clone(), hash)?
                }
            }
        } else {
            // Not in the database yet: start as an in-memory incomplete entry.
            BaoFileHandle::incomplete_mem(self.create_options.clone(), hash)
        };
        // Cache a weak reference for future lookups.
        self.handles.insert(hash, handle.downgrade());
        Ok(handle)
    }
1983
1984 fn blobs(
1986 &mut self,
1987 tables: &impl ReadableTables,
1988 filter: FilterPredicate<Hash, EntryState>,
1989 ) -> ActorResult<Vec<std::result::Result<(Hash, EntryState), StorageError>>> {
1990 let mut res = Vec::new();
1991 let mut index = 0u64;
1992 #[allow(clippy::explicit_counter_loop)]
1993 for item in tables.blobs().iter()? {
1994 match item {
1995 Ok((k, v)) => {
1996 if let Some(item) = filter(index, k, v) {
1997 res.push(Ok(item));
1998 }
1999 }
2000 Err(e) => {
2001 res.push(Err(e));
2002 }
2003 }
2004 index += 1;
2005 }
2006 Ok(res)
2007 }
2008
2009 fn tags(
2011 &mut self,
2012 tables: &impl ReadableTables,
2013 from: Option<Tag>,
2014 to: Option<Tag>,
2015 ) -> ActorResult<Vec<std::result::Result<(Tag, HashAndFormat), StorageError>>> {
2016 let mut res = Vec::new();
2017 let from = from.map(Bound::Included).unwrap_or(Bound::Unbounded);
2018 let to = to.map(Bound::Excluded).unwrap_or(Bound::Unbounded);
2019 for item in tables.tags().range((from, to))? {
2020 match item {
2021 Ok((k, v)) => {
2022 res.push(Ok((k.value(), v.value())));
2023 }
2024 Err(e) => {
2025 res.push(Err(e));
2026 }
2027 }
2028 }
2029 Ok(res)
2030 }
2031
2032 fn create_tag(&mut self, tables: &mut Tables, content: HashAndFormat) -> ActorResult<Tag> {
2033 let tag = {
2034 let tag = Tag::auto(SystemTime::now(), |x| {
2035 matches!(tables.tags.get(Tag(Bytes::copy_from_slice(x))), Ok(Some(_)))
2036 });
2037 tables.tags.insert(tag.clone(), content)?;
2038 tag
2039 };
2040 Ok(tag)
2041 }
2042
2043 fn rename_tag(&mut self, tables: &mut Tables, from: Tag, to: Tag) -> ActorResult<()> {
2044 let value = tables
2045 .tags
2046 .remove(from)?
2047 .ok_or_else(|| {
2048 ActorError::Io(io::Error::new(io::ErrorKind::NotFound, "tag not found"))
2049 })?
2050 .value();
2051 tables.tags.insert(to, value)?;
2052 Ok(())
2053 }
2054
2055 fn set_tag(&self, tables: &mut Tables, tag: Tag, value: HashAndFormat) -> ActorResult<()> {
2056 tables.tags.insert(tag, value)?;
2057 Ok(())
2058 }
2059
2060 fn delete_tags(
2061 &self,
2062 tables: &mut Tables,
2063 from: Option<Tag>,
2064 to: Option<Tag>,
2065 ) -> ActorResult<()> {
2066 let from = from.map(Bound::Included).unwrap_or(Bound::Unbounded);
2067 let to = to.map(Bound::Excluded).unwrap_or(Bound::Unbounded);
2068 let removing = tables.tags.extract_from_if((from, to), |_, _| true)?;
2069 for res in removing {
2071 res?;
2072 }
2073 Ok(())
2074 }
2075
2076 fn on_mem_size_exceeded(&mut self, tables: &mut Tables, hash: Hash) -> ActorResult<()> {
2077 let entry = tables
2078 .blobs
2079 .get(hash)?
2080 .map(|x| x.value())
2081 .unwrap_or_default();
2082 let entry = entry.union(EntryState::Partial { size: None })?;
2083 tables.blobs.insert(hash, entry)?;
2084 tables.delete_after_commit.remove(
2086 hash,
2087 [BaoFilePart::Data, BaoFilePart::Outboard, BaoFilePart::Sizes],
2088 );
2089 Ok(())
2090 }
2091
    /// Replace the inline-storage thresholds and, when `reapply` is true,
    /// rewrite every existing complete entry so its data/outboard storage
    /// (inline table vs owned file) matches the new limits.
    fn update_inline_options(
        &mut self,
        db: &redb::Database,
        options: InlineOptions,
        reapply: bool,
    ) -> ActorResult<()> {
        self.options.inline = options;
        if reapply {
            let mut delete_after_commit = Default::default();
            let tx = db.begin_write()?;
            {
                let mut tables = Tables::new(&tx, &mut delete_after_commit)?;
                // Collect all hashes first so we can mutate tables while iterating.
                let hashes = tables
                    .blobs
                    .iter()?
                    .map(|x| x.map(|(k, _)| k.value()))
                    .collect::<Result<Vec<_>, _>>()?;
                for hash in hashes {
                    let guard = tables
                        .blobs
                        .get(hash)?
                        .ok_or_else(|| ActorError::Inconsistent("hash not found".to_owned()))?;
                    let entry = guard.value();
                    // Only complete entries are migrated; partial ones are untouched.
                    if let EntryState::Complete {
                        data_location,
                        outboard_location,
                    } = entry
                    {
                        let (data_location, data_size, data_location_changed) = match data_location
                        {
                            DataLocation::Owned(size) => {
                                // Owned file now small enough: inline it and
                                // schedule the file for deletion after commit.
                                if size <= self.options.inline.max_data_inlined {
                                    let path = self.options.path.owned_data_path(&hash);
                                    let data = std::fs::read(&path)?;
                                    tables.delete_after_commit.insert(hash, [BaoFilePart::Data]);
                                    tables.inline_data.insert(hash, data.as_slice())?;
                                    (DataLocation::Inline(()), size, true)
                                } else {
                                    (DataLocation::Owned(size), size, false)
                                }
                            }
                            DataLocation::Inline(()) => {
                                let guard = tables.inline_data.get(hash)?.ok_or_else(|| {
                                    ActorError::Inconsistent("inline data missing".to_owned())
                                })?;
                                let data = guard.value();
                                let size = data.len() as u64;
                                // Inline data now too large: write it out as an
                                // owned file and drop the inline row.
                                if size > self.options.inline.max_data_inlined {
                                    let path = self.options.path.owned_data_path(&hash);
                                    std::fs::write(&path, data)?;
                                    drop(guard);
                                    tables.inline_data.remove(hash)?;
                                    (DataLocation::Owned(size), size, true)
                                } else {
                                    (DataLocation::Inline(()), size, false)
                                }
                            }
                            // External files are never migrated.
                            DataLocation::External(paths, size) => {
                                (DataLocation::External(paths, size), size, false)
                            }
                        };
                        let outboard_size = raw_outboard_size(data_size);
                        let (outboard_location, outboard_location_changed) = match outboard_location
                        {
                            OutboardLocation::Owned
                                if outboard_size <= self.options.inline.max_outboard_inlined =>
                            {
                                let path = self.options.path.owned_outboard_path(&hash);
                                let outboard = std::fs::read(&path)?;
                                tables
                                    .delete_after_commit
                                    .insert(hash, [BaoFilePart::Outboard]);
                                tables.inline_outboard.insert(hash, outboard.as_slice())?;
                                (OutboardLocation::Inline(()), true)
                            }
                            OutboardLocation::Inline(())
                                if outboard_size > self.options.inline.max_outboard_inlined =>
                            {
                                let guard = tables.inline_outboard.get(hash)?.ok_or_else(|| {
                                    ActorError::Inconsistent("inline outboard missing".to_owned())
                                })?;
                                let outboard = guard.value();
                                let path = self.options.path.owned_outboard_path(&hash);
                                std::fs::write(&path, outboard)?;
                                drop(guard);
                                tables.inline_outboard.remove(hash)?;
                                (OutboardLocation::Owned, true)
                            }
                            x => (x, false),
                        };
                        // Read guard must be released before updating the blobs table.
                        drop(guard);
                        if data_location_changed || outboard_location_changed {
                            tables.blobs.insert(
                                hash,
                                EntryState::Complete {
                                    data_location,
                                    outboard_location,
                                },
                            )?;
                        }
                    }
                }
            }
            tx.commit()?;
            // Only delete files once the transaction has committed.
            delete_after_commit.apply_and_clear(&self.options.path);
        }
        Ok(())
    }
2201
    /// Delete the given hashes from the database and schedule their on-disk
    /// parts for removal after commit. Hashes pinned by temp tags are always
    /// skipped; hashes in the protected set are skipped unless `force` is set.
    fn delete(&mut self, tables: &mut Tables, hashes: Vec<Hash>, force: bool) -> ActorResult<()> {
        for hash in hashes {
            // Temp tags pin a blob unconditionally.
            if self.temp.as_ref().read().unwrap().contains(&hash) {
                continue;
            }
            if !force && self.protected.contains(&hash) {
                tracing::debug!("protected hash, continuing {}", &hash.to_hex()[..8]);
                continue;
            }

            tracing::debug!("deleting {}", &hash.to_hex()[..8]);

            // Drop the cached handle so no stale handle outlives the entry.
            self.handles.remove(&hash);
            if let Some(entry) = tables.blobs.remove(hash)? {
                match entry.value() {
                    EntryState::Complete {
                        data_location,
                        outboard_location,
                    } => {
                        match data_location {
                            DataLocation::Inline(_) => {
                                tables.inline_data.remove(hash)?;
                            }
                            DataLocation::Owned(_) => {
                                // File deletion is deferred until after commit.
                                tables.delete_after_commit.insert(hash, [BaoFilePart::Data]);
                            }
                            // External files are never deleted by the store.
                            DataLocation::External(_, _) => {}
                        }
                        match outboard_location {
                            OutboardLocation::Inline(_) => {
                                tables.inline_outboard.remove(hash)?;
                            }
                            OutboardLocation::Owned => {
                                tables
                                    .delete_after_commit
                                    .insert(hash, [BaoFilePart::Outboard]);
                            }
                            OutboardLocation::NotNeeded => {}
                        }
                    }
                    EntryState::Partial { .. } => {
                        // Partial entries may have all three part files on disk.
                        tables.delete_after_commit.insert(
                            hash,
                            [BaoFilePart::Outboard, BaoFilePart::Data, BaoFilePart::Sizes],
                        );
                    }
                }
            }
        }
        Ok(())
    }
2256
    /// Transition an entry to complete storage: convert the handle's storage
    /// in place via `complete_storage`, then persist the resulting data and
    /// outboard locations (and any inlined payloads) to the database.
    fn on_complete(&mut self, tables: &mut Tables, entry: BaoFileHandle) -> ActorResult<()> {
        let hash = entry.hash();
        // Filled inside the transform closure when the entry actually
        // transitions to complete; stays None if it already was complete.
        let mut info = None;
        tracing::trace!("on_complete({})", hash.to_hex());
        entry.transform(|state| {
            tracing::trace!("on_complete transform {:?}", state);
            let entry = match complete_storage(
                state,
                &hash,
                &self.options.path,
                &self.options.inline,
                tables.delete_after_commit,
            )? {
                Ok(entry) => {
                    // Capture sizes and any in-memory payloads for the db update below.
                    info = Some((
                        entry.data_size(),
                        entry.data.mem().cloned(),
                        entry.outboard_size(),
                        entry.outboard.mem().cloned(),
                    ));
                    entry
                }
                Err(entry) => {
                    // Already complete: nothing to persist.
                    entry
                }
            };
            Ok(BaoFileStorage::Complete(entry))
        })?;
        if let Some((data_size, data, outboard_size, outboard)) = info {
            // A present mem payload means the part was inlined by complete_storage.
            let data_location = if data.is_some() {
                DataLocation::Inline(())
            } else {
                DataLocation::Owned(data_size)
            };
            let outboard_location = if outboard_size == 0 {
                OutboardLocation::NotNeeded
            } else if outboard.is_some() {
                OutboardLocation::Inline(())
            } else {
                OutboardLocation::Owned
            };
            {
                tracing::debug!(
                    "inserting complete entry for {}, {} bytes",
                    hash.to_hex(),
                    data_size,
                );
                let entry = tables
                    .blobs()
                    .get(hash)?
                    .map(|x| x.value())
                    .unwrap_or_default();
                let entry = entry.union(EntryState::Complete {
                    data_location,
                    outboard_location,
                })?;
                tables.blobs.insert(hash, entry)?;
                if let Some(data) = data {
                    tables.inline_data.insert(hash, data.as_ref())?;
                }
                if let Some(outboard) = outboard {
                    tables.inline_outboard.insert(hash, outboard.as_ref())?;
                }
            }
        }
        Ok(())
    }
2326
    /// Handle messages that manage their own transactions and therefore must
    /// run outside any read/write batch. Any other message is an internal
    /// dispatch error.
    fn handle_toplevel(&mut self, db: &redb::Database, msg: ActorMessage) -> ActorResult<()> {
        match msg {
            ActorMessage::UpdateInlineOptions {
                inline_options,
                reapply,
                tx,
            } => {
                let res = self.update_inline_options(db, inline_options, reapply);
                tx.send(res?).ok();
            }
            ActorMessage::Fsck {
                repair,
                progress,
                tx,
            } => {
                let res = self.consistency_check(db, repair, progress);
                tx.send(res).ok();
            }
            ActorMessage::Sync { tx } => {
                // Reaching this point means all prior messages were processed.
                tx.send(()).ok();
            }
            x => {
                return Err(ActorError::Inconsistent(format!(
                    "unexpected message for handle_toplevel: {:?}",
                    x
                )))
            }
        }
        Ok(())
    }
2357
    /// Handle a message inside a read transaction. Returns `Ok(Err(msg))` for
    /// messages that need a write transaction, handing them back to the caller
    /// unprocessed.
    fn handle_readonly(
        &mut self,
        tables: &impl ReadableTables,
        msg: ActorMessage,
    ) -> ActorResult<std::result::Result<(), ActorMessage>> {
        match msg {
            ActorMessage::Get { hash, tx } => {
                let res = self.get(tables, hash);
                tx.send(res).ok();
            }
            ActorMessage::GetOrCreate { hash, tx } => {
                let res = self.get_or_create(tables, hash);
                tx.send(res).ok();
            }
            ActorMessage::EntryStatus { hash, tx } => {
                let res = self.entry_status(tables, hash);
                tx.send(res).ok();
            }
            ActorMessage::Blobs { filter, tx } => {
                let res = self.blobs(tables, filter);
                tx.send(res).ok();
            }
            ActorMessage::Tags { from, to, tx } => {
                let res = self.tags(tables, from, to);
                tx.send(res).ok();
            }
            ActorMessage::GcStart { tx } => {
                // A new GC cycle begins: forget previous protections and prune
                // dead weak handles so only live handles protect entries.
                self.protected.clear();
                self.handles.retain(|_, weak| weak.is_live());
                tx.send(()).ok();
            }
            ActorMessage::Dump => {
                dump(tables).ok();
            }
            #[cfg(test)]
            ActorMessage::EntryState { hash, tx } => {
                tx.send(self.entry_state(tables, hash)).ok();
            }
            ActorMessage::GetFullEntryState { hash, tx } => {
                let res = self.get_full_entry_state(tables, hash);
                tx.send(res).ok();
            }
            // Not a read-only message: give it back to the caller.
            x => return Ok(Err(x)),
        }
        Ok(Ok(()))
    }
2404
    /// Handle a message inside a write transaction. Read-only messages are
    /// forwarded to `handle_readonly`; messages that cannot run in any
    /// transaction are returned as `Ok(Err(msg))` for the caller to requeue.
    fn handle_readwrite(
        &mut self,
        tables: &mut Tables,
        msg: ActorMessage,
    ) -> ActorResult<std::result::Result<(), ActorMessage>> {
        match msg {
            ActorMessage::Import { cmd, tx } => {
                let res = self.import(tables, cmd);
                tx.send(res).ok();
            }
            ActorMessage::SetTag { tag, value, tx } => {
                let res = self.set_tag(tables, tag, value);
                tx.send(res).ok();
            }
            ActorMessage::DeleteTags { from, to, tx } => {
                let res = self.delete_tags(tables, from, to);
                tx.send(res).ok();
            }
            ActorMessage::CreateTag { hash, tx } => {
                let res = self.create_tag(tables, hash);
                tx.send(res).ok();
            }
            ActorMessage::RenameTag { from, to, tx } => {
                let res = self.rename_tag(tables, from, to);
                tx.send(res).ok();
            }
            ActorMessage::Delete { hashes, tx } => {
                // Explicit user delete ignores the protected set.
                let res = self.delete(tables, hashes, true);
                tx.send(res).ok();
            }
            ActorMessage::GcDelete { hashes, tx } => {
                // GC delete respects the protected set.
                let res = self.delete(tables, hashes, false);
                tx.send(res).ok();
            }
            ActorMessage::OnComplete { handle } => {
                let res = self.on_complete(tables, handle);
                res.ok();
            }
            ActorMessage::Export { cmd, tx } => {
                self.export(tables, cmd, tx)?;
            }
            ActorMessage::OnMemSizeExceeded { hash } => {
                let res = self.on_mem_size_exceeded(tables, hash);
                res.ok();
            }
            ActorMessage::Dump => {
                let res = dump(tables);
                res.ok();
            }
            ActorMessage::SetFullEntryState { hash, entry, tx } => {
                let res = self.set_full_entry_state(tables, hash, entry);
                tx.send(res).ok();
            }
            msg => {
                // Not a write message: a write transaction can also serve reads.
                if let Err(msg) = self.handle_readonly(tables, msg)? {
                    return Ok(Err(msg));
                }
            }
        }
        Ok(Ok(()))
    }
2467}
2468
/// Copy (reflink where the filesystem supports it) `path` to `target`,
/// reporting 0 before and `size` after the copy via `progress`. `temp_tag`
/// is held for the duration so the source blob cannot be garbage collected
/// while the copy is in flight.
fn export_file_copy(
    temp_tag: TempTag,
    path: PathBuf,
    size: u64,
    target: PathBuf,
    progress: ExportProgressCb,
) -> ActorResult<()> {
    progress(0)?;
    reflink_copy::reflink_or_copy(path, target)?;
    progress(size)?;
    drop(temp_tag);
    Ok(())
}
2484
2485fn dump(tables: &impl ReadableTables) -> ActorResult<()> {
2486 for e in tables.blobs().iter()? {
2487 let (k, v) = e?;
2488 let k = k.value();
2489 let v = v.value();
2490 println!("blobs: {} -> {:?}", k.to_hex(), v);
2491 }
2492 for e in tables.tags().iter()? {
2493 let (k, v) = e?;
2494 let k = k.value();
2495 let v = v.value();
2496 println!("tags: {} -> {:?}", k, v);
2497 }
2498 for e in tables.inline_data().iter()? {
2499 let (k, v) = e?;
2500 let k = k.value();
2501 let v = v.value();
2502 println!("inline_data: {} -> {:?}", k.to_hex(), v.len());
2503 }
2504 for e in tables.inline_outboard().iter()? {
2505 let (k, v) = e?;
2506 let k = k.value();
2507 let v = v.value();
2508 println!("inline_outboard: {} -> {:?}", k.to_hex(), v.len());
2509 }
2510 Ok(())
2511}
2512
/// Resolve a `DataLocation` to the actual data: inline bytes from the
/// inline_data table, or an opened file (owned or the first external path)
/// together with its size.
fn load_data(
    tables: &impl ReadableTables,
    options: &PathOptions,
    location: DataLocation<(), u64>,
    hash: &Hash,
) -> ActorResult<MemOrFile<Bytes, (std::fs::File, u64)>> {
    Ok(match location {
        DataLocation::Inline(()) => {
            let Some(data) = tables.inline_data().get(hash)? else {
                return Err(ActorError::Inconsistent(format!(
                    "inconsistent database state: {} should have inline data but does not",
                    hash.to_hex()
                )));
            };
            MemOrFile::Mem(Bytes::copy_from_slice(data.value()))
        }
        DataLocation::Owned(data_size) => {
            let path = options.owned_data_path(hash);
            // NOTE(review): any open failure (e.g. permission denied) is
            // reported as NotFound here — consider preserving the source
            // error kind; confirm no caller relies on NotFound.
            let Ok(file) = std::fs::File::open(&path) else {
                return Err(io::Error::new(
                    io::ErrorKind::NotFound,
                    format!("file not found: {}", path.display()),
                )
                .into());
            };
            MemOrFile::File((file, data_size))
        }
        DataLocation::External(paths, data_size) => {
            if paths.is_empty() {
                return Err(ActorError::Inconsistent(
                    "external data location must not be empty".into(),
                ));
            }
            // Only the first external path is used for reading.
            let path = &paths[0];
            let Ok(file) = std::fs::File::open(path) else {
                return Err(io::Error::new(
                    io::ErrorKind::NotFound,
                    format!("external file not found: {}", path.display()),
                )
                .into());
            };
            MemOrFile::File((file, data_size))
        }
    })
}
2558
/// Resolve an `OutboardLocation` to the actual outboard: empty bytes when no
/// outboard is needed, inline bytes from the inline_outboard table, or the
/// opened owned outboard file with its size derived from the data size.
fn load_outboard(
    tables: &impl ReadableTables,
    options: &PathOptions,
    location: OutboardLocation,
    size: u64,
    hash: &Hash,
) -> ActorResult<MemOrFile<Bytes, (std::fs::File, u64)>> {
    Ok(match location {
        OutboardLocation::NotNeeded => MemOrFile::Mem(Bytes::new()),
        OutboardLocation::Inline(_) => {
            let Some(outboard) = tables.inline_outboard().get(hash)? else {
                return Err(ActorError::Inconsistent(format!(
                    "inconsistent database state: {} should have inline outboard but does not",
                    hash.to_hex()
                )));
            };
            MemOrFile::Mem(Bytes::copy_from_slice(outboard.value()))
        }
        OutboardLocation::Owned => {
            // The outboard size is fully determined by the data size.
            let outboard_size = raw_outboard_size(size);
            let path = options.owned_outboard_path(hash);
            // NOTE(review): like load_data, any open failure is collapsed
            // into NotFound — confirm this is intentional.
            let Ok(file) = std::fs::File::open(&path) else {
                return Err(io::Error::new(
                    io::ErrorKind::NotFound,
                    format!("file not found: {} size={}", path.display(), outboard_size),
                )
                .into());
            };
            MemOrFile::File((file, outboard_size))
        }
    })
}
2591
/// Convert a just-completed [`BaoFileStorage`] into a [`CompleteStorage`],
/// deciding independently for the data and the outboard whether each should
/// be kept inline (in memory, destined for the database) or as an owned file
/// on disk, according to the thresholds in `inline_options`.
///
/// Returns `Ok(Err(c))` if the storage was already complete (nothing to do),
/// `Ok(Ok(..))` with the newly built complete storage otherwise.
///
/// File deletions are not performed here: they are recorded in
/// `delete_after_commit` so they only happen once the enclosing database
/// transaction commits. NOTE(review): the insert/remove calls below must stay
/// consistent with that commit-time processing — keep their order as is.
fn complete_storage(
    storage: BaoFileStorage,
    hash: &Hash,
    path_options: &PathOptions,
    inline_options: &InlineOptions,
    delete_after_commit: &mut DeleteSet,
) -> ActorResult<std::result::Result<CompleteStorage, CompleteStorage>> {
    // Normalize the three parts (data, outboard, sizes) to MemOrFile.
    // The sizes part is only needed while an entry is incomplete, so it is
    // ignored here and its file is scheduled for deletion at the end.
    let (data, outboard, _sizes) = match storage {
        BaoFileStorage::Complete(c) => return Ok(Err(c)),
        BaoFileStorage::IncompleteMem(storage) => {
            let (data, outboard, sizes) = storage.into_parts();
            (
                MemOrFile::Mem(Bytes::from(data.into_parts().0)),
                MemOrFile::Mem(Bytes::from(outboard.into_parts().0)),
                MemOrFile::Mem(Bytes::from(sizes.to_vec())),
            )
        }
        BaoFileStorage::IncompleteFile(storage) => {
            let (data, outboard, sizes) = storage.into_parts();
            (
                MemOrFile::File(data),
                MemOrFile::File(outboard),
                MemOrFile::File(sizes),
            )
        }
    };
    // Both sizes are known for completed storage, hence the unwrap.
    let data_size = data.size()?.unwrap();
    let outboard_size = outboard.size()?.unwrap();
    // The outboard size is fully determined by the data size.
    debug_assert!(raw_outboard_size(data_size) == outboard_size);
    let data = if data_size <= inline_options.max_data_inlined {
        // Small enough to inline: pull file contents into memory and
        // schedule the on-disk data file for deletion after commit.
        match data {
            MemOrFile::File(data) => {
                let mut buf = vec![0; data_size as usize];
                data.read_at(0, &mut buf)?;
                delete_after_commit.insert(*hash, [BaoFilePart::Data]);
                MemOrFile::Mem(Bytes::from(buf))
            }
            MemOrFile::Mem(data) => MemOrFile::Mem(data),
        }
    } else {
        // Data stays (or becomes) an owned file, so make sure no earlier
        // decision left a pending deletion for it.
        delete_after_commit.remove(*hash, [BaoFilePart::Data]);
        match data {
            MemOrFile::Mem(data) => {
                // Was in memory but is too large to inline: persist it,
                // fsynced, at the canonical owned data path.
                let path = path_options.owned_data_path(hash);
                let file = overwrite_and_sync(&path, &data)?;
                MemOrFile::File((file, data_size))
            }
            MemOrFile::File(data) => MemOrFile::File((data, data_size)),
        }
    };
    // Same inline-or-file decision for the outboard, with its own threshold.
    let outboard = if outboard_size == 0 {
        // Blob fits in a single chunk group: no outboard at all.
        Default::default()
    } else if outboard_size <= inline_options.max_outboard_inlined {
        match outboard {
            MemOrFile::File(outboard) => {
                let mut buf = vec![0; outboard_size as usize];
                outboard.read_at(0, &mut buf)?;
                // Close the file handle before scheduling its deletion.
                drop(outboard);
                delete_after_commit.insert(*hash, [BaoFilePart::Outboard]);
                MemOrFile::Mem(Bytes::from(buf))
            }
            MemOrFile::Mem(outboard) => MemOrFile::Mem(outboard),
        }
    } else {
        // Outboard stays on disk: clear any pending deletion for it.
        delete_after_commit.remove(*hash, [BaoFilePart::Outboard]);
        match outboard {
            MemOrFile::Mem(outboard) => {
                let path = path_options.owned_outboard_path(hash);
                let file = overwrite_and_sync(&path, &outboard)?;
                MemOrFile::File((file, outboard_size))
            }
            MemOrFile::File(outboard) => MemOrFile::File((outboard, outboard_size)),
        }
    };
    // The sizes file is only relevant for incomplete entries; drop it on commit.
    delete_after_commit.insert(*hash, [BaoFilePart::Sizes]);
    Ok(Ok(CompleteStorage { data, outboard }))
}