1use crate::fetch::{
8 strategies::StrategyManager,
9 types::{ChannelInfo, ClientConfig, FetchResult, FetchError, RequestContext, FetchMetrics, StrategyConfig},
10};
11use reqwest::{Client, redirect::Policy};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use tokio::sync::RwLock;
15use tracing::{info, warn, debug, error};
16use std::collections::HashMap;
17
18pub struct KickApiClient {
20 client: Client,
22 strategy_manager: Arc<StrategyManager>,
24 config: ClientConfig,
26 strategy_config: StrategyConfig,
28 metrics: Arc<RwLock<FetchMetrics>>,
30 cache: Arc<RwLock<HashMap<String, CachedChannel>>>,
32}
33
34#[derive(Debug, Clone)]
36struct CachedChannel {
37 channel: ChannelInfo,
38 timestamp: Instant,
39 ttl: Duration,
40}
41
42impl CachedChannel {
43 fn new(channel: ChannelInfo, ttl: Duration) -> Self {
44 Self {
45 channel,
46 timestamp: Instant::now(),
47 ttl,
48 }
49 }
50
51 fn is_expired(&self) -> bool {
52 self.timestamp.elapsed() > self.ttl
53 }
54}
55
56impl KickApiClient {
57 pub fn new() -> FetchResult<Self> {
59 let config = ClientConfig::default();
60 Self::with_config(config)
61 }
62
63 pub fn with_config(config: ClientConfig) -> FetchResult<Self> {
65 let mut builder = Client::builder()
66 .timeout(Duration::from_secs(config.timeout_seconds))
67 .cookie_store(true)
68 .redirect(Policy::limited(5));
69
70 let user_agent = config.user_agent.as_deref()
72 .unwrap_or("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36");
73 builder = builder.user_agent(user_agent);
74
75 let client = builder.build()
77 .map_err(|e| FetchError::Network(format!("Failed to build HTTP client: {}", e)))?;
78
79 let strategy_config = StrategyConfig::default();
81 let strategy_manager = Arc::new(StrategyManager::new()?);
82
83 Ok(Self {
84 client,
85 strategy_manager,
86 config,
87 strategy_config,
88 metrics: Arc::new(RwLock::new(FetchMetrics::default())),
89 cache: Arc::new(RwLock::new(HashMap::new())),
90 })
91 }
92
93 pub fn with_full_config(
95 client_config: ClientConfig,
96 strategy_config: StrategyConfig,
97 ) -> FetchResult<Self> {
98 let mut builder = Client::builder()
99 .timeout(Duration::from_secs(client_config.timeout_seconds))
100 .cookie_store(true)
101 .redirect(Policy::limited(5));
102
103 let user_agent = client_config.user_agent.as_deref()
105 .unwrap_or("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36");
106 builder = builder.user_agent(user_agent);
107
108 let mut headers = reqwest::header::HeaderMap::new();
110 for (key, value) in &client_config.custom_headers {
111 if let (Ok(name), Ok(val)) = (
112 reqwest::header::HeaderName::from_bytes(key.as_bytes()),
113 reqwest::header::HeaderValue::from_str(value),
114 ) {
115 headers.insert(name, val);
116 }
117 }
118 builder = builder.default_headers(headers);
119
120 let client = builder.build()
122 .map_err(|e| FetchError::Network(format!("Failed to build HTTP client: {}", e)))?;
123
124 let strategy_manager = Arc::new(StrategyManager::with_config(&strategy_config)?);
126
127 Ok(Self {
128 client,
129 strategy_manager,
130 config: client_config,
131 strategy_config,
132 metrics: Arc::new(RwLock::new(FetchMetrics::default())),
133 cache: Arc::new(RwLock::new(HashMap::new())),
134 })
135 }
136
137 pub async fn get_channel(&self, channel_name: &str) -> FetchResult<ChannelInfo> {
140 let start_time = Instant::now();
141 let context = RequestContext::new(channel_name, "auto");
142
143 info!("Fetching channel: {}", channel_name);
144
145 if let Some(cached) = self.check_cache(channel_name).await {
147 debug!("Channel {} found in cache", channel_name);
148 self.record_metrics(start_time, true).await;
149 return Ok(cached);
150 }
151
152 let result = self.fetch_with_retries(channel_name, &context).await;
154
155 let success = result.is_ok();
157 self.record_metrics(start_time, success).await;
158
159 if let Ok(ref channel) = result {
161 self.cache_channel(channel_name, channel).await;
162 }
163
164 result
165 }
166
167 async fn fetch_with_retries(&self, channel_name: &str, _context: &RequestContext) -> FetchResult<ChannelInfo> {
169 let mut last_error = None;
170
171 for attempt in 1..=self.config.max_retries {
172 debug!("Attempt {} for channel {}", attempt, channel_name);
173
174 match self.strategy_manager.fetch_channel(&self.client, channel_name, &self.strategy_config).await {
175 Ok(channel) => {
176 info!("Successfully fetched channel {} on attempt {}", channel_name, attempt);
177 return Ok(channel);
178 }
179 Err(e) => {
180 warn!("Attempt {} failed for channel {}: {}", attempt, channel_name, e);
181 last_error = Some(e.clone());
182
183 match &e {
185 FetchError::ChannelNotFound(_) => break,
186 FetchError::AuthenticationRequired => break,
187 _ => {
188 if attempt < self.config.max_retries {
190 let delay = self.config.retry_delay_ms * attempt as u64;
191 debug!("Waiting {}ms before retry", delay);
192 tokio::time::sleep(Duration::from_millis(delay)).await;
193 }
194 }
195 }
196 }
197 }
198 }
199
200 Err(last_error.unwrap_or_else(|| FetchError::ChannelNotFound(format!("All retries failed for channel: {}", channel_name))))
201 }
202
203 pub async fn channel_exists(&self, channel_name: &str) -> FetchResult<bool> {
205 match self.get_channel(channel_name).await {
206 Ok(_) => Ok(true),
207 Err(FetchError::ChannelNotFound(_)) => Ok(false),
208 Err(e) => Err(e),
209 }
210 }
211
212 pub async fn get_channel_id(&self, channel_name: &str) -> FetchResult<u64> {
214 let channel = self.get_channel(channel_name).await?;
215 Ok(channel.id)
216 }
217
218 pub async fn get_chatroom_id(&self, channel_name: &str) -> FetchResult<u64> {
220 let channel = self.get_channel(channel_name).await?;
221 channel.chatroom
222 .map(|chatroom| chatroom.id)
223 .ok_or_else(|| FetchError::InvalidResponse)
224 }
225
226 pub async fn get_channels(&self, channel_names: &[&str]) -> Vec<FetchResult<ChannelInfo>> {
228 let futures: Vec<_> = channel_names.iter()
229 .map(|&name| self.get_channel(name))
230 .collect();
231
232 futures_util::future::join_all(futures).await
233 }
234
235 pub async fn get_metrics(&self) -> FetchMetrics {
237 self.metrics.read().await.clone()
238 }
239
240 pub async fn reset_metrics(&self) {
242 *self.metrics.write().await = FetchMetrics::default();
243 }
244
245 pub async fn clear_cache(&self) {
247 self.cache.write().await.clear();
248 }
249
250 pub fn strategy_names(&self) -> Vec<&'static str> {
252 self.strategy_manager.strategy_names()
253 }
254
255 pub async fn test_connection(&self) -> FetchResult<()> {
257 let test_channel = "kick"; match self.get_channel(test_channel).await {
259 Ok(_) => {
260 info!("Connection test successful");
261 Ok(())
262 }
263 Err(e) => {
264 error!("Connection test failed: {}", e);
265 Err(e)
266 }
267 }
268 }
269
270 async fn check_cache(&self, channel_name: &str) -> Option<ChannelInfo> {
272 let cache = self.cache.read().await;
273 cache.get(channel_name)
274 .filter(|cached| !cached.is_expired())
275 .map(|cached| cached.channel.clone())
276 }
277
278 async fn cache_channel(&self, channel_name: &str, channel: &ChannelInfo) {
280 let ttl = Duration::from_secs(300); let cached = CachedChannel::new(channel.clone(), ttl);
282
283 let mut cache = self.cache.write().await;
284 cache.insert(channel_name.to_string(), cached);
285
286 cache.retain(|_, cached| !cached.is_expired());
288 }
289
290 async fn record_metrics(&self, start_time: Instant, success: bool) {
292 let response_time = start_time.elapsed().as_millis() as u64;
293 let mut metrics = self.metrics.write().await;
294
295 if success {
296 metrics.record_success(response_time);
297 } else {
298 metrics.record_failure();
299 }
300 }
301}
302
303impl Default for KickApiClient {
304 fn default() -> Self {
305 Self::new().expect("Failed to create default KickApiClient")
306 }
307}
308
309pub struct KickApiClientBuilder {
311 client_config: ClientConfig,
312 strategy_config: StrategyConfig,
313}
314
315impl KickApiClientBuilder {
316 pub fn new() -> Self {
318 Self {
319 client_config: ClientConfig::default(),
320 strategy_config: StrategyConfig::default(),
321 }
322 }
323
324 pub fn user_agent<S: Into<String>>(mut self, user_agent: S) -> Self {
326 self.client_config.user_agent = Some(user_agent.into());
327 self
328 }
329
330 pub fn timeout(mut self, seconds: u64) -> Self {
332 self.client_config.timeout_seconds = seconds;
333 self
334 }
335
336 pub fn max_retries(mut self, retries: u32) -> Self {
338 self.client_config.max_retries = retries;
339 self
340 }
341
342 pub fn retry_delay(mut self, delay_ms: u64) -> Self {
344 self.client_config.retry_delay_ms = delay_ms;
345 self
346 }
347
348 pub fn enable_logging(mut self, enable: bool) -> Self {
350 self.client_config.enable_logging = enable;
351 self
352 }
353
354 pub fn header<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
356 self.client_config.custom_headers.push((key.into(), value.into()));
357 self
358 }
359
360 pub fn random_delay(mut self, min_ms: u64, max_ms: u64) -> Self {
362 self.strategy_config.random_delay_range = (min_ms, max_ms);
363 self
364 }
365
366
367
368 pub fn build(self) -> FetchResult<KickApiClient> {
370 KickApiClient::with_full_config(self.client_config, self.strategy_config)
371 }
372}
373
374impl Default for KickApiClientBuilder {
375 fn default() -> Self {
376 Self::new()
377 }
378}
379
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal offline channel used as a cache fixture.
    fn sample_channel() -> ChannelInfo {
        ChannelInfo {
            id: 12345,
            slug: "test".to_string(),
            title: Some("Test".to_string()),
            followers_count: None,
            subscribers_count: None,
            is_live: false,
            viewers_count: None,
            category: None,
            tags: None,
            language: None,
            user: None,
            chatroom: None,
        }
    }

    /// The default constructor succeeds.
    #[tokio::test]
    async fn test_client_creation() {
        assert!(KickApiClient::new().is_ok());
    }

    /// The builder produces a client with overridden timeout and retries.
    #[tokio::test]
    async fn test_client_builder() {
        let built = KickApiClientBuilder::new()
            .timeout(60)
            .max_retries(5)
            .build();

        assert!(built.is_ok());
    }

    /// Metrics start empty and stay empty after a reset.
    #[tokio::test]
    async fn test_metrics() {
        let client = KickApiClient::new().unwrap();

        let before = client.get_metrics().await;
        assert_eq!(before.total_requests, 0);
        assert_eq!(before.success_rate(), 0.0);

        client.reset_metrics().await;
        let after = client.get_metrics().await;
        assert_eq!(after.total_requests, 0);
    }

    /// Clearing the cache works on a fresh client and strategies are registered.
    #[tokio::test]
    async fn test_cache_operations() {
        let client = KickApiClient::new().unwrap();

        client.clear_cache().await;

        let names = client.strategy_names();
        assert!(!names.is_empty());
    }

    /// An entry is fresh within its TTL and expired once the TTL has passed.
    #[tokio::test]
    async fn test_cached_channel() {
        let channel = sample_channel();

        let fresh = CachedChannel::new(channel.clone(), Duration::from_secs(60));
        assert!(!fresh.is_expired());

        let short_lived = CachedChannel::new(channel, Duration::from_millis(1));
        tokio::time::sleep(Duration::from_millis(10)).await;
        assert!(short_lived.is_expired());
    }
}