1use std::{collections::HashSet, marker::PhantomData, net::SocketAddr, str::FromStr, sync::Arc};
2
3use anyhow::Context;
4use buffers::ByteBufOwned;
5use dht::{DhtStats, Id20};
6use http::StatusCode;
7use librqbit_core::torrent_metainfo::{FileDetailsAttrs, TorrentMetaV1Info};
8use serde::{Deserialize, Serialize};
9use tokio::sync::mpsc::UnboundedSender;
10use tracing::warn;
11
12use crate::{
13 api_error::{ApiError, ApiErrorExt},
14 session::{
15 AddTorrent, AddTorrentOptions, AddTorrentResponse, ListOnlyResponse, Session, TorrentId,
16 },
17 session_stats::snapshot::SessionStatsSnapshot,
18 torrent_state::{
19 peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot},
20 FileStream, ManagedTorrentHandle,
21 },
22};
23
24#[cfg(feature = "tracing-subscriber-utils")]
25use crate::tracing_subscriber_config_utils::LineBroadcast;
26#[cfg(feature = "tracing-subscriber-utils")]
27use futures::Stream;
28#[cfg(feature = "tracing-subscriber-utils")]
29use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
30
31pub use crate::torrent_state::stats::{LiveStats, TorrentStats};
32
33pub type Result<T> = std::result::Result<T, ApiError>;
34
/// Cloneable facade over a [`Session`] that exposes the operations served by the API.
#[derive(Clone)]
pub struct Api {
    /// Shared session that every API call is forwarded to.
    session: Arc<Session>,
    /// Channel that `api_set_rust_log` sends the new value into; when `None`
    /// that call fails with an error.
    rust_log_reload_tx: Option<UnboundedSender<String>>,
    /// Broadcast source that `api_log_lines_stream` subscribes to; when `None`
    /// that call fails with an error.
    #[cfg(feature = "tracing-subscriber-utils")]
    line_broadcast: Option<LineBroadcast>,
}
44
/// Identifies a torrent either by its session-local id or by its info hash.
#[derive(Debug, Clone, Copy)]
pub enum TorrentIdOrHash {
    /// Numeric torrent id.
    Id(TorrentId),
    /// Info hash of the torrent.
    Hash(Id20),
}
50
51impl Serialize for TorrentIdOrHash {
52 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
53 where
54 S: serde::Serializer,
55 {
56 match self {
57 TorrentIdOrHash::Id(id) => id.serialize(serializer),
58 TorrentIdOrHash::Hash(h) => h.as_string().serialize(serializer),
59 }
60 }
61}
62
63impl<'de> Deserialize<'de> for TorrentIdOrHash {
64 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
65 where
66 D: serde::Deserializer<'de>,
67 {
68 #[derive(Default)]
69 struct V<'de> {
70 p: PhantomData<&'de ()>,
71 }
72
73 macro_rules! visit_int {
74 ($v:expr) => {{
75 let tid: TorrentId = $v.try_into().map_err(|e| E::custom(format!("{e:#}")))?;
76 Ok(TorrentIdOrHash::from(tid))
77 }};
78 }
79
80 impl<'de> serde::de::Visitor<'de> for V<'de> {
81 type Value = TorrentIdOrHash;
82
83 fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
84 f.write_str("integer or 40 byte info hash")
85 }
86
87 fn visit_i64<E>(self, v: i64) -> std::result::Result<Self::Value, E>
88 where
89 E: serde::de::Error,
90 {
91 visit_int!(v)
92 }
93
94 fn visit_i128<E>(self, v: i128) -> std::result::Result<Self::Value, E>
95 where
96 E: serde::de::Error,
97 {
98 visit_int!(v)
99 }
100
101 fn visit_u128<E>(self, v: u128) -> std::result::Result<Self::Value, E>
102 where
103 E: serde::de::Error,
104 {
105 visit_int!(v)
106 }
107
108 fn visit_u64<E>(self, v: u64) -> std::result::Result<Self::Value, E>
109 where
110 E: serde::de::Error,
111 {
112 visit_int!(v)
113 }
114
115 fn visit_str<E>(self, v: &str) -> std::result::Result<Self::Value, E>
116 where
117 E: serde::de::Error,
118 {
119 TorrentIdOrHash::parse(v).map_err(|e| {
120 E::custom(format!(
121 "expected integer or 40 byte info hash, couldn't parse string: {e:#}"
122 ))
123 })
124 }
125 }
126
127 deserializer.deserialize_any(V::default())
128 }
129}
130
131impl std::fmt::Display for TorrentIdOrHash {
132 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133 match self {
134 TorrentIdOrHash::Id(id) => write!(f, "{}", id),
135 TorrentIdOrHash::Hash(h) => write!(f, "{:?}", h),
136 }
137 }
138}
139
140impl From<TorrentId> for TorrentIdOrHash {
141 fn from(value: TorrentId) -> Self {
142 TorrentIdOrHash::Id(value)
143 }
144}
145
146impl From<Id20> for TorrentIdOrHash {
147 fn from(value: Id20) -> Self {
148 TorrentIdOrHash::Hash(value)
149 }
150}
151
152impl<'a> TryFrom<&'a str> for TorrentIdOrHash {
153 type Error = anyhow::Error;
154
155 fn try_from(value: &'a str) -> std::result::Result<Self, Self::Error> {
156 Self::parse(value)
157 }
158}
159
160impl TorrentIdOrHash {
161 pub fn parse(s: &str) -> anyhow::Result<Self> {
162 if s.len() == 40 {
163 let id = Id20::from_str(s)?;
164 return Ok(id.into());
165 }
166 let id: TorrentId = s.parse()?;
167 Ok(id.into())
168 }
169}
170
/// Options accepted by `Api::api_torrent_list_ext`.
#[derive(Deserialize, Default)]
pub struct ApiTorrentListOpts {
    /// When `true`, every listed torrent also carries its stats.
    /// Defaults to `false` when absent from the input.
    #[serde(default)]
    pub with_stats: bool,
}
176
177impl Api {
178 pub fn new(
179 session: Arc<Session>,
180 rust_log_reload_tx: Option<UnboundedSender<String>>,
181 #[cfg(feature = "tracing-subscriber-utils")] line_broadcast: Option<LineBroadcast>,
182 ) -> Self {
183 Self {
184 session,
185 rust_log_reload_tx,
186 #[cfg(feature = "tracing-subscriber-utils")]
187 line_broadcast,
188 }
189 }
190
191 pub fn session(&self) -> &Arc<Session> {
192 &self.session
193 }
194
195 pub fn mgr_handle(&self, idx: TorrentIdOrHash) -> Result<ManagedTorrentHandle> {
196 self.session
197 .get(idx)
198 .ok_or(ApiError::torrent_not_found(idx))
199 }
200
201 pub fn api_torrent_list(&self) -> TorrentListResponse {
202 self.api_torrent_list_ext(ApiTorrentListOpts { with_stats: false })
203 }
204
205 pub fn api_torrent_list_ext(&self, opts: ApiTorrentListOpts) -> TorrentListResponse {
206 let items = self.session.with_torrents(|torrents| {
207 torrents
208 .map(|(id, mgr)| {
209 let mut r = TorrentDetailsResponse {
210 id: Some(id),
211 info_hash: mgr.shared().info_hash.as_string(),
212 name: mgr.name(),
213 output_folder: mgr
214 .shared()
215 .options
216 .output_folder
217 .to_string_lossy()
218 .into_owned(),
219
220 files: None,
222 stats: None,
223 };
224 if opts.with_stats {
225 r.stats = Some(mgr.stats());
226 }
227 r
228 })
229 .collect()
230 });
231 TorrentListResponse { torrents: items }
232 }
233
234 pub fn api_torrent_details(&self, idx: TorrentIdOrHash) -> Result<TorrentDetailsResponse> {
235 let handle = self.mgr_handle(idx)?;
236 let info_hash = handle.shared().info_hash;
237 let only_files = handle.only_files();
238 let output_folder = handle
239 .shared()
240 .options
241 .output_folder
242 .to_string_lossy()
243 .into_owned()
244 .to_string();
245 make_torrent_details(
246 Some(handle.id()),
247 &info_hash,
248 handle.metadata.load().as_ref().map(|r| &r.info),
249 handle.name().as_deref(),
250 only_files.as_deref(),
251 output_folder,
252 )
253 }
254
255 pub fn api_session_stats(&self) -> SessionStatsSnapshot {
256 self.session().stats_snapshot()
257 }
258
259 pub fn torrent_file_mime_type(
260 &self,
261 idx: TorrentIdOrHash,
262 file_idx: usize,
263 ) -> Result<&'static str> {
264 let handle = self.mgr_handle(idx)?;
265 handle.with_metadata(|r| torrent_file_mime_type(&r.info, file_idx))?
266 }
267
268 pub fn api_peer_stats(
269 &self,
270 idx: TorrentIdOrHash,
271 filter: PeerStatsFilter,
272 ) -> Result<PeerStatsSnapshot> {
273 let handle = self.mgr_handle(idx)?;
274 Ok(handle
275 .live()
276 .context("not live")?
277 .per_peer_stats_snapshot(filter))
278 }
279
280 pub async fn api_torrent_action_pause(
281 &self,
282 idx: TorrentIdOrHash,
283 ) -> Result<EmptyJsonResponse> {
284 let handle = self.mgr_handle(idx)?;
285 self.session()
286 .pause(&handle)
287 .await
288 .context("error pausing torrent")
289 .with_error_status_code(StatusCode::BAD_REQUEST)?;
290 Ok(Default::default())
291 }
292
293 pub async fn api_torrent_action_start(
294 &self,
295 idx: TorrentIdOrHash,
296 ) -> Result<EmptyJsonResponse> {
297 let handle = self.mgr_handle(idx)?;
298 self.session
299 .unpause(&handle)
300 .await
301 .context("error unpausing torrent")
302 .with_error_status_code(StatusCode::BAD_REQUEST)?;
303 Ok(Default::default())
304 }
305
306 pub async fn api_torrent_action_forget(
307 &self,
308 idx: TorrentIdOrHash,
309 ) -> Result<EmptyJsonResponse> {
310 self.session
311 .delete(idx, false)
312 .await
313 .context("error forgetting torrent")?;
314 Ok(Default::default())
315 }
316
317 pub async fn api_torrent_action_delete(
318 &self,
319 idx: TorrentIdOrHash,
320 ) -> Result<EmptyJsonResponse> {
321 self.session
322 .delete(idx, true)
323 .await
324 .context("error deleting torrent with files")?;
325 Ok(Default::default())
326 }
327
328 pub async fn api_torrent_action_update_only_files(
329 &self,
330 idx: TorrentIdOrHash,
331 only_files: &HashSet<usize>,
332 ) -> Result<EmptyJsonResponse> {
333 let handle = self.mgr_handle(idx)?;
334 self.session
335 .update_only_files(&handle, only_files)
336 .await
337 .context("error updating only_files")?;
338 Ok(Default::default())
339 }
340
341 pub fn api_set_rust_log(&self, new_value: String) -> Result<EmptyJsonResponse> {
342 let tx = self
343 .rust_log_reload_tx
344 .as_ref()
345 .context("rust_log_reload_tx was not set")?;
346 tx.send(new_value)
347 .context("noone is listening to RUST_LOG changes")?;
348 Ok(Default::default())
349 }
350
351 #[cfg(feature = "tracing-subscriber-utils")]
352 pub fn api_log_lines_stream(
353 &self,
354 ) -> Result<
355 impl Stream<Item = std::result::Result<bytes::Bytes, BroadcastStreamRecvError>>
356 + Send
357 + Sync
358 + 'static,
359 > {
360 Ok(self
361 .line_broadcast
362 .as_ref()
363 .map(|sender| BroadcastStream::new(sender.subscribe()))
364 .context("line_rx wasn't set")?)
365 }
366
367 pub async fn api_add_torrent(
368 &self,
369 add: AddTorrent<'_>,
370 opts: Option<AddTorrentOptions>,
371 ) -> Result<ApiAddTorrentResponse> {
372 let response = match self
373 .session
374 .add_torrent(add, opts)
375 .await
376 .context("error adding torrent")
377 .with_error_status_code(StatusCode::BAD_REQUEST)?
378 {
379 AddTorrentResponse::AlreadyManaged(id, handle) => {
380 let details = make_torrent_details(
381 Some(id),
382 &handle.info_hash(),
383 handle.metadata.load().as_ref().map(|r| &r.info),
384 handle.name().as_deref(),
385 handle.only_files().as_deref(),
386 handle
387 .shared()
388 .options
389 .output_folder
390 .to_string_lossy()
391 .into_owned(),
392 )
393 .context("error making torrent details")?;
394 ApiAddTorrentResponse {
395 id: Some(id),
396 details,
397 seen_peers: None,
398 output_folder: handle
399 .shared()
400 .options
401 .output_folder
402 .to_string_lossy()
403 .into_owned(),
404 }
405 }
406 AddTorrentResponse::ListOnly(ListOnlyResponse {
407 info_hash,
408 info,
409 only_files,
410 seen_peers,
411 output_folder,
412 ..
413 }) => ApiAddTorrentResponse {
414 id: None,
415 output_folder: output_folder.to_string_lossy().into_owned(),
416 seen_peers: Some(seen_peers),
417 details: make_torrent_details(
418 None,
419 &info_hash,
420 Some(&info),
421 None,
422 only_files.as_deref(),
423 output_folder.to_string_lossy().into_owned().to_string(),
424 )
425 .context("error making torrent details")?,
426 },
427 AddTorrentResponse::Added(id, handle) => {
428 let details = make_torrent_details(
429 Some(id),
430 &handle.info_hash(),
431 handle.metadata.load().as_ref().map(|r| &r.info),
432 handle.name().as_deref(),
433 handle.only_files().as_deref(),
434 handle
435 .shared()
436 .options
437 .output_folder
438 .to_string_lossy()
439 .into_owned(),
440 )
441 .context("error making torrent details")?;
442 ApiAddTorrentResponse {
443 id: Some(id),
444 details,
445 seen_peers: None,
446 output_folder: handle
447 .shared()
448 .options
449 .output_folder
450 .to_string_lossy()
451 .into_owned(),
452 }
453 }
454 };
455 Ok(response)
456 }
457
458 pub fn api_dht_stats(&self) -> Result<DhtStats> {
459 self.session
460 .get_dht()
461 .as_ref()
462 .map(|d| d.stats())
463 .ok_or(ApiError::dht_disabled())
464 }
465
466 pub fn api_dht_table(&self) -> Result<impl Serialize> {
467 let dht = self.session.get_dht().ok_or(ApiError::dht_disabled())?;
468 Ok(dht.with_routing_table(|r| r.clone()))
469 }
470
471 pub fn api_stats_v0(&self, idx: TorrentIdOrHash) -> Result<LiveStats> {
472 let mgr = self.mgr_handle(idx)?;
473 let live = mgr.live().context("torrent not live")?;
474 Ok(LiveStats::from(&*live))
475 }
476
477 pub fn api_stats_v1(&self, idx: TorrentIdOrHash) -> Result<TorrentStats> {
478 let mgr = self.mgr_handle(idx)?;
479 Ok(mgr.stats())
480 }
481
482 pub fn api_dump_haves(&self, idx: TorrentIdOrHash) -> Result<String> {
483 let mgr = self.mgr_handle(idx)?;
484 Ok(mgr.with_chunk_tracker(|chunks| format!("{:?}", chunks.get_have_pieces().as_slice()))?)
485 }
486
487 pub fn api_stream(&self, idx: TorrentIdOrHash, file_id: usize) -> Result<FileStream> {
488 let mgr = self.mgr_handle(idx)?;
489 Ok(mgr.stream(file_id)?)
490 }
491}
492
/// Response body of the torrent list calls.
#[derive(Serialize)]
pub struct TorrentListResponse {
    /// One entry per torrent in the session.
    pub torrents: Vec<TorrentDetailsResponse>,
}
497
/// Description of a single file inside a torrent.
#[derive(Serialize, Deserialize)]
pub struct TorrentDetailsResponseFile {
    /// File name as a single string; `"<INVALID NAME>"` when it could not be read.
    pub name: String,
    /// File name as a list of components; empty when it could not be read.
    pub components: Vec<String>,
    /// File length as reported by the torrent's file details.
    pub length: u64,
    /// Whether the file is part of the current selection (`true` when no
    /// selection is set).
    pub included: bool,
    /// Attributes reported by the torrent's file details.
    pub attributes: FileDetailsAttrs,
}
506
/// Field-less response returned by calls that have nothing to report on success.
#[derive(Default, Serialize)]
pub struct EmptyJsonResponse {}
509
/// Details of a single torrent as exposed by the API.
#[derive(Serialize, Deserialize)]
pub struct TorrentDetailsResponse {
    /// Torrent id; left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub id: Option<usize>,
    /// Info hash in its string form.
    pub info_hash: String,
    /// Torrent name, when known.
    pub name: Option<String>,
    /// Output folder, converted to a string lossily.
    pub output_folder: String,

