1use std::{
11 collections::{BTreeMap, HashMap, HashSet},
12 future::Future,
13 io::{self, Write},
14 num::NonZeroU64,
15 ops::Deref,
16 sync::Arc,
17 time::SystemTime,
18};
19
20use bao_tree::{
21 blake3,
22 io::{
23 mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
24 outboard::PreOrderMemOutboard,
25 sync::{Outboard, ReadAt, WriteAt},
26 BaoContentItem, EncodeError, Leaf,
27 },
28 BaoTree, ChunkNum, ChunkRanges, TreeNode,
29};
30use bytes::Bytes;
31use irpc::channel::mpsc;
32use n0_future::future::yield_now;
33use range_collections::range_set::RangeSetRange;
34use tokio::{
35 io::AsyncReadExt,
36 sync::watch,
37 task::{JoinError, JoinSet},
38};
39use tracing::{error, info, instrument, trace, Instrument};
40
41use super::util::{BaoTreeSender, PartialMemStorage};
42use crate::{
43 api::{
44 self,
45 blobs::{AddProgressItem, Bitfield, BlobStatus, ExportProgressItem},
46 proto::{
47 BatchMsg, BatchResponse, BlobDeleteRequest, BlobStatusMsg, BlobStatusRequest, Command,
48 CreateTagMsg, CreateTagRequest, CreateTempTagMsg, DeleteBlobsMsg, DeleteTagsMsg,
49 DeleteTagsRequest, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest,
50 ExportRangesItem, ExportRangesMsg, ExportRangesRequest, ImportBaoMsg, ImportBaoRequest,
51 ImportByteStreamMsg, ImportByteStreamUpdate, ImportBytesMsg, ImportBytesRequest,
52 ImportPathMsg, ImportPathRequest, ListBlobsMsg, ListTagsMsg, ListTagsRequest,
53 ObserveMsg, ObserveRequest, RenameTagMsg, RenameTagRequest, Scope, SetTagMsg,
54 SetTagRequest, ShutdownMsg, SyncDbMsg, WaitIdleMsg,
55 },
56 tags::TagInfo,
57 ApiClient,
58 },
59 protocol::ChunkRangesExt,
60 store::{
61 util::{SizeInfo, SparseMemFile, Tag},
62 IROH_BLOCK_SIZE,
63 },
64 util::temp_tag::{TagDrop, TempTagScope, TempTags},
65 BlobFormat, Hash, HashAndFormat,
66};
67
/// Configuration for [`MemStore`]. Currently has no fields.
#[derive(Default, Debug)]
pub struct Options {}
70
71#[derive(Debug, Clone)]
72#[repr(transparent)]
73pub struct MemStore {
74 client: ApiClient,
75}
76
77impl AsRef<crate::api::Store> for MemStore {
78 fn as_ref(&self) -> &crate::api::Store {
79 crate::api::Store::ref_from_sender(&self.client)
80 }
81}
82
83impl Deref for MemStore {
84 type Target = crate::api::Store;
85
86 fn deref(&self) -> &Self::Target {
87 crate::api::Store::ref_from_sender(&self.client)
88 }
89}
90
91impl Default for MemStore {
92 fn default() -> Self {
93 Self::new()
94 }
95}
96
97#[derive(derive_more::From)]
98enum TaskResult {
99 Unit(()),
100 Import(anyhow::Result<ImportEntry>),
101 Scope(Scope),
102}
103
104impl MemStore {
105 pub fn from_sender(client: ApiClient) -> Self {
106 Self { client }
107 }
108
109 pub fn new() -> Self {
110 let (sender, receiver) = tokio::sync::mpsc::channel(32);
111 tokio::spawn(
112 Actor {
113 commands: receiver,
114 tasks: JoinSet::new(),
115 state: State {
116 data: HashMap::new(),
117 tags: BTreeMap::new(),
118 empty_hash: BaoFileHandle::new_partial(Hash::EMPTY),
119 },
120 options: Arc::new(Options::default()),
121 temp_tags: Default::default(),
122 protected: Default::default(),
123 idle_waiters: Default::default(),
124 }
125 .run(),
126 );
127 Self::from_sender(sender.into())
128 }
129}
130
131struct Actor {
132 commands: tokio::sync::mpsc::Receiver<Command>,
133 tasks: JoinSet<TaskResult>,
134 state: State,
135 #[allow(dead_code)]
136 options: Arc<Options>,
137 temp_tags: TempTags,
139 idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
141 protected: HashSet<Hash>,
142}
143
impl Actor {
    /// Spawns `f` on the actor's `JoinSet`.
    ///
    /// The future's output is converted into a [`TaskResult`], and the future
    /// runs inside the caller's current tracing span.
    fn spawn<F, T>(&mut self, f: F)
    where
        F: Future<Output = T> + Send + 'static,
        T: Into<TaskResult>,
    {
        let span = tracing::Span::current();
        let fut = async move { f.await.into() }.instrument(span);
        self.tasks.spawn(fut);
    }

