vortex_bittorrent/torrent.rs

use std::{
    cell::OnceCell,
    collections::VecDeque,
    io::{self},
    net::{SocketAddrV4, TcpListener},
    path::{Path, PathBuf},
    sync::mpsc::{Receiver, Sender},
    time::{Duration, Instant},
};

use crate::{
    buf_pool::{Buffer, BufferPool},
    event_loop::{ConnectionId, EventLoop},
    file_store::DiskOpType,
    peer_comm::{extended_protocol::MetadataProgress, peer_protocol::PeerId},
};
use crate::{
    file_store::DiskOp,
    piece_selector::{DownloadedPiece, Piece, PieceSelector, SUBPIECE_SIZE, Subpiece},
};
use crate::{file_store::FileStore, peer_connection::PeerConnection};
use ahash::HashSetExt;
use bitvec::{boxed::BitBox, order::Msb0, vec::BitVec};
use heapless::spsc::Producer;
use io_uring::IoUring;
use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
use slotmap::SlotMap;
use thiserror::Error;

use crate::peer_connection::DisconnectReason;

use crate::TorrentMetadata;

#[derive(Error, Debug)]
pub enum Error {
    #[error("Encountered IO issue: {0}")]
    Io(#[from] io::Error),
    #[error("Peer provider disconnected")]
    PeerProviderDisconnect,
}

pub const CQE_WAIT_TIME_NS: u32 = 150_000_000;

/// Configuration settings for a given torrent
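///
/// A minimal sketch of overriding a couple of settings while keeping the rest at their
/// defaults (the values below are illustrative, not recommendations):
///
/// ```ignore
/// let config = Config {
///     max_connections: 64,
///     max_unchoked: 4,
///     ..Config::default()
/// };
/// ```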
#[derive(Debug, Clone, Copy, serde::Deserialize, serde::Serialize)]
#[serde(default)]
pub struct Config {
    /// The max number of total connections that can be open at a time for the torrent
    pub max_connections: usize,
    /// The max number of outstanding requests this vortex instance should report to other peers via the
    /// `reqq` extension. This currently does not impact the behavior of vortex itself, but it should affect
    /// the amount of load connected peers send to us. Vortex does respect connected peers' reported
    /// `reqq` values.
    pub max_reported_outstanding_requests: u64,
    /// The maximum number of peers that may be unchoked at any given time
    pub max_unchoked: u32,
    /// Controls how frequently "which peers should be unchoked" is recalculated.
    /// After this number of ticks, vortex will look over all peers and redistribute
    /// which ones are unchoked and which are not.
    pub num_ticks_before_unchoke_recalc: u32,
    /// Controls the longest possible interval between recalculations of "which peers should be optimistically unchoked".
    /// After this number of ticks, vortex will look over all peers and redistribute
    /// which ones are optimistically unchoked and which are not. Note that this may happen more frequently
    /// than this number suggests, since the normal unchoke distribution may "promote" optimistically
    /// unchoked peers to normally unchoked peers. In that case this recalculation will happen
    /// immediately afterwards.
    pub num_ticks_before_optimistic_unchoke_recalc: u32,
    /// The minimum number of pieces we aim to upload to a peer before
    /// moving on to another peer when the "round-robin" unchoking strategy is used. The
    /// "round-robin" strategy is currently only used when seeding.
    pub seeding_piece_quota: u32,
    /// Controls the size of the io_uring completion queue
    pub cq_size: u32,
    /// Controls the size of the io_uring submission queue
    pub sq_size: u32,
    /// The event loop will wait for at least this number of completion events
    /// before it starts processing them. If the target isn't reached, it will wait for
    /// at most [`CQE_WAIT_TIME_NS`] nanoseconds before processing the ones currently in the completion queue.
    pub completion_event_want: usize,
    /// The size of the read buffers used for network operations. Defaults to `SUBPIECE_SIZE * 2`
    pub network_read_buffer_size: usize,
    /// The size of the write buffers used for network operations. Defaults to `SUBPIECE_SIZE` + 4 KiB
    pub network_write_buffer_size: usize,
    /// The size of the write network buffer pool
    pub write_buffer_pool_size: usize,
    /// The size of the read network buffer pool
    pub read_buffer_pool_size: usize,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            max_connections: 128,
            max_reported_outstanding_requests: 512,
            max_unchoked: 8,
            num_ticks_before_unchoke_recalc: 15,
            num_ticks_before_optimistic_unchoke_recalc: 30,
            seeding_piece_quota: 20,
            cq_size: 4096,
            sq_size: 4096,
            completion_event_want: 32,
            network_read_buffer_size: (SUBPIECE_SIZE * 2) as usize,
            read_buffer_pool_size: 512,
            network_write_buffer_size: (SUBPIECE_SIZE + 4096) as usize,
            write_buffer_pool_size: 128,
        }
    }
}

/// A single torrent containing the current torrent state
/// and our peer id. This object is responsible for starting
/// the I/O event loop.
pub struct Torrent {
    our_id: PeerId,
    state: State,
}

impl Torrent {
    /// Create a new torrent from the given state. The given peer id
    /// is used when communicating with other peers.
    pub fn new(our_id: PeerId, state: State) -> Self {
        Self { our_id, state }
    }

    /// Returns whether the torrent is complete.
    /// This is mostly useful to check status BEFORE
    /// starting the torrent. Use [`TorrentEvent::TorrentComplete`]
    /// to detect when a torrent finishes downloading if you start
    /// an incomplete torrent.
    pub fn is_complete(&self) -> bool {
        self.state
            .torrent_state
            .as_ref()
            .is_some_and(|state| state.is_complete)
    }

    /// Start the I/O event loop and the torrent itself. This will set up
    /// a new io_uring instance, and new buffer pools
    /// will be allocated and registered to it.
    /// This is expected to run in a separate thread; interaction with
    /// the event loop happens via the mpsc command channel. Torrent events are sent to the
    /// consumer of the `event_tx` spsc queue.
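    ///
    /// A minimal usage sketch (not compiled as a doctest). It assumes `our_id` and the
    /// torrent's `info_hash` are already available, the download path is illustrative,
    /// and the spsc queue comes from heapless' `spsc::Queue` (const-generic capacity shown
    /// here; adapt to the exact `Producer` type this crate uses):
    ///
    /// ```ignore
    /// let state = State::unstarted(info_hash, "downloads/example".into(), Config::default());
    /// let mut torrent = Torrent::new(our_id, state);
    ///
    /// // The producer half goes to the torrent, the consumer half stays with the caller.
    /// let mut event_queue: heapless::spsc::Queue<TorrentEvent, 256> = heapless::spsc::Queue::new();
    /// let (event_tx, mut event_rx) = event_queue.split();
    ///
    /// let (command_tx, command_rx) = std::sync::mpsc::channel();
    /// let listener = std::net::TcpListener::bind("0.0.0.0:0")?;
    ///
    /// // `start` blocks until `Command::Stop` is received or an error occurs, so it is
    /// // typically run on a dedicated thread while another thread drains `event_rx`
    /// // and feeds `command_tx`.
    /// torrent.start(event_tx, command_rx, listener)?;
    /// ```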
    pub fn start(
        &mut self,
        event_tx: Producer<TorrentEvent>,
        command_rc: Receiver<Command>,
        listener: TcpListener,
    ) -> Result<(), Error> {
        // check ulimit
        let ring: IoUring = IoUring::builder()
            .setup_single_issuer()
            .setup_clamp()
            .setup_cqsize(self.state.config.cq_size)
            .setup_defer_taskrun()
            .setup_coop_taskrun()
            .build(self.state.config.sq_size)
            .unwrap();
        let events = SlotMap::with_capacity_and_key(self.state.config.cq_size as usize);
        let mut event_loop = EventLoop::new(self.our_id, events, &self.state.config);
        event_loop.run(ring, &mut self.state, event_tx, command_rc, listener)
    }
}

/// Commands that can be sent to the torrent event loop through the command channel
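///
/// A small sketch of issuing commands over the mpsc channel whose receiving half is
/// handed to [`Torrent::start`] (the address below is illustrative):
///
/// ```ignore
/// let (command_tx, command_rx) = std::sync::mpsc::channel();
/// // `command_rx` goes to `Torrent::start`; the sender stays with the caller.
/// command_tx.send(Command::ConnectToPeers(vec!["198.51.100.7:6881".parse()?]))?;
/// command_tx.send(Command::Stop)?;
/// ```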
#[derive(Debug)]
pub enum Command {
    /// Connect to peers at the given addresses.
    /// Already connected peers will be filtered out
    ConnectToPeers(Vec<SocketAddrV4>),
    /// Stop the event loop gracefully
    Stop,
}

/// Metrics for a given peer
#[derive(Debug)]
pub struct PeerMetrics {
    /// The number of bytes per second we are currently downloading from the peer
    pub download_throughput: u64,
    /// The number of bytes per second we are currently uploading to the peer
    pub upload_throughput: u64,
    /// Progress for downloading metadata, if supported
    pub metadata_progress: Option<MetadataProgress>,
}

/// Events from the torrent
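///
/// A sketch of draining events from the consumer half of the spsc queue created when
/// setting up [`Torrent::start`] (assuming heapless' `Consumer::dequeue`):
///
/// ```ignore
/// while let Some(event) = event_rx.dequeue() {
///     match event {
///         TorrentEvent::ListenerStarted { port } => log::info!("listening on {port}"),
///         TorrentEvent::MetadataComplete(metadata) => { /* inspect or persist *metadata */ }
///         TorrentEvent::TorrentComplete => log::info!("download complete"),
///         TorrentEvent::TorrentMetrics { pieces_completed, pieces_allocated, peer_metrics } => {
///             log::debug!(
///                 "{pieces_completed} pieces done, {pieces_allocated} allocated, {} peers",
///                 peer_metrics.len()
///             );
///         }
///     }
/// }
/// ```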
#[derive(Debug)]
pub enum TorrentEvent {
    /// The torrent finished downloading. Note you will NOT receive
    /// this event if the torrent is already complete when you
    /// start the torrent. Use [`Torrent::is_complete`] to check if it's already complete
    /// before starting the torrent.
    TorrentComplete,
    /// The metadata has finished downloading from the connected peers.
    /// Note you will NOT receive this event if the metadata is already
    /// complete when starting the torrent.
    MetadataComplete(Box<TorrentMetadata>),
    /// The listener for incoming connections has finished setting up
    /// on the provided port.
    ListenerStarted { port: u16 },
    /// Overall metrics for the torrent, sent every tick.
    TorrentMetrics {
        /// How many pieces have currently been completed
        pieces_completed: usize,
        /// How many pieces have been allocated by peers for
        /// download
        pieces_allocated: usize,
        /// Peer metrics for all currently connected peers
        peer_metrics: Vec<PeerMetrics>,
    },
}

pub struct InitializedState {
    pub piece_selector: PieceSelector,
    pub num_unchoked: u32,
    pub config: Config,
    pub ticks_to_recalc_unchoke: u32,
    pub ticks_to_recalc_optimistic_unchoke: u32,
    pub downloaded_piece_rc: Receiver<DownloadedPiece>,
    pub downloaded_piece_tx: Sender<DownloadedPiece>,
    pub pieces: Vec<Option<Piece>>,
    pub file_store: FileStore,
    pub piece_buffer_pool: BufferPool,
    pub is_complete: bool,
}

impl InitializedState {
    pub fn new(root: &Path, metadata: &TorrentMetadata, config: Config) -> io::Result<Self> {
        let mut pieces = Vec::with_capacity(metadata.pieces.len());
        for _ in 0..metadata.pieces.len() {
            pieces.push(None);
        }
        let (tx, rc) = std::sync::mpsc::channel();
        Ok(Self {
            piece_selector: PieceSelector::new(metadata),
            num_unchoked: 0,
            config,
            ticks_to_recalc_unchoke: config.num_ticks_before_unchoke_recalc,
            ticks_to_recalc_optimistic_unchoke: config.num_ticks_before_optimistic_unchoke_recalc,
            downloaded_piece_rc: rc,
            downloaded_piece_tx: tx,
            pieces,
            is_complete: false,
            piece_buffer_pool: BufferPool::new("pieces", 256, metadata.piece_length as usize),
            file_store: FileStore::new(root, metadata)?,
        })
    }

    #[allow(dead_code)]
    pub fn num_allocated(&self) -> usize {
        self.pieces
            .iter()
            .filter(|piece| piece.as_ref().is_some_and(|piece| piece.ref_count > 0))
            .count()
    }

    #[inline]
    pub fn num_pieces(&self) -> usize {
        self.pieces.len()
    }

    pub(crate) fn complete_piece(
        &mut self,
        piece_idx: i32,
        connections: &mut SlotMap<ConnectionId, PeerConnection>,
        event_tx: &mut Producer<'_, TorrentEvent>,
        piece_buffer: Buffer,
    ) {
        self.piece_buffer_pool.return_buffer(piece_buffer);
        self.piece_selector.mark_complete(piece_idx as usize);
        if !self.is_complete && self.piece_selector.completed_all() {
            log::info!("Torrent complete!");
            self.is_complete = true;
            if event_tx.enqueue(TorrentEvent::TorrentComplete).is_err() {
                log::error!("Torrent completion event missed");
            }
            // We are no longer interested in any of the
            // peers
            for (_, peer) in connections.iter_mut() {
                peer.not_interested();
                // If the peer is upload only and
                // we are upload only there is no reason
                // to stay connected
                if peer.is_upload_only {
                    peer.pending_disconnect = Some(DisconnectReason::RedundantConnection);
                }
                // Notify all extensions that the torrent completed
                for (_, extension) in peer.extensions.iter_mut() {
                    extension.on_torrent_complete(&mut peer.outgoing_msgs_buffer);
                }
            }
        }
        for (conn_id, peer) in connections.iter_mut() {
            if let Some(bitfield) = self.piece_selector.interesting_peer_pieces(conn_id)
                && !bitfield.any()
                && peer.is_interesting
                && peer.queued.is_empty()
                && peer.inflight.is_empty()
            {
                // We are no longer interested in this peer
                peer.not_interested();
                // if it's upload only we can close the
                // connection since it will never download from
                // us
                if peer.is_upload_only {
                    peer.pending_disconnect = Some(DisconnectReason::RedundantConnection);
                }
            }
            peer.have(piece_idx);
        }
        log::debug!("Piece {piece_idx} completed!");
    }

    // TODO: Put this in the event loop directly instead when that is easier to test
    pub(crate) fn queue_disk_write_for_downloaded_pieces(
        &mut self,
        pending_disk_operations: &mut Vec<DiskOp>,
    ) {
        while let Ok(completed_piece) = self.downloaded_piece_rc.try_recv() {
            if completed_piece.hash_matched {
                let piece_len = self.piece_selector.piece_len(completed_piece.index as i32);
                self.file_store.queue_piece_disk_operation(
                    completed_piece.index as i32,
                    completed_piece.buffer,
                    piece_len as usize,
                    DiskOpType::Write,
                    pending_disk_operations,
                );
            } else {
                // Only need to mark this as not downloaded when it fails,
                // since otherwise it will be marked as completed and this is moot
                self.piece_selector
                    .mark_not_downloaded(completed_piece.index);
                self.piece_buffer_pool.return_buffer(completed_piece.buffer);
                // deallocate the piece from the peer
                // TODO: disconnect
                log::error!("Piece hash didn't match expected hash!");
                self.piece_selector
                    .mark_not_allocated(completed_piece.index as i32, completed_piece.conn_id);
            }
        }
    }

    // Allocates a piece and increments the piece ref count
    pub fn allocate_piece(&mut self, index: i32, conn_id: ConnectionId) -> VecDeque<Subpiece> {
        log::debug!("Allocating piece: conn_id: {conn_id:?}, index: {index}");
        self.piece_selector.mark_allocated(index, conn_id);
        match &mut self.pieces[index as usize] {
            Some(allocated_piece) => allocated_piece.allocate_remaining_subpieces(),
            None => {
                let length = self.piece_selector.piece_len(index);
                let buffer = self.piece_buffer_pool.get_buffer();
                let mut piece = Piece::new(index, length, buffer);
                let subpieces = piece.allocate_remaining_subpieces();
                self.pieces[index as usize] = Some(piece);
                subpieces
            }
        }
    }

    // Deallocate a piece
    pub fn deallocate_piece(&mut self, index: i32, conn_id: ConnectionId) {
        log::debug!("Deallocating piece: conn_id: {conn_id:?}, index: {index}");
        // Mark the piece as interesting again so it can be picked again
        // if necessary
        self.piece_selector
            .update_peer_piece_intrest(conn_id, index as usize);
        // The piece might have been mid-hashing when a timeout was received
        // (from two separate peers), causing it to be completed while another peer
        // was still trying to download it. That's fine.
        if let Some(piece) = self.pieces[index as usize].take_if(|piece| {
            piece.ref_count = piece.ref_count.saturating_sub(1);
            piece.ref_count == 0
        }) {
            log::debug!("Marked as not allocated: conn_id: {conn_id:?}, index: {index}");
            self.piece_buffer_pool.return_buffer(piece.into_buffer());
            self.piece_selector.mark_not_allocated(index, conn_id);
        }
    }

    pub fn can_preemtively_unchoke(&self) -> bool {
        self.num_unchoked < self.config.max_unchoked
    }

    /// Determine the peers best suited to be unchoked based on the selected unchoking strategy.
    /// Some unchoke slots are left for optimistic unchokes. The strategy currently only
    /// depends on whether the torrent is complete.
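    ///
    /// For example, with the default config (`max_unchoked = 8`), `max(1, 8 / 5) = 1` slot is
    /// reserved for optimistic unchokes, leaving 7 regular unchoke slots to hand out here.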
    pub fn recalculate_unchokes(
        &mut self,
        connections: &mut SlotMap<ConnectionId, PeerConnection>,
    ) {
        struct ComparisonData {
            is_choking: bool,
            uploaded_since_unchoked: u64,
            downloaded_in_last_round: u64,
            uploaded_in_last_round: u64,
            last_unchoked: Option<Instant>,
        }
        log::info!("Recalculating unchokes");
        let mut peers = Vec::with_capacity(connections.len());
        for (id, peer) in connections.iter_mut() {
            if !peer.peer_interested || peer.pending_disconnect.is_some() {
                peer.network_stats.reset_round();
                if !peer.is_choking {
                    if peer.optimistically_unchoked {
                        peer.optimistically_unchoked = false;
                        // reset so another peer can be optimistically unchoked
                        self.ticks_to_recalc_optimistic_unchoke = 0;
                    }
                    peer.choke(self);
                }

                continue;
            }
            peers.push((
                id,
                ComparisonData {
                    is_choking: peer.is_choking,
                    uploaded_since_unchoked: peer.network_stats.upload_since_unchoked,
                    downloaded_in_last_round: peer.network_stats.downloaded_in_last_round,
                    uploaded_in_last_round: peer.network_stats.uploaded_in_last_round,
                    last_unchoked: peer.last_unchoked,
                },
            ));
        }
        if !self.is_complete {
            // Sort based on downloaded_in_last_round
            peers.sort_unstable_by(|(_, a), (_, b)| {
                a.downloaded_in_last_round
                    .cmp(&b.downloaded_in_last_round)
                    .reverse()
            });
        } else {
            let piece_length = self.piece_selector.avg_piece_length();
            let quota_bytes = (piece_length * self.config.seeding_piece_quota) as u64;
            // The torrent is complete. Do round-robin sorting like in libtorrent
            peers.sort_unstable_by(|(_, a), (_, b)| {
                let a_quota_complete = !a.is_choking
                    && a.uploaded_since_unchoked > quota_bytes
                    && a.last_unchoked
                        .is_some_and(|time| time.elapsed() > Duration::from_mins(1));
                let b_quota_complete = !b.is_choking
                    && b.uploaded_since_unchoked > quota_bytes
                    && b.last_unchoked
                        .is_some_and(|time| time.elapsed() > Duration::from_mins(1));
                if a_quota_complete != b_quota_complete {
                    a_quota_complete.cmp(&b_quota_complete)
                } else if a.uploaded_in_last_round != b.uploaded_in_last_round {
                    a.uploaded_in_last_round
                        .cmp(&b.uploaded_in_last_round)
                        .reverse()
                } else {
                    a.last_unchoked
                        .map_or(Duration::MAX, |time| time.elapsed())
                        .cmp(&b.last_unchoked.map_or(Duration::MAX, |time| time.elapsed()))
                        .reverse()
                }
            });
        }
        let optimistic_unchoke_slots = std::cmp::max(1, self.config.max_unchoked / 5);
        let mut remaining_unchoke_slots = self.config.max_unchoked - optimistic_unchoke_slots;
        for (id, _) in peers {
            let peer = &mut connections[id];
            peer.network_stats.reset_round();
            if remaining_unchoke_slots > 0 {
                if peer.is_choking {
                    log::debug!(
                        "Peer[{}] now unchoked after recalculating throughputs",
                        peer.peer_id
                    );
                    peer.unchoke(self);
                }
                remaining_unchoke_slots -= 1;
                if peer.optimistically_unchoked {
                    log::trace!(
                        "Peer[{}] previously optimistically unchoked, promoted to normal unchoke",
                        peer.peer_id
                    );
                    // no longer optimistic, the peer is "promoted"
                    // to a normal unchoke slot
                    peer.optimistically_unchoked = false;
                    // reset so another peer can be optimistically unchoked
                    self.ticks_to_recalc_optimistic_unchoke = 0;
                }
            } else if !peer.is_choking && !peer.optimistically_unchoked {
                log::debug!(
                    "Peer[{}] no longer unchoked after recalculating throughputs",
                    peer.peer_id
                );
                peer.choke(self);
            }
        }
    }

    // Give some lucky winners unchokes to test if they will have better throughput than the
    // currently unchoked peers
    pub fn recalculate_optimistic_unchokes(
        &mut self,
        connections: &mut SlotMap<ConnectionId, PeerConnection>,
    ) {
        log::info!("Recalculating optimistic unchokes");
        let num_opt_unchoked = std::cmp::max(1, self.config.max_unchoked / 5) as usize;
        let mut previously_opt_unchoked = ahash::HashSet::with_capacity(num_opt_unchoked);
        let mut candidates = Vec::with_capacity(self.config.max_unchoked as usize);
        for (id, peer) in connections.iter_mut() {
            if peer.optimistically_unchoked {
                previously_opt_unchoked.insert(id);
            }
            if peer.pending_disconnect.is_none()
                && peer.peer_interested
                && (peer.is_choking || peer.optimistically_unchoked)
            {
                candidates.push((
                    id,
                    peer.last_optimistically_unchoked
                        .map_or(u64::MAX, |time| time.elapsed().as_secs()),
                ));
            }
        }
        // Sort in the order of peers that have waited the longest
        candidates.sort_unstable_by(|(_, a), (_, b)| a.cmp(b).reverse());
        for (id, _) in candidates.iter().take(num_opt_unchoked) {
            let peer = &mut connections[*id];
            if peer.optimistically_unchoked {
                log::debug!("Peer[{}] optimistically unchoked again", peer.peer_id);
                previously_opt_unchoked.remove(id);
            } else {
                peer.optimistically_unchoke(self);
            }
        }

        for id in previously_opt_unchoked {
            let peer = &mut connections[id];
            peer.choke(self);
        }
    }
}

impl Drop for InitializedState {
    fn drop(&mut self) {
        self.piece_buffer_pool.stop_tracking();
    }
}

/// Current state of the torrent
pub struct State {
    pub(crate) info_hash: [u8; 20],
    pub(crate) listener_port: Option<u16>,
    // TODO: Consider checking this is accessible at construction
    root: PathBuf,
    torrent_state: Option<InitializedState>,
    file: OnceCell<Box<TorrentMetadata>>,
    pub(crate) config: Config,
}

impl State {
    /// The torrent info hash
    pub fn info_hash(&self) -> [u8; 20] {
        self.info_hash
    }

    /// Use this constructor if the torrent is unstarted.
    /// `info_hash` is the info hash of the torrent that should be downloaded.
    /// `root` is the directory the torrent will be downloaded into.
    /// Vortex will create this directory if it doesn't already exist.
    /// `config` is the vortex config that should be used.
    pub fn unstarted(info_hash: [u8; 20], root: PathBuf, config: Config) -> Self {
        Self {
            info_hash,
            root,
            listener_port: None,
            torrent_state: None,
            file: OnceCell::new(),
            config,
        }
    }

    /// Returns whether the torrent has been completed
    pub fn is_complete(&self) -> bool {
        self.torrent_state
            .as_ref()
            .is_some_and(|state| state.is_complete)
    }

    /// Use this constructor if you have access to the torrent metadata
    /// and/or if the torrent has already started.
    ///
    /// `metadata` is the metadata associated with the torrent.
    /// `root` is the directory where files from a potentially already started torrent
    /// are expected to be found. If the folder doesn't exist, vortex will create it.
    ///
    /// `config` is the vortex config that should be used.
    ///
    /// NOTE: This will go through all files in `root` and hash their pieces (in parallel) to determine torrent progress,
    /// which may be slow on large torrents.
    pub fn from_metadata_and_root(
        metadata: TorrentMetadata,
        root: PathBuf,
        config: Config,
    ) -> io::Result<Self> {
        let mut initialized_state = InitializedState::new(&root, &metadata, config)?;
        let file_store = &initialized_state.file_store;
        let completed_pieces: Box<[bool]> = metadata
            .pieces
            .as_slice()
            .par_iter()
            .enumerate()
            .map(
                |(idx, hash)| match file_store.check_piece_hash_sync(idx as i32, hash) {
                    Ok(is_valid) => is_valid,
                    Err(err) => {
                        if err.kind() != io::ErrorKind::NotFound {
                            log::warn!("Error checking piece {idx}: {err}");
                        }
                        false
                    }
                },
            )
            .collect();
        let completed_pieces: BitVec<u8, Msb0> = completed_pieces.into_iter().collect();
        let completed_pieces: BitBox<u8, Msb0> = completed_pieces.into_boxed_bitslice();
        log::trace!("Completed pieces: {completed_pieces}");
        initialized_state
            .piece_selector
            .set_completed_bitfield(completed_pieces);
        initialized_state.is_complete = initialized_state.piece_selector.completed_all();

        Ok(Self {
            info_hash: metadata
                .info_hash_bytes()
                .try_into()
                .expect("Invalid info hash"),
            root,
            listener_port: None,
            torrent_state: Some(initialized_state),
            file: OnceCell::from(Box::new(metadata)),
            config,
        })
    }

    #[cfg(test)]
    pub fn inprogress(
        info_hash: [u8; 20],
        root: PathBuf,
        metadata: lava_torrent::torrent::v1::Torrent,
        state: InitializedState,
        config: Config,
    ) -> Self {
        Self {
            info_hash,
            root,
            listener_port: None,
            torrent_state: Some(state),
            file: OnceCell::from(Box::new(metadata)),
            config,
        }
    }

    pub fn as_ref(&mut self) -> StateRef<'_> {
        StateRef {
            info_hash: self.info_hash,
            root: &self.root,
            listener_port: &self.listener_port,
            torrent: &mut self.torrent_state,
            full: &self.file,
            config: &self.config,
        }
    }
}

// is this even needed?
pub struct StateRef<'state> {
    info_hash: [u8; 20],
    root: &'state Path,
    pub listener_port: &'state Option<u16>,
    torrent: &'state mut Option<InitializedState>,
    full: &'state OnceCell<Box<TorrentMetadata>>,
    pub config: &'state Config,
}

impl<'e_iter, 'state: 'e_iter> StateRef<'state> {
    pub fn info_hash(&self) -> &[u8; 20] {
        &self.info_hash
    }

    pub fn state(&'e_iter mut self) -> Option<&'e_iter mut InitializedState> {
        self.torrent.as_mut()
    }

    #[allow(clippy::borrowed_box)]
    pub fn metadata(&'e_iter mut self) -> Option<&'state Box<TorrentMetadata>> {
        self.full.get()
    }

    #[inline]
    pub fn is_initialzied(&self) -> bool {
        self.full.get().is_some()
    }

    pub fn init(&'e_iter mut self, metadata: TorrentMetadata) -> io::Result<()> {
        if self.is_initialzied() {
            return Err(io::Error::other("State initialized twice"));
        }
        *self.torrent = Some(InitializedState::new(self.root, &metadata, *self.config)?);
        self.full
            .set(Box::new(metadata))
            .map_err(|_e| io::Error::other("State initialized twice"))?;
        Ok(())
    }
}