1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
use crate::types::events::{Event, LazyConversation};
use bytes::Bytes;
use std::sync::Arc;
use wacore_ng::history_sync::process_history_sync;
use waproto_ng::whatsapp::message::HistorySyncNotification;
use crate::client::Client;
impl Client {
pub(crate) async fn handle_history_sync(
self: &Arc<Self>,
message_id: String,
notification: HistorySyncNotification,
) {
if self.is_shutting_down() {
log::debug!(
"Dropping history sync {} during shutdown (Type: {:?})",
message_id,
notification.sync_type()
);
return;
}
if self.skip_history_sync_enabled() {
log::debug!(
"Skipping history sync for message {} (Type: {:?})",
message_id,
notification.sync_type()
);
// Send receipt so the phone considers this chunk delivered and stops
// retrying. This intentionally diverges from WhatsApp Web's AB prop
// drop path (which sends no receipt) because bots will never process
// history, and without the receipt the phone would keep re-uploading
// blobs that will never be consumed.
self.send_protocol_receipt(
message_id,
crate::types::presence::ReceiptType::HistorySync,
)
.await;
return;
}
// Enqueue a MajorSyncTask for the dedicated sync worker to consume.
self.begin_history_sync_task();
let task = crate::sync_task::MajorSyncTask::HistorySync {
message_id,
notification: Box::new(notification),
};
if let Err(e) = self.major_sync_task_sender.send(task).await {
self.finish_history_sync_task();
if self.is_shutting_down() {
log::debug!("Dropping history sync task during shutdown: {e}");
} else {
log::error!("Failed to enqueue history sync task: {e}");
}
}
}
/// Process history sync with streaming and lazy parsing.
///
/// Memory efficient: raw bytes are wrapped in LazyConversation and only
/// parsed if the event handler actually accesses the conversation data.
pub(crate) async fn process_history_sync_task(
self: &Arc<Self>,
message_id: String,
mut notification: HistorySyncNotification,
) {
if self.is_shutting_down() {
log::debug!("Aborting history sync {} before processing", message_id);
return;
}
log::info!(
"Processing history sync for message {} (Size: {}, Type: {:?})",
message_id,
notification.file_length(),
notification.sync_type()
);
self.send_protocol_receipt(
message_id.clone(),
crate::types::presence::ReceiptType::HistorySync,
)
.await;
if self.is_shutting_down() {
log::debug!(
"Aborting history sync {} after receipt during shutdown",
message_id
);
return;
}
// Use take() to avoid cloning large payloads - moves ownership instead
let compressed_data = if let Some(inline_payload) =
notification.initial_hist_bootstrap_inline_payload.take()
{
log::info!(
"Found inline history sync payload ({} bytes). Using directly.",
inline_payload.len()
);
inline_payload
} else {
log::info!("Downloading external history sync blob...");
if self.is_shutting_down() {
log::debug!("Aborting history sync {} before blob download", message_id);
return;
}
match self.download(¬ification).await {
Ok(data) => {
log::info!("Successfully downloaded history sync blob.");
data
}
Err(e) => {
log::error!("Failed to download history sync blob: {:?}", e);
return;
}
}
};
// Get own user for pushname extraction (moved into blocking task, no clone needed)
let own_user = {
let device_snapshot = self.persistence_manager.get_device_snapshot().await;
device_snapshot.pn.as_ref().map(|j| j.to_non_ad().user)
};
// Check if anyone is listening for events
let has_listeners = self.core.event_bus.has_handlers();
let parse_result = if has_listeners {
// Use a bounded channel to stream raw conversation bytes as Bytes (zero-copy)
let (tx, mut rx) = tokio::sync::mpsc::channel::<Bytes>(16);
// Run streaming parsing in blocking thread
// own_user is moved directly, no clone needed
let parse_handle = tokio::task::spawn_blocking(move || {
let own_user_ref = own_user.as_deref();
// Streaming: decompresses and extracts raw bytes incrementally
// No parsing happens here - just raw byte extraction
// Uses Bytes for zero-copy reference counting
process_history_sync(
&compressed_data,
own_user_ref,
Some(|raw_bytes: Bytes| {
// Send Bytes through channel (zero-copy clone)
let _ = tx.blocking_send(raw_bytes);
}),
)
// tx dropped here, closing channel
});
// Receive and dispatch lazy conversations as they come in
let mut conv_count = 0usize;
while let Some(raw_bytes) = rx.recv().await {
if self.is_shutting_down() {
log::debug!(
"Stopping history sync {} event dispatch during shutdown",
message_id
);
break;
}
conv_count += 1;
if conv_count.is_multiple_of(25) {
log::info!("History sync progress: {conv_count} conversations processed...");
}
// Wrap Bytes in LazyConversation using from_bytes (true zero-copy)
// Parsing only happens if the event handler calls .conversation() or .get()
let lazy_conv = LazyConversation::from_bytes(raw_bytes);
self.core.event_bus.dispatch(&Event::JoinedGroup(lazy_conv));
}
// Wait for parsing to complete
parse_handle.await
} else {
// No listeners - skip conversation processing entirely
log::debug!("No event handlers registered, skipping conversation processing");
// own_user is moved directly, no clone needed
tokio::task::spawn_blocking(move || {
let own_user_ref = own_user.as_deref();
// Pass None for callback - conversations are skipped at protobuf level
process_history_sync::<fn(Bytes)>(&compressed_data, own_user_ref, None)
})
.await
};
if self.is_shutting_down() {
log::debug!(
"Aborting history sync {} after parse during shutdown",
message_id
);
return;
}
match parse_result {
Ok(Ok(sync_result)) => {
log::info!(
"Successfully processed HistorySync (message {message_id}); {} conversations",
sync_result.conversations_processed
);
// Update own push name if found
if let Some(new_name) = sync_result.own_pushname {
log::info!("Updating own push name from history sync to '{new_name}'");
self.update_push_name_and_notify(new_name).await;
}
}
Ok(Err(e)) => {
log::error!("Failed to process HistorySync data: {:?}", e);
}
Err(e) => {
log::error!("History sync blocking task panicked: {:?}", e);
}
}
}
}