1use std::{
2 cell::OnceCell,
3 collections::VecDeque,
4 io::{self},
5 net::{SocketAddrV4, TcpListener},
6 path::{Path, PathBuf},
7 sync::mpsc::{Receiver, Sender},
8 time::{Duration, Instant},
9};
10
11use crate::{
12 buf_pool::{Buffer, BufferPool},
13 event_loop::{ConnectionId, EventLoop},
14 file_store::DiskOpType,
15 peer_comm::{extended_protocol::MetadataProgress, peer_protocol::PeerId},
16};
17use crate::{
18 file_store::DiskOp,
19 piece_selector::{DownloadedPiece, Piece, PieceSelector, SUBPIECE_SIZE, Subpiece},
20};
21use crate::{file_store::FileStore, peer_connection::PeerConnection};
22use ahash::HashSetExt;
23use bitvec::{boxed::BitBox, order::Msb0, vec::BitVec};
24use heapless::spsc::Producer;
25use io_uring::IoUring;
26use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
27use slotmap::SlotMap;
28use thiserror::Error;
29
30use crate::peer_connection::DisconnectReason;
31
32use crate::TorrentMetadata;
33
34#[derive(Error, Debug)]
35pub enum Error {
36 #[error("Encountered IO issue: {0}")]
37 Io(#[from] io::Error),
38 #[error("Peer provider disconnected")]
39 PeerProviderDisconnect,
40}
41
42pub const CQE_WAIT_TIME_NS: u32 = 150_000_000;
43
44#[derive(Debug, Clone, Copy, serde::Deserialize, serde::Serialize)]
46#[serde(default)]
47pub struct Config {
48 pub max_connections: usize,
50 pub max_reported_outstanding_requests: u64,
55 pub max_unchoked: u32,
57 pub num_ticks_before_unchoke_recalc: u32,
61 pub num_ticks_before_optimistic_unchoke_recalc: u32,
68 pub seeding_piece_quota: u32,
72 pub cq_size: u32,
74 pub sq_size: u32,
76 pub completion_event_want: usize,
80 pub network_read_buffer_size: usize,
82 pub network_write_buffer_size: usize,
84 pub write_buffer_pool_size: usize,
86 pub read_buffer_pool_size: usize,
88}
89
90impl Default for Config {
91 fn default() -> Self {
92 Self {
93 max_connections: 128,
94 max_reported_outstanding_requests: 512,
95 max_unchoked: 8,
96 num_ticks_before_unchoke_recalc: 15,
97 num_ticks_before_optimistic_unchoke_recalc: 30,
98 seeding_piece_quota: 20,
99 cq_size: 4096,
100 sq_size: 4096,
101 completion_event_want: 32,
102 network_read_buffer_size: (SUBPIECE_SIZE * 2) as usize,
103 read_buffer_pool_size: 512,
104 network_write_buffer_size: (SUBPIECE_SIZE + 4096) as usize,
105 write_buffer_pool_size: 128,
106 }
107 }
108}
109
110pub struct Torrent {
114 our_id: PeerId,
115 state: State,
116}
117
118impl Torrent {
119 pub fn new(our_id: PeerId, state: State) -> Self {
122 Self { our_id, state }
123 }
124
125 pub fn is_complete(&self) -> bool {
131 self.state
132 .torrent_state
133 .as_ref()
134 .is_some_and(|state| state.is_complete)
135 }
136
137 pub fn start(
144 &mut self,
145 event_tx: Producer<TorrentEvent>,
146 command_rc: Receiver<Command>,
147 listener: TcpListener,
148 ) -> Result<(), Error> {
149 let ring: IoUring = IoUring::builder()
151 .setup_single_issuer()
152 .setup_clamp()
153 .setup_cqsize(self.state.config.cq_size)
154 .setup_defer_taskrun()
155 .setup_coop_taskrun()
156 .build(self.state.config.sq_size)
157 .unwrap();
158 let events = SlotMap::with_capacity_and_key(self.state.config.cq_size as usize);
159 let mut event_loop = EventLoop::new(self.our_id, events, &self.state.config);
160 event_loop.run(ring, &mut self.state, event_tx, command_rc, listener)
161 }
162}
163
164#[derive(Debug)]
166pub enum Command {
167 ConnectToPeers(Vec<SocketAddrV4>),
170 Stop,
172}
173
174#[derive(Debug)]
176pub struct PeerMetrics {
177 pub download_throughput: u64,
179 pub upload_throughput: u64,
181 pub metadata_progress: Option<MetadataProgress>,
183}
184
185#[derive(Debug)]
187pub enum TorrentEvent {
188 TorrentComplete,
193 MetadataComplete(Box<TorrentMetadata>),
197 ListenerStarted { port: u16 },
200 TorrentMetrics {
202 pieces_completed: usize,
204 pieces_allocated: usize,
207 peer_metrics: Vec<PeerMetrics>,
209 },
210}
211
212pub struct InitializedState {
213 pub piece_selector: PieceSelector,
214 pub num_unchoked: u32,
215 pub config: Config,
216 pub ticks_to_recalc_unchoke: u32,
217 pub ticks_to_recalc_optimistic_unchoke: u32,
218 pub downloaded_piece_rc: Receiver<DownloadedPiece>,
219 pub downloaded_piece_tx: Sender<DownloadedPiece>,
220 pub pieces: Vec<Option<Piece>>,
221 pub file_store: FileStore,
222 pub piece_buffer_pool: BufferPool,
223 pub is_complete: bool,
224}
225
226impl InitializedState {
227 pub fn new(root: &Path, metadata: &TorrentMetadata, config: Config) -> io::Result<Self> {
228 let mut pieces = Vec::with_capacity(metadata.pieces.len());
229 for _ in 0..metadata.pieces.len() {
230 pieces.push(None);
231 }
232 let (tx, rc) = std::sync::mpsc::channel();
233 Ok(Self {
234 piece_selector: PieceSelector::new(metadata),
235 num_unchoked: 0,
236 config,
237 ticks_to_recalc_unchoke: config.num_ticks_before_unchoke_recalc,
238 ticks_to_recalc_optimistic_unchoke: config.num_ticks_before_optimistic_unchoke_recalc,
239 downloaded_piece_rc: rc,
240 downloaded_piece_tx: tx,
241 pieces,
242 is_complete: false,
243 piece_buffer_pool: BufferPool::new("pieces", 256, metadata.piece_length as usize),
244 file_store: FileStore::new(root, metadata)?,
245 })
246 }
247
248 #[allow(dead_code)]
249 pub fn num_allocated(&self) -> usize {
250 self.pieces
251 .iter()
252 .filter(|piece| piece.as_ref().is_some_and(|piece| piece.ref_count > 0))
253 .count()
254 }
255
256 #[inline]
257 pub fn num_pieces(&self) -> usize {
258 self.pieces.len()
259 }
260
261 pub(crate) fn complete_piece(
262 &mut self,
263 piece_idx: i32,
264 connections: &mut SlotMap<ConnectionId, PeerConnection>,
265 event_tx: &mut Producer<'_, TorrentEvent>,
266 piece_buffer: Buffer,
267 ) {
268 self.piece_buffer_pool.return_buffer(piece_buffer);
269 self.piece_selector.mark_complete(piece_idx as usize);
270 if !self.is_complete && self.piece_selector.completed_all() {
271 log::info!("Torrent complete!");
272 self.is_complete = true;
273 if event_tx.enqueue(TorrentEvent::TorrentComplete).is_err() {
274 log::error!("Torrent completion event missed");
275 }
276 for (_, peer) in connections.iter_mut() {
279 peer.not_interested();
280 if peer.is_upload_only {
284 peer.pending_disconnect = Some(DisconnectReason::RedundantConnection);
285 }
286 for (_, extension) in peer.extensions.iter_mut() {
288 extension.on_torrent_complete(&mut peer.outgoing_msgs_buffer);
289 }
290 }
291 }
292 for (conn_id, peer) in connections.iter_mut() {
293 if let Some(bitfield) = self.piece_selector.interesting_peer_pieces(conn_id)
294 && !bitfield.any()
295 && peer.is_interesting
296 && peer.queued.is_empty()
297 && peer.inflight.is_empty()
298 {
299 peer.not_interested();
301 if peer.is_upload_only {
305 peer.pending_disconnect = Some(DisconnectReason::RedundantConnection);
306 }
307 }
308 peer.have(piece_idx);
309 }
310 log::debug!("Piece {piece_idx} completed!");
311 }
312
313 pub(crate) fn queue_disk_write_for_downloaded_pieces(
315 &mut self,
316 pending_disk_operations: &mut Vec<DiskOp>,
317 ) {
318 while let Ok(completed_piece) = self.downloaded_piece_rc.try_recv() {
319 if completed_piece.hash_matched {
320 let piece_len = self.piece_selector.piece_len(completed_piece.index as i32);
321 self.file_store.queue_piece_disk_operation(
322 completed_piece.index as i32,
323 completed_piece.buffer,
324 piece_len as usize,
325 DiskOpType::Write,
326 pending_disk_operations,
327 );
328 } else {
329 self.piece_selector
332 .mark_not_downloaded(completed_piece.index);
333 self.piece_buffer_pool.return_buffer(completed_piece.buffer);
334 log::error!("Piece hash didn't match expected hash!");
337 self.piece_selector
338 .mark_not_allocated(completed_piece.index as i32, completed_piece.conn_id);
339 }
340 }
341 }
342
343 pub fn allocate_piece(&mut self, index: i32, conn_id: ConnectionId) -> VecDeque<Subpiece> {
345 log::debug!("Allocating piece: conn_id: {conn_id:?}, index: {index}");
346 self.piece_selector.mark_allocated(index, conn_id);
347 match &mut self.pieces[index as usize] {
348 Some(allocated_piece) => allocated_piece.allocate_remaining_subpieces(),
349 None => {
350 let length = self.piece_selector.piece_len(index);
351 let buffer = self.piece_buffer_pool.get_buffer();
352 let mut piece = Piece::new(index, length, buffer);
353 let subpieces = piece.allocate_remaining_subpieces();
354 self.pieces[index as usize] = Some(piece);
355 subpieces
356 }
357 }
358 }
359
360 pub fn deallocate_piece(&mut self, index: i32, conn_id: ConnectionId) {
362 log::debug!("Deallocating piece: conn_id: {conn_id:?}, index: {index}");
363 self.piece_selector
366 .update_peer_piece_intrest(conn_id, index as usize);
367 if let Some(piece) = self.pieces[index as usize].take_if(|piece| {
371 piece.ref_count = piece.ref_count.saturating_sub(1);
372 piece.ref_count == 0
373 }) {
374 log::debug!("Marked as not allocated: conn_id: {conn_id:?}, index: {index}");
375 self.piece_buffer_pool.return_buffer(piece.into_buffer());
376 self.piece_selector.mark_not_allocated(index, conn_id);
377 }
378 }
379
380 pub fn can_preemtively_unchoke(&self) -> bool {
381 self.num_unchoked < self.config.max_unchoked
382 }
383
384 pub fn recalculate_unchokes(
388 &mut self,
389 connections: &mut SlotMap<ConnectionId, PeerConnection>,
390 ) {
391 struct ComparisonData {
392 is_choking: bool,
393 uploaded_since_unchoked: u64,
394 downloaded_in_last_round: u64,
395 uploaded_in_last_round: u64,
396 last_unchoked: Option<Instant>,
397 }
398 log::info!("Recalculating unchokes");
399 let mut peers = Vec::with_capacity(connections.len());
400 for (id, peer) in connections.iter_mut() {
401 if !peer.peer_interested || peer.pending_disconnect.is_some() {
402 peer.network_stats.reset_round();
403 if !peer.is_choking {
404 if peer.optimistically_unchoked {
405 peer.optimistically_unchoked = false;
406 self.ticks_to_recalc_optimistic_unchoke = 0;
408 }
409 peer.choke(self);
410 }
411
412 continue;
413 }
414 peers.push((
415 id,
416 ComparisonData {
417 is_choking: peer.is_choking,
418 uploaded_since_unchoked: peer.network_stats.upload_since_unchoked,
419 downloaded_in_last_round: peer.network_stats.downloaded_in_last_round,
420 uploaded_in_last_round: peer.network_stats.uploaded_in_last_round,
421 last_unchoked: peer.last_unchoked,
422 },
423 ));
424 }
425 if !self.is_complete {
426 peers.sort_unstable_by(|(_, a), (_, b)| {
428 a.downloaded_in_last_round
429 .cmp(&b.downloaded_in_last_round)
430 .reverse()
431 });
432 } else {
433 let piece_length = self.piece_selector.avg_piece_length();
434 let quota_bytes = (piece_length * self.config.seeding_piece_quota) as u64;
435 peers.sort_unstable_by(|(_, a), (_, b)| {
437 let a_quota_complete = !a.is_choking
438 && a.uploaded_since_unchoked > quota_bytes
439 && a.last_unchoked
440 .is_some_and(|time| time.elapsed() > Duration::from_mins(1));
441 let b_quota_complete = !b.is_choking
442 && b.uploaded_since_unchoked > quota_bytes
443 && b.last_unchoked
444 .is_some_and(|time| time.elapsed() > Duration::from_mins(1));
445 if a_quota_complete != b_quota_complete {
446 a_quota_complete.cmp(&b_quota_complete)
447 } else if a.uploaded_in_last_round != b.uploaded_in_last_round {
448 a.uploaded_in_last_round
449 .cmp(&b.uploaded_in_last_round)
450 .reverse()
451 } else {
452 a.last_unchoked
453 .map_or(Duration::MAX, |time| time.elapsed())
454 .cmp(&b.last_unchoked.map_or(Duration::MAX, |time| time.elapsed()))
455 .reverse()
456 }
457 });
458 }
459 let optimistic_unchoke_slots = std::cmp::max(1, self.config.max_unchoked / 5);
460 let mut remaining_unchoke_slots = self.config.max_unchoked - optimistic_unchoke_slots;
461 for (id, _) in peers {
462 let peer = &mut connections[id];
463 peer.network_stats.reset_round();
464 if remaining_unchoke_slots > 0 {
465 if peer.is_choking {
466 log::debug!(
467 "Peer[{}] now unchoked after recalculating throughputs",
468 peer.peer_id
469 );
470 peer.unchoke(self);
471 }
472 remaining_unchoke_slots -= 1;
473 if peer.optimistically_unchoked {
474 log::trace!(
475 "Peer[{}] previously optimistically unchoked, promoted to normal unchoke",
476 peer.peer_id
477 );
478 peer.optimistically_unchoked = false;
481 self.ticks_to_recalc_optimistic_unchoke = 0;
483 }
484 } else if !peer.is_choking && !peer.optimistically_unchoked {
485 log::debug!(
486 "Peer[{}] no longer unchoked after recalculating throughputs",
487 peer.peer_id
488 );
489 peer.choke(self);
490 }
491 }
492 }
493
494 pub fn recalculate_optimistic_unchokes(
497 &mut self,
498 connections: &mut SlotMap<ConnectionId, PeerConnection>,
499 ) {
500 log::info!("Recalculating optimistic unchokes");
501 let num_opt_unchoked = std::cmp::max(1, self.config.max_unchoked / 5) as usize;
502 let mut previously_opt_unchoked = ahash::HashSet::with_capacity(num_opt_unchoked);
503 let mut candidates = Vec::with_capacity(self.config.max_unchoked as usize);
504 for (id, peer) in connections.iter_mut() {
505 if peer.optimistically_unchoked {
506 previously_opt_unchoked.insert(id);
507 }
508 if peer.pending_disconnect.is_none()
509 && peer.peer_interested
510 && (peer.is_choking || peer.optimistically_unchoked)
511 {
512 candidates.push((
513 id,
514 peer.last_optimistically_unchoked
515 .map_or(u64::MAX, |time| time.elapsed().as_secs()),
516 ));
517 }
518 }
519 candidates.sort_unstable_by(|(_, a), (_, b)| a.cmp(b).reverse());
521 for (id, _) in candidates.iter().take(num_opt_unchoked) {
522 let peer = &mut connections[*id];
523 if peer.optimistically_unchoked {
524 log::debug!("Peer[{}] optmistically unchoked again", peer.peer_id);
525 previously_opt_unchoked.remove(id);
526 } else {
527 peer.optimistically_unchoke(self);
528 }
529 }
530
531 for id in previously_opt_unchoked {
532 let peer = &mut connections[id];
533 peer.choke(self);
534 }
535 }
536}
537
538impl Drop for InitializedState {
539 fn drop(&mut self) {
540 self.piece_buffer_pool.stop_tracking();
541 }
542}
543
544pub struct State {
546 pub(crate) info_hash: [u8; 20],
547 pub(crate) listener_port: Option<u16>,
548 root: PathBuf,
550 torrent_state: Option<InitializedState>,
551 file: OnceCell<Box<TorrentMetadata>>,
552 pub(crate) config: Config,
553}
554
555impl State {
556 pub fn info_hash(&self) -> [u8; 20] {
558 self.info_hash
559 }
560
561 pub fn unstarted(info_hash: [u8; 20], root: PathBuf, config: Config) -> Self {
567 Self {
568 info_hash,
569 root,
570 listener_port: None,
571 torrent_state: None,
572 file: OnceCell::new(),
573 config,
574 }
575 }
576
577 pub fn is_complete(&self) -> bool {
579 self.torrent_state
580 .as_ref()
581 .is_some_and(|state| state.is_complete)
582 }
583
584 pub fn from_metadata_and_root(
596 metadata: TorrentMetadata,
597 root: PathBuf,
598 config: Config,
599 ) -> io::Result<Self> {
600 let mut initialized_state = InitializedState::new(&root, &metadata, config)?;
601 let file_store = &initialized_state.file_store;
602 let completed_pieces: Box<[bool]> = metadata
603 .pieces
604 .as_slice()
605 .par_iter()
606 .enumerate()
607 .map(
608 |(idx, hash)| match file_store.check_piece_hash_sync(idx as i32, hash) {
609 Ok(is_valid) => is_valid,
610 Err(err) => {
611 if err.kind() != io::ErrorKind::NotFound {
612 log::warn!("Error checking piece {idx}: {err}");
613 }
614 false
615 }
616 },
617 )
618 .collect();
619 let completed_pieces: BitVec<u8, Msb0> = completed_pieces.into_iter().collect();
620 let completed_pieces: BitBox<u8, Msb0> = completed_pieces.into_boxed_bitslice();
621 log::trace!("Completed pieces: {completed_pieces}");
622 initialized_state
623 .piece_selector
624 .set_completed_bitfield(completed_pieces);
625 initialized_state.is_complete = initialized_state.piece_selector.completed_all();
626
627 Ok(Self {
628 info_hash: metadata
629 .info_hash_bytes()
630 .try_into()
631 .expect("Invalid info hash"),
632 root,
633 listener_port: None,
634 torrent_state: Some(initialized_state),
635 file: OnceCell::from(Box::new(metadata)),
636 config,
637 })
638 }
639
640 #[cfg(test)]
641 pub fn inprogress(
642 info_hash: [u8; 20],
643 root: PathBuf,
644 metadata: lava_torrent::torrent::v1::Torrent,
645 state: InitializedState,
646 config: Config,
647 ) -> Self {
648 Self {
649 info_hash,
650 root,
651 listener_port: None,
652 torrent_state: Some(state),
653 file: OnceCell::from(Box::new(metadata)),
654 config,
655 }
656 }
657
658 pub fn as_ref(&mut self) -> StateRef<'_> {
659 StateRef {
660 info_hash: self.info_hash,
661 root: &self.root,
662 listener_port: &self.listener_port,
663 torrent: &mut self.torrent_state,
664 full: &self.file,
665 config: &self.config,
666 }
667 }
668}
669
670pub struct StateRef<'state> {
672 info_hash: [u8; 20],
673 root: &'state Path,
674 pub listener_port: &'state Option<u16>,
675 torrent: &'state mut Option<InitializedState>,
676 full: &'state OnceCell<Box<TorrentMetadata>>,
677 pub config: &'state Config,
678}
679
680impl<'e_iter, 'state: 'e_iter> StateRef<'state> {
681 pub fn info_hash(&self) -> &[u8; 20] {
682 &self.info_hash
683 }
684
685 pub fn state(&'e_iter mut self) -> Option<&'e_iter mut InitializedState> {
686 self.torrent.as_mut()
687 }
688
689 #[allow(clippy::borrowed_box)]
690 pub fn metadata(&'e_iter mut self) -> Option<&'state Box<TorrentMetadata>> {
691 self.full.get()
692 }
693
694 #[inline]
695 pub fn is_initialzied(&self) -> bool {
696 self.full.get().is_some()
697 }
698
699 pub fn init(&'e_iter mut self, metadata: TorrentMetadata) -> io::Result<()> {
700 if self.is_initialzied() {
701 return Err(io::Error::other("State initialized twice"));
702 }
703 *self.torrent = Some(InitializedState::new(self.root, &metadata, *self.config)?);
704 self.full
705 .set(Box::new(metadata))
706 .map_err(|_e| io::Error::other("State initialized twice"))?;
707 Ok(())
708 }
709}