use std::{
    collections::{BTreeMap, BTreeSet},
    future::Future,
    io,
    path::PathBuf,
    sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
    time::SystemTime,
};

use bao_tree::{
    io::{fsm::Outboard, outboard::PreOrderOutboard, sync::WriteAt},
    BaoTree,
};
use bytes::{Bytes, BytesMut};
use futures_lite::{Stream, StreamExt};
use iroh_io::AsyncSliceReader;
use tracing::info;

use super::{
    temp_name, BaoBatchWriter, ConsistencyCheckProgress, ExportMode, ExportProgressCb, ImportMode,
    ImportProgress, Map, TempCounterMap,
};
use crate::{
    store::{
        mutable_mem_storage::MutableMemStorage, BaoBlobSize, MapEntry, MapEntryMut, ReadableStore,
    },
    util::{
        progress::{BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSender},
        TagCounter, TagDrop,
    },
    BlobFormat, Hash, HashAndFormat, Tag, TempTag, IROH_BLOCK_SIZE,
};

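/// A fully in-memory store for blobs and tags.
///
/// All entries, named tags and temp tag counters live behind a single locked
/// [`StateInner`], so the store is cheap to clone and safe to share across tasks.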
#[derive(Debug, Clone, Default)]
pub struct Store {
    inner: Arc<StoreInner>,
}

#[derive(Debug, Default)]
struct StoreInner(RwLock<StateInner>);

impl TagDrop for StoreInner {
    fn on_drop(&self, inner: &HashAndFormat) {
        tracing::trace!("temp tag drop: {:?}", inner);
        let mut state = self.0.write().unwrap();
        state.temp.dec(inner);
    }
}

impl TagCounter for StoreInner {
    fn on_create(&self, inner: &HashAndFormat) {
        tracing::trace!("temp tagging: {:?}", inner);
        let mut state = self.0.write().unwrap();
        state.temp.inc(inner);
    }
}

impl Store {
    pub fn new() -> Self {
        Self::default()
    }

    fn write_lock(&self) -> RwLockWriteGuard<'_, StateInner> {
        self.inner.0.write().unwrap()
    }

    fn read_lock(&self) -> RwLockReadGuard<'_, StateInner> {
        self.inner.0.read().unwrap()
    }

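    /// Synchronously hash `bytes`, compute the outboard, register the entry
    /// under its hash and return a [`TempTag`] that protects it from deletion.
    ///
    /// Outboard progress is reported through `progress`; the entry is always
    /// stored as complete.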
    fn import_bytes_sync(
        &self,
        id: u64,
        bytes: Bytes,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> io::Result<TempTag> {
        progress.blocking_send(ImportProgress::OutboardProgress { id, offset: 0 })?;
        let progress2 = progress.clone();
        let cb = move |offset| {
            progress2
                .try_send(ImportProgress::OutboardProgress { id, offset })
                .ok();
        };
        let (storage, hash) = MutableMemStorage::complete(bytes, cb);
        progress.blocking_send(ImportProgress::OutboardDone { id, hash })?;
        use super::Store;
        let tag = self.temp_tag(HashAndFormat { hash, format });
        let entry = Entry {
            inner: Arc::new(EntryInner {
                hash,
                data: RwLock::new(storage),
            }),
            complete: true,
        };
        self.write_lock().entries.insert(hash, entry);
        Ok(tag)
    }

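    /// Synchronously export the blob identified by `hash` to the absolute path
    /// `target`, copying it out in 1 MiB chunks and reporting the current
    /// offset through `progress`.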
    fn export_sync(
        &self,
        hash: Hash,
        target: PathBuf,
        _mode: ExportMode,
        progress: impl Fn(u64) -> io::Result<()> + Send + Sync + 'static,
    ) -> io::Result<()> {
        tracing::trace!("exporting {} to {}", hash, target.display());

        if !target.is_absolute() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "target path must be absolute",
            ));
        }
        let parent = target.parent().ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "target path has no parent directory",
            )
        })?;
        std::fs::create_dir_all(parent)?;
        let state = self.read_lock();
        let entry = state
            .entries
            .get(&hash)
            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "hash not found"))?;
        let reader = &entry.inner.data;
        let size = reader.read().unwrap().current_size();
        let mut file = std::fs::File::create(target)?;
        for offset in (0..size).step_by(1024 * 1024) {
            let bytes = reader.read().unwrap().read_data_at(offset, 1024 * 1024);
            file.write_at(offset, &bytes)?;
            progress(offset)?;
        }
        std::io::Write::flush(&mut file)?;
        drop(file);
        Ok(())
    }
}

impl super::Store for Store {
    async fn import_file(
        &self,
        path: std::path::PathBuf,
        _mode: ImportMode,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> io::Result<(TempTag, u64)> {
        let this = self.clone();
        tokio::task::spawn_blocking(move || {
            let id = progress.new_id();
            progress.blocking_send(ImportProgress::Found {
                id,
                name: path.to_string_lossy().to_string(),
            })?;
            progress.try_send(ImportProgress::CopyProgress { id, offset: 0 })?;
            let bytes: Bytes = std::fs::read(path)?.into();
            let size = bytes.len() as u64;
            progress.blocking_send(ImportProgress::Size { id, size })?;
            let tag = this.import_bytes_sync(id, bytes, format, progress)?;
            Ok((tag, size))
        })
        .await?
    }

    async fn import_stream(
        &self,
        mut data: impl Stream<Item = io::Result<Bytes>> + Unpin + Send + 'static,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> io::Result<(TempTag, u64)> {
        let this = self.clone();
        let id = progress.new_id();
        let name = temp_name();
        progress.send(ImportProgress::Found { id, name }).await?;
        let mut bytes = BytesMut::new();
        while let Some(chunk) = data.next().await {
            bytes.extend_from_slice(&chunk?);
            progress
                .try_send(ImportProgress::CopyProgress {
                    id,
                    offset: bytes.len() as u64,
                })
                .ok();
        }
        let bytes = bytes.freeze();
        let size = bytes.len() as u64;
        progress.blocking_send(ImportProgress::Size { id, size })?;
        let tag = this.import_bytes_sync(id, bytes, format, progress)?;
        Ok((tag, size))
    }

    async fn import_bytes(&self, bytes: Bytes, format: BlobFormat) -> io::Result<TempTag> {
        let this = self.clone();
        tokio::task::spawn_blocking(move || {
            this.import_bytes_sync(0, bytes, format, IgnoreProgressSender::default())
        })
        .await?
    }

    async fn rename_tag(&self, from: Tag, to: Tag) -> io::Result<()> {
        let mut state = self.write_lock();
        let value = state.tags.remove(&from).ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::NotFound,
                format!("tag not found: {:?}", from),
            )
        })?;
        state.tags.insert(to, value);
        Ok(())
    }

    async fn set_tag(&self, name: Tag, value: HashAndFormat) -> io::Result<()> {
        let mut state = self.write_lock();
        state.tags.insert(name, value);
        Ok(())
    }

    async fn delete_tags(&self, from: Option<Tag>, to: Option<Tag>) -> io::Result<()> {
        let mut state = self.write_lock();
        info!("deleting tags from {:?} to {:?}", from, to);
        state.tags.retain(|tag, _| {
            if let Some(from) = &from {
                if tag < from {
                    return true;
                }
            }
            if let Some(to) = &to {
                if tag >= to {
                    return true;
                }
            }
            info!(" removing {:?}", tag);
            false
        });
        Ok(())
    }

    async fn create_tag(&self, hash: HashAndFormat) -> io::Result<Tag> {
        let mut state = self.write_lock();
        let tag = Tag::auto(SystemTime::now(), |x| state.tags.contains_key(x));
        state.tags.insert(tag.clone(), hash);
        Ok(tag)
    }

    fn temp_tag(&self, tag: HashAndFormat) -> TempTag {
        self.inner.temp_tag(tag)
    }

    async fn gc_run<G, Gut>(&self, config: super::GcConfig, protected_cb: G)
    where
        G: Fn() -> Gut,
        Gut: Future<Output = BTreeSet<Hash>> + Send,
    {
        super::gc_run_loop(self, config, move || async { Ok(()) }, protected_cb).await
    }

    async fn delete(&self, hashes: Vec<Hash>) -> io::Result<()> {
        let mut state = self.write_lock();
        for hash in hashes {
            if !state.temp.contains(&hash) {
                state.entries.remove(&hash);
            }
        }
        Ok(())
    }

    async fn shutdown(&self) {}

    async fn sync(&self) -> io::Result<()> {
        Ok(())
    }
}

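/// The mutable state of the store: entries by hash, named tags, and reference
/// counts for temp tags.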
#[derive(Debug, Default)]
struct StateInner {
    entries: BTreeMap<Hash, Entry>,
    tags: BTreeMap<Tag, HashAndFormat>,
    temp: TempCounterMap,
}

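/// An in-memory entry, cheap to clone, with data and outboard stored in a
/// [`MutableMemStorage`] behind a lock.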
#[derive(Debug, Clone)]
pub struct Entry {
    inner: Arc<EntryInner>,
    complete: bool,
}

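/// Shared interior of an [`Entry`]: the hash plus the locked storage for data
/// and outboard.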
#[derive(Debug)]
struct EntryInner {
    hash: Hash,
    data: RwLock<MutableMemStorage>,
}

impl MapEntry for Entry {
    fn hash(&self) -> Hash {
        self.inner.hash
    }

    fn size(&self) -> BaoBlobSize {
        let size = self.inner.data.read().unwrap().current_size();
        BaoBlobSize::new(size, self.complete)
    }

    fn is_complete(&self) -> bool {
        self.complete
    }

    async fn outboard(&self) -> io::Result<impl Outboard> {
        let size = self.inner.data.read().unwrap().current_size();
        Ok(PreOrderOutboard {
            root: self.hash().into(),
            tree: BaoTree::new(size, IROH_BLOCK_SIZE),
            data: OutboardReader(self.inner.clone()),
        })
    }

    async fn data_reader(&self) -> io::Result<impl AsyncSliceReader> {
        Ok(DataReader(self.inner.clone()))
    }
}

impl MapEntryMut for Entry {
    async fn batch_writer(&self) -> io::Result<impl BaoBatchWriter> {
        Ok(BatchWriter(self.inner.clone()))
    }
}

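/// [`AsyncSliceReader`] over the data part of an entry.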
struct DataReader(Arc<EntryInner>);

impl AsyncSliceReader for DataReader {
    async fn read_at(&mut self, offset: u64, len: usize) -> std::io::Result<Bytes> {
        Ok(self.0.data.read().unwrap().read_data_at(offset, len))
    }

    async fn size(&mut self) -> std::io::Result<u64> {
        Ok(self.0.data.read().unwrap().data_len())
    }
}

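/// [`AsyncSliceReader`] over the outboard part of an entry.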
struct OutboardReader(Arc<EntryInner>);

impl AsyncSliceReader for OutboardReader {
    async fn read_at(&mut self, offset: u64, len: usize) -> std::io::Result<Bytes> {
        Ok(self.0.data.read().unwrap().read_outboard_at(offset, len))
    }

    async fn size(&mut self) -> std::io::Result<u64> {
        Ok(self.0.data.read().unwrap().outboard_len())
    }
}

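/// [`BaoBatchWriter`] that writes verified batches into the entry's storage.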
struct BatchWriter(Arc<EntryInner>);

impl super::BaoBatchWriter for BatchWriter {
    async fn write_batch(
        &mut self,
        size: u64,
        batch: Vec<bao_tree::io::fsm::BaoContentItem>,
    ) -> io::Result<()> {
        self.0.data.write().unwrap().write_batch(size, &batch)
    }

    async fn sync(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl super::Map for Store {
    type Entry = Entry;

    async fn get(&self, hash: &Hash) -> std::io::Result<Option<Self::Entry>> {
        Ok(self.inner.0.read().unwrap().entries.get(hash).cloned())
    }
}

impl super::MapMut for Store {
    type EntryMut = Entry;

    async fn get_mut(&self, hash: &Hash) -> std::io::Result<Option<Self::EntryMut>> {
        self.get(hash).await
    }

    async fn get_or_create(&self, hash: Hash, _size: u64) -> std::io::Result<Entry> {
        let entry = Entry {
            inner: Arc::new(EntryInner {
                hash,
                data: RwLock::new(MutableMemStorage::default()),
            }),
            complete: false,
        };
        Ok(entry)
    }

    async fn entry_status(&self, hash: &Hash) -> std::io::Result<crate::store::EntryStatus> {
        self.entry_status_sync(hash)
    }

    fn entry_status_sync(&self, hash: &Hash) -> std::io::Result<crate::store::EntryStatus> {
        Ok(match self.inner.0.read().unwrap().entries.get(hash) {
            Some(entry) => {
                if entry.complete {
                    crate::store::EntryStatus::Complete
                } else {
                    crate::store::EntryStatus::Partial
                }
            }
            None => crate::store::EntryStatus::NotFound,
        })
    }

    async fn insert_complete(&self, mut entry: Entry) -> std::io::Result<()> {
        let hash = entry.hash();
        let mut inner = self.inner.0.write().unwrap();
        let complete = inner
            .entries
            .get(&hash)
            .map(|x| x.complete)
            .unwrap_or_default();
        if !complete {
            entry.complete = true;
            inner.entries.insert(hash, entry);
        }
        Ok(())
    }
}

impl ReadableStore for Store {
    async fn blobs(&self) -> io::Result<crate::store::DbIter<Hash>> {
        let entries = self.read_lock().entries.clone();
        Ok(Box::new(
            entries
                .into_values()
                .filter(|x| x.complete)
                .map(|x| Ok(x.hash())),
        ))
    }

    async fn partial_blobs(&self) -> io::Result<crate::store::DbIter<Hash>> {
        let entries = self.read_lock().entries.clone();
        Ok(Box::new(
            entries
                .into_values()
                .filter(|x| !x.complete)
                .map(|x| Ok(x.hash())),
        ))
    }

    async fn tags(
        &self,
        from: Option<Tag>,
        to: Option<Tag>,
    ) -> io::Result<crate::store::DbIter<(crate::Tag, crate::HashAndFormat)>> {
        #[allow(clippy::mutable_key_type)]
        let tags = self.read_lock().tags.clone();
        let tags = tags
            .into_iter()
            .filter(move |(tag, _)| {
                if let Some(from) = &from {
                    if tag < from {
                        return false;
                    }
                }
                if let Some(to) = &to {
                    if tag >= to {
                        return false;
                    }
                }
                true
            })
            .map(Ok);
        Ok(Box::new(tags))
    }

    fn temp_tags(&self) -> Box<dyn Iterator<Item = crate::HashAndFormat> + Send + Sync + 'static> {
        let tags = self.read_lock().temp.keys();
        Box::new(tags)
    }

    async fn consistency_check(
        &self,
        _repair: bool,
        _tx: BoxedProgressSender<ConsistencyCheckProgress>,
    ) -> io::Result<()> {
        todo!()
    }

    async fn export(
        &self,
        hash: Hash,
        target: std::path::PathBuf,
        mode: crate::store::ExportMode,
        progress: ExportProgressCb,
    ) -> io::Result<()> {
        let this = self.clone();
        tokio::task::spawn_blocking(move || this.export_sync(hash, target, mode, progress)).await?
    }
}
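
// A minimal usage sketch, not part of the original module. It assumes `tokio`
// (with the `macros` feature) is available as a dev-dependency and only
// exercises the trait methods implemented above: import a blob, look it up
// through the `Map` interface, and read the bytes back.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::Store as _;

    #[tokio::test]
    async fn import_and_read_roundtrip() -> io::Result<()> {
        let store = Store::new();
        let data = Bytes::from_static(b"hello world");
        // Import the bytes; the returned temp tag keeps the entry protected.
        let tag = store.import_bytes(data.clone(), BlobFormat::Raw).await?;
        let hash = *tag.hash();
        // The entry is stored as complete and can be looked up by hash.
        let entry = store.get(&hash).await?.expect("entry was just imported");
        assert!(entry.is_complete());
        // Read the data back through the async reader.
        let mut reader = entry.data_reader().await?;
        let read_back = reader.read_at(0, data.len()).await?;
        assert_eq!(read_back, data);
        Ok(())
    }
}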