use std::sync::Arc;
use layer_tl_types as tl;
use layer_tl_types::{Cursor, Deserializable};
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use tokio::sync::Mutex;
use crate::{Client, InvocationError};
/// One entry of a media album, consumed by `Client::send_album`.
pub struct AlbumItem {
    /// Media payload for this entry.
    pub media: tl::enums::InputMedia,
    /// Caption text sent as the entry's message.
    pub caption: String,
    /// Formatting entities for the caption; `send_album` omits them when empty.
    pub entities: Vec<tl::enums::MessageEntity>,
    /// Message id to reply to. `send_album` uses the first item that has one.
    pub reply_to: Option<i32>,
}

impl AlbumItem {
    /// Wraps `media` with an empty caption, no entities and no reply target.
    pub fn new(media: tl::enums::InputMedia) -> Self {
        let caption = String::new();
        let entities = Vec::new();
        Self {
            media,
            caption,
            entities,
            reply_to: None,
        }
    }

    /// Replaces the caption and hands the item back for chaining.
    pub fn caption(self, text: impl Into<String>) -> Self {
        let mut item = self;
        item.caption = text.into();
        item
    }

    /// Sets (or clears, with `None`) the reply target and hands the item back for chaining.
    pub fn reply_to(self, msg_id: Option<i32>) -> Self {
        let mut item = self;
        item.reply_to = msg_id;
        item
    }
}
impl From<(tl::enums::InputMedia, String)> for AlbumItem {
fn from((media, caption): (tl::enums::InputMedia, String)) -> Self {
Self::new(media).caption(caption)
}
}
/// Size in bytes (512 KiB) of each part sent by the upload functions in this file.
pub const UPLOAD_CHUNK_SIZE: i32 = 512 * 1024;
/// Default `limit` in bytes (512 KiB) used for `upload::GetFile` requests.
pub const DOWNLOAD_CHUNK_SIZE: i32 = 512 * 1024;
/// Payloads of at least this many bytes (10 MiB) take the "big file" upload path
/// and, in `Client::download`, the concurrent download path.
const BIG_FILE_THRESHOLD: usize = 10 * 1024 * 1024;
/// Picks the MIME type to attach to an upload.
///
/// A caller-supplied `mime_type` is kept unless it is empty or the generic
/// `application/octet-stream`; in those cases the type is guessed from `name`
/// via `mime_guess`, which itself falls back to octet-stream.
fn resolve_mime(name: &str, mime_type: &str) -> String {
    let is_generic = mime_type.is_empty() || mime_type == "application/octet-stream";
    if is_generic {
        let guessed = mime_guess::from_path(name).first_or_octet_stream();
        guessed.to_string()
    } else {
        mime_type.to_string()
    }
}
/// Number of tasks spawned by the concurrent upload and download paths.
const WORKER_COUNT: usize = 4;
/// Handle to a file whose parts have already been uploaded, plus the metadata
/// needed to turn it into sendable media.
#[derive(Debug, Clone)]
pub struct UploadedFile {
    /// Input-file reference produced by the upload.
    pub(crate) inner: tl::enums::InputFile,
    /// MIME type resolved at upload time.
    pub(crate) mime_type: String,
    /// File name supplied at upload time.
    pub(crate) name: String,
}

impl UploadedFile {
    /// MIME type recorded for this upload.
    pub fn mime_type(&self) -> &str {
        self.mime_type.as_str()
    }

    /// File name recorded for this upload.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Wraps the upload as document media carrying a single file-name attribute.
    /// Every optional field is left unset and every flag is `false`.
    pub fn as_document_media(&self) -> tl::enums::InputMedia {
        let filename_attr =
            tl::enums::DocumentAttribute::Filename(tl::types::DocumentAttributeFilename {
                file_name: self.name.clone(),
            });
        let document = tl::types::InputMediaUploadedDocument {
            nosound_video: false,
            force_file: false,
            spoiler: false,
            file: self.inner.clone(),
            thumb: None,
            mime_type: self.mime_type.clone(),
            attributes: vec![filename_attr],
            stickers: None,
            ttl_seconds: None,
            video_cover: None,
            video_timestamp: None,
        };
        tl::enums::InputMedia::UploadedDocument(document)
    }

