1use std::sync::Arc;
7use std::time::Duration;
8
9use reqwest::StatusCode;
10use tokio::sync::RwLock;
11use tracing::{debug, warn};
12
13use crate::error::XtreamError;
14use crate::parse::decode_epg_field;
15use crate::types::{
16 StreamFormat, StreamType, XtreamCategory, XtreamChannel, XtreamEpisode, XtreamFullEpg,
17 XtreamMovie, XtreamMovieListing, XtreamProfile, XtreamShortEpg, XtreamShow, XtreamShowListing,
18 XtreamUserProfile,
19};
20use crate::url::{
21 build_api_url, build_stream_url, build_timeshift_url, build_xmltv_url,
22 effective_channel_extension,
23};
24
25#[derive(Debug, Clone)]
31pub struct XtreamClientConfig {
32 pub connect_timeout: Duration,
34 pub request_timeout: Duration,
36 pub accept_invalid_certs: bool,
38 pub preferred_format: StreamFormat,
40}
41
42impl Default for XtreamClientConfig {
43 fn default() -> Self {
44 Self {
45 connect_timeout: Duration::from_secs(15),
46 request_timeout: Duration::from_secs(120),
47 accept_invalid_certs: false,
48 preferred_format: StreamFormat::Ts,
49 }
50 }
51}
52
/// Server address plus account credentials.
///
/// `Debug` is written by hand so that neither the username nor the password
/// is ever printed.
#[derive(Clone)]
pub struct XtreamCredentials {
    /// Server base URL; [`XtreamCredentials::new`] stores it without trailing `/`.
    pub base_url: String,
    /// Account user name.
    pub username: String,
    /// Account password.
    pub password: String,
}

impl XtreamCredentials {
    /// Creates credentials, removing every trailing `/` from `base_url`.
    pub fn new(
        base_url: impl Into<String>,
        username: impl Into<String>,
        password: impl Into<String>,
    ) -> Self {
        let mut base_url: String = base_url.into();
        // Shrink in place to the length of the slash-free prefix.
        let kept = base_url.trim_end_matches('/').len();
        base_url.truncate(kept);

        XtreamCredentials {
            base_url,
            username: username.into(),
            password: password.into(),
        }
    }
}

impl std::fmt::Debug for XtreamCredentials {
    /// Prints the base URL as-is and replaces both secrets with `***`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const REDACTED: &str = "***";

