use super::{inspect_record::OutboundInspectValueResult, *};
/// Outcome of one rehydration pass over a single record, as returned by
/// `rehydrate_all_subkeys` and `rehydrate_required_subkeys`.
// NOTE(review): the fields are private and nothing visible here reads them
// back; presumably the report is only consumed through its `Debug` output
// (the rehydrate functions are instrumented with `ret`), which would be why
// `dead_code` is allowed — confirm.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct RehydrateReport {
/// Key of the record the pass was run for.
opaque_record_key: OpaqueRecordKey,
/// Subkey set the pass was run over. `rehydrate_record` substitutes the
/// full set when it was asked with an empty one.
subkeys: ValueSubkeyRangeSet,
/// Consensus count that was supplied for the pass.
consensus_count: usize,
/// Subkeys for which `add_offline_subkey_write` was called during the pass.
rehydrated: ValueSubkeyRangeSet,
}
/// A pending request to rehydrate part of a record. Requests are stored per
/// record key in `rehydration_requests` by
/// `StorageManager::add_rehydration_request`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(super) struct RehydrationRequest {
/// Subkeys to rehydrate. When another request for the same record is added,
/// the two subkey sets are unioned.
pub subkeys: ValueSubkeyRangeSet,
/// Consensus count for the request. When another request for the same record
/// is added, the two counts are combined with `max_assign`.
pub consensus_count: usize,
}
impl StorageManager {
#[cfg_attr(
feature = "instrument",
instrument(level = "trace", target = "stor", skip_all)
)]
pub fn add_rehydration_request(
&self,
opaque_record_key: OpaqueRecordKey,
subkeys: ValueSubkeyRangeSet,
consensus_count: usize,
) {
let req = RehydrationRequest {
subkeys,
consensus_count,
};
veilid_log!(self debug "Adding rehydration request: {} {:?}", opaque_record_key, req);
let mut inner = self.inner.lock();
inner
.rehydration_requests
.entry(opaque_record_key)
.and_modify(|r| {
r.subkeys = r.subkeys.union(&req.subkeys);
r.consensus_count.max_assign(req.consensus_count);
})
.or_insert(req);
}
#[cfg_attr(
feature = "instrument",
instrument(level = "trace", target = "stor", skip(self), ret)
)]
pub(super) async fn rehydrate_record(
&self,
opaque_record_key: OpaqueRecordKey,
subkeys: ValueSubkeyRangeSet,
consensus_count: usize,
) -> VeilidAPIResult<RehydrateReport> {
let Ok(_guard) = self.startup_lock.enter() else {
apibail_not_initialized!();
};
let local_record_store = self.get_local_record_store()?;
veilid_log!(self debug "Checking for record rehydration: {} {} @ consensus {}", opaque_record_key, subkeys, consensus_count);
let subkeys = if subkeys.is_empty() {
ValueSubkeyRangeSet::full()
} else {
subkeys
};
let peek_lock = self
.record_lock_table
.peek_lock(opaque_record_key.clone())
.await;
let safety_selection = {
let inner = self.inner.lock();
if inner
.outbound_transaction_manager
.get_transaction_by_record(&opaque_record_key)
.is_some()
{
apibail_try_again!("not rehydrating while records is in transaction");
}
if let Some(opened_record) = inner.opened_records.get(&opaque_record_key) {
opened_record.safety_selection()
} else {
let Some(safety_selection) = local_record_store
.with_record(&opaque_record_key, |rec| {
rec.detail().safety_selection.clone()
})?
else {
apibail_key_not_found!(opaque_record_key);
};
safety_selection
}
};
let local_inspect_result = self
.handle_inspect_local_values_with_peek_lock(&peek_lock, subkeys.clone(), true)
.await?;
if !self.dht_is_online() {
apibail_try_again!("offline, try again later");
};
let local_inspect_result = local_inspect_result.strip_none_seqs();
let outbound_inspect_result = self
.outbound_inspect_value(
&opaque_record_key,
local_inspect_result.subkeys().clone(),
safety_selection.clone(),
InspectResult::default(),
true,
)
.await?;
let results_iter = outbound_inspect_result
.inspect_result
.subkeys()
.iter()
.map(ValueSubkeyRangeSet::single)
.zip(
outbound_inspect_result
.subkey_fanout_results
.clone()
.into_iter(),
);
let existed = self.process_fanout_results(
opaque_record_key.clone(),
results_iter,
false,
self.config().network.dht.consensus_width as usize,
)?;
if !existed {
apibail_internal!(
"record was locked for inspect but is now missing: {}",
opaque_record_key
);
}
if outbound_inspect_result.inspect_result.subkeys().is_empty()
|| outbound_inspect_result
.inspect_result
.opt_descriptor()
.is_none()
{
return self.rehydrate_all_subkeys(
opaque_record_key,
subkeys,
consensus_count,
safety_selection,
local_inspect_result,
);
}
self.rehydrate_required_subkeys(
opaque_record_key,
subkeys,
consensus_count,
safety_selection,
local_inspect_result,
outbound_inspect_result,
)
}
/// Rehydrate every locally-held subkey that has a sequence number.
///
/// `rehydrate_record` calls this when the network inspect returned no subkeys
/// or no descriptor. For each subkey in `local_inspect_result` whose sequence
/// number is present, `add_offline_subkey_write` is called with the given
/// `safety_selection` and the subkey is recorded in the report's `rehydrated`
/// set. `subkeys` and `consensus_count` are only logged and echoed back in the
/// report.
#[cfg_attr(
    feature = "instrument",
    instrument(level = "trace", target = "stor", skip(self), ret, err)
)]
pub(super) fn rehydrate_all_subkeys(
    &self,
    opaque_record_key: OpaqueRecordKey,
    subkeys: ValueSubkeyRangeSet,
    consensus_count: usize,
    safety_selection: SafetySelection,
    local_inspect_result: InspectResult,
) -> VeilidAPIResult<RehydrateReport> {
    veilid_log!(self debug "Rehydrating all subkeys: record={} subkeys={}", opaque_record_key, subkeys);

    // Sequence numbers are matched to subkeys by position.
    let local_seqs = local_inspect_result.seqs();
    let subkeys_with_data = local_inspect_result
        .subkeys()
        .iter()
        .enumerate()
        .filter(|(n, _)| local_seqs[*n].is_some())
        .map(|(_, subkey)| subkey);

    let mut rehydrated = ValueSubkeyRangeSet::new();
    for subkey in subkeys_with_data {
        self.add_offline_subkey_write(
            opaque_record_key.clone(),
            subkey,
            safety_selection.clone(),
        );
        rehydrated.insert(subkey);
    }

    if !rehydrated.is_empty() {
        veilid_log!(self debug "Record full rehydrating: record={} subkeys={} rehydrated={}", opaque_record_key, subkeys, rehydrated);
    } else {
        veilid_log!(self debug "Record wanted full rehydrating, but no subkey data available: record={} subkeys={}", opaque_record_key, subkeys);
    }

    Ok(RehydrateReport {
        opaque_record_key,
        subkeys,
        consensus_count,
        rehydrated,
    })
}
/// Rehydrate only the locally-held subkeys that the network is behind on.
///
/// A subkey from `local_inspect_result` is rehydrated (via
/// `add_offline_subkey_write`) when either
/// * its local sequence number is greater than the one the network reported
///   at the same position, or
/// * fewer than `consensus_count` nodes are listed in `consensus_nodes` of
///   its fanout result.
///
/// Network results are matched to local subkeys by position. If the network
/// result holds fewer entries than the local one, the missing entries are
/// treated as "not on the network" (no sequence number, no consensus), so the
/// subkey is rehydrated instead of panicking on an out-of-range index.
///
/// After the loop the per-subkey fanout results are handed to
/// `process_fanout_results`; if it reports the record no longer exists this
/// is only logged.
///
/// # Errors
/// Returns an internal error if a local subkey has no sequence number (the
/// caller is expected to have applied `strip_none_seqs()`), and propagates
/// any error from `process_fanout_results`.
#[cfg_attr(
    feature = "instrument",
    instrument(level = "trace", target = "stor", skip(self), ret, err)
)]
pub(super) fn rehydrate_required_subkeys(
    &self,
    opaque_record_key: OpaqueRecordKey,
    subkeys: ValueSubkeyRangeSet,
    consensus_count: usize,
    safety_selection: SafetySelection,
    local_inspect_result: InspectResult,
    outbound_inspect_result: OutboundInspectValueResult,
) -> VeilidAPIResult<RehydrateReport> {
    let network_seqs = outbound_inspect_result.inspect_result.seqs();

    let mut rehydrated = ValueSubkeyRangeSet::new();
    for (n, subkey) in local_inspect_result.subkeys().iter().enumerate() {
        let local_seq = local_inspect_result.seqs()[n];
        if local_seq.is_none() {
            apibail_internal!("None sequence number found in local inspect results. Should have been stripped by strip_none_seqs(): {:?}", local_inspect_result);
        };

        // Checked access: a network result shorter than the local one must not
        // panic. A missing entry compares as `None`, which is less than any
        // present local sequence number.
        let network_seq = network_seqs.get(n).copied().flatten();

        let rehydrate = if local_seq > network_seq {
            // The local copy is newer than what the network holds.
            true
        } else {
            // Same or newer on the network: rehydrate only if consensus is
            // too thin. A missing fanout result counts as no consensus.
            match outbound_inspect_result.subkey_fanout_results.get(n) {
                Some(sfr) => sfr.consensus_nodes.len() < consensus_count,
                None => true,
            }
        };

        if rehydrate {
            self.add_offline_subkey_write(
                opaque_record_key.clone(),
                subkey,
                safety_selection.clone(),
            );
            rehydrated.insert(subkey);
        }
    }

    if rehydrated.is_empty() {
        veilid_log!(self debug "Record did not need rehydrating: record={} local_subkeys={}", opaque_record_key, local_inspect_result.subkeys());
    } else {
        veilid_log!(self debug "Record rehydrating: record={} local_subkeys={} rehydrated={}", opaque_record_key, local_inspect_result.subkeys(), rehydrated);
    }

    // Pair each inspected subkey (as a single-subkey range) with its fanout
    // result; the fanout results are consumed here.
    let results_iter = outbound_inspect_result
        .inspect_result
        .subkeys()
        .iter()
        .map(ValueSubkeyRangeSet::single)
        .zip(outbound_inspect_result.subkey_fanout_results);

    let existed = self.process_fanout_results(
        opaque_record_key.clone(),
        results_iter,
        false,
        self.config().network.dht.consensus_width as usize,
    )?;
    if !existed {
        veilid_log!(self debug
            "record was rehydrated but was deleted locally: {}",
            opaque_record_key
        );
    }

    Ok(RehydrateReport {
        opaque_record_key,
        subkeys,
        consensus_count,
        rehydrated,
    })
}
}