Skip to main content

hashtree_network/
mesh_session.rs

1use anyhow::Result;
2use async_trait::async_trait;
3use nostr_sdk::nostr::{Event, Filter};
4use std::sync::Arc;
5use std::time::Duration;
6
7use crate::local_bus::SharedLocalNostrBus;
8use crate::root_events::{
9    build_root_filter, hashtree_event_identifier, is_hashtree_labeled_event, pick_latest_event,
10    root_event_from_peer, PeerRootEvent,
11};
12use crate::types::{
13    decrement_htl_with_policy, should_forward_htl, MeshNostrFrame, PeerHTLConfig, MESH_EVENT_POLICY,
14};
15
/// Interface to a single mesh peer session.
///
/// The helper functions in this module only ever talk to peers through
/// `Arc<dyn MeshSession>`, which is why implementors must be `Send + Sync`.
#[async_trait]
pub trait MeshSession: Send + Sync {
    /// Reports whether the session is ready. The helpers in this module skip
    /// any session for which this returns `false`.
    fn is_ready(&self) -> bool;
    /// Reports whether the session is connected.
    // NOTE(review): how "connected" differs from "ready" is up to the
    // implementor and is not visible from this file — confirm.
    fn is_connected(&self) -> bool;
    /// Returns the per-peer HTL configuration. `forward_mesh_frame_to_sessions`
    /// feeds it into `decrement_htl_with_policy` when computing the HTL of an
    /// outbound frame.
    fn htl_config(&self) -> PeerHTLConfig;

    /// Issues a request keyed by `hash_hex`, bounded by `timeout`.
    // NOTE(review): presumably `Ok(None)` means the peer had no data for the
    // hash and `Ok(Some(bytes))` carries the payload — confirm with implementors.
    async fn request(&self, hash_hex: &str, timeout: Duration) -> Result<Option<Vec<u8>>>;

    /// Queries the peer for Nostr events using `filters`, bounded by `timeout`,
    /// and returns whatever events the peer produced.
    // NOTE(review): callers in this file re-check the returned events
    // themselves, so implementors are apparently not trusted to filter exactly.
    async fn query_nostr_events(
        &self,
        filters: Vec<Filter>,
        timeout: Duration,
    ) -> Result<Vec<Event>>;

    /// Sends a mesh frame to the peer.
    // NOTE(review): the `_text` suffix suggests a textual wire encoding — the
    // actual serialization is not visible here.
    async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()>;

    /// Closes the session.
    async fn close(&self) -> Result<()>;

    /// Optional transport diagnostics. The default implementation returns
    /// `None`; implementors may override it to expose a description.
    fn transport_debug_state(&self) -> Option<String> {
        None
    }
}
38
39pub async fn resolve_root_from_peer_sessions(
40    peer_refs: Vec<(String, Arc<dyn MeshSession>)>,
41    owner_pubkey: &str,
42    tree_name: &str,
43    per_peer_timeout: Duration,
44) -> Option<PeerRootEvent> {
45    let filter = build_root_filter(owner_pubkey, tree_name)?;
46
47    for (peer_label, peer) in peer_refs {
48        if !peer.is_ready() {
49            continue;
50        }
51
52        let events = match peer
53            .query_nostr_events(vec![filter.clone()], per_peer_timeout)
54            .await
55        {
56            Ok(events) => events,
57            Err(_) => continue,
58        };
59
60        let latest = pick_latest_event(events.iter().filter(|event| {
61            hashtree_event_identifier(event).as_deref() == Some(tree_name)
62                && is_hashtree_labeled_event(event)
63        }));
64        if let Some(event) = latest {
65            if let Some(root) = root_event_from_peer(event, &peer_label, tree_name) {
66                return Some(root);
67            }
68        }
69    }
70
71    None
72}
73
74pub async fn resolve_root_from_local_buses_with_source(
75    buses: Vec<SharedLocalNostrBus>,
76    owner_pubkey: &str,
77    tree_name: &str,
78    timeout: Duration,
79) -> Option<(&'static str, PeerRootEvent)> {
80    for bus in buses {
81        if let Some(root) = bus.query_root(owner_pubkey, tree_name, timeout).await {
82            return Some((bus.source_name(), root));
83        }
84    }
85    None
86}
87
88pub async fn forward_mesh_frame_to_sessions(
89    sessions: Vec<(String, Arc<dyn MeshSession>)>,
90    frame: &MeshNostrFrame,
91    exclude_peer_id: Option<&str>,
92) -> usize {
93    let mut forwarded = 0usize;
94
95    for (peer_id, session) in sessions {
96        if exclude_peer_id
97            .map(|exclude| exclude == peer_id.as_str())
98            .unwrap_or(false)
99        {
100            continue;
101        }
102        if !session.is_ready() {
103            continue;
104        }
105
106        let next_htl =
107            decrement_htl_with_policy(frame.htl, &MESH_EVENT_POLICY, &session.htl_config());
108        if !should_forward_htl(next_htl) {
109            continue;
110        }
111
112        let mut outbound = frame.clone();
113        outbound.htl = next_htl;
114        if session.send_mesh_frame_text(&outbound).await.is_ok() {
115            forwarded += 1;
116        }
117    }
118
119    forwarded
120}
121
#[cfg(test)]
mod tests {
    use super::*;
    use crate::local_bus::LocalNostrBus;
    use crate::root_events::{HASHTREE_KIND, HASHTREE_LABEL};
    use crate::types::{MESH_DEFAULT_HTL, NOSTR_KIND_HASHTREE};
    use nostr_sdk::nostr::{EventBuilder, Keys, Kind, Tag, Timestamp};
    use std::sync::atomic::{AtomicBool, Ordering};
    use tokio::sync::Mutex;