        let mut out = f.debug_struct("XtreamCredentials");
        out.field("base_url", &self.base_url);
        out.field("username", &REDACTED);
        out.field("password", &REDACTED);
        out.finish()
    }
}
93
94#[derive(Clone)]
103pub struct XtreamClient {
104 http: reqwest::Client,
105 creds: XtreamCredentials,
106 config: XtreamClientConfig,
107 profile: Arc<RwLock<Option<XtreamUserProfile>>>,
109}
110
111impl std::fmt::Debug for XtreamClient {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 f.debug_struct("XtreamClient")
114 .field("creds", &self.creds)
115 .field("config", &self.config)
116 .finish()
117 }
118}
119
120impl XtreamClient {
121 pub fn new(creds: XtreamCredentials) -> Result<Self, XtreamError> {
123 Self::with_config(creds, XtreamClientConfig::default())
124 }
125
126 pub fn with_config(
128 creds: XtreamCredentials,
129 config: XtreamClientConfig,
130 ) -> Result<Self, XtreamError> {
131 let http = reqwest::Client::builder()
132 .connect_timeout(config.connect_timeout)
133 .timeout(config.request_timeout)
134 .danger_accept_invalid_certs(config.accept_invalid_certs)
135 .build()
136 .map_err(|e| XtreamError::Network(format!("failed to build HTTP client: {e}")))?;
137
138 Ok(Self {
139 http,
140 creds,
141 config,
142 profile: Arc::new(RwLock::new(None)),
143 })
144 }
145
146 pub fn with_http_client(
149 http: reqwest::Client,
150 creds: XtreamCredentials,
151 config: XtreamClientConfig,
152 ) -> Self {
153 Self {
154 http,
155 creds,
156 config,
157 profile: Arc::new(RwLock::new(None)),
158 }
159 }
160
161 pub fn base_url(&self) -> &str {
165 &self.creds.base_url
166 }
167
168 pub fn username(&self) -> &str {
170 &self.creds.username
171 }
172
173 pub fn preferred_format(&self) -> StreamFormat {
175 self.config.preferred_format
176 }
177
178 pub async fn authenticate(&self) -> Result<XtreamProfile, XtreamError> {
186 let url = build_api_url(
187 &self.creds.base_url,
188 &self.creds.username,
189 &self.creds.password,
190 "get_profile",
191 );
192
193 debug!(url = %url, "authenticating with Xtream server");
194 let profile: XtreamProfile = self.get_json(&url).await?;
195
196 if profile.user_info.auth != 1 {
197 return Err(XtreamError::Auth(format!(
198 "authentication failed: status={}",
199 profile.user_info.status
200 )));
201 }
202
203 let mut cached = self.profile.write().await;
205 *cached = Some(profile.user_info.clone());
206
207 Ok(profile)
208 }
209
210 pub async fn get_user_profile(&self) -> Result<XtreamUserProfile, XtreamError> {
212 {
213 let cached = self.profile.read().await;
214 if let Some(ref p) = *cached {
215 return Ok(p.clone());
216 }
217 }
218 let profile = self.authenticate().await?;
219 Ok(profile.user_info)
220 }
221
222 pub async fn get_live_categories(&self) -> Result<Vec<XtreamCategory>, XtreamError> {
226 let url = build_api_url(
227 &self.creds.base_url,
228 &self.creds.username,
229 &self.creds.password,
230 "get_live_categories",
231 );
232 self.get_json(&url).await
233 }
234
235 pub async fn get_vod_categories(&self) -> Result<Vec<XtreamCategory>, XtreamError> {
237 let url = build_api_url(
238 &self.creds.base_url,
239 &self.creds.username,
240 &self.creds.password,
241 "get_vod_categories",
242 );
243 self.get_json(&url).await
244 }
245
246 pub async fn get_series_categories(&self) -> Result<Vec<XtreamCategory>, XtreamError> {
248 let url = build_api_url(
249 &self.creds.base_url,
250 &self.creds.username,
251 &self.creds.password,
252 "get_series_categories",
253 );
254 self.get_json(&url).await
255 }
256
257 pub async fn get_live_streams(
263 &self,
264 category_id: Option<&str>,
265 ) -> Result<Vec<XtreamChannel>, XtreamError> {
266 let user = self.get_user_profile().await?;
268
269 let action = match category_id {
270 Some(cid) => format!("get_live_streams&category_id={cid}"),
271 None => "get_live_streams".to_string(),
272 };
273
274 let url = build_api_url(
275 &self.creds.base_url,
276 &self.creds.username,
277 &self.creds.password,
278 &action,
279 );
280
281 let mut channels: Vec<XtreamChannel> = self.get_json(&url).await?;
282
283 let ext =
285 effective_channel_extension(self.config.preferred_format, &user.allowed_output_formats);
286 for ch in &mut channels {
287 ch.url = Some(build_stream_url(
288 &self.creds.base_url,
289 &self.creds.username,
290 &self.creds.password,
291 StreamType::Channel,
292 ch.stream_id,
293 &ext,
294 ));
295 }
296
297 Ok(channels)
298 }
299
300 pub async fn get_vod_streams(
306 &self,
307 category_id: Option<&str>,
308 ) -> Result<Vec<XtreamMovieListing>, XtreamError> {
309 let _user = self.get_user_profile().await?;
311
312 let action = match category_id {
313 Some(cid) => format!("get_vod_streams&category_id={cid}"),
314 None => "get_vod_streams".to_string(),
315 };
316
317 let url = build_api_url(
318 &self.creds.base_url,
319 &self.creds.username,
320 &self.creds.password,
321 &action,
322 );
323
324 let mut movies: Vec<XtreamMovieListing> = self.get_json(&url).await?;
325
326 for movie in &mut movies {
327 let ext = movie.container_extension.as_deref().unwrap_or("mp4");
328 movie.url = Some(build_stream_url(
329 &self.creds.base_url,
330 &self.creds.username,
331 &self.creds.password,
332 StreamType::Movie,
333 movie.stream_id,
334 ext,
335 ));
336 }
337
338 Ok(movies)
339 }
340
341 pub async fn get_vod_info(&self, vod_id: i64) -> Result<XtreamMovie, XtreamError> {
343 let _user = self.get_user_profile().await?;
345
346 let action = format!("get_vod_info&vod_id={vod_id}");
347 let url = build_api_url(
348 &self.creds.base_url,
349 &self.creds.username,
350 &self.creds.password,
351 &action,
352 );
353
354 let mut movie: XtreamMovie = self.get_json(&url).await?;
355
356 if movie.info.is_none() {
359 return Err(XtreamError::NotFound(format!("movie {vod_id} not found")));
360 }
361
362 if let Some(ref data) = movie.movie_data {
364 let ext = data.container_extension.as_deref().unwrap_or("mp4");
365 movie.url = Some(build_stream_url(
366 &self.creds.base_url,
367 &self.creds.username,
368 &self.creds.password,
369 StreamType::Movie,
370 data.stream_id,
371 ext,
372 ));
373 }
374
375 Ok(movie)
376 }
377
378 pub async fn get_series(
382 &self,
383 category_id: Option<&str>,
384 ) -> Result<Vec<XtreamShowListing>, XtreamError> {
385 let action = match category_id {
386 Some(cid) => format!("get_series&category_id={cid}"),
387 None => "get_series".to_string(),
388 };
389
390 let url = build_api_url(
391 &self.creds.base_url,
392 &self.creds.username,
393 &self.creds.password,
394 &action,
395 );
396
397 self.get_json(&url).await
398 }
399
400 pub async fn get_series_info(&self, series_id: i64) -> Result<XtreamShow, XtreamError> {
403 let action = format!("get_series_info&series_id={series_id}");
404 let url = build_api_url(
405 &self.creds.base_url,
406 &self.creds.username,
407 &self.creds.password,
408 &action,
409 );
410
411 let mut show: XtreamShow = self.get_json(&url).await?;
412
413 if show.info.as_ref().is_none_or(|i| i.name.is_none()) {
415 return Err(XtreamError::NotFound(format!(
416 "series {series_id} not found"
417 )));
418 }
419
420 if let Some(ref mut info) = show.info {
422 info.series_id = Some(series_id);
423 }
424
425 for episodes in show.episodes.values_mut() {
427 for ep in episodes.iter_mut() {
428 let ep_id = ep_id_as_i64(ep);
429 let ext = ep.container_extension.as_deref().unwrap_or("mp4");
430 if let Some(id) = ep_id {
431 ep.url = Some(build_stream_url(
432 &self.creds.base_url,
433 &self.creds.username,
434 &self.creds.password,
435 StreamType::Episode,
436 id,
437 ext,
438 ));
439 }
440 }
441 }
442
443 Ok(show)
444 }
445
446 pub async fn get_short_epg(
452 &self,
453 stream_id: i64,
454 limit: Option<u32>,
455 ) -> Result<XtreamShortEpg, XtreamError> {
456 let action = match limit {
457 Some(l) => format!("get_short_epg&stream_id={stream_id}&limit={l}"),
458 None => format!("get_short_epg&stream_id={stream_id}"),
459 };
460
461 let url = build_api_url(
462 &self.creds.base_url,
463 &self.creds.username,
464 &self.creds.password,
465 &action,
466 );
467
468 let mut epg: XtreamShortEpg = self.get_json(&url).await?;
469
470 for listing in &mut epg.epg_listings {
472 listing.title = decode_epg_field(&listing.title);
473 listing.description = decode_epg_field(&listing.description);
474 }
475
476 Ok(epg)
477 }
478
479 pub async fn get_full_epg(&self, stream_id: i64) -> Result<XtreamFullEpg, XtreamError> {
483 let action = format!("get_simple_data_table&stream_id={stream_id}");
484 let url = build_api_url(
485 &self.creds.base_url,
486 &self.creds.username,
487 &self.creds.password,
488 &action,
489 );
490
491 let mut epg: XtreamFullEpg = self.get_json(&url).await?;
492
493 for listing in &mut epg.epg_listings {
495 listing.title = decode_epg_field(&listing.title);
496 listing.description = decode_epg_field(&listing.description);
497 }
498
499 Ok(epg)
500 }
501
502 pub fn stream_url(&self, stream_type: StreamType, stream_id: i64, extension: &str) -> String {
506 build_stream_url(
507 &self.creds.base_url,
508 &self.creds.username,
509 &self.creds.password,
510 stream_type,
511 stream_id,
512 extension,
513 )
514 }
515
516 pub fn xmltv_url(&self) -> String {
518 build_xmltv_url(
519 &self.creds.base_url,
520 &self.creds.username,
521 &self.creds.password,
522 )
523 }
524
525 pub fn timeshift_url(&self, stream_id: i64, duration_minutes: u32, start: &str) -> String {
527 build_timeshift_url(
528 &self.creds.base_url,
529 &self.creds.username,
530 &self.creds.password,
531 stream_id,
532 duration_minutes,
533 start,
534 )
535 }
536
537 async fn get_json<T: serde::de::DeserializeOwned>(&self, url: &str) -> Result<T, XtreamError> {
541 let response = self
542 .http
543 .get(url)
544 .header("Content-Type", "application/json")
545 .send()
546 .await?;
547
548 let status = response.status();
549
550 match status {
551 StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
552 return Err(XtreamError::Auth(format!("server returned {status}")));
553 }
554 StatusCode::TOO_MANY_REQUESTS => {
555 let retry_after = response
556 .headers()
557 .get("retry-after")
558 .and_then(|v| v.to_str().ok())
559 .and_then(|v| v.parse::<u64>().ok())
560 .unwrap_or(60);
561 return Err(XtreamError::RateLimited {
562 retry_after_secs: retry_after,
563 });
564 }
565 s if s.is_server_error() => {
566 let body = response.text().await.unwrap_or_default();
567 return Err(XtreamError::Network(format!("server error {s}: {body}")));
568 }
569 _ => {}
570 }
571
572 if !status.is_success() {
573 let body = response.text().await.unwrap_or_default();
574 return Err(XtreamError::UnexpectedResponse(format!(
575 "unexpected status {status}: {body}"
576 )));
577 }
578
579 let text = response.text().await?;
580
581 if text.is_empty() {
583 return Err(XtreamError::UnexpectedResponse(
584 "empty response body".into(),
585 ));
586 }
587
588 serde_json::from_str(&text).map_err(|e| {
589 warn!(
590 error = %e,
591 body_preview = &text[..text.len().min(200)],
592 "failed to parse Xtream response"
593 );
594 XtreamError::UnexpectedResponse(format!("json parse error: {e}"))
595 })
596 }
597}
598
599fn ep_id_as_i64(ep: &XtreamEpisode) -> Option<i64> {
601 ep.id.as_ref().and_then(|v| match v {
602 serde_json::Value::Number(n) => n.as_i64(),
603 serde_json::Value::String(s) => s.parse::<i64>().ok(),
604 _ => None,
605 })
606}
607
#[cfg(test)]
mod tests {
    use super::*;

