kick_rust/fetch/
client.rs

1//! Main HTTP client for Kick API operations
2//!
3//! This module provides a clean, modern HTTP client that focuses on the official
4//! Kick API endpoint: https://kick.com/api/v2/channels/{channel_name}
5//! with multiple fallback strategies for reliability.
6
7use crate::fetch::{
8    strategies::StrategyManager,
9    types::{ChannelInfo, ClientConfig, FetchResult, FetchError, RequestContext, FetchMetrics, StrategyConfig},
10};
11use reqwest::{Client, redirect::Policy};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use tokio::sync::RwLock;
15use tracing::{info, warn, debug, error};
16use std::collections::HashMap;
17
18/// Main HTTP client for Kick API operations
19pub struct KickApiClient {
20    /// HTTP client for making requests
21    client: Client,
22    /// Strategy manager for different fetching approaches
23    strategy_manager: Arc<StrategyManager>,
24    /// Client configuration
25    config: ClientConfig,
26    /// Strategy configuration
27    strategy_config: StrategyConfig,
28    /// Request metrics
29    metrics: Arc<RwLock<FetchMetrics>>,
30    /// Request cache (optional, for rate limiting)
31    cache: Arc<RwLock<HashMap<String, CachedChannel>>>,
32}
33
34/// Cached channel information
35#[derive(Debug, Clone)]
36struct CachedChannel {
37    channel: ChannelInfo,
38    timestamp: Instant,
39    ttl: Duration,
40}
41
42impl CachedChannel {
43    fn new(channel: ChannelInfo, ttl: Duration) -> Self {
44        Self {
45            channel,
46            timestamp: Instant::now(),
47            ttl,
48        }
49    }
50
51    fn is_expired(&self) -> bool {
52        self.timestamp.elapsed() > self.ttl
53    }
54}
55
56impl KickApiClient {
57    /// Create a new Kick API client with default configuration
58    pub fn new() -> FetchResult<Self> {
59        let config = ClientConfig::default();
60        Self::with_config(config)
61    }
62
63    /// Create a new Kick API client with custom configuration
64    pub fn with_config(config: ClientConfig) -> FetchResult<Self> {
65        let mut builder = Client::builder()
66            .timeout(Duration::from_secs(config.timeout_seconds))
67            .cookie_store(true)
68            .redirect(Policy::limited(5));
69
70        // Set user agent
71        let user_agent = config.user_agent.as_deref()
72            .unwrap_or("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36");
73        builder = builder.user_agent(user_agent);
74
75        // Build the HTTP client
76        let client = builder.build()
77            .map_err(|e| FetchError::Network(format!("Failed to build HTTP client: {}", e)))?;
78
79        // Create strategy manager
80        let strategy_config = StrategyConfig::default();
81        let strategy_manager = Arc::new(StrategyManager::new()?);
82
83        Ok(Self {
84            client,
85            strategy_manager,
86            config,
87            strategy_config,
88            metrics: Arc::new(RwLock::new(FetchMetrics::default())),
89            cache: Arc::new(RwLock::new(HashMap::new())),
90        })
91    }
92
93    /// Create a client with both client and strategy configurations
94    pub fn with_full_config(
95        client_config: ClientConfig,
96        strategy_config: StrategyConfig,
97    ) -> FetchResult<Self> {
98        let mut builder = Client::builder()
99            .timeout(Duration::from_secs(client_config.timeout_seconds))
100            .cookie_store(true)
101            .redirect(Policy::limited(5));
102
103        // Set user agent
104        let user_agent = client_config.user_agent.as_deref()
105            .unwrap_or("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36");
106        builder = builder.user_agent(user_agent);
107
108        // Add custom headers using default_headers
109        let mut headers = reqwest::header::HeaderMap::new();
110        for (key, value) in &client_config.custom_headers {
111            if let (Ok(name), Ok(val)) = (
112                reqwest::header::HeaderName::from_bytes(key.as_bytes()),
113                reqwest::header::HeaderValue::from_str(value),
114            ) {
115                headers.insert(name, val);
116            }
117        }
118        builder = builder.default_headers(headers);
119
120        // Build the HTTP client
121        let client = builder.build()
122            .map_err(|e| FetchError::Network(format!("Failed to build HTTP client: {}", e)))?;
123
124        // Create strategy manager with custom config
125        let strategy_manager = Arc::new(StrategyManager::with_config(&strategy_config)?);
126
127        Ok(Self {
128            client,
129            strategy_manager,
130            config: client_config,
131            strategy_config,
132            metrics: Arc::new(RwLock::new(FetchMetrics::default())),
133            cache: Arc::new(RwLock::new(HashMap::new())),
134        })
135    }
136
137    /// Fetch channel information by channel name
138    /// This is the main method that uses the official API endpoint
139    pub async fn get_channel(&self, channel_name: &str) -> FetchResult<ChannelInfo> {
140        let start_time = Instant::now();
141        let context = RequestContext::new(channel_name, "auto");
142
143        info!("Fetching channel: {}", channel_name);
144
145        // Check cache first
146        if let Some(cached) = self.check_cache(channel_name).await {
147            debug!("Channel {} found in cache", channel_name);
148            self.record_metrics(start_time, true).await;
149            return Ok(cached);
150        }
151
152        // Try to fetch with retries
153        let result = self.fetch_with_retries(channel_name, &context).await;
154
155        // Record metrics
156        let success = result.is_ok();
157        self.record_metrics(start_time, success).await;
158
159        // Cache successful results
160        if let Ok(ref channel) = result {
161            self.cache_channel(channel_name, channel).await;
162        }
163
164        result
165    }
166
167    /// Fetch channel information with automatic retries
168    async fn fetch_with_retries(&self, channel_name: &str, _context: &RequestContext) -> FetchResult<ChannelInfo> {
169        let mut last_error = None;
170
171        for attempt in 1..=self.config.max_retries {
172            debug!("Attempt {} for channel {}", attempt, channel_name);
173
174            match self.strategy_manager.fetch_channel(&self.client, channel_name, &self.strategy_config).await {
175                Ok(channel) => {
176                    info!("Successfully fetched channel {} on attempt {}", channel_name, attempt);
177                    return Ok(channel);
178                }
179                Err(e) => {
180                    warn!("Attempt {} failed for channel {}: {}", attempt, channel_name, e);
181                    last_error = Some(e.clone());
182
183                    // Don't retry on certain errors
184                    match &e {
185                        FetchError::ChannelNotFound(_) => break,
186                        FetchError::AuthenticationRequired => break,
187                        _ => {
188                            // Add delay before retry
189                            if attempt < self.config.max_retries {
190                                let delay = self.config.retry_delay_ms * attempt as u64;
191                                debug!("Waiting {}ms before retry", delay);
192                                tokio::time::sleep(Duration::from_millis(delay)).await;
193                            }
194                        }
195                    }
196                }
197            }
198        }
199
200        Err(last_error.unwrap_or_else(|| FetchError::ChannelNotFound(format!("All retries failed for channel: {}", channel_name))))
201    }
202
203    /// Check if channel exists (faster version that only gets basic info)
204    pub async fn channel_exists(&self, channel_name: &str) -> FetchResult<bool> {
205        match self.get_channel(channel_name).await {
206            Ok(_) => Ok(true),
207            Err(FetchError::ChannelNotFound(_)) => Ok(false),
208            Err(e) => Err(e),
209        }
210    }
211
212    /// Get channel ID only (lightweight request)
213    pub async fn get_channel_id(&self, channel_name: &str) -> FetchResult<u64> {
214        let channel = self.get_channel(channel_name).await?;
215        Ok(channel.id)
216    }
217
218    /// Get chatroom ID only (for WebSocket subscription)
219    pub async fn get_chatroom_id(&self, channel_name: &str) -> FetchResult<u64> {
220        let channel = self.get_channel(channel_name).await?;
221        channel.chatroom
222            .map(|chatroom| chatroom.id)
223            .ok_or_else(|| FetchError::InvalidResponse)
224    }
225
226    /// Get multiple channels in parallel
227    pub async fn get_channels(&self, channel_names: &[&str]) -> Vec<FetchResult<ChannelInfo>> {
228        let futures: Vec<_> = channel_names.iter()
229            .map(|&name| self.get_channel(name))
230            .collect();
231
232        futures_util::future::join_all(futures).await
233    }
234
235    /// Get current metrics
236    pub async fn get_metrics(&self) -> FetchMetrics {
237        self.metrics.read().await.clone()
238    }
239
240    /// Reset metrics
241    pub async fn reset_metrics(&self) {
242        *self.metrics.write().await = FetchMetrics::default();
243    }
244
245    /// Clear cache
246    pub async fn clear_cache(&self) {
247        self.cache.write().await.clear();
248    }
249
250    /// Get available strategy names
251    pub fn strategy_names(&self) -> Vec<&'static str> {
252        self.strategy_manager.strategy_names()
253    }
254
255    /// Test connection to Kick API
256    pub async fn test_connection(&self) -> FetchResult<()> {
257        let test_channel = "kick"; // Official Kick channel
258        match self.get_channel(test_channel).await {
259            Ok(_) => {
260                info!("Connection test successful");
261                Ok(())
262            }
263            Err(e) => {
264                error!("Connection test failed: {}", e);
265                Err(e)
266            }
267        }
268    }
269
270    /// Check cache for channel information
271    async fn check_cache(&self, channel_name: &str) -> Option<ChannelInfo> {
272        let cache = self.cache.read().await;
273        cache.get(channel_name)
274            .filter(|cached| !cached.is_expired())
275            .map(|cached| cached.channel.clone())
276    }
277
278    /// Cache channel information
279    async fn cache_channel(&self, channel_name: &str, channel: &ChannelInfo) {
280        let ttl = Duration::from_secs(300); // 5 minutes cache
281        let cached = CachedChannel::new(channel.clone(), ttl);
282
283        let mut cache = self.cache.write().await;
284        cache.insert(channel_name.to_string(), cached);
285
286        // Clean up expired entries
287        cache.retain(|_, cached| !cached.is_expired());
288    }
289
290    /// Record metrics for a request
291    async fn record_metrics(&self, start_time: Instant, success: bool) {
292        let response_time = start_time.elapsed().as_millis() as u64;
293        let mut metrics = self.metrics.write().await;
294
295        if success {
296            metrics.record_success(response_time);
297        } else {
298            metrics.record_failure();
299        }
300    }
301}
302
303impl Default for KickApiClient {
304    fn default() -> Self {
305        Self::new().expect("Failed to create default KickApiClient")
306    }
307}
308
309/// Builder for creating KickApiClient with custom configuration
310pub struct KickApiClientBuilder {
311    client_config: ClientConfig,
312    strategy_config: StrategyConfig,
313}
314
315impl KickApiClientBuilder {
316    /// Create a new builder
317    pub fn new() -> Self {
318        Self {
319            client_config: ClientConfig::default(),
320            strategy_config: StrategyConfig::default(),
321        }
322    }
323
324    /// Set user agent
325    pub fn user_agent<S: Into<String>>(mut self, user_agent: S) -> Self {
326        self.client_config.user_agent = Some(user_agent.into());
327        self
328    }
329
330    /// Set timeout in seconds
331    pub fn timeout(mut self, seconds: u64) -> Self {
332        self.client_config.timeout_seconds = seconds;
333        self
334    }
335
336    /// Set maximum retries
337    pub fn max_retries(mut self, retries: u32) -> Self {
338        self.client_config.max_retries = retries;
339        self
340    }
341
342    /// Set retry delay in milliseconds
343    pub fn retry_delay(mut self, delay_ms: u64) -> Self {
344        self.client_config.retry_delay_ms = delay_ms;
345        self
346    }
347
348    /// Enable/disable logging
349    pub fn enable_logging(mut self, enable: bool) -> Self {
350        self.client_config.enable_logging = enable;
351        self
352    }
353
354    /// Add custom header
355    pub fn header<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
356        self.client_config.custom_headers.push((key.into(), value.into()));
357        self
358    }
359
360    /// Set random delay range for rate limiting
361    pub fn random_delay(mut self, min_ms: u64, max_ms: u64) -> Self {
362        self.strategy_config.random_delay_range = (min_ms, max_ms);
363        self
364    }
365
366
367
368    /// Build the client
369    pub fn build(self) -> FetchResult<KickApiClient> {
370        KickApiClient::with_full_config(self.client_config, self.strategy_config)
371    }
372}
373
374impl Default for KickApiClientBuilder {
375    fn default() -> Self {
376        Self::new()
377    }
378}
379
#[cfg(test)]
mod tests {
    use super::*;