    /// Handles a single command.
    ///
    /// Cheap operations on `self.state` (tags, blob status, deletion) are done
    /// inline. Data transfer (imports, exports, observation) is delegated to
    /// tasks via [`Actor::spawn`]. Returns `Some` only for
    /// [`Command::Shutdown`], which makes [`Actor::run`] stop.
    async fn handle_command(&mut self, cmd: Command) -> Option<ShutdownMsg> {
        match cmd {
            Command::ImportBao(ImportBaoMsg {
                inner: ImportBaoRequest { hash, size },
                rx: data,
                tx,
                ..
            }) => {
                // Creates a partial entry if the hash is not known yet.
                let entry = self.get_or_create_entry(hash);
                self.spawn(import_bao(entry, size, data, tx));
            }
            Command::WaitIdle(WaitIdleMsg { tx, .. }) => {
                trace!("wait idle");
                if self.tasks.is_empty() {
                    // Nothing running: answer right away.
                    tx.send(()).await.ok();
                } else {
                    // Answered from `run` once the last task has finished.
                    self.idle_waiters.push(tx);
                }
            }
            Command::Observe(ObserveMsg {
                inner: ObserveRequest { hash },
                tx,
                ..
            }) => {
                // Observing an unknown hash creates a partial entry so the
                // observer sees data as it arrives.
                let entry = self.get_or_create_entry(hash);
                self.spawn(observe(entry, tx));
            }
            Command::ImportBytes(ImportBytesMsg {
                inner:
                    ImportBytesRequest {
                        data,
                        scope,
                        format,
                        ..
                    },
                tx,
                ..
            }) => {
                self.spawn(import_bytes(data, scope, format, tx));
            }
            Command::ImportByteStream(ImportByteStreamMsg { inner, tx, rx, .. }) => {
                self.spawn(import_byte_stream(inner.scope, inner.format, rx, tx));
            }
            Command::ImportPath(cmd) => {
                self.spawn(import_path(cmd));
            }
            Command::ExportBao(ExportBaoMsg {
                inner: ExportBaoRequest { hash, ranges },
                tx,
                ..
            }) => {
                // `None` for unknown hashes; `export_bao` reports that to the client.
                let entry = self.get(&hash);
                self.spawn(export_bao(entry, ranges, tx))
            }
            Command::ExportPath(cmd) => {
                let entry = self.get(&cmd.hash);
                self.spawn(export_path(entry, cmd));
            }
            Command::DeleteTags(cmd) => {
                let DeleteTagsMsg {
                    inner: DeleteTagsRequest { from, to },
                    tx,
                    ..
                } = cmd;
                info!("deleting tags from {:?} to {:?}", from, to);
                // Removes tags in the half-open range [from, to); a missing
                // bound is unbounded on that side.
                self.state.tags.retain(|tag, _| {
                    if let Some(from) = &from {
                        if tag < from {
                            return true;
                        }
                    }
                    if let Some(to) = &to {
                        if tag >= to {
                            return true;
                        }
                    }
                    info!(" removing {:?}", tag);
                    false
                });
                tx.send(Ok(())).await.ok();
            }
            Command::RenameTag(cmd) => {
                let RenameTagMsg {
                    inner: RenameTagRequest { from, to },
                    tx,
                    ..
                } = cmd;
                let tags = &mut self.state.tags;
                let value = match tags.remove(&from) {
                    Some(value) => value,
                    None => {
                        tx.send(Err(api::Error::io(
                            io::ErrorKind::NotFound,
                            format!("tag not found: {from:?}"),
                        )))
                        .await
                        .ok();
                        return None;
                    }
                };
                // Overwrites any existing tag named `to`.
                tags.insert(to, value);
                tx.send(Ok(())).await.ok();
                return None;
            }
            Command::ListTags(cmd) => {
                let ListTagsMsg {
                    inner:
                        ListTagsRequest {
                            from,
                            to,
                            raw,
                            hash_seq,
                        },
                    tx,
                    ..
                } = cmd;
                // Same half-open [from, to) range as `DeleteTags`; additionally
                // filtered by the requested formats.
                let tags = self
                    .state
                    .tags
                    .iter()
                    .filter(move |(tag, value)| {
                        if let Some(from) = &from {
                            if tag < &from {
                                return false;
                            }
                        }
                        if let Some(to) = &to {
                            if tag >= &to {
                                return false;
                            }
                        }
                        raw && value.format.is_raw() || hash_seq && value.format.is_hash_seq()
                    })
                    .map(|(tag, value)| TagInfo {
                        name: tag.clone(),
                        hash: value.hash,
                        format: value.format,
                    })
                    .map(Ok);
                tx.send(tags.collect()).await.ok();
            }
            Command::SetTag(SetTagMsg {
                inner: SetTagRequest { name: tag, value },
                tx,
                ..
            }) => {
                self.state.tags.insert(tag, value);
                tx.send(Ok(())).await.ok();
            }
            Command::CreateTag(CreateTagMsg {
                inner: CreateTagRequest { value },
                tx,
                ..
            }) => {
                // NOTE(review): the closure reports whether a candidate name is
                // already taken, presumably so `Tag::auto` picks an unused
                // time-based name — confirm against `Tag::auto`.
                let tag = Tag::auto(SystemTime::now(), |tag| self.state.tags.contains_key(tag));
                self.state.tags.insert(tag.clone(), value);
                tx.send(Ok(tag)).await.ok();
            }
            Command::CreateTempTag(cmd) => {
                trace!("{cmd:?}");
                self.create_temp_tag(cmd).await;
            }
            Command::ListTempTags(cmd) => {
                trace!("{cmd:?}");
                let tts = self.temp_tags.list();
                cmd.tx.send(tts).await.ok();
            }
            Command::ListBlobs(cmd) => {
                let ListBlobsMsg { tx, .. } = cmd;
                // Snapshot the keys now and stream them from a task, stopping
                // early if the receiver goes away. `Hash::EMPTY` is never a key
                // in `data`, so it is not listed.
                let blobs = self.state.data.keys().cloned().collect::<Vec<Hash>>();
                self.spawn(async move {
                    for blob in blobs {
                        if tx.send(Ok(blob)).await.is_err() {
                            break;
                        }
                    }
                });
            }
            Command::BlobStatus(cmd) => {
                trace!("{cmd:?}");
                let BlobStatusMsg {
                    inner: BlobStatusRequest { hash },
                    tx,
                    ..
                } = cmd;
                let res = match self.get(&hash) {
                    None => api::blobs::BlobStatus::NotFound,
                    Some(x) => {
                        let bitfield = x.0.state.borrow().bitfield();
                        if bitfield.is_complete() {
                            BlobStatus::Complete {
                                size: bitfield.size,
                            }
                        } else {
                            BlobStatus::Partial {
                                size: bitfield.validated_size(),
                            }
                        }
                    }
                };
                tx.send(res).await.ok();
            }
            Command::DeleteBlobs(cmd) => {
                trace!("{cmd:?}");
                let DeleteBlobsMsg {
                    inner: BlobDeleteRequest { hashes, force },
                    tx,
                    ..
                } = cmd;
                for hash in hashes {
                    // Protected hashes survive unless deletion is forced.
                    if !force && self.protected.contains(&hash) {
                        continue;
                    }
                    self.state.data.remove(&hash);
                }
                tx.send(Ok(())).await.ok();
            }
            Command::Batch(cmd) => {
                trace!("{cmd:?}");
                // The scope is ended in `run` when `handle_batch` returns its id.
                let (id, scope) = self.temp_tags.create_scope();
                self.spawn(handle_batch(cmd, id, scope));
            }
            Command::ClearProtected(cmd) => {
                self.protected.clear();
                cmd.tx.send(Ok(())).await.ok();
            }
            Command::ExportRanges(cmd) => {
                let entry = self.get(&cmd.hash);
                self.spawn(export_ranges(cmd, entry));
            }
            Command::SyncDb(SyncDbMsg { tx, .. }) => {
                // Nothing to persist for an in-memory store; just acknowledge.
                tx.send(Ok(())).await.ok();
            }
            Command::Shutdown(cmd) => {
                return Some(cmd);
            }
        }
        None
    }