    /// Wraps the upload as photo media with every optional field unset.
    pub fn as_photo_media(&self) -> tl::enums::InputMedia {
        let photo = tl::types::InputMediaUploadedPhoto {
            spoiler: false,
            live_photo: false,
            file: self.inner.clone(),
            stickers: None,
            ttl_seconds: None,
            video: None,
        };
        tl::enums::InputMedia::UploadedPhoto(photo)
    }
}
/// Something that can be fetched through `Client::download`.
pub trait Downloadable {
    /// Location to request the file from, or `None` if the item has none.
    fn to_input_location(&self) -> Option<tl::enums::InputFileLocation>;

    /// Size in bytes when known up front; the default implementation reports
    /// it as unknown.
    fn size(&self) -> Option<usize> {
        Option::None
    }
}
#[derive(Debug, Clone)]
pub struct Photo {
pub raw: tl::types::Photo,
}
impl Photo {
pub fn from_raw(raw: tl::types::Photo) -> Self {
Self { raw }
}
pub fn from_media(media: &tl::enums::MessageMedia) -> Option<Self> {
if let tl::enums::MessageMedia::Photo(mp) = media
&& let Some(tl::enums::Photo::Photo(p)) = &mp.photo
{
return Some(Self { raw: p.clone() });
}
None
}
pub fn id(&self) -> i64 {
self.raw.id
}
pub fn access_hash(&self) -> i64 {
self.raw.access_hash
}
pub fn date(&self) -> i32 {
self.raw.date
}
pub fn has_stickers(&self) -> bool {
self.raw.has_stickers
}
pub fn largest_thumb_type(&self) -> &str {
self.raw
.sizes
.iter()
.filter_map(|s| match s {
tl::enums::PhotoSize::PhotoSize(ps) => Some(ps.r#type.as_str()),
_ => None,
})
.next_back()
.unwrap_or("s")
}
}
/// Photos download through a photo file location that targets the size
/// reported by [`Photo::largest_thumb_type`]. No size is advertised, so the
/// trait's default `size()` applies.
impl Downloadable for Photo {
    fn to_input_location(&self) -> Option<tl::enums::InputFileLocation> {
        let thumb_size = self.largest_thumb_type().to_string();
        let location = tl::types::InputPhotoFileLocation {
            id: self.raw.id,
            access_hash: self.raw.access_hash,
            file_reference: self.raw.file_reference.clone(),
            thumb_size,
        };
        Some(tl::enums::InputFileLocation::InputPhotoFileLocation(
            location,
        ))
    }
}
#[derive(Debug, Clone)]
pub struct Document {
pub raw: tl::types::Document,
}
impl Document {
pub fn from_raw(raw: tl::types::Document) -> Self {
Self { raw }
}
pub fn from_media(media: &tl::enums::MessageMedia) -> Option<Self> {
if let tl::enums::MessageMedia::Document(md) = media
&& let Some(tl::enums::Document::Document(d)) = &md.document
{
return Some(Self { raw: d.clone() });
}
None
}
pub fn id(&self) -> i64 {
self.raw.id
}
pub fn access_hash(&self) -> i64 {
self.raw.access_hash
}
pub fn date(&self) -> i32 {
self.raw.date
}
pub fn mime_type(&self) -> &str {
&self.raw.mime_type
}
pub fn size(&self) -> i64 {
self.raw.size
}
pub fn file_name(&self) -> Option<&str> {
self.raw.attributes.iter().find_map(|a| match a {
tl::enums::DocumentAttribute::Filename(f) => Some(f.file_name.as_str()),
_ => None,
})
}
pub fn is_animated(&self) -> bool {
self.raw
.attributes
.iter()
.any(|a| matches!(a, tl::enums::DocumentAttribute::Animated))
}
}
/// Documents download through a document file location with an empty
/// `thumb_size`, and advertise their byte size so large files can take the
/// concurrent download path.
impl Downloadable for Document {
    fn to_input_location(&self) -> Option<tl::enums::InputFileLocation> {
        Some(tl::enums::InputFileLocation::InputDocumentFileLocation(
            tl::types::InputDocumentFileLocation {
                id: self.raw.id,
                access_hash: self.raw.access_hash,
                file_reference: self.raw.file_reference.clone(),
                thumb_size: String::new(),
            },
        ))
    }

