iroh_blobs/get/
request.rs1use std::sync::Arc;
3
4use bao_tree::{ChunkNum, ChunkRanges};
5use bytes::Bytes;
6use iroh::endpoint::Connection;
7use rand::Rng;
8
9use super::{fsm, Stats};
10use crate::{
11 hashseq::HashSeq,
12 protocol::{GetRequest, RangeSpecSeq},
13 Hash, HashAndFormat,
14};
15
16pub async fn get_unverified_size(
21 connection: &Connection,
22 hash: &Hash,
23) -> anyhow::Result<(u64, Stats)> {
24 let request = GetRequest::new(
25 *hash,
26 RangeSpecSeq::from_ranges(vec![ChunkRanges::from(ChunkNum(u64::MAX)..)]),
27 );
28 let request = fsm::start(connection.clone(), request);
29 let connected = request.next().await?;
30 let fsm::ConnectedNext::StartRoot(start) = connected.next().await? else {
31 unreachable!("expected start root");
32 };
33 let at_blob_header = start.next();
34 let (curr, size) = at_blob_header.next().await?;
35 let stats = curr.finish().next().await?;
36 Ok((size, stats))
37}
38
39pub async fn get_verified_size(
44 connection: &Connection,
45 hash: &Hash,
46) -> anyhow::Result<(u64, Stats)> {
47 tracing::trace!("Getting verified size of {}", hash.to_hex());
48 let request = GetRequest::new(
49 *hash,
50 RangeSpecSeq::from_ranges(vec![ChunkRanges::from(ChunkNum(u64::MAX)..)]),
51 );
52 let request = fsm::start(connection.clone(), request);
53 let connected = request.next().await?;
54 let fsm::ConnectedNext::StartRoot(start) = connected.next().await? else {
55 unreachable!("expected start root");
56 };
57 let header = start.next();
58 let (mut curr, size) = header.next().await?;
59 let end = loop {
60 match curr.next().await {
61 fsm::BlobContentNext::More((next, res)) => {
62 let _ = res?;
63 curr = next;
64 }
65 fsm::BlobContentNext::Done(end) => {
66 break end;
67 }
68 }
69 };
70 let fsm::EndBlobNext::Closing(closing) = end.next() else {
71 unreachable!("expected closing");
72 };
73 let stats = closing.next().await?;
74 tracing::trace!(
75 "Got verified size of {}, {:.6}s",
76 hash.to_hex(),
77 stats.elapsed.as_secs_f64()
78 );
79 Ok((size, stats))
80}
81
82pub async fn get_hash_seq_and_sizes(
87 connection: &Connection,
88 hash: &Hash,
89 max_size: u64,
90) -> anyhow::Result<(HashSeq, Arc<[u64]>)> {
91 let content = HashAndFormat::hash_seq(*hash);
92 tracing::debug!("Getting hash seq and children sizes of {}", content);
93 let request = GetRequest::new(
94 *hash,
95 RangeSpecSeq::from_ranges_infinite([
96 ChunkRanges::all(),
97 ChunkRanges::from(ChunkNum(u64::MAX)..),
98 ]),
99 );
100 let at_start = fsm::start(connection.clone(), request);
101 let at_connected = at_start.next().await?;
102 let fsm::ConnectedNext::StartRoot(start) = at_connected.next().await? else {
103 unreachable!("query includes root");
104 };
105 let at_start_root = start.next();
106 let (at_blob_content, size) = at_start_root.next().await?;
107 if size > max_size {
109 anyhow::bail!("size too large");
110 }
111 let (mut curr, hash_seq) = at_blob_content.concatenate_into_vec().await?;
112 let hash_seq = HashSeq::try_from(Bytes::from(hash_seq))?;
113 let mut sizes = Vec::with_capacity(hash_seq.len());
114 let closing = loop {
115 match curr.next() {
116 fsm::EndBlobNext::MoreChildren(more) => {
117 let hash = match hash_seq.get(sizes.len()) {
118 Some(hash) => hash,
119 None => break more.finish(),
120 };
121 let at_header = more.next(hash);
122 let (at_content, size) = at_header.next().await?;
123 let next = at_content.drain().await?;
124 sizes.push(size);
125 curr = next;
126 }
127 fsm::EndBlobNext::Closing(closing) => break closing,
128 }
129 };
130 let _stats = closing.next().await?;
131 tracing::debug!(
132 "Got hash seq and children sizes of {}: {:?}",
133 content,
134 sizes
135 );
136 Ok((hash_seq, sizes.into()))
137}
138
139pub async fn get_chunk_probe(
143 connection: &Connection,
144 hash: &Hash,
145 chunk: ChunkNum,
146) -> anyhow::Result<Stats> {
147 let ranges = ChunkRanges::from(chunk..chunk + 1);
148 let ranges = RangeSpecSeq::from_ranges([ranges]);
149 let request = GetRequest::new(*hash, ranges);
150 let request = fsm::start(connection.clone(), request);
151 let connected = request.next().await?;
152 let fsm::ConnectedNext::StartRoot(start) = connected.next().await? else {
153 unreachable!("query includes root");
154 };
155 let header = start.next();
156 let (mut curr, _size) = header.next().await?;
157 let end = loop {
158 match curr.next().await {
159 fsm::BlobContentNext::More((next, res)) => {
160 res?;
161 curr = next;
162 }
163 fsm::BlobContentNext::Done(end) => {
164 break end;
165 }
166 }
167 };
168 let fsm::EndBlobNext::Closing(closing) = end.next() else {
169 unreachable!("query contains only one blob");
170 };
171 let stats = closing.next().await?;
172 Ok(stats)
173}
174
175pub fn random_hash_seq_ranges(sizes: &[u64], mut rng: impl Rng) -> RangeSpecSeq {
181 let total_chunks = sizes
182 .iter()
183 .map(|size| ChunkNum::full_chunks(*size).0)
184 .sum::<u64>();
185 let random_chunk = rng.gen_range(0..total_chunks);
186 let mut remaining = random_chunk;
187 let mut ranges = vec![];
188 ranges.push(ChunkRanges::empty());
189 for size in sizes.iter() {
190 let chunks = ChunkNum::full_chunks(*size).0;
191 if remaining < chunks {
192 ranges.push(ChunkRanges::from(
193 ChunkNum(remaining)..ChunkNum(remaining + 1),
194 ));
195 break;
196 } else {
197 remaining -= chunks;
198 ranges.push(ChunkRanges::empty());
199 }
200 }
201 RangeSpecSeq::from_ranges(ranges)
202}