use std::{collections::HashSet, fmt::Display, net::SocketAddr, path::PathBuf, sync::Arc};
use chrono::Utc;
use dashmap::DashMap;
use lazysort::SortedBy;
use prometheus_http_query::Client as PrometheusClient;
use serde::de::DeserializeOwned;
use snops_common::{
constant::ENV_AGENT_KEY,
node_targets::NodeTargets,
state::{
AgentId, AgentPeer, AgentState, EnvId, LatestBlockInfo, NetworkId, NodeType, StorageId,
},
util::OpaqueDebug,
};
use tokio::sync::{Mutex, Semaphore};
use tracing::info;
use super::{
snarkos_request::{self, reparse_json_env},
AddrMap, AgentClient, AgentPool, EnvMap, StorageMap,
};
use crate::{
cli::Cli,
db::Database,
env::{cache::NetworkCache, error::EnvRequestError, Environment, PortType},
error::StateError,
schema::storage::{LoadedStorage, STORAGE_DIR},
server::{error::StartError, prometheus::HttpsdResponse},
ReloadHandler,
};
lazy_static::lazy_static! {
/// Crate-wide `reqwest` HTTP client.
///
/// Declared through `lazy_static!`, so `reqwest::Client::new()` runs once,
/// the first time the static is dereferenced.
pub(crate) static ref REST_CLIENT: reqwest::Client = reqwest::Client::new();
}
/// Shared state of the control plane.
///
/// Built once by [`GlobalState::load`] and handed out behind an `Arc`.
#[derive(Debug)]
pub struct GlobalState {
/// Persistence layer; wrapped in `OpaqueDebug` to take part in the derived `Debug`.
pub db: OpaqueDebug<Database>,
/// Command-line configuration (provides, among others, the base data path).
pub cli: Cli,
/// Value of the `ENV_AGENT_KEY` environment variable at load time, if it was set.
// NOTE(review): presumably the shared secret agents must present — confirm at the use site.
pub agent_key: Option<String>,
/// Known agents keyed by agent id; initially populated from `db.agents`.
pub pool: AgentPool,
/// Loaded storages keyed by `(network, storage id)`.
pub storage: StorageMap,
/// Loaded environments keyed by environment id.
pub envs: EnvMap,
/// Per-environment network cache (latest block info, external peer infos).
pub env_network_cache: OpaqueDebug<DashMap<EnvId, NetworkCache>>,
/// Prometheus HTTP service-discovery response, guarded by an async mutex.
// NOTE(review): looks like a cached response body — confirm how it is refreshed.
pub prom_httpsd: Mutex<HttpsdResponse>,
/// Optional Prometheus query client.
pub prometheus: OpaqueDebug<Option<PrometheusClient>>,
/// Handle passed in at load time.
// NOTE(review): presumably used to change the tracing log level at runtime — confirm.
pub log_level_handler: ReloadHandler,
}
/// One candidate peer as produced by `GlobalState::get_scored_peers`.
///
/// Tuple layout, in order:
/// 0. score derived from the peer's block info (`0` when no block info is known),
/// 1. the block info the score was derived from, if any,
/// 2. the agent id when the peer is an internal agent,
/// 3. the socket address the peer can be queried on, if known.
type RankedPeerItem = (
u32,
Option<LatestBlockInfo>,
Option<AgentId>,
Option<SocketAddr>,
);
impl GlobalState {
pub async fn load(
cli: Cli,
db: Database,
prometheus: Option<PrometheusClient>,
log_level_handler: ReloadHandler,
) -> Result<Arc<Self>, StartError> {
let storage_meta = db.storage.read_all();
let storage = StorageMap::default();
for ((network, id), meta) in storage_meta {
let loaded = match meta.load(&cli).await {
Ok(l) => l,
Err(e) => {
tracing::error!("Error loading storage from persistence {network}/{id}: {e}");
continue;
}
};
storage.insert((network, id), Arc::new(loaded));
}
let pool: DashMap<_, _> = db.agents.read_all().collect();
let state = Arc::new(Self {
cli,
agent_key: std::env::var(ENV_AGENT_KEY).ok(),
pool,
storage,
envs: EnvMap::default(),
prom_httpsd: Default::default(),
prometheus: OpaqueDebug(prometheus),
db: OpaqueDebug(db),
env_network_cache: Default::default(),
log_level_handler,
});
let env_meta = state.db.envs.read_all().collect::<Vec<_>>();
let num_cannons = env_meta.iter().map(|(_, e)| e.cannons.len()).sum();
let cannons_ready = Arc::new(Semaphore::const_new(num_cannons));
let cannons_ready_guard = Arc::clone(&cannons_ready);
let _cannons_guard = cannons_ready_guard
.acquire_many(num_cannons as u32)
.await
.unwrap();
for (id, meta) in env_meta.into_iter() {
let loaded = match meta
.load(Arc::clone(&state), Arc::clone(&cannons_ready))
.await
{
Ok(l) => l,
Err(e) => {
tracing::error!("Error loading storage from persistence {id}: {e}");
continue;
}
};
info!("loaded env {id} from persistence");
state.insert_env(id, Arc::new(loaded));
}
for mut entry in state.pool.iter_mut() {
let AgentState::Node(env, _) = entry.value().state() else {
continue;
};
if state.envs.contains_key(env) {
continue;
}
info!(
"setting agent {} to Inventory state due to missing env {env}",
entry.key()
);
entry.set_state(AgentState::Inventory);
let _ = state.db.agents.save(entry.key(), entry.value());
}
Ok(state)
}
/// Returns the on-disk directory of a storage:
/// `<cli.path>/<STORAGE_DIR>/<network>/<storage_id>`.
pub fn storage_path(&self, network: NetworkId, storage_id: StorageId) -> PathBuf {
    let mut dir = self.cli.path.join(STORAGE_DIR);
    dir.push(network.to_string());
    dir.push(storage_id.to_string());
    dir
}
/// Collects the addresses of agents in the pool.
///
/// With `filter` set, only agents whose id is in the set are considered;
/// with `None`, every agent is.
///
/// # Errors
///
/// Returns `StateError::NoAddress` for the first considered agent that has
/// no addresses recorded.
pub async fn get_addr_map(
    &self,
    filter: Option<&HashSet<AgentId>>,
) -> Result<AddrMap, StateError> {
    self.pool
        .iter()
        .filter(|agent| match filter {
            Some(allowed) => allowed.contains(&agent.id()),
            None => true,
        })
        .map(|agent| match agent.addrs.as_ref() {
            Some(addrs) => Ok((agent.id(), addrs.clone())),
            None => Err(StateError::NoAddress(agent.id())),
        })
        .collect()
}
/// Returns an owned client for the agent, or `None` when the agent is not
/// in the pool or has no client.
pub fn get_client(&self, id: AgentId) -> Option<AgentClient> {
    self.pool.get(&id).and_then(|agent| agent.client_owned())
}
/// Returns `true` only when the agent exists, is in the `Node` state, and
/// that node state is flagged as online.
pub fn is_agent_node_online(&self, id: AgentId) -> bool {
    self.pool.get(&id).is_some_and(|agent| {
        matches!(agent.state(), AgentState::Node(_, node) if node.online)
    })
}
/// Removes a storage from memory and from persistence, unless a loaded
/// environment still uses it.
///
/// Returns the removed storage, or `None` when the storage is in use or
/// was not loaded. A failure to delete the persisted record is logged and
/// does not prevent the storage from being returned.
pub fn try_unload_storage(
    &self,
    network: NetworkId,
    id: StorageId,
) -> Option<Arc<LoadedStorage>> {
    let in_use = self
        .envs
        .iter()
        .any(|env| env.storage.id == id && env.storage.network == network);
    if in_use {
        return None;
    }

    let key = (network, id);
    let (_, unloaded) = self.storage.remove(&key)?;
    if let Err(e) = self.db.storage.delete(&key) {
        tracing::error!("[storage {network}.{id}] failed to delete persistence: {e}");
    }
    Some(unloaded)
}
pub fn insert_env(&self, env_id: EnvId, env: Arc<Environment>) {
self.envs.insert(env_id, env);
self.env_network_cache.insert(env_id, Default::default());
}
/// Drops the environment's network cache and unregisters the environment,
/// returning it if it was registered.
pub fn remove_env(&self, env_id: EnvId) -> Option<Arc<Environment>> {
    self.env_network_cache.remove(&env_id);
    let (_, removed) = self.envs.remove(&env_id)?;
    Some(removed)
}
/// Returns a new `Arc` handle to the environment, or `None` if no
/// environment with this id is registered.
pub fn get_env(&self, id: EnvId) -> Option<Arc<Environment>> {
    self.envs.get(&id).map(|entry| Arc::clone(entry.value()))
}
/// Returns a copy of the latest block info cached for the environment, or
/// `None` when there is no cache entry or nothing has been recorded yet.
pub fn get_env_block_info(&self, id: EnvId) -> Option<LatestBlockInfo> {
    let cache = self.env_network_cache.get(&id)?;
    cache.latest.clone()
}
/// Feeds new block info into the environment's network cache, creating a
/// default cache entry first if none exists.
///
/// Returns whatever `NetworkCache::update_latest_info` returns.
pub fn update_env_block_info(&self, id: EnvId, info: &LatestBlockInfo) -> bool {
    self.env_network_cache
        .entry(id)
        .or_default()
        .update_latest_info(info)
}
/// Lists the peers of an environment that match `target`, each paired with
/// a score derived from its latest known block info.
///
/// * Returns an empty list when the environment does not exist.
/// * Prover nodes are skipped.
/// * External peers are scored from the environment's network cache; an
///   external peer without cached info gets score `0`. Its address is
///   always included so it can be queried directly.
/// * Internal peers must be present in the pool and in the `Node` state;
///   they are scored from the agent's own reported block info (`0` when
///   missing).
///
/// The result is not sorted.
pub fn get_scored_peers(&self, env_id: EnvId, target: &NodeTargets) -> Vec<RankedPeerItem> {
    let Some(env) = self.get_env(env_id) else {
        return Vec::new();
    };

    // Cached block info of external peers, if this env has a cache entry.
    let cache = self.env_network_cache.get(&env_id);
    let ext_infos = cache.as_ref().map(|c| &c.external_peer_infos);
    let now = Utc::now();

    env.matching_peers(target, &self.pool, PortType::Rest)
        .filter_map(|(key, peer)| {
            if key.ty == NodeType::Prover {
                return None;
            }

            let agent_id = match peer {
                AgentPeer::Internal(id, _) => id,
                AgentPeer::External(addr) => {
                    // Keep the address in both cases: without it the peer
                    // can only serve requests answerable from cached info.
                    return Some(if let Some(info) = ext_infos.and_then(|c| c.get(key)) {
                        (info.score(&now), Some(info.clone()), None, Some(addr))
                    } else {
                        (0u32, None, None, Some(addr))
                    });
                }
            };

            let agent = self.pool.get(&agent_id)?;
            if !matches!(agent.state(), AgentState::Node(_, _)) {
                return None;
            }

            Some((
                agent
                    .status
                    .block_info
                    .as_ref()
                    .map(|info| info.score(&now))
                    .unwrap_or_default(),
                agent.status.block_info.clone(),
                Some(agent_id),
                agent.rest_addr(),
            ))
        })
        .collect()
}
/// Performs a snarkOS GET request for `route` against the best available
/// peer of an environment that matches `target`.
///
/// Candidates come from `get_scored_peers` and are tried from highest to
/// lowest score. For each candidate:
/// 1. if the route is one recognised by `route_prefix_check` and the
///    candidate has block info, the answer is served from that info;
/// 2. otherwise, if the candidate is an agent with a client, the request
///    goes through the agent; a failure is logged and the next candidate
///    is tried;
/// 3. otherwise, if an address is known, the request is sent directly to
///    that address; a failure is logged and the next candidate is tried.
///
/// # Errors
///
/// * `EnvRequestError::MissingEnv` — the environment does not exist.
/// * `EnvRequestError::NoMatchingNodes` — no peer matches `target`.
/// * `EnvRequestError::NoResponsiveNodes` — every candidate failed or was
///   unreachable.
/// * Any error returned by `reparse_json_env` when answering from cached
///   block info.
pub async fn snarkos_get<T: DeserializeOwned + Clone>(
    &self,
    env_id: EnvId,
    route: impl Display,
    target: &NodeTargets,
) -> Result<T, EnvRequestError> {
    let Some(env) = self.get_env(env_id) else {
        return Err(EnvRequestError::MissingEnv(env_id));
    };

    let query_nodes = self.get_scored_peers(env_id, target);
    if query_nodes.is_empty() {
        return Err(EnvRequestError::NoMatchingNodes);
    }

    let route_str = route.to_string();
    let prefix = snarkos_request::route_prefix_check(&route_str);

    // Descending by score: peers without block info score 0 and must not
    // be preferred over peers with a known chain tip.
    for (_, info, agent_id, addr) in query_nodes.into_iter().sorted_by(|a, b| b.0.cmp(&a.0)) {
        // Routes covered by the block info can be answered without a
        // network round trip.
        if let (Some(prefix), Some(info)) = (prefix, info) {
            use snarkos_request::RoutePrefix::*;
            return match prefix {
                StateRoot => reparse_json_env(info.state_root),
                BlockHeight => reparse_json_env(info.height),
                BlockHash => reparse_json_env(info.block_hash),
            };
        }

        // Prefer going through the agent when it has a client.
        if let Some(agent_id) = agent_id {
            if let Some(client) = self.get_client(agent_id) {
                match client.snarkos_get::<T>(&route).await {
                    Ok(res) => return Ok(res),
                    Err(e) => {
                        tracing::error!("env {env_id} agent {agent_id} request failed: {e}");
                        continue;
                    }
                }
            }
        }

        // Fall back to a direct request when an address is known.
        let Some(addr) = addr else {
            continue;
        };
        match snarkos_request::get_on_addr(env.network, &route_str, addr).await {
            Ok(res) => return Ok(res),
            Err(e) => {
                tracing::error!("env {env_id} request to `{addr}{route_str}`: {e}");
                continue;
            }
        }
    }

    Err(EnvRequestError::NoResponsiveNodes)
}
}