mangofetch_core/platforms/p2p/
mod.rs1pub mod words;
2
3use std::path::PathBuf;
4use std::sync::Arc;
5
6use anyhow::anyhow;
7use async_trait::async_trait;
8use tokio::fs::File;
9use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
10use tokio::net::TcpStream;
11use tokio::sync::mpsc;
12use tokio_util::sync::CancellationToken;
13
14use crate::models::media::{DownloadOptions, DownloadResult, MediaInfo, MediaType, VideoQuality};
15use crate::platforms::traits::PlatformDownloader;
16
17const CHUNK_SIZE: usize = 64 * 1024;
18
19fn relay_addr() -> String {
20 std::env::var("MANGOFETCH_RELAY").unwrap_or_else(|_| "relay.tonho.wtf:9009".to_string())
21}
22
23async fn connect_relay() -> anyhow::Result<TcpStream> {
24 let addr = relay_addr();
25 let stream = tokio::time::timeout(
26 std::time::Duration::from_secs(10),
27 TcpStream::connect(&addr),
28 )
29 .await
30 .map_err(|_| anyhow!("Connection to relay timed out (10s)"))?
31 .map_err(|e| anyhow!("Failed to connect to relay {}: {}", addr, e))?;
32 Ok(stream)
33}
34
35async fn read_line(
36 reader: &mut BufReader<tokio::io::ReadHalf<TcpStream>>,
37) -> anyhow::Result<String> {
38 let mut line = String::new();
39 let n = reader.read_line(&mut line).await?;
40 if n == 0 {
41 anyhow::bail!("Relay closed connection unexpectedly");
42 }
43 Ok(line.trim_end().to_string())
44}
45
46fn check_relay_error(line: &str) -> anyhow::Result<()> {
47 if let Some(err) = line.strip_prefix("ERROR ") {
48 anyhow::bail!("Relay error: {}", err);
49 }
50 Ok(())
51}
52
53pub struct P2pDownloader;
54
55impl Default for P2pDownloader {
56 fn default() -> Self {
57 Self::new()
58 }
59}
60
61impl P2pDownloader {
62 pub fn new() -> Self {
63 Self
64 }
65}
66
67#[async_trait]
68impl PlatformDownloader for P2pDownloader {
69 fn name(&self) -> &str {
70 "p2p"
71 }
72
73 fn can_handle(&self, url: &str) -> bool {
74 if let Some(code) = url.strip_prefix("p2p:") {
75 return words::is_valid_code(code);
76 }
77 false
78 }
79
80 async fn get_media_info(&self, url: &str) -> anyhow::Result<MediaInfo> {
81 let code = url
82 .strip_prefix("p2p:")
83 .ok_or_else(|| anyhow!("Invalid P2P URL: {}", url))?;
84
85 if !words::is_valid_code(code) {
86 anyhow::bail!("Invalid share code: {}", code);
87 }
88
89 let title = format!("P2P Transfer ({})", &code[..code.len().min(30)]);
90
91 Ok(MediaInfo {
92 title,
93 author: "P2P Transfer".to_string(),
94 platform: "p2p".to_string(),
95 duration_seconds: None,
96 thumbnail_url: None,
97 available_qualities: vec![VideoQuality {
98 label: "Original".to_string(),
99 width: 0,
100 height: 0,
101 url: url.to_string(),
102 format: "p2p".to_string(),
103 }],
104 media_type: MediaType::Video,
105 file_size_bytes: None,
106 })
107 }
108
109 async fn download(
110 &self,
111 info: &MediaInfo,
112 opts: &DownloadOptions,
113 progress: mpsc::Sender<f64>,
114 ) -> anyhow::Result<DownloadResult> {
115 let url = match info.available_qualities.first() {
116 Some(q) => &q.url,
117 None => anyhow::bail!("No URL found in MediaInfo"),
118 };
119
120 let code = url
121 .strip_prefix("p2p:")
122 .ok_or_else(|| anyhow!("Invalid P2P URL"))?;
123
124 let _ = progress.send(-2.0).await;
125
126 tracing::info!("[p2p] connecting to relay for code: {}", code);
127
128 let stream = connect_relay().await?;
129 let (read_half, mut write_half) = tokio::io::split(stream);
130 let mut reader = BufReader::new(read_half);
131
132 write_half
133 .write_all(format!("RECV {}\n", code).as_bytes())
134 .await?;
135 write_half.flush().await?;
136
137 let response = read_line(&mut reader).await?;
138 check_relay_error(&response)?;
139 if response != "READY" {
140 anyhow::bail!("Unexpected relay response: {}", response);
141 }
142
143 tracing::info!("[p2p] connected to sender via relay");
144
145 let file_name = read_line(&mut reader).await?;
146 let file_size_str = read_line(&mut reader).await?;
147 let file_size: u64 = file_size_str
148 .parse()
149 .map_err(|_| anyhow!("Invalid file size from sender: {}", file_size_str))?;
150
151 tracing::info!("[p2p] receiving: {} ({} bytes)", file_name, file_size);
152
153 write_half.write_all(b"OK\n").await?;
154 write_half.flush().await?;
155
156 let _ = progress.send(0.0).await;
157
158 let sanitized = sanitize_filename::sanitize(&file_name);
159 let output_path = opts.output_dir.join(&sanitized);
160 if let Some(parent) = output_path.parent() {
161 tokio::fs::create_dir_all(parent).await?;
162 }
163
164 let mut file = File::create(&output_path).await?;
165 let mut received: u64 = 0;
166 let mut buf = vec![0u8; CHUNK_SIZE];
167
168 while received < file_size {
169 if opts.cancel_token.is_cancelled() {
170 let _ = tokio::fs::remove_file(&output_path).await;
171 anyhow::bail!("Download cancelled");
172 }
173
174 let to_read = ((file_size - received) as usize).min(CHUNK_SIZE);
175 let n = reader.read(&mut buf[..to_read]).await?;
176 if n == 0 {
177 break;
178 }
179
180 file.write_all(&buf[..n]).await?;
181 received += n as u64;
182
183 if file_size > 0 {
184 let pct = (received as f64 / file_size as f64) * 100.0;
185 let _ = progress.send(pct).await;
186 }
187 }
188
189 file.flush().await?;
190 drop(file);
191
192 let _ = progress.send(100.0).await;
193
194 tracing::info!("[p2p] download complete: {}", output_path.display());
195
196 Ok(DownloadResult {
197 file_path: output_path,
198 file_size_bytes: received,
199 duration_seconds: 0.0,
200 torrent_id: None,
201 })
202 }
203}
204
205pub struct P2pSendSession {
206 pub code: String,
207 pub file_path: PathBuf,
208 pub file_name: String,
209 pub file_size: u64,
210 pub cancel_token: CancellationToken,
211 pub progress: Arc<tokio::sync::Mutex<f64>>,
212 pub status: Arc<tokio::sync::Mutex<String>>,
213 pub sent_bytes: Arc<tokio::sync::Mutex<u64>>,
214 pub paused: Arc<std::sync::atomic::AtomicBool>,
215}
216
217pub async fn start_send(
218 file_path: PathBuf,
219 cancel_token: CancellationToken,
220) -> anyhow::Result<P2pSendSession> {
221 let metadata = tokio::fs::metadata(&file_path)
222 .await
223 .map_err(|e| anyhow!("File not found: {}", e))?;
224
225 if !metadata.is_file() {
226 anyhow::bail!("Path is not a file: {}", file_path.display());
227 }
228
229 let file_size = metadata.len();
230 let file_name = file_path
231 .file_name()
232 .map(|n| n.to_string_lossy().to_string())
233 .unwrap_or_else(|| "file".to_string());
234
235 let code = words::generate_code();
236
237 tracing::info!("[p2p] share code generated: {}", code);
238
239 Ok(P2pSendSession {
240 code,
241 file_path,
242 file_name,
243 file_size,
244 cancel_token,
245 progress: Arc::new(tokio::sync::Mutex::new(0.0)),
246 status: Arc::new(tokio::sync::Mutex::new("waiting".to_string())),
247 sent_bytes: Arc::new(tokio::sync::Mutex::new(0)),
248 paused: Arc::new(std::sync::atomic::AtomicBool::new(false)),
249 })
250}
251
252async fn wait_for_receiver(
253 reader: &mut BufReader<tokio::io::ReadHalf<TcpStream>>,
254 cancel: &CancellationToken,
255) -> anyhow::Result<()> {
256 let ready = tokio::select! {
257 line = read_line(reader) => line?,
258 _ = cancel.cancelled() => {
259 anyhow::bail!("Send cancelled while waiting for receiver");
260 }
261 };
262
263 check_relay_error(&ready)?;
264 if ready != "READY" {
265 anyhow::bail!("Unexpected relay response: {}", ready);
266 }
267
268 Ok(())
269}
270
271async fn transfer_data(
272 session: &P2pSendSession,
273 cancel: &CancellationToken,
274 write_half: &mut tokio::io::WriteHalf<TcpStream>,
275) -> anyhow::Result<()> {
276 let mut file = File::open(&session.file_path).await?;
277 let mut buf = vec![0u8; CHUNK_SIZE];
278 let mut sent: u64 = 0;
279
280 loop {
281 if cancel.is_cancelled() {
282 anyhow::bail!("Send cancelled during transfer");
283 }
284
285 while session.paused.load(std::sync::atomic::Ordering::Relaxed) {
286 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
287 if cancel.is_cancelled() {
288 anyhow::bail!("Send cancelled while paused");
289 }
290 }
291
292 let n = file.read(&mut buf).await?;
293 if n == 0 {
294 break;
295 }
296
297 write_half.write_all(&buf[..n]).await?;
298 sent += n as u64;
299
300 *session.sent_bytes.lock().await = sent;
301 if session.file_size > 0 {
302 *session.progress.lock().await = (sent as f64 / session.file_size as f64) * 100.0;
303 }
304 }
305
306 write_half.flush().await?;
307
308 *session.progress.lock().await = 100.0;
309 *session.status.lock().await = "complete".to_string();
310
311 tracing::info!("[p2p] transfer complete: {} bytes sent", sent);
312 Ok(())
313}
314
315pub async fn run_sender(session: &P2pSendSession) -> anyhow::Result<()> {
316 let cancel = session.cancel_token.clone();
317
318 *session.status.lock().await = "connecting".to_string();
319
320 tracing::info!("[p2p] connecting to relay...");
321
322 let stream = connect_relay().await?;
323 let (read_half, mut write_half) = tokio::io::split(stream);
324 let mut reader = BufReader::new(read_half);
325
326 write_half
327 .write_all(format!("SEND {}\n", session.code).as_bytes())
328 .await?;
329 write_half.flush().await?;
330
331 let response = read_line(&mut reader).await?;
332 check_relay_error(&response)?;
333 if response != "WAIT" {
334 anyhow::bail!("Unexpected relay response: {}", response);
335 }
336
337 *session.status.lock().await = "waiting_for_receiver".to_string();
338 tracing::info!("[p2p] waiting for receiver... code: {}", session.code);
339
340 wait_for_receiver(&mut reader, &cancel).await?;
341
342 *session.status.lock().await = "connected".to_string();
343 tracing::info!("[p2p] receiver connected");
344
345 let header = format!("{}\n{}\n", session.file_name, session.file_size);
346 write_half.write_all(header.as_bytes()).await?;
347 write_half.flush().await?;
348
349 let ok_response = tokio::select! {
350 line = read_line(&mut reader) => line?,
351 _ = cancel.cancelled() => {
352 anyhow::bail!("Send cancelled while waiting for OK");
353 }
354 };
355
356 if ok_response != "OK" {
357 anyhow::bail!("Receiver rejected transfer: {}", ok_response);
358 }
359
360 *session.status.lock().await = "transferring".to_string();
361 tracing::info!(
362 "[p2p] transferring: {} ({} bytes)",
363 session.file_name,
364 session.file_size
365 );
366
367 transfer_data(session, &cancel, &mut write_half).await?;
368
369 Ok(())
370}