    /// Files of the torrent; left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub files: Option<Vec<TorrentDetailsResponseFile>>,
    /// Torrent stats; left out of the serialized output when `None` and never
    /// read back when deserializing.
    #[serde(skip_serializing_if = "Option::is_none", skip_deserializing)]
    pub stats: Option<TorrentStats>,
}
523
/// Response body of `Api::api_add_torrent`.
#[derive(Serialize, Deserialize)]
pub struct ApiAddTorrentResponse {
    /// Id of the managed torrent; `None` for a list-only request.
    pub id: Option<usize>,
    /// Details of the torrent, including its file list.
    pub details: TorrentDetailsResponse,
    /// Output folder, converted to a string lossily.
    pub output_folder: String,
    /// Peers seen while resolving the torrent; only set for a list-only request.
    pub seen_peers: Option<Vec<SocketAddr>>,
}
531
532fn make_torrent_details(
533 id: Option<TorrentId>,
534 info_hash: &Id20,
535 info: Option<&TorrentMetaV1Info<ByteBufOwned>>,
536 name: Option<&str>,
537 only_files: Option<&[usize]>,
538 output_folder: String,
539) -> Result<TorrentDetailsResponse> {
540 let files = match info {
541 Some(info) => info
542 .iter_file_details()
543 .context("error iterating filenames and lengths")?
544 .enumerate()
545 .map(|(idx, d)| {
546 let name = match d.filename.to_string() {
547 Ok(s) => s,
548 Err(err) => {
549 warn!("error reading filename: {:?}", err);
550 "<INVALID NAME>".to_string()
551 }
552 };
553 let components = d.filename.to_vec().unwrap_or_default();
554 let included = only_files.map(|o| o.contains(&idx)).unwrap_or(true);
555 TorrentDetailsResponseFile {
556 name,
557 components,
558 length: d.len,
559 included,
560 attributes: d.attrs(),
561 }
562 })
563 .collect(),
564 None => Default::default(),
565 };
566 Ok(TorrentDetailsResponse {
567 id,
568 info_hash: info_hash.as_string(),
569 name: name.map(|s| s.to_owned()).or_else(|| {
570 info.and_then(|i| {
571 i.name
572 .as_ref()
573 .map(|b| String::from_utf8_lossy(b.as_ref()).into())
574 })
575 }),
576 files: Some(files),
577 output_folder,
578 stats: None,
579 })
580}
581
582fn torrent_file_mime_type(
583 info: &TorrentMetaV1Info<ByteBufOwned>,
584 file_idx: usize,
585) -> Result<&'static str> {
586 info.iter_file_details()?
587 .nth(file_idx)
588 .and_then(|d| {
589 d.filename
590 .iter_components()
591 .last()
592 .and_then(|r| r.ok())
593 .and_then(|s| mime_guess::from_path(s).first_raw())
594 })
595 .ok_or_else(|| {
596 ApiError::new_from_text(
597 StatusCode::INTERNAL_SERVER_ERROR,
598 "cannot determine mime type for file",
599 )
600 })
601}