use crate::DcEntry;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
use ferogram_tl_types as tl;
use ferogram_tl_types::{Cursor, Deserializable};
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use tokio::sync::Mutex;
use crate::{Client, InvocationError};
/// A single entry in a media album (sent together via `Client::send_album`).
pub struct AlbumItem {
    /// The media to attach (photo, document, …).
    pub media: tl::enums::InputMedia,
    /// Caption text for this item (may be empty).
    pub caption: String,
    /// Formatting entities applied to `caption`.
    pub entities: Vec<tl::enums::MessageEntity>,
    /// Message id this item replies to, if any.
    pub reply_to: Option<i32>,
}
impl AlbumItem {
    /// Creates an item wrapping `media` with an empty caption and no reply target.
    pub fn new(media: tl::enums::InputMedia) -> Self {
        Self {
            media,
            caption: String::new(),
            entities: Vec::new(),
            reply_to: None,
        }
    }

    /// Sets a plain-text caption (no formatting entities).
    pub fn caption(mut self, text: impl Into<String>) -> Self {
        self.caption = text.into();
        self
    }

    /// Parses `html` and stores both the stripped text and its entities.
    pub fn caption_html(mut self, html: impl Into<String>) -> Self {
        let source = html.into();
        let (text, entities) = crate::parsers::parse_html(source.as_str());
        self.caption = text;
        self.entities = entities;
        self
    }

    /// Parses `md` as Markdown and stores both the stripped text and its entities.
    pub fn caption_markdown(mut self, md: impl Into<String>) -> Self {
        let source = md.into();
        let (text, entities) = crate::parsers::parse_markdown(source.as_str());
        self.caption = text;
        self.entities = entities;
        self
    }

    /// Sets (or clears, with `None`) the message id this item replies to.
    pub fn reply_to(mut self, msg_id: Option<i32>) -> Self {
        self.reply_to = msg_id;
        self
    }
}
impl From<(tl::enums::InputMedia, String)> for AlbumItem {
fn from((media, caption): (tl::enums::InputMedia, String)) -> Self {
Self::new(media).caption(caption)
}
}
/// Default download chunk size in bytes (256 KiB).
pub const DOWNLOAD_CHUNK_SIZE: i32 = 256 * 1024;
/// Picks a per-request download chunk size (bytes) from the total file size:
/// 256 KiB below 50 MiB, 512 KiB below 500 MiB, 1 MiB otherwise.
pub fn download_chunk_size(file_size: usize) -> i32 {
    const MIB: usize = 1024 * 1024;
    match file_size {
        s if s < 50 * MIB => 256 * 1024,
        s if s < 500 * MIB => 512 * 1024,
        _ => 1024 * 1024,
    }
}
/// Upper bound on parallel worker connections for a single transfer.
pub const MAX_WORKERS_PER_FILE: usize = 4;
/// Process-wide cap on concurrent sender workers (semaphore permits).
pub const MAX_GLOBAL_SENDERS: usize = 12;
/// Files larger than this (10 MiB) use the "big file" upload path.
pub const BIG_FILE_THRESHOLD: usize = 10 * 1024 * 1024;
/// Maximum number of upload parts — kept for reference; the live limit is
/// `MAX_PARTS` inside `upload_part_size`.
#[allow(dead_code)]
const UPLOAD_MAX_PARTS: i32 = 4000;
/// Per-session upload budget (bytes) — currently unused.
#[allow(dead_code)]
const UPLOAD_MAX_PER_SESSION: usize = 1024 * 1024;
/// Candidate upload part sizes — currently unused; `upload_part_size`
/// chooses between 32 KiB and 512 KiB directly.
#[allow(dead_code)]
const UPLOAD_PART_SIZES: &[usize] = &[32 * 1024, 64 * 1024, 128 * 1024, 256 * 1024, 512 * 1024];
/// Chooses an upload part size for a file of `file_size` bytes.
///
/// Returns `(part_size_bytes, total_parts)`. Files under 512 KiB use 32 KiB
/// parts; everything else uses 512 KiB parts. If that would exceed
/// `MAX_PARTS`, the part size is grown so the file still fits, rounded up to
/// a multiple of 1024 (Telegram requires part sizes divisible by 1024).
pub fn upload_part_size(file_size: usize) -> (usize, i32) {
    const MAX_PARTS: usize = 4000;
    let mut ps: usize = if file_size < 512 * 1024 {
        32 * 1024
    } else {
        512 * 1024
    };
    if file_size.div_ceil(ps) > MAX_PARTS {
        // Grow the part size so the part count stays under MAX_PARTS.
        ps = file_size.div_ceil(MAX_PARTS);
        // BUG FIX: this used to be `ps = ps.div_ceil(512);`, which *divides*
        // the part size by ~512 (yielding millions of tiny parts for huge
        // files) instead of aligning it. Round UP to the next multiple of
        // 1024 so the count cap is respected and the size stays KiB-aligned.
        ps = ps.div_ceil(1024) * 1024;
    }
    (ps, file_size.div_ceil(ps) as i32)
}
/// Maps an upload part count to the number of workers worth spawning.
#[allow(dead_code)]
pub(crate) fn count_workers(n_parts: usize) -> usize {
    if n_parts <= 5 {
        1
    } else if n_parts <= 20 {
        2
    } else if n_parts <= 80 {
        3
    } else {
        MAX_WORKERS_PER_FILE
    }
}
/// Number of parallel download workers for a file of `file_size` bytes:
/// 1 below 10 MiB, 2 below 50 MiB, 3 below 300 MiB, else the per-file cap.
pub fn download_worker_count(file_size: usize) -> usize {
    const MIB: usize = 1024 * 1024;
    match file_size {
        s if s < 10 * MIB => 1,
        s if s < 50 * MIB => 2,
        s if s < 300 * MIB => 3,
        _ => MAX_WORKERS_PER_FILE,
    }
}
/// Number of parallel upload workers for a file of `file_size` bytes:
/// 1 below 10 MiB, 2 below 100 MiB, 3 below 500 MiB, else the per-file cap.
pub fn upload_worker_count(file_size: usize) -> usize {
    const MIB: usize = 1024 * 1024;
    match file_size {
        s if s < 10 * MIB => 1,
        s if s < 100 * MIB => 2,
        s if s < 500 * MIB => 3,
        _ => MAX_WORKERS_PER_FILE,
    }
}
/// Legacy fixed upload chunk size; superseded by the adaptive
/// [`upload_part_size`] helper.
#[deprecated(note = "use upload_part_size(file_size).0")]
pub const UPLOAD_CHUNK_SIZE: i32 = 128 * 1024;
/// Returns `mime_type` when it carries real information; otherwise guesses
/// a MIME type from the extension of `name`, falling back to octet-stream.
fn resolve_mime(name: &str, mime_type: &str) -> String {
    let is_generic = mime_type.is_empty() || mime_type == "application/octet-stream";
    if is_generic {
        mime_guess::from_path(name)
            .first_or_octet_stream()
            .to_string()
    } else {
        mime_type.to_string()
    }
}
/// Handle to a file that has already been uploaded to Telegram, ready to be
/// wrapped into an `InputMedia` and sent.
#[derive(Debug, Clone)]
pub struct UploadedFile {
    /// Raw TL input-file handle produced by the upload routines.
    pub(crate) inner: tl::enums::InputFile,
    /// Resolved MIME type (see `resolve_mime`).
    pub(crate) mime_type: String,
    /// Original file name as given to the upload call.
    pub(crate) name: String,
}
impl UploadedFile {
    /// The resolved MIME type of the uploaded file.
    pub fn mime_type(&self) -> &str {
        self.mime_type.as_str()
    }

