1use std::sync::Arc;
21
22use layer_tl_types as tl;
23use layer_tl_types::{Cursor, Deserializable};
24use tokio::io::AsyncRead;
25use tokio::io::AsyncReadExt;
26use tokio::sync::Mutex;
27
28use crate::{Client, InvocationError};
29
30pub struct AlbumItem {
34 pub media: tl::enums::InputMedia,
35 pub caption: String,
36 pub entities: Vec<tl::enums::MessageEntity>,
37 pub reply_to: Option<i32>,
38}
39
40impl AlbumItem {
41 pub fn new(media: tl::enums::InputMedia) -> Self {
42 Self {
43 media,
44 caption: String::new(),
45 entities: Vec::new(),
46 reply_to: None,
47 }
48 }
49 pub fn caption(mut self, text: impl Into<String>) -> Self {
50 self.caption = text.into();
51 self
52 }
53 pub fn reply_to(mut self, msg_id: Option<i32>) -> Self {
54 self.reply_to = msg_id;
55 self
56 }
57}
58
59impl From<(tl::enums::InputMedia, String)> for AlbumItem {
60 fn from((media, caption): (tl::enums::InputMedia, String)) -> Self {
61 Self::new(media).caption(caption)
62 }
63}
64
65pub const UPLOAD_CHUNK_SIZE: i32 = 512 * 1024;
67pub const DOWNLOAD_CHUNK_SIZE: i32 = 512 * 1024;
68const BIG_FILE_THRESHOLD: usize = 10 * 1024 * 1024;
70
71fn resolve_mime(name: &str, mime_type: &str) -> String {
74 if !mime_type.is_empty() && mime_type != "application/octet-stream" {
75 return mime_type.to_string();
76 }
77 mime_guess::from_path(name)
78 .first_or_octet_stream()
79 .to_string()
80}
81const WORKER_COUNT: usize = 4;
83
84#[derive(Debug, Clone)]
86pub struct UploadedFile {
87 pub(crate) inner: tl::enums::InputFile,
88 pub(crate) mime_type: String,
89 pub(crate) name: String,
90}
91
92impl UploadedFile {
93 pub fn mime_type(&self) -> &str {
94 &self.mime_type
95 }
96 pub fn name(&self) -> &str {
97 &self.name
98 }
99
100 pub fn as_document_media(&self) -> tl::enums::InputMedia {
102 tl::enums::InputMedia::UploadedDocument(tl::types::InputMediaUploadedDocument {
103 nosound_video: false,
104 force_file: false,
105 spoiler: false,
106 file: self.inner.clone(),
107 thumb: None,
108 mime_type: self.mime_type.clone(),
109 attributes: vec![tl::enums::DocumentAttribute::Filename(
110 tl::types::DocumentAttributeFilename {
111 file_name: self.name.clone(),
112 },
113 )],
114 stickers: None,
115 ttl_seconds: None,
116 video_cover: None,
117 video_timestamp: None,
118 })
119 }
120
121 pub fn as_photo_media(&self) -> tl::enums::InputMedia {
123 tl::enums::InputMedia::UploadedPhoto(tl::types::InputMediaUploadedPhoto {
124 spoiler: false,
125 live_photo: false,
126 file: self.inner.clone(),
127 stickers: None,
128 ttl_seconds: None,
129 video: None,
130 })
131 }
132}
133
134pub trait Downloadable {
140 fn to_input_location(&self) -> Option<tl::enums::InputFileLocation>;
142
143 fn size(&self) -> Option<usize> {
145 None
146 }
147}
148
149#[derive(Debug, Clone)]
153pub struct Photo {
154 pub raw: tl::types::Photo,
155}
156
157impl Photo {
158 pub fn from_raw(raw: tl::types::Photo) -> Self {
159 Self { raw }
160 }
161
162 pub fn from_media(media: &tl::enums::MessageMedia) -> Option<Self> {
164 if let tl::enums::MessageMedia::Photo(mp) = media
165 && let Some(tl::enums::Photo::Photo(p)) = &mp.photo
166 {
167 return Some(Self { raw: p.clone() });
168 }
169 None
170 }
171
172 pub fn id(&self) -> i64 {
173 self.raw.id
174 }
175 pub fn access_hash(&self) -> i64 {
176 self.raw.access_hash
177 }
178 pub fn date(&self) -> i32 {
179 self.raw.date
180 }
181 pub fn has_stickers(&self) -> bool {
182 self.raw.has_stickers
183 }
184
185 pub fn largest_thumb_type(&self) -> &str {
187 self.raw
188 .sizes
189 .iter()
190 .filter_map(|s| match s {
191 tl::enums::PhotoSize::PhotoSize(ps) => Some(ps.r#type.as_str()),
192 _ => None,
193 })
194 .next_back()
195 .unwrap_or("s")
196 }
197}
198
199impl Downloadable for Photo {
200 fn to_input_location(&self) -> Option<tl::enums::InputFileLocation> {
201 Some(tl::enums::InputFileLocation::InputPhotoFileLocation(
202 tl::types::InputPhotoFileLocation {
203 id: self.raw.id,
204 access_hash: self.raw.access_hash,
205 file_reference: self.raw.file_reference.clone(),
206 thumb_size: self.largest_thumb_type().to_string(),
207 },
208 ))
209 }
210}
211
212#[derive(Debug, Clone)]
214pub struct Document {
215 pub raw: tl::types::Document,
216}
217
218impl Document {
219 pub fn from_raw(raw: tl::types::Document) -> Self {
220 Self { raw }
221 }
222
223 pub fn from_media(media: &tl::enums::MessageMedia) -> Option<Self> {
225 if let tl::enums::MessageMedia::Document(md) = media
226 && let Some(tl::enums::Document::Document(d)) = &md.document
227 {
228 return Some(Self { raw: d.clone() });
229 }
230 None
231 }
232
233 pub fn id(&self) -> i64 {
234 self.raw.id
235 }
236 pub fn access_hash(&self) -> i64 {
237 self.raw.access_hash
238 }
239 pub fn date(&self) -> i32 {
240 self.raw.date
241 }
242 pub fn mime_type(&self) -> &str {
243 &self.raw.mime_type
244 }
245 pub fn size(&self) -> i64 {
246 self.raw.size
247 }
248
249 pub fn file_name(&self) -> Option<&str> {
251 self.raw.attributes.iter().find_map(|a| match a {
252 tl::enums::DocumentAttribute::Filename(f) => Some(f.file_name.as_str()),
253 _ => None,
254 })
255 }
256
257 pub fn is_animated(&self) -> bool {
259 self.raw
260 .attributes
261 .iter()
262 .any(|a| matches!(a, tl::enums::DocumentAttribute::Animated))
263 }
264}
265
266impl Downloadable for Document {
267 fn to_input_location(&self) -> Option<tl::enums::InputFileLocation> {
268 Some(tl::enums::InputFileLocation::InputDocumentFileLocation(
269 tl::types::InputDocumentFileLocation {
270 id: self.raw.id,
271 access_hash: self.raw.access_hash,
272 file_reference: self.raw.file_reference.clone(),
273 thumb_size: String::new(),
274 },
275 ))
276 }
277
278 fn size(&self) -> Option<usize> {
279 Some(self.raw.size as usize)
280 }
281}
282
283#[derive(Debug, Clone)]
285pub struct Sticker {
286 pub inner: Document,
287}
288
289impl Sticker {
290 pub fn from_document(doc: Document) -> Option<Self> {
292 let has_sticker_attr = doc
293 .raw
294 .attributes
295 .iter()
296 .any(|a| matches!(a, tl::enums::DocumentAttribute::Sticker(_)));
297 if has_sticker_attr {
298 Some(Self { inner: doc })
299 } else {
300 None
301 }
302 }
303
304 pub fn from_media(media: &tl::enums::MessageMedia) -> Option<Self> {
306 Document::from_media(media).and_then(Self::from_document)
307 }
308
309 pub fn emoji(&self) -> Option<&str> {
311 self.inner.raw.attributes.iter().find_map(|a| match a {
312 tl::enums::DocumentAttribute::Sticker(s) => Some(s.alt.as_str()),
313 _ => None,
314 })
315 }
316
317 pub fn is_video(&self) -> bool {
319 self.inner
320 .raw
321 .attributes
322 .iter()
323 .any(|a| matches!(a, tl::enums::DocumentAttribute::Video(_)))
324 }
325
326 pub fn id(&self) -> i64 {
327 self.inner.id()
328 }
329 pub fn mime_type(&self) -> &str {
330 self.inner.mime_type()
331 }
332}
333
334impl Downloadable for Sticker {
335 fn to_input_location(&self) -> Option<tl::enums::InputFileLocation> {
336 self.inner.to_input_location()
337 }
338 fn size(&self) -> Option<usize> {
339 Some(self.inner.raw.size as usize)
340 }
341}
342
343pub struct DownloadIter {
347 client: Client,
348 request: Option<tl::functions::upload::GetFile>,
349 done: bool,
350}
351
352impl DownloadIter {
353 pub fn chunk_size(mut self, size: i32) -> Self {
355 if let Some(r) = &mut self.request {
356 r.limit = size;
357 }
358 self
359 }
360
361 pub async fn next(&mut self) -> Result<Option<Vec<u8>>, InvocationError> {
363 if self.done {
364 return Ok(None);
365 }
366 let req = match &self.request {
367 Some(r) => r.clone(),
368 None => return Ok(None),
369 };
370 let body = self.client.rpc_call_raw_pub(&req).await?;
371 let mut cur = Cursor::from_slice(&body);
372 match tl::enums::upload::File::deserialize(&mut cur)? {
373 tl::enums::upload::File::File(f) => {
374 if (f.bytes.len() as i32) < req.limit {
375 self.done = true;
376 if f.bytes.is_empty() {
377 return Ok(None);
378 }
379 }
380 if let Some(r) = &mut self.request {
381 r.offset += req.limit as i64;
382 }
383 Ok(Some(f.bytes))
384 }
385 tl::enums::upload::File::CdnRedirect(_) => {
386 self.done = true;
387 Err(InvocationError::Deserialize(
388 "CDN redirect not supported".into(),
389 ))
390 }
391 }
392 }
393}
394
395impl Client {
398 pub async fn upload_file(
403 &self,
404 data: &[u8],
405 name: &str,
406 mime_type: &str,
407 ) -> Result<UploadedFile, InvocationError> {
408 let resolved_mime = resolve_mime(name, mime_type);
410
411 let file_id = crate::random_i64_pub();
412 let total = data.len();
413 let big = total >= BIG_FILE_THRESHOLD;
414 let part_size = UPLOAD_CHUNK_SIZE as usize;
415 let total_parts = total.div_ceil(part_size) as i32;
416
417 for (part_num, chunk) in data.chunks(part_size).enumerate() {
418 if big {
419 self.rpc_call_raw_pub(&tl::functions::upload::SaveBigFilePart {
420 file_id,
421 file_part: part_num as i32,
422 file_total_parts: total_parts,
423 bytes: chunk.to_vec(),
424 })
425 .await?;
426 } else {
427 self.rpc_call_raw_pub(&tl::functions::upload::SaveFilePart {
428 file_id,
429 file_part: part_num as i32,
430 bytes: chunk.to_vec(),
431 })
432 .await?;
433 }
434 }
435
436 let inner = make_input_file(big, file_id, total_parts, name, data);
437 tracing::info!(
438 "[layer] uploaded '{}' ({} bytes, {} parts, mime={})",
439 name,
440 total,
441 total_parts,
442 resolved_mime
443 );
444 Ok(UploadedFile {
445 inner,
446 mime_type: resolved_mime,
447 name: name.to_string(),
448 })
449 }
450
451 pub async fn upload_file_concurrent(
456 &self,
457 data: Arc<Vec<u8>>,
458 name: &str,
459 mime_type: &str,
460 ) -> Result<UploadedFile, InvocationError> {
461 let total = data.len();
462 let part_size = UPLOAD_CHUNK_SIZE as usize;
463 let total_parts = total.div_ceil(part_size) as i32;
464
465 if total < BIG_FILE_THRESHOLD {
466 return self.upload_file(&data, name, mime_type).await;
468 }
469
470 let file_id = crate::random_i64_pub();
471 let next_part = Arc::new(Mutex::new(0i32));
472 let mut tasks = tokio::task::JoinSet::new();
473
474 for _ in 0..WORKER_COUNT {
475 let client = self.clone();
476 let data = Arc::clone(&data);
477 let next_part = Arc::clone(&next_part);
478
479 tasks.spawn(async move {
480 loop {
481 let part_num = {
482 let mut guard = next_part.lock().await;
483 if *guard >= total_parts {
484 break;
485 }
486 let n = *guard;
487 *guard += 1;
488 n
489 };
490 let start = part_num as usize * part_size;
491 let end = (start + part_size).min(data.len());
492 let bytes = data[start..end].to_vec();
493
494 client
495 .rpc_call_raw_pub(&tl::functions::upload::SaveBigFilePart {
496 file_id,
497 file_part: part_num,
498 file_total_parts: total_parts,
499 bytes,
500 })
501 .await?;
502 }
503 Ok::<(), InvocationError>(())
504 });
505 }
506
507 while let Some(res) = tasks.join_next().await {
508 res.map_err(|e| InvocationError::Io(std::io::Error::other(e.to_string())))??;
509 }
510
511 let inner = tl::enums::InputFile::Big(tl::types::InputFileBig {
512 id: file_id,
513 parts: total_parts,
514 name: name.to_string(),
515 });
516 tracing::info!(
517 "[layer] concurrent-uploaded '{}' ({} bytes, {} parts, {} workers)",
518 name,
519 total,
520 total_parts,
521 WORKER_COUNT
522 );
523 Ok(UploadedFile {
524 inner,
525 mime_type: resolve_mime(name, mime_type),
526 name: name.to_string(),
527 })
528 }
529
530 pub async fn upload_stream<R: AsyncRead + Unpin>(
532 &self,
533 reader: &mut R,
534 name: &str,
535 mime_type: &str,
536 ) -> Result<UploadedFile, InvocationError> {
537 let mut data = Vec::new();
538 reader.read_to_end(&mut data).await?;
539 if data.len() >= BIG_FILE_THRESHOLD {
540 self.upload_file_concurrent(Arc::new(data), name, mime_type)
541 .await
542 } else {
543 self.upload_file(&data, name, mime_type).await
544 }
545 }
546
547 pub async fn send_file(
551 &self,
552 peer: tl::enums::Peer,
553 media: tl::enums::InputMedia,
554 caption: &str,
555 ) -> Result<(), InvocationError> {
556 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
557 let req = tl::functions::messages::SendMedia {
558 silent: false,
559 background: false,
560 clear_draft: false,
561 noforwards: false,
562 update_stickersets_order: false,
563 invert_media: false,
564 allow_paid_floodskip: false,
565 peer: input_peer,
566 reply_to: None,
567 media,
568 message: caption.to_string(),
569 random_id: crate::random_i64_pub(),
570 reply_markup: None,
571 entities: None,
572 schedule_date: None,
573 schedule_repeat_period: None,
574 send_as: None,
575 quick_reply_shortcut: None,
576 effect: None,
577 allow_paid_stars: None,
578 suggested_post: None,
579 };
580 self.rpc_call_raw_pub(&req).await?;
581 Ok(())
582 }
583
584 pub async fn send_album(
603 &self,
604 peer: tl::enums::Peer,
605 items: Vec<AlbumItem>,
606 ) -> Result<(), InvocationError> {
607 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
608
609 let reply_to = items.iter().find_map(|i| i.reply_to).map(|id| {
611 tl::enums::InputReplyTo::Message(tl::types::InputReplyToMessage {
612 reply_to_msg_id: id,
613 top_msg_id: None,
614 reply_to_peer_id: None,
615 quote_text: None,
616 quote_entities: None,
617 quote_offset: None,
618 monoforum_peer_id: None,
619 poll_option: None,
620 todo_item_id: None,
621 })
622 });
623
624 let multi: Vec<tl::enums::InputSingleMedia> = items
625 .into_iter()
626 .map(|item| {
627 tl::enums::InputSingleMedia::InputSingleMedia(tl::types::InputSingleMedia {
628 media: item.media,
629 random_id: crate::random_i64_pub(),
630 message: item.caption,
631 entities: if item.entities.is_empty() {
632 None
633 } else {
634 Some(item.entities)
635 },
636 })
637 })
638 .collect();
639
640 let req = tl::functions::messages::SendMultiMedia {
641 silent: false,
642 background: false,
643 clear_draft: false,
644 noforwards: false,
645 update_stickersets_order: false,
646 invert_media: false,
647 allow_paid_floodskip: false,
648 peer: input_peer,
649 reply_to,
650 multi_media: multi,
651 schedule_date: None,
652 send_as: None,
653 quick_reply_shortcut: None,
654 effect: None,
655 allow_paid_stars: None,
656 };
657 self.rpc_call_raw_pub(&req).await?;
658 Ok(())
659 }
660
661 pub fn iter_download(&self, location: tl::enums::InputFileLocation) -> DownloadIter {
665 DownloadIter {
666 client: self.clone(),
667 done: false,
668 request: Some(tl::functions::upload::GetFile {
669 precise: false,
670 cdn_supported: false,
671 location,
672 offset: 0,
673 limit: DOWNLOAD_CHUNK_SIZE,
674 }),
675 }
676 }
677
678 pub async fn download_media(
680 &self,
681 location: tl::enums::InputFileLocation,
682 ) -> Result<Vec<u8>, InvocationError> {
683 let mut bytes = Vec::new();
684 let mut iter = self.iter_download(location);
685 while let Some(chunk) = iter.next().await? {
686 bytes.extend_from_slice(&chunk);
687 }
688 Ok(bytes)
689 }
690
691 pub async fn download_media_concurrent(
698 &self,
699 location: tl::enums::InputFileLocation,
700 size: usize,
701 ) -> Result<Vec<u8>, InvocationError> {
702 let chunk = DOWNLOAD_CHUNK_SIZE as usize;
703 let n_parts = size.div_ceil(chunk);
704 let next_part = Arc::new(Mutex::new(0usize));
705
706 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(usize, Vec<u8>)>();
708 let mut tasks = tokio::task::JoinSet::new();
709
710 for _ in 0..WORKER_COUNT {
711 let client = self.clone();
712 let location = location.clone();
713 let next_part = Arc::clone(&next_part);
714 let tx = tx.clone();
715
716 tasks.spawn(async move {
717 loop {
718 let part = {
719 let mut g = next_part.lock().await;
720 if *g >= n_parts {
721 break;
722 }
723 let p = *g;
724 *g += 1;
725 p
726 };
727 let offset = (part * chunk) as i64;
728 let req = tl::functions::upload::GetFile {
729 precise: true,
730 cdn_supported: false,
731 location: location.clone(),
732 offset,
733 limit: DOWNLOAD_CHUNK_SIZE,
734 };
735 let raw = client.rpc_call_raw_pub(&req).await?;
736 let mut cur = Cursor::from_slice(&raw);
737 if let tl::enums::upload::File::File(f) =
738 tl::enums::upload::File::deserialize(&mut cur)?
739 {
740 let _ = tx.send((part, f.bytes));
741 }
742 }
743 Ok::<(), InvocationError>(())
744 });
745 }
746 drop(tx);
747
748 let mut parts: Vec<Option<Vec<u8>>> = (0..n_parts).map(|_| None).collect();
750 while let Some((idx, data)) = rx.recv().await {
751 if idx < parts.len() {
752 parts[idx] = Some(data);
753 }
754 }
755
756 while let Some(res) = tasks.join_next().await {
758 res.map_err(|e| InvocationError::Io(std::io::Error::other(e.to_string())))??;
759 }
760
761 let mut out = Vec::with_capacity(size);
763 for part in parts.into_iter().flatten() {
764 out.extend_from_slice(&part);
765 }
766 out.truncate(size);
767 Ok(out)
768 }
769
770 pub async fn download<D: Downloadable>(&self, item: &D) -> Result<Vec<u8>, InvocationError> {
773 let loc = item
774 .to_input_location()
775 .ok_or_else(|| InvocationError::Deserialize("item has no download location".into()))?;
776 match item.size() {
777 Some(sz) if sz >= BIG_FILE_THRESHOLD => self.download_media_concurrent(loc, sz).await,
778 _ => self.download_media(loc).await,
779 }
780 }
781}
782
783impl crate::update::IncomingMessage {
786 pub fn download_location(&self) -> Option<tl::enums::InputFileLocation> {
788 let media = match &self.raw {
789 tl::enums::Message::Message(m) => m.media.as_ref()?,
790 _ => return None,
791 };
792 if let Some(doc) = Document::from_media(media) {
793 return doc.to_input_location();
794 }
795 if let Some(photo) = Photo::from_media(media) {
796 return photo.to_input_location();
797 }
798 None
799 }
800}
801
802pub fn download_location_from_media(
807 media: Option<&tl::enums::MessageMedia>,
808) -> Option<tl::enums::InputFileLocation> {
809 let m = media?;
810 if let Some(doc) = Document::from_media(m) {
811 return doc.to_input_location();
812 }
813 if let Some(photo) = Photo::from_media(m) {
814 return photo.to_input_location();
815 }
816 None
817}
818
819fn make_input_file(
822 big: bool,
823 file_id: i64,
824 total_parts: i32,
825 name: &str,
826 data: &[u8],
827) -> tl::enums::InputFile {
828 if big {
829 tl::enums::InputFile::Big(tl::types::InputFileBig {
830 id: file_id,
831 parts: total_parts,
832 name: name.to_string(),
833 })
834 } else {
835 let _ = data; tl::enums::InputFile::InputFile(tl::types::InputFile {
837 id: file_id,
838 parts: total_parts,
839 name: name.to_string(),
840 md5_checksum: String::new(),
841 })
842 }
843}