1pub mod initializing;
2pub mod live;
3pub mod paused;
4pub mod stats;
5mod streaming;
6pub mod utils;
7
8use std::collections::HashSet;
9use std::net::SocketAddr;
10use std::path::PathBuf;
11use std::sync::atomic::Ordering;
12use std::sync::Arc;
13use std::sync::Weak;
14use std::time::Duration;
15
16use anyhow::bail;
17use anyhow::Context;
18use arc_swap::ArcSwapOption;
19use buffers::ByteBufOwned;
20use bytes::Bytes;
21use futures::future::BoxFuture;
22use futures::FutureExt;
23use librqbit_core::hash_id::Id20;
24use librqbit_core::lengths::Lengths;
25
26use librqbit_core::spawn_utils::spawn_with_cancel;
27use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
28pub use live::*;
29use parking_lot::RwLock;
30
31use tokio::sync::Notify;
32use tokio::time::timeout;
33use tokio_stream::StreamExt;
34use tokio_util::sync::CancellationToken;
35use tracing::debug;
36use tracing::error_span;
37use tracing::trace;
38use tracing::warn;
39
40use crate::chunk_tracker::ChunkTracker;
41use crate::file_info::FileInfo;
42use crate::limits::LimitsConfig;
43use crate::session::TorrentId;
44use crate::spawn_utils::BlockingSpawner;
45use crate::storage::BoxStorageFactory;
46use crate::stream_connect::StreamConnector;
47use crate::torrent_state::stats::LiveStats;
48use crate::type_aliases::DiskWorkQueueSender;
49use crate::type_aliases::FileInfos;
50use crate::type_aliases::PeerStream;
51use crate::Session;
52
53use initializing::TorrentStateInitializing;
54
55use self::paused::TorrentStatePaused;
56pub use self::stats::{TorrentStats, TorrentStatsState};
57pub use self::streaming::FileStream;
58
/// The lifecycle state of a [`ManagedTorrent`].
///
/// Transitions performed in this module:
/// - `Initializing` -> `Paused` when the initial check succeeds, or `Error` when it fails;
/// - `Paused` -> `Live` when the torrent is started;
/// - `Live` -> `Paused` when the torrent is paused;
/// - any state -> `Error` when a fatal error is reported by a live torrent;
/// - `Error` -> `Initializing` when the torrent is started again.
pub enum ManagedTorrentState {
    /// The torrent is being checked; holds the shared initializing state.
    Initializing(Arc<TorrentStateInitializing>),
    /// The torrent is paused; the paused state owns the chunk tracker.
    Paused(TorrentStatePaused),
    /// The torrent is live; holds the shared live state.
    Live(Arc<TorrentStateLive>),
    /// The torrent stopped with the contained error.
    Error(anyhow::Error),

