use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU32};
use std::time::Instant;
use parking_lot::Mutex;
use tokio::sync::{Notify, Semaphore};
use irontide_core::Lengths;
use irontide_storage::Bitfield;
use crate::piece_reservation::{
AtomicPieceStates, AvailabilitySnapshot, BlockMaps, PieceWriteGuards, StealCandidates,
};
pub(crate) const INITIAL_QUEUE_DEPTH: usize = 128;
#[inline]
pub(crate) fn refill_grant(granted: u32, target: u32) -> u32 {
match granted.cmp(&target) {
std::cmp::Ordering::Greater => 0,
std::cmp::Ordering::Equal => 1,
std::cmp::Ordering::Less => 2,
}
}
pub(crate) fn return_permit_budgeted(shared: &PeerShared) {
use std::sync::atomic::Ordering::Relaxed;
let available = u32::try_from(shared.semaphore.available_permits()).unwrap_or(u32::MAX);
let granted = shared
.in_flight
.load(Relaxed)
.saturating_add(available)
.saturating_add(1);
let add = refill_grant(granted, shared.target_depth.load(Relaxed));
if add == 0 {
shared
.counters
.inc_diag(crate::stats::BUDGET_PERMITS_ABSORBED_TOTAL, 1);
} else {
shared.semaphore.add_permits(add as usize);
}
}
pub(crate) struct PeerShared {
pub semaphore: Arc<Semaphore>,
pub peer_choking: AtomicBool,
pub unchoke_notify: Notify,
pub addr: SocketAddr,
pub in_flight: Arc<AtomicU32>,
#[allow(
dead_code,
reason = "shared Arc read via PeerState for debug/state API"
)]
pub target_depth: Arc<AtomicU32>,
pub dispatch_drain_notify: Notify,
pub event_drain_notify: Arc<Notify>,
pub counters: Arc<crate::stats::SessionCounters>,
pub dispatch_backlog_cap: usize,
pub event_backlog_cap: usize,
pub remote_unchoked_at: Mutex<Option<Instant>>,
pub first_block_pending: AtomicBool,
pub request_sent_times: Mutex<std::collections::HashMap<(u32, u32), Instant>>,
}
pub(crate) const RTT_PROBE_MAX_INFLIGHT: u32 = 8;
impl PeerShared {
#[allow(
dead_code,
reason = "test helper used in `peer_tasks::tests` and `torrent_peer_handler::tests`"
)]
pub fn new(
addr: SocketAddr,
in_flight: Arc<AtomicU32>,
target_depth: Arc<AtomicU32>,
event_drain_notify: Arc<Notify>,
) -> Self {
Self::new_with_counters(
addr,
in_flight,
target_depth,
event_drain_notify,
Arc::new(crate::stats::SessionCounters::new()),
)
}
pub fn new_with_counters(
addr: SocketAddr,
in_flight: Arc<AtomicU32>,
target_depth: Arc<AtomicU32>,
event_drain_notify: Arc<Notify>,
counters: Arc<crate::stats::SessionCounters>,
) -> Self {
Self::new_with_loop_config(
addr,
in_flight,
target_depth,
event_drain_notify,
counters,
crate::peer_backpressure::DISPATCH_BACKLOG_CAP,
crate::peer_backpressure::EVENT_BACKLOG_CAP,
)
}
#[allow(clippy::too_many_arguments)]
pub fn new_with_loop_config(
addr: SocketAddr,
in_flight: Arc<AtomicU32>,
target_depth: Arc<AtomicU32>,
event_drain_notify: Arc<Notify>,
counters: Arc<crate::stats::SessionCounters>,
dispatch_backlog_cap: usize,
event_backlog_cap: usize,
) -> Self {
Self {
semaphore: Arc::new(Semaphore::new(0)),
peer_choking: AtomicBool::new(true),
unchoke_notify: Notify::new(),
addr,
in_flight,
target_depth,
dispatch_drain_notify: Notify::new(),
event_drain_notify,
counters,
dispatch_backlog_cap,
event_backlog_cap,
remote_unchoked_at: Mutex::new(None),
first_block_pending: AtomicBool::new(false),
request_sent_times: Mutex::new(std::collections::HashMap::new()),
}
}
}
#[allow(dead_code)]
pub(crate) enum DispatchCommand {
Start {
lengths: Lengths,
peer_bitfield: Bitfield,
piece_notify: Arc<Notify>,
atomic_states: Option<Arc<AtomicPieceStates>>,
snapshot: Option<Arc<AvailabilitySnapshot>>,
block_maps: Option<Arc<BlockMaps>>,
steal_candidates: Option<Arc<StealCandidates>>,
piece_write_guards: Option<Arc<PieceWriteGuards>>,
},
PeerHave(u32),
PeerBitfield(Bitfield),
Snapshot(Arc<AvailabilitySnapshot>),
Stop,
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::Ordering::Relaxed;
fn test_shared(target: u32) -> PeerShared {
PeerShared::new(
"127.0.0.1:6881".parse().unwrap(),
Arc::new(AtomicU32::new(0)),
Arc::new(AtomicU32::new(target)),
Arc::new(Notify::new()),
)
}
#[test]
fn m257c_refill_grant_holds_at_target() {
assert_eq!(refill_grant(128, 128), 1);
assert_eq!(refill_grant(8, 8), 1);
}
#[test]
fn m257c_refill_grant_shrinks_above_target() {
assert_eq!(refill_grant(128, 8), 0);
assert_eq!(refill_grant(9, 8), 0);
}
#[test]
fn m257c_refill_grant_grows_below_target() {
assert_eq!(refill_grant(8, 128), 2);
assert_eq!(refill_grant(127, 128), 2);
}
#[tokio::test]
async fn m257c_return_permit_budgeted_converges_to_target() {
let shared = test_shared(8);
shared.semaphore.add_permits(10);
shared.in_flight.store(2, Relaxed);
shared.in_flight.store(1, Relaxed); return_permit_budgeted(&shared);
assert_eq!(
shared.semaphore.available_permits(),
10,
"absorbed, not returned"
);
shared.target_depth.store(20, Relaxed);
return_permit_budgeted(&shared);
assert_eq!(shared.semaphore.available_permits(), 12, "grew by one net");
}
#[tokio::test]
async fn m257c_dangling_returns_overshoot_then_reconverge() {
let shared = test_shared(8);
shared.in_flight.store(8, Relaxed);
for _ in 0..3 {
let v = shared.in_flight.load(Relaxed);
shared.in_flight.store(v.saturating_sub(1), Relaxed);
return_permit_budgeted(&shared);
}
assert_eq!(shared.semaphore.available_permits(), 3, "bounded overshoot");
for _ in 0..8 {
let v = shared.in_flight.load(Relaxed);
shared.in_flight.store(v.saturating_sub(1), Relaxed);
return_permit_budgeted(&shared);
}
assert_eq!(shared.in_flight.load(Relaxed), 0);
assert_eq!(
shared.semaphore.available_permits(),
8,
"circulation reconverges to exactly target"
);
}
}