1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
//! Sender Key and message cache methods for Client.
//!
//! This module contains methods for managing sender keys (SKDM) for group messaging
//! and caching recent messages for retry handling.
//!
//! Key features:
//! - Mark participants for fresh SKDM on retry
//! - Consume forget marks when sending group messages
//! - Cache recent messages for retry handling
use anyhow::Result;
use wacore_binary::jid::Jid;
use waproto::whatsapp as wa;
use super::Client;
impl Client {
/// Mark participants for fresh SKDM on next group send.
/// Filters out our own devices (we don't need to send SKDM to ourselves).
/// Matches WhatsApp Web's WAWebApiParticipantStore.markForgetSenderKey behavior.
/// Called from handle_retry_receipt for group/status messages.
pub(crate) async fn mark_forget_sender_key(
&self,
group_jid: &str,
participants: &[String],
) -> Result<()> {
use anyhow::anyhow;
// Get our own user ID to filter out (WhatsApp Web: isMeDevice check)
let device_store = self.persistence_manager.get_device_arc().await;
let device_guard = device_store.read().await;
let own_lid_user = device_guard.lid.as_ref().map(|j| j.user.clone());
let own_pn_user = device_guard.pn.as_ref().map(|j| j.user.clone());
drop(device_guard);
// Pre-compute prefix strings outside the filter loop to avoid repeated allocations
// Include exact match string in tuple to avoid repeated Option lookups
let lid_prefixes = own_lid_user
.as_ref()
.map(|lid| (format!("{lid}:"), format!("{lid}@"), lid.as_str()));
let pn_prefixes = own_pn_user
.as_ref()
.map(|pn| (format!("{pn}:"), format!("{pn}@"), pn.as_str()));
// Filter out own devices (WhatsApp Web: !isMeDevice(e))
let filtered: Vec<String> = participants
.iter()
.filter(|p| {
// Parse participant JID and check if it's our own
let is_own_lid = lid_prefixes.as_ref().is_some_and(|(colon, at, exact)| {
p.starts_with(colon) || p.starts_with(at) || p.as_str() == *exact
});
let is_own_pn = pn_prefixes.as_ref().is_some_and(|(colon, at, exact)| {
p.starts_with(colon) || p.starts_with(at) || p.as_str() == *exact
});
!is_own_lid && !is_own_pn
})
.cloned()
.collect();
if filtered.is_empty() {
return Ok(());
}
let backend = self.persistence_manager.backend();
for participant in &filtered {
backend
.mark_forget_sender_key(group_jid, participant)
.await
.map_err(|e| anyhow!("{e}"))?;
}
Ok(())
}
/// Get participants marked for fresh SKDM and consume the marks.
/// Matches WhatsApp Web's getGroupSenderKeyList pattern.
pub(crate) async fn consume_forget_marks(&self, group_jid: &str) -> Result<Vec<String>> {
use anyhow::anyhow;
let backend = self.persistence_manager.backend();
backend
.consume_forget_marks(group_jid)
.await
.map_err(|e| anyhow!("{e}"))
}
/// Take a sent message for retry handling. Checks L1 cache first (if enabled),
/// then falls back to DB. Matches WA Web's getMessageTable().get() pattern.
pub(crate) async fn take_recent_message(&self, to: Jid, id: String) -> Option<wa::Message> {
use prost::Message;
let key = self.make_stanza_key(to.clone(), id.clone()).await;
let chat_str = key.chat.to_string();
let has_l1_cache = self.cache_config.recent_messages.capacity > 0;
// L1 cache check (if capacity > 0)
if has_l1_cache && let Some(bytes) = self.recent_messages.remove(&key).await {
if let Ok(msg) = wa::Message::decode(bytes.as_slice()) {
// Cache hit — consume the DB row in the background to avoid orphans.
// Note: if the background DB write from add_recent_message hasn't completed
// yet, this delete may run first and the write creates an orphan. This is
// harmless — periodic cleanup (sent_message_ttl_secs) purges it. The race
// window is negligible since retry receipts arrive seconds after send.
let backend = self.persistence_manager.backend();
let cs = chat_str.clone();
let mid = key.id.clone();
self.runtime
.spawn(Box::pin(async move {
let _ = backend.take_sent_message(&cs, &mid).await;
}))
.detach();
return Some(msg);
}
// Cache decode failed — fall through to DB
log::warn!(
"Failed to decode cached message for {}:{}, trying DB",
to,
id
);
}
// DB path (primary when cache capacity = 0, fallback when cache misses)
match self
.persistence_manager
.backend()
.take_sent_message(&chat_str, &key.id)
.await
{
Ok(Some(bytes)) => match wa::Message::decode(bytes.as_slice()) {
Ok(msg) => Some(msg),
Err(e) => {
log::warn!("Failed to decode DB message for {}:{}: {}", to, id, e);
None
}
},
Ok(None) => None,
Err(e) => {
log::warn!(
"Failed to read sent message from DB for {}:{}: {}",
to,
id,
e
);
None
}
}
}
/// Store a sent message for retry handling. Always writes to DB; when L1 cache
/// is enabled (capacity > 0) also stores in-memory for fast retrieval.
/// In DB-only mode (capacity = 0), the DB write is awaited to guarantee persistence.
/// With L1 cache, the DB write is backgrounded since the cache serves reads immediately.
pub(crate) async fn add_recent_message(&self, to: Jid, id: String, msg: &wa::Message) {
use prost::Message;
let key = self.make_stanza_key(to, id).await;
let bytes = msg.encode_to_vec();
let has_l1_cache = self.cache_config.recent_messages.capacity > 0;
if has_l1_cache {
// L1 cache serves reads immediately; DB write can be backgrounded
self.recent_messages
.insert(key.clone(), bytes.clone())
.await;
let backend = self.persistence_manager.backend();
let chat_str = key.chat.to_string();
let msg_id = key.id.clone();
self.runtime
.spawn(Box::pin(async move {
if let Err(e) = backend.store_sent_message(&chat_str, &msg_id, &bytes).await {
log::warn!("Failed to store sent message to DB: {e}");
}
}))
.detach();
} else {
// DB-only mode: await to guarantee the row exists before returning
let chat_str = key.chat.to_string();
if let Err(e) = self
.persistence_manager
.backend()
.store_sent_message(&chat_str, &key.id, &bytes)
.await
{
log::warn!("Failed to store sent message to DB: {e}");
}
}
}
}