1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use nostr_sdk::nostr::{
4 ClientMessage, Event, Filter, JsonUtil, Keys, RelayMessage, SingleLetterTag, SubscriptionId,
5};
6use socket2::{Domain, Protocol, Socket, Type};
7use std::collections::{HashMap, HashSet};
8use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::net::UdpSocket;
12use tokio::sync::{mpsc, watch, Mutex};
13use tokio::time::Sleep;
14use tracing::{debug, warn};
15
16use crate::local_bus::LocalNostrBus;
17use crate::relay_bridge::SharedMeshEventStore;
18use crate::root_events::{
19 build_root_filter, is_hashtree_labeled_event, pick_latest_event, root_event_from_peer,
20 PeerRootEvent, HASHTREE_KIND, HASHTREE_LABEL,
21};
22
23#[derive(Debug, Clone)]
24pub struct MulticastConfig {
25 pub enabled: bool,
26 pub group: String,
27 pub port: u16,
28 pub max_peers: usize,
29 pub announce_interval_ms: u64,
30}
31
32#[async_trait]
33impl LocalNostrBus for MulticastNostrBus {
34 fn source_name(&self) -> &'static str {
35 "multicast"
36 }
37
38 async fn broadcast_event(&self, event: &Event) -> Result<()> {
39 MulticastNostrBus::broadcast_event(self, event).await
40 }
41
42 async fn query_root(
43 &self,
44 owner_pubkey: &str,
45 tree_name: &str,
46 timeout: Duration,
47 ) -> Option<PeerRootEvent> {
48 MulticastNostrBus::query_root(self, owner_pubkey, tree_name, timeout).await
49 }
50}
51
52impl MulticastConfig {
53 pub fn is_enabled(&self) -> bool {
54 self.enabled && self.max_peers > 0
55 }
56}
57
58impl Default for MulticastConfig {
59 fn default() -> Self {
60 Self {
61 enabled: false,
62 group: "239.255.42.98".to_string(),
63 port: 48555,
64 max_peers: 0,
65 announce_interval_ms: 2_000,
66 }
67 }
68}
69
70pub struct MulticastNostrBus {
71 config: MulticastConfig,
72 keys: Keys,
73 relay: SharedMeshEventStore,
74 socket: Arc<UdpSocket>,
75 target_addr: SocketAddr,
76 pending_queries: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<RelayMessage>>>>,
77 announced_event_ids: Arc<Mutex<HashSet<String>>>,
78}
79
80const QUERY_SETTLE_GRACE_MS: u64 = 150;
81
82impl MulticastNostrBus {
83 pub async fn bind(
84 config: MulticastConfig,
85 keys: Keys,
86 relay: SharedMeshEventStore,
87 ) -> Result<Arc<Self>> {
88 let group: Ipv4Addr = config
89 .group
90 .parse()
91 .with_context(|| format!("invalid multicast group {}", config.group))?;
92 let std_socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
93 std_socket.set_reuse_address(true)?;
94 std_socket.bind(&SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, config.port).into())?;
95 std_socket.set_multicast_loop_v4(true)?;
96 std_socket.join_multicast_v4(&group, &Ipv4Addr::UNSPECIFIED)?;
97 std_socket.set_nonblocking(true)?;
98
99 let socket = UdpSocket::from_std(std_socket.into())?;
100 let target_addr = SocketAddr::V4(SocketAddrV4::new(group, config.port));
101
102 Ok(Arc::new(Self {
103 config,
104 keys,
105 relay,
106 socket: Arc::new(socket),
107 target_addr,
108 pending_queries: Arc::new(Mutex::new(HashMap::new())),
109 announced_event_ids: Arc::new(Mutex::new(HashSet::new())),
110 }))
111 }
112
113 pub async fn run(
114 self: Arc<Self>,
115 mut shutdown_rx: watch::Receiver<bool>,
116 signaling_tx: mpsc::Sender<(String, Event)>,
117 ) -> Result<()> {
118 let mut announce_ticker = tokio::time::interval(Duration::from_millis(
119 self.config.announce_interval_ms.max(1),
120 ));
121 let mut buf = vec![0u8; 64 * 1024];
122
123 loop {
124 tokio::select! {
125 _ = shutdown_rx.changed() => {
126 if *shutdown_rx.borrow() {
127 break;
128 }
129 }
130 _ = announce_ticker.tick() => {
131 if let Err(err) = self.broadcast_known_root_updates().await {
132 debug!("multicast root announcement failed: {}", err);
133 }
134 }
135 recv = self.socket.recv_from(&mut buf) => {
136 let (len, _src) = match recv {
137 Ok(value) => value,
138 Err(err) => {
139 warn!("multicast receive failed: {}", err);
140 continue;
141 }
142 };
143 let text = match std::str::from_utf8(&buf[..len]) {
144 Ok(text) => text,
145 Err(err) => {
146 debug!("ignoring non-utf8 multicast datagram: {}", err);
147 continue;
148 }
149 };
150 self.handle_datagram(text, &signaling_tx).await;
151 }
152 }
153 }
154
155 Ok(())
156 }
157
158 pub async fn broadcast_event(&self, event: &Event) -> Result<()> {
159 let payload = event.as_json();
160 let copies = if event.kind.is_ephemeral() { 3 } else { 1 };
161 for _ in 0..copies {
162 self.socket
163 .send_to(payload.as_bytes(), self.target_addr)
164 .await?;
165 }
166 Ok(())
167 }
168
169 pub async fn query_root(
170 &self,
171 owner_pubkey: &str,
172 tree_name: &str,
173 timeout: Duration,
174 ) -> Option<PeerRootEvent> {
175 let filter = build_root_filter(owner_pubkey, tree_name)?;
176 let subscription_id = format!("multicast-root-{}", rand::random::<u64>());
177 let request =
178 ClientMessage::req(SubscriptionId::new(subscription_id.clone()), vec![filter]);
179 let (tx, mut rx) = mpsc::unbounded_channel();
180 self.pending_queries
181 .lock()
182 .await
183 .insert(subscription_id.clone(), tx);
184
185 if self
186 .socket
187 .send_to(request.as_json().as_bytes(), self.target_addr)
188 .await
189 .is_err()
190 {
191 self.pending_queries.lock().await.remove(&subscription_id);
192 return None;
193 }
194
195 let mut events = Vec::new();
196 let deadline = tokio::time::sleep(timeout);
197 tokio::pin!(deadline);
198 let mut settle_deadline: Option<std::pin::Pin<Box<Sleep>>> = None;
199
200 loop {
201 tokio::select! {
202 _ = &mut deadline => break,
203 _ = async {
204 if let Some(deadline) = &mut settle_deadline {
205 deadline.as_mut().await;
206 }
207 }, if settle_deadline.is_some() => break,
208 maybe_msg = rx.recv() => {
209 let Some(msg) = maybe_msg else {
210 break;
211 };
212 match msg {
213 RelayMessage::Event { subscription_id: sid, event }
214 if sid.to_string() == subscription_id =>
215 {
216 events.push(*event);
217 settle_deadline = Some(Box::pin(tokio::time::sleep(Duration::from_millis(
218 QUERY_SETTLE_GRACE_MS,
219 ))));
220 }
221 RelayMessage::EndOfStoredEvents(sid) if sid.to_string() == subscription_id => {
222 if !events.is_empty() && settle_deadline.is_none() {
223 settle_deadline = Some(Box::pin(tokio::time::sleep(Duration::from_millis(
224 QUERY_SETTLE_GRACE_MS,
225 ))));
226 }
227 }
228 _ => {}
229 }
230 }
231 }
232 }
233
234 self.pending_queries.lock().await.remove(&subscription_id);
235
236 let latest = pick_latest_event(events.iter())?;
237 root_event_from_peer(latest, self.source_name(), tree_name)
238 }
239
240 async fn handle_datagram(&self, text: &str, signaling_tx: &mpsc::Sender<(String, Event)>) {
241 if let Ok(event) = Event::from_json(text) {
242 if event.pubkey == self.keys.public_key() {
243 return;
244 }
245
246 if event.kind.is_ephemeral() {
247 let _ = signaling_tx.send(("multicast".to_string(), event)).await;
248 return;
249 }
250
251 if event.kind == nostr_sdk::nostr::Kind::Custom(HASHTREE_KIND)
252 && is_hashtree_labeled_event(&event)
253 && event.verify().is_ok()
254 {
255 let _ = self.relay.ingest_trusted_event(event).await;
256 }
257 return;
258 }
259
260 if let Ok(msg) = ClientMessage::from_json(text) {
261 if let ClientMessage::Req {
262 subscription_id,
263 filters,
264 } = msg
265 {
266 for filter in filters {
267 let limit = filter.limit.unwrap_or(50).min(50);
268 for event in self.relay.query_events(&filter, limit).await {
269 let relay_msg = RelayMessage::event(subscription_id.clone(), event);
270 let _ = self
271 .socket
272 .send_to(relay_msg.as_json().as_bytes(), self.target_addr)
273 .await;
274 }
275 }
276 let eose = RelayMessage::eose(subscription_id);
277 let _ = self
278 .socket
279 .send_to(eose.as_json().as_bytes(), self.target_addr)
280 .await;
281 }
282 return;
283 }
284
285 if let Ok(msg) = RelayMessage::from_json(text) {
286 match &msg {
287 RelayMessage::Event {
288 subscription_id,
289 event,
290 } => {
291 if event.kind == nostr_sdk::nostr::Kind::Custom(HASHTREE_KIND)
292 && is_hashtree_labeled_event(event)
293 && event.verify().is_ok()
294 {
295 let _ = self.relay.ingest_trusted_event((**event).clone()).await;
296 }
297 let tx = self
298 .pending_queries
299 .lock()
300 .await
301 .get(&subscription_id.to_string())
302 .cloned();
303 if let Some(tx) = tx {
304 let _ = tx.send(msg);
305 }
306 }
307 RelayMessage::EndOfStoredEvents(subscription_id) => {
308 let tx = self
309 .pending_queries
310 .lock()
311 .await
312 .get(&subscription_id.to_string())
313 .cloned();
314 if let Some(tx) = tx {
315 let _ = tx.send(msg);
316 }
317 }
318 _ => {}
319 }
320 }
321 }
322
323 async fn broadcast_known_root_updates(&self) -> Result<()> {
324 let filter = Filter::new()
325 .kind(nostr_sdk::nostr::Kind::Custom(HASHTREE_KIND))
326 .author(self.keys.public_key())
327 .custom_tag(
328 SingleLetterTag::lowercase(nostr_sdk::nostr::Alphabet::L),
329 vec![HASHTREE_LABEL.to_string()],
330 )
331 .limit(256);
332 let events = self.relay.query_events(&filter, 256).await;
333 let mut announced = self.announced_event_ids.lock().await;
334 for event in events {
335 let event_id = event.id.to_hex();
336 if announced.insert(event_id) {
337 self.broadcast_event(&event).await?;
338 }
339 }
340 Ok(())
341 }
342}
343
344#[cfg(test)]
345mod tests {
346 use super::*;
347 use crate::relay_bridge::MeshEventStore;
348 use anyhow::Result;
349 use nostr_sdk::nostr::{Alphabet, EventBuilder, Kind, Tag, TagKind};
350 use std::time::{SystemTime, UNIX_EPOCH};
351
352 const HASHTREE_LABEL: &str = "hashtree";
353
354 #[derive(Default)]
355 struct TestEventStore {
356 events: Mutex<Vec<Event>>,
357 }
358
359 #[async_trait]
360 impl MeshEventStore for TestEventStore {
361 async fn ingest_trusted_event(&self, event: Event) -> Result<()> {
362 self.events.lock().await.push(event);
363 Ok(())
364 }
365
366 async fn query_events(&self, filter: &Filter, limit: usize) -> Vec<Event> {
367 self.events
368 .lock()
369 .await
370 .iter()
371 .filter(|event| filter.match_event(event))
372 .take(limit)
373 .cloned()
374 .collect()
375 }
376 }
377
378 fn unique_multicast_port() -> u16 {
379 let nanos = SystemTime::now()
380 .duration_since(UNIX_EPOCH)
381 .unwrap_or_default()
382 .subsec_nanos();
383 40000 + (nanos % 2000) as u16
384 }
385
386 fn build_root_event(keys: &Keys, tree_name: &str, hash_hex: &str) -> Event {
387 EventBuilder::new(
388 Kind::Custom(HASHTREE_KIND),
389 "",
390 [
391 Tag::identifier(tree_name.to_string()),
392 Tag::custom(
393 TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
394 vec![HASHTREE_LABEL.to_string()],
395 ),
396 Tag::custom(TagKind::Custom("hash".into()), vec![hash_hex.to_string()]),
397 ],
398 )
399 .to_event(keys)
400 .expect("root event")
401 }
402
403 #[tokio::test]
404 async fn query_root_ignores_early_eose_until_grace_period_expires() -> Result<()> {
405 let keys = Keys::generate();
406 let owner_keys = Keys::generate();
407 let relay = Arc::new(TestEventStore::default()) as SharedMeshEventStore;
408 let bus = MulticastNostrBus::bind(
409 MulticastConfig {
410 enabled: true,
411 group: "239.255.43.10".to_string(),
412 port: unique_multicast_port(),
413 max_peers: 4,
414 announce_interval_ms: 60_000,
415 },
416 keys,
417 relay,
418 )
419 .await?;
420
421 let tree_name = "eose-race";
422 let hash_hex = "ef".repeat(32);
423 let event = build_root_event(&owner_keys, tree_name, &hash_hex);
424
425 let query_bus = Arc::clone(&bus);
426 let query = tokio::spawn(async move {
427 query_bus
428 .query_root(
429 &owner_keys.public_key().to_hex(),
430 tree_name,
431 Duration::from_millis(500),
432 )
433 .await
434 });
435
436 let subscription_id = tokio::time::timeout(Duration::from_secs(1), async {
437 loop {
438 if let Some(subscription_id) =
439 bus.pending_queries.lock().await.keys().next().cloned()
440 {
441 break subscription_id;
442 }
443 tokio::time::sleep(Duration::from_millis(10)).await;
444 }
445 })
446 .await
447 .expect("query registered pending subscription");
448
449 let (signal_tx, _signal_rx) = mpsc::channel(1);
450 bus.handle_datagram(
451 &RelayMessage::eose(SubscriptionId::new(subscription_id.clone())).as_json(),
452 &signal_tx,
453 )
454 .await;
455 bus.handle_datagram(
456 &RelayMessage::event(SubscriptionId::new(subscription_id), event.clone()).as_json(),
457 &signal_tx,
458 )
459 .await;
460
461 let resolved = query.await.expect("query task completed");
462 let resolved = resolved.expect("query returned root event after early eose");
463 assert_eq!(resolved.hash, hash_hex);
464 assert_eq!(resolved.event_id, event.id.to_hex());
465 Ok(())
466 }
467}