    /// Looks up an existing entry.
    ///
    /// [`Hash::EMPTY`] always resolves to the shared empty entry. Any other
    /// hash returns `None` if it is not in `state.data`.
    fn get(&mut self, hash: &Hash) -> Option<BaoFileHandle> {
        if *hash == Hash::EMPTY {
            Some(self.state.empty_hash.clone())
        } else {
            self.state.data.get(hash).cloned()
        }
    }

    /// Returns the entry for `hash`, inserting a new partial entry if none
    /// exists. [`Hash::EMPTY`] maps to the shared empty entry and is never
    /// inserted into `state.data`.
    fn get_or_create_entry(&mut self, hash: Hash) -> BaoFileHandle {
        if hash == Hash::EMPTY {
            self.state.empty_hash.clone()
        } else {
            self.state
                .data
                .entry(hash)
                .or_insert_with(|| BaoFileHandle::new_partial(hash))
                .clone()
        }
    }

    /// Creates a temp tag in the requested scope and sends it back.
    async fn create_temp_tag(&mut self, cmd: CreateTempTagMsg) {
        let CreateTempTagMsg { tx, inner, .. } = cmd;
        let mut tt = self.temp_tags.create(inner.scope, inner.value);
        // NOTE(review): for RPC senders the tag is leaked, presumably so that
        // dropping the local value does not release it while the remote side
        // still holds it — confirm against `TempTag::leak`.
        if tx.is_rpc() {
            tt.leak();
        }
        tx.send(tt).await.ok();
    }