    // Placeholder left behind by `take()` while one state is being swapped for another.
    // The code in this module treats observing it as a bug.
    None,
}
77
78impl ManagedTorrentState {
79 pub fn name(&self) -> &'static str {
80 match self {
81 ManagedTorrentState::Initializing(_) => "initializing",
82 ManagedTorrentState::Paused(_) => "paused",
83 ManagedTorrentState::Live(_) => "live",
84 ManagedTorrentState::Error(_) => "error",
85 ManagedTorrentState::None => "<invalid: none>",
86 }
87 }
88
89 fn assert_paused(self) -> TorrentStatePaused {
90 match self {
91 Self::Paused(paused) => paused,
92 _ => panic!("Expected paused state"),
93 }
94 }
95
96 pub(crate) fn take(&mut self) -> Self {
97 std::mem::replace(self, Self::None)
98 }
99}
100
/// The mutable part of a [`ManagedTorrent`], kept behind its `locked` read-write lock.
pub(crate) struct ManagedTorrentLocked {
    /// Set to `start_paused` by `ManagedTorrent::start` and to `true` by a successful
    /// `ManagedTorrent::pause`; reported by `ManagedTorrent::is_paused`.
    pub(crate) paused: bool,
    /// The current lifecycle state.
    pub(crate) state: ManagedTorrentState,
    /// The selected file indices, if a selection was made (see
    /// `ManagedTorrent::update_only_files`).
    pub(crate) only_files: Option<Vec<usize>>,
}
110
/// Per-torrent options, stored in [`ManagedTorrentShared`].
///
/// NOTE(review): apart from `_disable_upload`, these fields are only stored here and read
/// elsewhere; the per-field notes are inferred from names and types — confirm against the
/// code that consumes them.
#[derive(Default)]
pub(crate) struct ManagedTorrentOptions {
    // presumably overrides the announce interval requested by trackers — TODO confirm
    pub force_tracker_interval: Option<Duration>,
    // presumably the timeout for establishing a peer connection — TODO confirm
    pub peer_connect_timeout: Option<Duration>,
    // presumably the timeout for a single peer read/write — TODO confirm
    pub peer_read_write_timeout: Option<Duration>,
    // presumably allows writing over files that already exist — TODO confirm
    pub allow_overwrite: bool,
    // presumably the directory the torrent's files are written to — TODO confirm
    pub output_folder: PathBuf,
    // presumably an optional queue that disk writes are deferred to — TODO confirm
    pub disk_write_queue: Option<DiskWorkQueueSender>,
    // rate limit configuration for this torrent
    pub ratelimits: LimitsConfig,
    // presumably peers to try connecting to right away — TODO confirm
    pub initial_peers: Vec<SocketAddr>,
    /// Backing flag for `disable_upload()`; the field only exists when the `disable-upload`
    /// feature is enabled.
    #[cfg(feature = "disable-upload")]
    pub _disable_upload: bool,
}
124
125impl ManagedTorrentOptions {
126 #[cfg(feature = "disable-upload")]
127 pub fn disable_upload(&self) -> bool {
128 self._disable_upload
129 }
130
131 #[cfg(not(feature = "disable-upload"))]
132 pub const fn disable_upload(&self) -> bool {
133 false
134 }
135}
136
/// Everything known about a torrent once its info dictionary is available.
pub struct TorrentMetadata {
    /// The parsed info dictionary.
    pub info: TorrentMetaV1Info<ByteBufOwned>,
    // presumably the raw bytes of the whole torrent file — TODO confirm
    pub torrent_bytes: Bytes,
    // presumably the raw bytes of just the info dictionary — TODO confirm
    pub info_bytes: Bytes,
    /// Length information computed from `info`.
    pub lengths: Lengths,
    /// One entry per file listed in `info`, in iteration order.
    pub file_infos: FileInfos,
    /// The torrent name from `info`, when present and valid UTF-8.
    pub name: Option<String>,
}
146
147impl TorrentMetadata {
148 pub(crate) fn new(
149 info: TorrentMetaV1Info<ByteBufOwned>,
150 torrent_bytes: Bytes,
151 info_bytes: Bytes,
152 ) -> anyhow::Result<Self> {
153 let lengths = Lengths::from_torrent(&info)?;
154 let file_infos = info
155 .iter_file_details_ext(&lengths)?
156 .map(|fd| {
157 Ok::<_, anyhow::Error>(FileInfo {
158 relative_filename: fd.details.filename.to_pathbuf()?,
159 offset_in_torrent: fd.offset,
160 piece_range: fd.pieces,
161 len: fd.details.len,
162 attrs: fd.details.attrs(),
163 })
164 })
165 .collect::<anyhow::Result<Vec<FileInfo>>>()?;
166 let name = info
167 .name
168 .as_ref()
169 .and_then(|n| std::str::from_utf8(n.as_ref()).ok())
170 .map(|s| s.to_owned());
171 Ok(Self {
172 info,
173 torrent_bytes,
174 info_bytes,
175 lengths,
176 file_infos,
177 name,
178 })
179 }
180}
181
/// The part of a managed torrent that is shared through an `Arc` by [`ManagedTorrent`].
pub struct ManagedTorrentShared {
    /// The torrent's id, as returned by `ManagedTorrent::id`.
    pub id: TorrentId,
    /// The torrent's info hash, as returned by `ManagedTorrent::info_hash`.
    pub info_hash: Id20,
    // presumably used to run blocking work off the async runtime — TODO confirm
    pub(crate) spawner: BlockingSpawner,
    /// The set of tracker URLs for this torrent.
    pub trackers: HashSet<url::Url>,
    // presumably the peer id we present to other peers — TODO confirm
    pub peer_id: Id20,
    /// Tracing span used as the parent of the spans of tasks spawned for this torrent.
    pub span: tracing::Span,
    /// Per-torrent options.
    pub(crate) options: ManagedTorrentOptions,
    // presumably used to open outgoing peer connections — TODO confirm
    pub(crate) connector: Arc<StreamConnector>,
    /// Factory used to create and initialize storage when the torrent is re-initialized.
    pub(crate) storage_factory: BoxStorageFactory,
    /// Weak reference to the owning session; upgraded when the torrent is started.
    pub(crate) session: Weak<Session>,

