1use std::{
5 collections::{BTreeMap, BTreeSet, HashMap},
6 future::Future,
7 io,
8 path::PathBuf,
9 sync::Arc,
10};
11
12use bao_tree::{
13 blake3,
14 io::{outboard::PreOrderMemOutboard, sync::Outboard},
15};
16use bytes::Bytes;
17use futures_lite::Stream;
18use iroh_io::AsyncSliceReader;
19use tokio::io::AsyncWriteExt;
20
21use super::{BaoBatchWriter, BaoBlobSize, ConsistencyCheckProgress, DbIter, ExportProgressCb};
22use crate::{
23 store::{
24 EntryStatus, ExportMode, ImportMode, ImportProgress, Map, MapEntry, MapEntryMut,
25 ReadableStore,
26 },
27 util::{
28 progress::{BoxedProgressSender, IdGenerator, ProgressSender},
29 Tag,
30 },
31 BlobFormat, Hash, HashAndFormat, TempTag, IROH_BLOCK_SIZE,
32};
33
34#[derive(Debug, Clone, Default)]
41pub struct Store(Arc<HashMap<Hash, (PreOrderMemOutboard<Bytes>, Bytes)>>);
42
43impl<K, V> FromIterator<(K, V)> for Store
44where
45 K: Into<String>,
46 V: AsRef<[u8]>,
47{
48 fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
49 let (db, _m) = Self::new(iter);
50 db
51 }
52}
53
54impl Store {
55 pub fn new(
60 entries: impl IntoIterator<Item = (impl Into<String>, impl AsRef<[u8]>)>,
61 ) -> (Self, BTreeMap<String, blake3::Hash>) {
62 let mut names = BTreeMap::new();
63 let mut res = HashMap::new();
64 for (name, data) in entries.into_iter() {
65 let name = name.into();
66 let data: &[u8] = data.as_ref();
67 let outboard = PreOrderMemOutboard::create(data, IROH_BLOCK_SIZE).map_data(Bytes::from);
69 let hash = outboard.root();
70 names.insert(name, hash);
72 let data = Bytes::from(data.to_vec());
73 let hash = Hash::from(hash);
74 res.insert(hash, (outboard, data));
75 }
76 (Self(Arc::new(res)), names)
77 }
78
79 pub fn insert(&mut self, data: impl AsRef<[u8]>) -> Hash {
83 let inner = Arc::make_mut(&mut self.0);
84 let data: &[u8] = data.as_ref();
85 let outboard = PreOrderMemOutboard::create(data, IROH_BLOCK_SIZE).map_data(Bytes::from);
87 let hash = outboard.root();
88 let data = Bytes::from(data.to_vec());
89 let hash = Hash::from(hash);
90 inner.insert(hash, (outboard, data));
91 hash
92 }
93
94 pub fn insert_many(
96 &mut self,
97 items: impl IntoIterator<Item = impl AsRef<[u8]>>,
98 ) -> Option<Hash> {
99 let mut hash = None;
100 for item in items.into_iter() {
101 hash = Some(self.insert(item));
102 }
103 hash
104 }
105
106 pub fn get_content(&self, hash: &Hash) -> Option<Bytes> {
108 let entry = self.0.get(hash)?;
109 Some(entry.1.clone())
110 }
111
112 async fn export_impl(
113 &self,
114 hash: Hash,
115 target: PathBuf,
116 _mode: ExportMode,
117 progress: impl Fn(u64) -> io::Result<()> + Send + Sync + 'static,
118 ) -> io::Result<()> {
119 tracing::trace!("exporting {} to {}", hash, target.display());
120
121 if !target.is_absolute() {
122 return Err(io::Error::new(
123 io::ErrorKind::InvalidInput,
124 "target path must be absolute",
125 ));
126 }
127 let parent = target.parent().ok_or_else(|| {
128 io::Error::new(
129 io::ErrorKind::InvalidInput,
130 "target path has no parent directory",
131 )
132 })?;
133 tokio::fs::create_dir_all(parent).await?;
135 let data = self
136 .get_content(&hash)
137 .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "hash not found"))?;
138
139 let mut offset = 0u64;
140 let mut file = tokio::fs::File::create(&target).await?;
141 for chunk in data.chunks(1024 * 1024) {
142 progress(offset)?;
143 file.write_all(chunk).await?;
144 offset += chunk.len() as u64;
145 }
146 file.sync_all().await?;
147 drop(file);
148 Ok(())
149 }
150}
151
152#[derive(Debug, Clone)]
154pub struct Entry {
155 outboard: PreOrderMemOutboard<Bytes>,
156 data: Bytes,
157}
158
159impl MapEntry for Entry {
160 fn hash(&self) -> Hash {
161 self.outboard.root().into()
162 }
163
164 fn size(&self) -> BaoBlobSize {
165 BaoBlobSize::Verified(self.data.len() as u64)
166 }
167
168 async fn outboard(&self) -> io::Result<impl bao_tree::io::fsm::Outboard> {
169 Ok(self.outboard.clone())
170 }
171
172 async fn data_reader(&self) -> io::Result<impl AsyncSliceReader> {
173 Ok(self.data.clone())
174 }
175
176 fn is_complete(&self) -> bool {
177 true
178 }
179}
180
181impl Map for Store {
182 type Entry = Entry;
183
184 async fn get(&self, hash: &Hash) -> io::Result<Option<Self::Entry>> {
185 Ok(self.0.get(hash).map(|(o, d)| Entry {
186 outboard: o.clone(),
187 data: d.clone(),
188 }))
189 }
190}
191
192impl super::MapMut for Store {
193 type EntryMut = Entry;
194
195 async fn get_mut(&self, hash: &Hash) -> io::Result<Option<Self::EntryMut>> {
196 self.get(hash).await
197 }
198
199 async fn get_or_create(&self, _hash: Hash, _size: u64) -> io::Result<Entry> {
200 Err(io::Error::new(
201 io::ErrorKind::Other,
202 "cannot create temp entry in readonly database",
203 ))
204 }
205
206 fn entry_status_sync(&self, hash: &Hash) -> io::Result<EntryStatus> {
207 Ok(match self.0.contains_key(hash) {
208 true => EntryStatus::Complete,
209 false => EntryStatus::NotFound,
210 })
211 }
212
213 async fn entry_status(&self, hash: &Hash) -> io::Result<EntryStatus> {
214 self.entry_status_sync(hash)
215 }
216
217 async fn insert_complete(&self, _entry: Entry) -> io::Result<()> {
218 unreachable!()
220 }
221}
222
223impl ReadableStore for Store {
224 async fn blobs(&self) -> io::Result<DbIter<Hash>> {
225 Ok(Box::new(
226 self.0
227 .keys()
228 .copied()
229 .map(Ok)
230 .collect::<Vec<_>>()
231 .into_iter(),
232 ))
233 }
234
235 async fn tags(&self) -> io::Result<DbIter<(Tag, HashAndFormat)>> {
236 Ok(Box::new(std::iter::empty()))
237 }
238
239 fn temp_tags(&self) -> Box<dyn Iterator<Item = HashAndFormat> + Send + Sync + 'static> {
240 Box::new(std::iter::empty())
241 }
242
243 async fn consistency_check(
244 &self,
245 _repair: bool,
246 _tx: BoxedProgressSender<ConsistencyCheckProgress>,
247 ) -> io::Result<()> {
248 Ok(())
249 }
250
251 async fn export(
252 &self,
253 hash: Hash,
254 target: PathBuf,
255 mode: ExportMode,
256 progress: ExportProgressCb,
257 ) -> io::Result<()> {
258 self.export_impl(hash, target, mode, progress).await
259 }
260
261 async fn partial_blobs(&self) -> io::Result<DbIter<Hash>> {
262 Ok(Box::new(std::iter::empty()))
263 }
264}
265
266impl MapEntryMut for Entry {
267 async fn batch_writer(&self) -> io::Result<impl BaoBatchWriter> {
268 enum Bar {}
269 impl BaoBatchWriter for Bar {
270 async fn write_batch(
271 &mut self,
272 _size: u64,
273 _batch: Vec<bao_tree::io::fsm::BaoContentItem>,
274 ) -> io::Result<()> {
275 unreachable!()
276 }
277
278 async fn sync(&mut self) -> io::Result<()> {
279 unreachable!()
280 }
281 }
282
283 #[allow(unreachable_code)]
284 Ok(unreachable!() as Bar)
285 }
286}
287
288impl super::Store for Store {
289 async fn import_file(
290 &self,
291 data: PathBuf,
292 mode: ImportMode,
293 format: BlobFormat,
294 progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
295 ) -> io::Result<(TempTag, u64)> {
296 let _ = (data, mode, progress, format);
297 Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
298 }
299
300 async fn import_bytes(&self, bytes: Bytes, format: BlobFormat) -> io::Result<TempTag> {
302 let _ = (bytes, format);
303 Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
304 }
305
306 async fn import_stream(
307 &self,
308 data: impl Stream<Item = io::Result<Bytes>> + Unpin + Send,
309 format: BlobFormat,
310 progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
311 ) -> io::Result<(TempTag, u64)> {
312 let _ = (data, format, progress);
313 Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
314 }
315
316 async fn set_tag(&self, _name: Tag, _hash: Option<HashAndFormat>) -> io::Result<()> {
317 Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
318 }
319
320 async fn create_tag(&self, _hash: HashAndFormat) -> io::Result<Tag> {
321 Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
322 }
323
324 fn temp_tag(&self, inner: HashAndFormat) -> TempTag {
325 TempTag::new(inner, None)
326 }
327
328 async fn gc_run<G, Gut>(&self, config: super::GcConfig, protected_cb: G)
329 where
330 G: Fn() -> Gut,
331 Gut: Future<Output = BTreeSet<Hash>> + Send,
332 {
333 super::gc_run_loop(self, config, move || async { Ok(()) }, protected_cb).await
334 }
335
336 async fn delete(&self, _hashes: Vec<Hash>) -> io::Result<()> {
337 Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
338 }
339
340 async fn shutdown(&self) {}
341
342 async fn sync(&self) -> io::Result<()> {
343 Ok(())
344 }
345}