iroh_blobs/store/
util.rs

1use std::{borrow::Borrow, fmt, time::SystemTime};
2
3use bao_tree::io::mixed::EncodedItem;
4use bytes::Bytes;
5use derive_more::{From, Into};
6
7mod sparse_mem_file;
8use irpc::channel::mpsc;
9use range_collections::{range_set::RangeSetEntry, RangeSetRef};
10use ref_cast::RefCast;
11use serde::{Deserialize, Serialize};
12pub use sparse_mem_file::SparseMemFile;
13pub mod observer;
14mod size_info;
15pub use size_info::SizeInfo;
16mod partial_mem_storage;
17pub use partial_mem_storage::PartialMemStorage;
18
19#[cfg(feature = "fs-store")]
20mod mem_or_file;
21#[cfg(feature = "fs-store")]
22pub use mem_or_file::{FixedSize, MemOrFile};
23
/// A named, persistent tag.
///
/// The name is kept as raw bytes in the wrapped [`Bytes`]. It can be built
/// from strings as well as from arbitrary byte slices, so it is not required
/// to be valid UTF-8.
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord, From, Into)]
pub struct Tag(pub Bytes);
27
28impl From<&[u8]> for Tag {
29    fn from(value: &[u8]) -> Self {
30        Self(Bytes::copy_from_slice(value))
31    }
32}
33
34impl AsRef<[u8]> for Tag {
35    fn as_ref(&self) -> &[u8] {
36        self.0.as_ref()
37    }
38}
39
40impl Borrow<[u8]> for Tag {
41    fn borrow(&self) -> &[u8] {
42        self.0.as_ref()
43    }
44}
45
46impl From<String> for Tag {
47    fn from(value: String) -> Self {
48        Self(Bytes::from(value))
49    }
50}
51
52impl From<&str> for Tag {
53    fn from(value: &str) -> Self {
54        Self(Bytes::from(value.to_owned()))
55    }
56}
57
58impl fmt::Display for Tag {
59    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60        let bytes = self.0.as_ref();
61        match std::str::from_utf8(bytes) {
62            Ok(s) => write!(f, "\"{s}\""),
63            Err(_) => write!(f, "{}", hex::encode(bytes)),
64        }
65    }
66}
67
68impl Tag {
69    /// Create a new tag that does not exist yet.
70    pub fn auto(time: SystemTime, exists: impl Fn(&[u8]) -> bool) -> Self {
71        let now = chrono::DateTime::<chrono::Utc>::from(time);
72        let mut i = 0;
73        loop {
74            let mut text = format!("auto-{}", now.format("%Y-%m-%dT%H:%M:%S%.3fZ"));
75            if i != 0 {
76                text.push_str(&format!("-{i}"));
77            }
78            if !exists(text.as_bytes()) {
79                return Self::from(text);
80            }
81            i += 1;
82        }
83    }
84
85    /// The successor of this tag in lexicographic order.
86    pub fn successor(&self) -> Self {
87        let mut bytes = self.0.to_vec();
88        // increment_vec(&mut bytes);
89        bytes.push(0);
90        Self(bytes.into())
91    }
92
93    /// If this is a prefix, get the next prefix.
94    ///
95    /// This is like successor, except that it will return None if the prefix is all 0xFF instead of appending a 0 byte.
96    pub fn next_prefix(&self) -> Option<Self> {
97        let mut bytes = self.0.to_vec();
98        if next_prefix(&mut bytes) {
99            Some(Self(bytes.into()))
100        } else {
101            None
102        }
103    }
104}
105
/// Adapter that makes a value's `Display` output available through `Debug`.
///
/// Useful for debug builders that only accept `Debug` fields.
pub struct DD<T: fmt::Display>(pub T);

impl<T: fmt::Display> fmt::Debug for DD<T> {
    /// Forwards to the `Display` impl of the wrapped value, using the same formatter.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(inner) = self;
        <T as fmt::Display>::fmt(inner, f)
    }
}
113
114impl fmt::Debug for Tag {
115    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116        f.debug_tuple("Tag").field(&DD(self)).finish()
117    }
118}
119
/// Clamp the window `offset..offset + len` to a buffer of `buf_len` bytes.
///
/// Returns the empty range `0..0` when `offset` is at or past the end of the
/// buffer. Otherwise the range starts at `offset` and ends at `offset + len`
/// or at `buf_len`, whichever is smaller.
pub(crate) fn limited_range(offset: u64, len: usize, buf_len: usize) -> std::ops::Range<usize> {
    if offset >= buf_len as u64 {
        return 0..0;
    }
    // Lossless: offset is below buf_len, which is a usize.
    let start = offset as usize;
    let end = buf_len.min(start.saturating_add(len));
    start..end
}
129
130/// zero copy get a limited slice from a `Bytes` as a `Bytes`.
131#[allow(dead_code)]
132pub(crate) fn get_limited_slice(bytes: &Bytes, offset: u64, len: usize) -> Bytes {
133    bytes.slice(limited_range(offset, len, bytes.len()))
134}
135
/// Extension trait for querying the upper bound of a range set.
pub trait RangeSetExt<T> {
    /// The upper (exclusive) bound of the set, or `None` if there is none.
    fn upper_bound(&self) -> Option<T>;
}
139
140impl<T: RangeSetEntry + Clone> RangeSetExt<T> for RangeSetRef<T> {
141    /// The upper (exclusive) bound of the bitfield
142    fn upper_bound(&self) -> Option<T> {
143        let boundaries = self.boundaries();
144        if boundaries.is_empty() {
145            Some(RangeSetEntry::min_value())
146        } else if boundaries.len() % 2 == 0 {
147            Some(boundaries[boundaries.len() - 1].clone())
148        } else {
149            None
150        }
151    }
152}
153
#[cfg(feature = "fs-store")]
mod fs {
    use std::{
        fmt,
        fs::{File, OpenOptions},
        io::{self, Read, Write},
        path::Path,
    };

