1use std::{
3 borrow::Borrow,
4 fmt,
5 io::{BufReader, Read},
6 sync::{Arc, Weak},
7 time::SystemTime,
8};
9
10use bao_tree::{io::outboard::PreOrderOutboard, BaoTree, ChunkRanges};
11use bytes::Bytes;
12use derive_more::{Debug, Display, From, Into};
13use range_collections::range_set::RangeSetRange;
14use serde::{Deserialize, Serialize};
15
16use crate::{BlobFormat, Hash, HashAndFormat, IROH_BLOCK_SIZE};
17
18pub mod fs;
19pub mod io;
20mod mem_or_file;
21pub mod progress;
22pub use mem_or_file::MemOrFile;
23mod sparse_mem_file;
24pub use sparse_mem_file::SparseMemFile;
25pub mod local_pool;
26
27#[cfg(test)]
28pub(crate) mod hexdump;
29
/// A tag: a thin newtype around an arbitrary byte string.
///
/// The bytes are not required to be valid UTF-8. The `Display` impl in this
/// file renders UTF-8 content as a quoted string and anything else as hex.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, From, Into)]
pub struct Tag(pub Bytes);
33
#[cfg(feature = "redb")]
mod redb_support {
    // Glue that lets `Tag` be stored in `redb` tables, both as a value and as a key.
    use bytes::Bytes;
    use redb::{Key as RedbKey, Value as RedbValue};

    use super::Tag;

    impl RedbValue for Tag {
        type SelfType<'a> = Self;

        type AsBytes<'a> = Bytes;

        // Tags are arbitrary byte strings, so there is no fixed encoded width.
        fn fixed_width() -> Option<usize> {
            None
        }

        // Decoding copies the stored bytes into an owned `Tag`.
        fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
        where
            Self: 'a,
        {
            Tag(Bytes::from(data.to_vec()))
        }

        // Encoding hands out a clone of the `Bytes` handle wrapped by the tag.
        fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
        where
            Self: 'a,
            Self: 'b,
        {
            Bytes::clone(&value.0)
        }

        fn type_name() -> redb::TypeName {
            redb::TypeName::new("Tag")
        }
    }

    impl RedbKey for Tag {
        // Keys are ordered by comparing their raw byte encodings.
        fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
            Ord::cmp(data1, data2)
        }
    }
}
76
77impl Borrow<[u8]> for Tag {
78 fn borrow(&self) -> &[u8] {
79 self.0.as_ref()
80 }
81}
82
83impl From<String> for Tag {
84 fn from(value: String) -> Self {
85 Self(Bytes::from(value))
86 }
87}
88
89impl From<&str> for Tag {
90 fn from(value: &str) -> Self {
91 Self(Bytes::from(value.to_owned()))
92 }
93}
94
95impl Display for Tag {
96 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97 let bytes = self.0.as_ref();
98 match std::str::from_utf8(bytes) {
99 Ok(s) => write!(f, "\"{}\"", s),
100 Err(_) => write!(f, "{}", hex::encode(bytes)),
101 }
102 }
103}
104
/// Adapter whose `Debug` output is the wrapped value's `Display` output.
struct DD<T: fmt::Display>(T);

