grammers_client/client/files.rs
1// Copyright 2020 - developers of the `grammers` project.
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9use crate::types::{photo_sizes::PhotoSize, Downloadable, Media, Uploaded};
10use crate::utils::generate_random_id;
11use crate::Client;
12use futures_util::stream::{FuturesUnordered, StreamExt as _};
13use grammers_mtsender::InvocationError;
14use grammers_tl_types as tl;
15use std::{io::SeekFrom, path::Path, sync::Arc};
16use tokio::sync::mpsc::unbounded_channel;
17use tokio::{
18 fs,
19 io::{self, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
20 sync::Mutex as AsyncMutex,
21};
22
/// Smallest chunk size, in bytes, accepted by [`DownloadIter::chunk_size`].
/// Every chunk size must also be a multiple of this value.
pub const MIN_CHUNK_SIZE: i32 = 4 * 1024;
/// Largest chunk size, in bytes, accepted by [`DownloadIter::chunk_size`].
/// It is also the default download chunk size and the size of each uploaded part.
pub const MAX_CHUNK_SIZE: i32 = 128 * MIN_CHUNK_SIZE;
/// RPC error code whose value names the datacenter the request must be retried in.
const FILE_MIGRATE_ERROR: i32 = 303;
/// Files strictly larger than this many bytes use the big-file upload method and the
/// concurrent downloader.
const BIG_FILE_SIZE: usize = 10 * 1024 * 1024;
/// Number of concurrent workers used for big-file transfers.
const WORKER_COUNT: usize = 4;
28
29pub struct DownloadIter {
30 client: Client,
31 done: bool,
32 request: tl::functions::upload::GetFile,
33 photo_size_data: Option<Vec<u8>>,
34}
35
36impl DownloadIter {
37 fn new(client: &Client, downloadable: &Downloadable) -> Self {
38 match downloadable {
39 Downloadable::PhotoSize(photo_size)
40 if !matches!(photo_size, PhotoSize::Size(_) | PhotoSize::Progressive(_)) =>
41 {
42 Self::new_from_photo_size(client, photo_size.data())
43 }
44 _ => {
45 Self::new_from_file_location(client, downloadable.to_raw_input_location().unwrap())
46 }
47 }
48 }
49
50 fn new_from_file_location(client: &Client, location: tl::enums::InputFileLocation) -> Self {
51 // TODO let users tweak all the options from the request
52 // TODO cdn support
53 Self {
54 client: client.clone(),
55 done: false,
56 request: tl::functions::upload::GetFile {
57 precise: false,
58 cdn_supported: false,
59 location,
60 offset: 0,
61 limit: MAX_CHUNK_SIZE,
62 },
63 photo_size_data: None,
64 }
65 }
66
67 fn new_from_photo_size(client: &Client, data: Vec<u8>) -> Self {
68 Self {
69 client: client.clone(),
70 done: false,
71 // request is not needed, so fake one
72 request: tl::functions::upload::GetFile {
73 precise: false,
74 cdn_supported: false,
75 location: tl::enums::InputFileLocation::InputPhotoFileLocation(
76 tl::types::InputPhotoFileLocation {
77 id: 0,
78 access_hash: 0,
79 file_reference: vec![],
80 thumb_size: "".to_string(),
81 },
82 ),
83 offset: 0,
84 limit: MAX_CHUNK_SIZE,
85 },
86 photo_size_data: Some(data),
87 }
88 }
89
90 /// Changes the chunk size, in bytes, used to make requests. Useful if you only need to get a
91 /// small part of a file. By default, `MAX_CHUNK_SIZE` is used.
92 ///
93 /// # Panics
94 ///
95 /// Panics if `size` is not divisible by `MIN_CHUNK_SIZE`, or if `size` is not in contained in
96 /// the range `MIN_CHUNK_SIZE..=MAX_CHUNK_SIZE`.
97 pub fn chunk_size(mut self, size: i32) -> Self {
98 assert!((MIN_CHUNK_SIZE..=MAX_CHUNK_SIZE).contains(&size) && size % MIN_CHUNK_SIZE == 0);
99 self.request.limit = size;
100 self
101 }
102
103 /// Skips `n` chunks to start downloading a different offset from the file. If you want to
104 /// skip less data, modify the `chunk_size` before calling this method, and then reset it to
105 /// any value you want.
106 pub fn skip_chunks(mut self, n: i32) -> Self {
107 self.request.offset += (self.request.limit * n) as i64;
108 self
109 }
110
111 /// Fetch and return the next chunk.
112 pub async fn next(&mut self) -> Result<Option<Vec<u8>>, InvocationError> {
113 if self.done {
114 return Ok(None);
115 }
116
117 if let Some(data) = &self.photo_size_data {
118 self.done = true;
119 return Ok(Some(data.clone()));
120 }
121
122 use tl::enums::upload::File;
123
124 // TODO handle maybe FILEREF_UPGRADE_NEEDED
125 let mut dc: Option<u32> = None;
126 loop {
127 let result = match dc.take() {
128 None => self.client.invoke(&self.request).await,
129 Some(dc) => self.client.invoke_in_dc(&self.request, dc as i32).await,
130 };
131
132 break match result {
133 Ok(File::File(f)) => {
134 if f.bytes.len() < self.request.limit as usize {
135 self.done = true;
136 if f.bytes.is_empty() {
137 return Ok(None);
138 }
139 }
140
141 self.request.offset += self.request.limit as i64;
142 Ok(Some(f.bytes))
143 }
144 Ok(File::CdnRedirect(_)) => {
145 panic!("API returned File::CdnRedirect even though cdn_supported = false");
146 }
147 Err(InvocationError::Rpc(err)) if err.code == FILE_MIGRATE_ERROR => {
148 dc = err.value;
149 continue;
150 }
151 Err(e) => Err(e),
152 };
153 }
154 }
155}
156
157/// Method implementations related to uploading or downloading files.
158impl Client {
159 /// Returns a new iterator over the contents of a media document that will be downloaded.
160 ///
161 /// # Examples
162 ///
163 /// ```
164 /// # async fn f(downloadable: grammers_client::types::Downloadable, client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
165 /// let mut file_bytes = Vec::new();
166 /// let mut download = client.iter_download(&downloadable);
167 ///
168 /// while let Some(chunk) = download.next().await? {
169 /// file_bytes.extend(chunk);
170 /// }
171 ///
172 /// // The file is now downloaded in-memory, inside `file_bytes`!
173 /// # Ok(())
174 /// # }
175 /// ```
176 pub fn iter_download(&self, downloadable: &Downloadable) -> DownloadIter {
177 DownloadIter::new(self, downloadable)
178 }
179
180 /// Downloads a media file into the specified path.
181 ///
182 /// If the file already exists, it will be overwritten.
183 ///
184 /// This is a small wrapper around [`Client::iter_download`] for the common case of
185 /// wanting to save the file locally.
186 ///
187 /// # Examples
188 ///
189 /// ```
190 /// # async fn f(downloadable: grammers_client::types::Downloadable, client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
191 /// client.download_media(&downloadable, "/home/username/photos/holidays.jpg").await?;
192 /// # Ok(())
193 /// # }
194 /// ```
195 pub async fn download_media<P: AsRef<Path>>(
196 &self,
197 downloadable: &Downloadable,
198 path: P,
199 ) -> Result<(), io::Error> {
200 // Concurrent downloader
201 if let Downloadable::Media(media) = downloadable {
202 if let Media::Document(document) = media {
203 if document.size() as usize > BIG_FILE_SIZE {
204 return self
205 .download_media_concurrent(media, path, WORKER_COUNT)
206 .await;
207 }
208 }
209 }
210
211 if downloadable.to_raw_input_location().is_none() {
212 let data = match downloadable {
213 Downloadable::PhotoSize(photo_size)
214 if !matches!(photo_size, PhotoSize::Size(_) | PhotoSize::Progressive(_)) =>
215 {
216 photo_size.data()
217 }
218 _ => {
219 return Err(io::Error::new(
220 io::ErrorKind::Other,
221 "media not downloadable",
222 ));
223 }
224 };
225
226 if !data.is_empty() {
227 let mut file = fs::File::create(&path).await.unwrap();
228 file.write_all(&data).await.unwrap();
229 }
230
231 return Ok(());
232 }
233
234 let mut download = self.iter_download(downloadable);
235 Client::load(path, &mut download).await
236 }
237
238 async fn load<P: AsRef<Path>>(path: P, download: &mut DownloadIter) -> Result<(), io::Error> {
239 let mut file = fs::File::create(path).await?;
240 while let Some(chunk) = download
241 .next()
242 .await
243 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?
244 {
245 file.write_all(&chunk).await?;
246 }
247
248 Ok(())
249 }
250
251 /// Downloads a `Document` to specified path using multiple connections
252 async fn download_media_concurrent<P: AsRef<Path>>(
253 &self,
254 media: &Media,
255 path: P,
256 workers: usize,
257 ) -> Result<(), io::Error> {
258 let document = match media {
259 Media::Document(document) => document,
260 _ => panic!("Only Document type is supported!"),
261 };
262 let size = document.size();
263 let location = media.to_raw_input_location().unwrap();
264 // Allocate
265 let mut file = fs::File::create(path).await?;
266 file.set_len(size as u64).await?;
267 file.seek(SeekFrom::Start(0)).await?;
268
269 // Start workers
270 let (tx, mut rx) = unbounded_channel();
271 let part_index = Arc::new(tokio::sync::Mutex::new(0));
272 let mut tasks = vec![];
273 for _ in 0..workers {
274 let location = location.clone();
275 let tx = tx.clone();
276 let part_index = part_index.clone();
277 let client = self.clone();
278 let task = tokio::task::spawn(async move {
279 let mut retry_offset = None;
280 let mut dc = None;
281 loop {
282 // Calculate file offset
283 let offset: i64 = {
284 if let Some(offset) = retry_offset {
285 retry_offset = None;
286 offset
287 } else {
288 let mut i = part_index.lock().await;
289 *i += 1;
290 (MAX_CHUNK_SIZE * (*i - 1)) as i64
291 }
292 };
293 if offset > size {
294 break;
295 }
296 // Fetch from telegram
297 let request = &tl::functions::upload::GetFile {
298 precise: true,
299 cdn_supported: false,
300 location: location.clone(),
301 offset,
302 limit: MAX_CHUNK_SIZE,
303 };
304 let res = match dc {
305 None => client.invoke(request).await,
306 Some(dc) => client.invoke_in_dc(request, dc as i32).await,
307 };
308 match res {
309 Ok(tl::enums::upload::File::File(file)) => {
310 tx.send((offset as u64, file.bytes)).unwrap();
311 }
312 Ok(tl::enums::upload::File::CdnRedirect(_)) => {
313 panic!(
314 "API returned File::CdnRedirect even though cdn_supported = false"
315 );
316 }
317 Err(InvocationError::Rpc(err)) => {
318 if err.code == FILE_MIGRATE_ERROR {
319 dc = err.value;
320 retry_offset = Some(offset);
321 continue;
322 }
323 return Err(InvocationError::Rpc(err));
324 }
325 Err(e) => return Err(e),
326 }
327 }
328 Ok::<(), InvocationError>(())
329 });
330 tasks.push(task);
331 }
332 drop(tx);
333
334 // File write loop
335 let mut pos = 0;
336 while let Some((offset, data)) = rx.recv().await {
337 if offset != pos {
338 file.seek(SeekFrom::Start(offset)).await?;
339 }
340 file.write_all(&data).await?;
341 pos = offset + data.len() as u64;
342 }
343
344 // Check if all tasks finished succesfully
345 for task in tasks {
346 task.await?
347 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
348 }
349 Ok(())
350 }
351
352 /// Uploads an async stream to Telegram servers.
353 ///
354 /// The file is not sent to any chat, but can be used as media when sending messages for a
355 /// certain period of time (less than a day). You can use this uploaded file multiple times.
356 ///
357 /// Refer to [`InputMessage`] to learn more uses for `uploaded_file`.
358 ///
359 /// The stream size must be known beforehand. If this is not possible, you might need to
360 /// process the entire async stream to determine its size, and then use the size and the
361 /// downloaded buffer.
362 ///
363 /// The stream size may be less or equal to the actual length of the stream, but not more.
364 /// If it's less, you may continue to read from the stream after the method returns.
365 /// If it's more, the method will fail because it does not have enough data to read.
366 ///
367 /// Note that Telegram uses the file name in certain methods, for example, to make sure the
368 /// file is an image when trying to use send the file as photo media, so it is important that
369 /// the file name at least uses the right extension, even if the name is a dummy value.
370 /// If the input file name is empty, the non-empty dummy value "a" will be used instead.
371 /// Because it has no extension, you may not be able to use the file in certain methods.
372 ///
373 /// # Examples
374 ///
375 /// ```
376 /// # async fn f(chat: grammers_client::types::Chat, client: grammers_client::Client, some_vec: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
377 /// use grammers_client::InputMessage;
378 ///
379 /// // In-memory `Vec<u8>` buffers can be used as async streams
380 /// let size = some_vec.len();
381 /// let mut stream = std::io::Cursor::new(some_vec);
382 /// let uploaded_file = client.upload_stream(&mut stream, size, "sleep.jpg".to_string()).await?;
383 ///
384 /// client.send_message(&chat, InputMessage::text("Zzz...").photo(uploaded_file)).await?;
385 /// # Ok(())
386 /// # }
387 /// ```
388 ///
389 /// [`InputMessage`]: crate::types::InputMessage
390 pub async fn upload_stream<S: AsyncRead + Unpin>(
391 &self,
392 stream: &mut S,
393 size: usize,
394 name: String,
395 ) -> Result<Uploaded, io::Error> {
396 let file_id = generate_random_id();
397 let name = if name.is_empty() {
398 "a".to_string()
399 } else {
400 name
401 };
402
403 let big_file = size > BIG_FILE_SIZE;
404 let parts = PartStream::new(stream, size);
405 let total_parts = parts.total_parts();
406
407 if big_file {
408 let parts = Arc::new(parts);
409 let mut tasks = FuturesUnordered::new();
410 for _ in 0..WORKER_COUNT {
411 let handle = self.clone();
412 let parts = Arc::clone(&parts);
413 let task = async move {
414 while let Some((part, bytes)) = parts.next_part().await? {
415 let ok = handle
416 .invoke(&tl::functions::upload::SaveBigFilePart {
417 file_id,
418 file_part: part,
419 file_total_parts: total_parts,
420 bytes,
421 })
422 .await
423 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
424
425 if !ok {
426 return Err(io::Error::new(
427 io::ErrorKind::Other,
428 "server failed to store uploaded data",
429 ));
430 }
431 }
432 Ok(())
433 };
434 tasks.push(task);
435 }
436
437 while let Some(res) = tasks.next().await {
438 res?;
439 }
440
441 Ok(Uploaded::from_raw(
442 tl::types::InputFileBig {
443 id: file_id,
444 parts: total_parts,
445 name,
446 }
447 .into(),
448 ))
449 } else {
450 let mut md5 = md5::Context::new();
451 while let Some((part, bytes)) = parts.next_part().await? {
452 md5.consume(&bytes);
453 let ok = self
454 .invoke(&tl::functions::upload::SaveFilePart {
455 file_id,
456 file_part: part,
457 bytes,
458 })
459 .await
460 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
461
462 if !ok {
463 return Err(io::Error::new(
464 io::ErrorKind::Other,
465 "server failed to store uploaded data",
466 ));
467 }
468 }
469 Ok(Uploaded::from_raw(
470 tl::types::InputFile {
471 id: file_id,
472 parts: total_parts,
473 name,
474 md5_checksum: format!("{:x}", md5.compute()),
475 }
476 .into(),
477 ))
478 }
479 }
480
481 /// Uploads a local file to Telegram servers.
482 ///
483 /// The file is not sent to any chat, but can be used as media when sending messages for a
484 /// certain period of time (less than a day). You can use this uploaded file multiple times.
485 ///
486 /// Refer to [`InputMessage`] to learn more uses for `uploaded_file`.
487 ///
488 /// If you need more control over the uploaded data, such as performing only a partial upload
489 /// or with a different name, use [`Client::upload_stream`] instead.
490 ///
491 /// # Examples
492 ///
493 /// ```
494 /// # async fn f(chat: grammers_client::types::Chat, client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
495 /// use grammers_client::InputMessage;
496 ///
497 /// let uploaded_file = client.upload_file("/home/username/photos/holidays.jpg").await?;
498 ///
499 /// client.send_message(&chat, InputMessage::text("Check this out!").photo(uploaded_file)).await?;
500 /// # Ok(())
501 /// # }
502 /// ```
503 ///
504 /// [`InputMessage`]: crate::InputMessage
505 pub async fn upload_file<P: AsRef<Path>>(&self, path: P) -> Result<Uploaded, io::Error> {
506 let path = path.as_ref();
507
508 let mut file = fs::File::open(path).await?;
509 let size = file.seek(SeekFrom::End(0)).await? as usize;
510 file.seek(SeekFrom::Start(0)).await?;
511
512 // File name will only be `None` for `..` path, and directories cannot be uploaded as
513 // files, so it's fine to unwrap.
514 let name = path.file_name().unwrap().to_string_lossy().to_string();
515
516 self.upload_stream(&mut file, size, name).await
517 }
518}
519
520struct PartStreamInner<'a, S: AsyncRead + Unpin> {
521 stream: &'a mut S,
522 current_part: i32,
523}
524
525struct PartStream<'a, S: AsyncRead + Unpin> {
526 inner: AsyncMutex<PartStreamInner<'a, S>>,
527 total_parts: i32,
528}
529
530impl<'a, S: AsyncRead + Unpin> PartStream<'a, S> {
531 fn new(stream: &'a mut S, size: usize) -> Self {
532 let total_parts = ((size + MAX_CHUNK_SIZE as usize - 1) / MAX_CHUNK_SIZE as usize) as i32;
533 Self {
534 inner: AsyncMutex::new(PartStreamInner {
535 stream,
536 current_part: 0,
537 }),
538 total_parts,
539 }
540 }
541
542 fn total_parts(&self) -> i32 {
543 self.total_parts
544 }
545
546 async fn next_part(&self) -> Result<Option<(i32, Vec<u8>)>, io::Error> {
547 let mut lock = self.inner.lock().await;
548 if lock.current_part >= self.total_parts {
549 return Ok(None);
550 }
551 let mut read = 0;
552 let mut buffer = vec![0; MAX_CHUNK_SIZE as usize];
553
554 while read != buffer.len() {
555 let n = lock.stream.read(&mut buffer[read..]).await?;
556 if n == 0 {
557 if lock.current_part == self.total_parts - 1 {
558 break;
559 } else {
560 return Err(io::Error::new(
561 io::ErrorKind::UnexpectedEof,
562 "reached EOF before reaching the last file part",
563 ));
564 }
565 }
566 read += n;
567 }
568
569 let bytes = if read == buffer.len() {
570 buffer
571 } else {
572 buffer[..read].to_vec()
573 };
574
575 let res = Ok(Some((lock.current_part, bytes)));
576 lock.current_part += 1;
577 res
578 }
579}