ferogram_mtsender/sender_task.rs
1// Copyright (c) Ankit Chaubey <ankitchaubey.dev@gmail.com>
2//
3// Licensed under either the MIT License or the Apache License 2.0.
4
5//! The sender task: a single `tokio::spawn`-ed loop that owns [`MtpSender`]
6//! and is the only entity that touches the TCP socket.
7//!
8//! External callers interact via two channels:
9//!
10//! - [`RpcEnqueue`]: send a pre-serialised TL body + oneshot::Sender to the task.
11//! The task enqueues it into `MtpSender`, and the oneshot is fulfilled when the
12//! server responds. This replaces the old `do_rpc_call` + `Mutex<ConnectionWriter>`
13//! + `pending` HashMap pattern.
14//!
15//! - [`ReconnectRequest`]: send a new `(TcpStream, EncryptedSession, FrameKind,
16//! Option<perm_key>)` to the task after a reconnect completes. The task calls
17//! `MtpSender::set_stream` and resumes the loop.
18//!
19//! The task forwards raw update bodies (everything `MtpSender::step()` returns
20//! that is not an rpc_result) via [`FrameEvent`] to the client's dispatch path.
21
22use ferogram_connect::FrameKind;
23use ferogram_mtproto::EncryptedSession;
24use tokio::net::TcpStream;
25use tokio::sync::{mpsc, oneshot};
26
27use crate::errors::InvocationError;
28use crate::mtp_sender::MtpSender;
29
30/// A single RPC request sent from any caller to the sender task.
31pub struct RpcEnqueue {
32 /// Pre-serialised TL body (output of `EncryptedSession::pack_body_with_msg_id`
33 /// or any raw TL bytes; the sender task will re-encrypt via MtpSender).
34 pub body: Vec<u8>,
35 /// Fulfilled with the raw rpc_result body (or an error) when the server responds.
36 pub tx: oneshot::Sender<Result<Vec<u8>, InvocationError>>,
37}
38
39/// Reconnect request: replace the TCP stream inside the sender task.
40pub struct ReconnectRequest {
41 pub stream: TcpStream,
42 pub enc: EncryptedSession,
43 pub frame_kind: FrameKind,
44 pub perm_auth_key: Option<[u8; 256]>,
45}
46
47/// Events the sender task sends back to the client.
48pub enum FrameEvent {
49 /// A raw update body (Updates, UpdateShort, etc.) to dispatch.
50 Update(Vec<u8>),
51 /// The connection failed; the client must reconnect and send a ReconnectRequest.
52 Error(InvocationError),
53 /// Session info after initial connect or reconnect (for session saving).
54 Connected {
55 auth_key: Box<[u8; 256]>,
56 first_salt: i64,
57 time_offset: i32,
58 session_id: i64,
59 },
60}
61
62/// Sender-side handles given to the client after spawning the sender task.
63pub struct SenderHandle {
64 /// Enqueue RPC requests here.
65 pub rpc_tx: mpsc::Sender<RpcEnqueue>,
66 /// Send a new stream here after reconnect.
67 pub reconnect_tx: mpsc::Sender<ReconnectRequest>,
68}
69
70/// Spawn the sender task. Returns a [`SenderHandle`] for the client and an
71/// `mpsc::Receiver<FrameEvent>` for receiving update bodies and errors.
72pub fn spawn_sender_task(
73 stream: TcpStream,
74 enc: EncryptedSession,
75 frame_kind: FrameKind,
76 perm_auth_key: Option<[u8; 256]>,
77) -> (SenderHandle, mpsc::Receiver<FrameEvent>) {
78 let (rpc_tx, rpc_rx) = mpsc::channel::<RpcEnqueue>(512);
79 let (reconnect_tx, reconnect_rx) = mpsc::channel::<ReconnectRequest>(4);
80 let (frame_tx, frame_rx) = mpsc::channel::<FrameEvent>(256);
81
82 let sender = MtpSender::new(stream, enc, frame_kind, perm_auth_key);
83
84 tokio::spawn(sender_loop(sender, rpc_rx, reconnect_rx, frame_tx));
85
86 (
87 SenderHandle {
88 rpc_tx,
89 reconnect_tx,
90 },
91 frame_rx,
92 )
93}
94
95async fn sender_loop(
96 mut sender: MtpSender,
97 mut rpc_rx: mpsc::Receiver<RpcEnqueue>,
98 mut reconnect_rx: mpsc::Receiver<ReconnectRequest>,
99 frame_tx: mpsc::Sender<FrameEvent>,
100) {
101 // Notify the client that we are connected and ready.
102 let _ = frame_tx
103 .send(FrameEvent::Connected {
104 auth_key: Box::new(sender.auth_key_bytes()),
105 first_salt: sender.first_salt(),
106 time_offset: sender.time_offset(),
107 session_id: sender.session_id(),
108 })
109 .await;
110
111 loop {
112 // Drain all pending RPC enqueues before stepping (non-blocking).
113 loop {
114 match rpc_rx.try_recv() {
115 Ok(enqueue) => sender.enqueue(enqueue.body, enqueue.tx),
116 Err(mpsc::error::TryRecvError::Empty) => break,
117 Err(mpsc::error::TryRecvError::Disconnected) => {
118 // Client dropped all handles: shut down cleanly.
119 return;
120 }
121 }
122 }
123
124 tokio::select! {
125 biased;
126
127 // New RPC enqueue arrived while we were waiting in step().
128 Some(enqueue) = rpc_rx.recv() => {
129 sender.enqueue(enqueue.body, enqueue.tx);
130 // Loop back immediately so step() can send it.
131 continue;
132 }
133
134 // Reconnect request: swap the stream.
135 Some(req) = reconnect_rx.recv() => {
136 tracing::info!("[ferogram::sender] reconnect: new stream received, swapping");
137 sender.set_stream(req.stream, req.enc, req.frame_kind, req.perm_auth_key);
138 let _ = frame_tx
139 .send(FrameEvent::Connected {
140 auth_key: Box::new(sender.auth_key_bytes()),
141 first_salt: sender.first_salt(),
142 time_offset: sender.time_offset(),
143 session_id: sender.session_id(),
144 })
145 .await;
146 continue;
147 }
148
149 // Drive one network event.
150 result = sender.step() => {
151 match result {
152 Ok(updates) => {
153 for body in updates {
154 if frame_tx.send(FrameEvent::Update(body)).await.is_err() {
155 // Client gone.
156 return;
157 }
158 }
159 }
160 Err(e) => {
161 tracing::warn!("[ferogram::sender] connection error, failing pending requests and waiting for reconnect: {e}");
162 // Fail all pending requests immediately.
163 sender.fail_all(&e);
164 // Notify the client; it will reconnect and send ReconnectRequest.
165 if frame_tx.send(FrameEvent::Error(e)).await.is_err() {
166 return;
167 }
168 // Wait for a reconnect before driving step() again.
169 match reconnect_rx.recv().await {
170 Some(req) => {
171 tracing::info!("[ferogram::sender] reconnect received, resuming send loop");
172 sender.set_stream(
173 req.stream,
174 req.enc,
175 req.frame_kind,
176 req.perm_auth_key,
177 );
178 let _ = frame_tx
179 .send(FrameEvent::Connected {
180 auth_key: Box::new(sender.auth_key_bytes()),
181 first_salt: sender.first_salt(),
182 time_offset: sender.time_offset(),
183 session_id: sender.session_id(),
184 })
185 .await;
186 }
187 None => {
188 // Client dropped reconnect handle: shut down.
189 return;
190 }
191 }
192 }
193 }
194 }
195 }
196 }
197}