ant-node 0.11.5

Pure quantum-proof network node for the Autonomi decentralized network
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
//! New-node bootstrap logic (Section 16).
//!
//! A joining node performs active sync to discover and verify keys it should
//! hold, then transitions to normal operation once all bootstrap work drains.

use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

use crate::logging::{debug, info, warn};
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;

use saorsa_core::DhtNetworkEvent;

use crate::ant_protocol::XorName;
use crate::replication::scheduling::ReplicationQueues;
use crate::replication::types::BootstrapState;

// ---------------------------------------------------------------------------
// DHT bootstrap gate
// ---------------------------------------------------------------------------

/// Result reported by [`wait_for_bootstrap_complete`] once it stops waiting
/// for `DhtNetworkEvent::BootstrapComplete`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BootstrapGateResult {
    /// `BootstrapComplete` arrived, so the routing table has been populated.
    Received,
    /// The wait timed out or the event channel reported an error; the caller
    /// proceeds regardless (the expected outcome on a bootstrap node).
    TimedOut,
    /// Shutdown was signalled before the gate opened.
    Shutdown,
}

/// Wait for saorsa-core's `DhtNetworkEvent::BootstrapComplete` before
/// returning.
///
/// The caller must supply a pre-subscribed `dht_events` receiver. This is
/// critical: the subscription must be created **before**
/// `P2PNode::start()` so the `BootstrapComplete` event is not missed.
///
/// Returns [`BootstrapGateResult::Received`] on success,
/// [`BootstrapGateResult::TimedOut`] if the timeout elapses (e.g. a
/// bootstrap node with no peers), or [`BootstrapGateResult::Shutdown`] if
/// cancellation is signalled.
pub async fn wait_for_bootstrap_complete(
    mut dht_events: tokio::sync::broadcast::Receiver<DhtNetworkEvent>,
    timeout_secs: u64,
    shutdown: &CancellationToken,
) -> BootstrapGateResult {
    let timeout = Duration::from_secs(timeout_secs);

    let result = tokio::select! {
        () = shutdown.cancelled() => {
            debug!("Bootstrap sync: shutdown during BootstrapComplete wait");
            BootstrapGateResult::Shutdown
        }
        () = tokio::time::sleep(timeout) => {
            warn!(
                "Bootstrap sync: timed out after {timeout_secs}s waiting for \
                 BootstrapComplete — proceeding (likely a bootstrap node with no peers)",
            );
            BootstrapGateResult::TimedOut
        }
        gate = async {
            loop {
                match dht_events.recv().await {
                    Ok(DhtNetworkEvent::BootstrapComplete { num_peers }) => {
                        info!(
                            "Bootstrap sync: DHT bootstrap complete \
                             with {num_peers} peers in routing table"
                        );
                        break BootstrapGateResult::Received;
                    }
                    Ok(_) => {}
                    Err(e) => {
                        warn!(
                            "Bootstrap sync: DHT event channel error: {e}, \
                             proceeding without gate"
                        );
                        break BootstrapGateResult::TimedOut;
                    }
                }
            }
        } => gate,
    };
    drop(dht_events);
    result
}

// ---------------------------------------------------------------------------
// Bootstrap sync
// ---------------------------------------------------------------------------

// `snapshot_close_neighbors` is defined in `neighbor_sync` and re-used here.

/// Force the shared bootstrap state into the drained condition.
///
/// Takes the write lock, sets `drained` unconditionally and logs the
/// transition. No counter or queue is consulted.
pub async fn mark_bootstrap_drained(bootstrap_state: &Arc<RwLock<BootstrapState>>) {
    let mut guard = bootstrap_state.write().await;
    guard.drained = true;
    info!("Bootstrap explicitly marked as drained");
}

