1use ethers::prelude::*;
5use ethers::types::{Address, Filter, Log, H256, U256};
6use std::sync::{Arc, LazyLock};
7use thiserror::Error;
8use tiny_keccak::{Hasher, Keccak};
9use tracing::{debug, info, warn};
10
11use crate::crypto_helpers::{
12 aes128_decrypt, bjj_scalar_mul, derive_nullifier_path_a, derive_nullifier_path_b,
13 kdf_to_aes_key_iv, recipient_decrypt_3party, unpack_ciphertext_from_fields,
14 unpack_note_plaintext,
15};
16use crate::key_repository::KeyRepository;
17use crate::merkle_tree::LocalMerkleTree;
18use crate::utxo_store::{OwnedNote, UtxoStore};
19
20#[derive(Debug, Error)]
21pub enum ScanError {
22 #[error("Provider error: {0}")]
23 Provider(String),
24 #[error("Decryption failed: {0}")]
25 Decryption(String),
26 #[error("Invalid event data: {0}")]
27 InvalidEvent(String),
28}
29
30#[derive(Debug, Clone)]
32pub struct PublicMemoInfo {
33 pub memo_id: U256,
34 pub owner_pk: (U256, U256),
35 pub asset: Address,
36 pub value: U256,
37 pub timelock: U256,
38 pub salt: U256,
39}
40
41#[derive(Debug, Default)]
42pub struct ScanResult {
43 pub new_notes: Vec<U256>,
44 pub spent_nullifiers: Vec<U256>,
45 pub blocks_processed: u64,
46 pub new_commitments: Vec<U256>,
47 pub new_public_memos: Vec<PublicMemoInfo>,
48}
49
50#[derive(Debug, Clone)]
51pub enum DarkPoolEvent {
52 NewNote {
53 commitment: U256,
54 ephemeral_pk: (U256, U256),
55 packed_ciphertext: [U256; 7],
56 },
57 NewPrivateMemo {
58 commitment: U256,
59 transfer_tag: U256,
60 ephemeral_pk: (U256, U256),
61 packed_ciphertext: [U256; 7],
62 int_bob: (U256, U256),
64 int_carol: (U256, U256),
66 },
67 NewPublicMemo {
68 memo_id: U256,
69 owner_x: U256,
70 owner_y: U256,
71 asset: Address,
72 value: U256,
73 timelock: U256,
74 salt: U256,
75 },
76 NullifierSpent {
77 nullifier_hash: U256,
78 },
79}
80
81pub struct ScanEngine<M: Middleware> {
82 provider: Arc<M>,
83 darkpool_address: Address,
84 keys: KeyRepository,
85 utxos: UtxoStore,
86 tree: LocalMerkleTree,
87 compliance_pk: (U256, U256),
88 last_scanned_block: u64,
89}
90
91fn keccak256_event_sig(sig: &str) -> H256 {
93 let mut hasher = Keccak::v256();
94 hasher.update(sig.as_bytes());
95 let mut output = [0u8; 32];
96 hasher.finalize(&mut output);
97 H256::from(output)
98}
99
100static SIG_NEW_NOTE: LazyLock<H256> =
102 LazyLock::new(|| keccak256_event_sig("NewNote(uint256,bytes32,uint256,uint256,bytes32[7])"));
103static SIG_NULLIFIER_SPENT: LazyLock<H256> =
104 LazyLock::new(|| keccak256_event_sig("NullifierSpent(bytes32)"));
105static SIG_NEW_PRIVATE_MEMO: LazyLock<H256> = LazyLock::new(|| {
106 keccak256_event_sig("NewPrivateMemo(uint256,bytes32,uint256,uint256,uint256,bytes32[7],uint256,uint256,uint256,uint256)")
107});
108static SIG_NEW_PUBLIC_MEMO: LazyLock<H256> = LazyLock::new(|| {
109 keccak256_event_sig("NewPublicMemo(bytes32,uint256,uint256,address,uint256,uint256,uint256)")
110});
111
112impl<M: Middleware + 'static> ScanEngine<M> {
113 pub fn new(
114 provider: Arc<M>,
115 darkpool_address: Address,
116 keys: KeyRepository,
117 compliance_pk: (U256, U256),
118 ) -> Self {
119 Self {
120 provider,
121 darkpool_address,
122 keys,
123 utxos: UtxoStore::new(),
124 tree: LocalMerkleTree::new(),
125 compliance_pk,
126 last_scanned_block: 0,
127 }
128 }
129
130 pub fn with_state(
131 provider: Arc<M>,
132 darkpool_address: Address,
133 keys: KeyRepository,
134 utxos: UtxoStore,
135 tree: LocalMerkleTree,
136 compliance_pk: (U256, U256),
137 last_block: u64,
138 ) -> Self {
139 Self {
140 provider,
141 darkpool_address,
142 keys,
143 utxos,
144 tree,
145 compliance_pk,
146 last_scanned_block: last_block,
147 }
148 }
149
150 pub async fn scan_blocks(
151 &mut self,
152 from_block: u64,
153 to_block: u64,
154 ) -> Result<ScanResult, ScanError> {
155 let mut result = ScanResult::default();
156
157 info!(
158 "Scanning blocks {} to {} for DarkPool events at {:?}",
159 from_block, to_block, self.darkpool_address
160 );
161
162 let filter = Filter::new()
163 .address(self.darkpool_address)
164 .from_block(from_block)
165 .to_block(to_block);
166
167 let logs = self
168 .provider
169 .get_logs(&filter)
170 .await
171 .map_err(|e| ScanError::Provider(e.to_string()))?;
172
173 info!("Found {} logs from DarkPool", logs.len());
174
175 for log in logs {
176 let block_number = log.block_number.map_or(from_block, |b| b.as_u64());
177 self.process_log(&log, block_number, &mut result)?;
178 }
179
180 result.blocks_processed = to_block.saturating_sub(from_block) + 1;
181 self.last_scanned_block = to_block;
182
183 info!(
184 "Scan complete: {} new notes, {} nullifiers spent",
185 result.new_notes.len(),
186 result.spent_nullifiers.len()
187 );
188
189 Ok(result)
190 }
191
192 fn process_log(
194 &mut self,
195 log: &Log,
196 block_number: u64,
197 result: &mut ScanResult,
198 ) -> Result<(), ScanError> {
199 if log.topics.is_empty() {
200 return Ok(());
201 }
202
203 let event_sig = log.topics[0];
204
205 debug!(
206 "Processing log: sig={:?}, topics={}, data_len={}",
207 event_sig,
208 log.topics.len(),
209 log.data.len()
210 );
211
212 if event_sig == *SIG_NULLIFIER_SPENT {
213 if log.topics.len() >= 2 {
214 let nullifier_hash = U256::from_big_endian(log.topics[1].as_bytes());
215 self.handle_nullifier_spent(nullifier_hash, result);
216 }
217 } else if event_sig == *SIG_NEW_PUBLIC_MEMO {
218 if log.topics.len() >= 3 && log.data.len() >= 5 * 32 {
219 let memo_id = U256::from_big_endian(log.topics[1].as_bytes());
220 let owner_x = U256::from_big_endian(log.topics[2].as_bytes());
221 info!(
222 "Processing NewPublicMemo: memo_id={:?}, owner_x={:?}",
223 memo_id, owner_x
224 );
225
226 if let Ok(event) = self.parse_new_public_memo_event(&log.data, memo_id, owner_x) {
227 self.handle_new_public_memo(event, result);
228 }
229 }
230 } else if event_sig == *SIG_NEW_NOTE {
231 if log.topics.len() >= 3 && log.data.len() >= 9 * 32 {
232 let commitment = U256::from_big_endian(log.topics[2].as_bytes());
233 info!("Processing NewNote: commitment={:?}", commitment);
234
235 if let Ok(event) = self.parse_new_note_event(&log.data, commitment) {
236 self.handle_new_note(event, block_number, result)?;
237 }
238 }
239 } else if event_sig == *SIG_NEW_PRIVATE_MEMO {
240 if log.topics.len() >= 4 && log.data.len() >= 13 * 32 {
241 let commitment = U256::from_big_endian(log.topics[2].as_bytes());
242 let transfer_tag = U256::from_big_endian(log.topics[3].as_bytes());
243 info!(
244 "Processing NewPrivateMemo: commitment={:?}, tag={:?}",
245 commitment, transfer_tag
246 );
247
248 if let Ok(event) =
249 self.parse_new_private_memo_event(&log.data, commitment, transfer_tag)
250 {
251 self.handle_new_private_memo(event, block_number, result)?;
252 }
253 }
254 } else {
255 debug!(
256 "Skipping unknown event: sig={:?}, topics={}, data_len={}",
257 event_sig,
258 log.topics.len(),
259 log.data.len()
260 );
261 }
262
263 Ok(())
264 }
265
266 fn parse_new_note_event(
268 &self,
269 data: &Bytes,
270 commitment: U256,
271 ) -> Result<DarkPoolEvent, ScanError> {
272 if data.len() < 9 * 32 {
273 return Err(ScanError::InvalidEvent("NewNote data too short".into()));
274 }
275
276 let bytes = data.as_ref();
277
278 let epk_x = U256::from_big_endian(&bytes[0..32]);
279 let epk_y = U256::from_big_endian(&bytes[32..64]);
280
281 let mut packed_ciphertext = [U256::zero(); 7];
282 for (i, item) in packed_ciphertext.iter_mut().enumerate() {
283 let start = 64 + i * 32;
284 *item = U256::from_big_endian(&bytes[start..start + 32]);
285 }
286
287 Ok(DarkPoolEvent::NewNote {
288 commitment,
289 ephemeral_pk: (epk_x, epk_y),
290 packed_ciphertext,
291 })
292 }
293
294 fn parse_new_private_memo_event(
296 &self,
297 data: &Bytes,
298 commitment: U256,
299 transfer_tag: U256,
300 ) -> Result<DarkPoolEvent, ScanError> {
301 if data.len() < 13 * 32 {
302 return Err(ScanError::InvalidEvent(
303 "NewPrivateMemo data too short".into(),
304 ));
305 }
306
307 let bytes = data.as_ref();
308
309 let epk_x = U256::from_big_endian(&bytes[0..32]);
310 let epk_y = U256::from_big_endian(&bytes[32..64]);
311
312 let mut packed_ciphertext = [U256::zero(); 7];
313 for (i, item) in packed_ciphertext.iter_mut().enumerate() {
314 let start = 64 + i * 32;
315 *item = U256::from_big_endian(&bytes[start..start + 32]);
316 }
317
318 let int_bob_x = U256::from_big_endian(&bytes[288..320]);
319 let int_bob_y = U256::from_big_endian(&bytes[320..352]);
320
321 let int_carol_x = U256::from_big_endian(&bytes[352..384]);
322 let int_carol_y = U256::from_big_endian(&bytes[384..416]);
323
324 Ok(DarkPoolEvent::NewPrivateMemo {
325 commitment,
326 transfer_tag,
327 ephemeral_pk: (epk_x, epk_y),
328 packed_ciphertext,
329 int_bob: (int_bob_x, int_bob_y),
330 int_carol: (int_carol_x, int_carol_y),
331 })
332 }
333
334 fn parse_new_public_memo_event(
336 &self,
337 data: &Bytes,
338 memo_id: U256,
339 owner_x: U256,
340 ) -> Result<DarkPoolEvent, ScanError> {
341 if data.len() < 5 * 32 {
342 return Err(ScanError::InvalidEvent(
343 "NewPublicMemo data too short".into(),
344 ));
345 }
346
347 let bytes = data.as_ref();
348
349 let owner_y = U256::from_big_endian(&bytes[0..32]);
350 let asset = Address::from_slice(&bytes[44..64]);
352 let value = U256::from_big_endian(&bytes[64..96]);
353 let timelock = U256::from_big_endian(&bytes[96..128]);
354 let salt = U256::from_big_endian(&bytes[128..160]);
355
356 Ok(DarkPoolEvent::NewPublicMemo {
357 memo_id,
358 owner_x,
359 owner_y,
360 asset,
361 value,
362 timelock,
363 salt,
364 })
365 }
366
367 fn handle_new_public_memo(&self, event: DarkPoolEvent, result: &mut ScanResult) {
369 if let DarkPoolEvent::NewPublicMemo {
370 memo_id,
371 owner_x,
372 owner_y,
373 asset,
374 value,
375 timelock,
376 salt,
377 } = event
378 {
379 info!(
380 "Discovered NewPublicMemo: memo_id={:?}, value={}, asset={:?}",
381 memo_id, value, asset
382 );
383 result.new_public_memos.push(PublicMemoInfo {
384 memo_id,
385 owner_pk: (owner_x, owner_y),
386 asset,
387 value,
388 timelock,
389 salt,
390 });
391 }
392 }
393
394 fn handle_nullifier_spent(&mut self, nullifier_hash: U256, result: &mut ScanResult) {
395 debug!("NullifierSpent: {:?}", nullifier_hash);
396
397 if let Some(_spent_note) = self.utxos.mark_spent(nullifier_hash) {
398 result.spent_nullifiers.push(nullifier_hash);
399 }
400 }
401
402 fn handle_new_note(
403 &mut self,
404 event: DarkPoolEvent,
405 block_number: u64,
406 result: &mut ScanResult,
407 ) -> Result<(), ScanError> {
408 if let DarkPoolEvent::NewNote {
409 commitment,
410 ephemeral_pk,
411 packed_ciphertext,
412 } = event
413 {
414 let leaf_index = self.tree.insert(commitment);
415 result.new_commitments.push(commitment);
416
417 if let Some((ephemeral_sk, _key_index)) =
418 self.keys.try_match_deposit(ephemeral_pk.0, ephemeral_pk.1)
419 {
420 match self.decrypt_deposit_note(
421 ephemeral_sk,
422 ephemeral_pk,
423 &packed_ciphertext,
424 commitment,
425 leaf_index,
426 block_number,
427 ) {
428 Ok(note) => {
429 let nullifier_hash = derive_nullifier_path_a(note.plaintext.nullifier);
430 self.utxos.add_note(note, nullifier_hash);
431 result.new_notes.push(commitment);
432 info!("Received deposit note: commitment={:?}", commitment);
433 }
434 Err(e) => {
435 warn!("Failed to decrypt matched note: {}", e);
436 }
437 }
438 }
439 }
440
441 Ok(())
442 }
443
444 fn handle_new_private_memo(
446 &mut self,
447 event: DarkPoolEvent,
448 block_number: u64,
449 result: &mut ScanResult,
450 ) -> Result<(), ScanError> {
451 if let DarkPoolEvent::NewPrivateMemo {
452 commitment,
453 transfer_tag,
454 packed_ciphertext,
455 int_bob,
456 ..
457 } = event
458 {
459 let leaf_index = self.tree.insert(commitment);
460 result.new_commitments.push(commitment);
461
462 if let Some((recipient_sk, derivation_index)) =
463 self.keys.try_match_transfer(transfer_tag)
464 {
465 info!(
466 "Transfer tag matched! Attempting 3-party decryption for derivation_index={}",
467 derivation_index
468 );
469
470 match recipient_decrypt_3party(recipient_sk, int_bob, &packed_ciphertext) {
471 Ok((note, shared_secret)) => {
472 let nullifier_hash =
473 derive_nullifier_path_b(shared_secret, commitment, leaf_index);
474 let note_value = note.value;
475
476 let owned_note = OwnedNote {
477 plaintext: note,
478 commitment,
479 leaf_index,
480 spending_secret: shared_secret,
481 is_transfer: true,
482 received_block: block_number,
483 };
484
485 self.utxos.add_note(owned_note, nullifier_hash);
486 result.new_notes.push(commitment);
487 info!(
488 "Received transfer note: commitment={:?}, value={}",
489 commitment, note_value
490 );
491 }
492 Err(e) => {
493 warn!("Failed to decrypt transfer memo: {}", e);
494 }
495 }
496 }
497 }
498
499 Ok(())
500 }
501
502 fn decrypt_deposit_note(
503 &self,
504 ephemeral_sk: U256,
505 _ephemeral_pk: (U256, U256),
506 packed_ciphertext: &[U256; 7],
507 commitment: U256,
508 leaf_index: u64,
509 block_number: u64,
510 ) -> Result<OwnedNote, ScanError> {
511 let shared_point = bjj_scalar_mul(ephemeral_sk, self.compliance_pk)
513 .map_err(|e| ScanError::Decryption(e.to_string()))?;
514 let shared_secret = shared_point.0;
515
516 let (key, iv) = kdf_to_aes_key_iv(shared_secret);
517 let ciphertext_bytes = unpack_ciphertext_from_fields(packed_ciphertext);
518 let plaintext_bytes = aes128_decrypt(&ciphertext_bytes, &key, &iv)
519 .map_err(|e| ScanError::Decryption(e.to_string()))?;
520 let note = unpack_note_plaintext(&plaintext_bytes);
521
522 Ok(OwnedNote {
523 plaintext: note,
524 commitment,
525 leaf_index,
526 spending_secret: shared_secret,
527 is_transfer: false,
528 received_block: block_number,
529 })
530 }
531
532 #[must_use]
533 pub fn utxos(&self) -> &UtxoStore {
534 &self.utxos
535 }
536
537 pub fn utxos_mut(&mut self) -> &mut UtxoStore {
538 &mut self.utxos
539 }
540
541 #[must_use]
542 pub fn tree(&self) -> &LocalMerkleTree {
543 &self.tree
544 }
545
546 pub fn tree_mut(&mut self) -> &mut LocalMerkleTree {
547 &mut self.tree
548 }
549
550 #[must_use]
551 pub fn keys(&self) -> &KeyRepository {
552 &self.keys
553 }
554
555 pub fn keys_mut(&mut self) -> &mut KeyRepository {
556 &mut self.keys
557 }
558
559 #[must_use]
560 pub fn last_scanned_block(&self) -> u64 {
561 self.last_scanned_block
562 }
563
564 #[must_use]
565 pub fn root(&self) -> U256 {
566 self.tree.root()
567 }
568
569 #[must_use]
570 pub fn balance(&self, asset: Address) -> U256 {
571 self.utxos.get_balance(asset)
572 }
573
574 pub fn advance_keys(&mut self, count: u64) {
575 self.keys.advance_ephemeral_keys(count);
576 self.keys.advance_incoming_keys(count);
577 }
578
579 pub fn process_logs_directly(&mut self, logs: &[Log]) -> Result<ScanResult, ScanError> {
581 let mut result = ScanResult::default();
582
583 info!("Processing {} pre-fetched logs", logs.len());
584
585 for log in logs {
586 let block_number = log.block_number.map_or(0, |b| b.as_u64());
587 self.process_log(log, block_number, &mut result)?;
588 }
589
590 info!(
591 "Direct log processing complete: {} new notes, {} nullifiers spent, {} commitments",
592 result.new_notes.len(),
593 result.spent_nullifiers.len(),
594 result.new_commitments.len()
595 );
596
597 Ok(result)
598 }
599}
600
601#[cfg(test)]
602mod tests {
603 use super::*;
604
605 #[test]
606 fn test_scan_result_default() {
607 let result = ScanResult::default();
608 assert!(result.new_notes.is_empty());
609 assert!(result.spent_nullifiers.is_empty());
610 assert_eq!(result.blocks_processed, 0);
611 }
612}