iroh_blobs/
util.rs

1//! Utility functions and types.
2use std::{
3    borrow::Borrow,
4    fmt,
5    io::{BufReader, Read},
6    sync::{Arc, Weak},
7    time::SystemTime,
8};
9
10use bao_tree::{io::outboard::PreOrderOutboard, BaoTree, ChunkRanges};
11use bytes::Bytes;
12use derive_more::{Debug, Display, From, Into};
13use range_collections::range_set::RangeSetRange;
14use serde::{Deserialize, Serialize};
15
16use crate::{BlobFormat, Hash, HashAndFormat, IROH_BLOCK_SIZE};
17
18pub mod fs;
19pub mod io;
20mod mem_or_file;
21pub mod progress;
22pub use mem_or_file::MemOrFile;
23mod sparse_mem_file;
24pub use sparse_mem_file::SparseMemFile;
25pub mod local_pool;
26
27#[cfg(test)]
28pub(crate) mod hexdump;
29
/// A tag
///
/// A thin wrapper around an arbitrary byte string ([`Bytes`]). Tags compare
/// and sort lexicographically as raw bytes via the derived `Ord`.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, From, Into)]
pub struct Tag(pub Bytes);
33
#[cfg(feature = "redb")]
mod redb_support {
    //! Integration with redb: lets a [`Tag`] be stored directly in a redb
    //! table, both as a value and as a key.
    use bytes::Bytes;
    use redb::{Key as RedbKey, Value as RedbValue};

    use super::Tag;

    impl RedbValue for Tag {
        type SelfType<'a> = Self;

        type AsBytes<'a> = bytes::Bytes;

        fn fixed_width() -> Option<usize> {
            // Tags are variable-length byte strings.
            None
        }

        fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
        where
            Self: 'a,
        {
            // Copy out of the database buffer into an owned `Bytes`.
            Self(Bytes::copy_from_slice(data))
        }

        fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
        where
            Self: 'a,
            Self: 'b,
        {
            // Cheap clone of the inner `Bytes` (reference counted, no copy).
            value.0.clone()
        }

        fn type_name() -> redb::TypeName {
            redb::TypeName::new("Tag")
        }
    }

    impl RedbKey for Tag {
        fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
            // Plain lexicographic byte order, consistent with the derived
            // `Ord` on `Tag`.
            data1.cmp(data2)
        }
    }
}
76
77impl From<&[u8]> for Tag {
78    fn from(value: &[u8]) -> Self {
79        Self(Bytes::copy_from_slice(value))
80    }
81}
82
83impl AsRef<[u8]> for Tag {
84    fn as_ref(&self) -> &[u8] {
85        self.0.as_ref()
86    }
87}
88
89impl Borrow<[u8]> for Tag {
90    fn borrow(&self) -> &[u8] {
91        self.0.as_ref()
92    }
93}
94
95impl From<String> for Tag {
96    fn from(value: String) -> Self {
97        Self(Bytes::from(value))
98    }
99}
100
101impl From<&str> for Tag {
102    fn from(value: &str) -> Self {
103        Self(Bytes::from(value.to_owned()))
104    }
105}
106
107impl Display for Tag {
108    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109        let bytes = self.0.as_ref();
110        match std::str::from_utf8(bytes) {
111            Ok(s) => write!(f, "\"{}\"", s),
112            Err(_) => write!(f, "{}", hex::encode(bytes)),
113        }
114    }
115}
116
/// Adapter that renders a value's `Display` output when formatted via `Debug`.
struct DD<T: fmt::Display>(T);

impl<T: fmt::Display> fmt::Debug for DD<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Forward the formatter directly so width/precision/fill flags reach
        // the inner Display impl unchanged.
        let DD(inner) = self;
        fmt::Display::fmt(inner, f)
    }
}
124
125impl Debug for Tag {
126    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127        f.debug_tuple("Tag").field(&DD(self)).finish()
128    }
129}
130
131impl Tag {
132    /// Create a new tag that does not exist yet.
133    pub fn auto(time: SystemTime, exists: impl Fn(&[u8]) -> bool) -> Self {
134        let now = chrono::DateTime::<chrono::Utc>::from(time);
135        let mut i = 0;
136        loop {
137            let mut text = format!("auto-{}", now.format("%Y-%m-%dT%H:%M:%S%.3fZ"));
138            if i != 0 {
139                text.push_str(&format!("-{}", i));
140            }
141            if !exists(text.as_bytes()) {
142                return Self::from(text);
143            }
144            i += 1;
145        }
146    }
147
148    /// The successor of this tag in lexicographic order.
149    pub fn successor(&self) -> Self {
150        let mut bytes = self.0.to_vec();
151        // increment_vec(&mut bytes);
152        bytes.push(0);
153        Self(bytes.into())
154    }
155
156    /// If this is a prefix, get the next prefix.
157    ///
158    /// This is like successor, except that it will return None if the prefix is all 0xFF instead of appending a 0 byte.
159    pub fn next_prefix(&self) -> Option<Self> {
160        let mut bytes = self.0.to_vec();
161        if next_prefix(&mut bytes) {
162            Some(Self(bytes.into()))
163        } else {
164            None
165        }
166    }
167}
168
/// Option for commands that allow setting a tag
///
/// Lets the caller choose between an automatically generated tag name and an
/// explicitly supplied one.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum SetTagOption {
    /// A tag will be automatically generated
    Auto,
    /// The tag is explicitly named
    Named(Tag),
}
177
/// Trait used from temp tags to notify an abstract store that a temp tag is
/// being dropped.
pub trait TagDrop: std::fmt::Debug + Send + Sync + 'static {
    /// Called on drop of the temp tag that protects `inner`.
    fn on_drop(&self, inner: &HashAndFormat);
}
184
/// A trait for things that can track liveness of blobs and collections.
///
/// This trait works together with [TempTag] to keep track of the liveness of a
/// blob or collection.
///
/// It is important to include the format in the liveness tracking, since
/// protecting a collection means protecting the blob and all its children,
/// whereas protecting a raw blob only protects the blob itself.
pub trait TagCounter: TagDrop + Sized {
    /// Called on creation of a temp tag
    fn on_create(&self, inner: &HashAndFormat);

