1use std::collections::{HashMap, HashSet};
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use ed25519_dalek::VerifyingKey;
15use tokio::task::JoinHandle;
16use tracing::{debug, info, warn};
17
18use crate::{
19 content::ChunkedContent,
20 dht_keys::{content_provider_key, manifest_loc_key, share_head_key},
21 ids::{ContentId, ManifestId, ShareId},
22 manifest::{ManifestV1, ShareVisibility},
23 net_fetch::{
24 FetchPolicy, PeerConnector, ProgressCallback, RelayAwareTransport, RequestTransport,
25 download_swarm_over_network, download_swarm_to_file, fetch_chunk_hashes_with_retry,
26 fetch_manifest_with_retry,
27 },
28 peer::PeerAddr,
29 store::{PersistedPartialDownload, decrypt_secret, encrypt_secret},
30 transport::{read_envelope, write_envelope},
31 wire::{
32 ChunkData, CommunityPublicShareList, CommunityStatus, Envelope, FLAG_RESPONSE, FindNode,
33 FindNodeResult, FindValueResult, MsgType, PexOffer, Providers, PublicShareList,
34 RelayListResponse, RelayRegister, Store as WireStore, WirePayload,
35 },
36};
37
38use super::{
39 NodeHandle, SearchPage, SearchPageQuery, SearchQuery, SearchResult, SearchTrustFilter,
40 helpers::{
41 build_search_snippet, error_envelope, merge_peer_list, now_unix_secs, persist_state,
42 request_class, validate_dht_value_for_known_keyspaces,
43 },
44};
45
46impl NodeHandle {
    /// Sync every subscription against the *local* DHT cache only (no network
    /// round-trips): if a cached share head is newer than our recorded
    /// sequence, adopt its cached manifest, index it for search, and persist.
    ///
    /// Shares with no cached head or no cached manifest are silently skipped;
    /// a head/manifest that fails verification is a hard error.
    pub async fn sync_subscriptions(&self) -> anyhow::Result<()> {
        let mut indexed_manifests = Vec::new();
        // Phase 1: mutate in-memory state under one write lock, collecting the
        // manifests to index; the store handle is cloned out so the (async)
        // store indexing below runs without holding the lock.
        let store = {
            let mut state = self.state.write().await;
            let now = now_unix_secs()?;
            // Snapshot ids first so we can mutate `state.subscriptions` later.
            let subscription_ids = state.subscriptions.keys().copied().collect::<Vec<_>>();

            for share_id in subscription_ids {
                let share_pubkey = state
                    .subscriptions
                    .get(&share_id)
                    .and_then(|sub| sub.share_pubkey);
                let local_seq = state
                    .subscriptions
                    .get(&share_id)
                    .map(|sub| sub.latest_seq)
                    .unwrap_or_default();

                // Only locally cached DHT values are consulted here.
                let Some(head_val) = state
                    .dht
                    .find_value(share_head_key(&ShareId(share_id)), now)
                else {
                    continue;
                };

                let head: crate::manifest::ShareHead = crate::cbor::from_slice(&head_val.value)?;
                // Verify the head signature when we know the share's pubkey.
                if let Some(pubkey) = share_pubkey {
                    head.verify_with_pubkey(pubkey)?;
                }

                // Nothing newer than what we already have.
                if head.latest_seq <= local_seq {
                    continue;
                }

                // Offline sync: the manifest must already be cached.
                let Some(manifest) = state.manifest_cache.get(&head.latest_manifest_id).cloned()
                else {
                    continue;
                };
                manifest.verify()?;
                if manifest.share_id != share_id {
                    anyhow::bail!("manifest share_id mismatch while syncing subscription");
                }

                // Register item metadata; chunk hashes are filled in lazily.
                for item in &manifest.items {
                    let content = ChunkedContent {
                        content_id: ContentId(item.content_id),
                        chunks: vec![],
                        chunk_count: item.chunk_count,
                        chunk_list_hash: item.chunk_list_hash,
                    };
                    state.content_catalog.insert(item.content_id, content);
                }
                state.search_index.index_manifest(&manifest);
                indexed_manifests.push(manifest);

                // Advance the subscription's high-water mark.
                if let Some(sub) = state.subscriptions.get_mut(&share_id) {
                    sub.latest_seq = head.latest_seq;
                    sub.latest_manifest_id = Some(head.latest_manifest_id);
                }
            }
            state.store.clone()
        };
        // Phase 2: store-side search indexing, outside the state lock.
        for manifest in &indexed_manifests {
            store.index_manifest_for_search(manifest).await?;
        }
        {
            let mut s = self.state.write().await;
            s.dirty.subscriptions = true;
            s.dirty.manifests = true;
        }
        persist_state(self).await
    }
120
    /// Sync all subscriptions against the *network* DHT: fetch newer share
    /// heads and manifests in bounded-concurrency batches, then apply every
    /// update under a single write lock and persist.
    pub async fn sync_subscriptions_over_dht<T: RequestTransport + ?Sized>(
        &self,
        transport: &T,
        seed_peers: &[PeerAddr],
    ) -> anyhow::Result<()> {
        // Max per-share fetches awaited concurrently in one batch.
        const SYNC_CONCURRENCY: usize = 20;

        // Snapshot subscription metadata under a short read lock so no lock
        // is held across the network round-trips below.
        let subscription_meta = {
            let state = self.state.read().await;
            state
                .subscriptions
                .iter()
                .map(|(share_id, sub)| (*share_id, sub.share_pubkey, sub.latest_seq))
                .collect::<Vec<_>>()
        };

        if subscription_meta.is_empty() {
            debug!("sync: no subscriptions to sync");
            return Ok(());
        }

        info!(
            subscriptions = subscription_meta.len(),
            seed_peers = seed_peers.len(),
            "sync: starting subscription sync over DHT"
        );

        // (share_id, manifest_id, manifest, new_seq) per successful fetch;
        // None means "nothing new / unreachable" for that share.
        type FetchOk = ([u8; 32], [u8; 32], ManifestV1, u64);
        let mut fetch_results: Vec<Option<FetchOk>> = Vec::new();
        for chunk in subscription_meta.chunks(SYNC_CONCURRENCY) {
            let batch = futures_util::future::join_all(chunk.iter().map(
                |&(share_id, share_pubkey, local_seq)| {
                    self.fetch_subscription_update_network(
                        transport,
                        share_id,
                        share_pubkey,
                        local_seq,
                        seed_peers,
                    )
                },
            ))
            .await;
            for result in batch {
                // A hard failure (verification error etc.) aborts the pass.
                fetch_results.push(result?);
            }
        }

        // Apply all updates under one write lock; store-side indexing runs
        // afterwards without the lock.
        let (phase2_store, indexed_manifests) = {
            let mut indexed_manifests = Vec::new();
            let mut state = self.state.write().await;
            for result in fetch_results {
                let Some((share_id, manifest_id, manifest, new_seq)) = result else {
                    continue;
                };
                state.manifest_cache.insert(manifest_id, manifest.clone());
                // Register item metadata; chunk hashes are fetched lazily.
                for item in &manifest.items {
                    let content = ChunkedContent {
                        content_id: ContentId(item.content_id),
                        chunks: vec![],
                        chunk_count: item.chunk_count,
                        chunk_list_hash: item.chunk_list_hash,
                    };
                    state.content_catalog.insert(item.content_id, content);
                }
                state.search_index.index_manifest(&manifest);
                indexed_manifests.push(manifest);
                // Advance the subscription's high-water mark.
                if let Some(sub) = state.subscriptions.get_mut(&share_id) {
                    sub.latest_seq = new_seq;
                    sub.latest_manifest_id = Some(manifest_id);
                }
            }
            state.dirty.subscriptions = true;
            state.dirty.manifests = true;
            (state.store.clone(), indexed_manifests)
        };
        for manifest in &indexed_manifests {
            phase2_store.index_manifest_for_search(manifest).await?;
        }

        persist_state(self).await
    }
210
    /// Resolve one subscription against the network: look up the share head
    /// in the DHT and, if it is newer than `local_seq`, obtain the referenced
    /// manifest — local cache first, then the DHT, then a direct peer fetch.
    ///
    /// Returns `Ok(None)` when there is nothing new or the manifest could not
    /// be fetched from any source; verification failures are hard errors.
    async fn fetch_subscription_update_network<T: RequestTransport + ?Sized>(
        &self,
        transport: &T,
        share_id: [u8; 32],
        share_pubkey: Option<[u8; 32]>,
        local_seq: u64,
        seed_peers: &[PeerAddr],
    ) -> anyhow::Result<Option<([u8; 32], [u8; 32], ManifestV1, u64)>> {
        let Some(head) = self
            .dht_find_share_head_iterative(transport, ShareId(share_id), share_pubkey, seed_peers)
            .await?
        else {
            debug!(
                share_id = hex::encode(share_id),
                "sync: no share head found in DHT"
            );
            return Ok(None);
        };
        if head.latest_seq <= local_seq {
            debug!(
                share_id = hex::encode(share_id),
                remote_seq = head.latest_seq,
                local_seq,
                "sync: already up to date"
            );
            return Ok(None);
        }

        info!(
            share_id = hex::encode(share_id),
            remote_seq = head.latest_seq,
            local_seq,
            "sync: newer head found, fetching manifest"
        );

        let manifest_id = head.latest_manifest_id;

        // Cheapest source first: the local manifest cache.
        let cached_manifest = {
            let state = self.state.read().await;
            state.manifest_cache.get(&manifest_id).cloned()
        };

        let manifest = if let Some(cached) = cached_manifest {
            cached
        } else {
            // Second choice: manifest stored as a DHT value. Discard any
            // candidate whose recomputed id does not match the head's.
            let dht_key = manifest_loc_key(&ManifestId(manifest_id));
            let dht_manifest = self
                .dht_find_value_iterative(transport, dht_key, seed_peers)
                .await?
                .and_then(|v| crate::cbor::from_slice::<ManifestV1>(&v.value).ok())
                .filter(|m| {
                    m.manifest_id()
                        .map(|mid| mid.0 == manifest_id)
                        .unwrap_or(false)
                });

            if let Some(m) = dht_manifest {
                info!(
                    manifest_id = hex::encode(manifest_id),
                    "sync: manifest found in DHT"
                );
                m
            } else {
                debug!(
                    manifest_id = hex::encode(manifest_id),
                    "sync: manifest not in DHT, trying direct peer fetch"
                );
                // Last resort: ask peers near the manifest id directly; the
                // 20-byte DHT target is the manifest id's prefix.
                let mut target = [0u8; 20];
                target.copy_from_slice(&manifest_id[..20]);
                let mut peers = seed_peers.to_vec();
                let discovered = self
                    .dht_find_node_iterative(transport, target, seed_peers)
                    .await?;
                merge_peer_list(&mut peers, discovered);
                match fetch_manifest_with_retry(
                    transport,
                    &peers,
                    manifest_id,
                    &FetchPolicy::default(),
                )
                .await
                {
                    Ok(m) => {
                        info!(
                            manifest_id = hex::encode(manifest_id),
                            "sync: manifest fetched from peer"
                        );
                        m
                    }
                    Err(e) => {
                        // Unreachable manifest is soft-failure: try next pass.
                        warn!(
                            manifest_id = hex::encode(manifest_id),
                            err = %e,
                            "sync: manifest fetch failed from all peers"
                        );
                        return Ok(None);
                    }
                }
            }
        };

        manifest.verify()?;
        if manifest.share_id != share_id {
            anyhow::bail!("manifest share_id mismatch while syncing subscription");
        }

        Ok(Some((share_id, manifest_id, manifest, head.latest_seq)))
    }
332
    /// Download and apply the "blocklist" item from every enabled blocklist
    /// share we are subscribed to.
    ///
    /// Per-share download/decode failures are skipped (best-effort) so one
    /// unreachable share cannot block the rest. Returns how many shares'
    /// rules were successfully applied.
    pub async fn apply_blocklist_updates_from_subscriptions<T: RequestTransport + ?Sized>(
        &self,
        transport: &T,
        seed_peers: &[PeerAddr],
    ) -> anyhow::Result<usize> {
        // Manifest item name that carries CBOR-encoded BlocklistRules.
        const BLOCKLIST_ITEM_NAME: &str = "blocklist";

        // (share_id, content_id, chunk_count, chunk_list_hash)
        type BlocklistCandidate = ([u8; 32], [u8; 32], u32, [u8; 32]);
        // Collect candidates under a short read lock; all network work below
        // runs after the lock is released.
        let candidates: Vec<BlocklistCandidate> = {
            let state = self.state.read().await;
            state
                .enabled_blocklist_shares
                .iter()
                .filter_map(|share_id| {
                    let sub = state.subscriptions.get(share_id)?;
                    let manifest_id = sub.latest_manifest_id?;
                    let manifest = state.manifest_cache.get(&manifest_id)?;
                    let item = manifest
                        .items
                        .iter()
                        .find(|i| i.name == BLOCKLIST_ITEM_NAME)?;
                    Some((
                        *share_id,
                        item.content_id,
                        item.chunk_count,
                        item.chunk_list_hash,
                    ))
                })
                .collect()
        };

        if candidates.is_empty() {
            return Ok(0);
        }

        let policy = FetchPolicy::default();
        let mut updated = 0usize;

        for (share_id, content_id, chunk_count, chunk_list_hash) in candidates {
            // An empty blocklist body has nothing to download.
            if chunk_count == 0 {
                continue;
            }

            // Fetch the chunk hash list (verified against chunk_list_hash);
            // skip this share on failure.
            let chunk_hashes = match fetch_chunk_hashes_with_retry(
                transport,
                seed_peers,
                content_id,
                chunk_count,
                chunk_list_hash,
                &policy,
            )
            .await
            {
                Ok(h) => h,
                Err(_) => continue,
            };

            // Download the full blocklist body from the swarm (no progress
            // callback).
            let bytes = match download_swarm_over_network(
                transport,
                seed_peers,
                content_id,
                &chunk_hashes,
                &policy,
                None,
            )
            .await
            {
                Ok(b) => b,
                Err(_) => continue,
            };

            // Undecodable rules are skipped rather than treated as fatal.
            let Ok(rules) = crate::cbor::from_slice::<super::BlocklistRules>(&bytes) else {
                continue;
            };

            if self
                .set_blocklist_rules(ShareId(share_id), rules)
                .await
                .is_ok()
            {
                updated += 1;
            }
        }

        Ok(updated)
    }
433
    /// Spawn a background task that periodically syncs subscriptions over the
    /// DHT and then applies any blocklist updates they carry.
    ///
    /// Errors from either step are ignored so the loop keeps running; stop it
    /// by aborting the returned `JoinHandle`.
    pub fn start_blocklist_auto_sync_loop(
        self,
        transport: Arc<dyn RequestTransport>,
        seed_peers: Vec<PeerAddr>,
        interval: Duration,
    ) -> JoinHandle<()> {
        tokio::spawn(async move {
            loop {
                // Best-effort: a failed pass is simply retried next tick.
                let _ = self
                    .sync_subscriptions_over_dht(transport.as_ref(), &seed_peers)
                    .await;
                let _ = self
                    .apply_blocklist_updates_from_subscriptions(transport.as_ref(), &seed_peers)
                    .await;
                tokio::time::sleep(interval).await;
            }
        })
    }
458
459 pub async fn search(&self, query: SearchQuery) -> anyhow::Result<Vec<SearchResult>> {
460 self.search_with_trust_filter(query, SearchTrustFilter::default())
461 .await
462 }
463
464 pub async fn search_with_trust_filter(
465 &self,
466 query: SearchQuery,
467 trust_filter: SearchTrustFilter,
468 ) -> anyhow::Result<Vec<SearchResult>> {
469 self.search_hits(&query.text, trust_filter, false).await
470 }
471
472 pub async fn search_page(&self, query: SearchPageQuery) -> anyhow::Result<SearchPage> {
473 self.search_page_with_trust_filter(query, SearchTrustFilter::default())
474 .await
475 }
476
477 pub async fn search_page_with_trust_filter(
478 &self,
479 query: SearchPageQuery,
480 trust_filter: SearchTrustFilter,
481 ) -> anyhow::Result<SearchPage> {
482 let query = query.normalized();
483 let hits = self
484 .search_hits(&query.text, trust_filter, query.include_snippets)
485 .await?;
486 let total = hits.len();
487 if query.offset >= total {
488 return Ok(SearchPage {
489 total,
490 results: vec![],
491 });
492 }
493 let results = hits
494 .into_iter()
495 .skip(query.offset)
496 .take(query.limit)
497 .collect();
498 Ok(SearchPage { total, results })
499 }
500
    /// Run a full-text search over manifests from subscribed shares, honoring
    /// the trust filter and the union of all enabled blocklists.
    async fn search_hits(
        &self,
        query_text: &str,
        trust_filter: SearchTrustFilter,
        include_snippets: bool,
    ) -> anyhow::Result<Vec<SearchResult>> {
        let state = self.state.read().await;
        // Only shares whose trust level passes the filter are searched.
        let subscribed = state
            .subscriptions
            .iter()
            .filter_map(|(share_id, sub)| {
                if trust_filter.allows(sub.trust_level) {
                    Some(*share_id)
                } else {
                    None
                }
            })
            .collect::<HashSet<_>>();
        // Union the rules of every enabled blocklist share we are *still*
        // subscribed to — unsubscribed shares no longer contribute.
        let mut blocked_shares = HashSet::<[u8; 32]>::new();
        let mut blocked_content_ids = HashSet::<[u8; 32]>::new();
        for share_id in &state.enabled_blocklist_shares {
            if !state.subscriptions.contains_key(share_id) {
                continue;
            }
            let Some(rules) = state.blocklist_rules_by_share.get(share_id) else {
                continue;
            };
            blocked_shares.extend(rules.blocked_share_ids.iter().copied());
            blocked_content_ids.extend(rules.blocked_content_ids.iter().copied());
        }
        // Search, drop blocked hits, then shape the results.
        let hits = state
            .search_index
            .search(query_text, &subscribed, &state.share_weights)
            .into_iter()
            .filter(|(item, _)| {
                !blocked_shares.contains(&item.share_id)
                    && !blocked_content_ids.contains(&item.content_id)
            })
            .map(|(item, score)| {
                let snippet = build_search_snippet(&item, query_text, include_snippets);
                SearchResult {
                    share_id: ShareId(item.share_id),
                    content_id: item.content_id,
                    name: item.name,
                    snippet,
                    score,
                }
            })
            .collect::<Vec<_>>();
        Ok(hits)
    }
552
553 pub async fn begin_partial_download(
554 &self,
555 content_id: [u8; 32],
556 target_path: String,
557 total_chunks: u32,
558 ) -> anyhow::Result<()> {
559 {
560 let mut state = self.state.write().await;
561 state.partial_downloads.insert(
562 content_id,
563 PersistedPartialDownload {
564 content_id,
565 target_path,
566 total_chunks,
567 completed_chunks: vec![],
568 },
569 );
570 state.dirty.partial_downloads = true;
571 }
572 persist_state(self).await
573 }
574
575 pub async fn mark_partial_chunk_complete(
576 &self,
577 content_id: [u8; 32],
578 chunk_index: u32,
579 ) -> anyhow::Result<()> {
580 {
581 let mut state = self.state.write().await;
582 if let Some(partial) = state.partial_downloads.get_mut(&content_id)
583 && !partial.completed_chunks.contains(&chunk_index)
584 {
585 partial.completed_chunks.push(chunk_index);
586 }
587 state.dirty.partial_downloads = true;
588 }
589 persist_state(self).await
590 }
591
592 pub async fn clear_partial_download(&self, content_id: [u8; 32]) -> anyhow::Result<()> {
593 {
594 let mut state = self.state.write().await;
595 state.partial_downloads.remove(&content_id);
596 state.dirty.partial_downloads = true;
597 }
598 persist_state(self).await
599 }
600
601 pub async fn set_encrypted_node_key(
602 &self,
603 key_material: &[u8],
604 passphrase: &str,
605 ) -> anyhow::Result<()> {
606 {
607 let mut state = self.state.write().await;
608 state.encrypted_node_key = Some(encrypt_secret(key_material, passphrase)?);
609 state.dirty.node_key = true;
610 }
611 persist_state(self).await
612 }
613
614 pub async fn decrypt_node_key(&self, passphrase: &str) -> anyhow::Result<Option<Vec<u8>>> {
615 let state = self.state.read().await;
616 let Some(encrypted) = state.encrypted_node_key.as_ref() else {
617 return Ok(None);
618 };
619 Ok(Some(decrypt_secret(encrypted, passphrase)?))
620 }
621
622 pub async fn encrypt_publisher_identities(&self, passphrase: &str) -> anyhow::Result<()> {
627 {
628 let mut state = self.state.write().await;
629 let mut encrypted_map = HashMap::new();
630 for (label, secret) in &state.publisher_identities {
631 encrypted_map.insert(label.clone(), encrypt_secret(secret.as_ref(), passphrase)?);
632 }
633 state.encrypted_publisher_secrets = encrypted_map;
634 state.dirty.publisher_identities = true;
635 }
636 persist_state(self).await
637 }
638
639 pub async fn unlock_publisher_identities(&self, passphrase: &str) -> anyhow::Result<usize> {
644 let persisted = {
645 let state = self.state.read().await;
646 state.store.load_state().await?
647 };
648 let mut decrypted_count = 0usize;
649 let mut state = self.state.write().await;
650 for identity in &persisted.publisher_identities {
651 if state.publisher_identities.contains_key(&identity.label) {
652 continue; }
654 if let Some(encrypted) = &identity.encrypted_share_secret {
655 let plaintext = decrypt_secret(encrypted, passphrase)?;
656 if plaintext.len() != 32 {
657 anyhow::bail!(
658 "decrypted publisher secret for '{}' has wrong length",
659 identity.label
660 );
661 }
662 let mut secret = [0u8; 32];
663 secret.copy_from_slice(&plaintext);
664 state
665 .publisher_identities
666 .insert(identity.label.clone(), secret);
667 decrypted_count += 1;
668 }
669 }
670 Ok(decrypted_count)
671 }
672
    /// Fetch a manifest directly from the given peers (with retry), verify
    /// it, then cache it, index it for search, and persist.
    pub async fn fetch_manifest_from_peers<C: PeerConnector>(
        &self,
        connector: &C,
        peers: &[PeerAddr],
        manifest_id: [u8; 32],
        policy: &FetchPolicy,
    ) -> anyhow::Result<ManifestV1> {
        let manifest = fetch_manifest_with_retry(connector, peers, manifest_id, policy).await?;
        manifest.verify()?;
        // Update in-memory caches under the write lock; the store's (async)
        // search indexing runs after the lock is dropped.
        let store = {
            let mut state = self.state.write().await;
            state.manifest_cache.insert(manifest_id, manifest.clone());
            state.search_index.index_manifest(&manifest);
            state.dirty.manifests = true;
            state.store.clone()
        };
        store.index_manifest_for_search(&manifest).await?;
        persist_state(self).await?;
        Ok(manifest)
    }
694
    /// Download cataloged content to `target_path`, tracking resumable
    /// progress in `partial_downloads`.
    ///
    /// The peer set is `peers` plus providers discovered via the DHT, minus
    /// ourselves (`self_addr`). After a successful download the file is
    /// re-registered locally so this node can seed it.
    #[allow(clippy::too_many_arguments)]
    pub async fn download_from_peers<C: PeerConnector>(
        &self,
        connector: &C,
        peers: &[PeerAddr],
        content_id: [u8; 32],
        target_path: &str,
        policy: &FetchPolicy,
        self_addr: Option<PeerAddr>,
        on_progress: Option<&ProgressCallback>,
    ) -> anyhow::Result<()> {
        // Record a resume entry up front so an interrupted transfer can be
        // continued after restart. Unknown content is an immediate error.
        let content = {
            let mut state = self.state.write().await;
            let content = state
                .content_catalog
                .get(&content_id)
                .cloned()
                .ok_or_else(|| anyhow::anyhow!("unknown content metadata"))?;
            state.partial_downloads.insert(
                content_id,
                PersistedPartialDownload {
                    content_id,
                    target_path: target_path.to_owned(),
                    total_chunks: content.chunk_count,
                    completed_chunks: vec![],
                },
            );
            state.dirty.partial_downloads = true;
            content
        };
        persist_state(self).await?;

        let transport = RelayAwareTransport::new(connector);
        // Warm the local DHT cache with the provider record; the value is
        // read back from state.dht below (result itself is ignored).
        let provider_key = content_provider_key(&content_id);
        let _ = self
            .dht_find_value_iterative(&transport, provider_key, peers)
            .await;

        // Merge discovered providers into the caller-supplied peer list,
        // de-duplicating as we go.
        let mut all_peers = peers.to_vec();
        {
            let mut state = self.state.write().await;
            let now = now_unix_secs()?;
            if let Some(val) = state.dht.find_value(provider_key, now)
                && let Ok(providers) = crate::cbor::from_slice::<Providers>(&val.value)
            {
                for p in providers.providers {
                    if !all_peers.contains(&p) {
                        all_peers.push(p);
                    }
                }
            }
        }

        // Never attempt to download from ourselves.
        if let Some(ref me) = self_addr {
            all_peers.retain(|p| p != me);
        }
        if all_peers.is_empty() {
            anyhow::bail!(
                "no remote peers available for content download (only local provider found)"
            );
        }

        // The catalog entry may lack chunk hashes; fetch the list (verified
        // against chunk_list_hash) when needed.
        let chunk_hashes = if content.chunks.is_empty() && content.chunk_count > 0 {
            fetch_chunk_hashes_with_retry(
                &transport,
                &all_peers,
                content_id,
                content.chunk_count,
                content.chunk_list_hash,
                policy,
            )
            .await?
        } else {
            content.chunks.clone()
        };

        let target_pb = PathBuf::from(target_path);

        // Seed the policy with stored peer reputations unless the caller
        // supplied their own. `seeded_policy` is declared early so the
        // borrow in `effective_policy` can outlive the if/else.
        let seeded_policy;
        let effective_policy: &FetchPolicy = if policy.initial_reputations.is_empty() {
            let reps = {
                let state = self.state.read().await;
                state.peer_db.reputation_for_peers(&all_peers)
            };
            seeded_policy = FetchPolicy {
                initial_reputations: reps,
                ..policy.clone()
            };
            &seeded_policy
        } else {
            policy
        };

        download_swarm_to_file(
            &transport,
            &all_peers,
            content_id,
            &chunk_hashes,
            effective_policy,
            &target_pb,
            on_progress,
        )
        .await?;

        // Become a provider for the freshly downloaded content.
        if let Some(addr) = self_addr {
            // NOTE(review): synchronous whole-file read on the async
            // executor; consider tokio::fs for very large downloads.
            let bytes = std::fs::read(&target_pb)?;
            self.register_content_by_path(addr, &bytes, target_pb)
                .await?;
        }

        // Download finished: drop the resume record and persist.
        {
            let mut state = self.state.write().await;
            state.partial_downloads.remove(&content_id);
            state.dirty.partial_downloads = true;
            state.dirty.content_paths = true;
        }
        persist_state(self).await
    }
833
    /// Serve a wire-protocol stream: read envelopes in a loop, answer each
    /// one, and hand the stream over to the relay bridge when the peer
    /// successfully registers a tunnel slot.
    ///
    /// Returns on read/write error (peer disconnect) or when a relay bridge
    /// session ends.
    pub(super) async fn serve_wire_stream<S>(
        &self,
        mut stream: S,
        remote_peer: Option<PeerAddr>,
    ) -> anyhow::Result<()>
    where
        S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
    {
        loop {
            let incoming = read_envelope(&mut stream).await?;

            // Detect a tunnel-mode RelayRegister before dispatching, since
            // the envelope is consumed by handle_incoming_envelope below.
            let tunnel_request = if incoming.r#type == MsgType::RelayRegister as u16 {
                incoming.decode_typed().ok().and_then(|wp| match wp {
                    WirePayload::RelayRegister(ref reg) if reg.tunnel => Some(reg.clone()),
                    _ => None,
                })
            } else {
                None
            };

            let response = self
                .handle_incoming_envelope(incoming, remote_peer.as_ref())
                .await;
            if let Some(envelope) = &response {
                write_envelope(&mut stream, envelope).await?;
            }

            // If tunnel registration succeeded (non-error RelayRegistered
            // response), switch this stream into bridge mode; that call only
            // returns when the tunnel closes.
            if let Some(_reg) = tunnel_request
                && let Some(resp) = response
                && resp.flags & FLAG_RESPONSE != 0
                && resp.r#type == MsgType::RelayRegistered as u16
            {
                let registered: crate::wire::RelayRegistered =
                    crate::cbor::from_slice(&resp.payload)?;
                return self
                    .run_relay_bridge(stream, registered.relay_slot_id, remote_peer)
                    .await;
            }
        }
    }
