1use std::sync::Arc;
34
35use ferogram_tl_types as tl;
36use ferogram_tl_types::{Cursor, Deserializable};
37use tokio::io::AsyncRead;
38use tokio::io::AsyncReadExt;
39use tokio::sync::Mutex;
40
41use crate::{Client, InvocationError};
42
43pub struct AlbumItem {
47 pub media: tl::enums::InputMedia,
48 pub caption: String,
49 pub entities: Vec<tl::enums::MessageEntity>,
50 pub reply_to: Option<i32>,
51}
52
53impl AlbumItem {
54 pub fn new(media: tl::enums::InputMedia) -> Self {
55 Self {
56 media,
57 caption: String::new(),
58 entities: Vec::new(),
59 reply_to: None,
60 }
61 }
62 pub fn caption(mut self, text: impl Into<String>) -> Self {
63 self.caption = text.into();
64 self
65 }
66 pub fn reply_to(mut self, msg_id: Option<i32>) -> Self {
67 self.reply_to = msg_id;
68 self
69 }
70}
71
72impl From<(tl::enums::InputMedia, String)> for AlbumItem {
73 fn from((media, caption): (tl::enums::InputMedia, String)) -> Self {
74 Self::new(media).caption(caption)
75 }
76}
77
78pub const UPLOAD_CHUNK_SIZE: i32 = 512 * 1024;
80pub const DOWNLOAD_CHUNK_SIZE: i32 = 512 * 1024;
81const BIG_FILE_THRESHOLD: usize = 10 * 1024 * 1024;
83
84fn resolve_mime(name: &str, mime_type: &str) -> String {
87 if !mime_type.is_empty() && mime_type != "application/octet-stream" {
88 return mime_type.to_string();
89 }
90 mime_guess::from_path(name)
91 .first_or_octet_stream()
92 .to_string()
93}
94const WORKER_COUNT: usize = 4;
96
97#[derive(Debug, Clone)]
99pub struct UploadedFile {
100 pub(crate) inner: tl::enums::InputFile,
101 pub(crate) mime_type: String,
102 pub(crate) name: String,
103}
104
105impl UploadedFile {
106 pub fn mime_type(&self) -> &str {
107 &self.mime_type
108 }
109 pub fn name(&self) -> &str {
110 &self.name
111 }
112
113 pub fn as_document_media(&self) -> tl::enums::InputMedia {
115 tl::enums::InputMedia::UploadedDocument(tl::types::InputMediaUploadedDocument {
116 nosound_video: false,
117 force_file: false,
118 spoiler: false,
119 file: self.inner.clone(),
120 thumb: None,
121 mime_type: self.mime_type.clone(),
122 attributes: vec![tl::enums::DocumentAttribute::Filename(
123 tl::types::DocumentAttributeFilename {
124 file_name: self.name.clone(),
125 },
126 )],
127 stickers: None,
128 ttl_seconds: None,
129 video_cover: None,
130 video_timestamp: None,
131 })
132 }
133
134 pub fn as_photo_media(&self) -> tl::enums::InputMedia {
136 tl::enums::InputMedia::UploadedPhoto(tl::types::InputMediaUploadedPhoto {
137 spoiler: false,
138 live_photo: false,
139 file: self.inner.clone(),
140 stickers: None,
141 ttl_seconds: None,
142 video: None,
143 })
144 }
145}
146
147pub trait Downloadable {
153 fn to_input_location(&self) -> Option<tl::enums::InputFileLocation>;
155
156 fn size(&self) -> Option<usize> {
158 None
159 }
160}
161
162#[derive(Debug, Clone)]
166pub struct Photo {
167 pub raw: tl::types::Photo,
168}
169
170impl Photo {
171 pub fn from_raw(raw: tl::types::Photo) -> Self {
172 Self { raw }
173 }
174
175 pub fn from_media(media: &tl::enums::MessageMedia) -> Option<Self> {
177 if let tl::enums::MessageMedia::Photo(mp) = media
178 && let Some(tl::enums::Photo::Photo(p)) = &mp.photo
179 {
180 return Some(Self { raw: p.clone() });
181 }
182 None
183 }
184
185 pub fn id(&self) -> i64 {
186 self.raw.id
187 }
188 pub fn access_hash(&self) -> i64 {
189 self.raw.access_hash
190 }
191 pub fn date(&self) -> i32 {
192 self.raw.date
193 }
194 pub fn has_stickers(&self) -> bool {
195 self.raw.has_stickers
196 }
197
198 pub fn largest_thumb_type(&self) -> &str {
200 self.raw
201 .sizes
202 .iter()
203 .filter_map(|s| match s {
204 tl::enums::PhotoSize::PhotoSize(ps) => Some(ps.r#type.as_str()),
205 _ => None,
206 })
207 .next_back()
208 .unwrap_or("s")
209 }
210}
211
212impl Downloadable for Photo {
213 fn to_input_location(&self) -> Option<tl::enums::InputFileLocation> {
214 Some(tl::enums::InputFileLocation::InputPhotoFileLocation(
215 tl::types::InputPhotoFileLocation {
216 id: self.raw.id,
217 access_hash: self.raw.access_hash,
218 file_reference: self.raw.file_reference.clone(),
219 thumb_size: self.largest_thumb_type().to_string(),
220 },
221 ))
222 }
223}
224
225#[derive(Debug, Clone)]
227pub struct Document {
228 pub raw: tl::types::Document,
229}
230
231impl Document {
232 pub fn from_raw(raw: tl::types::Document) -> Self {
233 Self { raw }
234 }
235
236 pub fn from_media(media: &tl::enums::MessageMedia) -> Option<Self> {
238 if let tl::enums::MessageMedia::Document(md) = media
239 && let Some(tl::enums::Document::Document(d)) = &md.document
240 {
241 return Some(Self { raw: d.clone() });
242 }
243 None
244 }
245
246 pub fn id(&self) -> i64 {
247 self.raw.id
248 }
249 pub fn access_hash(&self) -> i64 {
250 self.raw.access_hash
251 }
252 pub fn date(&self) -> i32 {
253 self.raw.date
254 }
255 pub fn mime_type(&self) -> &str {
256 &self.raw.mime_type
257 }
258 pub fn size(&self) -> i64 {
259 self.raw.size
260 }
261
262 pub fn file_name(&self) -> Option<&str> {
264 self.raw.attributes.iter().find_map(|a| match a {
265 tl::enums::DocumentAttribute::Filename(f) => Some(f.file_name.as_str()),
266 _ => None,
267 })
268 }
269
270 pub fn is_animated(&self) -> bool {
272 self.raw
273 .attributes
274 .iter()
275 .any(|a| matches!(a, tl::enums::DocumentAttribute::Animated))
276 }
277}
278
279impl Downloadable for Document {
280 fn to_input_location(&self) -> Option<tl::enums::InputFileLocation> {
281 Some(tl::enums::InputFileLocation::InputDocumentFileLocation(
282 tl::types::InputDocumentFileLocation {
283 id: self.raw.id,
284 access_hash: self.raw.access_hash,
285 file_reference: self.raw.file_reference.clone(),
286 thumb_size: String::new(),
287 },
288 ))
289 }
290
291 fn size(&self) -> Option<usize> {
292 Some(self.raw.size as usize)
293 }
294}
295
296#[derive(Debug, Clone)]
298pub struct Sticker {
299 pub inner: Document,
300}
301
302impl Sticker {
303 pub fn from_document(doc: Document) -> Option<Self> {
305 let has_sticker_attr = doc
306 .raw
307 .attributes
308 .iter()
309 .any(|a| matches!(a, tl::enums::DocumentAttribute::Sticker(_)));
310 if has_sticker_attr {
311 Some(Self { inner: doc })
312 } else {
313 None
314 }
315 }
316
317 pub fn from_media(media: &tl::enums::MessageMedia) -> Option<Self> {
319 Document::from_media(media).and_then(Self::from_document)
320 }
321
322 pub fn emoji(&self) -> Option<&str> {
324 self.inner.raw.attributes.iter().find_map(|a| match a {
325 tl::enums::DocumentAttribute::Sticker(s) => Some(s.alt.as_str()),
326 _ => None,
327 })
328 }
329
330 pub fn is_video(&self) -> bool {
332 self.inner
333 .raw
334 .attributes
335 .iter()
336 .any(|a| matches!(a, tl::enums::DocumentAttribute::Video(_)))
337 }
338
339 pub fn id(&self) -> i64 {
340 self.inner.id()
341 }
342 pub fn mime_type(&self) -> &str {
343 self.inner.mime_type()
344 }
345}
346
347impl Downloadable for Sticker {
348 fn to_input_location(&self) -> Option<tl::enums::InputFileLocation> {
349 self.inner.to_input_location()
350 }
351 fn size(&self) -> Option<usize> {
352 Some(self.inner.raw.size as usize)
353 }
354}
355
356pub struct DownloadIter {
360 client: Client,
361 request: Option<tl::functions::upload::GetFile>,
362 done: bool,
363}
364
365impl DownloadIter {
366 pub fn chunk_size(mut self, size: i32) -> Self {
368 if let Some(r) = &mut self.request {
369 r.limit = size;
370 }
371 self
372 }
373
374 pub async fn next(&mut self) -> Result<Option<Vec<u8>>, InvocationError> {
376 if self.done {
377 return Ok(None);
378 }
379 let req = match &self.request {
380 Some(r) => r.clone(),
381 None => return Ok(None),
382 };
383 let body = self.client.rpc_call_raw_pub(&req).await?;
384 let mut cur = Cursor::from_slice(&body);
385 match tl::enums::upload::File::deserialize(&mut cur)? {
386 tl::enums::upload::File::File(f) => {
387 if (f.bytes.len() as i32) < req.limit {
388 self.done = true;
389 if f.bytes.is_empty() {
390 return Ok(None);
391 }
392 }
393 if let Some(r) = &mut self.request {
394 r.offset += req.limit as i64;
395 }
396 Ok(Some(f.bytes))
397 }
398 tl::enums::upload::File::CdnRedirect(_) => {
399 self.done = true;
400 Err(InvocationError::Deserialize(
401 "CDN redirect not supported".into(),
402 ))
403 }
404 }
405 }
406}
407
408impl Client {
409 pub async fn upload_file(
414 &self,
415 data: &[u8],
416 name: &str,
417 mime_type: &str,
418 ) -> Result<UploadedFile, InvocationError> {
419 let resolved_mime = resolve_mime(name, mime_type);
421
422 let file_id = crate::random_i64_pub();
423 let total = data.len();
424 let big = total >= BIG_FILE_THRESHOLD;
425 let part_size = UPLOAD_CHUNK_SIZE as usize;
426 let total_parts = total.div_ceil(part_size) as i32;
427
428 for (part_num, chunk) in data.chunks(part_size).enumerate() {
429 if big {
430 self.rpc_call_raw_pub(&tl::functions::upload::SaveBigFilePart {
431 file_id,
432 file_part: part_num as i32,
433 file_total_parts: total_parts,
434 bytes: chunk.to_vec(),
435 })
436 .await?;
437 } else {
438 self.rpc_call_raw_pub(&tl::functions::upload::SaveFilePart {
439 file_id,
440 file_part: part_num as i32,
441 bytes: chunk.to_vec(),
442 })
443 .await?;
444 }
445 }
446
447 let inner = make_input_file(big, file_id, total_parts, name, data);
448 tracing::info!(
449 "[ferogram] uploaded '{}' ({} bytes, {} parts, mime={})",
450 name,
451 total,
452 total_parts,
453 resolved_mime
454 );
455 Ok(UploadedFile {
456 inner,
457 mime_type: resolved_mime,
458 name: name.to_string(),
459 })
460 }
461
462 pub async fn upload_file_concurrent(
467 &self,
468 data: Arc<Vec<u8>>,
469 name: &str,
470 mime_type: &str,
471 ) -> Result<UploadedFile, InvocationError> {
472 let total = data.len();
473 let part_size = UPLOAD_CHUNK_SIZE as usize;
474 let total_parts = total.div_ceil(part_size) as i32;
475
476 if total < BIG_FILE_THRESHOLD {
477 return self.upload_file(&data, name, mime_type).await;
479 }
480
481 let file_id = crate::random_i64_pub();
482 let next_part = Arc::new(Mutex::new(0i32));
483 let mut tasks = tokio::task::JoinSet::new();
484
485 for _ in 0..WORKER_COUNT {
486 let client = self.clone();
487 let data = Arc::clone(&data);
488 let next_part = Arc::clone(&next_part);
489
490 tasks.spawn(async move {
491 loop {
492 let part_num = {
493 let mut guard = next_part.lock().await;
494 if *guard >= total_parts {
495 break;
496 }
497 let n = *guard;
498 *guard += 1;
499 n
500 };
501 let start = part_num as usize * part_size;
502 let end = (start + part_size).min(data.len());
503 let bytes = data[start..end].to_vec();
504
505 client
506 .rpc_call_raw_pub(&tl::functions::upload::SaveBigFilePart {
507 file_id,
508 file_part: part_num,
509 file_total_parts: total_parts,
510 bytes,
511 })
512 .await?;
513 }
514 Ok::<(), InvocationError>(())
515 });
516 }
517
518 while let Some(res) = tasks.join_next().await {
519 res.map_err(|e| InvocationError::Io(std::io::Error::other(e.to_string())))??;
520 }
521
522 let inner = tl::enums::InputFile::Big(tl::types::InputFileBig {
523 id: file_id,
524 parts: total_parts,
525 name: name.to_string(),
526 });
527 tracing::info!(
528 "[ferogram] concurrent-uploaded '{}' ({} bytes, {} parts, {} workers)",
529 name,
530 total,
531 total_parts,
532 WORKER_COUNT
533 );
534 Ok(UploadedFile {
535 inner,
536 mime_type: resolve_mime(name, mime_type),
537 name: name.to_string(),
538 })
539 }
540
541 pub async fn upload_stream<R: AsyncRead + Unpin>(
543 &self,
544 reader: &mut R,
545 name: &str,
546 mime_type: &str,
547 ) -> Result<UploadedFile, InvocationError> {
548 let mut data = Vec::new();
549 reader.read_to_end(&mut data).await?;
550 if data.len() >= BIG_FILE_THRESHOLD {
551 self.upload_file_concurrent(Arc::new(data), name, mime_type)
552 .await
553 } else {
554 self.upload_file(&data, name, mime_type).await
555 }
556 }
557
558 pub async fn send_file(
562 &self,
563 peer: tl::enums::Peer,
564 media: tl::enums::InputMedia,
565 caption: &str,
566 ) -> Result<(), InvocationError> {
567 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
568 let req = tl::functions::messages::SendMedia {
569 silent: false,
570 background: false,
571 clear_draft: false,
572 noforwards: false,
573 update_stickersets_order: false,
574 invert_media: false,
575 allow_paid_floodskip: false,
576 peer: input_peer,
577 reply_to: None,
578 media,
579 message: caption.to_string(),
580 random_id: crate::random_i64_pub(),
581 reply_markup: None,
582 entities: None,
583 schedule_date: None,
584 schedule_repeat_period: None,
585 send_as: None,
586 quick_reply_shortcut: None,
587 effect: None,
588 allow_paid_stars: None,
589 suggested_post: None,
590 };
591 self.rpc_call_raw_pub(&req).await?;
592 Ok(())
593 }
594
595 pub async fn send_album(
614 &self,
615 peer: tl::enums::Peer,
616 items: Vec<AlbumItem>,
617 ) -> Result<(), InvocationError> {
618 let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
619
620 let reply_to = items.iter().find_map(|i| i.reply_to).map(|id| {
622 tl::enums::InputReplyTo::Message(tl::types::InputReplyToMessage {
623 reply_to_msg_id: id,
624 top_msg_id: None,
625 reply_to_peer_id: None,
626 quote_text: None,
627 quote_entities: None,
628 quote_offset: None,
629 monoforum_peer_id: None,
630 poll_option: None,
631 todo_item_id: None,
632 })
633 });
634
635 let multi: Vec<tl::enums::InputSingleMedia> = items
636 .into_iter()
637 .map(|item| {
638 tl::enums::InputSingleMedia::InputSingleMedia(tl::types::InputSingleMedia {
639 media: item.media,
640 random_id: crate::random_i64_pub(),
641 message: item.caption,
642 entities: if item.entities.is_empty() {
643 None
644 } else {
645 Some(item.entities)
646 },
647 })
648 })
649 .collect();
650
651 let req = tl::functions::messages::SendMultiMedia {
652 silent: false,
653 background: false,
654 clear_draft: false,
655 noforwards: false,
656 update_stickersets_order: false,
657 invert_media: false,
658 allow_paid_floodskip: false,
659 peer: input_peer,
660 reply_to,
661 multi_media: multi,
662 schedule_date: None,
663 send_as: None,
664 quick_reply_shortcut: None,
665 effect: None,
666 allow_paid_stars: None,
667 };
668 self.rpc_call_raw_pub(&req).await?;
669 Ok(())
670 }
671
672 pub fn iter_download(&self, location: tl::enums::InputFileLocation) -> DownloadIter {
676 DownloadIter {
677 client: self.clone(),
678 done: false,
679 request: Some(tl::functions::upload::GetFile {
680 precise: false,
681 cdn_supported: false,
682 location,
683 offset: 0,
684 limit: DOWNLOAD_CHUNK_SIZE,
685 }),
686 }
687 }
688
689 pub async fn download_media(
691 &self,
692 location: tl::enums::InputFileLocation,
693 ) -> Result<Vec<u8>, InvocationError> {
694 let mut bytes = Vec::new();
695 let mut iter = self.iter_download(location);
696 while let Some(chunk) = iter.next().await? {
697 bytes.extend_from_slice(&chunk);
698 }
699 Ok(bytes)
700 }
701
702 pub async fn download_media_concurrent(
709 &self,
710 location: tl::enums::InputFileLocation,
711 size: usize,
712 ) -> Result<Vec<u8>, InvocationError> {
713 let chunk = DOWNLOAD_CHUNK_SIZE as usize;
714 let n_parts = size.div_ceil(chunk);
715 let next_part = Arc::new(Mutex::new(0usize));
716
717 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(usize, Vec<u8>)>();
719 let mut tasks = tokio::task::JoinSet::new();
720
721 for _ in 0..WORKER_COUNT {
722 let client = self.clone();
723 let location = location.clone();
724 let next_part = Arc::clone(&next_part);
725 let tx = tx.clone();
726
727 tasks.spawn(async move {
728 loop {
729 let part = {
730 let mut g = next_part.lock().await;
731 if *g >= n_parts {
732 break;
733 }
734 let p = *g;
735 *g += 1;
736 p
737 };
738 let offset = (part * chunk) as i64;
739 let req = tl::functions::upload::GetFile {
740 precise: true,
741 cdn_supported: false,
742 location: location.clone(),
743 offset,
744 limit: DOWNLOAD_CHUNK_SIZE,
745 };
746 let raw = client.rpc_call_raw_pub(&req).await?;
747 let mut cur = Cursor::from_slice(&raw);
748 if let tl::enums::upload::File::File(f) =
749 tl::enums::upload::File::deserialize(&mut cur)?
750 {
751 let _ = tx.send((part, f.bytes));
752 }
753 }
754 Ok::<(), InvocationError>(())
755 });
756 }
757 drop(tx);
758
759 let mut parts: Vec<Option<Vec<u8>>> = (0..n_parts).map(|_| None).collect();
761 while let Some((idx, data)) = rx.recv().await {
762 if idx < parts.len() {
763 parts[idx] = Some(data);
764 }
765 }
766
767 while let Some(res) = tasks.join_next().await {
769 res.map_err(|e| InvocationError::Io(std::io::Error::other(e.to_string())))??;
770 }
771
772 let mut out = Vec::with_capacity(size);
774 for part in parts.into_iter().flatten() {
775 out.extend_from_slice(&part);
776 }
777 out.truncate(size);
778 Ok(out)
779 }
780
781 pub async fn download<D: Downloadable>(&self, item: &D) -> Result<Vec<u8>, InvocationError> {
784 let loc = item
785 .to_input_location()
786 .ok_or_else(|| InvocationError::Deserialize("item has no download location".into()))?;
787 match item.size() {
788 Some(sz) if sz >= BIG_FILE_THRESHOLD => self.download_media_concurrent(loc, sz).await,
789 _ => self.download_media(loc).await,
790 }
791 }
792}
793
794impl crate::update::IncomingMessage {
797 pub fn download_location(&self) -> Option<tl::enums::InputFileLocation> {
799 let media = match &self.raw {
800 tl::enums::Message::Message(m) => m.media.as_ref()?,
801 _ => return None,
802 };
803 if let Some(doc) = Document::from_media(media) {
804 return doc.to_input_location();
805 }
806 if let Some(photo) = Photo::from_media(media) {
807 return photo.to_input_location();
808 }
809 None
810 }
811}
812
813pub fn download_location_from_media(
818 media: Option<&tl::enums::MessageMedia>,
819) -> Option<tl::enums::InputFileLocation> {
820 let m = media?;
821 if let Some(doc) = Document::from_media(m) {
822 return doc.to_input_location();
823 }
824 if let Some(photo) = Photo::from_media(m) {
825 return photo.to_input_location();
826 }
827 None
828}
829
830fn make_input_file(
833 big: bool,
834 file_id: i64,
835 total_parts: i32,
836 name: &str,
837 data: &[u8],
838) -> tl::enums::InputFile {
839 if big {
840 tl::enums::InputFile::Big(tl::types::InputFileBig {
841 id: file_id,
842 parts: total_parts,
843 name: name.to_string(),
844 })
845 } else {
846 let _ = data; tl::enums::InputFile::InputFile(tl::types::InputFile {
848 id: file_id,
849 parts: total_parts,
850 name: name.to_string(),
851 md5_checksum: String::new(),
852 })
853 }
854}