1use std::{
11 collections::{BTreeMap, HashMap, HashSet},
12 future::Future,
13 io::{self, Write},
14 num::NonZeroU64,
15 ops::Deref,
16 sync::Arc,
17 time::SystemTime,
18};
19
20use bao_tree::{
21 blake3,
22 io::{
23 mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
24 outboard::PreOrderMemOutboard,
25 sync::{Outboard, ReadAt, WriteAt},
26 BaoContentItem, EncodeError, Leaf,
27 },
28 BaoTree, ChunkNum, ChunkRanges, TreeNode,
29};
30use bytes::Bytes;
31use irpc::channel::mpsc;
32use n0_future::future::yield_now;
33use range_collections::range_set::RangeSetRange;
34use tokio::{
35 io::AsyncReadExt,
36 sync::watch,
37 task::{JoinError, JoinSet},
38};
39use tracing::{error, info, instrument, trace, Instrument};
40
41use super::util::{BaoTreeSender, PartialMemStorage};
42use crate::{
43 api::{
44 self,
45 blobs::{AddProgressItem, Bitfield, BlobStatus, ExportProgressItem},
46 proto::{
47 BatchMsg, BatchResponse, BlobDeleteRequest, BlobStatusMsg, BlobStatusRequest, Command,
48 CreateTagMsg, CreateTagRequest, CreateTempTagMsg, DeleteBlobsMsg, DeleteTagsMsg,
49 DeleteTagsRequest, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest,
50 ExportRangesItem, ExportRangesMsg, ExportRangesRequest, ImportBaoMsg, ImportBaoRequest,
51 ImportByteStreamMsg, ImportByteStreamUpdate, ImportBytesMsg, ImportBytesRequest,
52 ImportPathMsg, ImportPathRequest, ListBlobsMsg, ListTagsMsg, ListTagsRequest,
53 ObserveMsg, ObserveRequest, RenameTagMsg, RenameTagRequest, Scope, SetTagMsg,
54 SetTagRequest, ShutdownMsg, SyncDbMsg,
55 },
56 tags::TagInfo,
57 ApiClient,
58 },
59 store::{
60 util::{SizeInfo, SparseMemFile, Tag},
61 HashAndFormat, IROH_BLOCK_SIZE,
62 },
63 util::{
64 temp_tag::{TagDrop, TempTagScope, TempTags},
65 ChunkRangesExt,
66 },
67 BlobFormat, Hash,
68};
69
/// Configuration for the in-memory store.
///
/// This type currently has no fields; `MemStore::new` uses `Options::default()`.
#[derive(Default, Debug)]
pub struct Options {}
72
73#[derive(Debug, Clone)]
74#[repr(transparent)]
75pub struct MemStore {
76 client: ApiClient,
77}
78
79impl AsRef<crate::api::Store> for MemStore {
80 fn as_ref(&self) -> &crate::api::Store {
81 crate::api::Store::ref_from_sender(&self.client)
82 }
83}
84
85impl Deref for MemStore {
86 type Target = crate::api::Store;
87
88 fn deref(&self) -> &Self::Target {
89 crate::api::Store::ref_from_sender(&self.client)
90 }
91}
92
93impl Default for MemStore {
94 fn default() -> Self {
95 Self::new()
96 }
97}
98
99#[derive(derive_more::From)]
100enum TaskResult {
101 Unit(()),
102 Import(anyhow::Result<ImportEntry>),
103 Scope(Scope),
104}
105
106impl MemStore {
107 pub fn from_sender(client: ApiClient) -> Self {
108 Self { client }
109 }
110
111 pub fn new() -> Self {
112 let (sender, receiver) = tokio::sync::mpsc::channel(32);
113 tokio::spawn(
114 Actor {
115 commands: receiver,
116 tasks: JoinSet::new(),
117 state: State {
118 data: HashMap::new(),
119 tags: BTreeMap::new(),
120 empty_hash: BaoFileHandle::new_partial(Hash::EMPTY),
121 },
122 options: Arc::new(Options::default()),
123 temp_tags: Default::default(),
124 protected: Default::default(),
125 }
126 .run(),
127 );
128 Self::from_sender(sender.into())
129 }
130}
131
132struct Actor {
133 commands: tokio::sync::mpsc::Receiver<Command>,
134 tasks: JoinSet<TaskResult>,
135 state: State,
136 #[allow(dead_code)]
137 options: Arc<Options>,
138 temp_tags: TempTags,
140 protected: HashSet<Hash>,
141}
142
143impl Actor {
144 fn spawn<F, T>(&mut self, f: F)
145 where
146 F: Future<Output = T> + Send + 'static,
147 T: Into<TaskResult>,
148 {
149 let span = tracing::Span::current();
150 let fut = async move { f.await.into() }.instrument(span);
151 self.tasks.spawn(fut);
152 }
153
154 async fn handle_command(&mut self, cmd: Command) -> Option<ShutdownMsg> {
155 match cmd {
156 Command::ImportBao(ImportBaoMsg {
157 inner: ImportBaoRequest { hash, size },
158 rx: data,
159 tx,
160 ..
161 }) => {
162 let entry = self.get_or_create_entry(hash);
163 self.spawn(import_bao(entry, size, data, tx));
164 }
165 Command::Observe(ObserveMsg {
166 inner: ObserveRequest { hash },
167 tx,
168 ..
169 }) => {
170 let entry = self.get_or_create_entry(hash);
171 self.spawn(observe(entry, tx));
172 }
173 Command::ImportBytes(ImportBytesMsg {
174 inner:
175 ImportBytesRequest {
176 data,
177 scope,
178 format,
179 ..
180 },
181 tx,
182 ..
183 }) => {
184 self.spawn(import_bytes(data, scope, format, tx));
185 }
186 Command::ImportByteStream(ImportByteStreamMsg { inner, tx, rx, .. }) => {
187 self.spawn(import_byte_stream(inner.scope, inner.format, rx, tx));
188 }
189 Command::ImportPath(cmd) => {
190 self.spawn(import_path(cmd));
191 }
192 Command::ExportBao(ExportBaoMsg {
193 inner: ExportBaoRequest { hash, ranges },
194 tx,
195 ..
196 }) => {
197 let entry = self.get(&hash);
198 self.spawn(export_bao(entry, ranges, tx))
199 }
200 Command::ExportPath(cmd) => {
201 let entry = self.get(&cmd.hash);
202 self.spawn(export_path(entry, cmd));
203 }
204 Command::DeleteTags(cmd) => {
205 let DeleteTagsMsg {
206 inner: DeleteTagsRequest { from, to },
207 tx,
208 ..
209 } = cmd;
210 info!("deleting tags from {:?} to {:?}", from, to);
211 self.state.tags.retain(|tag, _| {
214 if let Some(from) = &from {
215 if tag < from {
216 return true;
217 }
218 }
219 if let Some(to) = &to {
220 if tag >= to {
221 return true;
222 }
223 }
224 info!(" removing {:?}", tag);
225 false
226 });
227 tx.send(Ok(())).await.ok();
228 }
229 Command::RenameTag(cmd) => {
230 let RenameTagMsg {
231 inner: RenameTagRequest { from, to },
232 tx,
233 ..
234 } = cmd;
235 let tags = &mut self.state.tags;
236 let value = match tags.remove(&from) {
237 Some(value) => value,
238 None => {
239 tx.send(Err(api::Error::io(
240 io::ErrorKind::NotFound,
241 format!("tag not found: {from:?}"),
242 )))
243 .await
244 .ok();
245 return None;
246 }
247 };
248 tags.insert(to, value);
249 tx.send(Ok(())).await.ok();
250 return None;
251 }
252 Command::ListTags(cmd) => {
253 let ListTagsMsg {
254 inner:
255 ListTagsRequest {
256 from,
257 to,
258 raw,
259 hash_seq,
260 },
261 tx,
262 ..
263 } = cmd;
264 let tags = self
265 .state
266 .tags
267 .iter()
268 .filter(move |(tag, value)| {
269 if let Some(from) = &from {
270 if tag < &from {
271 return false;
272 }
273 }
274 if let Some(to) = &to {
275 if tag >= &to {
276 return false;
277 }
278 }
279 raw && value.format.is_raw() || hash_seq && value.format.is_hash_seq()
280 })
281 .map(|(tag, value)| TagInfo {
282 name: tag.clone(),
283 hash: value.hash,
284 format: value.format,
285 })
286 .map(Ok);
287 tx.send(tags.collect()).await.ok();
288 }
289 Command::SetTag(SetTagMsg {
290 inner: SetTagRequest { name: tag, value },
291 tx,
292 ..
293 }) => {
294 self.state.tags.insert(tag, value);
295 tx.send(Ok(())).await.ok();
296 }
297 Command::CreateTag(CreateTagMsg {
298 inner: CreateTagRequest { value },
299 tx,
300 ..
301 }) => {
302 let tag = Tag::auto(SystemTime::now(), |tag| self.state.tags.contains_key(tag));
303 self.state.tags.insert(tag.clone(), value);
304 tx.send(Ok(tag)).await.ok();
305 }
306 Command::CreateTempTag(cmd) => {
307 trace!("{cmd:?}");
308 self.create_temp_tag(cmd).await;
309 }
310 Command::ListTempTags(cmd) => {
311 trace!("{cmd:?}");
312 let tts = self.temp_tags.list();
313 cmd.tx.send(tts).await.ok();
314 }
315 Command::ListBlobs(cmd) => {
316 let ListBlobsMsg { tx, .. } = cmd;
317 let blobs = self.state.data.keys().cloned().collect::<Vec<Hash>>();
318 self.spawn(async move {
319 for blob in blobs {
320 if tx.send(Ok(blob)).await.is_err() {
321 break;
322 }
323 }
324 });
325 }
326 Command::BlobStatus(cmd) => {
327 trace!("{cmd:?}");
328 let BlobStatusMsg {
329 inner: BlobStatusRequest { hash },
330 tx,
331 ..
332 } = cmd;
333 let res = match self.get(&hash) {
334 None => api::blobs::BlobStatus::NotFound,
335 Some(x) => {
336 let bitfield = x.0.state.borrow().bitfield();
337 if bitfield.is_complete() {
338 BlobStatus::Complete {
339 size: bitfield.size,
340 }
341 } else {
342 BlobStatus::Partial {
343 size: bitfield.validated_size(),
344 }
345 }
346 }
347 };
348 tx.send(res).await.ok();
349 }
350 Command::DeleteBlobs(cmd) => {
351 trace!("{cmd:?}");
352 let DeleteBlobsMsg {
353 inner: BlobDeleteRequest { hashes, force },
354 tx,
355 ..
356 } = cmd;
357 for hash in hashes {
358 if !force && self.protected.contains(&hash) {
359 continue;
360 }
361 self.state.data.remove(&hash);
362 }
363 tx.send(Ok(())).await.ok();
364 }
365 Command::Batch(cmd) => {
366 trace!("{cmd:?}");
367 let (id, scope) = self.temp_tags.create_scope();
368 self.spawn(handle_batch(cmd, id, scope));
369 }
370 Command::ClearProtected(cmd) => {
371 self.protected.clear();
372 cmd.tx.send(Ok(())).await.ok();
373 }
374 Command::ExportRanges(cmd) => {
375 let entry = self.get(&cmd.hash);
376 self.spawn(export_ranges(cmd, entry));
377 }
378 Command::SyncDb(SyncDbMsg { tx, .. }) => {
379 tx.send(Ok(())).await.ok();
380 }
381 Command::Shutdown(cmd) => {
382 return Some(cmd);
383 }
384 }
385 None
386 }
387
388 fn get(&mut self, hash: &Hash) -> Option<BaoFileHandle> {
389 if *hash == Hash::EMPTY {
390 Some(self.state.empty_hash.clone())
391 } else {
392 self.state.data.get(hash).cloned()
393 }
394 }
395
396 fn get_or_create_entry(&mut self, hash: Hash) -> BaoFileHandle {
397 if hash == Hash::EMPTY {
398 self.state.empty_hash.clone()
399 } else {
400 self.state
401 .data
402 .entry(hash)
403 .or_insert_with(|| BaoFileHandle::new_partial(hash))
404 .clone()
405 }
406 }
407
408 async fn create_temp_tag(&mut self, cmd: CreateTempTagMsg) {
409 let CreateTempTagMsg { tx, inner, .. } = cmd;
410 let mut tt = self.temp_tags.create(inner.scope, inner.value);
411 if tx.is_rpc() {
412 tt.leak();
413 }
414 tx.send(tt).await.ok();
415 }
416
417 async fn finish_import(&mut self, res: anyhow::Result<ImportEntry>) {
418 let import_data = match res {
419 Ok(entry) => entry,
420 Err(e) => {
421 error!("import failed: {e}");
422 return;
423 }
424 };
425 let hash = import_data.outboard.root().into();
426 let entry = self.get_or_create_entry(hash);
427 entry
428 .0
429 .state
430 .send_if_modified(|state: &mut BaoFileStorage| {
431 let BaoFileStorage::Partial(_) = state.deref() else {
432 return false;
433 };
434 *state =
435 CompleteStorage::new(import_data.data, import_data.outboard.data.into()).into();
436 true
437 });
438 let tt = self.temp_tags.create(
439 import_data.scope,
440 HashAndFormat {
441 hash,
442 format: import_data.format,
443 },
444 );
445 import_data.tx.send(AddProgressItem::Done(tt)).await.ok();
446 }
447
448 fn log_task_result(&self, res: Result<TaskResult, JoinError>) -> Option<TaskResult> {
449 match res {
450 Ok(x) => Some(x),
451 Err(e) => {
452 if e.is_cancelled() {
453 trace!("task cancelled: {e}");
454 } else {
455 error!("task failed: {e}");
456 }
457 None
458 }
459 }
460 }
461
462 pub async fn run(mut self) {
463 let shutdown = loop {
464 tokio::select! {
465 cmd = self.commands.recv() => {
466 let Some(cmd) = cmd else {
467 break None;
470 };
471 if let Some(cmd) = self.handle_command(cmd).await {
472 break Some(cmd);
473 }
474 }
475 Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
476 let Some(res) = self.log_task_result(res) else {
477 continue;
478 };
479 match res {
480 TaskResult::Import(res) => {
481 self.finish_import(res).await;
482 }
483 TaskResult::Scope(scope) => {
484 self.temp_tags.end_scope(scope);
485 }
486 TaskResult::Unit(_) => {}
487 }
488 }
489 }
490 };
491 if let Some(shutdown) = shutdown {
492 shutdown.tx.send(()).await.ok();
493 }
494 }
495}
496
497async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>) -> Scope {
498 if let Err(cause) = handle_batch_impl(cmd, id, &scope).await {
499 error!("batch failed: {cause}");
500 }
501 id
502}
503
504async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
505 let BatchMsg { tx, mut rx, .. } = cmd;
506 trace!("created scope {}", id);
507 tx.send(id).await.map_err(api::Error::other)?;
508 while let Some(msg) = rx.recv().await? {
509 match msg {
510 BatchResponse::Drop(msg) => scope.on_drop(&msg),
511 BatchResponse::Ping => {}
512 }
513 }
514 Ok(())
515}
516
517async fn export_ranges(mut cmd: ExportRangesMsg, entry: Option<BaoFileHandle>) {
518 let Some(entry) = entry else {
519 let err = io::Error::new(io::ErrorKind::NotFound, "hash not found");
520 cmd.tx.send(ExportRangesItem::Error(err.into())).await.ok();
521 return;
522 };
523 if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await {
524 cmd.tx
525 .send(ExportRangesItem::Error(cause.into()))
526 .await
527 .ok();
528 }
529}
530
531async fn export_ranges_impl(
532 cmd: ExportRangesRequest,
533 tx: &mut mpsc::Sender<ExportRangesItem>,
534 entry: BaoFileHandle,
535) -> io::Result<()> {
536 let ExportRangesRequest { ranges, hash } = cmd;
537 let bitfield = entry.bitfield();
538 trace!(
539 "exporting ranges: {hash} {ranges:?} size={}",
540 bitfield.size()
541 );
542 debug_assert!(entry.hash() == hash, "hash mismatch");
543 let data = entry.data_reader();
544 let size = bitfield.size();
545 for range in ranges.iter() {
546 let range = match range {
547 RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
548 RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
549 };
550 let requested = ChunkRanges::bytes(range.start..range.end);
551 if !bitfield.ranges.is_superset(&requested) {
552 return Err(io::Error::other(format!(
553 "missing range: {requested:?}, present: {bitfield:?}",
554 )));
555 }
556 let bs = 1024;
557 let mut offset = range.start;
558 loop {
559 let end: u64 = (offset + bs).min(range.end);
560 let size = (end - offset) as usize;
561 tx.send(
562 Leaf {
563 offset,
564 data: data.read_bytes_at(offset, size)?,
565 }
566 .into(),
567 )
568 .await?;
569 offset = end;
570 if offset >= range.end {
571 break;
572 }
573 }
574 }
575 Ok(())
576}
577
578fn chunk_range(leaf: &Leaf) -> ChunkRanges {
579 let start = ChunkNum::chunks(leaf.offset);
580 let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64);
581 (start..end).into()
582}
583
584async fn import_bao(
585 entry: BaoFileHandle,
586 size: NonZeroU64,
587 mut stream: mpsc::Receiver<BaoContentItem>,
588 tx: irpc::channel::oneshot::Sender<api::Result<()>>,
589) {
590 let size = size.get();
591 entry
592 .0
593 .state
594 .send_if_modified(|state: &mut BaoFileStorage| {
595 let BaoFileStorage::Partial(entry) = state else {
596 return false;
598 };
599 entry.size.write(0, size);
600 false
601 });
602 let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
603 while let Some(item) = stream.recv().await.unwrap() {
604 entry.0.state.send_if_modified(|state| {
605 let BaoFileStorage::Partial(partial) = state else {
606 return false;
608 };
609 match item {
610 BaoContentItem::Parent(parent) => {
611 if let Some(offset) = tree.pre_order_offset(parent.node) {
612 let mut pair = [0u8; 64];
613 pair[..32].copy_from_slice(parent.pair.0.as_bytes());
614 pair[32..].copy_from_slice(parent.pair.1.as_bytes());
615 partial
616 .outboard
617 .write_at(offset * 64, &pair)
618 .expect("writing to mem can never fail");
619 }
620 false
621 }
622 BaoContentItem::Leaf(leaf) => {
623 let start = leaf.offset;
624 partial
625 .data
626 .write_at(start, &leaf.data)
627 .expect("writing to mem can never fail");
628 let added = chunk_range(&leaf);
629 let update = partial.bitfield.update(&Bitfield::new(added.clone(), size));
630 if update.new_state().complete {
631 let data = std::mem::take(&mut partial.data);
632 let outboard = std::mem::take(&mut partial.outboard);
633 let data: Bytes = <Vec<u8>>::try_from(data).unwrap().into();
634 let outboard: Bytes = <Vec<u8>>::try_from(outboard).unwrap().into();
635 *state = CompleteStorage::new(data, outboard).into();
636 }
637 update.changed()
638 }
639 }
640 });
641 }
642 tx.send(Ok(())).await.ok();
643}
644
645#[instrument(skip_all, fields(hash = tracing::field::Empty))]
646async fn export_bao(
647 entry: Option<BaoFileHandle>,
648 ranges: ChunkRanges,
649 mut sender: mpsc::Sender<EncodedItem>,
650) {
651 let Some(entry) = entry else {
652 let err = EncodeError::Io(io::Error::new(io::ErrorKind::NotFound, "hash not found"));
653 sender.send(err.into()).await.ok();
654 return;
655 };
656 tracing::Span::current().record("hash", tracing::field::display(entry.hash));
657 let data = entry.data_reader();
658 let outboard = entry.outboard_reader();
659 let tx = BaoTreeSender::new(&mut sender);
660 traverse_ranges_validated(data, outboard, &ranges, tx)
661 .await
662 .ok();
663}
664
665#[instrument(skip_all, fields(hash = %entry.hash.fmt_short()))]
666async fn observe(entry: BaoFileHandle, tx: mpsc::Sender<api::blobs::Bitfield>) {
667 entry.subscribe().forward(tx).await.ok();
668}
669
670async fn import_bytes(
671 data: Bytes,
672 scope: Scope,
673 format: BlobFormat,
674 tx: mpsc::Sender<AddProgressItem>,
675) -> anyhow::Result<ImportEntry> {
676 tx.send(AddProgressItem::Size(data.len() as u64)).await?;
677 tx.send(AddProgressItem::CopyDone).await?;
678 let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
679 Ok(ImportEntry {
680 data,
681 outboard,
682 scope,
683 format,
684 tx,
685 })
686}
687
688async fn import_byte_stream(
689 scope: Scope,
690 format: BlobFormat,
691 mut rx: mpsc::Receiver<ImportByteStreamUpdate>,
692 tx: mpsc::Sender<AddProgressItem>,
693) -> anyhow::Result<ImportEntry> {
694 let mut res = Vec::new();
695 loop {
696 match rx.recv().await {
697 Ok(Some(ImportByteStreamUpdate::Bytes(data))) => {
698 res.extend_from_slice(&data);
699 tx.send(AddProgressItem::CopyProgress(res.len() as u64))
700 .await?;
701 }
702 Ok(Some(ImportByteStreamUpdate::Done)) => {
703 break;
704 }
705 Ok(None) => {
706 return Err(api::Error::io(
707 io::ErrorKind::UnexpectedEof,
708 "byte stream ended unexpectedly",
709 )
710 .into());
711 }
712 Err(e) => {
713 return Err(e.into());
714 }
715 }
716 }
717 import_bytes(res.into(), scope, format, tx).await
718}
719
720#[instrument(skip_all, fields(path = %cmd.path.display()))]
721async fn import_path(cmd: ImportPathMsg) -> anyhow::Result<ImportEntry> {
722 let ImportPathMsg {
723 inner:
724 ImportPathRequest {
725 path,
726 scope,
727 format,
728 ..
729 },
730 tx,
731 ..
732 } = cmd;
733 let mut res = Vec::new();
734 let mut file = tokio::fs::File::open(path).await?;
735 let mut buf = [0u8; 1024 * 64];
736 loop {
737 let size = file.read(&mut buf).await?;
738 if size == 0 {
739 break;
740 }
741 res.extend_from_slice(&buf[..size]);
742 tx.send(AddProgressItem::CopyProgress(res.len() as u64))
743 .await?;
744 }
745 import_bytes(res.into(), scope, format, tx).await
746}
747
748#[instrument(skip_all, fields(hash = %cmd.hash.fmt_short(), path = %cmd.target.display()))]
749async fn export_path(entry: Option<BaoFileHandle>, cmd: ExportPathMsg) {
750 let ExportPathMsg { inner, mut tx, .. } = cmd;
751 let Some(entry) = entry else {
752 tx.send(ExportProgressItem::Error(api::Error::io(
753 io::ErrorKind::NotFound,
754 "hash not found",
755 )))
756 .await
757 .ok();
758 return;
759 };
760 match export_path_impl(entry, inner, &mut tx).await {
761 Ok(()) => tx.send(ExportProgressItem::Done).await.ok(),
762 Err(e) => tx.send(ExportProgressItem::Error(e.into())).await.ok(),
763 };
764}
765
766async fn export_path_impl(
767 entry: BaoFileHandle,
768 cmd: ExportPathRequest,
769 tx: &mut mpsc::Sender<ExportProgressItem>,
770) -> io::Result<()> {
771 let ExportPathRequest { target, .. } = cmd;
772 if !target.is_absolute() {
773 return Err(io::Error::new(
774 io::ErrorKind::InvalidInput,
775 "path is not absolute",
776 ));
777 }
778 if let Some(parent) = target.parent() {
779 std::fs::create_dir_all(parent)?;
780 }
781 let mut file = std::fs::File::create(target)?;
783 let size = entry.0.state.borrow().size();
784 tx.send(ExportProgressItem::Size(size)).await?;
785 let mut buf = [0u8; 1024 * 64];
786 for offset in (0..size).step_by(1024 * 64) {
787 let len = std::cmp::min(size - offset, 1024 * 64) as usize;
788 let buf = &mut buf[..len];
789 entry.0.state.borrow().data().read_exact_at(offset, buf)?;
790 file.write_all(buf)?;
791 tx.try_send(ExportProgressItem::CopyProgress(offset))
792 .await
793 .map_err(|_e| io::Error::other(""))?;
794 yield_now().await;
795 }
796 Ok(())
797}
798
799struct ImportEntry {
800 scope: Scope,
801 format: BlobFormat,
802 data: Bytes,
803 outboard: PreOrderMemOutboard,
804 tx: mpsc::Sender<AddProgressItem>,
805}
806
807pub struct DataReader(BaoFileHandle);
808
809impl ReadBytesAt for DataReader {
810 fn read_bytes_at(&self, offset: u64, size: usize) -> std::io::Result<Bytes> {
811 let entry = self.0 .0.state.borrow();
812 entry.data().read_bytes_at(offset, size)
813 }
814}
815
816pub struct OutboardReader {
817 hash: blake3::Hash,
818 tree: BaoTree,
819 data: BaoFileHandle,
820}
821
822impl Outboard for OutboardReader {
823 fn root(&self) -> blake3::Hash {
824 self.hash
825 }
826
827 fn tree(&self) -> BaoTree {
828 self.tree
829 }
830
831 fn load(&self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
832 let Some(offset) = self.tree.pre_order_offset(node) else {
833 return Ok(None);
834 };
835 let mut buf = [0u8; 64];
836 let size = self
837 .data
838 .0
839 .state
840 .borrow()
841 .outboard()
842 .read_at(offset * 64, &mut buf)?;
843 if size != 64 {
844 return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "short read"));
845 }
846 let left: [u8; 32] = buf[..32].try_into().unwrap();
847 let right: [u8; 32] = buf[32..].try_into().unwrap();
848 Ok(Some((left.into(), right.into())))
849 }
850}
851
852struct State {
853 data: HashMap<Hash, BaoFileHandle>,
854 tags: BTreeMap<Tag, HashAndFormat>,
855 empty_hash: BaoFileHandle,
856}
857
858#[derive(Debug, derive_more::From)]
859pub enum BaoFileStorage {
860 Partial(PartialMemStorage),
861 Complete(CompleteStorage),
862}
863
864impl BaoFileStorage {
865 pub fn bitfield(&self) -> Bitfield {
867 match self {
868 Self::Partial(entry) => entry.bitfield.clone(),
869 Self::Complete(entry) => Bitfield::complete(entry.size()),
870 }
871 }
872}
873
874#[derive(Debug)]
875pub struct BaoFileHandleInner {
876 state: watch::Sender<BaoFileStorage>,
877 hash: Hash,
878}
879
880#[derive(Debug, Clone, derive_more::Deref)]
882pub struct BaoFileHandle(Arc<BaoFileHandleInner>);
883
884impl BaoFileHandle {
885 pub fn new_partial(hash: Hash) -> Self {
886 let (state, _) = watch::channel(BaoFileStorage::Partial(PartialMemStorage {
887 data: SparseMemFile::new(),
888 outboard: SparseMemFile::new(),
889 size: SizeInfo::default(),
890 bitfield: Bitfield::empty(),
891 }));
892 Self(Arc::new(BaoFileHandleInner { state, hash }))
893 }
894
895 pub fn hash(&self) -> Hash {
896 self.hash
897 }
898
899 pub fn bitfield(&self) -> Bitfield {
900 self.0.state.borrow().bitfield()
901 }
902
903 pub fn subscribe(&self) -> BaoFileStorageSubscriber {
904 BaoFileStorageSubscriber::new(self.0.state.subscribe())
905 }
906
907 pub fn data_reader(&self) -> DataReader {
908 DataReader(self.clone())
909 }
910
911 pub fn outboard_reader(&self) -> OutboardReader {
912 let entry = self.0.state.borrow();
913 let hash = self.hash.into();
914 let tree = BaoTree::new(entry.size(), IROH_BLOCK_SIZE);
915 OutboardReader {
916 hash,
917 tree,
918 data: self.clone(),
919 }
920 }
921}
922
923impl Default for BaoFileStorage {
924 fn default() -> Self {
925 Self::Partial(Default::default())
926 }
927}
928
929impl BaoFileStorage {
930 fn data(&self) -> &[u8] {
931 match self {
932 Self::Partial(entry) => entry.data.as_ref(),
933 Self::Complete(entry) => &entry.data,
934 }
935 }
936
937 fn outboard(&self) -> &[u8] {
938 match self {
939 Self::Partial(entry) => entry.outboard.as_ref(),
940 Self::Complete(entry) => &entry.outboard,
941 }
942 }
943
944 fn size(&self) -> u64 {
945 match self {
946 Self::Partial(entry) => entry.current_size(),
947 Self::Complete(entry) => entry.size(),
948 }
949 }
950}
951
952#[derive(Debug, Clone)]
953pub struct CompleteStorage {
954 pub(crate) data: Bytes,
955 pub(crate) outboard: Bytes,
956}
957
958impl CompleteStorage {
959 pub fn create(data: Bytes) -> (Hash, Self) {
960 let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
961 let hash = outboard.root().into();
962 let outboard = outboard.data.into();
963 let entry = Self::new(data, outboard);
964 (hash, entry)
965 }
966
967 pub fn new(data: Bytes, outboard: Bytes) -> Self {
968 Self { data, outboard }
969 }
970
971 pub fn size(&self) -> u64 {
972 self.data.len() as u64
973 }
974}
975
976#[allow(dead_code)]
977fn print_outboard(hashes: &[u8]) {
978 assert!(hashes.len() % 64 == 0);
979 for chunk in hashes.chunks(64) {
980 let left: [u8; 32] = chunk[..32].try_into().unwrap();
981 let right: [u8; 32] = chunk[32..].try_into().unwrap();
982 let left = blake3::Hash::from(left);
983 let right = blake3::Hash::from(right);
984 println!("l: {left:?}, r: {right:?}");
985 }
986}
987
988pub struct BaoFileStorageSubscriber {
989 receiver: watch::Receiver<BaoFileStorage>,
990}
991
992impl BaoFileStorageSubscriber {
993 pub fn new(receiver: watch::Receiver<BaoFileStorage>) -> Self {
994 Self { receiver }
995 }
996
997 pub async fn forward(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1001 let value = self.receiver.borrow().bitfield();
1002 tx.send(value).await?;
1003 loop {
1004 self.update_or_closed(&mut tx).await?;
1005 let value = self.receiver.borrow().bitfield();
1006 tx.send(value.clone()).await?;
1007 }
1008 }
1009
1010 #[allow(dead_code)]
1014 pub async fn forward_delta(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1015 let value = self.receiver.borrow().bitfield();
1016 let mut old = value.clone();
1017 tx.send(value).await?;
1018 loop {
1019 self.update_or_closed(&mut tx).await?;
1020 let new = self.receiver.borrow().bitfield();
1021 let diff = old.diff(&new);
1022 if diff.is_empty() {
1023 continue;
1024 }
1025 tx.send(diff).await?;
1026 old = new;
1027 }
1028 }
1029
1030 async fn update_or_closed(&mut self, tx: &mut mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
1031 tokio::select! {
1032 _ = tx.closed() => {
1033 Err(irpc::channel::SendError::ReceiverClosed.into())
1035 }
1036 e = self.receiver.changed() => Ok(e?),
1037 }
1038 }
1039}
1040
#[cfg(test)]
mod tests {
    use n0_future::StreamExt;
    use testresult::TestResult;

    use super::*;

    /// Round-trips a blob between two stores and checks the steps agree:
    /// 1. add bytes to a source store;
    /// 2. export it as bao;
    /// 3. import that encoding into a second store;
    /// 4. check both stores export identical encodings.
    #[tokio::test]
    async fn smoke() -> TestResult<()> {
        let source = MemStore::new();
        let temp_tag = source.add_bytes(vec![0u8; 1024 * 64]).temp_tag().await?;
        let hash = *temp_tag.hash();
        println!("hash: {hash:?}");

        let mut items = source.export_bao(hash, ChunkRanges::all()).stream();
        while let Some(item) = items.next().await {
            println!("item: {item:?}");
        }
        let exported = source
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;

        let target = MemStore::new();
        let mut events = target.observe(hash).stream().await?;
        tokio::spawn(async move {
            while let Some(event) = events.next().await {
                println!("event: {event:?}");
            }
        });
        target
            .import_bao_bytes(hash, ChunkRanges::all(), exported.clone())
            .await?;

        let reexported = target
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;
        assert_eq!(exported, reexported);

        Ok(())
    }
}