whatsapp_rust/
unified_session.rs1use async_lock::Mutex;
7use log::debug;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
10use wacore::ib::{IbStanza, UnifiedSession};
11use wacore::protocol::ProtocolNode;
12use wacore_binary::node::Node;
13
14pub struct UnifiedSessionManager {
16 server_time_offset_ms: Arc<AtomicI64>,
17 last_sent_id: Arc<Mutex<Option<String>>>,
18 sequence: Arc<AtomicU64>,
19}
20
21impl Default for UnifiedSessionManager {
22 fn default() -> Self {
23 Self::new()
24 }
25}
26
27impl UnifiedSessionManager {
28 pub fn new() -> Self {
29 Self {
30 server_time_offset_ms: Arc::new(AtomicI64::new(0)),
31 last_sent_id: Arc::new(Mutex::new(None)),
32 sequence: Arc::new(AtomicU64::new(0)),
33 }
34 }
35
36 pub fn server_time_offset_ms(&self) -> i64 {
37 self.server_time_offset_ms.load(Ordering::Relaxed)
38 }
39
40 pub fn sequence(&self) -> u64 {
41 self.sequence.load(Ordering::Relaxed)
42 }
43
44 pub fn update_server_time_offset(&self, node: &Node) {
46 if let Some(t_val) = node.attrs.get("t").map(|v| v.as_str())
47 && let Ok(server_time) = t_val.parse::<i64>()
48 && server_time > 0
49 {
50 let local_time = wacore::time::now_secs();
51 let offset_ms = (server_time - local_time) * 1000;
52 self.server_time_offset_ms
53 .store(offset_ms, Ordering::Relaxed);
54 debug!(target: "UnifiedSession", "Server time offset: {}ms", offset_ms);
55 }
56 }
57
58 pub fn update_server_time_offset_with_rtt(&self, node: &Node, start_time_ms: i64, rtt_ms: i64) {
65 if let Some(t_val) = node.attrs.get("t").map(|v| v.as_str())
66 && let Ok(server_time) = t_val.parse::<i64>()
67 && server_time > 0
68 {
69 let midpoint_s = (start_time_ms + rtt_ms / 2) / 1000;
70 let offset_ms = (server_time - midpoint_s) * 1000;
71 self.server_time_offset_ms
72 .store(offset_ms, Ordering::Relaxed);
73 debug!(target: "UnifiedSession", "Server time offset: {}ms (RTT: {}ms)", offset_ms, rtt_ms);
74 }
75 }
76
77 pub fn calculate_session_id(&self) -> String {
78 let offset = self.server_time_offset_ms.load(Ordering::Relaxed);
79 UnifiedSession::calculate_id(offset)
80 }
81
82 pub async fn prepare_send(&self) -> Option<(Node, u64)> {
84 let session_id = self.calculate_session_id();
85
86 {
87 let mut last_id = self.last_sent_id.lock().await;
88 if let Some(ref prev_id) = *last_id
89 && prev_id == &session_id
90 {
91 debug!(target: "UnifiedSession", "Skipping duplicate id={}", session_id);
92 return None;
93 }
94
95 if last_id.as_ref() != Some(&session_id) {
97 self.sequence.store(0, Ordering::Relaxed);
98 }
99 *last_id = Some(session_id.clone());
100 }
101
102 let sequence = self.sequence.fetch_add(1, Ordering::Relaxed) + 1;
104 let stanza = IbStanza::unified_session(UnifiedSession::new(&session_id));
105 let node = stanza.into_node();
106
107 debug!(target: "UnifiedSession", "Sending id={}, seq={}", session_id, sequence);
108
109 Some((node, sequence))
110 }
111
112 pub async fn clear_last_sent(&self) {
114 *self.last_sent_id.lock().await = None;
115 }
116
117 pub async fn reset(&self) {
119 self.server_time_offset_ms.store(0, Ordering::Relaxed);
120 *self.last_sent_id.lock().await = None;
121 }
122}
123
124#[cfg(test)]
125mod tests {
126 use super::*;
127 use wacore_binary::builder::NodeBuilder;
128
129 #[test]
130 fn test_manager_default() {
131 let manager = UnifiedSessionManager::new();
132 assert_eq!(manager.server_time_offset_ms(), 0);
133 assert_eq!(manager.sequence(), 0);
134 }
135
136 #[test]
137 fn test_update_server_time_offset() {
138 let manager = UnifiedSessionManager::new();
139
140 let server_time = wacore::time::now_secs() + 10;
141 let node = NodeBuilder::new("success")
142 .attr("t", server_time.to_string())
143 .build();
144
145 manager.update_server_time_offset(&node);
146
147 let offset = manager.server_time_offset_ms();
148 assert!(
149 (offset - 10000).abs() < 1000,
150 "Offset should be ~10000ms, got {}",
151 offset
152 );
153 }
154
155 #[test]
156 fn test_update_server_time_offset_invalid() {
157 let manager = UnifiedSessionManager::new();
158
159 let node = NodeBuilder::new("success").build();
160 manager.update_server_time_offset(&node);
161 assert_eq!(manager.server_time_offset_ms(), 0);
162
163 let node = NodeBuilder::new("success")
164 .attr("t", "not_a_number")
165 .build();
166 manager.update_server_time_offset(&node);
167 assert_eq!(manager.server_time_offset_ms(), 0);
168
169 let node = NodeBuilder::new("success").attr("t", "0").build();
170 manager.update_server_time_offset(&node);
171 assert_eq!(manager.server_time_offset_ms(), 0);
172 }
173
174 #[test]
175 fn test_calculate_session_id() {
176 let manager = UnifiedSessionManager::new();
177 let id = manager.calculate_session_id();
178
179 let id_num: i64 = id.parse().expect("Should be a valid number");
180 const WEEK_MS: i64 = 7 * 24 * 60 * 60 * 1000;
181 assert!((0..WEEK_MS).contains(&id_num));
182 }
183
184 #[tokio::test]
185 async fn test_prepare_send() {
186 let manager = UnifiedSessionManager::new();
187
188 loop {
191 manager.reset().await; let result = manager.prepare_send().await;
193 assert!(result.is_some());
194 let (node, seq) = result.unwrap();
195 assert_eq!(node.tag, "ib");
196 assert_eq!(seq, 1);
197
198 let result2 = manager.prepare_send().await;
199 if result2.is_none() {
200 assert_eq!(manager.sequence(), 1);
202 break;
203 }
204 tokio::task::yield_now().await;
207 }
208 }
209
210 #[tokio::test]
211 async fn test_clear_last_sent() {
212 let manager = UnifiedSessionManager::new();
213
214 let (_, seq1) = manager.prepare_send().await.unwrap();
215 assert_eq!(seq1, 1);
216 assert_eq!(manager.sequence(), 1);
217
218 manager.clear_last_sent().await;
219
220 let result = manager.prepare_send().await;
222 assert!(result.is_some());
223 let (_, seq2) = result.unwrap();
224 assert_eq!(seq2, 1, "Sequence resets when session ID changes");
225 assert_eq!(manager.sequence(), 1);
226 }
227
228 #[tokio::test]
229 async fn test_reset() {
230 let manager = UnifiedSessionManager::new();
231
232 let node = NodeBuilder::new("success")
233 .attr("t", (wacore::time::now_secs() + 10).to_string())
234 .build();
235 manager.update_server_time_offset(&node);
236 let (_, seq1) = manager.prepare_send().await.unwrap();
237 assert_eq!(seq1, 1);
238
239 manager.reset().await;
240
241 assert_eq!(manager.server_time_offset_ms(), 0);
242 assert_eq!(manager.sequence(), 1);
244
245 let (_, seq2) = manager.prepare_send().await.unwrap();
247 assert_eq!(seq2, 1, "Sequence resets on new session");
248 }
249}