grammers_client/client/
files.rs

1// Copyright 2020 - developers of the `grammers` project.
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9use crate::types::{photo_sizes::PhotoSize, Downloadable, Media, Uploaded};
10use crate::utils::generate_random_id;
11use crate::Client;
12use futures_util::stream::{FuturesUnordered, StreamExt as _};
13use grammers_mtsender::InvocationError;
14use grammers_tl_types as tl;
15use std::{io::SeekFrom, path::Path, sync::Arc};
16use tokio::sync::mpsc::unbounded_channel;
17use tokio::{
18    fs,
19    io::{self, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
20    sync::Mutex as AsyncMutex,
21};
22
/// Smallest chunk size, in bytes, accepted by [`DownloadIter::chunk_size`]. Chunk sizes must
/// also be a multiple of this value.
pub const MIN_CHUNK_SIZE: i32 = 4 * 1024;
/// Largest chunk size, in bytes, accepted by [`DownloadIter::chunk_size`]. It is the default
/// download request limit and the size of every uploaded file part.
pub const MAX_CHUNK_SIZE: i32 = 512 * 1024;
// RPC error code that makes the download code retry the request in the datacenter carried by
// the error value.
const FILE_MIGRATE_ERROR: i32 = 303;
// Size threshold, in bytes: larger documents are downloaded by several workers, and larger
// streams are uploaded with `SaveBigFilePart` instead of `SaveFilePart`.
const BIG_FILE_SIZE: usize = 10 * 1024 * 1024;
// Number of concurrent workers used for big downloads and big uploads.
const WORKER_COUNT: usize = 4;
28
/// Chunk-by-chunk downloader for a single file.
///
/// Created through [`Client::iter_download`]. Call [`DownloadIter::next`] repeatedly until it
/// returns `Ok(None)`.
pub struct DownloadIter {
    // Handle used to invoke the download requests.
    client: Client,
    // Set once the final chunk has been produced; `next` returns `Ok(None)` afterwards.
    done: bool,
    // Request sent for every chunk; its `offset` advances by `limit` after each chunk.
    request: tl::functions::upload::GetFile,
    // Inline bytes of a photo size that needs no network request. When present they are
    // yielded as the only chunk and `request` is never sent.
    photo_size_data: Option<Vec<u8>>,
}
35
36impl DownloadIter {
37    fn new(client: &Client, downloadable: &Downloadable) -> Self {
38        match downloadable {
39            Downloadable::PhotoSize(photo_size)
40                if !matches!(photo_size, PhotoSize::Size(_) | PhotoSize::Progressive(_)) =>
41            {
42                Self::new_from_photo_size(client, photo_size.data())
43            }
44            _ => {
45                Self::new_from_file_location(client, downloadable.to_raw_input_location().unwrap())
46            }
47        }
48    }
49
50    fn new_from_file_location(client: &Client, location: tl::enums::InputFileLocation) -> Self {
51        // TODO let users tweak all the options from the request
52        // TODO cdn support
53        Self {
54            client: client.clone(),
55            done: false,
56            request: tl::functions::upload::GetFile {
57                precise: false,
58                cdn_supported: false,
59                location,
60                offset: 0,
61                limit: MAX_CHUNK_SIZE,
62            },
63            photo_size_data: None,
64        }
65    }
66
67    fn new_from_photo_size(client: &Client, data: Vec<u8>) -> Self {
68        Self {
69            client: client.clone(),
70            done: false,
71            // request is not needed, so fake one
72            request: tl::functions::upload::GetFile {
73                precise: false,
74                cdn_supported: false,
75                location: tl::enums::InputFileLocation::InputPhotoFileLocation(
76                    tl::types::InputPhotoFileLocation {
77                        id: 0,
78                        access_hash: 0,
79                        file_reference: vec![],
80                        thumb_size: "".to_string(),
81                    },
82                ),
83                offset: 0,
84                limit: MAX_CHUNK_SIZE,
85            },
86            photo_size_data: Some(data),
87        }
88    }
89
90    /// Changes the chunk size, in bytes, used to make requests. Useful if you only need to get a
91    /// small part of a file. By default, `MAX_CHUNK_SIZE` is used.
92    ///
93    /// # Panics
94    ///
95    /// Panics if `size` is not divisible by `MIN_CHUNK_SIZE`, or if `size` is not in contained in
96    /// the range `MIN_CHUNK_SIZE..=MAX_CHUNK_SIZE`.
97    pub fn chunk_size(mut self, size: i32) -> Self {
98        assert!((MIN_CHUNK_SIZE..=MAX_CHUNK_SIZE).contains(&size) && size % MIN_CHUNK_SIZE == 0);
99        self.request.limit = size;
100        self
101    }
102
103    /// Skips `n` chunks to start downloading a different offset from the file. If you want to
104    /// skip less data, modify the `chunk_size` before calling this method, and then reset it to
105    /// any value you want.
106    pub fn skip_chunks(mut self, n: i32) -> Self {
107        self.request.offset += (self.request.limit * n) as i64;
108        self
109    }
110
111    /// Fetch and return the next chunk.
112    pub async fn next(&mut self) -> Result<Option<Vec<u8>>, InvocationError> {
113        if self.done {
114            return Ok(None);
115        }
116
117        if let Some(data) = &self.photo_size_data {
118            self.done = true;
119            return Ok(Some(data.clone()));
120        }
121
122        use tl::enums::upload::File;
123
124        // TODO handle maybe FILEREF_UPGRADE_NEEDED
125        let mut dc: Option<u32> = None;
126        loop {
127            let result = match dc.take() {
128                None => self.client.invoke(&self.request).await,
129                Some(dc) => self.client.invoke_in_dc(&self.request, dc as i32).await,
130            };
131
132            break match result {
133                Ok(File::File(f)) => {
134                    if f.bytes.len() < self.request.limit as usize {
135                        self.done = true;
136                        if f.bytes.is_empty() {
137                            return Ok(None);
138                        }
139                    }
140
141                    self.request.offset += self.request.limit as i64;
142                    Ok(Some(f.bytes))
143                }
144                Ok(File::CdnRedirect(_)) => {
145                    panic!("API returned File::CdnRedirect even though cdn_supported = false");
146                }
147                Err(InvocationError::Rpc(err)) if err.code == FILE_MIGRATE_ERROR => {
148                    dc = err.value;
149                    continue;
150                }
151                Err(e) => Err(e),
152            };
153        }
154    }
155}
156
157/// Method implementations related to uploading or downloading files.
158impl Client {
    /// Returns a new iterator over the contents of a media document that will be downloaded.
    ///
    /// Nothing is requested until [`DownloadIter::next`] is called on the returned value.
    ///
    /// # Panics
    ///
    /// Panics if `downloadable` must be fetched from the server but has no raw input
    /// location.
    ///
    /// # Examples
    ///
    /// ```
    /// # async fn f(downloadable: grammers_client::types::Downloadable, client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
    /// let mut file_bytes = Vec::new();
    /// let mut download = client.iter_download(&downloadable);
    ///
    /// while let Some(chunk) = download.next().await? {
    ///     file_bytes.extend(chunk);
    /// }
    ///
    /// // The file is now downloaded in-memory, inside `file_bytes`!
    /// # Ok(())
    /// # }
    /// ```
    pub fn iter_download(&self, downloadable: &Downloadable) -> DownloadIter {
        DownloadIter::new(self, downloadable)
    }
179
180    /// Downloads a media file into the specified path.
181    ///
182    /// If the file already exists, it will be overwritten.
183    ///
184    /// This is a small wrapper around [`Client::iter_download`] for the common case of
185    /// wanting to save the file locally.
186    ///
187    /// # Examples
188    ///
189    /// ```
190    /// # async fn f(downloadable: grammers_client::types::Downloadable, client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
191    /// client.download_media(&downloadable, "/home/username/photos/holidays.jpg").await?;
192    /// # Ok(())
193    /// # }
194    /// ```
195    pub async fn download_media<P: AsRef<Path>>(
196        &self,
197        downloadable: &Downloadable,
198        path: P,
199    ) -> Result<(), io::Error> {
200        // Concurrent downloader
201        if let Downloadable::Media(media) = downloadable {
202            if let Media::Document(document) = media {
203                if document.size() as usize > BIG_FILE_SIZE {
204                    return self
205                        .download_media_concurrent(media, path, WORKER_COUNT)
206                        .await;
207                }
208            }
209        }
210
211        if downloadable.to_raw_input_location().is_none() {
212            let data = match downloadable {
213                Downloadable::PhotoSize(photo_size)
214                    if !matches!(photo_size, PhotoSize::Size(_) | PhotoSize::Progressive(_)) =>
215                {
216                    photo_size.data()
217                }
218                _ => {
219                    return Err(io::Error::new(
220                        io::ErrorKind::Other,
221                        "media not downloadable",
222                    ));
223                }
224            };
225
226            if !data.is_empty() {
227                let mut file = fs::File::create(&path).await.unwrap();
228                file.write_all(&data).await.unwrap();
229            }
230
231            return Ok(());
232        }
233
234        let mut download = self.iter_download(downloadable);
235        Client::load(path, &mut download).await
236    }
237
238    async fn load<P: AsRef<Path>>(path: P, download: &mut DownloadIter) -> Result<(), io::Error> {
239        let mut file = fs::File::create(path).await?;
240        while let Some(chunk) = download
241            .next()
242            .await
243            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?
244        {
245            file.write_all(&chunk).await?;
246        }
247
248        Ok(())
249    }
250
251    /// Downloads a `Document` to specified path using multiple connections
252    async fn download_media_concurrent<P: AsRef<Path>>(
253        &self,
254        media: &Media,
255        path: P,
256        workers: usize,
257    ) -> Result<(), io::Error> {
258        let document = match media {
259            Media::Document(document) => document,
260            _ => panic!("Only Document type is supported!"),
261        };
262        let size = document.size();
263        let location = media.to_raw_input_location().unwrap();
264        // Allocate
265        let mut file = fs::File::create(path).await?;
266        file.set_len(size as u64).await?;
267        file.seek(SeekFrom::Start(0)).await?;
268
269        // Start workers
270        let (tx, mut rx) = unbounded_channel();
271        let part_index = Arc::new(tokio::sync::Mutex::new(0));
272        let mut tasks = vec![];
273        for _ in 0..workers {
274            let location = location.clone();
275            let tx = tx.clone();
276            let part_index = part_index.clone();
277            let client = self.clone();
278            let task = tokio::task::spawn(async move {
279                let mut retry_offset = None;
280                let mut dc = None;
281                loop {
282                    // Calculate file offset
283                    let offset: i64 = {
284                        if let Some(offset) = retry_offset {
285                            retry_offset = None;
286                            offset
287                        } else {
288                            let mut i = part_index.lock().await;
289                            *i += 1;
290                            (MAX_CHUNK_SIZE * (*i - 1)) as i64
291                        }
292                    };
293                    if offset > size {
294                        break;
295                    }
296                    // Fetch from telegram
297                    let request = &tl::functions::upload::GetFile {
298                        precise: true,
299                        cdn_supported: false,
300                        location: location.clone(),
301                        offset,
302                        limit: MAX_CHUNK_SIZE,
303                    };
304                    let res = match dc {
305                        None => client.invoke(request).await,
306                        Some(dc) => client.invoke_in_dc(request, dc as i32).await,
307                    };
308                    match res {
309                        Ok(tl::enums::upload::File::File(file)) => {
310                            tx.send((offset as u64, file.bytes)).unwrap();
311                        }
312                        Ok(tl::enums::upload::File::CdnRedirect(_)) => {
313                            panic!(
314                                "API returned File::CdnRedirect even though cdn_supported = false"
315                            );
316                        }
317                        Err(InvocationError::Rpc(err)) => {
318                            if err.code == FILE_MIGRATE_ERROR {
319                                dc = err.value;
320                                retry_offset = Some(offset);
321                                continue;
322                            }
323                            return Err(InvocationError::Rpc(err));
324                        }
325                        Err(e) => return Err(e),
326                    }
327                }
328                Ok::<(), InvocationError>(())
329            });
330            tasks.push(task);
331        }
332        drop(tx);
333
334        // File write loop
335        let mut pos = 0;
336        while let Some((offset, data)) = rx.recv().await {
337            if offset != pos {
338                file.seek(SeekFrom::Start(offset)).await?;
339            }
340            file.write_all(&data).await?;
341            pos = offset + data.len() as u64;
342        }
343
344        // Check if all tasks finished succesfully
345        for task in tasks {
346            task.await?
347                .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
348        }
349        Ok(())
350    }
351
352    /// Uploads an async stream to Telegram servers.
353    ///
354    /// The file is not sent to any chat, but can be used as media when sending messages for a
355    /// certain period of time (less than a day). You can use this uploaded file multiple times.
356    ///
357    /// Refer to [`InputMessage`] to learn more uses for `uploaded_file`.
358    ///
359    /// The stream size must be known beforehand. If this is not possible, you might need to
360    /// process the entire async stream to determine its size, and then use the size and the
361    /// downloaded buffer.
362    ///
363    /// The stream size may be less or equal to the actual length of the stream, but not more.
364    /// If it's less, you may continue to read from the stream after the method returns.
365    /// If it's more, the method will fail because it does not have enough data to read.
366    ///
367    /// Note that Telegram uses the file name in certain methods, for example, to make sure the
368    /// file is an image when trying to use send the file as photo media, so it is important that
369    /// the file name at least uses the right extension, even if the name is a dummy value.
370    /// If the input file name is empty, the non-empty dummy value "a" will be used instead.
371    /// Because it has no extension, you may not be able to use the file in certain methods.
372    ///
373    /// # Examples
374    ///
375    /// ```
376    /// # async fn f(chat: grammers_client::types::Chat, client: grammers_client::Client, some_vec: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
377    /// use grammers_client::InputMessage;
378    ///
379    /// // In-memory `Vec<u8>` buffers can be used as async streams
380    /// let size = some_vec.len();
381    /// let mut stream = std::io::Cursor::new(some_vec);
382    /// let uploaded_file = client.upload_stream(&mut stream, size, "sleep.jpg".to_string()).await?;
383    ///
384    /// client.send_message(&chat, InputMessage::text("Zzz...").photo(uploaded_file)).await?;
385    /// # Ok(())
386    /// # }
387    /// ```
388    ///
389    /// [`InputMessage`]: crate::types::InputMessage
390    pub async fn upload_stream<S: AsyncRead + Unpin>(
391        &self,
392        stream: &mut S,
393        size: usize,
394        name: String,
395    ) -> Result<Uploaded, io::Error> {
396        let file_id = generate_random_id();
397        let name = if name.is_empty() {
398            "a".to_string()
399        } else {
400            name
401        };
402
403        let big_file = size > BIG_FILE_SIZE;
404        let parts = PartStream::new(stream, size);
405        let total_parts = parts.total_parts();
406
407        if big_file {
408            let parts = Arc::new(parts);
409            let mut tasks = FuturesUnordered::new();
410            for _ in 0..WORKER_COUNT {
411                let handle = self.clone();
412                let parts = Arc::clone(&parts);
413                let task = async move {
414                    while let Some((part, bytes)) = parts.next_part().await? {
415                        let ok = handle
416                            .invoke(&tl::functions::upload::SaveBigFilePart {
417                                file_id,
418                                file_part: part,
419                                file_total_parts: total_parts,
420                                bytes,
421                            })
422                            .await
423                            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
424
425                        if !ok {
426                            return Err(io::Error::new(
427                                io::ErrorKind::Other,
428                                "server failed to store uploaded data",
429                            ));
430                        }
431                    }
432                    Ok(())
433                };
434                tasks.push(task);
435            }
436
437            while let Some(res) = tasks.next().await {
438                res?;
439            }
440
441            Ok(Uploaded::from_raw(
442                tl::types::InputFileBig {
443                    id: file_id,
444                    parts: total_parts,
445                    name,
446                }
447                .into(),
448            ))
449        } else {
450            let mut md5 = md5::Context::new();
451            while let Some((part, bytes)) = parts.next_part().await? {
452                md5.consume(&bytes);
453                let ok = self
454                    .invoke(&tl::functions::upload::SaveFilePart {
455                        file_id,
456                        file_part: part,
457                        bytes,
458                    })
459                    .await
460                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
461
462                if !ok {
463                    return Err(io::Error::new(
464                        io::ErrorKind::Other,
465                        "server failed to store uploaded data",
466                    ));
467                }
468            }
469            Ok(Uploaded::from_raw(
470                tl::types::InputFile {
471                    id: file_id,
472                    parts: total_parts,
473                    name,
474                    md5_checksum: format!("{:x}", md5.compute()),
475                }
476                .into(),
477            ))
478        }
479    }
480
481    /// Uploads a local file to Telegram servers.
482    ///
483    /// The file is not sent to any chat, but can be used as media when sending messages for a
484    /// certain period of time (less than a day). You can use this uploaded file multiple times.
485    ///
486    /// Refer to [`InputMessage`] to learn more uses for `uploaded_file`.
487    ///
488    /// If you need more control over the uploaded data, such as performing only a partial upload
489    /// or with a different name, use [`Client::upload_stream`] instead.
490    ///
491    /// # Examples
492    ///
493    /// ```
494    /// # async fn f(chat: grammers_client::types::Chat, client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
495    /// use grammers_client::InputMessage;
496    ///
497    /// let uploaded_file = client.upload_file("/home/username/photos/holidays.jpg").await?;
498    ///
499    /// client.send_message(&chat, InputMessage::text("Check this out!").photo(uploaded_file)).await?;
500    /// # Ok(())
501    /// # }
502    /// ```
503    ///
504    /// [`InputMessage`]: crate::InputMessage
505    pub async fn upload_file<P: AsRef<Path>>(&self, path: P) -> Result<Uploaded, io::Error> {
506        let path = path.as_ref();
507
508        let mut file = fs::File::open(path).await?;
509        let size = file.seek(SeekFrom::End(0)).await? as usize;
510        file.seek(SeekFrom::Start(0)).await?;
511
512        // File name will only be `None` for `..` path, and directories cannot be uploaded as
513        // files, so it's fine to unwrap.
514        let name = path.file_name().unwrap().to_string_lossy().to_string();
515
516        self.upload_stream(&mut file, size, name).await
517    }
518}
519
520struct PartStreamInner<'a, S: AsyncRead + Unpin> {
521    stream: &'a mut S,
522    current_part: i32,
523}
524
525struct PartStream<'a, S: AsyncRead + Unpin> {
526    inner: AsyncMutex<PartStreamInner<'a, S>>,
527    total_parts: i32,
528}
529
530impl<'a, S: AsyncRead + Unpin> PartStream<'a, S> {
531    fn new(stream: &'a mut S, size: usize) -> Self {
532        let total_parts = ((size + MAX_CHUNK_SIZE as usize - 1) / MAX_CHUNK_SIZE as usize) as i32;
533        Self {
534            inner: AsyncMutex::new(PartStreamInner {
535                stream,
536                current_part: 0,
537            }),
538            total_parts,
539        }
540    }
541
542    fn total_parts(&self) -> i32 {
543        self.total_parts
544    }
545
546    async fn next_part(&self) -> Result<Option<(i32, Vec<u8>)>, io::Error> {
547        let mut lock = self.inner.lock().await;
548        if lock.current_part >= self.total_parts {
549            return Ok(None);
550        }
551        let mut read = 0;
552        let mut buffer = vec![0; MAX_CHUNK_SIZE as usize];
553
554        while read != buffer.len() {
555            let n = lock.stream.read(&mut buffer[read..]).await?;
556            if n == 0 {
557                if lock.current_part == self.total_parts - 1 {
558                    break;
559                } else {
560                    return Err(io::Error::new(
561                        io::ErrorKind::UnexpectedEof,
562                        "reached EOF before reaching the last file part",
563                    ));
564                }
565            }
566            read += n;
567        }
568
569        let bytes = if read == buffer.len() {
570            buffer
571        } else {
572            buffer[..read].to_vec()
573        };
574
575        let res = Ok(Some((lock.current_part, bytes)));
576        lock.current_part += 1;
577        res
578    }
579}