1use alloc::collections::{BTreeMap, BTreeSet};
12use alloc::string::ToString;
13use alloc::vec::Vec;
14
15use miden_protocol::block::BlockNumber;
16use miden_protocol::note::{
17 Note,
18 NoteDetails,
19 NoteFile,
20 NoteId,
21 NoteInclusionProof,
22 NoteMetadata,
23 NoteTag,
24};
25use miden_tx::auth::TransactionAuthenticator;
26
27use crate::rpc::RpcError;
28use crate::rpc::domain::note::FetchedNote;
29use crate::store::input_note_states::ExpectedNoteState;
30use crate::store::{InputNoteRecord, InputNoteState, NoteFilter};
31use crate::sync::NoteTagRecord;
32use crate::{Client, ClientError};
33
34impl<AUTH> Client<AUTH>
36where
37 AUTH: TransactionAuthenticator + Sync + 'static,
38{
39 pub async fn import_notes(
64 &mut self,
65 note_files: &[NoteFile],
66 ) -> Result<Vec<NoteId>, ClientError> {
67 let mut note_ids_map = BTreeMap::new();
68 for note_file in note_files {
69 let id = match ¬e_file {
70 NoteFile::NoteId(id) => *id,
71 NoteFile::NoteDetails { details, .. } => details.id(),
72 NoteFile::NoteWithProof(note, _) => note.id(),
73 };
74 note_ids_map.insert(id, note_file);
75 }
76
77 let note_ids: Vec<NoteId> = note_ids_map.keys().copied().collect();
78 let previous_notes: Vec<InputNoteRecord> =
79 self.get_input_notes(NoteFilter::List(note_ids)).await?;
80 let previous_notes_map: BTreeMap<NoteId, InputNoteRecord> =
81 previous_notes.into_iter().map(|note| (note.id(), note)).collect();
82
83 let mut requests_by_id = BTreeMap::new();
84 let mut requests_by_details = vec![];
85 let mut requests_by_proof = vec![];
86
87 for (note_id, note_file) in note_ids_map {
88 let previous_note = previous_notes_map.get(¬e_id).cloned();
89
90 if let Some(true) = previous_note.as_ref().map(InputNoteRecord::is_processing) {
93 return Err(ClientError::NoteImportError(format!(
94 "Can't overwrite note with id {note_id} as it's currently being processed",
95 )));
96 }
97
98 match note_file.clone() {
99 NoteFile::NoteId(id) => {
100 requests_by_id.insert(id, previous_note);
101 },
102 NoteFile::NoteDetails { details, after_block_num, tag } => {
103 requests_by_details.push((previous_note, details, after_block_num, tag));
104 },
105 NoteFile::NoteWithProof(note, inclusion_proof) => {
106 requests_by_proof.push((previous_note, note, inclusion_proof));
107 },
108 }
109 }
110
111 let mut imported_notes = vec![];
112 if !requests_by_id.is_empty() {
113 let notes_by_id = self.import_note_records_by_id(requests_by_id).await?;
114 imported_notes.extend(notes_by_id.values().cloned());
115 }
116
117 if !requests_by_details.is_empty() {
118 let notes_by_details = self.import_note_records_by_details(requests_by_details).await?;
119 imported_notes.extend(notes_by_details);
120 }
121
122 if !requests_by_proof.is_empty() {
123 let notes_by_proof = self.import_note_records_by_proof(requests_by_proof).await?;
124 imported_notes.extend(notes_by_proof);
125 }
126
127 let mut imported_note_ids = Vec::with_capacity(imported_notes.len());
128 for note in imported_notes.into_iter().flatten() {
129 imported_note_ids.push(note.id());
130 if let InputNoteState::Expected(ExpectedNoteState { tag: Some(tag), .. }) = note.state()
131 {
132 self.insert_note_tag(NoteTagRecord::with_note_source(*tag, note.id())).await?;
133 }
134 self.store.upsert_input_notes(&[note]).await?;
135 }
136
137 Ok(imported_note_ids)
138 }
139
140 async fn import_note_records_by_id(
151 &self,
152 notes: BTreeMap<NoteId, Option<InputNoteRecord>>,
153 ) -> Result<BTreeMap<NoteId, Option<InputNoteRecord>>, ClientError> {
154 let note_ids = notes.keys().copied().collect::<Vec<_>>();
155
156 let fetched_notes =
157 self.rpc_api.get_notes_by_id(¬e_ids).await.map_err(|err| match err {
158 RpcError::NoteNotFound(note_id) => ClientError::NoteNotFoundOnChain(note_id),
159 err => ClientError::RpcError(err),
160 })?;
161
162 if fetched_notes.is_empty() {
163 return Err(ClientError::NoteImportError("No notes fetched from node".to_string()));
164 }
165
166 let mut note_records = BTreeMap::new();
167 let mut notes_to_request = vec![];
168 for fetched_note in fetched_notes {
169 let note_id = fetched_note.id();
170 let inclusion_proof = fetched_note.inclusion_proof().clone();
171
172 let previous_note =
173 notes.get(¬e_id).cloned().ok_or(ClientError::NoteImportError(format!(
174 "Failed to retrieve note with id {note_id} from node"
175 )))?;
176 if let Some(mut previous_note) = previous_note {
177 if previous_note
178 .inclusion_proof_received(inclusion_proof, fetched_note.metadata().clone())?
179 {
180 self.store.remove_note_tag((&previous_note).try_into()?).await?;
181
182 note_records.insert(note_id, Some(previous_note));
183 } else {
184 note_records.insert(note_id, None);
185 }
186 } else {
187 let fetched_note = match fetched_note {
188 FetchedNote::Public(note, _) => note,
189 FetchedNote::Private(..) => {
190 return Err(ClientError::NoteImportError(
191 "Incomplete imported note is private".to_string(),
192 ));
193 },
194 };
195
196 let note_request = (previous_note, fetched_note, inclusion_proof);
197 notes_to_request.push(note_request);
198 }
199 }
200
201 if !notes_to_request.is_empty() {
202 let note_records_by_proof = self.import_note_records_by_proof(notes_to_request).await?;
203 for note_record in note_records_by_proof.iter().flatten().cloned() {
204 note_records.insert(note_record.id(), Some(note_record));
205 }
206 }
207 Ok(note_records)
208 }
209
210 pub(crate) async fn import_note_records_by_proof(
218 &self,
219 requested_notes: Vec<(Option<InputNoteRecord>, Note, NoteInclusionProof)>,
220 ) -> Result<Vec<Option<InputNoteRecord>>, ClientError> {
221 let mut note_records = vec![];
223
224 let mut nullifier_requests = BTreeSet::new();
225 let mut lowest_block_height: BlockNumber = u32::MAX.into();
226 for (previous_note, note, inclusion_proof) in &requested_notes {
227 if let Some(previous_note) = previous_note {
228 nullifier_requests.insert(previous_note.nullifier());
229 if inclusion_proof.location().block_num() < lowest_block_height {
230 lowest_block_height = inclusion_proof.location().block_num();
231 }
232 } else {
233 nullifier_requests.insert(note.nullifier());
234 if inclusion_proof.location().block_num() < lowest_block_height {
235 lowest_block_height = inclusion_proof.location().block_num();
236 }
237 }
238 }
239
240 let nullifier_commit_heights = self
241 .rpc_api
242 .get_nullifier_commit_heights(nullifier_requests, lowest_block_height)
243 .await?;
244
245 for (previous_note, note, inclusion_proof) in requested_notes {
246 let metadata = note.metadata().clone();
247 let mut note_record = previous_note.unwrap_or(InputNoteRecord::new(
248 note.into(),
249 self.store.get_current_timestamp(),
250 ExpectedNoteState {
251 metadata: Some(metadata.clone()),
252 after_block_num: inclusion_proof.location().block_num(),
253 tag: Some(metadata.tag()),
254 }
255 .into(),
256 ));
257
258 if let Some(Some(block_height)) = nullifier_commit_heights.get(¬e_record.nullifier())
259 {
260 if note_record.consumed_externally(note_record.nullifier(), *block_height)? {
261 note_records.push(Some(note_record));
262 }
263
264 note_records.push(None);
265 } else {
266 let block_height = inclusion_proof.location().block_num();
267 let current_block_num = self.get_sync_height().await?;
268
269 let mut note_changed =
270 note_record.inclusion_proof_received(inclusion_proof, metadata.clone())?;
271
272 if block_height <= current_block_num {
273 let mut current_partial_mmr = self.store.get_current_partial_mmr().await?;
279
280 let block_header = self
281 .get_and_store_authenticated_block(block_height, &mut current_partial_mmr)
282 .await?;
283
284 note_changed |= note_record.block_header_received(&block_header)?;
285 } else {
286 self.insert_note_tag(NoteTagRecord::with_note_source(
289 metadata.tag(),
290 note_record.id(),
291 ))
292 .await?;
293 }
294
295 if note_changed {
296 note_records.push(Some(note_record));
297 } else {
298 note_records.push(None);
299 }
300 }
301 }
302
303 Ok(note_records)
304 }
305
306 async fn import_note_records_by_details(
309 &mut self,
310 requested_notes: Vec<(Option<InputNoteRecord>, NoteDetails, BlockNumber, Option<NoteTag>)>,
311 ) -> Result<Vec<Option<InputNoteRecord>>, ClientError> {
312 let mut lowest_request_block: BlockNumber = u32::MAX.into();
313 let mut note_requests = vec![];
314 for (_, details, after_block_num, tag) in &requested_notes {
315 if let Some(tag) = tag {
316 note_requests.push((details.id(), tag));
317 if after_block_num < &lowest_request_block {
318 lowest_request_block = *after_block_num;
319 }
320 }
321 }
322 let mut committed_notes_data =
323 self.check_expected_notes(lowest_request_block, note_requests).await?;
324
325 let mut note_records = vec![];
326 for (previous_note, details, after_block_num, tag) in requested_notes {
327 let mut note_record = previous_note.unwrap_or({
328 InputNoteRecord::new(
329 details,
330 self.store.get_current_timestamp(),
331 ExpectedNoteState { metadata: None, after_block_num, tag }.into(),
332 )
333 });
334
335 match committed_notes_data.remove(¬e_record.id()) {
336 Some(Some((metadata, inclusion_proof))) => {
337 let mut current_partial_mmr = self.store.get_current_partial_mmr().await?;
341 let block_header = self
342 .get_and_store_authenticated_block(
343 inclusion_proof.location().block_num(),
344 &mut current_partial_mmr,
345 )
346 .await?;
347
348 let note_changed =
349 note_record.inclusion_proof_received(inclusion_proof, metadata.clone())?;
350
351 if note_record.block_header_received(&block_header)? | note_changed {
352 self.store
353 .remove_note_tag(NoteTagRecord::with_note_source(
354 metadata.tag(),
355 note_record.id(),
356 ))
357 .await?;
358
359 note_records.push(Some(note_record));
360 } else {
361 note_records.push(None);
362 }
363 },
364 _ => {
365 note_records.push(Some(note_record));
366 },
367 }
368 }
369
370 Ok(note_records)
371 }
372
373 async fn check_expected_notes(
377 &mut self,
378 mut request_block_num: BlockNumber,
379 expected_notes: Vec<(NoteId, &NoteTag)>,
381 ) -> Result<BTreeMap<NoteId, Option<(NoteMetadata, NoteInclusionProof)>>, ClientError> {
382 let tracked_tags: BTreeSet<NoteTag> = expected_notes.iter().map(|(_, tag)| **tag).collect();
383 let mut retrieved_proofs = BTreeMap::new();
384 let current_block_num = self.get_sync_height().await?;
385 loop {
386 if request_block_num > current_block_num {
387 break;
388 }
389
390 let sync_notes =
391 self.rpc_api.sync_notes(request_block_num, None, &tracked_tags).await?;
392
393 for sync_note in sync_notes.notes {
394 if !expected_notes.iter().any(|(id, _)| id == sync_note.note_id()) {
395 continue;
396 }
397
398 let note_block_num = sync_notes.block_header.block_num();
401
402 if note_block_num > current_block_num {
403 break;
404 }
405
406 let note_inclusion_proof = NoteInclusionProof::new(
407 note_block_num,
408 sync_note.note_index(),
409 sync_note.inclusion_path().clone(),
410 )?;
411
412 retrieved_proofs.insert(
413 *sync_note.note_id(),
414 Some((sync_note.metadata(), note_inclusion_proof)),
415 );
416 }
417
418 if sync_notes.block_header.block_num() == sync_notes.chain_tip {
420 break;
421 }
422
423 request_block_num = sync_notes.block_header.block_num();
428 }
429 Ok(retrieved_proofs)
430 }
431}