    /// In-memory `MeshSession` double: serves canned query results and records
    /// every frame it is asked to send.
    struct TestSession {
        ready: bool,
        connected: bool,
        htl_config: PeerHTLConfig,
        query_events: Mutex<Vec<Event>>,
        sent_frames: Mutex<Vec<MeshNostrFrame>>,
        closed: AtomicBool,
    }

    impl TestSession {
        /// Shared constructor: a ready, connected, not-yet-closed session with
        /// an empty send log.
        fn build(events: Vec<Event>, htl_config: PeerHTLConfig) -> Arc<Self> {
            let session = Self {
                ready: true,
                connected: true,
                htl_config,
                query_events: Mutex::new(events),
                sent_frames: Mutex::new(Vec::new()),
                closed: AtomicBool::new(false),
            };
            Arc::new(session)
        }

        /// Session that answers every query with `events`.
        fn with_events(events: Vec<Event>) -> Arc<Self> {
            Self::build(events, PeerHTLConfig::from_flags(false, false))
        }

        /// Session with no canned events and the given HTL configuration.
        fn with_htl_config(htl_config: PeerHTLConfig) -> Arc<Self> {
            Self::build(Vec::new(), htl_config)
        }
    }

    #[async_trait]
    impl MeshSession for TestSession {
        fn is_ready(&self) -> bool {
            self.ready
        }

        fn is_connected(&self) -> bool {
            self.connected
        }

        fn htl_config(&self) -> PeerHTLConfig {
            self.htl_config
        }

        async fn request(&self, _hash_hex: &str, _timeout: Duration) -> Result<Option<Vec<u8>>> {
            Ok(None)
        }

        async fn query_nostr_events(
            &self,
            _filters: Vec<Filter>,
            _timeout: Duration,
        ) -> Result<Vec<Event>> {
            // Filters are ignored on purpose: the canned events are returned as-is.
            let stored = self.query_events.lock().await;
            Ok(stored.clone())
        }

        async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
            let mut log = self.sent_frames.lock().await;
            log.push(frame.clone());
            Ok(())
        }

