1use std::{
3 borrow::Borrow,
4 fmt,
5 io::{BufReader, Read},
6 sync::{Arc, Weak},
7 time::SystemTime,
8};
9
10use bao_tree::{io::outboard::PreOrderOutboard, BaoTree, ChunkRanges};
11use bytes::Bytes;
12use derive_more::{Debug, Display, From, Into};
13use range_collections::range_set::RangeSetRange;
14use serde::{Deserialize, Serialize};
15
16use crate::{BlobFormat, Hash, HashAndFormat, IROH_BLOCK_SIZE};
17
18pub mod fs;
19pub mod io;
20mod mem_or_file;
21pub mod progress;
22pub use mem_or_file::MemOrFile;
23mod sparse_mem_file;
24pub use sparse_mem_file::SparseMemFile;
25pub mod local_pool;
26
27#[cfg(test)]
28pub(crate) mod hexdump;
29
/// A tag: a thin newtype around an arbitrary byte string.
///
/// The wrapped [`Bytes`] does not have to be valid UTF-8; the `Display` impl
/// in this file falls back to hex output for non-UTF-8 content.
/// Ordering and equality are derived and therefore follow the wrapped bytes.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, From, Into)]
pub struct Tag(pub Bytes);
33
/// Lets [`Tag`] be stored in redb tables, both as a value and as a key.
#[cfg(feature = "redb")]
mod redb_support {
    use std::cmp::Ordering;

    use bytes::Bytes;
    use redb::{Key, TypeName, Value};

    use super::Tag;

    impl Value for Tag {
        type SelfType<'a> = Tag;

        type AsBytes<'a> = Bytes;

        /// Tags are variable length, so there is no fixed width.
        fn fixed_width() -> Option<usize> {
            None
        }

        /// Builds an owned tag by copying the stored bytes.
        fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
        where
            Self: 'a,
        {
            let owned = Bytes::copy_from_slice(data);
            Tag(owned)
        }

        /// Serializes the tag as its raw bytes by cloning the `Bytes` handle.
        fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
        where
            Self: 'a,
            Self: 'b,
        {
            Bytes::clone(&value.0)
        }

        /// Type name under which redb registers this value type.
        fn type_name() -> TypeName {
            TypeName::new("Tag")
        }
    }

    impl Key for Tag {
        /// Keys are ordered by plain lexicographic comparison of the raw bytes.
        fn compare(data1: &[u8], data2: &[u8]) -> Ordering {
            <[u8] as Ord>::cmp(data1, data2)
        }
    }
}
76
77impl From<&[u8]> for Tag {
78 fn from(value: &[u8]) -> Self {
79 Self(Bytes::copy_from_slice(value))
80 }
81}
82
83impl AsRef<[u8]> for Tag {
84 fn as_ref(&self) -> &[u8] {
85 self.0.as_ref()
86 }
87}
88
89impl Borrow<[u8]> for Tag {
90 fn borrow(&self) -> &[u8] {
91 self.0.as_ref()
92 }
93}
94
95impl From<String> for Tag {
96 fn from(value: String) -> Self {
97 Self(Bytes::from(value))
98 }
99}
100
101impl From<&str> for Tag {
102 fn from(value: &str) -> Self {
103 Self(Bytes::from(value.to_owned()))
104 }
105}
106
107impl Display for Tag {
108 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109 let bytes = self.0.as_ref();
110 match std::str::from_utf8(bytes) {
111 Ok(s) => write!(f, "\"{}\"", s),
112 Err(_) => write!(f, "{}", hex::encode(bytes)),
113 }
114 }
115}
116
/// Adapter that makes a `Display` value usable where `Debug` is required.
///
/// Its `Debug` output is exactly the wrapped value's `Display` output.
struct DD<T>(T)
where
    T: fmt::Display;

