Skip to main content

mangofetch_core/platforms/p2p/
mod.rs

1pub mod words;
2
3use std::path::PathBuf;
4use std::sync::Arc;
5
6use anyhow::anyhow;
7use async_trait::async_trait;
8use tokio::fs::File;
9use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
10use tokio::net::TcpStream;
11use tokio::sync::mpsc;
12use tokio_util::sync::CancellationToken;
13
14use crate::models::media::{DownloadOptions, DownloadResult, MediaInfo, MediaType, VideoQuality};
15use crate::platforms::traits::PlatformDownloader;
16
/// Size in bytes (64 KiB) of the scratch buffer used for each read/write step
/// when file data is streamed to or from the relay connection.
const CHUNK_SIZE: usize = 64 * 1024;
18
/// Returns the `host:port` address of the relay server.
///
/// The `MANGOFETCH_RELAY` environment variable overrides the built-in default.
/// Surrounding whitespace is trimmed, and a value that is unset, not valid
/// Unicode, or blank falls back to the default instead of producing an
/// unusable (empty) address.
fn relay_addr() -> String {
    const DEFAULT_RELAY: &str = "relay.tonho.wtf:9009";

    match std::env::var("MANGOFETCH_RELAY") {
        Ok(value) if !value.trim().is_empty() => value.trim().to_string(),
        // Unset, non-Unicode, or blank: use the default relay.
        _ => DEFAULT_RELAY.to_string(),
    }
}
22
23async fn connect_relay() -> anyhow::Result<TcpStream> {
24    let addr = relay_addr();
25    let stream = tokio::time::timeout(
26        std::time::Duration::from_secs(10),
27        TcpStream::connect(&addr),
28    )
29    .await
30    .map_err(|_| anyhow!("Connection to relay timed out (10s)"))?
31    .map_err(|e| anyhow!("Failed to connect to relay {}: {}", addr, e))?;
32    Ok(stream)
33}
34
35async fn read_line(
36    reader: &mut BufReader<tokio::io::ReadHalf<TcpStream>>,
37) -> anyhow::Result<String> {
38    let mut line = String::new();
39    let n = reader.read_line(&mut line).await?;
40    if n == 0 {
41        anyhow::bail!("Relay closed connection unexpectedly");
42    }
43    Ok(line.trim_end().to_string())
44}
45
46fn check_relay_error(line: &str) -> anyhow::Result<()> {
47    if let Some(err) = line.strip_prefix("ERROR ") {
48        anyhow::bail!("Relay error: {}", err);
49    }
50    Ok(())
51}
52
/// Stateless downloader for `p2p:<code>` transfers that go through the relay.
///
/// The type carries no data; `new()` and `default()` are equivalent.
#[derive(Default)]
pub struct P2pDownloader;

