1use std::{
5 collections::{BTreeMap, BTreeSet, HashMap},
6 future::Future,
7 io,
8 path::PathBuf,
9 sync::Arc,
10};
11
12use bao_tree::{
13 blake3,
14 io::{outboard::PreOrderMemOutboard, sync::Outboard},
15};
16use bytes::Bytes;
17use futures_lite::Stream;
18use iroh_io::AsyncSliceReader;
19use tokio::io::AsyncWriteExt;
20
21use super::{BaoBatchWriter, BaoBlobSize, ConsistencyCheckProgress, DbIter, ExportProgressCb};
22use crate::{
23 store::{
24 EntryStatus, ExportMode, ImportMode, ImportProgress, Map, MapEntry, MapEntryMut,
25 ReadableStore,
26 },
27 util::{
28 progress::{BoxedProgressSender, IdGenerator, ProgressSender},
29 Tag,
30 },
31 BlobFormat, Hash, HashAndFormat, TempTag, IROH_BLOCK_SIZE,
32};
33
34#[derive(Debug, Clone, Default)]
41pub struct Store(Arc<HashMap<Hash, (PreOrderMemOutboard<Bytes>, Bytes)>>);
42
43impl<K, V> FromIterator<(K, V)> for Store
44where
45 K: Into<String>,
46 V: AsRef<[u8]>,
47{
48 fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
49 let (db, _m) = Self::new(iter);
50 db
51 }
52}
53
54impl Store {
55 pub fn new(
60 entries: impl IntoIterator<Item = (impl Into<String>, impl AsRef<[u8]>)>,
61 ) -> (Self, BTreeMap<String, blake3::Hash>) {
62 let mut names = BTreeMap::new();
63 let mut res = HashMap::new();
64 for (name, data) in entries.into_iter() {
65 let name = name.into();
66 let data: &[u8] = data.as_ref();
67 let outboard = PreOrderMemOutboard::create(data, IROH_BLOCK_SIZE).map_data(Bytes::from);
69 let hash = outboard.root();
70 names.insert(name, hash);
72 let data = Bytes::from(data.to_vec());
73 let hash = Hash::from(hash);
74 res.insert(hash, (outboard, data));
75 }
76 (Self(Arc::new(res)), names)
77 }
78
79 pub fn insert(&mut self, data: impl AsRef<[u8]>) -> Hash {
83 let inner = Arc::make_mut(&mut self.0);
84 let data: &[u8] = data.as_ref();
85 let outboard = PreOrderMemOutboard::create(data, IROH_BLOCK_SIZE).map_data(Bytes::from);
87 let hash = outboard.root();
88 let data = Bytes::from(data.to_vec());
89 let hash = Hash::from(hash);
90 inner.insert(hash, (outboard, data));
91 hash
92 }
93
94 pub fn insert_many(
96 &mut self,
97 items: impl IntoIterator<Item = impl AsRef<[u8]>>,
98 ) -> Option<Hash> {
99 let mut hash = None;
100 for item in items.into_iter() {
101 hash = Some(self.insert(item));
102 }
103 hash
104 }
105
106 pub fn get_content(&self, hash: &Hash) -> Option<Bytes> {
108 let entry = self.0.get(hash)?;
109 Some(entry.1.clone())
110 }
111
112 async fn export_impl(
113 &self,
114 hash: Hash,
115 target: PathBuf,
116 _mode: ExportMode,
117 progress: impl Fn(u64) -> io::Result<()> + Send + Sync + 'static,
118 ) -> io::Result<()> {
119 tracing::trace!("exporting {} to {}", hash, target.display());
120
121 if !target.is_absolute() {
122 return Err(io::Error::new(
123 io::ErrorKind::InvalidInput,
124 "target path must be absolute",
125 ));
126 }
127 let parent = target.parent().ok_or_else(|| {
128 io::Error::new(
129 io::ErrorKind::InvalidInput,
130 "target path has no parent directory",
131 )
132 })?;
133 tokio::fs::create_dir_all(parent).await?;
135 let data = self
136 .get_content(&hash)
137 .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "hash not found"))?;
138
139 let mut offset = 0u64;
140 let mut file = tokio::fs::File::create(&target).await?;
141 for chunk in data.chunks(1024 * 1024) {
142 progress(offset)?;
143 file.write_all(chunk).await?;
144 offset += chunk.len() as u64;
145 }
146 file.sync_all().await?;
147 drop(file);
148 Ok(())
149 }
150}
151
152#[derive(Debug, Clone)]
154pub struct Entry {
155 outboard: PreOrderMemOutboard<Bytes>,
156 data: Bytes,
157}
158
159impl MapEntry for Entry {
160 fn hash(&self) -> Hash {
161 self.outboard.root().into()
162 }
163
164 fn size(&self) -> BaoBlobSize {
165 BaoBlobSize::Verified(self.data.len() as u64)
166 }
167
168 async fn outboard(&self) -> io::Result<impl bao_tree::io::fsm::Outboard> {
169 Ok(self.outboard.clone())
170 }
171
172 async fn data_reader(&self) -> io::Result<impl AsyncSliceReader> {
173 Ok(self.data.clone())
174 }
175
176 fn is_complete(&self) -> bool {
177 true
178 }
179}
180
181impl Map for Store {
182 type Entry = Entry;
183
184 async fn get(&self, hash: &Hash) -> io::Result<Option<Self::Entry>> {
185 Ok(self.0.get(hash).map(|(o, d)| Entry {
186 outboard: o.clone(),
187 data: d.clone(),
188 }))
189 }
190}
191
192impl super::MapMut for Store {
193 type EntryMut = Entry;
194
195 async fn get_mut(&self, hash: &Hash) -> io::Result<Option<Self::EntryMut>> {
196 self.get(hash).await
197 }
198
199 async fn get_or_create(&self, _hash: Hash, _size: u64) -> io::Result<Entry> {
200 Err(io::Error::new(
201 io::ErrorKind::Other,
202 "cannot create temp entry in readonly database",
203 ))
204 }
205
206 fn entry_status_sync(&self, hash: &Hash) -> io::Result<EntryStatus> {
207 Ok(match self.0.contains_key(hash) {
208 true => EntryStatus::Complete,
209 false => EntryStatus::NotFound,
210 })
211 }
212
213 async fn entry_status(&self, hash: &Hash) -> io::Result<EntryStatus> {
214 self.entry_status_sync(hash)
215 }
216
217 async fn insert_complete(&self, _entry: Entry) -> io::Result<()> {
218 unreachable!()
220 }
221}
222
223impl ReadableStore for Store {
224 async fn blobs(&self) -> io::Result<DbIter<Hash>> {
225 Ok(Box::new(
226 self.0
227 .keys()
228 .copied()
229 .map(Ok)
230 .collect::<Vec<_>>()
231 .into_iter(),
232 ))
233 }
234
235 async fn tags(
236 &self,
237 _from: Option<Tag>,
238 _to: Option<Tag>,
239 ) -> io::Result<DbIter<(Tag, HashAndFormat)>> {
240 Ok(Box::new(std::iter::empty()))
241 }
242
243 fn temp_tags(&self) -> Box<dyn Iterator<Item = HashAndFormat> + Send + Sync + 'static> {
244 Box::new(std::iter::empty())
245 }
246
247 async fn consistency_check(
248 &self,
249 _repair: bool,
250 _tx: BoxedProgressSender<ConsistencyCheckProgress>,
251 ) -> io::Result<()> {
252 Ok(())
253 }
254
255 async fn export(
256 &self,
257 hash: Hash,
258 target: PathBuf,
259 mode: ExportMode,
260 progress: ExportProgressCb,
261 ) -> io::Result<()> {
262 self.export_impl(hash, target, mode, progress).await
263 }
264
265 async fn partial_blobs(&self) -> io::Result<DbIter<Hash>> {
266 Ok(Box::new(std::iter::empty()))
267 }
268}
269
270impl MapEntryMut for Entry {
271 async fn batch_writer(&self) -> io::Result<impl BaoBatchWriter> {
272 enum Bar {}
273 impl BaoBatchWriter for Bar {
274 async fn write_batch(
275 &mut self,
276 _size: u64,
277 _batch: Vec<bao_tree::io::fsm::BaoContentItem>,
278 ) -> io::Result<()> {
279 unreachable!()
280 }
281
282 async fn sync(&mut self) -> io::Result<()> {
283 unreachable!()
284 }
285 }
286
287 #[allow(unreachable_code)]
288 Ok(unreachable!() as Bar)
289 }
290}
291
292impl super::Store for Store {
293 async fn import_file(
294 &self,
295 data: PathBuf,
296 mode: ImportMode,
297 format: BlobFormat,
298 progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
299 ) -> io::Result<(TempTag, u64)> {
300 let _ = (data, mode, progress, format);
301 Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
302 }
303
304 async fn import_bytes(&self, bytes: Bytes, format: BlobFormat) -> io::Result<TempTag> {
306 let _ = (bytes, format);
307 Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
308 }
309
310 async fn rename_tag(&self, _from: Tag, _to: Tag) -> io::Result<()> {
311 Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
312 }
313
314 async fn import_stream(
315 &self,
316 data: impl Stream<Item = io::Result<Bytes>> + Unpin + Send,
317 format: BlobFormat,
318 progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
319 ) -> io::Result<(TempTag, u64)> {
320 let _ = (data, format, progress);
321 Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
322 }
323
324 async fn set_tag(&self, _name: Tag, _hash: HashAndFormat) -> io::Result<()> {
325 Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
326 }
327
328 async fn delete_tags(&self, _from: Option<Tag>, _to: Option<Tag>) -> io::Result<()> {
329 Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
330 }
331
332 async fn create_tag(&self, _hash: HashAndFormat) -> io::Result<Tag> {
333 Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
334 }
335
336 fn temp_tag(&self, inner: HashAndFormat) -> TempTag {
337 TempTag::new(inner, None)
338 }
339
340 async fn gc_run<G, Gut>(&self, config: super::GcConfig, protected_cb: G)
341 where
342 G: Fn() -> Gut,
343 Gut: Future<Output = BTreeSet<Hash>> + Send,
344 {
345 super::gc_run_loop(self, config, move || async { Ok(()) }, protected_cb).await
346 }
347
348 async fn delete(&self, _hashes: Vec<Hash>) -> io::Result<()> {
349 Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
350 }
351
352 async fn shutdown(&self) {}
353
354 async fn sync(&self) -> io::Result<()> {
355 Ok(())
356 }
357}