/// Decide whether bootstrap has finished, latching the result into the
/// shared state the first time it does.
///
/// The checks run in this order and the first one that applies wins:
/// 1. `drained` is already set → `true`; nothing else is consulted.
/// 2. Bootstrap peer requests are still pending → `false`.
/// 3. Some source still has outstanding capacity-rejected hints → `false`.
/// 4. All bootstrap-discovered keys have left the pipeline (no longer in
///    `PendingVerify`, `FetchQueue`, or `InFlightFetch`) → `drained` is set
///    and `true` is returned; otherwise `false`.
///
/// The write lock on `bootstrap_state` is held for the whole call.
///
/// Returns `true` if bootstrap is (now) drained.
pub async fn check_bootstrap_drained(
    bootstrap_state: &Arc<RwLock<BootstrapState>>,
    queues: &ReplicationQueues,
) -> bool {
    let mut guard = bootstrap_state.write().await;

    // Once latched, the flag is final.
    if guard.drained {
        return true;
    }

    // Peer requests still in flight may yet discover more keys.
    if guard.pending_peer_requests > 0 {
        return false;
    }

    // A hint that was capacity-rejected at the pending_verify bounds during
    // bootstrap has to be re-delivered by the source that sent it. Claiming
    // "drained" before that happens would silently drop work the source
    // still owes us. Sources leave this set one at a time, when their next
    // admission cycle finishes with zero rejections — see
    // `clear_capacity_rejected`.
    let n = guard.capacity_rejected_sources.len();
    if n > 0 {
        debug!("Bootstrap NOT drained: {n} source(s) have outstanding capacity-rejected hints");
        return false;
    }

    let work_finished = queues.is_bootstrap_work_empty(&guard.pending_keys);
    if work_finished {
        guard.drained = true;
        info!("Bootstrap drained: all peer requests completed and work queues empty");
    }
    work_finished
}

/// Remember that `source` had at least one hint capacity-rejected this cycle.
///
/// Sources are kept in a set rather than counted, so repeated calls for the
/// same source are idempotent and only the first one is logged. While a
/// source is in the set bootstrap cannot drain; it is removed again by
/// [`clear_capacity_rejected`] once that same source completes an admission
/// cycle with zero rejections (i.e. it successfully re-delivered everything
/// that previously overflowed).
pub async fn note_capacity_rejected(
    bootstrap_state: &Arc<RwLock<BootstrapState>>,
    source: saorsa_core::identity::PeerId,
) {
    let mut guard = bootstrap_state.write().await;

    let newly_recorded = guard.capacity_rejected_sources.insert(source);
    if !newly_recorded {
        // Already tracked: nothing changed, nothing to log.
        return;
    }

    let n = guard.capacity_rejected_sources.len();
    debug!(
        "Bootstrap: source {source} now has outstanding capacity-rejected hints \
         ({n} sources outstanding)"
    );
}

/// Retire `source` from the set of sources with outstanding capacity
/// rejections.
///
/// Intended to be called whenever `source` finishes an admission cycle with
/// zero capacity rejections: it has re-delivered the hints that previously
/// overflowed, so it no longer keeps bootstrap from draining. Does nothing
/// (and logs nothing) when the source was not in the set.
pub async fn clear_capacity_rejected(
    bootstrap_state: &Arc<RwLock<BootstrapState>>,
    source: &saorsa_core::identity::PeerId,
) {
    let mut guard = bootstrap_state.write().await;

    let was_outstanding = guard.capacity_rejected_sources.remove(source);
    if !was_outstanding {
        // Source had nothing outstanding: no-op.
        return;
    }

    let n = guard.capacity_rejected_sources.len();
    debug!(
        "Bootstrap: cleared outstanding capacity rejections for {source} \
         ({n} sources still outstanding)"
    );
}

/// Merge `keys` into the bootstrap state's set of discovered keys so that
/// drain tracking takes them into account.
///
/// Keys that are already tracked are not duplicated. The size of the merged
/// set is logged at debug level.
#[allow(clippy::implicit_hasher)]
pub async fn track_discovered_keys(
    bootstrap_state: &Arc<RwLock<BootstrapState>>,
    keys: &HashSet<XorName>,
) {
    let mut guard = bootstrap_state.write().await;
    guard.pending_keys.extend(keys.iter().copied());
    debug!(
        "Bootstrap tracking {} total discovered keys",
        guard.pending_keys.len()
    );
}