    /// Client for `http://example.com` with the `user` / `pass` account.
    fn example_client() -> XtreamClient {
        let creds = XtreamCredentials::new("http://example.com", "user", "pass");
        XtreamClient::new(creds).unwrap()
    }

    /// Episode with every field unset.
    fn default_episode() -> XtreamEpisode {
        XtreamEpisode {
            id: None,
            episode_num: None,
            title: None,
            container_extension: None,
            info: None,
            custom_sid: None,
            added: None,
            season: None,
            direct_source: None,
            subtitles: None,
            url: None,
        }
    }

    /// Otherwise-empty episode carrying the given raw id.
    fn episode_with_id(id: Option<serde_json::Value>) -> XtreamEpisode {
        let mut ep = default_episode();
        ep.id = id;
        ep
    }

    #[test]
    fn credentials_debug_redacts_secrets() {
        let creds = XtreamCredentials::new("http://example.com", "admin", "secret123");
        let rendered = format!("{creds:?}");

        for secret in ["admin", "secret123"] {
            assert!(!rendered.contains(secret));
        }
        assert!(rendered.contains("***"));
    }

    #[test]
    fn credentials_strip_trailing_slash() {
        let XtreamCredentials { base_url, .. } =
            XtreamCredentials::new("http://example.com///", "u", "p");
        assert_eq!(base_url, "http://example.com");
    }

