iroh_blobs/
util.rs

1//! Utility functions and types.
2use std::{
3    borrow::Borrow,
4    fmt,
5    io::{BufReader, Read},
6    sync::{Arc, Weak},
7    time::SystemTime,
8};
9
10use bao_tree::{io::outboard::PreOrderOutboard, BaoTree, ChunkRanges};
11use bytes::Bytes;
12use derive_more::{Debug, Display, From, Into};
13use range_collections::range_set::RangeSetRange;
14use serde::{Deserialize, Serialize};
15
16use crate::{BlobFormat, Hash, HashAndFormat, IROH_BLOCK_SIZE};
17
18pub mod fs;
19pub mod io;
20mod mem_or_file;
21pub mod progress;
22pub use mem_or_file::MemOrFile;
23mod sparse_mem_file;
24pub use sparse_mem_file::SparseMemFile;
25pub mod local_pool;
26
27#[cfg(test)]
28pub(crate) mod hexdump;
29
/// A tag: a newtype wrapping a [`Bytes`] buffer.
///
/// The wrapped bytes are public and are not required to be valid UTF-8.
/// Equality and ordering are derived from the wrapped bytes, and the derived
/// `From`/`Into` impls convert between `Tag` and `Bytes`.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, From, Into)]
pub struct Tag(pub Bytes);
33
/// `redb` key and value support for [`Tag`], compiled only with the `redb` feature.
#[cfg(feature = "redb")]
mod redb_support {
    use bytes::Bytes;
    use redb::{Key as RedbKey, Value as RedbValue};

    use super::Tag;

    impl RedbValue for Tag {
        type SelfType<'a> = Self;

        type AsBytes<'a> = Bytes;

        /// Tags declare no fixed width.
        fn fixed_width() -> Option<usize> {
            None
        }

        /// Builds a tag by copying `data` into a newly allocated `Bytes`.
        fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
        where
            Self: 'a,
        {
            Tag(Bytes::copy_from_slice(data))
        }

        /// Returns a clone of the tag's `Bytes` handle.
        fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
        where
            Self: 'a,
            Self: 'b,
        {
            Bytes::clone(&value.0)
        }

        /// The type name under which tags are registered.
        fn type_name() -> redb::TypeName {
            redb::TypeName::new("Tag")
        }
    }

    impl RedbKey for Tag {
        /// Orders two serialized tags by comparing them as byte slices.
        fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
            Ord::cmp(data1, data2)
        }
    }
}
76
77impl Borrow<[u8]> for Tag {
78    fn borrow(&self) -> &[u8] {
79        self.0.as_ref()
80    }
81}
82
83impl From<String> for Tag {
84    fn from(value: String) -> Self {
85        Self(Bytes::from(value))
86    }
87}
88
89impl From<&str> for Tag {
90    fn from(value: &str) -> Self {
91        Self(Bytes::from(value.to_owned()))
92    }
93}
94
95impl Display for Tag {
96    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97        let bytes = self.0.as_ref();
98        match std::str::from_utf8(bytes) {
99            Ok(s) => write!(f, "\"{}\"", s),
100            Err(_) => write!(f, "{}", hex::encode(bytes)),
101        }
102    }
103}
104
/// Adapter whose `Debug` output is the wrapped value's `Display` output.
struct DD<T: fmt::Display>(T);

