1use std::{
11 collections::{BTreeMap, HashMap, HashSet},
12 future::Future,
13 io::{self, Write},
14 num::NonZeroU64,
15 ops::Deref,
16 sync::Arc,
17 time::SystemTime,
18};
19
20use bao_tree::{
21 blake3,
22 io::{
23 mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
24 outboard::PreOrderMemOutboard,
25 sync::{Outboard, ReadAt, WriteAt},
26 BaoContentItem, EncodeError, Leaf,
27 },
28 BaoTree, ChunkNum, ChunkRanges, TreeNode,
29};
30use bytes::Bytes;
31use irpc::channel::mpsc;
32use n0_future::future::yield_now;
33use range_collections::range_set::RangeSetRange;
34use tokio::{
35 io::AsyncReadExt,
36 sync::watch,
37 task::{JoinError, JoinSet},
38};
39use tracing::{error, info, instrument, trace, Instrument};
40
41use super::util::{BaoTreeSender, PartialMemStorage};
42use crate::{
43 api::{
44 self,
45 blobs::{AddProgressItem, Bitfield, BlobStatus, ExportProgressItem},
46 proto::{
47 BatchMsg, BatchResponse, BlobDeleteRequest, BlobStatusMsg, BlobStatusRequest, Command,
48 CreateTagMsg, CreateTagRequest, CreateTempTagMsg, DeleteBlobsMsg, DeleteTagsMsg,
49 DeleteTagsRequest, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest,
50 ExportRangesItem, ExportRangesMsg, ExportRangesRequest, ImportBaoMsg, ImportBaoRequest,
51 ImportByteStreamMsg, ImportByteStreamUpdate, ImportBytesMsg, ImportBytesRequest,
52 ImportPathMsg, ImportPathRequest, ListBlobsMsg, ListTagsMsg, ListTagsRequest,
53 ObserveMsg, ObserveRequest, RenameTagMsg, RenameTagRequest, Scope, SetTagMsg,
54 SetTagRequest, ShutdownMsg, SyncDbMsg, WaitIdleMsg,
55 },
56 tags::TagInfo,
57 ApiClient,
58 },
59 protocol::ChunkRangesExt,
60 store::{
61 gc::{run_gc, GcConfig},
62 util::{SizeInfo, SparseMemFile, Tag},
63 IROH_BLOCK_SIZE,
64 },
65 util::temp_tag::{TagDrop, TempTagScope, TempTags},
66 BlobFormat, Hash, HashAndFormat,
67};
68
69#[derive(Debug, Default)]
70pub struct Options {
71 pub gc_config: Option<GcConfig>,
72}
73
74#[derive(Debug, Clone)]
75#[repr(transparent)]
76pub struct MemStore {
77 client: ApiClient,
78}
79
80impl From<MemStore> for crate::api::Store {
81 fn from(value: MemStore) -> Self {
82 crate::api::Store::from_sender(value.client)
83 }
84}
85
86impl AsRef<crate::api::Store> for MemStore {
87 fn as_ref(&self) -> &crate::api::Store {
88 crate::api::Store::ref_from_sender(&self.client)
89 }
90}
91
92impl Deref for MemStore {
93 type Target = crate::api::Store;
94
95 fn deref(&self) -> &Self::Target {
96 crate::api::Store::ref_from_sender(&self.client)
97 }
98}
99
100impl Default for MemStore {
101 fn default() -> Self {
102 Self::new()
103 }
104}
105
106#[derive(derive_more::From)]
107enum TaskResult {
108 Unit(()),
109 Import(anyhow::Result<ImportEntry>),
110 Scope(Scope),
111}
112
113impl MemStore {
114 pub fn from_sender(client: ApiClient) -> Self {
115 Self { client }
116 }
117
118 pub fn new() -> Self {
119 Self::new_with_opts(Options::default())
120 }
121
122 pub fn new_with_opts(opts: Options) -> Self {
123 let (sender, receiver) = tokio::sync::mpsc::channel(32);
124 tokio::spawn(
125 Actor {
126 commands: receiver,
127 tasks: JoinSet::new(),
128 state: State {
129 data: HashMap::new(),
130 tags: BTreeMap::new(),
131 empty_hash: BaoFileHandle::new_partial(Hash::EMPTY),
132 },
133 options: Arc::new(Options::default()),
134 temp_tags: Default::default(),
135 protected: Default::default(),
136 idle_waiters: Default::default(),
137 }
138 .run(),
139 );
140
141 let store = Self::from_sender(sender.into());
142 if let Some(gc_config) = opts.gc_config {
143 tokio::spawn(run_gc(store.deref().clone(), gc_config));
144 }
145
146 store
147 }
148}
149
150struct Actor {
151 commands: tokio::sync::mpsc::Receiver<Command>,
152 tasks: JoinSet<TaskResult>,
153 state: State,
154 #[allow(dead_code)]
155 options: Arc<Options>,
156 temp_tags: TempTags,
158 idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
160 protected: HashSet<Hash>,
161}
162
163impl Actor {
164 fn spawn<F, T>(&mut self, f: F)
165 where
166 F: Future<Output = T> + Send + 'static,
167 T: Into<TaskResult>,
168 {
169 let span = tracing::Span::current();
170 let fut = async move { f.await.into() }.instrument(span);
171 self.tasks.spawn(fut);
172 }
173
174 async fn handle_command(&mut self, cmd: Command) -> Option<ShutdownMsg> {
175 match cmd {
176 Command::ImportBao(ImportBaoMsg {
177 inner: ImportBaoRequest { hash, size },
178 rx: data,
179 tx,
180 ..
181 }) => {
182 let entry = self.get_or_create_entry(hash);
183 self.spawn(import_bao(entry, size, data, tx));
184 }
185 Command::WaitIdle(WaitIdleMsg { tx, .. }) => {
186 trace!("wait idle");
187 if self.tasks.is_empty() {
188 tx.send(()).await.ok();
190 } else {
191 self.idle_waiters.push(tx);
193 }
194 }
195 Command::Observe(ObserveMsg {
196 inner: ObserveRequest { hash },
197 tx,
198 ..
199 }) => {
200 let entry = self.get_or_create_entry(hash);
201 self.spawn(observe(entry, tx));
202 }
203 Command::ImportBytes(ImportBytesMsg {
204 inner:
205 ImportBytesRequest {
206 data,
207 scope,
208 format,
209 ..
210 },
211 tx,
212 ..
213 }) => {
214 self.spawn(import_bytes(data, scope, format, tx));
215 }
216 Command::ImportByteStream(ImportByteStreamMsg { inner, tx, rx, .. }) => {
217 self.spawn(import_byte_stream(inner.scope, inner.format, rx, tx));
218 }
219 Command::ImportPath(cmd) => {
220 self.spawn(import_path(cmd));
221 }
222 Command::ExportBao(ExportBaoMsg {
223 inner: ExportBaoRequest { hash, ranges },
224 tx,
225 ..
226 }) => {
227 let entry = self.get(&hash);
228 self.spawn(export_bao(entry, ranges, tx))
229 }
230 Command::ExportPath(cmd) => {
231 let entry = self.get(&cmd.hash);
232 self.spawn(export_path(entry, cmd));
233 }
234 Command::DeleteTags(cmd) => {
235 let DeleteTagsMsg {
236 inner: DeleteTagsRequest { from, to },
237 tx,
238 ..
239 } = cmd;
240 info!("deleting tags from {:?} to {:?}", from, to);
241 let mut deleted = 0;
244 self.state.tags.retain(|tag, _| {
245 if let Some(from) = &from {
246 if tag < from {
247 return true;
248 }
249 }
250 if let Some(to) = &to {
251 if tag >= to {
252 return true;
253 }
254 }
255 info!(" removing {:?}", tag);
256 deleted += 1;
257 false
258 });
259 tx.send(Ok(deleted)).await.ok();
260 }
261 Command::RenameTag(cmd) => {
262 let RenameTagMsg {
263 inner: RenameTagRequest { from, to },
264 tx,
265 ..
266 } = cmd;
267 let tags = &mut self.state.tags;
268 let value = match tags.remove(&from) {
269 Some(value) => value,
270 None => {
271 tx.send(Err(api::Error::io(
272 io::ErrorKind::NotFound,
273 format!("tag not found: {from:?}"),
274 )))
275 .await
276 .ok();
277 return None;
278 }
279 };
280 tags.insert(to, value);
281 tx.send(Ok(())).await.ok();
282 return None;
283 }
284 Command::ListTags(cmd) => {
285 let ListTagsMsg {
286 inner:
287 ListTagsRequest {
288 from,
289 to,
290 raw,
291 hash_seq,
292 },
293 tx,
294 ..
295 } = cmd;
296 let tags = self
297 .state
298 .tags
299 .iter()
300 .filter(move |(tag, value)| {
301 if let Some(from) = &from {
302 if tag < &from {
303 return false;
304 }
305 }
306 if let Some(to) = &to {
307 if tag >= &to {
308 return false;
309 }
310 }
311 raw && value.format.is_raw() || hash_seq && value.format.is_hash_seq()
312 })
313 .map(|(tag, value)| TagInfo {
314 name: tag.clone(),
315 hash: value.hash,
316 format: value.format,
317 })
318 .map(Ok);
319 tx.send(tags.collect()).await.ok();
320 }
321 Command::SetTag(SetTagMsg {
322 inner: SetTagRequest { name: tag, value },
323 tx,
324 ..
325 }) => {
326 self.state.tags.insert(tag, value);
327 tx.send(Ok(())).await.ok();
328 }
329 Command::CreateTag(CreateTagMsg {
330 inner: CreateTagRequest { value },
331 tx,
332 ..
333 }) => {
334 let tag = Tag::auto(SystemTime::now(), |tag| self.state.tags.contains_key(tag));
335 self.state.tags.insert(tag.clone(), value);
336 tx.send(Ok(tag)).await.ok();
337 }
338 Command::CreateTempTag(cmd) => {
339 trace!("{cmd:?}");
340 self.create_temp_tag(cmd).await;
341 }
342 Command::ListTempTags(cmd) => {
343 trace!("{cmd:?}");
344 let tts = self.temp_tags.list();
345 cmd.tx.send(tts).await.ok();
346 }
347 Command::ListBlobs(cmd) => {
348 let ListBlobsMsg { tx, .. } = cmd;
349 let blobs = self.state.data.keys().cloned().collect::<Vec<Hash>>();
350 self.spawn(async move {
351 for blob in blobs {
352 if tx.send(Ok(blob)).await.is_err() {
353 break;
354 }
355 }
356 });
357 }
358 Command::BlobStatus(cmd) => {
359 trace!("{cmd:?}");
360 let BlobStatusMsg {
361 inner: BlobStatusRequest { hash },
362 tx,
363 ..
364 } = cmd;
365 let res = match self.get(&hash) {
366 None => api::blobs::BlobStatus::NotFound,
367 Some(x) => {
368 let bitfield = x.0.state.borrow().bitfield();
369 if bitfield.is_complete() {
370 BlobStatus::Complete {
371 size: bitfield.size,
372 }
373 } else {
374 BlobStatus::Partial {
375 size: bitfield.validated_size(),
376 }
377 }
378 }
379 };
380 tx.send(res).await.ok();
381 }
382 Command::DeleteBlobs(cmd) => {
383 trace!("{cmd:?}");
384 let DeleteBlobsMsg {
385 inner: BlobDeleteRequest { hashes, force },
386 tx,
387 ..
388 } = cmd;
389 for hash in hashes {
390 if !force && self.protected.contains(&hash) {
391 continue;
392 }
393 self.state.data.remove(&hash);
394 }
395 tx.send(Ok(())).await.ok();
396 }
397 Command::Batch(cmd) => {
398 trace!("{cmd:?}");
399 let (id, scope) = self.temp_tags.create_scope();
400 self.spawn(handle_batch(cmd, id, scope));
401 }
402 Command::ClearProtected(cmd) => {
403 self.protected.clear();
404 cmd.tx.send(Ok(())).await.ok();
405 }
406 Command::ExportRanges(cmd) => {
407 let entry = self.get(&cmd.hash);
408 self.spawn(export_ranges(cmd, entry));
409 }
410 Command::SyncDb(SyncDbMsg { tx, .. }) => {
411 tx.send(Ok(())).await.ok();
412 }
413 Command::Shutdown(cmd) => {
414 return Some(cmd);
415 }
416 }
417 None
418 }
419
420 fn get(&mut self, hash: &Hash) -> Option<BaoFileHandle> {
421 if *hash == Hash::EMPTY {
422 Some(self.state.empty_hash.clone())
423 } else {
424 self.state.data.get(hash).cloned()
425 }
426 }
427
428 fn get_or_create_entry(&mut self, hash: Hash) -> BaoFileHandle {
429 if hash == Hash::EMPTY {
430 self.state.empty_hash.clone()
431 } else {
432 self.state
433 .data
434 .entry(hash)
435 .or_insert_with(|| BaoFileHandle::new_partial(hash))
436 .clone()
437 }
438 }
439
440 async fn create_temp_tag(&mut self, cmd: CreateTempTagMsg) {
441 let CreateTempTagMsg { tx, inner, .. } = cmd;
442 let mut tt = self.temp_tags.create(inner.scope, inner.value);
443 if tx.is_rpc() {
444 tt.leak();
445 }
446 tx.send(tt).await.ok();
447 }
448
449 async fn finish_import(&mut self, res: anyhow::Result<ImportEntry>) {
450 let import_data = match res {
451 Ok(entry) => entry,
452 Err(e) => {
453 error!("import failed: {e}");
454 return;
455 }
456 };
457 let hash = import_data.outboard.root().into();
458 let entry = self.get_or_create_entry(hash);
459 entry
460 .0
461 .state
462 .send_if_modified(|state: &mut BaoFileStorage| {
463 let BaoFileStorage::Partial(_) = state.deref() else {
464 return false;
465 };
466 *state =
467 CompleteStorage::new(import_data.data, import_data.outboard.data.into()).into();
468 true
469 });
470 let tt = self.temp_tags.create(
471 import_data.scope,
472 HashAndFormat {
473 hash,
474 format: import_data.format,
475 },
476 );
477 import_data.tx.send(AddProgressItem::Done(tt)).await.ok();
478 }
479
480 fn log_task_result(&self, res: Result<TaskResult, JoinError>) -> Option<TaskResult> {
481 match res {
482 Ok(x) => Some(x),
483 Err(e) => {
484 if e.is_cancelled() {
485 trace!("task cancelled: {e}");
486 } else {
487 error!("task failed: {e}");
488 }
489 None
490 }
491 }
492 }
493
494 pub async fn run(mut self) {
495 let shutdown = loop {
496 tokio::select! {
497 cmd = self.commands.recv() => {
498 let Some(cmd) = cmd else {
499 break None;
502 };
503 if let Some(cmd) = self.handle_command(cmd).await {
504 break Some(cmd);
505 }
506 }
507 Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
508 let Some(res) = self.log_task_result(res) else {
509 continue;
510 };
511 match res {
512 TaskResult::Import(res) => {
513 self.finish_import(res).await;
514 }
515 TaskResult::Scope(scope) => {
516 self.temp_tags.end_scope(scope);
517 }
518 TaskResult::Unit(_) => {}
519 }
520 if self.tasks.is_empty() {
521 for tx in self.idle_waiters.drain(..) {
523 tx.send(()).await.ok();
524 }
525 }
526 }
527 }
528 };
529 if let Some(shutdown) = shutdown {
530 shutdown.tx.send(()).await.ok();
531 }
532 }
533}
534
535async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>) -> Scope {
536 if let Err(cause) = handle_batch_impl(cmd, id, &scope).await {
537 error!("batch failed: {cause}");
538 }
539 id
540}
541
542async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
543 let BatchMsg { tx, mut rx, .. } = cmd;
544 trace!("created scope {}", id);
545 tx.send(id).await.map_err(api::Error::other)?;
546 while let Some(msg) = rx.recv().await? {
547 match msg {
548 BatchResponse::Drop(msg) => scope.on_drop(&msg),
549 BatchResponse::Ping => {}
550 }
551 }
552 Ok(())
553}
554
555async fn export_ranges(mut cmd: ExportRangesMsg, entry: Option<BaoFileHandle>) {
556 let Some(entry) = entry else {
557 let err = io::Error::new(io::ErrorKind::NotFound, "hash not found");
558 cmd.tx.send(ExportRangesItem::Error(err.into())).await.ok();
559 return;
560 };
561 if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await {
562 cmd.tx
563 .send(ExportRangesItem::Error(cause.into()))
564 .await
565 .ok();
566 }
567}
568
569async fn export_ranges_impl(
570 cmd: ExportRangesRequest,
571 tx: &mut mpsc::Sender<ExportRangesItem>,
572 entry: BaoFileHandle,
573) -> io::Result<()> {
574 let ExportRangesRequest { ranges, hash } = cmd;
575 let bitfield = entry.bitfield();
576 trace!(
577 "exporting ranges: {hash} {ranges:?} size={}",
578 bitfield.size()
579 );
580 debug_assert!(entry.hash() == hash, "hash mismatch");
581 let data = entry.data_reader();
582 let size = bitfield.size();
583 for range in ranges.iter() {
584 let range = match range {
585 RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
586 RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
587 };
588 let requested = ChunkRanges::bytes(range.start..range.end);
589 if !bitfield.ranges.is_superset(&requested) {
590 return Err(io::Error::other(format!(
591 "missing range: {requested:?}, present: {bitfield:?}",
592 )));
593 }
594 let bs = 1024;
595 let mut offset = range.start;
596 loop {
597 let end: u64 = (offset + bs).min(range.end);
598 let size = (end - offset) as usize;
599 tx.send(
600 Leaf {
601 offset,
602 data: data.read_bytes_at(offset, size)?,
603 }
604 .into(),
605 )
606 .await?;
607 offset = end;
608 if offset >= range.end {
609 break;
610 }
611 }
612 }
613 Ok(())
614}
615
616fn chunk_range(leaf: &Leaf) -> ChunkRanges {
617 let start = ChunkNum::chunks(leaf.offset);
618 let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64);
619 (start..end).into()
620}
621
622async fn import_bao(
623 entry: BaoFileHandle,
624 size: NonZeroU64,
625 mut stream: mpsc::Receiver<BaoContentItem>,
626 tx: irpc::channel::oneshot::Sender<api::Result<()>>,
627) {
628 let size = size.get();
629 entry
630 .0
631 .state
632 .send_if_modified(|state: &mut BaoFileStorage| {
633 let BaoFileStorage::Partial(entry) = state else {
634 return false;
636 };
637 entry.size.write(0, size);
638 false
639 });
640 let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
641 while let Some(item) = stream.recv().await.unwrap() {
642 entry.0.state.send_if_modified(|state| {
643 let BaoFileStorage::Partial(partial) = state else {
644 return false;
646 };
647 match item {
648 BaoContentItem::Parent(parent) => {
649 if let Some(offset) = tree.pre_order_offset(parent.node) {
650 let mut pair = [0u8; 64];
651 pair[..32].copy_from_slice(parent.pair.0.as_bytes());
652 pair[32..].copy_from_slice(parent.pair.1.as_bytes());
653 partial
654 .outboard
655 .write_at(offset * 64, &pair)
656 .expect("writing to mem can never fail");
657 }
658 false
659 }
660 BaoContentItem::Leaf(leaf) => {
661 let start = leaf.offset;
662 partial
663 .data
664 .write_at(start, &leaf.data)
665 .expect("writing to mem can never fail");
666 let added = chunk_range(&leaf);
667 let update = partial.bitfield.update(&Bitfield::new(added.clone(), size));
668 if update.new_state().complete {
669 let data = std::mem::take(&mut partial.data);
670 let outboard = std::mem::take(&mut partial.outboard);
671 let data: Bytes = <Vec<u8>>::try_from(data).unwrap().into();
672 let outboard: Bytes = <Vec<u8>>::try_from(outboard).unwrap().into();
673 *state = CompleteStorage::new(data, outboard).into();
674 }
675 update.changed()
676 }
677 }
678 });
679 }
680 tx.send(Ok(())).await.ok();
681}
682
683#[instrument(skip_all, fields(hash = tracing::field::Empty))]
684async fn export_bao(
685 entry: Option<BaoFileHandle>,
686 ranges: ChunkRanges,
687 mut sender: mpsc::Sender<EncodedItem>,
688) {
689 let Some(entry) = entry else {
690 let err = EncodeError::Io(io::Error::new(io::ErrorKind::NotFound, "hash not found"));
691 sender.send(err.into()).await.ok();
692 return;
693 };
694 tracing::Span::current().record("hash", tracing::field::display(entry.hash));
695 let data = entry.data_reader();
696 let outboard = entry.outboard_reader();
697 let tx = BaoTreeSender::new(&mut sender);
698 traverse_ranges_validated(data, outboard, &ranges, tx)
699 .await
700 .ok();
701}
702
703#[instrument(skip_all, fields(hash = %entry.hash.fmt_short()))]
704async fn observe(entry: BaoFileHandle, tx: mpsc::Sender<api::blobs::Bitfield>) {
705 entry.subscribe().forward(tx).await.ok();
706}
707
708async fn import_bytes(
709 data: Bytes,
710 scope: Scope,
711 format: BlobFormat,
712 tx: mpsc::Sender<AddProgressItem>,
713) -> anyhow::Result<ImportEntry> {
714 tx.send(AddProgressItem::Size(data.len() as u64)).await?;
715 tx.send(AddProgressItem::CopyDone).await?;
716 let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
717 Ok(ImportEntry {
718 data,
719 outboard,
720 scope,
721 format,
722 tx,
723 })
724}
725
726async fn import_byte_stream(
727 scope: Scope,
728 format: BlobFormat,
729 mut rx: mpsc::Receiver<ImportByteStreamUpdate>,
730 tx: mpsc::Sender<AddProgressItem>,
731) -> anyhow::Result<ImportEntry> {
732 let mut res = Vec::new();
733 loop {
734 match rx.recv().await {
735 Ok(Some(ImportByteStreamUpdate::Bytes(data))) => {
736 res.extend_from_slice(&data);
737 tx.send(AddProgressItem::CopyProgress(res.len() as u64))
738 .await?;
739 }
740 Ok(Some(ImportByteStreamUpdate::Done)) => {
741 break;
742 }
743 Ok(None) => {
744 return Err(api::Error::io(
745 io::ErrorKind::UnexpectedEof,
746 "byte stream ended unexpectedly",
747 )
748 .into());
749 }
750 Err(e) => {
751 return Err(e.into());
752 }
753 }
754 }
755 import_bytes(res.into(), scope, format, tx).await
756}
757
758#[instrument(skip_all, fields(path = %cmd.path.display()))]
759async fn import_path(cmd: ImportPathMsg) -> anyhow::Result<ImportEntry> {
760 let ImportPathMsg {
761 inner:
762 ImportPathRequest {
763 path,
764 scope,
765 format,
766 ..
767 },
768 tx,
769 ..
770 } = cmd;
771 let mut res = Vec::new();
772 let mut file = tokio::fs::File::open(path).await?;
773 let mut buf = [0u8; 1024 * 64];
774 loop {
775 let size = file.read(&mut buf).await?;
776 if size == 0 {
777 break;
778 }
779 res.extend_from_slice(&buf[..size]);
780 tx.send(AddProgressItem::CopyProgress(res.len() as u64))
781 .await?;
782 }
783 import_bytes(res.into(), scope, format, tx).await
784}
785
786#[instrument(skip_all, fields(hash = %cmd.hash.fmt_short(), path = %cmd.target.display()))]
787async fn export_path(entry: Option<BaoFileHandle>, cmd: ExportPathMsg) {
788 let ExportPathMsg { inner, mut tx, .. } = cmd;
789 let Some(entry) = entry else {
790 tx.send(ExportProgressItem::Error(api::Error::io(
791 io::ErrorKind::NotFound,
792 "hash not found",
793 )))
794 .await
795 .ok();
796 return;
797 };
798 match export_path_impl(entry, inner, &mut tx).await {
799 Ok(()) => tx.send(ExportProgressItem::Done).await.ok(),
800 Err(e) => tx.send(ExportProgressItem::Error(e.into())).await.ok(),
801 };
802}
803
804async fn export_path_impl(
805 entry: BaoFileHandle,
806 cmd: ExportPathRequest,
807 tx: &mut mpsc::Sender<ExportProgressItem>,
808) -> io::Result<()> {
809 let ExportPathRequest { target, .. } = cmd;
810 if !target.is_absolute() {
811 return Err(io::Error::new(
812 io::ErrorKind::InvalidInput,
813 "path is not absolute",
814 ));
815 }
816 if let Some(parent) = target.parent() {
817 std::fs::create_dir_all(parent)?;
818 }
819 let mut file = std::fs::File::create(target)?;
821 let size = entry.0.state.borrow().size();
822 tx.send(ExportProgressItem::Size(size)).await?;
823 let mut buf = [0u8; 1024 * 64];
824 for offset in (0..size).step_by(1024 * 64) {
825 let len = std::cmp::min(size - offset, 1024 * 64) as usize;
826 let buf = &mut buf[..len];
827 entry.0.state.borrow().data().read_exact_at(offset, buf)?;
828 file.write_all(buf)?;
829 tx.try_send(ExportProgressItem::CopyProgress(offset))
830 .await
831 .map_err(|_e| io::Error::other(""))?;
832 yield_now().await;
833 }
834 Ok(())
835}
836
837struct ImportEntry {
838 scope: Scope,
839 format: BlobFormat,
840 data: Bytes,
841 outboard: PreOrderMemOutboard,
842 tx: mpsc::Sender<AddProgressItem>,
843}
844
845pub struct DataReader(BaoFileHandle);
846
847impl ReadBytesAt for DataReader {
848 fn read_bytes_at(&self, offset: u64, size: usize) -> std::io::Result<Bytes> {
849 let entry = self.0 .0.state.borrow();
850 entry.data().read_bytes_at(offset, size)
851 }
852}
853
854pub struct OutboardReader {
855 hash: blake3::Hash,
856 tree: BaoTree,
857 data: BaoFileHandle,
858}
859
860impl Outboard for OutboardReader {
861 fn root(&self) -> blake3::Hash {
862 self.hash
863 }
864
865 fn tree(&self) -> BaoTree {
866 self.tree
867 }
868
869 fn load(&self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
870 let Some(offset) = self.tree.pre_order_offset(node) else {
871 return Ok(None);
872 };
873 let mut buf = [0u8; 64];
874 let size = self
875 .data
876 .0
877 .state
878 .borrow()
879 .outboard()
880 .read_at(offset * 64, &mut buf)?;
881 if size != 64 {
882 return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "short read"));
883 }
884 let left: [u8; 32] = buf[..32].try_into().unwrap();
885 let right: [u8; 32] = buf[32..].try_into().unwrap();
886 Ok(Some((left.into(), right.into())))
887 }
888}
889
890struct State {
891 data: HashMap<Hash, BaoFileHandle>,
892 tags: BTreeMap<Tag, HashAndFormat>,
893 empty_hash: BaoFileHandle,
894}
895
896#[derive(Debug, derive_more::From)]
897pub enum BaoFileStorage {
898 Partial(PartialMemStorage),
899 Complete(CompleteStorage),
900}
901
902impl BaoFileStorage {
903 pub fn bitfield(&self) -> Bitfield {
905 match self {
906 Self::Partial(entry) => entry.bitfield.clone(),
907 Self::Complete(entry) => Bitfield::complete(entry.size()),
908 }
909 }
910}
911
912#[derive(Debug)]
913pub struct BaoFileHandleInner {
914 state: watch::Sender<BaoFileStorage>,
915 hash: Hash,
916}
917
918#[derive(Debug, Clone, derive_more::Deref)]
920pub struct BaoFileHandle(Arc<BaoFileHandleInner>);
921
922impl BaoFileHandle {
923 pub fn new_partial(hash: Hash) -> Self {
924 let (state, _) = watch::channel(BaoFileStorage::Partial(PartialMemStorage {
925 data: SparseMemFile::new(),
926 outboard: SparseMemFile::new(),
927 size: SizeInfo::default(),
928 bitfield: Bitfield::empty(),
929 }));
930 Self(Arc::new(BaoFileHandleInner { state, hash }))
931 }
932
933 pub fn hash(&self) -> Hash {
934 self.hash
935 }
936
937 pub fn bitfield(&self) -> Bitfield {
938 self.0.state.borrow().bitfield()
939 }
940
941 pub fn subscribe(&self) -> BaoFileStorageSubscriber {
942 BaoFileStorageSubscriber::new(self.0.state.subscribe())
943 }
944
945 pub fn data_reader(&self) -> DataReader {
946 DataReader(self.clone())
947 }
948
949 pub fn outboard_reader(&self) -> OutboardReader {
950 let entry = self.0.state.borrow();
951 let hash = self.hash.into();
952 let tree = BaoTree::new(entry.size(), IROH_BLOCK_SIZE);
953 OutboardReader {
954 hash,
955 tree,
956 data: self.clone(),
957 }
958 }
959}
960
961impl Default for BaoFileStorage {
962 fn default() -> Self {
963 Self::Partial(Default::default())
964 }
965}
966
967impl BaoFileStorage {
968 fn data(&self) -> &[u8] {
969 match self {
970 Self::Partial(entry) => entry.data.as_ref(),
971 Self::Complete(entry) => &entry.data,
972 }
973 }
974
975 fn outboard(&self) -> &[u8] {
976 match self {
977 Self::Partial(entry) => entry.outboard.as_ref(),
978 Self::Complete(entry) => &entry.outboard,
979 }
980 }
981
982 fn size(&self) -> u64 {
983 match self {
984 Self::Partial(entry) => entry.current_size(),
985 Self::Complete(entry) => entry.size(),
986 }
987 }
988}
989
990#[derive(Debug, Clone)]
991pub struct CompleteStorage {
992 pub(crate) data: Bytes,
993 pub(crate) outboard: Bytes,
994}
995
996impl CompleteStorage {
997 pub fn create(data: Bytes) -> (Hash, Self) {
998 let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
999 let hash = outboard.root().into();
1000 let outboard = outboard.data.into();
1001 let entry = Self::new(data, outboard);
1002 (hash, entry)
1003 }
1004
1005 pub fn new(data: Bytes, outboard: Bytes) -> Self {
1006 Self { data, outboard }
1007 }
1008
1009 pub fn size(&self) -> u64 {
1010 self.data.len() as u64
1011 }
1012}
1013
1014#[allow(dead_code)]
1015fn print_outboard(hashes: &[u8]) {
1016 assert!(hashes.len() % 64 == 0);
1017 for chunk in hashes.chunks(64) {
1018 let left: [u8; 32] = chunk[..32].try_into().unwrap();
1019 let right: [u8; 32] = chunk[32..].try_into().unwrap();
1020 let left = blake3::Hash::from(left);
1021 let right = blake3::Hash::from(right);
1022 println!("l: {left:?}, r: {right:?}");
1023 }
1024}
1025
1026pub struct BaoFileStorageSubscriber {
1027 receiver: watch::Receiver<BaoFileStorage>,
1028}
1029
1030impl BaoFileStorageSubscriber {
1031 pub fn new(receiver: watch::Receiver<BaoFileStorage>) -> Self {
1032 Self { receiver }
1033 }
1034
1035 pub async fn forward(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1039 let value = self.receiver.borrow().bitfield();
1040 tx.send(value).await?;
1041 loop {
1042 self.update_or_closed(&mut tx).await?;
1043 let value = self.receiver.borrow().bitfield();
1044 tx.send(value.clone()).await?;
1045 }
1046 }
1047
1048 #[allow(dead_code)]
1052 pub async fn forward_delta(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1053 let value = self.receiver.borrow().bitfield();
1054 let mut old = value.clone();
1055 tx.send(value).await?;
1056 loop {
1057 self.update_or_closed(&mut tx).await?;
1058 let new = self.receiver.borrow().bitfield();
1059 let diff = old.diff(&new);
1060 if diff.is_empty() {
1061 continue;
1062 }
1063 tx.send(diff).await?;
1064 old = new;
1065 }
1066 }
1067
1068 async fn update_or_closed(&mut self, tx: &mut mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1069 tokio::select! {
1070 _ = tx.closed() => {
1071 Err(irpc::channel::SendError::ReceiverClosed.into())
1073 }
1074 e = self.receiver.changed() => Ok(e?),
1075 }
1076 }
1077}
1078
#[cfg(test)]
mod tests {
    use n0_future::StreamExt;
    use testresult::TestResult;

    use super::*;

    /// Adds a blob to one store, exports it as bao, imports that encoding
    /// into a second store, and checks that re-exporting yields the same
    /// bytes.
    #[tokio::test]
    async fn smoke() -> TestResult<()> {
        let source = MemStore::new();
        let payload = vec![0u8; 1024 * 64];
        // Bound for the whole test, as the temp tag was in the original.
        let temp_tag = source.add_bytes(payload).temp_tag().await?;
        let hash = temp_tag.hash();
        println!("hash: {hash:?}");

        let mut items = source.export_bao(hash, ChunkRanges::all()).stream();
        while let Some(item) = items.next().await {
            println!("item: {item:?}");
        }
        let exported = source
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;

        let target = MemStore::new();
        let mut events = target.observe(hash).stream().await?;
        tokio::spawn(async move {
            while let Some(event) = events.next().await {
                println!("event: {event:?}");
            }
        });
        target
            .import_bao_bytes(hash, ChunkRanges::all(), exported.clone())
            .await?;

        let exported2 = target
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;
        assert_eq!(exported, exported2);

        Ok(())
    }
}