iroh_blobs/downloader/
invariants.rs

1//! Invariants for the service.
2
3#![cfg(any(test, debug_assertions))]
4
5use super::*;
6
7/// invariants for the service.
8impl<G: Getter<Connection = D::Connection>, D: DialerT> Service<G, D> {
9    /// Checks the various invariants the service must maintain
10    #[track_caller]
11    pub(in crate::downloader) fn check_invariants(&self) {
12        self.check_active_request_count();
13        self.check_queued_requests_consistency();
14        self.check_idle_peer_consistency();
15        self.check_concurrency_limits();
16        self.check_provider_map_prunning();
17    }
18
19    /// Checks concurrency limits are maintained.
20    #[track_caller]
21    fn check_concurrency_limits(&self) {
22        let ConcurrencyLimits {
23            max_concurrent_requests,
24            max_concurrent_requests_per_node,
25            max_open_connections,
26            max_concurrent_dials_per_hash,
27        } = &self.concurrency_limits;
28
29        // check the total number of active requests to ensure it stays within the limit
30        assert!(
31            self.in_progress_downloads.len() <= *max_concurrent_requests,
32            "max_concurrent_requests exceeded"
33        );
34
35        // check that the open and dialing peers don't exceed the connection capacity
36        tracing::trace!(
37            "limits: conns: {}/{} | reqs: {}/{}",
38            self.connections_count(),
39            max_open_connections,
40            self.in_progress_downloads.len(),
41            max_concurrent_requests
42        );
43        assert!(
44            self.connections_count() <= *max_open_connections,
45            "max_open_connections exceeded"
46        );
47
48        // check the active requests per peer don't exceed the limit
49        for (node, info) in self.connected_nodes.iter() {
50            assert!(
51                info.active_requests() <= *max_concurrent_requests_per_node,
52                "max_concurrent_requests_per_node exceeded for {node}"
53            )
54        }
55
56        // check that we do not dial more nodes than allowed for the next pending hashes
57        if let Some(kind) = self.queue.front() {
58            let hash = kind.hash();
59            let nodes = self.providers.get_candidates(&hash);
60            let mut dialing = 0;
61            for node in nodes {
62                if self.dialer.is_pending(node) {
63                    dialing += 1;
64                }
65            }
66            assert!(
67                dialing <= *max_concurrent_dials_per_hash,
68                "max_concurrent_dials_per_hash exceeded for {hash}"
69            )
70        }
71    }
72
73    /// Checks that the count of active requests per peer is consistent with the active requests,
74    /// and that active request are consistent with download futures
75    #[track_caller]
76    fn check_active_request_count(&self) {
77        // check that the count of futures we are polling for downloads is consistent with the
78        // number of requests
79        assert_eq!(
80            self.active_requests.len(),
81            self.in_progress_downloads.len(),
82            "active_requests and in_progress_downloads are out of sync"
83        );
84        // check that the count of requests per peer matches the number of requests that have that
85        // peer as active
86        let mut real_count: HashMap<NodeId, usize> =
87            HashMap::with_capacity(self.connected_nodes.len());
88        for req_info in self.active_requests.values() {
89            // nothing like some classic word count
90            *real_count.entry(req_info.node).or_default() += 1;
91        }
92        for (peer, info) in self.connected_nodes.iter() {
93            assert_eq!(
94                info.active_requests(),
95                real_count.get(peer).copied().unwrap_or_default(),
96                "mismatched count of active requests for {peer}"
97            )
98        }
99    }
100
101    /// Checks that the queued requests all appear in the provider map and request map.
102    #[track_caller]
103    fn check_queued_requests_consistency(&self) {
104        // check that all hashes in the queue have candidates
105        for entry in self.queue.iter() {
106            assert!(
107                self.providers
108                    .get_candidates(&entry.hash())
109                    .next()
110                    .is_some(),
111                "all queued requests have providers"
112            );
113            assert!(
114                self.requests.contains_key(entry),
115                "all queued requests have request info"
116            );
117        }
118
119        // check that all parked hashes should be parked
120        for entry in self.queue.iter_parked() {
121            assert!(
122                matches!(self.next_step(entry), NextStep::Park),
123                "all parked downloads evaluate to the correct next step"
124            );
125            assert!(
126                self.providers
127                    .get_candidates(&entry.hash())
128                    .all(|node| matches!(self.node_state(node), NodeState::WaitForRetry)),
129                "all parked downloads have only retrying nodes"
130            );
131        }
132    }
133
134    /// Check that peers queued to be disconnected are consistent with peers considered idle.
135    #[track_caller]
136    fn check_idle_peer_consistency(&self) {
137        let idle_peers = self
138            .connected_nodes
139            .values()
140            .filter(|info| info.active_requests() == 0)
141            .count();
142        assert_eq!(
143            self.goodbye_nodes_queue.len(),
144            idle_peers,
145            "inconsistent count of idle peers"
146        );
147    }
148
149    /// Check that every hash in the provider map is needed.
150    #[track_caller]
151    fn check_provider_map_prunning(&self) {
152        for hash in self.providers.hash_node.keys() {
153            let as_raw = DownloadKind(HashAndFormat::raw(*hash));
154            let as_hash_seq = DownloadKind(HashAndFormat::hash_seq(*hash));
155            assert!(
156                self.queue.contains_hash(*hash)
157                    || self.active_requests.contains_key(&as_raw)
158                    || self.active_requests.contains_key(&as_hash_seq),
159                "all hashes in the provider map are in the queue or active"
160            )
161        }
162    }
163}