    /// Size in bytes, or `None` when the raw `i64` does not fit in `usize`
    /// (negative, or too large for the target). A plain `as` cast would wrap
    /// such values into a bogus length.
    fn size(&self) -> Option<usize> {
        usize::try_from(self.raw.size).ok()
    }
}
/// A [`Document`] known to carry a `Sticker` attribute.
#[derive(Debug, Clone)]
pub struct Sticker {
    /// The underlying document.
    pub inner: Document,
}

impl Sticker {
    /// Accepts `doc` only if one of its attributes is `Sticker`; otherwise
    /// returns `None`.
    pub fn from_document(doc: Document) -> Option<Self> {
        let is_sticker = doc
            .raw
            .attributes
            .iter()
            .any(|attr| matches!(attr, tl::enums::DocumentAttribute::Sticker(_)));
        if !is_sticker {
            return None;
        }
        Some(Sticker { inner: doc })
    }

    /// Extracts a sticker from message media, if the media is a document with
    /// a `Sticker` attribute.
    pub fn from_media(media: &tl::enums::MessageMedia) -> Option<Self> {
        let doc = Document::from_media(media)?;
        Self::from_document(doc)
    }

    /// The `alt` text of the first `Sticker` attribute, if any.
    pub fn emoji(&self) -> Option<&str> {
        for attr in &self.inner.raw.attributes {
            if let tl::enums::DocumentAttribute::Sticker(s) = attr {
                return Some(s.alt.as_str());
            }
        }
        None
    }

    /// Whether the underlying document also carries a `Video` attribute.
    pub fn is_video(&self) -> bool {
        let attrs = &self.inner.raw.attributes;
        attrs
            .iter()
            .any(|attr| matches!(attr, tl::enums::DocumentAttribute::Video(_)))
    }

    /// Id of the underlying document.
    pub fn id(&self) -> i64 {
        self.inner.id()
    }

    /// MIME type of the underlying document.
    pub fn mime_type(&self) -> &str {
        self.inner.mime_type()
    }
}
/// Stickers download exactly like the document they wrap.
impl Downloadable for Sticker {
    fn to_input_location(&self) -> Option<tl::enums::InputFileLocation> {
        self.inner.to_input_location()
    }

    /// Size in bytes of the underlying document, or `None` when the raw `i64`
    /// does not fit in `usize` (negative, or too large for the target). A
    /// plain `as` cast would wrap such values into a bogus length.
    fn size(&self) -> Option<usize> {
        usize::try_from(self.inner.raw.size).ok()
    }
}
/// Pull-style chunked downloader; obtain one from `Client::iter_download` and
/// call [`DownloadIter::next`] until it yields `None`.
pub struct DownloadIter {
    /// Client used to issue the requests.
    client: Client,
    /// Request template; its `offset` is advanced after each chunk.
    request: Option<tl::functions::upload::GetFile>,
    /// Set once a short chunk or a CDN redirect has been seen.
    done: bool,
}

impl DownloadIter {
    /// Overrides the per-request `limit` (bytes requested per chunk).
    pub fn chunk_size(mut self, size: i32) -> Self {
        if let Some(req) = self.request.as_mut() {
            req.limit = size;
        }
        self
    }

