Skip to main content

crispy_xtream/
client.rs

1//! Async Xtream Codes API client.
2//!
3//! `XtreamClient` is the primary entry point. It wraps a connection-pooled
4//! `reqwest::Client` and provides typed methods for every Xtream endpoint.
5
6use std::sync::Arc;
7use std::time::Duration;
8
9use reqwest::StatusCode;
10use tokio::sync::RwLock;
11use tracing::{debug, warn};
12
13use crate::error::XtreamError;
14use crate::parse::decode_epg_field;
15use crate::types::{
16    StreamFormat, StreamType, XtreamCategory, XtreamChannel, XtreamEpisode, XtreamFullEpg,
17    XtreamMovie, XtreamMovieListing, XtreamProfile, XtreamShortEpg, XtreamShow, XtreamShowListing,
18    XtreamUserProfile,
19};
20use crate::url::{
21    build_api_url, build_stream_url, build_timeshift_url, build_xmltv_url,
22    effective_channel_extension,
23};
24
25// ---------------------------------------------------------------------------
26// Configuration
27// ---------------------------------------------------------------------------
28
29/// Client configuration with timeouts and TLS settings.
30#[derive(Debug, Clone)]
31pub struct XtreamClientConfig {
32    /// TCP connect timeout (default: 15 s).
33    pub connect_timeout: Duration,
34    /// Total request timeout (default: 120 s).
35    pub request_timeout: Duration,
36    /// Accept invalid / self-signed TLS certificates.
37    pub accept_invalid_certs: bool,
38    /// Preferred stream format (default: TS).
39    pub preferred_format: StreamFormat,
40}
41
42impl Default for XtreamClientConfig {
43    fn default() -> Self {
44        Self {
45            connect_timeout: Duration::from_secs(15),
46            request_timeout: Duration::from_secs(120),
47            accept_invalid_certs: false,
48            preferred_format: StreamFormat::Ts,
49        }
50    }
51}
52
53// ---------------------------------------------------------------------------
54// Credentials
55// ---------------------------------------------------------------------------
56
57/// Xtream server credentials. `Debug` is intentionally redacted.
58#[derive(Clone)]
59pub struct XtreamCredentials {
60    pub base_url: String,
61    pub username: String,
62    pub password: String,
63}
64
65impl XtreamCredentials {
66    pub fn new(
67        base_url: impl Into<String>,
68        username: impl Into<String>,
69        password: impl Into<String>,
70    ) -> Self {
71        let mut base = base_url.into();
72        // Strip trailing slash for consistent URL building.
73        while base.ends_with('/') {
74            base.pop();
75        }
76        Self {
77            base_url: base,
78            username: username.into(),
79            password: password.into(),
80        }
81    }
82}
83
84impl std::fmt::Debug for XtreamCredentials {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        f.debug_struct("XtreamCredentials")
87            .field("base_url", &self.base_url)
88            .field("username", &"***")
89            .field("password", &"***")
90            .finish()
91    }
92}
93
94// ---------------------------------------------------------------------------
95// Client
96// ---------------------------------------------------------------------------
97
98/// Async client for the Xtream Codes API.
99///
100/// Wraps a `reqwest::Client` (connection pooled) and caches the user profile
101/// after the first authentication call.
102#[derive(Clone)]
103pub struct XtreamClient {
104    http: reqwest::Client,
105    creds: XtreamCredentials,
106    config: XtreamClientConfig,
107    /// Cached profile after `authenticate()`.
108    profile: Arc<RwLock<Option<XtreamUserProfile>>>,
109}
110
111impl std::fmt::Debug for XtreamClient {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        f.debug_struct("XtreamClient")
114            .field("creds", &self.creds)
115            .field("config", &self.config)
116            .finish()
117    }
118}
119
120impl XtreamClient {
121    /// Create a new client with the given credentials and default configuration.
122    pub fn new(creds: XtreamCredentials) -> Result<Self, XtreamError> {
123        Self::with_config(creds, XtreamClientConfig::default())
124    }
125
126    /// Create a new client with explicit configuration.
127    pub fn with_config(
128        creds: XtreamCredentials,
129        config: XtreamClientConfig,
130    ) -> Result<Self, XtreamError> {
131        let http = reqwest::Client::builder()
132            .connect_timeout(config.connect_timeout)
133            .timeout(config.request_timeout)
134            .danger_accept_invalid_certs(config.accept_invalid_certs)
135            .build()
136            .map_err(|e| XtreamError::Network(format!("failed to build HTTP client: {e}")))?;
137
138        Ok(Self {
139            http,
140            creds,
141            config,
142            profile: Arc::new(RwLock::new(None)),
143        })
144    }
145
146    /// Create a client from an existing `reqwest::Client` (useful for sharing
147    /// a connection pool across multiple crates).
148    pub fn with_http_client(
149        http: reqwest::Client,
150        creds: XtreamCredentials,
151        config: XtreamClientConfig,
152    ) -> Self {
153        Self {
154            http,
155            creds,
156            config,
157            profile: Arc::new(RwLock::new(None)),
158        }
159    }
160
161    // -- Accessors ----------------------------------------------------------
162
163    /// The base URL of the Xtream server.
164    pub fn base_url(&self) -> &str {
165        &self.creds.base_url
166    }
167
168    /// The username used for authentication.
169    pub fn username(&self) -> &str {
170        &self.creds.username
171    }
172
173    /// The configured preferred stream format.
174    pub fn preferred_format(&self) -> StreamFormat {
175        self.config.preferred_format
176    }
177
178    // -- API methods --------------------------------------------------------
179
180    /// Authenticate with the server and cache the profile.
181    ///
182    /// This is called implicitly by methods that need the user profile
183    /// (channel/VOD listings), but you can call it explicitly to verify
184    /// credentials early.
185    pub async fn authenticate(&self) -> Result<XtreamProfile, XtreamError> {
186        let url = build_api_url(
187            &self.creds.base_url,
188            &self.creds.username,
189            &self.creds.password,
190            "get_profile",
191        );
192
193        debug!(url = %url, "authenticating with Xtream server");
194        let profile: XtreamProfile = self.get_json(&url).await?;
195
196        if profile.user_info.auth != 1 {
197            return Err(XtreamError::Auth(format!(
198                "authentication failed: status={}",
199                profile.user_info.status
200            )));
201        }
202
203        // Cache the profile.
204        let mut cached = self.profile.write().await;
205        *cached = Some(profile.user_info.clone());
206
207        Ok(profile)
208    }
209
210    /// Get the cached user profile, authenticating first if needed.
211    pub async fn get_user_profile(&self) -> Result<XtreamUserProfile, XtreamError> {
212        {
213            let cached = self.profile.read().await;
214            if let Some(ref p) = *cached {
215                return Ok(p.clone());
216            }
217        }
218        let profile = self.authenticate().await?;
219        Ok(profile.user_info)
220    }
221
222    // -- Categories ---------------------------------------------------------
223
224    /// Fetch live channel categories.
225    pub async fn get_live_categories(&self) -> Result<Vec<XtreamCategory>, XtreamError> {
226        let url = build_api_url(
227            &self.creds.base_url,
228            &self.creds.username,
229            &self.creds.password,
230            "get_live_categories",
231        );
232        self.get_json(&url).await
233    }
234
235    /// Fetch VOD (movie) categories.
236    pub async fn get_vod_categories(&self) -> Result<Vec<XtreamCategory>, XtreamError> {
237        let url = build_api_url(
238            &self.creds.base_url,
239            &self.creds.username,
240            &self.creds.password,
241            "get_vod_categories",
242        );
243        self.get_json(&url).await
244    }
245
246    /// Fetch series categories.
247    pub async fn get_series_categories(&self) -> Result<Vec<XtreamCategory>, XtreamError> {
248        let url = build_api_url(
249            &self.creds.base_url,
250            &self.creds.username,
251            &self.creds.password,
252            "get_series_categories",
253        );
254        self.get_json(&url).await
255    }
256
257    // -- Live Streams -------------------------------------------------------
258
259    /// Fetch live channels, optionally filtered by category.
260    ///
261    /// Stream URLs are automatically generated and attached to each channel.
262    pub async fn get_live_streams(
263        &self,
264        category_id: Option<&str>,
265    ) -> Result<Vec<XtreamChannel>, XtreamError> {
266        // Ensure profile is cached for format resolution.
267        let user = self.get_user_profile().await?;
268
269        let action = match category_id {
270            Some(cid) => format!("get_live_streams&category_id={cid}"),
271            None => "get_live_streams".to_string(),
272        };
273
274        let url = build_api_url(
275            &self.creds.base_url,
276            &self.creds.username,
277            &self.creds.password,
278            &action,
279        );
280
281        let mut channels: Vec<XtreamChannel> = self.get_json(&url).await?;
282
283        // Attach stream URLs.
284        let ext =
285            effective_channel_extension(self.config.preferred_format, &user.allowed_output_formats);
286        for ch in &mut channels {
287            ch.url = Some(build_stream_url(
288                &self.creds.base_url,
289                &self.creds.username,
290                &self.creds.password,
291                StreamType::Channel,
292                ch.stream_id,
293                &ext,
294            ));
295        }
296
297        Ok(channels)
298    }
299
300    // -- VOD ----------------------------------------------------------------
301
302    /// Fetch VOD (movie) listings, optionally filtered by category.
303    ///
304    /// Stream URLs are automatically generated and attached.
305    pub async fn get_vod_streams(
306        &self,
307        category_id: Option<&str>,
308    ) -> Result<Vec<XtreamMovieListing>, XtreamError> {
309        // Ensure profile is cached.
310        let _user = self.get_user_profile().await?;
311
312        let action = match category_id {
313            Some(cid) => format!("get_vod_streams&category_id={cid}"),
314            None => "get_vod_streams".to_string(),
315        };
316
317        let url = build_api_url(
318            &self.creds.base_url,
319            &self.creds.username,
320            &self.creds.password,
321            &action,
322        );
323
324        let mut movies: Vec<XtreamMovieListing> = self.get_json(&url).await?;
325
326        for movie in &mut movies {
327            let ext = movie.container_extension.as_deref().unwrap_or("mp4");
328            movie.url = Some(build_stream_url(
329                &self.creds.base_url,
330                &self.creds.username,
331                &self.creds.password,
332                StreamType::Movie,
333                movie.stream_id,
334                ext,
335            ));
336        }
337
338        Ok(movies)
339    }
340
341    /// Fetch detailed information about a specific movie.
342    pub async fn get_vod_info(&self, vod_id: i64) -> Result<XtreamMovie, XtreamError> {
343        // Ensure profile is cached.
344        let _user = self.get_user_profile().await?;
345
346        let action = format!("get_vod_info&vod_id={vod_id}");
347        let url = build_api_url(
348            &self.creds.base_url,
349            &self.creds.username,
350            &self.creds.password,
351            &action,
352        );
353
354        let mut movie: XtreamMovie = self.get_json(&url).await?;
355
356        // Check for "not found" — the API returns `info: []` (empty array)
357        // instead of a proper error.
358        if movie.info.is_none() {
359            return Err(XtreamError::NotFound(format!("movie {vod_id} not found")));
360        }
361
362        // Attach stream URL.
363        if let Some(ref data) = movie.movie_data {
364            let ext = data.container_extension.as_deref().unwrap_or("mp4");
365            movie.url = Some(build_stream_url(
366                &self.creds.base_url,
367                &self.creds.username,
368                &self.creds.password,
369                StreamType::Movie,
370                data.stream_id,
371                ext,
372            ));
373        }
374
375        Ok(movie)
376    }
377
378    // -- Series -------------------------------------------------------------
379
380    /// Fetch series listings, optionally filtered by category.
381    pub async fn get_series(
382        &self,
383        category_id: Option<&str>,
384    ) -> Result<Vec<XtreamShowListing>, XtreamError> {
385        let action = match category_id {
386            Some(cid) => format!("get_series&category_id={cid}"),
387            None => "get_series".to_string(),
388        };
389
390        let url = build_api_url(
391            &self.creds.base_url,
392            &self.creds.username,
393            &self.creds.password,
394            &action,
395        );
396
397        self.get_json(&url).await
398    }
399
400    /// Fetch detailed information about a specific series, including seasons
401    /// and episodes. Episode stream URLs are automatically attached.
402    pub async fn get_series_info(&self, series_id: i64) -> Result<XtreamShow, XtreamError> {
403        let action = format!("get_series_info&series_id={series_id}");
404        let url = build_api_url(
405            &self.creds.base_url,
406            &self.creds.username,
407            &self.creds.password,
408            &action,
409        );
410
411        let mut show: XtreamShow = self.get_json(&url).await?;
412
413        // Check for "not found".
414        if show.info.as_ref().is_none_or(|i| i.name.is_none()) {
415            return Err(XtreamError::NotFound(format!(
416                "series {series_id} not found"
417            )));
418        }
419
420        // Inject series_id into info (upstream TS library does this).
421        if let Some(ref mut info) = show.info {
422            info.series_id = Some(series_id);
423        }
424
425        // Attach episode stream URLs.
426        for episodes in show.episodes.values_mut() {
427            for ep in episodes.iter_mut() {
428                let ep_id = ep_id_as_i64(ep);
429                let ext = ep.container_extension.as_deref().unwrap_or("mp4");
430                if let Some(id) = ep_id {
431                    ep.url = Some(build_stream_url(
432                        &self.creds.base_url,
433                        &self.creds.username,
434                        &self.creds.password,
435                        StreamType::Episode,
436                        id,
437                        ext,
438                    ));
439                }
440            }
441        }
442
443        Ok(show)
444    }
445
446    // -- EPG ----------------------------------------------------------------
447
448    /// Fetch short EPG for a channel, with an optional entry limit.
449    ///
450    /// Base64-encoded titles and descriptions are decoded transparently.
451    pub async fn get_short_epg(
452        &self,
453        stream_id: i64,
454        limit: Option<u32>,
455    ) -> Result<XtreamShortEpg, XtreamError> {
456        let action = match limit {
457            Some(l) => format!("get_short_epg&stream_id={stream_id}&limit={l}"),
458            None => format!("get_short_epg&stream_id={stream_id}"),
459        };
460
461        let url = build_api_url(
462            &self.creds.base_url,
463            &self.creds.username,
464            &self.creds.password,
465            &action,
466        );
467
468        let mut epg: XtreamShortEpg = self.get_json(&url).await?;
469
470        // Decode base64 fields.
471        for listing in &mut epg.epg_listings {
472            listing.title = decode_epg_field(&listing.title);
473            listing.description = decode_epg_field(&listing.description);
474        }
475
476        Ok(epg)
477    }
478
479    /// Fetch full EPG for a channel.
480    ///
481    /// Base64-encoded titles and descriptions are decoded transparently.
482    pub async fn get_full_epg(&self, stream_id: i64) -> Result<XtreamFullEpg, XtreamError> {
483        let action = format!("get_simple_data_table&stream_id={stream_id}");
484        let url = build_api_url(
485            &self.creds.base_url,
486            &self.creds.username,
487            &self.creds.password,
488            &action,
489        );
490
491        let mut epg: XtreamFullEpg = self.get_json(&url).await?;
492
493        // Decode base64 fields.
494        for listing in &mut epg.epg_listings {
495            listing.title = decode_epg_field(&listing.title);
496            listing.description = decode_epg_field(&listing.description);
497        }
498
499        Ok(epg)
500    }
501
502    // -- URL helpers (synchronous) ------------------------------------------
503
504    /// Build a stream URL for a given content type and ID.
505    pub fn stream_url(&self, stream_type: StreamType, stream_id: i64, extension: &str) -> String {
506        build_stream_url(
507            &self.creds.base_url,
508            &self.creds.username,
509            &self.creds.password,
510            stream_type,
511            stream_id,
512            extension,
513        )
514    }
515
516    /// Build the XMLTV EPG URL.
517    pub fn xmltv_url(&self) -> String {
518        build_xmltv_url(
519            &self.creds.base_url,
520            &self.creds.username,
521            &self.creds.password,
522        )
523    }
524
525    /// Build a timeshift/catchup URL.
526    pub fn timeshift_url(&self, stream_id: i64, duration_minutes: u32, start: &str) -> String {
527        build_timeshift_url(
528            &self.creds.base_url,
529            &self.creds.username,
530            &self.creds.password,
531            stream_id,
532            duration_minutes,
533            start,
534        )
535    }
536
537    // -- Internal -----------------------------------------------------------
538
539    /// Send a GET request and deserialize the JSON response.
540    async fn get_json<T: serde::de::DeserializeOwned>(&self, url: &str) -> Result<T, XtreamError> {
541        let response = self
542            .http
543            .get(url)
544            .header("Content-Type", "application/json")
545            .send()
546            .await?;
547
548        let status = response.status();
549
550        match status {
551            StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
552                return Err(XtreamError::Auth(format!("server returned {status}")));
553            }
554            StatusCode::TOO_MANY_REQUESTS => {
555                let retry_after = response
556                    .headers()
557                    .get("retry-after")
558                    .and_then(|v| v.to_str().ok())
559                    .and_then(|v| v.parse::<u64>().ok())
560                    .unwrap_or(60);
561                return Err(XtreamError::RateLimited {
562                    retry_after_secs: retry_after,
563                });
564            }
565            s if s.is_server_error() => {
566                let body = response.text().await.unwrap_or_default();
567                return Err(XtreamError::Network(format!("server error {s}: {body}")));
568            }
569            _ => {}
570        }
571
572        if !status.is_success() {
573            let body = response.text().await.unwrap_or_default();
574            return Err(XtreamError::UnexpectedResponse(format!(
575                "unexpected status {status}: {body}"
576            )));
577        }
578
579        let text = response.text().await?;
580
581        // Some servers return empty responses or `{"info":[]}` for not-found.
582        if text.is_empty() {
583            return Err(XtreamError::UnexpectedResponse(
584                "empty response body".into(),
585            ));
586        }
587
588        serde_json::from_str(&text).map_err(|e| {
589            warn!(
590                error = %e,
591                body_preview = &text[..text.len().min(200)],
592                "failed to parse Xtream response"
593            );
594            XtreamError::UnexpectedResponse(format!("json parse error: {e}"))
595        })
596    }
597}
598
599/// Extract episode ID as i64 from the polymorphic `id` field.
600fn ep_id_as_i64(ep: &XtreamEpisode) -> Option<i64> {
601    ep.id.as_ref().and_then(|v| match v {
602        serde_json::Value::Number(n) => n.as_i64(),
603        serde_json::Value::String(s) => s.parse::<i64>().ok(),
604        _ => None,
605    })
606}
607
608#[cfg(test)]
609mod tests {
610    use super::*;
611
612    #[test]
613    fn credentials_debug_redacts_secrets() {
614        let creds = XtreamCredentials::new("http://example.com", "admin", "secret123");
615        let debug = format!("{creds:?}");
616        assert!(!debug.contains("admin"));
617        assert!(!debug.contains("secret123"));
618        assert!(debug.contains("***"));
619    }
620
621    #[test]
622    fn credentials_strip_trailing_slash() {
623        let creds = XtreamCredentials::new("http://example.com///", "u", "p");
624        assert_eq!(creds.base_url, "http://example.com");
625    }
626
627    #[test]
628    fn stream_url_via_client() {
629        let client =
630            XtreamClient::new(XtreamCredentials::new("http://example.com", "user", "pass"))
631                .unwrap();
632
633        let url = client.stream_url(StreamType::Channel, 42, "ts");
634        assert_eq!(url, "http://example.com/live/user/pass/42.ts");
635    }
636
637    #[test]
638    fn xmltv_url_via_client() {
639        let client =
640            XtreamClient::new(XtreamCredentials::new("http://example.com", "user", "pass"))
641                .unwrap();
642
643        let url = client.xmltv_url();
644        assert_eq!(
645            url,
646            "http://example.com/xmltv.php?username=user&password=pass"
647        );
648    }
649
650    #[test]
651    fn timeshift_url_via_client() {
652        let client =
653            XtreamClient::new(XtreamCredentials::new("http://example.com", "user", "pass"))
654                .unwrap();
655
656        let url = client.timeshift_url(42, 120, "2024-01-01:10-00");
657        assert_eq!(
658            url,
659            "http://example.com/timeshift/user/pass/120/2024-01-01:10-00/42.ts"
660        );
661    }
662
663    #[test]
664    fn default_config_values() {
665        let config = XtreamClientConfig::default();
666        assert_eq!(config.connect_timeout, Duration::from_secs(15));
667        assert_eq!(config.request_timeout, Duration::from_secs(120));
668        assert!(!config.accept_invalid_certs);
669        assert_eq!(config.preferred_format, StreamFormat::Ts);
670    }
671
672    #[test]
673    fn ep_id_from_number() {
674        let ep = XtreamEpisode {
675            id: Some(serde_json::Value::Number(serde_json::Number::from(42))),
676            ..default_episode()
677        };
678        assert_eq!(ep_id_as_i64(&ep), Some(42));
679    }
680
681    #[test]
682    fn ep_id_from_string() {
683        let ep = XtreamEpisode {
684            id: Some(serde_json::Value::String("99".into())),
685            ..default_episode()
686        };
687        assert_eq!(ep_id_as_i64(&ep), Some(99));
688    }
689
690    #[test]
691    fn ep_id_none() {
692        let ep = XtreamEpisode {
693            id: None,
694            ..default_episode()
695        };
696        assert_eq!(ep_id_as_i64(&ep), None);
697    }
698
699    fn default_episode() -> XtreamEpisode {
700        XtreamEpisode {
701            id: None,
702            episode_num: None,
703            title: None,
704            container_extension: None,
705            info: None,
706            custom_sid: None,
707            added: None,
708            season: None,
709            direct_source: None,
710            subtitles: None,
711            url: None,
712        }
713    }
714}