Skip to main content

hashtree_network/
mesh_session.rs

1use anyhow::Result;
2use async_trait::async_trait;
3use nostr_sdk::nostr::{Event, Filter};
4use std::sync::Arc;
5use std::time::Duration;
6
7use crate::local_bus::SharedLocalNostrBus;
8use crate::root_events::{
9    build_root_filter, hashtree_event_identifier, is_hashtree_labeled_event, pick_latest_event,
10    root_event_from_peer, PeerRootEvent,
11};
12use crate::types::{
13    decrement_htl_with_policy, should_forward_htl, MeshNostrFrame, PeerHTLConfig, MESH_EVENT_POLICY,
14};
15
16#[async_trait]
17pub trait MeshSession: Send + Sync {
18    fn is_ready(&self) -> bool;
19    fn is_connected(&self) -> bool;
20    fn htl_config(&self) -> PeerHTLConfig;
21
22    async fn request(&self, hash_hex: &str, timeout: Duration) -> Result<Option<Vec<u8>>>;
23
24    async fn query_nostr_events(
25        &self,
26        filters: Vec<Filter>,
27        timeout: Duration,
28    ) -> Result<Vec<Event>>;
29
30    async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()>;
31
32    async fn close(&self) -> Result<()>;
33
34    fn transport_debug_state(&self) -> Option<String> {
35        None
36    }
37}
38
39pub async fn resolve_root_from_peer_sessions(
40    peer_refs: Vec<(String, Arc<dyn MeshSession>)>,
41    owner_pubkey: &str,
42    tree_name: &str,
43    per_peer_timeout: Duration,
44) -> Option<PeerRootEvent> {
45    let filter = build_root_filter(owner_pubkey, tree_name)?;
46
47    for (peer_label, peer) in peer_refs {
48        if !peer.is_ready() {
49            continue;
50        }
51
52        let events = match peer
53            .query_nostr_events(vec![filter.clone()], per_peer_timeout)
54            .await
55        {
56            Ok(events) => events,
57            Err(_) => continue,
58        };
59
60        let latest = pick_latest_event(events.iter().filter(|event| {
61            hashtree_event_identifier(event).as_deref() == Some(tree_name)
62                && is_hashtree_labeled_event(event)
63        }));
64        if let Some(event) = latest {
65            if let Some(root) = root_event_from_peer(event, &peer_label, tree_name) {
66                return Some(root);
67            }
68        }
69    }
70
71    None
72}
73
74pub async fn resolve_root_from_local_buses_with_source(
75    buses: Vec<SharedLocalNostrBus>,
76    owner_pubkey: &str,
77    tree_name: &str,
78    timeout: Duration,
79) -> Option<(&'static str, PeerRootEvent)> {
80    for bus in buses {
81        if let Some(root) = bus.query_root(owner_pubkey, tree_name, timeout).await {
82            return Some((bus.source_name(), root));
83        }
84    }
85    None
86}
87
88pub async fn forward_mesh_frame_to_sessions(
89    sessions: Vec<(String, Arc<dyn MeshSession>)>,
90    frame: &MeshNostrFrame,
91    exclude_peer_id: Option<&str>,
92) -> usize {
93    let mut forwarded = 0usize;
94
95    for (peer_id, session) in sessions {
96        if exclude_peer_id
97            .map(|exclude| exclude == peer_id.as_str())
98            .unwrap_or(false)
99        {
100            continue;
101        }
102        if !session.is_ready() {
103            continue;
104        }
105
106        let next_htl =
107            decrement_htl_with_policy(frame.htl, &MESH_EVENT_POLICY, &session.htl_config());
108        if !should_forward_htl(next_htl) {
109            continue;
110        }
111
112        let mut outbound = frame.clone();
113        outbound.htl = next_htl;
114        if session.send_mesh_frame_text(&outbound).await.is_ok() {
115            forwarded += 1;
116        }
117    }
118
119    forwarded
120}
121
#[cfg(test)]
mod tests {
    use super::*;
    use crate::local_bus::LocalNostrBus;
    use crate::root_events::{HASHTREE_KIND, HASHTREE_LABEL};
    use crate::types::{MESH_DEFAULT_HTL, NOSTR_KIND_HASHTREE};
    use nostr_sdk::nostr::{EventBuilder, Keys, Kind, Tag, Timestamp};
    use std::sync::atomic::{AtomicBool, Ordering};
    use tokio::sync::Mutex;