impl<T> fmt::Debug for DD<T>
where
    T: fmt::Display,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Delegate with the same formatter so width/fill flags are preserved.
        let DD(inner) = self;
        <T as fmt::Display>::fmt(inner, f)
    }
}
112
113impl Debug for Tag {
114 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115 f.debug_tuple("Tag").field(&DD(self)).finish()
116 }
117}
118
119impl Tag {
120 pub fn auto(time: SystemTime, exists: impl Fn(&[u8]) -> bool) -> Self {
122 let now = chrono::DateTime::<chrono::Utc>::from(time);
123 let mut i = 0;
124 loop {
125 let mut text = format!("auto-{}", now.format("%Y-%m-%dT%H:%M:%S%.3fZ"));
126 if i != 0 {
127 text.push_str(&format!("-{}", i));
128 }
129 if !exists(text.as_bytes()) {
130 return Self::from(text);
131 }
132 i += 1;
133 }
134 }
135}
136
/// How a tag should be chosen.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum SetTagOption {
    /// No explicit tag is given.
    // NOTE(review): presumably the name is then generated (e.g. via `Tag::auto`) — confirm against callers.
    Auto,
    /// Use the given tag.
    Named(Tag),
}
145
/// Callback interface notified when a [`TempTag`] is dropped.
pub trait TagDrop: std::fmt::Debug + Send + Sync + 'static {
    /// Called from `TempTag`'s `Drop` impl with the content the dropped tag referred to.
    fn on_drop(&self, inner: &HashAndFormat);
}
152
153pub trait TagCounter: TagDrop + Sized {
162 fn on_create(&self, inner: &HashAndFormat);
164
165 fn as_weak(self: &Arc<Self>) -> Weak<dyn TagDrop> {
167 let on_drop: Arc<dyn TagDrop> = self.clone();
168 Arc::downgrade(&on_drop)
169 }
170
171 fn temp_tag(self: &Arc<Self>, inner: HashAndFormat) -> TempTag {
173 self.on_create(&inner);
174 TempTag::new(inner, Some(self.as_weak()))
175 }
176}
177
/// A temporary tag for a hash and format.
///
/// When dropped, it notifies its `on_drop` handler, if one is set and still alive.
#[derive(Debug)]
pub struct TempTag {
    /// The hash and format this tag refers to.
    inner: HashAndFormat,
    /// Weak handle to the object notified on drop; `None` disables the notification.
    on_drop: Option<Weak<dyn TagDrop>>,
}
189
190impl TempTag {
191 pub fn new(inner: HashAndFormat, on_drop: Option<Weak<dyn TagDrop>>) -> Self {
199 Self { inner, on_drop }
200 }
201
202 pub fn inner(&self) -> &HashAndFormat {
204 &self.inner
205 }
206
207 pub fn hash(&self) -> &Hash {
209 &self.inner.hash
210 }
211
212 pub fn format(&self) -> BlobFormat {
214 self.inner.format
215 }
216
217 pub fn hash_and_format(&self) -> HashAndFormat {
219 self.inner
220 }
221
222 pub fn leak(mut self) {
224 self.on_drop = None;
228 }
229}
230
231impl Drop for TempTag {
232 fn drop(&mut self) {
233 if let Some(on_drop) = self.on_drop.take() {
234 if let Some(on_drop) = on_drop.upgrade() {
235 on_drop.on_drop(&self.inner);
236 }
237 }
238 }
239}
240
241pub fn total_bytes(ranges: ChunkRanges, size: u64) -> u64 {
245 ranges
246 .iter()
247 .map(|range| {
248 let (start, end) = match range {
249 RangeSetRange::Range(r) => {
250 (r.start.to_bytes().min(size), r.end.to_bytes().min(size))
251 }
252 RangeSetRange::RangeFrom(range) => (range.start.to_bytes().min(size), size),
253 };
254 end.saturating_sub(start)
255 })
256 .reduce(u64::saturating_add)
257 .unwrap_or_default()
258}
259
/// Zero-sized marker that makes a containing type `!Send`.
///
/// It works by holding a `PhantomData<Rc<()>>`; `Rc` is not `Send`.
#[derive(Debug)]
pub(crate) struct NonSend {
    _marker: std::marker::PhantomData<std::rc::Rc<()>>,
}

