use std::{
    collections::{BTreeMap, BTreeSet},
    future::Future,
    io,
    path::PathBuf,
    sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
    time::SystemTime,
};

use bao_tree::{
    io::{fsm::Outboard, outboard::PreOrderOutboard, sync::WriteAt},
    BaoTree,
};
use bytes::{Bytes, BytesMut};
use futures_lite::{Stream, StreamExt};
use iroh_io::AsyncSliceReader;

use super::{
    temp_name, BaoBatchWriter, ConsistencyCheckProgress, ExportMode, ExportProgressCb, ImportMode,
    ImportProgress, Map, TempCounterMap,
};
use crate::{
    store::{
        mutable_mem_storage::MutableMemStorage, BaoBlobSize, MapEntry, MapEntryMut, ReadableStore,
    },
    util::{
        progress::{BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSender},
        TagCounter, TagDrop,
    },
    BlobFormat, Hash, HashAndFormat, Tag, TempTag, IROH_BLOCK_SIZE,
};

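/// A fully in-memory blob store: entries, tags and temp tag counts all live
/// behind a shared `RwLock`, so cloning the store is cheap.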
#[derive(Debug, Clone, Default)]
pub struct Store {
    inner: Arc<StoreInner>,
}

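/// The shared inner state. Implementing [`TagDrop`] and [`TagCounter`] here
/// lets temp tags update the reference counts when they are created and dropped.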
#[derive(Debug, Default)]
struct StoreInner(RwLock<StateInner>);

impl TagDrop for StoreInner {
    fn on_drop(&self, inner: &HashAndFormat) {
        tracing::trace!("temp tag drop: {:?}", inner);
        let mut state = self.0.write().unwrap();
        state.temp.dec(inner);
    }
}

impl TagCounter for StoreInner {
    fn on_create(&self, inner: &HashAndFormat) {
        tracing::trace!("temp tagging: {:?}", inner);
        let mut state = self.0.write().unwrap();
        state.temp.inc(inner);
    }
}

impl Store {
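    /// Create a new, empty in-memory store.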
    pub fn new() -> Self {
        Self::default()
    }

    fn write_lock(&self) -> RwLockWriteGuard<'_, StateInner> {
        self.inner.0.write().unwrap()
    }

    fn read_lock(&self) -> RwLockReadGuard<'_, StateInner> {
        self.inner.0.read().unwrap()
    }

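    /// Synchronously import `bytes`: compute the outboard and hash, insert a
    /// complete entry, and return a temp tag for the new blob.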
    fn import_bytes_sync(
        &self,
        id: u64,
        bytes: Bytes,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> io::Result<TempTag> {
        progress.blocking_send(ImportProgress::OutboardProgress { id, offset: 0 })?;
        let progress2 = progress.clone();
        let cb = move |offset| {
            progress2
                .try_send(ImportProgress::OutboardProgress { id, offset })
                .ok();
        };
        let (storage, hash) = MutableMemStorage::complete(bytes, cb);
        progress.blocking_send(ImportProgress::OutboardDone { id, hash })?;
        use super::Store;
        let tag = self.temp_tag(HashAndFormat { hash, format });
        let entry = Entry {
            inner: Arc::new(EntryInner {
                hash,
                data: RwLock::new(storage),
            }),
            complete: true,
        };
        self.write_lock().entries.insert(hash, entry);
        Ok(tag)
    }

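    /// Synchronously export the blob `hash` to the file at `target`, writing in
    /// 1 MiB chunks and reporting the current offset via `progress`.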
    fn export_sync(
        &self,
        hash: Hash,
        target: PathBuf,
        _mode: ExportMode,
        progress: impl Fn(u64) -> io::Result<()> + Send + Sync + 'static,
    ) -> io::Result<()> {
        tracing::trace!("exporting {} to {}", hash, target.display());

        if !target.is_absolute() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "target path must be absolute",
            ));
        }
        let parent = target.parent().ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "target path has no parent directory",
            )
        })?;
        std::fs::create_dir_all(parent)?;
        let state = self.read_lock();
        let entry = state
            .entries
            .get(&hash)
            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "hash not found"))?;
        let reader = &entry.inner.data;
        let size = reader.read().unwrap().current_size();
        let mut file = std::fs::File::create(target)?;
        for offset in (0..size).step_by(1024 * 1024) {
            let bytes = reader.read().unwrap().read_data_at(offset, 1024 * 1024);
            file.write_at(offset, &bytes)?;
            progress(offset)?;
        }
        std::io::Write::flush(&mut file)?;
        drop(file);
        Ok(())
    }
}

impl super::Store for Store {
    async fn import_file(
        &self,
        path: std::path::PathBuf,
        _mode: ImportMode,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> io::Result<(TempTag, u64)> {
        let this = self.clone();
        tokio::task::spawn_blocking(move || {
            let id = progress.new_id();
            progress.blocking_send(ImportProgress::Found {
                id,
                name: path.to_string_lossy().to_string(),
            })?;
            progress.try_send(ImportProgress::CopyProgress { id, offset: 0 })?;
            let bytes: Bytes = std::fs::read(path)?.into();
            let size = bytes.len() as u64;
            progress.blocking_send(ImportProgress::Size { id, size })?;
            let tag = this.import_bytes_sync(id, bytes, format, progress)?;
            Ok((tag, size))
        })
        .await?
    }

    async fn import_stream(
        &self,
        mut data: impl Stream<Item = io::Result<Bytes>> + Unpin + Send + 'static,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> io::Result<(TempTag, u64)> {
        let this = self.clone();
        let id = progress.new_id();
        let name = temp_name();
        progress.send(ImportProgress::Found { id, name }).await?;
        let mut bytes = BytesMut::new();
        while let Some(chunk) = data.next().await {
            bytes.extend_from_slice(&chunk?);
            progress
                .try_send(ImportProgress::CopyProgress {
                    id,
                    offset: bytes.len() as u64,
                })
                .ok();
        }
        let bytes = bytes.freeze();
        let size = bytes.len() as u64;
        progress.blocking_send(ImportProgress::Size { id, size })?;
        let tag = this.import_bytes_sync(id, bytes, format, progress)?;
        Ok((tag, size))
    }

    async fn import_bytes(&self, bytes: Bytes, format: BlobFormat) -> io::Result<TempTag> {
        let this = self.clone();
        tokio::task::spawn_blocking(move || {
            this.import_bytes_sync(0, bytes, format, IgnoreProgressSender::default())
        })
        .await?
    }

    async fn set_tag(&self, name: Tag, value: Option<HashAndFormat>) -> io::Result<()> {
        let mut state = self.write_lock();
        if let Some(value) = value {
            state.tags.insert(name, value);
        } else {
            state.tags.remove(&name);
        }
        Ok(())
    }

    async fn create_tag(&self, hash: HashAndFormat) -> io::Result<Tag> {
        let mut state = self.write_lock();
        let tag = Tag::auto(SystemTime::now(), |x| state.tags.contains_key(x));
        state.tags.insert(tag.clone(), hash);
        Ok(tag)
    }

    fn temp_tag(&self, tag: HashAndFormat) -> TempTag {
        self.inner.temp_tag(tag)
    }

    async fn gc_run<G, Gut>(&self, config: super::GcConfig, protected_cb: G)
    where
        G: Fn() -> Gut,
        Gut: Future<Output = BTreeSet<Hash>> + Send,
    {
        super::gc_run_loop(self, config, move || async { Ok(()) }, protected_cb).await
    }

    async fn delete(&self, hashes: Vec<Hash>) -> io::Result<()> {
        let mut state = self.write_lock();
        for hash in hashes {
            if !state.temp.contains(&hash) {
                state.entries.remove(&hash);
            }
        }
        Ok(())
    }

    async fn shutdown(&self) {}

    async fn sync(&self) -> io::Result<()> {
        Ok(())
    }
}

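/// The mutable state of the store: entries by hash, named tags, and temp tag
/// reference counts.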
#[derive(Debug, Default)]
struct StateInner {
    entries: BTreeMap<Hash, Entry>,
    tags: BTreeMap<Tag, HashAndFormat>,
    temp: TempCounterMap,
}

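/// An in-memory entry. The data is behind an [`Arc`], so cloning is cheap.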
#[derive(Debug, Clone)]
pub struct Entry {
    inner: Arc<EntryInner>,
    complete: bool,
}

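/// The shared part of an entry: the hash plus the mutable data and outboard storage.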
#[derive(Debug)]
struct EntryInner {
    hash: Hash,
    data: RwLock<MutableMemStorage>,
}

impl MapEntry for Entry {
    fn hash(&self) -> Hash {
        self.inner.hash
    }

    fn size(&self) -> BaoBlobSize {
        let size = self.inner.data.read().unwrap().current_size();
        BaoBlobSize::new(size, self.complete)
    }

    fn is_complete(&self) -> bool {
        self.complete
    }

    async fn outboard(&self) -> io::Result<impl Outboard> {
        let size = self.inner.data.read().unwrap().current_size();
        Ok(PreOrderOutboard {
            root: self.hash().into(),
            tree: BaoTree::new(size, IROH_BLOCK_SIZE),
            data: OutboardReader(self.inner.clone()),
        })
    }

    async fn data_reader(&self) -> io::Result<impl AsyncSliceReader> {
        Ok(DataReader(self.inner.clone()))
    }
}

impl MapEntryMut for Entry {
    async fn batch_writer(&self) -> io::Result<impl BaoBatchWriter> {
        Ok(BatchWriter(self.inner.clone()))
    }
}

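/// [`AsyncSliceReader`] over the data part of an entry.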
struct DataReader(Arc<EntryInner>);

impl AsyncSliceReader for DataReader {
    async fn read_at(&mut self, offset: u64, len: usize) -> std::io::Result<Bytes> {
        Ok(self.0.data.read().unwrap().read_data_at(offset, len))
    }

    async fn size(&mut self) -> std::io::Result<u64> {
        Ok(self.0.data.read().unwrap().data_len())
    }
}

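/// [`AsyncSliceReader`] over the outboard part of an entry.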
struct OutboardReader(Arc<EntryInner>);

impl AsyncSliceReader for OutboardReader {
    async fn read_at(&mut self, offset: u64, len: usize) -> std::io::Result<Bytes> {
        Ok(self.0.data.read().unwrap().read_outboard_at(offset, len))
    }

    async fn size(&mut self) -> std::io::Result<u64> {
        Ok(self.0.data.read().unwrap().outboard_len())
    }
}

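/// Writer that applies batches of bao content items to the entry's mutable storage.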
struct BatchWriter(Arc<EntryInner>);

impl super::BaoBatchWriter for BatchWriter {
    async fn write_batch(
        &mut self,
        size: u64,
        batch: Vec<bao_tree::io::fsm::BaoContentItem>,
    ) -> io::Result<()> {
        self.0.data.write().unwrap().write_batch(size, &batch)
    }

    async fn sync(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl super::Map for Store {
    type Entry = Entry;

    async fn get(&self, hash: &Hash) -> std::io::Result<Option<Self::Entry>> {
        Ok(self.inner.0.read().unwrap().entries.get(hash).cloned())
    }
}

impl super::MapMut for Store {
    type EntryMut = Entry;

    async fn get_mut(&self, hash: &Hash) -> std::io::Result<Option<Self::EntryMut>> {
        self.get(hash).await
    }

    async fn get_or_create(&self, hash: Hash, _size: u64) -> std::io::Result<Entry> {
        let entry = Entry {
            inner: Arc::new(EntryInner {
                hash,
                data: RwLock::new(MutableMemStorage::default()),
            }),
            complete: false,
        };
        Ok(entry)
    }

    async fn entry_status(&self, hash: &Hash) -> std::io::Result<crate::store::EntryStatus> {
        self.entry_status_sync(hash)
    }

    fn entry_status_sync(&self, hash: &Hash) -> std::io::Result<crate::store::EntryStatus> {
        Ok(match self.inner.0.read().unwrap().entries.get(hash) {
            Some(entry) => {
                if entry.complete {
                    crate::store::EntryStatus::Complete
                } else {
                    crate::store::EntryStatus::Partial
                }
            }
            None => crate::store::EntryStatus::NotFound,
        })
    }

    async fn insert_complete(&self, mut entry: Entry) -> std::io::Result<()> {
        let hash = entry.hash();
        let mut inner = self.inner.0.write().unwrap();
        let complete = inner
            .entries
            .get(&hash)
            .map(|x| x.complete)
            .unwrap_or_default();
        if !complete {
            entry.complete = true;
            inner.entries.insert(hash, entry);
        }
        Ok(())
    }
}

impl ReadableStore for Store {
    async fn blobs(&self) -> io::Result<crate::store::DbIter<Hash>> {
        let entries = self.read_lock().entries.clone();
        Ok(Box::new(
            entries
                .into_values()
                .filter(|x| x.complete)
                .map(|x| Ok(x.hash())),
        ))
    }

    async fn partial_blobs(&self) -> io::Result<crate::store::DbIter<Hash>> {
        let entries = self.read_lock().entries.clone();
        Ok(Box::new(
            entries
                .into_values()
                .filter(|x| !x.complete)
                .map(|x| Ok(x.hash())),
        ))
    }

    async fn tags(&self) -> io::Result<crate::store::DbIter<(crate::Tag, crate::HashAndFormat)>> {
        #[allow(clippy::mutable_key_type)]
        let tags = self.read_lock().tags.clone();
        Ok(Box::new(tags.into_iter().map(Ok)))
    }

    fn temp_tags(&self) -> Box<dyn Iterator<Item = crate::HashAndFormat> + Send + Sync + 'static> {
        let tags = self.read_lock().temp.keys();
        Box::new(tags)
    }

    async fn consistency_check(
        &self,
        _repair: bool,
        _tx: BoxedProgressSender<ConsistencyCheckProgress>,
    ) -> io::Result<()> {
        todo!()
    }

    async fn export(
        &self,
        hash: Hash,
        target: std::path::PathBuf,
        mode: crate::store::ExportMode,
        progress: ExportProgressCb,
    ) -> io::Result<()> {
        let this = self.clone();
        tokio::task::spawn_blocking(move || this.export_sync(hash, target, mode, progress)).await?
    }
}