libsession 0.1.7

Session messenger core library - cryptography, config management, networking
Documentation
//! Swarm ID computation and closest-swarm lookup.
//!
//! Maps public keys into swarm space via XOR-folding and finds the closest
//! swarm on the ID ring using wrapping distance.

use std::collections::HashMap;

use crate::network::key_types::X25519Pubkey;
use crate::network::service_node::ServiceNode;

/// Swarm identifier: a position on the 64-bit swarm ID ring (plain `u64`).
pub type SwarmId = u64;
/// Sentinel value indicating an invalid or unassigned swarm ID (`u64::MAX`).
///
/// The functions visible in this file do not special-case this value; it is
/// grouped and compared like any other ID.
pub const INVALID_SWARM_ID: SwarmId = u64::MAX;

/// Projects a 32-byte public key onto the 64-bit swarm ID space.
///
/// The key is split into four consecutive 8-byte words, each word is read as a
/// big-endian `u64`, and the four values are XORed together.
///
/// Mirrors `session::network::swarm::pubkey_to_swarm_space` from the C++ code,
/// which XORs the words in native byte order and then applies a big-endian to
/// host conversion to the result. XOR acts on every byte independently, so
/// converting each word up front yields the same value on any platform.
pub fn pubkey_to_swarm_space(pk: &X25519Pubkey) -> SwarmId {
    let key = pk.as_bytes();

    // Exactly the first 32 bytes take part, as four 8-byte words.
    key[..32].chunks_exact(8).fold(0u64, |folded, chunk| {
        let mut word = [0u8; 8];
        word.copy_from_slice(chunk);
        folded ^ u64::from_be_bytes(word)
    })
}

/// Buckets service nodes by `swarm_id` and returns the buckets as
/// `(swarm_id, nodes)` pairs in ascending `swarm_id` order.
///
/// Each node is cloned into its bucket; within a bucket the nodes keep the
/// order in which they appear in `nodes`. An empty input yields an empty list.
pub fn generate_swarms(nodes: &[ServiceNode]) -> Vec<(SwarmId, Vec<ServiceNode>)> {
    let buckets = nodes.iter().fold(
        HashMap::<SwarmId, Vec<ServiceNode>>::new(),
        |mut acc, node| {
            acc.entry(node.swarm_id).or_default().push(node.clone());
            acc
        },
    );

    // Map iteration order is arbitrary, so order the pairs by ID before returning.
    let mut swarms: Vec<(SwarmId, Vec<ServiceNode>)> = buckets.into_iter().collect();
    swarms.sort_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
    swarms
}

/// Picks the swarm whose ID lies nearest to the pubkey's position on the
/// swarm ID ring.
///
/// `all_swarms` is expected to be ordered by ascending swarm ID (the order
/// produced by `generate_swarms`); the boundary search below depends on it.
///
/// Returns `None` for an empty list and the sole entry for a one-element list
/// (without computing the pubkey's position). Otherwise the two ring
/// neighbours of the pubkey's position are compared using wrapping `u64`
/// distance, matching the C++ implementation; on a tie the left (lower)
/// neighbour is chosen. The selected entry is returned as a clone.
pub fn get_swarm(
    swarm_pubkey: &X25519Pubkey,
    all_swarms: &[(SwarmId, Vec<ServiceNode>)],
) -> Option<(SwarmId, Vec<ServiceNode>)> {
    match all_swarms {
        [] => return None,
        [only] => return Some(only.clone()),
        _ => {}
    }

    let target = pubkey_to_swarm_space(swarm_pubkey);

    // Right neighbour: first swarm at or past the target. When the target is
    // above every ID it sits in the wrap-around gap, so the first swarm is used.
    let right = all_swarms
        .iter()
        .position(|entry| entry.0 >= target)
        .unwrap_or(0);

    // Left neighbour: the entry just before `right`, wrapping to the last one.
    let left = right.checked_sub(1).unwrap_or(all_swarms.len() - 1);

    // Wrapping subtraction measures the distance around the ring.
    let gap_right = all_swarms[right].0.wrapping_sub(target);
    let gap_left = target.wrapping_sub(all_swarms[left].0);

    let winner = if gap_right < gap_left { right } else { left };
    Some(all_swarms[winner].clone())
}