    /// A client with default settings can be constructed.
    #[tokio::test]
    async fn test_client_creation() {
        assert!(KickApiClient::new().is_ok());
    }

    /// The builder produces a client when given a timeout and retry count.
    #[tokio::test]
    async fn test_client_builder() {
        let built = KickApiClientBuilder::new()
            .timeout(60)
            .max_retries(5)
            .build();

        assert!(built.is_ok());
    }

    /// Metrics start empty and stay empty after a reset.
    #[tokio::test]
    async fn test_metrics() {
        let api = KickApiClient::new().unwrap();

        // A fresh client has recorded nothing yet
        let before = api.get_metrics().await;
        assert_eq!(before.total_requests, 0);
        assert_eq!(before.success_rate(), 0.0);

        // Resetting keeps the counters at zero
        api.reset_metrics().await;
        let after = api.get_metrics().await;
        assert_eq!(after.total_requests, 0);
    }

    /// Clearing the cache works on a fresh client and strategies are registered.
    #[tokio::test]
    async fn test_cache_operations() {
        let api = KickApiClient::new().unwrap();

        // Clearing an empty cache must be harmless
        api.clear_cache().await;

        // At least one strategy must be available
        assert!(!api.strategy_names().is_empty());
    }

    /// Cache entries are fresh within their TTL and expired after it.
    #[tokio::test]
    async fn test_cached_channel() {
        let sample = ChannelInfo {
            id: 12345,
            slug: "test".to_string(),
            title: Some("Test".to_string()),
            followers_count: None,
            subscribers_count: None,
            is_live: false,
            viewers_count: None,
            category: None,
            tags: None,
            language: None,
            user: None,
            chatroom: None,
        };

        // Long TTL: still fresh right after creation
        let fresh = CachedChannel::new(sample.clone(), Duration::from_secs(60));
        assert!(!fresh.is_expired());

        // 1 ms TTL: expired once we have waited 10 ms
        let short_lived = CachedChannel::new(sample, Duration::from_millis(1));
        tokio::time::sleep(Duration::from_millis(10)).await;
        assert!(short_lived.is_expired());
    }
}