use super::super::{SnsHostError, SnsNeuronRow, SnsNeuronsRefreshReport, SnsNeuronsRefreshRequest};
use super::super::{
live::LiveSnsSource,
lookup::{enforce_mainnet_network, lookup_request_from_parts, resolve_sns_lookup},
source::{MainnetSns, MainnetSnsList, SnsFetchRequest, SnsNeuronsSource},
};
use super::{
SNS_NEURONS_CACHE_SCHEMA_VERSION, SNS_NEURONS_REFRESH_REPORT_SCHEMA_VERSION,
attempt::{
SnsNeuronsAttemptParts, attempt_from_parts, failed_attempt_from_latest_progress,
write_sns_neurons_attempt,
},
collection::fetch_complete_sns_neurons,
errors::sns_cache_file_error,
model::{CompleteSnsNeurons, SnsNeuronsCache, SnsNeuronsCompleteness},
paths::SnsNeuronsCachePaths,
};
use crate::cache_file::{
RefreshLockRequest, create_parent_directory, with_refresh_lock, write_text_atomically,
};
/// Value passed as `lock_stale_after_seconds` when the refresh lock is requested:
/// 30 minutes, expressed in seconds.
///
/// NOTE(review): presumably the age after which an existing lock is treated as
/// abandoned — confirm against `with_refresh_lock`.
const SNS_NEURONS_REFRESH_LOCK_STALE_AFTER_SECONDS: u64 = 30 * 60;
/// Everything the locked part of a refresh needs, bundled so it can be handed
/// by value into the closure that runs under the refresh lock.
///
/// `'a` ties the two borrowed requests to the caller's stack frame.
struct SnsNeuronsRefreshContext<'a> {
/// The caller's original refresh request (network, page size, ...).
request: &'a SnsNeuronsRefreshRequest,
/// Fetch request produced by the SNS lookup.
fetch_request: &'a SnsFetchRequest,
/// SNS list returned by the lookup; its metadata is copied into the cache and report.
list: MainnetSnsList,
/// Identifier of the selected SNS as returned by the lookup.
id: usize,
/// The selected SNS whose neurons are being refreshed.
sns: MainnetSns,
/// Cache, lock and attempt file locations for this SNS.
paths: SnsNeuronsCachePaths,
/// Whether a cache file already existed when the refresh was set up;
/// copied verbatim into the refresh report.
replaced_existing_cache: bool,
}
pub fn refresh_sns_neurons_cache(
request: &SnsNeuronsRefreshRequest,
) -> Result<SnsNeuronsRefreshReport, SnsHostError> {
refresh_sns_neurons_cache_with_source(request, &LiveSnsSource)
}
/// Refreshes the SNS neurons cache for the SNS selected by `request`, reading from `source`.
///
/// Steps, in order:
/// 1. Run `enforce_mainnet_network` on the request's network.
/// 2. Resolve the target SNS with `resolve_sns_lookup`.
/// 3. Derive the cache paths and create the cache file's parent directory.
/// 4. Record whether a cache file is already present.
/// 5. Run `refresh_sns_neurons_cache_locked` inside `with_refresh_lock`.
///
/// # Errors
///
/// Propagates errors from the network check, the lookup and the locked refresh.
/// Directory-creation failures are converted with `sns_cache_file_error`, which is
/// also handed to `with_refresh_lock` as its error mapper.
pub(in crate::sns::report) fn refresh_sns_neurons_cache_with_source(
    request: &SnsNeuronsRefreshRequest,
    source: &dyn SnsNeuronsSource,
) -> Result<SnsNeuronsRefreshReport, SnsHostError> {
    enforce_mainnet_network(&request.network)?;

    let lookup_request = lookup_request_from_parts(
        &request.network,
        &request.source_endpoint,
        request.now_unix_secs,
        &request.input,
    );
    let resolved = resolve_sns_lookup(&lookup_request, source)?;

    let cache_paths = SnsNeuronsCachePaths::for_root(
        &request.icp_root,
        &request.network,
        &resolved.sns.root_canister_id,
    );
    create_parent_directory(&cache_paths.cache_path).map_err(sns_cache_file_error)?;

    // NOTE(review): this is sampled before the lock is taken, so it reflects the
    // file system state at setup time rather than at publish time — confirm that
    // is the intended meaning of `replaced_existing_cache`.
    let cache_already_present = cache_paths.cache_path.is_file();

    // The context owns its own copy of the paths because the lock request below
    // keeps borrowing `cache_paths` for the duration of the call.
    let fetch_request = resolved.fetch_request;
    let context = SnsNeuronsRefreshContext {
        request,
        fetch_request: &fetch_request,
        list: resolved.list,
        id: resolved.id,
        sns: resolved.sns,
        paths: cache_paths.clone(),
        replaced_existing_cache: cache_already_present,
    };
    let lock_request = RefreshLockRequest {
        lock_path: &cache_paths.lock_path,
        target_path: &cache_paths.cache_path,
        network: &request.network,
        now_unix_secs: request.now_unix_secs,
        lock_stale_after_seconds: SNS_NEURONS_REFRESH_LOCK_STALE_AFTER_SECONDS,
    };

    with_refresh_lock(lock_request, sns_cache_file_error, || {
        refresh_sns_neurons_cache_locked(context, source)
    })
}
/// Performs the refresh itself; called from inside the refresh-lock closure.
///
/// Writes a `"running"` attempt record with zeroed progress, fetches the neurons
/// via `fetch_complete_sns_neurons`, and on success hands the result to
/// `publish_complete_sns_neurons_cache`.
///
/// # Errors
///
/// Fails if the initial attempt record cannot be written, if the fetch fails, or
/// if publishing fails. On a fetch failure a failed-attempt record is written on
/// a best-effort basis and the original fetch error is returned.
fn refresh_sns_neurons_cache_locked(
    context: SnsNeuronsRefreshContext<'_>,
    source: &dyn SnsNeuronsSource,
) -> Result<SnsNeuronsRefreshReport, SnsHostError> {
    // Mark the attempt as started before any page is requested.
    write_sns_neurons_attempt(
        &context.paths.attempt_path,
        &attempt_from_parts(SnsNeuronsAttemptParts {
            request: context.request,
            fetch_request: context.fetch_request,
            sns: &context.sns,
            status: "running",
            pages_fetched: 0,
            rows_fetched: 0,
            last_cursor: None,
            last_error: None,
        }),
    )?;

    let fetch_outcome = fetch_complete_sns_neurons(
        context.request,
        context.fetch_request,
        &context.sns,
        source,
        &context.paths.attempt_path,
    );
    let fetch_error = match fetch_outcome {
        Ok(complete) => return publish_complete_sns_neurons_cache(context, complete),
        Err(err) => err,
    };

    // Best effort: a failure to record the failed attempt must not mask the
    // fetch error, so the result of this write is deliberately discarded.
    let _ = write_sns_neurons_attempt(
        &context.paths.attempt_path,
        &failed_attempt_from_latest_progress(
            &context.paths.attempt_path,
            context.request,
            context.fetch_request,
            &context.sns,
            &fetch_error,
        ),
    );
    Err(fetch_error)
}
fn publish_complete_sns_neurons_cache(
context: SnsNeuronsRefreshContext<'_>,
complete: CompleteSnsNeurons,
) -> Result<SnsNeuronsRefreshReport, SnsHostError> {
let cache = sns_neurons_cache_from_parts(
&context.list,
context.id,
&context.sns,
context.request.page_size,
complete.page_count,
complete.neurons,
);
let neuron_count = cache.neurons.len();
let cache_json =
serde_json::to_string_pretty(&cache).map_err(|source| SnsHostError::SerializeCache {
path: context.paths.cache_path.clone(),
source,
})?;
write_text_atomically(&context.paths.cache_path, &cache_json).map_err(sns_cache_file_error)?;
write_sns_neurons_attempt(
&context.paths.attempt_path,
&attempt_from_parts(SnsNeuronsAttemptParts {
request: context.request,
fetch_request: context.fetch_request,
sns: &context.sns,
status: "complete",
pages_fetched: complete.page_count,
rows_fetched: neuron_count,
last_cursor: complete.last_cursor,
last_error: None,
}),
)?;
Ok(SnsNeuronsRefreshReport {
schema_version: SNS_NEURONS_REFRESH_REPORT_SCHEMA_VERSION,
network: context.list.network,
sns_wasm_canister_id: context.list.sns_wasm_canister_id,
fetched_at: context.list.fetched_at,
source_endpoint: context.list.source_endpoint,
fetched_by: context.list.fetched_by,
id: context.id,
name: context.sns.name,
root_canister_id: context.sns.root_canister_id,
governance_canister_id: context.sns.governance_canister_id,
cache_path: context.paths.cache_path.display().to_string(),
refresh_lock_path: context.paths.lock_path.display().to_string(),
refresh_attempt_path: context.paths.attempt_path.display().to_string(),
page_size: context.request.page_size,
page_count: complete.page_count,
neuron_count,
complete: true,
replaced_existing_cache: context.replaced_existing_cache,
wrote_cache: true,
})
}
/// Assembles the on-disk cache document from the lookup metadata and fetched rows.
///
/// List- and SNS-level fields are cloned from `list` and `sns`; `neurons` is moved
/// in unchanged. The completeness record always carries the status
/// `"api_exhausted"` and `point_in_time_guaranteed: false`, and its `row_count`
/// is the length of `neurons`.
fn sns_neurons_cache_from_parts(
    list: &MainnetSnsList,
    id: usize,
    sns: &MainnetSns,
    page_size: u32,
    page_count: u32,
    neurons: Vec<SnsNeuronRow>,
) -> SnsNeuronsCache {
    // Built first so the row count is read before `neurons` is moved into the cache.
    let completeness = SnsNeuronsCompleteness {
        status: "api_exhausted".to_owned(),
        page_size,
        page_count,
        row_count: neurons.len(),
        point_in_time_guaranteed: false,
    };

    SnsNeuronsCache {
        schema_version: SNS_NEURONS_CACHE_SCHEMA_VERSION,
        network: list.network.clone(),
        sns_wasm_canister_id: list.sns_wasm_canister_id.clone(),
        fetched_at: list.fetched_at.clone(),
        source_endpoint: list.source_endpoint.clone(),
        fetched_by: list.fetched_by.clone(),
        id,
        name: sns.name.clone(),
        root_canister_id: sns.root_canister_id.clone(),
        governance_canister_id: sns.governance_canister_id.clone(),
        completeness,
        neurons,
    }
}