1use std::{collections::HashSet, net::SocketAddr, sync::Arc, time::Duration};
10
11use ed25519_dalek::SigningKey;
12use tokio::net::TcpListener;
13use tokio::task::JoinHandle;
14use tracing::{debug, info};
15
16use crate::transport_net::{
17 QuicServerHandle, TlsServerHandle, quic_accept_bi_session, tls_accept_session,
18};
19use crate::{
20 capabilities::Capabilities,
21 dht::{ALPHA, DEFAULT_TTL_SECS, DhtInsertResult, DhtNodeRecord, DhtValue, K, MAX_VALUE_SIZE},
22 dht_keys::{community_info_key, share_head_key},
23 ids::{NodeId, ShareId},
24 manifest::ShareHead,
25 net_fetch::RequestTransport,
26 peer::PeerAddr,
27 wire::{CommunityMembers, FindNode, Store as WireStore},
28};
29
30use super::{
31 NodeHandle,
32 helpers::{
33 merge_peer_list, now_unix_secs, peer_key, persist_state, query_find_node, query_find_value,
34 replicate_store_to_closest, replicate_store_to_peers, sort_peers_for_target,
35 validate_dht_value_for_known_keyspaces,
36 },
37};
38
39impl NodeHandle {
    /// Insert or refresh `node_id`/`addr` in the local routing table.
    ///
    /// If the target bucket is full, `upsert_node` returns `PendingEviction`
    /// and a background task probes the stale node: it is refreshed if still
    /// reachable, otherwise evicted in favor of the new node.
    pub async fn dht_upsert_peer(
        &self,
        local_target: NodeId,
        node_id: NodeId,
        addr: PeerAddr,
    ) -> anyhow::Result<()> {
        let mut state = self.state.write().await;
        let result = state.dht.upsert_node(
            DhtNodeRecord {
                node_id,
                addr,
                last_seen_unix: now_unix_secs()?,
            },
            local_target,
        );

        match result {
            DhtInsertResult::Inserted => {}
            DhtInsertResult::PendingEviction {
                stale_node,
                new_node,
                bucket_idx,
            } => {
                let handle = self.clone();
                // Probe off the request path. The spawned task re-acquires the
                // state write lock after the probe; it will block until the
                // guard held above is dropped when this function returns.
                tokio::spawn(async move {
                    let remote =
                        std::net::SocketAddr::new(stale_node.addr.ip, stale_node.addr.port);
                    // NOTE(review): liveness is a bare TCP connect with a 1.5s
                    // timeout — a QUIC-only peer would always appear dead here;
                    // confirm this is intended.
                    let alive = tokio::time::timeout(
                        Duration::from_millis(1500),
                        tokio::net::TcpStream::connect(remote),
                    )
                    .await
                    .is_ok_and(|r| r.is_ok());

                    let mut state = handle.state.write().await;
                    if alive {
                        // Stale node responded: keep it and bump last_seen
                        // (clock failure degrades to timestamp 0 rather than
                        // dropping the refresh).
                        let ts = now_unix_secs().unwrap_or(0);
                        state.dht.refresh_node(&stale_node.node_id, ts);
                    } else {
                        // Stale node is gone: the new node takes its slot.
                        state
                            .dht
                            .complete_eviction(bucket_idx, stale_node.node_id, *new_node);
                    }
                });
            }
            // Per-subnet cap reached: silently drop the candidate.
            DhtInsertResult::RejectedSubnetLimit => {}
        }

        Ok(())
    }
95
96 pub async fn dht_find_node(&self, req: FindNode) -> anyhow::Result<Vec<PeerAddr>> {
97 let state = self.state.read().await;
98 let nodes = state.dht.find_node(NodeId(req.target_node_id), K);
99 Ok(nodes.into_iter().map(|n| n.addr).collect())
100 }
101
102 pub async fn dht_store(&self, req: WireStore) -> anyhow::Result<()> {
103 validate_dht_value_for_known_keyspaces(req.key, &req.value)?;
104 let now = now_unix_secs()?;
105 let value = merge_community_members_if_applicable(req.key, req.value, self, now).await;
106 let mut state = self.state.write().await;
107 state
108 .dht
109 .store(req.key, value, req.ttl_secs.max(DEFAULT_TTL_SECS), now)
110 }
111
112 pub async fn dht_find_value(&self, key: [u8; 32]) -> anyhow::Result<Option<DhtValue>> {
113 let mut state = self.state.write().await;
114 Ok(state.dht.find_value(key, now_unix_secs()?))
115 }
116
117 pub async fn dht_store_replicated<T: RequestTransport + ?Sized>(
118 &self,
119 transport: &T,
120 req: WireStore,
121 seed_peers: &[PeerAddr],
122 ) -> anyhow::Result<usize> {
123 validate_dht_value_for_known_keyspaces(req.key, &req.value)?;
124 let now = now_unix_secs()?;
125 {
126 let mut state = self.state.write().await;
127 state.dht.store(
128 req.key,
129 req.value.clone(),
130 req.ttl_secs.max(DEFAULT_TTL_SECS),
131 now,
132 )?;
133 }
134 persist_state(self).await?;
135 replicate_store_to_closest(
136 transport,
137 self,
138 req.key,
139 req.value,
140 req.ttl_secs.max(DEFAULT_TTL_SECS),
141 seed_peers,
142 K,
143 )
144 .await
145 }
146
    /// Iterative node lookup: repeatedly query the `ALPHA` closest
    /// not-yet-queried peers for nodes near `target_node_id`, merging their
    /// responses, until a round yields nothing new or no peers remain.
    ///
    /// Returns up to `K` peers, sorted by distance to the target. Individual
    /// peer failures are logged and skipped.
    pub async fn dht_find_node_iterative<T: RequestTransport + ?Sized>(
        &self,
        transport: &T,
        target_node_id: [u8; 20],
        seed_peers: &[PeerAddr],
    ) -> anyhow::Result<Vec<PeerAddr>> {
        let target_hex = hex::encode(&target_node_id[..8]);
        // Candidates start as the seed list merged with locally known nodes.
        let mut peers = self
            .collect_seed_and_known_node_peers(target_node_id, seed_peers)
            .await;
        debug!(
            target = %target_hex,
            initial_peers = peers.len(),
            "dht_find_node_iterative: starting"
        );
        let mut queried = HashSet::new();

        loop {
            // Each round: pick the ALPHA closest candidates not yet queried.
            sort_peers_for_target(&mut peers, target_node_id);
            let to_query = peers
                .iter()
                .filter(|peer| !queried.contains(&peer_key(peer)))
                .take(ALPHA)
                .cloned()
                .collect::<Vec<_>>();
            if to_query.is_empty() {
                break;
            }

            // `discovered` tracks whether this round added any new candidate;
            // when it stays false the lookup has converged.
            let mut discovered = false;
            for peer in to_query {
                queried.insert(peer_key(&peer));
                match query_find_node(transport, &peer, target_node_id).await {
                    Ok(result) => {
                        debug!(
                            target = %target_hex,
                            peer = ?peer,
                            returned_peers = result.peers.len(),
                            "dht_find_node_iterative: queried peer"
                        );
                        discovered |= merge_peer_list(&mut peers, result.peers);
                    }
                    Err(e) => {
                        // Best-effort: a failed peer is simply skipped.
                        debug!(
                            target = %target_hex,
                            peer = ?peer,
                            error = %e,
                            "dht_find_node_iterative: peer query failed"
                        );
                    }
                }
            }
            if !discovered {
                break;
            }
        }

        // Final answer: the K closest candidates found.
        sort_peers_for_target(&mut peers, target_node_id);
        peers.truncate(K);
        debug!(
            target = %target_hex,
            result_peers = peers.len(),
            queried = queried.len(),
            "dht_find_node_iterative: complete"
        );
        Ok(peers)
    }
214
215 pub async fn dht_find_value_iterative<T: RequestTransport + ?Sized>(
216 &self,
217 transport: &T,
218 key: [u8; 32],
219 seed_peers: &[PeerAddr],
220 ) -> anyhow::Result<Option<DhtValue>> {
221 let key_hex = hex::encode(&key[..8]);
222 debug!(key = %key_hex, "dht_find_value_iterative: starting");
223 if let Some(value) = self.dht_find_value(key).await? {
224 debug!(key = %key_hex, "dht_find_value_iterative: found locally");
225 return Ok(Some(value));
226 }
227
228 self.dht_find_value_from_network(transport, key, seed_peers)
229 .await
230 }
231
    /// Iterative value lookup over the network (skips the local fast path —
    /// see `dht_find_value_iterative` for the combined entry point).
    ///
    /// Queries the `ALPHA` closest unqueried peers per round; the first
    /// response carrying a matching, size-bounded, keyspace-valid value is
    /// cached locally and returned. Returns `Ok(None)` once the candidate set
    /// is exhausted without a hit.
    pub async fn dht_find_value_from_network<T: RequestTransport + ?Sized>(
        &self,
        transport: &T,
        key: [u8; 32],
        seed_peers: &[PeerAddr],
    ) -> anyhow::Result<Option<DhtValue>> {
        let key_hex = hex::encode(&key[..8]);
        debug!(key = %key_hex, "dht_find_value_from_network: starting");

        // Routing target is the first 20 bytes of the 32-byte value key.
        let mut target = [0u8; 20];
        target.copy_from_slice(&key[..20]);
        let mut peers = self
            .collect_seed_and_known_node_peers(target, seed_peers)
            .await;
        let mut queried = HashSet::new();

        loop {
            sort_peers_for_target(&mut peers, target);
            let to_query = peers
                .iter()
                .filter(|peer| !queried.contains(&peer_key(peer)))
                .take(ALPHA)
                .cloned()
                .collect::<Vec<_>>();
            if to_query.is_empty() {
                break;
            }

            let mut discovered = false;
            for peer in to_query {
                queried.insert(peer_key(&peer));
                // Unreachable/failed peers are skipped silently.
                let Ok(result) = query_find_value(transport, &peer, key).await else {
                    continue;
                };

                // Accept only values that echo our key, respect the size cap,
                // and pass keyspace validation.
                if let Some(remote) = result.value
                    && remote.key == key
                    && remote.value.len() <= MAX_VALUE_SIZE
                    && validate_dht_value_for_known_keyspaces(remote.key, &remote.value).is_ok()
                {
                    let now = now_unix_secs()?;
                    let mut state = self.state.write().await;
                    // Cache-on-read, then return the freshly stored entry.
                    state.dht.store(
                        key,
                        remote.value,
                        remote.ttl_secs.max(DEFAULT_TTL_SECS),
                        now,
                    )?;
                    debug!(key = %key_hex, "dht_find_value_from_network: found remotely");
                    return Ok(state.dht.find_value(key, now));
                }
                // No value: widen the candidate set with the peer's referrals.
                discovered |= merge_peer_list(&mut peers, result.closer_peers);
            }
            if !discovered {
                break;
            }
        }

        debug!(key = %key_hex, queried = queried.len(), "dht_find_value_from_network: not found");
        Ok(None)
    }
299
300 pub async fn dht_find_share_head_iterative<T: RequestTransport + ?Sized>(
301 &self,
302 transport: &T,
303 share_id: ShareId,
304 share_pubkey: Option<[u8; 32]>,
305 seed_peers: &[PeerAddr],
306 ) -> anyhow::Result<Option<ShareHead>> {
307 debug!(?share_id, "dht_find_share_head_iterative: starting");
308 let key = share_head_key(&share_id);
309 let local_best = self
310 .dht_find_value(key)
311 .await?
312 .and_then(|value| crate::cbor::from_slice::<ShareHead>(&value.value).ok())
313 .filter(|head| head.share_id == share_id.0);
314
315 let mut target = [0u8; 20];
316 target.copy_from_slice(&key[..20]);
317 let mut peers = self
318 .collect_seed_and_known_node_peers(target, seed_peers)
319 .await;
320 let mut queried = HashSet::new();
321 let mut best = local_best;
322
323 loop {
324 sort_peers_for_target(&mut peers, target);
325 let to_query = peers
326 .iter()
327 .filter(|peer| !queried.contains(&peer_key(peer)))
328 .take(ALPHA)
329 .cloned()
330 .collect::<Vec<_>>();
331 if to_query.is_empty() {
332 break;
333 }
334
335 let mut discovered = false;
336 for peer in to_query {
337 queried.insert(peer_key(&peer));
338 let Ok(result) = query_find_value(transport, &peer, key).await else {
339 continue;
340 };
341
342 if let Some(remote) = result.value
343 && remote.key == key
344 && remote.value.len() <= MAX_VALUE_SIZE
345 && validate_dht_value_for_known_keyspaces(remote.key, &remote.value).is_ok()
346 {
347 let head: ShareHead = crate::cbor::from_slice(&remote.value)?;
348 if head.share_id != share_id.0 {
349 continue;
350 }
351 if let Some(pubkey) = share_pubkey {
352 head.verify_with_pubkey(pubkey)?;
353 }
354 if best
355 .as_ref()
356 .map(|current| {
357 head.latest_seq > current.latest_seq
358 || (head.latest_seq == current.latest_seq
359 && head.updated_at > current.updated_at)
360 })
361 .unwrap_or(true)
362 {
363 best = Some(head.clone());
364 let now = now_unix_secs()?;
365 let mut state = self.state.write().await;
366 state.dht.store(
367 key,
368 crate::cbor::to_vec(&head)?,
369 remote.ttl_secs.max(DEFAULT_TTL_SECS),
370 now,
371 )?;
372 }
373 }
374 discovered |= merge_peer_list(&mut peers, result.closer_peers);
375 }
376 if !discovered {
377 break;
378 }
379 }
380 if best.is_some() {
381 info!(
382 ?share_id,
383 queried = queried.len(),
384 "dht_find_share_head_iterative: found head"
385 );
386 } else {
387 debug!(
388 ?share_id,
389 queried = queried.len(),
390 "dht_find_share_head_iterative: no head found"
391 );
392 }
393 Ok(best)
394 }
395
    /// Run one republish pass: refresh the TTL on every locally active value
    /// and push each one to the closest known peers.
    ///
    /// Returns the number of values that at least one peer accepted.
    pub async fn dht_republish_once<T: RequestTransport + ?Sized>(
        &self,
        transport: &T,
        seed_peers: &[PeerAddr],
    ) -> anyhow::Result<usize> {
        // Best-effort re-announcements; failures are deliberately ignored so
        // the republish pass still proceeds.
        let _ = self.reannounce_published_share_data().await;

        let _ = self.reannounce_subscribed_share_heads().await;

        let now = now_unix_secs()?;
        let values = {
            let mut state = self.state.write().await;
            let values = state.dht.active_values(now);
            // Re-store each active value locally to reset its TTL window.
            for value in &values {
                state
                    .dht
                    .store(value.key, value.value.clone(), DEFAULT_TTL_SECS, now)?;
            }
            values
        };
        persist_state(self).await?;

        let total = values.len();
        info!(
            active_values = total,
            seed_peers = seed_peers.len(),
            "DHT republish: starting"
        );

        // One iterative FIND_NODE builds a peer list reused for every value,
        // instead of a full lookup per value.
        // NOTE(review): the target comes from the FIRST value's key only
        // (all-zero when there are no values), so the cached list is only an
        // approximation of "closest" for the other keys — confirm acceptable.
        let representative_target = {
            let mut t = [0u8; 20];
            if let Some(v) = values.first() {
                t.copy_from_slice(&v.key[..20]);
            }
            t
        };
        let cached_peers = self
            .dht_find_node_iterative(transport, representative_target, seed_peers)
            .await
            .unwrap_or_default();

        debug!(
            cached_peers = cached_peers.len(),
            "DHT republish: using cached peer list"
        );

        // Pacing between values, plus a longer backoff when a peer signals
        // rate limiting.
        const INTER_VALUE_DELAY: Duration = Duration::from_millis(500);
        const RATE_LIMIT_BACKOFF: Duration = Duration::from_secs(5);

        let mut republished = 0usize;
        for (idx, value) in values.into_iter().enumerate() {
            // Defensive re-validation before pushing to the network.
            if validate_dht_value_for_known_keyspaces(value.key, &value.value).is_err() {
                debug!(
                    key = %hex::encode(&value.key[..8]),
                    value_len = value.value.len(),
                    "DHT republish: skipping value that fails validation"
                );
                continue;
            }
            // Popular values get twice the replication factor.
            let replication_factor = if value.is_popular() { K * 2 } else { K };
            let (stored, rate_limited) = replicate_store_to_peers(
                transport,
                value.key,
                value.value,
                DEFAULT_TTL_SECS,
                &cached_peers,
                replication_factor,
            )
            .await;
            if stored > 0 {
                republished += 1;
            }
            if rate_limited {
                debug!("DHT republish: rate-limited, backing off");
                tokio::time::sleep(RATE_LIMIT_BACKOFF).await;
            } else if idx + 1 < total {
                // No delay after the final value.
                tokio::time::sleep(INTER_VALUE_DELAY).await;
            }
        }
        info!(republished, total, "DHT republish: complete");
        Ok(republished)
    }
489
490 pub fn start_dht_republish_loop(
491 self,
492 transport: Arc<dyn RequestTransport>,
493 seed_peers: Vec<PeerAddr>,
494 interval: Duration,
495 ) -> JoinHandle<()> {
496 info!(
497 interval_secs = interval.as_secs(),
498 seed_peers = seed_peers.len(),
499 "starting DHT republish loop"
500 );
501 tokio::spawn(async move {
502 loop {
503 let _ = self
504 .dht_republish_once(transport.as_ref(), &seed_peers)
505 .await;
506 tokio::time::sleep(interval).await;
507 }
508 })
509 }
510
511 pub fn start_subscription_sync_loop(
512 self,
513 transport: Arc<dyn RequestTransport>,
514 seed_peers: Vec<PeerAddr>,
515 interval: Duration,
516 ) -> JoinHandle<()> {
517 info!(
518 interval_secs = interval.as_secs(),
519 seed_peers = seed_peers.len(),
520 "starting subscription sync loop"
521 );
522 tokio::spawn(async move {
523 loop {
524 let _ = self
525 .sync_subscriptions_over_dht(transport.as_ref(), &seed_peers)
526 .await;
527 tokio::time::sleep(interval).await;
528 }
529 })
530 }
531
    /// Spawn the TLS-over-TCP DHT service: accept handshaken sessions on
    /// `bind_addr` and serve each wire stream on its own task.
    ///
    /// Failed accepts/handshakes are skipped; the accept loop never exits, so
    /// the returned handle only resolves if the initial bind fails.
    pub fn start_tls_dht_service(
        self,
        bind_addr: SocketAddr,
        local_signing_key: SigningKey,
        capabilities: Capabilities,
        tls_server: Arc<TlsServerHandle>,
    ) -> JoinHandle<anyhow::Result<()>> {
        tokio::spawn(async move {
            let listener = TcpListener::bind(bind_addr).await?;
            // Shared across all accepts so replayed handshake nonces are
            // rejected.
            let mut nonce_tracker = crate::transport::NonceTracker::new();
            loop {
                let accepted = tls_accept_session(
                    &listener,
                    &tls_server,
                    &local_signing_key,
                    capabilities.clone(),
                    None,
                    Some(&mut nonce_tracker),
                )
                .await;
                // Accept/handshake failures: drop and keep listening.
                let Ok((stream, session, remote_addr)) = accepted else {
                    continue;
                };
                let remote_peer = PeerAddr {
                    ip: remote_addr.ip(),
                    port: remote_addr.port(),
                    transport: crate::peer::TransportProtocol::Tcp,
                    // Pin the identity learned during the handshake.
                    pubkey_hint: Some(session.remote_node_pubkey),
                    relay_via: None,
                };
                let node = self.clone();
                // One task per session; per-stream errors are discarded.
                tokio::spawn(async move {
                    let _ = node.serve_wire_stream(stream, Some(remote_peer)).await;
                });
            }
        })
    }
574
    /// Spawn the QUIC DHT service: accept handshaken bidirectional sessions
    /// from `quic_server` and serve each wire stream on its own task.
    ///
    /// Failed accepts/handshakes are skipped; the accept loop never exits.
    pub fn start_quic_dht_service(
        self,
        quic_server: QuicServerHandle,
        local_signing_key: SigningKey,
        capabilities: Capabilities,
    ) -> JoinHandle<anyhow::Result<()>> {
        tokio::spawn(async move {
            // Shared across all accepts so replayed handshake nonces are
            // rejected.
            let mut nonce_tracker = crate::transport::NonceTracker::new();
            loop {
                let accepted = quic_accept_bi_session(
                    &quic_server,
                    &local_signing_key,
                    capabilities.clone(),
                    None,
                    Some(&mut nonce_tracker),
                )
                .await;
                // Accept/handshake failures: drop and keep listening.
                let Ok((stream, session, remote_addr)) = accepted else {
                    continue;
                };
                let remote_peer = PeerAddr {
                    ip: remote_addr.ip(),
                    port: remote_addr.port(),
                    transport: crate::peer::TransportProtocol::Quic,
                    // Pin the identity learned during the handshake.
                    pubkey_hint: Some(session.remote_node_pubkey),
                    relay_via: None,
                };
                let node = self.clone();
                // One task per session; per-stream errors are discarded.
                tokio::spawn(async move {
                    let _ = node.serve_wire_stream(stream, Some(remote_peer)).await;
                });
            }
        })
    }
614
615 pub(super) async fn collect_seed_and_known_node_peers(
616 &self,
617 target_node_id: [u8; 20],
618 seed_peers: &[PeerAddr],
619 ) -> Vec<PeerAddr> {
620 let state = self.state.read().await;
621 let mut peers = seed_peers.to_vec();
622 let known = state
623 .dht
624 .find_node(NodeId(target_node_id), K)
625 .into_iter()
626 .map(|node| node.addr)
627 .collect::<Vec<_>>();
628 merge_peer_list(&mut peers, known);
629 peers
630 }
631
632 pub async fn upsert_community_member(
636 &self,
637 community_share_id: ShareId,
638 self_addr: PeerAddr,
639 ) -> anyhow::Result<()> {
640 let key = community_info_key(&community_share_id);
641 let now = now_unix_secs()?;
642 let mut state = self.state.write().await;
643 let mut cm: CommunityMembers = state
644 .dht
645 .find_value(key, now)
646 .and_then(|v| crate::cbor::from_slice(&v.value).ok())
647 .unwrap_or(CommunityMembers {
648 community_share_id: community_share_id.0,
649 members: vec![],
650 updated_at: now,
651 });
652 if !cm.members.contains(&self_addr) {
653 cm.members.push(self_addr);
654 }
655 cm.updated_at = now;
656 state
657 .dht
658 .store(key, crate::cbor::to_vec(&cm)?, DEFAULT_TTL_SECS, now)?;
659 Ok(())
660 }
661
662 pub async fn reannounce_community_memberships(
667 &self,
668 self_addr: PeerAddr,
669 ) -> anyhow::Result<usize> {
670 let community_ids: Vec<[u8; 32]> = {
671 let state = self.state.read().await;
672 state.communities.keys().copied().collect()
673 };
674 let mut count = 0usize;
675 for cid in community_ids {
676 if let Err(e) = self
677 .upsert_community_member(ShareId(cid), self_addr.clone())
678 .await
679 {
680 debug!(
681 community = %hex::encode(&cid[..8]),
682 error = %e,
683 "reannounce_community_memberships: failed"
684 );
685 } else {
686 count += 1;
687 }
688 }
689 if count > 0 {
690 debug!(count, "reannounce_community_memberships: updated");
691 }
692 Ok(count)
693 }
694}
695
696async fn merge_community_members_if_applicable(
700 key: [u8; 32],
701 value: Vec<u8>,
702 node: &NodeHandle,
703 now: u64,
704) -> Vec<u8> {
705 let Ok(incoming) = crate::cbor::from_slice::<CommunityMembers>(&value) else {
706 return value;
707 };
708 let expected = community_info_key(&ShareId(incoming.community_share_id));
709 if expected != key {
710 return value;
711 }
712 let existing = {
713 let mut state = node.state.write().await;
714 state
715 .dht
716 .find_value(key, now)
717 .and_then(|v| crate::cbor::from_slice::<CommunityMembers>(&v.value).ok())
718 };
719 let Some(mut merged) = existing else {
720 return value;
721 };
722 for member in incoming.members {
723 if !merged.members.contains(&member) {
724 merged.members.push(member);
725 }
726 }
727 merged.updated_at = merged.updated_at.max(incoming.updated_at);
728 crate::cbor::to_vec(&merged).unwrap_or(value)
729}