Skip to main content

mangofetch_core/platforms/p2p/
mod.rs

1pub mod words;
2
3use std::path::PathBuf;
4use std::sync::Arc;
5
6use anyhow::anyhow;
7use async_trait::async_trait;
8use tokio::fs::File;
9use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
10use tokio::net::TcpStream;
11use tokio::sync::mpsc;
12use tokio_util::sync::CancellationToken;
13
14use crate::models::media::{DownloadOptions, DownloadResult, MediaInfo, MediaType, VideoQuality};
15use crate::platforms::traits::PlatformDownloader;
16
17const CHUNK_SIZE: usize = 64 * 1024;
18
19fn relay_addr() -> String {
20    std::env::var("MANGOFETCH_RELAY").unwrap_or_else(|_| "relay.tonho.wtf:9009".to_string())
21}
22
23async fn connect_relay() -> anyhow::Result<TcpStream> {
24    let addr = relay_addr();
25    let stream = tokio::time::timeout(
26        std::time::Duration::from_secs(10),
27        TcpStream::connect(&addr),
28    )
29    .await
30    .map_err(|_| anyhow!("Connection to relay timed out (10s)"))?
31    .map_err(|e| anyhow!("Failed to connect to relay {}: {}", addr, e))?;
32    Ok(stream)
33}
34
35async fn read_line(
36    reader: &mut BufReader<tokio::io::ReadHalf<TcpStream>>,
37) -> anyhow::Result<String> {
38    let mut line = String::new();
39    let n = reader.read_line(&mut line).await?;
40    if n == 0 {
41        anyhow::bail!("Relay closed connection unexpectedly");
42    }
43    Ok(line.trim_end().to_string())
44}
45
46fn check_relay_error(line: &str) -> anyhow::Result<()> {
47    if let Some(err) = line.strip_prefix("ERROR ") {
48        anyhow::bail!("Relay error: {}", err);
49    }
50    Ok(())
51}
52
53pub struct P2pDownloader;
54
55impl Default for P2pDownloader {
56    fn default() -> Self {
57        Self::new()
58    }
59}
60
61impl P2pDownloader {
62    pub fn new() -> Self {
63        Self
64    }
65}
66
67#[async_trait]
68impl PlatformDownloader for P2pDownloader {
69    fn name(&self) -> &str {
70        "p2p"
71    }
72
73    fn can_handle(&self, url: &str) -> bool {
74        if let Some(code) = url.strip_prefix("p2p:") {
75            return words::is_valid_code(code);
76        }
77        false
78    }
79
80    async fn get_media_info(&self, url: &str) -> anyhow::Result<MediaInfo> {
81        let code = url
82            .strip_prefix("p2p:")
83            .ok_or_else(|| anyhow!("Invalid P2P URL: {}", url))?;
84
85        if !words::is_valid_code(code) {
86            anyhow::bail!("Invalid share code: {}", code);
87        }
88
89        let title = format!("P2P Transfer ({})", &code[..code.len().min(30)]);
90
91        Ok(MediaInfo {
92            title,
93            author: "P2P Transfer".to_string(),
94            platform: "p2p".to_string(),
95            duration_seconds: None,
96            thumbnail_url: None,
97            available_qualities: vec![VideoQuality {
98                label: "Original".to_string(),
99                width: 0,
100                height: 0,
101                url: url.to_string(),
102                format: "p2p".to_string(),
103
104                filesize_bytes: None,
105            }],
106            media_type: MediaType::Video,
107            file_size_bytes: None,
108        })
109    }
110
111    async fn download(
112        &self,
113        info: &MediaInfo,
114        opts: &DownloadOptions,
115        progress: mpsc::Sender<f64>,
116    ) -> anyhow::Result<DownloadResult> {
117        let url = match info.available_qualities.first() {
118            Some(q) => &q.url,
119            None => anyhow::bail!("No URL found in MediaInfo"),
120        };
121
122        let code = url
123            .strip_prefix("p2p:")
124            .ok_or_else(|| anyhow!("Invalid P2P URL"))?;
125
126        let _ = progress.send(-2.0).await;
127
128        tracing::info!("[p2p] connecting to relay for code: {}", code);
129
130        let stream = connect_relay().await?;
131        let (read_half, mut write_half) = tokio::io::split(stream);
132        let mut reader = BufReader::new(read_half);
133
134        write_half
135            .write_all(format!("RECV {}\n", code).as_bytes())
136            .await?;
137        write_half.flush().await?;
138
139        let response = read_line(&mut reader).await?;
140        check_relay_error(&response)?;
141        if response != "READY" {
142            anyhow::bail!("Unexpected relay response: {}", response);
143        }
144
145        tracing::info!("[p2p] connected to sender via relay");
146
147        let file_name = read_line(&mut reader).await?;
148        let file_size_str = read_line(&mut reader).await?;
149        let file_size: u64 = file_size_str
150            .parse()
151            .map_err(|_| anyhow!("Invalid file size from sender: {}", file_size_str))?;
152
153        tracing::info!("[p2p] receiving: {} ({} bytes)", file_name, file_size);
154
155        write_half.write_all(b"OK\n").await?;
156        write_half.flush().await?;
157
158        let _ = progress.send(0.0).await;
159
160        let sanitized = sanitize_filename::sanitize(&file_name);
161        let output_path = opts.output_dir.join(&sanitized);
162        if let Some(parent) = output_path.parent() {
163            tokio::fs::create_dir_all(parent).await?;
164        }
165
166        let mut file = File::create(&output_path).await?;
167        let mut received: u64 = 0;
168        let mut buf = vec![0u8; CHUNK_SIZE];
169
170        while received < file_size {
171            if opts.cancel_token.is_cancelled() {
172                let _ = tokio::fs::remove_file(&output_path).await;
173                anyhow::bail!("Download cancelled");
174            }
175
176            let to_read = ((file_size - received) as usize).min(CHUNK_SIZE);
177            let n = reader.read(&mut buf[..to_read]).await?;
178            if n == 0 {
179                break;
180            }
181
182            file.write_all(&buf[..n]).await?;
183            received += n as u64;
184
185            if file_size > 0 {
186                let pct = (received as f64 / file_size as f64) * 100.0;
187                let _ = progress.send(pct).await;
188            }
189        }
190
191        file.flush().await?;
192        drop(file);
193
194        let _ = progress.send(100.0).await;
195
196        tracing::info!("[p2p] download complete: {}", output_path.display());
197
198        Ok(DownloadResult {
199            file_path: output_path,
200            file_size_bytes: received,
201            duration_seconds: 0.0,
202            torrent_id: None,
203        })
204    }
205}
206
207pub struct P2pSendSession {
208    pub code: String,
209    pub file_path: PathBuf,
210    pub file_name: String,
211    pub file_size: u64,
212    pub cancel_token: CancellationToken,
213    pub progress: Arc<tokio::sync::Mutex<f64>>,
214    pub status: Arc<tokio::sync::Mutex<String>>,
215    pub sent_bytes: Arc<tokio::sync::Mutex<u64>>,
216    pub paused: Arc<std::sync::atomic::AtomicBool>,
217}
218
219pub async fn start_send(
220    file_path: PathBuf,
221    cancel_token: CancellationToken,
222) -> anyhow::Result<P2pSendSession> {
223    let metadata = tokio::fs::metadata(&file_path)
224        .await
225        .map_err(|e| anyhow!("File not found: {}", e))?;
226
227    if !metadata.is_file() {
228        anyhow::bail!("Path is not a file: {}", file_path.display());
229    }
230
231    let file_size = metadata.len();
232    let file_name = file_path
233        .file_name()
234        .map(|n| n.to_string_lossy().to_string())
235        .unwrap_or_else(|| "file".to_string());
236
237    let code = words::generate_code();
238
239    tracing::info!("[p2p] share code generated: {}", code);
240
241    Ok(P2pSendSession {
242        code,
243        file_path,
244        file_name,
245        file_size,
246        cancel_token,
247        progress: Arc::new(tokio::sync::Mutex::new(0.0)),
248        status: Arc::new(tokio::sync::Mutex::new("waiting".to_string())),
249        sent_bytes: Arc::new(tokio::sync::Mutex::new(0)),
250        paused: Arc::new(std::sync::atomic::AtomicBool::new(false)),
251    })
252}
253
254async fn wait_for_receiver(
255    reader: &mut BufReader<tokio::io::ReadHalf<TcpStream>>,
256    cancel: &CancellationToken,
257) -> anyhow::Result<()> {
258    let ready = tokio::select! {
259        line = read_line(reader) => line?,
260        _ = cancel.cancelled() => {
261            anyhow::bail!("Send cancelled while waiting for receiver");
262        }
263    };
264
265    check_relay_error(&ready)?;
266    if ready != "READY" {
267        anyhow::bail!("Unexpected relay response: {}", ready);
268    }
269
270    Ok(())
271}
272
273async fn transfer_data(
274    session: &P2pSendSession,
275    cancel: &CancellationToken,
276    write_half: &mut tokio::io::WriteHalf<TcpStream>,
277) -> anyhow::Result<()> {
278    let mut file = File::open(&session.file_path).await?;
279    let mut buf = vec![0u8; CHUNK_SIZE];
280    let mut sent: u64 = 0;
281
282    loop {
283        if cancel.is_cancelled() {
284            anyhow::bail!("Send cancelled during transfer");
285        }
286
287        while session.paused.load(std::sync::atomic::Ordering::Relaxed) {
288            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
289            if cancel.is_cancelled() {
290                anyhow::bail!("Send cancelled while paused");
291            }
292        }
293
294        let n = file.read(&mut buf).await?;
295        if n == 0 {
296            break;
297        }
298
299        write_half.write_all(&buf[..n]).await?;
300        sent += n as u64;
301
302        *session.sent_bytes.lock().await = sent;
303        if session.file_size > 0 {
304            *session.progress.lock().await = (sent as f64 / session.file_size as f64) * 100.0;
305        }
306    }
307
308    write_half.flush().await?;
309
310    *session.progress.lock().await = 100.0;
311    *session.status.lock().await = "complete".to_string();
312
313    tracing::info!("[p2p] transfer complete: {} bytes sent", sent);
314    Ok(())
315}
316
317pub async fn run_sender(session: &P2pSendSession) -> anyhow::Result<()> {
318    let cancel = session.cancel_token.clone();
319
320    *session.status.lock().await = "connecting".to_string();
321
322    tracing::info!("[p2p] connecting to relay...");
323
324    let stream = connect_relay().await?;
325    let (read_half, mut write_half) = tokio::io::split(stream);
326    let mut reader = BufReader::new(read_half);
327
328    write_half
329        .write_all(format!("SEND {}\n", session.code).as_bytes())
330        .await?;
331    write_half.flush().await?;
332
333    let response = read_line(&mut reader).await?;
334    check_relay_error(&response)?;
335    if response != "WAIT" {
336        anyhow::bail!("Unexpected relay response: {}", response);
337    }
338
339    *session.status.lock().await = "waiting_for_receiver".to_string();
340    tracing::info!("[p2p] waiting for receiver... code: {}", session.code);
341
342    wait_for_receiver(&mut reader, &cancel).await?;
343
344    *session.status.lock().await = "connected".to_string();
345    tracing::info!("[p2p] receiver connected");
346
347    let header = format!("{}\n{}\n", session.file_name, session.file_size);
348    write_half.write_all(header.as_bytes()).await?;
349    write_half.flush().await?;
350
351    let ok_response = tokio::select! {
352        line = read_line(&mut reader) => line?,
353        _ = cancel.cancelled() => {
354            anyhow::bail!("Send cancelled while waiting for OK");
355        }
356    };
357
358    if ok_response != "OK" {
359        anyhow::bail!("Receiver rejected transfer: {}", ok_response);
360    }
361
362    *session.status.lock().await = "transferring".to_string();
363    tracing::info!(
364        "[p2p] transferring: {} ({} bytes)",
365        session.file_name,
366        session.file_size
367    );
368
369    transfer_data(session, &cancel, &mut write_half).await?;
370
371    Ok(())
372}