1use std::sync::Arc;
39
40use layer_tl_types as tl;
41use layer_tl_types::{Cursor, Deserializable};
42use tokio::io::AsyncRead;
43use tokio::io::AsyncReadExt;
44use tokio::sync::Mutex;
45
46use crate::{Client, InvocationError};
47
48pub struct AlbumItem {
52 pub media: tl::enums::InputMedia,
53 pub caption: String,
54 pub entities: Vec<tl::enums::MessageEntity>,
55 pub reply_to: Option<i32>,
56}
57
58impl AlbumItem {
59 pub fn new(media: tl::enums::InputMedia) -> Self {
60 Self {
61 media,
62 caption: String::new(),
63 entities: Vec::new(),
64 reply_to: None,
65 }
66 }
67 pub fn caption(mut self, text: impl Into<String>) -> Self {
68 self.caption = text.into();
69 self
70 }
71 pub fn reply_to(mut self, msg_id: Option<i32>) -> Self {
72 self.reply_to = msg_id;
73 self
74 }
75}
76
77impl From<(tl::enums::InputMedia, String)> for AlbumItem {
78 fn from((media, caption): (tl::enums::InputMedia, String)) -> Self {
79 Self::new(media).caption(caption)
80 }
81}
82
83pub const UPLOAD_CHUNK_SIZE: i32 = 512 * 1024;
85pub const DOWNLOAD_CHUNK_SIZE: i32 = 512 * 1024;
86const BIG_FILE_THRESHOLD: usize = 10 * 1024 * 1024;
88
89fn resolve_mime(name: &str, mime_type: &str) -> String {
92 if !mime_type.is_empty() && mime_type != "application/octet-stream" {
93 return mime_type.to_string();
94 }
95 mime_guess::from_path(name)
96 .first_or_octet_stream()
97 .to_string()
98}
99const WORKER_COUNT: usize = 4;
101
102#[derive(Debug, Clone)]
104pub struct UploadedFile {
105 pub(crate) inner: tl::enums::InputFile,
106 pub(crate) mime_type: String,
107 pub(crate) name: String,
108}
109
110impl UploadedFile {
111 pub fn mime_type(&self) -> &str {
112 &self.mime_type
113 }
114 pub fn name(&self) -> &str {
115 &self.name
116 }
117
118 pub fn as_document_media(&self) -> tl::enums::InputMedia {
120 tl::enums::InputMedia::UploadedDocument(tl::types::InputMediaUploadedDocument {
121 nosound_video: false,
122 force_file: false,
123 spoiler: false,
124 file: self.inner.clone(),
125 thumb: None,
126 mime_type: self.mime_type.clone(),
127 attributes: vec![tl::enums::DocumentAttribute::Filename(
128 tl::types::DocumentAttributeFilename {
129 file_name: self.name.clone(),
130 },
131 )],
132 stickers: None,
133 ttl_seconds: None,
134 video_cover: None,
135 video_timestamp: None,
136 })
137 }
138
139 pub fn as_photo_media(&self) -> tl::enums::InputMedia {
141 tl::enums::InputMedia::UploadedPhoto(tl::types::InputMediaUploadedPhoto {
142 spoiler: false,
143 live_photo: false,
144 file: self.inner.clone(),
145 stickers: None,
146 ttl_seconds: None,
147 video: None,
148 })
149 }
150}
151
152pub trait Downloadable {
158 fn to_input_location(&self) -> Option<tl::enums::InputFileLocation>;
160
161 fn size(&self) -> Option<usize> {
163 None
164 }
165}
166
167#[derive(Debug, Clone)]
171pub struct Photo {
172 pub raw: tl::types::Photo,
173}
174
175impl Photo {
176 pub fn from_raw(raw: tl::types::Photo) -> Self {
177 Self { raw }
178 }
179
180 pub fn from_media(media: &tl::enums::MessageMedia) -> Option<Self> {
182 if let tl::enums::MessageMedia::Photo(mp) = media
183 && let Some(tl::enums::Photo::Photo(p)) = &mp.photo
184 {
185 return Some(Self { raw: p.clone() });
186 }
187 None
188 }
189
190 pub fn id(&self) -> i64 {
191 self.raw.id
192 }
193 pub fn access_hash(&self) -> i64 {
194 self.raw.access_hash
195 }
196 pub fn date(&self) -> i32 {
197 self.raw.date
198 }
199 pub fn has_stickers(&self) -> bool {
200 self.raw.has_stickers
201 }
202
203 pub fn largest_thumb_type(&self) -> &str {
205 self.raw
206 .sizes
207 .iter()
208 .filter_map(|s| match s {
209 tl::enums::PhotoSize::PhotoSize(ps) => Some(ps.r#type.as_str()),
210 _ => None,
211 })
212 .next_back()
213 .unwrap_or("s")
214 }
215}
216
217impl Downloadable for Photo {
218 fn to_input_location(&self) -> Option<tl::enums::InputFileLocation> {
219 Some(tl::enums::InputFileLocation::InputPhotoFileLocation(
220 tl::types::InputPhotoFileLocation {
221 id: self.raw.id,
222 access_hash: self.raw.access_hash,
223 file_reference: self.raw.file_reference.clone(),
224 thumb_size: self.largest_thumb_type().to_string(),
225 },
226 ))
227 }
228}
229
230#[derive(Debug, Clone)]
232pub struct Document {
233 pub raw: tl::types::Document,
234}
235
236impl Document {
237 pub fn from_raw(raw: tl::types::Document) -> Self {
238 Self { raw }
239 }
240
241 pub fn from_media(media: &tl::enums::MessageMedia) -> Option<Self> {
243 if let tl::enums::MessageMedia::Document(md) = media
244 && let Some(tl::enums::Document::Document(d)) = &md.document
245 {
246 return Some(Self { raw: d.clone() });
247 }
248 None
249 }
250
251 pub fn id(&self) -> i64 {
252 self.raw.id
253 }
254 pub fn access_hash(&self) -> i64 {
255 self.raw.access_hash
256 }
257 pub fn date(&self) -> i32 {
258 self.raw.date
259 }
260 pub fn mime_type(&self) -> &str {
261 &self.raw.mime_type
262 }
263 pub fn size(&self) -> i64 {
264 self.raw.size
265 }
266
267 pub fn file_name(&self) -> Option<&str> {
269 self.raw.attributes.iter().find_map(|a| match a {
270 tl::enums::DocumentAttribute::Filename(f) => Some(f.file_name.as_str()),
271 _ => None,
272 })
273 }
274
275 pub fn is_animated(&self) -> bool {
277 self.raw
278 .attributes
279 .iter()
280 .any(|a| matches!(a, tl::enums::DocumentAttribute::Animated))
281 }
282}
283
284impl Downloadable for Document {
285 fn to_input_location(&self) -> Option<tl::enums::InputFileLocation> {
286 Some(tl::enums::InputFileLocation::InputDocumentFileLocation(
287 tl::types::InputDocumentFileLocation {
288 id: self.raw.id,
289 access_hash: self.raw.access_hash,
290 file_reference: self.raw.file_reference.clone(),
291 thumb_size: String::new(),
292 },
293 ))
294 }
295
296 fn size(&self) -> Option<usize> {
297 Some(self.raw.size as usize)
298 }
299}
300
301#[derive(Debug, Clone)]
303pub struct Sticker {
304 pub inner: Document,
305}
306
307impl Sticker {
308 pub fn from_document(doc: Document) -> Option<Self> {
310 let has_sticker_attr = doc
311 .raw
312 .attributes
313 .iter()
314 .any(|a| matches!(a, tl::enums::DocumentAttribute::Sticker(_)));
315 if has_sticker_attr {
316 Some(Self { inner: doc })
317 } else {
318 None
319 }
320 }
321
322 pub fn from_media(media: &tl::enums::MessageMedia) -> Option<Self> {
324 Document::from_media(media).and_then(Self::from_document)
325 }
326
327 pub fn emoji(&self) -> Option<&str> {
329 self.inner.raw.attributes.iter().find_map(|a| match a {
330 tl::enums::DocumentAttribute::Sticker(s) => Some(s.alt.as_str()),
331 _ => None,
332 })
333 }
334
335 pub fn is_video(&self) -> bool {
337 self.inner
338 .raw
339 .attributes
340 .iter()
341 .any(|a| matches!(a, tl::enums::DocumentAttribute::Video(_)))
342 }
343
344 pub fn id(&self) -> i64 {
345 self.inner.id()
346 }
347 pub fn mime_type(&self) -> &str {
348 self.inner.mime_type()
349 }
350}
351
352impl Downloadable for Sticker {
353 fn to_input_location(&self) -> Option<tl::enums::InputFileLocation> {
354 self.inner.to_input_location()
355 }
356 fn size(&self) -> Option<usize> {
357 Some(self.inner.raw.size as usize)
358 }
359}
360
361pub struct DownloadIter {
365 client: Client,
366 request: Option<tl::functions::upload::GetFile>,
367 done: bool,
368}
369
370impl DownloadIter {
371 pub fn chunk_size(mut self, size: i32) -> Self {
373 if let Some(r) = &mut self.request {
374 r.limit = size;
375 }
376 self
377 }
378
379 pub async fn next(&mut self) -> Result<Option<Vec<u8>>, InvocationError> {
381 if self.done {
382 return Ok(None);
383 }
384 let req = match &self.request {
385 Some(r) => r.clone(),
386 None => return Ok(None),
387 };
388 let body = self.client.rpc_call_raw_pub(&req).await?;
389 let mut cur = Cursor::from_slice(&body);
390 match tl::enums::upload::File::deserialize(&mut cur)? {
391 tl::enums::upload::File::File(f) => {
392 if (f.bytes.len() as i32) < req.limit {
393 self.done = true;
394 if f.bytes.is_empty() {
395 return Ok(None);
396 }
397 }
398 if let Some(r) = &mut self.request {
399 r.offset += req.limit as i64;
400 }
401 Ok(Some(f.bytes))
402 }
403 tl::enums::upload::File::CdnRedirect(_) => {
404 self.done = true;
405 Err(InvocationError::Deserialize(
406 "CDN redirect not supported".into(),
407 ))
408 }
409 }
410 }
411}
412
413impl Client {
416 pub async fn upload_file(
421 &self,
422 data: &[u8],
423 name: &str,
424 mime_type: &str,
425 ) -> Result<UploadedFile, InvocationError> {
426 let resolved_mime = resolve_mime(name, mime_type);
428
429 let file_id = crate::random_i64_pub();
430 let total = data.len();
431 let big = total >= BIG_FILE_THRESHOLD;
432 let part_size = UPLOAD_CHUNK_SIZE as usize;
433 let total_parts = total.div_ceil(part_size) as i32;
434
435 for (part_num, chunk) in data.chunks(part_size).enumerate() {
436 if big {
437 self.rpc_call_raw_pub(&tl::functions::upload::SaveBigFilePart {
438 file_id,
439 file_part: part_num as i32,
440 file_total_parts: total_parts,
441 bytes: chunk.to_vec(),
442 })
443 .await?;
444 } else {
445 self.rpc_call_raw_pub(&tl::functions::upload::SaveFilePart {
446 file_id,
447 file_part: part_num as i32,
448 bytes: chunk.to_vec(),
449 })
450 .await?;
451 }
452 }
453
454 let inner = make_input_file(big, file_id, total_parts, name, data);
455 tracing::info!(
456 "[layer] uploaded '{}' ({} bytes, {} parts, mime={})",
457 name,
458 total,
459 total_parts,
460 resolved_mime
461 );
462 Ok(UploadedFile {
463 inner,
464 mime_type: resolved_mime,
465 name: name.to_string(),
466 })
467 }
468
469 pub async fn upload_file_concurrent(
474 &self,
475 data: Arc<Vec<u8>>,
476 name: &str,
477 mime_type: &str,
478 ) -> Result<UploadedFile, InvocationError> {
479 let total = data.len();
480 let part_size = UPLOAD_CHUNK_SIZE as usize;
481 let total_parts = total.div_ceil(part_size) as i32;
482
483 if total < BIG_FILE_THRESHOLD {
484 return self.upload_file(&data, name, mime_type).await;
486 }
487
488 let file_id = crate::random_i64_pub();
489 let next_part = Arc::new(Mutex::new(0i32));
490 let mut tasks = tokio::task::JoinSet::new();
491
492 for _ in 0..WORKER_COUNT {
493 let client = self.clone();
494 let data = Arc::clone(&data);
495 let next_part = Arc::clone(&next_part);
496
497 tasks.spawn(async move {
498 loop {
499 let part_num = {
500 let mut guard = next_part.lock().await;
501 if *guard >= total_parts {
502 break;
503 }
504 let n = *guard;
505 *guard += 1;
506 n
507 };
508 let start = part_num as usize * part_size;
509 let end = (start + part_size).min(data.len());
510 let bytes = data[start..end].to_vec();
511
512 client
513 .rpc_call_raw_pub(&tl::functions::upload::SaveBigFilePart {
514 file_id,
515 file_part: part_num,
516 file_total_parts: total_parts,
517 bytes,
518 })
519 .await?;
520 }
521 Ok::<(), InvocationError>(())
522 });
523 }
524
525 while let Some(res) = tasks.join_next().await {
526 res.map_err(|e| InvocationError::Io(std::io::Error::other(e.to_string())))??;
527 }
528
529 let inner = tl::enums::InputFile::Big(tl::types::InputFileBig {
530 id: file_id,
531 parts: total_parts,
532 name: name.to_string(),
533 });
534 tracing::info!(
535 "[layer] concurrent-uploaded '{}' ({} bytes, {} parts, {} workers)",
536 name,
537 total,
538 total_parts,
539 WORKER_COUNT
540 );
541 Ok(UploadedFile {
542 inner,
543 mime_type: resolve_mime(name, mime_type),
544 name: name.to_string(),
545 })
546 }
547
548 pub async fn upload_stream<R: AsyncRead + Unpin>(
550 &self,
551 reader: &mut R,
552 name: &str,
553 mime_type: &str,
554 ) -> Result<UploadedFile, InvocationError> {
555 let mut data = Vec::new();
556 reader.read_to_end(&mut data).await?;
557 if data.len() >= BIG_FILE_THRESHOLD {
558 self.upload_file_concurrent(Arc::new(data), name, mime_type)
559 .await
560 } else {
561 self.upload_file(&data, name, mime_type).await
562 }
563 }
564
565 pub async fn send_file(
569 &self,
570 peer: tl::enums::Peer,
571 media: tl::enums::InputMedia,
572 caption: &str,
573 ) -> Result<(), InvocationError> {
574 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
575 let req = tl::functions::messages::SendMedia {
576 silent: false,
577 background: false,
578 clear_draft: false,
579 noforwards: false,
580 update_stickersets_order: false,
581 invert_media: false,
582 allow_paid_floodskip: false,
583 peer: input_peer,
584 reply_to: None,
585 media,
586 message: caption.to_string(),
587 random_id: crate::random_i64_pub(),
588 reply_markup: None,
589 entities: None,
590 schedule_date: None,
591 schedule_repeat_period: None,
592 send_as: None,
593 quick_reply_shortcut: None,
594 effect: None,
595 allow_paid_stars: None,
596 suggested_post: None,
597 };
598 self.rpc_call_raw_pub(&req).await?;
599 Ok(())
600 }
601
602 pub async fn send_album(
621 &self,
622 peer: tl::enums::Peer,
623 items: Vec<AlbumItem>,
624 ) -> Result<(), InvocationError> {
625 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
626
627 let reply_to = items.iter().find_map(|i| i.reply_to).map(|id| {
629 tl::enums::InputReplyTo::Message(tl::types::InputReplyToMessage {
630 reply_to_msg_id: id,
631 top_msg_id: None,
632 reply_to_peer_id: None,
633 quote_text: None,
634 quote_entities: None,
635 quote_offset: None,
636 monoforum_peer_id: None,
637 poll_option: None,
638 todo_item_id: None,
639 })
640 });
641
642 let multi: Vec<tl::enums::InputSingleMedia> = items
643 .into_iter()
644 .map(|item| {
645 tl::enums::InputSingleMedia::InputSingleMedia(tl::types::InputSingleMedia {
646 media: item.media,
647 random_id: crate::random_i64_pub(),
648 message: item.caption,
649 entities: if item.entities.is_empty() {
650 None
651 } else {
652 Some(item.entities)
653 },
654 })
655 })
656 .collect();
657
658 let req = tl::functions::messages::SendMultiMedia {
659 silent: false,
660 background: false,
661 clear_draft: false,
662 noforwards: false,
663 update_stickersets_order: false,
664 invert_media: false,
665 allow_paid_floodskip: false,
666 peer: input_peer,
667 reply_to,
668 multi_media: multi,
669 schedule_date: None,
670 send_as: None,
671 quick_reply_shortcut: None,
672 effect: None,
673 allow_paid_stars: None,
674 };
675 self.rpc_call_raw_pub(&req).await?;
676 Ok(())
677 }
678
679 pub fn iter_download(&self, location: tl::enums::InputFileLocation) -> DownloadIter {
683 DownloadIter {
684 client: self.clone(),
685 done: false,
686 request: Some(tl::functions::upload::GetFile {
687 precise: false,
688 cdn_supported: false,
689 location,
690 offset: 0,
691 limit: DOWNLOAD_CHUNK_SIZE,
692 }),
693 }
694 }
695
696 pub async fn download_media(
698 &self,
699 location: tl::enums::InputFileLocation,
700 ) -> Result<Vec<u8>, InvocationError> {
701 let mut bytes = Vec::new();
702 let mut iter = self.iter_download(location);
703 while let Some(chunk) = iter.next().await? {
704 bytes.extend_from_slice(&chunk);
705 }
706 Ok(bytes)
707 }
708
709 pub async fn download_media_concurrent(
716 &self,
717 location: tl::enums::InputFileLocation,
718 size: usize,
719 ) -> Result<Vec<u8>, InvocationError> {
720 let chunk = DOWNLOAD_CHUNK_SIZE as usize;
721 let n_parts = size.div_ceil(chunk);
722 let next_part = Arc::new(Mutex::new(0usize));
723
724 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(usize, Vec<u8>)>();
726 let mut tasks = tokio::task::JoinSet::new();
727
728 for _ in 0..WORKER_COUNT {
729 let client = self.clone();
730 let location = location.clone();
731 let next_part = Arc::clone(&next_part);
732 let tx = tx.clone();
733
734 tasks.spawn(async move {
735 loop {
736 let part = {
737 let mut g = next_part.lock().await;
738 if *g >= n_parts {
739 break;
740 }
741 let p = *g;
742 *g += 1;
743 p
744 };
745 let offset = (part * chunk) as i64;
746 let req = tl::functions::upload::GetFile {
747 precise: true,
748 cdn_supported: false,
749 location: location.clone(),
750 offset,
751 limit: DOWNLOAD_CHUNK_SIZE,
752 };
753 let raw = client.rpc_call_raw_pub(&req).await?;
754 let mut cur = Cursor::from_slice(&raw);
755 if let tl::enums::upload::File::File(f) =
756 tl::enums::upload::File::deserialize(&mut cur)?
757 {
758 let _ = tx.send((part, f.bytes));
759 }
760 }
761 Ok::<(), InvocationError>(())
762 });
763 }
764 drop(tx);
765
766 let mut parts: Vec<Option<Vec<u8>>> = (0..n_parts).map(|_| None).collect();
768 while let Some((idx, data)) = rx.recv().await {
769 if idx < parts.len() {
770 parts[idx] = Some(data);
771 }
772 }
773
774 while let Some(res) = tasks.join_next().await {
776 res.map_err(|e| InvocationError::Io(std::io::Error::other(e.to_string())))??;
777 }
778
779 let mut out = Vec::with_capacity(size);
781 for part in parts.into_iter().flatten() {
782 out.extend_from_slice(&part);
783 }
784 out.truncate(size);
785 Ok(out)
786 }
787
788 pub async fn download<D: Downloadable>(&self, item: &D) -> Result<Vec<u8>, InvocationError> {
791 let loc = item
792 .to_input_location()
793 .ok_or_else(|| InvocationError::Deserialize("item has no download location".into()))?;
794 match item.size() {
795 Some(sz) if sz >= BIG_FILE_THRESHOLD => self.download_media_concurrent(loc, sz).await,
796 _ => self.download_media(loc).await,
797 }
798 }
799}
800
801impl crate::update::IncomingMessage {
804 pub fn download_location(&self) -> Option<tl::enums::InputFileLocation> {
806 let media = match &self.raw {
807 tl::enums::Message::Message(m) => m.media.as_ref()?,
808 _ => return None,
809 };
810 if let Some(doc) = Document::from_media(media) {
811 return doc.to_input_location();
812 }
813 if let Some(photo) = Photo::from_media(media) {
814 return photo.to_input_location();
815 }
816 None
817 }
818}
819
820pub fn download_location_from_media(
825 media: Option<&tl::enums::MessageMedia>,
826) -> Option<tl::enums::InputFileLocation> {
827 let m = media?;
828 if let Some(doc) = Document::from_media(m) {
829 return doc.to_input_location();
830 }
831 if let Some(photo) = Photo::from_media(m) {
832 return photo.to_input_location();
833 }
834 None
835}
836
837fn make_input_file(
840 big: bool,
841 file_id: i64,
842 total_parts: i32,
843 name: &str,
844 data: &[u8],
845) -> tl::enums::InputFile {
846 if big {
847 tl::enums::InputFile::Big(tl::types::InputFileBig {
848 id: file_id,
849 parts: total_parts,
850 name: name.to_string(),
851 })
852 } else {
853 let _ = data; tl::enums::InputFile::InputFile(tl::types::InputFile {
855 id: file_id,
856 parts: total_parts,
857 name: name.to_string(),
858 md5_checksum: String::new(),
859 })
860 }
861}