impl P2pDownloader {
    /// Creates a new downloader.
    pub fn new() -> Self {
        P2pDownloader
    }
}
66
67#[async_trait]
68impl PlatformDownloader for P2pDownloader {
69    fn name(&self) -> &str {
70        "p2p"
71    }
72
73    fn can_handle(&self, url: &str) -> bool {
74        if let Some(code) = url.strip_prefix("p2p:") {
75            return words::is_valid_code(code);
76        }
77        false
78    }
79
80    async fn get_media_info(&self, url: &str) -> anyhow::Result<MediaInfo> {
81        let code = url
82            .strip_prefix("p2p:")
83            .ok_or_else(|| anyhow!("Invalid P2P URL: {}", url))?;
84
85        if !words::is_valid_code(code) {
86            anyhow::bail!("Invalid share code: {}", code);
87        }
88
89        let title = format!("P2P Transfer ({})", &code[..code.len().min(30)]);
90
91        Ok(MediaInfo {
92            title,
93            author: "P2P Transfer".to_string(),
94            platform: "p2p".to_string(),
95            duration_seconds: None,
96            thumbnail_url: None,
97            available_qualities: vec![VideoQuality {
98                label: "Original".to_string(),
99                width: 0,
100                height: 0,
101                url: url.to_string(),
102                format: "p2p".to_string(),
103            }],
104            media_type: MediaType::Video,
105            file_size_bytes: None,
106        })
107    }
108
109    async fn download(
110        &self,
111        info: &MediaInfo,
112        opts: &DownloadOptions,
113        progress: mpsc::Sender<f64>,
114    ) -> anyhow::Result<DownloadResult> {
115        let url = match info.available_qualities.first() {
116            Some(q) => &q.url,
117            None => anyhow::bail!("No URL found in MediaInfo"),
118        };
119
120        let code = url
121            .strip_prefix("p2p:")
122            .ok_or_else(|| anyhow!("Invalid P2P URL"))?;
123
124        let _ = progress.send(-2.0).await;
125
126        tracing::info!("[p2p] connecting to relay for code: {}", code);
127
128        let stream = connect_relay().await?;
129        let (read_half, mut write_half) = tokio::io::split(stream);
130        let mut reader = BufReader::new(read_half);
131
132        write_half
133            .write_all(format!("RECV {}\n", code).as_bytes())
134            .await?;
135        write_half.flush().await?;
136
137        let response = read_line(&mut reader).await?;
138        check_relay_error(&response)?;
139        if response != "READY" {
140            anyhow::bail!("Unexpected relay response: {}", response);
141        }
142
143        tracing::info!("[p2p] connected to sender via relay");
144
145        let file_name = read_line(&mut reader).await?;
146        let file_size_str = read_line(&mut reader).await?;
147        let file_size: u64 = file_size_str
148            .parse()
149            .map_err(|_| anyhow!("Invalid file size from sender: {}", file_size_str))?;
150
151        tracing::info!("[p2p] receiving: {} ({} bytes)", file_name, file_size);
152
153        write_half.write_all(b"OK\n").await?;
154        write_half.flush().await?;
155
156        let _ = progress.send(0.0).await;
157
158        let sanitized = sanitize_filename::sanitize(&file_name);
159        let output_path = opts.output_dir.join(&sanitized);
160        if let Some(parent) = output_path.parent() {
161            tokio::fs::create_dir_all(parent).await?;
162        }
163
164        let mut file = File::create(&output_path).await?;
165        let mut received: u64 = 0;
166        let mut buf = vec![0u8; CHUNK_SIZE];
167
168        while received < file_size {
169            if opts.cancel_token.is_cancelled() {
170                let _ = tokio::fs::remove_file(&output_path).await;
171                anyhow::bail!("Download cancelled");
172            }
173
174            let to_read = ((file_size - received) as usize).min(CHUNK_SIZE);
175            let n = reader.read(&mut buf[..to_read]).await?;
176            if n == 0 {
177                break;
178            }
179
180            file.write_all(&buf[..n]).await?;
181            received += n as u64;
182
183            if file_size > 0 {
184                let pct = (received as f64 / file_size as f64) * 100.0;
185                let _ = progress.send(pct).await;
186            }
187        }
188
189        file.flush().await?;
190        drop(file);
191
192        let _ = progress.send(100.0).await;
193
194        tracing::info!("[p2p] download complete: {}", output_path.display());
195
196        Ok(DownloadResult {
197            file_path: output_path,
198            file_size_bytes: received,
199            duration_seconds: 0.0,
200            torrent_id: None,
201        })
202    }
203}
204
/// State of one outgoing P2P transfer, created by `start_send` and driven by
/// `run_sender`.
///
/// The `Arc`-wrapped fields are updated while the transfer runs, so a clone of
/// each `Arc` can be used to observe or control the session from elsewhere.
pub struct P2pSendSession {
    /// Share code sent to the relay in the `SEND <code>` request.
    pub code: String,
    /// Path of the local file that is read and streamed.
    pub file_path: PathBuf,
    /// File name announced to the receiver in the transfer header.
    pub file_name: String,
    /// File size in bytes, captured when the session was created and announced
    /// to the receiver in the transfer header.
    pub file_size: u64,
    /// Cancels the send; checked while waiting for the peer and during transfer.
    pub cancel_token: CancellationToken,
    /// Transfer progress as a percentage (starts at 0.0, set to 100.0 on completion).
    pub progress: Arc<tokio::sync::Mutex<f64>>,
    /// Current phase as a string: "waiting", "connecting",
    /// "waiting_for_receiver", "connected", "transferring" or "complete".
    pub status: Arc<tokio::sync::Mutex<String>>,
    /// Number of bytes written to the connection so far.
    pub sent_bytes: Arc<tokio::sync::Mutex<u64>>,
    /// While `true`, the transfer loop stops reading and polls this flag
    /// every 100 ms until it is cleared or the send is cancelled.
    pub paused: Arc<std::sync::atomic::AtomicBool>,
}
216
217pub async fn start_send(
218    file_path: PathBuf,
219    cancel_token: CancellationToken,
220) -> anyhow::Result<P2pSendSession> {
221    let metadata = tokio::fs::metadata(&file_path)
222        .await
223        .map_err(|e| anyhow!("File not found: {}", e))?;
224
225    if !metadata.is_file() {
226        anyhow::bail!("Path is not a file: {}", file_path.display());
227    }
228
229    let file_size = metadata.len();
230    let file_name = file_path
231        .file_name()
232        .map(|n| n.to_string_lossy().to_string())
233        .unwrap_or_else(|| "file".to_string());
234
235    let code = words::generate_code();
236
237    tracing::info!("[p2p] share code generated: {}", code);
238
239    Ok(P2pSendSession {
240        code,
241        file_path,
242        file_name,
243        file_size,
244        cancel_token,
245        progress: Arc::new(tokio::sync::Mutex::new(0.0)),
246        status: Arc::new(tokio::sync::Mutex::new("waiting".to_string())),
247        sent_bytes: Arc::new(tokio::sync::Mutex::new(0)),
248        paused: Arc::new(std::sync::atomic::AtomicBool::new(false)),
249    })
250}
251
252async fn wait_for_receiver(
253    reader: &mut BufReader<tokio::io::ReadHalf<TcpStream>>,
254    cancel: &CancellationToken,
255) -> anyhow::Result<()> {
256    let ready = tokio::select! {
257        line = read_line(reader) => line?,
258        _ = cancel.cancelled() => {
259            anyhow::bail!("Send cancelled while waiting for receiver");
260        }
261    };
262
263    check_relay_error(&ready)?;
264    if ready != "READY" {
265        anyhow::bail!("Unexpected relay response: {}", ready);
266    }
267
268    Ok(())
269}
270
271async fn transfer_data(
272    session: &P2pSendSession,
273    cancel: &CancellationToken,
274    write_half: &mut tokio::io::WriteHalf<TcpStream>,
275) -> anyhow::Result<()> {
276    let mut file = File::open(&session.file_path).await?;
277    let mut buf = vec![0u8; CHUNK_SIZE];
278    let mut sent: u64 = 0;
279
280    loop {
281        if cancel.is_cancelled() {
282            anyhow::bail!("Send cancelled during transfer");
283        }
284
285        while session.paused.load(std::sync::atomic::Ordering::Relaxed) {
286            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
287            if cancel.is_cancelled() {
288                anyhow::bail!("Send cancelled while paused");
289            }
290        }
291
292        let n = file.read(&mut buf).await?;
293        if n == 0 {
294            break;
295        }
296
297        write_half.write_all(&buf[..n]).await?;
298        sent += n as u64;
299
300        *session.sent_bytes.lock().await = sent;
301        if session.file_size > 0 {
302            *session.progress.lock().await = (sent as f64 / session.file_size as f64) * 100.0;
303        }
304    }
305
306    write_half.flush().await?;
307
308    *session.progress.lock().await = 100.0;
309    *session.status.lock().await = "complete".to_string();
310
311    tracing::info!("[p2p] transfer complete: {} bytes sent", sent);
312    Ok(())
313}
314
315pub async fn run_sender(session: &P2pSendSession) -> anyhow::Result<()> {
316    let cancel = session.cancel_token.clone();
317
318    *session.status.lock().await = "connecting".to_string();
319
320    tracing::info!("[p2p] connecting to relay...");
321
322    let stream = connect_relay().await?;
323    let (read_half, mut write_half) = tokio::io::split(stream);
324    let mut reader = BufReader::new(read_half);
325
326    write_half
327        .write_all(format!("SEND {}\n", session.code).as_bytes())
328        .await?;
329    write_half.flush().await?;
330
331    let response = read_line(&mut reader).await?;
332    check_relay_error(&response)?;
333    if response != "WAIT" {
334        anyhow::bail!("Unexpected relay response: {}", response);
335    }
336
337    *session.status.lock().await = "waiting_for_receiver".to_string();
338    tracing::info!("[p2p] waiting for receiver... code: {}", session.code);
339
340    wait_for_receiver(&mut reader, &cancel).await?;
341
342    *session.status.lock().await = "connected".to_string();
343    tracing::info!("[p2p] receiver connected");
344
345    let header = format!("{}\n{}\n", session.file_name, session.file_size);
346    write_half.write_all(header.as_bytes()).await?;
347    write_half.flush().await?;
348
349    let ok_response = tokio::select! {
350        line = read_line(&mut reader) => line?,
351        _ = cancel.cancelled() => {
352            anyhow::bail!("Send cancelled while waiting for OK");
353        }
354    };
355
356    if ok_response != "OK" {
357        anyhow::bail!("Receiver rejected transfer: {}", ok_response);
358    }
359
360    *session.status.lock().await = "transferring".to_string();
361    tracing::info!(
362        "[p2p] transferring: {} ({} bytes)",
363        session.file_name,
364        session.file_size
365    );
366
367    transfer_data(session, &cancel, &mut write_half).await?;
368
369    Ok(())
370}