1use alloc::collections::{BTreeMap, BTreeSet};
12use alloc::string::ToString;
13use alloc::vec::Vec;
14
15use miden_protocol::block::BlockNumber;
16use miden_protocol::note::{
17 Note,
18 NoteDetails,
19 NoteFile,
20 NoteId,
21 NoteInclusionProof,
22 NoteMetadata,
23 NoteTag,
24};
25use miden_tx::auth::TransactionAuthenticator;
26
27use crate::rpc::RpcError;
28use crate::rpc::domain::note::FetchedNote;
29use crate::store::input_note_states::ExpectedNoteState;
30use crate::store::{InputNoteRecord, InputNoteState, NoteFilter};
31use crate::sync::NoteTagRecord;
32use crate::{Client, ClientError};
33
34impl<AUTH> Client<AUTH>
36where
37 AUTH: TransactionAuthenticator + Sync + 'static,
38{
39 pub async fn import_notes(
63 &mut self,
64 note_files: &[NoteFile],
65 ) -> Result<Vec<NoteId>, ClientError> {
66 let mut note_ids_map = BTreeMap::new();
67 for note_file in note_files {
68 let id = match ¬e_file {
69 NoteFile::NoteId(id) => *id,
70 NoteFile::NoteDetails { details, .. } => details.id(),
71 NoteFile::NoteWithProof(note, _) => note.id(),
72 };
73 note_ids_map.insert(id, note_file);
74 }
75
76 let note_ids: Vec<NoteId> = note_ids_map.keys().copied().collect();
77 let previous_notes: Vec<InputNoteRecord> =
78 self.get_input_notes(NoteFilter::List(note_ids)).await?;
79 let previous_notes_map: BTreeMap<NoteId, InputNoteRecord> =
80 previous_notes.into_iter().map(|note| (note.id(), note)).collect();
81
82 let mut requests_by_id = BTreeMap::new();
83 let mut requests_by_details = vec![];
84 let mut requests_by_proof = vec![];
85
86 for (note_id, note_file) in note_ids_map {
87 let previous_note = previous_notes_map.get(¬e_id).cloned();
88
89 if let Some(true) = previous_note.as_ref().map(InputNoteRecord::is_processing) {
92 return Err(ClientError::NoteImportError(format!(
93 "Can't overwrite note with id {note_id} as it's currently being processed",
94 )));
95 }
96
97 match note_file.clone() {
98 NoteFile::NoteId(id) => {
99 requests_by_id.insert(id, previous_note);
100 },
101 NoteFile::NoteDetails { details, after_block_num, tag } => {
102 requests_by_details.push((previous_note, details, after_block_num, tag));
103 },
104 NoteFile::NoteWithProof(note, inclusion_proof) => {
105 requests_by_proof.push((previous_note, note, inclusion_proof));
106 },
107 }
108 }
109
110 let mut imported_notes = vec![];
111 if !requests_by_id.is_empty() {
112 let notes_by_id = self.import_note_records_by_id(requests_by_id).await?;
113 imported_notes.extend(notes_by_id.values().cloned());
114 }
115
116 if !requests_by_details.is_empty() {
117 let notes_by_details = self.import_note_records_by_details(requests_by_details).await?;
118 imported_notes.extend(notes_by_details);
119 }
120
121 if !requests_by_proof.is_empty() {
122 let notes_by_proof = self.import_note_records_by_proof(requests_by_proof).await?;
123 imported_notes.extend(notes_by_proof);
124 }
125
126 let mut imported_note_ids = Vec::with_capacity(imported_notes.len());
127 for note in imported_notes.into_iter().flatten() {
128 imported_note_ids.push(note.id());
129 if let InputNoteState::Expected(ExpectedNoteState { tag: Some(tag), .. }) = note.state()
130 {
131 self.insert_note_tag(NoteTagRecord::with_note_source(*tag, note.id())).await?;
132 }
133 self.store.upsert_input_notes(&[note]).await?;
134 }
135
136 Ok(imported_note_ids)
137 }
138
139 async fn import_note_records_by_id(
150 &self,
151 notes: BTreeMap<NoteId, Option<InputNoteRecord>>,
152 ) -> Result<BTreeMap<NoteId, Option<InputNoteRecord>>, ClientError> {
153 let note_ids = notes.keys().copied().collect::<Vec<_>>();
154
155 let fetched_notes =
156 self.rpc_api.get_notes_by_id(¬e_ids).await.map_err(|err| match err {
157 RpcError::NoteNotFound(note_id) => ClientError::NoteNotFoundOnChain(note_id),
158 err => ClientError::RpcError(err),
159 })?;
160
161 if fetched_notes.is_empty() {
162 return Err(ClientError::NoteImportError("No notes fetched from node".to_string()));
163 }
164
165 let mut note_records = BTreeMap::new();
166 let mut notes_to_request = vec![];
167 for fetched_note in fetched_notes {
168 let note_id = fetched_note.id();
169 let inclusion_proof = fetched_note.inclusion_proof().clone();
170
171 let previous_note =
172 notes.get(¬e_id).cloned().ok_or(ClientError::NoteImportError(format!(
173 "Failed to retrieve note with id {note_id} from node"
174 )))?;
175 if let Some(mut previous_note) = previous_note {
176 if previous_note
177 .inclusion_proof_received(inclusion_proof, fetched_note.metadata().clone())?
178 {
179 self.store.remove_note_tag((&previous_note).try_into()?).await?;
180
181 note_records.insert(note_id, Some(previous_note));
182 } else {
183 note_records.insert(note_id, None);
184 }
185 } else {
186 let fetched_note = match fetched_note {
187 FetchedNote::Public(note, _) => note,
188 FetchedNote::Private(..) => {
189 return Err(ClientError::NoteImportError(
190 "Incomplete imported note is private".to_string(),
191 ));
192 },
193 };
194
195 let note_request = (previous_note, fetched_note, inclusion_proof);
196 notes_to_request.push(note_request);
197 }
198 }
199
200 if !notes_to_request.is_empty() {
201 let note_records_by_proof = self.import_note_records_by_proof(notes_to_request).await?;
202 for note_record in note_records_by_proof.iter().flatten().cloned() {
203 note_records.insert(note_record.id(), Some(note_record));
204 }
205 }
206 Ok(note_records)
207 }
208
209 pub(crate) async fn import_note_records_by_proof(
217 &self,
218 requested_notes: Vec<(Option<InputNoteRecord>, Note, NoteInclusionProof)>,
219 ) -> Result<Vec<Option<InputNoteRecord>>, ClientError> {
220 let mut note_records = vec![];
222
223 let mut nullifier_requests = BTreeSet::new();
224 let mut lowest_block_height: BlockNumber = u32::MAX.into();
225 for (previous_note, note, inclusion_proof) in &requested_notes {
226 if let Some(previous_note) = previous_note {
227 nullifier_requests.insert(previous_note.nullifier());
228 if inclusion_proof.location().block_num() < lowest_block_height {
229 lowest_block_height = inclusion_proof.location().block_num();
230 }
231 } else {
232 nullifier_requests.insert(note.nullifier());
233 if inclusion_proof.location().block_num() < lowest_block_height {
234 lowest_block_height = inclusion_proof.location().block_num();
235 }
236 }
237 }
238
239 let nullifier_commit_heights = self
240 .rpc_api
241 .get_nullifier_commit_heights(nullifier_requests, lowest_block_height)
242 .await?;
243
244 for (previous_note, note, inclusion_proof) in requested_notes {
245 let metadata = note.metadata().clone();
246 let mut note_record = previous_note.unwrap_or(InputNoteRecord::new(
247 note.into(),
248 self.store.get_current_timestamp(),
249 ExpectedNoteState {
250 metadata: Some(metadata.clone()),
251 after_block_num: inclusion_proof.location().block_num(),
252 tag: Some(metadata.tag()),
253 }
254 .into(),
255 ));
256
257 if let Some(Some(block_height)) = nullifier_commit_heights.get(¬e_record.nullifier())
258 {
259 if note_record.consumed_externally(note_record.nullifier(), *block_height)? {
260 note_records.push(Some(note_record));
261 }
262
263 note_records.push(None);
264 } else {
265 let block_height = inclusion_proof.location().block_num();
266 let current_block_num = self.get_sync_height().await?;
267
268 let tag = metadata.tag();
269 let mut note_changed =
270 note_record.inclusion_proof_received(inclusion_proof, metadata)?;
271
272 if block_height <= current_block_num {
273 let mut current_partial_mmr = self.store.get_current_partial_mmr().await?;
279
280 let block_header = self
281 .get_and_store_authenticated_block(block_height, &mut current_partial_mmr)
282 .await?;
283
284 note_changed |= note_record.block_header_received(&block_header)?;
285 } else {
286 self.insert_note_tag(NoteTagRecord::with_note_source(tag, note_record.id()))
289 .await?;
290 }
291
292 if note_changed {
293 note_records.push(Some(note_record));
294 } else {
295 note_records.push(None);
296 }
297 }
298 }
299
300 Ok(note_records)
301 }
302
303 async fn import_note_records_by_details(
306 &mut self,
307 requested_notes: Vec<(Option<InputNoteRecord>, NoteDetails, BlockNumber, Option<NoteTag>)>,
308 ) -> Result<Vec<Option<InputNoteRecord>>, ClientError> {
309 let mut lowest_request_block: BlockNumber = u32::MAX.into();
310 let mut note_requests = vec![];
311 for (_, details, after_block_num, tag) in &requested_notes {
312 if let Some(tag) = tag {
313 note_requests.push((details.id(), tag));
314 if after_block_num < &lowest_request_block {
315 lowest_request_block = *after_block_num;
316 }
317 }
318 }
319 let mut committed_notes_data =
320 self.check_expected_notes(lowest_request_block, note_requests).await?;
321
322 let mut note_records = vec![];
323 for (previous_note, details, after_block_num, tag) in requested_notes {
324 let mut note_record = previous_note.unwrap_or({
325 InputNoteRecord::new(
326 details,
327 self.store.get_current_timestamp(),
328 ExpectedNoteState { metadata: None, after_block_num, tag }.into(),
329 )
330 });
331
332 match committed_notes_data.remove(¬e_record.id()) {
333 Some(Some((metadata, inclusion_proof))) => {
334 let mut current_partial_mmr = self.store.get_current_partial_mmr().await?;
338 let block_header = self
339 .get_and_store_authenticated_block(
340 inclusion_proof.location().block_num(),
341 &mut current_partial_mmr,
342 )
343 .await?;
344
345 let tag = metadata.tag();
346 let note_changed =
347 note_record.inclusion_proof_received(inclusion_proof, metadata)?;
348
349 if note_record.block_header_received(&block_header)? | note_changed {
350 self.store
351 .remove_note_tag(NoteTagRecord::with_note_source(tag, note_record.id()))
352 .await?;
353
354 note_records.push(Some(note_record));
355 } else {
356 note_records.push(None);
357 }
358 },
359 _ => {
360 note_records.push(Some(note_record));
361 },
362 }
363 }
364
365 Ok(note_records)
366 }
367
368 async fn check_expected_notes(
372 &mut self,
373 mut request_block_num: BlockNumber,
374 expected_notes: Vec<(NoteId, &NoteTag)>,
376 ) -> Result<BTreeMap<NoteId, Option<(NoteMetadata, NoteInclusionProof)>>, ClientError> {
377 let tracked_tags: BTreeSet<NoteTag> = expected_notes.iter().map(|(_, tag)| **tag).collect();
378 let mut retrieved_proofs = BTreeMap::new();
379 let current_block_num = self.get_sync_height().await?;
380 loop {
381 if request_block_num > current_block_num {
382 break;
383 }
384
385 let sync_notes =
386 self.rpc_api.sync_notes(request_block_num, None, &tracked_tags).await?;
387
388 for sync_note in sync_notes.notes {
389 if !expected_notes.iter().any(|(id, _)| id == sync_note.note_id()) {
390 continue;
391 }
392
393 let note_block_num = sync_notes.block_header.block_num();
396
397 if note_block_num > current_block_num {
398 break;
399 }
400
401 let note_inclusion_proof = NoteInclusionProof::new(
402 note_block_num,
403 sync_note.note_index(),
404 sync_note.inclusion_path().clone(),
405 )?;
406
407 retrieved_proofs.insert(
408 *sync_note.note_id(),
409 Some((sync_note.metadata(), note_inclusion_proof)),
410 );
411 }
412
413 if sync_notes.block_header.block_num() == sync_notes.chain_tip {
415 break;
416 }
417
418 request_block_num = sync_notes.block_header.block_num();
423 }
424 Ok(retrieved_proofs)
425 }
426}