Skip to main content

mangofetch_core/platforms/p2p/
mod.rs

1pub mod words;
2
3use std::path::PathBuf;
4use std::sync::Arc;
5
6use anyhow::anyhow;
7use async_trait::async_trait;
8use tokio::fs::File;
9use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
10use tokio::net::TcpStream;
11use tokio::sync::mpsc;
12use tokio_util::sync::CancellationToken;
13
14use crate::models::media::{DownloadOptions, DownloadResult, MediaInfo, MediaType, VideoQuality};
15use crate::platforms::traits::PlatformDownloader;
16
/// Size in bytes (64 KiB) of the buffer used to move file data to and from
/// the relay socket, one chunk at a time.
const CHUNK_SIZE: usize = 1 << 16;
18
/// Returns the `host:port` of the relay server.
///
/// The `MANGOFETCH_RELAY` environment variable takes precedence; when it is
/// unset (or not valid Unicode) the built-in default relay is used.
fn relay_addr() -> String {
    match std::env::var("MANGOFETCH_RELAY") {
        Ok(configured) => configured,
        Err(_) => String::from("relay.tonho.wtf:9009"),
    }
}
22
23async fn connect_relay() -> anyhow::Result<TcpStream> {
24    let addr = relay_addr();
25    let stream = tokio::time::timeout(
26        std::time::Duration::from_secs(10),
27        TcpStream::connect(&addr),
28    )
29    .await
30    .map_err(|_| anyhow!("Connection to relay timed out (10s)"))?
31    .map_err(|e| anyhow!("Failed to connect to relay {}: {}", addr, e))?;
32    Ok(stream)
33}
34
35async fn read_line(
36    reader: &mut BufReader<tokio::io::ReadHalf<TcpStream>>,
37) -> anyhow::Result<String> {
38    let mut line = String::new();
39    let n = reader.read_line(&mut line).await?;
40    if n == 0 {
41        anyhow::bail!("Relay closed connection unexpectedly");
42    }
43    Ok(line.trim_end().to_string())
44}
45
46fn check_relay_error(line: &str) -> anyhow::Result<()> {
47    if let Some(err) = line.strip_prefix("ERROR ") {
48        anyhow::bail!("Relay error: {}", err);
49    }
50    Ok(())
51}
52
/// Stateless downloader for `p2p:` share codes; all connection state lives
/// inside the individual calls.
#[derive(Default)]
pub struct P2pDownloader;

