librqbit/
api.rs

1use std::{collections::HashSet, marker::PhantomData, net::SocketAddr, str::FromStr, sync::Arc};
2
3use anyhow::Context;
4use buffers::ByteBufOwned;
5use dht::{DhtStats, Id20};
6use http::StatusCode;
7use librqbit_core::torrent_metainfo::{FileDetailsAttrs, TorrentMetaV1Info};
8use serde::{Deserialize, Serialize};
9use tokio::sync::mpsc::UnboundedSender;
10use tracing::warn;
11
12use crate::{
13    api_error::{ApiError, ApiErrorExt},
14    session::{
15        AddTorrent, AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session, TorrentId,
16    },
17    session_stats::snapshot::SessionStatsSnapshot,
18    torrent_state::{
19        peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot},
20        FileStream, ManagedTorrentHandle,
21    },
22};
23
24#[cfg(feature = "tracing-subscriber-utils")]
25use crate::tracing_subscriber_config_utils::LineBroadcast;
26#[cfg(feature = "tracing-subscriber-utils")]
27use futures::Stream;
28#[cfg(feature = "tracing-subscriber-utils")]
29use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
30
31pub use crate::torrent_state::stats::{LiveStats, TorrentStats};
32
33pub type Result<T> = std::result::Result<T, ApiError>;
34
35/// Library API for use in different web frameworks.
36/// Contains all methods you might want to expose with (de)serializable inputs/outputs.
37#[derive(Clone)]
38pub struct Api {
39    session: Arc<Session>,
40    rust_log_reload_tx: Option<UnboundedSender<String>>,
41    #[cfg(feature = "tracing-subscriber-utils")]
42    line_broadcast: Option<LineBroadcast>,
43}
44
45#[derive(Debug, Clone, Copy)]
46pub enum TorrentIdOrHash {
47    Id(TorrentId),
48    Hash(Id20),
49}
50
51impl Serialize for TorrentIdOrHash {
52    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
53    where
54        S: serde::Serializer,
55    {
56        match self {
57            TorrentIdOrHash::Id(id) => id.serialize(serializer),
58            TorrentIdOrHash::Hash(h) => h.as_string().serialize(serializer),
59        }
60    }
61}
62
63impl<'de> Deserialize<'de> for TorrentIdOrHash {
64    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
65    where
66        D: serde::Deserializer<'de>,
67    {
68        #[derive(Default)]
69        struct V<'de> {
70            p: PhantomData<&'de ()>,
71        }
72
73        macro_rules! visit_int {
74            ($v:expr) => {{
75                let tid: TorrentId = $v.try_into().map_err(|e| E::custom(format!("{e:#}")))?;
76                Ok(TorrentIdOrHash::from(tid))
77            }};
78        }
79
80        impl<'de> serde::de::Visitor<'de> for V<'de> {
81            type Value = TorrentIdOrHash;
82
83            fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
84                f.write_str("integer or 40 byte info hash")
85            }
86
87            fn visit_i64<E>(self, v: i64) -> std::result::Result<Self::Value, E>
88            where
89                E: serde::de::Error,
90            {
91                visit_int!(v)
92            }
93
94            fn visit_i128<E>(self, v: i128) -> std::result::Result<Self::Value, E>
95            where
96                E: serde::de::Error,
97            {
98                visit_int!(v)
99            }
100
101            fn visit_u128<E>(self, v: u128) -> std::result::Result<Self::Value, E>
102            where
103                E: serde::de::Error,
104            {
105                visit_int!(v)
106            }
107
108            fn visit_u64<E>(self, v: u64) -> std::result::Result<Self::Value, E>
109            where
110                E: serde::de::Error,
111            {
112                visit_int!(v)
113            }
114
115            fn visit_str<E>(self, v: &str) -> std::result::Result<Self::Value, E>
116            where
117                E: serde::de::Error,
118            {
119                TorrentIdOrHash::parse(v).map_err(|e| {
120                    E::custom(format!(
121                        "expected integer or 40 byte info hash, couldn't parse string: {e:#}"
122                    ))
123                })
124            }
125        }
126
127        deserializer.deserialize_any(V::default())
128    }
129}
130
131impl std::fmt::Display for TorrentIdOrHash {
132    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133        match self {
134            TorrentIdOrHash::Id(id) => write!(f, "{}", id),
135            TorrentIdOrHash::Hash(h) => write!(f, "{:?}", h),
136        }
137    }
138}
139
140impl From<TorrentId> for TorrentIdOrHash {
141    fn from(value: TorrentId) -> Self {
142        TorrentIdOrHash::Id(value)
143    }
144}
145
146impl From<Id20> for TorrentIdOrHash {
147    fn from(value: Id20) -> Self {
148        TorrentIdOrHash::Hash(value)
149    }
150}
151
152impl<'a> TryFrom<&'a str> for TorrentIdOrHash {
153    type Error = anyhow::Error;
154
155    fn try_from(value: &'a str) -> std::result::Result<Self, Self::Error> {
156        Self::parse(value)
157    }
158}
159
160impl TorrentIdOrHash {
161    pub fn parse(s: &str) -> anyhow::Result<Self> {
162        if s.len() == 40 {
163            let id = Id20::from_str(s)?;
164            return Ok(id.into());
165        }
166        let id: TorrentId = s.parse()?;
167        Ok(id.into())
168    }
169}
170
171#[derive(Deserialize, Default)]
172pub struct ApiTorrentListOpts {
173    #[serde(default)]
174    pub with_stats: bool,
175}
176
177impl Api {
178    pub fn new(
179        session: Arc<Session>,
180        rust_log_reload_tx: Option<UnboundedSender<String>>,
181        #[cfg(feature = "tracing-subscriber-utils")] line_broadcast: Option<LineBroadcast>,
182    ) -> Self {
183        Self {
184            session,
185            rust_log_reload_tx,
186            #[cfg(feature = "tracing-subscriber-utils")]
187            line_broadcast,
188        }
189    }
190
191    pub fn session(&self) -> &Arc<Session> {
192        &self.session
193    }
194
195    pub fn mgr_handle(&self, idx: TorrentIdOrHash) -> Result<ManagedTorrentHandle> {
196        self.session
197            .get(idx)
198            .ok_or(ApiError::torrent_not_found(idx))
199    }
200
201    pub fn api_torrent_list(&self) -> TorrentListResponse {
202        self.api_torrent_list_ext(ApiTorrentListOpts { with_stats: false })
203    }
204
205    pub fn api_torrent_list_ext(&self, opts: ApiTorrentListOpts) -> TorrentListResponse {
206        let items = self.session.with_torrents(|torrents| {
207            torrents
208                .map(|(id, mgr)| {
209                    let mut r = TorrentDetailsResponse {
210                        id: Some(id),
211                        info_hash: mgr.shared().info_hash.as_string(),
212                        name: mgr.name(),
213                        output_folder: mgr
214                            .shared()
215                            .options
216                            .output_folder
217                            .to_string_lossy()
218                            .into_owned(),
219
220                        // These will be filled in /details and /stats endpoints
221                        files: None,
222                        stats: None,
223                    };
224                    if opts.with_stats {
225                        r.stats = Some(mgr.stats());
226                    }
227                    r
228                })
229                .collect()
230        });
231        TorrentListResponse { torrents: items }
232    }
233
234    pub fn api_torrent_details(&self, idx: TorrentIdOrHash) -> Result<TorrentDetailsResponse> {
235        let handle = self.mgr_handle(idx)?;
236        let info_hash = handle.shared().info_hash;
237        let only_files = handle.only_files();
238        let output_folder = handle
239            .shared()
240            .options
241            .output_folder
242            .to_string_lossy()
243            .into_owned()
244            .to_string();
245        make_torrent_details(
246            Some(handle.id()),
247            &info_hash,
248            handle.metadata.load().as_ref().map(|r| &r.info),
249            handle.name().as_deref(),
250            only_files.as_deref(),
251            output_folder,
252        )
253    }
254
255    pub fn api_session_stats(&self) -> SessionStatsSnapshot {
256        self.session().stats_snapshot()
257    }
258
259    pub fn torrent_file_mime_type(
260        &self,
261        idx: TorrentIdOrHash,
262        file_idx: usize,
263    ) -> Result<&'static str> {
264        let handle = self.mgr_handle(idx)?;
265        handle.with_metadata(|r| torrent_file_mime_type(&r.info, file_idx))?
266    }
267
268    pub fn api_peer_stats(
269        &self,
270        idx: TorrentIdOrHash,
271        filter: PeerStatsFilter,
272    ) -> Result<PeerStatsSnapshot> {
273        let handle = self.mgr_handle(idx)?;
274        Ok(handle
275            .live()
276            .context("not live")?
277            .per_peer_stats_snapshot(filter))
278    }
279
280    pub async fn api_torrent_action_pause(
281        &self,
282        idx: TorrentIdOrHash,
283    ) -> Result<EmptyJsonResponse> {
284        let handle = self.mgr_handle(idx)?;
285        self.session()
286            .pause(&handle)
287            .await
288            .context("error pausing torrent")
289            .with_error_status_code(StatusCode::BAD_REQUEST)?;
290        Ok(Default::default())
291    }
292
293    pub async fn api_torrent_action_start(
294        &self,
295        idx: TorrentIdOrHash,
296    ) -> Result<EmptyJsonResponse> {
297        let handle = self.mgr_handle(idx)?;
298        self.session
299            .unpause(&handle)
300            .await
301            .context("error unpausing torrent")
302            .with_error_status_code(StatusCode::BAD_REQUEST)?;
303        Ok(Default::default())
304    }
305
306    pub async fn api_torrent_action_forget(
307        &self,
308        idx: TorrentIdOrHash,
309    ) -> Result<EmptyJsonResponse> {
310        self.session
311            .delete(idx, false)
312            .await
313            .context("error forgetting torrent")?;
314        Ok(Default::default())
315    }
316
317    pub async fn api_torrent_action_delete(
318        &self,
319        idx: TorrentIdOrHash,
320    ) -> Result<EmptyJsonResponse> {
321        self.session
322            .delete(idx, true)
323            .await
324            .context("error deleting torrent with files")?;
325        Ok(Default::default())
326    }
327
328    pub async fn api_torrent_action_update_only_files(
329        &self,
330        idx: TorrentIdOrHash,
331        only_files: &HashSet<usize>,
332    ) -> Result<EmptyJsonResponse> {
333        let handle = self.mgr_handle(idx)?;
334        self.session
335            .update_only_files(&handle, only_files)
336            .await
337            .context("error updating only_files")?;
338        Ok(Default::default())
339    }
340
341    pub fn api_set_rust_log(&self, new_value: String) -> Result<EmptyJsonResponse> {
342        let tx = self
343            .rust_log_reload_tx
344            .as_ref()
345            .context("rust_log_reload_tx was not set")?;
346        tx.send(new_value)
347            .context("noone is listening to RUST_LOG changes")?;
348        Ok(Default::default())
349    }
350
351    #[cfg(feature = "tracing-subscriber-utils")]
352    pub fn api_log_lines_stream(
353        &self,
354    ) -> Result<
355        impl Stream<Item = std::result::Result<bytes::Bytes, BroadcastStreamRecvError>>
356            + Send
357            + Sync
358            + 'static,
359    > {
360        Ok(self
361            .line_broadcast
362            .as_ref()
363            .map(|sender| BroadcastStream::new(sender.subscribe()))
364            .context("line_rx wasn't set")?)
365    }
366
367    pub async fn api_add_torrent(
368        &self,
369        add: AddTorrent<'_>,
370        opts: Option<AddTorrentOptions>,
371    ) -> Result<ApiAddTorrentResponse> {
372        let response = match self
373            .session
374            .add_torrent(add, opts)
375            .await
376            .context("error adding torrent")
377            .with_error_status_code(StatusCode::BAD_REQUEST)?
378        {
379            AddTorrentResponse::AlreadyManaged(id, handle) => {
380                let details = make_torrent_details(
381                    Some(id),
382                    &handle.info_hash(),
383                    handle.metadata.load().as_ref().map(|r| &r.info),
384                    handle.name().as_deref(),
385                    handle.only_files().as_deref(),
386                    handle
387                        .shared()
388                        .options
389                        .output_folder
390                        .to_string_lossy()
391                        .into_owned(),
392                )
393                .context("error making torrent details")?;
394                ApiAddTorrentResponse {
395                    id: Some(id),
396                    details,
397                    seen_peers: None,
398                    output_folder: handle
399                        .shared()
400                        .options
401                        .output_folder
402                        .to_string_lossy()
403                        .into_owned(),
404                }
405            }
406            AddTorrentResponse::ListOnly(ListOnlyResponse {
407                info_hash,
408                info,
409                only_files,
410                seen_peers,
411                output_folder,
412                ..
413            }) => ApiAddTorrentResponse {
414                id: None,
415                output_folder: output_folder.to_string_lossy().into_owned(),
416                seen_peers: Some(seen_peers),
417                details: make_torrent_details(
418                    None,
419                    &info_hash,
420                    Some(&info),
421                    None,
422                    only_files.as_deref(),
423                    output_folder.to_string_lossy().into_owned().to_string(),
424                )
425                .context("error making torrent details")?,
426            },
427            AddTorrentResponse::Added(id, handle) => {
428                let details = make_torrent_details(
429                    Some(id),
430                    &handle.info_hash(),
431                    handle.metadata.load().as_ref().map(|r| &r.info),
432                    handle.name().as_deref(),
433                    handle.only_files().as_deref(),
434                    handle
435                        .shared()
436                        .options
437                        .output_folder
438                        .to_string_lossy()
439                        .into_owned(),
440                )
441                .context("error making torrent details")?;
442                ApiAddTorrentResponse {
443                    id: Some(id),
444                    details,
445                    seen_peers: None,
446                    output_folder: handle
447                        .shared()
448                        .options
449                        .output_folder
450                        .to_string_lossy()
451                        .into_owned(),
452                }
453            }
454        };
455        Ok(response)
456    }
457
458    pub fn api_dht_stats(&self) -> Result<DhtStats> {
459        self.session
460            .get_dht()
461            .as_ref()
462            .map(|d| d.stats())
463            .ok_or(ApiError::dht_disabled())
464    }
465
466    pub fn api_dht_table(&self) -> Result<impl Serialize> {
467        let dht = self.session.get_dht().ok_or(ApiError::dht_disabled())?;
468        Ok(dht.with_routing_table(|r| r.clone()))
469    }
470
471    pub fn api_stats_v0(&self, idx: TorrentIdOrHash) -> Result<LiveStats> {
472        let mgr = self.mgr_handle(idx)?;
473        let live = mgr.live().context("torrent not live")?;
474        Ok(LiveStats::from(&*live))
475    }
476
477    pub fn api_stats_v1(&self, idx: TorrentIdOrHash) -> Result<TorrentStats> {
478        let mgr = self.mgr_handle(idx)?;
479        Ok(mgr.stats())
480    }
481
482    pub fn api_dump_haves(&self, idx: TorrentIdOrHash) -> Result<String> {
483        let mgr = self.mgr_handle(idx)?;
484        Ok(mgr.with_chunk_tracker(|chunks| format!("{:?}", chunks.get_have_pieces().as_slice()))?)
485    }
486
487    pub fn api_stream(&self, idx: TorrentIdOrHash, file_id: usize) -> Result<FileStream> {
488        let mgr = self.mgr_handle(idx)?;
489        Ok(mgr.stream(file_id)?)
490    }
491}
492
493#[derive(Serialize)]
494pub struct TorrentListResponse {
495    pub torrents: Vec<TorrentDetailsResponse>,
496}
497
498#[derive(Serialize, Deserialize)]
499pub struct TorrentDetailsResponseFile {
500    pub name: String,
501    pub components: Vec<String>,
502    pub length: u64,
503    pub included: bool,
504    pub attributes: FileDetailsAttrs,
505}
506
507#[derive(Default, Serialize)]
508pub struct EmptyJsonResponse {}
509
510#[derive(Serialize, Deserialize)]
511pub struct TorrentDetailsResponse {
512    #[serde(skip_serializing_if = "Option::is_none")]
513    pub id: Option<usize>,
514    pub info_hash: String,
515    pub name: Option<String>,
516    pub output_folder: String,
517
518    #[serde(skip_serializing_if = "Option::is_none")]
519    pub files: Option<Vec<TorrentDetailsResponseFile>>,
520    #[serde(skip_serializing_if = "Option::is_none", skip_deserializing)]
521    pub stats: Option<TorrentStats>,
522}
523
524#[derive(Serialize, Deserialize)]
525pub struct ApiAddTorrentResponse {
526    pub id: Option<usize>,
527    pub details: TorrentDetailsResponse,
528    pub output_folder: String,
529    pub seen_peers: Option<Vec<SocketAddr>>,
530}
531
532fn make_torrent_details(
533    id: Option<TorrentId>,
534    info_hash: &Id20,
535    info: Option<&TorrentMetaV1Info<ByteBufOwned>>,
536    name: Option<&str>,
537    only_files: Option<&[usize]>,
538    output_folder: String,
539) -> Result<TorrentDetailsResponse> {
540    let files = match info {
541        Some(info) => info
542            .iter_file_details()
543            .context("error iterating filenames and lengths")?
544            .enumerate()
545            .map(|(idx, d)| {
546                let name = match d.filename.to_string() {
547                    Ok(s) => s,
548                    Err(err) => {
549                        warn!("error reading filename: {:?}", err);
550                        "<INVALID NAME>".to_string()
551                    }
552                };
553                let components = d.filename.to_vec().unwrap_or_default();
554                let included = only_files.map(|o| o.contains(&idx)).unwrap_or(true);
555                TorrentDetailsResponseFile {
556                    name,
557                    components,
558                    length: d.len,
559                    included,
560                    attributes: d.attrs(),
561                }
562            })
563            .collect(),
564        None => Default::default(),
565    };
566    Ok(TorrentDetailsResponse {
567        id,
568        info_hash: info_hash.as_string(),
569        name: name.map(|s| s.to_owned()).or_else(|| {
570            info.and_then(|i| {
571                i.name
572                    .as_ref()
573                    .map(|b| String::from_utf8_lossy(b.as_ref()).into())
574            })
575        }),
576        files: Some(files),
577        output_folder,
578        stats: None,
579    })
580}
581
582fn torrent_file_mime_type(
583    info: &TorrentMetaV1Info<ByteBufOwned>,
584    file_idx: usize,
585) -> Result<&'static str> {
586    info.iter_file_details()?
587        .nth(file_idx)
588        .and_then(|d| {
589            d.filename
590                .iter_components()
591                .last()
592                .and_then(|r| r.ok())
593                .and_then(|s| mime_guess::from_path(s).first_raw())
594        })
595        .ok_or_else(|| {
596            ApiError::new_from_text(
597                StatusCode::INTERNAL_SERVER_ERROR,
598                "cannot determine mime type for file",
599            )
600        })
601}