use std::collections::HashMap;
use crate::network::key_types::X25519Pubkey;
use crate::network::service_node::ServiceNode;
/// Identifier of a swarm, represented as an unsigned 64-bit integer.
pub type SwarmId = u64;
/// Reserved swarm id equal to `u64::MAX`.
///
/// NOTE(review): presumably marks a node without a swarm assignment — confirm against callers.
pub const INVALID_SWARM_ID: SwarmId = u64::MAX;
/// Projects an X25519 public key onto the 64-bit swarm id space.
///
/// The first 32 bytes of the key are treated as four consecutive 8-byte
/// words; each word is read as a big-endian `u64` and all four are XOR-ed
/// together. The result does not depend on the host's endianness.
///
/// # Panics
///
/// Panics if `pk.as_bytes()` yields fewer than 32 bytes.
pub fn pubkey_to_swarm_space(pk: &X25519Pubkey) -> SwarmId {
    let raw = pk.as_bytes();
    (0..4).fold(0u64, |acc, word| {
        let start = word * 8;
        let mut chunk = [0u8; 8];
        chunk.copy_from_slice(&raw[start..start + 8]);
        // A byte swap distributes over XOR, so decoding each word as
        // big-endian up front matches swapping the combined value afterwards.
        acc ^ u64::from_be_bytes(chunk)
    })
}
/// Groups service nodes into swarms keyed by their `swarm_id`.
///
/// Returns one `(swarm id, members)` pair per distinct id found in `nodes`,
/// ordered by ascending swarm id. Within a swarm, members keep the relative
/// order they had in `nodes`. Every node is cloned into the result.
pub fn generate_swarms(nodes: &[ServiceNode]) -> Vec<(SwarmId, Vec<ServiceNode>)> {
    let buckets = nodes.iter().fold(
        HashMap::<SwarmId, Vec<ServiceNode>>::new(),
        |mut acc, member| {
            acc.entry(member.swarm_id).or_default().push(member.clone());
            acc
        },
    );

    let mut swarms = buckets.into_iter().collect::<Vec<_>>();
    // Map keys are unique, so an unstable sort gives the same ordering.
    swarms.sort_unstable_by_key(|entry| entry.0);
    swarms
}
/// Picks the swarm whose id is closest to `swarm_pubkey` in swarm id space.
///
/// `all_swarms` is expected to be sorted by ascending swarm id (the order
/// `generate_swarms` produces); the neighbour search relies on that order.
/// The id space is treated as a ring, so distances wrap around `u64::MAX`.
///
/// Returns `None` for an empty list and the only entry for a single-element
/// list. Otherwise the two neighbouring swarms around the key's position are
/// compared and the nearer one is returned; on a tie the lower ("left")
/// neighbour wins. The selected swarm is returned as a clone.
pub fn get_swarm(
    swarm_pubkey: &X25519Pubkey,
    all_swarms: &[(SwarmId, Vec<ServiceNode>)],
) -> Option<(SwarmId, Vec<ServiceNode>)> {
    match all_swarms {
        [] => None,
        [only] => Some(only.clone()),
        _ => {
            let target = pubkey_to_swarm_space(swarm_pubkey);
            let count = all_swarms.len();

            // Upper neighbour: first swarm with id >= target. When the target
            // lies above every id, wrap around to the first swarm.
            let upper = all_swarms
                .iter()
                .position(|(id, _)| *id >= target)
                .unwrap_or(0);
            // Lower neighbour: the entry just before `upper`, wrapping to the
            // last swarm when `upper` is the first one.
            let lower = (upper + count - 1) % count;

            let dist_up = all_swarms[upper].0.wrapping_sub(target);
            let dist_down = target.wrapping_sub(all_swarms[lower].0);

            let winner = if dist_up < dist_down { upper } else { lower };
            Some(all_swarms[winner].clone())
        }
    }
}
use std::time::Duration;
use crate::network::routing::onion_request::{OnionRequestRouter, OnionRouteError};
use crate::network::routing::path_manager::{PathCategory, PathManager};
use crate::network::rpc;
use crate::network::snode_pool::SnodePool;
use crate::network::transport::{Transport, TransportError, TransportRequest};
use crate::network::types::NetworkDestination;
/// Errors produced while fetching or decoding a swarm membership response.
#[derive(Debug, thiserror::Error)]
pub enum SwarmFetchError {
/// The underlying transport failed; converted automatically from `TransportError`.
#[error("transport: {0}")]
Transport(#[from] TransportError),
/// The response carried a status code outside the 200..300 range.
#[error("snode returned status {0}")]
BadStatus(u16),
/// The request could not be encoded or the response could not be decoded; the string holds the detail.
#[error("parse: {0}")]
Parse(String),
/// The snode pool returned no candidate node to query.
#[error("snode pool is empty")]
PoolEmpty,
/// The onion request router reported an error (boxed `OnionRouteError`).
#[error("onion: {0}")]
Onion(Box<OnionRouteError>),
}
/// Timeout (15 seconds) set on the HTTPS request that `fetch_swarm` sends.
pub const DEFAULT_SWARM_REQUEST_TIMEOUT: Duration = Duration::from_secs(15);
/// Asks a single service node, over direct HTTPS, for the swarm of `pubkey_hex`.
///
/// One node is taken from `pool.get_unused_nodes(1, &[])` and sent a JSON
/// `POST` to its `/storage_rpc/v1` endpoint containing the `get_swarm` RPC
/// envelope. The response body is decoded with `parse_swarm_response`.
///
/// # Errors
///
/// * [`SwarmFetchError::PoolEmpty`] if the pool yields no node.
/// * [`SwarmFetchError::Parse`] if the request cannot be serialized or the
///   response cannot be decoded.
/// * [`SwarmFetchError::Transport`] if sending the request fails.
/// * [`SwarmFetchError::BadStatus`] if the status is outside `200..300`.
pub async fn fetch_swarm<T: Transport>(
    pool: &SnodePool,
    transport: &T,
    pubkey_hex: &str,
) -> Result<Vec<ServiceNode>, SwarmFetchError> {
    let target = match pool.get_unused_nodes(1, &[]).into_iter().next() {
        Some(node) => node,
        None => return Err(SwarmFetchError::PoolEmpty),
    };

    let endpoint = format!("https://{}/storage_rpc/v1", target.to_https_string());

    let envelope = rpc::wrap_rpc_envelope(
        rpc::METHOD_GET_SWARM,
        rpc::build_get_swarm_params(pubkey_hex),
    );
    let payload = serde_json::to_vec(&envelope)
        .map_err(|e| SwarmFetchError::Parse(format!("encode request: {e}")))?;

    let content_type = ("Content-Type".to_string(), "application/json".to_string());
    let request = TransportRequest {
        url: endpoint,
        method: "POST".to_string(),
        body: payload,
        headers: vec![content_type],
        timeout: DEFAULT_SWARM_REQUEST_TIMEOUT,
        // Certificate validation is disabled for this request.
        // NOTE(review): presumably because service nodes present
        // self-signed certificates — confirm.
        accept_invalid_certs: true,
    };

    let response = transport.send_request(&request).await?;
    if !(200..300).contains(&response.status_code) {
        return Err(SwarmFetchError::BadStatus(response.status_code));
    }

    parse_swarm_response(&response.body)
}
/// Extracts the service-node list from a `get_swarm` JSON response body.
///
/// The `snodes` value is looked up at the top level of the document and,
/// only if absent there, under a `result` object. Entries rejected by
/// `ServiceNode::from_json` are skipped instead of failing the whole response.
///
/// # Errors
///
/// Returns [`SwarmFetchError::Parse`] when the body is not valid JSON, when
/// no `snodes` array is found, or when none of its entries can be parsed.
fn parse_swarm_response(body: &[u8]) -> Result<Vec<ServiceNode>, SwarmFetchError> {
    let root: serde_json::Value = match serde_json::from_slice(body) {
        Ok(value) => value,
        Err(e) => return Err(SwarmFetchError::Parse(format!("json: {e}"))),
    };

    // Prefer a top-level `snodes`; fall back to `result.snodes` only when the
    // top-level key is missing entirely.
    let located = match root.get("snodes") {
        Some(found) => Some(found),
        None => root.get("result").and_then(|inner| inner.get("snodes")),
    };
    let entries = match located.and_then(|candidate| candidate.as_array()) {
        Some(list) => list,
        None => return Err(SwarmFetchError::Parse("missing snodes array".into())),
    };

    let parsed: Vec<ServiceNode> = entries
        .iter()
        .filter_map(|entry| ServiceNode::from_json(entry).ok())
        .collect();

    if parsed.is_empty() {
        Err(SwarmFetchError::Parse("no parseable snodes".into()))
    } else {
        Ok(parsed)
    }
}
/// Asks a service node for the swarm of `pubkey_hex` through an onion-routed request.
///
/// One node is taken from `pool.get_unused_nodes(1, &[])` and used as the
/// request destination. The serialized `get_swarm` parameters are sent via
/// `router.send` on a [`PathCategory::Standard`] path with
/// `rpc::METHOD_GET_SWARM`, and the response body is decoded with
/// `parse_swarm_response`. A missing response body is treated as empty.
///
/// # Errors
///
/// * [`SwarmFetchError::PoolEmpty`] if the pool yields no destination node.
/// * [`SwarmFetchError::Parse`] if the parameters cannot be serialized or the
///   response cannot be decoded.
/// * [`SwarmFetchError::Onion`] if the router reports an error.
/// * [`SwarmFetchError::BadStatus`] if the status is outside `200..300`.
pub async fn fetch_swarm_via_onion<T: Transport>(
    router: &OnionRequestRouter,
    transport: &T,
    pool: &SnodePool,
    paths: &mut PathManager,
    pubkey_hex: &str,
) -> Result<Vec<ServiceNode>, SwarmFetchError> {
    let mut candidates = pool.get_unused_nodes(1, &[]);
    let dest_snode = candidates.pop().ok_or(SwarmFetchError::PoolEmpty)?;
    let destination = NetworkDestination::ServiceNode(dest_snode);

    let params = rpc::build_get_swarm_params(pubkey_hex);
    // Serialize the parameters by reference (the source had a mis-decoded
    // `&params` token here that did not parse as Rust).
    let body_bytes = serde_json::to_vec(&params)
        .map_err(|e| SwarmFetchError::Parse(format!("encode: {e}")))?;

    let resp = router
        .send(
            transport,
            pool,
            paths,
            PathCategory::Standard,
            &destination,
            rpc::METHOD_GET_SWARM,
            &body_bytes,
        )
        .await
        .map_err(|e| SwarmFetchError::Onion(Box::new(e)))?;

    if !(200..300).contains(&resp.status_code) {
        // Negative codes are reported as 0. The `as u16` narrowing is kept
        // from the original; values above `u16::MAX` would wrap.
        return Err(SwarmFetchError::BadStatus(resp.status_code.max(0) as u16));
    }

    let body_str = resp.body.as_deref().unwrap_or("");
    parse_swarm_response(body_str.as_bytes())
}
// Unit tests for swarm grouping, swarm lookup, and the two swarm-fetch paths
// (direct HTTPS and onion-routed), using `MockTransport` for network calls.
#[cfg(test)]
mod tests {
use super::*;
use crate::network::key_types::Ed25519Pubkey;
// Builds a `ServiceNode` in swarm `swarm_id` with an all-zero Ed25519 key
// whose last byte is `hex_suffix`; address, ports and version are fixed.
fn make_node(swarm_id: SwarmId, hex_suffix: u8) -> ServiceNode {
let mut pk_bytes = [0u8; 32];
pk_bytes[31] = hex_suffix;
ServiceNode {
ed25519_pubkey: Ed25519Pubkey(pk_bytes),
ip: [1, 2, 3, 4],
https_port: 443,
omq_port: 22000,
storage_server_version: [2, 11, 0],
swarm_id,
requested_unlock_height: 0,
}
}
// Nodes sharing a swarm id are grouped together and swarms come back sorted
// by ascending id.
#[test]
fn test_generate_swarms() {
let nodes = vec![
make_node(100, 1),
make_node(200, 2),
make_node(100, 3),
make_node(300, 4),
];
let swarms = generate_swarms(&nodes);
assert_eq!(swarms.len(), 3);
assert_eq!(swarms[0].0, 100);
assert_eq!(swarms[0].1.len(), 2);
assert_eq!(swarms[1].0, 200);
assert_eq!(swarms[1].1.len(), 1);
assert_eq!(swarms[2].0, 300);
assert_eq!(swarms[2].1.len(), 1);
}
// With exactly one swarm, that swarm is returned regardless of the key.
#[test]
fn test_get_swarm_single() {
let swarms = vec![(100, vec![make_node(100, 1)])];
let pk = X25519Pubkey([0u8; 32]);
let result = get_swarm(&pk, &swarms).unwrap();
assert_eq!(result.0, 100);
}
// An empty swarm list yields `None`.
#[test]
fn test_get_swarm_empty() {
let pk = X25519Pubkey([0u8; 32]);
assert!(get_swarm(&pk, &[]).is_none());
}
// Serializes a response body with a top-level `snodes` array holding one
// entry built from the given Ed25519 key, IP and storage port.
fn swarm_response_body(ed: &str, ip: &str, port: u16) -> Vec<u8> {
let body = serde_json::json!({
"snodes": [{
"public_ip": ip,
"storage_port": port,
"storage_lmq_port": 22000,
"pubkey_x25519": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"pubkey_ed25519": ed,
"storage_server_version": [2, 11, 0],
"swarm_id": 12345
}]
});
serde_json::to_vec(&body).unwrap()
}
// Direct fetch: a mocked OK response for URLs containing "storage_rpc" is
// decoded into a single node with the expected host.
#[tokio::test]
async fn test_fetch_swarm_returns_snodes() {
use crate::network::snode_pool::{SnodePool, SnodePoolConfig};
use crate::network::transport::MockTransport;
let mut pool = SnodePool::new(SnodePoolConfig::default());
pool.add_nodes(vec![make_node(100, 1)]);
let t = MockTransport::new();
t.route_ok_json(
"storage_rpc",
swarm_response_body(
"1f000f09a7b07828dcb72af7cd16857050c10c02bd58afb0e38111fb6cda1fef",
"95.216.33.113",
22100,
),
);
let snodes = fetch_swarm(
&pool,
&t,
"05aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
)
.await
.unwrap();
assert_eq!(snodes.len(), 1);
assert_eq!(snodes[0].host(), "95.216.33.113");
}
// Direct fetch with no nodes in the pool fails with `PoolEmpty`.
#[tokio::test]
async fn test_fetch_swarm_empty_pool_errors() {
use crate::network::snode_pool::{SnodePool, SnodePoolConfig};
use crate::network::transport::MockTransport;
let pool = SnodePool::new(SnodePoolConfig::default());
let t = MockTransport::new();
let r = fetch_swarm(
&pool,
&t,
"05aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
)
.await;
assert!(matches!(r, Err(SwarmFetchError::PoolEmpty)));
}
// Direct fetch also accepts the `snodes` array nested under a `result` object.
#[tokio::test]
async fn test_fetch_swarm_handles_wrapped_result() {
use crate::network::snode_pool::{SnodePool, SnodePoolConfig};
use crate::network::transport::MockTransport;
let mut pool = SnodePool::new(SnodePoolConfig::default());
pool.add_nodes(vec![make_node(100, 1)]);
let t = MockTransport::new();
let body = serde_json::json!({
"result": {
"snodes": [{
"public_ip": "95.216.33.113",
"storage_port": 22100,
"storage_lmq_port": 22000,
"pubkey_x25519": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"pubkey_ed25519": "1f000f09a7b07828dcb72af7cd16857050c10c02bd58afb0e38111fb6cda1fef",
"storage_server_version": [2, 11, 0],
"swarm_id": 42
}]
}
});
t.route_ok_json("storage_rpc", serde_json::to_vec(&body).unwrap());
let snodes = fetch_swarm(
&pool,
&t,
"05aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
)
.await
.unwrap();
assert_eq!(snodes.len(), 1);
}
// Builds a `ServiceNode` whose Ed25519 public key is derived from a 32-byte
// seed with `seed_byte` in position 0 (position 1 is set to 1 when
// `seed_byte` is 0, so the seed is never all zeroes). The node's swarm id is
// `INVALID_SWARM_ID` and the last IP octet is `seed_byte`.
fn make_real_node(seed_byte: u8) -> ServiceNode {
use crate::crypto::ed25519 as ed;
let mut seed = [0u8; 32];
seed[0] = seed_byte;
if seed_byte == 0 {
seed[1] = 1;
}
let (pk, _sk) = ed::ed25519_key_pair_from_seed(&seed).unwrap();
ServiceNode {
ed25519_pubkey: Ed25519Pubkey(pk),
ip: [10, 0, 0, seed_byte],
https_port: 443,
omq_port: 22000,
storage_server_version: [2, 11, 0],
swarm_id: INVALID_SWARM_ID,
requested_unlock_height: 0,
}
}
// Onion fetch with no nodes in the pool fails with `PoolEmpty`.
#[tokio::test]
async fn test_fetch_swarm_via_onion_pool_empty_errors() {
use crate::network::routing::onion_request::{
OnionRequestRouter, OnionRequestRouterConfig,
};
use crate::network::routing::path_manager::{PathManager, PathManagerConfig};
use crate::network::snode_pool::{SnodePool, SnodePoolConfig};
use crate::network::transport::MockTransport;
let pool = SnodePool::new(SnodePoolConfig::default());
let mut paths = PathManager::new(PathManagerConfig::default());
let router = OnionRequestRouter::new(OnionRequestRouterConfig::default());
let t = MockTransport::new();
let r = fetch_swarm_via_onion(
&router,
&t,
&pool,
&mut paths,
"05aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
)
.await;
assert!(matches!(r, Err(SwarmFetchError::PoolEmpty)));
}
// Onion fetch: with a pool of 20 nodes and one standard path built, the mock
// answers any URL containing "onion_req" with status 502; the call must fail
// with the `Onion` variant.
#[tokio::test]
async fn test_fetch_swarm_via_onion_guard_502_surfaces_as_onion_error() {
use crate::network::routing::onion_request::{
OnionRequestRouter, OnionRequestRouterConfig,
};
use crate::network::routing::path_manager::{PathCategory, PathManager, PathManagerConfig};
use crate::network::snode_pool::{SnodePool, SnodePoolConfig};
use crate::network::transport::{MockRoute, MockTransport, TransportResponse};
let mut pool = SnodePool::new(SnodePoolConfig::default());
pool.add_nodes((1..=20).map(make_real_node).collect());
let mut paths = PathManager::new(PathManagerConfig {
target_paths_per_category: 1,
path_strike_threshold: 1, });
paths.build_up_to_target(PathCategory::Standard, &pool).unwrap();
let router = OnionRequestRouter::new(OnionRequestRouterConfig::default());
let t = MockTransport::new();
t.route(MockRoute {
url_contains: "onion_req".into(),
body_contains: None,
response: TransportResponse {
status_code: 502,
body: b"no".to_vec(),
headers: Vec::new(),
},
});
let r = fetch_swarm_via_onion(
&router,
&t,
&pool,
&mut paths,
"05aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
)
.await;
assert!(matches!(r, Err(SwarmFetchError::Onion(_))));
}
}