use std::collections::HashMap;
use crate::network::key_types::X25519Pubkey;
use crate::network::service_node::ServiceNode;
/// Identifier of a swarm: a position in the 64-bit swarm-id space.
pub type SwarmId = u64;
/// Reserved swarm id, equal to `u64::MAX`.
///
/// NOTE(review): presumably a sentinel meaning "no swarm assigned"; nothing in
/// this part of the file reads it — confirm against callers.
pub const INVALID_SWARM_ID: SwarmId = u64::MAX;
/// Maps an X25519 public key onto the 64-bit swarm-id space.
///
/// The first 32 bytes of the key are read as four consecutive big-endian
/// `u64` words, and the result is the XOR of those four words.
///
/// # Panics
///
/// Panics if `pk.as_bytes()` yields fewer than 32 bytes.
pub fn pubkey_to_swarm_space(pk: &X25519Pubkey) -> SwarmId {
    let key = pk.as_bytes();
    // Byte-swapping distributes over XOR, so decoding every word as big-endian
    // up front gives the same value as XOR-ing native-endian words and
    // converting the accumulated total from big-endian afterwards.
    (0..4).fold(0u64, |acc, word_idx| {
        let start = word_idx * 8;
        let mut word = [0u8; 8];
        word.copy_from_slice(&key[start..start + 8]);
        acc ^ u64::from_be_bytes(word)
    })
}
/// Groups service nodes into swarms keyed by each node's `swarm_id`.
///
/// Returns one `(swarm id, members)` pair per distinct id present in `nodes`,
/// ordered by ascending swarm id. Members are clones of the input nodes and
/// keep the relative order they had in `nodes`. An empty input yields an empty
/// vector.
pub fn generate_swarms(nodes: &[ServiceNode]) -> Vec<(SwarmId, Vec<ServiceNode>)> {
    let mut members_by_swarm: HashMap<SwarmId, Vec<ServiceNode>> = HashMap::new();
    nodes.iter().for_each(|member| {
        members_by_swarm
            .entry(member.swarm_id)
            .or_default()
            .push(member.clone());
    });

    // Hash-map iteration order is arbitrary, so the swarms are ordered
    // explicitly. Ids are unique map keys, which means an unstable sort
    // produces exactly the same ordering as a stable one.
    let mut swarms: Vec<(SwarmId, Vec<ServiceNode>)> = Vec::with_capacity(members_by_swarm.len());
    swarms.extend(members_by_swarm);
    swarms.sort_unstable_by_key(|swarm| swarm.0);
    swarms
}
/// Picks the swarm whose id lies closest to `swarm_pubkey` in swarm-id space.
///
/// The id space is treated as a ring: distances are computed with wrapping
/// subtraction, so a key beyond the highest swarm id can resolve to the first
/// swarm and vice versa. When the upper and lower neighbours are equally far
/// away, the lower neighbour is chosen.
///
/// The neighbour lookup only makes sense when `all_swarms` is ordered by
/// ascending swarm id, which is the order `generate_swarms` returns.
///
/// Returns `None` when `all_swarms` is empty; a lone swarm is returned
/// unconditionally. The selected entry is cloned.
pub fn get_swarm(
    swarm_pubkey: &X25519Pubkey,
    all_swarms: &[(SwarmId, Vec<ServiceNode>)],
) -> Option<(SwarmId, Vec<ServiceNode>)> {
    match all_swarms {
        [] => None,
        [only] => Some(only.clone()),
        _ => {
            let target = pubkey_to_swarm_space(swarm_pubkey);

            // Upper neighbour: first swarm at or above the target. If every id
            // is below the target, wrap around to the first swarm.
            let upper = all_swarms
                .iter()
                .position(|swarm| swarm.0 >= target)
                .unwrap_or(0);
            // Lower neighbour: the entry just before it, wrapping to the last
            // swarm when the upper neighbour is the first entry.
            let lower = upper.checked_sub(1).unwrap_or(all_swarms.len() - 1);

            let gap_up = all_swarms[upper].0.wrapping_sub(target);
            let gap_down = target.wrapping_sub(all_swarms[lower].0);

            // Strict comparison: ties go to the lower neighbour.
            let pick = if gap_up < gap_down { upper } else { lower };
            Some(all_swarms[pick].clone())
        }
    }
}
use std::time::Duration;
use crate::network::rpc;
use crate::network::snode_pool::SnodePool;
use crate::network::transport::{Transport, TransportError, TransportRequest};
/// Errors that [`fetch_swarm`] can return.
#[derive(Debug, thiserror::Error)]
pub enum SwarmFetchError {
/// The transport failed while sending the request; wraps the underlying
/// `TransportError` (converted automatically through `#[from]`).
#[error("transport: {0}")]
Transport(#[from] TransportError),
/// The service node answered with a status code outside `200..300`; carries
/// that status code.
#[error("snode returned status {0}")]
BadStatus(u16),
/// A JSON step failed: the request body could not be encoded, the response
/// was not valid JSON, it contained no `snodes` array, or no entry of that
/// array could be parsed. The string says which.
#[error("parse: {0}")]
Parse(String),
/// The snode pool offered no node to send the request to.
#[error("snode pool is empty")]
PoolEmpty,
}
/// Timeout (15 seconds) that [`fetch_swarm`] sets on its transport request.
pub const DEFAULT_SWARM_REQUEST_TIMEOUT: Duration = Duration::from_secs(15);
/// Asks one service node from `pool` for the swarm of `pubkey_hex`.
///
/// Takes the first node returned by `pool.get_unused_nodes(1, &[])`, POSTs a
/// `get_swarm` RPC envelope as JSON to that node's `/json_rpc` endpoint through
/// `transport`, and parses the service nodes listed in the response.
///
/// # Errors
///
/// * [`SwarmFetchError::PoolEmpty`] — the pool returned no node.
/// * [`SwarmFetchError::Parse`] — the request could not be encoded, or the
///   response could not be decoded into at least one service node.
/// * [`SwarmFetchError::Transport`] — the transport reported a failure.
/// * [`SwarmFetchError::BadStatus`] — the status code was outside `200..300`.
pub async fn fetch_swarm<T: Transport>(
    pool: &SnodePool,
    transport: &T,
    pubkey_hex: &str,
) -> Result<Vec<ServiceNode>, SwarmFetchError> {
    let snode = match pool.get_unused_nodes(1, &[]).into_iter().next() {
        Some(node) => node,
        None => return Err(SwarmFetchError::PoolEmpty),
    };

    // NOTE(review): assumes `to_https_string()` yields `host:port` without a
    // scheme, since `https://` is prepended here — confirm.
    let endpoint = format!("https://{}/json_rpc", snode.to_https_string());

    let envelope = rpc::wrap_rpc_envelope(
        rpc::METHOD_GET_SWARM,
        rpc::build_get_swarm_params(pubkey_hex),
    );
    let payload = match serde_json::to_vec(&envelope) {
        Ok(bytes) => bytes,
        Err(e) => return Err(SwarmFetchError::Parse(format!("encode request: {e}"))),
    };

    let request = TransportRequest {
        url: endpoint,
        method: String::from("POST"),
        body: payload,
        headers: vec![(
            String::from("Content-Type"),
            String::from("application/json"),
        )],
        timeout: DEFAULT_SWARM_REQUEST_TIMEOUT,
        // Certificate validation is switched off for this request.
        // NOTE(review): presumably because service nodes present self-signed
        // certificates — confirm this is intended.
        accept_invalid_certs: true,
    };

    let response = transport.send_request(&request).await?;
    match response.status_code {
        // Only 2xx responses are parsed; everything else is reported as-is.
        200..=299 => parse_swarm_response(&response.body),
        other => Err(SwarmFetchError::BadStatus(other)),
    }
}
/// Extracts the service nodes from a `get_swarm` response body.
///
/// The `snodes` array is looked up at the top level of the JSON document
/// first; only if that key is absent is `result.snodes` consulted. Entries
/// that `ServiceNode::from_json` rejects are skipped silently.
///
/// # Errors
///
/// Returns [`SwarmFetchError::Parse`] when the body is not valid JSON, when no
/// `snodes` array is found, or when not a single entry could be parsed.
fn parse_swarm_response(body: &[u8]) -> Result<Vec<ServiceNode>, SwarmFetchError> {
    let root: serde_json::Value = match serde_json::from_slice(body) {
        Ok(value) => value,
        Err(e) => return Err(SwarmFetchError::Parse(format!("json: {e}"))),
    };

    // A top-level `snodes` key wins, even when its value is not an array; the
    // wrapped `result.snodes` form is only a fallback for a missing key.
    let listing = match root.get("snodes") {
        Some(found) => Some(found),
        None => root.get("result").and_then(|inner| inner.get("snodes")),
    };
    let entries = match listing.and_then(serde_json::Value::as_array) {
        Some(list) => list,
        None => return Err(SwarmFetchError::Parse("missing snodes array".into())),
    };

    // Best effort: keep whatever parses, drop the rest.
    let nodes: Vec<ServiceNode> = entries
        .iter()
        .filter_map(|entry| ServiceNode::from_json(entry).ok())
        .collect();

    if nodes.is_empty() {
        Err(SwarmFetchError::Parse("no parseable snodes".into()))
    } else {
        Ok(nodes)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::network::key_types::Ed25519Pubkey;
    use crate::network::snode_pool::{SnodePool, SnodePoolConfig};
    use crate::network::transport::MockTransport;

    /// Account id passed to `fetch_swarm` in the transport-backed tests.
    const ACCOUNT_HEX: &str =
        "05aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
    /// Ed25519 key of the service node described by the canned responses.
    const SNODE_ED25519_HEX: &str =
        "1f000f09a7b07828dcb72af7cd16857050c10c02bd58afb0e38111fb6cda1fef";
    /// Address of the service node described by the canned responses.
    const SNODE_IP: &str = "95.216.33.113";
    /// Storage port of the service node described by the canned responses.
    const SNODE_PORT: u16 = 22100;

    /// Builds a service node in `swarm_id` whose Ed25519 key is all zeroes
    /// except for its last byte, which is set to `hex_suffix`.
    fn make_node(swarm_id: SwarmId, hex_suffix: u8) -> ServiceNode {
        let mut pk_bytes = [0u8; 32];
        pk_bytes[31] = hex_suffix;
        ServiceNode {
            ed25519_pubkey: Ed25519Pubkey(pk_bytes),
            ip: [1, 2, 3, 4],
            https_port: 443,
            omq_port: 22000,
            storage_server_version: [2, 11, 0],
            swarm_id,
            requested_unlock_height: 0,
        }
    }

    /// Builds a key that maps to exactly `id` in swarm space: the first eight
    /// bytes hold `id` big-endian and the remaining words are zero, so the XOR
    /// of the four words is `id` itself.
    fn pk_in_swarm_space(id: SwarmId) -> X25519Pubkey {
        let mut bytes = [0u8; 32];
        bytes[..8].copy_from_slice(&id.to_be_bytes());
        X25519Pubkey(bytes)
    }

    /// Three single-member swarms with ids 100, 200 and 300, sorted ascending.
    fn three_swarms() -> Vec<(SwarmId, Vec<ServiceNode>)> {
        [100u64, 200, 300]
            .iter()
            .map(|&id| (id, vec![make_node(id, 1)]))
            .collect()
    }

    /// A pool holding a single node, enough for `fetch_swarm` to pick a target.
    fn single_node_pool() -> SnodePool {
        let mut pool = SnodePool::new(SnodePoolConfig::default());
        pool.add_nodes(vec![make_node(100, 1)]);
        pool
    }

    /// JSON description of one service node as it appears in a `snodes` array.
    fn snode_json(ed: &str, ip: &str, port: u16, swarm_id: u64) -> serde_json::Value {
        serde_json::json!({
            "public_ip": ip,
            "storage_port": port,
            "storage_lmq_port": 22000,
            "pubkey_x25519": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
            "pubkey_ed25519": ed,
            "storage_server_version": [2, 11, 0],
            "swarm_id": swarm_id
        })
    }

    /// Response body with a single node in a top-level `snodes` array.
    fn swarm_response_body(ed: &str, ip: &str, port: u16) -> Vec<u8> {
        let body = serde_json::json!({ "snodes": [snode_json(ed, ip, port, 12345)] });
        serde_json::to_vec(&body).unwrap()
    }

    #[test]
    fn test_generate_swarms() {
        let nodes = vec![
            make_node(100, 1),
            make_node(200, 2),
            make_node(100, 3),
            make_node(300, 4),
        ];
        let swarms = generate_swarms(&nodes);
        assert_eq!(swarms.len(), 3);
        assert_eq!(swarms[0].0, 100);
        assert_eq!(swarms[0].1.len(), 2);
        assert_eq!(swarms[1].0, 200);
        assert_eq!(swarms[1].1.len(), 1);
        assert_eq!(swarms[2].0, 300);
        assert_eq!(swarms[2].1.len(), 1);
    }

    #[test]
    fn test_get_swarm_single() {
        let swarms = vec![(100, vec![make_node(100, 1)])];
        let pk = X25519Pubkey([0u8; 32]);
        let result = get_swarm(&pk, &swarms).unwrap();
        assert_eq!(result.0, 100);
    }

    #[test]
    fn test_get_swarm_empty() {
        let pk = X25519Pubkey([0u8; 32]);
        assert!(get_swarm(&pk, &[]).is_none());
    }

    #[test]
    fn test_get_swarm_picks_nearest() {
        let swarms = three_swarms();
        // 160 is 40 below 200 and 60 above 100.
        assert_eq!(get_swarm(&pk_in_swarm_space(160), &swarms).unwrap().0, 200);
        // 240 is 40 above 200 and 60 below 300.
        assert_eq!(get_swarm(&pk_in_swarm_space(240), &swarms).unwrap().0, 200);
    }

    #[test]
    fn test_get_swarm_tie_prefers_lower() {
        let swarms = three_swarms();
        // 150 is equidistant from 100 and 200; the lower neighbour wins.
        assert_eq!(get_swarm(&pk_in_swarm_space(150), &swarms).unwrap().0, 100);
    }

    #[test]
    fn test_get_swarm_exact_match() {
        let swarms = three_swarms();
        assert_eq!(get_swarm(&pk_in_swarm_space(100), &swarms).unwrap().0, 100);
        assert_eq!(get_swarm(&pk_in_swarm_space(300), &swarms).unwrap().0, 300);
    }

    #[test]
    fn test_get_swarm_wraps_around() {
        let swarms = three_swarms();
        // Just past the highest id: 300 is 50 away, 100 is almost a full lap away.
        assert_eq!(get_swarm(&pk_in_swarm_space(350), &swarms).unwrap().0, 300);
        // Near the top of the id space: wrapping forward reaches 100 in 111 steps.
        assert_eq!(
            get_swarm(&pk_in_swarm_space(u64::MAX - 10), &swarms).unwrap().0,
            100
        );
        // Below the lowest id: 100 is 50 away, 300 is almost a full lap behind.
        assert_eq!(get_swarm(&pk_in_swarm_space(50), &swarms).unwrap().0, 100);
    }

    #[tokio::test]
    async fn test_fetch_swarm_returns_snodes() {
        let pool = single_node_pool();
        let t = MockTransport::new();
        t.route_ok_json(
            "json_rpc",
            swarm_response_body(SNODE_ED25519_HEX, SNODE_IP, SNODE_PORT),
        );
        let snodes = fetch_swarm(&pool, &t, ACCOUNT_HEX).await.unwrap();
        assert_eq!(snodes.len(), 1);
        assert_eq!(snodes[0].host(), "95.216.33.113");
    }

    #[tokio::test]
    async fn test_fetch_swarm_empty_pool_errors() {
        let pool = SnodePool::new(SnodePoolConfig::default());
        let t = MockTransport::new();
        let r = fetch_swarm(&pool, &t, ACCOUNT_HEX).await;
        assert!(matches!(r, Err(SwarmFetchError::PoolEmpty)));
    }

    #[tokio::test]
    async fn test_fetch_swarm_handles_wrapped_result() {
        let pool = single_node_pool();
        let t = MockTransport::new();
        // Same node description, but nested under `result` instead of top level.
        let body = serde_json::json!({
            "result": {
                "snodes": [snode_json(SNODE_ED25519_HEX, SNODE_IP, SNODE_PORT, 42)]
            }
        });
        t.route_ok_json("json_rpc", serde_json::to_vec(&body).unwrap());
        let snodes = fetch_swarm(&pool, &t, ACCOUNT_HEX).await.unwrap();
        assert_eq!(snodes.len(), 1);
    }
}