Skip to main content

layer_client/
media.rs

1//! Media upload, download, and typed media wrappers (G-41 / G-42 / G-43 / G-44).
2//!
3//! ## Upload
4//! - [`Client::upload_file`]   — sequential (small files, < 10 MB)
5//! - [`Client::upload_file_concurrent`] — **G-41** parallel worker pool (big files)
//! - [`Client::upload_stream`] — reads an `AsyncRead` fully into memory, then calls `upload_file` (< 10 MB) or `upload_file_concurrent` (≥ 10 MB)
7//!
8//! ## Download
9//! - [`Client::iter_download`]          — chunk-by-chunk streaming
10//! - [`Client::download_media`]         — collect all bytes
11//! - [`Client::download_media_concurrent`] — **G-42** parallel multi-worker download
12//!
13//! ## Typed wrappers (G-43)
14//! [`Photo`], [`Document`], [`Sticker`] — ergonomic accessors over raw TL types.
15//!
16//! ## Downloadable trait (G-44)
//! [`Downloadable`] — implemented by Photo, Document, Sticker so you can pass
//! any of them to [`Client::download`], or feed the result of
//! `to_input_location()` to `iter_download` / `download_media`.
19
20use std::sync::Arc;
21
22use layer_tl_types as tl;
23use layer_tl_types::{Cursor, Deserializable};
24use tokio::io::AsyncRead;
25use tokio::io::AsyncReadExt;
26use tokio::sync::Mutex;
27
28use crate::{Client, InvocationError};
29
30// ─── Constants ────────────────────────────────────────────────────────────────
31
/// Size in bytes of each part sent by the upload paths (512 KiB).
pub const UPLOAD_CHUNK_SIZE: i32 = 512 * 1024;
/// Size in bytes requested per `upload.getFile` call by the download paths (512 KiB).
pub const DOWNLOAD_CHUNK_SIZE: i32 = 512 * 1024;
/// Payloads of at least this many bytes (10 MiB) are uploaded with
/// `SaveBigFilePart`; downloads of at least this size take the parallel path.
const BIG_FILE_THRESHOLD: usize = 10 * 1024 * 1024;
/// Number of tasks spawned by the concurrent upload and download paths.
const WORKER_COUNT: usize = 4;
39
40// ─── UploadedFile ─────────────────────────────────────────────────────────────
41
42/// A successfully uploaded file handle, ready to be sent as media.
43#[derive(Debug, Clone)]
44pub struct UploadedFile {
45    pub(crate) inner:     tl::enums::InputFile,
46    pub(crate) mime_type: String,
47    pub(crate) name:      String,
48}
49
50impl UploadedFile {
51    pub fn mime_type(&self) -> &str { &self.mime_type }
52    pub fn name(&self)      -> &str { &self.name }
53
54    /// Wrap as `InputMedia` for sending as a document.
55    pub fn as_document_media(&self) -> tl::enums::InputMedia {
56        tl::enums::InputMedia::UploadedDocument(tl::types::InputMediaUploadedDocument {
57            nosound_video:    false,
58            force_file:       false,
59            spoiler:          false,
60            file:             self.inner.clone(),
61            thumb:            None,
62            mime_type:        self.mime_type.clone(),
63            attributes:       vec![tl::enums::DocumentAttribute::Filename(
64                tl::types::DocumentAttributeFilename { file_name: self.name.clone() },
65            )],
66            stickers:         None,
67            ttl_seconds:      None,
68            video_cover:      None,
69            video_timestamp:  None,
70        })
71    }
72
73    /// Wrap as `InputMedia` for sending as a photo.
74    pub fn as_photo_media(&self) -> tl::enums::InputMedia {
75        tl::enums::InputMedia::UploadedPhoto(tl::types::InputMediaUploadedPhoto {
76            spoiler:     false,
77            live_photo:  false,
78            file:        self.inner.clone(),
79            stickers:    None,
80            ttl_seconds: None,
81            video:       None,
82        })
83    }
84}
85
86// ─── Downloadable trait (G-44) ────────────────────────────────────────────────
87
88/// Something that can be downloaded via [`Client::iter_download`].
89///
90/// Implemented by [`Photo`], [`Document`], and [`Sticker`].
91pub trait Downloadable {
92    /// Return the `InputFileLocation` needed for `upload.getFile`.
93    fn to_input_location(&self) -> Option<tl::enums::InputFileLocation>;
94
95    /// File size in bytes, if known (used to choose the concurrent path).
96    fn size(&self) -> Option<usize> { None }
97}
98
99// ─── Typed media wrappers (G-43) ──────────────────────────────────────────────
100
101/// Ergonomic wrapper over a Telegram photo.
102#[derive(Debug, Clone)]
103pub struct Photo {
104    pub raw: tl::types::Photo,
105}
106
107impl Photo {
108    pub fn from_raw(raw: tl::types::Photo) -> Self { Self { raw } }
109
110    /// Try to extract from a `MessageMedia` variant.
111    pub fn from_media(media: &tl::enums::MessageMedia) -> Option<Self> {
112        if let tl::enums::MessageMedia::Photo(mp) = media
113            && let Some(tl::enums::Photo::Photo(p)) = &mp.photo {
114            return Some(Self { raw: p.clone() });
115        }
116        None
117    }
118
119    pub fn id(&self)          -> i64  { self.raw.id }
120    pub fn access_hash(&self) -> i64  { self.raw.access_hash }
121    pub fn date(&self)        -> i32  { self.raw.date }
122    pub fn has_stickers(&self) -> bool { self.raw.has_stickers }
123
124    /// The largest available thumb type letter (e.g. `"s"`, `"m"`, `"x"`).
125    pub fn largest_thumb_type(&self) -> &str {
126        self.raw.sizes.iter()
127            .filter_map(|s| match s {
128                tl::enums::PhotoSize::PhotoSize(ps) => Some(ps.r#type.as_str()),
129                _ => None,
130            })
131            .next_back()
132            .unwrap_or("s")
133    }
134}
135
136impl Downloadable for Photo {
137    fn to_input_location(&self) -> Option<tl::enums::InputFileLocation> {
138        Some(tl::enums::InputFileLocation::InputPhotoFileLocation(
139            tl::types::InputPhotoFileLocation {
140                id:             self.raw.id,
141                access_hash:    self.raw.access_hash,
142                file_reference: self.raw.file_reference.clone(),
143                thumb_size:     self.largest_thumb_type().to_string(),
144            },
145        ))
146    }
147}
148
149/// Ergonomic wrapper over a Telegram document (file, video, audio, …).
150#[derive(Debug, Clone)]
151pub struct Document {
152    pub raw: tl::types::Document,
153}
154
155impl Document {
156    pub fn from_raw(raw: tl::types::Document) -> Self { Self { raw } }
157
158    /// Try to extract from a `MessageMedia` variant.
159    pub fn from_media(media: &tl::enums::MessageMedia) -> Option<Self> {
160        if let tl::enums::MessageMedia::Document(md) = media
161            && let Some(tl::enums::Document::Document(d)) = &md.document {
162            return Some(Self { raw: d.clone() });
163        }
164        None
165    }
166
167    pub fn id(&self)          -> i64    { self.raw.id }
168    pub fn access_hash(&self) -> i64    { self.raw.access_hash }
169    pub fn date(&self)        -> i32    { self.raw.date }
170    pub fn mime_type(&self)   -> &str   { &self.raw.mime_type }
171    pub fn size(&self)        -> i64    { self.raw.size }
172
173    /// File name from document attributes, if present.
174    pub fn file_name(&self) -> Option<&str> {
175        self.raw.attributes.iter().find_map(|a| match a {
176            tl::enums::DocumentAttribute::Filename(f) => Some(f.file_name.as_str()),
177            _ => None,
178        })
179    }
180
181    /// `true` if the document has animated sticker attributes.
182    pub fn is_animated(&self) -> bool {
183        self.raw.attributes.iter().any(|a| matches!(a, tl::enums::DocumentAttribute::Animated))
184    }
185}
186
187impl Downloadable for Document {
188    fn to_input_location(&self) -> Option<tl::enums::InputFileLocation> {
189        Some(tl::enums::InputFileLocation::InputDocumentFileLocation(
190            tl::types::InputDocumentFileLocation {
191                id:             self.raw.id,
192                access_hash:    self.raw.access_hash,
193                file_reference: self.raw.file_reference.clone(),
194                thumb_size:     String::new(),
195            },
196        ))
197    }
198
199    fn size(&self) -> Option<usize> {
200        Some(self.raw.size as usize)
201    }
202}
203
204/// Ergonomic wrapper over a Telegram sticker (a document with sticker attributes).
205#[derive(Debug, Clone)]
206pub struct Sticker {
207    pub inner: Document,
208}
209
210impl Sticker {
211    /// Wrap a document that carries `DocumentAttributeSticker`.
212    pub fn from_document(doc: Document) -> Option<Self> {
213        let has_sticker_attr = doc.raw.attributes.iter()
214            .any(|a| matches!(a, tl::enums::DocumentAttribute::Sticker(_)));
215        if has_sticker_attr { Some(Self { inner: doc }) } else { None }
216    }
217
218    /// Try to extract directly from `MessageMedia`.
219    pub fn from_media(media: &tl::enums::MessageMedia) -> Option<Self> {
220        Document::from_media(media).and_then(Self::from_document)
221    }
222
223    /// The emoji associated with the sticker.
224    pub fn emoji(&self) -> Option<&str> {
225        self.inner.raw.attributes.iter().find_map(|a| match a {
226            tl::enums::DocumentAttribute::Sticker(s) => Some(s.alt.as_str()),
227            _ => None,
228        })
229    }
230
231    /// `true` if this is a video sticker.
232    pub fn is_video(&self) -> bool {
233        self.inner.raw.attributes.iter()
234            .any(|a| matches!(a, tl::enums::DocumentAttribute::Video(_)))
235    }
236
237    pub fn id(&self)          -> i64  { self.inner.id() }
238    pub fn mime_type(&self)   -> &str { self.inner.mime_type() }
239}
240
241impl Downloadable for Sticker {
242    fn to_input_location(&self) -> Option<tl::enums::InputFileLocation> {
243        self.inner.to_input_location()
244    }
245    fn size(&self) -> Option<usize> { Some(self.inner.raw.size as usize) }
246}
247
248// ─── DownloadIter ─────────────────────────────────────────────────────────────
249
250/// Sequential chunk-by-chunk download iterator.
251pub struct DownloadIter {
252    client:  Client,
253    request: Option<tl::functions::upload::GetFile>,
254    done:    bool,
255}
256
257impl DownloadIter {
258    /// Set a custom chunk size (must be multiple of 4096, max 524288).
259    pub fn chunk_size(mut self, size: i32) -> Self {
260        if let Some(r) = &mut self.request { r.limit = size; }
261        self
262    }
263
264    /// Fetch the next chunk. Returns `None` when the download is complete.
265    pub async fn next(&mut self) -> Result<Option<Vec<u8>>, InvocationError> {
266        if self.done { return Ok(None); }
267        let req = match &self.request {
268            Some(r) => r.clone(),
269            None    => return Ok(None),
270        };
271        let body = self.client.rpc_call_raw_pub(&req).await?;
272        let mut cur = Cursor::from_slice(&body);
273        match tl::enums::upload::File::deserialize(&mut cur)? {
274            tl::enums::upload::File::File(f) => {
275                if (f.bytes.len() as i32) < req.limit {
276                    self.done = true;
277                    if f.bytes.is_empty() { return Ok(None); }
278                }
279                if let Some(r) = &mut self.request { r.offset += req.limit as i64; }
280                Ok(Some(f.bytes))
281            }
282            tl::enums::upload::File::CdnRedirect(_) => {
283                self.done = true;
284                Err(InvocationError::Deserialize("CDN redirect not supported".into()))
285            }
286        }
287    }
288}
289
290// ─── Client methods ───────────────────────────────────────────────────────────
291
292impl Client {
293    // ── Upload ───────────────────────────────────────────────────────────────
294
295    /// Upload bytes sequentially. For big files (≥ 10 MB) prefer
296    /// [`upload_file_concurrent`] which uses parallel workers.
297    pub async fn upload_file(
298        &self,
299        data:      &[u8],
300        name:      &str,
301        mime_type: &str,
302    ) -> Result<UploadedFile, InvocationError> {
303        let file_id     = crate::random_i64_pub();
304        let total       = data.len();
305        let big         = total >= BIG_FILE_THRESHOLD;
306        let part_size   = UPLOAD_CHUNK_SIZE as usize;
307        let total_parts = total.div_ceil(part_size) as i32;
308
309        for (part_num, chunk) in data.chunks(part_size).enumerate() {
310            if big {
311                self.rpc_call_raw_pub(&tl::functions::upload::SaveBigFilePart {
312                    file_id,
313                    file_part:        part_num as i32,
314                    file_total_parts: total_parts,
315                    bytes:            chunk.to_vec(),
316                }).await?;
317            } else {
318                self.rpc_call_raw_pub(&tl::functions::upload::SaveFilePart {
319                    file_id,
320                    file_part: part_num as i32,
321                    bytes:     chunk.to_vec(),
322                }).await?;
323            }
324        }
325
326        let inner = make_input_file(big, file_id, total_parts, name, data);
327        log::info!("[layer] uploaded '{}' ({} bytes, {} parts)", name, total, total_parts);
328        Ok(UploadedFile { inner, mime_type: mime_type.to_string(), name: name.to_string() })
329    }
330
331    /// **G-41** — Upload bytes using `WORKER_COUNT` (4) parallel workers.
332    ///
333    /// Only beneficial for big files (≥ 10 MB).  Falls through to sequential
334    /// for small files automatically.
335    pub async fn upload_file_concurrent(
336        &self,
337        data:      Arc<Vec<u8>>,
338        name:      &str,
339        mime_type: &str,
340    ) -> Result<UploadedFile, InvocationError> {
341        let total       = data.len();
342        let part_size   = UPLOAD_CHUNK_SIZE as usize;
343        let total_parts = total.div_ceil(part_size) as i32;
344
345        if total < BIG_FILE_THRESHOLD {
346            // Not big enough to benefit — fall back to sequential.
347            return self.upload_file(&data, name, mime_type).await;
348        }
349
350        let file_id    = crate::random_i64_pub();
351        let next_part  = Arc::new(Mutex::new(0i32));
352        let mut tasks  = tokio::task::JoinSet::new();
353
354        for _ in 0..WORKER_COUNT {
355            let client     = self.clone();
356            let data       = Arc::clone(&data);
357            let next_part  = Arc::clone(&next_part);
358
359            tasks.spawn(async move {
360                loop {
361                    let part_num = {
362                        let mut guard = next_part.lock().await;
363                        if *guard >= total_parts { break; }
364                        let n = *guard;
365                        *guard += 1;
366                        n
367                    };
368                    let start = part_num as usize * part_size;
369                    let end   = (start + part_size).min(data.len());
370                    let bytes = data[start..end].to_vec();
371
372                    client.rpc_call_raw_pub(&tl::functions::upload::SaveBigFilePart {
373                        file_id,
374                        file_part:        part_num,
375                        file_total_parts: total_parts,
376                        bytes,
377                    }).await?;
378                }
379                Ok::<(), InvocationError>(())
380            });
381        }
382
383        while let Some(res) = tasks.join_next().await {
384            res.map_err(|e| InvocationError::Io(
385                std::io::Error::other(e.to_string())
386            ))??;
387        }
388
389        let inner = tl::enums::InputFile::Big(tl::types::InputFileBig {
390            id:    file_id,
391            parts: total_parts,
392            name:  name.to_string(),
393        });
394        log::info!("[layer] concurrent-uploaded '{}' ({} bytes, {} parts, {} workers)",
395            name, total, total_parts, WORKER_COUNT);
396        Ok(UploadedFile { inner, mime_type: mime_type.to_string(), name: name.to_string() })
397    }
398
399    /// Upload from an `AsyncRead`. Reads fully into memory then uploads.
400    pub async fn upload_stream<R: AsyncRead + Unpin>(
401        &self,
402        reader:    &mut R,
403        name:      &str,
404        mime_type: &str,
405    ) -> Result<UploadedFile, InvocationError> {
406        let mut data = Vec::new();
407        reader.read_to_end(&mut data).await?;
408        if data.len() >= BIG_FILE_THRESHOLD {
409            self.upload_file_concurrent(Arc::new(data), name, mime_type).await
410        } else {
411            self.upload_file(&data, name, mime_type).await
412        }
413    }
414
415    // ── Send ─────────────────────────────────────────────────────────────────
416
417    /// Send a file as a document or photo to a chat.
418    pub async fn send_file(
419        &self,
420        peer:    tl::enums::Peer,
421        media:   tl::enums::InputMedia,
422        caption: &str,
423    ) -> Result<(), InvocationError> {
424        let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
425        let req = tl::functions::messages::SendMedia {
426            silent:                   false,
427            background:               false,
428            clear_draft:              false,
429            noforwards:               false,
430            update_stickersets_order: false,
431            invert_media:             false,
432            allow_paid_floodskip:     false,
433            peer:                     input_peer,
434            reply_to:                 None,
435            media,
436            message:                  caption.to_string(),
437            random_id:                crate::random_i64_pub(),
438            reply_markup:             None,
439            entities:                 None,
440            schedule_date:            None,
441            schedule_repeat_period:   None,
442            send_as:                  None,
443            quick_reply_shortcut:     None,
444            effect:                   None,
445            allow_paid_stars:         None,
446            suggested_post:           None,
447        };
448        self.rpc_call_raw_pub(&req).await?;
449        Ok(())
450    }
451
452    /// Send multiple files as an album.
453    pub async fn send_album(
454        &self,
455        peer:  tl::enums::Peer,
456        items: Vec<(tl::enums::InputMedia, String)>,
457    ) -> Result<(), InvocationError> {
458        let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
459        let multi: Vec<tl::enums::InputSingleMedia> = items.into_iter().map(|(media, caption)| {
460            tl::enums::InputSingleMedia::InputSingleMedia(tl::types::InputSingleMedia {
461                media,
462                random_id: crate::random_i64_pub(),
463                message:   caption,
464                entities:  None,
465            })
466        }).collect();
467        let req = tl::functions::messages::SendMultiMedia {
468            silent:                   false,
469            background:               false,
470            clear_draft:              false,
471            noforwards:               false,
472            update_stickersets_order: false,
473            invert_media:             false,
474            allow_paid_floodskip:     false,
475            peer:                     input_peer,
476            reply_to:                 None,
477            multi_media:              multi,
478            schedule_date:            None,
479            send_as:                  None,
480            quick_reply_shortcut:     None,
481            effect:                   None,
482            allow_paid_stars:         None,
483        };
484        self.rpc_call_raw_pub(&req).await?;
485        Ok(())
486    }
487
488    // ── Download ─────────────────────────────────────────────────────────────
489
490    /// Create a sequential chunk download iterator.
491    pub fn iter_download(&self, location: tl::enums::InputFileLocation) -> DownloadIter {
492        DownloadIter {
493            client:  self.clone(),
494            done:    false,
495            request: Some(tl::functions::upload::GetFile {
496                precise:       false,
497                cdn_supported: false,
498                location,
499                offset:        0,
500                limit:         DOWNLOAD_CHUNK_SIZE,
501            }),
502        }
503    }
504
505    /// Download all bytes of a media attachment at once (sequential).
506    pub async fn download_media(
507        &self,
508        location: tl::enums::InputFileLocation,
509    ) -> Result<Vec<u8>, InvocationError> {
510        let mut bytes = Vec::new();
511        let mut iter  = self.iter_download(location);
512        while let Some(chunk) = iter.next().await? {
513            bytes.extend_from_slice(&chunk);
514        }
515        Ok(bytes)
516    }
517
518    /// **G-42** — Download a file using `WORKER_COUNT` (4) parallel workers.
519    ///
520    /// `size` must be the exact byte size of the file (obtained from the
521    /// [`Downloadable::size`] accessor, or from the document's `size` field).
522    ///
523    /// Returns the full file bytes in order.
524    pub async fn download_media_concurrent(
525        &self,
526        location: tl::enums::InputFileLocation,
527        size:     usize,
528    ) -> Result<Vec<u8>, InvocationError> {
529        let chunk     = DOWNLOAD_CHUNK_SIZE as usize;
530        let n_parts   = size.div_ceil(chunk);
531        let next_part = Arc::new(Mutex::new(0usize));
532
533        // Channel: each worker sends (part_index, bytes)
534        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(usize, Vec<u8>)>();
535        let mut tasks = tokio::task::JoinSet::new();
536
537        for _ in 0..WORKER_COUNT {
538            let client    = self.clone();
539            let location  = location.clone();
540            let next_part = Arc::clone(&next_part);
541            let tx        = tx.clone();
542
543            tasks.spawn(async move {
544                loop {
545                    let part = {
546                        let mut g = next_part.lock().await;
547                        if *g >= n_parts { break; }
548                        let p = *g; *g += 1; p
549                    };
550                    let offset = (part * chunk) as i64;
551                    let req = tl::functions::upload::GetFile {
552                        precise:       true,
553                        cdn_supported: false,
554                        location:      location.clone(),
555                        offset,
556                        limit: DOWNLOAD_CHUNK_SIZE,
557                    };
558                    let raw = client.rpc_call_raw_pub(&req).await?;
559                    let mut cur = Cursor::from_slice(&raw);
560                    if let tl::enums::upload::File::File(f) =
561                        tl::enums::upload::File::deserialize(&mut cur)?
562                    {
563                        let _ = tx.send((part, f.bytes));
564                    }
565                }
566                Ok::<(), InvocationError>(())
567            });
568        }
569        drop(tx);
570
571        // Collect all parts
572        let mut parts: Vec<Option<Vec<u8>>> = (0..n_parts).map(|_| None).collect();
573        while let Some((idx, data)) = rx.recv().await {
574            if idx < parts.len() { parts[idx] = Some(data); }
575        }
576
577        // Join workers
578        while let Some(res) = tasks.join_next().await {
579            res.map_err(|e| InvocationError::Io(
580                std::io::Error::other(e.to_string())
581            ))??;
582        }
583
584        // Assemble in order
585        let mut out = Vec::with_capacity(size);
586        for part in parts.into_iter().flatten() { out.extend_from_slice(&part); }
587        out.truncate(size);
588        Ok(out)
589    }
590
591    /// Download any [`Downloadable`] item, automatically choosing concurrent
592    /// mode for files ≥ 10 MB (G-42 / G-44 integration).
593    pub async fn download<D: Downloadable>(
594        &self,
595        item: &D,
596    ) -> Result<Vec<u8>, InvocationError> {
597        let loc = item.to_input_location()
598            .ok_or_else(|| InvocationError::Deserialize("item has no download location".into()))?;
599        match item.size() {
600            Some(sz) if sz >= BIG_FILE_THRESHOLD =>
601                self.download_media_concurrent(loc, sz).await,
602            _ =>
603                self.download_media(loc).await,
604        }
605    }
606}
607
608// ─── InputFileLocation from IncomingMessage ───────────────────────────────────
609
610impl crate::update::IncomingMessage {
611    /// Get the download location for the media in this message, if any.
612    pub fn download_location(&self) -> Option<tl::enums::InputFileLocation> {
613        let media = match &self.raw {
614            tl::enums::Message::Message(m) => m.media.as_ref()?,
615            _ => return None,
616        };
617        if let Some(doc) = Document::from_media(media) {
618            return doc.to_input_location();
619        }
620        if let Some(photo) = Photo::from_media(media) {
621            return photo.to_input_location();
622        }
623        None
624    }
625}
626
627// ─── Helpers ─────────────────────────────────────────────────────────────────
628
629fn make_input_file(
630    big:         bool,
631    file_id:     i64,
632    total_parts: i32,
633    name:        &str,
634    data:        &[u8],
635) -> tl::enums::InputFile {
636    if big {
637        tl::enums::InputFile::Big(tl::types::InputFileBig {
638            id:    file_id,
639            parts: total_parts,
640            name:  name.to_string(),
641        })
642    } else {
643        let _ = data; // MD5 omitted — Telegram accepts empty checksum
644        tl::enums::InputFile::InputFile(tl::types::InputFile {
645            id:           file_id,
646            parts:        total_parts,
647            name:         name.to_string(),
648            md5_checksum: String::new(),
649        })
650    }
651}