// ---------------------------------------------------------------------------
// Live swarm lookup
// ---------------------------------------------------------------------------

use std::time::Duration;

use crate::network::rpc;
use crate::network::snode_pool::SnodePool;
use crate::network::transport::{Transport, TransportError, TransportRequest};

/// Error returned by [`fetch_swarm`] and by the response parser it delegates to.
#[derive(Debug, thiserror::Error)]
pub enum SwarmFetchError {
    /// Transport-level error; converted automatically from [`TransportError`]
    /// via `?` thanks to `#[from]`.
    #[error("transport: {0}")]
    Transport(#[from] TransportError),
    /// Snode returned a non-2xx status; carries the received status code.
    #[error("snode returned status {0}")]
    BadStatus(u16),
    /// The request could not be encoded, or the response was not valid JSON,
    /// lacked a `snodes` array, or contained no parseable snode entries.
    /// Carries a human-readable description.
    #[error("parse: {0}")]
    Parse(String),
    /// The pool had no snodes to ask.
    #[error("snode pool is empty")]
    PoolEmpty,
}

/// Default request timeout for swarm lookups (15 seconds).
///
/// Used as the `timeout` of the request built by [`fetch_swarm`].
pub const DEFAULT_SWARM_REQUEST_TIMEOUT: Duration = Duration::from_secs(15);

/// Asks one snode from the pool which snodes make up the swarm for a pubkey
/// (`get_snodes_for_pubkey`).
///
/// `pubkey_hex` is the target Session id (`05...` for users, `03...` for
/// closed groups). On success the result is the swarm: the list of snodes
/// that host messages for that pubkey.
///
/// The snode is whatever `pool.get_unused_nodes(1, &[])` yields first; the
/// request is a JSON POST to that node's `/json_rpc` endpoint with a timeout
/// of [`DEFAULT_SWARM_REQUEST_TIMEOUT`].
///
/// # Errors
///
/// * [`SwarmFetchError::PoolEmpty`] - the pool returned no node.
/// * [`SwarmFetchError::Parse`] - the request body could not be encoded, or
///   the response could not be parsed into at least one snode.
/// * [`SwarmFetchError::Transport`] - the transport failed to send the request.
/// * [`SwarmFetchError::BadStatus`] - the status code was outside `200..=299`.
pub async fn fetch_swarm<T: Transport>(
    pool: &SnodePool,
    transport: &T,
    pubkey_hex: &str,
) -> Result<Vec<ServiceNode>, SwarmFetchError> {
    // One node is requested, with an empty second argument.
    let target = match pool.get_unused_nodes(1, &[]).into_iter().next() {
        Some(node) => node,
        None => return Err(SwarmFetchError::PoolEmpty),
    };

    let endpoint = format!("https://{}/json_rpc", target.to_https_string());

    let params = rpc::build_get_swarm_params(pubkey_hex);
    let envelope = rpc::wrap_rpc_envelope(rpc::METHOD_GET_SWARM, params);
    let payload = serde_json::to_vec(&envelope)
        .map_err(|err| SwarmFetchError::Parse(format!("encode request: {err}")))?;

    let request = TransportRequest {
        url: endpoint,
        method: String::from("POST"),
        body: payload,
        headers: vec![(
            String::from("Content-Type"),
            String::from("application/json"),
        )],
        timeout: DEFAULT_SWARM_REQUEST_TIMEOUT,
        // NOTE(review): certificate validation is disabled for this request;
        // presumably snodes present self-signed certificates - confirm.
        accept_invalid_certs: true,
    };

    let response = transport.send_request(&request).await?;
    match response.status_code {
        200..=299 => parse_swarm_response(&response.body),
        other => Err(SwarmFetchError::BadStatus(other)),
    }
}

/// Decodes a `get_snodes_for_pubkey` response body into service nodes.
///
/// Two layouts are accepted:
/// * `{"snodes": [...]}` - direct shape used by snodes
/// * `{"result": {"snodes": [...]}}` - wrapped shape from some paths
///
/// The wrapped layout is consulted only when the top level has no `snodes`
/// key at all. Entries that `ServiceNode::from_json` rejects are skipped
/// rather than failing the whole response.
///
/// # Errors
///
/// Returns [`SwarmFetchError::Parse`] when the body is not valid JSON, when
/// no `snodes` array is found, or when none of the entries could be parsed.
fn parse_swarm_response(body: &[u8]) -> Result<Vec<ServiceNode>, SwarmFetchError> {
    let root: serde_json::Value = match serde_json::from_slice(body) {
        Ok(value) => value,
        Err(err) => return Err(SwarmFetchError::Parse(format!("json: {err}"))),
    };

    // Prefer the top-level key; fall back to the wrapped form only if absent.
    let candidate = match root.get("snodes") {
        Some(direct) => Some(direct),
        None => root.get("result").and_then(|inner| inner.get("snodes")),
    };
    let entries = match candidate.and_then(serde_json::Value::as_array) {
        Some(list) => list,
        None => return Err(SwarmFetchError::Parse("missing snodes array".into())),
    };

    // Best effort: keep every entry that parses, drop the rest.
    let nodes: Vec<ServiceNode> = entries
        .iter()
        .filter_map(|entry| ServiceNode::from_json(entry).ok())
        .collect();

    if nodes.is_empty() {
        Err(SwarmFetchError::Parse("no parseable snodes".into()))
    } else {
        Ok(nodes)
    }
}

// Unit tests for swarm grouping, closest-swarm lookup edge cases, and the
// live swarm fetch driven through a mock transport.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::network::key_types::Ed25519Pubkey;