    /// The original file name.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Wraps the upload as a document `InputMedia`, attaching the file name
    /// as a `documentAttributeFilename`.
    pub fn as_document_media(&self) -> tl::enums::InputMedia {
        let filename_attr = tl::enums::DocumentAttribute::Filename(
            tl::types::DocumentAttributeFilename {
                file_name: self.name.clone(),
            },
        );
        let doc = tl::types::InputMediaUploadedDocument {
            nosound_video: false,
            force_file: false,
            spoiler: false,
            file: self.inner.clone(),
            thumb: None,
            mime_type: self.mime_type.clone(),
            attributes: vec![filename_attr],
            stickers: None,
            ttl_seconds: None,
            video_cover: None,
            video_timestamp: None,
        };
        tl::enums::InputMedia::UploadedDocument(doc)
    }

    /// Wraps the upload as a photo `InputMedia`.
    pub fn as_photo_media(&self) -> tl::enums::InputMedia {
        let photo = tl::types::InputMediaUploadedPhoto {
            spoiler: false,
            live_photo: false,
            file: self.inner.clone(),
            stickers: None,
            ttl_seconds: None,
            video: None,
        };
        tl::enums::InputMedia::UploadedPhoto(photo)
    }
}
/// Anything that can be fetched with `upload.getFile`.
pub trait Downloadable {
    /// TL file location, or `None` when the item has nothing downloadable.
    fn to_input_location(&self) -> Option<tl::enums::InputFileLocation>;
    /// The DC the file lives on.
    fn dc_id(&self) -> i32;
    /// File size in bytes when known; a known size enables the concurrent
    /// download path (see `Client::download`).
    fn size(&self) -> Option<usize> {
        None
    }
}
/// Convenience wrapper around a full `tl::types::Photo`.
#[derive(Debug, Clone)]
pub struct Photo {
    /// The raw TL photo object.
    pub raw: tl::types::Photo,
}
impl Photo {
    /// Wraps a raw TL photo.
    pub fn from_raw(raw: tl::types::Photo) -> Self {
        Self { raw }
    }

    /// Extracts a full photo from message media, if present and non-empty.
    pub fn from_media(media: &tl::enums::MessageMedia) -> Option<Self> {
        match media {
            tl::enums::MessageMedia::Photo(mp) => match &mp.photo {
                Some(tl::enums::Photo::Photo(p)) => Some(Self { raw: p.clone() }),
                _ => None,
            },
            _ => None,
        }
    }

    /// Photo id.
    pub fn id(&self) -> i64 {
        self.raw.id
    }

    /// Access hash required by photo-related RPCs.
    pub fn access_hash(&self) -> i64 {
        self.raw.access_hash
    }

    /// Unix timestamp of the photo.
    pub fn date(&self) -> i32 {
        self.raw.date
    }

    /// Whether the photo has attached stickers.
    pub fn has_stickers(&self) -> bool {
        self.raw.has_stickers
    }

    /// Type letter of the last plain `photoSize` entry in `sizes`
    /// (the largest one), falling back to "s" when none is present.
    pub fn largest_thumb_type(&self) -> &str {
        self.raw
            .sizes
            .iter()
            .rev()
            .find_map(|s| match s {
                tl::enums::PhotoSize::PhotoSize(ps) => Some(ps.r#type.as_str()),
                _ => None,
            })
            .unwrap_or("s")
    }
}
impl Downloadable for Photo {
    fn to_input_location(&self) -> Option<tl::enums::InputFileLocation> {
        // Request the largest available thumbnail variant.
        let loc = tl::types::InputPhotoFileLocation {
            id: self.raw.id,
            access_hash: self.raw.access_hash,
            file_reference: self.raw.file_reference.clone(),
            thumb_size: self.largest_thumb_type().to_string(),
        };
        Some(tl::enums::InputFileLocation::InputPhotoFileLocation(loc))
    }

    fn dc_id(&self) -> i32 {
        self.raw.dc_id
    }
}
/// Convenience wrapper around a full `tl::types::Document`.
#[derive(Debug, Clone)]
pub struct Document {
    /// The raw TL document object.
    pub raw: tl::types::Document,
}
impl Document {
    /// Wraps a raw TL document.
    pub fn from_raw(raw: tl::types::Document) -> Self {
        Self { raw }
    }

    /// Extracts a full document from message media, if present and non-empty.
    pub fn from_media(media: &tl::enums::MessageMedia) -> Option<Self> {
        match media {
            tl::enums::MessageMedia::Document(md) => match &md.document {
                Some(tl::enums::Document::Document(d)) => Some(Self { raw: d.clone() }),
                _ => None,
            },
            _ => None,
        }
    }

    /// Document id.
    pub fn id(&self) -> i64 {
        self.raw.id
    }

    /// Access hash required by document-related RPCs.
    pub fn access_hash(&self) -> i64 {
        self.raw.access_hash
    }

    /// Unix timestamp of the document.
    pub fn date(&self) -> i32 {
        self.raw.date
    }

    /// MIME type reported by the server.
    pub fn mime_type(&self) -> &str {
        self.raw.mime_type.as_str()
    }

    /// File size in bytes.
    pub fn size(&self) -> i64 {
        self.raw.size
    }

    /// The file name attribute, when the document carries one.
    pub fn file_name(&self) -> Option<&str> {
        for attr in &self.raw.attributes {
            if let tl::enums::DocumentAttribute::Filename(f) = attr {
                return Some(f.file_name.as_str());
            }
        }
        None
    }

