use std::sync::Arc;

use bytes::Bytes;
use wacore::history_sync::process_history_sync;
use waproto::whatsapp::message::HistorySyncNotification;

use crate::client::Client;
use crate::types::events::{Event, LazyConversation};

impl Client {
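/// Entry point for an incoming history sync notification.
///
/// Drops the chunk outright during shutdown, acknowledges-and-skips it when
/// history sync is disabled, and otherwise enqueues a `MajorSyncTask` for the
/// dedicated sync worker to process off the notification path.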
pub(crate) async fn handle_history_sync(
self: &Arc<Self>,
message_id: String,
notification: HistorySyncNotification,
) {
if self.is_shutting_down() {
log::debug!(
"Dropping history sync {} during shutdown (Type: {:?})",
message_id,
notification.sync_type()
);
return;
}
if self.skip_history_sync_enabled() {
log::debug!(
"Skipping history sync for message {} (Type: {:?})",
message_id,
notification.sync_type()
);
// Send receipt so the phone considers this chunk delivered and stops
// retrying. This intentionally diverges from WhatsApp Web's AB prop
// drop path (which sends no receipt) because bots will never process
// history, and without the receipt the phone would keep re-uploading
// blobs that will never be consumed.
self.send_protocol_receipt(
message_id,
crate::types::presence::ReceiptType::HistorySync,
)
.await;
return;
}
// Enqueue a MajorSyncTask for the dedicated sync worker to consume.
self.begin_history_sync_task();
let task = crate::sync_task::MajorSyncTask::HistorySync {
message_id,
notification: Box::new(notification),
};
if let Err(e) = self.major_sync_task_sender.send(task).await {
self.finish_history_sync_task();
if self.is_shutting_down() {
log::debug!("Dropping history sync task during shutdown: {e}");
} else {
log::error!("Failed to enqueue history sync task: {e}");
}
}
}
/// Process history sync with streaming and lazy parsing.
///
/// Memory efficient: raw bytes are wrapped in LazyConversation and only
/// parsed if the event handler actually accesses the conversation data.
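///
/// A handler that never touches the payload pays only the cost of handing the
/// raw `Bytes` around. Minimal sketch of the deferred-decode pattern (accessor
/// names are the ones used in the dispatch loop below; exact signatures are
/// assumed):
///
/// ```ignore
/// let lazy = LazyConversation::from_bytes(raw_bytes);
/// // Nothing has been decoded yet.
/// let conversation = lazy.conversation(); // protobuf decode happens here, on demand
/// ```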
pub(crate) async fn process_history_sync_task(
self: &Arc<Self>,
message_id: String,
mut notification: HistorySyncNotification,
) {
if self.is_shutting_down() {
log::debug!("Aborting history sync {} before processing", message_id);
return;
}
log::info!(
"Processing history sync for message {} (Size: {}, Type: {:?})",
message_id,
notification.file_length(),
notification.sync_type()
);
self.send_protocol_receipt(
message_id.clone(),
crate::types::presence::ReceiptType::HistorySync,
)
.await;
if self.is_shutting_down() {
log::debug!(
"Aborting history sync {} after receipt during shutdown",
message_id
);
return;
}
// file_length is the decrypted (but still zlib-compressed) blob size, not
// the final decompressed size. We still pass it as a hint — the decompressor
// uses it with a 4x multiplier, which is a better estimate than guessing
// from the encrypted size (which includes MAC/padding overhead).
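// For example, a 4 MiB compressed blob gives the decompressor a ~16 MiB
// output-size estimate up front instead of a blind default.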
let compressed_size_hint = notification.file_length.filter(|&s| s > 0);
// Use take() to avoid cloning large payloads - moves ownership instead
let compressed_data = if let Some(inline_payload) =
notification.initial_hist_bootstrap_inline_payload.take()
{
log::info!(
"Found inline history sync payload ({} bytes). Using directly.",
inline_payload.len()
);
inline_payload
} else {
log::info!("Downloading external history sync blob...");
if self.is_shutting_down() {
log::debug!("Aborting history sync {} before blob download", message_id);
return;
}
// Stream-decrypt: reads encrypted chunks (8KB) from the network and
// decrypts on the fly into a Vec, avoiding holding the full encrypted
// blob in memory alongside the decrypted one.
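// For a 50 MB blob that means roughly the 50 MB plaintext Vec plus one 8 KB
// chunk in flight, rather than ciphertext and plaintext held side by side.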
match self
.download_to_writer(&notification, std::io::Cursor::new(Vec::new()))
.await
{
Ok(cursor) => {
log::info!("Successfully downloaded history sync blob.");
cursor.into_inner()
}
Err(e) => {
log::error!("Failed to download history sync blob: {:?}", e);
return;
}
}
};
// Get own user for pushname extraction (moved into blocking task, no clone needed)
let own_user = {
let device_snapshot = self.persistence_manager.get_device_snapshot().await;
device_snapshot.pn.as_ref().map(|j| j.to_non_ad().user)
};
// Check if anyone is listening for events
let has_listeners = self.core.event_bus.has_handlers();
let parse_result = if has_listeners {
// Use a bounded channel to stream raw conversation bytes as Bytes (zero-copy)
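// The small bound keeps only a handful of raw conversations buffered: on
// native targets send_blocking() below applies backpressure when dispatch
// falls behind, while on wasm try_send() gives up on the item instead of
// blocking.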
let (tx, rx) = async_channel::bounded::<Bytes>(4);
// Run streaming parsing in blocking thread
// own_user is moved directly, no clone needed
let (result_tx, result_rx) = futures::channel::oneshot::channel();
// Spawn the blocking work concurrently — it runs while we
// process channel items below.
let blocking_fut = self.runtime.spawn_blocking(Box::new(move || {
let own_user_ref = own_user.as_deref();
// Streaming: decompresses and extracts raw bytes incrementally
// No parsing happens here - just raw byte extraction
// Uses Bytes for zero-copy reference counting
let result = process_history_sync(
compressed_data,
own_user_ref,
Some(|raw_bytes: Bytes| {
// Send Bytes through channel (zero-copy clone)
#[cfg(not(target_arch = "wasm32"))]
let _ = tx.send_blocking(raw_bytes);
#[cfg(target_arch = "wasm32")]
let _ = tx.try_send(raw_bytes);
}),
compressed_size_hint,
);
// tx dropped here, closing channel
let _ = result_tx.send(result);
}));
// Drive the blocking future to completion in the background
self.runtime
.spawn(Box::pin(async move {
blocking_fut.await;
}))
.detach();
// Receive and dispatch lazy conversations as they come in
let mut conv_count = 0usize;
while let Ok(raw_bytes) = rx.recv().await {
if self.is_shutting_down() {
log::debug!(
"Stopping history sync {} event dispatch during shutdown",
message_id
);
break;
}
conv_count += 1;
if conv_count.is_multiple_of(25) {
log::info!("History sync progress: {conv_count} conversations processed...");
}
// Wrap Bytes in LazyConversation using from_bytes (true zero-copy)
// Parsing only happens if the event handler calls .conversation() or .get()
let lazy_conv = LazyConversation::from_bytes(raw_bytes);
self.core.event_bus.dispatch(&Event::HistorySync(lazy_conv));
}
// Drop receiver before awaiting the blocking task. If we broke out
// of the loop during shutdown, the sender may be blocked on
// tx.send_blocking() — dropping rx causes it to return Err and
// unblock, preventing a deadlock.
drop(rx);
// Wait for parsing result
result_rx.await.ok()
} else {
// No listeners - skip conversation processing entirely
log::debug!("No event handlers registered, skipping conversation processing");
// own_user is moved directly, no clone needed
Some(
wacore::runtime::blocking(&*self.runtime, move || {
let own_user_ref = own_user.as_deref();
// Pass None for callback - conversations are skipped at protobuf level
process_history_sync::<fn(Bytes)>(
compressed_data,
own_user_ref,
None,
compressed_size_hint,
)
})
.await,
)
};
if self.is_shutting_down() {
log::debug!(
"Aborting history sync {} after parse during shutdown",
message_id
);
return;
}
match parse_result {
Some(Ok(sync_result)) => {
log::info!(
"Successfully processed HistorySync (message {message_id}); {} conversations",
sync_result.conversations_processed
);
// Update own push name if found
if let Some(new_name) = sync_result.own_pushname {
log::info!("Updating own push name from history sync to '{new_name}'");
self.update_push_name_and_notify(new_name).await;
}
}
Some(Err(e)) => {
log::error!("Failed to process HistorySync data: {:?}", e);
}
None => {
log::error!("History sync blocking task was cancelled");
}
}
}
}