    /// Builds a `ServiceNode` fixture in the given swarm.
    ///
    /// The Ed25519 key is all zeroes except for its last byte, which is set to
    /// `hex_suffix` so that nodes can be told apart. All other fields are fixed.
    fn make_node(swarm_id: SwarmId, hex_suffix: u8) -> ServiceNode {
        let mut pk_bytes = [0u8; 32];
        pk_bytes[31] = hex_suffix;
        ServiceNode {
            ed25519_pubkey: Ed25519Pubkey(pk_bytes),
            ip: [1, 2, 3, 4],
            https_port: 443,
            omq_port: 22000,
            storage_server_version: [2, 11, 0],
            swarm_id,
            requested_unlock_height: 0,
        }
    }

    /// Four nodes across three swarm IDs must produce three groups, sorted by
    /// ID, with the two nodes of swarm 100 grouped together.
    #[test]
    fn test_generate_swarms() {
        let nodes = vec![
            make_node(100, 1),
            make_node(200, 2),
            make_node(100, 3),
            make_node(300, 4),
        ];

        let swarms = generate_swarms(&nodes);
        assert_eq!(swarms.len(), 3);
        // Should be sorted by swarm_id
        assert_eq!(swarms[0].0, 100);
        assert_eq!(swarms[0].1.len(), 2);
        assert_eq!(swarms[1].0, 200);
        assert_eq!(swarms[1].1.len(), 1);
        assert_eq!(swarms[2].0, 300);
        assert_eq!(swarms[2].1.len(), 1);
    }

    /// With exactly one swarm, that swarm is returned regardless of the pubkey.
    #[test]
    fn test_get_swarm_single() {
        let swarms = vec![(100, vec![make_node(100, 1)])];
        let pk = X25519Pubkey([0u8; 32]);
        let result = get_swarm(&pk, &swarms).unwrap();
        assert_eq!(result.0, 100);
    }