    use arrayvec::ArrayString;
    use bao_tree::blake3;
    use serde::{de::DeserializeOwned, Serialize};

    /// Size in bytes of the BLAKE3 hash stored at the start of a checksummed file.
    const CHECKSUM_LEN: usize = 32;

    /// Support for storing [`Tag`](super::Tag) as key and value in redb tables.
    mod redb_support {
        use bytes::Bytes;
        use redb::{Key as RedbKey, Value as RedbValue};

        use super::super::Tag;

        /// A tag is stored as its raw, variable length name bytes.
        impl RedbValue for Tag {
            type SelfType<'a> = Self;

            type AsBytes<'a> = bytes::Bytes;

            fn fixed_width() -> Option<usize> {
                None
            }

            fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
            where
                Self: 'a,
            {
                // Copy, since the returned tag owns its bytes.
                Self(Bytes::copy_from_slice(data))
            }

            fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
            where
                Self: 'a,
                Self: 'b,
            {
                value.0.clone()
            }

            fn type_name() -> redb::TypeName {
                redb::TypeName::new("Tag")
            }
        }

        /// Keys are ordered by plain comparison of their bytes.
        impl RedbKey for Tag {
            fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
                data1.cmp(data2)
            }
        }
    }

    /// Serialize `data` with postcard and write it to `path`, prefixed with a checksum.
    ///
    /// File layout: 32 bytes BLAKE3 hash of the payload, followed by the
    /// postcard encoded payload. The file is created (or truncated if it
    /// exists), written in a single call and synced before returning.
    ///
    /// # Errors
    ///
    /// Returns an error if serialization fails or if any of the file
    /// operations fail.
    pub fn write_checksummed<P: AsRef<Path>, T: Serialize>(path: P, data: &T) -> io::Result<()> {
        // Reserve room for the hash up front, it is filled in once the payload is known.
        let mut buffer = Vec::with_capacity(CHECKSUM_LEN + 128);
        buffer.extend_from_slice(&[0u8; CHECKSUM_LEN]);

        // Serialize directly behind the hash placeholder.
        postcard::to_io(data, &mut buffer).map_err(io::Error::other)?;

        // The hash covers the payload only.
        let hash = blake3::hash(&buffer[CHECKSUM_LEN..]);
        buffer[..CHECKSUM_LEN].copy_from_slice(hash.as_bytes());

        let mut file = File::create(&path)?;
        file.write_all(&buffer)?;
        file.sync_all()?;

        Ok(())
    }

    /// Validate and decode the contents of a checksummed file.
    ///
    /// `buffer` is the complete file content as written by [`write_checksummed`].
    ///
    /// # Errors
    ///
    /// Returns an `InvalidData` error if the buffer is empty ("File marked
    /// dirty"), shorter than the hash ("File too short") or if the stored hash
    /// does not match the payload ("Hash mismatch"). Deserialization failures
    /// are returned as well.
    fn decode_checksummed<T: DeserializeOwned>(buffer: &[u8]) -> io::Result<T> {
        if buffer.is_empty() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "File marked dirty",
            ));
        }

        if buffer.len() < CHECKSUM_LEN {
            return Err(io::Error::new(io::ErrorKind::InvalidData, "File too short"));
        }

        let (stored_hash, data) = buffer.split_at(CHECKSUM_LEN);

        let computed_hash = blake3::hash(data);
        if computed_hash.as_bytes() != stored_hash {
            return Err(io::Error::new(io::ErrorKind::InvalidData, "Hash mismatch"));
        }

        postcard::from_bytes(data).map_err(io::Error::other)
    }

    /// Read a checksummed file and truncate it to zero length.
    ///
    /// The whole file is read first, then the file is truncated and synced,
    /// and only afterwards the content is validated and decoded. This means
    /// the file is empty after this call even if validation fails. Reading an
    /// empty file fails with "File marked dirty".
    ///
    /// # Errors
    ///
    /// Returns an error if the file can not be opened, read, truncated or
    /// synced, or if the content is not a valid checksummed payload.
    pub fn read_checksummed_and_truncate<T: DeserializeOwned>(
        path: impl AsRef<Path>,
    ) -> io::Result<T> {
        let path = path.as_ref();
        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .truncate(false)
            .open(path)?;
        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer)?;
        file.set_len(0)?;
        file.sync_all()?;

        decode_checksummed(&buffer)
    }

    /// Read a checksummed file without modifying it, logging its content as hex.
    ///
    /// Test only counterpart of [`read_checksummed_and_truncate`].
    #[cfg(test)]
    pub fn read_checksummed<T: DeserializeOwned>(path: impl AsRef<Path>) -> io::Result<T> {
        use tracing::info;

        let path = path.as_ref();
        let mut file = File::open(path)?;
        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer)?;
        info!("{} {}", path.display(), hex::encode(&buffer));

        decode_checksummed(&buffer)
    }

    /// Helper trait for bytes for debugging
    pub trait SliceInfoExt: AsRef<[u8]> {
        /// Get the addr of the actual data, to check if data was copied.
        fn addr(&self) -> usize;

        /// A short symbol string for the address.
        fn addr_short(&self) -> ArrayString<12> {
            let addr = self.addr().to_le_bytes();
            symbol_string(&addr)
        }

        /// The short form of the hash of the data.
        #[allow(dead_code)]
        fn hash_short(&self) -> ArrayString<10> {
            crate::Hash::new(self.as_ref()).fmt_short()
        }
    }

    // `hash_short` is not overridden here, the default from the trait applies.
    impl<T: AsRef<[u8]>> SliceInfoExt for T {
        fn addr(&self) -> usize {
            let bytes: &[u8] = self.as_ref();
            bytes.as_ptr() as usize
        }
    }

    /// Map arbitrary data to a string of 3 symbols, for easy visual comparison.
    ///
    /// The symbols are selected by the first 3 bytes of the BLAKE3 hash of
    /// `data`, so equal inputs always produce the same string.
    pub fn symbol_string(data: &[u8]) -> ArrayString<12> {
        const SYMBOLS: &[char] = &[
            '😀', '😂', '😍', '😎', '😢', '😡', '😱', '😴', '🤓', '🤔', '🤗', '🤢', '🤡', '🤖',
            '👽', '👾', '👻', '💀', '💩', '♥', '💥', '💦', '💨', '💫', '💬', '💭', '💰', '💳',
            '💼', '📈', '📉', '📍', '📢', '📦', '📱', '📷', '📺', '🎃', '🎄', '🎉', '🎋', '🎍',
            '🎒', '🎓', '🎖', '🎤', '🎧', '🎮', '🎰', '🎲', '🎳', '🎴', '🎵', '🎷', '🎸', '🎹',
            '🎺', '🎻', '🎼', '🏀', '🏁', '🏆', '🏈',
        ];
        const BASE: usize = SYMBOLS.len(); // 63

        // Hash the input with BLAKE3
        let hash = blake3::hash(data);
        let bytes = hash.as_bytes(); // 32-byte hash

        // Capacity is 12 bytes: 3 symbols of at most 4 bytes each in UTF-8.
        let mut result = ArrayString::<12>::new();

        for &byte in bytes.iter().take(3) {
            let index = usize::from(byte) % BASE;
            result.push(SYMBOLS[index]);
        }

        result
    }

    /// Debug helper that prints the wrapped value, or `Poisoned` if there is none.
    pub struct ValueOrPoisioned<T>(pub Option<T>);

    impl<T: fmt::Debug> fmt::Debug for ValueOrPoisioned<T> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match &self.0 {
                Some(x) => x.fmt(f),
                None => f.debug_tuple("Poisoned").finish(),
            }
        }
    }
}
372#[cfg(feature = "fs-store")]
373pub use fs::*;
374
/// Given a prefix, increment it lexicographically, in place.
///
/// The slice is treated as a fixed width big endian number: the last byte is
/// incremented, and a byte that is already 0xFF wraps to 0 and carries into
/// the byte before it.
///
/// Returns `true` if a byte could be incremented. If the prefix is empty or
/// all FF, this returns `false` because there is no higher prefix than that;
/// all bytes have been reset to 0 in that case.
#[allow(dead_code)]
pub(crate) fn next_prefix(bytes: &mut [u8]) -> bool {
    let mut idx = bytes.len();
    while idx > 0 {
        idx -= 1;
        match bytes[idx].checked_add(1) {
            Some(bumped) => {
                bytes[idx] = bumped;
                return true;
            }
            // 0xFF: wrap around and carry into the next more significant byte
            None => bytes[idx] = 0,
        }
    }
    false
}
390
391#[derive(ref_cast::RefCast)]
392#[repr(transparent)]
393pub struct BaoTreeSender(mpsc::Sender<EncodedItem>);
394
395impl BaoTreeSender {
396    pub fn new(sender: &mut mpsc::Sender<EncodedItem>) -> &mut Self {
397        BaoTreeSender::ref_cast_mut(sender)
398    }
399}
400
401impl bao_tree::io::mixed::Sender for BaoTreeSender {
402    type Error = irpc::channel::SendError;
403    async fn send(&mut self, item: EncodedItem) -> std::result::Result<(), Self::Error> {
404        self.0.send(item).await
405    }
406}