    /// Get this as a weak reference for use in temp tags
    fn as_weak(self: &Arc<Self>) -> Weak<dyn TagDrop> {
        // Coerce to an `Arc<dyn TagDrop>` trait object first, then downgrade,
        // so the resulting temp tags do not keep the counter alive.
        let on_drop: Arc<dyn TagDrop> = self.clone();
        Arc::downgrade(&on_drop)
    }

    /// Create a new temp tag for the given hash and format
    fn temp_tag(self: &Arc<Self>, inner: HashAndFormat) -> TempTag {
        // Notify the counter before handing out the tag, so the item is
        // tracked as live before the tag can possibly be dropped.
        self.on_create(&inner);
        TempTag::new(inner, Some(self.as_weak()))
    }
}
209
/// A hash and format pair that is protected from garbage collection.
///
/// If format is raw, this will protect just the blob
/// If format is collection, this will protect the collection and all blobs in it
///
/// On drop, the optional callback notifies the owning store via [`TagDrop`].
#[derive(Debug)]
pub struct TempTag {
    /// The hash and format we are pinning
    inner: HashAndFormat,
    /// optional callback to call on drop
    on_drop: Option<Weak<dyn TagDrop>>,
}
221
222impl TempTag {
223    /// Create a new temp tag for the given hash and format
224    ///
225    /// This should only be used by store implementations.
226    ///
227    /// The caller is responsible for increasing the refcount on creation and to
228    /// make sure that temp tags that are created between a mark phase and a sweep
229    /// phase are protected.
230    pub fn new(inner: HashAndFormat, on_drop: Option<Weak<dyn TagDrop>>) -> Self {
231        Self { inner, on_drop }
232    }
233
234    /// The hash of the pinned item
235    pub fn inner(&self) -> &HashAndFormat {
236        &self.inner
237    }
238
239    /// The hash of the pinned item
240    pub fn hash(&self) -> &Hash {
241        &self.inner.hash
242    }
243
244    /// The format of the pinned item
245    pub fn format(&self) -> BlobFormat {
246        self.inner.format
247    }
248
249    /// The hash and format of the pinned item
250    pub fn hash_and_format(&self) -> HashAndFormat {
251        self.inner
252    }
253
254    /// Keep the item alive until the end of the process
255    pub fn leak(mut self) {
256        // set the liveness tracker to None, so that the refcount is not decreased
257        // during drop. This means that the refcount will never reach 0 and the
258        // item will not be gced until the end of the process.
259        self.on_drop = None;
260    }
261}
262
263impl Drop for TempTag {
264    fn drop(&mut self) {
265        if let Some(on_drop) = self.on_drop.take() {
266            if let Some(on_drop) = on_drop.upgrade() {
267                on_drop.on_drop(&self.inner);
268            }
269        }
270    }
271}
272
273/// Get the number of bytes given a set of chunk ranges and the total size.
274///
275/// If some ranges are out of bounds, they will be clamped to the size.
276pub fn total_bytes(ranges: ChunkRanges, size: u64) -> u64 {
277    ranges
278        .iter()
279        .map(|range| {
280            let (start, end) = match range {
281                RangeSetRange::Range(r) => {
282                    (r.start.to_bytes().min(size), r.end.to_bytes().min(size))
283                }
284                RangeSetRange::RangeFrom(range) => (range.start.to_bytes().min(size), size),
285            };
286            end.saturating_sub(start)
287        })
288        .reduce(u64::saturating_add)
289        .unwrap_or_default()
290}
291
/// A non-sendable marker type
///
/// Zero-sized; embedding it in a struct removes the `Send` auto trait,
/// because `Rc` is not `Send`.
#[derive(Debug)]
pub(crate) struct NonSend {
    _marker: std::marker::PhantomData<std::rc::Rc<()>>,
}

