Skip to main content

whatsapp_rust/
unified_session.rs

1//! Unified session telemetry manager.
2//!
3//! Sends `<ib><unified_session id="..."/></ib>` stanzas to match WhatsApp Web behavior.
4//! Features: server time sync, duplicate prevention, sequence counter.
5
6use async_lock::Mutex;
7use log::debug;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
10use wacore::ib::{IbStanza, UnifiedSession};
11use wacore::protocol::ProtocolNode;
12use wacore_binary::node::Node;
13
14/// Manager for unified session telemetry.
15pub struct UnifiedSessionManager {
16    server_time_offset_ms: Arc<AtomicI64>,
17    last_sent_id: Arc<Mutex<Option<String>>>,
18    sequence: Arc<AtomicU64>,
19}
20
21impl Default for UnifiedSessionManager {
22    fn default() -> Self {
23        Self::new()
24    }
25}
26
27impl UnifiedSessionManager {
28    pub fn new() -> Self {
29        Self {
30            server_time_offset_ms: Arc::new(AtomicI64::new(0)),
31            last_sent_id: Arc::new(Mutex::new(None)),
32            sequence: Arc::new(AtomicU64::new(0)),
33        }
34    }
35
36    pub fn server_time_offset_ms(&self) -> i64 {
37        self.server_time_offset_ms.load(Ordering::Relaxed)
38    }
39
40    pub fn sequence(&self) -> u64 {
41        self.sequence.load(Ordering::Relaxed)
42    }
43
44    /// Update server time offset from node's `t` attribute (Unix timestamp in seconds).
45    pub fn update_server_time_offset(&self, node: &Node) {
46        if let Some(t_val) = node.attrs.get("t").map(|v| v.as_str())
47            && let Ok(server_time) = t_val.parse::<i64>()
48            && server_time > 0
49        {
50            let local_time = wacore::time::now_secs();
51            let offset_ms = (server_time - local_time) * 1000;
52            self.server_time_offset_ms
53                .store(offset_ms, Ordering::Relaxed);
54            debug!(target: "UnifiedSession", "Server time offset: {}ms", offset_ms);
55        }
56    }
57
58    /// Update server time offset using RTT-adjusted midpoint calculation.
59    ///
60    /// WA Web: `Math.round((startTime + rtt/2) / 1000 - serverTime)`
61    ///
62    /// This gives a more accurate clock skew estimate by assuming the server
63    /// timestamp corresponds to the midpoint of the round trip.
64    pub fn update_server_time_offset_with_rtt(&self, node: &Node, start_time_ms: i64, rtt_ms: i64) {
65        if let Some(t_val) = node.attrs.get("t").map(|v| v.as_str())
66            && let Ok(server_time) = t_val.parse::<i64>()
67            && server_time > 0
68        {
69            let midpoint_s = (start_time_ms + rtt_ms / 2) / 1000;
70            let offset_ms = (server_time - midpoint_s) * 1000;
71            self.server_time_offset_ms
72                .store(offset_ms, Ordering::Relaxed);
73            debug!(target: "UnifiedSession", "Server time offset: {}ms (RTT: {}ms)", offset_ms, rtt_ms);
74        }
75    }
76
77    pub fn calculate_session_id(&self) -> String {
78        let offset = self.server_time_offset_ms.load(Ordering::Relaxed);
79        UnifiedSession::calculate_id(offset)
80    }
81
82    /// Prepare to send unified session. Returns None if duplicate (already sent this ID).
83    pub async fn prepare_send(&self) -> Option<(Node, u64)> {
84        let session_id = self.calculate_session_id();
85
86        {
87            let mut last_id = self.last_sent_id.lock().await;
88            if let Some(ref prev_id) = *last_id
89                && prev_id == &session_id
90            {
91                debug!(target: "UnifiedSession", "Skipping duplicate id={}", session_id);
92                return None;
93            }
94
95            // Reset sequence when session ID changes (matches WhatsApp Web behavior)
96            if last_id.as_ref() != Some(&session_id) {
97                self.sequence.store(0, Ordering::Relaxed);
98            }
99            *last_id = Some(session_id.clone());
100        }
101
102        // Pre-increment to return 1 on first call (matches WhatsApp Web's ++$2)
103        let sequence = self.sequence.fetch_add(1, Ordering::Relaxed) + 1;
104        let stanza = IbStanza::unified_session(UnifiedSession::new(&session_id));
105        let node = stanza.into_node();
106
107        debug!(target: "UnifiedSession", "Sending id={}, seq={}", session_id, sequence);
108
109        Some((node, sequence))
110    }
111
112    /// Clear last sent ID to allow retry on failure.
113    pub async fn clear_last_sent(&self) {
114        *self.last_sent_id.lock().await = None;
115    }
116
117    /// Reset state on disconnect (keeps sequence counter).
118    pub async fn reset(&self) {
119        self.server_time_offset_ms.store(0, Ordering::Relaxed);
120        *self.last_sent_id.lock().await = None;
121    }
122}
123
124#[cfg(test)]
125mod tests {
126    use super::*;
127    use wacore_binary::builder::NodeBuilder;
128
129    #[test]
130    fn test_manager_default() {
131        let manager = UnifiedSessionManager::new();
132        assert_eq!(manager.server_time_offset_ms(), 0);
133        assert_eq!(manager.sequence(), 0);
134    }
135
136    #[test]
137    fn test_update_server_time_offset() {
138        let manager = UnifiedSessionManager::new();
139
140        let server_time = wacore::time::now_secs() + 10;
141        let node = NodeBuilder::new("success")
142            .attr("t", server_time.to_string())
143            .build();
144
145        manager.update_server_time_offset(&node);
146
147        let offset = manager.server_time_offset_ms();
148        assert!(
149            (offset - 10000).abs() < 1000,
150            "Offset should be ~10000ms, got {}",
151            offset
152        );
153    }
154
155    #[test]
156    fn test_update_server_time_offset_invalid() {
157        let manager = UnifiedSessionManager::new();
158
159        let node = NodeBuilder::new("success").build();
160        manager.update_server_time_offset(&node);
161        assert_eq!(manager.server_time_offset_ms(), 0);
162
163        let node = NodeBuilder::new("success")
164            .attr("t", "not_a_number")
165            .build();
166        manager.update_server_time_offset(&node);
167        assert_eq!(manager.server_time_offset_ms(), 0);
168
169        let node = NodeBuilder::new("success").attr("t", "0").build();
170        manager.update_server_time_offset(&node);
171        assert_eq!(manager.server_time_offset_ms(), 0);
172    }
173
174    #[test]
175    fn test_calculate_session_id() {
176        let manager = UnifiedSessionManager::new();
177        let id = manager.calculate_session_id();
178
179        let id_num: i64 = id.parse().expect("Should be a valid number");
180        const WEEK_MS: i64 = 7 * 24 * 60 * 60 * 1000;
181        assert!((0..WEEK_MS).contains(&id_num));
182    }
183
184    #[tokio::test]
185    async fn test_prepare_send() {
186        let manager = UnifiedSessionManager::new();
187
188        // Use a loop to handle potential millisecond boundary crossing during the test.
189        // Duplicate prevention only applies if the session ID (which is time-dependent) remains the same.
190        loop {
191            manager.reset().await; // Start clean for each attempt
192            let result = manager.prepare_send().await;
193            assert!(result.is_some());
194            let (node, seq) = result.unwrap();
195            assert_eq!(node.tag, "ib");
196            assert_eq!(seq, 1);
197
198            let result2 = manager.prepare_send().await;
199            if result2.is_none() {
200                // Success: duplicate was prevented within the same millisecond/session
201                assert_eq!(manager.sequence(), 1);
202                break;
203            }
204            // If result2 is Some, it means the millisecond changed and it's a new session.
205            // We'll loop and try again to catch it within the same millisecond.
206            tokio::task::yield_now().await;
207        }
208    }
209
210    #[tokio::test]
211    async fn test_clear_last_sent() {
212        let manager = UnifiedSessionManager::new();
213
214        let (_, seq1) = manager.prepare_send().await.unwrap();
215        assert_eq!(seq1, 1);
216        assert_eq!(manager.sequence(), 1);
217
218        manager.clear_last_sent().await;
219
220        // After clear, it's treated as a new session -> sequence resets
221        let result = manager.prepare_send().await;
222        assert!(result.is_some());
223        let (_, seq2) = result.unwrap();
224        assert_eq!(seq2, 1, "Sequence resets when session ID changes");
225        assert_eq!(manager.sequence(), 1);
226    }
227
228    #[tokio::test]
229    async fn test_reset() {
230        let manager = UnifiedSessionManager::new();
231
232        let node = NodeBuilder::new("success")
233            .attr("t", (wacore::time::now_secs() + 10).to_string())
234            .build();
235        manager.update_server_time_offset(&node);
236        let (_, seq1) = manager.prepare_send().await.unwrap();
237        assert_eq!(seq1, 1);
238
239        manager.reset().await;
240
241        assert_eq!(manager.server_time_offset_ms(), 0);
242        // Sequence persists until next prepare_send detects new session ID
243        assert_eq!(manager.sequence(), 1);
244
245        // After reset, next send will reset sequence since session ID changed
246        let (_, seq2) = manager.prepare_send().await.unwrap();
247        assert_eq!(seq2, 1, "Sequence resets on new session");
248    }
249}