    /// An empty swarm list yields `None`.
    #[test]
    fn test_get_swarm_empty() {
        let pk = X25519Pubkey([0u8; 32]);
        assert!(get_swarm(&pk, &[]).is_none());
    }

    /// Serializes a direct-shape (`{"snodes": [...]}`) response containing a
    /// single snode entry with the given Ed25519 key, IP, and storage port.
    fn swarm_response_body(ed: &str, ip: &str, port: u16) -> Vec<u8> {
        let body = serde_json::json!({
            "snodes": [{
                "public_ip": ip,
                "storage_port": port,
                "storage_lmq_port": 22000,
                "pubkey_x25519": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
                "pubkey_ed25519": ed,
                "storage_server_version": [2, 11, 0],
                "swarm_id": 12345
            }]
        });
        serde_json::to_vec(&body).unwrap()
    }

    /// Happy path: a pool with one node and a mock response in the direct
    /// shape yields the single snode described by the response.
    #[tokio::test]
    async fn test_fetch_swarm_returns_snodes() {
        use crate::network::snode_pool::{SnodePool, SnodePoolConfig};
        use crate::network::transport::MockTransport;

        let mut pool = SnodePool::new(SnodePoolConfig::default());
        pool.add_nodes(vec![make_node(100, 1)]);

        // NOTE(review): presumably registers a canned successful JSON reply for
        // requests whose URL matches "json_rpc" - confirm against MockTransport.
        let t = MockTransport::new();
        t.route_ok_json(
            "json_rpc",
            swarm_response_body(
                "1f000f09a7b07828dcb72af7cd16857050c10c02bd58afb0e38111fb6cda1fef",
                "95.216.33.113",
                22100,
            ),
        );

        let snodes = fetch_swarm(
            &pool,
            &t,
            "05aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
        )
        .await
        .unwrap();

        assert_eq!(snodes.len(), 1);
        assert_eq!(snodes[0].host(), "95.216.33.113");
    }

    /// With no nodes added to the pool, the fetch fails with `PoolEmpty`.
    #[tokio::test]
    async fn test_fetch_swarm_empty_pool_errors() {
        use crate::network::snode_pool::{SnodePool, SnodePoolConfig};
        use crate::network::transport::MockTransport;

        let pool = SnodePool::new(SnodePoolConfig::default());
        let t = MockTransport::new();
        let r = fetch_swarm(
            &pool,
            &t,
            "05aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
        )
        .await;
        assert!(matches!(r, Err(SwarmFetchError::PoolEmpty)));
    }

    /// The wrapped shape (`{"result": {"snodes": [...]}}`) is parsed as well.
    #[tokio::test]
    async fn test_fetch_swarm_handles_wrapped_result() {
        use crate::network::snode_pool::{SnodePool, SnodePoolConfig};
        use crate::network::transport::MockTransport;

        let mut pool = SnodePool::new(SnodePoolConfig::default());
        pool.add_nodes(vec![make_node(100, 1)]);

        let t = MockTransport::new();
        let body = serde_json::json!({
            "result": {
                "snodes": [{
                    "public_ip": "95.216.33.113",
                    "storage_port": 22100,
                    "storage_lmq_port": 22000,
                    "pubkey_x25519": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
                    "pubkey_ed25519": "1f000f09a7b07828dcb72af7cd16857050c10c02bd58afb0e38111fb6cda1fef",
                    "storage_server_version": [2, 11, 0],
                    "swarm_id": 42
                }]
            }
        });
        t.route_ok_json("json_rpc", serde_json::to_vec(&body).unwrap());

        let snodes = fetch_swarm(
            &pool,
            &t,
            "05aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
        )
        .await
        .unwrap();
        assert_eq!(snodes.len(), 1);
    }
}