mod sync;
use std::path::Path;
use std::sync::{Arc, Mutex};
use rkyv::Deserialize;
use dusk_bytes::Serializable;
use dusk_core::BlsScalar;
use dusk_core::Error as ExecutionCoreError;
use dusk_core::signatures::bls::PublicKey as BlsPublicKey;
use dusk_core::stake::{StakeData, StakeFundOwner, StakeKeys};
use dusk_core::transfer::Transaction;
use dusk_core::transfer::moonlight::AccountData;
use dusk_core::transfer::phoenix::{
ArchivedNoteLeaf, Note, NoteLeaf, NoteOpening, Prove,
PublicKey as PhoenixPublicKey,
};
use flume::Receiver;
use futures::executor::block_on;
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tokio::time::{Duration, sleep};
use wallet_core::keys::{
derive_phoenix_pk, derive_phoenix_sk, derive_phoenix_vk,
};
use wallet_core::pick_notes;
use zeroize::Zeroize;
use self::sync::sync_db;
use super::cache::Cache;
use crate::rues::HttpClient as RuesHttpClient;
use crate::store::LocalStore;
use crate::{Address, Error, MAX_PROFILES};
const TRANSFER_CONTRACT: &str =
"0100000000000000000000000000000000000000000000000000000000000000";
const STAKE_CONTRACT: &str =
"0200000000000000000000000000000000000000000000000000000000000000";
const SYNC_INTERVAL_SECONDS: u64 = 3;
pub const TREE_LEAF: usize = std::mem::size_of::<ArchivedNoteLeaf>();
pub struct Prover;
impl Prove for Prover {
fn prove(
&self,
tx_circuit_vec_bytes: &[u8],
) -> Result<Vec<u8>, ExecutionCoreError> {
Ok(tx_circuit_vec_bytes.to_vec())
}
}
pub struct State {
cache: Mutex<Arc<Cache>>,
status: fn(&str),
client: RuesHttpClient,
prover: RuesHttpClient,
store: LocalStore,
pub sync_rx: Option<Receiver<String>>,
sync_shutdown: Option<(Arc<Notify>, JoinHandle<()>)>,
}
impl State {
pub(crate) fn new(
data_dir: &Path,
status: fn(&str),
client: RuesHttpClient,
prover: RuesHttpClient,
store: LocalStore,
) -> Result<Self, Error> {
let cfs = (0..MAX_PROFILES)
.flat_map(|i| {
#[allow(clippy::cast_possible_truncation)]
let pk: PhoenixPublicKey =
derive_phoenix_pk(store.get_seed(), i as u8);
let pk = bs58::encode(pk.to_bytes()).into_string();
[pk.clone(), format!("spent_{pk}")]
})
.collect();
let cache = Mutex::new(Arc::new(Cache::new(data_dir, cfs, status)?));
Ok(Self {
cache,
sync_rx: None,
store,
prover,
status,
client,
sync_shutdown: None,
})
}
pub fn client(&self) -> &RuesHttpClient {
&self.client
}
pub async fn check_connection(&self) -> bool {
self.client.check_connection().await.is_ok()
}
pub(crate) fn cache(&self) -> Arc<Cache> {
let state = self.cache.lock();
match state {
Ok(guard) => Arc::clone(&guard),
Err(poisoned) => Arc::clone(&poisoned.into_inner()),
}
}
pub fn register_sync(&mut self) {
let (sync_tx, sync_rx) = flume::unbounded::<String>();
self.sync_rx = Some(sync_rx);
let cache = self.cache();
let client = self.client.clone();
let mut store = self.store.clone();
let shutdown = Arc::new(Notify::new());
let shutdown_signal = shutdown.clone();
let handle = tokio::spawn(async move {
tracing::debug!("Starting background sync loop");
loop {
tokio::select! {
biased;
() = shutdown_signal.notified() => break,
() = sleep(Duration::from_secs(SYNC_INTERVAL_SECONDS)) => {
let _ = sync_tx.send("Syncing..".to_string());
let _ = match sync_db(&client, &cache, &store, |_| {}).await {
Ok(()) => sync_tx.send("Syncing Complete".to_string()),
Err(e) => sync_tx.send(format!("Error during sync:.. {e}")),
};
}
}
}
store.inner_mut().zeroize();
tracing::debug!("Background sync loop stopped");
});
self.sync_shutdown = Some((shutdown, handle));
}
pub async fn sync(&self) -> Result<(), Error> {
sync_db(&self.client, &self.cache(), &self.store, self.status).await
}
pub async fn prove(&self, tx: Transaction) -> Result<Transaction, Error> {
let prover = &self.prover;
let mut tx = tx;
if let Transaction::Phoenix(utx) = &mut tx {
let status = self.status;
let proof = utx.proof();
status("Attempt to prove tx...");
let proof =
prover.call("prover", None, "prove", proof).await.map_err(
|e| ExecutionCoreError::PhoenixCircuit(e.to_string()),
)?;
utx.set_proof(proof);
status("Proving sucesss!");
}
Ok(tx)
}
pub async fn propagate(
&self,
tx: Transaction,
) -> Result<Transaction, Error> {
let status = self.status;
let tx_bytes = tx.to_var_bytes();
status("Attempt to preverify tx...");
let _ = self
.client
.call("transactions", None, "preverify", &tx_bytes)
.await?;
status("Preverify success!");
status("Propagating tx...");
let _ = self
.client
.call("transactions", None, "propagate", &tx_bytes)
.await?;
status("Transaction propagated!");
Ok(tx)
}
pub(crate) async fn tx_input_notes(
&self,
index: u8,
tx_cost: u64,
) -> Result<Vec<(Note, NoteOpening, BlsScalar)>, Error> {
let vk = derive_phoenix_vk(self.store().get_seed(), index);
let mut sk = derive_phoenix_sk(self.store().get_seed(), index);
let pk = derive_phoenix_pk(self.store().get_seed(), index);
let cached_notes: Vec<_> = self
.cache()
.notes(&pk)
.inspect_err(|_| sk.zeroize())?
.into_iter()
.map(|note_leaf| {
let nullifier = note_leaf.note.gen_nullifier(&sk);
(nullifier, note_leaf)
})
.collect();
sk.zeroize();
let tx_input_notes = pick_notes(&vk, cached_notes.into(), tx_cost);
if tx_input_notes.is_empty() {
return Err(Error::NotEnoughBalance);
}
let mut tx_input = Vec::<(Note, NoteOpening, BlsScalar)>::new();
for (nullifier, note_leaf) in &tx_input_notes {
let opening = self.fetch_opening(note_leaf.as_ref()).await?;
tx_input.push((note_leaf.note.clone(), opening, *nullifier));
}
Ok(tx_input)
}
pub(crate) async fn fetch_account(
&self,
pk: &BlsPublicKey,
) -> Result<AccountData, Error> {
let status = self.status;
status("Fetching account-data...");
let bytes = self
.client
.contract_query::<_, _, 1024>(TRANSFER_CONTRACT, "account", pk)
.await?;
let account: AccountData =
rkyv::check_archived_root::<AccountData>(&bytes)
.map_err(|_| Error::Rkyv)?
.deserialize(&mut rkyv::Infallible)
.unwrap();
status("account-data received!");
Ok(account)
}
pub(crate) fn fetch_notes(
&self,
pk: &PhoenixPublicKey,
) -> Result<Vec<NoteLeaf>, Error> {
self.cache().notes(pk).map(|set| set.into_iter().collect())
}
pub(crate) async fn fetch_root(&self) -> Result<BlsScalar, Error> {
let status = self.status;
status("Fetching root...");
let bytes = self
.client
.contract_query::<(), _, 0>(TRANSFER_CONTRACT, "root", &())
.await?;
let root: BlsScalar = rkyv::check_archived_root::<BlsScalar>(&bytes)
.map_err(|_| Error::Rkyv)?
.deserialize(&mut rkyv::Infallible)
.unwrap();
status("root received!");
Ok(root)
}
pub(crate) async fn fetch_stake(
&self,
pk: &BlsPublicKey,
) -> Result<Option<StakeData>, Error> {
let status = self.status;
status("Fetching stake...");
let bytes = self
.client
.contract_query::<_, _, 1024>(STAKE_CONTRACT, "get_stake", pk)
.await?;
let stake_data: Option<StakeData> =
rkyv::check_archived_root::<Option<StakeData>>(&bytes)
.map_err(|_| Error::Rkyv)?
.deserialize(&mut rkyv::Infallible)
.unwrap();
status("Stake received!");
println!("Staking address: {}", Address::Public(*pk));
Ok(stake_data)
}
pub(crate) async fn fetch_stake_owner(
&self,
pk: &BlsPublicKey,
) -> Result<Option<StakeFundOwner>, Error> {
let status = self.status;
status("Fetching stake owner...");
let bytes = self
.client
.contract_query::<_, _, 1024>(STAKE_CONTRACT, "get_stake_keys", pk)
.await?;
let stake_keys: Option<StakeKeys> =
rkyv::check_archived_root::<Option<StakeKeys>>(&bytes)
.map_err(|_| Error::Rkyv)?
.deserialize(&mut rkyv::Infallible)
.unwrap();
let stake_owner = stake_keys.map(|keys| keys.owner);
Ok(stake_owner)
}
pub(crate) fn store(&self) -> &LocalStore {
&self.store
}
pub(crate) async fn fetch_chain_id(&self) -> Result<u8, Error> {
let status = self.status;
status("Fetching chain_id...");
let bytes = self
.client
.contract_query::<_, _, { u8::SIZE }>(
TRANSFER_CONTRACT,
"chain_id",
&(),
)
.await?;
let chain_id: u8 = rkyv::check_archived_root::<u8>(&bytes)
.map_err(|_| Error::Rkyv)?
.deserialize(&mut rkyv::Infallible)
.unwrap();
status("Chain id received!");
Ok(chain_id)
}
async fn fetch_opening(&self, note: &Note) -> Result<NoteOpening, Error> {
let status = self.status;
status("Fetching note opening...");
let bytes = self
.client
.contract_query::<_, _, 1024>(
TRANSFER_CONTRACT,
"opening",
note.pos(),
)
.await?;
let opening: Option<NoteOpening> =
rkyv::check_archived_root::<Option<NoteOpening>>(&bytes)
.map_err(|_| Error::Rkyv)?
.deserialize(&mut rkyv::Infallible)
.unwrap();
let opening = opening.ok_or(Error::NoteNotFound)?;
status("Note opening received!");
Ok(opening)
}
pub async fn fetch_num_notes(&self) -> Result<u64, Error> {
let status = self.status;
status("Fetching note count...");
let bytes = self
.client
.contract_query::<_, _, { u64::SIZE }>(
TRANSFER_CONTRACT,
"num_notes",
&(),
)
.await?;
let note_count: u64 = rkyv::check_archived_root::<u64>(&bytes)
.map_err(|_| Error::Rkyv)?
.deserialize(&mut rkyv::Infallible)
.unwrap();
status("Latest note count received!");
Ok(note_count)
}
pub fn close(&mut self) {
self.cache().close();
let store = &mut self.store;
if let Some((shutdown, handle)) = self.sync_shutdown.take() {
shutdown.notify_one();
if let Err(e) = block_on(handle) {
eprintln!("Error while closing sync handle: {e}");
}
}
store.inner_mut().zeroize();
}
}