iroh_blobs/store/
util.rs

1use std::{
2    borrow::Borrow,
3    fmt,
4    fs::{File, OpenOptions},
5    io::{self, Read, Write},
6    path::Path,
7    time::SystemTime,
8};
9
10use arrayvec::ArrayString;
11use bao_tree::{blake3, io::mixed::EncodedItem};
12use bytes::Bytes;
13use derive_more::{From, Into};
14
15mod mem_or_file;
16mod sparse_mem_file;
17use irpc::channel::mpsc;
18pub use mem_or_file::{FixedSize, MemOrFile};
19use range_collections::{range_set::RangeSetEntry, RangeSetRef};
20use ref_cast::RefCast;
21use serde::{de::DeserializeOwned, Deserialize, Serialize};
22pub use sparse_mem_file::SparseMemFile;
23pub mod observer;
24mod size_info;
25pub use size_info::SizeInfo;
26mod partial_mem_storage;
27pub use partial_mem_storage::PartialMemStorage;
28
29/// A named, persistent tag.
30#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord, From, Into)]
31pub struct Tag(pub Bytes);
32
33impl From<&[u8]> for Tag {
34    fn from(value: &[u8]) -> Self {
35        Self(Bytes::copy_from_slice(value))
36    }
37}
38
39impl AsRef<[u8]> for Tag {
40    fn as_ref(&self) -> &[u8] {
41        self.0.as_ref()
42    }
43}
44
45impl Borrow<[u8]> for Tag {
46    fn borrow(&self) -> &[u8] {
47        self.0.as_ref()
48    }
49}
50
51impl From<String> for Tag {
52    fn from(value: String) -> Self {
53        Self(Bytes::from(value))
54    }
55}
56
57impl From<&str> for Tag {
58    fn from(value: &str) -> Self {
59        Self(Bytes::from(value.to_owned()))
60    }
61}
62
63impl fmt::Display for Tag {
64    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65        let bytes = self.0.as_ref();
66        match std::str::from_utf8(bytes) {
67            Ok(s) => write!(f, "\"{s}\""),
68            Err(_) => write!(f, "{}", hex::encode(bytes)),
69        }
70    }
71}
72
73impl Tag {
74    /// Create a new tag that does not exist yet.
75    pub fn auto(time: SystemTime, exists: impl Fn(&[u8]) -> bool) -> Self {
76        let now = chrono::DateTime::<chrono::Utc>::from(time);
77        let mut i = 0;
78        loop {
79            let mut text = format!("auto-{}", now.format("%Y-%m-%dT%H:%M:%S%.3fZ"));
80            if i != 0 {
81                text.push_str(&format!("-{i}"));
82            }
83            if !exists(text.as_bytes()) {
84                return Self::from(text);
85            }
86            i += 1;
87        }
88    }
89
90    /// The successor of this tag in lexicographic order.
91    pub fn successor(&self) -> Self {
92        let mut bytes = self.0.to_vec();
93        // increment_vec(&mut bytes);
94        bytes.push(0);
95        Self(bytes.into())
96    }
97
98    /// If this is a prefix, get the next prefix.
99    ///
100    /// This is like successor, except that it will return None if the prefix is all 0xFF instead of appending a 0 byte.
101    pub fn next_prefix(&self) -> Option<Self> {
102        let mut bytes = self.0.to_vec();
103        if next_prefix(&mut bytes) {
104            Some(Self(bytes.into()))
105        } else {
106            None
107        }
108    }
109}
110
/// Adapter whose `Debug` output is the `Display` output of the wrapped value.
pub struct DD<T: fmt::Display>(pub T);

