iroh_blobs/get/
request.rs

1//! Utilities for complex get requests.
2use std::sync::Arc;
3
4use bao_tree::{ChunkNum, ChunkRanges};
5use bytes::Bytes;
6use iroh::endpoint::Connection;
7use rand::Rng;
8
9use super::{fsm, Stats};
10use crate::{
11    hashseq::HashSeq,
12    protocol::{GetRequest, RangeSpecSeq},
13    Hash, HashAndFormat,
14};
15
16/// Get the claimed size of a blob from a peer.
17///
18/// This is just reading the size header and then immediately closing the connection.
19/// It can be used to check if a peer has any data at all.
20pub async fn get_unverified_size(
21    connection: &Connection,
22    hash: &Hash,
23) -> anyhow::Result<(u64, Stats)> {
24    let request = GetRequest::new(
25        *hash,
26        RangeSpecSeq::from_ranges(vec![ChunkRanges::from(ChunkNum(u64::MAX)..)]),
27    );
28    let request = fsm::start(connection.clone(), request);
29    let connected = request.next().await?;
30    let fsm::ConnectedNext::StartRoot(start) = connected.next().await? else {
31        unreachable!("expected start root");
32    };
33    let at_blob_header = start.next();
34    let (curr, size) = at_blob_header.next().await?;
35    let stats = curr.finish().next().await?;
36    Ok((size, stats))
37}
38
39/// Get the verified size of a blob from a peer.
40///
41/// This asks for the last chunk of the blob and validates the response.
42/// Note that this does not validate that the peer has all the data.
43pub async fn get_verified_size(
44    connection: &Connection,
45    hash: &Hash,
46) -> anyhow::Result<(u64, Stats)> {
47    tracing::trace!("Getting verified size of {}", hash.to_hex());
48    let request = GetRequest::new(
49        *hash,
50        RangeSpecSeq::from_ranges(vec![ChunkRanges::from(ChunkNum(u64::MAX)..)]),
51    );
52    let request = fsm::start(connection.clone(), request);
53    let connected = request.next().await?;
54    let fsm::ConnectedNext::StartRoot(start) = connected.next().await? else {
55        unreachable!("expected start root");
56    };
57    let header = start.next();
58    let (mut curr, size) = header.next().await?;
59    let end = loop {
60        match curr.next().await {
61            fsm::BlobContentNext::More((next, res)) => {
62                let _ = res?;
63                curr = next;
64            }
65            fsm::BlobContentNext::Done(end) => {
66                break end;
67            }
68        }
69    };
70    let fsm::EndBlobNext::Closing(closing) = end.next() else {
71        unreachable!("expected closing");
72    };
73    let stats = closing.next().await?;
74    tracing::trace!(
75        "Got verified size of {}, {:.6}s",
76        hash.to_hex(),
77        stats.elapsed.as_secs_f64()
78    );
79    Ok((size, stats))
80}
81
82/// Given a hash of a hash seq, get the hash seq and the verified sizes of its
83/// children.
84///
85/// This can be used to compute the total size when requesting a hash seq.
86pub async fn get_hash_seq_and_sizes(
87    connection: &Connection,
88    hash: &Hash,
89    max_size: u64,
90) -> anyhow::Result<(HashSeq, Arc<[u64]>)> {
91    let content = HashAndFormat::hash_seq(*hash);
92    tracing::debug!("Getting hash seq and children sizes of {}", content);
93    let request = GetRequest::new(
94        *hash,
95        RangeSpecSeq::from_ranges_infinite([
96            ChunkRanges::all(),
97            ChunkRanges::from(ChunkNum(u64::MAX)..),
98        ]),
99    );
100    let at_start = fsm::start(connection.clone(), request);
101    let at_connected = at_start.next().await?;
102    let fsm::ConnectedNext::StartRoot(start) = at_connected.next().await? else {
103        unreachable!("query includes root");
104    };
105    let at_start_root = start.next();
106    let (at_blob_content, size) = at_start_root.next().await?;
107    // check the size to avoid parsing a maliciously large hash seq
108    if size > max_size {
109        anyhow::bail!("size too large");
110    }
111    let (mut curr, hash_seq) = at_blob_content.concatenate_into_vec().await?;
112    let hash_seq = HashSeq::try_from(Bytes::from(hash_seq))?;
113    let mut sizes = Vec::with_capacity(hash_seq.len());
114    let closing = loop {
115        match curr.next() {
116            fsm::EndBlobNext::MoreChildren(more) => {
117                let hash = match hash_seq.get(sizes.len()) {
118                    Some(hash) => hash,
119                    None => break more.finish(),
120                };
121                let at_header = more.next(hash);
122                let (at_content, size) = at_header.next().await?;
123                let next = at_content.drain().await?;
124                sizes.push(size);
125                curr = next;
126            }
127            fsm::EndBlobNext::Closing(closing) => break closing,
128        }
129    };
130    let _stats = closing.next().await?;
131    tracing::debug!(
132        "Got hash seq and children sizes of {}: {:?}",
133        content,
134        sizes
135    );
136    Ok((hash_seq, sizes.into()))
137}
138
139/// Probe for a single chunk of a blob.
140///
141/// This is used to check if a peer has a specific chunk.
142pub async fn get_chunk_probe(
143    connection: &Connection,
144    hash: &Hash,
145    chunk: ChunkNum,
146) -> anyhow::Result<Stats> {
147    let ranges = ChunkRanges::from(chunk..chunk + 1);
148    let ranges = RangeSpecSeq::from_ranges([ranges]);
149    let request = GetRequest::new(*hash, ranges);
150    let request = fsm::start(connection.clone(), request);
151    let connected = request.next().await?;
152    let fsm::ConnectedNext::StartRoot(start) = connected.next().await? else {
153        unreachable!("query includes root");
154    };
155    let header = start.next();
156    let (mut curr, _size) = header.next().await?;
157    let end = loop {
158        match curr.next().await {
159            fsm::BlobContentNext::More((next, res)) => {
160                res?;
161                curr = next;
162            }
163            fsm::BlobContentNext::Done(end) => {
164                break end;
165            }
166        }
167    };
168    let fsm::EndBlobNext::Closing(closing) = end.next() else {
169        unreachable!("query contains only one blob");
170    };
171    let stats = closing.next().await?;
172    Ok(stats)
173}
174
175/// Given a sequence of sizes of children, generate a range spec that selects a
176/// random chunk of a random child.
177///
178/// The random chunk is chosen uniformly from the chunks of the children, so
179/// larger children are more likely to be selected.
180pub fn random_hash_seq_ranges(sizes: &[u64], mut rng: impl Rng) -> RangeSpecSeq {
181    let total_chunks = sizes
182        .iter()
183        .map(|size| ChunkNum::full_chunks(*size).0)
184        .sum::<u64>();
185    let random_chunk = rng.gen_range(0..total_chunks);
186    let mut remaining = random_chunk;
187    let mut ranges = vec![];
188    ranges.push(ChunkRanges::empty());
189    for size in sizes.iter() {
190        let chunks = ChunkNum::full_chunks(*size).0;
191        if remaining < chunks {
192            ranges.push(ChunkRanges::from(
193                ChunkNum(remaining)..ChunkNum(remaining + 1),
194            ));
195            break;
196        } else {
197            remaining -= chunks;
198            ranges.push(ChunkRanges::empty());
199        }
200    }
201    RangeSpecSeq::from_ranges(ranges)
202}