1use std::{
2 io,
3 net::{SocketAddrV4, TcpListener},
4 os::fd::AsRawFd,
5 path::Path,
6 sync::mpsc::{Receiver, Sender},
7};
8
9use event_loop::{EventLoop, EventType};
10use file_store::FileStore;
11use io_uring::{
12 IoUring, opcode,
13 types::{self},
14};
15use io_utils::UserData;
16use piece_selector::{CompletedPiece, PieceSelector};
17use slab::Slab;
18use thiserror::Error;
19
20mod buf_pool;
21mod buf_ring;
22mod event_loop;
23mod file_store;
24mod io_utils;
25mod peer_comm;
26mod piece_selector;
27
28use peer_comm::*;
29
30#[cfg(feature = "fuzzing")]
31pub use peer_protocol::*;
32
33pub use peer_protocol::PeerId;
34pub use peer_protocol::generate_peer_id;
35
36#[derive(Error, Debug)]
37pub enum Error {
38 #[error("Encountered IO issue: {0}")]
39 Io(#[from] io::Error),
40 #[error("Peer provider disconnected")]
41 PeerProviderDisconnect,
42}
43
44pub struct Torrent {
45 our_id: PeerId,
46 torrent_info: lava_torrent::torrent::v1::Torrent,
47}
48
49impl Torrent {
50 pub fn new(torrent_info: lava_torrent::torrent::v1::Torrent, our_id: PeerId) -> Self {
51 Self {
52 our_id,
53 torrent_info,
54 }
55 }
56
57 pub fn start(
58 &self,
59 peer_provider: Receiver<SocketAddrV4>,
60 downloads_path: impl AsRef<Path>,
61 ) -> Result<(), Error> {
62 let mut ring: IoUring = IoUring::builder()
64 .setup_single_issuer()
65 .setup_clamp()
66 .setup_cqsize(4096)
67 .setup_defer_taskrun()
68 .setup_coop_taskrun()
69 .build(4096)
70 .unwrap();
71
72 let mut events = Slab::with_capacity(4096);
73 let event_idx = events.insert(EventType::Accept);
74 let user_data = UserData::new(event_idx, None);
75
76 let listener = TcpListener::bind(("0.0.0.0", 6881)).unwrap();
77 let accept_op = opcode::AcceptMulti::new(types::Fd(listener.as_raw_fd()))
78 .build()
79 .user_data(user_data.as_u64());
80
81 unsafe {
82 ring.submission().push(&accept_op).unwrap();
83 }
84 ring.submission().sync();
85 let file_store = FileStore::new(downloads_path, &self.torrent_info).unwrap();
86 let torrent_state = TorrentState::new(&self.torrent_info);
87 let mut event_loop = EventLoop::new(self.our_id, events, peer_provider);
88 event_loop.run(ring, torrent_state, &file_store, &self.torrent_info)
89 }
90}
91
92struct TorrentState {
93 info_hash: [u8; 20],
94 piece_selector: PieceSelector,
95 num_unchoked: u32,
96 max_unchoked: u32,
97 num_pieces: usize,
98 completed_piece_rc: Receiver<CompletedPiece>,
99 completed_piece_tx: Sender<CompletedPiece>,
100 is_complete: bool,
101}
102
103impl TorrentState {
104 pub fn new(torrent: &lava_torrent::torrent::v1::Torrent) -> Self {
105 let info_hash = torrent.info_hash_bytes().try_into().unwrap();
106
107 let (tx, rc) = std::sync::mpsc::channel();
108 Self {
109 info_hash,
110 piece_selector: PieceSelector::new(torrent),
111 num_pieces: torrent.pieces.len(),
112 num_unchoked: 0,
113 max_unchoked: 8,
114 completed_piece_rc: rc,
115 completed_piece_tx: tx,
116 is_complete: false,
117 }
118 }
119
120 #[inline]
121 pub fn num_pieces(&self) -> usize {
122 self.num_pieces
123 }
124
125 pub(crate) fn update_torrent_status(&mut self) {
126 while let Ok(completed_piece) = self.completed_piece_rc.try_recv() {
127 match completed_piece.hash_matched {
128 Ok(hash_matched) => {
129 if hash_matched {
130 self.piece_selector.mark_complete(completed_piece.index);
131 log::info!(
132 "Piece {}/{} completed!",
133 self.piece_selector.total_completed(),
134 self.piece_selector.pieces()
135 );
136 if self.piece_selector.completed_all() {
138 self.is_complete = true;
139 }
140 } else {
141 log::error!("Piece hash didn't match expected hash!");
142 self.piece_selector.mark_not_inflight(completed_piece.index);
143 }
144 }
145 Err(err) => {
146 log::error!(
147 "Failed to sync and hash piece: {} Error: {err}",
148 completed_piece.index
149 );
150 }
151 }
152 }
153 }
154
155 pub fn should_unchoke(&self) -> bool {
158 self.num_unchoked < self.max_unchoked
159 }
160}