impl NonSend {
    /// Create a new non-sendable marker.
    #[allow(dead_code)]
    pub const fn new() -> Self {
        NonSend { _marker: std::marker::PhantomData }
    }
}
307
308/// copy a limited slice from a slice as a `Bytes`.
309pub(crate) fn copy_limited_slice(bytes: &[u8], offset: u64, len: usize) -> Bytes {
310    bytes[limited_range(offset, len, bytes.len())]
311        .to_vec()
312        .into()
313}
314
/// Clamp the window `offset..offset + len` to a buffer of length `buf_len`.
///
/// Returns the empty range `0..0` when `offset` is at or past the end.
pub(crate) fn limited_range(offset: u64, len: usize, buf_len: usize) -> std::ops::Range<usize> {
    if offset >= buf_len as u64 {
        return 0..0;
    }
    // offset < buf_len <= usize::MAX here, so this cast cannot truncate.
    let start = offset as usize;
    let end = start.saturating_add(len).min(buf_len);
    start..end
}
324
325/// zero copy get a limited slice from a `Bytes` as a `Bytes`.
326#[allow(dead_code)]
327pub(crate) fn get_limited_slice(bytes: &Bytes, offset: u64, len: usize) -> Bytes {
328    bytes.slice(limited_range(offset, len, bytes.len()))
329}
330
331/// Compute raw outboard size, without the size header.
332#[allow(dead_code)]
333pub(crate) fn raw_outboard_size(size: u64) -> u64 {
334    BaoTree::new(size, IROH_BLOCK_SIZE).outboard_size()
335}
336
/// Given a prefix, increment it lexographically.
///
/// If the prefix is all FF, this will return false because there is no
/// higher prefix than that.
#[allow(dead_code)]
pub(crate) fn next_prefix(bytes: &mut [u8]) -> bool {
    // Walk from the last byte towards the front, carrying overflow exactly
    // like incrementing a big-endian integer.
    for i in (0..bytes.len()).rev() {
        if bytes[i] == 255 {
            bytes[i] = 0;
        } else {
            bytes[i] += 1;
            return true;
        }
    }
    // Every byte was 0xFF (or the slice was empty): no higher prefix exists.
    false
}
352
353/// Synchronously compute the outboard of a file, and return hash and outboard.
354///
355/// It is assumed that the file is not modified while this is running.
356///
357/// If it is modified while or after this is running, the outboard will be
358/// invalid, so any attempt to compute a slice from it will fail.
359///
360/// If the size of the file is changed while this is running, an error will be
361/// returned.
362///
363/// The computed outboard is without length prefix.
364pub(crate) fn compute_outboard(
365    read: impl Read,
366    size: u64,
367    progress: impl Fn(u64) -> std::io::Result<()> + Send + Sync + 'static,
368) -> std::io::Result<(Hash, Option<Vec<u8>>)> {
369    use bao_tree::io::sync::CreateOutboard;
370
371    // wrap the reader in a progress reader, so we can report progress.
372    let reader = ProgressReader::new(read, progress);
373    // wrap the reader in a buffered reader, so we read in large chunks
374    // this reduces the number of io ops and also the number of progress reports
375    let buf_size = usize::try_from(size).unwrap_or(usize::MAX).min(1024 * 1024);
376    let reader = BufReader::with_capacity(buf_size, reader);
377
378    let ob = PreOrderOutboard::<Vec<u8>>::create_sized(reader, size, IROH_BLOCK_SIZE)?;
379    let root = ob.root.into();
380    let data = ob.data;
381    tracing::trace!(%root, "done");
382    let data = if !data.is_empty() { Some(data) } else { None };
383    Ok((root, data))
384}
385
/// Compute raw outboard, without the size header.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn raw_outboard(data: &[u8]) -> (Vec<u8>, Hash) {
    let outboard = bao_tree::io::outboard::PreOrderMemOutboard::create(data, IROH_BLOCK_SIZE);
    let root = outboard.root.into();
    (outboard.data, root)
}
393
/// A reader that calls a callback with the number of bytes read after each read.
pub(crate) struct ProgressReader<R, F: Fn(u64) -> std::io::Result<()>> {
    // the wrapped reader
    inner: R,
    // running total of bytes read so far
    offset: u64,
    // progress callback, invoked with the running total after every read
    cb: F,
}

impl<R: std::io::Read, F: Fn(u64) -> std::io::Result<()>> ProgressReader<R, F> {
    /// Wrap `inner`, reporting cumulative bytes read through `cb`.
    pub fn new(inner: R, cb: F) -> Self {
        Self { inner, offset: 0, cb }
    }
}

impl<R: std::io::Read, F: Fn(u64) -> std::io::Result<()>> std::io::Read for ProgressReader<R, F> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let n = self.inner.read(buf)?;
        // Accumulate before reporting so the callback sees the running total.
        self.offset += n as u64;
        (self.cb)(self.offset)?;
        Ok(n)
    }
}