    /// Whether the document carries the `animated` attribute.
    pub fn is_animated(&self) -> bool {
        self.raw
            .attributes
            .iter()
            .any(|a| matches!(a, tl::enums::DocumentAttribute::Animated))
    }
}
impl Downloadable for Document {
    fn to_input_location(&self) -> Option<tl::enums::InputFileLocation> {
        let loc = tl::types::InputDocumentFileLocation {
            id: self.raw.id,
            access_hash: self.raw.access_hash,
            file_reference: self.raw.file_reference.clone(),
            // Empty thumb_size — presumably selects the full-size file; confirm
            // against the TL schema.
            thumb_size: String::new(),
        };
        Some(tl::enums::InputFileLocation::InputDocumentFileLocation(loc))
    }

    fn dc_id(&self) -> i32 {
        self.raw.dc_id
    }

    fn size(&self) -> Option<usize> {
        Some(self.raw.size as usize)
    }
}
/// A document that carries a sticker attribute.
#[derive(Debug, Clone)]
pub struct Sticker {
    /// The underlying document.
    pub inner: Document,
}
impl Sticker {
    /// Wraps `doc` when it carries a sticker attribute, otherwise `None`.
    pub fn from_document(doc: Document) -> Option<Self> {
        let is_sticker = doc
            .raw
            .attributes
            .iter()
            .any(|a| matches!(a, tl::enums::DocumentAttribute::Sticker(_)));
        is_sticker.then_some(Self { inner: doc })
    }

    /// Extracts a sticker from message media, if the media is a sticker document.
    pub fn from_media(media: &tl::enums::MessageMedia) -> Option<Self> {
        let doc = Document::from_media(media)?;
        Self::from_document(doc)
    }

    /// The alt emoji associated with the sticker, when present.
    pub fn emoji(&self) -> Option<&str> {
        for attr in &self.inner.raw.attributes {
            if let tl::enums::DocumentAttribute::Sticker(s) = attr {
                return Some(s.alt.as_str());
            }
        }
        None
    }

    /// Whether the document also carries a video attribute (video sticker).
    pub fn is_video(&self) -> bool {
        self.inner
            .raw
            .attributes
            .iter()
            .any(|a| matches!(a, tl::enums::DocumentAttribute::Video(_)))
    }

    /// Underlying document id.
    pub fn id(&self) -> i64 {
        self.inner.id()
    }

    /// Underlying document MIME type.
    pub fn mime_type(&self) -> &str {
        self.inner.mime_type()
    }
}
impl Downloadable for Sticker {
fn to_input_location(&self) -> Option<tl::enums::InputFileLocation> {
self.inner.to_input_location()
}
fn dc_id(&self) -> i32 {
self.inner.dc_id()
}
fn size(&self) -> Option<usize> {
Some(self.inner.raw.size as usize)
}
}
/// Pull-based iterator over file chunks fetched with `upload.getFile`.
pub struct DownloadIter {
    /// Client used to issue the transfer RPCs.
    client: Client,
    /// Next request to send; `offset` is advanced after every chunk.
    request: Option<tl::functions::upload::GetFile>,
    /// Set after a short/empty chunk (EOF) or a CDN redirect.
    done: bool,
    /// Target DC — presumably 0 means "resolve to home DC" downstream; see
    /// `download_media_on_dc` for the same convention.
    dc_id: i32,
}
impl DownloadIter {
    /// Overrides the per-request chunk size (the `limit` of `upload.getFile`).
    /// No-op when the request slot is empty.
    pub fn chunk_size(mut self, size: i32) -> Self {
        if let Some(r) = &mut self.request {
            r.limit = size;
        }
        self
    }