impl<T: fmt::Display> fmt::Debug for DD<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Forward to `Display` with the same formatter, so flags are preserved.
        let Self(inner) = self;
        <T as fmt::Display>::fmt(inner, f)
    }
}
118
119impl fmt::Debug for Tag {
120    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121        f.debug_tuple("Tag").field(&DD(self)).finish()
122    }
123}
124
/// Clamps the request `offset..offset + len` to a buffer of `buf_len` bytes.
///
/// Returns the empty range `0..0` when `offset` is at or beyond the end of the
/// buffer; otherwise the range starts at `offset` and ends at
/// `offset + len` or `buf_len`, whichever is smaller.
pub(crate) fn limited_range(offset: u64, len: usize, buf_len: usize) -> std::ops::Range<usize> {
    if offset >= buf_len as u64 {
        return 0..0;
    }
    // Lossless: `offset` is strictly below `buf_len`, which is a `usize`.
    let start = offset as usize;
    // Saturating add keeps a huge `len` from overflowing before the clamp.
    let end = buf_len.min(start.saturating_add(len));
    start..end
}
134
135/// zero copy get a limited slice from a `Bytes` as a `Bytes`.
136#[allow(dead_code)]
137pub(crate) fn get_limited_slice(bytes: &Bytes, offset: u64, len: usize) -> Bytes {
138    bytes.slice(limited_range(offset, len, bytes.len()))
139}
140
141mod redb_support {
142    use bytes::Bytes;
143    use redb::{Key as RedbKey, Value as RedbValue};
144
145    use super::Tag;
146
147    impl RedbValue for Tag {
148        type SelfType<'a> = Self;
149
150        type AsBytes<'a> = bytes::Bytes;
151
152        fn fixed_width() -> Option<usize> {
153            None
154        }
155
156        fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
157        where
158            Self: 'a,
159        {
160            Self(Bytes::copy_from_slice(data))
161        }
162
163        fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
164        where
165            Self: 'a,
166            Self: 'b,
167        {
168            value.0.clone()
169        }
170
171        fn type_name() -> redb::TypeName {
172            redb::TypeName::new("Tag")
173        }
174    }
175
176    impl RedbKey for Tag {
177        fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
178            data1.cmp(data2)
179        }
180    }
181}
182
/// Extension methods for range sets.
pub trait RangeSetExt<T> {
    /// The exclusive upper bound of the set, or `None` if there is none.
    fn upper_bound(&self) -> Option<T>;
}
186
187impl<T: RangeSetEntry + Clone> RangeSetExt<T> for RangeSetRef<T> {
188    /// The upper (exclusive) bound of the bitfield
189    fn upper_bound(&self) -> Option<T> {
190        let boundaries = self.boundaries();
191        if boundaries.is_empty() {
192            Some(RangeSetEntry::min_value())
193        } else if boundaries.len() % 2 == 0 {
194            Some(boundaries[boundaries.len() - 1].clone())
195        } else {
196            None
197        }
198    }
199}
200
201pub fn write_checksummed<P: AsRef<Path>, T: Serialize>(path: P, data: &T) -> io::Result<()> {
202    // Build Vec with space for hash
203    let mut buffer = Vec::with_capacity(32 + 128);
204    buffer.extend_from_slice(&[0u8; 32]);
205
206    // Serialize directly into buffer
207    postcard::to_io(data, &mut buffer).map_err(io::Error::other)?;
208
209    // Compute hash over data (skip first 32 bytes)
210    let data_slice = &buffer[32..];
211    let hash = blake3::hash(data_slice);
212    buffer[..32].copy_from_slice(hash.as_bytes());
213
214    // Write all at once
215    let mut file = File::create(&path)?;
216    file.write_all(&buffer)?;
217    file.sync_all()?;
218
219    Ok(())
220}
221
222pub fn read_checksummed_and_truncate<T: DeserializeOwned>(path: impl AsRef<Path>) -> io::Result<T> {
223    let path = path.as_ref();
224    let mut file = OpenOptions::new()
225        .read(true)
226        .write(true)
227        .truncate(false)
228        .open(path)?;
229    let mut buffer = Vec::new();
230    file.read_to_end(&mut buffer)?;
231    file.set_len(0)?;
232    file.sync_all()?;
233
234    if buffer.is_empty() {
235        return Err(io::Error::new(
236            io::ErrorKind::InvalidData,
237            "File marked dirty",
238        ));
239    }
240
241    if buffer.len() < 32 {
242        return Err(io::Error::new(io::ErrorKind::InvalidData, "File too short"));
243    }
244
245    let stored_hash = &buffer[..32];
246    let data = &buffer[32..];
247
248    let computed_hash = blake3::hash(data);
249    if computed_hash.as_bytes() != stored_hash {
250        return Err(io::Error::new(io::ErrorKind::InvalidData, "Hash mismatch"));
251    }
252
253    let deserialized = postcard::from_bytes(data).map_err(io::Error::other)?;
254
255    Ok(deserialized)
256}
257
258#[cfg(test)]
259pub fn read_checksummed<T: DeserializeOwned>(path: impl AsRef<Path>) -> io::Result<T> {
260    use tracing::info;
261
262    let path = path.as_ref();
263    let mut file = File::open(path)?;
264    let mut buffer = Vec::new();
265    file.read_to_end(&mut buffer)?;
266    info!("{} {}", path.display(), hex::encode(&buffer));
267
268    if buffer.is_empty() {
269        return Err(io::Error::new(
270            io::ErrorKind::InvalidData,
271            "File marked dirty",
272        ));
273    }
274
275    if buffer.len() < 32 {
276        return Err(io::Error::new(io::ErrorKind::InvalidData, "File too short"));
277    }
278
279    let stored_hash = &buffer[..32];
280    let data = &buffer[32..];
281
282    let computed_hash = blake3::hash(data);
283    if computed_hash.as_bytes() != stored_hash {
284        return Err(io::Error::new(io::ErrorKind::InvalidData, "Hash mismatch"));
285    }
286
287    let deserialized = postcard::from_bytes(data).map_err(io::Error::other)?;
288
289    Ok(deserialized)
290}
291
292/// Helper trait for bytes for debugging
293pub trait SliceInfoExt: AsRef<[u8]> {
294    // get the addr of the actual data, to check if data was copied
295    fn addr(&self) -> usize;
296
297    // a short symbol string for the address
298    fn addr_short(&self) -> ArrayString<12> {
299        let addr = self.addr().to_le_bytes();
300        symbol_string(&addr)
301    }
302
303    #[allow(dead_code)]
304    fn hash_short(&self) -> ArrayString<10> {
305        crate::Hash::new(self.as_ref()).fmt_short()
306    }
307}
308
309impl<T: AsRef<[u8]>> SliceInfoExt for T {
310    fn addr(&self) -> usize {
311        self.as_ref() as *const [u8] as *const u8 as usize
312    }
313
314    fn hash_short(&self) -> ArrayString<10> {
315        crate::Hash::new(self.as_ref()).fmt_short()
316    }
317}
318
319pub fn symbol_string(data: &[u8]) -> ArrayString<12> {
320    const SYMBOLS: &[char] = &[
321        '😀', '😂', '😍', '😎', '😢', '😡', '😱', '😴', '🤓', '🤔', '🤗', '🤢', '🤡', '🤖', '👽',
322        '👾', '👻', '💀', '💩', '♥', '💥', '💦', '💨', '💫', '💬', '💭', '💰', '💳', '💼', '📈',
323        '📉', '📍', '📢', '📦', '📱', '📷', '📺', '🎃', '🎄', '🎉', '🎋', '🎍', '🎒', '🎓', '🎖',
324        '🎤', '🎧', '🎮', '🎰', '🎲', '🎳', '🎴', '🎵', '🎷', '🎸', '🎹', '🎺', '🎻', '🎼', '🏀',
325        '🏁', '🏆', '🏈',
326    ];
327    const BASE: usize = SYMBOLS.len(); // 64
328
329    // Hash the input with BLAKE3
330    let hash = blake3::hash(data);
331    let bytes = hash.as_bytes(); // 32-byte hash
332
333    // Create an ArrayString with capacity 12 (bytes)
334    let mut result = ArrayString::<12>::new();
335
336    // Fill with 3 symbols
337    for byte in bytes.iter().take(3) {
338        let byte = *byte as usize;
339        let index = byte % BASE;
340        result.push(SYMBOLS[index]); // Each char can be up to 4 bytes
341    }
342
343    result
344}
345
/// Debug helper for a value that may be missing: `Some` prints as the inner
/// value itself, `None` prints as `Poisoned`.
pub struct ValueOrPoisioned<T>(pub Option<T>);

