librqbit/torrent_state/
mod.rs

1pub mod initializing;
2pub mod live;
3pub mod paused;
4pub mod stats;
5mod streaming;
6pub mod utils;
7
8use std::collections::HashSet;
9use std::net::SocketAddr;
10use std::path::PathBuf;
11use std::sync::atomic::Ordering;
12use std::sync::Arc;
13use std::sync::Weak;
14use std::time::Duration;
15
16use anyhow::bail;
17use anyhow::Context;
18use arc_swap::ArcSwapOption;
19use buffers::ByteBufOwned;
20use bytes::Bytes;
21use futures::future::BoxFuture;
22use futures::FutureExt;
23use librqbit_core::hash_id::Id20;
24use librqbit_core::lengths::Lengths;
25
26use librqbit_core::spawn_utils::spawn_with_cancel;
27use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
28pub use live::*;
29use parking_lot::RwLock;
30
31use tokio::sync::Notify;
32use tokio::time::timeout;
33use tokio_stream::StreamExt;
34use tokio_util::sync::CancellationToken;
35use tracing::debug;
36use tracing::error_span;
37use tracing::trace;
38use tracing::warn;
39
40use crate::chunk_tracker::ChunkTracker;
41use crate::file_info::FileInfo;
42use crate::limits::LimitsConfig;
43use crate::session::TorrentId;
44use crate::spawn_utils::BlockingSpawner;
45use crate::storage::BoxStorageFactory;
46use crate::stream_connect::StreamConnector;
47use crate::torrent_state::stats::LiveStats;
48use crate::type_aliases::DiskWorkQueueSender;
49use crate::type_aliases::FileInfos;
50use crate::type_aliases::PeerStream;
51use crate::Session;
52
53use initializing::TorrentStateInitializing;
54
55use self::paused::TorrentStatePaused;
56pub use self::stats::{TorrentStats, TorrentStatsState};
57pub use self::streaming::FileStream;
58
59// State machine transitions.
60//
61// - error -> initializing
62// - initializing -> paused
63// - paused -> live
64// - live -> paused
65//
66// - initializing -> error
67// - live -> error
68pub enum ManagedTorrentState {
69    Initializing(Arc<TorrentStateInitializing>),
70    Paused(TorrentStatePaused),
71    Live(Arc<TorrentStateLive>),
72    Error(anyhow::Error),
73
74    // This is used when swapping between states, outside world should never see it.
75    None,
76}
77
78impl ManagedTorrentState {
79    pub fn name(&self) -> &'static str {
80        match self {
81            ManagedTorrentState::Initializing(_) => "initializing",
82            ManagedTorrentState::Paused(_) => "paused",
83            ManagedTorrentState::Live(_) => "live",
84            ManagedTorrentState::Error(_) => "error",
85            ManagedTorrentState::None => "<invalid: none>",
86        }
87    }
88
89    fn assert_paused(self) -> TorrentStatePaused {
90        match self {
91            Self::Paused(paused) => paused,
92            _ => panic!("Expected paused state"),
93        }
94    }
95
96    pub(crate) fn take(&mut self) -> Self {
97        std::mem::replace(self, Self::None)
98    }
99}
100
101pub(crate) struct ManagedTorrentLocked {
102    // The torrent might not be in "paused" state technically,
103    // but the intention might be for it to stay paused.
104    //
105    // This should change only on "unpause".
106    pub(crate) paused: bool,
107    pub(crate) state: ManagedTorrentState,
108    pub(crate) only_files: Option<Vec<usize>>,
109}
110
111#[derive(Default)]
112pub(crate) struct ManagedTorrentOptions {
113    pub force_tracker_interval: Option<Duration>,
114    pub peer_connect_timeout: Option<Duration>,
115    pub peer_read_write_timeout: Option<Duration>,
116    pub allow_overwrite: bool,
117    pub output_folder: PathBuf,
118    pub disk_write_queue: Option<DiskWorkQueueSender>,
119    pub ratelimits: LimitsConfig,
120    pub initial_peers: Vec<SocketAddr>,
121    #[cfg(feature = "disable-upload")]
122    pub _disable_upload: bool,
123}
124
125impl ManagedTorrentOptions {
126    #[cfg(feature = "disable-upload")]
127    pub fn disable_upload(&self) -> bool {
128        self._disable_upload
129    }
130
131    #[cfg(not(feature = "disable-upload"))]
132    pub const fn disable_upload(&self) -> bool {
133        false
134    }
135}
136
137// Torrent bencodee "info" + some precomputed fields based on it for frequent access.
138pub struct TorrentMetadata {
139    pub info: TorrentMetaV1Info<ByteBufOwned>,
140    pub torrent_bytes: Bytes,
141    pub info_bytes: Bytes,
142    pub lengths: Lengths,
143    pub file_infos: FileInfos,
144    pub name: Option<String>,
145}
146
147impl TorrentMetadata {
148    pub(crate) fn new(
149        info: TorrentMetaV1Info<ByteBufOwned>,
150        torrent_bytes: Bytes,
151        info_bytes: Bytes,
152    ) -> anyhow::Result<Self> {
153        let lengths = Lengths::from_torrent(&info)?;
154        let file_infos = info
155            .iter_file_details_ext(&lengths)?
156            .map(|fd| {
157                Ok::<_, anyhow::Error>(FileInfo {
158                    relative_filename: fd.details.filename.to_pathbuf()?,
159                    offset_in_torrent: fd.offset,
160                    piece_range: fd.pieces,
161                    len: fd.details.len,
162                    attrs: fd.details.attrs(),
163                })
164            })
165            .collect::<anyhow::Result<Vec<FileInfo>>>()?;
166        let name = info
167            .name
168            .as_ref()
169            .and_then(|n| std::str::from_utf8(n.as_ref()).ok())
170            .map(|s| s.to_owned());
171        Ok(Self {
172            info,
173            torrent_bytes,
174            info_bytes,
175            lengths,
176            file_infos,
177            name,
178        })
179    }
180}
181
182/// Common information about torrent shared among all possible states.
183///
184// The reason it's not inlined into ManagedTorrent is to break the Arc cycle:
185// ManagedTorrent contains the current torrent state, which in turn needs access to a bunch
186// of stuff, but it shouldn't access the state.
187pub struct ManagedTorrentShared {
188    pub id: TorrentId,
189    pub info_hash: Id20,
190    pub(crate) spawner: BlockingSpawner,
191    pub trackers: HashSet<url::Url>,
192    pub peer_id: Id20,
193    pub span: tracing::Span,
194    pub(crate) options: ManagedTorrentOptions,
195    pub(crate) connector: Arc<StreamConnector>,
196    pub(crate) storage_factory: BoxStorageFactory,
197    pub(crate) session: Weak<Session>,
198
199    // "dn" from magnet link
200    pub(crate) magnet_name: Option<String>,
201}
202
203pub struct ManagedTorrent {
204    // Static torrent configuration that doesn't change.
205    pub shared: Arc<ManagedTorrentShared>,
206    // Torrent metadata. Maybe be None when the magnet is resolving (not implemented yet)
207    pub metadata: ArcSwapOption<TorrentMetadata>,
208    pub(crate) state_change_notify: Notify,
209    pub(crate) locked: RwLock<ManagedTorrentLocked>,
210}
211
212impl ManagedTorrent {
213    pub fn id(&self) -> TorrentId {
214        self.shared.id
215    }
216
217    pub fn name(&self) -> Option<String> {
218        if let Some(m) = &*self.metadata.load() {
219            return m.name.clone().or_else(|| self.shared.magnet_name.clone());
220        }
221        self.shared.magnet_name.clone()
222    }
223
224    pub fn shared(&self) -> &ManagedTorrentShared {
225        &self.shared
226    }
227
228    pub fn with_metadata<R>(
229        &self,
230        mut f: impl FnMut(&Arc<TorrentMetadata>) -> R,
231    ) -> anyhow::Result<R> {
232        let r = self.metadata.load();
233        let r = r.as_ref().context("torrent is not resolved")?;
234        Ok(f(r))
235    }
236
237    pub fn info_hash(&self) -> Id20 {
238        self.shared.info_hash
239    }
240
241    pub fn only_files(&self) -> Option<Vec<usize>> {
242        self.locked.read().only_files.clone()
243    }
244
245    pub fn with_state<R>(&self, f: impl FnOnce(&ManagedTorrentState) -> R) -> R {
246        f(&self.locked.read().state)
247    }
248
249    pub(crate) fn with_state_mut<R>(&self, f: impl FnOnce(&mut ManagedTorrentState) -> R) -> R {
250        f(&mut self.locked.write().state)
251    }
252
253    pub(crate) fn with_chunk_tracker<R>(
254        &self,
255        f: impl FnOnce(&ChunkTracker) -> R,
256    ) -> anyhow::Result<R> {
257        let g = self.locked.read();
258        match &g.state {
259            ManagedTorrentState::Paused(p) => Ok(f(&p.chunk_tracker)),
260            ManagedTorrentState::Live(l) => Ok(f(l
261                .lock_read("chunk_tracker")
262                .get_chunks()
263                .context("error getting chunks")?)),
264            _ => bail!("no chunk tracker, torrent neither paused nor live"),
265        }
266    }
267
268    /// Get the live state if the torrent is live.
269    pub fn live(&self) -> Option<Arc<TorrentStateLive>> {
270        let g = self.locked.read();
271        match &g.state {
272            ManagedTorrentState::Live(live) => Some(live.clone()),
273            _ => None,
274        }
275    }
276
277    fn stop_with_error(&self, error: anyhow::Error) {
278        let mut g = self.locked.write();
279
280        match g.state.take() {
281            ManagedTorrentState::Live(live) => {
282                if let Err(err) = live.pause() {
283                    warn!(
284                        "error pausing live torrent during fatal error handling: {:?}",
285                        err
286                    );
287                }
288            }
289            ManagedTorrentState::Error(e) => {
290                warn!("bug: torrent already was in error state when trying to stop it. Previous error was: {:?}", e);
291            }
292            ManagedTorrentState::None => {
293                warn!("bug: torrent encountered in None state during fatal error handling")
294            }
295            _ => {}
296        };
297
298        self.state_change_notify.notify_waiters();
299
300        g.state = ManagedTorrentState::Error(error)
301    }
302
303    /// peer_rx: the peer stream. If start_paused=false, must be set.
304    /// start_paused: if set, the torrent will initialize (check file integrity), but will not start
305    pub(crate) fn start(
306        self: &Arc<Self>,
307        peer_rx: Option<PeerStream>,
308        start_paused: bool,
309    ) -> anyhow::Result<()> {
310        fn _start<'a>(
311            t: &'a Arc<ManagedTorrent>,
312            peer_rx: Option<PeerStream>,
313            start_paused: bool,
314            session: Arc<Session>,
315            g: Option<parking_lot::RwLockWriteGuard<'a, ManagedTorrentLocked>>,
316            token: CancellationToken,
317        ) -> anyhow::Result<()> {
318            let mut g = g.unwrap_or_else(|| t.locked.write());
319
320            match &g.state {
321                ManagedTorrentState::Live(_) => {
322                    bail!("torrent is already live");
323                }
324                ManagedTorrentState::Initializing(init) => {
325                    let init = init.clone();
326                    let t = t.clone();
327                    let span = t.shared().span.clone();
328                    let token = token.clone();
329
330                    spawn_with_cancel(
331                        error_span!(parent: span.clone(), "initialize_and_start"),
332                        token.clone(),
333                        async move {
334                            let concurrent_init_semaphore =
335                                session.concurrent_initialize_semaphore.clone();
336                            let _permit = concurrent_init_semaphore
337                                .acquire()
338                                .await
339                                .context("bug: concurrent init semaphore was closed")?;
340
341                            match init.check().await {
342                                Ok(paused) => {
343                                    let mut g = t.locked.write();
344                                    if let ManagedTorrentState::Initializing(_) = &g.state {
345                                    } else {
346                                        debug!("no need to start torrent anymore, as it switched state from initilizing");
347                                        return Ok(());
348                                    }
349
350                                    g.state = ManagedTorrentState::Paused(paused);
351                                    t.state_change_notify.notify_waiters();
352                                    _start(&t, peer_rx, start_paused, session, Some(g), token)
353                                }
354                                Err(err) => {
355                                    let result = anyhow::anyhow!("{:?}", err);
356                                    t.locked.write().state = ManagedTorrentState::Error(err);
357                                    t.state_change_notify.notify_waiters();
358                                    Err(result)
359                                }
360                            }
361                        },
362                    );
363                    Ok(())
364                }
365                ManagedTorrentState::Paused(_) => {
366                    if start_paused {
367                        return Ok(());
368                    }
369                    let paused = g.state.take().assert_paused();
370                    let (tx, rx) = tokio::sync::oneshot::channel();
371                    let live = TorrentStateLive::new(paused, tx, token.clone())?;
372                    g.state = ManagedTorrentState::Live(live.clone());
373                    t.state_change_notify.notify_waiters();
374
375                    spawn_fatal_errors_receiver(t, rx, token);
376                    if let Some(peer_rx) = peer_rx {
377                        spawn_peer_adder(&live, peer_rx);
378                    }
379                    Ok(())
380                }
381                ManagedTorrentState::Error(_) => {
382                    let metadata = t.metadata.load_full().expect("TODO");
383                    let initializing = Arc::new(TorrentStateInitializing::new(
384                        t.shared.clone(),
385                        metadata.clone(),
386                        g.only_files.clone(),
387                        t.shared
388                            .storage_factory
389                            .create_and_init(t.shared(), &metadata)?,
390                        true,
391                    ));
392                    g.state = ManagedTorrentState::Initializing(initializing.clone());
393                    t.state_change_notify.notify_waiters();
394
395                    // Recurse.
396                    _start(t, peer_rx, start_paused, session, Some(g), token)
397                }
398                ManagedTorrentState::None => bail!("bug: torrent is in empty state"),
399            }
400        }
401
402        let session = self
403            .shared
404            .session
405            .upgrade()
406            .context("session is dead, cannot start torrent")?;
407        let mut g = self.locked.write();
408        g.paused = start_paused;
409        let cancellation_token = session.cancellation_token().child_token();
410
411        _start(
412            self,
413            peer_rx,
414            start_paused,
415            session,
416            Some(g),
417            cancellation_token,
418        )
419    }
420
421    pub fn is_paused(&self) -> bool {
422        self.locked.read().paused
423    }
424
425    /// Pause the torrent if it's live.
426    pub(crate) fn pause(&self) -> anyhow::Result<()> {
427        let mut g = self.locked.write();
428        match &g.state {
429            ManagedTorrentState::Live(live) => {
430                let paused = live.pause()?;
431                g.state = ManagedTorrentState::Paused(paused);
432                g.paused = true;
433                self.state_change_notify.notify_waiters();
434                Ok(())
435            }
436            ManagedTorrentState::Initializing(_) => {
437                bail!("torrent is initializing, can't pause");
438            }
439            ManagedTorrentState::Paused(_) => {
440                bail!("torrent is already paused");
441            }
442            ManagedTorrentState::Error(_) => {
443                bail!("can't pause torrent in error state")
444            }
445            ManagedTorrentState::None => bail!("bug: torrent is in empty state"),
446        }
447    }
448
449    /// Get stats.
450    pub fn stats(&self) -> TorrentStats {
451        use stats::TorrentStatsState as S;
452        let mut resp = TorrentStats {
453            total_bytes: self
454                .metadata
455                .load()
456                .as_ref()
457                .map(|r| r.lengths.total_length())
458                .unwrap_or_default(),
459            file_progress: Vec::new(),
460            state: S::Error,
461            error: None,
462            progress_bytes: 0,
463            uploaded_bytes: 0,
464            finished: false,
465            live: None,
466        };
467
468        self.with_state(|s| {
469            match s {
470                ManagedTorrentState::Initializing(i) => {
471                    resp.state = S::Initializing;
472                    resp.progress_bytes = i.checked_bytes.load(Ordering::Relaxed);
473                }
474                ManagedTorrentState::Paused(p) => {
475                    resp.state = S::Paused;
476                    let hns = p.hns();
477                    resp.total_bytes = hns.total();
478                    resp.progress_bytes = hns.progress();
479                    resp.finished = hns.finished();
480                    resp.file_progress = p.chunk_tracker.per_file_have_bytes().to_owned();
481                }
482                ManagedTorrentState::Live(l) => {
483                    resp.state = S::Live;
484                    let live_stats = LiveStats::from(l.as_ref());
485                    let hns = l.get_hns().unwrap_or_default();
486                    resp.total_bytes = hns.total();
487                    resp.progress_bytes = hns.progress();
488                    resp.finished = hns.finished();
489                    resp.uploaded_bytes = l.get_uploaded_bytes();
490                    resp.file_progress = l
491                        .lock_read("file_progress")
492                        .get_chunks()
493                        .ok()
494                        .map(|c| c.per_file_have_bytes().to_owned())
495                        .unwrap_or_default();
496                    resp.live = Some(live_stats);
497                }
498                ManagedTorrentState::Error(e) => {
499                    resp.state = S::Error;
500                    resp.error = Some(format!("{:?}", e))
501                }
502                ManagedTorrentState::None => {
503                    resp.state = S::Error;
504                    resp.error = Some("bug: torrent in broken \"None\" state".to_string());
505                }
506            }
507            resp
508        })
509    }
510
511    #[inline(never)]
512    pub fn wait_until_initialized(&self) -> BoxFuture<'_, anyhow::Result<()>> {
513        async move {
514            // TODO: rewrite, this polling is horrible
515            loop {
516                let done = self.with_state(|s| match s {
517                    ManagedTorrentState::Initializing(_) => Ok(false),
518                    ManagedTorrentState::Error(e) => bail!("{:?}", e),
519                    ManagedTorrentState::None => bail!("bug: torrent state is None"),
520                    _ => Ok(true),
521                })?;
522                if done {
523                    return Ok(());
524                }
525                let _ = timeout(Duration::from_secs(1), self.state_change_notify.notified()).await;
526            }
527        }
528        .boxed()
529    }
530
531    #[inline(never)]
532    pub fn wait_until_completed(&self) -> BoxFuture<'_, anyhow::Result<()>> {
533        async move {
534            // TODO: rewrite, this polling is horrible
535            let live = loop {
536                let live = self.with_state(|s| match s {
537                    ManagedTorrentState::Initializing(_) | ManagedTorrentState::Paused(_) => {
538                        Ok(None)
539                    }
540                    ManagedTorrentState::Live(l) => Ok(Some(l.clone())),
541                    ManagedTorrentState::Error(e) => bail!("{:?}", e),
542                    ManagedTorrentState::None => bail!("bug: torrent state is None"),
543                })?;
544                if let Some(live) = live {
545                    break live;
546                }
547                let _ = timeout(Duration::from_secs(1), self.state_change_notify.notified()).await;
548            };
549
550            live.wait_until_completed().await;
551            Ok(())
552        }
553        .boxed()
554    }
555
556    // Returns true if needed to unpause torrent.
557    // This is just implementation detail - it's easier to pause/unpause than to tinker with internals.
558    pub(crate) fn update_only_files(&self, only_files: &HashSet<usize>) -> anyhow::Result<()> {
559        let metadata = self.metadata.load();
560        let metadata = metadata.as_ref().context("torrent is not resolved")?;
561        let file_count = metadata.file_infos.len();
562        for f in only_files.iter().copied() {
563            if f >= file_count {
564                anyhow::bail!("only_files contains invalid value {f}")
565            }
566        }
567
568        // if live, need to update chunk tracker
569        // - if already finished: need to pause, then unpause (to reopen files etc)
570        // if paused, need to update chunk tracker
571
572        let mut g = self.locked.write();
573        match &mut g.state {
574            ManagedTorrentState::Initializing(_) => bail!("can't update initializing torrent"),
575            ManagedTorrentState::Error(_) => {}
576            ManagedTorrentState::None => {}
577            ManagedTorrentState::Paused(p) => {
578                p.update_only_files(only_files)?;
579            }
580            ManagedTorrentState::Live(l) => {
581                l.update_only_files(only_files)?;
582            }
583        };
584
585        g.only_files = Some(only_files.iter().copied().collect());
586        Ok(())
587    }
588}
589
590pub type ManagedTorrentHandle = Arc<ManagedTorrent>;
591
592fn spawn_fatal_errors_receiver(
593    state: &Arc<ManagedTorrent>,
594    rx: tokio::sync::oneshot::Receiver<anyhow::Error>,
595    token: CancellationToken,
596) {
597    let span = state.shared.span.clone();
598    let state = Arc::downgrade(state);
599    spawn_with_cancel(
600        error_span!(parent: span, "fatal_errors_receiver"),
601        token,
602        async move {
603            let e = match rx.await {
604                Ok(e) => e,
605                Err(_) => return Ok(()),
606            };
607            if let Some(state) = state.upgrade() {
608                state.stop_with_error(e);
609            } else {
610                warn!("tried to stop the torrent with error, but couldn't upgrade the arc");
611            }
612            Ok(())
613        },
614    );
615}
616
617fn spawn_peer_adder(live: &Arc<TorrentStateLive>, mut peer_rx: PeerStream) {
618    live.spawn(
619        error_span!(parent: live.torrent().span.clone(), "external_peer_adder"),
620        {
621            let live = live.clone();
622            async move {
623                let live = {
624                    let weak = Arc::downgrade(&live);
625                    drop(live);
626                    weak
627                };
628
629                loop {
630                    match timeout(Duration::from_secs(5), peer_rx.next()).await {
631                        Ok(Some(peer)) => {
632                            trace!(?peer, "received peer from peer_rx");
633                            let live = match live.upgrade() {
634                                Some(live) => live,
635                                None => return Ok(()),
636                            };
637                            live.add_peer_if_not_seen(peer).context("torrent closed")?;
638                        }
639                        Ok(None) => {
640                            debug!("peer_rx closed, closing peer adder");
641                            return Ok(());
642                        }
643                        // If timeout, check if the torrent is live.
644                        Err(_) if live.strong_count() == 0 => {
645                            debug!("timed out waiting for peers, torrent isn't live, closing peer adder");
646                            return Ok(());
647                        }
648                        Err(_) => continue,
649                    }
650                }
651            }
652        },
653    );
654}