ic-query 0.1.12

Internet Computer query CLI for NNS, SNS, and related public network metadata
Documentation
use super::{
    RegistryFetchError, SUBNET_LIST_KEY, normalized_data_center_id, principal_text_from_raw,
    principal_text_from_required_raw,
    proto::{DataCenterRecord, NodeOperatorRecord, NodeRecord, SubnetListRecord, SubnetRecord},
    relations::{
        RegistryRelationInventory, RegistryRelationInventoryScope,
        assigned_node_principals_from_subnets, node_provider_counts_from_records,
    },
    subnet_record_key,
    transport::{decode_message, get_registry_value},
};
use candid::Principal;
use futures::{StreamExt, TryStreamExt, stream};
use ic_agent::Agent;
use std::collections::{BTreeMap, BTreeSet};

/// Prefix of the registry key for a node record; `node_record_key` appends the node
/// principal text to it.
const NODE_RECORD_KEY_PREFIX: &str = "node_record_";
/// Prefix of the registry key for a node operator record; `node_operator_record_key`
/// appends the node operator principal text to it.
const NODE_OPERATOR_RECORD_KEY_PREFIX: &str = "node_operator_record_";
/// Prefix of the registry key for a data center record; `data_center_record_key` appends
/// the data center id to it.
const DATA_CENTER_RECORD_KEY_PREFIX: &str = "data_center_record_";
/// Limit handed to `buffer_unordered` by every fetch stage in this file, i.e. the maximum
/// number of registry value requests a single stage keeps in flight at once.
const INVENTORY_FETCH_CONCURRENCY: usize = 32;

/// Fetches the relation inventory at `registry_version` and reduces it to the map
/// produced by `node_provider_counts_from_records`.
///
/// The inventory is requested with `RegistryRelationInventoryScope::BaseRelations`, so no
/// data center records are fetched for this call.
///
/// Returns the `BTreeMap<String, u32>` computed by `node_provider_counts_from_records`
/// (presumably node counts keyed by node provider principal text; confirm against that
/// helper).
///
/// # Errors
///
/// Propagates any `RegistryFetchError` raised while building the inventory or while
/// deriving the counts.
pub(super) async fn fetch_node_provider_node_counts(
    agent: &Agent,
    registry_canister: &Principal,
    registry_version: u64,
) -> Result<BTreeMap<String, u32>, RegistryFetchError> {
    // Only the node / node-operator relations are needed for counting.
    let scope = RegistryRelationInventoryScope::BaseRelations;
    let relations =
        fetch_registry_relation_inventory(agent, registry_canister, registry_version, scope)
            .await?;

    node_provider_counts_from_records(
        &relations.node_principals,
        &relations.node_records,
        &relations.node_operator_records,
    )
}

