Skip to main content

ferogram_mtsender/
sender_task.rs

1// Copyright (c) Ankit Chaubey <ankitchaubey.dev@gmail.com>
2//
3// Licensed under either the MIT License or the Apache License 2.0.
4
5//! The sender task: a single `tokio::spawn`-ed loop that owns [`MtpSender`]
6//! and is the only entity that touches the TCP socket.
7//!
8//! External callers interact via two channels:
9//!
10//! - [`RpcEnqueue`]: send a pre-serialised TL body + oneshot::Sender to the task.
11//!   The task enqueues it into `MtpSender`, and the oneshot is fulfilled when the
12//!   server responds.  This replaces the old `do_rpc_call` + `Mutex<ConnectionWriter>`
13//!   + `pending` HashMap pattern.
14//!
15//! - [`ReconnectRequest`]: send a new `(TcpStream, EncryptedSession, FrameKind,
16//!   Option<perm_key>)` to the task after a reconnect completes.  The task calls
17//!   `MtpSender::set_stream` and resumes the loop.
18//!
19//! The task forwards raw update bodies (everything `MtpSender::step()` returns
20//! that is not an rpc_result) via [`FrameEvent`] to the client's dispatch path.
21
22use ferogram_connect::FrameKind;
23use ferogram_mtproto::EncryptedSession;
24use tokio::net::TcpStream;
25use tokio::sync::{mpsc, oneshot};
26
27use crate::errors::InvocationError;
28use crate::mtp_sender::MtpSender;
29
30/// A single RPC request sent from any caller to the sender task.
31pub struct RpcEnqueue {
32    /// Pre-serialised TL body (output of `EncryptedSession::pack_body_with_msg_id`
33    /// or any raw TL bytes; the sender task will re-encrypt via MtpSender).
34    pub body: Vec<u8>,
35    /// Fulfilled with the raw rpc_result body (or an error) when the server responds.
36    pub tx: oneshot::Sender<Result<Vec<u8>, InvocationError>>,
37}
38
39/// Reconnect request: replace the TCP stream inside the sender task.
40pub struct ReconnectRequest {
41    pub stream: TcpStream,
42    pub enc: EncryptedSession,
43    pub frame_kind: FrameKind,
44    pub perm_auth_key: Option<[u8; 256]>,
45}
46
47/// Events the sender task sends back to the client.
48pub enum FrameEvent {
49    /// A raw update body (Updates, UpdateShort, etc.) to dispatch.
50    Update(Vec<u8>),
51    /// The connection failed; the client must reconnect and send a ReconnectRequest.
52    Error(InvocationError),
53    /// Session info after initial connect or reconnect (for session saving).
54    Connected {
55        auth_key: Box<[u8; 256]>,
56        first_salt: i64,
57        time_offset: i32,
58        session_id: i64,
59    },
60}
61
62/// Sender-side handles given to the client after spawning the sender task.
63pub struct SenderHandle {
64    /// Enqueue RPC requests here.
65    pub rpc_tx: mpsc::Sender<RpcEnqueue>,
66    /// Send a new stream here after reconnect.
67    pub reconnect_tx: mpsc::Sender<ReconnectRequest>,
68}
69
70/// Spawn the sender task.  Returns a [`SenderHandle`] for the client and an
71/// `mpsc::Receiver<FrameEvent>` for receiving update bodies and errors.
72pub fn spawn_sender_task(
73    stream: TcpStream,
74    enc: EncryptedSession,
75    frame_kind: FrameKind,
76    perm_auth_key: Option<[u8; 256]>,
77) -> (SenderHandle, mpsc::Receiver<FrameEvent>) {
78    let (rpc_tx, rpc_rx) = mpsc::channel::<RpcEnqueue>(512);
79    let (reconnect_tx, reconnect_rx) = mpsc::channel::<ReconnectRequest>(4);
80    let (frame_tx, frame_rx) = mpsc::channel::<FrameEvent>(256);
81
82    let sender = MtpSender::new(stream, enc, frame_kind, perm_auth_key);
83
84    tokio::spawn(sender_loop(sender, rpc_rx, reconnect_rx, frame_tx));
85
86    (
87        SenderHandle {
88            rpc_tx,
89            reconnect_tx,
90        },
91        frame_rx,
92    )
93}
94
95async fn sender_loop(
96    mut sender: MtpSender,
97    mut rpc_rx: mpsc::Receiver<RpcEnqueue>,
98    mut reconnect_rx: mpsc::Receiver<ReconnectRequest>,
99    frame_tx: mpsc::Sender<FrameEvent>,
100) {
101    // Notify the client that we are connected and ready.
102    let _ = frame_tx
103        .send(FrameEvent::Connected {
104            auth_key: Box::new(sender.auth_key_bytes()),
105            first_salt: sender.first_salt(),
106            time_offset: sender.time_offset(),
107            session_id: sender.session_id(),
108        })
109        .await;
110
111    loop {
112        // Drain all pending RPC enqueues before stepping (non-blocking).
113        loop {
114            match rpc_rx.try_recv() {
115                Ok(enqueue) => sender.enqueue(enqueue.body, enqueue.tx),
116                Err(mpsc::error::TryRecvError::Empty) => break,
117                Err(mpsc::error::TryRecvError::Disconnected) => {
118                    // Client dropped all handles: shut down cleanly.
119                    return;
120                }
121            }
122        }
123
124        tokio::select! {
125            biased;
126
127            // New RPC enqueue arrived while we were waiting in step().
128            Some(enqueue) = rpc_rx.recv() => {
129                sender.enqueue(enqueue.body, enqueue.tx);
130                // Loop back immediately so step() can send it.
131                continue;
132            }
133
134            // Reconnect request: swap the stream.
135            Some(req) = reconnect_rx.recv() => {
136                tracing::info!("[ferogram::sender] reconnect: new stream received, swapping");
137                sender.set_stream(req.stream, req.enc, req.frame_kind, req.perm_auth_key);
138                let _ = frame_tx
139                    .send(FrameEvent::Connected {
140                        auth_key: Box::new(sender.auth_key_bytes()),
141                        first_salt: sender.first_salt(),
142                        time_offset: sender.time_offset(),
143                        session_id: sender.session_id(),
144                    })
145                    .await;
146                continue;
147            }
148
149            // Drive one network event.
150            result = sender.step() => {
151                match result {
152                    Ok(updates) => {
153                        for body in updates {
154                            if frame_tx.send(FrameEvent::Update(body)).await.is_err() {
155                                // Client gone.
156                                return;
157                            }
158                        }
159                    }
160                    Err(e) => {
161                        tracing::warn!("[ferogram::sender] connection error, failing pending requests and waiting for reconnect: {e}");
162                        // Fail all pending requests immediately.
163                        sender.fail_all(&e);
164                        // Notify the client; it will reconnect and send ReconnectRequest.
165                        if frame_tx.send(FrameEvent::Error(e)).await.is_err() {
166                            return;
167                        }
168                        // Wait for a reconnect before driving step() again.
169                        match reconnect_rx.recv().await {
170                            Some(req) => {
171                                tracing::info!("[ferogram::sender] reconnect received, resuming send loop");
172                                sender.set_stream(
173                                    req.stream,
174                                    req.enc,
175                                    req.frame_kind,
176                                    req.perm_auth_key,
177                                );
178                                let _ = frame_tx
179                                    .send(FrameEvent::Connected {
180                                        auth_key: Box::new(sender.auth_key_bytes()),
181                                        first_salt: sender.first_salt(),
182                                        time_offset: sender.time_offset(),
183                                        session_id: sender.session_id(),
184                                    })
185                                    .await;
186                            }
187                            None => {
188                                // Client dropped reconnect handle: shut down.
189                                return;
190                            }
191                        }
192                    }
193                }
194            }
195        }
196    }
197}