1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use nostr_sdk::nostr::{
4 ClientMessage, Event, Filter, JsonUtil, Keys, RelayMessage, SingleLetterTag, SubscriptionId,
5};
6use socket2::{Domain, Protocol, Socket, Type};
7use std::collections::{HashMap, HashSet};
8use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::net::UdpSocket;
12use tokio::sync::{mpsc, watch, Mutex};
13use tokio::time::Sleep;
14use tracing::{debug, warn};
15
16use crate::local_bus::LocalNostrBus;
17use crate::relay_bridge::SharedMeshEventStore;
18use crate::root_events::{
19 build_root_filter, is_hashtree_labeled_event, pick_latest_event, root_event_from_peer,
20 PeerRootEvent, HASHTREE_KIND, HASHTREE_LABEL,
21};
22
/// Settings for the LAN multicast Nostr bus.
#[derive(Debug, Clone)]
pub struct MulticastConfig {
    /// Master switch; `is_enabled` additionally requires a non-zero `max_peers`.
    pub enabled: bool,
    /// IPv4 multicast group in textual form; parsed as an `Ipv4Addr` by
    /// `MulticastNostrBus::bind`.
    pub group: String,
    /// UDP port used both for the local bind and as the destination port.
    pub port: u16,
    /// Presumably the maximum number of multicast peers (not enforced in this
    /// file); a value of 0 makes `is_enabled` return `false`.
    pub max_peers: usize,
    /// Period, in milliseconds, of the announcement ticker in
    /// `MulticastNostrBus::run` (clamped there to at least 1 ms).
    pub announce_interval_ms: u64,
}
31
32#[async_trait]
33impl LocalNostrBus for MulticastNostrBus {
34 fn source_name(&self) -> &'static str {
35 "multicast"
36 }
37
38 async fn broadcast_event(&self, event: &Event) -> Result<()> {
39 MulticastNostrBus::broadcast_event(self, event).await
40 }
41
42 async fn query_root(
43 &self,
44 owner_pubkey: &str,
45 tree_name: &str,
46 timeout: Duration,
47 ) -> Option<PeerRootEvent> {
48 MulticastNostrBus::query_root(self, owner_pubkey, tree_name, timeout).await
49 }
50}
51
52impl MulticastConfig {
53 pub fn is_enabled(&self) -> bool {
54 self.enabled && self.max_peers > 0
55 }
56}
57
58impl Default for MulticastConfig {
59 fn default() -> Self {
60 Self {
61 enabled: false,
62 group: "239.255.42.98".to_string(),
63 port: 48555,
64 max_peers: 0,
65 announce_interval_ms: 2_000,
66 }
67 }
68}
69
/// Nostr message bus that exchanges JSON datagrams with peers over an IPv4 UDP
/// multicast group.
pub struct MulticastNostrBus {
    /// Settings the bus was bound with; `run` reads the announce interval from it.
    config: MulticastConfig,
    /// Local identity: used to drop our own bare events on receive and to select
    /// our own root events for announcement.
    keys: Keys,
    /// Event store that receives verified hashtree events and answers incoming
    /// REQ filters.
    relay: SharedMeshEventStore,
    /// UDP socket bound to the configured port and joined to the multicast group.
    socket: Arc<UdpSocket>,
    /// Destination of every outgoing datagram (`group:port`).
    target_addr: SocketAddr,
    /// In-flight `query_root` calls, keyed by subscription id; relay messages
    /// carrying a known id are forwarded to the stored sender.
    pending_queries: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<RelayMessage>>>>,
    /// Hex ids of events already sent by `broadcast_known_root_updates`.
    announced_event_ids: Arc<Mutex<HashSet<String>>>,
}
79
/// Quiet period, in milliseconds, that `query_root` waits after receiving a
/// reply before it stops collecting further replies and returns.
const QUERY_SETTLE_GRACE_MS: u64 = 150;
81
82impl MulticastNostrBus {
83 pub async fn bind(
84 config: MulticastConfig,
85 keys: Keys,
86 relay: SharedMeshEventStore,
87 ) -> Result<Arc<Self>> {
88 let group: Ipv4Addr = config
89 .group
90 .parse()
91 .with_context(|| format!("invalid multicast group {}", config.group))?;
92 let std_socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
93 std_socket.set_reuse_address(true)?;
94 #[cfg(unix)]
95 std_socket.set_reuse_port(true)?;
96 std_socket.bind(&SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, config.port).into())?;
97 std_socket.set_multicast_loop_v4(true)?;
98 std_socket.join_multicast_v4(&group, &Ipv4Addr::UNSPECIFIED)?;
99 std_socket.set_nonblocking(true)?;
100
101 let socket = UdpSocket::from_std(std_socket.into())?;
102 let target_addr = SocketAddr::V4(SocketAddrV4::new(group, config.port));
103
104 Ok(Arc::new(Self {
105 config,
106 keys,
107 relay,
108 socket: Arc::new(socket),
109 target_addr,
110 pending_queries: Arc::new(Mutex::new(HashMap::new())),
111 announced_event_ids: Arc::new(Mutex::new(HashSet::new())),
112 }))
113 }
114
115 pub async fn run(
116 self: Arc<Self>,
117 mut shutdown_rx: watch::Receiver<bool>,
118 signaling_tx: mpsc::Sender<(String, Event)>,
119 ) -> Result<()> {
120 let mut announce_ticker = tokio::time::interval(Duration::from_millis(
121 self.config.announce_interval_ms.max(1),
122 ));
123 let mut buf = vec![0u8; 64 * 1024];
124
125 loop {
126 tokio::select! {
127 _ = shutdown_rx.changed() => {
128 if *shutdown_rx.borrow() {
129 break;
130 }
131 }
132 _ = announce_ticker.tick() => {
133 if let Err(err) = self.broadcast_known_root_updates().await {
134 debug!("multicast root announcement failed: {}", err);
135 }
136 }
137 recv = self.socket.recv_from(&mut buf) => {
138 let (len, _src) = match recv {
139 Ok(value) => value,
140 Err(err) => {
141 warn!("multicast receive failed: {}", err);
142 continue;
143 }
144 };
145 let text = match std::str::from_utf8(&buf[..len]) {
146 Ok(text) => text,
147 Err(err) => {
148 debug!("ignoring non-utf8 multicast datagram: {}", err);
149 continue;
150 }
151 };
152 self.handle_datagram(text, &signaling_tx).await;
153 }
154 }
155 }
156
157 Ok(())
158 }
159
160 pub async fn broadcast_event(&self, event: &Event) -> Result<()> {
161 let payload = event.as_json();
162 let copies = if event.kind.is_ephemeral() { 3 } else { 1 };
163 for _ in 0..copies {
164 self.socket
165 .send_to(payload.as_bytes(), self.target_addr)
166 .await?;
167 }
168 Ok(())
169 }
170
171 pub async fn query_root(
172 &self,
173 owner_pubkey: &str,
174 tree_name: &str,
175 timeout: Duration,
176 ) -> Option<PeerRootEvent> {
177 let filter = build_root_filter(owner_pubkey, tree_name)?;
178 let subscription_id = format!("multicast-root-{}", rand::random::<u64>());
179 let request =
180 ClientMessage::req(SubscriptionId::new(subscription_id.clone()), vec![filter]);
181 let (tx, mut rx) = mpsc::unbounded_channel();
182 self.pending_queries
183 .lock()
184 .await
185 .insert(subscription_id.clone(), tx);
186
187 if self
188 .socket
189 .send_to(request.as_json().as_bytes(), self.target_addr)
190 .await
191 .is_err()
192 {
193 self.pending_queries.lock().await.remove(&subscription_id);
194 return None;
195 }
196
197 let mut events = Vec::new();
198 let deadline = tokio::time::sleep(timeout);
199 tokio::pin!(deadline);
200 let mut settle_deadline: Option<std::pin::Pin<Box<Sleep>>> = None;
201
202 loop {
203 tokio::select! {
204 _ = &mut deadline => break,
205 _ = async {
206 if let Some(deadline) = &mut settle_deadline {
207 deadline.as_mut().await;
208 }
209 }, if settle_deadline.is_some() => break,
210 maybe_msg = rx.recv() => {
211 let Some(msg) = maybe_msg else {
212 break;
213 };
214 match msg {
215 RelayMessage::Event { subscription_id: sid, event }
216 if sid.to_string() == subscription_id =>
217 {
218 events.push(*event);
219 settle_deadline = Some(Box::pin(tokio::time::sleep(Duration::from_millis(
220 QUERY_SETTLE_GRACE_MS,
221 ))));
222 }
223 RelayMessage::EndOfStoredEvents(sid) if sid.to_string() == subscription_id => {
224 if !events.is_empty() && settle_deadline.is_none() {
225 settle_deadline = Some(Box::pin(tokio::time::sleep(Duration::from_millis(
226 QUERY_SETTLE_GRACE_MS,
227 ))));
228 }
229 }
230 _ => {}
231 }
232 }
233 }
234 }
235
236 self.pending_queries.lock().await.remove(&subscription_id);
237
238 let latest = pick_latest_event(events.iter())?;
239 root_event_from_peer(latest, self.source_name(), tree_name)
240 }
241
242 async fn handle_datagram(&self, text: &str, signaling_tx: &mpsc::Sender<(String, Event)>) {
243 if let Ok(event) = Event::from_json(text) {
244 if event.pubkey == self.keys.public_key() {
245 return;
246 }
247
248 if event.kind.is_ephemeral() {
249 let _ = signaling_tx.send(("multicast".to_string(), event)).await;
250 return;
251 }
252
253 if event.kind == nostr_sdk::nostr::Kind::Custom(HASHTREE_KIND)
254 && is_hashtree_labeled_event(&event)
255 && event.verify().is_ok()
256 {
257 let _ = self.relay.ingest_trusted_event(event).await;
258 }
259 return;
260 }
261
262 if let Ok(msg) = ClientMessage::from_json(text) {
263 if let ClientMessage::Req {
264 subscription_id,
265 filters,
266 } = msg
267 {
268 for filter in filters {
269 let limit = filter.limit.unwrap_or(50).min(50);
270 for event in self.relay.query_events(&filter, limit).await {
271 let relay_msg = RelayMessage::event(subscription_id.clone(), event);
272 let _ = self
273 .socket
274 .send_to(relay_msg.as_json().as_bytes(), self.target_addr)
275 .await;
276 }
277 }
278 let eose = RelayMessage::eose(subscription_id);
279 let _ = self
280 .socket
281 .send_to(eose.as_json().as_bytes(), self.target_addr)
282 .await;
283 }
284 return;
285 }
286
287 if let Ok(msg) = RelayMessage::from_json(text) {
288 match &msg {
289 RelayMessage::Event {
290 subscription_id,
291 event,
292 } => {
293 if event.kind == nostr_sdk::nostr::Kind::Custom(HASHTREE_KIND)
294 && is_hashtree_labeled_event(event)
295 && event.verify().is_ok()
296 {
297 let _ = self.relay.ingest_trusted_event((**event).clone()).await;
298 }
299 let tx = self
300 .pending_queries
301 .lock()
302 .await
303 .get(&subscription_id.to_string())
304 .cloned();
305 if let Some(tx) = tx {
306 let _ = tx.send(msg);
307 }
308 }
309 RelayMessage::EndOfStoredEvents(subscription_id) => {
310 let tx = self
311 .pending_queries
312 .lock()
313 .await
314 .get(&subscription_id.to_string())
315 .cloned();
316 if let Some(tx) = tx {
317 let _ = tx.send(msg);
318 }
319 }
320 _ => {}
321 }
322 }
323 }
324
325 async fn broadcast_known_root_updates(&self) -> Result<()> {
326 let filter = Filter::new()
327 .kind(nostr_sdk::nostr::Kind::Custom(HASHTREE_KIND))
328 .author(self.keys.public_key())
329 .custom_tag(
330 SingleLetterTag::lowercase(nostr_sdk::nostr::Alphabet::L),
331 vec![HASHTREE_LABEL.to_string()],
332 )
333 .limit(256);
334 let events = self.relay.query_events(&filter, 256).await;
335 let mut announced = self.announced_event_ids.lock().await;
336 for event in events {
337 let event_id = event.id.to_hex();
338 if announced.insert(event_id) {
339 self.broadcast_event(&event).await?;
340 }
341 }
342 Ok(())
343 }
344}
345
346#[cfg(test)]
347mod tests {
348 use super::*;
349 use crate::relay_bridge::MeshEventStore;
350 use anyhow::Result;
351 use nostr_sdk::nostr::{Alphabet, EventBuilder, Kind, Tag, TagKind};
352 use std::time::{SystemTime, UNIX_EPOCH};
353
    // Label value used when building test events; this local definition takes
    // precedence over any same-named item brought in by `use super::*`.
    const HASHTREE_LABEL: &str = "hashtree";
355
    /// In-memory `MeshEventStore` used by the tests: events are kept in
    /// insertion order behind an async mutex.
    #[derive(Default)]
    struct TestEventStore {
        /// Every event ingested so far.
        events: Mutex<Vec<Event>>,
    }
360
361 #[async_trait]
362 impl MeshEventStore for TestEventStore {
363 async fn ingest_trusted_event(&self, event: Event) -> Result<()> {
364 self.events.lock().await.push(event);
365 Ok(())
366 }
367
368 async fn query_events(&self, filter: &Filter, limit: usize) -> Vec<Event> {
369 self.events
370 .lock()
371 .await
372 .iter()
373 .filter(|event| filter.match_event(event))
374 .take(limit)
375 .cloned()
376 .collect()
377 }
378 }
379
380 fn unique_multicast_port() -> u16 {
381 let nanos = SystemTime::now()
382 .duration_since(UNIX_EPOCH)
383 .unwrap_or_default()
384 .subsec_nanos();
385 40000 + (nanos % 2000) as u16
386 }
387
388 fn build_root_event(keys: &Keys, tree_name: &str, hash_hex: &str) -> Event {
389 EventBuilder::new(
390 Kind::Custom(HASHTREE_KIND),
391 "",
392 [
393 Tag::identifier(tree_name.to_string()),
394 Tag::custom(
395 TagKind::SingleLetter(SingleLetterTag::lowercase(Alphabet::L)),
396 vec![HASHTREE_LABEL.to_string()],
397 ),
398 Tag::custom(TagKind::Custom("hash".into()), vec![hash_hex.to_string()]),
399 ],
400 )
401 .to_event(keys)
402 .expect("root event")
403 }
404
    /// An EOSE delivered before any event must not end `query_root`: the event
    /// that follows it still has to be returned.
    #[tokio::test]
    async fn query_root_ignores_early_eose_until_grace_period_expires() -> Result<()> {
        let keys = Keys::generate();
        let owner_keys = Keys::generate();
        let relay = Arc::new(TestEventStore::default()) as SharedMeshEventStore;
        let bus = MulticastNostrBus::bind(
            MulticastConfig {
                enabled: true,
                group: "239.255.43.10".to_string(),
                port: unique_multicast_port(),
                max_peers: 4,
                announce_interval_ms: 60_000,
            },
            keys,
            relay,
        )
        .await?;

        let tree_name = "eose-race";
        let hash_hex = "ef".repeat(32);
        let event = build_root_event(&owner_keys, tree_name, &hash_hex);

        // Run the query in the background; `bus.run` is never started, so the
        // replies are injected by hand below.
        let query_bus = Arc::clone(&bus);
        let query = tokio::spawn(async move {
            query_bus
                .query_root(
                    &owner_keys.public_key().to_hex(),
                    tree_name,
                    Duration::from_millis(500),
                )
                .await
        });

        // Poll until the query has registered itself, to learn its random
        // subscription id.
        let subscription_id = tokio::time::timeout(Duration::from_secs(1), async {
            loop {
                if let Some(subscription_id) =
                    bus.pending_queries.lock().await.keys().next().cloned()
                {
                    break subscription_id;
                }
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        })
        .await
        .expect("query registered pending subscription");

        // Deliver EOSE first, then the event, as if they arrived from the
        // network in that order.
        let (signal_tx, _signal_rx) = mpsc::channel(1);
        bus.handle_datagram(
            &RelayMessage::eose(SubscriptionId::new(subscription_id.clone())).as_json(),
            &signal_tx,
        )
        .await;
        bus.handle_datagram(
            &RelayMessage::event(SubscriptionId::new(subscription_id), event.clone()).as_json(),
            &signal_tx,
        )
        .await;

        let resolved = query.await.expect("query task completed");
        let resolved = resolved.expect("query returned root event after early eose");
        assert_eq!(resolved.hash, hash_hex);
        assert_eq!(resolved.event_id, event.id.to_hex());
        Ok(())
    }
469}