    /// Fetches the next chunk, or `Ok(None)` once the file is exhausted.
    ///
    /// A chunk shorter than `limit` marks the iterator done (the short chunk
    /// itself is still returned unless empty). The offset is advanced by
    /// `limit` after every successful chunk.
    ///
    /// # Errors
    /// Propagates RPC/deserialization failures; a `upload.fileCdnRedirect`
    /// answer is surfaced as an error since CDN downloads are unsupported.
    pub async fn next(&mut self) -> Result<Option<Vec<u8>>, InvocationError> {
        if self.done {
            return Ok(None);
        }
        let req = match &self.request {
            Some(r) => r.clone(),
            None => return Ok(None),
        };
        let body = self.client.rpc_transfer_on_dc_pub(self.dc_id, &req).await?;
        let mut cur = Cursor::from_slice(&body);
        match tl::enums::upload::File::deserialize(&mut cur)? {
            tl::enums::upload::File::File(f) => {
                // Short read == EOF; empty read means the previous chunk was
                // exactly the last one.
                if (f.bytes.len() as i32) < req.limit {
                    self.done = true;
                    if f.bytes.is_empty() {
                        return Ok(None);
                    }
                }
                if let Some(r) = &mut self.request {
                    r.offset += req.limit as i64;
                }
                Ok(Some(f.bytes))
            }
            tl::enums::upload::File::CdnRedirect(_) => {
                self.done = true;
                Err(InvocationError::Deserialize(
                    "upload.fileCdnRedirect received (cdn_supported=false was ignored by server)"
                        .into(),
                ))
            }
        }
    }
}
impl Client {
/// Uploads `data` sequentially (one part at a time) and returns a handle
/// that can be turned into sendable media.
///
/// Files larger than [`BIG_FILE_THRESHOLD`] use `upload.saveBigFilePart`
/// (no md5 checksum); smaller ones use `upload.saveFilePart`. Parts are
/// sized by [`upload_part_size`]. DC id 0 is passed to the transfer layer —
/// presumably "home DC", matching `download_media_on_dc`'s convention.
///
/// # Errors
/// Fails on empty input or when any part-save RPC fails.
pub async fn upload_file(
    &self,
    data: &[u8],
    name: &str,
    mime_type: &str,
) -> Result<UploadedFile, InvocationError> {
    if data.is_empty() {
        return Err(InvocationError::Deserialize(
            "cannot upload empty file".into(),
        ));
    }
    let resolved_mime = resolve_mime(name, mime_type);
    let total = data.len();
    let big = total > BIG_FILE_THRESHOLD;
    let (part_size, total_parts) = upload_part_size(total);
    // Random id ties all parts of this upload together server-side.
    let file_id = crate::random_i64_pub();
    for (part_num, chunk) in data.chunks(part_size).enumerate() {
        if big {
            self.rpc_transfer_on_dc_pub(
                0,
                &tl::functions::upload::SaveBigFilePart {
                    file_id,
                    file_part: part_num as i32,
                    file_total_parts: total_parts,
                    bytes: chunk.to_vec(),
                },
            )
            .await?;
        } else {
            self.rpc_transfer_on_dc_pub(
                0,
                &tl::functions::upload::SaveFilePart {
                    file_id,
                    file_part: part_num as i32,
                    bytes: chunk.to_vec(),
                },
            )
            .await?;
        }
    }
    let inner = make_input_file(big, file_id, total_parts, name, data);
    tracing::info!(
        "[ferogram] uploaded '{}' ({} bytes, part={}B × {} parts, mime={})",
        name,
        total,
        part_size,
        total_parts,
        resolved_mime
    );
    Ok(UploadedFile {
        inner,
        mime_type: resolved_mime,
        name: name.to_string(),
    })
}
/// Uploads `data` using several parallel worker connections.
///
/// Parts are claimed from a shared counter. On `FILE_MIGRATE` (code 303)
/// the whole upload restarts on the new DC: the file id is regenerated and
/// the part counter reset under the shared lock, and other workers notice
/// the DC switch on their next claim. Falls back to the sequential
/// [`upload_file`] when no worker connection can be opened.
///
/// # Errors
/// Fails on empty input, on unrecoverable RPC errors, or when a worker
/// exhausts its reconnect budget.
pub async fn upload_file_concurrent(
    &self,
    data: Arc<Vec<u8>>,
    name: &str,
    mime_type: &str,
) -> Result<UploadedFile, InvocationError> {
    if data.is_empty() {
        return Err(InvocationError::Deserialize(
            "cannot upload empty file".into(),
        ));
    }
    let total = data.len();
    let (part_size, total_parts) = upload_part_size(total);
    let big = total > BIG_FILE_THRESHOLD;
    let n_workers = upload_worker_count(total).min(MAX_WORKERS_PER_FILE);
    // Cap concurrent transfer workers process-wide.
    let _global_guard = self
        .inner
        .worker_semaphore
        .acquire_many(n_workers as u32)
        .await
        .expect("worker semaphore unexpectedly closed");
    // Atomic because a FILE_MIGRATE restart swaps the file id mid-flight.
    let file_id_atomic =
        std::sync::Arc::new(std::sync::atomic::AtomicI64::new(crate::random_i64_pub()));
    // Target DC shared by all workers; 0 until a migrate redirects the upload.
    let upload_dc = Arc::new(AtomicI32::new(0i32));
    let mut open_set: tokio::task::JoinSet<
        Result<crate::dc_pool::DcConnection, InvocationError>,
    > = tokio::task::JoinSet::new();
    for _ in 0..n_workers {
        let client = self.clone();
        open_set.spawn(async move { client.open_worker_conn(0).await });
    }
    let mut conns: Vec<crate::dc_pool::DcConnection> = Vec::with_capacity(n_workers);
    while let Some(res) = open_set.join_next().await {
        match res {
            Ok(Ok(c)) => conns.push(c),
            Ok(Err(e)) => tracing::warn!("[ferogram] upload: worker conn failed: {e}"),
            Err(e) => tracing::warn!("[ferogram] upload: worker conn join error: {e}"),
        }
    }
    if conns.is_empty() {
        tracing::warn!("[ferogram] upload: no worker conns, falling back to sequential");
        return self.upload_file(&data, name, mime_type).await;
    }
    let actual_workers = conns.len();
    // Shared work queue: index of the next part to upload.
    let next_part = Arc::new(Mutex::new(0i32));
    let mut tasks: tokio::task::JoinSet<Result<(), InvocationError>> =
        tokio::task::JoinSet::new();
    for mut conn in conns {
        let data = Arc::clone(&data);
        let next_part = Arc::clone(&next_part);
        let client = self.clone();
        let upload_dc = Arc::clone(&upload_dc);
        let file_id_atomic = std::sync::Arc::clone(&file_id_atomic);
        tasks.spawn(async move {
            const MAX_WORKER_RECONNECTS: u8 = 5;
            let mut total_reconnects = 0u8;
            let mut worker_dc = 0i32;
            loop {
                // Claim the next part and snapshot file id / DC while holding
                // the counter lock, so a concurrent migrate-restart is
                // observed atomically with the claimed index.
                let (part_num, file_id, current_dc) = {
                    let mut g = next_part.lock().await;
                    let fid = file_id_atomic.load(std::sync::atomic::Ordering::Relaxed);
                    let dc = upload_dc.load(Ordering::Relaxed);
                    if *g >= total_parts {
                        break;
                    }
                    let n = *g;
                    *g += 1;
                    (n, fid, dc)
                };
                // Another worker migrated the upload: follow it to the new DC.
                if current_dc != worker_dc {
                    worker_dc = current_dc;
                    conn = match client.open_worker_conn(worker_dc).await {
                        Ok(c) => c,
                        Err(e) => return Err(e),
                    };
                }
                let start = part_num as usize * part_size;
                let end = (start + part_size).min(data.len());
                let bytes = data[start..end].to_vec();
                // Retry loop for this single part.
                loop {
                    let result = if big {
                        conn.rpc_call(&tl::functions::upload::SaveBigFilePart {
                            file_id,
                            file_part: part_num,
                            file_total_parts: total_parts,
                            bytes: bytes.clone(),
                        })
                        .await
                    } else {
                        conn.rpc_call(&tl::functions::upload::SaveFilePart {
                            file_id,
                            file_part: part_num,
                            bytes: bytes.clone(),
                        })
                        .await
                    };
                    let err = match result {
                        Ok(_) => break,
                        Err(e) => e,
                    };
                    if let InvocationError::Rpc(ref rpc) = err {
                        if rpc.code == 420 {
                            // FLOOD_WAIT: sleep the server-requested seconds.
                            let secs = rpc.value.unwrap_or(1) as u64;
                            tracing::info!(
                                "[ferogram] upload: FLOOD_WAIT_{secs}; sleeping before retry"
                            );
                            tokio::time::sleep(std::time::Duration::from_secs(secs)).await;
                            continue;
                        }
                        if rpc.code == 303 {
                            // FILE_MIGRATE: restart the entire upload on the
                            // new DC with a fresh file id.
                            // NOTE(review): parts already saved under the old
                            // file id are abandoned; workers that claimed a
                            // part just before the reset still upload it under
                            // the old id — confirm the server tolerates this.
                            let new_dc = rpc.value.unwrap_or(1) as i32;
                            tracing::info!(
                                "[ferogram] upload: FILE_MIGRATE_{new_dc}; \
                                switching worker DC{worker_dc}→DC{new_dc}"
                            );
                            {
                                let mut g = next_part.lock().await;
                                file_id_atomic.store(
                                    crate::random_i64_pub(),
                                    std::sync::atomic::Ordering::SeqCst,
                                );
                                upload_dc.store(new_dc, Ordering::SeqCst);
                                *g = 0;
                            }
                            worker_dc = new_dc;
                            match client.open_worker_conn(new_dc).await {
                                Ok(c) => {
                                    conn = c;
                                    continue;
                                }
                                Err(e) => return Err(e),
                            }
                        }
                        if rpc.name == "AUTH_KEY_UNREGISTERED" {
                            // Stale auth key: reopen this worker with backoff.
                            tracing::warn!(
                                "[ferogram] upload: AUTH_KEY_UNREGISTERED DC{worker_dc}; \
                                reopening worker [{}/{MAX_WORKER_RECONNECTS}]",
                                total_reconnects + 1
                            );
                            total_reconnects += 1;
                            if total_reconnects >= MAX_WORKER_RECONNECTS {
                                return Err(err);
                            }
                            let backoff_ms = 300u64 * (1u64 << (total_reconnects - 1));
                            tokio::time::sleep(std::time::Duration::from_millis(backoff_ms))
                                .await;
                            match client.open_worker_conn(worker_dc).await {
                                Ok(c) => {
                                    conn = c;
                                    continue;
                                }
                                Err(e) => return Err(e),
                            }
                        }
                        // Any other RPC error is fatal, except -503 which is
                        // treated as transient below — presumably a
                        // library-internal transport code; confirm.
                        if rpc.code != -503 {
                            return Err(err);
                        }
                    }
                    // Transport-level (or -503) failure: reconnect with
                    // exponential backoff, bounded by MAX_WORKER_RECONNECTS.
                    total_reconnects += 1;
                    if total_reconnects >= MAX_WORKER_RECONNECTS {
                        return Err(err);
                    }
                    let backoff_ms = 300u64 * (1u64 << (total_reconnects - 1));
                    tracing::warn!(
                        "[ferogram] upload: worker error ({err}), reconnecting \
                        [{total_reconnects}/{MAX_WORKER_RECONNECTS}] (backoff {backoff_ms}ms)"
                    );
                    tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
                    conn = match client.open_worker_conn(worker_dc).await {
                        Ok(c) => c,
                        Err(e) => return Err(e),
                    };
                }
            }
            Ok(())
        });
    }
    // Surface the first worker failure and cancel the rest.
    while let Some(res) = tasks.join_next().await {
        if let Err(e) =
            res.map_err(|e| InvocationError::Io(std::io::Error::other(e.to_string())))?
        {
            tasks.abort_all();
            return Err(e);
        }
    }
    let file_id = file_id_atomic.load(std::sync::atomic::Ordering::Relaxed);
    let inner = make_input_file(big, file_id, total_parts, name, &data);
    tracing::info!(
        "[ferogram] uploaded '{}' ({} bytes, part={}B x {} parts, {} workers)",
        name,
        total,
        part_size,
        total_parts,
        actual_workers
    );
    Ok(UploadedFile {
        inner,
        mime_type: resolve_mime(name, mime_type),
        name: name.to_string(),
    })
}
/// Reads `reader` to EOF into memory and uploads the buffered bytes,
/// choosing the concurrent path for big files and the sequential path
/// otherwise.
pub async fn upload_stream<R: AsyncRead + Unpin>(
    &self,
    reader: &mut R,
    name: &str,
    mime_type: &str,
) -> Result<UploadedFile, InvocationError> {
    let mut buffer = Vec::new();
    reader.read_to_end(&mut buffer).await?;
    let use_concurrent = buffer.len() > BIG_FILE_THRESHOLD;
    if use_concurrent {
        self.upload_file_concurrent(Arc::new(buffer), name, mime_type)
            .await
    } else {
        self.upload_file(&buffer, name, mime_type).await
    }
}
/// Sends a single media item to `peer` via `messages.sendMedia`, applying
/// the text, entities, markup and flags carried by `msg`.
///
/// # Errors
/// Fails if the peer cannot be resolved/mapped to an input peer or the RPC
/// itself fails.
pub async fn send_file(
    &self,
    peer: impl Into<crate::PeerRef>,
    media: tl::enums::InputMedia,
    msg: &crate::InputMessage,
) -> Result<crate::update::IncomingMessage, InvocationError> {
    // Resolve the peer reference, then map it to a TL input peer via the cache.
    let peer = peer.into().resolve(self).await?;
    let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer)?;
    let req = tl::functions::messages::SendMedia {
        silent: msg.silent,
        background: msg.background,
        clear_draft: msg.clear_draft,
        noforwards: false,
        update_stickersets_order: false,
        invert_media: msg.invert_media,
        allow_paid_floodskip: false,
        peer: input_peer,
        reply_to: msg.reply_header(),
        media,
        message: msg.text.clone(),
        random_id: crate::random_i64_pub(),
        reply_markup: msg.reply_markup.clone(),
        entities: msg.entities.clone(),
        schedule_date: msg.schedule_date,
        schedule_repeat_period: None,
        send_as: None,
        quick_reply_shortcut: None,
        effect: None,
        allow_paid_stars: None,
        suggested_post: None,
    };
    let body: Vec<u8> = self.rpc_call_raw(&req).await?;
    // Turn the server's raw Updates answer into the message just sent.
    Ok(self.parse_send_response(&body, msg, &peer).await)
}
/// Sends several media items as one grouped album via
/// `messages.sendMultiMedia` and returns the new messages extracted from
/// the server's updates.
///
/// The first item carrying a `reply_to` id determines the album's reply
/// target; captions and entities are attached per item.
pub async fn send_album(
    &self,
    peer: impl Into<crate::PeerRef>,
    items: Vec<AlbumItem>,
) -> Result<Vec<crate::update::IncomingMessage>, InvocationError> {
    let peer = peer.into().resolve(self).await?;
    let input_peer = self.inner.peer_cache.read().await.peer_to_input(&peer)?;
    // Album-wide reply target: first item that has one.
    let reply_to = items.iter().find_map(|i| i.reply_to).map(|id| {
        tl::enums::InputReplyTo::Message(tl::types::InputReplyToMessage {
            reply_to_msg_id: id,
            top_msg_id: None,
            reply_to_peer_id: None,
            quote_text: None,
            quote_entities: None,
            quote_offset: None,
            monoforum_peer_id: None,
            poll_option: None,
            todo_item_id: None,
        })
    });
    let multi: Vec<tl::enums::InputSingleMedia> = items
        .into_iter()
        .map(|item| {
            tl::enums::InputSingleMedia::InputSingleMedia(tl::types::InputSingleMedia {
                media: item.media,
                random_id: crate::random_i64_pub(),
                message: item.caption,
                entities: if item.entities.is_empty() {
                    None
                } else {
                    Some(item.entities)
                },
            })
        })
        .collect();
    let req = tl::functions::messages::SendMultiMedia {
        silent: false,
        background: false,
        clear_draft: false,
        noforwards: false,
        update_stickersets_order: false,
        invert_media: false,
        allow_paid_floodskip: false,
        peer: input_peer,
        reply_to,
        multi_media: multi,
        schedule_date: None,
        send_as: None,
        quick_reply_shortcut: None,
        effect: None,
        allow_paid_stars: None,
    };
    let body: Vec<u8> = self.rpc_call_raw(&req).await?;
    let mut out = Vec::new();
    if body.len() >= 4 {
        // Peek the TL constructor id before parsing — presumably
        // 0x74ae4240 = updates and 0x725b04c3 = updatesCombined; only those
        // shapes carry the new messages. Verify against the TL layer.
        let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
        if cid == 0x74ae4240 || cid == 0x725b04c3 {
            let updates_opt = match tl::enums::Updates::from_bytes_exact(&body) {
                Ok(updates) => Some(updates),
                Err(e) => {
                    // Best-effort: a parse failure yields an empty result
                    // rather than an error.
                    tracing::warn!("[ferogram] updates parse error: {e}");
                    None
                }
            };
            let (raw_updates, users, chats) = match updates_opt {
                Some(tl::enums::Updates::Updates(u)) => (u.updates, u.users, u.chats),
                Some(tl::enums::Updates::Combined(u)) => (u.updates, u.users, u.chats),
                _ => (vec![], vec![], vec![]),
            };
            // Prime the peer cache so returned messages can resolve senders.
            self.cache_users_and_chats(&users, &chats).await;
            for upd in raw_updates {
                match upd {
                    tl::enums::Update::NewMessage(u) => {
                        out.push(
                            crate::update::IncomingMessage::from_raw(u.message)
                                .with_client(self.clone()),
                        );
                    }
                    tl::enums::Update::NewChannelMessage(u) => {
                        out.push(
                            crate::update::IncomingMessage::from_raw(u.message)
                                .with_client(self.clone()),
                        );
                    }
                    _ => {}
                }
            }
        }
    }
    Ok(out)
}
/// Streaming download of `location`; DC id 0 lets the transfer layer pick
/// the DC (see [`iter_download_on_dc`] to target one explicitly).
pub fn iter_download(&self, location: tl::enums::InputFileLocation) -> DownloadIter {
    self.iter_download_on_dc(location, 0)
}
/// Builds a [`DownloadIter`] over `location` targeting `dc_id`, with the
/// default 512 KiB chunk size (tunable via `DownloadIter::chunk_size`).
pub fn iter_download_on_dc(
    &self,
    location: tl::enums::InputFileLocation,
    dc_id: i32,
) -> DownloadIter {
    let request = tl::functions::upload::GetFile {
        precise: false,
        cdn_supported: false,
        location,
        offset: 0,
        limit: 512 * 1024,
    };
    DownloadIter {
        client: self.clone(),
        request: Some(request),
        done: false,
        dc_id,
    }
}
/// Downloads `location` fully into memory, resolving DC id 0 to the home
/// DC (see [`download_media_on_dc`]).
pub async fn download_media(
    &self,
    location: tl::enums::InputFileLocation,
) -> Result<Vec<u8>, InvocationError> {
    self.download_media_on_dc(location, 0).await
}
/// Sequentially downloads a file from `dc_id` (0 = home DC) into memory.
///
/// Streams 512 KiB chunks via `upload.getFile`, following FILE_MIGRATE
/// redirects and reopening the worker connection (up to 3 consecutive
/// times) on AUTH_KEY_UNREGISTERED after dropping the cached auth key.
pub async fn download_media_on_dc(
    &self,
    location: tl::enums::InputFileLocation,
    dc_id: i32,
) -> Result<Vec<u8>, InvocationError> {
    let chunk = 512 * 1024i32;
    // Resolve DC 0 to the session's home DC.
    let mut worker_dc = if dc_id == 0 {
        {
            let _g: tokio::sync::MutexGuard<'_, i32> = self.inner.home_dc_id.lock().await;
            *_g
        }
    } else {
        dc_id
    };
    let mut conn = self.open_worker_conn(worker_dc).await?;
    let mut offset = 0i64;
    let mut bytes = Vec::new();
    let mut reopen_attempts = 0u8;
    const MAX_REOPEN: u8 = 3;
    loop {
        let req = tl::functions::upload::GetFile {
            precise: true,
            cdn_supported: false,
            location: location.clone(),
            offset,
            limit: chunk,
        };
        match conn.rpc_call(&req).await {
            Ok(raw) => {
                let mut cur = Cursor::from_slice(&raw);
                match tl::enums::upload::File::deserialize(&mut cur)? {
                    tl::enums::upload::File::File(f) => {
                        // A successful chunk resets the reopen budget; a
                        // short chunk means EOF.
                        reopen_attempts = 0;
                        let done = (f.bytes.len() as i32) < chunk;
                        bytes.extend_from_slice(&f.bytes);
                        if done {
                            break;
                        }
                        offset += chunk as i64;
                    }
                    // NOTE(review): a CDN redirect silently ends the loop,
                    // returning whatever was fetched so far (possibly a
                    // truncated file) — confirm this is intentional.
                    tl::enums::upload::File::CdnRedirect(_) => break,
                }
            }
            Err(InvocationError::Rpc(ref rpc))
                if rpc.name == "FILE_MIGRATE" || rpc.name == "FILE_MIGRATE_X" =>
            {
                // Follow the redirect unless it is missing or points back
                // to the DC we are already on.
                let new_dc = rpc.value.unwrap_or(0) as i32;
                if new_dc == 0 || new_dc == worker_dc {
                    return Err(InvocationError::Rpc(rpc.clone()));
                }
                tracing::debug!(
                    "[ferogram] seq download: FILE_MIGRATE_{new_dc}; reopening worker on DC{new_dc}"
                );
                worker_dc = new_dc;
                conn = self.open_worker_conn(worker_dc).await?;
            }
            Err(InvocationError::Rpc(ref rpc)) if rpc.name == "AUTH_KEY_UNREGISTERED" => {
                reopen_attempts += 1;
                if reopen_attempts > MAX_REOPEN {
                    return Err(InvocationError::Rpc(rpc.clone()));
                }
                tracing::debug!(
                    "[ferogram] seq download: AUTH_KEY_UNREGISTERED DC{worker_dc}; \
                    reopening worker [{reopen_attempts}/{MAX_REOPEN}]"
                );
                // Drop the stale auth key so reopening performs a fresh
                // handshake instead of reusing the rejected key.
                {
                    let mut opts: tokio::sync::MutexGuard<
                        '_,
                        std::collections::HashMap<i32, DcEntry>,
                    > = self.inner.dc_options.lock().await;
                    if let Some(e) = opts.get_mut(&worker_dc) {
                        e.auth_key = None;
                    }
                }
                conn = self.open_worker_conn(worker_dc).await?;
            }
            Err(e) => return Err(e),
        }
    }
    Ok(bytes)
}
/// Concurrent download of a file of known `size`, resolving DC id 0 to the
/// home DC (see [`download_media_concurrent_on_dc`]).
pub async fn download_media_concurrent(
    &self,
    location: tl::enums::InputFileLocation,
    size: usize,
) -> Result<Vec<u8>, InvocationError> {
    self.download_media_concurrent_on_dc(location, size, 0)
        .await
}
/// Downloads a file of known `size` from `dc_id` using parallel workers.
///
/// The file is split into fixed chunks (see [`download_chunk_size`]);
/// workers claim chunk indices from a shared counter and stream finished
/// chunks back over an mpsc channel, which the caller reassembles in order.
/// Falls back to the sequential path when only one worker on the home DC
/// would be used or when no worker connection opens.
///
/// # Errors
/// Fails on unrecoverable RPC errors, exhausted reconnect budgets, short
/// chunks, or CDN redirects (unsupported in the concurrent path).
pub async fn download_media_concurrent_on_dc(
    &self,
    location: tl::enums::InputFileLocation,
    size: usize,
    dc_id: i32,
) -> Result<Vec<u8>, InvocationError> {
    let chunk = download_chunk_size(size) as usize;
    let n_parts = size.div_ceil(chunk);
    let n_workers = download_worker_count(size).min(MAX_WORKERS_PER_FILE);
    // Cap concurrent transfer workers process-wide.
    let _global_guard = self
        .inner
        .worker_semaphore
        .acquire_many(n_workers as u32)
        .await
        .expect("worker semaphore unexpectedly closed");
    let home = {
        let _g: tokio::sync::MutexGuard<'_, i32> = self.inner.home_dc_id.lock().await;
        *_g
    };
    let effective_dc = if dc_id == 0 { home } else { dc_id };
    // A single home-DC worker gains nothing from the fan-out below.
    if n_workers == 1 && effective_dc == home {
        return self.download_media_on_dc(location, dc_id).await;
    }
    let mut open_set: tokio::task::JoinSet<
        Result<crate::dc_pool::DcConnection, InvocationError>,
    > = tokio::task::JoinSet::new();
    for _ in 0..n_workers {
        let client = self.clone();
        open_set.spawn(async move { client.open_worker_conn(dc_id).await });
    }
    let mut conns: Vec<crate::dc_pool::DcConnection> = Vec::with_capacity(n_workers);
    while let Some(res) = open_set.join_next().await {
        match res {
            Ok(Ok(c)) => conns.push(c),
            Ok(Err(e)) => tracing::warn!("[ferogram] download: worker conn failed: {e}"),
            Err(e) => tracing::warn!("[ferogram] download: worker conn join error: {e}"),
        }
    }
    if conns.is_empty() {
        tracing::warn!("[ferogram] download: no worker conns, falling back to sequential");
        return self.download_media_on_dc(location, dc_id).await;
    }
    // Shared queue of chunk indices + channel carrying finished chunks.
    let next_part = Arc::new(Mutex::new(0usize));
    let (tx, mut rx) = tokio::sync::mpsc::channel::<(usize, Vec<u8>)>(conns.len() * 2);
    let mut tasks: tokio::task::JoinSet<Result<(), InvocationError>> =
        tokio::task::JoinSet::new();
    // Cooperative cancellation flag, set by the first failing worker.
    let abort = Arc::new(AtomicBool::new(false));
    for mut conn in conns {
        let location = location.clone();
        let next_part = Arc::clone(&next_part);
        let tx = tx.clone();
        let client = self.clone();
        let abort = Arc::clone(&abort);
        let init_dc = effective_dc;
        tasks.spawn(async move {
            const MAX_WORKER_RECONNECTS: u8 = 5;
            let mut total_reconnects = 0u8;
            let mut worker_dc = init_dc;
            loop {
                if abort.load(Ordering::Relaxed) {
                    break;
                }
                // Claim the next chunk index.
                let part = {
                    let mut g = next_part.lock().await;
                    if *g >= n_parts {
                        break;
                    }
                    let p = *g;
                    *g += 1;
                    p
                };
                let req = tl::functions::upload::GetFile {
                    precise: true,
                    cdn_supported: false,
                    location: location.clone(),
                    offset: (part * chunk) as i64,
                    limit: chunk as i32,
                };
                // Retry loop for this single chunk.
                let raw = loop {
                    let err = match conn.rpc_call(&req).await {
                        Ok(r) => break r,
                        Err(e) => e,
                    };
                    if let InvocationError::Rpc(ref rpc) = err {
                        if rpc.code == 420 {
                            // FLOOD_WAIT: sleep the server-requested seconds.
                            let secs = rpc.value.unwrap_or(1) as u64;
                            tracing::info!(
                                "[ferogram] download: FLOOD_WAIT_{secs}; sleeping before retry"
                            );
                            if abort.load(Ordering::Relaxed) {
                                // (The store is redundant — the flag is
                                // already set — but harmless.)
                                abort.store(true, Ordering::Relaxed);
                                return Err(err);
                            }
                            tokio::time::sleep(std::time::Duration::from_secs(secs)).await;
                            continue;
                        }
                        if rpc.code == 303 {
                            // FILE_MIGRATE: this worker reconnects to the
                            // indicated DC and retries the same chunk.
                            let new_dc = rpc.value.unwrap_or(1) as i32;
                            tracing::info!(
                                "[ferogram] download: FILE_MIGRATE_{new_dc}; \
                                switching worker DC{worker_dc}→DC{new_dc}"
                            );
                            worker_dc = new_dc;
                            match client.open_worker_conn(new_dc).await {
                                Ok(c) => {
                                    conn = c;
                                    continue;
                                }
                                Err(e) => {
                                    abort.store(true, Ordering::Relaxed);
                                    return Err(e);
                                }
                            }
                        }
                        if rpc.name == "AUTH_KEY_UNREGISTERED" {
                            // Stale auth key: reopen this worker with backoff.
                            tracing::warn!(
                                "[ferogram] download: AUTH_KEY_UNREGISTERED DC{worker_dc}; \
                                reopening worker [{}/{MAX_WORKER_RECONNECTS}]",
                                total_reconnects + 1
                            );
                            total_reconnects += 1;
                            if total_reconnects >= MAX_WORKER_RECONNECTS {
                                abort.store(true, Ordering::Relaxed);
                                return Err(err);
                            }
                            let backoff_ms = 300u64 * (1u64 << (total_reconnects - 1));
                            if abort.load(Ordering::Relaxed) {
                                abort.store(true, Ordering::Relaxed);
                                return Err(err);
                            }
                            tokio::time::sleep(std::time::Duration::from_millis(backoff_ms))
                                .await;
                            match client.open_worker_conn(worker_dc).await {
                                Ok(c) => {
                                    conn = c;
                                    continue;
                                }
                                Err(e) => {
                                    abort.store(true, Ordering::Relaxed);
                                    return Err(e);
                                }
                            }
                        }
                        // Any other RPC error is fatal, except -503 which is
                        // treated as transient below — presumably a
                        // library-internal transport code; confirm.
                        if rpc.code != -503 {
                            abort.store(true, Ordering::Relaxed);
                            return Err(err);
                        }
                    }
                    // Transport-level (or -503) failure: reconnect with
                    // exponential backoff, bounded by MAX_WORKER_RECONNECTS.
                    total_reconnects += 1;
                    if total_reconnects >= MAX_WORKER_RECONNECTS {
                        abort.store(true, Ordering::Relaxed);
                        return Err(err);
                    }
                    let backoff_ms = 300u64 * (1u64 << (total_reconnects - 1));
                    tracing::warn!(
                        "[ferogram] download: worker error ({err}), reconnecting \
                        [{total_reconnects}/{MAX_WORKER_RECONNECTS}] (backoff {backoff_ms}ms)"
                    );
                    if abort.load(Ordering::Relaxed) {
                        abort.store(true, Ordering::Relaxed);
                        return Err(err);
                    }
                    tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
                    match client.open_worker_conn(worker_dc).await {
                        Ok(c) => {
                            conn = c;
                        }
                        Err(e) => {
                            abort.store(true, Ordering::Relaxed);
                            return Err(e);
                        }
                    }
                };
                let mut cur = Cursor::from_slice(&raw);
                match tl::enums::upload::File::deserialize(&mut cur)? {
                    tl::enums::upload::File::File(f) => {
                        // Every chunk except the last must be exactly
                        // `chunk` bytes; the last is the remainder.
                        let expected = if part == n_parts - 1 {
                            size - part * chunk
                        } else {
                            chunk
                        };
                        if f.bytes.len() != expected {
                            abort.store(true, Ordering::Relaxed);
                            return Err(InvocationError::Deserialize(format!(
                                "download part {part}: expected {expected} B, got {} B",
                                f.bytes.len()
                            )));
                        }
                        if tx.send((part, f.bytes)).await.is_err() {
                            // Receiver gone: stop quietly.
                            break;
                        }
                    }
                    tl::enums::upload::File::CdnRedirect(_redir) => {
                        abort.store(true, Ordering::Relaxed);
                        return Err(InvocationError::Deserialize(
                            "upload.fileCdnRedirect: CDN redirect received in concurrent \
                            download; retry via sequential path"
                                .into(),
                        ));
                    }
                }
            }
            Ok(())
        });
    }
    // Drop the original sender so `rx` closes once all workers finish.
    drop(tx);
    let mut parts: Vec<Option<Vec<u8>>> = (0..n_parts).map(|_| None).collect();
    while let Some((idx, data)) = rx.recv().await {
        if idx < parts.len() {
            parts[idx] = Some(data);
        }
        if abort.load(Ordering::Relaxed) {
            break;
        }
    }
    // Drain anything still buffered after the receive loop exits.
    // NOTE(review): after an abort nothing receives from the channel while
    // joining below; a worker parked in `tx.send` on a full channel would
    // stall `join_next` — confirm the capacity bound (2×workers) rules that
    // out.
    while let Ok((idx, data)) = rx.try_recv() {
        if idx < parts.len() {
            parts[idx] = Some(data);
        }
    }
    while let Some(res) = tasks.join_next().await {
        if let Err(e) =
            res.map_err(|e| InvocationError::Io(std::io::Error::other(e.to_string())))?
        {
            tasks.abort_all();
            return Err(e);
        }
    }
    // Reassemble in order; missing parts are skipped (flatten) and the
    // result is clamped to `size`.
    let mut out = Vec::with_capacity(size);
    for part in parts.into_iter().flatten() {
        out.extend_from_slice(&part);
    }
    out.truncate(size);
    Ok(out)
}
/// Downloads `item` into memory, preferring the concurrent path whenever
/// the item reports its size.
pub async fn download<D: Downloadable>(&self, item: &D) -> Result<Vec<u8>, InvocationError> {
    let Some(loc) = item.to_input_location() else {
        return Err(InvocationError::Deserialize(
            "item has no download location".into(),
        ));
    };
    let dc = item.dc_id();
    if let Some(sz) = item.size() {
        self.download_media_concurrent_on_dc(loc, sz, dc).await
    } else {
        self.download_media_on_dc(loc, dc).await
    }
}
}
impl crate::update::IncomingMessage {
pub fn download_location(&self) -> Option<tl::enums::InputFileLocation> {
let media = match &self.raw {
tl::enums::Message::Message(m) => m.media.as_ref()?,
_ => return None,
};
if let Some(doc) = Document::from_media(media) {
return doc.to_input_location();
}
if let Some(photo) = Photo::from_media(media) {
return photo.to_input_location();
}
None
}
pub fn download_location_with_dc(&self) -> Option<(tl::enums::InputFileLocation, i32)> {
let media = match &self.raw {
tl::enums::Message::Message(m) => m.media.as_ref()?,
_ => return None,
};
if let Some(doc) = Document::from_media(media) {
return Some((doc.to_input_location()?, doc.dc_id()));
}
if let Some(photo) = Photo::from_media(media) {
return Some((photo.to_input_location()?, photo.dc_id()));
}
None
}
}
/// Resolves the `(location, dc_id)` pair for a message's document or photo
/// media, if any (documents take precedence).
pub fn download_location_from_media(
    media: Option<&tl::enums::MessageMedia>,
) -> Option<(tl::enums::InputFileLocation, i32)> {
    let m = media?;
    if let Some(doc) = Document::from_media(m) {
        doc.to_input_location().map(|loc| (loc, doc.dc_id()))
    } else if let Some(photo) = Photo::from_media(m) {
        photo.to_input_location().map(|loc| (loc, photo.dc_id()))
    } else {
        None
    }
}
/// Builds the TL `InputFile` handle for a completed upload: big files use
/// `inputFileBig` (no checksum), small files carry an md5 checksum of the
/// whole payload.
fn make_input_file(
    big: bool,
    file_id: i64,
    total_parts: i32,
    name: &str,
    data: &[u8],
) -> tl::enums::InputFile {
    if !big {
        let md5_checksum = format!("{:x}", md5::compute(data));
        return tl::enums::InputFile::InputFile(tl::types::InputFile {
            id: file_id,
            parts: total_parts,
            name: name.to_string(),
            md5_checksum,
        });
    }
    tl::enums::InputFile::Big(tl::types::InputFileBig {
        id: file_id,
        parts: total_parts,
        name: name.to_string(),
    })
}