mangofetch_core/platforms/p2p/
mod.rs1pub mod words;
2
3use std::path::PathBuf;
4use std::sync::Arc;
5
6use anyhow::anyhow;
7use async_trait::async_trait;
8use tokio::fs::File;
9use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
10use tokio::net::TcpStream;
11use tokio::sync::mpsc;
12use tokio_util::sync::CancellationToken;
13
14use crate::models::media::{DownloadOptions, DownloadResult, MediaInfo, MediaType, VideoQuality};
15use crate::platforms::traits::PlatformDownloader;
16
17const CHUNK_SIZE: usize = 64 * 1024;
18
19fn relay_addr() -> String {
20 std::env::var("MANGOFETCH_RELAY").unwrap_or_else(|_| "relay.tonho.wtf:9009".to_string())
21}
22
23async fn connect_relay() -> anyhow::Result<TcpStream> {
24 let addr = relay_addr();
25 let stream = tokio::time::timeout(
26 std::time::Duration::from_secs(10),
27 TcpStream::connect(&addr),
28 )
29 .await
30 .map_err(|_| anyhow!("Connection to relay timed out (10s)"))?
31 .map_err(|e| anyhow!("Failed to connect to relay {}: {}", addr, e))?;
32 Ok(stream)
33}
34
35async fn read_line(
36 reader: &mut BufReader<tokio::io::ReadHalf<TcpStream>>,
37) -> anyhow::Result<String> {
38 let mut line = String::new();
39 let n = reader.read_line(&mut line).await?;
40 if n == 0 {
41 anyhow::bail!("Relay closed connection unexpectedly");
42 }
43 Ok(line.trim_end().to_string())
44}
45
46fn check_relay_error(line: &str) -> anyhow::Result<()> {
47 if let Some(err) = line.strip_prefix("ERROR ") {
48 anyhow::bail!("Relay error: {}", err);
49 }
50 Ok(())
51}
52
53pub struct P2pDownloader;
54
55impl Default for P2pDownloader {
56 fn default() -> Self {
57 Self::new()
58 }
59}
60
61impl P2pDownloader {
62 pub fn new() -> Self {
63 Self
64 }
65}
66
67#[async_trait]
68impl PlatformDownloader for P2pDownloader {
69 fn name(&self) -> &str {
70 "p2p"
71 }
72
73 fn can_handle(&self, url: &str) -> bool {
74 if let Some(code) = url.strip_prefix("p2p:") {
75 return words::is_valid_code(code);
76 }
77 false
78 }
79
80 async fn get_media_info(&self, url: &str) -> anyhow::Result<MediaInfo> {
81 let code = url
82 .strip_prefix("p2p:")
83 .ok_or_else(|| anyhow!("Invalid P2P URL: {}", url))?;
84
85 if !words::is_valid_code(code) {
86 anyhow::bail!("Invalid share code: {}", code);
87 }
88
89 let title = format!("P2P Transfer ({})", &code[..code.len().min(30)]);
90
91 Ok(MediaInfo {
92 title,
93 author: "P2P Transfer".to_string(),
94 platform: "p2p".to_string(),
95 duration_seconds: None,
96 thumbnail_url: None,
97 available_qualities: vec![VideoQuality {
98 label: "Original".to_string(),
99 width: 0,
100 height: 0,
101 url: url.to_string(),
102 format: "p2p".to_string(),
103
104 filesize_bytes: None,
105 }],
106 media_type: MediaType::Video,
107 file_size_bytes: None,
108 })
109 }
110
111 async fn download(
112 &self,
113 info: &MediaInfo,
114 opts: &DownloadOptions,
115 progress: mpsc::Sender<f64>,
116 ) -> anyhow::Result<DownloadResult> {
117 let url = match info.available_qualities.first() {
118 Some(q) => &q.url,
119 None => anyhow::bail!("No URL found in MediaInfo"),
120 };
121
122 let code = url
123 .strip_prefix("p2p:")
124 .ok_or_else(|| anyhow!("Invalid P2P URL"))?;
125
126 let _ = progress.send(-2.0).await;
127
128 tracing::info!("[p2p] connecting to relay for code: {}", code);
129
130 let stream = connect_relay().await?;
131 let (read_half, mut write_half) = tokio::io::split(stream);
132 let mut reader = BufReader::new(read_half);
133
134 write_half
135 .write_all(format!("RECV {}\n", code).as_bytes())
136 .await?;
137 write_half.flush().await?;
138
139 let response = read_line(&mut reader).await?;
140 check_relay_error(&response)?;
141 if response != "READY" {
142 anyhow::bail!("Unexpected relay response: {}", response);
143 }
144
145 tracing::info!("[p2p] connected to sender via relay");
146
147 let file_name = read_line(&mut reader).await?;
148 let file_size_str = read_line(&mut reader).await?;
149 let file_size: u64 = file_size_str
150 .parse()
151 .map_err(|_| anyhow!("Invalid file size from sender: {}", file_size_str))?;
152
153 tracing::info!("[p2p] receiving: {} ({} bytes)", file_name, file_size);
154
155 write_half.write_all(b"OK\n").await?;
156 write_half.flush().await?;
157
158 let _ = progress.send(0.0).await;
159
160 let sanitized = sanitize_filename::sanitize(&file_name);
161 let output_path = opts.output_dir.join(&sanitized);
162 if let Some(parent) = output_path.parent() {
163 tokio::fs::create_dir_all(parent).await?;
164 }
165
166 let mut file = File::create(&output_path).await?;
167 let mut received: u64 = 0;
168 let mut buf = vec![0u8; CHUNK_SIZE];
169
170 while received < file_size {
171 if opts.cancel_token.is_cancelled() {
172 let _ = tokio::fs::remove_file(&output_path).await;
173 anyhow::bail!("Download cancelled");
174 }
175
176 let to_read = ((file_size - received) as usize).min(CHUNK_SIZE);
177 let n = reader.read(&mut buf[..to_read]).await?;
178 if n == 0 {
179 break;
180 }
181
182 file.write_all(&buf[..n]).await?;
183 received += n as u64;
184
185 if file_size > 0 {
186 let pct = (received as f64 / file_size as f64) * 100.0;
187 let _ = progress.send(pct).await;
188 }
189 }
190
191 file.flush().await?;
192 drop(file);
193
194 let _ = progress.send(100.0).await;
195
196 tracing::info!("[p2p] download complete: {}", output_path.display());
197
198 Ok(DownloadResult {
199 file_path: output_path,
200 file_size_bytes: received,
201 duration_seconds: 0.0,
202 torrent_id: None,
203 })
204 }
205}
206
207pub struct P2pSendSession {
208 pub code: String,
209 pub file_path: PathBuf,
210 pub file_name: String,
211 pub file_size: u64,
212 pub cancel_token: CancellationToken,
213 pub progress: Arc<tokio::sync::Mutex<f64>>,
214 pub status: Arc<tokio::sync::Mutex<String>>,
215 pub sent_bytes: Arc<tokio::sync::Mutex<u64>>,
216 pub paused: Arc<std::sync::atomic::AtomicBool>,
217}
218
219pub async fn start_send(
220 file_path: PathBuf,
221 cancel_token: CancellationToken,
222) -> anyhow::Result<P2pSendSession> {
223 let metadata = tokio::fs::metadata(&file_path)
224 .await
225 .map_err(|e| anyhow!("File not found: {}", e))?;
226
227 if !metadata.is_file() {
228 anyhow::bail!("Path is not a file: {}", file_path.display());
229 }
230
231 let file_size = metadata.len();
232 let file_name = file_path
233 .file_name()
234 .map(|n| n.to_string_lossy().to_string())
235 .unwrap_or_else(|| "file".to_string());
236
237 let code = words::generate_code();
238
239 tracing::info!("[p2p] share code generated: {}", code);
240
241 Ok(P2pSendSession {
242 code,
243 file_path,
244 file_name,
245 file_size,
246 cancel_token,
247 progress: Arc::new(tokio::sync::Mutex::new(0.0)),
248 status: Arc::new(tokio::sync::Mutex::new("waiting".to_string())),
249 sent_bytes: Arc::new(tokio::sync::Mutex::new(0)),
250 paused: Arc::new(std::sync::atomic::AtomicBool::new(false)),
251 })
252}
253
254async fn wait_for_receiver(
255 reader: &mut BufReader<tokio::io::ReadHalf<TcpStream>>,
256 cancel: &CancellationToken,
257) -> anyhow::Result<()> {
258 let ready = tokio::select! {
259 line = read_line(reader) => line?,
260 _ = cancel.cancelled() => {
261 anyhow::bail!("Send cancelled while waiting for receiver");
262 }
263 };
264
265 check_relay_error(&ready)?;
266 if ready != "READY" {
267 anyhow::bail!("Unexpected relay response: {}", ready);
268 }
269
270 Ok(())
271}
272
273async fn transfer_data(
274 session: &P2pSendSession,
275 cancel: &CancellationToken,
276 write_half: &mut tokio::io::WriteHalf<TcpStream>,
277) -> anyhow::Result<()> {
278 let mut file = File::open(&session.file_path).await?;
279 let mut buf = vec![0u8; CHUNK_SIZE];
280 let mut sent: u64 = 0;
281
282 loop {
283 if cancel.is_cancelled() {
284 anyhow::bail!("Send cancelled during transfer");
285 }
286
287 while session.paused.load(std::sync::atomic::Ordering::Relaxed) {
288 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
289 if cancel.is_cancelled() {
290 anyhow::bail!("Send cancelled while paused");
291 }
292 }
293
294 let n = file.read(&mut buf).await?;
295 if n == 0 {
296 break;
297 }
298
299 write_half.write_all(&buf[..n]).await?;
300 sent += n as u64;
301
302 *session.sent_bytes.lock().await = sent;
303 if session.file_size > 0 {
304 *session.progress.lock().await = (sent as f64 / session.file_size as f64) * 100.0;
305 }
306 }
307
308 write_half.flush().await?;
309
310 *session.progress.lock().await = 100.0;
311 *session.status.lock().await = "complete".to_string();
312
313 tracing::info!("[p2p] transfer complete: {} bytes sent", sent);
314 Ok(())
315}
316
317pub async fn run_sender(session: &P2pSendSession) -> anyhow::Result<()> {
318 let cancel = session.cancel_token.clone();
319
320 *session.status.lock().await = "connecting".to_string();
321
322 tracing::info!("[p2p] connecting to relay...");
323
324 let stream = connect_relay().await?;
325 let (read_half, mut write_half) = tokio::io::split(stream);
326 let mut reader = BufReader::new(read_half);
327
328 write_half
329 .write_all(format!("SEND {}\n", session.code).as_bytes())
330 .await?;
331 write_half.flush().await?;
332
333 let response = read_line(&mut reader).await?;
334 check_relay_error(&response)?;
335 if response != "WAIT" {
336 anyhow::bail!("Unexpected relay response: {}", response);
337 }
338
339 *session.status.lock().await = "waiting_for_receiver".to_string();
340 tracing::info!("[p2p] waiting for receiver... code: {}", session.code);
341
342 wait_for_receiver(&mut reader, &cancel).await?;
343
344 *session.status.lock().await = "connected".to_string();
345 tracing::info!("[p2p] receiver connected");
346
347 let header = format!("{}\n{}\n", session.file_name, session.file_size);
348 write_half.write_all(header.as_bytes()).await?;
349 write_half.flush().await?;
350
351 let ok_response = tokio::select! {
352 line = read_line(&mut reader) => line?,
353 _ = cancel.cancelled() => {
354 anyhow::bail!("Send cancelled while waiting for OK");
355 }
356 };
357
358 if ok_response != "OK" {
359 anyhow::bail!("Receiver rejected transfer: {}", ok_response);
360 }
361
362 *session.status.lock().await = "transferring".to_string();
363 tracing::info!(
364 "[p2p] transferring: {} ({} bytes)",
365 session.file_name,
366 session.file_size
367 );
368
369 transfer_data(session, &cancel, &mut write_half).await?;
370
371 Ok(())
372}