hashtree-network 0.2.46

Mesh networking stack for hashtree: routing, signaling, peer links, and stores
Documentation
use anyhow::Result;
use async_trait::async_trait;
use nostr_sdk::nostr::{Event, Filter};
use std::sync::Arc;
use std::time::Duration;

use crate::local_bus::SharedLocalNostrBus;
use crate::root_events::{
    build_root_filter, hashtree_event_identifier, is_hashtree_labeled_event, pick_latest_event,
    root_event_from_peer, PeerRootEvent,
};
use crate::types::{
    decrement_htl_with_policy, should_forward_htl, MeshNostrFrame, PeerHTLConfig, MESH_EVENT_POLICY,
};

#[async_trait]
pub trait MeshSession: Send + Sync {
    fn is_ready(&self) -> bool;
    fn is_connected(&self) -> bool;
    fn htl_config(&self) -> PeerHTLConfig;

    async fn request(&self, hash_hex: &str, timeout: Duration) -> Result<Option<Vec<u8>>>;

    async fn query_nostr_events(
        &self,
        filters: Vec<Filter>,
        timeout: Duration,
    ) -> Result<Vec<Event>>;

    async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()>;

    async fn close(&self) -> Result<()>;

    fn transport_debug_state(&self) -> Option<String> {
        None
    }
}

pub async fn resolve_root_from_peer_sessions(
    peer_refs: Vec<(String, Arc<dyn MeshSession>)>,
    owner_pubkey: &str,
    tree_name: &str,
    per_peer_timeout: Duration,
) -> Option<PeerRootEvent> {
    let filter = build_root_filter(owner_pubkey, tree_name)?;

    for (peer_label, peer) in peer_refs {
        if !peer.is_ready() {
            continue;
        }

        let events = match peer
            .query_nostr_events(vec![filter.clone()], per_peer_timeout)
            .await
        {
            Ok(events) => events,
            Err(_) => continue,
        };

        let latest = pick_latest_event(events.iter().filter(|event| {
            hashtree_event_identifier(event).as_deref() == Some(tree_name)
                && is_hashtree_labeled_event(event)
        }));
        if let Some(event) = latest {
            if let Some(root) = root_event_from_peer(event, &peer_label, tree_name) {
                return Some(root);
            }
        }
    }

    None
}

pub async fn resolve_root_from_local_buses_with_source(
    buses: Vec<SharedLocalNostrBus>,
    owner_pubkey: &str,
    tree_name: &str,
    timeout: Duration,
) -> Option<(&'static str, PeerRootEvent)> {
    for bus in buses {
        if let Some(root) = bus.query_root(owner_pubkey, tree_name, timeout).await {
            return Some((bus.source_name(), root));
        }
    }
    None
}