    /// In-memory `MeshSession` double: answers every query with a canned event
    /// list and records each frame it is asked to send.
    struct TestSession {
        ready: bool,
        connected: bool,
        htl_config: PeerHTLConfig,
        query_events: Mutex<Vec<Event>>,
        sent_frames: Mutex<Vec<MeshNostrFrame>>,
        closed: AtomicBool,
    }

    impl TestSession {
        /// Shared constructor: a ready, connected, not-yet-closed session that
        /// has sent nothing so far.
        fn build(htl_config: PeerHTLConfig, events: Vec<Event>) -> Arc<Self> {
            Arc::new(Self {
                ready: true,
                connected: true,
                htl_config,
                query_events: Mutex::new(events),
                sent_frames: Mutex::new(Vec::new()),
                closed: AtomicBool::new(false),
            })
        }

        /// Session that replies to queries with `events`, using the HTL config
        /// produced by `PeerHTLConfig::from_flags(false, false)`.
        fn with_events(events: Vec<Event>) -> Arc<Self> {
            Self::build(PeerHTLConfig::from_flags(false, false), events)
        }

        /// Session with the given HTL config and no canned events.
        fn with_htl_config(htl_config: PeerHTLConfig) -> Arc<Self> {
            Self::build(htl_config, Vec::new())
        }
    }

    #[async_trait]
    impl MeshSession for TestSession {
        fn is_ready(&self) -> bool {
            self.ready
        }

        fn is_connected(&self) -> bool {
            self.connected
        }

        fn htl_config(&self) -> PeerHTLConfig {
            self.htl_config
        }

        // Data requests are not exercised by these tests; always "not found".
        async fn request(&self, _hash_hex: &str, _timeout: Duration) -> Result<Option<Vec<u8>>> {
            Ok(None)
        }

        // Filters and timeout are ignored: the canned list is returned as-is.
        async fn query_nostr_events(
            &self,
            _filters: Vec<Filter>,
            _timeout: Duration,
        ) -> Result<Vec<Event>> {
            let canned = self.query_events.lock().await;
            Ok(canned.clone())
        }

        // Records the frame so tests can inspect what was forwarded.
        async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
            let mut log = self.sent_frames.lock().await;
            log.push(frame.clone());
            Ok(())
        }