impl<T> fmt::Debug for DD<T>
where
    T: fmt::Display,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Forward to `Display`, passing the formatter (and its flags) through.
        <T as fmt::Display>::fmt(&self.0, f)
    }
}
124
125impl Debug for Tag {
126 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127 f.debug_tuple("Tag").field(&DD(self)).finish()
128 }
129}
130
131impl Tag {
132 pub fn auto(time: SystemTime, exists: impl Fn(&[u8]) -> bool) -> Self {
134 let now = chrono::DateTime::<chrono::Utc>::from(time);
135 let mut i = 0;
136 loop {
137 let mut text = format!("auto-{}", now.format("%Y-%m-%dT%H:%M:%S%.3fZ"));
138 if i != 0 {
139 text.push_str(&format!("-{}", i));
140 }
141 if !exists(text.as_bytes()) {
142 return Self::from(text);
143 }
144 i += 1;
145 }
146 }
147
148 pub fn successor(&self) -> Self {
150 let mut bytes = self.0.to_vec();
151 bytes.push(0);
153 Self(bytes.into())
154 }
155
156 pub fn next_prefix(&self) -> Option<Self> {
160 let mut bytes = self.0.to_vec();
161 if next_prefix(&mut bytes) {
162 Some(Self(bytes.into()))
163 } else {
164 None
165 }
166 }
167}
168
/// Option describing which tag should be set.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum SetTagOption {
    /// No explicit tag name is given.
    ///
    /// NOTE(review): presumably the consumer generates a name automatically
    /// (cf. `Tag::auto`) — confirm against the callers.
    Auto,
    /// Use the given, explicitly named [`Tag`].
    Named(Tag),
}
177
/// Callback interface that is notified when a [`TempTag`] is dropped.
pub trait TagDrop: std::fmt::Debug + Send + Sync + 'static {
    /// Called from the `Drop` impl of [`TempTag`] with the hash and format the
    /// dropped tag referred to.
    fn on_drop(&self, inner: &HashAndFormat);
}
184
185pub trait TagCounter: TagDrop + Sized {
194 fn on_create(&self, inner: &HashAndFormat);
196
197 fn as_weak(self: &Arc<Self>) -> Weak<dyn TagDrop> {
199 let on_drop: Arc<dyn TagDrop> = self.clone();
200 Arc::downgrade(&on_drop)
201 }
202
203 fn temp_tag(self: &Arc<Self>, inner: HashAndFormat) -> TempTag {
205 self.on_create(&inner);
206 TempTag::new(inner, Some(self.as_weak()))
207 }
208}
209
/// A temporary tag for a hash and format.
///
/// When dropped, it notifies its [`TagDrop`] handler, if one is set and still
/// alive (see the `Drop` impl).
#[derive(Debug)]
pub struct TempTag {
    /// The hash and format this tag refers to.
    inner: HashAndFormat,
    /// Optional weak handle to the handler notified on drop; `None` means that
    /// dropping the tag does nothing.
    on_drop: Option<Weak<dyn TagDrop>>,
}
221
222impl TempTag {
223 pub fn new(inner: HashAndFormat, on_drop: Option<Weak<dyn TagDrop>>) -> Self {
231 Self { inner, on_drop }
232 }
233
234 pub fn inner(&self) -> &HashAndFormat {
236 &self.inner
237 }
238
239 pub fn hash(&self) -> &Hash {
241 &self.inner.hash
242 }
243
244 pub fn format(&self) -> BlobFormat {
246 self.inner.format
247 }
248
249 pub fn hash_and_format(&self) -> HashAndFormat {
251 self.inner
252 }
253
254 pub fn leak(mut self) {
256 self.on_drop = None;
260 }
261}
262
263impl Drop for TempTag {
264 fn drop(&mut self) {
265 if let Some(on_drop) = self.on_drop.take() {
266 if let Some(on_drop) = on_drop.upgrade() {
267 on_drop.on_drop(&self.inner);
268 }
269 }
270 }
271}
272
273pub fn total_bytes(ranges: ChunkRanges, size: u64) -> u64 {
277 ranges
278 .iter()
279 .map(|range| {
280 let (start, end) = match range {
281 RangeSetRange::Range(r) => {
282 (r.start.to_bytes().min(size), r.end.to_bytes().min(size))
283 }
284 RangeSetRange::RangeFrom(range) => (range.start.to_bytes().min(size), size),
285 };
286 end.saturating_sub(start)
287 })
288 .reduce(u64::saturating_add)
289 .unwrap_or_default()
290}
291
/// Zero-sized marker that opts the containing type out of `Send`.
///
/// It holds a `PhantomData<Rc<()>>`; `Rc` is not `Send`, so neither is any
/// type that contains this marker.
#[derive(Debug)]
pub(crate) struct NonSend {
    _marker: std::marker::PhantomData<std::rc::Rc<()>>,
}

