use crate::error::Error;
use crate::events::{SyncCallback, SyncEvent};
use crate::index::master_index::MasterIndex;
use crate::index::{PadInfo, PadStatus};
use crate::internal_events::invoke_sync_callback;
use crate::network::client::Config;
use crate::network::{Network, NetworkError};
use crate::ops::utils::derive_master_index_info;
use ant_networking::GetRecordError;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use mutant_protocol::SyncResult;
use super::DATA_ENCODING_MASTER_INDEX;
pub(super) async fn sync(
index: Arc<RwLock<MasterIndex>>,
network: Arc<Network>,
force: bool,
sync_callback: Option<SyncCallback>,
) -> Result<SyncResult, Error> {
let mut sync_result = SyncResult {
nb_keys_added: 0,
nb_keys_updated: 0,
nb_free_pads_added: 0,
nb_pending_pads_added: 0,
};
let callback = sync_callback.clone();
invoke_sync_callback(&callback, SyncEvent::FetchingRemoteIndex)
.await
.unwrap();
let owner_secret_key_data = network.secret_key();
let (owner_address, owner_secret_key) =
derive_master_index_info(&owner_secret_key_data.to_hex())?;
let client_get = network
.get_client(Config::Get)
.await
.map_err(|e| Error::Network(NetworkError::ClientAccessError(e.to_string())))?;
let (remote_index, remote_index_counter) = match network
.get(&client_get, &owner_address, Some(&owner_secret_key))
.await
{
Ok(get_result) => {
let remote_index = if force {
MasterIndex::new(network.network_choice())
} else {
serde_cbor::from_slice(&get_result.data).unwrap()
};
(remote_index, get_result.counter)
}
Err(_e) => (MasterIndex::new(network.network_choice()), 0),
};
invoke_sync_callback(&callback, SyncEvent::Merging)
.await
.unwrap();
let mut local_index = index.write().await;
if !force {
for (key, remote_entry) in remote_index.list() {
let local_entry = local_index.get_entry(&key);
if local_entry.is_none() {
local_index.add_entry(&key, remote_entry.clone())?; sync_result.nb_keys_added += 1;
} else {
if local_index.update_entry(&key, remote_entry.clone())? {
sync_result.nb_keys_updated += 1;
}
}
}
let mut free_pads_to_add = Vec::new();
let mut pending_pads_to_add = Vec::new();
for pad in remote_index.export_raw_pads_private_key()? {
if local_index.pad_exists(&pad.address) {
continue;
}
if pad.status == PadStatus::Generated {
pending_pads_to_add.push(pad);
sync_result.nb_pending_pads_added += 1;
} else {
free_pads_to_add.push(pad);
sync_result.nb_free_pads_added += 1;
}
}
local_index.import_raw_pads_private_key(free_pads_to_add)?;
local_index.import_raw_pads_private_key(pending_pads_to_add)?;
}
let serialized_index = serde_cbor::to_vec(&*local_index).unwrap(); drop(local_index);
let client_put = network
.get_client(Config::Put)
.await
.map_err(|e| Error::Network(NetworkError::ClientAccessError(e.to_string())))?;
invoke_sync_callback(&callback, SyncEvent::PushingRemoteIndex)
.await
.unwrap();
let pad_info = PadInfo {
address: owner_address,
status: PadStatus::Confirmed,
chunk_index: 0,
size: serialized_index.len(),
last_known_counter: remote_index_counter + 1,
sk_bytes: owner_secret_key.to_bytes().to_vec(),
checksum: 0,
};
network
.put(
&client_put,
&pad_info,
&serialized_index,
DATA_ENCODING_MASTER_INDEX,
false,
)
.await?;
let client_verify = network
.get_client(Config::Get)
.await
.map_err(|e| Error::Network(NetworkError::ClientAccessError(e.to_string())))?;
invoke_sync_callback(&callback, SyncEvent::VerifyingRemoteIndex)
.await
.unwrap();
let mut retries = 20;
loop {
match network
.get(&client_verify, &owner_address, Some(&owner_secret_key))
.await
{
Ok(get_result) => {
if get_result.data != serialized_index {
} else if get_result.counter != remote_index_counter + 1 {
} else {
break Ok(());
}
}
Err(_e) => {}
};
if retries == 0 {
break Err(Error::Network(
NetworkError::GetError(GetRecordError::RecordNotFound).into(),
));
}
retries -= 1;
tokio::time::sleep(Duration::from_secs(1)).await;
}?;
invoke_sync_callback(&callback, SyncEvent::Complete)
.await
.unwrap();
Ok(sync_result)
}