pub async fn forward_mesh_frame_to_sessions(
    sessions: Vec<(String, Arc<dyn MeshSession>)>,
    frame: &MeshNostrFrame,
    exclude_peer_id: Option<&str>,
) -> usize {
    let mut forwarded = 0usize;

    for (peer_id, session) in sessions {
        if exclude_peer_id
            .map(|exclude| exclude == peer_id.as_str())
            .unwrap_or(false)
        {
            continue;
        }
        if !session.is_ready() {
            continue;
        }

        let next_htl =
            decrement_htl_with_policy(frame.htl, &MESH_EVENT_POLICY, &session.htl_config());
        if !should_forward_htl(next_htl) {
            continue;
        }

        let mut outbound = frame.clone();
        outbound.htl = next_htl;
        if session.send_mesh_frame_text(&outbound).await.is_ok() {
            forwarded += 1;
        }
    }

    forwarded
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::local_bus::LocalNostrBus;
    use crate::root_events::{HASHTREE_KIND, HASHTREE_LABEL};
    use crate::types::{MESH_DEFAULT_HTL, NOSTR_KIND_HASHTREE};
    use nostr_sdk::nostr::{EventBuilder, Keys, Kind, Tag, Timestamp};
    use std::sync::atomic::{AtomicBool, Ordering};
    use tokio::sync::Mutex;

    struct TestSession {
        ready: bool,
        connected: bool,
        htl_config: PeerHTLConfig,
        query_events: Mutex<Vec<Event>>,
        sent_frames: Mutex<Vec<MeshNostrFrame>>,
        closed: AtomicBool,
    }

    impl TestSession {
        fn with_events(events: Vec<Event>) -> Arc<Self> {
            Arc::new(Self {
                ready: true,
                connected: true,
                htl_config: PeerHTLConfig::from_flags(false, false),
                query_events: Mutex::new(events),
                sent_frames: Mutex::new(Vec::new()),
                closed: AtomicBool::new(false),
            })
        }

        fn with_htl_config(htl_config: PeerHTLConfig) -> Arc<Self> {
            Arc::new(Self {
                ready: true,
                connected: true,
                htl_config,
                query_events: Mutex::new(Vec::new()),
                sent_frames: Mutex::new(Vec::new()),
                closed: AtomicBool::new(false),
            })
        }
    }

    #[async_trait]
    impl MeshSession for TestSession {
        fn is_ready(&self) -> bool {
            self.ready
        }

        fn is_connected(&self) -> bool {
            self.connected
        }

        fn htl_config(&self) -> PeerHTLConfig {
            self.htl_config
        }

        async fn request(&self, _hash_hex: &str, _timeout: Duration) -> Result<Option<Vec<u8>>> {
            Ok(None)
        }

        async fn query_nostr_events(
            &self,
            _filters: Vec<Filter>,
            _timeout: Duration,
        ) -> Result<Vec<Event>> {
            Ok(self.query_events.lock().await.clone())
        }

        async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
            self.sent_frames.lock().await.push(frame.clone());
            Ok(())
        }

        async fn close(&self) -> Result<()> {
            self.closed.store(true, Ordering::Relaxed);
            Ok(())
        }
    }

    struct TestLocalBus {
        source: &'static str,
        root: Option<PeerRootEvent>,
    }

    #[async_trait]
    impl LocalNostrBus for TestLocalBus {
        fn source_name(&self) -> &'static str {
            self.source
        }

        async fn broadcast_event(&self, _event: &Event) -> Result<()> {
            Ok(())
        }

        async fn query_root(
            &self,
            _owner_pubkey: &str,
            _tree_name: &str,
            _timeout: Duration,
        ) -> Option<PeerRootEvent> {
            self.root.clone()
        }
    }

    fn build_root_event(keys: &Keys, tree_name: &str, hash_hex: &str, created_at: u64) -> Event {
        EventBuilder::new(
            Kind::Custom(HASHTREE_KIND),
            "",
            [
                Tag::parse(&["d", tree_name]).expect("d tag"),
                Tag::parse(&["l", HASHTREE_LABEL]).expect("label tag"),
                Tag::parse(&["hash", hash_hex]).expect("hash tag"),
            ],
        )
        .custom_created_at(Timestamp::from_secs(created_at))
        .to_event(keys)
        .expect("root event")
    }

    fn build_mesh_event(keys: &Keys) -> Event {
        EventBuilder::new(Kind::Custom(NOSTR_KIND_HASHTREE), "mesh", [])
            .to_event(keys)
            .expect("mesh event")
    }

    #[tokio::test]
    async fn resolve_root_from_peer_sessions_returns_matching_latest_event() {
        let owner_keys = Keys::generate();
        let tree_name = "repo";
        let newer_hash = "bb".repeat(32);
        let older_hash = "aa".repeat(32);
        let older = build_root_event(&owner_keys, tree_name, &older_hash, 10);
        let newer = build_root_event(&owner_keys, tree_name, &newer_hash, 20);
        let peer = TestSession::with_events(vec![older, newer]);

        let resolved = resolve_root_from_peer_sessions(
            vec![("peer-a".to_string(), peer as Arc<dyn MeshSession>)],
            &owner_keys.public_key().to_hex(),
            tree_name,
            Duration::from_millis(10),
        )
        .await
        .expect("resolved root");

        assert_eq!(resolved.hash, newer_hash);
        assert_eq!(resolved.peer_id, "peer-a");
    }

    #[tokio::test]
    async fn resolve_root_from_local_buses_with_source_returns_first_match() {
        let root = PeerRootEvent {
            hash: "ab".repeat(32),
            key: None,
            encrypted_key: None,
            self_encrypted_key: None,
            event_id: "event-1".to_string(),
            created_at: 1,
            peer_id: "bus-peer".to_string(),
        };

        let resolved = resolve_root_from_local_buses_with_source(
            vec![
                Arc::new(TestLocalBus {
                    source: "empty",
                    root: None,
                }) as SharedLocalNostrBus,
                Arc::new(TestLocalBus {
                    source: "mock-bus",
                    root: Some(root.clone()),
                }) as SharedLocalNostrBus,
            ],
            "owner",
            "tree",
            Duration::from_millis(10),
        )
        .await
        .expect("resolved root");

        assert_eq!(resolved.0, "mock-bus");
        assert_eq!(resolved.1, root);
    }

    #[tokio::test]
    async fn forward_mesh_frame_to_sessions_skips_excluded_and_applies_per_peer_htl() {
        let keys = Keys::generate();
        let frame = MeshNostrFrame::new_event(build_mesh_event(&keys), "sender", MESH_DEFAULT_HTL);
        let first = TestSession::with_htl_config(PeerHTLConfig::from_flags(false, false));
        let second = TestSession::with_htl_config(PeerHTLConfig::from_flags(true, false));

        let forwarded = forward_mesh_frame_to_sessions(
            vec![
                ("peer-a".to_string(), first.clone() as Arc<dyn MeshSession>),
                ("peer-b".to_string(), second.clone() as Arc<dyn MeshSession>),
            ],
            &frame,
            Some("peer-a"),
        )
        .await;

        assert_eq!(forwarded, 1);
        assert!(first.sent_frames.lock().await.is_empty());
        let sent = second.sent_frames.lock().await;
        assert_eq!(sent.len(), 1);
        assert!(sent[0].htl < frame.htl);
    }
}