/// Builds a `RegistryRelationInventory` by walking the registry at `registry_version`.
///
/// The walk proceeds in dependent stages, each stage fetching its records concurrently
/// (bounded by `INVENTORY_FETCH_CONCURRENCY`):
///
/// 1. the subnet list stored under `SUBNET_LIST_KEY`,
/// 2. one `SubnetRecord` per listed subnet,
/// 3. one `NodeRecord` per node principal returned by
///    `assigned_node_principals_from_subnets`,
/// 4. one `NodeOperatorRecord` per distinct `node_operator_id` found in the node records,
/// 5. data center records, only when `scope` is
///    `RegistryRelationInventoryScope::WithDataCenters`; otherwise that map is left empty.
///
/// # Errors
///
/// Returns `RegistryFetchError::EmptySubnetList` when the decoded subnet list has no
/// entries, and otherwise propagates the first error produced by a registry fetch, a
/// message decode, or a principal conversion.
pub(super) async fn fetch_registry_relation_inventory(
    agent: &Agent,
    registry_canister: &Principal,
    registry_version: u64,
    scope: RegistryRelationInventoryScope,
) -> Result<RegistryRelationInventory, RegistryFetchError> {
    // Stage 1: the subnet list is the root of the walk.
    let raw_subnet_list =
        get_registry_value(agent, registry_canister, SUBNET_LIST_KEY, registry_version).await?;
    let subnet_list = decode_message::<SubnetListRecord>("SubnetListRecord", &raw_subnet_list)?;
    if subnet_list.subnets.is_empty() {
        return Err(RegistryFetchError::EmptySubnetList);
    }

    let mut subnet_ids = Vec::with_capacity(subnet_list.subnets.len());
    for raw_id in &subnet_list.subnets {
        subnet_ids.push(principal_text_from_raw(raw_id, "subnet_list.subnets")?);
    }

    // Stage 2: one subnet record per listed subnet.
    let subnet_records = stream::iter(subnet_ids)
        .map(|subnet_id| async move {
            let registry_key = subnet_record_key(&subnet_id);
            let raw =
                get_registry_value(agent, registry_canister, &registry_key, registry_version)
                    .await?;
            let decoded = decode_message::<SubnetRecord>("SubnetRecord", &raw)?;
            Ok::<_, RegistryFetchError>((subnet_id, decoded))
        })
        .buffer_unordered(INVENTORY_FETCH_CONCURRENCY)
        .try_collect::<BTreeMap<_, _>>()
        .await?;

    // Stage 3: one node record per node principal derived from the subnet records.
    let node_principals = assigned_node_principals_from_subnets(&subnet_records)?;
    let node_records = stream::iter(node_principals.iter().cloned())
        .map(|node_id| async move {
            let registry_key = node_record_key(&node_id);
            let raw =
                get_registry_value(agent, registry_canister, &registry_key, registry_version)
                    .await?;
            let decoded = decode_message::<NodeRecord>("NodeRecord", &raw)?;
            Ok::<_, RegistryFetchError>((node_id, decoded))
        })
        .buffer_unordered(INVENTORY_FETCH_CONCURRENCY)
        .try_collect::<BTreeMap<_, _>>()
        .await?;

    // Stage 4: several nodes may share an operator, so collect the ids into a set before
    // fetching to request each operator record only once.
    let operator_ids = node_records
        .values()
        .map(|node| {
            principal_text_from_required_raw(&node.node_operator_id, "node_record.node_operator_id")
        })
        .collect::<Result<BTreeSet<_>, _>>()?;

    let node_operator_records = stream::iter(operator_ids)
        .map(|operator_id| async move {
            let registry_key = node_operator_record_key(&operator_id);
            let raw =
                get_registry_value(agent, registry_canister, &registry_key, registry_version)
                    .await?;
            let decoded = decode_message::<NodeOperatorRecord>("NodeOperatorRecord", &raw)?;
            Ok::<_, RegistryFetchError>((operator_id, decoded))
        })
        .buffer_unordered(INVENTORY_FETCH_CONCURRENCY)
        .try_collect::<BTreeMap<_, _>>()
        .await?;

    // Stage 5: data centers are optional and skipped entirely for the base scope.
    let data_center_records = match scope {
        RegistryRelationInventoryScope::WithDataCenters => {
            fetch_data_center_records_for_inventory(
                agent,
                registry_canister,
                registry_version,
                &node_operator_records,
            )
            .await?
        }
        RegistryRelationInventoryScope::BaseRelations => BTreeMap::new(),
    };

    Ok(RegistryRelationInventory {
        subnet_records,
        node_principals,
        node_records,
        node_operator_records,
        data_center_records,
    })
}

/// Fetches the `DataCenterRecord` for every distinct data center id referenced by
/// `node_operator_records`, at `registry_version`.
///
/// Each operator's `dc_id` is passed through `normalized_data_center_id`; operators for
/// which it yields `None` contribute no id. The returned map is keyed by the normalized
/// id, and requests run concurrently up to `INVENTORY_FETCH_CONCURRENCY`.
///
/// # Errors
///
/// Propagates the first `RegistryFetchError` from a registry fetch or a record decode.
async fn fetch_data_center_records_for_inventory(
    agent: &Agent,
    registry_canister: &Principal,
    registry_version: u64,
    node_operator_records: &BTreeMap<String, NodeOperatorRecord>,
) -> Result<BTreeMap<String, DataCenterRecord>, RegistryFetchError> {
    // A set removes duplicates so each data center is requested once.
    let mut unique_dc_ids = BTreeSet::new();
    for operator in node_operator_records.values() {
        if let Some(dc_id) = normalized_data_center_id(&operator.dc_id) {
            unique_dc_ids.insert(dc_id);
        }
    }

    let pending_fetches = stream::iter(unique_dc_ids).map(|dc_id| async move {
        let registry_key = data_center_record_key(&dc_id);
        let raw =
            get_registry_value(agent, registry_canister, &registry_key, registry_version).await?;
        let decoded = decode_message::<DataCenterRecord>("DataCenterRecord", &raw)?;
        Ok::<_, RegistryFetchError>((dc_id, decoded))
    });

    pending_fetches
        .buffer_unordered(INVENTORY_FETCH_CONCURRENCY)
        .try_collect::<BTreeMap<_, _>>()
        .await
}

/// Returns the registry key for a node record: `NODE_RECORD_KEY_PREFIX` followed by
/// `node_principal`.
fn node_record_key(node_principal: &str) -> String {
    [NODE_RECORD_KEY_PREFIX, node_principal].concat()
}

/// Returns the registry key for a node operator record: `NODE_OPERATOR_RECORD_KEY_PREFIX`
/// followed by `node_operator_principal`.
fn node_operator_record_key(node_operator_principal: &str) -> String {
    [NODE_OPERATOR_RECORD_KEY_PREFIX, node_operator_principal].concat()
}

/// Returns the registry key for a data center record: `DATA_CENTER_RECORD_KEY_PREFIX`
/// followed by `data_center_id`.
fn data_center_record_key(data_center_id: &str) -> String {
    [DATA_CENTER_RECORD_KEY_PREFIX, data_center_id].concat()
}