impl P2pDownloader {
    /// Creates a downloader. Equivalent to [`Default::default`].
    pub fn new() -> Self {
        P2pDownloader
    }
}
66
67#[async_trait]
68impl PlatformDownloader for P2pDownloader {
69    fn name(&self) -> &str {
70        "p2p"
71    }
72
73    fn can_handle(&self, url: &str) -> bool {
74        if let Some(code) = url.strip_prefix("p2p:") {
75            return words::is_valid_code(code);
76        }
77        false
78    }
79
80    async fn get_media_info(&self, url: &str) -> anyhow::Result<MediaInfo> {
81        let code = url
82            .strip_prefix("p2p:")
83            .ok_or_else(|| anyhow!("Invalid P2P URL: {}", url))?;
84
85        if !words::is_valid_code(code) {
86            anyhow::bail!("Invalid share code: {}", code);
87        }
88
89        let title = format!("P2P Transfer ({})", &code[..code.len().min(30)]);
90
91        Ok(MediaInfo {
92            title,
93            author: "P2P Transfer".to_string(),
94            platform: "p2p".to_string(),
95            duration_seconds: None,
96            thumbnail_url: None,
97            available_qualities: vec![VideoQuality {
98                label: "Original".to_string(),
99                width: 0,
100                height: 0,
101                url: url.to_string(),
102                format: "p2p".to_string(),
103            }],
104            media_type: MediaType::Video,
105            file_size_bytes: None,
106        })
107    }
108
109    async fn download(
110        &self,
111        info: &MediaInfo,
112        opts: &DownloadOptions,
113        progress: mpsc::Sender<f64>,
114    ) -> anyhow::Result<DownloadResult> {
115        let url = match info.available_qualities.first() {
116            Some(q) => &q.url,
117            None => anyhow::bail!("No URL found in MediaInfo"),
118        };
119
120        let code = url
121            .strip_prefix("p2p:")
122            .ok_or_else(|| anyhow!("Invalid P2P URL"))?;
123
124        let _ = progress.send(-2.0).await;
125
126        tracing::info!("[p2p] connecting to relay for code: {}", code);
127
128        let stream = connect_relay().await?;
129        let (read_half, mut write_half) = tokio::io::split(stream);
130        let mut reader = BufReader::new(read_half);
131
132        write_half
133            .write_all(format!("RECV {}\n", code).as_bytes())
134            .await?;
135        write_half.flush().await?;
136
137        let response = read_line(&mut reader).await?;
138        check_relay_error(&response)?;
139        if response != "READY" {
140            anyhow::bail!("Unexpected relay response: {}", response);
141        }
142
143        tracing::info!("[p2p] connected to sender via relay");
144
145        let file_name = read_line(&mut reader).await?;
146        let file_size_str = read_line(&mut reader).await?;
147        let file_size: u64 = file_size_str
148            .parse()
149            .map_err(|_| anyhow!("Invalid file size from sender: {}", file_size_str))?;
150
151        tracing::info!("[p2p] receiving: {} ({} bytes)", file_name, file_size);
152
153        write_half.write_all(b"OK\n").await?;
154        write_half.flush().await?;
155
156        let _ = progress.send(0.0).await;
157
158        let sanitized = sanitize_filename::sanitize(&file_name);
159        let output_path = opts.output_dir.join(&sanitized);
160        if let Some(parent) = output_path.parent() {
161            tokio::fs::create_dir_all(parent).await?;
162        }
163
164        let mut file = File::create(&output_path).await?;
165        let mut received: u64 = 0;
166        let mut buf = vec![0u8; CHUNK_SIZE];
167
168        while received < file_size {
169            if opts.cancel_token.is_cancelled() {
170                let _ = tokio::fs::remove_file(&output_path).await;
171                anyhow::bail!("Download cancelled");
172            }
173
174            let to_read = ((file_size - received) as usize).min(CHUNK_SIZE);
175            let n = reader.read(&mut buf[..to_read]).await?;
176            if n == 0 {
177                break;
178            }
179
180            file.write_all(&buf[..n]).await?;
181            received += n as u64;
182
183            if file_size > 0 {
184                let pct = (received as f64 / file_size as f64) * 100.0;
185                let _ = progress.send(pct).await;
186            }
187        }
188
189        file.flush().await?;
190        drop(file);
191
192        let _ = progress.send(100.0).await;
193
194        tracing::info!("[p2p] download complete: {}", output_path.display());
195
196        Ok(DownloadResult {
197            file_path: output_path,
198            file_size_bytes: received,
199            duration_seconds: 0.0,
200            torrent_id: None,
201        })
202    }
203}
204
/// State of one outgoing transfer, created by `start_send` and driven by
/// `run_sender`. The `Arc`-wrapped fields are updated by `run_sender` while
/// it runs.
pub struct P2pSendSession {
    /// Share code generated by `words::generate_code`; sent to the relay as
    /// `SEND <code>`.
    pub code: String,
    /// Path of the file that `run_sender` opens and streams.
    pub file_path: PathBuf,
    /// Final component of `file_path` (lossily converted to UTF-8), or
    /// `"file"` when the path has none; announced to the receiver.
    pub file_name: String,
    /// File length in bytes as reported by the metadata read in `start_send`;
    /// announced to the receiver.
    pub file_size: u64,
    /// Cancelling this token makes `run_sender` return an error.
    pub cancel_token: CancellationToken,
    /// Transfer progress as a percentage; starts at `0.0`, set to `100.0` on
    /// completion.
    pub progress: Arc<tokio::sync::Mutex<f64>>,
    /// Textual phase. Values written in this file: `"waiting"`,
    /// `"connecting"`, `"waiting_for_receiver"`, `"connected"`,
    /// `"transferring"`, `"complete"`.
    pub status: Arc<tokio::sync::Mutex<String>>,
    /// Number of payload bytes written to the relay so far.
    pub sent_bytes: Arc<tokio::sync::Mutex<u64>>,
    /// While `true`, `run_sender` stops sending and re-checks the flag
    /// periodically; initialised to `false`.
    pub paused: Arc<std::sync::atomic::AtomicBool>,
}
216
217pub async fn start_send(
218    file_path: PathBuf,
219    cancel_token: CancellationToken,
220) -> anyhow::Result<P2pSendSession> {
221    let metadata = tokio::fs::metadata(&file_path)
222        .await
223        .map_err(|e| anyhow!("File not found: {}", e))?;
224
225    if !metadata.is_file() {
226        anyhow::bail!("Path is not a file: {}", file_path.display());
227    }
228
229    let file_size = metadata.len();
230    let file_name = file_path
231        .file_name()
232        .map(|n| n.to_string_lossy().to_string())
233        .unwrap_or_else(|| "file".to_string());
234
235    let code = words::generate_code();
236
237    tracing::info!("[p2p] share code generated: {}", code);
238
239    Ok(P2pSendSession {
240        code,
241        file_path,
242        file_name,
243        file_size,
244        cancel_token,
245        progress: Arc::new(tokio::sync::Mutex::new(0.0)),
246        status: Arc::new(tokio::sync::Mutex::new("waiting".to_string())),
247        sent_bytes: Arc::new(tokio::sync::Mutex::new(0)),
248        paused: Arc::new(std::sync::atomic::AtomicBool::new(false)),
249    })
250}
251
252pub async fn run_sender(session: &P2pSendSession) -> anyhow::Result<()> {
253    let cancel = session.cancel_token.clone();
254
255    *session.status.lock().await = "connecting".to_string();
256
257    tracing::info!("[p2p] connecting to relay...");
258
259    let stream = connect_relay().await?;
260    let (read_half, mut write_half) = tokio::io::split(stream);
261    let mut reader = BufReader::new(read_half);
262
263    write_half
264        .write_all(format!("SEND {}\n", session.code).as_bytes())
265        .await?;
266    write_half.flush().await?;
267
268    let response = read_line(&mut reader).await?;
269    check_relay_error(&response)?;
270    if response != "WAIT" {
271        anyhow::bail!("Unexpected relay response: {}", response);
272    }
273
274    *session.status.lock().await = "waiting_for_receiver".to_string();
275    tracing::info!("[p2p] waiting for receiver... code: {}", session.code);
276
277    let ready = tokio::select! {
278        line = read_line(&mut reader) => line?,
279        _ = cancel.cancelled() => {
280            anyhow::bail!("Send cancelled while waiting for receiver");
281        }
282    };
283
284    check_relay_error(&ready)?;
285    if ready != "READY" {
286        anyhow::bail!("Unexpected relay response: {}", ready);
287    }
288
289    *session.status.lock().await = "connected".to_string();
290    tracing::info!("[p2p] receiver connected");
291
292    let header = format!("{}\n{}\n", session.file_name, session.file_size);
293    write_half.write_all(header.as_bytes()).await?;
294    write_half.flush().await?;
295
296    let ok_response = tokio::select! {
297        line = read_line(&mut reader) => line?,
298        _ = cancel.cancelled() => {
299            anyhow::bail!("Send cancelled while waiting for OK");
300        }
301    };
302
303    if ok_response != "OK" {
304        anyhow::bail!("Receiver rejected transfer: {}", ok_response);
305    }
306
307    *session.status.lock().await = "transferring".to_string();
308    tracing::info!(
309        "[p2p] transferring: {} ({} bytes)",
310        session.file_name,
311        session.file_size
312    );
313
314    let mut file = File::open(&session.file_path).await?;
315    let mut buf = vec![0u8; CHUNK_SIZE];
316    let mut sent: u64 = 0;
317
318    loop {
319        if cancel.is_cancelled() {
320            anyhow::bail!("Send cancelled during transfer");
321        }
322
323        while session.paused.load(std::sync::atomic::Ordering::Relaxed) {
324            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
325            if cancel.is_cancelled() {
326                anyhow::bail!("Send cancelled while paused");
327            }
328        }
329
330        let n = file.read(&mut buf).await?;
331        if n == 0 {
332            break;
333        }
334
335        write_half.write_all(&buf[..n]).await?;
336        sent += n as u64;
337
338        *session.sent_bytes.lock().await = sent;
339        if session.file_size > 0 {
340            *session.progress.lock().await = (sent as f64 / session.file_size as f64) * 100.0;
341        }
342    }
343
344    write_half.flush().await?;
345    drop(write_half);
346
347    *session.progress.lock().await = 100.0;
348    *session.status.lock().await = "complete".to_string();
349
350    tracing::info!("[p2p] transfer complete: {} bytes sent", sent);
351
352    Ok(())
353}