        async fn close(&self) -> Result<()> {
            self.closed.store(true, Ordering::Relaxed);
            Ok(())
        }
    }

    /// `LocalNostrBus` double that always answers root queries with a fixed
    /// value.
    struct TestLocalBus {
        source: &'static str,
        root: Option<PeerRootEvent>,
    }

    #[async_trait]
    impl LocalNostrBus for TestLocalBus {
        fn source_name(&self) -> &'static str {
            self.source
        }

        async fn broadcast_event(&self, _event: &Event) -> Result<()> {
            Ok(())
        }

        // Owner, tree name and timeout are ignored; the stored root is cloned.
        async fn query_root(
            &self,
            _owner_pubkey: &str,
            _tree_name: &str,
            _timeout: Duration,
        ) -> Option<PeerRootEvent> {
            self.root.clone()
        }
    }

    /// Signs a root event for `tree_name` carrying the `d`, label and `hash`
    /// tags, with an explicit `created_at` so tests control event ordering.
    fn build_root_event(keys: &Keys, tree_name: &str, hash_hex: &str, created_at: u64) -> Event {
        let tags = [
            Tag::parse(["d", tree_name]).expect("d tag"),
            Tag::parse(["l", HASHTREE_LABEL]).expect("label tag"),
            Tag::parse(["hash", hash_hex]).expect("hash tag"),
        ];

        EventBuilder::new(Kind::Custom(HASHTREE_KIND), "")
            .tags(tags)
            .custom_created_at(Timestamp::from_secs(created_at))
            .sign_with_keys(keys)
            .expect("root event")
    }

    /// Signs a minimal event used as the payload of a mesh frame.
    fn build_mesh_event(keys: &Keys) -> Event {
        let builder = EventBuilder::new(Kind::Custom(NOSTR_KIND_HASHTREE), "mesh");
        builder.sign_with_keys(keys).expect("mesh event")
    }

    /// With two root events for the same tree, the one with the later
    /// `created_at` is resolved and attributed to the answering peer.
    #[tokio::test]
    async fn resolve_root_from_peer_sessions_returns_matching_latest_event() {
        let owner_keys = Keys::generate();
        let tree_name = "repo";
        let (older_hash, newer_hash) = ("aa".repeat(32), "bb".repeat(32));
        let canned = vec![
            build_root_event(&owner_keys, tree_name, &older_hash, 10),
            build_root_event(&owner_keys, tree_name, &newer_hash, 20),
        ];
        let session: Arc<dyn MeshSession> = TestSession::with_events(canned);
        let owner_hex = owner_keys.public_key().to_hex();

        let resolved = resolve_root_from_peer_sessions(
            vec![(String::from("peer-a"), session)],
            &owner_hex,
            tree_name,
            Duration::from_millis(10),
        )
        .await
        .expect("resolved root");

        assert_eq!(resolved.hash, newer_hash);
        assert_eq!(resolved.peer_id, "peer-a");
    }

    /// A bus without a root is passed over; the first bus that has one
    /// supplies both the root and the reported source name.
    #[tokio::test]
    async fn resolve_root_from_local_buses_with_source_returns_first_match() {
        let root = PeerRootEvent {
            hash: "ab".repeat(32),
            key: None,
            encrypted_key: None,
            self_encrypted_key: None,
            event_id: "event-1".to_string(),
            created_at: 1,
            peer_id: "bus-peer".to_string(),
        };

        let empty_bus: SharedLocalNostrBus = Arc::new(TestLocalBus {
            source: "empty",
            root: None,
        });
        let answering_bus: SharedLocalNostrBus = Arc::new(TestLocalBus {
            source: "mock-bus",
            root: Some(root.clone()),
        });

        let (source, found) = resolve_root_from_local_buses_with_source(
            vec![empty_bus, answering_bus],
            "owner",
            "tree",
            Duration::from_millis(10),
        )
        .await
        .expect("resolved root");

        assert_eq!(source, "mock-bus");
        assert_eq!(found, root);
    }

    /// The excluded peer receives nothing; the other peer receives exactly one
    /// frame whose HTL is lower than the original frame's.
    #[tokio::test]
    async fn forward_mesh_frame_to_sessions_skips_excluded_and_applies_per_peer_htl() {
        let keys = Keys::generate();
        let frame = MeshNostrFrame::new_event(build_mesh_event(&keys), "sender", MESH_DEFAULT_HTL);
        let excluded = TestSession::with_htl_config(PeerHTLConfig::from_flags(false, false));
        let recipient = TestSession::with_htl_config(PeerHTLConfig::from_flags(true, false));

        let sessions = vec![
            (
                String::from("peer-a"),
                excluded.clone() as Arc<dyn MeshSession>,
            ),
            (
                String::from("peer-b"),
                recipient.clone() as Arc<dyn MeshSession>,
            ),
        ];

        let forwarded = forward_mesh_frame_to_sessions(sessions, &frame, Some("peer-a")).await;

        assert_eq!(forwarded, 1);
        assert!(excluded.sent_frames.lock().await.is_empty());

        let delivered = recipient.sent_frames.lock().await;
        assert_eq!(delivered.len(), 1);
        assert!(delivered[0].htl < frame.htl);
    }
}
325}