1use std::sync::Arc;
21
22use layer_tl_types as tl;
23use layer_tl_types::{Cursor, Deserializable};
24use tokio::io::AsyncRead;
25use tokio::io::AsyncReadExt;
26use tokio::sync::Mutex;
27
28use crate::{Client, InvocationError};
29
/// Size in bytes of every part sent by the upload helpers (512 KiB).
pub const UPLOAD_CHUNK_SIZE: i32 = 512 << 10;
/// Default `limit`, in bytes, of each `upload.getFile` request (512 KiB).
pub const DOWNLOAD_CHUNK_SIZE: i32 = 512 << 10;
/// Byte count (10 MiB) at or above which uploads use `SaveBigFilePart` and
/// `Client::download` switches to the concurrent path.
const BIG_FILE_THRESHOLD: usize = 10 << 20;
/// Number of tasks spawned by the concurrent upload and download helpers.
const WORKER_COUNT: usize = 4;
39
40#[derive(Debug, Clone)]
44pub struct UploadedFile {
45 pub(crate) inner: tl::enums::InputFile,
46 pub(crate) mime_type: String,
47 pub(crate) name: String,
48}
49
50impl UploadedFile {
51 pub fn mime_type(&self) -> &str { &self.mime_type }
52 pub fn name(&self) -> &str { &self.name }
53
54 pub fn as_document_media(&self) -> tl::enums::InputMedia {
56 tl::enums::InputMedia::UploadedDocument(tl::types::InputMediaUploadedDocument {
57 nosound_video: false,
58 force_file: false,
59 spoiler: false,
60 file: self.inner.clone(),
61 thumb: None,
62 mime_type: self.mime_type.clone(),
63 attributes: vec![tl::enums::DocumentAttribute::Filename(
64 tl::types::DocumentAttributeFilename { file_name: self.name.clone() },
65 )],
66 stickers: None,
67 ttl_seconds: None,
68 video_cover: None,
69 video_timestamp: None,
70 })
71 }
72
73 pub fn as_photo_media(&self) -> tl::enums::InputMedia {
75 tl::enums::InputMedia::UploadedPhoto(tl::types::InputMediaUploadedPhoto {
76 spoiler: false,
77 live_photo: false,
78 file: self.inner.clone(),
79 stickers: None,
80 ttl_seconds: None,
81 video: None,
82 })
83 }
84}
85
86pub trait Downloadable {
92 fn to_input_location(&self) -> Option<tl::enums::InputFileLocation>;
94
95 fn size(&self) -> Option<usize> { None }
97}
98
99#[derive(Debug, Clone)]
103pub struct Photo {
104 pub raw: tl::types::Photo,
105}
106
107impl Photo {
108 pub fn from_raw(raw: tl::types::Photo) -> Self { Self { raw } }
109
110 pub fn from_media(media: &tl::enums::MessageMedia) -> Option<Self> {
112 if let tl::enums::MessageMedia::Photo(mp) = media
113 && let Some(tl::enums::Photo::Photo(p)) = &mp.photo {
114 return Some(Self { raw: p.clone() });
115 }
116 None
117 }
118
119 pub fn id(&self) -> i64 { self.raw.id }
120 pub fn access_hash(&self) -> i64 { self.raw.access_hash }
121 pub fn date(&self) -> i32 { self.raw.date }
122 pub fn has_stickers(&self) -> bool { self.raw.has_stickers }
123
124 pub fn largest_thumb_type(&self) -> &str {
126 self.raw.sizes.iter()
127 .filter_map(|s| match s {
128 tl::enums::PhotoSize::PhotoSize(ps) => Some(ps.r#type.as_str()),
129 _ => None,
130 })
131 .next_back()
132 .unwrap_or("s")
133 }
134}
135
136impl Downloadable for Photo {
137 fn to_input_location(&self) -> Option<tl::enums::InputFileLocation> {
138 Some(tl::enums::InputFileLocation::InputPhotoFileLocation(
139 tl::types::InputPhotoFileLocation {
140 id: self.raw.id,
141 access_hash: self.raw.access_hash,
142 file_reference: self.raw.file_reference.clone(),
143 thumb_size: self.largest_thumb_type().to_string(),
144 },
145 ))
146 }
147}
148
149#[derive(Debug, Clone)]
151pub struct Document {
152 pub raw: tl::types::Document,
153}
154
155impl Document {
156 pub fn from_raw(raw: tl::types::Document) -> Self { Self { raw } }
157
158 pub fn from_media(media: &tl::enums::MessageMedia) -> Option<Self> {
160 if let tl::enums::MessageMedia::Document(md) = media
161 && let Some(tl::enums::Document::Document(d)) = &md.document {
162 return Some(Self { raw: d.clone() });
163 }
164 None
165 }
166
167 pub fn id(&self) -> i64 { self.raw.id }
168 pub fn access_hash(&self) -> i64 { self.raw.access_hash }
169 pub fn date(&self) -> i32 { self.raw.date }
170 pub fn mime_type(&self) -> &str { &self.raw.mime_type }
171 pub fn size(&self) -> i64 { self.raw.size }
172
173 pub fn file_name(&self) -> Option<&str> {
175 self.raw.attributes.iter().find_map(|a| match a {
176 tl::enums::DocumentAttribute::Filename(f) => Some(f.file_name.as_str()),
177 _ => None,
178 })
179 }
180
181 pub fn is_animated(&self) -> bool {
183 self.raw.attributes.iter().any(|a| matches!(a, tl::enums::DocumentAttribute::Animated))
184 }
185}
186
187impl Downloadable for Document {
188 fn to_input_location(&self) -> Option<tl::enums::InputFileLocation> {
189 Some(tl::enums::InputFileLocation::InputDocumentFileLocation(
190 tl::types::InputDocumentFileLocation {
191 id: self.raw.id,
192 access_hash: self.raw.access_hash,
193 file_reference: self.raw.file_reference.clone(),
194 thumb_size: String::new(),
195 },
196 ))
197 }
198
199 fn size(&self) -> Option<usize> {
200 Some(self.raw.size as usize)
201 }
202}
203
204#[derive(Debug, Clone)]
206pub struct Sticker {
207 pub inner: Document,
208}
209
210impl Sticker {
211 pub fn from_document(doc: Document) -> Option<Self> {
213 let has_sticker_attr = doc.raw.attributes.iter()
214 .any(|a| matches!(a, tl::enums::DocumentAttribute::Sticker(_)));
215 if has_sticker_attr { Some(Self { inner: doc }) } else { None }
216 }
217
218 pub fn from_media(media: &tl::enums::MessageMedia) -> Option<Self> {
220 Document::from_media(media).and_then(Self::from_document)
221 }
222
223 pub fn emoji(&self) -> Option<&str> {
225 self.inner.raw.attributes.iter().find_map(|a| match a {
226 tl::enums::DocumentAttribute::Sticker(s) => Some(s.alt.as_str()),
227 _ => None,
228 })
229 }
230
231 pub fn is_video(&self) -> bool {
233 self.inner.raw.attributes.iter()
234 .any(|a| matches!(a, tl::enums::DocumentAttribute::Video(_)))
235 }
236
237 pub fn id(&self) -> i64 { self.inner.id() }
238 pub fn mime_type(&self) -> &str { self.inner.mime_type() }
239}
240
241impl Downloadable for Sticker {
242 fn to_input_location(&self) -> Option<tl::enums::InputFileLocation> {
243 self.inner.to_input_location()
244 }
245 fn size(&self) -> Option<usize> { Some(self.inner.raw.size as usize) }
246}
247
248pub struct DownloadIter {
252 client: Client,
253 request: Option<tl::functions::upload::GetFile>,
254 done: bool,
255}
256
257impl DownloadIter {
258 pub fn chunk_size(mut self, size: i32) -> Self {
260 if let Some(r) = &mut self.request { r.limit = size; }
261 self
262 }
263
264 pub async fn next(&mut self) -> Result<Option<Vec<u8>>, InvocationError> {
266 if self.done { return Ok(None); }
267 let req = match &self.request {
268 Some(r) => r.clone(),
269 None => return Ok(None),
270 };
271 let body = self.client.rpc_call_raw_pub(&req).await?;
272 let mut cur = Cursor::from_slice(&body);
273 match tl::enums::upload::File::deserialize(&mut cur)? {
274 tl::enums::upload::File::File(f) => {
275 if (f.bytes.len() as i32) < req.limit {
276 self.done = true;
277 if f.bytes.is_empty() { return Ok(None); }
278 }
279 if let Some(r) = &mut self.request { r.offset += req.limit as i64; }
280 Ok(Some(f.bytes))
281 }
282 tl::enums::upload::File::CdnRedirect(_) => {
283 self.done = true;
284 Err(InvocationError::Deserialize("CDN redirect not supported".into()))
285 }
286 }
287 }
288}
289
290impl Client {
293 pub async fn upload_file(
298 &self,
299 data: &[u8],
300 name: &str,
301 mime_type: &str,
302 ) -> Result<UploadedFile, InvocationError> {
303 let file_id = crate::random_i64_pub();
304 let total = data.len();
305 let big = total >= BIG_FILE_THRESHOLD;
306 let part_size = UPLOAD_CHUNK_SIZE as usize;
307 let total_parts = total.div_ceil(part_size) as i32;
308
309 for (part_num, chunk) in data.chunks(part_size).enumerate() {
310 if big {
311 self.rpc_call_raw_pub(&tl::functions::upload::SaveBigFilePart {
312 file_id,
313 file_part: part_num as i32,
314 file_total_parts: total_parts,
315 bytes: chunk.to_vec(),
316 }).await?;
317 } else {
318 self.rpc_call_raw_pub(&tl::functions::upload::SaveFilePart {
319 file_id,
320 file_part: part_num as i32,
321 bytes: chunk.to_vec(),
322 }).await?;
323 }
324 }
325
326 let inner = make_input_file(big, file_id, total_parts, name, data);
327 log::info!("[layer] uploaded '{}' ({} bytes, {} parts)", name, total, total_parts);
328 Ok(UploadedFile { inner, mime_type: mime_type.to_string(), name: name.to_string() })
329 }
330
331 pub async fn upload_file_concurrent(
336 &self,
337 data: Arc<Vec<u8>>,
338 name: &str,
339 mime_type: &str,
340 ) -> Result<UploadedFile, InvocationError> {
341 let total = data.len();
342 let part_size = UPLOAD_CHUNK_SIZE as usize;
343 let total_parts = total.div_ceil(part_size) as i32;
344
345 if total < BIG_FILE_THRESHOLD {
346 return self.upload_file(&data, name, mime_type).await;
348 }
349
350 let file_id = crate::random_i64_pub();
351 let next_part = Arc::new(Mutex::new(0i32));
352 let mut tasks = tokio::task::JoinSet::new();
353
354 for _ in 0..WORKER_COUNT {
355 let client = self.clone();
356 let data = Arc::clone(&data);
357 let next_part = Arc::clone(&next_part);
358
359 tasks.spawn(async move {
360 loop {
361 let part_num = {
362 let mut guard = next_part.lock().await;
363 if *guard >= total_parts { break; }
364 let n = *guard;
365 *guard += 1;
366 n
367 };
368 let start = part_num as usize * part_size;
369 let end = (start + part_size).min(data.len());
370 let bytes = data[start..end].to_vec();
371
372 client.rpc_call_raw_pub(&tl::functions::upload::SaveBigFilePart {
373 file_id,
374 file_part: part_num,
375 file_total_parts: total_parts,
376 bytes,
377 }).await?;
378 }
379 Ok::<(), InvocationError>(())
380 });
381 }
382
383 while let Some(res) = tasks.join_next().await {
384 res.map_err(|e| InvocationError::Io(
385 std::io::Error::other(e.to_string())
386 ))??;
387 }
388
389 let inner = tl::enums::InputFile::Big(tl::types::InputFileBig {
390 id: file_id,
391 parts: total_parts,
392 name: name.to_string(),
393 });
394 log::info!("[layer] concurrent-uploaded '{}' ({} bytes, {} parts, {} workers)",
395 name, total, total_parts, WORKER_COUNT);
396 Ok(UploadedFile { inner, mime_type: mime_type.to_string(), name: name.to_string() })
397 }
398
399 pub async fn upload_stream<R: AsyncRead + Unpin>(
401 &self,
402 reader: &mut R,
403 name: &str,
404 mime_type: &str,
405 ) -> Result<UploadedFile, InvocationError> {
406 let mut data = Vec::new();
407 reader.read_to_end(&mut data).await?;
408 if data.len() >= BIG_FILE_THRESHOLD {
409 self.upload_file_concurrent(Arc::new(data), name, mime_type).await
410 } else {
411 self.upload_file(&data, name, mime_type).await
412 }
413 }
414
415 pub async fn send_file(
419 &self,
420 peer: tl::enums::Peer,
421 media: tl::enums::InputMedia,
422 caption: &str,
423 ) -> Result<(), InvocationError> {
424 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
425 let req = tl::functions::messages::SendMedia {
426 silent: false,
427 background: false,
428 clear_draft: false,
429 noforwards: false,
430 update_stickersets_order: false,
431 invert_media: false,
432 allow_paid_floodskip: false,
433 peer: input_peer,
434 reply_to: None,
435 media,
436 message: caption.to_string(),
437 random_id: crate::random_i64_pub(),
438 reply_markup: None,
439 entities: None,
440 schedule_date: None,
441 schedule_repeat_period: None,
442 send_as: None,
443 quick_reply_shortcut: None,
444 effect: None,
445 allow_paid_stars: None,
446 suggested_post: None,
447 };
448 self.rpc_call_raw_pub(&req).await?;
449 Ok(())
450 }
451
452 pub async fn send_album(
454 &self,
455 peer: tl::enums::Peer,
456 items: Vec<(tl::enums::InputMedia, String)>,
457 ) -> Result<(), InvocationError> {
458 let input_peer = self.inner.peer_cache.lock().await.peer_to_input(&peer);
459 let multi: Vec<tl::enums::InputSingleMedia> = items.into_iter().map(|(media, caption)| {
460 tl::enums::InputSingleMedia::InputSingleMedia(tl::types::InputSingleMedia {
461 media,
462 random_id: crate::random_i64_pub(),
463 message: caption,
464 entities: None,
465 })
466 }).collect();
467 let req = tl::functions::messages::SendMultiMedia {
468 silent: false,
469 background: false,
470 clear_draft: false,
471 noforwards: false,
472 update_stickersets_order: false,
473 invert_media: false,
474 allow_paid_floodskip: false,
475 peer: input_peer,
476 reply_to: None,
477 multi_media: multi,
478 schedule_date: None,
479 send_as: None,
480 quick_reply_shortcut: None,
481 effect: None,
482 allow_paid_stars: None,
483 };
484 self.rpc_call_raw_pub(&req).await?;
485 Ok(())
486 }
487
488 pub fn iter_download(&self, location: tl::enums::InputFileLocation) -> DownloadIter {
492 DownloadIter {
493 client: self.clone(),
494 done: false,
495 request: Some(tl::functions::upload::GetFile {
496 precise: false,
497 cdn_supported: false,
498 location,
499 offset: 0,
500 limit: DOWNLOAD_CHUNK_SIZE,
501 }),
502 }
503 }
504
505 pub async fn download_media(
507 &self,
508 location: tl::enums::InputFileLocation,
509 ) -> Result<Vec<u8>, InvocationError> {
510 let mut bytes = Vec::new();
511 let mut iter = self.iter_download(location);
512 while let Some(chunk) = iter.next().await? {
513 bytes.extend_from_slice(&chunk);
514 }
515 Ok(bytes)
516 }
517
518 pub async fn download_media_concurrent(
525 &self,
526 location: tl::enums::InputFileLocation,
527 size: usize,
528 ) -> Result<Vec<u8>, InvocationError> {
529 let chunk = DOWNLOAD_CHUNK_SIZE as usize;
530 let n_parts = size.div_ceil(chunk);
531 let next_part = Arc::new(Mutex::new(0usize));
532
533 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(usize, Vec<u8>)>();
535 let mut tasks = tokio::task::JoinSet::new();
536
537 for _ in 0..WORKER_COUNT {
538 let client = self.clone();
539 let location = location.clone();
540 let next_part = Arc::clone(&next_part);
541 let tx = tx.clone();
542
543 tasks.spawn(async move {
544 loop {
545 let part = {
546 let mut g = next_part.lock().await;
547 if *g >= n_parts { break; }
548 let p = *g; *g += 1; p
549 };
550 let offset = (part * chunk) as i64;
551 let req = tl::functions::upload::GetFile {
552 precise: true,
553 cdn_supported: false,
554 location: location.clone(),
555 offset,
556 limit: DOWNLOAD_CHUNK_SIZE,
557 };
558 let raw = client.rpc_call_raw_pub(&req).await?;
559 let mut cur = Cursor::from_slice(&raw);
560 if let tl::enums::upload::File::File(f) =
561 tl::enums::upload::File::deserialize(&mut cur)?
562 {
563 let _ = tx.send((part, f.bytes));
564 }
565 }
566 Ok::<(), InvocationError>(())
567 });
568 }
569 drop(tx);
570
571 let mut parts: Vec<Option<Vec<u8>>> = (0..n_parts).map(|_| None).collect();
573 while let Some((idx, data)) = rx.recv().await {
574 if idx < parts.len() { parts[idx] = Some(data); }
575 }
576
577 while let Some(res) = tasks.join_next().await {
579 res.map_err(|e| InvocationError::Io(
580 std::io::Error::other(e.to_string())
581 ))??;
582 }
583
584 let mut out = Vec::with_capacity(size);
586 for part in parts.into_iter().flatten() { out.extend_from_slice(&part); }
587 out.truncate(size);
588 Ok(out)
589 }
590
591 pub async fn download<D: Downloadable>(
594 &self,
595 item: &D,
596 ) -> Result<Vec<u8>, InvocationError> {
597 let loc = item.to_input_location()
598 .ok_or_else(|| InvocationError::Deserialize("item has no download location".into()))?;
599 match item.size() {
600 Some(sz) if sz >= BIG_FILE_THRESHOLD =>
601 self.download_media_concurrent(loc, sz).await,
602 _ =>
603 self.download_media(loc).await,
604 }
605 }
606}
607
608impl crate::update::IncomingMessage {
611 pub fn download_location(&self) -> Option<tl::enums::InputFileLocation> {
613 let media = match &self.raw {
614 tl::enums::Message::Message(m) => m.media.as_ref()?,
615 _ => return None,
616 };
617 if let Some(doc) = Document::from_media(media) {
618 return doc.to_input_location();
619 }
620 if let Some(photo) = Photo::from_media(media) {
621 return photo.to_input_location();
622 }
623 None
624 }
625}
626
627fn make_input_file(
630 big: bool,
631 file_id: i64,
632 total_parts: i32,
633 name: &str,
634 data: &[u8],
635) -> tl::enums::InputFile {
636 if big {
637 tl::enums::InputFile::Big(tl::types::InputFileBig {
638 id: file_id,
639 parts: total_parts,
640 name: name.to_string(),
641 })
642 } else {
643 let _ = data; tl::enums::InputFile::InputFile(tl::types::InputFile {
645 id: file_id,
646 parts: total_parts,
647 name: name.to_string(),
648 md5_checksum: String::new(),
649 })
650 }
651}