1use std::{
11 collections::{BTreeMap, HashMap, HashSet},
12 future::Future,
13 io::{self, Write},
14 num::NonZeroU64,
15 ops::Deref,
16 sync::Arc,
17 time::SystemTime,
18};
19
20use bao_tree::{
21 blake3,
22 io::{
23 mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
24 outboard::PreOrderMemOutboard,
25 sync::{Outboard, ReadAt, WriteAt},
26 BaoContentItem, Leaf,
27 },
28 BaoTree, ChunkNum, ChunkRanges, TreeNode,
29};
30use bytes::Bytes;
31use irpc::channel::mpsc;
32use n0_future::future::yield_now;
33use range_collections::range_set::RangeSetRange;
34use tokio::{
35 io::AsyncReadExt,
36 sync::watch,
37 task::{JoinError, JoinSet},
38};
39use tracing::{error, info, instrument, trace, Instrument};
40
41use super::util::{BaoTreeSender, PartialMemStorage};
42use crate::{
43 api::{
44 self,
45 blobs::{AddProgressItem, Bitfield, BlobStatus, ExportProgressItem},
46 proto::{
47 BatchMsg, BatchResponse, BlobDeleteRequest, BlobStatusMsg, BlobStatusRequest, Command,
48 CreateTagMsg, CreateTagRequest, CreateTempTagMsg, DeleteBlobsMsg, DeleteTagsMsg,
49 DeleteTagsRequest, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest,
50 ExportRangesItem, ExportRangesMsg, ExportRangesRequest, ImportBaoMsg, ImportBaoRequest,
51 ImportByteStreamMsg, ImportByteStreamUpdate, ImportBytesMsg, ImportBytesRequest,
52 ImportPathMsg, ImportPathRequest, ListBlobsMsg, ListTagsMsg, ListTagsRequest,
53 ObserveMsg, ObserveRequest, RenameTagMsg, RenameTagRequest, Scope, SetTagMsg,
54 SetTagRequest, ShutdownMsg, SyncDbMsg,
55 },
56 tags::TagInfo,
57 ApiClient,
58 },
59 store::{
60 util::{SizeInfo, SparseMemFile, Tag},
61 HashAndFormat, IROH_BLOCK_SIZE,
62 },
63 util::{
64 temp_tag::{TagDrop, TempTagScope, TempTags},
65 ChunkRangesExt,
66 },
67 BlobFormat, Hash,
68};
69
/// Configuration for the in-memory store.
///
/// The struct currently has no fields; the actor keeps an instance around
/// so that options can be added later without changing its shape.
#[derive(Debug, Default)]
pub struct Options {}
72
73#[derive(Debug, Clone)]
74#[repr(transparent)]
75pub struct MemStore {
76 client: ApiClient,
77}
78
79impl AsRef<crate::api::Store> for MemStore {
80 fn as_ref(&self) -> &crate::api::Store {
81 crate::api::Store::ref_from_sender(&self.client)
82 }
83}
84
85impl Deref for MemStore {
86 type Target = crate::api::Store;
87
88 fn deref(&self) -> &Self::Target {
89 crate::api::Store::ref_from_sender(&self.client)
90 }
91}
92
93impl Default for MemStore {
94 fn default() -> Self {
95 Self::new()
96 }
97}
98
99#[derive(derive_more::From)]
100enum TaskResult {
101 Unit(()),
102 Import(anyhow::Result<ImportEntry>),
103 Scope(Scope),
104}
105
106impl MemStore {
107 pub fn from_sender(client: ApiClient) -> Self {
108 Self { client }
109 }
110
111 pub fn new() -> Self {
112 let (sender, receiver) = tokio::sync::mpsc::channel(32);
113 tokio::spawn(
114 Actor {
115 commands: receiver,
116 tasks: JoinSet::new(),
117 state: State {
118 data: HashMap::new(),
119 tags: BTreeMap::new(),
120 },
121 options: Arc::new(Options::default()),
122 temp_tags: Default::default(),
123 protected: Default::default(),
124 }
125 .run(),
126 );
127 Self::from_sender(sender.into())
128 }
129}
130
131struct Actor {
132 commands: tokio::sync::mpsc::Receiver<Command>,
133 tasks: JoinSet<TaskResult>,
134 state: State,
135 #[allow(dead_code)]
136 options: Arc<Options>,
137 temp_tags: TempTags,
139 protected: HashSet<Hash>,
140}
141
142impl Actor {
143 fn spawn<F, T>(&mut self, f: F)
144 where
145 F: Future<Output = T> + Send + 'static,
146 T: Into<TaskResult>,
147 {
148 let span = tracing::Span::current();
149 let fut = async move { f.await.into() }.instrument(span);
150 self.tasks.spawn(fut);
151 }
152
153 async fn handle_command(&mut self, cmd: Command) -> Option<ShutdownMsg> {
154 match cmd {
155 Command::ImportBao(ImportBaoMsg {
156 inner: ImportBaoRequest { hash, size },
157 rx: data,
158 tx,
159 ..
160 }) => {
161 let entry = self.get_or_create_entry(hash);
162 self.spawn(import_bao(entry, size, data, tx));
163 }
164 Command::Observe(ObserveMsg {
165 inner: ObserveRequest { hash },
166 tx,
167 ..
168 }) => {
169 let entry = self.get_or_create_entry(hash);
170 self.spawn(observe(entry, tx));
171 }
172 Command::ImportBytes(ImportBytesMsg {
173 inner:
174 ImportBytesRequest {
175 data,
176 scope,
177 format,
178 ..
179 },
180 tx,
181 ..
182 }) => {
183 self.spawn(import_bytes(data, scope, format, tx));
184 }
185 Command::ImportByteStream(ImportByteStreamMsg { inner, tx, rx, .. }) => {
186 self.spawn(import_byte_stream(inner.scope, inner.format, rx, tx));
187 }
188 Command::ImportPath(cmd) => {
189 self.spawn(import_path(cmd));
190 }
191 Command::ExportBao(ExportBaoMsg {
192 inner: ExportBaoRequest { hash, ranges },
193 tx,
194 ..
195 }) => {
196 let entry = self.get_or_create_entry(hash);
197 self.spawn(export_bao(entry, ranges, tx));
198 }
199 Command::ExportPath(cmd) => {
200 let entry = self.state.data.get(&cmd.hash).cloned();
201 self.spawn(export_path(entry, cmd));
202 }
203 Command::DeleteTags(cmd) => {
204 let DeleteTagsMsg {
205 inner: DeleteTagsRequest { from, to },
206 tx,
207 ..
208 } = cmd;
209 info!("deleting tags from {:?} to {:?}", from, to);
210 self.state.tags.retain(|tag, _| {
213 if let Some(from) = &from {
214 if tag < from {
215 return true;
216 }
217 }
218 if let Some(to) = &to {
219 if tag >= to {
220 return true;
221 }
222 }
223 info!(" removing {:?}", tag);
224 false
225 });
226 tx.send(Ok(())).await.ok();
227 }
228 Command::RenameTag(cmd) => {
229 let RenameTagMsg {
230 inner: RenameTagRequest { from, to },
231 tx,
232 ..
233 } = cmd;
234 let tags = &mut self.state.tags;
235 let value = match tags.remove(&from) {
236 Some(value) => value,
237 None => {
238 tx.send(Err(api::Error::io(
239 io::ErrorKind::NotFound,
240 format!("tag not found: {from:?}"),
241 )))
242 .await
243 .ok();
244 return None;
245 }
246 };
247 tags.insert(to, value);
248 tx.send(Ok(())).await.ok();
249 return None;
250 }
251 Command::ListTags(cmd) => {
252 let ListTagsMsg {
253 inner:
254 ListTagsRequest {
255 from,
256 to,
257 raw,
258 hash_seq,
259 },
260 tx,
261 ..
262 } = cmd;
263 let tags = self
264 .state
265 .tags
266 .iter()
267 .filter(move |(tag, value)| {
268 if let Some(from) = &from {
269 if tag < &from {
270 return false;
271 }
272 }
273 if let Some(to) = &to {
274 if tag >= &to {
275 return false;
276 }
277 }
278 raw && value.format.is_raw() || hash_seq && value.format.is_hash_seq()
279 })
280 .map(|(tag, value)| TagInfo {
281 name: tag.clone(),
282 hash: value.hash,
283 format: value.format,
284 })
285 .map(Ok);
286 tx.send(tags.collect()).await.ok();
287 }
288 Command::SetTag(SetTagMsg {
289 inner: SetTagRequest { name: tag, value },
290 tx,
291 ..
292 }) => {
293 self.state.tags.insert(tag, value);
294 tx.send(Ok(())).await.ok();
295 }
296 Command::CreateTag(CreateTagMsg {
297 inner: CreateTagRequest { value },
298 tx,
299 ..
300 }) => {
301 let tag = Tag::auto(SystemTime::now(), |tag| self.state.tags.contains_key(tag));
302 self.state.tags.insert(tag.clone(), value);
303 tx.send(Ok(tag)).await.ok();
304 }
305 Command::CreateTempTag(cmd) => {
306 trace!("{cmd:?}");
307 self.create_temp_tag(cmd).await;
308 }
309 Command::ListTempTags(cmd) => {
310 trace!("{cmd:?}");
311 let tts = self.temp_tags.list();
312 cmd.tx.send(tts).await.ok();
313 }
314 Command::ListBlobs(cmd) => {
315 let ListBlobsMsg { tx, .. } = cmd;
316 let blobs = self.state.data.keys().cloned().collect::<Vec<Hash>>();
317 self.spawn(async move {
318 for blob in blobs {
319 if tx.send(Ok(blob)).await.is_err() {
320 break;
321 }
322 }
323 });
324 }
325 Command::BlobStatus(cmd) => {
326 trace!("{cmd:?}");
327 let BlobStatusMsg {
328 inner: BlobStatusRequest { hash },
329 tx,
330 ..
331 } = cmd;
332 let res = match self.state.data.get(&hash) {
333 None => api::blobs::BlobStatus::NotFound,
334 Some(x) => {
335 let bitfield = x.0.state.borrow().bitfield();
336 if bitfield.is_complete() {
337 BlobStatus::Complete {
338 size: bitfield.size,
339 }
340 } else {
341 BlobStatus::Partial {
342 size: bitfield.validated_size(),
343 }
344 }
345 }
346 };
347 tx.send(res).await.ok();
348 }
349 Command::DeleteBlobs(cmd) => {
350 trace!("{cmd:?}");
351 let DeleteBlobsMsg {
352 inner: BlobDeleteRequest { hashes, force },
353 tx,
354 ..
355 } = cmd;
356 for hash in hashes {
357 if !force && self.protected.contains(&hash) {
358 continue;
359 }
360 self.state.data.remove(&hash);
361 }
362 tx.send(Ok(())).await.ok();
363 }
364 Command::Batch(cmd) => {
365 trace!("{cmd:?}");
366 let (id, scope) = self.temp_tags.create_scope();
367 self.spawn(handle_batch(cmd, id, scope));
368 }
369 Command::ClearProtected(cmd) => {
370 self.protected.clear();
371 cmd.tx.send(Ok(())).await.ok();
372 }
373 Command::ExportRanges(cmd) => {
374 let entry = self.get_or_create_entry(cmd.hash);
375 self.spawn(export_ranges(cmd, entry.clone()));
376 }
377 Command::SyncDb(SyncDbMsg { tx, .. }) => {
378 tx.send(Ok(())).await.ok();
379 }
380 Command::Shutdown(cmd) => {
381 return Some(cmd);
382 }
383 }
384 None
385 }
386
387 fn get_or_create_entry(&mut self, hash: Hash) -> BaoFileHandle {
388 self.state
389 .data
390 .entry(hash)
391 .or_insert_with(|| BaoFileHandle::new_partial(hash))
392 .clone()
393 }
394
395 async fn create_temp_tag(&mut self, cmd: CreateTempTagMsg) {
396 let CreateTempTagMsg { tx, inner, .. } = cmd;
397 let mut tt = self.temp_tags.create(inner.scope, inner.value);
398 if tx.is_rpc() {
399 tt.leak();
400 }
401 tx.send(tt).await.ok();
402 }
403
404 async fn finish_import(&mut self, res: anyhow::Result<ImportEntry>) {
405 let import_data = match res {
406 Ok(entry) => entry,
407 Err(e) => {
408 error!("import failed: {e}");
409 return;
410 }
411 };
412 let hash = import_data.outboard.root().into();
413 let entry = self.get_or_create_entry(hash);
414 entry
415 .0
416 .state
417 .send_if_modified(|state: &mut BaoFileStorage| {
418 let BaoFileStorage::Partial(_) = state.deref() else {
419 return false;
420 };
421 *state =
422 CompleteStorage::new(import_data.data, import_data.outboard.data.into()).into();
423 true
424 });
425 let tt = self.temp_tags.create(
426 import_data.scope,
427 HashAndFormat {
428 hash,
429 format: import_data.format,
430 },
431 );
432 import_data.tx.send(AddProgressItem::Done(tt)).await.ok();
433 }
434
435 fn log_task_result(&self, res: Result<TaskResult, JoinError>) -> Option<TaskResult> {
436 match res {
437 Ok(x) => Some(x),
438 Err(e) => {
439 if e.is_cancelled() {
440 trace!("task cancelled: {e}");
441 } else {
442 error!("task failed: {e}");
443 }
444 None
445 }
446 }
447 }
448
449 pub async fn run(mut self) {
450 let shutdown = loop {
451 tokio::select! {
452 cmd = self.commands.recv() => {
453 let Some(cmd) = cmd else {
454 break None;
457 };
458 if let Some(cmd) = self.handle_command(cmd).await {
459 break Some(cmd);
460 }
461 }
462 Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
463 let Some(res) = self.log_task_result(res) else {
464 continue;
465 };
466 match res {
467 TaskResult::Import(res) => {
468 self.finish_import(res).await;
469 }
470 TaskResult::Scope(scope) => {
471 self.temp_tags.end_scope(scope);
472 }
473 TaskResult::Unit(_) => {}
474 }
475 }
476 }
477 };
478 if let Some(shutdown) = shutdown {
479 shutdown.tx.send(()).await.ok();
480 }
481 }
482}
483
484async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>) -> Scope {
485 if let Err(cause) = handle_batch_impl(cmd, id, &scope).await {
486 error!("batch failed: {cause}");
487 }
488 id
489}
490
491async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
492 let BatchMsg { tx, mut rx, .. } = cmd;
493 trace!("created scope {}", id);
494 tx.send(id).await.map_err(api::Error::other)?;
495 while let Some(msg) = rx.recv().await? {
496 match msg {
497 BatchResponse::Drop(msg) => scope.on_drop(&msg),
498 BatchResponse::Ping => {}
499 }
500 }
501 Ok(())
502}
503
504async fn export_ranges(mut cmd: ExportRangesMsg, entry: BaoFileHandle) {
505 if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await {
506 cmd.tx
507 .send(ExportRangesItem::Error(cause.into()))
508 .await
509 .ok();
510 }
511}
512
513async fn export_ranges_impl(
514 cmd: ExportRangesRequest,
515 tx: &mut mpsc::Sender<ExportRangesItem>,
516 entry: BaoFileHandle,
517) -> io::Result<()> {
518 let ExportRangesRequest { ranges, hash } = cmd;
519 let bitfield = entry.bitfield();
520 trace!(
521 "exporting ranges: {hash} {ranges:?} size={}",
522 bitfield.size()
523 );
524 debug_assert!(entry.hash() == hash, "hash mismatch");
525 let data = entry.data_reader();
526 let size = bitfield.size();
527 for range in ranges.iter() {
528 let range = match range {
529 RangeSetRange::Range(range) => size.min(*range.start)..size.min(*range.end),
530 RangeSetRange::RangeFrom(range) => size.min(*range.start)..size,
531 };
532 let requested = ChunkRanges::bytes(range.start..range.end);
533 if !bitfield.ranges.is_superset(&requested) {
534 return Err(io::Error::other(format!(
535 "missing range: {requested:?}, present: {bitfield:?}",
536 )));
537 }
538 let bs = 1024;
539 let mut offset = range.start;
540 loop {
541 let end: u64 = (offset + bs).min(range.end);
542 let size = (end - offset) as usize;
543 tx.send(
544 Leaf {
545 offset,
546 data: data.read_bytes_at(offset, size)?,
547 }
548 .into(),
549 )
550 .await?;
551 offset = end;
552 if offset >= range.end {
553 break;
554 }
555 }
556 }
557 Ok(())
558}
559
560fn chunk_range(leaf: &Leaf) -> ChunkRanges {
561 let start = ChunkNum::chunks(leaf.offset);
562 let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64);
563 (start..end).into()
564}
565
566async fn import_bao(
567 entry: BaoFileHandle,
568 size: NonZeroU64,
569 mut stream: mpsc::Receiver<BaoContentItem>,
570 tx: irpc::channel::oneshot::Sender<api::Result<()>>,
571) {
572 let size = size.get();
573 entry
574 .0
575 .state
576 .send_if_modified(|state: &mut BaoFileStorage| {
577 let BaoFileStorage::Partial(entry) = state else {
578 return false;
580 };
581 entry.size.write(0, size);
582 false
583 });
584 let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
585 while let Some(item) = stream.recv().await.unwrap() {
586 entry.0.state.send_if_modified(|state| {
587 let BaoFileStorage::Partial(partial) = state else {
588 return false;
590 };
591 match item {
592 BaoContentItem::Parent(parent) => {
593 if let Some(offset) = tree.pre_order_offset(parent.node) {
594 let mut pair = [0u8; 64];
595 pair[..32].copy_from_slice(parent.pair.0.as_bytes());
596 pair[32..].copy_from_slice(parent.pair.1.as_bytes());
597 partial
598 .outboard
599 .write_at(offset * 64, &pair)
600 .expect("writing to mem can never fail");
601 }
602 false
603 }
604 BaoContentItem::Leaf(leaf) => {
605 let start = leaf.offset;
606 partial
607 .data
608 .write_at(start, &leaf.data)
609 .expect("writing to mem can never fail");
610 let added = chunk_range(&leaf);
611 let update = partial.bitfield.update(&Bitfield::new(added.clone(), size));
612 if update.new_state().complete {
613 let data = std::mem::take(&mut partial.data);
614 let outboard = std::mem::take(&mut partial.outboard);
615 let data: Bytes = <Vec<u8>>::try_from(data).unwrap().into();
616 let outboard: Bytes = <Vec<u8>>::try_from(outboard).unwrap().into();
617 *state = CompleteStorage::new(data, outboard).into();
618 }
619 update.changed()
620 }
621 }
622 });
623 }
624 tx.send(Ok(())).await.ok();
625}
626
627#[instrument(skip_all, fields(hash = %entry.hash.fmt_short()))]
628async fn export_bao(
629 entry: BaoFileHandle,
630 ranges: ChunkRanges,
631 mut sender: mpsc::Sender<EncodedItem>,
632) {
633 let data = entry.data_reader();
634 let outboard = entry.outboard_reader();
635 let tx = BaoTreeSender::new(&mut sender);
636 traverse_ranges_validated(data, outboard, &ranges, tx)
637 .await
638 .ok();
639}
640
641#[instrument(skip_all, fields(hash = %entry.hash.fmt_short()))]
642async fn observe(entry: BaoFileHandle, tx: mpsc::Sender<api::blobs::Bitfield>) {
643 entry.subscribe().forward(tx).await.ok();
644}
645
646async fn import_bytes(
647 data: Bytes,
648 scope: Scope,
649 format: BlobFormat,
650 tx: mpsc::Sender<AddProgressItem>,
651) -> anyhow::Result<ImportEntry> {
652 tx.send(AddProgressItem::Size(data.len() as u64)).await?;
653 tx.send(AddProgressItem::CopyDone).await?;
654 let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
655 Ok(ImportEntry {
656 data,
657 outboard,
658 scope,
659 format,
660 tx,
661 })
662}
663
664async fn import_byte_stream(
665 scope: Scope,
666 format: BlobFormat,
667 mut rx: mpsc::Receiver<ImportByteStreamUpdate>,
668 tx: mpsc::Sender<AddProgressItem>,
669) -> anyhow::Result<ImportEntry> {
670 let mut res = Vec::new();
671 loop {
672 match rx.recv().await {
673 Ok(Some(ImportByteStreamUpdate::Bytes(data))) => {
674 res.extend_from_slice(&data);
675 tx.send(AddProgressItem::CopyProgress(res.len() as u64))
676 .await?;
677 }
678 Ok(Some(ImportByteStreamUpdate::Done)) => {
679 break;
680 }
681 Ok(None) => {
682 return Err(api::Error::io(
683 io::ErrorKind::UnexpectedEof,
684 "byte stream ended unexpectedly",
685 )
686 .into());
687 }
688 Err(e) => {
689 return Err(e.into());
690 }
691 }
692 }
693 import_bytes(res.into(), scope, format, tx).await
694}
695
696#[instrument(skip_all, fields(path = %cmd.path.display()))]
697async fn import_path(cmd: ImportPathMsg) -> anyhow::Result<ImportEntry> {
698 let ImportPathMsg {
699 inner:
700 ImportPathRequest {
701 path,
702 scope,
703 format,
704 ..
705 },
706 tx,
707 ..
708 } = cmd;
709 let mut res = Vec::new();
710 let mut file = tokio::fs::File::open(path).await?;
711 let mut buf = [0u8; 1024 * 64];
712 loop {
713 let size = file.read(&mut buf).await?;
714 if size == 0 {
715 break;
716 }
717 res.extend_from_slice(&buf[..size]);
718 tx.send(AddProgressItem::CopyProgress(res.len() as u64))
719 .await?;
720 }
721 import_bytes(res.into(), scope, format, tx).await
722}
723
724#[instrument(skip_all, fields(hash = %cmd.hash.fmt_short(), path = %cmd.target.display()))]
725async fn export_path(entry: Option<BaoFileHandle>, cmd: ExportPathMsg) {
726 let ExportPathMsg { inner, mut tx, .. } = cmd;
727 let Some(entry) = entry else {
728 tx.send(ExportProgressItem::Error(api::Error::io(
729 io::ErrorKind::NotFound,
730 "hash not found",
731 )))
732 .await
733 .ok();
734 return;
735 };
736 match export_path_impl(entry, inner, &mut tx).await {
737 Ok(()) => tx.send(ExportProgressItem::Done).await.ok(),
738 Err(e) => tx.send(ExportProgressItem::Error(e.into())).await.ok(),
739 };
740}
741
742async fn export_path_impl(
743 entry: BaoFileHandle,
744 cmd: ExportPathRequest,
745 tx: &mut mpsc::Sender<ExportProgressItem>,
746) -> io::Result<()> {
747 let ExportPathRequest { target, .. } = cmd;
748 let mut file = std::fs::File::create(target)?;
750 let size = entry.0.state.borrow().size();
751 tx.send(ExportProgressItem::Size(size)).await?;
752 let mut buf = [0u8; 1024 * 64];
753 for offset in (0..size).step_by(1024 * 64) {
754 let len = std::cmp::min(size - offset, 1024 * 64) as usize;
755 let buf = &mut buf[..len];
756 entry.0.state.borrow().data().read_exact_at(offset, buf)?;
757 file.write_all(buf)?;
758 tx.try_send(ExportProgressItem::CopyProgress(offset))
759 .await
760 .map_err(|_e| io::Error::other(""))?;
761 yield_now().await;
762 }
763 Ok(())
764}
765
766struct ImportEntry {
767 scope: Scope,
768 format: BlobFormat,
769 data: Bytes,
770 outboard: PreOrderMemOutboard,
771 tx: mpsc::Sender<AddProgressItem>,
772}
773
774pub struct DataReader(BaoFileHandle);
775
776impl ReadBytesAt for DataReader {
777 fn read_bytes_at(&self, offset: u64, size: usize) -> std::io::Result<Bytes> {
778 let entry = self.0 .0.state.borrow();
779 entry.data().read_bytes_at(offset, size)
780 }
781}
782
783pub struct OutboardReader {
784 hash: blake3::Hash,
785 tree: BaoTree,
786 data: BaoFileHandle,
787}
788
789impl Outboard for OutboardReader {
790 fn root(&self) -> blake3::Hash {
791 self.hash
792 }
793
794 fn tree(&self) -> BaoTree {
795 self.tree
796 }
797
798 fn load(&self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
799 let Some(offset) = self.tree.pre_order_offset(node) else {
800 return Ok(None);
801 };
802 let mut buf = [0u8; 64];
803 let size = self
804 .data
805 .0
806 .state
807 .borrow()
808 .outboard()
809 .read_at(offset * 64, &mut buf)?;
810 if size != 64 {
811 return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "short read"));
812 }
813 let left: [u8; 32] = buf[..32].try_into().unwrap();
814 let right: [u8; 32] = buf[32..].try_into().unwrap();
815 Ok(Some((left.into(), right.into())))
816 }
817}
818
819struct State {
820 data: HashMap<Hash, BaoFileHandle>,
821 tags: BTreeMap<Tag, HashAndFormat>,
822}
823
824#[derive(Debug, derive_more::From)]
825pub enum BaoFileStorage {
826 Partial(PartialMemStorage),
827 Complete(CompleteStorage),
828}
829
830impl BaoFileStorage {
831 pub fn bitfield(&self) -> Bitfield {
833 match self {
834 Self::Partial(entry) => entry.bitfield.clone(),
835 Self::Complete(entry) => Bitfield::complete(entry.size()),
836 }
837 }
838}
839
840#[derive(Debug)]
841pub struct BaoFileHandleInner {
842 state: watch::Sender<BaoFileStorage>,
843 hash: Hash,
844}
845
846#[derive(Debug, Clone, derive_more::Deref)]
848pub struct BaoFileHandle(Arc<BaoFileHandleInner>);
849
850impl BaoFileHandle {
851 pub fn new_partial(hash: Hash) -> Self {
852 let (state, _) = watch::channel(BaoFileStorage::Partial(PartialMemStorage {
853 data: SparseMemFile::new(),
854 outboard: SparseMemFile::new(),
855 size: SizeInfo::default(),
856 bitfield: Bitfield::empty(),
857 }));
858 Self(Arc::new(BaoFileHandleInner { state, hash }))
859 }
860
861 pub fn hash(&self) -> Hash {
862 self.hash
863 }
864
865 pub fn bitfield(&self) -> Bitfield {
866 self.0.state.borrow().bitfield()
867 }
868
869 pub fn subscribe(&self) -> BaoFileStorageSubscriber {
870 BaoFileStorageSubscriber::new(self.0.state.subscribe())
871 }
872
873 pub fn data_reader(&self) -> DataReader {
874 DataReader(self.clone())
875 }
876
877 pub fn outboard_reader(&self) -> OutboardReader {
878 let entry = self.0.state.borrow();
879 let hash = self.hash.into();
880 let tree = BaoTree::new(entry.size(), IROH_BLOCK_SIZE);
881 OutboardReader {
882 hash,
883 tree,
884 data: self.clone(),
885 }
886 }
887}
888
889impl Default for BaoFileStorage {
890 fn default() -> Self {
891 Self::Partial(Default::default())
892 }
893}
894
895impl BaoFileStorage {
896 fn data(&self) -> &[u8] {
897 match self {
898 Self::Partial(entry) => entry.data.as_ref(),
899 Self::Complete(entry) => &entry.data,
900 }
901 }
902
903 fn outboard(&self) -> &[u8] {
904 match self {
905 Self::Partial(entry) => entry.outboard.as_ref(),
906 Self::Complete(entry) => &entry.outboard,
907 }
908 }
909
910 fn size(&self) -> u64 {
911 match self {
912 Self::Partial(entry) => entry.current_size(),
913 Self::Complete(entry) => entry.size(),
914 }
915 }
916}
917
918#[derive(Debug, Clone)]
919pub struct CompleteStorage {
920 pub(crate) data: Bytes,
921 pub(crate) outboard: Bytes,
922}
923
924impl CompleteStorage {
925 pub fn create(data: Bytes) -> (Hash, Self) {
926 let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
927 let hash = outboard.root().into();
928 let outboard = outboard.data.into();
929 let entry = Self::new(data, outboard);
930 (hash, entry)
931 }
932
933 pub fn new(data: Bytes, outboard: Bytes) -> Self {
934 Self { data, outboard }
935 }
936
937 pub fn size(&self) -> u64 {
938 self.data.len() as u64
939 }
940}
941
942#[allow(dead_code)]
943fn print_outboard(hashes: &[u8]) {
944 assert!(hashes.len() % 64 == 0);
945 for chunk in hashes.chunks(64) {
946 let left: [u8; 32] = chunk[..32].try_into().unwrap();
947 let right: [u8; 32] = chunk[32..].try_into().unwrap();
948 let left = blake3::Hash::from(left);
949 let right = blake3::Hash::from(right);
950 println!("l: {left:?}, r: {right:?}");
951 }
952}
953
954pub struct BaoFileStorageSubscriber {
955 receiver: watch::Receiver<BaoFileStorage>,
956}
957
958impl BaoFileStorageSubscriber {
959 pub fn new(receiver: watch::Receiver<BaoFileStorage>) -> Self {
960 Self { receiver }
961 }
962
963 pub async fn forward(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
967 let value = self.receiver.borrow().bitfield();
968 tx.send(value).await?;
969 loop {
970 self.update_or_closed(&mut tx).await?;
971 let value = self.receiver.borrow().bitfield();
972 tx.send(value.clone()).await?;
973 }
974 }
975
976 #[allow(dead_code)]
980 pub async fn forward_delta(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
981 let value = self.receiver.borrow().bitfield();
982 let mut old = value.clone();
983 tx.send(value).await?;
984 loop {
985 self.update_or_closed(&mut tx).await?;
986 let new = self.receiver.borrow().bitfield();
987 let diff = old.diff(&new);
988 if diff.is_empty() {
989 continue;
990 }
991 tx.send(diff).await?;
992 old = new;
993 }
994 }
995
996 async fn update_or_closed(&mut self, tx: &mut mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
997 tokio::select! {
998 _ = tx.closed() => {
999 Err(irpc::channel::SendError::ReceiverClosed.into())
1001 }
1002 e = self.receiver.changed() => Ok(e?),
1003 }
1004 }
1005}
1006
#[cfg(test)]
mod tests {
    use n0_future::StreamExt;
    use testresult::TestResult;

    use super::*;

    /// Adds a 64 KiB blob to one store, exports it as bao, imports that
    /// into a second store while observing it, and checks that the second
    /// store exports the same bytes.
    #[tokio::test]
    async fn smoke() -> TestResult<()> {
        let source = MemStore::new();
        let tt = source.add_bytes(vec![0u8; 1024 * 64]).temp_tag().await?;
        let hash = *tt.hash();
        println!("hash: {hash:?}");

        // Print the individual items of the export.
        let mut items = source.export_bao(hash, ChunkRanges::all()).stream();
        while let Some(item) = items.next().await {
            println!("item: {item:?}");
        }
        let exported = source
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;

        let target = MemStore::new();
        let mut events = target.observe(hash).stream().await?;
        tokio::spawn(async move {
            while let Some(event) = events.next().await {
                println!("event: {event:?}");
            }
        });
        target
            .import_bao_bytes(hash, ChunkRanges::all(), exported.clone())
            .await?;

        let reexported = target
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;
        assert_eq!(exported, reexported);

        Ok(())
    }
}