878
    /// Bridge a registered relay tunnel: forward queued request envelopes to
    /// the peer over `stream` and route each response back to its requester.
    ///
    /// The slot is always unregistered on exit, including on I/O error.
    async fn run_relay_bridge<S>(
        &self,
        mut stream: S,
        slot_id: u64,
        _remote_peer: Option<PeerAddr>,
    ) -> anyhow::Result<()>
    where
        S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
    {
        // Bounded (32-entry) queue of requests destined for this tunnel.
        let mut rx = self.relay_tunnels.register(slot_id, 32).await;

        let result = async {
            // Strict request/response lockstep: one envelope out, one back.
            while let Some((request_envelope, response_tx)) = rx.recv().await {
                write_envelope(&mut stream, &request_envelope).await?;
                let response = read_envelope(&mut stream).await?;
                // The requester may have given up; ignore a closed receiver.
                let _ = response_tx.send(response);
            }
            Ok::<(), anyhow::Error>(())
        }
        .await;

        self.relay_tunnels.remove(slot_id).await;
        result
    }
911
    /// Decode and dispatch one incoming wire envelope to the matching local
    /// handler and build the response envelope.
    ///
    /// Malformed payloads, rate-limit rejections, and handler errors all
    /// produce an error envelope echoing the request id/type; as written,
    /// every path returns `Some(...)`.
    pub(super) async fn handle_incoming_envelope(
        &self,
        envelope: Envelope,
        remote_peer: Option<&PeerAddr>,
    ) -> Option<Envelope> {
        let req_id = envelope.req_id;
        let req_type = envelope.r#type;
        let typed = match envelope.decode_typed() {
            Ok(payload) => payload,
            Err(err) => {
                return Some(error_envelope(req_type, req_id, &err.to_string()));
            }
        };
        // Rate limiting only applies when the requester's identity is known.
        if let Some(peer) = remote_peer {
            let now = now_unix_secs().unwrap_or(0);
            let mut state = self.state.write().await;
            if let Err(err) = state.enforce_request_limits(peer, request_class(&typed), now) {
                return Some(error_envelope(req_type, req_id, &err.to_string()));
            }
        }
        let result = match typed {
            // DHT: return the closest known peers to the requested node id.
            WirePayload::FindNode(msg) => self
                .dht_find_node(msg)
                .await
                .and_then(|peers| {
                    crate::cbor::to_vec(&FindNodeResult { peers }).map_err(Into::into)
                })
                .map(|payload| Envelope {
                    r#type: MsgType::FindNode as u16,
                    req_id,
                    flags: FLAG_RESPONSE,
                    payload,
                }),
            // DHT: return the stored value (if valid) plus closer peers.
            WirePayload::FindValue(msg) => {
                // The routing target is the first 20 bytes of the value key.
                let target = {
                    let mut t = [0u8; 20];
                    t.copy_from_slice(&msg.key[..20]);
                    t
                };
                let closer_peers = match self
                    .dht_find_node(FindNode {
                        target_node_id: target,
                    })
                    .await
                {
                    Ok(peers) => peers,
                    Err(err) => return Some(error_envelope(req_type, req_id, &err.to_string())),
                };
                self.dht_find_value(msg.key)
                    .await
                    .map(|value| {
                        let now = now_unix_secs().unwrap_or(0);
                        // Drop values that fail keyspace validation instead
                        // of propagating them; ttl is at least 1 second.
                        let wire_value = value.and_then(|v| {
                            if validate_dht_value_for_known_keyspaces(v.key, &v.value).is_err() {
                                return None;
                            }
                            Some(WireStore {
                                key: v.key,
                                value: v.value,
                                ttl_secs: v.expires_at_unix.saturating_sub(now).max(1),
                            })
                        });
                        FindValueResult {
                            value: wire_value,
                            closer_peers,
                        }
                    })
                    .and_then(|result| crate::cbor::to_vec(&result).map_err(Into::into))
                    .map(|payload| Envelope {
                        r#type: MsgType::FindValue as u16,
                        req_id,
                        flags: FLAG_RESPONSE,
                        payload,
                    })
            }
            // DHT: store a value; empty-payload ack on success.
            WirePayload::Store(msg) => self.dht_store(msg).await.map(|_| Envelope {
                r#type: MsgType::Store as u16,
                req_id,
                flags: FLAG_RESPONSE,
                payload: vec![],
            }),
            // Serve a cached manifest by id.
            WirePayload::GetManifest(msg) => self
                .manifest_bytes(msg.manifest_id)
                .await
                .and_then(|maybe| {
                    maybe
                        .ok_or_else(|| anyhow::anyhow!("manifest not found"))
                        .and_then(|bytes| {
                            crate::cbor::to_vec(&crate::wire::ManifestData {
                                manifest_id: msg.manifest_id,
                                bytes,
                            })
                            .map_err(Into::into)
                        })
                })
                .map(|payload| Envelope {
                    r#type: MsgType::ManifestData as u16,
                    req_id,
                    flags: FLAG_RESPONSE,
                    payload,
                }),
            // List this node's public shares (bounded by max_entries).
            WirePayload::ListPublicShares(msg) => self
                .list_local_public_shares(msg.max_entries as usize)
                .await
                .and_then(|shares| {
                    crate::cbor::to_vec(&PublicShareList { shares }).map_err(Into::into)
                })
                .map(|payload| Envelope {
                    r#type: MsgType::PublicShareList as u16,
                    req_id,
                    flags: FLAG_RESPONSE,
                    payload,
                }),
            // Report whether we are a member of the given community; the
            // claimed share_id must match the supplied pubkey.
            WirePayload::GetCommunityStatus(msg) => {
                let (joined, proof, name) =
                    match VerifyingKey::from_bytes(&msg.community_share_pubkey) {
                        Ok(pubkey) if ShareId::from_pubkey(&pubkey).0 == msg.community_share_id => {
                            let state = self.state.read().await;
                            match state.communities.get(&msg.community_share_id) {
                                Some(membership) => {
                                    let proof = membership
                                        .token
                                        .as_ref()
                                        .and_then(|t| crate::cbor::to_vec(t).ok());
                                    (true, proof, membership.name.clone())
                                }
                                None => (false, None, None),
                            }
                        }
                        _ => {
                            return Some(error_envelope(
                                req_type,
                                req_id,
                                "community share_id does not match share_pubkey",
                            ));
                        }
                    };
                crate::cbor::to_vec(&CommunityStatus {
                    community_share_id: msg.community_share_id,
                    joined,
                    membership_proof: proof,
                    name,
                })
                .map_err(Into::into)
                .map(|payload| Envelope {
                    r#type: MsgType::CommunityStatus as u16,
                    req_id,
                    flags: FLAG_RESPONSE,
                    payload,
                })
            }
            // List community-scoped public shares; the handler checks the
            // requester's membership proof.
            WirePayload::ListCommunityPublicShares(msg) => self
                .list_local_community_public_shares(
                    ShareId(msg.community_share_id),
                    msg.community_share_pubkey,
                    msg.max_entries as usize,
                    msg.requester_node_pubkey,
                    msg.requester_membership_proof.as_deref(),
                )
                .await
                .and_then(|shares| {
                    crate::cbor::to_vec(&CommunityPublicShareList {
                        community_share_id: msg.community_share_id,
                        shares,
                    })
                    .map_err(Into::into)
                })
                .map(|payload| Envelope {
                    r#type: MsgType::CommunityPublicShareList as u16,
                    req_id,
                    flags: FLAG_RESPONSE,
                    payload,
                }),
            // Serve one chunk of locally held content.
            WirePayload::GetChunk(msg) => self
                .chunk_bytes(msg.content_id, msg.chunk_index)
                .await
                .and_then(|maybe| {
                    maybe
                        .ok_or_else(|| anyhow::anyhow!("chunk not found"))
                        .and_then(|bytes| {
                            crate::cbor::to_vec(&ChunkData {
                                content_id: msg.content_id,
                                chunk_index: msg.chunk_index,
                                bytes,
                            })
                            .map_err(Into::into)
                        })
                })
                .map(|payload| Envelope {
                    r#type: MsgType::ChunkData as u16,
                    req_id,
                    flags: FLAG_RESPONSE,
                    payload,
                }),
            // Serve the per-chunk hash list for locally held content.
            WirePayload::GetChunkHashes(msg) => self
                .chunk_hash_list(msg.content_id)
                .await
                .and_then(|maybe| {
                    maybe
                        .ok_or_else(|| anyhow::anyhow!("chunk hashes not found"))
                        .and_then(|hashes| {
                            crate::cbor::to_vec(&crate::wire::ChunkHashList {
                                content_id: msg.content_id,
                                hashes,
                            })
                            .map_err(Into::into)
                        })
                })
                .map(|payload| Envelope {
                    r#type: MsgType::ChunkHashList as u16,
                    req_id,
                    flags: FLAG_RESPONSE,
                    payload,
                }),
            // Register the remote peer in a relay slot. The `tunnel` flag is
            // ignored here — serve_wire_stream handles the tunnel upgrade.
            WirePayload::RelayRegister(RelayRegister {
                relay_slot_id,
                tunnel: _,
            }) => {
                let Some(peer) = remote_peer else {
                    return Some(error_envelope(
                        req_type,
                        req_id,
                        "missing remote peer identity",
                    ));
                };
                self.relay_register_with_slot(peer.clone(), relay_slot_id)
                    .await
                    .and_then(|registered| crate::cbor::to_vec(&registered).map_err(Into::into))
                    .map(|payload| Envelope {
                        r#type: MsgType::RelayRegistered as u16,
                        req_id,
                        flags: FLAG_RESPONSE,
                        payload,
                    })
            }
            // Connect the remote peer to an existing relay slot.
            WirePayload::RelayConnect(msg) => {
                let Some(peer) = remote_peer else {
                    return Some(error_envelope(
                        req_type,
                        req_id,
                        "missing remote peer identity",
                    ));
                };
                self.relay_connect(peer.clone(), msg)
                    .await
                    .map(|_| Envelope {
                        r#type: MsgType::RelayConnect as u16,
                        req_id,
                        flags: FLAG_RESPONSE,
                        payload: vec![],
                    })
            }
            // Relay traffic: either forward through a live tunnel held by
            // this node, or fall back to the non-tunnel relay path.
            WirePayload::RelayStream(msg) => {
                let Some(peer) = remote_peer else {
                    return Some(error_envelope(
                        req_type,
                        req_id,
                        "missing remote peer identity",
                    ));
                };

                if self.relay_tunnels.has_tunnel(msg.relay_slot_id).await {
                    // The payload wraps a complete inner envelope destined
                    // for the tunneled peer.
                    let inner_envelope = match Envelope::decode(&msg.payload) {
                        Ok(env) => env,
                        Err(err) => {
                            return Some(error_envelope(
                                req_type,
                                req_id,
                                &format!("relay tunnel: bad inner envelope: {err}"),
                            ));
                        }
                    };
                    let timeout = std::time::Duration::from_secs(30);
                    match self
                        .relay_tunnels
                        .forward(msg.relay_slot_id, inner_envelope, timeout)
                        .await
                    {
                        Ok(inner_response) => {
                            // Re-wrap the inner response in a RelayStream
                            // envelope mirroring the request's ids.
                            let response_payload = match inner_response.encode() {
                                Ok(bytes) => bytes,
                                Err(err) => {
                                    return Some(error_envelope(
                                        req_type,
                                        req_id,
                                        &format!("relay tunnel: encode response: {err}"),
                                    ));
                                }
                            };
                            let relay_stream_response = crate::wire::RelayStream {
                                relay_slot_id: msg.relay_slot_id,
                                stream_id: msg.stream_id,
                                kind: msg.kind,
                                payload: response_payload,
                            };
                            crate::cbor::to_vec(&relay_stream_response)
                                .map_err(|e| anyhow::anyhow!(e))
                                .map(|payload| Envelope {
                                    r#type: MsgType::RelayStream as u16,
                                    req_id,
                                    flags: FLAG_RESPONSE,
                                    payload,
                                })
                        }
                        Err(err) => Err(err),
                    }
                } else {
                    self.relay_stream(peer.clone(), msg)
                        .await
                        .and_then(|relayed| crate::cbor::to_vec(&relayed).map_err(Into::into))
                        .map(|payload| Envelope {
                            r#type: MsgType::RelayStream as u16,
                            req_id,
                            flags: FLAG_RESPONSE,
                            payload,
                        })
                }
            }
            // Peer exchange: record the offered peers, ack with empty body.
            WirePayload::PexOffer(msg) => {
                let now = now_unix_secs().unwrap_or(0);
                let mut state = self.state.write().await;
                for peer in &msg.peers {
                    state.peer_db.upsert_seen(peer.clone(), now);
                }
                state.dirty.peers = true;
                Ok(Envelope {
                    r#type: MsgType::PexOffer as u16,
                    req_id,
                    flags: FLAG_RESPONSE,
                    payload: vec![],
                })
            }
            // Peer exchange: reply with a sample of fresh known peers.
            WirePayload::PexRequest(msg) => {
                let now = now_unix_secs().unwrap_or(0);
                let state = self.state.read().await;
                let peers = state.peer_db.sample_fresh(now, msg.max_peers as usize);
                crate::cbor::to_vec(&PexOffer { peers })
                    .map_err(Into::into)
                    .map(|payload| Envelope {
                        r#type: MsgType::PexOffer as u16,
                        req_id,
                        flags: FLAG_RESPONSE,
                        payload,
                    })
            }
            // Content announcement: acknowledged but otherwise ignored here.
            WirePayload::HaveContent(_msg) => {
                Ok(Envelope {
                    r#type: MsgType::HaveContent as u16,
                    req_id,
                    flags: FLAG_RESPONSE,
                    payload: vec![],
                })
            }
            // Return known relay announcements, capped at max_count.
            WirePayload::RelayListRequest(msg) => {
                let state = self.state.read().await;
                let announcements = state
                    .relay
                    .known_announcements()
                    .into_iter()
                    .take(msg.max_count as usize)
                    .collect();
                crate::cbor::to_vec(&RelayListResponse { announcements })
                    .map_err(Into::into)
                    .map(|payload| Envelope {
                        r#type: MsgType::RelayListResponse as u16,
                        req_id,
                        flags: FLAG_RESPONSE,
                        payload,
                    })
            }
            _ => Err(anyhow::anyhow!("unsupported message type")),
        };
        Some(match result {
            Ok(ok) => ok,
            Err(err) => error_envelope(req_type, req_id, &err.to_string()),
        })
    }
