iroh_bytes/get/
request.rs1use std::sync::Arc;
3
4use crate::{
5 hashseq::HashSeq,
6 protocol::{GetRequest, RangeSpecSeq},
7 Hash, HashAndFormat,
8};
9use bao_tree::{ChunkNum, ChunkRanges};
10use bytes::Bytes;
11use rand::Rng;
12
13use super::{fsm, Stats};
14
15pub async fn get_unverified_size(
20 connection: &quinn::Connection,
21 hash: &Hash,
22) -> anyhow::Result<(u64, Stats)> {
23 let request = GetRequest::new(
24 *hash,
25 RangeSpecSeq::from_ranges(vec![ChunkRanges::from(ChunkNum(u64::MAX)..)]),
26 );
27 let request = fsm::start(connection.clone(), request);
28 let connected = request.next().await?;
29 let fsm::ConnectedNext::StartRoot(start) = connected.next().await? else {
30 unreachable!("expected start root");
31 };
32 let at_blob_header = start.next();
33 let (curr, size) = at_blob_header.next().await?;
34 let stats = curr.finish().next().await?;
35 Ok((size, stats))
36}
37
38pub async fn get_verified_size(
43 connection: &quinn::Connection,
44 hash: &Hash,
45) -> anyhow::Result<(u64, Stats)> {
46 tracing::trace!("Getting verified size of {}", hash.to_hex());
47 let request = GetRequest::new(
48 *hash,
49 RangeSpecSeq::from_ranges(vec![ChunkRanges::from(ChunkNum(u64::MAX)..)]),
50 );
51 let request = fsm::start(connection.clone(), request);
52 let connected = request.next().await?;
53 let fsm::ConnectedNext::StartRoot(start) = connected.next().await? else {
54 unreachable!("expected start root");
55 };
56 let header = start.next();
57 let (mut curr, size) = header.next().await?;
58 let end = loop {
59 match curr.next().await {
60 fsm::BlobContentNext::More((next, res)) => {
61 let _ = res?;
62 curr = next;
63 }
64 fsm::BlobContentNext::Done(end) => {
65 break end;
66 }
67 }
68 };
69 let fsm::EndBlobNext::Closing(closing) = end.next() else {
70 unreachable!("expected closing");
71 };
72 let stats = closing.next().await?;
73 tracing::trace!(
74 "Got verified size of {}, {:.6}s",
75 hash.to_hex(),
76 stats.elapsed.as_secs_f64()
77 );
78 Ok((size, stats))
79}
80
81pub async fn get_hash_seq_and_sizes(
86 connection: &quinn::Connection,
87 hash: &Hash,
88 max_size: u64,
89) -> anyhow::Result<(HashSeq, Arc<[u64]>)> {
90 let content = HashAndFormat::hash_seq(*hash);
91 tracing::debug!("Getting hash seq and children sizes of {}", content);
92 let request = GetRequest::new(
93 *hash,
94 RangeSpecSeq::from_ranges_infinite([
95 ChunkRanges::all(),
96 ChunkRanges::from(ChunkNum(u64::MAX)..),
97 ]),
98 );
99 let at_start = fsm::start(connection.clone(), request);
100 let at_connected = at_start.next().await?;
101 let fsm::ConnectedNext::StartRoot(start) = at_connected.next().await? else {
102 unreachable!("query includes root");
103 };
104 let at_start_root = start.next();
105 let (at_blob_content, size) = at_start_root.next().await?;
106 if size > max_size {
108 anyhow::bail!("size too large");
109 }
110 let (mut curr, hash_seq) = at_blob_content.concatenate_into_vec().await?;
111 let hash_seq = HashSeq::try_from(Bytes::from(hash_seq))?;
112 let mut sizes = Vec::with_capacity(hash_seq.len());
113 let closing = loop {
114 match curr.next() {
115 fsm::EndBlobNext::MoreChildren(more) => {
116 let hash = match hash_seq.get(sizes.len()) {
117 Some(hash) => hash,
118 None => break more.finish(),
119 };
120 let at_header = more.next(hash);
121 let (at_content, size) = at_header.next().await?;
122 let next = at_content.drain().await?;
123 sizes.push(size);
124 curr = next;
125 }
126 fsm::EndBlobNext::Closing(closing) => break closing,
127 }
128 };
129 let _stats = closing.next().await?;
130 tracing::debug!(
131 "Got hash seq and children sizes of {}: {:?}",
132 content,
133 sizes
134 );
135 Ok((hash_seq, sizes.into()))
136}
137
138pub async fn get_chunk_probe(
142 connection: &quinn::Connection,
143 hash: &Hash,
144 chunk: ChunkNum,
145) -> anyhow::Result<Stats> {
146 let ranges = ChunkRanges::from(chunk..chunk + 1);
147 let ranges = RangeSpecSeq::from_ranges([ranges]);
148 let request = GetRequest::new(*hash, ranges);
149 let request = fsm::start(connection.clone(), request);
150 let connected = request.next().await?;
151 let fsm::ConnectedNext::StartRoot(start) = connected.next().await? else {
152 unreachable!("query includes root");
153 };
154 let header = start.next();
155 let (mut curr, _size) = header.next().await?;
156 let end = loop {
157 match curr.next().await {
158 fsm::BlobContentNext::More((next, res)) => {
159 res?;
160 curr = next;
161 }
162 fsm::BlobContentNext::Done(end) => {
163 break end;
164 }
165 }
166 };
167 let fsm::EndBlobNext::Closing(closing) = end.next() else {
168 unreachable!("query contains only one blob");
169 };
170 let stats = closing.next().await?;
171 Ok(stats)
172}
173
174pub fn random_hash_seq_ranges(sizes: &[u64], mut rng: impl Rng) -> RangeSpecSeq {
180 let total_chunks = sizes
181 .iter()
182 .map(|size| ChunkNum::full_chunks(*size).0)
183 .sum::<u64>();
184 let random_chunk = rng.gen_range(0..total_chunks);
185 let mut remaining = random_chunk;
186 let mut ranges = vec![];
187 ranges.push(ChunkRanges::empty());
188 for size in sizes.iter() {
189 let chunks = ChunkNum::full_chunks(*size).0;
190 if remaining < chunks {
191 ranges.push(ChunkRanges::from(
192 ChunkNum(remaining)..ChunkNum(remaining + 1),
193 ));
194 break;
195 } else {
196 remaining -= chunks;
197 ranges.push(ChunkRanges::empty());
198 }
199 }
200 RangeSpecSeq::from_ranges(ranges)
201}