        async fn close(&self) -> Result<()> {
            self.closed.store(true, Ordering::Relaxed);
            Ok(())
        }
    }

    /// `LocalNostrBus` double that always answers with a fixed root (or none).
    struct TestLocalBus {
        source: &'static str,
        root: Option<PeerRootEvent>,
    }

    #[async_trait]
    impl LocalNostrBus for TestLocalBus {
        fn source_name(&self) -> &'static str {
            self.source
        }

        async fn broadcast_event(&self, _event: &Event) -> Result<()> {
            Ok(())
        }

        async fn query_root(
            &self,
            _owner_pubkey: &str,
            _tree_name: &str,
            _timeout: Duration,
        ) -> Option<PeerRootEvent> {
            self.root.clone()
        }
    }

    /// Builds a signed root event for `tree_name` carrying `hash_hex`, with an
    /// explicit creation time so tests can control which event is newest.
    fn build_root_event(keys: &Keys, tree_name: &str, hash_hex: &str, created_at: u64) -> Event {
        let tags = [
            Tag::parse(&["d", tree_name]).expect("d tag"),
            Tag::parse(&["l", HASHTREE_LABEL]).expect("label tag"),
            Tag::parse(&["hash", hash_hex]).expect("hash tag"),
        ];
        let builder = EventBuilder::new(Kind::Custom(HASHTREE_KIND), "", tags)
            .custom_created_at(Timestamp::from_secs(created_at));

        builder.to_event(keys).expect("root event")
    }

    /// Builds a minimal signed event to wrap in a mesh frame.
    fn build_mesh_event(keys: &Keys) -> Event {
        let builder = EventBuilder::new(Kind::Custom(NOSTR_KIND_HASHTREE), "mesh", []);
        builder.to_event(keys).expect("mesh event")
    }

    #[tokio::test]
    async fn resolve_root_from_peer_sessions_returns_matching_latest_event() {
        let owner_keys = Keys::generate();
        let tree_name = "repo";
        let newer_hash = "bb".repeat(32);
        let older_hash = "aa".repeat(32);

        // Older event is listed first so ordering in the response cannot be
        // what makes the newer one win.
        let canned = vec![
            build_root_event(&owner_keys, tree_name, &older_hash, 10),
            build_root_event(&owner_keys, tree_name, &newer_hash, 20),
        ];
        let peer: Arc<dyn MeshSession> = TestSession::with_events(canned);
        let owner_hex = owner_keys.public_key().to_hex();

        let resolved = resolve_root_from_peer_sessions(
            vec![("peer-a".to_string(), peer)],
            &owner_hex,
            tree_name,
            Duration::from_millis(10),
        )
        .await
        .expect("resolved root");

        assert_eq!(resolved.hash, newer_hash);
        assert_eq!(resolved.peer_id, "peer-a");
    }

    #[tokio::test]
    async fn resolve_root_from_local_buses_with_source_returns_first_match() {
        let root = PeerRootEvent {
            hash: "ab".repeat(32),
            key: None,
            encrypted_key: None,
            self_encrypted_key: None,
            event_id: "event-1".to_string(),
            created_at: 1,
            peer_id: "bus-peer".to_string(),
        };

        let empty_bus: SharedLocalNostrBus = Arc::new(TestLocalBus {
            source: "empty",
            root: None,
        });
        let populated_bus: SharedLocalNostrBus = Arc::new(TestLocalBus {
            source: "mock-bus",
            root: Some(root.clone()),
        });

        let (source, resolved_root) = resolve_root_from_local_buses_with_source(
            vec![empty_bus, populated_bus],
            "owner",
            "tree",
            Duration::from_millis(10),
        )
        .await
        .expect("resolved root");

        assert_eq!(source, "mock-bus");
        assert_eq!(resolved_root, root);
    }

    #[tokio::test]
    async fn forward_mesh_frame_to_sessions_skips_excluded_and_applies_per_peer_htl() {
        let keys = Keys::generate();
        let frame = MeshNostrFrame::new_event(build_mesh_event(&keys), "sender", MESH_DEFAULT_HTL);
        let first = TestSession::with_htl_config(PeerHTLConfig::from_flags(false, false));
        let second = TestSession::with_htl_config(PeerHTLConfig::from_flags(true, false));

        let first_dyn: Arc<dyn MeshSession> = first.clone();
        let second_dyn: Arc<dyn MeshSession> = second.clone();
        let targets = vec![
            ("peer-a".to_string(), first_dyn),
            ("peer-b".to_string(), second_dyn),
        ];

        let forwarded = forward_mesh_frame_to_sessions(targets, &frame, Some("peer-a")).await;

        assert_eq!(forwarded, 1);
        // The excluded peer must not have received anything.
        assert!(first.sent_frames.lock().await.is_empty());

        let delivered = second.sent_frames.lock().await;
        assert_eq!(delivered.len(), 1);
        assert!(delivered[0].htl < frame.htl);
    }
}