1300
1301 async fn manifest_bytes(&self, manifest_id: [u8; 32]) -> anyhow::Result<Option<Vec<u8>>> {
1302 let state = self.state.read().await;
1303 Ok(state
1304 .manifest_cache
1305 .get(&manifest_id)
1306 .map(crate::cbor::to_vec)
1307 .transpose()?)
1308 }
1309
1310 async fn chunk_bytes(
1311 &self,
1312 content_id: [u8; 32],
1313 chunk_index: u32,
1314 ) -> anyhow::Result<Option<Vec<u8>>> {
1315 let state = self.state.read().await;
1316 if let Some(path) = state.content_paths.get(&content_id) {
1317 return crate::blob_store::read_chunk_from_path(path, chunk_index);
1318 }
1319 Ok(None)
1320 }
1321
1322 async fn chunk_hash_list(&self, content_id: [u8; 32]) -> anyhow::Result<Option<Vec<[u8; 32]>>> {
1328 {
1330 let state = self.state.read().await;
1331 if let Some(c) = state.content_catalog.get(&content_id)
1332 && !c.chunks.is_empty()
1333 {
1334 return Ok(Some(c.chunks.clone()));
1335 }
1336 }
1337
1338 let mut state = self.state.write().await;
1340 if let Some(c) = state.content_catalog.get(&content_id)
1342 && !c.chunks.is_empty()
1343 {
1344 return Ok(Some(c.chunks.clone()));
1345 }
1346 let path = match state.content_paths.get(&content_id) {
1347 Some(p) => p.clone(),
1348 None => return Ok(None),
1349 };
1350 let bytes = match std::fs::read(&path) {
1351 Ok(b) => b,
1352 Err(_) => return Ok(None),
1353 };
1354 let chunks = crate::content::chunk_hashes(&bytes);
1355 if let Some(entry) = state.content_catalog.get_mut(&content_id) {
1356 entry.chunks = chunks.clone();
1357 }
1358 Ok(Some(chunks))
1359 }
1360
1361 pub async fn reannounce_seeded_content(&self, self_addr: PeerAddr) -> anyhow::Result<usize> {
1368 let content_ids: Vec<[u8; 32]> = {
1369 let state = self.state.read().await;
1370 state
1371 .content_catalog
1372 .keys()
1373 .filter(|id| {
1374 state.content_paths.get(*id).is_some_and(|p| p.exists())
1376 })
1377 .copied()
1378 .collect()
1379 };
1380
1381 let mut announced = 0usize;
1382 for content_id in &content_ids {
1383 let now = now_unix_secs()?;
1384 let mut state = self.state.write().await;
1385
1386 let mut providers: Providers = state
1387 .dht
1388 .find_value(content_provider_key(content_id), now)
1389 .and_then(|v| crate::cbor::from_slice(&v.value).ok())
1390 .unwrap_or(Providers {
1391 content_id: *content_id,
1392 providers: vec![],
1393 updated_at: now,
1394 });
1395
1396 if !providers.providers.contains(&self_addr) {
1397 providers.providers.push(self_addr.clone());
1398 }
1399 providers.updated_at = now;
1400
1401 state.dht.store(
1402 content_provider_key(content_id),
1403 crate::cbor::to_vec(&providers)?,
1404 crate::dht::DEFAULT_TTL_SECS,
1405 now,
1406 )?;
1407 announced += 1;
1408 }
1409
1410 if announced > 0 {
1411 persist_state(self).await?;
1412 }
1413 Ok(announced)
1414 }
1415
1416 pub async fn reannounce_subscribed_share_heads(&self) -> anyhow::Result<usize> {
1426 let now = now_unix_secs()?;
1427 let mut state = self.state.write().await;
1428 let mut refreshed = 0usize;
1429
1430 let share_ids: Vec<[u8; 32]> = state.subscriptions.keys().copied().collect();
1431 for share_id in share_ids {
1432 let sub = match state.subscriptions.get(&share_id) {
1433 Some(s) => s,
1434 None => continue,
1435 };
1436 let manifest_id = match sub.latest_manifest_id {
1438 Some(id) => id,
1439 None => continue,
1440 };
1441 let manifest = match state.manifest_cache.get(&manifest_id) {
1442 Some(m) => m,
1443 None => continue,
1444 };
1445 if manifest.visibility != ShareVisibility::Public {
1446 continue;
1447 }
1448
1449 let key = share_head_key(&ShareId(share_id));
1451 let encoded = match state.dht.find_value(key, now) {
1452 Some(val) => val.value,
1453 None => {
1454 match state.published_share_heads.get(&share_id) {
1457 Some(head) => match crate::cbor::to_vec(head) {
1458 Ok(enc) => enc,
1459 Err(_) => continue,
1460 },
1461 None => continue,
1462 }
1463 }
1464 };
1465
1466 state
1467 .dht
1468 .store(key, encoded, crate::dht::DEFAULT_TTL_SECS, now)?;
1469 refreshed += 1;
1470 }
1471
1472 if refreshed > 0 {
1473 drop(state);
1474 persist_state(self).await?;
1475 }
1476 Ok(refreshed)
1477 }
1478
1479 pub async fn reannounce_published_share_data(&self) -> anyhow::Result<usize> {
1485 let now = super::helpers::now_unix_secs()?;
1486 let mut state = self.state.write().await;
1487 let mut refreshed = 0usize;
1488
1489 let heads: Vec<([u8; 32], crate::manifest::ShareHead)> = state
1491 .published_share_heads
1492 .iter()
1493 .map(|(k, v)| (*k, v.clone()))
1494 .collect();
1495
1496 for (share_id, head) in heads {
1497 let head_key = share_head_key(&ShareId(share_id));
1499 let head_bytes = match crate::cbor::to_vec(&head) {
1500 Ok(b) => b,
1501 Err(_) => continue,
1502 };
1503 state
1504 .dht
1505 .store(head_key, head_bytes, crate::dht::DEFAULT_TTL_SECS, now)?;
1506 refreshed += 1;
1507
1508 let manifest_id = head.latest_manifest_id;
1511 if let Some(manifest) = state.manifest_cache.get(&manifest_id) {
1512 let manifest_bytes = match crate::cbor::to_vec(manifest) {
1513 Ok(b) => b,
1514 Err(_) => continue,
1515 };
1516 state.dht.store(
1517 manifest_loc_key(&ManifestId(manifest_id)),
1518 manifest_bytes,
1519 crate::dht::DEFAULT_TTL_SECS,
1520 now,
1521 )?;
1522 refreshed += 1;
1523 }
1524 }
1525
1526 if refreshed > 0 {
1527 debug!(
1528 refreshed,
1529 "reannounce_published_share_data: DHT entries restored"
1530 );
1531 }
1532
1533 Ok(refreshed)
1534 }
1535}