impl<T> fmt::Debug for DD<T>
where
    T: fmt::Display,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Forward to `Display` with the same formatter, so flags such as
        // width and alignment are passed through unchanged.
        let DD(value) = self;
        <T as fmt::Display>::fmt(value, f)
    }
}
112
113impl Debug for Tag {
114    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115        f.debug_tuple("Tag").field(&DD(self)).finish()
116    }
117}
118
119impl Tag {
120    /// Create a new tag that does not exist yet.
121    pub fn auto(time: SystemTime, exists: impl Fn(&[u8]) -> bool) -> Self {
122        let now = chrono::DateTime::<chrono::Utc>::from(time);
123        let mut i = 0;
124        loop {
125            let mut text = format!("auto-{}", now.format("%Y-%m-%dT%H:%M:%S%.3fZ"));
126            if i != 0 {
127                text.push_str(&format!("-{}", i));
128            }
129            if !exists(text.as_bytes()) {
130                return Self::from(text);
131            }
132            i += 1;
133        }
134    }
135}
136
/// Option for commands that allow setting a tag.
///
/// Either the tag is generated automatically, or the caller supplies an
/// explicit [`Tag`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum SetTagOption {
    /// A tag will be automatically generated
    Auto,
    /// The tag is explicitly named; carries the tag to use
    Named(Tag),
}
145
/// Trait used from temp tags to notify an abstract store that a temp tag is
/// being dropped.
///
/// Implementors must be `Debug + Send + Sync + 'static`; [`TempTag`] holds
/// them behind a `Weak<dyn TagDrop>`.
pub trait TagDrop: std::fmt::Debug + Send + Sync + 'static {
    /// Called on drop of a temp tag, with the hash and format pair that the
    /// dropped tag was holding.
    fn on_drop(&self, inner: &HashAndFormat);
}
152
153/// A trait for things that can track liveness of blobs and collections.
154///
155/// This trait works together with [TempTag] to keep track of the liveness of a
156/// blob or collection.
157///
158/// It is important to include the format in the liveness tracking, since
159/// protecting a collection means protecting the blob and all its children,
160/// whereas protecting a raw blob only protects the blob itself.
161pub trait TagCounter: TagDrop + Sized {
162    /// Called on creation of a temp tag
163    fn on_create(&self, inner: &HashAndFormat);
164
165    /// Get this as a weak reference for use in temp tags
166    fn as_weak(self: &Arc<Self>) -> Weak<dyn TagDrop> {
167        let on_drop: Arc<dyn TagDrop> = self.clone();
168        Arc::downgrade(&on_drop)
169    }
170
171    /// Create a new temp tag for the given hash and format
172    fn temp_tag(self: &Arc<Self>, inner: HashAndFormat) -> TempTag {
173        self.on_create(&inner);
174        TempTag::new(inner, Some(self.as_weak()))
175    }
176}
177
178/// A hash and format pair that is protected from garbage collection.
179///
180/// If format is raw, this will protect just the blob
181/// If format is collection, this will protect the collection and all blobs in it
182#[derive(Debug)]
183pub struct TempTag {
184    /// The hash and format we are pinning
185    inner: HashAndFormat,
186    /// optional callback to call on drop
187    on_drop: Option<Weak<dyn TagDrop>>,
188}
189
190impl TempTag {
191    /// Create a new temp tag for the given hash and format
192    ///
193    /// This should only be used by store implementations.
194    ///
195    /// The caller is responsible for increasing the refcount on creation and to
196    /// make sure that temp tags that are created between a mark phase and a sweep
197    /// phase are protected.
198    pub fn new(inner: HashAndFormat, on_drop: Option<Weak<dyn TagDrop>>) -> Self {
199        Self { inner, on_drop }
200    }
201
202    /// The hash of the pinned item
203    pub fn inner(&self) -> &HashAndFormat {
204        &self.inner
205    }
206
207    /// The hash of the pinned item
208    pub fn hash(&self) -> &Hash {
209        &self.inner.hash
210    }
211
212    /// The format of the pinned item
213    pub fn format(&self) -> BlobFormat {
214        self.inner.format
215    }
216
217    /// The hash and format of the pinned item
218    pub fn hash_and_format(&self) -> HashAndFormat {
219        self.inner
220    }
221
222    /// Keep the item alive until the end of the process
223    pub fn leak(mut self) {
224        // set the liveness tracker to None, so that the refcount is not decreased
225        // during drop. This means that the refcount will never reach 0 and the
226        // item will not be gced until the end of the process.
227        self.on_drop = None;
228    }
229}
230
231impl Drop for TempTag {
232    fn drop(&mut self) {
233        if let Some(on_drop) = self.on_drop.take() {
234            if let Some(on_drop) = on_drop.upgrade() {
235                on_drop.on_drop(&self.inner);
236            }
237        }
238    }
239}
240
241/// Get the number of bytes given a set of chunk ranges and the total size.
242///
243/// If some ranges are out of bounds, they will be clamped to the size.
244pub fn total_bytes(ranges: ChunkRanges, size: u64) -> u64 {
245    ranges
246        .iter()
247        .map(|range| {
248            let (start, end) = match range {
249                RangeSetRange::Range(r) => {
250                    (r.start.to_bytes().min(size), r.end.to_bytes().min(size))
251                }
252                RangeSetRange::RangeFrom(range) => (range.start.to_bytes().min(size), size),
253            };
254            end.saturating_sub(start)
255        })
256        .reduce(u64::saturating_add)
257        .unwrap_or_default()
258}
259
/// A non-sendable marker type
///
/// Holds only a `PhantomData<Rc<()>>`, so it is zero sized and inherits the
/// `!Send` property of `Rc`.
#[derive(Debug)]
pub(crate) struct NonSend {
    _marker: std::marker::PhantomData<std::rc::Rc<()>>,
}

