vortex_bittorrent/
lib.rs

1use std::{
2    io,
3    net::{SocketAddrV4, TcpListener},
4    os::fd::AsRawFd,
5    path::Path,
6    sync::mpsc::{Receiver, Sender},
7};
8
9use event_loop::{EventLoop, EventType};
10use file_store::FileStore;
11use io_uring::{
12    IoUring, opcode,
13    types::{self},
14};
15use io_utils::UserData;
16use piece_selector::{CompletedPiece, PieceSelector};
17use slab::Slab;
18use thiserror::Error;
19
20mod buf_pool;
21mod buf_ring;
22mod event_loop;
23mod file_store;
24mod io_utils;
25mod peer_comm;
26mod piece_selector;
27
28use peer_comm::*;
29
30#[cfg(feature = "fuzzing")]
31pub use peer_protocol::*;
32
33pub use peer_protocol::PeerId;
34pub use peer_protocol::generate_peer_id;
35
/// Errors surfaced by this crate's public API.
#[derive(Error, Debug)]
pub enum Error {
    /// An underlying I/O operation failed. Any [`io::Error`] converts into
    /// this variant automatically (via `#[from]`), so `?` works on I/O calls.
    #[error("Encountered IO issue: {0}")]
    Io(#[from] io::Error),
    /// NOTE(review): presumably returned when the sending half of the peer
    /// provider channel is dropped — confirm against the event loop.
    #[error("Peer provider disconnected")]
    PeerProviderDisconnect,
}
43
/// Handle for a single torrent: the parsed metadata plus the peer id to use
/// when the torrent is started.
pub struct Torrent {
    /// Peer id handed to the event loop in [`Torrent::start`].
    our_id: PeerId,
    /// Parsed v1 torrent metadata; used to build the file store and the
    /// torrent state in [`Torrent::start`].
    torrent_info: lava_torrent::torrent::v1::Torrent,
}
48
49impl Torrent {
50    pub fn new(torrent_info: lava_torrent::torrent::v1::Torrent, our_id: PeerId) -> Self {
51        Self {
52            our_id,
53            torrent_info,
54        }
55    }
56
57    pub fn start(
58        &self,
59        peer_provider: Receiver<SocketAddrV4>,
60        downloads_path: impl AsRef<Path>,
61    ) -> Result<(), Error> {
62        // check ulimit
63        let mut ring: IoUring = IoUring::builder()
64            .setup_single_issuer()
65            .setup_clamp()
66            .setup_cqsize(4096)
67            .setup_defer_taskrun()
68            .setup_coop_taskrun()
69            .build(4096)
70            .unwrap();
71
72        let mut events = Slab::with_capacity(4096);
73        let event_idx = events.insert(EventType::Accept);
74        let user_data = UserData::new(event_idx, None);
75
76        let listener = TcpListener::bind(("0.0.0.0", 6881)).unwrap();
77        let accept_op = opcode::AcceptMulti::new(types::Fd(listener.as_raw_fd()))
78            .build()
79            .user_data(user_data.as_u64());
80
81        unsafe {
82            ring.submission().push(&accept_op).unwrap();
83        }
84        ring.submission().sync();
85        let file_store = FileStore::new(downloads_path, &self.torrent_info).unwrap();
86        let torrent_state = TorrentState::new(&self.torrent_info);
87        let mut event_loop = EventLoop::new(self.our_id, events, peer_provider);
88        event_loop.run(ring, torrent_state, &file_store, &self.torrent_info)
89    }
90}
91
/// Mutable per-torrent bookkeeping, built by [`TorrentState::new`] and passed
/// to the event loop.
struct TorrentState {
    /// 20-byte info hash taken from the torrent metadata.
    info_hash: [u8; 20],
    /// Tracks piece progress; it is told when a piece completes or is no
    /// longer in flight.
    piece_selector: PieceSelector,
    /// Compared against `max_unchoked` by `should_unchoke`; starts at 0.
    /// NOTE(review): presumably the number of currently unchoked peers — confirm.
    num_unchoked: u32,
    /// Upper bound for `num_unchoked`; initialised to 8.
    max_unchoked: u32,
    /// Number of pieces in the torrent (`torrent.pieces.len()`).
    num_pieces: usize,
    /// Receiving half of the completed-piece channel, drained by
    /// `update_torrent_status`.
    completed_piece_rc: Receiver<CompletedPiece>,
    /// Sending half of the same channel.
    /// NOTE(review): presumably cloned for whatever hashes pieces — confirm.
    completed_piece_tx: Sender<CompletedPiece>,
    /// Set to `true` once the piece selector reports every piece as completed.
    is_complete: bool,
}
102
103impl TorrentState {
104    pub fn new(torrent: &lava_torrent::torrent::v1::Torrent) -> Self {
105        let info_hash = torrent.info_hash_bytes().try_into().unwrap();
106
107        let (tx, rc) = std::sync::mpsc::channel();
108        Self {
109            info_hash,
110            piece_selector: PieceSelector::new(torrent),
111            num_pieces: torrent.pieces.len(),
112            num_unchoked: 0,
113            max_unchoked: 8,
114            completed_piece_rc: rc,
115            completed_piece_tx: tx,
116            is_complete: false,
117        }
118    }
119
120    #[inline]
121    pub fn num_pieces(&self) -> usize {
122        self.num_pieces
123    }
124
125    pub(crate) fn update_torrent_status(&mut self) {
126        while let Ok(completed_piece) = self.completed_piece_rc.try_recv() {
127            match completed_piece.hash_matched {
128                Ok(hash_matched) => {
129                    if hash_matched {
130                        self.piece_selector.mark_complete(completed_piece.index);
131                        log::info!(
132                            "Piece {}/{} completed!",
133                            self.piece_selector.total_completed(),
134                            self.piece_selector.pieces()
135                        );
136                        // TODO: send have messages
137                        if self.piece_selector.completed_all() {
138                            self.is_complete = true;
139                        }
140                    } else {
141                        log::error!("Piece hash didn't match expected hash!");
142                        self.piece_selector.mark_not_inflight(completed_piece.index);
143                    }
144                }
145                Err(err) => {
146                    log::error!(
147                        "Failed to sync and hash piece: {} Error: {err}",
148                        completed_piece.index
149                    );
150                }
151            }
152        }
153    }
154
155    // TODO: Something like release in flight pieces?
156
157    pub fn should_unchoke(&self) -> bool {
158        self.num_unchoked < self.max_unchoked
159    }
160}