use alloc::collections::{BTreeMap, BTreeSet};
use alloc::string::ToString;
use alloc::vec::Vec;
use miden_protocol::block::BlockNumber;
use miden_protocol::note::{
Note,
NoteDetails,
NoteFile,
NoteId,
NoteInclusionProof,
NoteMetadata,
NoteTag,
};
use miden_tx::auth::TransactionAuthenticator;
use crate::rpc::RpcError;
use crate::rpc::domain::note::FetchedNote;
use crate::store::input_note_states::ExpectedNoteState;
use crate::store::{InputNoteRecord, InputNoteState, NoteFilter};
use crate::sync::NoteTagRecord;
use crate::{Client, ClientError};
impl<AUTH> Client<AUTH>
where
AUTH: TransactionAuthenticator + Sync + 'static,
{
pub async fn import_notes(
&mut self,
note_files: &[NoteFile],
) -> Result<Vec<NoteId>, ClientError> {
let mut note_ids_map = BTreeMap::new();
for note_file in note_files {
let id = match ¬e_file {
NoteFile::NoteId(id) => *id,
NoteFile::NoteDetails { details, .. } => details.id(),
NoteFile::NoteWithProof(note, _) => note.id(),
};
note_ids_map.insert(id, note_file);
}
let note_ids: Vec<NoteId> = note_ids_map.keys().copied().collect();
let previous_notes: Vec<InputNoteRecord> =
self.get_input_notes(NoteFilter::List(note_ids)).await?;
let previous_notes_map: BTreeMap<NoteId, InputNoteRecord> =
previous_notes.into_iter().map(|note| (note.id(), note)).collect();
let mut requests_by_id = BTreeMap::new();
let mut requests_by_details = vec![];
let mut requests_by_proof = vec![];
for (note_id, note_file) in note_ids_map {
let previous_note = previous_notes_map.get(¬e_id).cloned();
if let Some(true) = previous_note.as_ref().map(InputNoteRecord::is_processing) {
return Err(ClientError::NoteImportError(format!(
"Can't overwrite note with id {note_id} as it's currently being processed",
)));
}
match note_file.clone() {
NoteFile::NoteId(id) => {
requests_by_id.insert(id, previous_note);
},
NoteFile::NoteDetails { details, after_block_num, tag } => {
requests_by_details.push((previous_note, details, after_block_num, tag));
},
NoteFile::NoteWithProof(note, inclusion_proof) => {
requests_by_proof.push((previous_note, note, inclusion_proof));
},
}
}
let mut imported_notes = vec![];
if !requests_by_id.is_empty() {
let notes_by_id = self.import_note_records_by_id(requests_by_id).await?;
imported_notes.extend(notes_by_id.values().cloned());
}
if !requests_by_details.is_empty() {
let notes_by_details = self.import_note_records_by_details(requests_by_details).await?;
imported_notes.extend(notes_by_details);
}
if !requests_by_proof.is_empty() {
let notes_by_proof = self.import_note_records_by_proof(requests_by_proof).await?;
imported_notes.extend(notes_by_proof);
}
let mut imported_note_ids = Vec::with_capacity(imported_notes.len());
for note in imported_notes.into_iter().flatten() {
imported_note_ids.push(note.id());
if let InputNoteState::Expected(ExpectedNoteState { tag: Some(tag), .. }) = note.state()
{
self.insert_note_tag(NoteTagRecord::with_note_source(*tag, note.id())).await?;
}
self.store.upsert_input_notes(&[note]).await?;
}
Ok(imported_note_ids)
}
async fn import_note_records_by_id(
&self,
notes: BTreeMap<NoteId, Option<InputNoteRecord>>,
) -> Result<BTreeMap<NoteId, Option<InputNoteRecord>>, ClientError> {
let note_ids = notes.keys().copied().collect::<Vec<_>>();
let fetched_notes =
self.rpc_api.get_notes_by_id(¬e_ids).await.map_err(|err| match err {
RpcError::NoteNotFound(note_id) => ClientError::NoteNotFoundOnChain(note_id),
err => ClientError::RpcError(err),
})?;
if fetched_notes.is_empty() {
return Err(ClientError::NoteImportError("No notes fetched from node".to_string()));
}
let mut note_records = BTreeMap::new();
let mut notes_to_request = vec![];
for fetched_note in fetched_notes {
let note_id = fetched_note.id();
let inclusion_proof = fetched_note.inclusion_proof().clone();
let previous_note =
notes.get(¬e_id).cloned().ok_or(ClientError::NoteImportError(format!(
"Failed to retrieve note with id {note_id} from node"
)))?;
if let Some(mut previous_note) = previous_note {
if previous_note
.inclusion_proof_received(inclusion_proof, fetched_note.metadata().clone())?
{
self.store.remove_note_tag((&previous_note).try_into()?).await?;
note_records.insert(note_id, Some(previous_note));
} else {
note_records.insert(note_id, None);
}
} else {
let fetched_note = match fetched_note {
FetchedNote::Public(note, _) => note,
FetchedNote::Private(..) => {
return Err(ClientError::NoteImportError(
"Incomplete imported note is private".to_string(),
));
},
};
let note_request = (previous_note, fetched_note, inclusion_proof);
notes_to_request.push(note_request);
}
}
if !notes_to_request.is_empty() {
let note_records_by_proof = self.import_note_records_by_proof(notes_to_request).await?;
for note_record in note_records_by_proof.iter().flatten().cloned() {
note_records.insert(note_record.id(), Some(note_record));
}
}
Ok(note_records)
}
pub(crate) async fn import_note_records_by_proof(
&self,
requested_notes: Vec<(Option<InputNoteRecord>, Note, NoteInclusionProof)>,
) -> Result<Vec<Option<InputNoteRecord>>, ClientError> {
let mut note_records = vec![];
let mut nullifier_requests = BTreeSet::new();
let mut lowest_block_height: BlockNumber = u32::MAX.into();
for (previous_note, note, inclusion_proof) in &requested_notes {
if let Some(previous_note) = previous_note {
nullifier_requests.insert(previous_note.nullifier());
if inclusion_proof.location().block_num() < lowest_block_height {
lowest_block_height = inclusion_proof.location().block_num();
}
} else {
nullifier_requests.insert(note.nullifier());
if inclusion_proof.location().block_num() < lowest_block_height {
lowest_block_height = inclusion_proof.location().block_num();
}
}
}
let nullifier_commit_heights = self
.rpc_api
.get_nullifier_commit_heights(nullifier_requests, lowest_block_height)
.await?;
for (previous_note, note, inclusion_proof) in requested_notes {
let metadata = note.metadata().clone();
let mut note_record = previous_note.unwrap_or(InputNoteRecord::new(
note.into(),
self.store.get_current_timestamp(),
ExpectedNoteState {
metadata: Some(metadata.clone()),
after_block_num: inclusion_proof.location().block_num(),
tag: Some(metadata.tag()),
}
.into(),
));
if let Some(Some(block_height)) = nullifier_commit_heights.get(¬e_record.nullifier())
{
if note_record.consumed_externally(note_record.nullifier(), *block_height, None)? {
note_records.push(Some(note_record));
}
note_records.push(None);
} else {
let block_height = inclusion_proof.location().block_num();
let current_block_num = self.get_sync_height().await?;
let tag = metadata.tag();
let mut note_changed =
note_record.inclusion_proof_received(inclusion_proof, metadata)?;
if block_height <= current_block_num {
let mut current_partial_mmr = self.store.get_current_partial_mmr().await?;
let block_header = self
.get_and_store_authenticated_block(block_height, &mut current_partial_mmr)
.await?;
note_changed |= note_record.block_header_received(&block_header)?;
} else {
self.insert_note_tag(NoteTagRecord::with_note_source(tag, note_record.id()))
.await?;
}
if note_changed {
note_records.push(Some(note_record));
} else {
note_records.push(None);
}
}
}
Ok(note_records)
}
async fn import_note_records_by_details(
&mut self,
requested_notes: Vec<(Option<InputNoteRecord>, NoteDetails, BlockNumber, Option<NoteTag>)>,
) -> Result<Vec<Option<InputNoteRecord>>, ClientError> {
let mut lowest_request_block: BlockNumber = u32::MAX.into();
let mut note_requests = vec![];
for (_, details, after_block_num, tag) in &requested_notes {
if let Some(tag) = tag {
note_requests.push((details.id(), tag));
if after_block_num < &lowest_request_block {
lowest_request_block = *after_block_num;
}
}
}
let mut committed_notes_data =
self.check_expected_notes(lowest_request_block, note_requests).await?;
let mut note_records = vec![];
for (previous_note, details, after_block_num, tag) in requested_notes {
let mut note_record = previous_note.unwrap_or({
InputNoteRecord::new(
details,
self.store.get_current_timestamp(),
ExpectedNoteState { metadata: None, after_block_num, tag }.into(),
)
});
match committed_notes_data.remove(¬e_record.id()) {
Some(Some((metadata, inclusion_proof))) => {
let mut current_partial_mmr = self.store.get_current_partial_mmr().await?;
let block_header = self
.get_and_store_authenticated_block(
inclusion_proof.location().block_num(),
&mut current_partial_mmr,
)
.await?;
let tag = metadata.tag();
let note_changed =
note_record.inclusion_proof_received(inclusion_proof, metadata)?;
if note_record.block_header_received(&block_header)? | note_changed {
self.store
.remove_note_tag(NoteTagRecord::with_note_source(tag, note_record.id()))
.await?;
note_records.push(Some(note_record));
} else {
note_records.push(None);
}
},
_ => {
note_records.push(Some(note_record));
},
}
}
Ok(note_records)
}
async fn check_expected_notes(
&mut self,
request_block_num: BlockNumber,
expected_notes: Vec<(NoteId, &NoteTag)>,
) -> Result<BTreeMap<NoteId, Option<(NoteMetadata, NoteInclusionProof)>>, ClientError> {
let tracked_tags: BTreeSet<NoteTag> = expected_notes.iter().map(|(_, tag)| **tag).collect();
let mut retrieved_proofs = BTreeMap::new();
let current_block_num = self.get_sync_height().await?;
if request_block_num > current_block_num {
return Ok(retrieved_proofs);
}
let sync_result = self
.rpc_api
.sync_notes_with_details(request_block_num, Some(current_block_num), &tracked_tags)
.await
.map_err(ClientError::RpcError)?;
for block in &sync_result.blocks {
if block.block_header.block_num() > current_block_num {
break;
}
for sync_note in block.notes.values() {
if !expected_notes.iter().any(|(id, _)| id == sync_note.note_id()) {
continue;
}
let metadata = sync_note
.metadata()
.cloned()
.expect("metadata should be available after sync_notes_with_details");
retrieved_proofs.insert(
*sync_note.note_id(),
Some((metadata, sync_note.inclusion_proof().clone())),
);
}
}
retrieved_proofs
.into_iter()
.map(|(note_id, data)| Ok((note_id, data)))
.collect()
}
}