1use bao_tree::{
5 io::{fsm::Outboard, outboard::PreOrderOutboard, sync::WriteAt},
6 BaoTree,
7};
8use bytes::{Bytes, BytesMut};
9use futures_lite::{Stream, StreamExt};
10use iroh_base::hash::{BlobFormat, Hash, HashAndFormat};
11use iroh_io::AsyncSliceReader;
12use std::{
13 collections::BTreeMap,
14 io,
15 path::PathBuf,
16 sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
17 time::SystemTime,
18};
19
20use crate::{
21 store::{
22 mutable_mem_storage::MutableMemStorage, BaoBlobSize, MapEntry, MapEntryMut, ReadableStore,
23 },
24 util::{
25 progress::{BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSender},
26 LivenessTracker,
27 },
28 Tag, TempTag, IROH_BLOCK_SIZE,
29};
30
31use super::{
32 temp_name, BaoBatchWriter, ConsistencyCheckProgress, ExportMode, ExportProgressCb, ImportMode,
33 ImportProgress, Map, TempCounterMap,
34};
35
/// A fully in-memory blob store.
///
/// Cloning is cheap: all clones share the same [`StoreInner`] behind an [`Arc`].
#[derive(Debug, Clone, Default)]
pub struct Store {
    // shared state; cloned handles all point at the same inner store
    inner: Arc<StoreInner>,
}
42
/// Shared store state: a single store-wide [`RwLock`] around the mutable state.
#[derive(Debug, Default)]
struct StoreInner(RwLock<StateInner>);
45
46impl LivenessTracker for StoreInner {
47 fn on_clone(&self, inner: &HashAndFormat) {
48 tracing::trace!("temp tagging: {:?}", inner);
49 let mut state = self.0.write().unwrap();
50 state.temp.inc(inner);
51 }
52
53 fn on_drop(&self, inner: &HashAndFormat) {
54 tracing::trace!("temp tag drop: {:?}", inner);
55 let mut state = self.0.write().unwrap();
56 state.temp.dec(inner);
57 }
58}
59
60impl Store {
61 pub fn new() -> Self {
63 Self::default()
64 }
65
66 fn write_lock(&self) -> RwLockWriteGuard<'_, StateInner> {
68 self.inner.0.write().unwrap()
69 }
70
71 fn read_lock(&self) -> RwLockReadGuard<'_, StateInner> {
73 self.inner.0.read().unwrap()
74 }
75
76 fn import_bytes_sync(
77 &self,
78 id: u64,
79 bytes: Bytes,
80 format: BlobFormat,
81 progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
82 ) -> io::Result<TempTag> {
83 progress.blocking_send(ImportProgress::OutboardProgress { id, offset: 0 })?;
84 let (storage, hash) = MutableMemStorage::complete(bytes);
85 progress.blocking_send(ImportProgress::OutboardDone { id, hash })?;
86 use super::Store;
87 let tag = self.temp_tag(HashAndFormat { hash, format });
88 let entry = Entry {
89 inner: Arc::new(EntryInner {
90 hash,
91 data: RwLock::new(storage),
92 }),
93 complete: true,
94 };
95 self.write_lock().entries.insert(hash, entry);
96 Ok(tag)
97 }
98
99 fn export_sync(
100 &self,
101 hash: Hash,
102 target: PathBuf,
103 _mode: ExportMode,
104 progress: impl Fn(u64) -> io::Result<()> + Send + Sync + 'static,
105 ) -> io::Result<()> {
106 tracing::trace!("exporting {} to {}", hash, target.display());
107
108 if !target.is_absolute() {
109 return Err(io::Error::new(
110 io::ErrorKind::InvalidInput,
111 "target path must be absolute",
112 ));
113 }
114 let parent = target.parent().ok_or_else(|| {
115 io::Error::new(
116 io::ErrorKind::InvalidInput,
117 "target path has no parent directory",
118 )
119 })?;
120 std::fs::create_dir_all(parent)?;
122 let state = self.read_lock();
123 let entry = state
124 .entries
125 .get(&hash)
126 .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "hash not found"))?;
127 let reader = &entry.inner.data;
128 let size = reader.read().unwrap().current_size();
129 let mut file = std::fs::File::create(target)?;
130 for offset in (0..size).step_by(1024 * 1024) {
131 let bytes = reader.read().unwrap().read_data_at(offset, 1024 * 1024);
132 file.write_at(offset, &bytes)?;
133 progress(offset)?;
134 }
135 std::io::Write::flush(&mut file)?;
136 drop(file);
137 Ok(())
138 }
139}
140
141impl super::Store for Store {
142 async fn import_file(
143 &self,
144 path: std::path::PathBuf,
145 _mode: ImportMode,
146 format: BlobFormat,
147 progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
148 ) -> io::Result<(TempTag, u64)> {
149 let this = self.clone();
150 tokio::task::spawn_blocking(move || {
151 let id = progress.new_id();
152 progress.blocking_send(ImportProgress::Found {
153 id,
154 name: path.to_string_lossy().to_string(),
155 })?;
156 progress.try_send(ImportProgress::CopyProgress { id, offset: 0 })?;
157 let bytes: Bytes = std::fs::read(path)?.into();
159 let size = bytes.len() as u64;
160 progress.blocking_send(ImportProgress::Size { id, size })?;
161 let tag = this.import_bytes_sync(id, bytes, format, progress)?;
162 Ok((tag, size))
163 })
164 .await?
165 }
166
167 async fn import_stream(
168 &self,
169 mut data: impl Stream<Item = io::Result<Bytes>> + Unpin + Send + 'static,
170 format: BlobFormat,
171 progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
172 ) -> io::Result<(TempTag, u64)> {
173 let this = self.clone();
174 let id = progress.new_id();
175 let name = temp_name();
176 progress.send(ImportProgress::Found { id, name }).await?;
177 let mut bytes = BytesMut::new();
178 while let Some(chunk) = data.next().await {
179 bytes.extend_from_slice(&chunk?);
180 progress
181 .try_send(ImportProgress::CopyProgress {
182 id,
183 offset: bytes.len() as u64,
184 })
185 .ok();
186 }
187 let bytes = bytes.freeze();
188 let size = bytes.len() as u64;
189 progress.blocking_send(ImportProgress::Size { id, size })?;
190 let tag = this.import_bytes_sync(id, bytes, format, progress)?;
191 Ok((tag, size))
192 }
193
194 async fn import_bytes(&self, bytes: Bytes, format: BlobFormat) -> io::Result<TempTag> {
195 let this = self.clone();
196 tokio::task::spawn_blocking(move || {
197 this.import_bytes_sync(0, bytes, format, IgnoreProgressSender::default())
198 })
199 .await?
200 }
201
202 async fn set_tag(&self, name: Tag, value: Option<HashAndFormat>) -> io::Result<()> {
203 let mut state = self.write_lock();
204 if let Some(value) = value {
205 state.tags.insert(name, value);
206 } else {
207 state.tags.remove(&name);
208 }
209 Ok(())
210 }
211
212 async fn create_tag(&self, hash: HashAndFormat) -> io::Result<Tag> {
213 let mut state = self.write_lock();
214 let tag = Tag::auto(SystemTime::now(), |x| state.tags.contains_key(x));
215 state.tags.insert(tag.clone(), hash);
216 Ok(tag)
217 }
218
219 fn temp_tag(&self, tag: HashAndFormat) -> TempTag {
220 TempTag::new(tag, Some(self.inner.clone()))
221 }
222
223 async fn gc_start(&self) -> io::Result<()> {
224 Ok(())
225 }
226
227 async fn delete(&self, hashes: Vec<Hash>) -> io::Result<()> {
228 let mut state = self.write_lock();
229 for hash in hashes {
230 if !state.temp.contains(&hash) {
231 state.entries.remove(&hash);
232 }
233 }
234 Ok(())
235 }
236
237 async fn shutdown(&self) {}
238}
239
/// The mutable state of the store, guarded by the store-wide [`RwLock`].
#[derive(Debug, Default)]
struct StateInner {
    // all blob entries, keyed by their hash
    entries: BTreeMap<Hash, Entry>,
    // named tags and the content they point at
    tags: BTreeMap<Tag, HashAndFormat>,
    // reference counts for live temp tags, maintained via LivenessTracker
    temp: TempCounterMap,
}
246
/// An entry of the in-memory store.
///
/// Cloning shares the underlying storage via [`Arc`].
#[derive(Debug, Clone)]
pub struct Entry {
    // shared storage and identity of this entry
    inner: Arc<EntryInner>,
    // true once the full, verified data is present
    complete: bool,
}
253
/// Shared interior of an [`Entry`]: the hash plus the locked storage
/// holding both data and outboard.
#[derive(Debug)]
struct EntryInner {
    hash: Hash,
    data: RwLock<MutableMemStorage>,
}
259
260impl MapEntry for Entry {
261 fn hash(&self) -> Hash {
262 self.inner.hash
263 }
264
265 fn size(&self) -> BaoBlobSize {
266 let size = self.inner.data.read().unwrap().current_size();
267 BaoBlobSize::new(size, self.complete)
268 }
269
270 fn is_complete(&self) -> bool {
271 self.complete
272 }
273
274 async fn outboard(&self) -> io::Result<impl Outboard> {
275 let size = self.inner.data.read().unwrap().current_size();
276 Ok(PreOrderOutboard {
277 root: self.hash().into(),
278 tree: BaoTree::new(size, IROH_BLOCK_SIZE),
279 data: OutboardReader(self.inner.clone()),
280 })
281 }
282
283 async fn data_reader(&self) -> io::Result<impl AsyncSliceReader> {
284 Ok(DataReader(self.inner.clone()))
285 }
286}
287
impl MapEntryMut for Entry {
    /// Writer that appends verified batches to this entry's shared storage.
    async fn batch_writer(&self) -> io::Result<impl BaoBatchWriter> {
        Ok(BatchWriter(self.inner.clone()))
    }
}
293
/// [`AsyncSliceReader`] over the data section of an entry's storage.
struct DataReader(Arc<EntryInner>);
295
296impl AsyncSliceReader for DataReader {
297 async fn read_at(&mut self, offset: u64, len: usize) -> std::io::Result<Bytes> {
298 Ok(self.0.data.read().unwrap().read_data_at(offset, len))
299 }
300
301 async fn size(&mut self) -> std::io::Result<u64> {
302 Ok(self.0.data.read().unwrap().data_len())
303 }
304}
305
/// [`AsyncSliceReader`] over the outboard section of an entry's storage.
struct OutboardReader(Arc<EntryInner>);
307
308impl AsyncSliceReader for OutboardReader {
309 async fn read_at(&mut self, offset: u64, len: usize) -> std::io::Result<Bytes> {
310 Ok(self.0.data.read().unwrap().read_outboard_at(offset, len))
311 }
312
313 async fn size(&mut self) -> std::io::Result<u64> {
314 Ok(self.0.data.read().unwrap().outboard_len())
315 }
316}
317
/// [`BaoBatchWriter`] that writes verified batches into an entry's storage.
struct BatchWriter(Arc<EntryInner>);
319
320impl super::BaoBatchWriter for BatchWriter {
321 async fn write_batch(
322 &mut self,
323 size: u64,
324 batch: Vec<bao_tree::io::fsm::BaoContentItem>,
325 ) -> io::Result<()> {
326 self.0.data.write().unwrap().write_batch(size, &batch)
327 }
328
329 async fn sync(&mut self) -> io::Result<()> {
330 Ok(())
331 }
332}
333
334impl super::Map for Store {
335 type Entry = Entry;
336
337 async fn get(&self, hash: &Hash) -> std::io::Result<Option<Self::Entry>> {
338 Ok(self.inner.0.read().unwrap().entries.get(hash).cloned())
339 }
340}
341
342impl super::MapMut for Store {
343 type EntryMut = Entry;
344
345 async fn get_mut(&self, hash: &Hash) -> std::io::Result<Option<Self::EntryMut>> {
346 self.get(hash).await
347 }
348
349 async fn get_or_create(&self, hash: Hash, _size: u64) -> std::io::Result<Entry> {
350 let entry = Entry {
351 inner: Arc::new(EntryInner {
352 hash,
353 data: RwLock::new(MutableMemStorage::default()),
354 }),
355 complete: false,
356 };
357 Ok(entry)
358 }
359
360 async fn entry_status(&self, hash: &Hash) -> std::io::Result<crate::store::EntryStatus> {
361 self.entry_status_sync(hash)
362 }
363
364 fn entry_status_sync(&self, hash: &Hash) -> std::io::Result<crate::store::EntryStatus> {
365 Ok(match self.inner.0.read().unwrap().entries.get(hash) {
366 Some(entry) => {
367 if entry.complete {
368 crate::store::EntryStatus::Complete
369 } else {
370 crate::store::EntryStatus::Partial
371 }
372 }
373 None => crate::store::EntryStatus::NotFound,
374 })
375 }
376
377 async fn insert_complete(&self, mut entry: Entry) -> std::io::Result<()> {
378 let hash = entry.hash();
379 let mut inner = self.inner.0.write().unwrap();
380 let complete = inner
381 .entries
382 .get(&hash)
383 .map(|x| x.complete)
384 .unwrap_or_default();
385 if !complete {
386 entry.complete = true;
387 inner.entries.insert(hash, entry);
388 }
389 Ok(())
390 }
391}
392
393impl ReadableStore for Store {
394 async fn blobs(&self) -> io::Result<crate::store::DbIter<Hash>> {
395 let entries = self.read_lock().entries.clone();
396 Ok(Box::new(
397 entries
398 .into_values()
399 .filter(|x| x.complete)
400 .map(|x| Ok(x.hash())),
401 ))
402 }
403
404 async fn partial_blobs(&self) -> io::Result<crate::store::DbIter<Hash>> {
405 let entries = self.read_lock().entries.clone();
406 Ok(Box::new(
407 entries
408 .into_values()
409 .filter(|x| !x.complete)
410 .map(|x| Ok(x.hash())),
411 ))
412 }
413
414 async fn tags(
415 &self,
416 ) -> io::Result<crate::store::DbIter<(crate::Tag, iroh_base::hash::HashAndFormat)>> {
417 #[allow(clippy::mutable_key_type)]
418 let tags = self.read_lock().tags.clone();
419 Ok(Box::new(tags.into_iter().map(Ok)))
420 }
421
422 fn temp_tags(
423 &self,
424 ) -> Box<dyn Iterator<Item = iroh_base::hash::HashAndFormat> + Send + Sync + 'static> {
425 let tags = self.read_lock().temp.keys();
426 Box::new(tags)
427 }
428
429 async fn consistency_check(
430 &self,
431 _repair: bool,
432 _tx: BoxedProgressSender<ConsistencyCheckProgress>,
433 ) -> io::Result<()> {
434 todo!()
435 }
436
437 async fn export(
438 &self,
439 hash: Hash,
440 target: std::path::PathBuf,
441 mode: crate::store::ExportMode,
442 progress: ExportProgressCb,
443 ) -> io::Result<()> {
444 let this = self.clone();
445 tokio::task::spawn_blocking(move || this.export_sync(hash, target, mode, progress)).await?
446 }
447}