use ferogram_connect::FrameKind;
use ferogram_mtproto::EncryptedSession;
use tokio::net::TcpStream;
use tokio::sync::{mpsc, oneshot};
use crate::errors::InvocationError;
use crate::mtp_sender::MtpSender;
/// A single request queued for the sender task.
///
/// `sender_loop` receives these from the RPC channel and passes both fields,
/// unchanged, to `MtpSender::enqueue`.
pub struct RpcEnqueue {
/// Request payload bytes, forwarded as-is.
/// NOTE(review): presumably an already-serialized call; confirm against callers.
pub body: Vec<u8>,
/// One-shot sender typed to carry either result bytes or an
/// `InvocationError`. Ownership moves into `MtpSender::enqueue`, which is
/// presumably responsible for completing it.
pub tx: oneshot::Sender<Result<Vec<u8>, InvocationError>>,
}
/// A replacement connection for a running sender task.
///
/// When `sender_loop` receives one, all four fields are passed to
/// `MtpSender::set_stream`. They have the same types as the arguments of
/// `spawn_sender_task`.
pub struct ReconnectRequest {
/// The new TCP stream to use.
pub stream: TcpStream,
/// Encrypted-session state handed to the sender together with `stream`.
pub enc: EncryptedSession,
/// Frame kind handed to the sender together with `stream`.
pub frame_kind: FrameKind,
/// Optional 256-byte key forwarded to the sender.
/// NOTE(review): by its name this is the permanent auth key; confirm in `MtpSender`.
pub perm_auth_key: Option<[u8; 256]>,
}
/// Events published by the sender task on the receiver returned from
/// `spawn_sender_task`.
pub enum FrameEvent {
/// One item yielded by a successful `MtpSender::step` call; `sender_loop`
/// emits one event per item, in iteration order.
Update(Vec<u8>),
/// `MtpSender::step` returned an error. Emitted after `MtpSender::fail_all`
/// was called with the same error; the task then waits for a
/// `ReconnectRequest` before doing anything else.
Error(InvocationError),
/// Emitted when the task starts and again after every stream swap, with
/// values read from the sender at that moment.
Connected {
/// Result of `MtpSender::auth_key_bytes`. Boxed, so the 256-byte array is
/// stored on the heap rather than inline in every `FrameEvent`.
auth_key: Box<[u8; 256]>,
/// Result of `MtpSender::first_salt`.
first_salt: i64,
/// Result of `MtpSender::time_offset`.
time_offset: i32,
/// Result of `MtpSender::session_id`.
session_id: i64,
},
}
/// Sending halves used to drive a task started by `spawn_sender_task`.
pub struct SenderHandle {
/// Queue of requests for the task (created with capacity 512). Once every
/// clone of this sender is dropped and the queue is empty, the task returns
/// the next time it drains the queue.
pub rpc_tx: mpsc::Sender<RpcEnqueue>,
/// Queue of replacement connections for the task (created with capacity 4).
pub reconnect_tx: mpsc::Sender<ReconnectRequest>,
}
/// Starts the background sender task for an established connection.
///
/// The arguments are wrapped in an `MtpSender`, `sender_loop` is spawned on
/// the current Tokio runtime, and the channels used to talk to the task are
/// returned:
///
/// * a `SenderHandle` for submitting requests and replacement streams;
/// * the receiver on which the task publishes `FrameEvent`s.
///
/// The spawned task is detached: its `JoinHandle` is dropped immediately.
///
/// # Panics
///
/// `tokio::spawn` panics when called outside of a Tokio runtime context.
pub fn spawn_sender_task(
    stream: TcpStream,
    enc: EncryptedSession,
    frame_kind: FrameKind,
    perm_auth_key: Option<[u8; 256]>,
) -> (SenderHandle, mpsc::Receiver<FrameEvent>) {
    // Bounded queue sizes for the three channels shared with the task.
    const RPC_QUEUE_CAPACITY: usize = 512;
    const RECONNECT_QUEUE_CAPACITY: usize = 4;
    const FRAME_QUEUE_CAPACITY: usize = 256;

    let sender = MtpSender::new(stream, enc, frame_kind, perm_auth_key);

    let (rpc_tx, rpc_rx) = mpsc::channel::<RpcEnqueue>(RPC_QUEUE_CAPACITY);
    let (reconnect_tx, reconnect_rx) =
        mpsc::channel::<ReconnectRequest>(RECONNECT_QUEUE_CAPACITY);
    let (frame_tx, frame_rx) = mpsc::channel::<FrameEvent>(FRAME_QUEUE_CAPACITY);

    // The task owns the receiving ends of the request channels and the sending
    // end of the event channel; the caller gets the opposite halves.
    tokio::spawn(sender_loop(sender, rpc_rx, reconnect_rx, frame_tx));

    let handle = SenderHandle {
        rpc_tx,
        reconnect_tx,
    };
    (handle, frame_rx)
}
async fn sender_loop(
mut sender: MtpSender,
mut rpc_rx: mpsc::Receiver<RpcEnqueue>,
mut reconnect_rx: mpsc::Receiver<ReconnectRequest>,
frame_tx: mpsc::Sender<FrameEvent>,
) {
let _ = frame_tx
.send(FrameEvent::Connected {
auth_key: Box::new(sender.auth_key_bytes()),
first_salt: sender.first_salt(),
time_offset: sender.time_offset(),
session_id: sender.session_id(),
})
.await;
loop {
loop {
match rpc_rx.try_recv() {
Ok(enqueue) => sender.enqueue(enqueue.body, enqueue.tx),
Err(mpsc::error::TryRecvError::Empty) => break,
Err(mpsc::error::TryRecvError::Disconnected) => {
return;
}
}
}
tokio::select! {
biased;
Some(enqueue) = rpc_rx.recv() => {
sender.enqueue(enqueue.body, enqueue.tx);
continue;
}
Some(req) = reconnect_rx.recv() => {
tracing::info!("[ferogram::sender] reconnect: new stream received, swapping");
sender.set_stream(req.stream, req.enc, req.frame_kind, req.perm_auth_key);
let _ = frame_tx
.send(FrameEvent::Connected {
auth_key: Box::new(sender.auth_key_bytes()),
first_salt: sender.first_salt(),
time_offset: sender.time_offset(),
session_id: sender.session_id(),
})
.await;
continue;
}
result = sender.step() => {
match result {
Ok(updates) => {
for body in updates {
if frame_tx.send(FrameEvent::Update(body)).await.is_err() {
return;
}
}
}
Err(e) => {
tracing::warn!("[ferogram::sender] connection error, failing pending requests and waiting for reconnect: {e}");
sender.fail_all(&e);
if frame_tx.send(FrameEvent::Error(e)).await.is_err() {
return;
}
match reconnect_rx.recv().await {
Some(req) => {
tracing::info!("[ferogram::sender] reconnect received, resuming send loop");
sender.set_stream(
req.stream,
req.enc,
req.frame_kind,
req.perm_auth_key,
);
let _ = frame_tx
.send(FrameEvent::Connected {
auth_key: Box::new(sender.auth_key_bytes()),
first_salt: sender.first_salt(),
time_offset: sender.time_offset(),
session_id: sender.session_id(),
})
.await;
}
None => {
return;
}
}
}
}
}
}
}
}