1use anyhow::Result;
2use async_trait::async_trait;
3use nostr_sdk::nostr::{Event, Filter};
4use std::sync::Arc;
5use std::time::Duration;
6
7use crate::local_bus::SharedLocalNostrBus;
8use crate::root_events::{
9 build_root_filter, hashtree_event_identifier, is_hashtree_labeled_event, pick_latest_event,
10 root_event_from_peer, PeerRootEvent,
11};
12use crate::types::{
13 decrement_htl_with_policy, should_forward_htl, MeshNostrFrame, PeerHTLConfig, MESH_EVENT_POLICY,
14};
15
16#[async_trait]
17pub trait MeshSession: Send + Sync {
18 fn is_ready(&self) -> bool;
19 fn is_connected(&self) -> bool;
20 fn htl_config(&self) -> PeerHTLConfig;
21
22 async fn request(&self, hash_hex: &str, timeout: Duration) -> Result<Option<Vec<u8>>>;
23
24 async fn query_nostr_events(
25 &self,
26 filters: Vec<Filter>,
27 timeout: Duration,
28 ) -> Result<Vec<Event>>;
29
30 async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()>;
31
32 async fn close(&self) -> Result<()>;
33
34 fn transport_debug_state(&self) -> Option<String> {
35 None
36 }
37}
38
39pub async fn resolve_root_from_peer_sessions(
40 peer_refs: Vec<(String, Arc<dyn MeshSession>)>,
41 owner_pubkey: &str,
42 tree_name: &str,
43 per_peer_timeout: Duration,
44) -> Option<PeerRootEvent> {
45 let filter = build_root_filter(owner_pubkey, tree_name)?;
46
47 for (peer_label, peer) in peer_refs {
48 if !peer.is_ready() {
49 continue;
50 }
51
52 let events = match peer
53 .query_nostr_events(vec![filter.clone()], per_peer_timeout)
54 .await
55 {
56 Ok(events) => events,
57 Err(_) => continue,
58 };
59
60 let latest = pick_latest_event(events.iter().filter(|event| {
61 hashtree_event_identifier(event).as_deref() == Some(tree_name)
62 && is_hashtree_labeled_event(event)
63 }));
64 if let Some(event) = latest {
65 if let Some(root) = root_event_from_peer(event, &peer_label, tree_name) {
66 return Some(root);
67 }
68 }
69 }
70
71 None
72}
73
74pub async fn resolve_root_from_local_buses_with_source(
75 buses: Vec<SharedLocalNostrBus>,
76 owner_pubkey: &str,
77 tree_name: &str,
78 timeout: Duration,
79) -> Option<(&'static str, PeerRootEvent)> {
80 for bus in buses {
81 if let Some(root) = bus.query_root(owner_pubkey, tree_name, timeout).await {
82 return Some((bus.source_name(), root));
83 }
84 }
85 None
86}
87
88pub async fn forward_mesh_frame_to_sessions(
89 sessions: Vec<(String, Arc<dyn MeshSession>)>,
90 frame: &MeshNostrFrame,
91 exclude_peer_id: Option<&str>,
92) -> usize {
93 let mut forwarded = 0usize;
94
95 for (peer_id, session) in sessions {
96 if exclude_peer_id
97 .map(|exclude| exclude == peer_id.as_str())
98 .unwrap_or(false)
99 {
100 continue;
101 }
102 if !session.is_ready() {
103 continue;
104 }
105
106 let next_htl =
107 decrement_htl_with_policy(frame.htl, &MESH_EVENT_POLICY, &session.htl_config());
108 if !should_forward_htl(next_htl) {
109 continue;
110 }
111
112 let mut outbound = frame.clone();
113 outbound.htl = next_htl;
114 if session.send_mesh_frame_text(&outbound).await.is_ok() {
115 forwarded += 1;
116 }
117 }
118
119 forwarded
120}
121
122#[cfg(test)]
123mod tests {
124 use super::*;
125 use crate::local_bus::LocalNostrBus;
126 use crate::root_events::{HASHTREE_KIND, HASHTREE_LABEL};
127 use crate::types::{MESH_DEFAULT_HTL, NOSTR_KIND_HASHTREE};
128 use nostr_sdk::nostr::{EventBuilder, Keys, Kind, Tag, Timestamp};
129 use std::sync::atomic::{AtomicBool, Ordering};
130 use tokio::sync::Mutex;
131
132 struct TestSession {
133 ready: bool,
134 connected: bool,
135 htl_config: PeerHTLConfig,
136 query_events: Mutex<Vec<Event>>,
137 sent_frames: Mutex<Vec<MeshNostrFrame>>,
138 closed: AtomicBool,
139 }
140
141 impl TestSession {
142 fn with_events(events: Vec<Event>) -> Arc<Self> {
143 Arc::new(Self {
144 ready: true,
145 connected: true,
146 htl_config: PeerHTLConfig::from_flags(false, false),
147 query_events: Mutex::new(events),
148 sent_frames: Mutex::new(Vec::new()),
149 closed: AtomicBool::new(false),
150 })
151 }
152
153 fn with_htl_config(htl_config: PeerHTLConfig) -> Arc<Self> {
154 Arc::new(Self {
155 ready: true,
156 connected: true,
157 htl_config,
158 query_events: Mutex::new(Vec::new()),
159 sent_frames: Mutex::new(Vec::new()),
160 closed: AtomicBool::new(false),
161 })
162 }
163 }
164
165 #[async_trait]
166 impl MeshSession for TestSession {
167 fn is_ready(&self) -> bool {
168 self.ready
169 }
170
171 fn is_connected(&self) -> bool {
172 self.connected
173 }
174
175 fn htl_config(&self) -> PeerHTLConfig {
176 self.htl_config
177 }
178
179 async fn request(&self, _hash_hex: &str, _timeout: Duration) -> Result<Option<Vec<u8>>> {
180 Ok(None)
181 }
182
183 async fn query_nostr_events(
184 &self,
185 _filters: Vec<Filter>,
186 _timeout: Duration,
187 ) -> Result<Vec<Event>> {
188 Ok(self.query_events.lock().await.clone())
189 }
190
191 async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
192 self.sent_frames.lock().await.push(frame.clone());
193 Ok(())
194 }
195
196 async fn close(&self) -> Result<()> {
197 self.closed.store(true, Ordering::Relaxed);
198 Ok(())
199 }
200 }
201
202 struct TestLocalBus {
203 source: &'static str,
204 root: Option<PeerRootEvent>,
205 }
206
207 #[async_trait]
208 impl LocalNostrBus for TestLocalBus {
209 fn source_name(&self) -> &'static str {
210 self.source
211 }
212
213 async fn broadcast_event(&self, _event: &Event) -> Result<()> {
214 Ok(())
215 }
216
217 async fn query_root(
218 &self,
219 _owner_pubkey: &str,
220 _tree_name: &str,
221 _timeout: Duration,
222 ) -> Option<PeerRootEvent> {
223 self.root.clone()
224 }
225 }
226
227 fn build_root_event(keys: &Keys, tree_name: &str, hash_hex: &str, created_at: u64) -> Event {
228 EventBuilder::new(Kind::Custom(HASHTREE_KIND), "")
229 .tags([
230 Tag::parse(["d", tree_name]).expect("d tag"),
231 Tag::parse(["l", HASHTREE_LABEL]).expect("label tag"),
232 Tag::parse(["hash", hash_hex]).expect("hash tag"),
233 ])
234 .custom_created_at(Timestamp::from_secs(created_at))
235 .sign_with_keys(keys)
236 .expect("root event")
237 }
238
239 fn build_mesh_event(keys: &Keys) -> Event {
240 EventBuilder::new(Kind::Custom(NOSTR_KIND_HASHTREE), "mesh")
241 .sign_with_keys(keys)
242 .expect("mesh event")
243 }
244
245 #[tokio::test]
246 async fn resolve_root_from_peer_sessions_returns_matching_latest_event() {
247 let owner_keys = Keys::generate();
248 let tree_name = "repo";
249 let newer_hash = "bb".repeat(32);
250 let older_hash = "aa".repeat(32);
251 let older = build_root_event(&owner_keys, tree_name, &older_hash, 10);
252 let newer = build_root_event(&owner_keys, tree_name, &newer_hash, 20);
253 let peer = TestSession::with_events(vec![older, newer]);
254
255 let resolved = resolve_root_from_peer_sessions(
256 vec![("peer-a".to_string(), peer as Arc<dyn MeshSession>)],
257 &owner_keys.public_key().to_hex(),
258 tree_name,
259 Duration::from_millis(10),
260 )
261 .await
262 .expect("resolved root");
263
264 assert_eq!(resolved.hash, newer_hash);
265 assert_eq!(resolved.peer_id, "peer-a");
266 }
267
268 #[tokio::test]
269 async fn resolve_root_from_local_buses_with_source_returns_first_match() {
270 let root = PeerRootEvent {
271 hash: "ab".repeat(32),
272 key: None,
273 encrypted_key: None,
274 self_encrypted_key: None,
275 event_id: "event-1".to_string(),
276 created_at: 1,
277 peer_id: "bus-peer".to_string(),
278 };
279
280 let resolved = resolve_root_from_local_buses_with_source(
281 vec![
282 Arc::new(TestLocalBus {
283 source: "empty",
284 root: None,
285 }) as SharedLocalNostrBus,
286 Arc::new(TestLocalBus {
287 source: "mock-bus",
288 root: Some(root.clone()),
289 }) as SharedLocalNostrBus,
290 ],
291 "owner",
292 "tree",
293 Duration::from_millis(10),
294 )
295 .await
296 .expect("resolved root");
297
298 assert_eq!(resolved.0, "mock-bus");
299 assert_eq!(resolved.1, root);
300 }
301
302 #[tokio::test]
303 async fn forward_mesh_frame_to_sessions_skips_excluded_and_applies_per_peer_htl() {
304 let keys = Keys::generate();
305 let frame = MeshNostrFrame::new_event(build_mesh_event(&keys), "sender", MESH_DEFAULT_HTL);
306 let first = TestSession::with_htl_config(PeerHTLConfig::from_flags(false, false));
307 let second = TestSession::with_htl_config(PeerHTLConfig::from_flags(true, false));
308
309 let forwarded = forward_mesh_frame_to_sessions(
310 vec![
311 ("peer-a".to_string(), first.clone() as Arc<dyn MeshSession>),
312 ("peer-b".to_string(), second.clone() as Arc<dyn MeshSession>),
313 ],
314 &frame,
315 Some("peer-a"),
316 )
317 .await;
318
319 assert_eq!(forwarded, 1);
320 assert!(first.sent_frames.lock().await.is_empty());
321 let sent = second.sent_frames.lock().await;
322 assert_eq!(sent.len(), 1);
323 assert!(sent[0].htl < frame.htl);
324 }
325}