iroh_bytes/get/
request.rs

1//! Utilities for complex get requests.
2use std::sync::Arc;
3
4use crate::{
5    hashseq::HashSeq,
6    protocol::{GetRequest, RangeSpecSeq},
7    Hash, HashAndFormat,
8};
9use bao_tree::{ChunkNum, ChunkRanges};
10use bytes::Bytes;
11use rand::Rng;
12
13use super::{fsm, Stats};
14
15/// Get the claimed size of a blob from a peer.
16///
17/// This is just reading the size header and then immediately closing the connection.
18/// It can be used to check if a peer has any data at all.
19pub async fn get_unverified_size(
20    connection: &quinn::Connection,
21    hash: &Hash,
22) -> anyhow::Result<(u64, Stats)> {
23    let request = GetRequest::new(
24        *hash,
25        RangeSpecSeq::from_ranges(vec![ChunkRanges::from(ChunkNum(u64::MAX)..)]),
26    );
27    let request = fsm::start(connection.clone(), request);
28    let connected = request.next().await?;
29    let fsm::ConnectedNext::StartRoot(start) = connected.next().await? else {
30        unreachable!("expected start root");
31    };
32    let at_blob_header = start.next();
33    let (curr, size) = at_blob_header.next().await?;
34    let stats = curr.finish().next().await?;
35    Ok((size, stats))
36}
37
38/// Get the verified size of a blob from a peer.
39///
40/// This asks for the last chunk of the blob and validates the response.
41/// Note that this does not validate that the peer has all the data.
42pub async fn get_verified_size(
43    connection: &quinn::Connection,
44    hash: &Hash,
45) -> anyhow::Result<(u64, Stats)> {
46    tracing::trace!("Getting verified size of {}", hash.to_hex());
47    let request = GetRequest::new(
48        *hash,
49        RangeSpecSeq::from_ranges(vec![ChunkRanges::from(ChunkNum(u64::MAX)..)]),
50    );
51    let request = fsm::start(connection.clone(), request);
52    let connected = request.next().await?;
53    let fsm::ConnectedNext::StartRoot(start) = connected.next().await? else {
54        unreachable!("expected start root");
55    };
56    let header = start.next();
57    let (mut curr, size) = header.next().await?;
58    let end = loop {
59        match curr.next().await {
60            fsm::BlobContentNext::More((next, res)) => {
61                let _ = res?;
62                curr = next;
63            }
64            fsm::BlobContentNext::Done(end) => {
65                break end;
66            }
67        }
68    };
69    let fsm::EndBlobNext::Closing(closing) = end.next() else {
70        unreachable!("expected closing");
71    };
72    let stats = closing.next().await?;
73    tracing::trace!(
74        "Got verified size of {}, {:.6}s",
75        hash.to_hex(),
76        stats.elapsed.as_secs_f64()
77    );
78    Ok((size, stats))
79}
80
81/// Given a hash of a hash seq, get the hash seq and the verified sizes of its
82/// children.
83///
84/// This can be used to compute the total size when requesting a hash seq.
85pub async fn get_hash_seq_and_sizes(
86    connection: &quinn::Connection,
87    hash: &Hash,
88    max_size: u64,
89) -> anyhow::Result<(HashSeq, Arc<[u64]>)> {
90    let content = HashAndFormat::hash_seq(*hash);
91    tracing::debug!("Getting hash seq and children sizes of {}", content);
92    let request = GetRequest::new(
93        *hash,
94        RangeSpecSeq::from_ranges_infinite([
95            ChunkRanges::all(),
96            ChunkRanges::from(ChunkNum(u64::MAX)..),
97        ]),
98    );
99    let at_start = fsm::start(connection.clone(), request);
100    let at_connected = at_start.next().await?;
101    let fsm::ConnectedNext::StartRoot(start) = at_connected.next().await? else {
102        unreachable!("query includes root");
103    };
104    let at_start_root = start.next();
105    let (at_blob_content, size) = at_start_root.next().await?;
106    // check the size to avoid parsing a maliciously large hash seq
107    if size > max_size {
108        anyhow::bail!("size too large");
109    }
110    let (mut curr, hash_seq) = at_blob_content.concatenate_into_vec().await?;
111    let hash_seq = HashSeq::try_from(Bytes::from(hash_seq))?;
112    let mut sizes = Vec::with_capacity(hash_seq.len());
113    let closing = loop {
114        match curr.next() {
115            fsm::EndBlobNext::MoreChildren(more) => {
116                let hash = match hash_seq.get(sizes.len()) {
117                    Some(hash) => hash,
118                    None => break more.finish(),
119                };
120                let at_header = more.next(hash);
121                let (at_content, size) = at_header.next().await?;
122                let next = at_content.drain().await?;
123                sizes.push(size);
124                curr = next;
125            }
126            fsm::EndBlobNext::Closing(closing) => break closing,
127        }
128    };
129    let _stats = closing.next().await?;
130    tracing::debug!(
131        "Got hash seq and children sizes of {}: {:?}",
132        content,
133        sizes
134    );
135    Ok((hash_seq, sizes.into()))
136}
137
138/// Probe for a single chunk of a blob.
139///
140/// This is used to check if a peer has a specific chunk.
141pub async fn get_chunk_probe(
142    connection: &quinn::Connection,
143    hash: &Hash,
144    chunk: ChunkNum,
145) -> anyhow::Result<Stats> {
146    let ranges = ChunkRanges::from(chunk..chunk + 1);
147    let ranges = RangeSpecSeq::from_ranges([ranges]);
148    let request = GetRequest::new(*hash, ranges);
149    let request = fsm::start(connection.clone(), request);
150    let connected = request.next().await?;
151    let fsm::ConnectedNext::StartRoot(start) = connected.next().await? else {
152        unreachable!("query includes root");
153    };
154    let header = start.next();
155    let (mut curr, _size) = header.next().await?;
156    let end = loop {
157        match curr.next().await {
158            fsm::BlobContentNext::More((next, res)) => {
159                res?;
160                curr = next;
161            }
162            fsm::BlobContentNext::Done(end) => {
163                break end;
164            }
165        }
166    };
167    let fsm::EndBlobNext::Closing(closing) = end.next() else {
168        unreachable!("query contains only one blob");
169    };
170    let stats = closing.next().await?;
171    Ok(stats)
172}
173
174/// Given a sequence of sizes of children, generate a range spec that selects a
175/// random chunk of a random child.
176///
177/// The random chunk is chosen uniformly from the chunks of the children, so
178/// larger children are more likely to be selected.
179pub fn random_hash_seq_ranges(sizes: &[u64], mut rng: impl Rng) -> RangeSpecSeq {
180    let total_chunks = sizes
181        .iter()
182        .map(|size| ChunkNum::full_chunks(*size).0)
183        .sum::<u64>();
184    let random_chunk = rng.gen_range(0..total_chunks);
185    let mut remaining = random_chunk;
186    let mut ranges = vec![];
187    ranges.push(ChunkRanges::empty());
188    for size in sizes.iter() {
189        let chunks = ChunkNum::full_chunks(*size).0;
190        if remaining < chunks {
191            ranges.push(ChunkRanges::from(
192                ChunkNum(remaining)..ChunkNum(remaining + 1),
193            ));
194            break;
195        } else {
196            remaining -= chunks;
197            ranges.push(ChunkRanges::empty());
198        }
199    }
200    RangeSpecSeq::from_ranges(ranges)
201}