Skip to main content

scp2p_core/api/
node_net.rs

1// Copyright (c) 2024-2026 Vanyo Vanev / Tech Art Ltd
2// SPDX-License-Identifier: MPL-2.0
3//
4// This Source Code Form is subject to the terms of the Mozilla Public
5// License, v. 2.0. If a copy of the MPL was not distributed with this
6// file, You can obtain one at https://mozilla.org/MPL/2.0/.
7//! Network / sync / download / wire-serving operations on `NodeHandle`.
8
9use std::collections::{HashMap, HashSet};
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use ed25519_dalek::VerifyingKey;
15use tokio::task::JoinHandle;
16use tracing::{debug, info, warn};
17
18use crate::{
19    content::ChunkedContent,
20    dht_keys::{content_provider_key, manifest_loc_key, share_head_key},
21    ids::{ContentId, ManifestId, ShareId},
22    manifest::{ManifestV1, ShareVisibility},
23    net_fetch::{
24        FetchPolicy, PeerConnector, ProgressCallback, RelayAwareTransport, RequestTransport,
25        download_swarm_over_network, download_swarm_to_file, fetch_chunk_hashes_with_retry,
26        fetch_manifest_with_retry,
27    },
28    peer::PeerAddr,
29    store::{PersistedPartialDownload, decrypt_secret, encrypt_secret},
30    transport::{read_envelope, write_envelope},
31    wire::{
32        ChunkData, CommunityPublicShareList, CommunityStatus, Envelope, FLAG_RESPONSE, FindNode,
33        FindNodeResult, FindValueResult, MsgType, PexOffer, Providers, PublicShareList,
34        RelayListResponse, RelayRegister, Store as WireStore, WirePayload,
35    },
36};
37
38use super::{
39    NodeHandle, SearchPage, SearchPageQuery, SearchQuery, SearchResult, SearchTrustFilter,
40    helpers::{
41        build_search_snippet, error_envelope, merge_peer_list, now_unix_secs, persist_state,
42        request_class, validate_dht_value_for_known_keyspaces,
43    },
44};
45
46impl NodeHandle {
47    pub async fn sync_subscriptions(&self) -> anyhow::Result<()> {
48        let mut indexed_manifests = Vec::new();
49        let store = {
50            let mut state = self.state.write().await;
51            let now = now_unix_secs()?;
52            let subscription_ids = state.subscriptions.keys().copied().collect::<Vec<_>>();
53
54            for share_id in subscription_ids {
55                let share_pubkey = state
56                    .subscriptions
57                    .get(&share_id)
58                    .and_then(|sub| sub.share_pubkey);
59                let local_seq = state
60                    .subscriptions
61                    .get(&share_id)
62                    .map(|sub| sub.latest_seq)
63                    .unwrap_or_default();
64
65                let Some(head_val) = state
66                    .dht
67                    .find_value(share_head_key(&ShareId(share_id)), now)
68                else {
69                    continue;
70                };
71
72                let head: crate::manifest::ShareHead = crate::cbor::from_slice(&head_val.value)?;
73                if let Some(pubkey) = share_pubkey {
74                    head.verify_with_pubkey(pubkey)?;
75                }
76
77                if head.latest_seq <= local_seq {
78                    continue;
79                }
80
81                let Some(manifest) = state.manifest_cache.get(&head.latest_manifest_id).cloned()
82                else {
83                    continue;
84                };
85                manifest.verify()?;
86                if manifest.share_id != share_id {
87                    anyhow::bail!("manifest share_id mismatch while syncing subscription");
88                }
89
90                for item in &manifest.items {
91                    let content = ChunkedContent {
92                        content_id: ContentId(item.content_id),
93                        chunks: vec![],
94                        chunk_count: item.chunk_count,
95                        chunk_list_hash: item.chunk_list_hash,
96                    };
97                    state.content_catalog.insert(item.content_id, content);
98                }
99                state.search_index.index_manifest(&manifest);
100                indexed_manifests.push(manifest);
101
102                if let Some(sub) = state.subscriptions.get_mut(&share_id) {
103                    sub.latest_seq = head.latest_seq;
104                    sub.latest_manifest_id = Some(head.latest_manifest_id);
105                }
106            }
107            state.store.clone()
108        };
109        // FTS5 write-through: persist search index items outside the state lock.
110        for manifest in &indexed_manifests {
111            store.index_manifest_for_search(manifest).await?;
112        }
113        {
114            let mut s = self.state.write().await;
115            s.dirty.subscriptions = true;
116            s.dirty.manifests = true;
117        }
118        persist_state(self).await
119    }
120
121    pub async fn sync_subscriptions_over_dht<T: RequestTransport + ?Sized>(
122        &self,
123        transport: &T,
124        seed_peers: &[PeerAddr],
125    ) -> anyhow::Result<()> {
126        /// Number of subscription fetches executed concurrently during the
127        /// network phase.  Bounded to avoid overwhelming the transport or the
128        /// remote peer's rate limits.
129        const SYNC_CONCURRENCY: usize = 20;
130
131        let subscription_meta = {
132            let state = self.state.read().await;
133            state
134                .subscriptions
135                .iter()
136                .map(|(share_id, sub)| (*share_id, sub.share_pubkey, sub.latest_seq))
137                .collect::<Vec<_>>()
138        };
139
140        if subscription_meta.is_empty() {
141            debug!("sync: no subscriptions to sync");
142            return Ok(());
143        }
144
145        info!(
146            subscriptions = subscription_meta.len(),
147            seed_peers = seed_peers.len(),
148            "sync: starting subscription sync over DHT"
149        );
150
151        // Phase 1: concurrent network — DHT head lookups + manifest fetches
152        // processed in batches of SYNC_CONCURRENCY.  No state mutations here.
153        type FetchOk = ([u8; 32], [u8; 32], ManifestV1, u64); // (share_id, manifest_id, manifest, new_seq)
154        let mut fetch_results: Vec<Option<FetchOk>> = Vec::new();
155        for chunk in subscription_meta.chunks(SYNC_CONCURRENCY) {
156            let batch = futures_util::future::join_all(chunk.iter().map(
157                |&(share_id, share_pubkey, local_seq)| {
158                    self.fetch_subscription_update_network(
159                        transport,
160                        share_id,
161                        share_pubkey,
162                        local_seq,
163                        seed_peers,
164                    )
165                },
166            ))
167            .await;
168            for result in batch {
169                fetch_results.push(result?);
170            }
171        }
172
173        // Phase 2: apply all fetched manifests to state under a single write lock.
174        let (phase2_store, indexed_manifests) = {
175            let mut indexed_manifests = Vec::new();
176            let mut state = self.state.write().await;
177            for result in fetch_results {
178                let Some((share_id, manifest_id, manifest, new_seq)) = result else {
179                    continue;
180                };
181                state.manifest_cache.insert(manifest_id, manifest.clone());
182                for item in &manifest.items {
183                    let content = ChunkedContent {
184                        content_id: ContentId(item.content_id),
185                        chunks: vec![],
186                        chunk_count: item.chunk_count,
187                        chunk_list_hash: item.chunk_list_hash,
188                    };
189                    state.content_catalog.insert(item.content_id, content);
190                }
191                state.search_index.index_manifest(&manifest);
192                indexed_manifests.push(manifest);
193                if let Some(sub) = state.subscriptions.get_mut(&share_id) {
194                    sub.latest_seq = new_seq;
195                    sub.latest_manifest_id = Some(manifest_id);
196                }
197            }
198            state.dirty.subscriptions = true;
199            state.dirty.manifests = true;
200            (state.store.clone(), indexed_manifests)
201        };
202        // FTS5 write-through: persist search index items outside the state lock.
203        for manifest in &indexed_manifests {
204            phase2_store.index_manifest_for_search(manifest).await?;
205        }
206
207        // Phase 3: single persist (replaces one-per-subscription persist calls).
208        persist_state(self).await
209    }
210
211    /// Network-only phase for a single subscription update.
212    ///
213    /// Performs the DHT head lookup and manifest fetch with no state mutations,
214    /// making it safe to call concurrently for multiple subscriptions.
215    ///
216    /// Returns `None` when the subscription is already up to date or cannot be
217    /// resolved over the network. Returns `Some((share_id, manifest_id,
218    /// manifest, new_seq))` when a newer manifest was fetched.
219    async fn fetch_subscription_update_network<T: RequestTransport + ?Sized>(
220        &self,
221        transport: &T,
222        share_id: [u8; 32],
223        share_pubkey: Option<[u8; 32]>,
224        local_seq: u64,
225        seed_peers: &[PeerAddr],
226    ) -> anyhow::Result<Option<([u8; 32], [u8; 32], ManifestV1, u64)>> {
227        let Some(head) = self
228            .dht_find_share_head_iterative(transport, ShareId(share_id), share_pubkey, seed_peers)
229            .await?
230        else {
231            debug!(
232                share_id = hex::encode(share_id),
233                "sync: no share head found in DHT"
234            );
235            return Ok(None);
236        };
237        if head.latest_seq <= local_seq {
238            debug!(
239                share_id = hex::encode(share_id),
240                remote_seq = head.latest_seq,
241                local_seq,
242                "sync: already up to date"
243            );
244            return Ok(None);
245        }
246
247        info!(
248            share_id = hex::encode(share_id),
249            remote_seq = head.latest_seq,
250            local_seq,
251            "sync: newer head found, fetching manifest"
252        );
253
254        let manifest_id = head.latest_manifest_id;
255
256        // Check manifest cache before going to the network.
257        let cached_manifest = {
258            let state = self.state.read().await;
259            state.manifest_cache.get(&manifest_id).cloned()
260        };
261
262        let manifest = if let Some(cached) = cached_manifest {
263            cached
264        } else {
265            // Try the DHT first — the publisher stores serialised manifests
266            // at manifest_loc_key so they can be retrieved without a direct
267            // connection (essential when both peers are behind NAT).
268            let dht_key = manifest_loc_key(&ManifestId(manifest_id));
269            let dht_manifest = self
270                .dht_find_value_iterative(transport, dht_key, seed_peers)
271                .await?
272                .and_then(|v| crate::cbor::from_slice::<ManifestV1>(&v.value).ok())
273                .filter(|m| {
274                    m.manifest_id()
275                        .map(|mid| mid.0 == manifest_id)
276                        .unwrap_or(false)
277                });
278
279            if let Some(m) = dht_manifest {
280                info!(
281                    manifest_id = hex::encode(manifest_id),
282                    "sync: manifest found in DHT"
283                );
284                m
285            } else {
286                debug!(
287                    manifest_id = hex::encode(manifest_id),
288                    "sync: manifest not in DHT, trying direct peer fetch"
289                );
290                // Fallback: direct peer fetch (works when publisher is reachable).
291                let mut target = [0u8; 20];
292                target.copy_from_slice(&manifest_id[..20]);
293                let mut peers = seed_peers.to_vec();
294                let discovered = self
295                    .dht_find_node_iterative(transport, target, seed_peers)
296                    .await?;
297                merge_peer_list(&mut peers, discovered);
298                match fetch_manifest_with_retry(
299                    transport,
300                    &peers,
301                    manifest_id,
302                    &FetchPolicy::default(),
303                )
304                .await
305                {
306                    Ok(m) => {
307                        info!(
308                            manifest_id = hex::encode(manifest_id),
309                            "sync: manifest fetched from peer"
310                        );
311                        m
312                    }
313                    Err(e) => {
314                        warn!(
315                            manifest_id = hex::encode(manifest_id),
316                            err = %e,
317                            "sync: manifest fetch failed from all peers"
318                        );
319                        return Ok(None);
320                    }
321                }
322            }
323        };
324
325        manifest.verify()?;
326        if manifest.share_id != share_id {
327            anyhow::bail!("manifest share_id mismatch while syncing subscription");
328        }
329
330        Ok(Some((share_id, manifest_id, manifest, head.latest_seq)))
331    }
332
333    /// Fetch and apply blocklist rules for all enabled blocklist shares that
334    /// publish a manifest item named `"blocklist"` (§4.11).
335    ///
336    /// For each enabled share whose cached manifest contains an item with
337    /// `name == "blocklist"`, this method downloads the content bytes from
338    /// `seed_peers`, decodes them as [`BlocklistRules`], and calls
339    /// [`NodeHandle::set_blocklist_rules`] to apply the update automatically.
340    ///
341    /// Returns the number of shares whose rules were successfully refreshed.
342    pub async fn apply_blocklist_updates_from_subscriptions<T: RequestTransport + ?Sized>(
343        &self,
344        transport: &T,
345        seed_peers: &[PeerAddr],
346    ) -> anyhow::Result<usize> {
347        /// Conventional item name for blocklist-rule content within a manifest.
348        const BLOCKLIST_ITEM_NAME: &str = "blocklist";
349
350        // Phase 1 (read-lock): collect candidate items from cached manifests.
351        // Only shares that are subscribed, enabled, and have a matching item
352        // will be included.
353        type BlocklistCandidate = ([u8; 32], [u8; 32], u32, [u8; 32]);
354        let candidates: Vec<BlocklistCandidate> = {
355            let state = self.state.read().await;
356            state
357                .enabled_blocklist_shares
358                .iter()
359                .filter_map(|share_id| {
360                    let sub = state.subscriptions.get(share_id)?;
361                    let manifest_id = sub.latest_manifest_id?;
362                    let manifest = state.manifest_cache.get(&manifest_id)?;
363                    let item = manifest
364                        .items
365                        .iter()
366                        .find(|i| i.name == BLOCKLIST_ITEM_NAME)?;
367                    Some((
368                        *share_id,
369                        item.content_id,
370                        item.chunk_count,
371                        item.chunk_list_hash,
372                    ))
373                })
374                .collect()
375        };
376
377        if candidates.is_empty() {
378            return Ok(0);
379        }
380
381        // Phase 2 (network): fetch and decode each blocklist item.
382        let policy = FetchPolicy::default();
383        let mut updated = 0usize;
384
385        for (share_id, content_id, chunk_count, chunk_list_hash) in candidates {
386            if chunk_count == 0 {
387                continue;
388            }
389
390            let chunk_hashes = match fetch_chunk_hashes_with_retry(
391                transport,
392                seed_peers,
393                content_id,
394                chunk_count,
395                chunk_list_hash,
396                &policy,
397            )
398            .await
399            {
400                Ok(h) => h,
401                Err(_) => continue,
402            };
403
404            let bytes = match download_swarm_over_network(
405                transport,
406                seed_peers,
407                content_id,
408                &chunk_hashes,
409                &policy,
410                None,
411            )
412            .await
413            {
414                Ok(b) => b,
415                Err(_) => continue,
416            };
417
418            let Ok(rules) = crate::cbor::from_slice::<super::BlocklistRules>(&bytes) else {
419                continue;
420            };
421
422            if self
423                .set_blocklist_rules(ShareId(share_id), rules)
424                .await
425                .is_ok()
426            {
427                updated += 1;
428            }
429        }
430
431        Ok(updated)
432    }
433
434    /// Spawn a background task that periodically syncs subscriptions over the
435    /// DHT and then refreshes blocklist rules from any newly updated blocklist
436    /// share manifests (§4.11).
437    ///
438    /// The loop runs `sync_subscriptions_over_dht` followed by
439    /// `apply_blocklist_updates_from_subscriptions` on each tick.
440    pub fn start_blocklist_auto_sync_loop(
441        self,
442        transport: Arc<dyn RequestTransport>,
443        seed_peers: Vec<PeerAddr>,
444        interval: Duration,
445    ) -> JoinHandle<()> {
446        tokio::spawn(async move {
447            loop {
448                let _ = self
449                    .sync_subscriptions_over_dht(transport.as_ref(), &seed_peers)
450                    .await;
451                let _ = self
452                    .apply_blocklist_updates_from_subscriptions(transport.as_ref(), &seed_peers)
453                    .await;
454                tokio::time::sleep(interval).await;
455            }
456        })
457    }
458
459    pub async fn search(&self, query: SearchQuery) -> anyhow::Result<Vec<SearchResult>> {
460        self.search_with_trust_filter(query, SearchTrustFilter::default())
461            .await
462    }
463
464    pub async fn search_with_trust_filter(
465        &self,
466        query: SearchQuery,
467        trust_filter: SearchTrustFilter,
468    ) -> anyhow::Result<Vec<SearchResult>> {
469        self.search_hits(&query.text, trust_filter, false).await
470    }
471
472    pub async fn search_page(&self, query: SearchPageQuery) -> anyhow::Result<SearchPage> {
473        self.search_page_with_trust_filter(query, SearchTrustFilter::default())
474            .await
475    }
476
477    pub async fn search_page_with_trust_filter(
478        &self,
479        query: SearchPageQuery,
480        trust_filter: SearchTrustFilter,
481    ) -> anyhow::Result<SearchPage> {
482        let query = query.normalized();
483        let hits = self
484            .search_hits(&query.text, trust_filter, query.include_snippets)
485            .await?;
486        let total = hits.len();
487        if query.offset >= total {
488            return Ok(SearchPage {
489                total,
490                results: vec![],
491            });
492        }
493        let results = hits
494            .into_iter()
495            .skip(query.offset)
496            .take(query.limit)
497            .collect();
498        Ok(SearchPage { total, results })
499    }
500
501    async fn search_hits(
502        &self,
503        query_text: &str,
504        trust_filter: SearchTrustFilter,
505        include_snippets: bool,
506    ) -> anyhow::Result<Vec<SearchResult>> {
507        let state = self.state.read().await;
508        let subscribed = state
509            .subscriptions
510            .iter()
511            .filter_map(|(share_id, sub)| {
512                if trust_filter.allows(sub.trust_level) {
513                    Some(*share_id)
514                } else {
515                    None
516                }
517            })
518            .collect::<HashSet<_>>();
519        let mut blocked_shares = HashSet::<[u8; 32]>::new();
520        let mut blocked_content_ids = HashSet::<[u8; 32]>::new();
521        for share_id in &state.enabled_blocklist_shares {
522            if !state.subscriptions.contains_key(share_id) {
523                continue;
524            }
525            let Some(rules) = state.blocklist_rules_by_share.get(share_id) else {
526                continue;
527            };
528            blocked_shares.extend(rules.blocked_share_ids.iter().copied());
529            blocked_content_ids.extend(rules.blocked_content_ids.iter().copied());
530        }
531        let hits = state
532            .search_index
533            .search(query_text, &subscribed, &state.share_weights)
534            .into_iter()
535            .filter(|(item, _)| {
536                !blocked_shares.contains(&item.share_id)
537                    && !blocked_content_ids.contains(&item.content_id)
538            })
539            .map(|(item, score)| {
540                let snippet = build_search_snippet(&item, query_text, include_snippets);
541                SearchResult {
542                    share_id: ShareId(item.share_id),
543                    content_id: item.content_id,
544                    name: item.name,
545                    snippet,
546                    score,
547                }
548            })
549            .collect::<Vec<_>>();
550        Ok(hits)
551    }
552
553    pub async fn begin_partial_download(
554        &self,
555        content_id: [u8; 32],
556        target_path: String,
557        total_chunks: u32,
558    ) -> anyhow::Result<()> {
559        {
560            let mut state = self.state.write().await;
561            state.partial_downloads.insert(
562                content_id,
563                PersistedPartialDownload {
564                    content_id,
565                    target_path,
566                    total_chunks,
567                    completed_chunks: vec![],
568                },
569            );
570            state.dirty.partial_downloads = true;
571        }
572        persist_state(self).await
573    }
574
575    pub async fn mark_partial_chunk_complete(
576        &self,
577        content_id: [u8; 32],
578        chunk_index: u32,
579    ) -> anyhow::Result<()> {
580        {
581            let mut state = self.state.write().await;
582            if let Some(partial) = state.partial_downloads.get_mut(&content_id)
583                && !partial.completed_chunks.contains(&chunk_index)
584            {
585                partial.completed_chunks.push(chunk_index);
586            }
587            state.dirty.partial_downloads = true;
588        }
589        persist_state(self).await
590    }
591
592    pub async fn clear_partial_download(&self, content_id: [u8; 32]) -> anyhow::Result<()> {
593        {
594            let mut state = self.state.write().await;
595            state.partial_downloads.remove(&content_id);
596            state.dirty.partial_downloads = true;
597        }
598        persist_state(self).await
599    }
600
601    pub async fn set_encrypted_node_key(
602        &self,
603        key_material: &[u8],
604        passphrase: &str,
605    ) -> anyhow::Result<()> {
606        {
607            let mut state = self.state.write().await;
608            state.encrypted_node_key = Some(encrypt_secret(key_material, passphrase)?);
609            state.dirty.node_key = true;
610        }
611        persist_state(self).await
612    }
613
614    pub async fn decrypt_node_key(&self, passphrase: &str) -> anyhow::Result<Option<Vec<u8>>> {
615        let state = self.state.read().await;
616        let Some(encrypted) = state.encrypted_node_key.as_ref() else {
617            return Ok(None);
618        };
619        Ok(Some(decrypt_secret(encrypted, passphrase)?))
620    }
621
622    /// Encrypt all in-memory publisher identity secrets with `passphrase`.
623    ///
624    /// After this call the plaintext secrets remain in memory (for
625    /// runtime use) but are persisted in encrypted form only.
626    pub async fn encrypt_publisher_identities(&self, passphrase: &str) -> anyhow::Result<()> {
627        {
628            let mut state = self.state.write().await;
629            let mut encrypted_map = HashMap::new();
630            for (label, secret) in &state.publisher_identities {
631                encrypted_map.insert(label.clone(), encrypt_secret(secret.as_ref(), passphrase)?);
632            }
633            state.encrypted_publisher_secrets = encrypted_map;
634            state.dirty.publisher_identities = true;
635        }
636        persist_state(self).await
637    }
638
639    /// Decrypt persisted publisher identity secrets that were encrypted
640    /// with [`encrypt_publisher_identities`].  After this call the
641    /// plaintext secrets are available for use by
642    /// [`ensure_publisher_identity`] and related APIs.
643    pub async fn unlock_publisher_identities(&self, passphrase: &str) -> anyhow::Result<usize> {
644        let persisted = {
645            let state = self.state.read().await;
646            state.store.load_state().await?
647        };
648        let mut decrypted_count = 0usize;
649        let mut state = self.state.write().await;
650        for identity in &persisted.publisher_identities {
651            if state.publisher_identities.contains_key(&identity.label) {
652                continue; // already unlocked
653            }
654            if let Some(encrypted) = &identity.encrypted_share_secret {
655                let plaintext = decrypt_secret(encrypted, passphrase)?;
656                if plaintext.len() != 32 {
657                    anyhow::bail!(
658                        "decrypted publisher secret for '{}' has wrong length",
659                        identity.label
660                    );
661                }
662                let mut secret = [0u8; 32];
663                secret.copy_from_slice(&plaintext);
664                state
665                    .publisher_identities
666                    .insert(identity.label.clone(), secret);
667                decrypted_count += 1;
668            }
669        }
670        Ok(decrypted_count)
671    }
672
673    pub async fn fetch_manifest_from_peers<C: PeerConnector>(
674        &self,
675        connector: &C,
676        peers: &[PeerAddr],
677        manifest_id: [u8; 32],
678        policy: &FetchPolicy,
679    ) -> anyhow::Result<ManifestV1> {
680        let manifest = fetch_manifest_with_retry(connector, peers, manifest_id, policy).await?;
681        manifest.verify()?;
682        let store = {
683            let mut state = self.state.write().await;
684            state.manifest_cache.insert(manifest_id, manifest.clone());
685            state.search_index.index_manifest(&manifest);
686            state.dirty.manifests = true;
687            state.store.clone()
688        };
689        // FTS5 write-through outside the state lock.
690        store.index_manifest_for_search(&manifest).await?;
691        persist_state(self).await?;
692        Ok(manifest)
693    }
694
695    /// Download content from the network using all available peers.
696    ///
697    /// Before fetching, any additional seeders recorded in the DHT via
698    /// `content_provider_key` are merged into the peer list.  After a
699    /// successful download the content is stored locally and the
700    /// downloading node registers itself as a new seeder so future
701    /// peers can pull from it (forming a swarm).
702    #[allow(clippy::too_many_arguments)]
703    pub async fn download_from_peers<C: PeerConnector>(
704        &self,
705        connector: &C,
706        peers: &[PeerAddr],
707        content_id: [u8; 32],
708        target_path: &str,
709        policy: &FetchPolicy,
710        self_addr: Option<PeerAddr>,
711        on_progress: Option<&ProgressCallback>,
712    ) -> anyhow::Result<()> {
713        let content = {
714            let mut state = self.state.write().await;
715            let content = state
716                .content_catalog
717                .get(&content_id)
718                .cloned()
719                .ok_or_else(|| anyhow::anyhow!("unknown content metadata"))?;
720            state.partial_downloads.insert(
721                content_id,
722                PersistedPartialDownload {
723                    content_id,
724                    target_path: target_path.to_owned(),
725                    total_chunks: content.chunk_count,
726                    completed_chunks: vec![],
727                },
728            );
729            state.dirty.partial_downloads = true;
730            content
731        };
732        persist_state(self).await?;
733
734        // ── Gap 2: merge DHT-advertised seeders into the peer list ──
735        //
736        // First, do a *network* DHT lookup for the content provider key so
737        // that provider entries published by the remote peer (via the relay)
738        // are fetched and cached locally.  Without this, a subscriber that
739        // never received the provider entry would only have the bootstrap
740        // peers (relay), which don't hold the actual content data.
741        let transport = RelayAwareTransport::new(connector);
742        let provider_key = content_provider_key(&content_id);
743        let _ = self
744            .dht_find_value_iterative(&transport, provider_key, peers)
745            .await;
746
747        let mut all_peers = peers.to_vec();
748        {
749            let mut state = self.state.write().await;
750            let now = now_unix_secs()?;
751            if let Some(val) = state.dht.find_value(provider_key, now)
752                && let Ok(providers) = crate::cbor::from_slice::<Providers>(&val.value)
753            {
754                for p in providers.providers {
755                    if !all_peers.contains(&p) {
756                        all_peers.push(p);
757                    }
758                }
759            }
760        }
761
762        // Filter out our own address so we never download from ourselves.
763        if let Some(ref me) = self_addr {
764            all_peers.retain(|p| p != me);
765        }
766        if all_peers.is_empty() {
767            anyhow::bail!(
768                "no remote peers available for content download (only local provider found)"
769            );
770        }
771
772        // Pattern B: chunk hashes are not stored in the manifest.
773        // If content_catalog has empty chunks (subscribed content), fetch them on demand.
774        let chunk_hashes = if content.chunks.is_empty() && content.chunk_count > 0 {
775            fetch_chunk_hashes_with_retry(
776                &transport,
777                &all_peers,
778                content_id,
779                content.chunk_count,
780                content.chunk_list_hash,
781                policy,
782            )
783            .await?
784        } else {
785            content.chunks.clone()
786        };
787
788        let target_pb = PathBuf::from(target_path);
789
790        // Seed reputation scores from the local peer DB so that historically
791        // reliable peers are scheduled before less-trusted peers.
792        let seeded_policy;
793        let effective_policy: &FetchPolicy = if policy.initial_reputations.is_empty() {
794            let reps = {
795                let state = self.state.read().await;
796                state.peer_db.reputation_for_peers(&all_peers)
797            };
798            seeded_policy = FetchPolicy {
799                initial_reputations: reps,
800                ..policy.clone()
801            };
802            &seeded_policy
803        } else {
804            policy
805        };
806
807        download_swarm_to_file(
808            &transport,
809            &all_peers,
810            content_id,
811            &chunk_hashes,
812            effective_policy,
813            &target_pb,
814            on_progress,
815        )
816        .await?;
817
818        // ── Gap 1: self-seed — register file path as provider (no blob copy) ──
819        if let Some(addr) = self_addr {
820            let bytes = std::fs::read(&target_pb)?;
821            self.register_content_by_path(addr, &bytes, target_pb)
822                .await?;
823        }
824
825        {
826            let mut state = self.state.write().await;
827            state.partial_downloads.remove(&content_id);
828            state.dirty.partial_downloads = true;
829            state.dirty.content_paths = true;
830        }
831        persist_state(self).await
832    }
833
834    pub(super) async fn serve_wire_stream<S>(
835        &self,
836        mut stream: S,
837        remote_peer: Option<PeerAddr>,
838    ) -> anyhow::Result<()>
839    where
840        S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
841    {
842        loop {
843            let incoming = read_envelope(&mut stream).await?;
844
845            // Detect RelayRegister with tunnel=true: after handling and
846            // responding we transition this connection to relay-bridge mode.
847            let tunnel_request = if incoming.r#type == MsgType::RelayRegister as u16 {
848                incoming.decode_typed().ok().and_then(|wp| match wp {
849                    WirePayload::RelayRegister(ref reg) if reg.tunnel => Some(reg.clone()),
850                    _ => None,
851                })
852            } else {
853                None
854            };
855
856            let response = self
857                .handle_incoming_envelope(incoming, remote_peer.as_ref())
858                .await;
859            if let Some(envelope) = &response {
860                write_envelope(&mut stream, envelope).await?;
861            }
862
863            // If this was a tunnel RelayRegister and the response was
864            // successful (RelayRegistered), transition to bridge mode.
865            if let Some(_reg) = tunnel_request
866                && let Some(resp) = response
867                && resp.flags & FLAG_RESPONSE != 0
868                && resp.r#type == MsgType::RelayRegistered as u16
869            {
870                let registered: crate::wire::RelayRegistered =
871                    crate::cbor::from_slice(&resp.payload)?;
872                return self
873                    .run_relay_bridge(stream, registered.relay_slot_id, remote_peer)
874                    .await;
875            }
876        }
877    }
878
879    /// Run the relay bridge loop for a firewalled node.
880    ///
881    /// After a firewalled node sends `RelayRegister { tunnel: true }` and
882    /// receives `RelayRegistered`, the relay node enters this loop.
883    /// The firewalled node's TCP stream is held open and used to forward
884    /// request envelopes from other peers (downloaders) and read back
885    /// the firewalled node's responses.
886    async fn run_relay_bridge<S>(
887        &self,
888        mut stream: S,
889        slot_id: u64,
890        _remote_peer: Option<PeerAddr>,
891    ) -> anyhow::Result<()>
892    where
893        S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
894    {
895        let mut rx = self.relay_tunnels.register(slot_id, 32).await;
896
897        let result = async {
898            while let Some((request_envelope, response_tx)) = rx.recv().await {
899                write_envelope(&mut stream, &request_envelope).await?;
900                let response = read_envelope(&mut stream).await?;
901                // Best-effort: if the downloader gave up, drop silently.
902                let _ = response_tx.send(response);
903            }
904            Ok::<(), anyhow::Error>(())
905        }
906        .await;
907
908        self.relay_tunnels.remove(slot_id).await;
909        result
910    }
911
912    pub(super) async fn handle_incoming_envelope(
913        &self,
914        envelope: Envelope,
915        remote_peer: Option<&PeerAddr>,
916    ) -> Option<Envelope> {
917        let req_id = envelope.req_id;
918        let req_type = envelope.r#type;
919        let typed = match envelope.decode_typed() {
920            Ok(payload) => payload,
921            Err(err) => {
922                return Some(error_envelope(req_type, req_id, &err.to_string()));
923            }
924        };
925        if let Some(peer) = remote_peer {
926            let now = now_unix_secs().unwrap_or(0);
927            let mut state = self.state.write().await;
928            if let Err(err) = state.enforce_request_limits(peer, request_class(&typed), now) {
929                return Some(error_envelope(req_type, req_id, &err.to_string()));
930            }
931        }
932        let result = match typed {
933            WirePayload::FindNode(msg) => self
934                .dht_find_node(msg)
935                .await
936                .and_then(|peers| {
937                    crate::cbor::to_vec(&FindNodeResult { peers }).map_err(Into::into)
938                })
939                .map(|payload| Envelope {
940                    r#type: MsgType::FindNode as u16,
941                    req_id,
942                    flags: FLAG_RESPONSE,
943                    payload,
944                }),
945            WirePayload::FindValue(msg) => {
946                let target = {
947                    let mut t = [0u8; 20];
948                    t.copy_from_slice(&msg.key[..20]);
949                    t
950                };
951                let closer_peers = match self
952                    .dht_find_node(FindNode {
953                        target_node_id: target,
954                    })
955                    .await
956                {
957                    Ok(peers) => peers,
958                    Err(err) => return Some(error_envelope(req_type, req_id, &err.to_string())),
959                };
960                self.dht_find_value(msg.key)
961                    .await
962                    .map(|value| {
963                        let now = now_unix_secs().unwrap_or(0);
964                        let wire_value = value.and_then(|v| {
965                            if validate_dht_value_for_known_keyspaces(v.key, &v.value).is_err() {
966                                return None;
967                            }
968                            Some(WireStore {
969                                key: v.key,
970                                value: v.value,
971                                ttl_secs: v.expires_at_unix.saturating_sub(now).max(1),
972                            })
973                        });
974                        FindValueResult {
975                            value: wire_value,
976                            closer_peers,
977                        }
978                    })
979                    .and_then(|result| crate::cbor::to_vec(&result).map_err(Into::into))
980                    .map(|payload| Envelope {
981                        r#type: MsgType::FindValue as u16,
982                        req_id,
983                        flags: FLAG_RESPONSE,
984                        payload,
985                    })
986            }
987            WirePayload::Store(msg) => self.dht_store(msg).await.map(|_| Envelope {
988                r#type: MsgType::Store as u16,
989                req_id,
990                flags: FLAG_RESPONSE,
991                payload: vec![],
992            }),
993            WirePayload::GetManifest(msg) => self
994                .manifest_bytes(msg.manifest_id)
995                .await
996                .and_then(|maybe| {
997                    maybe
998                        .ok_or_else(|| anyhow::anyhow!("manifest not found"))
999                        .and_then(|bytes| {
1000                            crate::cbor::to_vec(&crate::wire::ManifestData {
1001                                manifest_id: msg.manifest_id,
1002                                bytes,
1003                            })
1004                            .map_err(Into::into)
1005                        })
1006                })
1007                .map(|payload| Envelope {
1008                    r#type: MsgType::ManifestData as u16,
1009                    req_id,
1010                    flags: FLAG_RESPONSE,
1011                    payload,
1012                }),
1013            WirePayload::ListPublicShares(msg) => self
1014                .list_local_public_shares(msg.max_entries as usize)
1015                .await
1016                .and_then(|shares| {
1017                    crate::cbor::to_vec(&PublicShareList { shares }).map_err(Into::into)
1018                })
1019                .map(|payload| Envelope {
1020                    r#type: MsgType::PublicShareList as u16,
1021                    req_id,
1022                    flags: FLAG_RESPONSE,
1023                    payload,
1024                }),
1025            WirePayload::GetCommunityStatus(msg) => {
1026                let (joined, proof, name) =
1027                    match VerifyingKey::from_bytes(&msg.community_share_pubkey) {
1028                        Ok(pubkey) if ShareId::from_pubkey(&pubkey).0 == msg.community_share_id => {
1029                            let state = self.state.read().await;
1030                            match state.communities.get(&msg.community_share_id) {
1031                                Some(membership) => {
1032                                    let proof = membership
1033                                        .token
1034                                        .as_ref()
1035                                        .and_then(|t| crate::cbor::to_vec(t).ok());
1036                                    (true, proof, membership.name.clone())
1037                                }
1038                                None => (false, None, None),
1039                            }
1040                        }
1041                        _ => {
1042                            return Some(error_envelope(
1043                                req_type,
1044                                req_id,
1045                                "community share_id does not match share_pubkey",
1046                            ));
1047                        }
1048                    };
1049                crate::cbor::to_vec(&CommunityStatus {
1050                    community_share_id: msg.community_share_id,
1051                    joined,
1052                    membership_proof: proof,
1053                    name,
1054                })
1055                .map_err(Into::into)
1056                .map(|payload| Envelope {
1057                    r#type: MsgType::CommunityStatus as u16,
1058                    req_id,
1059                    flags: FLAG_RESPONSE,
1060                    payload,
1061                })
1062            }
1063            WirePayload::ListCommunityPublicShares(msg) => self
1064                .list_local_community_public_shares(
1065                    ShareId(msg.community_share_id),
1066                    msg.community_share_pubkey,
1067                    msg.max_entries as usize,
1068                    msg.requester_node_pubkey,
1069                    msg.requester_membership_proof.as_deref(),
1070                )
1071                .await
1072                .and_then(|shares| {
1073                    crate::cbor::to_vec(&CommunityPublicShareList {
1074                        community_share_id: msg.community_share_id,
1075                        shares,
1076                    })
1077                    .map_err(Into::into)
1078                })
1079                .map(|payload| Envelope {
1080                    r#type: MsgType::CommunityPublicShareList as u16,
1081                    req_id,
1082                    flags: FLAG_RESPONSE,
1083                    payload,
1084                }),
1085            WirePayload::GetChunk(msg) => self
1086                .chunk_bytes(msg.content_id, msg.chunk_index)
1087                .await
1088                .and_then(|maybe| {
1089                    maybe
1090                        .ok_or_else(|| anyhow::anyhow!("chunk not found"))
1091                        .and_then(|bytes| {
1092                            crate::cbor::to_vec(&ChunkData {
1093                                content_id: msg.content_id,
1094                                chunk_index: msg.chunk_index,
1095                                bytes,
1096                            })
1097                            .map_err(Into::into)
1098                        })
1099                })
1100                .map(|payload| Envelope {
1101                    r#type: MsgType::ChunkData as u16,
1102                    req_id,
1103                    flags: FLAG_RESPONSE,
1104                    payload,
1105                }),
1106            WirePayload::GetChunkHashes(msg) => self
1107                .chunk_hash_list(msg.content_id)
1108                .await
1109                .and_then(|maybe| {
1110                    maybe
1111                        .ok_or_else(|| anyhow::anyhow!("chunk hashes not found"))
1112                        .and_then(|hashes| {
1113                            crate::cbor::to_vec(&crate::wire::ChunkHashList {
1114                                content_id: msg.content_id,
1115                                hashes,
1116                            })
1117                            .map_err(Into::into)
1118                        })
1119                })
1120                .map(|payload| Envelope {
1121                    r#type: MsgType::ChunkHashList as u16,
1122                    req_id,
1123                    flags: FLAG_RESPONSE,
1124                    payload,
1125                }),
1126            WirePayload::RelayRegister(RelayRegister {
1127                relay_slot_id,
1128                tunnel: _,
1129            }) => {
1130                let Some(peer) = remote_peer else {
1131                    return Some(error_envelope(
1132                        req_type,
1133                        req_id,
1134                        "missing remote peer identity",
1135                    ));
1136                };
1137                self.relay_register_with_slot(peer.clone(), relay_slot_id)
1138                    .await
1139                    .and_then(|registered| crate::cbor::to_vec(&registered).map_err(Into::into))
1140                    .map(|payload| Envelope {
1141                        r#type: MsgType::RelayRegistered as u16,
1142                        req_id,
1143                        flags: FLAG_RESPONSE,
1144                        payload,
1145                    })
1146            }
1147            WirePayload::RelayConnect(msg) => {
1148                let Some(peer) = remote_peer else {
1149                    return Some(error_envelope(
1150                        req_type,
1151                        req_id,
1152                        "missing remote peer identity",
1153                    ));
1154                };
1155                self.relay_connect(peer.clone(), msg)
1156                    .await
1157                    .map(|_| Envelope {
1158                        r#type: MsgType::RelayConnect as u16,
1159                        req_id,
1160                        flags: FLAG_RESPONSE,
1161                        payload: vec![],
1162                    })
1163            }
1164            WirePayload::RelayStream(msg) => {
1165                let Some(peer) = remote_peer else {
1166                    return Some(error_envelope(
1167                        req_type,
1168                        req_id,
1169                        "missing remote peer identity",
1170                    ));
1171                };
1172
1173                // If a relay tunnel exists for this slot, forward the
1174                // inner payload envelope to the firewalled node and
1175                // return its response.
1176                if self.relay_tunnels.has_tunnel(msg.relay_slot_id).await {
1177                    let inner_envelope = match Envelope::decode(&msg.payload) {
1178                        Ok(env) => env,
1179                        Err(err) => {
1180                            return Some(error_envelope(
1181                                req_type,
1182                                req_id,
1183                                &format!("relay tunnel: bad inner envelope: {err}"),
1184                            ));
1185                        }
1186                    };
1187                    let timeout = std::time::Duration::from_secs(30);
1188                    match self
1189                        .relay_tunnels
1190                        .forward(msg.relay_slot_id, inner_envelope, timeout)
1191                        .await
1192                    {
1193                        Ok(inner_response) => {
1194                            let response_payload = match inner_response.encode() {
1195                                Ok(bytes) => bytes,
1196                                Err(err) => {
1197                                    return Some(error_envelope(
1198                                        req_type,
1199                                        req_id,
1200                                        &format!("relay tunnel: encode response: {err}"),
1201                                    ));
1202                                }
1203                            };
1204                            let relay_stream_response = crate::wire::RelayStream {
1205                                relay_slot_id: msg.relay_slot_id,
1206                                stream_id: msg.stream_id,
1207                                kind: msg.kind,
1208                                payload: response_payload,
1209                            };
1210                            crate::cbor::to_vec(&relay_stream_response)
1211                                .map_err(|e| anyhow::anyhow!(e))
1212                                .map(|payload| Envelope {
1213                                    r#type: MsgType::RelayStream as u16,
1214                                    req_id,
1215                                    flags: FLAG_RESPONSE,
1216                                    payload,
1217                                })
1218                        }
1219                        Err(err) => Err(err),
1220                    }
1221                } else {
1222                    // Fallback: in-process relay logic (no tunnel active).
1223                    self.relay_stream(peer.clone(), msg)
1224                        .await
1225                        .and_then(|relayed| crate::cbor::to_vec(&relayed).map_err(Into::into))
1226                        .map(|payload| Envelope {
1227                            r#type: MsgType::RelayStream as u16,
1228                            req_id,
1229                            flags: FLAG_RESPONSE,
1230                            payload,
1231                        })
1232                }
1233            }
1234            // ── PEX: ingest offered peers ────────────────────────────
1235            WirePayload::PexOffer(msg) => {
1236                let now = now_unix_secs().unwrap_or(0);
1237                let mut state = self.state.write().await;
1238                for peer in &msg.peers {
1239                    state.peer_db.upsert_seen(peer.clone(), now);
1240                }
1241                state.dirty.peers = true;
1242                // PexOffer is fire-and-forget; return empty ack.
1243                Ok(Envelope {
1244                    r#type: MsgType::PexOffer as u16,
1245                    req_id,
1246                    flags: FLAG_RESPONSE,
1247                    payload: vec![],
1248                })
1249            }
1250            // ── PEX: respond with sampled fresh peers ────────────────
1251            WirePayload::PexRequest(msg) => {
1252                let now = now_unix_secs().unwrap_or(0);
1253                let state = self.state.read().await;
1254                let peers = state.peer_db.sample_fresh(now, msg.max_peers as usize);
1255                crate::cbor::to_vec(&PexOffer { peers })
1256                    .map_err(Into::into)
1257                    .map(|payload| Envelope {
1258                        r#type: MsgType::PexOffer as u16,
1259                        req_id,
1260                        flags: FLAG_RESPONSE,
1261                        payload,
1262                    })
1263            }
1264            // ── Provider hint: peer advertises it has content ────────
1265            WirePayload::HaveContent(_msg) => {
1266                // Acknowledged but not stored — provider hints are
1267                // managed via DHT STORE in the current architecture.
1268                Ok(Envelope {
1269                    r#type: MsgType::HaveContent as u16,
1270                    req_id,
1271                    flags: FLAG_RESPONSE,
1272                    payload: vec![],
1273                })
1274            }
1275            // ── Relay-PEX: respond with known relay announcements ────
1276            WirePayload::RelayListRequest(msg) => {
1277                let state = self.state.read().await;
1278                let announcements = state
1279                    .relay
1280                    .known_announcements()
1281                    .into_iter()
1282                    .take(msg.max_count as usize)
1283                    .collect();
1284                crate::cbor::to_vec(&RelayListResponse { announcements })
1285                    .map_err(Into::into)
1286                    .map(|payload| Envelope {
1287                        r#type: MsgType::RelayListResponse as u16,
1288                        req_id,
1289                        flags: FLAG_RESPONSE,
1290                        payload,
1291                    })
1292            }
1293            _ => Err(anyhow::anyhow!("unsupported message type")),
1294        };
1295        Some(match result {
1296            Ok(ok) => ok,
1297            Err(err) => error_envelope(req_type, req_id, &err.to_string()),
1298        })
1299    }
1300
1301    async fn manifest_bytes(&self, manifest_id: [u8; 32]) -> anyhow::Result<Option<Vec<u8>>> {
1302        let state = self.state.read().await;
1303        Ok(state
1304            .manifest_cache
1305            .get(&manifest_id)
1306            .map(crate::cbor::to_vec)
1307            .transpose()?)
1308    }
1309
1310    async fn chunk_bytes(
1311        &self,
1312        content_id: [u8; 32],
1313        chunk_index: u32,
1314    ) -> anyhow::Result<Option<Vec<u8>>> {
1315        let state = self.state.read().await;
1316        if let Some(path) = state.content_paths.get(&content_id) {
1317            return crate::blob_store::read_chunk_from_path(path, chunk_index);
1318        }
1319        Ok(None)
1320    }
1321
1322    /// Return the chunk hash list for a locally-stored content object.
1323    ///
1324    /// If chunk hashes are not yet computed (e.g. after a cold restart),
1325    /// they are derived lazily from the file on disk and cached in the
1326    /// content catalog for subsequent requests.
1327    async fn chunk_hash_list(&self, content_id: [u8; 32]) -> anyhow::Result<Option<Vec<[u8; 32]>>> {
1328        // Fast path: already cached.
1329        {
1330            let state = self.state.read().await;
1331            if let Some(c) = state.content_catalog.get(&content_id)
1332                && !c.chunks.is_empty()
1333            {
1334                return Ok(Some(c.chunks.clone()));
1335            }
1336        }
1337
1338        // Slow path: compute from file on disk, then cache.
1339        let mut state = self.state.write().await;
1340        // Re-check under write lock (another task may have raced).
1341        if let Some(c) = state.content_catalog.get(&content_id)
1342            && !c.chunks.is_empty()
1343        {
1344            return Ok(Some(c.chunks.clone()));
1345        }
1346        let path = match state.content_paths.get(&content_id) {
1347            Some(p) => p.clone(),
1348            None => return Ok(None),
1349        };
1350        let bytes = match std::fs::read(&path) {
1351            Ok(b) => b,
1352            Err(_) => return Ok(None),
1353        };
1354        let chunks = crate::content::chunk_hashes(&bytes);
1355        if let Some(entry) = state.content_catalog.get_mut(&content_id) {
1356            entry.chunks = chunks.clone();
1357        }
1358        Ok(Some(chunks))
1359    }
1360
1361    /// Re-announce all locally seeded content in the DHT.
1362    ///
1363    /// Call this periodically (e.g. every 10–15 minutes) to keep the
1364    /// provider records alive past their TTL.  For each content ID in
1365    /// the local blob store the node ensures its own `PeerAddr` appears
1366    /// in the `Providers` list under `content_provider_key`.
1367    pub async fn reannounce_seeded_content(&self, self_addr: PeerAddr) -> anyhow::Result<usize> {
1368        let content_ids: Vec<[u8; 32]> = {
1369            let state = self.state.read().await;
1370            state
1371                .content_catalog
1372                .keys()
1373                .filter(|id| {
1374                    // Content is seedable if we have a file path on disk.
1375                    state.content_paths.get(*id).is_some_and(|p| p.exists())
1376                })
1377                .copied()
1378                .collect()
1379        };
1380
1381        let mut announced = 0usize;
1382        for content_id in &content_ids {
1383            let now = now_unix_secs()?;
1384            let mut state = self.state.write().await;
1385
1386            let mut providers: Providers = state
1387                .dht
1388                .find_value(content_provider_key(content_id), now)
1389                .and_then(|v| crate::cbor::from_slice(&v.value).ok())
1390                .unwrap_or(Providers {
1391                    content_id: *content_id,
1392                    providers: vec![],
1393                    updated_at: now,
1394                });
1395
1396            if !providers.providers.contains(&self_addr) {
1397                providers.providers.push(self_addr.clone());
1398            }
1399            providers.updated_at = now;
1400
1401            state.dht.store(
1402                content_provider_key(content_id),
1403                crate::cbor::to_vec(&providers)?,
1404                crate::dht::DEFAULT_TTL_SECS,
1405                now,
1406            )?;
1407            announced += 1;
1408        }
1409
1410        if announced > 0 {
1411            persist_state(self).await?;
1412        }
1413        Ok(announced)
1414    }
1415
1416    /// Re-announce share heads for **public** subscribed shares in the local DHT.
1417    ///
1418    /// Subscribers cache signed `ShareHead` records received during sync.
1419    /// For public shares we re-store them so that `dht_republish_once`
1420    /// propagates them to the network — keeping the share discoverable
1421    /// even after the original publisher goes offline.
1422    ///
1423    /// Private shares are **never** re-announced: they die when the
1424    /// publisher stops refreshing.
1425    pub async fn reannounce_subscribed_share_heads(&self) -> anyhow::Result<usize> {
1426        let now = now_unix_secs()?;
1427        let mut state = self.state.write().await;
1428        let mut refreshed = 0usize;
1429
1430        let share_ids: Vec<[u8; 32]> = state.subscriptions.keys().copied().collect();
1431        for share_id in share_ids {
1432            let sub = match state.subscriptions.get(&share_id) {
1433                Some(s) => s,
1434                None => continue,
1435            };
1436            // Need a cached manifest to check visibility.
1437            let manifest_id = match sub.latest_manifest_id {
1438                Some(id) => id,
1439                None => continue,
1440            };
1441            let manifest = match state.manifest_cache.get(&manifest_id) {
1442                Some(m) => m,
1443                None => continue,
1444            };
1445            if manifest.visibility != ShareVisibility::Public {
1446                continue;
1447            }
1448
1449            // Try to find the signed share-head bytes already in the DHT.
1450            let key = share_head_key(&ShareId(share_id));
1451            let encoded = match state.dht.find_value(key, now) {
1452                Some(val) => val.value,
1453                None => {
1454                    // Fallback: if the publisher also stores it in
1455                    // `published_share_heads`, encode from there.
1456                    match state.published_share_heads.get(&share_id) {
1457                        Some(head) => match crate::cbor::to_vec(head) {
1458                            Ok(enc) => enc,
1459                            Err(_) => continue,
1460                        },
1461                        None => continue,
1462                    }
1463                }
1464            };
1465
1466            state
1467                .dht
1468                .store(key, encoded, crate::dht::DEFAULT_TTL_SECS, now)?;
1469            refreshed += 1;
1470        }
1471
1472        if refreshed > 0 {
1473            drop(state);
1474            persist_state(self).await?;
1475        }
1476        Ok(refreshed)
1477    }
1478
1479    /// Re-populate the in-memory DHT with share heads **and** manifests for
1480    /// shares we have *published*.  `reannounce_subscribed_share_heads` only
1481    /// covers the subscriber side (iterates `subscriptions`); this covers the
1482    /// publisher side so that data survives an app restart where the ephemeral
1483    /// DHT is empty.
1484    pub async fn reannounce_published_share_data(&self) -> anyhow::Result<usize> {
1485        let now = super::helpers::now_unix_secs()?;
1486        let mut state = self.state.write().await;
1487        let mut refreshed = 0usize;
1488
1489        // Collect keys first to avoid borrow issues.
1490        let heads: Vec<([u8; 32], crate::manifest::ShareHead)> = state
1491            .published_share_heads
1492            .iter()
1493            .map(|(k, v)| (*k, v.clone()))
1494            .collect();
1495
1496        for (share_id, head) in heads {
1497            // 1. Re-announce the share head itself.
1498            let head_key = share_head_key(&ShareId(share_id));
1499            let head_bytes = match crate::cbor::to_vec(&head) {
1500                Ok(b) => b,
1501                Err(_) => continue,
1502            };
1503            state
1504                .dht
1505                .store(head_key, head_bytes, crate::dht::DEFAULT_TTL_SECS, now)?;
1506            refreshed += 1;
1507
1508            // 2. Re-announce the manifest so subscribers behind NAT can
1509            //    fetch it from the relay / DHT.
1510            let manifest_id = head.latest_manifest_id;
1511            if let Some(manifest) = state.manifest_cache.get(&manifest_id) {
1512                let manifest_bytes = match crate::cbor::to_vec(manifest) {
1513                    Ok(b) => b,
1514                    Err(_) => continue,
1515                };
1516                state.dht.store(
1517                    manifest_loc_key(&ManifestId(manifest_id)),
1518                    manifest_bytes,
1519                    crate::dht::DEFAULT_TTL_SECS,
1520                    now,
1521                )?;
1522                refreshed += 1;
1523            }
1524        }
1525
1526        if refreshed > 0 {
1527            debug!(
1528                refreshed,
1529                "reannounce_published_share_data: DHT entries restored"
1530            );
1531        }
1532
1533        Ok(refreshed)
1534    }
1535}