impl NonSend {
    /// Create a new non-sendable marker.
    #[allow(dead_code)]
    pub const fn new() -> Self {
        let _marker = std::marker::PhantomData;
        NonSend { _marker }
    }
}
275
276/// copy a limited slice from a slice as a `Bytes`.
277pub(crate) fn copy_limited_slice(bytes: &[u8], offset: u64, len: usize) -> Bytes {
278    bytes[limited_range(offset, len, bytes.len())]
279        .to_vec()
280        .into()
281}
282
/// Clamp the window starting at `offset` with length `len` to a buffer of
/// `buf_len` bytes.
///
/// Returns `offset..min(offset + len, buf_len)` when `offset` lies inside the
/// buffer, and the empty range `0..0` when `offset` is at or past the end.
pub(crate) fn limited_range(offset: u64, len: usize, buf_len: usize) -> std::ops::Range<usize> {
    match usize::try_from(offset) {
        Ok(start) if start < buf_len => {
            // saturating add: `start + len` may exceed `usize::MAX`
            let end = buf_len.min(start.saturating_add(len));
            start..end
        }
        // offset does not fit in usize, or is at/after the end of the buffer
        _ => 0..0,
    }
}
292
293/// zero copy get a limited slice from a `Bytes` as a `Bytes`.
294#[allow(dead_code)]
295pub(crate) fn get_limited_slice(bytes: &Bytes, offset: u64, len: usize) -> Bytes {
296    bytes.slice(limited_range(offset, len, bytes.len()))
297}
298
299/// Compute raw outboard size, without the size header.
300#[allow(dead_code)]
301pub(crate) fn raw_outboard_size(size: u64) -> u64 {
302    BaoTree::new(size, IROH_BLOCK_SIZE).outboard_size()
303}
304
305/// Synchronously compute the outboard of a file, and return hash and outboard.
306///
307/// It is assumed that the file is not modified while this is running.
308///
309/// If it is modified while or after this is running, the outboard will be
310/// invalid, so any attempt to compute a slice from it will fail.
311///
312/// If the size of the file is changed while this is running, an error will be
313/// returned.
314///
315/// The computed outboard is without length prefix.
316pub(crate) fn compute_outboard(
317    read: impl Read,
318    size: u64,
319    progress: impl Fn(u64) -> std::io::Result<()> + Send + Sync + 'static,
320) -> std::io::Result<(Hash, Option<Vec<u8>>)> {
321    use bao_tree::io::sync::CreateOutboard;
322
323    // wrap the reader in a progress reader, so we can report progress.
324    let reader = ProgressReader::new(read, progress);
325    // wrap the reader in a buffered reader, so we read in large chunks
326    // this reduces the number of io ops and also the number of progress reports
327    let buf_size = usize::try_from(size).unwrap_or(usize::MAX).min(1024 * 1024);
328    let reader = BufReader::with_capacity(buf_size, reader);
329
330    let ob = PreOrderOutboard::<Vec<u8>>::create_sized(reader, size, IROH_BLOCK_SIZE)?;
331    let root = ob.root.into();
332    let data = ob.data;
333    tracing::trace!(%root, "done");
334    let data = if !data.is_empty() { Some(data) } else { None };
335    Ok((root, data))
336}
337
/// Compute raw outboard, without the size header.
///
/// Returns the outboard data together with the root hash of `data`.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn raw_outboard(data: &[u8]) -> (Vec<u8>, Hash) {
    let outboard = bao_tree::io::outboard::PreOrderMemOutboard::create(data, IROH_BLOCK_SIZE);
    let root: Hash = outboard.root.into();
    (outboard.data, root)
}
345
/// A reader that calls a callback after each read, passing the total number
/// of bytes read so far.
pub(crate) struct ProgressReader<R, F>
where
    F: Fn(u64) -> std::io::Result<()>,
{
    /// The wrapped reader
    inner: R,
    /// Total number of bytes read from `inner` so far
    offset: u64,
    /// Progress callback, called with the updated `offset`
    cb: F,
}

impl<R, F> ProgressReader<R, F>
where
    R: std::io::Read,
    F: Fn(u64) -> std::io::Result<()>,
{
    /// Wrap `inner`, reporting progress to `cb`. The offset starts at 0.
    pub fn new(inner: R, cb: F) -> Self {
        let offset = 0;
        Self { inner, offset, cb }
    }
}

impl<R, F> std::io::Read for ProgressReader<R, F>
where
    R: std::io::Read,
    F: Fn(u64) -> std::io::Result<()>,
{
    /// Read from the wrapped reader, then report the new total offset.
    ///
    /// An error from the wrapped reader is returned before the callback is
    /// called. An error from the callback is returned in place of the byte
    /// count.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let count = self.inner.read(buf)?;
        self.offset += count as u64;
        // The callback also runs for reads that return 0 bytes.
        (self.cb)(self.offset).map(|()| count)
    }
}
370}