Skip to main content

darkpool_client/
scan_engine.rs

1//! Watches `DarkPool` events (`NewNote`, `NewPrivateMemo`, `NewPublicMemo`, `NullifierSpent`)
2//! and syncs local note state via decryption and Merkle tree updates.
3
4use ethers::prelude::*;
5use ethers::types::{Address, Filter, Log, H256, U256};
6use std::sync::{Arc, LazyLock};
7use thiserror::Error;
8use tiny_keccak::{Hasher, Keccak};
9use tracing::{debug, info, warn};
10
11use crate::crypto_helpers::{
12    aes128_decrypt, bjj_scalar_mul, derive_nullifier_path_a, derive_nullifier_path_b,
13    kdf_to_aes_key_iv, recipient_decrypt_3party, unpack_ciphertext_from_fields,
14    unpack_note_plaintext,
15};
16use crate::key_repository::KeyRepository;
17use crate::merkle_tree::LocalMerkleTree;
18use crate::utxo_store::{OwnedNote, UtxoStore};
19
20#[derive(Debug, Error)]
21pub enum ScanError {
22    #[error("Provider error: {0}")]
23    Provider(String),
24    #[error("Decryption failed: {0}")]
25    Decryption(String),
26    #[error("Invalid event data: {0}")]
27    InvalidEvent(String),
28}
29
30/// Discovered public memo, claimable via `public_claim()`
31#[derive(Debug, Clone)]
32pub struct PublicMemoInfo {
33    pub memo_id: U256,
34    pub owner_pk: (U256, U256),
35    pub asset: Address,
36    pub value: U256,
37    pub timelock: U256,
38    pub salt: U256,
39}
40
41#[derive(Debug, Default)]
42pub struct ScanResult {
43    pub new_notes: Vec<U256>,
44    pub spent_nullifiers: Vec<U256>,
45    pub blocks_processed: u64,
46    pub new_commitments: Vec<U256>,
47    pub new_public_memos: Vec<PublicMemoInfo>,
48}
49
50#[derive(Debug, Clone)]
51pub enum DarkPoolEvent {
52    NewNote {
53        commitment: U256,
54        ephemeral_pk: (U256, U256),
55        packed_ciphertext: [U256; 7],
56    },
57    NewPrivateMemo {
58        commitment: U256,
59        transfer_tag: U256,
60        ephemeral_pk: (U256, U256),
61        packed_ciphertext: [U256; 7],
62        /// `a * compliance_pk` -- Bob uses this with ivk for 3-party ECDH decryption
63        int_bob: (U256, U256),
64        /// `a * recipient_b` -- Carol uses this for compliance decryption
65        int_carol: (U256, U256),
66    },
67    NewPublicMemo {
68        memo_id: U256,
69        owner_x: U256,
70        owner_y: U256,
71        asset: Address,
72        value: U256,
73        timelock: U256,
74        salt: U256,
75    },
76    NullifierSpent {
77        nullifier_hash: U256,
78    },
79}
80
81pub struct ScanEngine<M: Middleware> {
82    provider: Arc<M>,
83    darkpool_address: Address,
84    keys: KeyRepository,
85    utxos: UtxoStore,
86    tree: LocalMerkleTree,
87    compliance_pk: (U256, U256),
88    last_scanned_block: u64,
89}
90
91/// Compute keccak256 of a Solidity event signature string
92fn keccak256_event_sig(sig: &str) -> H256 {
93    let mut hasher = Keccak::v256();
94    hasher.update(sig.as_bytes());
95    let mut output = [0u8; 32];
96    hasher.finalize(&mut output);
97    H256::from(output)
98}
99
100// Precomputed event signature hashes (see DarkPool.sol)
101static SIG_NEW_NOTE: LazyLock<H256> =
102    LazyLock::new(|| keccak256_event_sig("NewNote(uint256,bytes32,uint256,uint256,bytes32[7])"));
103static SIG_NULLIFIER_SPENT: LazyLock<H256> =
104    LazyLock::new(|| keccak256_event_sig("NullifierSpent(bytes32)"));
105static SIG_NEW_PRIVATE_MEMO: LazyLock<H256> = LazyLock::new(|| {
106    keccak256_event_sig("NewPrivateMemo(uint256,bytes32,uint256,uint256,uint256,bytes32[7],uint256,uint256,uint256,uint256)")
107});
108static SIG_NEW_PUBLIC_MEMO: LazyLock<H256> = LazyLock::new(|| {
109    keccak256_event_sig("NewPublicMemo(bytes32,uint256,uint256,address,uint256,uint256,uint256)")
110});
111
112impl<M: Middleware + 'static> ScanEngine<M> {
113    pub fn new(
114        provider: Arc<M>,
115        darkpool_address: Address,
116        keys: KeyRepository,
117        compliance_pk: (U256, U256),
118    ) -> Self {
119        Self {
120            provider,
121            darkpool_address,
122            keys,
123            utxos: UtxoStore::new(),
124            tree: LocalMerkleTree::new(),
125            compliance_pk,
126            last_scanned_block: 0,
127        }
128    }
129
130    pub fn with_state(
131        provider: Arc<M>,
132        darkpool_address: Address,
133        keys: KeyRepository,
134        utxos: UtxoStore,
135        tree: LocalMerkleTree,
136        compliance_pk: (U256, U256),
137        last_block: u64,
138    ) -> Self {
139        Self {
140            provider,
141            darkpool_address,
142            keys,
143            utxos,
144            tree,
145            compliance_pk,
146            last_scanned_block: last_block,
147        }
148    }
149
150    pub async fn scan_blocks(
151        &mut self,
152        from_block: u64,
153        to_block: u64,
154    ) -> Result<ScanResult, ScanError> {
155        let mut result = ScanResult::default();
156
157        info!(
158            "Scanning blocks {} to {} for DarkPool events at {:?}",
159            from_block, to_block, self.darkpool_address
160        );
161
162        let filter = Filter::new()
163            .address(self.darkpool_address)
164            .from_block(from_block)
165            .to_block(to_block);
166
167        let logs = self
168            .provider
169            .get_logs(&filter)
170            .await
171            .map_err(|e| ScanError::Provider(e.to_string()))?;
172
173        info!("Found {} logs from DarkPool", logs.len());
174
175        for log in logs {
176            let block_number = log.block_number.map_or(from_block, |b| b.as_u64());
177            self.process_log(&log, block_number, &mut result)?;
178        }
179
180        result.blocks_processed = to_block.saturating_sub(from_block) + 1;
181        self.last_scanned_block = to_block;
182
183        info!(
184            "Scan complete: {} new notes, {} nullifiers spent",
185            result.new_notes.len(),
186            result.spent_nullifiers.len()
187        );
188
189        Ok(result)
190    }
191
192    /// Discriminate a log by topic[0] signature hash and dispatch to the appropriate handler
193    fn process_log(
194        &mut self,
195        log: &Log,
196        block_number: u64,
197        result: &mut ScanResult,
198    ) -> Result<(), ScanError> {
199        if log.topics.is_empty() {
200            return Ok(());
201        }
202
203        let event_sig = log.topics[0];
204
205        debug!(
206            "Processing log: sig={:?}, topics={}, data_len={}",
207            event_sig,
208            log.topics.len(),
209            log.data.len()
210        );
211
212        if event_sig == *SIG_NULLIFIER_SPENT {
213            if log.topics.len() >= 2 {
214                let nullifier_hash = U256::from_big_endian(log.topics[1].as_bytes());
215                self.handle_nullifier_spent(nullifier_hash, result);
216            }
217        } else if event_sig == *SIG_NEW_PUBLIC_MEMO {
218            if log.topics.len() >= 3 && log.data.len() >= 5 * 32 {
219                let memo_id = U256::from_big_endian(log.topics[1].as_bytes());
220                let owner_x = U256::from_big_endian(log.topics[2].as_bytes());
221                info!(
222                    "Processing NewPublicMemo: memo_id={:?}, owner_x={:?}",
223                    memo_id, owner_x
224                );
225
226                if let Ok(event) = self.parse_new_public_memo_event(&log.data, memo_id, owner_x) {
227                    self.handle_new_public_memo(event, result);
228                }
229            }
230        } else if event_sig == *SIG_NEW_NOTE {
231            if log.topics.len() >= 3 && log.data.len() >= 9 * 32 {
232                let commitment = U256::from_big_endian(log.topics[2].as_bytes());
233                info!("Processing NewNote: commitment={:?}", commitment);
234
235                if let Ok(event) = self.parse_new_note_event(&log.data, commitment) {
236                    self.handle_new_note(event, block_number, result)?;
237                }
238            }
239        } else if event_sig == *SIG_NEW_PRIVATE_MEMO {
240            if log.topics.len() >= 4 && log.data.len() >= 13 * 32 {
241                let commitment = U256::from_big_endian(log.topics[2].as_bytes());
242                let transfer_tag = U256::from_big_endian(log.topics[3].as_bytes());
243                info!(
244                    "Processing NewPrivateMemo: commitment={:?}, tag={:?}",
245                    commitment, transfer_tag
246                );
247
248                if let Ok(event) =
249                    self.parse_new_private_memo_event(&log.data, commitment, transfer_tag)
250                {
251                    self.handle_new_private_memo(event, block_number, result)?;
252                }
253            }
254        } else {
255            debug!(
256                "Skipping unknown event: sig={:?}, topics={}, data_len={}",
257                event_sig,
258                log.topics.len(),
259                log.data.len()
260            );
261        }
262
263        Ok(())
264    }
265
266    /// Parse `NewNote` event from ABI-encoded log data
267    fn parse_new_note_event(
268        &self,
269        data: &Bytes,
270        commitment: U256,
271    ) -> Result<DarkPoolEvent, ScanError> {
272        if data.len() < 9 * 32 {
273            return Err(ScanError::InvalidEvent("NewNote data too short".into()));
274        }
275
276        let bytes = data.as_ref();
277
278        let epk_x = U256::from_big_endian(&bytes[0..32]);
279        let epk_y = U256::from_big_endian(&bytes[32..64]);
280
281        let mut packed_ciphertext = [U256::zero(); 7];
282        for (i, item) in packed_ciphertext.iter_mut().enumerate() {
283            let start = 64 + i * 32;
284            *item = U256::from_big_endian(&bytes[start..start + 32]);
285        }
286
287        Ok(DarkPoolEvent::NewNote {
288            commitment,
289            ephemeral_pk: (epk_x, epk_y),
290            packed_ciphertext,
291        })
292    }
293
294    /// Parse `NewPrivateMemo` event from ABI-encoded log data
295    fn parse_new_private_memo_event(
296        &self,
297        data: &Bytes,
298        commitment: U256,
299        transfer_tag: U256,
300    ) -> Result<DarkPoolEvent, ScanError> {
301        if data.len() < 13 * 32 {
302            return Err(ScanError::InvalidEvent(
303                "NewPrivateMemo data too short".into(),
304            ));
305        }
306
307        let bytes = data.as_ref();
308
309        let epk_x = U256::from_big_endian(&bytes[0..32]);
310        let epk_y = U256::from_big_endian(&bytes[32..64]);
311
312        let mut packed_ciphertext = [U256::zero(); 7];
313        for (i, item) in packed_ciphertext.iter_mut().enumerate() {
314            let start = 64 + i * 32;
315            *item = U256::from_big_endian(&bytes[start..start + 32]);
316        }
317
318        let int_bob_x = U256::from_big_endian(&bytes[288..320]);
319        let int_bob_y = U256::from_big_endian(&bytes[320..352]);
320
321        let int_carol_x = U256::from_big_endian(&bytes[352..384]);
322        let int_carol_y = U256::from_big_endian(&bytes[384..416]);
323
324        Ok(DarkPoolEvent::NewPrivateMemo {
325            commitment,
326            transfer_tag,
327            ephemeral_pk: (epk_x, epk_y),
328            packed_ciphertext,
329            int_bob: (int_bob_x, int_bob_y),
330            int_carol: (int_carol_x, int_carol_y),
331        })
332    }
333
334    /// Parse `NewPublicMemo` event from ABI-encoded log data
335    fn parse_new_public_memo_event(
336        &self,
337        data: &Bytes,
338        memo_id: U256,
339        owner_x: U256,
340    ) -> Result<DarkPoolEvent, ScanError> {
341        if data.len() < 5 * 32 {
342            return Err(ScanError::InvalidEvent(
343                "NewPublicMemo data too short".into(),
344            ));
345        }
346
347        let bytes = data.as_ref();
348
349        let owner_y = U256::from_big_endian(&bytes[0..32]);
350        // Address occupies the last 20 bytes of the 32-byte ABI word
351        let asset = Address::from_slice(&bytes[44..64]);
352        let value = U256::from_big_endian(&bytes[64..96]);
353        let timelock = U256::from_big_endian(&bytes[96..128]);
354        let salt = U256::from_big_endian(&bytes[128..160]);
355
356        Ok(DarkPoolEvent::NewPublicMemo {
357            memo_id,
358            owner_x,
359            owner_y,
360            asset,
361            value,
362            timelock,
363            salt,
364        })
365    }
366
367    /// Public memos are not Merkle commitments; tracked for `public_claim()` discovery.
368    fn handle_new_public_memo(&self, event: DarkPoolEvent, result: &mut ScanResult) {
369        if let DarkPoolEvent::NewPublicMemo {
370            memo_id,
371            owner_x,
372            owner_y,
373            asset,
374            value,
375            timelock,
376            salt,
377        } = event
378        {
379            info!(
380                "Discovered NewPublicMemo: memo_id={:?}, value={}, asset={:?}",
381                memo_id, value, asset
382            );
383            result.new_public_memos.push(PublicMemoInfo {
384                memo_id,
385                owner_pk: (owner_x, owner_y),
386                asset,
387                value,
388                timelock,
389                salt,
390            });
391        }
392    }
393
394    fn handle_nullifier_spent(&mut self, nullifier_hash: U256, result: &mut ScanResult) {
395        debug!("NullifierSpent: {:?}", nullifier_hash);
396
397        if let Some(_spent_note) = self.utxos.mark_spent(nullifier_hash) {
398            result.spent_nullifiers.push(nullifier_hash);
399        }
400    }
401
402    fn handle_new_note(
403        &mut self,
404        event: DarkPoolEvent,
405        block_number: u64,
406        result: &mut ScanResult,
407    ) -> Result<(), ScanError> {
408        if let DarkPoolEvent::NewNote {
409            commitment,
410            ephemeral_pk,
411            packed_ciphertext,
412        } = event
413        {
414            let leaf_index = self.tree.insert(commitment);
415            result.new_commitments.push(commitment);
416
417            if let Some((ephemeral_sk, _key_index)) =
418                self.keys.try_match_deposit(ephemeral_pk.0, ephemeral_pk.1)
419            {
420                match self.decrypt_deposit_note(
421                    ephemeral_sk,
422                    ephemeral_pk,
423                    &packed_ciphertext,
424                    commitment,
425                    leaf_index,
426                    block_number,
427                ) {
428                    Ok(note) => {
429                        let nullifier_hash = derive_nullifier_path_a(note.plaintext.nullifier);
430                        self.utxos.add_note(note, nullifier_hash);
431                        result.new_notes.push(commitment);
432                        info!("Received deposit note: commitment={:?}", commitment);
433                    }
434                    Err(e) => {
435                        warn!("Failed to decrypt matched note: {}", e);
436                    }
437                }
438            }
439        }
440
441        Ok(())
442    }
443
444    /// 3-party ECDH: `S = [ivk] * int_bob = a * b * c * G`
445    fn handle_new_private_memo(
446        &mut self,
447        event: DarkPoolEvent,
448        block_number: u64,
449        result: &mut ScanResult,
450    ) -> Result<(), ScanError> {
451        if let DarkPoolEvent::NewPrivateMemo {
452            commitment,
453            transfer_tag,
454            packed_ciphertext,
455            int_bob,
456            ..
457        } = event
458        {
459            let leaf_index = self.tree.insert(commitment);
460            result.new_commitments.push(commitment);
461
462            if let Some((recipient_sk, derivation_index)) =
463                self.keys.try_match_transfer(transfer_tag)
464            {
465                info!(
466                    "Transfer tag matched! Attempting 3-party decryption for derivation_index={}",
467                    derivation_index
468                );
469
470                match recipient_decrypt_3party(recipient_sk, int_bob, &packed_ciphertext) {
471                    Ok((note, shared_secret)) => {
472                        let nullifier_hash =
473                            derive_nullifier_path_b(shared_secret, commitment, leaf_index);
474                        let note_value = note.value;
475
476                        let owned_note = OwnedNote {
477                            plaintext: note,
478                            commitment,
479                            leaf_index,
480                            spending_secret: shared_secret,
481                            is_transfer: true,
482                            received_block: block_number,
483                        };
484
485                        self.utxos.add_note(owned_note, nullifier_hash);
486                        result.new_notes.push(commitment);
487                        info!(
488                            "Received transfer note: commitment={:?}, value={}",
489                            commitment, note_value
490                        );
491                    }
492                    Err(e) => {
493                        warn!("Failed to decrypt transfer memo: {}", e);
494                    }
495                }
496            }
497        }
498
499        Ok(())
500    }
501
502    fn decrypt_deposit_note(
503        &self,
504        ephemeral_sk: U256,
505        _ephemeral_pk: (U256, U256),
506        packed_ciphertext: &[U256; 7],
507        commitment: U256,
508        leaf_index: u64,
509        block_number: u64,
510    ) -> Result<OwnedNote, ScanError> {
511        // ECDH: shared_secret = ([ephemeral_sk] * compliance_pk).x
512        let shared_point = bjj_scalar_mul(ephemeral_sk, self.compliance_pk)
513            .map_err(|e| ScanError::Decryption(e.to_string()))?;
514        let shared_secret = shared_point.0;
515
516        let (key, iv) = kdf_to_aes_key_iv(shared_secret);
517        let ciphertext_bytes = unpack_ciphertext_from_fields(packed_ciphertext);
518        let plaintext_bytes = aes128_decrypt(&ciphertext_bytes, &key, &iv)
519            .map_err(|e| ScanError::Decryption(e.to_string()))?;
520        let note = unpack_note_plaintext(&plaintext_bytes);
521
522        Ok(OwnedNote {
523            plaintext: note,
524            commitment,
525            leaf_index,
526            spending_secret: shared_secret,
527            is_transfer: false,
528            received_block: block_number,
529        })
530    }
531
532    #[must_use]
533    pub fn utxos(&self) -> &UtxoStore {
534        &self.utxos
535    }
536
537    pub fn utxos_mut(&mut self) -> &mut UtxoStore {
538        &mut self.utxos
539    }
540
541    #[must_use]
542    pub fn tree(&self) -> &LocalMerkleTree {
543        &self.tree
544    }
545
546    pub fn tree_mut(&mut self) -> &mut LocalMerkleTree {
547        &mut self.tree
548    }
549
550    #[must_use]
551    pub fn keys(&self) -> &KeyRepository {
552        &self.keys
553    }
554
555    pub fn keys_mut(&mut self) -> &mut KeyRepository {
556        &mut self.keys
557    }
558
559    #[must_use]
560    pub fn last_scanned_block(&self) -> u64 {
561        self.last_scanned_block
562    }
563
564    #[must_use]
565    pub fn root(&self) -> U256 {
566        self.tree.root()
567    }
568
569    #[must_use]
570    pub fn balance(&self, asset: Address) -> U256 {
571        self.utxos.get_balance(asset)
572    }
573
574    pub fn advance_keys(&mut self, count: u64) {
575        self.keys.advance_ephemeral_keys(count);
576        self.keys.advance_incoming_keys(count);
577    }
578
579    /// Process pre-fetched logs (e.g. fetched via mixnet) without querying the provider.
580    pub fn process_logs_directly(&mut self, logs: &[Log]) -> Result<ScanResult, ScanError> {
581        let mut result = ScanResult::default();
582
583        info!("Processing {} pre-fetched logs", logs.len());
584
585        for log in logs {
586            let block_number = log.block_number.map_or(0, |b| b.as_u64());
587            self.process_log(log, block_number, &mut result)?;
588        }
589
590        info!(
591            "Direct log processing complete: {} new notes, {} nullifiers spent, {} commitments",
592            result.new_notes.len(),
593            result.spent_nullifiers.len(),
594            result.new_commitments.len()
595        );
596
597        Ok(result)
598    }
599}
600
601#[cfg(test)]
602mod tests {
603    use super::*;
604
605    #[test]
606    fn test_scan_result_default() {
607        let result = ScanResult::default();
608        assert!(result.new_notes.is_empty());
609        assert!(result.spent_nullifiers.is_empty());
610        assert_eq!(result.blocks_processed, 0);
611    }
612}