/// Increment the pending peer request counter by `count` (saturating).
///
/// The addition saturates at `usize::MAX` instead of overflowing, mirroring
/// the saturating subtraction in [`decrement_pending_requests`]. A plain
/// `+=` would panic in builds with overflow checks enabled and wrap around
/// otherwise; a counter that wrapped to zero would no longer hold back
/// [`check_bootstrap_drained`] even though requests are still outstanding.
pub async fn increment_pending_requests(
    bootstrap_state: &Arc<RwLock<BootstrapState>>,
    count: usize,
) {
    let mut state = bootstrap_state.write().await;
    state.pending_peer_requests = state.pending_peer_requests.saturating_add(count);
}

/// Lower the pending peer request counter by `count`.
///
/// The subtraction saturates: removing more than is currently pending
/// leaves the counter at zero instead of underflowing.
pub async fn decrement_pending_requests(
    bootstrap_state: &Arc<RwLock<BootstrapState>>,
    count: usize,
) {
    let mut guard = bootstrap_state.write().await;
    let remaining = guard.pending_peer_requests.saturating_sub(count);
    guard.pending_peer_requests = remaining;
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;
    use std::time::Instant;

    use tokio::sync::RwLock;

    use super::*;
    use crate::replication::scheduling::ReplicationQueues;
    use crate::replication::types::{
        BootstrapState, HintPipeline, VerificationEntry, VerificationState,
    };

    /// Shared handle type accepted by every function under test.
    type SharedState = Arc<RwLock<BootstrapState>>;

    /// Builds a 32-byte name in which every byte equals `b`.
    fn xor_name_from_byte(b: u8) -> XorName {
        [b; 32]
    }

    /// Builds a peer id from 32 copies of `b`.
    fn peer_from_byte(b: u8) -> saorsa_core::identity::PeerId {
        saorsa_core::identity::PeerId::from_bytes([b; 32])
    }

    /// Wraps a hand-built `BootstrapState` (with no capacity-rejected
    /// sources) in the shared handle.
    fn shared_state(
        drained: bool,
        pending_peer_requests: usize,
        pending_keys: HashSet<XorName>,
    ) -> SharedState {
        Arc::new(RwLock::new(BootstrapState {
            drained,
            pending_peer_requests,
            pending_keys,
            capacity_rejected_sources: HashSet::new(),
        }))
    }

    /// Shared handle around `BootstrapState::new()`.
    fn fresh_state() -> SharedState {
        Arc::new(RwLock::new(BootstrapState::new()))
    }

    #[tokio::test]
    async fn check_drained_when_already_drained() {
        // Pending requests are deliberately non-zero: the latched flag must
        // win before they are even looked at.
        let state = shared_state(true, 5, HashSet::new());
        let queues = ReplicationQueues::new();

        let drained = check_bootstrap_drained(&state, &queues).await;
        assert!(drained, "should be drained when flag is already set");
    }

    #[tokio::test]
    async fn check_drained_blocked_by_pending_requests() {
        let state = shared_state(false, 2, HashSet::new());
        let queues = ReplicationQueues::new();

        let drained = check_bootstrap_drained(&state, &queues).await;
        assert!(!drained, "should not drain with pending requests");
    }

    #[tokio::test]
    async fn check_drained_transitions_when_all_work_done() {
        let tracked = HashSet::from([xor_name_from_byte(0x01)]);
        let state = shared_state(false, 0, tracked);
        let queues = ReplicationQueues::new();

        // The tracked key sits in none of the queues, so the check must
        // succeed and latch the flag.
        assert!(check_bootstrap_drained(&state, &queues).await);

        let flag = state.read().await.drained;
        assert!(flag, "drained flag should be set");
    }

    #[tokio::test]
    async fn check_drained_blocked_by_queued_key() {
        let key = xor_name_from_byte(0x01);
        let state = shared_state(false, 0, HashSet::from([key]));
        let mut queues = ReplicationQueues::new();

        // Park the tracked key in the pending-verify queue.
        queues.add_pending_verify(
            key,
            VerificationEntry {
                state: VerificationState::PendingVerify,
                pipeline: HintPipeline::Replica,
                verified_sources: Vec::new(),
                tried_sources: HashSet::new(),
                created_at: Instant::now(),
                hint_sender: peer_from_byte(0),
            },
        );

        let drained = check_bootstrap_drained(&state, &queues).await;
        assert!(
            !drained,
            "should not drain while bootstrap key is still in pipeline"
        );
    }

    #[tokio::test]
    async fn mark_bootstrap_drained_sets_flag() {
        let state = fresh_state();

        mark_bootstrap_drained(&state).await;

        assert!(state.read().await.drained);
    }

    #[tokio::test]
    async fn track_discovered_keys_accumulates() {
        let state = fresh_state();
        // The two batches overlap on key 0x02.
        let first_batch: HashSet<XorName> =
            HashSet::from([xor_name_from_byte(0x01), xor_name_from_byte(0x02)]);
        let second_batch: HashSet<XorName> =
            HashSet::from([xor_name_from_byte(0x02), xor_name_from_byte(0x03)]);

        track_discovered_keys(&state, &first_batch).await;
        track_discovered_keys(&state, &second_batch).await;

        let tracked = state.read().await.pending_keys.len();
        assert_eq!(tracked, 3, "should deduplicate across calls");
    }

    #[tokio::test]
    async fn increment_and_decrement_pending_requests() {
        let state = fresh_state();

        increment_pending_requests(&state, 5).await;
        assert_eq!(state.read().await.pending_peer_requests, 5);

        decrement_pending_requests(&state, 3).await;
        assert_eq!(state.read().await.pending_peer_requests, 2);

        // Removing more than remains must clamp instead of underflowing.
        decrement_pending_requests(&state, 10).await;
        let remaining = state.read().await.pending_peer_requests;
        assert_eq!(remaining, 0, "should saturate at zero");
    }

    /// Regression: a source with capacity-rejected hints has to be retired
    /// from the "not yet drained" set once it completes a later admission
    /// cycle without rejections. Otherwise a single rejection would wedge
    /// `check_bootstrap_drained` forever.
    #[tokio::test]
    async fn capacity_rejected_clears_on_clean_cycle() {
        let state = fresh_state();
        let queues = ReplicationQueues::new();
        let source = peer_from_byte(7);

        // Cycle one: the source overflowed, so draining is held back.
        note_capacity_rejected(&state, source).await;
        let drained_while_outstanding = check_bootstrap_drained(&state, &queues).await;
        assert!(
            !drained_while_outstanding,
            "drain must be blocked while a source has outstanding capacity rejections"
        );

        // Cycle two, SAME source, no rejections: the entry is cleared.
        clear_capacity_rejected(&state, &source).await;
        let drained_after_clear = check_bootstrap_drained(&state, &queues).await;
        assert!(
            drained_after_clear,
            "drain must complete once the source's outstanding rejections are cleared"
        );
    }

    /// Rejections are tracked per source: a clean cycle from one source must
    /// NOT retire the outstanding rejections of another.
    #[tokio::test]
    async fn capacity_rejected_is_per_source() {
        let state = fresh_state();
        let queues = ReplicationQueues::new();
        let source_a = peer_from_byte(0xAA);
        let source_b = peer_from_byte(0xBB);

        note_capacity_rejected(&state, source_a).await;
        note_capacity_rejected(&state, source_b).await;
        assert!(!check_bootstrap_drained(&state, &queues).await);

        // A comes back clean, but B still owes re-hints.
        clear_capacity_rejected(&state, &source_a).await;
        let drained_with_b_outstanding = check_bootstrap_drained(&state, &queues).await;
        assert!(
            !drained_with_b_outstanding,
            "B's outstanding rejections must keep drain blocked"
        );

        clear_capacity_rejected(&state, &source_b).await;
        assert!(check_bootstrap_drained(&state, &queues).await);
    }
}