    #[test]
    fn stream_url_via_client() {
        let built = example_client().stream_url(StreamType::Channel, 42, "ts");
        assert_eq!(built, "http://example.com/live/user/pass/42.ts");
    }

    #[test]
    fn xmltv_url_via_client() {
        let built = example_client().xmltv_url();
        assert_eq!(
            built,
            "http://example.com/xmltv.php?username=user&password=pass"
        );
    }

    #[test]
    fn timeshift_url_via_client() {
        let built = example_client().timeshift_url(42, 120, "2024-01-01:10-00");
        assert_eq!(
            built,
            "http://example.com/timeshift/user/pass/120/2024-01-01:10-00/42.ts"
        );
    }

    #[test]
    fn default_config_values() {
        let XtreamClientConfig {
            connect_timeout,
            request_timeout,
            accept_invalid_certs,
            preferred_format,
        } = XtreamClientConfig::default();

        assert_eq!(connect_timeout, Duration::from_secs(15));
        assert_eq!(request_timeout, Duration::from_secs(120));
        assert!(!accept_invalid_certs);
        assert_eq!(preferred_format, StreamFormat::Ts);
    }

    #[test]
    fn ep_id_from_number() {
        let raw = serde_json::Value::Number(serde_json::Number::from(42));
        let ep = episode_with_id(Some(raw));
        assert_eq!(ep_id_as_i64(&ep), Some(42));
    }

    #[test]
    fn ep_id_from_string() {
        let raw = serde_json::Value::String("99".into());
        let ep = episode_with_id(Some(raw));
        assert_eq!(ep_id_as_i64(&ep), Some(99));
    }

    #[test]
    fn ep_id_none() {
        let ep = episode_with_id(None);
        assert_eq!(ep_id_as_i64(&ep), None);
    }
}