    /// Installs the result of a finished import task.
    ///
    /// On success, replaces the entry's partial storage with complete storage.
    /// An entry that is already complete is left untouched. It then creates a
    /// temp tag in the import's scope and sends [`AddProgressItem::Done`] with
    /// it. On failure, the error is only logged: the progress sender belonged
    /// to the failed task, so nothing is sent from here.
    async fn finish_import(&mut self, res: anyhow::Result<ImportEntry>) {
        let import_data = match res {
            Ok(entry) => entry,
            Err(e) => {
                error!("import failed: {e}");
                return;
            }
        };
        let hash = import_data.outboard.root().into();
        let entry = self.get_or_create_entry(hash);
        entry
            .0
            .state
            .send_if_modified(|state: &mut BaoFileStorage| {
                let BaoFileStorage::Partial(_) = state.deref() else {
                    return false;
                };
                *state =
                    CompleteStorage::new(import_data.data, import_data.outboard.data.into()).into();
                true
            });
        let tt = self.temp_tags.create(
            import_data.scope,
            HashAndFormat {
                hash,
                format: import_data.format,
            },
        );
        import_data.tx.send(AddProgressItem::Done(tt)).await.ok();
    }

    /// Unwraps a join result.
    ///
    /// Cancellations are logged at trace level and other join failures
    /// (e.g. panics) at error level; both yield `None`.
    fn log_task_result(&self, res: Result<TaskResult, JoinError>) -> Option<TaskResult> {
        match res {
            Ok(x) => Some(x),
            Err(e) => {
                if e.is_cancelled() {
                    trace!("task cancelled: {e}");
                } else {
                    error!("task failed: {e}");
                }
                None
            }
        }
    }