    /// Name used by `ManagedTorrent::name` when the metadata provides none.
    pub(crate) magnet_name: Option<String>,
}
202
/// A torrent managed by the session, together with its current state.
pub struct ManagedTorrent {
    /// Shared data and options of the torrent.
    pub shared: Arc<ManagedTorrentShared>,
    /// The torrent's metadata; empty while the torrent is not resolved.
    pub metadata: ArcSwapOption<TorrentMetadata>,
    /// Notified whenever the state in `locked` changes.
    pub(crate) state_change_notify: Notify,
    /// The mutable state of the torrent.
    pub(crate) locked: RwLock<ManagedTorrentLocked>,
}
211
212impl ManagedTorrent {
213 pub fn id(&self) -> TorrentId {
214 self.shared.id
215 }
216
217 pub fn name(&self) -> Option<String> {
218 if let Some(m) = &*self.metadata.load() {
219 return m.name.clone().or_else(|| self.shared.magnet_name.clone());
220 }
221 self.shared.magnet_name.clone()
222 }
223
224 pub fn shared(&self) -> &ManagedTorrentShared {
225 &self.shared
226 }
227
228 pub fn with_metadata<R>(
229 &self,
230 mut f: impl FnMut(&Arc<TorrentMetadata>) -> R,
231 ) -> anyhow::Result<R> {
232 let r = self.metadata.load();
233 let r = r.as_ref().context("torrent is not resolved")?;
234 Ok(f(r))
235 }
236
237 pub fn info_hash(&self) -> Id20 {
238 self.shared.info_hash
239 }
240
241 pub fn only_files(&self) -> Option<Vec<usize>> {
242 self.locked.read().only_files.clone()
243 }
244
245 pub fn with_state<R>(&self, f: impl FnOnce(&ManagedTorrentState) -> R) -> R {
246 f(&self.locked.read().state)
247 }
248
249 pub(crate) fn with_state_mut<R>(&self, f: impl FnOnce(&mut ManagedTorrentState) -> R) -> R {
250 f(&mut self.locked.write().state)
251 }
252
253 pub(crate) fn with_chunk_tracker<R>(
254 &self,
255 f: impl FnOnce(&ChunkTracker) -> R,
256 ) -> anyhow::Result<R> {
257 let g = self.locked.read();
258 match &g.state {
259 ManagedTorrentState::Paused(p) => Ok(f(&p.chunk_tracker)),
260 ManagedTorrentState::Live(l) => Ok(f(l
261 .lock_read("chunk_tracker")
262 .get_chunks()
263 .context("error getting chunks")?)),
264 _ => bail!("no chunk tracker, torrent neither paused nor live"),
265 }
266 }
267
268 pub fn live(&self) -> Option<Arc<TorrentStateLive>> {
270 let g = self.locked.read();
271 match &g.state {
272 ManagedTorrentState::Live(live) => Some(live.clone()),
273 _ => None,
274 }
275 }
276
277 fn stop_with_error(&self, error: anyhow::Error) {
278 let mut g = self.locked.write();
279
280 match g.state.take() {
281 ManagedTorrentState::Live(live) => {
282 if let Err(err) = live.pause() {
283 warn!(
284 "error pausing live torrent during fatal error handling: {:?}",
285 err
286 );
287 }
288 }
289 ManagedTorrentState::Error(e) => {
290 warn!("bug: torrent already was in error state when trying to stop it. Previous error was: {:?}", e);
291 }
292 ManagedTorrentState::None => {
293 warn!("bug: torrent encountered in None state during fatal error handling")
294 }
295 _ => {}
296 };
297
298 self.state_change_notify.notify_waiters();
299
300 g.state = ManagedTorrentState::Error(error)
301 }
302
303 pub(crate) fn start(
306 self: &Arc<Self>,
307 peer_rx: Option<PeerStream>,
308 start_paused: bool,
309 ) -> anyhow::Result<()> {
310 fn _start<'a>(
311 t: &'a Arc<ManagedTorrent>,
312 peer_rx: Option<PeerStream>,
313 start_paused: bool,
314 session: Arc<Session>,
315 g: Option<parking_lot::RwLockWriteGuard<'a, ManagedTorrentLocked>>,
316 token: CancellationToken,
317 ) -> anyhow::Result<()> {
318 let mut g = g.unwrap_or_else(|| t.locked.write());
319
320 match &g.state {
321 ManagedTorrentState::Live(_) => {
322 bail!("torrent is already live");
323 }
324 ManagedTorrentState::Initializing(init) => {
325 let init = init.clone();
326 let t = t.clone();
327 let span = t.shared().span.clone();
328 let token = token.clone();
329
330 spawn_with_cancel(
331 error_span!(parent: span.clone(), "initialize_and_start"),
332 token.clone(),
333 async move {
334 let concurrent_init_semaphore =
335 session.concurrent_initialize_semaphore.clone();
336 let _permit = concurrent_init_semaphore
337 .acquire()
338 .await
339 .context("bug: concurrent init semaphore was closed")?;
340
341 match init.check().await {
342 Ok(paused) => {
343 let mut g = t.locked.write();
344 if let ManagedTorrentState::Initializing(_) = &g.state {
345 } else {
346 debug!("no need to start torrent anymore, as it switched state from initilizing");
347 return Ok(());
348 }
349
350 g.state = ManagedTorrentState::Paused(paused);
351 t.state_change_notify.notify_waiters();
352 _start(&t, peer_rx, start_paused, session, Some(g), token)
353 }
354 Err(err) => {
355 let result = anyhow::anyhow!("{:?}", err);
356 t.locked.write().state = ManagedTorrentState::Error(err);
357 t.state_change_notify.notify_waiters();
358 Err(result)
359 }
360 }
361 },
362 );
363 Ok(())
364 }
365 ManagedTorrentState::Paused(_) => {
366 if start_paused {
367 return Ok(());
368 }
369 let paused = g.state.take().assert_paused();
370 let (tx, rx) = tokio::sync::oneshot::channel();
371 let live = TorrentStateLive::new(paused, tx, token.clone())?;
372 g.state = ManagedTorrentState::Live(live.clone());
373 t.state_change_notify.notify_waiters();
374
375 spawn_fatal_errors_receiver(t, rx, token);
376 if let Some(peer_rx) = peer_rx {
377 spawn_peer_adder(&live, peer_rx);
378 }
379 Ok(())
380 }
381 ManagedTorrentState::Error(_) => {
382 let metadata = t.metadata.load_full().expect("TODO");
383 let initializing = Arc::new(TorrentStateInitializing::new(
384 t.shared.clone(),
385 metadata.clone(),
386 g.only_files.clone(),
387 t.shared
388 .storage_factory
389 .create_and_init(t.shared(), &metadata)?,
390 true,
391 ));
392 g.state = ManagedTorrentState::Initializing(initializing.clone());
393 t.state_change_notify.notify_waiters();
394
395 _start(t, peer_rx, start_paused, session, Some(g), token)
397 }
398 ManagedTorrentState::None => bail!("bug: torrent is in empty state"),
399 }
400 }
401
402 let session = self
403 .shared
404 .session
405 .upgrade()
406 .context("session is dead, cannot start torrent")?;
407 let mut g = self.locked.write();
408 g.paused = start_paused;
409 let cancellation_token = session.cancellation_token().child_token();
410
411 _start(
412 self,
413 peer_rx,
414 start_paused,
415 session,
416 Some(g),
417 cancellation_token,
418 )
419 }
420
421 pub fn is_paused(&self) -> bool {
422 self.locked.read().paused
423 }
424
425 pub(crate) fn pause(&self) -> anyhow::Result<()> {
427 let mut g = self.locked.write();
428 match &g.state {
429 ManagedTorrentState::Live(live) => {
430 let paused = live.pause()?;
431 g.state = ManagedTorrentState::Paused(paused);
432 g.paused = true;
433 self.state_change_notify.notify_waiters();
434 Ok(())
435 }
436 ManagedTorrentState::Initializing(_) => {
437 bail!("torrent is initializing, can't pause");
438 }
439 ManagedTorrentState::Paused(_) => {
440 bail!("torrent is already paused");
441 }
442 ManagedTorrentState::Error(_) => {
443 bail!("can't pause torrent in error state")
444 }
445 ManagedTorrentState::None => bail!("bug: torrent is in empty state"),
446 }
447 }
448
449 pub fn stats(&self) -> TorrentStats {
451 use stats::TorrentStatsState as S;
452 let mut resp = TorrentStats {
453 total_bytes: self
454 .metadata
455 .load()
456 .as_ref()
457 .map(|r| r.lengths.total_length())
458 .unwrap_or_default(),
459 file_progress: Vec::new(),
460 state: S::Error,
461 error: None,
462 progress_bytes: 0,
463 uploaded_bytes: 0,
464 finished: false,
465 live: None,
466 };
467
468 self.with_state(|s| {
469 match s {
470 ManagedTorrentState::Initializing(i) => {
471 resp.state = S::Initializing;
472 resp.progress_bytes = i.checked_bytes.load(Ordering::Relaxed);
473 }
474 ManagedTorrentState::Paused(p) => {
475 resp.state = S::Paused;
476 let hns = p.hns();
477 resp.total_bytes = hns.total();
478 resp.progress_bytes = hns.progress();
479 resp.finished = hns.finished();
480 resp.file_progress = p.chunk_tracker.per_file_have_bytes().to_owned();
481 }
482 ManagedTorrentState::Live(l) => {
483 resp.state = S::Live;
484 let live_stats = LiveStats::from(l.as_ref());
485 let hns = l.get_hns().unwrap_or_default();
486 resp.total_bytes = hns.total();
487 resp.progress_bytes = hns.progress();
488 resp.finished = hns.finished();
489 resp.uploaded_bytes = l.get_uploaded_bytes();
490 resp.file_progress = l
491 .lock_read("file_progress")
492 .get_chunks()
493 .ok()
494 .map(|c| c.per_file_have_bytes().to_owned())
495 .unwrap_or_default();
496 resp.live = Some(live_stats);
497 }
498 ManagedTorrentState::Error(e) => {
499 resp.state = S::Error;
500 resp.error = Some(format!("{:?}", e))
501 }
502 ManagedTorrentState::None => {
503 resp.state = S::Error;
504 resp.error = Some("bug: torrent in broken \"None\" state".to_string());
505 }
506 }
507 resp
508 })
509 }
510
511 #[inline(never)]
512 pub fn wait_until_initialized(&self) -> BoxFuture<'_, anyhow::Result<()>> {
513 async move {
514 loop {
516 let done = self.with_state(|s| match s {
517 ManagedTorrentState::Initializing(_) => Ok(false),
518 ManagedTorrentState::Error(e) => bail!("{:?}", e),
519 ManagedTorrentState::None => bail!("bug: torrent state is None"),
520 _ => Ok(true),
521 })?;
522 if done {
523 return Ok(());
524 }
525 let _ = timeout(Duration::from_secs(1), self.state_change_notify.notified()).await;
526 }
527 }
528 .boxed()
529 }
530
531 #[inline(never)]
532 pub fn wait_until_completed(&self) -> BoxFuture<'_, anyhow::Result<()>> {
533 async move {
534 let live = loop {
536 let live = self.with_state(|s| match s {
537 ManagedTorrentState::Initializing(_) | ManagedTorrentState::Paused(_) => {
538 Ok(None)
539 }
540 ManagedTorrentState::Live(l) => Ok(Some(l.clone())),
541 ManagedTorrentState::Error(e) => bail!("{:?}", e),
542 ManagedTorrentState::None => bail!("bug: torrent state is None"),
543 })?;
544 if let Some(live) = live {
545 break live;
546 }
547 let _ = timeout(Duration::from_secs(1), self.state_change_notify.notified()).await;
548 };
549
550 live.wait_until_completed().await;
551 Ok(())
552 }
553 .boxed()
554 }
555
556 pub(crate) fn update_only_files(&self, only_files: &HashSet<usize>) -> anyhow::Result<()> {
559 let metadata = self.metadata.load();
560 let metadata = metadata.as_ref().context("torrent is not resolved")?;
561 let file_count = metadata.file_infos.len();
562 for f in only_files.iter().copied() {
563 if f >= file_count {
564 anyhow::bail!("only_files contains invalid value {f}")
565 }
566 }
567
568 let mut g = self.locked.write();
573 match &mut g.state {
574 ManagedTorrentState::Initializing(_) => bail!("can't update initializing torrent"),
575 ManagedTorrentState::Error(_) => {}
576 ManagedTorrentState::None => {}
577 ManagedTorrentState::Paused(p) => {
578 p.update_only_files(only_files)?;
579 }
580 ManagedTorrentState::Live(l) => {
581 l.update_only_files(only_files)?;
582 }
583 };
584
585 g.only_files = Some(only_files.iter().copied().collect());
586 Ok(())
587 }
588}
589
/// A shared, reference-counted handle to a [`ManagedTorrent`].
pub type ManagedTorrentHandle = Arc<ManagedTorrent>;
591
592fn spawn_fatal_errors_receiver(
593 state: &Arc<ManagedTorrent>,
594 rx: tokio::sync::oneshot::Receiver<anyhow::Error>,
595 token: CancellationToken,
596) {
597 let span = state.shared.span.clone();
598 let state = Arc::downgrade(state);
599 spawn_with_cancel(
600 error_span!(parent: span, "fatal_errors_receiver"),
601 token,
602 async move {
603 let e = match rx.await {
604 Ok(e) => e,
605 Err(_) => return Ok(()),
606 };
607 if let Some(state) = state.upgrade() {
608 state.stop_with_error(e);
609 } else {
610 warn!("tried to stop the torrent with error, but couldn't upgrade the arc");
611 }
612 Ok(())
613 },
614 );
615}
616
617fn spawn_peer_adder(live: &Arc<TorrentStateLive>, mut peer_rx: PeerStream) {
618 live.spawn(
619 error_span!(parent: live.torrent().span.clone(), "external_peer_adder"),
620 {
621 let live = live.clone();
622 async move {
623 let live = {
624 let weak = Arc::downgrade(&live);
625 drop(live);
626 weak
627 };
628
629 loop {
630 match timeout(Duration::from_secs(5), peer_rx.next()).await {
631 Ok(Some(peer)) => {
632 trace!(?peer, "received peer from peer_rx");
633 let live = match live.upgrade() {
634 Some(live) => live,
635 None => return Ok(()),
636 };
637 live.add_peer_if_not_seen(peer).context("torrent closed")?;
638 }
639 Ok(None) => {
640 debug!("peer_rx closed, closing peer adder");
641 return Ok(());
642 }
643 Err(_) if live.strong_count() == 0 => {
645 debug!("timed out waiting for peers, torrent isn't live, closing peer adder");
646 return Ok(());
647 }
648 Err(_) => continue,
649 }
650 }
651 }
652 },
653 );
654}