impl NonSend {
    /// Creates the marker; usable in `const` contexts.
    #[allow(dead_code)]
    pub const fn new() -> Self {
        let _marker = std::marker::PhantomData;
        NonSend { _marker }
    }
}
307
308pub(crate) fn copy_limited_slice(bytes: &[u8], offset: u64, len: usize) -> Bytes {
310 bytes[limited_range(offset, len, bytes.len())]
311 .to_vec()
312 .into()
313}
314
/// Clamps the request "`len` bytes starting at `offset`" to a buffer of `buf_len` bytes.
///
/// Returns `offset..min(offset + len, buf_len)` when `offset` lies inside the
/// buffer, and the empty range `0..0` when `offset` is at or past the end.
/// The addition saturates, so a huge `len` cannot overflow.
pub(crate) fn limited_range(offset: u64, len: usize, buf_len: usize) -> std::ops::Range<usize> {
    match usize::try_from(offset) {
        Ok(start) if start < buf_len => start..start.saturating_add(len).min(buf_len),
        // Offset is beyond the buffer (or does not even fit in `usize`).
        _ => 0..0,
    }
}
324
325#[allow(dead_code)]
327pub(crate) fn get_limited_slice(bytes: &Bytes, offset: u64, len: usize) -> Bytes {
328 bytes.slice(limited_range(offset, len, bytes.len()))
329}
330
331#[allow(dead_code)]
333pub(crate) fn raw_outboard_size(size: u64) -> u64 {
334 BaoTree::new(size, IROH_BLOCK_SIZE).outboard_size()
335}
336
/// Increments `bytes` in place to the next lexicographically larger prefix of
/// the same length, treating the slice as a big-endian counter.
///
/// Returns `true` on success. Returns `false` if no larger prefix exists
/// (every byte is `0xFF`, or the slice is empty); in that case every byte has
/// been reset to zero.
#[allow(dead_code)]
pub(crate) fn next_prefix(bytes: &mut [u8]) -> bool {
    // Find the last byte that can be incremented without overflowing.
    match bytes.iter().rposition(|&b| b != u8::MAX) {
        Some(idx) => {
            bytes[idx] += 1;
            // Everything after it was 0xFF and wraps around to zero.
            bytes[idx + 1..].fill(0);
            true
        }
        None => {
            bytes.fill(0);
            false
        }
    }
}
352
353pub(crate) fn compute_outboard(
365 read: impl Read,
366 size: u64,
367 progress: impl Fn(u64) -> std::io::Result<()> + Send + Sync + 'static,
368) -> std::io::Result<(Hash, Option<Vec<u8>>)> {
369 use bao_tree::io::sync::CreateOutboard;
370
371 let reader = ProgressReader::new(read, progress);
373 let buf_size = usize::try_from(size).unwrap_or(usize::MAX).min(1024 * 1024);
376 let reader = BufReader::with_capacity(buf_size, reader);
377
378 let ob = PreOrderOutboard::<Vec<u8>>::create_sized(reader, size, IROH_BLOCK_SIZE)?;
379 let root = ob.root.into();
380 let data = ob.data;
381 tracing::trace!(%root, "done");
382 let data = if !data.is_empty() { Some(data) } else { None };
383 Ok((root, data))
384}
385
/// Test helper: builds the in-memory pre-order outboard for `data` and returns
/// its raw bytes together with the root hash.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn raw_outboard(data: &[u8]) -> (Vec<u8>, Hash) {
    let outboard = bao_tree::io::outboard::PreOrderMemOutboard::create(data, IROH_BLOCK_SIZE);
    let hash: Hash = outboard.root.into();
    (outboard.data, hash)
}
393
/// A reader adapter that reports the running byte offset to a callback.
pub(crate) struct ProgressReader<R, F>
where
    F: Fn(u64) -> std::io::Result<()>,
{
    /// The wrapped reader.
    inner: R,
    /// Total number of bytes read from `inner` so far.
    offset: u64,
    /// Progress callback, invoked with the updated `offset`.
    cb: F,
}

impl<R, F> ProgressReader<R, F>
where
    R: std::io::Read,
    F: Fn(u64) -> std::io::Result<()>,
{
    /// Wraps `inner`, starting at offset 0.
    pub fn new(inner: R, cb: F) -> Self {
        ProgressReader {
            inner,
            offset: 0,
            cb,
        }
    }
}

impl<R, F> std::io::Read for ProgressReader<R, F>
where
    R: std::io::Read,
    F: Fn(u64) -> std::io::Result<()>,
{
    /// Reads from the wrapped reader, then reports the new cumulative offset.
    ///
    /// The callback runs after every successful read, including reads of zero
    /// bytes. If it returns an error, that error is returned in place of the
    /// byte count.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let n = self.inner.read(buf)?;
        self.offset += n as u64;
        (self.cb)(self.offset).map(|()| n)
    }
}