use std::{
    fmt::{self, Debug},
    fs,
    future::Future,
    io::Write,
    num::NonZeroU64,
    ops::Deref,
    path::{Path, PathBuf},
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
};

use bao_tree::{
    blake3,
    io::{
        mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
        outboard::PreOrderOutboard,
        sync::ReadAt,
        BaoContentItem, Leaf,
    },
    BaoTree, ChunkNum, ChunkRanges,
};
use bytes::Bytes;
use delete_set::{BaoFilePart, ProtectHandle};
use entity_manager::{EntityManagerState, SpawnArg};
use entry_state::{DataLocation, OutboardLocation};
use import::{ImportEntry, ImportSource};
use irpc::{channel::mpsc, RpcMessage};
use meta::list_blobs;
use n0_future::{future::yield_now, io};
use nested_enum_utils::enum_conversions;
use range_collections::range_set::RangeSetRange;
use tokio::task::{JoinError, JoinSet};
use tracing::{error, instrument, trace};

use crate::{
    api::{
        proto::{
            self, bitfield::is_validated, BatchMsg, BatchResponse, Bitfield, Command,
            CreateTempTagMsg, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest,
            ExportRangesItem, ExportRangesMsg, ExportRangesRequest, HashSpecific, ImportBaoMsg,
            ImportBaoRequest, ObserveMsg, Scope,
        },
        ApiClient,
    },
    protocol::ChunkRangesExt,
    store::{
        fs::{
            bao_file::{
                BaoFileStorage, BaoFileStorageSubscriber, CompleteStorage, DataReader,
                OutboardReader,
            },
            util::entity_manager::{self, ActiveEntityState},
        },
        gc::run_gc,
        util::{BaoTreeSender, FixedSize, MemOrFile, ValueOrPoisioned},
        IROH_BLOCK_SIZE,
    },
    util::{
        channel::oneshot,
        temp_tag::{TagDrop, TempTag, TempTagScope, TempTags},
    },
    Hash,
};
mod bao_file;
use bao_file::BaoFileHandle;
mod delete_set;
mod entry_state;
mod import;
mod meta;
pub mod options;
pub(crate) mod util;
use entry_state::EntryState;
use import::{import_byte_stream, import_bytes, import_path, ImportEntryMsg};
use options::Options;
use tracing::Instrument;

use crate::{
    api::{
        self,
        blobs::{AddProgressItem, ExportMode, ExportProgressItem},
        Store,
    },
    HashAndFormat,
};

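/// Maximum number of external paths we track per blob when exporting with
/// `ExportMode::TryReference`.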
const MAX_EXTERNAL_PATHS: usize = 8;

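/// Generate a random 16-byte id, used for unique temp file names.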
fn new_uuid() -> [u8; 16] {
    use rand::RngCore;
    let mut rng = rand::rng();
    let mut bytes = [0u8; 16];
    rng.fill_bytes(&mut bytes);
    bytes
}

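/// Name of a temp file: a random hex id with a ".temp" extension.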
fn temp_name() -> String {
    format!("{}.temp", hex::encode(new_uuid()))
}

#[derive(Debug)]
#[enum_conversions()]
pub(crate) enum InternalCommand {
    Dump(meta::Dump),
    FinishImport(ImportEntryMsg),
    ClearScope(ClearScope),
}

#[derive(Debug)]
pub(crate) struct ClearScope {
    pub scope: Scope,
}

impl InternalCommand {
    pub fn parent_span(&self) -> tracing::Span {
        match self {
            Self::Dump(_) => tracing::Span::current(),
            Self::ClearScope(_) => tracing::Span::current(),
            Self::FinishImport(cmd) => cmd
                .parent_span_opt()
                .cloned()
                .unwrap_or_else(tracing::Span::current),
        }
    }
}

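/// State that is shared between the main actor and the tasks it spawns.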
#[derive(Debug)]
struct TaskContext {
    /// Store options such as paths and inline thresholds.
    pub options: Arc<Options>,
    /// Handle to the metadata database actor.
    pub db: meta::Db,
    /// Sender for internal commands back into the main actor.
    pub internal_cmd_tx: tokio::sync::mpsc::Sender<InternalCommand>,
    /// Handle to protect files from deletion.
    pub protect: ProtectHandle,
}

impl TaskContext {
    pub async fn clear_scope(&self, scope: Scope) {
        self.internal_cmd_tx
            .send(ClearScope { scope }.into())
            .await
            .ok();
    }
}

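/// Entity manager configuration: entities are keyed by [`Hash`], share the
/// [`TaskContext`], and each carry a [`BaoFileHandle`] as their state.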
#[derive(Debug)]
struct EmParams;

impl entity_manager::Params for EmParams {
    type EntityId = Hash;

    type GlobalState = Arc<TaskContext>;

    type EntityState = BaoFileHandle;

    async fn on_shutdown(
        state: entity_manager::ActiveEntityState<Self>,
        cause: entity_manager::ShutdownCause,
    ) {
        trace!("persist {:?} due to {cause:?}", state.id);
        state.persist().await;
    }
}

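/// The main actor that owns the command loop, the task set, and the per-hash
/// entity manager.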
#[derive(Debug)]
struct Actor {
    /// Shared state for all tasks spawned by this actor.
    context: Arc<TaskContext>,
    /// Receiver for external commands from the API.
    cmd_rx: tokio::sync::mpsc::Receiver<Command>,
    /// Receiver for internal commands such as finished imports.
    fs_cmd_rx: tokio::sync::mpsc::Receiver<InternalCommand>,
    /// Tasks spawned by this actor.
    tasks: JoinSet<()>,
    /// Per-hash entity state.
    handles: EntityManagerState<EmParams>,
    /// Temp tags keeping blobs alive, organized by scope.
    temp_tags: TempTags,
    /// Senders to notify once all tasks are finished.
    idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
    /// Owned tokio runtime, dropped last.
    _rt: RtWrapper,
}

/// Context for a single hash: its id, its `BaoFileHandle` state, and the
/// shared `TaskContext`.
type HashContext = ActiveEntityState<EmParams>;

impl SyncEntityApi for HashContext {
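    /// Load the entry state for this hash from the database and initialize the
    /// in-memory storage. The empty hash is special-cased as complete and
    /// empty. If another task is already loading, wait for it to finish.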
    async fn load(&self) {
        enum Action {
            Load,
            Wait,
            None,
        }
        let mut action = Action::None;
        self.state.send_if_modified(|guard| match guard.deref() {
            BaoFileStorage::Initial => {
                *guard = BaoFileStorage::Loading;
                action = Action::Load;
                true
            }
            BaoFileStorage::Loading => {
                action = Action::Wait;
                false
            }
            _ => false,
        });
        match action {
            Action::Load => {
                let state = if self.id == Hash::EMPTY {
                    BaoFileStorage::Complete(CompleteStorage {
                        data: MemOrFile::Mem(Bytes::new()),
                        outboard: MemOrFile::empty(),
                    })
                } else {
                    match self.global.db.get(self.id).await {
                        Ok(state) => match BaoFileStorage::open(state, self).await {
                            Ok(handle) => handle,
                            Err(_) => BaoFileStorage::Poisoned,
                        },
                        Err(_) => BaoFileStorage::Poisoned,
                    }
                };
                self.state.send_replace(state);
            }
            Action::Wait => {
                while matches!(self.state.borrow().deref(), BaoFileStorage::Loading) {
                    self.state.0.subscribe().changed().await.ok();
                }
            }
            Action::None => {}
        }
    }

    async fn write_batch(&self, batch: &[BaoContentItem], bitfield: &Bitfield) -> io::Result<()> {
        trace!("write_batch bitfield={:?} batch={}", bitfield, batch.len());
        let mut res = Ok(None);
        self.state.send_if_modified(|state| {
            let Ok((state1, update)) = state.take().write_batch(batch, bitfield, self) else {
                res = Err(io::Error::other("write batch failed"));
                return false;
            };
            res = Ok(update);
            *state = state1;
            true
        });
        if let Some(update) = res? {
            self.global.db.update(self.id, update).await?;
        }
        Ok(())
    }

    #[allow(refining_impl_trait_internal)]
    fn data_reader(&self) -> DataReader {
        DataReader(self.state.clone())
    }

    #[allow(refining_impl_trait_internal)]
    fn outboard_reader(&self) -> OutboardReader {
        OutboardReader(self.state.clone())
    }

    fn current_size(&self) -> io::Result<u64> {
        match self.state.borrow().deref() {
            BaoFileStorage::Complete(mem) => Ok(mem.size()),
            BaoFileStorage::PartialMem(mem) => Ok(mem.current_size()),
            BaoFileStorage::Partial(file) => file.current_size(),
            BaoFileStorage::Poisoned => Err(io::Error::other("poisoned storage")),
            BaoFileStorage::Initial => Err(io::Error::other("initial")),
            BaoFileStorage::Loading => Err(io::Error::other("loading")),
            BaoFileStorage::NonExisting => Err(io::ErrorKind::NotFound.into()),
        }
    }

    fn bitfield(&self) -> io::Result<Bitfield> {
        match self.state.borrow().deref() {
            BaoFileStorage::Complete(mem) => Ok(mem.bitfield()),
            BaoFileStorage::PartialMem(mem) => Ok(mem.bitfield().clone()),
            BaoFileStorage::Partial(file) => Ok(file.bitfield().clone()),
            BaoFileStorage::Poisoned => Err(io::Error::other("poisoned storage")),
            BaoFileStorage::Initial => Err(io::Error::other("initial")),
            BaoFileStorage::Loading => Err(io::Error::other("loading")),
            BaoFileStorage::NonExisting => Err(io::ErrorKind::NotFound.into()),
        }
    }
}

impl HashContext {
    pub fn outboard(&self) -> io::Result<PreOrderOutboard<OutboardReader>> {
        let tree = BaoTree::new(self.current_size()?, IROH_BLOCK_SIZE);
        let outboard = self.outboard_reader();
        Ok(PreOrderOutboard {
            root: blake3::Hash::from(self.id),
            tree,
            data: outboard,
        })
    }

    fn db(&self) -> &meta::Db {
        &self.global.db
    }

    pub fn options(&self) -> &Arc<Options> {
        &self.global.options
    }

    pub fn protect(&self, parts: impl IntoIterator<Item = BaoFilePart>) {
        self.global.protect.protect(self.id, parts);
    }

    pub async fn update_await(&self, state: EntryState<Bytes>) -> io::Result<()> {
        self.db().update_await(self.id, state).await?;
        Ok(())
    }

    pub async fn get_entry_state(&self) -> io::Result<Option<EntryState<Bytes>>> {
        let hash = self.id;
        if hash == Hash::EMPTY {
            return Ok(Some(EntryState::Complete {
                data_location: DataLocation::Inline(Bytes::new()),
                outboard_location: OutboardLocation::NotNeeded,
            }));
        }
        self.db().get(hash).await
    }

    pub async fn set(&self, state: EntryState<Bytes>) -> io::Result<()> {
        self.db().set(self.id, state).await
    }
}

impl Actor {
    fn db(&self) -> &meta::Db {
        &self.context.db
    }

    fn context(&self) -> Arc<TaskContext> {
        self.context.clone()
    }

    fn spawn(&mut self, fut: impl Future<Output = ()> + Send + 'static) {
        let span = tracing::Span::current();
        self.tasks.spawn(fut.instrument(span));
    }

    fn log_task_result(res: Result<(), JoinError>) {
        match res {
            Ok(_) => {}
            Err(e) => {
                error!("task failed: {e}");
            }
        }
    }

    async fn create_temp_tag(&mut self, cmd: CreateTempTagMsg) {
        let CreateTempTagMsg { tx, inner, .. } = cmd;
        let mut tt = self.temp_tags.create(inner.scope, inner.value);
        if tx.is_rpc() {
            tt.leak();
        }
        tx.send(tt).await.ok();
    }

    async fn handle_command(&mut self, cmd: Command) {
        let span = cmd.parent_span();
        let _entered = span.enter();
        match cmd {
            Command::SyncDb(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::WaitIdle(cmd) => {
                trace!("{cmd:?}");
                if self.tasks.is_empty() {
                    cmd.tx.send(()).await.ok();
                } else {
                    self.idle_waiters.push(cmd.tx);
                }
            }
            Command::Shutdown(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::CreateTag(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::SetTag(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::ListTags(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::DeleteTags(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::RenameTag(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::ClearProtected(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::BlobStatus(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::DeleteBlobs(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            Command::ListBlobs(cmd) => {
                trace!("{cmd:?}");
                if let Ok(snapshot) = self.db().snapshot(cmd.span.clone()).await {
                    self.spawn(list_blobs(snapshot, cmd));
                }
            }
            Command::Batch(cmd) => {
                trace!("{cmd:?}");
                let (id, scope) = self.temp_tags.create_scope();
                self.spawn(handle_batch(cmd, id, scope, self.context()));
            }
            Command::CreateTempTag(cmd) => {
                trace!("{cmd:?}");
                self.create_temp_tag(cmd).await;
            }
            Command::ListTempTags(cmd) => {
                trace!("{cmd:?}");
                let tts = self.temp_tags.list();
                cmd.tx.send(tts).await.ok();
            }
            Command::ImportBytes(cmd) => {
                trace!("{cmd:?}");
                self.spawn(import_bytes(cmd, self.context()));
            }
            Command::ImportByteStream(cmd) => {
                trace!("{cmd:?}");
                self.spawn(import_byte_stream(cmd, self.context()));
            }
            Command::ImportPath(cmd) => {
                trace!("{cmd:?}");
                self.spawn(import_path(cmd, self.context()));
            }
            Command::ExportPath(cmd) => {
                trace!("{cmd:?}");
                cmd.spawn(&mut self.handles, &mut self.tasks).await;
            }
            Command::ExportBao(cmd) => {
                trace!("{cmd:?}");
                cmd.spawn(&mut self.handles, &mut self.tasks).await;
            }
            Command::ExportRanges(cmd) => {
                trace!("{cmd:?}");
                cmd.spawn(&mut self.handles, &mut self.tasks).await;
            }
            Command::ImportBao(cmd) => {
                trace!("{cmd:?}");
                cmd.spawn(&mut self.handles, &mut self.tasks).await;
            }
            Command::Observe(cmd) => {
                trace!("{cmd:?}");
                cmd.spawn(&mut self.handles, &mut self.tasks).await;
            }
        }
    }

    async fn handle_fs_command(&mut self, cmd: InternalCommand) {
        let span = cmd.parent_span();
        let _entered = span.enter();
        match cmd {
            InternalCommand::Dump(cmd) => {
                trace!("{cmd:?}");
                self.db().send(cmd.into()).await.ok();
            }
            InternalCommand::ClearScope(cmd) => {
                trace!("{cmd:?}");
                self.temp_tags.end_scope(cmd.scope);
            }
            InternalCommand::FinishImport(cmd) => {
                trace!("{cmd:?}");
                if cmd.hash == Hash::EMPTY {
                    cmd.tx
                        .send(AddProgressItem::Done(TempTag::leaking_empty(cmd.format)))
                        .await
                        .ok();
                } else {
                    let tt = self.temp_tags.create(
                        cmd.scope,
                        HashAndFormat {
                            hash: cmd.hash,
                            format: cmd.format,
                        },
                    );
                    (tt, cmd).spawn(&mut self.handles, &mut self.tasks).await;
                }
            }
        }
    }

    async fn run(mut self) {
        loop {
            tokio::select! {
                task = self.handles.tick() => {
                    if let Some(task) = task {
                        self.spawn(task);
                    }
                }
                cmd = self.cmd_rx.recv() => {
                    let Some(cmd) = cmd else {
                        break;
                    };
                    self.handle_command(cmd).await;
                }
                Some(cmd) = self.fs_cmd_rx.recv() => {
                    self.handle_fs_command(cmd).await;
                }
                Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
                    Self::log_task_result(res);
                    if self.tasks.is_empty() {
                        for tx in self.idle_waiters.drain(..) {
                            tx.send(()).await.ok();
                        }
                    }
                }
            }
        }
        self.handles.shutdown().await;
        while let Some(res) = self.tasks.join_next().await {
            Self::log_task_result(res);
        }
    }

    async fn new(
        db_path: PathBuf,
        rt: RtWrapper,
        cmd_rx: tokio::sync::mpsc::Receiver<Command>,
        fs_commands_rx: tokio::sync::mpsc::Receiver<InternalCommand>,
        fs_commands_tx: tokio::sync::mpsc::Sender<InternalCommand>,
        options: Arc<Options>,
    ) -> anyhow::Result<Self> {
        trace!(
            "creating data directory: {}",
            options.path.data_path.display()
        );
        fs::create_dir_all(&options.path.data_path)?;
        trace!(
            "creating temp directory: {}",
            options.path.temp_path.display()
        );
        fs::create_dir_all(&options.path.temp_path)?;
        trace!(
            "creating parent directory for db file: {}",
            db_path.parent().unwrap().display()
        );
        fs::create_dir_all(db_path.parent().unwrap())?;
        let (db_send, db_recv) = tokio::sync::mpsc::channel(100);
        let (protect, ds) = delete_set::pair(Arc::new(options.path.clone()));
        let db_actor = meta::Actor::new(db_path, db_recv, ds, options.batch.clone())?;
        let slot_context = Arc::new(TaskContext {
            options: options.clone(),
            db: meta::Db::new(db_send),
            internal_cmd_tx: fs_commands_tx,
            protect,
        });
        rt.spawn(db_actor.run());
        Ok(Self {
            context: slot_context.clone(),
            cmd_rx,
            fs_cmd_rx: fs_commands_rx,
            tasks: JoinSet::new(),
            handles: EntityManagerState::new(slot_context, 1024, 32, 32, 2),
            temp_tags: Default::default(),
            idle_waiters: Vec::new(),
            _rt: rt,
        })
    }
}

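/// A command that is specific to a single hash and is handled by spawning a
/// task on the entity manager for that hash.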
trait HashSpecificCommand: HashSpecific + Send + 'static {
    /// Handle the command while the entity for its hash is active.
    fn handle(self, ctx: HashContext) -> impl Future<Output = ()> + Send + 'static;

    /// Called instead of `handle` when the entity is busy or dead.
    fn on_error(self, arg: SpawnArg<EmParams>) -> impl Future<Output = ()> + Send + 'static;

    async fn spawn(
        self,
        manager: &mut entity_manager::EntityManagerState<EmParams>,
        tasks: &mut JoinSet<()>,
    ) where
        Self: Sized,
    {
        let span = tracing::Span::current();
        let task = manager
            .spawn(self.hash(), |arg| {
                async move {
                    match arg {
                        SpawnArg::Active(state) => {
                            self.handle(state).await;
                        }
                        SpawnArg::Busy => {
                            self.on_error(arg).await;
                        }
                        SpawnArg::Dead => {
                            self.on_error(arg).await;
                        }
                    }
                }
                .instrument(span)
            })
            .await;
        if let Some(task) = task {
            tasks.spawn(task);
        }
    }
}

impl HashSpecificCommand for ObserveMsg {
    async fn handle(self, ctx: HashContext) {
        ctx.observe(self).await
    }
    async fn on_error(self, _arg: SpawnArg<EmParams>) {}
}
impl HashSpecificCommand for ExportPathMsg {
    async fn handle(self, ctx: HashContext) {
        ctx.export_path(self).await
    }
    async fn on_error(self, arg: SpawnArg<EmParams>) {
        let err = match arg {
            SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
            SpawnArg::Dead => io::Error::other("entity is dead"),
            _ => unreachable!(),
        };
        self.tx
            .send(ExportProgressItem::Error(api::Error::Io(err)))
            .await
            .ok();
    }
}
impl HashSpecificCommand for ExportBaoMsg {
    async fn handle(self, ctx: HashContext) {
        ctx.export_bao(self).await
    }
    async fn on_error(self, arg: SpawnArg<EmParams>) {
        let err = match arg {
            SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
            SpawnArg::Dead => io::Error::other("entity is dead"),
            _ => unreachable!(),
        };
        self.tx
            .send(EncodedItem::Error(bao_tree::io::EncodeError::Io(err)))
            .await
            .ok();
    }
}
impl HashSpecificCommand for ExportRangesMsg {
    async fn handle(self, ctx: HashContext) {
        ctx.export_ranges(self).await
    }
    async fn on_error(self, arg: SpawnArg<EmParams>) {
        let err = match arg {
            SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
            SpawnArg::Dead => io::Error::other("entity is dead"),
            _ => unreachable!(),
        };
        self.tx
            .send(ExportRangesItem::Error(api::Error::Io(err)))
            .await
            .ok();
    }
}
impl HashSpecificCommand for ImportBaoMsg {
    async fn handle(self, ctx: HashContext) {
        ctx.import_bao(self).await
    }
    async fn on_error(self, arg: SpawnArg<EmParams>) {
        let err = match arg {
            SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
            SpawnArg::Dead => io::Error::other("entity is dead"),
            _ => unreachable!(),
        };
        self.tx.send(Err(api::Error::Io(err))).await.ok();
    }
}
impl HashSpecific for (TempTag, ImportEntryMsg) {
    fn hash(&self) -> Hash {
        self.1.hash()
    }
}
impl HashSpecificCommand for (TempTag, ImportEntryMsg) {
    async fn handle(self, ctx: HashContext) {
        let (tt, cmd) = self;
        ctx.finish_import(cmd, tt).await
    }
    async fn on_error(self, arg: SpawnArg<EmParams>) {
        let err = match arg {
            SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
            SpawnArg::Dead => io::Error::other("entity is dead"),
            _ => unreachable!(),
        };
        self.1.tx.send(AddProgressItem::Error(err)).await.ok();
    }
}

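/// Wrapper for a tokio runtime that shuts it down via `block_in_place` when
/// dropped, so the store can own its runtime even when the wrapper is dropped
/// from within an async context.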
struct RtWrapper(Option<tokio::runtime::Runtime>);

impl From<tokio::runtime::Runtime> for RtWrapper {
    fn from(rt: tokio::runtime::Runtime) -> Self {
        Self(Some(rt))
    }
}

impl fmt::Debug for RtWrapper {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        ValueOrPoisioned(self.0.as_ref()).fmt(f)
    }
}

impl Deref for RtWrapper {
    type Target = tokio::runtime::Runtime;

    fn deref(&self) -> &Self::Target {
        self.0.as_ref().unwrap()
    }
}

impl Drop for RtWrapper {
    fn drop(&mut self) {
        if let Some(rt) = self.0.take() {
            trace!("dropping tokio runtime");
            tokio::task::block_in_place(|| {
                drop(rt);
            });
            trace!("dropped tokio runtime");
        }
    }
}

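/// Run a batch: forward the scope id to the client, process drop
/// notifications until the batch channel closes, then clear the scope.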
async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>, ctx: Arc<TaskContext>) {
    if let Err(cause) = handle_batch_impl(cmd, id, &scope).await {
        error!("batch failed: {cause}");
    }
    ctx.clear_scope(id).await;
}

async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
    let BatchMsg { tx, mut rx, .. } = cmd;
    trace!("created scope {}", id);
    tx.send(id).await.map_err(api::Error::other)?;
    while let Some(msg) = rx.recv().await? {
        match msg {
            BatchResponse::Drop(msg) => scope.on_drop(&msg),
            BatchResponse::Ping => {}
        }
    }
    Ok(())
}

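/// The high-level API for a single hash, called from the actor tasks.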
trait EntityApi {
    /// Import a bao stream for this hash.
    async fn import_bao(&self, cmd: ImportBaoMsg);
    /// Finish an import, moving the imported data into place.
    async fn finish_import(&self, cmd: ImportEntryMsg, tt: TempTag);
    /// Observe the bitfield of this entry.
    async fn observe(&self, cmd: ObserveMsg);
    /// Export raw byte ranges of this entry.
    async fn export_ranges(&self, cmd: ExportRangesMsg);
    /// Export this entry as a validated bao stream.
    async fn export_bao(&self, cmd: ExportBaoMsg);
    /// Export this entry to a file system path.
    async fn export_path(&self, cmd: ExportPathMsg);
    /// Persist any in-memory state to disk.
    async fn persist(&self);
}

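/// Low-level accessors for a single hash, used to implement [`EntityApi`].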
trait SyncEntityApi: EntityApi {
    /// Load the entry state into memory.
    async fn load(&self);

    /// A reader for the blob data.
    fn data_reader(&self) -> impl ReadBytesAt;

    /// A reader for the outboard data.
    fn outboard_reader(&self) -> impl ReadAt;

    /// The current size of the blob.
    fn current_size(&self) -> io::Result<u64>;

    /// The current bitfield of valid chunk ranges.
    fn bitfield(&self) -> io::Result<Bitfield>;

    /// Write a batch of bao content items and update the bitfield.
    async fn write_batch(&self, batch: &[BaoContentItem], bitfield: &Bitfield) -> io::Result<()>;
}

impl EntityApi for HashContext {
    #[instrument(skip_all, fields(hash = %cmd.hash_short()))]
    async fn import_bao(&self, cmd: ImportBaoMsg) {
        trace!("{cmd:?}");
        self.load().await;
        let ImportBaoMsg {
            inner: ImportBaoRequest { size, .. },
            rx,
            tx,
            ..
        } = cmd;
        let res = import_bao_impl(self, size, rx).await;
        trace!("{res:?}");
        tx.send(res).await.ok();
    }

    #[instrument(skip_all, fields(hash = %cmd.hash_short()))]
    async fn observe(&self, cmd: ObserveMsg) {
        trace!("{cmd:?}");
        self.load().await;
        BaoFileStorageSubscriber::new(self.state.subscribe())
            .forward(cmd.tx)
            .await
            .ok();
    }

    #[instrument(skip_all, fields(hash = %cmd.hash_short()))]
    async fn export_ranges(&self, mut cmd: ExportRangesMsg) {
        trace!("{cmd:?}");
        self.load().await;
        if let Err(cause) = export_ranges_impl(self, cmd.inner, &mut cmd.tx).await {
            cmd.tx
                .send(ExportRangesItem::Error(cause.into()))
                .await
                .ok();
        }
    }

    #[instrument(skip_all, fields(hash = %cmd.hash_short()))]
    async fn export_bao(&self, mut cmd: ExportBaoMsg) {
        trace!("{cmd:?}");
        self.load().await;
        if let Err(cause) = export_bao_impl(self, cmd.inner, &mut cmd.tx).await {
            cmd.tx
                .send(bao_tree::io::EncodeError::Io(cause).into())
                .await
                .ok();
        }
    }

    #[instrument(skip_all, fields(hash = %cmd.hash_short()))]
    async fn export_path(&self, cmd: ExportPathMsg) {
        trace!("{cmd:?}");
        self.load().await;
        let ExportPathMsg { inner, mut tx, .. } = cmd;
        if let Err(cause) = export_path_impl(self, inner, &mut tx).await {
            tx.send(cause.into()).await.ok();
        }
    }

    #[instrument(skip_all, fields(hash = %cmd.hash_short()))]
    async fn finish_import(&self, cmd: ImportEntryMsg, mut tt: TempTag) {
        trace!("{cmd:?}");
        self.load().await;
        let res = match finish_import_impl(self, cmd.inner).await {
            Ok(()) => {
                // for an rpc call the temp tag cannot be moved across the rpc
                // boundary, so we leak it here
                if cmd.tx.is_rpc() {
                    trace!("leaking temp tag {}", tt.hash_and_format());
                    tt.leak();
                }
                AddProgressItem::Done(tt)
            }
            Err(cause) => AddProgressItem::Error(cause),
        };
        cmd.tx.send(res).await.ok();
    }

    #[instrument(skip_all, fields(hash = %self.id.fmt_short()))]
    async fn persist(&self) {
        self.state.send_if_modified(|guard| {
            let hash = &self.id;
            let BaoFileStorage::Partial(fs) = guard.take() else {
                return false;
            };
            let path = self.global.options.path.bitfield_path(hash);
            trace!("writing bitfield for hash {} to {}", hash, path.display());
            if let Err(cause) = fs.sync_all(&path) {
                error!(
                    "failed to write bitfield for {} at {}: {:?}",
                    hash,
                    path.display(),
                    cause
                );
            }
            false
        });
    }
}

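/// Move or inline the imported data and outboard into their final location,
/// then mark the entry as complete both in memory and in the database.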
async fn finish_import_impl(ctx: &HashContext, import_data: ImportEntry) -> io::Result<()> {
    if ctx.id == Hash::EMPTY {
        // the empty hash is always present and never needs importing
        return Ok(());
    }
    let ImportEntry {
        source,
        hash,
        outboard,
        ..
    } = import_data;
    let options = ctx.options();
    match &source {
        ImportSource::Memory(data) => {
            debug_assert!(options.is_inlined_data(data.len() as u64));
        }
        ImportSource::External(_, _, size) => {
            debug_assert!(!options.is_inlined_data(*size));
        }
        ImportSource::TempFile(_, _, size) => {
            debug_assert!(!options.is_inlined_data(*size));
        }
    }
    ctx.load().await;
    let handle = &ctx.state;
    // protect the data and outboard parts for this hash from deletion
    // while the import is being finished
    ctx.protect([BaoFilePart::Data, BaoFilePart::Outboard]);
    let data_location = match source {
        ImportSource::Memory(data) => DataLocation::Inline(data),
        ImportSource::External(path, _file, size) => DataLocation::External(vec![path], size),
        ImportSource::TempFile(path, _file, size) => {
            // move the temp file into the owned data location
            let target = ctx.options().path.data_path(&hash);
            trace!(
                "moving temp file to owned data location: {} -> {}",
                path.display(),
                target.display()
            );
            if let Err(cause) = fs::rename(&path, &target) {
                error!(
                    "failed to move temp file {} to owned data location {}: {cause}",
                    path.display(),
                    target.display()
                );
            }
            DataLocation::Owned(size)
        }
    };
    let outboard_location = match outboard {
        MemOrFile::Mem(bytes) if bytes.is_empty() => OutboardLocation::NotNeeded,
        MemOrFile::Mem(bytes) => OutboardLocation::Inline(bytes),
        MemOrFile::File(path) => {
            // move the temp file into the owned outboard location
            let target = ctx.options().path.outboard_path(&hash);
            trace!(
                "moving temp file to owned outboard location: {} -> {}",
                path.display(),
                target.display()
            );
            if let Err(cause) = fs::rename(&path, &target) {
                error!(
                    "failed to move temp file {} to owned outboard location {}: {cause}",
                    path.display(),
                    target.display()
                );
            }
            OutboardLocation::Owned
        }
    };
    let data = match &data_location {
        DataLocation::Inline(data) => MemOrFile::Mem(data.clone()),
        DataLocation::Owned(size) => {
            let path = ctx.options().path.data_path(&hash);
            let file = fs::File::open(&path)?;
            MemOrFile::File(FixedSize::new(file, *size))
        }
        DataLocation::External(paths, size) => {
            let Some(path) = paths.iter().next() else {
                return Err(io::Error::other("no external data path"));
            };
            let file = fs::File::open(path)?;
            MemOrFile::File(FixedSize::new(file, *size))
        }
    };
    let outboard = match &outboard_location {
        OutboardLocation::NotNeeded => MemOrFile::empty(),
        OutboardLocation::Inline(data) => MemOrFile::Mem(data.clone()),
        OutboardLocation::Owned => {
            let path = ctx.options().path.outboard_path(&hash);
            let file = fs::File::open(&path)?;
            MemOrFile::File(file)
        }
    };
    handle.complete(data, outboard);
    let state = EntryState::Complete {
        data_location,
        outboard_location,
    };
    ctx.update_await(state).await?;
    Ok(())
}

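/// The chunk range covered by a leaf, derived from its byte offset and length.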
fn chunk_range(leaf: &Leaf) -> ChunkRanges {
    let start = ChunkNum::chunks(leaf.offset);
    let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64);
    (start..end).into()
}

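/// Import a validated bao stream. Incoming items are buffered and written in
/// batches; a batch is flushed whenever a parent node follows a leaf, i.e. at
/// a subtree boundary.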
async fn import_bao_impl(
    ctx: &HashContext,
    size: NonZeroU64,
    mut rx: mpsc::Receiver<BaoContentItem>,
) -> api::Result<()> {
    trace!("importing bao: {} {} bytes", ctx.id.fmt_short(), size);
    let mut batch = Vec::<BaoContentItem>::new();
    let mut ranges = ChunkRanges::empty();
    while let Some(item) = rx.recv().await? {
        // if the last item is a leaf and the current item is a parent, a new
        // subtree starts, so write the current batch
        if !batch.is_empty() && batch[batch.len() - 1].is_leaf() && item.is_parent() {
            let bitfield = Bitfield::new_unchecked(ranges, size.into());
            ctx.write_batch(&batch, &bitfield).await?;
            batch.clear();
            ranges = ChunkRanges::empty();
        }
        if let BaoContentItem::Leaf(leaf) = &item {
            let leaf_range = chunk_range(leaf);
            if is_validated(size, &leaf_range) && size.get() != leaf.offset + leaf.data.len() as u64
            {
                return Err(api::Error::io(io::ErrorKind::InvalidData, "invalid size"));
            }
            ranges |= leaf_range;
        }
        batch.push(item);
    }
    if !batch.is_empty() {
        let bitfield = Bitfield::new_unchecked(ranges, size.into());
        ctx.write_batch(&batch, &bitfield).await?;
    }
    Ok(())
}

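/// Export raw byte ranges. Fails if a requested range is not fully present.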
async fn export_ranges_impl(
    ctx: &HashContext,
    cmd: ExportRangesRequest,
    tx: &mut mpsc::Sender<ExportRangesItem>,
) -> io::Result<()> {
    let ExportRangesRequest { ranges, hash } = cmd;
    trace!(
        "exporting ranges: {hash} {ranges:?} size={}",
        ctx.current_size()?
    );
    let bitfield = ctx.bitfield()?;
    let data = ctx.data_reader();
    let size = bitfield.size();
    for range in ranges.iter() {
        let range = match range {
            RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
            RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
        };
        let requested = ChunkRanges::bytes(range.start..range.end);
        if !bitfield.ranges.is_superset(&requested) {
            return Err(io::Error::other(format!(
                "missing range: {requested:?}, present: {bitfield:?}",
            )));
        }
        let bs = 1024;
        let mut offset = range.start;
        loop {
            let end: u64 = (offset + bs).min(range.end);
            let size = (end - offset) as usize;
            let res = data.read_bytes_at(offset, size);
            tx.send(ExportRangesItem::Data(Leaf { offset, data: res? }))
                .await?;
            offset = end;
            if offset >= range.end {
                break;
            }
        }
    }
    Ok(())
}

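/// Export a validated bao stream for the given ranges. If the tree reports
/// size 0 for a non-empty hash, there is no data at all and nothing is sent.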
async fn export_bao_impl(
    ctx: &HashContext,
    cmd: ExportBaoRequest,
    tx: &mut mpsc::Sender<EncodedItem>,
) -> io::Result<()> {
    let ExportBaoRequest { ranges, hash, .. } = cmd;
    let outboard = ctx.outboard()?;
    let size = outboard.tree.size();
    if size == 0 && hash != Hash::EMPTY {
        // we have no data at all, so we stop here
        return Ok(());
    }
    trace!("exporting bao: {hash} {ranges:?} size={size}");
    let data = ctx.data_reader();
    let tx = BaoTreeSender::new(tx);
    traverse_ranges_validated(data, outboard, &ranges, tx).await?;
    Ok(())
}

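/// Export to a file system path, either by copying or, for
/// `ExportMode::TryReference`, by renaming and recording the target as an
/// external data location.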
async fn export_path_impl(
    ctx: &HashContext,
    cmd: ExportPathRequest,
    tx: &mut mpsc::Sender<ExportProgressItem>,
) -> api::Result<()> {
    let ExportPathRequest { mode, target, .. } = cmd;
    if !target.is_absolute() {
        return Err(api::Error::io(
            io::ErrorKind::InvalidInput,
            "path is not absolute",
        ));
    }
    if let Some(parent) = target.parent() {
        fs::create_dir_all(parent)?;
    }
    let state = ctx.get_entry_state().await?;
    let (data_location, outboard_location) = match state {
        Some(EntryState::Complete {
            data_location,
            outboard_location,
        }) => (data_location, outboard_location),
        Some(EntryState::Partial { .. }) => {
            return Err(api::Error::io(
                io::ErrorKind::InvalidInput,
                "cannot export partial entry",
            ));
        }
        None => {
            return Err(api::Error::io(io::ErrorKind::NotFound, "no entry found"));
        }
    };
    trace!("exporting {} to {}", cmd.hash.to_hex(), target.display());
    let (data, mut external) = match data_location {
        DataLocation::Inline(data) => (MemOrFile::Mem(data), vec![]),
        DataLocation::Owned(size) => (
            MemOrFile::File((ctx.options().path.data_path(&cmd.hash), size)),
            vec![],
        ),
        DataLocation::External(paths, size) => (
            MemOrFile::File((
                paths.first().cloned().ok_or_else(|| {
                    io::Error::new(io::ErrorKind::NotFound, "no external data path")
                })?,
                size,
            )),
            paths,
        ),
    };
    let size = match &data {
        MemOrFile::Mem(data) => data.len() as u64,
        MemOrFile::File((_, size)) => *size,
    };
    tx.send(ExportProgressItem::Size(size))
        .await
        .map_err(api::Error::other)?;
    match data {
        MemOrFile::Mem(data) => {
            let mut target = fs::File::create(&target)?;
            target.write_all(&data)?;
        }
        MemOrFile::File((source_path, size)) => match mode {
            ExportMode::Copy => {
                let res = reflink_or_copy_with_progress(&source_path, &target, size, tx).await?;
                trace!(
                    "exported {} to {}, {res:?}",
                    source_path.display(),
                    target.display()
                );
            }
            ExportMode::TryReference => {
                if !external.is_empty() {
                    // the data is already external, so make another copy and
                    // record the target as an additional external path
                    let res =
                        reflink_or_copy_with_progress(&source_path, &target, size, tx).await?;
                    trace!(
                        "exported {} also to {}, {res:?}",
                        source_path.display(),
                        target.display()
                    );
                    external.push(target);
                    external.sort();
                    external.dedup();
                    external.truncate(MAX_EXTERNAL_PATHS);
                } else {
                    // move the owned file to the target, falling back to a copy
                    // if the rename crosses file systems
                    match std::fs::rename(&source_path, &target) {
                        Ok(()) => {}
                        Err(cause) => {
                            // EXDEV on linux: source and target are on different file systems
                            const ERR_CROSS: i32 = 18;
                            if cause.raw_os_error() == Some(ERR_CROSS) {
                                reflink_or_copy_with_progress(&source_path, &target, size, tx)
                                    .await?;
                            } else {
                                return Err(cause.into());
                            }
                        }
                    }
                    external.push(target);
                }
                ctx.set(EntryState::Complete {
                    data_location: DataLocation::External(external, size),
                    outboard_location,
                })
                .await?;
            }
        },
    }
    tx.send(ExportProgressItem::Done)
        .await
        .map_err(api::Error::other)?;
    Ok(())
}

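/// Progress message carrying a byte offset, so the copy helpers below can be
/// generic over export and add progress.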
trait CopyProgress: RpcMessage {
    fn from_offset(offset: u64) -> Self;
}

impl CopyProgress for ExportProgressItem {
    fn from_offset(offset: u64) -> Self {
        ExportProgressItem::CopyProgress(offset)
    }
}

impl CopyProgress for AddProgressItem {
    fn from_offset(offset: u64) -> Self {
        AddProgressItem::CopyProgress(offset)
    }
}

#[derive(Debug)]
enum CopyResult {
    Reflinked,
    Copied,
}

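/// Try to reflink the file; if reflinking is not possible, fall back to a
/// buffered copy with progress reporting.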
async fn reflink_or_copy_with_progress(
    from: impl AsRef<Path>,
    to: impl AsRef<Path>,
    size: u64,
    tx: &mut mpsc::Sender<impl CopyProgress>,
) -> io::Result<CopyResult> {
    let from = from.as_ref();
    let to = to.as_ref();
    if reflink_copy::reflink(from, to).is_ok() {
        return Ok(CopyResult::Reflinked);
    }
    let source = fs::File::open(from)?;
    let mut target = fs::File::create(to)?;
    copy_with_progress(source, size, &mut target, tx).await?;
    Ok(CopyResult::Copied)
}

async fn copy_with_progress<T: CopyProgress>(
    file: impl ReadAt,
    size: u64,
    target: &mut impl Write,
    tx: &mut mpsc::Sender<T>,
) -> io::Result<()> {
    let mut offset = 0;
    let mut buf = vec![0u8; 1024 * 1024];
    while offset < size {
        let remaining = buf.len().min((size - offset) as usize);
        let buf: &mut [u8] = &mut buf[..remaining];
        file.read_exact_at(offset, buf)?;
        target.write_all(buf)?;
        tx.try_send(T::from_offset(offset))
            .await
            .map_err(|_e| io::Error::other("failed to send progress"))?;
        yield_now().await;
        offset += buf.len() as u64;
    }
    Ok(())
}

impl FsStore {
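    /// Load or create a new store rooted at the given directory.
    ///
    /// A minimal usage sketch (the path and payload are illustrative):
    ///
    /// ```ignore
    /// let store = FsStore::load("/tmp/blobs").await?;
    /// let tt = store.add_bytes(vec![0u8; 1024]).await?;
    /// println!("added {}", tt.hash);
    /// ```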
1390 pub async fn load(root: impl AsRef<Path>) -> anyhow::Result<Self> {
1392 let path = root.as_ref();
1393 let db_path = path.join("blobs.db");
1394 let options = Options::new(path);
1395 Self::load_with_opts(db_path, options).await
1396 }
1397
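    /// Load or create a new store with custom options, backed by a dedicated
    /// tokio runtime for the store actor.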
    pub async fn load_with_opts(db_path: PathBuf, options: Options) -> anyhow::Result<FsStore> {
        static THREAD_NR: AtomicU64 = AtomicU64::new(0);
        let rt = tokio::runtime::Builder::new_multi_thread()
            .thread_name_fn(|| {
                format!(
                    "iroh-blob-store-{}",
                    THREAD_NR.fetch_add(1, Ordering::Relaxed)
                )
            })
            .enable_time()
            .build()?;
        let handle = rt.handle().clone();
        let (commands_tx, commands_rx) = tokio::sync::mpsc::channel(100);
        let (fs_commands_tx, fs_commands_rx) = tokio::sync::mpsc::channel(100);
        let gc_config = options.gc.clone();
        let actor = handle
            .spawn(Actor::new(
                db_path,
                rt.into(),
                commands_rx,
                fs_commands_rx,
                fs_commands_tx.clone(),
                Arc::new(options),
            ))
            .await??;
        handle.spawn(actor.run());
        let store = FsStore::new(commands_tx.into(), fs_commands_tx);
        if let Some(config) = gc_config {
            handle.spawn(run_gc(store.deref().clone(), config));
        }
        Ok(store)
    }
}

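/// A file system based store.
///
/// All file system and database access happens in a background actor; this
/// handle is cheap to clone and can be shared across tasks.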
#[derive(Debug, Clone)]
pub struct FsStore {
    sender: ApiClient,
    db: tokio::sync::mpsc::Sender<InternalCommand>,
}

impl From<FsStore> for Store {
    fn from(value: FsStore) -> Self {
        Store::from_sender(value.sender)
    }
}

impl Deref for FsStore {
    type Target = Store;

    fn deref(&self) -> &Self::Target {
        Store::ref_from_sender(&self.sender)
    }
}

impl AsRef<Store> for FsStore {
    fn as_ref(&self) -> &Store {
        self.deref()
    }
}

impl FsStore {
    fn new(
        sender: irpc::LocalSender<proto::Request>,
        db: tokio::sync::mpsc::Sender<InternalCommand>,
    ) -> Self {
        Self {
            sender: sender.into(),
            db,
        }
    }

    pub async fn dump(&self) -> anyhow::Result<()> {
        let (tx, rx) = oneshot::channel();
        self.db
            .send(
                meta::Dump {
                    tx,
                    span: tracing::Span::current(),
                }
                .into(),
            )
            .await?;
        rx.await??;
        Ok(())
    }
}

#[cfg(test)]
pub mod tests {
    use core::panic;
    use std::collections::{HashMap, HashSet};

    use bao_tree::{io::round_up_to_chunks_groups, ChunkRanges};
    use n0_future::{stream, Stream, StreamExt};
    use testresult::TestResult;
    use walkdir::WalkDir;

    use super::*;
    use crate::{
        api::blobs::Bitfield,
        store::{
            util::{read_checksummed, tests::create_n0_bao, SliceInfoExt, Tag},
            IROH_BLOCK_SIZE,
        },
    };

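    /// Sizes that exercise the interesting corner cases: empty, a single byte,
    /// chunk and chunk-group boundaries, and sizes large enough to end up in
    /// data and outboard files.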
    pub const INTERESTING_SIZES: [usize; 8] = [
        0,
        1,
        1024,
        1024 * 16 - 1,
        1024 * 16,
        1024 * 16 + 1,
        1024 * 1024,
        1024 * 1024 * 8,
    ];

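    /// Round a request up to full chunk groups. A request that lies entirely
    /// beyond the end of the data is turned into a request for the last chunk,
    /// which is sufficient to prove the size.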
    pub fn round_up_request(size: u64, ranges: &ChunkRanges) -> ChunkRanges {
        let last_chunk = ChunkNum::chunks(size);
        let data_range = ChunkRanges::from(..last_chunk);
        let ranges = if !data_range.intersects(ranges) && !ranges.is_empty() {
            if last_chunk == 0 {
                ChunkRanges::all()
            } else {
                ChunkRanges::from(last_chunk - 1..)
            }
        } else {
            ranges.clone()
        };
        round_up_to_chunks_groups(ranges, IROH_BLOCK_SIZE)
    }

    fn create_n0_bao_full(
        data: &[u8],
        ranges: &ChunkRanges,
    ) -> anyhow::Result<(Hash, ChunkRanges, Vec<u8>)> {
        let ranges = round_up_request(data.len() as u64, ranges);
        let (hash, encoded) = create_n0_bao(data, &ranges)?;
        Ok((hash, ranges, encoded))
    }

    #[tokio::test]
    async fn test_observe() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let options = Options::new(&db_dir);
        let store = FsStore::load_with_opts(db_dir.join("blobs.db"), options).await?;
        let sizes = INTERESTING_SIZES;
        for size in sizes {
            let data = test_data(size);
            let ranges = ChunkRanges::all();
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            let obs = store.observe(hash);
            let task = tokio::spawn(async move {
                obs.await_completion().await?;
                api::Result::Ok(())
            });
            store.import_bao_bytes(hash, ranges, bao).await?;
            task.await??;
        }
        Ok(())
    }

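    /// Create deterministic test data: each 1 KiB block is filled with a
    /// letter cycling through A-Z, so offsets are easy to recognize by eye.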
    pub fn test_data(n: usize) -> Bytes {
        let mut res = Vec::with_capacity(n);
        // Using uppercase A-Z (65-90), 26 possible characters
        for i in 0..n {
            // Change character every 1024 bytes
            let block_num = i / 1024;
            // Cycle through A-Z based on the block number
            let ascii_val = 65 + (block_num % 26) as u8;
            res.push(ascii_val);
        }
        Bytes::from(res)
    }

    #[tokio::test]
    async fn test_import_byte_stream() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let stream = bytes_to_stream(expected.clone(), 1023);
            let obs = store.observe(expected_hash);
            let tt = store.add_stream(stream).await.temp_tag().await?;
            assert_eq!(expected_hash, tt.hash());
            obs.await_completion().await?;
            let actual = store.get_bytes(expected_hash).await?;
            assert_eq!(&expected, &actual);
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bytes_simple() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(&db_dir).await?;
        let sizes = INTERESTING_SIZES;
        trace!("{}", Options::new(&db_dir).is_inlined_data(16385));
        for size in sizes {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let obs = store.observe(expected_hash);
            let tt = store.add_bytes(expected.clone()).await?;
            assert_eq!(expected_hash, tt.hash);
            obs.await_completion().await?;
            let actual = store.get_bytes(expected_hash).await?;
            assert_eq!(&expected, &actual);
        }
        store.shutdown().await?;
        dump_dir_full(db_dir)?;
        Ok(())
    }

    #[tokio::test]
    #[ignore = "flaky. I need a reliable way to keep the handle alive"]
    async fn test_roundtrip_bytes_small() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES
            .into_iter()
            .filter(|x| *x != 0 && *x <= IROH_BLOCK_SIZE.bytes())
        {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let obs = store.observe(expected_hash);
            let tt = store.add_bytes(expected.clone()).await?;
            assert_eq!(expected_hash, tt.hash);
            let actual = store.get_bytes(expected_hash).await?;
            assert_eq!(&expected, &actual);
            assert_eq!(
                &expected.addr(),
                &actual.addr(),
                "address mismatch for size {size}"
            );
            obs.await_completion().await?;
        }
        store.shutdown().await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_import_path() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let path = testdir.path().join(format!("in-{size}"));
            fs::write(&path, &expected)?;
            let obs = store.observe(expected_hash);
            let tt = store.add_path(&path).await?;
            assert_eq!(expected_hash, tt.hash);
            obs.await_completion().await?;
            let actual = store.get_bytes(expected_hash).await?;
            assert_eq!(&expected, &actual, "size={size}");
        }
        dump_dir_full(testdir.path())?;
        Ok(())
    }

    #[tokio::test]
    async fn test_export_path() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let expected_hash = Hash::new(&expected);
            let tt = store.add_bytes(expected.clone()).await?;
            assert_eq!(expected_hash, tt.hash);
            let out_path = testdir.path().join(format!("out-{size}"));
            store.export(expected_hash, &out_path).await?;
            let actual = fs::read(&out_path)?;
            assert_eq!(expected, actual);
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_ranges() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            let data = test_data(100000);
            let ranges = ChunkRanges::chunks(16..32);
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            store
                .import_bao_bytes(hash, ranges.clone(), bao.clone())
                .await?;
            let bitfield = store.observe(hash).await?;
            assert_eq!(bitfield.ranges, ranges);
            assert_eq!(bitfield.size(), data.len() as u64);
            let export = store.export_bao(hash, ranges).bao_to_vec().await?;
            assert_eq!(export, bao);
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_minimal() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = [1];
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = vec![0u8; size];
                let (hash, encoded) = create_n0_bao(&data, &ChunkRanges::all())?;
                let data = Bytes::from(encoded);
                store
                    .import_bao_bytes(hash, ChunkRanges::all(), data)
                    .await?;
            }
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_simple() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = [1048576];
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = vec![0u8; size];
                let (hash, encoded) = create_n0_bao(&data, &ChunkRanges::all())?;
                let data = Bytes::from(encoded);
                trace!("importing size={}", size);
                store
                    .import_bao_bytes(hash, ChunkRanges::all(), data)
                    .await?;
            }
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_persistence_full() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = vec![0u8; size];
                let (hash, encoded) = create_n0_bao(&data, &ChunkRanges::all())?;
                let data = Bytes::from(encoded);
                store
                    .import_bao_bytes(hash, ChunkRanges::all(), data)
                    .await?;
            }
            store.shutdown().await?;
        }
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let expected = vec![0u8; size];
                let hash = Hash::new(&expected);
                let actual = store
                    .export_bao(hash, ChunkRanges::all())
                    .data_to_vec()
                    .await?;
                assert_eq!(&expected, &actual);
            }
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_persistence_just_size() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        let just_size = ChunkRanges::last_chunk();
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        {
            let store = FsStore::load(&db_dir).await?;
            store.dump().await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, expected) = create_n0_bao_full(&data, &just_size)?;
                let actual = match store.export_bao(hash, ranges).bao_to_vec().await {
                    Ok(actual) => actual,
                    Err(cause) => panic!("failed to export size={size}: {cause}"),
                };
                assert_eq!(&expected, &actual);
            }
            store.shutdown().await?;
        }
        dump_dir_full(testdir.path())?;
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_persistence_two_stages() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        let just_size = ChunkRanges::last_chunk();
        // stage 1: import just the last chunk
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        dump_dir_full(testdir.path())?;
        // stage 2: import the remaining ranges
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let remaining = ChunkRanges::all() - round_up_request(size as u64, &just_size);
                if remaining.is_empty() {
                    continue;
                }
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &remaining)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        // check that the data is now complete
        {
            let store = FsStore::load(&db_dir).await?;
            store.dump().await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, expected) = create_n0_bao_full(&data, &ChunkRanges::all())?;
                let actual = match store.export_bao(hash, ranges).bao_to_vec().await {
                    Ok(actual) => actual,
                    Err(cause) => panic!("failed to export size={size}: {cause}"),
                };
                assert_eq!(&expected, &actual);
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        dump_dir_full(testdir.path())?;
        Ok(())
    }

    fn just_size() -> ChunkRanges {
        ChunkRanges::last_chunk()
    }

    #[tokio::test]
    async fn test_import_bao_persistence_observe() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        let just_size = just_size();
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        dump_dir_full(testdir.path())?;
        {
            let store = FsStore::load(&db_dir).await?;
            for size in sizes {
                let expected_ranges = round_up_request(size as u64, &just_size);
                let data = test_data(size);
                let hash = Hash::new(&data);
                let bitfield = store.observe(hash).await?;
                assert_eq!(bitfield.ranges, expected_ranges);
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bao_persistence_recover() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        let options = Options::new(&db_dir);
        let just_size = just_size();
        {
            let store = FsStore::load_with_opts(db_dir.join("blobs.db"), options.clone()).await?;
            for size in sizes {
                let data = test_data(size);
                let (hash, ranges, encoded) = create_n0_bao_full(&data, &just_size)?;
                let data = Bytes::from(encoded);
                if let Err(cause) = store.import_bao_bytes(hash, ranges, data).await {
                    panic!("failed to import size={size}: {cause}");
                }
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        delete_rec(testdir.path(), "bitfield")?;
        dump_dir_full(testdir.path())?;
        {
            let store = FsStore::load_with_opts(db_dir.join("blobs.db"), options.clone()).await?;
            for size in sizes {
                let expected_ranges = round_up_request(size as u64, &just_size);
                let data = test_data(size);
                let hash = Hash::new(&data);
                let bitfield = store.observe(hash).await?;
                assert_eq!(bitfield.ranges, expected_ranges, "size={size}");
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_import_bytes_persistence_full() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let sizes = INTERESTING_SIZES;
        let db_dir = testdir.path().join("db");
        {
            let store = FsStore::load(&db_dir).await?;
            let mut tts = Vec::new();
            for size in sizes {
                let data = test_data(size);
                tts.push(store.add_bytes(data.clone()).await?);
            }
            store.dump().await?;
            store.shutdown().await?;
        }
        {
            let store = FsStore::load(&db_dir).await?;
            store.dump().await?;
            for size in sizes {
                let expected = test_data(size);
                let hash = Hash::new(&expected);
                let Ok(actual) = store
                    .export_bao(hash, ChunkRanges::all())
                    .data_to_vec()
                    .await
                else {
                    panic!("failed to export size={size}");
                };
                assert_eq!(&expected, &actual, "size={size}");
            }
            store.shutdown().await?;
        }
        Ok(())
    }

    async fn test_batch(store: &Store) -> TestResult<()> {
        let batch = store.blobs().batch().await?;
        let tt1 = batch.temp_tag(Hash::new("foo")).await?;
        let tt2 = batch.add_slice("boo").await?;
        let tts = store
            .tags()
            .list_temp_tags()
            .await?
            .collect::<HashSet<_>>()
            .await;
        assert!(tts.contains(&tt1.hash_and_format()));
        assert!(tts.contains(&tt2.hash_and_format()));
        drop(batch);
        store.sync_db().await?;
        store.wait_idle().await?;
        let tts = store
            .tags()
            .list_temp_tags()
            .await?
            .collect::<HashSet<_>>()
            .await;
        assert!(!tts.contains(&tt1.hash_and_format()));
        assert!(!tts.contains(&tt2.hash_and_format()));
        drop(tt1);
        drop(tt2);
        Ok(())
    }

    #[tokio::test]
    async fn test_batch_fs() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        test_batch(&store).await
    }

    #[tokio::test]
    async fn smoke() -> TestResult<()> {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_dir = testdir.path().join("db");
        let store = FsStore::load(db_dir).await?;
        let haf = HashAndFormat::raw(Hash::from([0u8; 32]));
        store.tags().set(Tag::from("test"), haf).await?;
        store.tags().set(Tag::from("boo"), haf).await?;
        store.tags().set(Tag::from("bar"), haf).await?;
        let sizes = INTERESTING_SIZES;
        let mut hashes = Vec::new();
        let mut data_by_hash = HashMap::new();
        let mut bao_by_hash = HashMap::new();
        for size in sizes {
            let data = vec![0u8; size];
            let data = Bytes::from(data);
            let tt = store.add_bytes(data.clone()).temp_tag().await?;
            data_by_hash.insert(tt.hash(), data);
            hashes.push(tt);
        }
        store.sync_db().await?;
        for tt in &hashes {
            let hash = tt.hash();
            let path = testdir.path().join(format!("{hash}.txt"));
            store.export(hash, path).await?;
        }
        for tt in &hashes {
            let hash = tt.hash();
            let data = store
                .export_bao(hash, ChunkRanges::all())
                .data_to_vec()
                .await
                .unwrap();
            assert_eq!(data, data_by_hash[&hash].to_vec());
            let bao = store
                .export_bao(hash, ChunkRanges::all())
                .bao_to_vec()
                .await
                .unwrap();
            bao_by_hash.insert(hash, bao);
        }
        store.dump().await?;

        for size in sizes {
            let data = test_data(size);
            let ranges = ChunkRanges::all();
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            store.import_bao_bytes(hash, ranges, bao).await?;
        }

        for (_hash, _bao_tree) in bao_by_hash {}
        Ok(())
    }

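    /// Recursively delete all files with the given extension below `root_dir`.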
    pub fn delete_rec(root_dir: impl AsRef<Path>, extension: &str) -> Result<(), std::io::Error> {
        let ext = extension.trim_start_matches('.').to_lowercase();

        for entry in WalkDir::new(root_dir).into_iter().filter_map(|e| e.ok()) {
            let path = entry.path();

            if path.is_file() {
                if let Some(file_ext) = path.extension() {
                    if file_ext.to_string_lossy().to_lowercase() == ext {
                        fs::remove_file(path)?;
                    }
                }
            }
        }

        Ok(())
    }

    pub fn dump_dir(path: impl AsRef<Path>) -> io::Result<()> {
        let mut entries: Vec<_> = WalkDir::new(&path)
            .into_iter()
            .filter_map(Result::ok)
            .collect();

        entries.sort_by(|a, b| a.path().cmp(b.path()));

        for entry in entries {
            let depth = entry.depth();
            let indent = "  ".repeat(depth);
            let name = entry.file_name().to_string_lossy();
            let size = entry.metadata()?.len();
            if entry.file_type().is_file() {
                println!("{indent}{name} ({size} bytes)");
            } else if entry.file_type().is_dir() {
                println!("{indent}{name}/");
            }
        }
        Ok(())
    }

    pub fn dump_dir_full(path: impl AsRef<Path>) -> io::Result<()> {
        let mut entries: Vec<_> = WalkDir::new(&path)
            .into_iter()
            .filter_map(Result::ok)
            .collect();

        entries.sort_by(|a, b| a.path().cmp(b.path()));

        for entry in entries {
            let depth = entry.depth();
            let indent = "  ".repeat(depth);
            let name = entry.file_name().to_string_lossy();

            if entry.file_type().is_dir() {
                println!("{indent}{name}/");
            } else if entry.file_type().is_file() {
                let size = entry.metadata()?.len();
                println!("{indent}{name} ({size} bytes)");

                let path = entry.path();
                if name.ends_with(".data") {
                    print!("{indent}  ");
                    dump_file(path, 1024 * 16)?;
                } else if name.ends_with(".obao4") {
                    print!("{indent}  ");
                    dump_file(path, 64)?;
                } else if name.ends_with(".sizes4") {
                    print!("{indent}  ");
                    dump_file(path, 8)?;
                } else if name.ends_with(".bitfield") {
                    match read_checksummed::<Bitfield>(path) {
                        Ok(bitfield) => {
                            println!("{indent}  bitfield: {bitfield:?}");
                        }
                        Err(cause) => {
                            println!("{indent}  bitfield: error: {cause}");
                        }
                    }
                }
            }
        }
        Ok(())
    }

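    /// Print a one-line map of which `chunk_size` blocks of a file contain
    /// non-zero bytes.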
    pub fn dump_file<P: AsRef<Path>>(path: P, chunk_size: u64) -> io::Result<()> {
        let bits = file_bits(path, chunk_size)?;
        println!("{}", print_bitfield_ansi(bits));
        Ok(())
    }

    pub fn file_bits(path: impl AsRef<Path>, chunk_size: u64) -> io::Result<Vec<bool>> {
        let file = fs::File::open(&path)?;
        let file_size = file.metadata()?.len();
        let mut buffer = vec![0u8; chunk_size as usize];
        let mut bits = Vec::new();

        let mut offset = 0u64;
        while offset < file_size {
            let remaining = file_size - offset;
            let current_chunk_size = chunk_size.min(remaining);

            let chunk = &mut buffer[..current_chunk_size as usize];
            file.read_exact_at(offset, chunk)?;

            let has_non_zero = chunk.iter().any(|&byte| byte != 0);
            bits.push(has_non_zero);

            offset += current_chunk_size;
        }

        Ok(bits)
    }

    #[allow(dead_code)]
    fn print_bitfield(bits: impl IntoIterator<Item = bool>) -> String {
        bits.into_iter()
            .map(|bit| if bit { '#' } else { '_' })
            .collect()
    }

    fn print_bitfield_ansi(bits: impl IntoIterator<Item = bool>) -> String {
        let mut result = String::new();
        let mut iter = bits.into_iter();

        while let Some(b1) = iter.next() {
            let b2 = iter.next();

            // ANSI escape codes for foreground and background colors
            let white_fg = "\x1b[97m"; // bright white foreground
            let reset = "\x1b[0m"; // reset all attributes
            let gray_bg = "\x1b[100m"; // bright black (gray) background
            let black_bg = "\x1b[40m"; // black background

            // render two bits per character using half blocks
            let colored_char = match (b1, b2) {
                (true, Some(true)) => format!("{}{}{}", white_fg, '█', reset),
                (true, Some(false)) => format!("{}{}{}{}", gray_bg, white_fg, '▌', reset),
                (false, Some(true)) => format!("{}{}{}{}", gray_bg, white_fg, '▐', reset),
                (false, Some(false)) => format!("{}{}{}{}", gray_bg, white_fg, ' ', reset),
                (true, None) => format!("{}{}{}{}", black_bg, white_fg, '▌', reset),
                (false, None) => format!("{}{}{}{}", black_bg, white_fg, ' ', reset),
            };

            result.push_str(&colored_char);
        }

        // ensure colors are reset at the end of the line
        result.push_str("\x1b[0m");
        result
    }

    fn bytes_to_stream(
        bytes: Bytes,
        chunk_size: usize,
    ) -> impl Stream<Item = io::Result<Bytes>> + 'static {
        assert!(chunk_size > 0, "Chunk size must be greater than 0");
        stream::unfold((bytes, 0), move |(bytes, offset)| async move {
            if offset >= bytes.len() {
                None
            } else {
                let chunk_len = chunk_size.min(bytes.len() - offset);
                let chunk = bytes.slice(offset..offset + chunk_len);
                Some((Ok(chunk), (bytes, offset + chunk_len)))
            }
        })
    }
}