whatsapp_rust/
appstate_sync.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use anyhow::{Result, anyhow};
5use async_trait::async_trait;
6use prost::Message;
7use tokio::sync::Mutex;
8use wacore::appstate::expand_app_state_keys;
9use wacore::appstate::hash::{HashState, generate_content_mac, generate_patch_mac};
10use wacore::appstate::keys::ExpandedAppStateKeys;
11use wacore::appstate::patch_decode::{PatchList, WAPatchName, parse_patch_list};
12use wacore::libsignal::crypto::aes_256_cbc_decrypt;
13use wacore::store::traits::Backend;
14use wacore_binary::node::Node;
15use waproto::whatsapp as wa;
16
17#[derive(Clone)]
18pub struct AppStateProcessor {
19    backend: Arc<dyn Backend>,
20    key_cache: Arc<Mutex<HashMap<String, ExpandedAppStateKeys>>>,
21}
22
23impl AppStateProcessor {
24    pub fn new(backend: Arc<dyn Backend>) -> Self {
25        Self {
26            backend,
27            key_cache: Arc::new(Mutex::new(HashMap::new())),
28        }
29    }
30
31    async fn get_app_state_key(&self, key_id: &[u8]) -> Result<ExpandedAppStateKeys> {
32        use base64::Engine;
33        use base64::engine::general_purpose::STANDARD_NO_PAD;
34        let id_b64 = STANDARD_NO_PAD.encode(key_id);
35        if let Some(cached) = self.key_cache.lock().await.get(&id_b64).cloned() {
36            return Ok(cached);
37        }
38        let key_opt = self.backend.get_app_state_sync_key(key_id).await?;
39        let key = key_opt.ok_or_else(|| anyhow!("app state key not found"))?;
40        let expanded: ExpandedAppStateKeys = expand_app_state_keys(&key.key_data);
41        self.key_cache.lock().await.insert(id_b64, expanded.clone());
42        Ok(expanded)
43    }
44
45    pub async fn decode_patch_list<FDownload>(
46        &self,
47        stanza_root: &Node,
48        download: FDownload,
49        validate_macs: bool,
50    ) -> Result<(Vec<Mutation>, HashState, PatchList)>
51    where
52        FDownload: Fn(&wa::ExternalBlobReference) -> Result<Vec<u8>> + Send + Sync,
53    {
54        let mut pl = parse_patch_list(stanza_root)?;
55        if pl.snapshot.is_none()
56            && let Some(ext) = &pl.snapshot_ref
57            && let Ok(data) = download(ext)
58            && let Ok(snapshot) = wa::SyncdSnapshot::decode(data.as_slice())
59        {
60            pl.snapshot = Some(snapshot);
61        }
62        self.process_patch_list(pl, validate_macs).await
63    }
64
65    pub async fn process_patch_list(
66        &self,
67        pl: PatchList,
68        validate_macs: bool,
69    ) -> Result<(Vec<Mutation>, HashState, PatchList)> {
70        let mut state = self.backend.get_app_state_version(pl.name.as_str()).await?;
71        let mut new_mutations: Vec<Mutation> = Vec::new();
72
73        if let Some(snapshot) = &pl.snapshot {
74            let version = snapshot
75                .version
76                .as_ref()
77                .and_then(|v| v.version)
78                .unwrap_or(0);
79            state.version = version;
80
81            let encrypted: Vec<wa::SyncdMutation> = snapshot
82                .records
83                .iter()
84                .map(|rec| wa::SyncdMutation {
85                    operation: Some(wa::syncd_mutation::SyncdOperation::Set as i32),
86                    record: Some(rec.clone()),
87                })
88                .collect();
89
90            let (_warn, res) = state.update_hash(&encrypted, |_index_mac, _i| Ok(None));
91            res?;
92
93            if validate_macs
94                && let (Some(mac_expected), Some(key_id)) = (
95                    snapshot.mac.as_ref(),
96                    snapshot.key_id.as_ref().and_then(|k| k.id.as_ref()),
97                )
98            {
99                let keys = self.get_app_state_key(key_id).await?;
100                let computed = state.generate_snapshot_mac(pl.name.as_str(), &keys.snapshot_mac);
101                if computed != *mac_expected {
102                    return Err(anyhow!("snapshot MAC mismatch"));
103                }
104            }
105
106            let mut added = Vec::new();
107            for rec in &snapshot.records {
108                let mut out = Vec::new();
109                self.decode_record(
110                    wa::syncd_mutation::SyncdOperation::Set,
111                    rec,
112                    &mut out,
113                    validate_macs,
114                )
115                .await?;
116                if let Some(m) = out.last() {
117                    added.push(wacore::store::traits::AppStateMutationMAC {
118                        index_mac: m.index_mac.clone(),
119                        value_mac: m.value_mac.clone(),
120                    });
121                }
122                new_mutations.extend(out);
123            }
124
125            self.backend
126                .set_app_state_version(pl.name.as_str(), state.clone())
127                .await?;
128            if !added.is_empty() {
129                self.backend
130                    .put_app_state_mutation_macs(pl.name.as_str(), state.version, &added)
131                    .await?;
132            }
133        }
134
135        for patch in &pl.patches {
136            state.version = patch.version.as_ref().and_then(|v| v.version).unwrap_or(0);
137
138            use std::collections::HashMap as StdHashMap;
139            let mut need_db_lookup: Vec<Vec<u8>> = Vec::new();
140            for m in &patch.mutations {
141                if let Some(rec) = &m.record
142                    && let Some(ind) = &rec.index
143                    && let Some(index_mac) = &ind.blob
144                    && !need_db_lookup.iter().any(|v| v == index_mac)
145                {
146                    need_db_lookup.push(index_mac.clone());
147                }
148            }
149
150            let mut db_prev: StdHashMap<Vec<u8>, Vec<u8>> = StdHashMap::new();
151            for index_mac in need_db_lookup {
152                if let Some(mac) = self
153                    .backend
154                    .get_app_state_mutation_mac(pl.name.as_str(), &index_mac)
155                    .await?
156                {
157                    db_prev.insert(index_mac, mac);
158                }
159            }
160
161            let (_warn, res) = state.update_hash(&patch.mutations, |index_mac, idx| {
162                for prev in patch.mutations[..idx].iter().rev() {
163                    if let Some(rec) = &prev.record
164                        && let Some(ind) = &rec.index
165                        && let Some(b) = &ind.blob
166                        && b == index_mac
167                        && let Some(val) = &rec.value
168                        && let Some(vb) = &val.blob
169                        && vb.len() >= 32
170                    {
171                        return Ok(Some(vb[vb.len() - 32..].to_vec()));
172                    }
173                }
174                if let Some(prev_mac) = db_prev.get(index_mac) {
175                    return Ok(Some(prev_mac.clone()));
176                }
177
178                Ok(None)
179            });
180            res?;
181
182            if validate_macs && let Some(key_id) = patch.key_id.as_ref().and_then(|k| k.id.as_ref())
183            {
184                let keys = self.get_app_state_key(key_id).await?;
185                if let Some(snap_mac) = patch.snapshot_mac.as_ref() {
186                    let computed_snap =
187                        state.generate_snapshot_mac(pl.name.as_str(), &keys.snapshot_mac);
188                    if computed_snap != *snap_mac {
189                        return Err(anyhow!("patch snapshot MAC mismatch"));
190                    }
191                }
192                if let Some(patch_mac) = patch.patch_mac.as_ref() {
193                    let version = patch.version.as_ref().and_then(|v| v.version).unwrap_or(0);
194                    let computed_patch =
195                        generate_patch_mac(patch, pl.name.as_str(), &keys.patch_mac, version);
196                    if computed_patch != *patch_mac {
197                        return Err(anyhow!("patch MAC mismatch"));
198                    }
199                }
200            }
201
202            let mut added = Vec::new();
203            let mut removed: Vec<Vec<u8>> = Vec::new();
204            for m in &patch.mutations {
205                if let Some(rec) = &m.record {
206                    let mut out = Vec::new();
207                    let op = wa::syncd_mutation::SyncdOperation::try_from(m.operation.unwrap_or(0))
208                        .unwrap_or(wa::syncd_mutation::SyncdOperation::Set);
209
210                    self.decode_record(op, rec, &mut out, validate_macs).await?;
211                    if let Some(mdec) = out.last() {
212                        match op {
213                            wa::syncd_mutation::SyncdOperation::Set => {
214                                added.push(wacore::store::traits::AppStateMutationMAC {
215                                    index_mac: mdec.index_mac.clone(),
216                                    value_mac: mdec.value_mac.clone(),
217                                })
218                            }
219                            wa::syncd_mutation::SyncdOperation::Remove => {
220                                removed.push(mdec.index_mac.clone())
221                            }
222                        }
223                    }
224                    new_mutations.extend(out);
225                }
226            }
227
228            self.backend
229                .set_app_state_version(pl.name.as_str(), state.clone())
230                .await?;
231            if !removed.is_empty() {
232                self.backend
233                    .delete_app_state_mutation_macs(pl.name.as_str(), &removed)
234                    .await?;
235            }
236            if !added.is_empty() {
237                self.backend
238                    .put_app_state_mutation_macs(pl.name.as_str(), state.version, &added)
239                    .await?;
240            }
241        }
242
243        if pl.patches.is_empty() && pl.snapshot.is_some() {
244            self.backend
245                .set_app_state_version(pl.name.as_str(), state.clone())
246                .await?;
247        }
248
249        Ok((new_mutations, state, pl))
250    }
251
252    pub async fn get_missing_key_ids(&self, pl: &PatchList) -> Result<Vec<Vec<u8>>> {
253        use std::collections::HashSet;
254        let mut seen = HashSet::new();
255        let mut missing = Vec::new();
256        let mut check = |key_id: Option<&Vec<u8>>| {
257            if let Some(k) = key_id
258                && seen.insert(k.clone())
259            {
260                missing.push(k.clone());
261            }
262        };
263        if let Some(snapshot) = &pl.snapshot {
264            check(snapshot.key_id.as_ref().and_then(|k| k.id.as_ref()));
265            for rec in &snapshot.records {
266                check(rec.key_id.as_ref().and_then(|k| k.id.as_ref()));
267            }
268        }
269        for patch in &pl.patches {
270            check(patch.key_id.as_ref().and_then(|k| k.id.as_ref()));
271        }
272        let mut out = Vec::new();
273        for id in missing {
274            if self.backend.get_app_state_sync_key(&id).await?.is_none() {
275                out.push(id);
276            }
277        }
278        Ok(out)
279    }
280
281    async fn decode_record(
282        &self,
283        operation: wa::syncd_mutation::SyncdOperation,
284        record: &wa::SyncdRecord,
285        out: &mut Vec<Mutation>,
286        validate_macs: bool,
287    ) -> Result<()> {
288        let key_id = record
289            .key_id
290            .as_ref()
291            .and_then(|k| k.id.as_ref())
292            .ok_or_else(|| anyhow!("missing key id"))?;
293        let keys = self.get_app_state_key(key_id).await?;
294        let value_blob = record
295            .value
296            .as_ref()
297            .and_then(|v| v.blob.as_ref())
298            .ok_or_else(|| anyhow!("missing value blob"))?;
299        if value_blob.len() < 16 + 32 {
300            return Err(anyhow!("value blob too short"));
301        }
302        let (iv, rest) = value_blob.split_at(16);
303        let (ciphertext, value_mac) = rest.split_at(rest.len() - 32);
304        if validate_macs {
305            let expected = generate_content_mac(
306                operation,
307                &value_blob[..value_blob.len() - 32],
308                key_id,
309                &keys.value_mac,
310            );
311            if expected != value_mac {
312                return Err(anyhow!("content MAC mismatch"));
313            }
314        }
315        let plaintext = aes_256_cbc_decrypt(ciphertext, &keys.value_encryption, iv)?;
316        let action = wa::SyncActionData::decode(plaintext.as_slice())?;
317        let mut index_list: Vec<String> = Vec::new();
318        if let Some(idx_bytes) = action.index.as_ref() {
319            if validate_macs {
320                let stored = record
321                    .index
322                    .as_ref()
323                    .and_then(|i| i.blob.as_ref())
324                    .ok_or_else(|| anyhow!("missing index mac"))?;
325                wacore::appstate::hash::validate_index_mac(idx_bytes, stored, &keys.index)?;
326            }
327            if let Ok(parsed) = serde_json::from_slice::<Vec<String>>(idx_bytes) {
328                index_list = parsed;
329            }
330        }
331        out.push(Mutation {
332            action_value: action.value.clone(),
333            index_mac: record
334                .index
335                .as_ref()
336                .and_then(|i| i.blob.clone())
337                .unwrap_or_default(),
338            value_mac: value_mac.to_vec(),
339            index: index_list,
340            operation,
341        });
342        Ok(())
343    }
344
345    pub async fn sync_collection<D, FDownload>(
346        &self,
347        driver: &D,
348        name: WAPatchName,
349        validate_macs: bool,
350        download: FDownload,
351    ) -> Result<Vec<Mutation>>
352    where
353        D: AppStateSyncDriver + Sync,
354        FDownload: Fn(&wa::ExternalBlobReference) -> Result<Vec<u8>> + Send + Sync,
355    {
356        let mut all = Vec::new();
357        loop {
358            let state = self.backend.get_app_state_version(name.as_str()).await?;
359            let node = driver.fetch_collection(name, state.version).await?;
360            let (mut muts, _new_state, list) = self
361                .decode_patch_list(&node, &download, validate_macs)
362                .await?;
363            all.append(&mut muts);
364            if !list.has_more_patches {
365                break;
366            }
367        }
368        Ok(all)
369    }
370}
371
372#[derive(Debug, Clone)]
373pub struct Mutation {
374    pub action_value: Option<wa::SyncActionValue>,
375    pub index_mac: Vec<u8>,
376    pub value_mac: Vec<u8>,
377    pub index: Vec<String>,
378    pub operation: wa::syncd_mutation::SyncdOperation,
379}
380
381#[async_trait]
382pub trait AppStateSyncDriver {
383    async fn fetch_collection(&self, name: WAPatchName, after_version: u64) -> Result<Node>;
384}
385
386#[cfg(test)]
387mod tests {
388    use super::*;
389    use crate::store::traits::{AppStateMutationMAC, AppStateStore};
390    use prost::Message;
391    use std::collections::HashMap;
392    use wacore::appstate::WAPATCH_INTEGRITY;
393    use wacore::appstate::hash::HashState;
394    use wacore::appstate::hash::generate_content_mac;
395    use wacore::appstate::keys::expand_app_state_keys;
396    use wacore::libsignal::crypto::aes_256_cbc_encrypt;
397    use wacore::store::error::Result as StoreResult;
398    use wacore::store::traits::AppStateKeyStore as _;
399    use wacore::store::traits::AppStateSyncKey;
400
401    type MockMacMap = Arc<Mutex<HashMap<(String, Vec<u8>), Vec<u8>>>>;
402
403    #[derive(Default, Clone)]
404    struct MockBackend {
405        versions: Arc<Mutex<HashMap<String, HashState>>>,
406        macs: MockMacMap,
407        keys: Arc<Mutex<HashMap<Vec<u8>, AppStateSyncKey>>>,
408    }
409
410    #[async_trait]
411    impl AppStateStore for MockBackend {
412        async fn get_app_state_version(&self, name: &str) -> StoreResult<HashState> {
413            Ok(self
414                .versions
415                .lock()
416                .await
417                .get(name)
418                .cloned()
419                .unwrap_or_default())
420        }
421        async fn set_app_state_version(&self, name: &str, state: HashState) -> StoreResult<()> {
422            self.versions.lock().await.insert(name.to_string(), state);
423            Ok(())
424        }
425        async fn get_app_state_mutation_mac(
426            &self,
427            name: &str,
428            index_mac: &[u8],
429        ) -> StoreResult<Option<Vec<u8>>> {
430            Ok(self
431                .macs
432                .lock()
433                .await
434                .get(&(name.to_string(), index_mac.to_vec()))
435                .cloned())
436        }
437        async fn put_app_state_mutation_macs(
438            &self,
439            name: &str,
440            _version: u64,
441            mutations: &[AppStateMutationMAC],
442        ) -> StoreResult<()> {
443            let mut macs = self.macs.lock().await;
444            for m in mutations {
445                macs.insert((name.to_string(), m.index_mac.clone()), m.value_mac.clone());
446            }
447            Ok(())
448        }
449        async fn delete_app_state_mutation_macs(
450            &self,
451            _name: &str,
452            _index_macs: &[Vec<u8>],
453        ) -> StoreResult<()> {
454            Ok(())
455        }
456    }
457
458    #[async_trait]
459    impl crate::store::traits::AppStateKeyStore for MockBackend {
460        async fn get_app_state_sync_key(
461            &self,
462            key_id: &[u8],
463        ) -> StoreResult<Option<AppStateSyncKey>> {
464            Ok(self.keys.lock().await.get(key_id).cloned())
465        }
466        async fn set_app_state_sync_key(
467            &self,
468            key_id: &[u8],
469            key: AppStateSyncKey,
470        ) -> StoreResult<()> {
471            self.keys.lock().await.insert(key_id.to_vec(), key);
472            Ok(())
473        }
474    }
475
476    #[async_trait]
477    impl crate::store::traits::IdentityStore for MockBackend {
478        async fn put_identity(&self, _: &str, _: [u8; 32]) -> StoreResult<()> {
479            Ok(())
480        }
481        async fn delete_identity(&self, _: &str) -> StoreResult<()> {
482            Ok(())
483        }
484        async fn is_trusted_identity(
485            &self,
486            _: &str,
487            _: &[u8; 32],
488            _: wacore::libsignal::protocol::Direction,
489        ) -> StoreResult<bool> {
490            Ok(true)
491        }
492        async fn load_identity(&self, _: &str) -> StoreResult<Option<Vec<u8>>> {
493            Ok(None)
494        }
495    }
496    #[async_trait]
497    impl crate::store::traits::SessionStore for MockBackend {
498        async fn get_session(&self, _: &str) -> StoreResult<Option<Vec<u8>>> {
499            Ok(None)
500        }
501        async fn put_session(&self, _: &str, _: &[u8]) -> StoreResult<()> {
502            Ok(())
503        }
504        async fn delete_session(&self, _: &str) -> StoreResult<()> {
505            Ok(())
506        }
507        async fn has_session(&self, _: &str) -> StoreResult<bool> {
508            Ok(false)
509        }
510    }
511    #[async_trait]
512    impl wacore::libsignal::store::PreKeyStore for MockBackend {
513        async fn load_prekey(
514            &self,
515            _: u32,
516        ) -> std::result::Result<
517            Option<wa::PreKeyRecordStructure>,
518            Box<dyn std::error::Error + Send + Sync>,
519        > {
520            Ok(None)
521        }
522        async fn store_prekey(
523            &self,
524            _: u32,
525            _: wa::PreKeyRecordStructure,
526            _: bool,
527        ) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
528            Ok(())
529        }
530        async fn contains_prekey(
531            &self,
532            _: u32,
533        ) -> std::result::Result<bool, Box<dyn std::error::Error + Send + Sync>> {
534            Ok(false)
535        }
536        async fn remove_prekey(
537            &self,
538            _: u32,
539        ) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
540            Ok(())
541        }
542    }
543    #[async_trait]
544    impl wacore::libsignal::store::SignedPreKeyStore for MockBackend {
545        async fn load_signed_prekey(
546            &self,
547            _: u32,
548        ) -> std::result::Result<
549            Option<wa::SignedPreKeyRecordStructure>,
550            Box<dyn std::error::Error + Send + Sync>,
551        > {
552            Ok(None)
553        }
554        async fn load_signed_prekeys(
555            &self,
556        ) -> std::result::Result<
557            Vec<wa::SignedPreKeyRecordStructure>,
558            Box<dyn std::error::Error + Send + Sync>,
559        > {
560            Ok(vec![])
561        }
562        async fn store_signed_prekey(
563            &self,
564            _: u32,
565            _: wa::SignedPreKeyRecordStructure,
566        ) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
567            Ok(())
568        }
569        async fn contains_signed_prekey(
570            &self,
571            _: u32,
572        ) -> std::result::Result<bool, Box<dyn std::error::Error + Send + Sync>> {
573            Ok(false)
574        }
575        async fn remove_signed_prekey(
576            &self,
577            _: u32,
578        ) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
579            Ok(())
580        }
581    }
582    #[async_trait]
583    impl crate::store::traits::SenderKeyStoreHelper for MockBackend {
584        async fn put_sender_key(&self, _: &str, _: &[u8]) -> StoreResult<()> {
585            Ok(())
586        }
587        async fn get_sender_key(&self, _: &str) -> StoreResult<Option<Vec<u8>>> {
588            Ok(None)
589        }
590        async fn delete_sender_key(&self, _: &str) -> StoreResult<()> {
591            Ok(())
592        }
593    }
594
595    #[async_trait]
596    impl crate::store::traits::DevicePersistence for MockBackend {
597        async fn save_device_data(&self, _device_data: &wacore::store::Device) -> StoreResult<()> {
598            Ok(())
599        }
600
601        async fn save_device_data_for_device(
602            &self,
603            _device_id: i32,
604            _device_data: &wacore::store::Device,
605        ) -> StoreResult<()> {
606            Ok(())
607        }
608
609        async fn load_device_data(&self) -> StoreResult<Option<wacore::store::Device>> {
610            Ok(Some(wacore::store::Device::new()))
611        }
612
613        async fn load_device_data_for_device(
614            &self,
615            _device_id: i32,
616        ) -> StoreResult<Option<wacore::store::Device>> {
617            Ok(Some(wacore::store::Device::new()))
618        }
619
620        async fn device_exists(&self, _device_id: i32) -> StoreResult<bool> {
621            Ok(true)
622        }
623
624        async fn create_new_device(&self) -> StoreResult<i32> {
625            Ok(1)
626        }
627    }
628
629    fn create_encrypted_mutation(
630        op: wa::syncd_mutation::SyncdOperation,
631        index_mac: &[u8],
632        plaintext: &[u8],
633        keys: &wacore::appstate::keys::ExpandedAppStateKeys,
634        key_id_bytes: &[u8],
635    ) -> wa::SyncdMutation {
636        let iv = vec![0u8; 16];
637        let ciphertext = aes_256_cbc_encrypt(plaintext, &keys.value_encryption, &iv).unwrap();
638        let mut value_with_iv = iv;
639        value_with_iv.extend_from_slice(&ciphertext);
640        let value_mac = generate_content_mac(op, &value_with_iv, key_id_bytes, &keys.value_mac);
641        let mut value_blob = value_with_iv;
642        value_blob.extend_from_slice(&value_mac);
643
644        wa::SyncdMutation {
645            operation: Some(op as i32),
646            record: Some(wa::SyncdRecord {
647                index: Some(wa::SyncdIndex {
648                    blob: Some(index_mac.to_vec()),
649                }),
650                value: Some(wa::SyncdValue {
651                    blob: Some(value_blob),
652                }),
653                key_id: Some(wa::KeyId {
654                    id: Some(key_id_bytes.to_vec()),
655                }),
656            }),
657        }
658    }
659
660    #[tokio::test]
661    async fn test_process_patch_list_handles_set_overwrite_correctly() {
662        let backend = Arc::new(MockBackend::default());
663        let processor = AppStateProcessor::new(backend.clone());
664        let collection_name = WAPatchName::Regular;
665        let index_mac = vec![1; 32];
666        let key_id_bytes = b"test_key_id".to_vec();
667        let master_key = [7u8; 32];
668        let keys = expand_app_state_keys(&master_key);
669
670        let sync_key = AppStateSyncKey {
671            key_data: master_key.to_vec(),
672            ..Default::default()
673        };
674        backend
675            .set_app_state_sync_key(&key_id_bytes, sync_key)
676            .await
677            .unwrap();
678
679        let original_plaintext = wa::SyncActionData {
680            value: Some(wa::SyncActionValue {
681                timestamp: Some(1000),
682                ..Default::default()
683            }),
684            ..Default::default()
685        }
686        .encode_to_vec();
687        let original_mutation = create_encrypted_mutation(
688            wa::syncd_mutation::SyncdOperation::Set,
689            &index_mac,
690            &original_plaintext,
691            &keys,
692            &key_id_bytes,
693        );
694
695        let mut initial_state = HashState {
696            version: 1,
697            ..Default::default()
698        };
699        let (warnings, res) =
700            initial_state.update_hash(std::slice::from_ref(&original_mutation), |_, _| Ok(None));
701        assert!(res.is_ok() && warnings.is_empty());
702        backend
703            .set_app_state_version(collection_name.as_str(), initial_state.clone())
704            .await
705            .unwrap();
706
707        let original_value_blob = original_mutation
708            .record
709            .unwrap()
710            .value
711            .unwrap()
712            .blob
713            .unwrap();
714        let original_value_mac = original_value_blob[original_value_blob.len() - 32..].to_vec();
715        backend
716            .put_app_state_mutation_macs(
717                collection_name.as_str(),
718                1,
719                &[AppStateMutationMAC {
720                    index_mac: index_mac.clone(),
721                    value_mac: original_value_mac.clone(),
722                }],
723            )
724            .await
725            .unwrap();
726
727        let new_plaintext = wa::SyncActionData {
728            value: Some(wa::SyncActionValue {
729                timestamp: Some(2000),
730                ..Default::default()
731            }),
732            ..Default::default()
733        }
734        .encode_to_vec();
735        let overwrite_mutation = create_encrypted_mutation(
736            wa::syncd_mutation::SyncdOperation::Set,
737            &index_mac,
738            &new_plaintext,
739            &keys,
740            &key_id_bytes,
741        );
742
743        let patch_list = PatchList {
744            name: collection_name,
745            has_more_patches: false,
746            patches: vec![wa::SyncdPatch {
747                mutations: vec![overwrite_mutation.clone()],
748                version: Some(wa::SyncdVersion { version: Some(2) }),
749                key_id: Some(wa::KeyId {
750                    id: Some(key_id_bytes),
751                }),
752                ..Default::default()
753            }],
754            snapshot: None,
755            snapshot_ref: None,
756        };
757
758        let result = processor.process_patch_list(patch_list, false).await;
759
760        assert!(
761            result.is_ok(),
762            "Processing the patch should succeed, but it failed: {:?}",
763            result.err()
764        );
765        let (_, final_state, _) = result.unwrap();
766
767        let mut expected_state = initial_state.clone();
768        let new_value_blob = overwrite_mutation
769            .record
770            .unwrap()
771            .value
772            .unwrap()
773            .blob
774            .unwrap();
775        let new_value_mac = new_value_blob[new_value_blob.len() - 32..].to_vec();
776
777        WAPATCH_INTEGRITY.subtract_then_add_in_place(
778            &mut expected_state.hash,
779            &[original_value_mac],
780            &[new_value_mac],
781        );
782
783        assert_eq!(
784            final_state.hash, expected_state.hash,
785            "The final LTHash is incorrect, meaning the overwrite was not handled properly."
786        );
787        assert_eq!(
788            final_state.version, 2,
789            "The version should be updated to that of the patch."
790        );
791    }
792}