iroh_blobs/downloader/
invariants.rs1#![cfg(any(test, debug_assertions))]
4
5use super::*;
6
7impl<G: Getter<Connection = D::Connection>, D: DialerT> Service<G, D> {
9 #[track_caller]
11 pub(in crate::downloader) fn check_invariants(&self) {
12 self.check_active_request_count();
13 self.check_queued_requests_consistency();
14 self.check_idle_peer_consistency();
15 self.check_concurrency_limits();
16 self.check_provider_map_prunning();
17 }
18
19 #[track_caller]
21 fn check_concurrency_limits(&self) {
22 let ConcurrencyLimits {
23 max_concurrent_requests,
24 max_concurrent_requests_per_node,
25 max_open_connections,
26 max_concurrent_dials_per_hash,
27 } = &self.concurrency_limits;
28
29 assert!(
31 self.in_progress_downloads.len() <= *max_concurrent_requests,
32 "max_concurrent_requests exceeded"
33 );
34
35 tracing::trace!(
37 "limits: conns: {}/{} | reqs: {}/{}",
38 self.connections_count(),
39 max_open_connections,
40 self.in_progress_downloads.len(),
41 max_concurrent_requests
42 );
43 assert!(
44 self.connections_count() <= *max_open_connections,
45 "max_open_connections exceeded"
46 );
47
48 for (node, info) in self.connected_nodes.iter() {
50 assert!(
51 info.active_requests() <= *max_concurrent_requests_per_node,
52 "max_concurrent_requests_per_node exceeded for {node}"
53 )
54 }
55
56 if let Some(kind) = self.queue.front() {
58 let hash = kind.hash();
59 let nodes = self.providers.get_candidates(&hash);
60 let mut dialing = 0;
61 for node in nodes {
62 if self.dialer.is_pending(node) {
63 dialing += 1;
64 }
65 }
66 assert!(
67 dialing <= *max_concurrent_dials_per_hash,
68 "max_concurrent_dials_per_hash exceeded for {hash}"
69 )
70 }
71 }
72
73 #[track_caller]
76 fn check_active_request_count(&self) {
77 assert_eq!(
80 self.active_requests.len(),
81 self.in_progress_downloads.len(),
82 "active_requests and in_progress_downloads are out of sync"
83 );
84 let mut real_count: HashMap<NodeId, usize> =
87 HashMap::with_capacity(self.connected_nodes.len());
88 for req_info in self.active_requests.values() {
89 *real_count.entry(req_info.node).or_default() += 1;
91 }
92 for (peer, info) in self.connected_nodes.iter() {
93 assert_eq!(
94 info.active_requests(),
95 real_count.get(peer).copied().unwrap_or_default(),
96 "mismatched count of active requests for {peer}"
97 )
98 }
99 }
100
101 #[track_caller]
103 fn check_queued_requests_consistency(&self) {
104 for entry in self.queue.iter() {
106 assert!(
107 self.providers
108 .get_candidates(&entry.hash())
109 .next()
110 .is_some(),
111 "all queued requests have providers"
112 );
113 assert!(
114 self.requests.contains_key(entry),
115 "all queued requests have request info"
116 );
117 }
118
119 for entry in self.queue.iter_parked() {
121 assert!(
122 matches!(self.next_step(entry), NextStep::Park),
123 "all parked downloads evaluate to the correct next step"
124 );
125 assert!(
126 self.providers
127 .get_candidates(&entry.hash())
128 .all(|node| matches!(self.node_state(node), NodeState::WaitForRetry)),
129 "all parked downloads have only retrying nodes"
130 );
131 }
132 }
133
134 #[track_caller]
136 fn check_idle_peer_consistency(&self) {
137 let idle_peers = self
138 .connected_nodes
139 .values()
140 .filter(|info| info.active_requests() == 0)
141 .count();
142 assert_eq!(
143 self.goodbye_nodes_queue.len(),
144 idle_peers,
145 "inconsistent count of idle peers"
146 );
147 }
148
149 #[track_caller]
151 fn check_provider_map_prunning(&self) {
152 for hash in self.providers.hash_node.keys() {
153 let as_raw = DownloadKind(HashAndFormat::raw(*hash));
154 let as_hash_seq = DownloadKind(HashAndFormat::hash_seq(*hash));
155 assert!(
156 self.queue.contains_hash(*hash)
157 || self.active_requests.contains_key(&as_raw)
158 || self.active_requests.contains_key(&as_hash_seq),
159 "all hashes in the provider map are in the queue or active"
160 )
161 }
162 }
163}