    /// Main actor loop.
    ///
    /// Processes commands and finished tasks until the command channel closes
    /// or a shutdown command arrives. If the loop ended because of a shutdown
    /// command, that command is acknowledged before returning.
    pub async fn run(mut self) {
        let shutdown = loop {
            tokio::select! {
                cmd = self.commands.recv() => {
                    // All senders dropped: stop without a shutdown ack.
                    let Some(cmd) = cmd else {
                        break None;
                    };
                    if let Some(cmd) = self.handle_command(cmd).await {
                        break Some(cmd);
                    }
                }
                Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
                    let Some(res) = self.log_task_result(res) else {
                        continue;
                    };
                    match res {
                        TaskResult::Import(res) => {
                            self.finish_import(res).await;
                        }
                        TaskResult::Scope(scope) => {
                            self.temp_tags.end_scope(scope);
                        }
                        TaskResult::Unit(_) => {}
                    }
                    // Release `WaitIdle` callers once no tasks remain.
                    if self.tasks.is_empty() {
                        for tx in self.idle_waiters.drain(..) {
                            tx.send(()).await.ok();
                        }
                    }
                }
            }
        };
        if let Some(shutdown) = shutdown {
            shutdown.tx.send(()).await.ok();
        }
    }
}
513
514async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>) -> Scope {
515 if let Err(cause) = handle_batch_impl(cmd, id, &scope).await {
516 error!("batch failed: {cause}");
517 }
518 id
519}
520
521async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
522 let BatchMsg { tx, mut rx, .. } = cmd;
523 trace!("created scope {}", id);
524 tx.send(id).await.map_err(api::Error::other)?;
525 while let Some(msg) = rx.recv().await? {
526 match msg {
527 BatchResponse::Drop(msg) => scope.on_drop(&msg),
528 BatchResponse::Ping => {}
529 }
530 }
531 Ok(())
532}
533
534async fn export_ranges(mut cmd: ExportRangesMsg, entry: Option<BaoFileHandle>) {
535 let Some(entry) = entry else {
536 let err = io::Error::new(io::ErrorKind::NotFound, "hash not found");
537 cmd.tx.send(ExportRangesItem::Error(err.into())).await.ok();
538 return;
539 };
540 if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await {
541 cmd.tx
542 .send(ExportRangesItem::Error(cause.into()))
543 .await
544 .ok();
545 }
546}
547
548async fn export_ranges_impl(
549 cmd: ExportRangesRequest,
550 tx: &mut mpsc::Sender<ExportRangesItem>,
551 entry: BaoFileHandle,
552) -> io::Result<()> {
553 let ExportRangesRequest { ranges, hash } = cmd;
554 let bitfield = entry.bitfield();
555 trace!(
556 "exporting ranges: {hash} {ranges:?} size={}",
557 bitfield.size()
558 );
559 debug_assert!(entry.hash() == hash, "hash mismatch");
560 let data = entry.data_reader();
561 let size = bitfield.size();
562 for range in ranges.iter() {
563 let range = match range {
564 RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
565 RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
566 };
567 let requested = ChunkRanges::bytes(range.start..range.end);
568 if !bitfield.ranges.is_superset(&requested) {
569 return Err(io::Error::other(format!(
570 "missing range: {requested:?}, present: {bitfield:?}",
571 )));
572 }
573 let bs = 1024;
574 let mut offset = range.start;
575 loop {
576 let end: u64 = (offset + bs).min(range.end);
577 let size = (end - offset) as usize;
578 tx.send(
579 Leaf {
580 offset,
581 data: data.read_bytes_at(offset, size)?,
582 }
583 .into(),
584 )
585 .await?;
586 offset = end;
587 if offset >= range.end {
588 break;
589 }
590 }
591 }
592 Ok(())
593}
594
595fn chunk_range(leaf: &Leaf) -> ChunkRanges {
596 let start = ChunkNum::chunks(leaf.offset);
597 let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64);
598 (start..end).into()
599}
600
601async fn import_bao(
602 entry: BaoFileHandle,
603 size: NonZeroU64,
604 mut stream: mpsc::Receiver<BaoContentItem>,
605 tx: irpc::channel::oneshot::Sender<api::Result<()>>,
606) {
607 let size = size.get();
608 entry
609 .0
610 .state
611 .send_if_modified(|state: &mut BaoFileStorage| {
612 let BaoFileStorage::Partial(entry) = state else {
613 return false;
615 };
616 entry.size.write(0, size);
617 false
618 });
619 let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
620 while let Some(item) = stream.recv().await.unwrap() {
621 entry.0.state.send_if_modified(|state| {
622 let BaoFileStorage::Partial(partial) = state else {
623 return false;
625 };
626 match item {
627 BaoContentItem::Parent(parent) => {
628 if let Some(offset) = tree.pre_order_offset(parent.node) {
629 let mut pair = [0u8; 64];
630 pair[..32].copy_from_slice(parent.pair.0.as_bytes());
631 pair[32..].copy_from_slice(parent.pair.1.as_bytes());
632 partial
633 .outboard
634 .write_at(offset * 64, &pair)
635 .expect("writing to mem can never fail");
636 }
637 false
638 }
639 BaoContentItem::Leaf(leaf) => {
640 let start = leaf.offset;
641 partial
642 .data
643 .write_at(start, &leaf.data)
644 .expect("writing to mem can never fail");
645 let added = chunk_range(&leaf);
646 let update = partial.bitfield.update(&Bitfield::new(added.clone(), size));
647 if update.new_state().complete {
648 let data = std::mem::take(&mut partial.data);
649 let outboard = std::mem::take(&mut partial.outboard);
650 let data: Bytes = <Vec<u8>>::try_from(data).unwrap().into();
651 let outboard: Bytes = <Vec<u8>>::try_from(outboard).unwrap().into();
652 *state = CompleteStorage::new(data, outboard).into();
653 }
654 update.changed()
655 }
656 }
657 });
658 }
659 tx.send(Ok(())).await.ok();
660}
661
662#[instrument(skip_all, fields(hash = tracing::field::Empty))]
663async fn export_bao(
664 entry: Option<BaoFileHandle>,
665 ranges: ChunkRanges,
666 mut sender: mpsc::Sender<EncodedItem>,
667) {
668 let Some(entry) = entry else {
669 let err = EncodeError::Io(io::Error::new(io::ErrorKind::NotFound, "hash not found"));
670 sender.send(err.into()).await.ok();
671 return;
672 };
673 tracing::Span::current().record("hash", tracing::field::display(entry.hash));
674 let data = entry.data_reader();
675 let outboard = entry.outboard_reader();
676 let tx = BaoTreeSender::new(&mut sender);
677 traverse_ranges_validated(data, outboard, &ranges, tx)
678 .await
679 .ok();
680}
681
682#[instrument(skip_all, fields(hash = %entry.hash.fmt_short()))]
683async fn observe(entry: BaoFileHandle, tx: mpsc::Sender<api::blobs::Bitfield>) {
684 entry.subscribe().forward(tx).await.ok();
685}
686
687async fn import_bytes(
688 data: Bytes,
689 scope: Scope,
690 format: BlobFormat,
691 tx: mpsc::Sender<AddProgressItem>,
692) -> anyhow::Result<ImportEntry> {
693 tx.send(AddProgressItem::Size(data.len() as u64)).await?;
694 tx.send(AddProgressItem::CopyDone).await?;
695 let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
696 Ok(ImportEntry {
697 data,
698 outboard,
699 scope,
700 format,
701 tx,
702 })
703}
704
705async fn import_byte_stream(
706 scope: Scope,
707 format: BlobFormat,
708 mut rx: mpsc::Receiver<ImportByteStreamUpdate>,
709 tx: mpsc::Sender<AddProgressItem>,
710) -> anyhow::Result<ImportEntry> {
711 let mut res = Vec::new();
712 loop {
713 match rx.recv().await {
714 Ok(Some(ImportByteStreamUpdate::Bytes(data))) => {
715 res.extend_from_slice(&data);
716 tx.send(AddProgressItem::CopyProgress(res.len() as u64))
717 .await?;
718 }
719 Ok(Some(ImportByteStreamUpdate::Done)) => {
720 break;
721 }
722 Ok(None) => {
723 return Err(api::Error::io(
724 io::ErrorKind::UnexpectedEof,
725 "byte stream ended unexpectedly",
726 )
727 .into());
728 }
729 Err(e) => {
730 return Err(e.into());
731 }
732 }
733 }
734 import_bytes(res.into(), scope, format, tx).await
735}
736
737#[instrument(skip_all, fields(path = %cmd.path.display()))]
738async fn import_path(cmd: ImportPathMsg) -> anyhow::Result<ImportEntry> {
739 let ImportPathMsg {
740 inner:
741 ImportPathRequest {
742 path,
743 scope,
744 format,
745 ..
746 },
747 tx,
748 ..
749 } = cmd;
750 let mut res = Vec::new();
751 let mut file = tokio::fs::File::open(path).await?;
752 let mut buf = [0u8; 1024 * 64];
753 loop {
754 let size = file.read(&mut buf).await?;
755 if size == 0 {
756 break;
757 }
758 res.extend_from_slice(&buf[..size]);
759 tx.send(AddProgressItem::CopyProgress(res.len() as u64))
760 .await?;
761 }
762 import_bytes(res.into(), scope, format, tx).await
763}
764
765#[instrument(skip_all, fields(hash = %cmd.hash.fmt_short(), path = %cmd.target.display()))]
766async fn export_path(entry: Option<BaoFileHandle>, cmd: ExportPathMsg) {
767 let ExportPathMsg { inner, mut tx, .. } = cmd;
768 let Some(entry) = entry else {
769 tx.send(ExportProgressItem::Error(api::Error::io(
770 io::ErrorKind::NotFound,
771 "hash not found",
772 )))
773 .await
774 .ok();
775 return;
776 };
777 match export_path_impl(entry, inner, &mut tx).await {
778 Ok(()) => tx.send(ExportProgressItem::Done).await.ok(),
779 Err(e) => tx.send(ExportProgressItem::Error(e.into())).await.ok(),
780 };
781}
782
783async fn export_path_impl(
784 entry: BaoFileHandle,
785 cmd: ExportPathRequest,
786 tx: &mut mpsc::Sender<ExportProgressItem>,
787) -> io::Result<()> {
788 let ExportPathRequest { target, .. } = cmd;
789 if !target.is_absolute() {
790 return Err(io::Error::new(
791 io::ErrorKind::InvalidInput,
792 "path is not absolute",
793 ));
794 }
795 if let Some(parent) = target.parent() {
796 std::fs::create_dir_all(parent)?;
797 }
798 let mut file = std::fs::File::create(target)?;
800 let size = entry.0.state.borrow().size();
801 tx.send(ExportProgressItem::Size(size)).await?;
802 let mut buf = [0u8; 1024 * 64];
803 for offset in (0..size).step_by(1024 * 64) {
804 let len = std::cmp::min(size - offset, 1024 * 64) as usize;
805 let buf = &mut buf[..len];
806 entry.0.state.borrow().data().read_exact_at(offset, buf)?;
807 file.write_all(buf)?;
808 tx.try_send(ExportProgressItem::CopyProgress(offset))
809 .await
810 .map_err(|_e| io::Error::other(""))?;
811 yield_now().await;
812 }
813 Ok(())
814}
815
816struct ImportEntry {
817 scope: Scope,
818 format: BlobFormat,
819 data: Bytes,
820 outboard: PreOrderMemOutboard,
821 tx: mpsc::Sender<AddProgressItem>,
822}
823
824pub struct DataReader(BaoFileHandle);
825
826impl ReadBytesAt for DataReader {
827 fn read_bytes_at(&self, offset: u64, size: usize) -> std::io::Result<Bytes> {
828 let entry = self.0 .0.state.borrow();
829 entry.data().read_bytes_at(offset, size)
830 }
831}
832
833pub struct OutboardReader {
834 hash: blake3::Hash,
835 tree: BaoTree,
836 data: BaoFileHandle,
837}
838
839impl Outboard for OutboardReader {
840 fn root(&self) -> blake3::Hash {
841 self.hash
842 }
843
844 fn tree(&self) -> BaoTree {
845 self.tree
846 }
847
848 fn load(&self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
849 let Some(offset) = self.tree.pre_order_offset(node) else {
850 return Ok(None);
851 };
852 let mut buf = [0u8; 64];
853 let size = self
854 .data
855 .0
856 .state
857 .borrow()
858 .outboard()
859 .read_at(offset * 64, &mut buf)?;
860 if size != 64 {
861 return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "short read"));
862 }
863 let left: [u8; 32] = buf[..32].try_into().unwrap();
864 let right: [u8; 32] = buf[32..].try_into().unwrap();
865 Ok(Some((left.into(), right.into())))
866 }
867}
868
869struct State {
870 data: HashMap<Hash, BaoFileHandle>,
871 tags: BTreeMap<Tag, HashAndFormat>,
872 empty_hash: BaoFileHandle,
873}
874
875#[derive(Debug, derive_more::From)]
876pub enum BaoFileStorage {
877 Partial(PartialMemStorage),
878 Complete(CompleteStorage),
879}
880
881impl BaoFileStorage {
882 pub fn bitfield(&self) -> Bitfield {
884 match self {
885 Self::Partial(entry) => entry.bitfield.clone(),
886 Self::Complete(entry) => Bitfield::complete(entry.size()),
887 }
888 }
889}
890
891#[derive(Debug)]
892pub struct BaoFileHandleInner {
893 state: watch::Sender<BaoFileStorage>,
894 hash: Hash,
895}
896
897#[derive(Debug, Clone, derive_more::Deref)]
899pub struct BaoFileHandle(Arc<BaoFileHandleInner>);
900
901impl BaoFileHandle {
902 pub fn new_partial(hash: Hash) -> Self {
903 let (state, _) = watch::channel(BaoFileStorage::Partial(PartialMemStorage {
904 data: SparseMemFile::new(),
905 outboard: SparseMemFile::new(),
906 size: SizeInfo::default(),
907 bitfield: Bitfield::empty(),
908 }));
909 Self(Arc::new(BaoFileHandleInner { state, hash }))
910 }
911
912 pub fn hash(&self) -> Hash {
913 self.hash
914 }
915
916 pub fn bitfield(&self) -> Bitfield {
917 self.0.state.borrow().bitfield()
918 }
919
920 pub fn subscribe(&self) -> BaoFileStorageSubscriber {
921 BaoFileStorageSubscriber::new(self.0.state.subscribe())
922 }
923
924 pub fn data_reader(&self) -> DataReader {
925 DataReader(self.clone())
926 }
927
928 pub fn outboard_reader(&self) -> OutboardReader {
929 let entry = self.0.state.borrow();
930 let hash = self.hash.into();
931 let tree = BaoTree::new(entry.size(), IROH_BLOCK_SIZE);
932 OutboardReader {
933 hash,
934 tree,
935 data: self.clone(),
936 }
937 }
938}
939
940impl Default for BaoFileStorage {
941 fn default() -> Self {
942 Self::Partial(Default::default())
943 }
944}
945
946impl BaoFileStorage {
947 fn data(&self) -> &[u8] {
948 match self {
949 Self::Partial(entry) => entry.data.as_ref(),
950 Self::Complete(entry) => &entry.data,
951 }
952 }
953
954 fn outboard(&self) -> &[u8] {
955 match self {
956 Self::Partial(entry) => entry.outboard.as_ref(),
957 Self::Complete(entry) => &entry.outboard,
958 }
959 }
960
961 fn size(&self) -> u64 {
962 match self {
963 Self::Partial(entry) => entry.current_size(),
964 Self::Complete(entry) => entry.size(),
965 }
966 }
967}
968
969#[derive(Debug, Clone)]
970pub struct CompleteStorage {
971 pub(crate) data: Bytes,
972 pub(crate) outboard: Bytes,
973}
974
975impl CompleteStorage {
976 pub fn create(data: Bytes) -> (Hash, Self) {
977 let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
978 let hash = outboard.root().into();
979 let outboard = outboard.data.into();
980 let entry = Self::new(data, outboard);
981 (hash, entry)
982 }
983
984 pub fn new(data: Bytes, outboard: Bytes) -> Self {
985 Self { data, outboard }
986 }
987
988 pub fn size(&self) -> u64 {
989 self.data.len() as u64
990 }
991}
992
993#[allow(dead_code)]
994fn print_outboard(hashes: &[u8]) {
995 assert!(hashes.len() % 64 == 0);
996 for chunk in hashes.chunks(64) {
997 let left: [u8; 32] = chunk[..32].try_into().unwrap();
998 let right: [u8; 32] = chunk[32..].try_into().unwrap();
999 let left = blake3::Hash::from(left);
1000 let right = blake3::Hash::from(right);
1001 println!("l: {left:?}, r: {right:?}");
1002 }
1003}
1004
1005pub struct BaoFileStorageSubscriber {
1006 receiver: watch::Receiver<BaoFileStorage>,
1007}
1008
1009impl BaoFileStorageSubscriber {
1010 pub fn new(receiver: watch::Receiver<BaoFileStorage>) -> Self {
1011 Self { receiver }
1012 }
1013
1014 pub async fn forward(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1018 let value = self.receiver.borrow().bitfield();
1019 tx.send(value).await?;
1020 loop {
1021 self.update_or_closed(&mut tx).await?;
1022 let value = self.receiver.borrow().bitfield();
1023 tx.send(value.clone()).await?;
1024 }
1025 }
1026
1027 #[allow(dead_code)]
1031 pub async fn forward_delta(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1032 let value = self.receiver.borrow().bitfield();
1033 let mut old = value.clone();
1034 tx.send(value).await?;
1035 loop {
1036 self.update_or_closed(&mut tx).await?;
1037 let new = self.receiver.borrow().bitfield();
1038 let diff = old.diff(&new);
1039 if diff.is_empty() {
1040 continue;
1041 }
1042 tx.send(diff).await?;
1043 old = new;
1044 }
1045 }
1046
1047 async fn update_or_closed(&mut self, tx: &mut mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1048 tokio::select! {
1049 _ = tx.closed() => {
1050 Err(irpc::channel::SendError::ReceiverClosed.into())
1052 }
1053 e = self.receiver.changed() => Ok(e?),
1054 }
1055 }
1056}
1057
#[cfg(test)]
mod tests {
    use n0_future::StreamExt;
    use testresult::TestResult;

    use super::*;

    /// Round-trips a 64 KiB blob between two stores.
    ///
    /// The blob is added to one store and exported as bao. The encoding is
    /// imported into a second store while an observer prints events, and the
    /// re-export must equal the original export.
    #[tokio::test]
    async fn smoke() -> TestResult<()> {
        let source = MemStore::new();
        let temp_tag = source.add_bytes(vec![0u8; 1024 * 64]).temp_tag().await?;
        let hash = *temp_tag.hash();
        println!("hash: {hash:?}");

        let mut items = source.export_bao(hash, ChunkRanges::all()).stream();
        while let Some(item) = items.next().await {
            println!("item: {item:?}");
        }
        let exported = source
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;

        let target = MemStore::new();
        let mut events = target.observe(hash).stream().await?;
        tokio::spawn(async move {
            while let Some(event) = events.next().await {
                println!("event: {event:?}");
            }
        });
        target
            .import_bao_bytes(hash, ChunkRanges::all(), exported.clone())
            .await?;

        let reexported = target
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;
        assert_eq!(exported, reexported);

        Ok(())
    }
}