1use std::collections::HashMap;
38use std::path::PathBuf;
39use std::rc::Rc;
40
41use anyhow::Result;
42use tokio::sync::{mpsc, oneshot};
43use url::Url;
44use uuid::Uuid;
45
46use crate::engine::{
47 DownloadEngine, DownloadStatus, DownloadTask, EngineCommand, EngineEvent, HttpMode,
48 ScheduleMode,
49};
50use crate::storage::StorageConfig;
51
52#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
58pub struct CookieEntry {
59 pub name: String,
60 pub value: String,
61 pub domain: String,
62 pub path: String,
63 pub secure: bool,
64 pub expires: Option<String>,
65}
66
67impl CookieEntry {
68 pub fn new(
69 name: impl Into<String>,
70 value: impl Into<String>,
71 domain: impl Into<String>,
72 ) -> Self {
73 Self {
74 name: name.into(),
75 value: value.into(),
76 domain: domain.into(),
77 path: "/".into(),
78 secure: false,
79 expires: None,
80 }
81 }
82
83 pub fn from_set_cookie(header: &str, request_url: &Url) -> Option<Self> {
86 let mut name = String::new();
87 let mut value = String::new();
88 let mut domain = request_url.host_str()?.to_string();
89 let mut path = "/".to_string();
90 let mut secure = false;
91 let mut expires = None;
92
93 let mut parts = header.split(';');
94 if let Some(first) = parts.next() {
95 let eq_pos = first.find('=')?;
96 name = first[..eq_pos].trim().to_string();
97 value = first[eq_pos + 1..].trim().to_string();
98 }
99
100 for part in parts {
101 let part = part.trim();
102 if let Some(eq_pos) = part.find('=') {
103 let key = part[..eq_pos].trim().to_ascii_lowercase();
104 let val = part[eq_pos + 1..].trim().to_string();
105 match key.as_str() {
106 "domain" => domain = val.trim_start_matches('.').to_string(),
107 "path" => path = val,
108 "expires" => expires = Some(val),
109 _ => {}
110 }
111 } else if part.eq_ignore_ascii_case("secure") {
112 secure = true;
113 }
114 }
115
116 Some(Self {
117 name,
118 value,
119 domain,
120 path,
121 secure,
122 expires,
123 })
124 }
125
126 pub fn to_request_value(&self) -> String {
128 format!("{}={}", self.name, self.value)
129 }
130}
131
132#[derive(Debug, Clone, Default)]
134pub struct CookieJar {
135 cookies: Vec<CookieEntry>,
136}
137
138impl CookieJar {
139 pub fn new() -> Self {
140 Self {
141 cookies: Vec::new(),
142 }
143 }
144
145 pub fn insert(&mut self, cookie: CookieEntry) {
147 self.cookies.retain(|c| {
148 !(c.name == cookie.name && c.domain == cookie.domain && c.path == cookie.path)
149 });
150 self.cookies.push(cookie);
151 }
152
153 pub fn match_url(&self, url: &Url) -> Vec<&CookieEntry> {
155 let host = url.host_str().unwrap_or("");
156 let path = url.path();
157 self.cookies
158 .iter()
159 .filter(|c| {
160 let domain_match = host == c.domain || host.ends_with(&format!(".{}", c.domain));
161 let path_match = path.starts_with(&c.path);
162 let secure_ok = !c.secure || url.scheme() == "https";
163 domain_match && path_match && secure_ok
164 })
165 .collect()
166 }
167
168 pub fn header_value_for_url(&self, url: &Url) -> Option<String> {
170 let matched = self.match_url(url);
171 if matched.is_empty() {
172 return None;
173 }
174 Some(
175 matched
176 .iter()
177 .map(|c| c.to_request_value())
178 .collect::<Vec<_>>()
179 .join("; "),
180 )
181 }
182
183 pub fn import_lines(&mut self, lines: &str, default_domain: &str) {
185 for line in lines.lines() {
186 let line = line.trim();
187 if line.is_empty() || line.starts_with('#') || line.starts_with("//") {
188 continue;
189 }
190 let parts: Vec<&str> = line.split('\t').collect();
191 if parts.len() >= 7 {
192 let domain = parts[0].trim_start_matches('.');
193 let path = parts[2];
194 let secure = parts[3] == "TRUE";
195 let name = parts[5];
196 let value = parts[6];
197 self.insert(CookieEntry {
198 name: name.to_string(),
199 value: value.to_string(),
200 domain: domain.to_string(),
201 path: path.to_string(),
202 secure,
203 expires: None,
204 });
205 } else if let Some(eq_pos) = line.find('=') {
206 let name = line[..eq_pos].trim();
207 let value = line[eq_pos + 1..].trim();
208 if !name.is_empty() {
209 self.insert(CookieEntry::new(name, value, default_domain));
210 }
211 }
212 }
213 }
214
215 pub fn export_netscape(&self) -> String {
217 let mut out = String::new();
218 out.push_str("# Netscape HTTP Cookie File\n");
219 out.push_str("# https://curl.se/rfc/cookie_spec.html\n");
220 out.push_str("# This file was generated by tur-rs\n");
221 for c in &self.cookies {
222 let secure = if c.secure { "TRUE" } else { "FALSE" };
223 let expires = c.expires.as_deref().unwrap_or("0");
224 out.push_str(&format!(
225 "{}\tTRUE\t{}\t{}\t{}\t{}\t{}\n",
226 c.domain, c.path, secure, expires, c.name, c.value
227 ));
228 }
229 out
230 }
231
232 pub fn len(&self) -> usize {
233 self.cookies.len()
234 }
235 pub fn is_empty(&self) -> bool {
236 self.cookies.is_empty()
237 }
238}
239
240#[derive(Debug, Clone, Default)]
246pub struct RequestContext {
247 pub headers: HashMap<String, String>,
249 pub auth: Option<String>,
251 pub referer: Option<String>,
253 pub user_agent: Option<String>,
255 pub cookies: Option<Vec<CookieEntry>>,
257}
258
259impl RequestContext {
260 pub fn new() -> Self {
261 Self::default()
262 }
263
264 pub fn header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
266 self.headers.insert(name.into(), value.into());
267 self
268 }
269
270 pub fn auth(mut self, value: impl Into<String>) -> Self {
272 self.auth = Some(value.into());
273 self
274 }
275
276 pub fn referer(mut self, url: impl Into<String>) -> Self {
278 self.referer = Some(url.into());
279 self
280 }
281
282 pub fn user_agent(mut self, ua: impl Into<String>) -> Self {
284 self.user_agent = Some(ua.into());
285 self
286 }
287
288 pub fn cookies(mut self, cookies: Vec<CookieEntry>) -> Self {
290 self.cookies = Some(cookies);
291 self
292 }
293}
294
295#[derive(Debug, Clone, Default)]
303pub struct SessionContext {
304 pub cookies: Vec<CookieEntry>,
305 pub headers: HashMap<String, String>,
306 pub auth: Option<String>,
307 pub referer: Option<String>,
308 pub user_agent: Option<String>,
309}
310
311impl SessionContext {
312 pub fn new() -> Self {
313 Self::default()
314 }
315
316 pub fn to_request_context(&self) -> RequestContext {
318 RequestContext {
319 headers: self.headers.clone(),
320 auth: self.auth.clone(),
321 referer: self.referer.clone(),
322 user_agent: self.user_agent.clone(),
323 cookies: if self.cookies.is_empty() {
324 None
325 } else {
326 Some(self.cookies.clone())
327 },
328 }
329 }
330
331 pub fn cookie(mut self, entry: CookieEntry) -> Self {
333 self.cookies.push(entry);
334 self
335 }
336
337 pub fn header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
339 self.headers.insert(name.into(), value.into());
340 self
341 }
342
343 pub fn auth(mut self, value: impl Into<String>) -> Self {
345 self.auth = Some(value.into());
346 self
347 }
348
349 pub fn referer(mut self, url: impl Into<String>) -> Self {
351 self.referer = Some(url.into());
352 self
353 }
354
355 pub fn user_agent(mut self, ua: impl Into<String>) -> Self {
357 self.user_agent = Some(ua.into());
358 self
359 }
360}
361
362#[derive(Debug, Clone)]
368pub struct ServiceConfig {
369 pub connections_per_download: usize,
371 pub max_concurrent_tasks: usize,
373 pub max_total_connections: usize,
375 pub global_bandwidth_limit_bps: u64,
377 pub enable_origin_memory: bool,
379 pub storage_config: StorageConfig,
381}
382
383impl Default for ServiceConfig {
384 fn default() -> Self {
385 Self {
386 connections_per_download: 8,
387 max_concurrent_tasks: 3,
388 max_total_connections: 32,
389 global_bandwidth_limit_bps: 0,
390 enable_origin_memory: true,
391 storage_config: StorageConfig::default(),
392 }
393 }
394}
395
396#[derive(Debug, Clone)]
410pub struct DownloadRequest {
411 pub url: String,
412 pub dir: PathBuf,
413 pub filename: Option<String>,
414 pub connections: Option<usize>,
415 pub min_connections: Option<usize>,
416 pub max_connections: Option<usize>,
417 pub borrow_limit_mb: Option<u64>,
418 pub per_download_bandwidth_limit_bps: Option<u64>,
419 pub http_mode: Option<HttpMode>,
420 pub schedule_mode: Option<ScheduleMode>,
421 pub dry_run: bool,
422 pub dry_run_size_mb: Option<u64>,
423 pub request_context: Option<RequestContext>,
425}
426
427impl DownloadRequest {
428 pub fn new(url: impl Into<String>) -> Self {
430 Self {
431 url: url.into(),
432 dir: PathBuf::from("."),
433 filename: None,
434 connections: None,
435 min_connections: None,
436 max_connections: None,
437 borrow_limit_mb: None,
438 per_download_bandwidth_limit_bps: None,
439 http_mode: None,
440 schedule_mode: None,
441 dry_run: false,
442 dry_run_size_mb: None,
443 request_context: None,
444 }
445 }
446
447 pub fn dir(mut self, dir: impl Into<PathBuf>) -> Self {
449 self.dir = dir.into();
450 self
451 }
452
453 pub fn connections(mut self, n: usize) -> Self {
455 self.connections = Some(n);
456 self
457 }
458
459 pub fn filename(mut self, name: impl Into<String>) -> Self {
461 self.filename = Some(name.into());
462 self
463 }
464
465 pub fn min_connections(mut self, n: usize) -> Self {
467 self.min_connections = Some(n);
468 self
469 }
470
471 pub fn max_connections(mut self, n: usize) -> Self {
473 self.max_connections = Some(n);
474 self
475 }
476
477 pub fn borrow_limit_mb(mut self, mb: u64) -> Self {
479 self.borrow_limit_mb = Some(mb);
480 self
481 }
482
483 pub fn per_download_bandwidth_limit_bps(mut self, bps: u64) -> Self {
485 self.per_download_bandwidth_limit_bps = Some(bps);
486 self
487 }
488
489 pub fn http_mode(mut self, mode: HttpMode) -> Self {
491 self.http_mode = Some(mode);
492 self
493 }
494
495 pub fn schedule_mode(mut self, mode: ScheduleMode) -> Self {
497 self.schedule_mode = Some(mode);
498 self
499 }
500
501 pub fn dry_run(mut self, dry: bool) -> Self {
503 self.dry_run = dry;
504 self
505 }
506
507 pub fn dry_run_size_mb(mut self, mb: u64) -> Self {
509 self.dry_run_size_mb = Some(mb);
510 self
511 }
512
513 pub fn context(mut self, ctx: RequestContext) -> Self {
516 self.request_context = Some(ctx);
517 self
518 }
519
520 pub fn bearer_token(mut self, token: impl Into<String>) -> Self {
522 self.request_context
523 .get_or_insert_with(RequestContext::new)
524 .auth = Some(format!("Bearer {}", token.into()));
525 self
526 }
527
528 pub fn referer(mut self, url: impl Into<String>) -> Self {
530 self.request_context
531 .get_or_insert_with(RequestContext::new)
532 .referer = Some(url.into());
533 self
534 }
535
536 pub fn header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
538 self.request_context
539 .get_or_insert_with(RequestContext::new)
540 .headers
541 .insert(name.into(), value.into());
542 self
543 }
544}
545
546#[derive(Debug, Clone)]
551pub enum DownloadUpdate {
552 Progress {
554 downloaded_bytes: u64,
556 speed_bps: f64,
558 },
559 TotalSize(u64),
561 Workers(Vec<crate::engine::WorkerSnapshot>),
563 Protocol(crate::engine::ProtocolInfo),
565 StatusChanged(DownloadStatus),
567}
568
569pub struct DownloadHandle {
574 pub id: Uuid,
576 engine_tx: mpsc::Sender<EngineCommand>,
577 event_rx: mpsc::UnboundedReceiver<DownloadUpdate>,
578}
579
580impl std::fmt::Debug for DownloadHandle {
581 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
582 f.debug_struct("DownloadHandle")
583 .field("id", &self.id)
584 .finish()
585 }
586}
587
588impl DownloadHandle {
589 pub async fn recv(&mut self) -> Option<DownloadUpdate> {
593 self.event_rx.recv().await
594 }
595
596 pub fn try_recv(&mut self) -> Result<DownloadUpdate, mpsc::error::TryRecvError> {
598 self.event_rx.try_recv()
599 }
600
601 pub async fn pause(&self) {
603 let _ = self.engine_tx.send(EngineCommand::Stop(self.id)).await;
604 }
605
606 pub async fn resume(&self) {
608 let _ = self.engine_tx.send(EngineCommand::Resume(self.id)).await;
609 }
610
611 pub async fn cancel(&self) {
613 let _ = self.engine_tx.send(EngineCommand::Cancel(self.id)).await;
614 }
615}
616
617pub struct TurService {
626 engine: Rc<DownloadEngine>,
627 engine_tx: mpsc::Sender<EngineCommand>,
628 shutdown_tx: Option<oneshot::Sender<()>>,
629 handles: Rc<std::cell::RefCell<HashMap<Uuid, mpsc::UnboundedSender<DownloadUpdate>>>>,
630 cookie_jar: std::cell::RefCell<CookieJar>,
631}
632
633impl std::fmt::Debug for TurService {
634 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
635 f.debug_struct("TurService").finish()
636 }
637}
638
639impl TurService {
640 pub async fn new(config: ServiceConfig) -> Result<Self> {
647 let (engine_tx, engine_rx) = mpsc::channel::<EngineCommand>(100);
648 let (event_tx, event_rx) = mpsc::channel::<EngineEvent>(100);
649 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
650
651 let handles: Rc<std::cell::RefCell<HashMap<Uuid, mpsc::UnboundedSender<DownloadUpdate>>>> =
652 Rc::new(std::cell::RefCell::new(HashMap::new()));
653
654 let engine = DownloadEngine::new(
655 config.connections_per_download,
656 config.max_concurrent_tasks,
657 config.max_total_connections,
658 config.global_bandwidth_limit_bps,
659 config.enable_origin_memory,
660 config.storage_config,
661 );
662
663 let engine_tx_clone = engine_tx.clone();
664 let handles_clone = handles.clone();
665
666 tokio::task::spawn_local({
668 let engine_tx = engine_tx.clone();
669 async move {
670 Self::route_events(event_rx, handles_clone, engine_tx, shutdown_rx).await;
671 }
672 });
673
674 tokio::task::spawn_local({
676 let engine = engine.clone();
677 let event_tx = event_tx.clone();
678 async move {
679 if let Err(e) = engine.run(engine_rx, engine_tx_clone, event_tx).await {
680 eprintln!("Engine error: {}", e);
681 }
682 }
683 });
684
685 Ok(Self {
686 engine,
687 engine_tx,
688 shutdown_tx: Some(shutdown_tx),
689 handles,
690 cookie_jar: std::cell::RefCell::new(CookieJar::new()),
691 })
692 }
693
694 pub fn cookie_jar(&self) -> std::cell::Ref<'_, CookieJar> {
696 self.cookie_jar.borrow()
697 }
698
699 pub fn cookie_jar_mut(&self) -> std::cell::RefMut<'_, CookieJar> {
701 self.cookie_jar.borrow_mut()
702 }
703
704 pub async fn add_download(&self, request: DownloadRequest) -> Result<DownloadHandle> {
713 let (event_tx, event_rx) = mpsc::unbounded_channel::<DownloadUpdate>();
714
715 let filename = request.filename.clone().unwrap_or_else(|| {
716 request
717 .url
718 .split('/')
719 .last()
720 .unwrap_or("unknown")
721 .to_string()
722 });
723
724 if let Some(ref ctx) = request.request_context {
726 if let Some(ref cookies) = ctx.cookies {
727 let mut jar = self.cookie_jar.borrow_mut();
728 for c in cookies {
729 jar.insert(c.clone());
730 }
731 }
732 }
733
734 let mut task = DownloadTask {
735 id: Uuid::new_v4(),
736 url: request.url,
737 filename,
738 dir: request.dir,
739 total_size: 0,
740 downloaded_size: 0,
741 status: DownloadStatus::Queued,
742 speed: 0.0,
743 connections: request
744 .connections
745 .unwrap_or(self.engine.connections_per_download),
746 dry_run: request.dry_run,
747 dry_run_size_mb: request.dry_run_size_mb,
748 borrow_limit_mb: request.borrow_limit_mb.unwrap_or(2),
749 min_connections: request.min_connections.unwrap_or(1),
750 max_connections: request.max_connections.unwrap_or(16),
751 per_download_bandwidth_limit_bps: request.per_download_bandwidth_limit_bps.unwrap_or(0),
752 schedule_mode: request.schedule_mode.unwrap_or(ScheduleMode::Equal),
753 http_mode: request.http_mode.unwrap_or(HttpMode::Auto),
754 log_root: None,
755 request_context: request.request_context,
756 };
757
758 if let Ok(url) = url::Url::parse(&task.url) {
761 let jar = self.cookie_jar.borrow();
762 let jar_cookies: Vec<CookieEntry> = jar.match_url(&url).into_iter().cloned().collect();
763 if !jar_cookies.is_empty() {
764 let ctx = task.request_context.get_or_insert_with(RequestContext::new);
765 let mut existing = ctx.cookies.take().unwrap_or_default();
766 for c in jar_cookies {
768 if !existing
769 .iter()
770 .any(|ec| ec.name == c.name && ec.domain == c.domain && ec.path == c.path)
771 {
772 existing.push(c);
773 }
774 }
775 ctx.cookies = Some(existing);
776 }
777 }
778
779 let id = task.id;
780 self.handles.borrow_mut().insert(id, event_tx);
781 self.engine_tx
782 .send(EngineCommand::Add(task))
783 .await
784 .map_err(|_| anyhow::anyhow!("engine channel closed"))?;
785
786 Ok(DownloadHandle {
787 id,
788 engine_tx: self.engine_tx.clone(),
789 event_rx,
790 })
791 }
792
793 pub async fn import_cookie_file(&self, path: &PathBuf) -> Result<()> {
798 let contents = tokio::fs::read_to_string(path).await?;
799 self.cookie_jar.borrow_mut().import_lines(&contents, "");
800 Ok(())
801 }
802
803 pub async fn shutdown(mut self) {
810 if let Some(tx) = self.shutdown_tx.take() {
811 let _ = tx.send(());
812 }
813 drop(self.engine_tx);
815 }
816
817 pub fn effective_connection_budget(&self) -> usize {
819 self.engine.effective_connection_budget.get()
820 }
821
822 pub fn configured_connection_budget(&self) -> usize {
824 self.engine.configured_connection_budget.get()
825 }
826
827 async fn route_events(
829 mut event_rx: mpsc::Receiver<EngineEvent>,
830 handles: Rc<std::cell::RefCell<HashMap<Uuid, mpsc::UnboundedSender<DownloadUpdate>>>>,
831 engine_tx: mpsc::Sender<EngineCommand>,
832 mut shutdown_rx: oneshot::Receiver<()>,
833 ) {
834 loop {
835 tokio::select! {
836 _ = &mut shutdown_rx => {
837 let ids: Vec<Uuid> = handles.borrow().keys().copied().collect();
838 for id in ids {
839 let _ = engine_tx.send(EngineCommand::Stop(id)).await;
840 }
841 break;
842 }
843 event_opt = event_rx.recv() => {
844 let Some(event) = event_opt else { break };
845 let update = match event {
846 EngineEvent::Progress(id, downloaded, speed) => {
847 Some((id, DownloadUpdate::Progress {
848 downloaded_bytes: downloaded,
849 speed_bps: speed,
850 }))
851 }
852 EngineEvent::TotalSize(id, size) => {
853 Some((id, DownloadUpdate::TotalSize(size)))
854 }
855 EngineEvent::Workers(id, workers) => {
856 Some((id, DownloadUpdate::Workers(workers)))
857 }
858 EngineEvent::Protocol(id, protocol) => {
859 Some((id, DownloadUpdate::Protocol(protocol)))
860 }
861 EngineEvent::StatusChanged(id, DownloadStatus::Completed) => {
862 let _ = handles.borrow_mut().remove(&id);
863 Some((id, DownloadUpdate::StatusChanged(DownloadStatus::Completed)))
864 }
865 EngineEvent::StatusChanged(id, status) => {
866 let is_terminal = matches!(status, DownloadStatus::Error(_));
867 if is_terminal {
868 let _ = handles.borrow_mut().remove(&id);
869 }
870 Some((id, DownloadUpdate::StatusChanged(status)))
871 }
872 };
873
874 if let Some((id, update)) = update {
875 if let Some(tx) = handles.borrow().get(&id) {
876 let _ = tx.send(update);
877 }
878 }
879 }
880 }
881 }
882 }
883}