impl<T: fmt::Debug> fmt::Debug for ValueOrPoisioned<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Some(value) = &self.0 else {
            return f.debug_tuple("Poisoned").finish();
        };
        // No wrapper is printed around a present value.
        <T as fmt::Debug>::fmt(value, f)
    }
}
356
/// Given a prefix, increment it lexicographically, in place.
///
/// The last byte that is not 0xFF is incremented and every byte after it is
/// reset to 0x00; the function then returns `true`.
///
/// If the prefix is empty or consists only of 0xFF bytes there is no higher
/// prefix of the same length: all bytes are reset to 0x00 and `false` is
/// returned.
///
/// The length of the slice never changes, so wrapped bytes become 0x00 rather
/// than being dropped (`[1, 0xFF]` becomes `[2, 0]`, not `[2]`).
/// NOTE(review): `[2, 0]` sorts after `[2]`; confirm that callers using the
/// result as an exclusive scan bound tolerate this.
#[allow(dead_code)]
pub(crate) fn next_prefix(bytes: &mut [u8]) -> bool {
    match bytes.iter().rposition(|byte| *byte != u8::MAX) {
        Some(pos) => {
            bytes[pos] += 1;
            // Everything behind the incremented byte was 0xFF and wraps around.
            bytes[pos + 1..].fill(0);
            true
        }
        None => {
            bytes.fill(0);
            false
        }
    }
}
372
373#[derive(ref_cast::RefCast)]
374#[repr(transparent)]
375pub struct BaoTreeSender(mpsc::Sender<EncodedItem>);
376
377impl BaoTreeSender {
378    pub fn new(sender: &mut mpsc::Sender<EncodedItem>) -> &mut Self {
379        BaoTreeSender::ref_cast_mut(sender)
380    }
381}
382
383impl bao_tree::io::mixed::Sender for BaoTreeSender {
384    type Error = irpc::channel::SendError;
385    async fn send(&mut self, item: EncodedItem) -> std::result::Result<(), Self::Error> {
386        self.0.send(item).await
387    }
388}