impl NonSend {
    /// Creates the marker; usable in `const` contexts.
    #[allow(dead_code)]
    pub const fn new() -> Self {
        NonSend {
            _marker: std::marker::PhantomData::<std::rc::Rc<()>>,
        }
    }
}
275
276pub(crate) fn copy_limited_slice(bytes: &[u8], offset: u64, len: usize) -> Bytes {
278 bytes[limited_range(offset, len, bytes.len())]
279 .to_vec()
280 .into()
281}
282
/// Clamps the window `offset..offset + len` to a buffer of `buf_len` bytes.
///
/// Returns `0..0` when `offset` is at or beyond the end of the buffer.
/// Otherwise the range starts at `offset` and ends at `offset + len`, capped
/// at `buf_len`; the addition saturates instead of overflowing.
pub(crate) fn limited_range(offset: u64, len: usize, buf_len: usize) -> std::ops::Range<usize> {
    match usize::try_from(offset) {
        // An offset that fits in `usize` and lies inside the buffer gives a real window.
        Ok(start) if start < buf_len => start..buf_len.min(start.saturating_add(len)),
        // Too large for `usize`, or at/after the end: nothing to read.
        _ => 0..0,
    }
}
292
293#[allow(dead_code)]
295pub(crate) fn get_limited_slice(bytes: &Bytes, offset: u64, len: usize) -> Bytes {
296 bytes.slice(limited_range(offset, len, bytes.len()))
297}
298
299#[allow(dead_code)]
301pub(crate) fn raw_outboard_size(size: u64) -> u64 {
302 BaoTree::new(size, IROH_BLOCK_SIZE).outboard_size()
303}
304
305pub(crate) fn compute_outboard(
317 read: impl Read,
318 size: u64,
319 progress: impl Fn(u64) -> std::io::Result<()> + Send + Sync + 'static,
320) -> std::io::Result<(Hash, Option<Vec<u8>>)> {
321 use bao_tree::io::sync::CreateOutboard;
322
323 let reader = ProgressReader::new(read, progress);
325 let buf_size = usize::try_from(size).unwrap_or(usize::MAX).min(1024 * 1024);
328 let reader = BufReader::with_capacity(buf_size, reader);
329
330 let ob = PreOrderOutboard::<Vec<u8>>::create_sized(reader, size, IROH_BLOCK_SIZE)?;
331 let root = ob.root.into();
332 let data = ob.data;
333 tracing::trace!(%root, "done");
334 let data = if !data.is_empty() { Some(data) } else { None };
335 Ok((root, data))
336}
337
/// Test helper: builds an in-memory pre-order outboard for `data`.
///
/// Returns the outboard bytes and the root hash.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn raw_outboard(data: &[u8]) -> (Vec<u8>, Hash) {
    let outboard = bao_tree::io::outboard::PreOrderMemOutboard::create(data, IROH_BLOCK_SIZE);
    let root: Hash = outboard.root.into();
    (outboard.data, root)
}
345
/// A reader wrapper that reports the cumulative number of bytes read.
///
/// After every successful read of the inner reader (including zero-length
/// reads at end of input) the callback is invoked with the running total. An
/// error from the callback is returned from `read`.
pub(crate) struct ProgressReader<R, F>
where
    F: Fn(u64) -> std::io::Result<()>,
{
    /// The wrapped reader.
    inner: R,
    /// Total number of bytes read from `inner` so far.
    offset: u64,
    /// Progress callback, called with `offset` after each read.
    cb: F,
}

impl<R, F> ProgressReader<R, F>
where
    R: std::io::Read,
    F: Fn(u64) -> std::io::Result<()>,
{
    /// Wraps `inner`, starting the byte count at zero.
    pub fn new(inner: R, cb: F) -> Self {
        ProgressReader {
            cb,
            offset: 0,
            inner,
        }
    }
}

impl<R, F> std::io::Read for ProgressReader<R, F>
where
    R: std::io::Read,
    F: Fn(u64) -> std::io::Result<()>,
{
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // An inner read error is returned before the callback is invoked.
        let count = self.inner.read(buf)?;
        self.offset += count as u64;
        // The callback sees the new total; its error replaces the read count.
        (self.cb)(self.offset).map(|()| count)
    }
}
370}