    /// Fetches the next chunk.
    ///
    /// Returns `Ok(None)` once the download is finished. A chunk shorter than
    /// the requested limit marks the iterator as finished; if that chunk is
    /// empty it is not yielded.
    ///
    /// # Errors
    /// Propagates RPC and deserialization failures, and fails with a
    /// `Deserialize` error if the server answers with a CDN redirect.
    pub async fn next(&mut self) -> Result<Option<Vec<u8>>, InvocationError> {
        if self.done {
            return Ok(None);
        }
        let Some(req) = self.request.clone() else {
            return Ok(None);
        };

        let body = self.client.rpc_call_raw_pub(&req).await?;
        let mut cur = Cursor::from_slice(&body);
        let file = match tl::enums::upload::File::deserialize(&mut cur)? {
            tl::enums::upload::File::File(f) => f,
            tl::enums::upload::File::CdnRedirect(_) => {
                self.done = true;
                return Err(InvocationError::Deserialize(
                    "CDN redirect not supported".into(),
                ));
            }
        };

        // A short read means the server has nothing more to send.
        let is_last = (file.bytes.len() as i32) < req.limit;
        if is_last {
            self.done = true;
            if file.bytes.is_empty() {
                return Ok(None);
            }
        }
        if let Some(pending) = self.request.as_mut() {
            pending.offset += req.limit as i64;
        }
        Ok(Some(file.bytes))
    }
}
impl Client {
/// Uploads `data` sequentially in `UPLOAD_CHUNK_SIZE` parts.
///
/// Payloads of at least `BIG_FILE_THRESHOLD` bytes are sent with
/// `SaveBigFilePart`, smaller ones with `SaveFilePart`. The MIME type is
/// resolved with `resolve_mime` from `name` and `mime_type`.
///
/// # Errors
/// Returns the first RPC failure; parts already sent are not rolled back.
pub async fn upload_file(
    &self,
    data: &[u8],
    name: &str,
    mime_type: &str,
) -> Result<UploadedFile, InvocationError> {
    let resolved_mime = resolve_mime(name, mime_type);
    let file_id = crate::random_i64_pub();
    let part_size = UPLOAD_CHUNK_SIZE as usize;
    let total = data.len();
    let total_parts = total.div_ceil(part_size) as i32;
    let big = total >= BIG_FILE_THRESHOLD;

    for (index, chunk) in data.chunks(part_size).enumerate() {
        let file_part = index as i32;
        let bytes = chunk.to_vec();
        if !big {
            let req = tl::functions::upload::SaveFilePart {
                file_id,
                file_part,
                bytes,
            };
            self.rpc_call_raw_pub(&req).await?;
            continue;
        }
        let req = tl::functions::upload::SaveBigFilePart {
            file_id,
            file_part,
            file_total_parts: total_parts,
            bytes,
        };
        self.rpc_call_raw_pub(&req).await?;
    }

    let inner = make_input_file(big, file_id, total_parts, name, data);
    tracing::info!(
        "[layer] uploaded '{}' ({} bytes, {} parts, mime={})",
        name,
        total,
        total_parts,
        resolved_mime
    );
    Ok(UploadedFile {
        inner,
        mime_type: resolved_mime,
        name: name.to_string(),
    })
}
pub async fn upload_file_concurrent(
&self,
data: Arc<Vec<u8>>,
name: &str,
mime_type: &str,
) -> Result<UploadedFile, InvocationError> {
let total = data.len();
let part_size = UPLOAD_CHUNK_SIZE as usize;
let total_parts = total.div_ceil(part_size) as i32;
if total < BIG_FILE_THRESHOLD {
return self.upload_file(&data, name, mime_type).await;
}
let file_id = crate::random_i64_pub();
let next_part = Arc::new(Mutex::new(0i32));
let mut tasks = tokio::task::JoinSet::new();
for _ in 0..WORKER_COUNT {
let client = self.clone();
let data = Arc::clone(&data);
let next_part = Arc::clone(&next_part);
tasks.spawn(async move {
loop {
let part_num = {
let mut guard = next_part.lock().await;
if *guard >= total_parts {
break;
}
let n = *guard;
*guard += 1;
n
};
let start = part_num as usize * part_size;
let end = (start + part_size).min(data.len());
let bytes = data[start..end].to_vec();
client
.rpc_call_raw_pub(&tl::functions::upload::SaveBigFilePart {
file_id,
file_part: part_num,
file_total_parts: total_parts,
bytes,
})
.await?;
}
Ok::<(), InvocationError>(())
});
}
while let Some(res) = tasks.join_next().await {
res.map_err(|e| InvocationError::Io(std::io::Error::other(e.to_string())))??;
}
let inner = tl::enums::InputFile::Big(tl::types::InputFileBig {
id: file_id,
parts: total_parts,
name: name.to_string(),
});
tracing::info!(
"[layer] concurrent-uploaded '{}' ({} bytes, {} parts, {} workers)",
name,
total,
total_parts,
WORKER_COUNT
);
Ok(UploadedFile {
inner,
mime_type: resolve_mime(name, mime_type),
name: name.to_string(),
})
}
/// Reads `reader` to the end into memory, then uploads the buffered bytes.
///
/// Buffers of at least `BIG_FILE_THRESHOLD` bytes go through
/// [`Client::upload_file_concurrent`]; smaller ones through
/// [`Client::upload_file`].
///
/// # Errors
/// Propagates read failures from `reader` and any upload error.
pub async fn upload_stream<R: AsyncRead + Unpin>(
    &self,
    reader: &mut R,
    name: &str,
    mime_type: &str,
) -> Result<UploadedFile, InvocationError> {
    let mut buffered = Vec::new();
    reader.read_to_end(&mut buffered).await?;

    let is_big = buffered.len() >= BIG_FILE_THRESHOLD;
    if !is_big {
        return self.upload_file(&buffered, name, mime_type).await;
    }
    let shared = Arc::new(buffered);
    self.upload_file_concurrent(shared, name, mime_type).await
}
/// Sends `media` to `peer` with `caption` as the message text.
///
/// Issues a `messages::SendMedia` request with every flag off and every
/// optional field unset; the RPC response body is discarded.
///
/// # Errors
/// Propagates the RPC failure, if any.
pub async fn send_file(
    &self,
    peer: tl::enums::Peer,
    media: tl::enums::InputMedia,
    caption: &str,
) -> Result<(), InvocationError> {
    // Resolve the peer inside a block so the cache read guard is released
    // before the request is sent.
    let input_peer = {
        let cache = self.inner.peer_cache.read().await;
        cache.peer_to_input(&peer)
    };
    let message = caption.to_string();
    let random_id = crate::random_i64_pub();
    let request = tl::functions::messages::SendMedia {
        silent: false,
        background: false,
        clear_draft: false,
        noforwards: false,
        update_stickersets_order: false,
        invert_media: false,
        allow_paid_floodskip: false,
        peer: input_peer,
        reply_to: None,
        media,
        message,
        random_id,
        reply_markup: None,
        entities: None,
        schedule_date: None,
        schedule_repeat_period: None,
        send_as: None,
        quick_reply_shortcut: None,
        effect: None,
        allow_paid_stars: None,
        suggested_post: None,
    };
    self.rpc_call_raw_pub(&request).await?;
    Ok(())
}
pub async fn send_album(
&self,
peer: tl::enums::Peer,
items: Vec<AlbumItem>,
) -> Result<(), InvocationError> {
let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer);
let reply_to = items.iter().find_map(|i| i.reply_to).map(|id| {
tl::enums::InputReplyTo::Message(tl::types::InputReplyToMessage {
reply_to_msg_id: id,
top_msg_id: None,
reply_to_peer_id: None,
quote_text: None,
quote_entities: None,
quote_offset: None,
monoforum_peer_id: None,
poll_option: None,
todo_item_id: None,
})
});
let multi: Vec<tl::enums::InputSingleMedia> = items
.into_iter()
.map(|item| {
tl::enums::InputSingleMedia::InputSingleMedia(tl::types::InputSingleMedia {
media: item.media,
random_id: crate::random_i64_pub(),
message: item.caption,
entities: if item.entities.is_empty() {
None
} else {
Some(item.entities)
},
})
})
.collect();
let req = tl::functions::messages::SendMultiMedia {
silent: false,
background: false,
clear_draft: false,
noforwards: false,
update_stickersets_order: false,
invert_media: false,
allow_paid_floodskip: false,
peer: input_peer,
reply_to,
multi_media: multi,
schedule_date: None,
send_as: None,
quick_reply_shortcut: None,
effect: None,
allow_paid_stars: None,
};
self.rpc_call_raw_pub(&req).await?;
Ok(())
}
/// Creates a [`DownloadIter`] over `location`, starting at offset 0 and
/// requesting `DOWNLOAD_CHUNK_SIZE` bytes per call. No request is sent until
/// the iterator's `next` is awaited.
pub fn iter_download(&self, location: tl::enums::InputFileLocation) -> DownloadIter {
    let first_request = tl::functions::upload::GetFile {
        precise: false,
        cdn_supported: false,
        location,
        offset: 0,
        limit: DOWNLOAD_CHUNK_SIZE,
    };
    DownloadIter {
        client: self.clone(),
        request: Some(first_request),
        done: false,
    }
}
/// Downloads the whole file at `location` sequentially into one buffer.
///
/// # Errors
/// Propagates the first error produced by the underlying [`DownloadIter`].
pub async fn download_media(
    &self,
    location: tl::enums::InputFileLocation,
) -> Result<Vec<u8>, InvocationError> {
    let mut chunks = self.iter_download(location);
    let mut collected = Vec::new();
    loop {
        match chunks.next().await? {
            Some(chunk) => collected.extend_from_slice(&chunk),
            None => break,
        }
    }
    Ok(collected)
}
pub async fn download_media_concurrent(
&self,
location: tl::enums::InputFileLocation,
size: usize,
) -> Result<Vec<u8>, InvocationError> {
let chunk = DOWNLOAD_CHUNK_SIZE as usize;
let n_parts = size.div_ceil(chunk);
let next_part = Arc::new(Mutex::new(0usize));
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(usize, Vec<u8>)>();
let mut tasks = tokio::task::JoinSet::new();
for _ in 0..WORKER_COUNT {
let client = self.clone();
let location = location.clone();
let next_part = Arc::clone(&next_part);
let tx = tx.clone();
tasks.spawn(async move {
loop {
let part = {
let mut g = next_part.lock().await;
if *g >= n_parts {
break;
}
let p = *g;
*g += 1;
p
};
let offset = (part * chunk) as i64;
let req = tl::functions::upload::GetFile {
precise: true,
cdn_supported: false,
location: location.clone(),
offset,
limit: DOWNLOAD_CHUNK_SIZE,
};
let raw = client.rpc_call_raw_pub(&req).await?;
let mut cur = Cursor::from_slice(&raw);
if let tl::enums::upload::File::File(f) =
tl::enums::upload::File::deserialize(&mut cur)?
{
let _ = tx.send((part, f.bytes));
}
}
Ok::<(), InvocationError>(())
});
}
drop(tx);
let mut parts: Vec<Option<Vec<u8>>> = (0..n_parts).map(|_| None).collect();
while let Some((idx, data)) = rx.recv().await {
if idx < parts.len() {
parts[idx] = Some(data);
}
}
while let Some(res) = tasks.join_next().await {
res.map_err(|e| InvocationError::Io(std::io::Error::other(e.to_string())))??;
}
let mut out = Vec::with_capacity(size);
for part in parts.into_iter().flatten() {
out.extend_from_slice(&part);
}
out.truncate(size);
Ok(out)
}
/// Downloads any [`Downloadable`] item into memory.
///
/// Items that report a size of at least `BIG_FILE_THRESHOLD` bytes use
/// [`Client::download_media_concurrent`]; everything else, including items of
/// unknown size, uses [`Client::download_media`].
///
/// # Errors
/// Fails with a `Deserialize` error when the item has no download location,
/// and otherwise propagates the download error.
pub async fn download<D: Downloadable>(&self, item: &D) -> Result<Vec<u8>, InvocationError> {
    let Some(loc) = item.to_input_location() else {
        return Err(InvocationError::Deserialize(
            "item has no download location".into(),
        ));
    };
    if let Some(sz) = item.size() {
        if sz >= BIG_FILE_THRESHOLD {
            return self.download_media_concurrent(loc, sz).await;
        }
    }
    self.download_media(loc).await
}
}
impl crate::update::IncomingMessage {
pub fn download_location(&self) -> Option<tl::enums::InputFileLocation> {
let media = match &self.raw {
tl::enums::Message::Message(m) => m.media.as_ref()?,
_ => return None,
};
if let Some(doc) = Document::from_media(media) {
return doc.to_input_location();
}
if let Some(photo) = Photo::from_media(media) {
return photo.to_input_location();
}
None
}
}
/// Resolves the download location for optional message media.
///
/// The media is tried as a [`Document`] first and as a [`Photo`] second.
/// Returns `None` when `media` is absent or is neither.
pub fn download_location_from_media(
    media: Option<&tl::enums::MessageMedia>,
) -> Option<tl::enums::InputFileLocation> {
    let media = media?;
    match Document::from_media(media) {
        Some(doc) => doc.to_input_location(),
        None => Photo::from_media(media).and_then(|photo| photo.to_input_location()),
    }
}
/// Builds the `InputFile` reference for a completed upload.
///
/// `big` selects the `Big` variant; otherwise the regular variant is produced
/// with an empty `md5_checksum`. `data` is accepted but currently unused.
fn make_input_file(
    big: bool,
    file_id: i64,
    total_parts: i32,
    name: &str,
    data: &[u8],
) -> tl::enums::InputFile {
    // The payload is not inspected; no checksum is computed from it.
    let _ = data;
    let name = name.to_string();
    if !big {
        return tl::enums::InputFile::InputFile(tl::types::InputFile {
            id: file_id,
            parts: total_parts,
            name,
            md5_checksum: String::new(),
        });
    }
    tl::enums::InputFile::Big(tl::types::InputFileBig {
        id: file_id,
        parts: total_parts,
        name,
    })
}