Skip to main content

scp2p_core/api/
node_publish.rs

1// Copyright (c) 2024-2026 Vanyo Vanev / Tech Art Ltd
2// SPDX-License-Identifier: MPL-2.0
3//
4// This Source Code Form is subject to the terms of the Mozilla Public
5// License, v. 2.0. If a copy of the MPL was not distributed with this
6// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//! Publishing operations on `NodeHandle`: publish shares, files, and folders; register seedable content; list share items; re-announce content providers.
8
9use std::path::{Path, PathBuf};
10
11use anyhow::Context as _;
12use tracing::{debug, info};
13
14use crate::{
15    content::{describe_content, describe_content_file},
16    dht::DEFAULT_TTL_SECS,
17    dht_keys::{content_provider_key, manifest_loc_key, share_head_key},
18    ids::{ManifestId, ShareId},
19    manifest::{ItemV1, ManifestV1, ShareHead, ShareKeypair, ShareVisibility},
20    peer::PeerAddr,
21    wire::Providers,
22};
23
24use super::{
25    NodeHandle, NodeState, ShareItemInfo,
26    helpers::{
27        collect_files_recursive, mime_from_extension, normalize_item_path, now_unix_secs,
28        persist_state,
29    },
30};
31
32impl NodeHandle {
33    pub async fn publish_share(
34        &self,
35        mut manifest: ManifestV1,
36        publisher: &ShareKeypair,
37    ) -> anyhow::Result<[u8; 32]> {
38        manifest.check_limits()?;
39        manifest.sign(publisher)?;
40        manifest.verify()?;
41        let manifest_id = manifest.manifest_id()?.0;
42        let share_id = ShareId(manifest.share_id);
43        info!(
44            share_id = hex::encode(share_id.0),
45            manifest_id = hex::encode(manifest_id),
46            seq = manifest.seq,
47            items = manifest.items.len(),
48            title = manifest.title.as_deref().unwrap_or(""),
49            "publishing share"
50        );
51
52        let head = ShareHead::new_signed(
53            share_id.0,
54            manifest.seq,
55            manifest_id,
56            now_unix_secs()?,
57            publisher,
58        )?;
59
60        let manifest_id = {
61            let mut state = self.state.write().await;
62            let now = now_unix_secs()?;
63            // Store share head in DHT so peers can discover latest seq.
64            state.dht.store(
65                share_head_key(&share_id),
66                crate::cbor::to_vec(&head)?,
67                DEFAULT_TTL_SECS,
68                now,
69            )?;
70            // Store serialised manifest in DHT so peers behind NAT can
71            // fetch it without a direct connection to the publisher.
72            let manifest_bytes = crate::cbor::to_vec(&manifest)?;
73            state.dht.store(
74                manifest_loc_key(&ManifestId(manifest_id)),
75                manifest_bytes,
76                DEFAULT_TTL_SECS,
77                now,
78            )?;
79            state.manifest_cache.insert(manifest_id, manifest);
80            state.published_share_heads.insert(share_id.0, head.clone());
81            state.dirty.manifests = true;
82            state.dirty.share_heads = true;
83            manifest_id
84        };
85        persist_state(self).await?;
86
87        info!(
88            share_id = hex::encode(share_id.0),
89            manifest_id = hex::encode(manifest_id),
90            "share published and persisted"
91        );
92
93        Ok(manifest_id)
94    }
95
96    /// Register a file at `path` as a locally-seedable content item.
97    ///
98    /// Chunks are served directly from `path` via seek-based reads, so no
99    /// separate blob copy is made.
100    pub async fn register_content_by_path(
101        &self,
102        peer: PeerAddr,
103        content_bytes: &[u8],
104        path: PathBuf,
105    ) -> anyhow::Result<[u8; 32]> {
106        crate::blob_store::validate_no_traversal(&path)?;
107        let desc = describe_content(content_bytes);
108        let content_id = desc.content_id.0;
109        debug!(
110            content_id = hex::encode(content_id),
111            size = content_bytes.len(),
112            path = %path.display(),
113            "registering content by path"
114        );
115        let now = now_unix_secs()?;
116        let mut state = self.state.write().await;
117
118        state.content_catalog.insert(content_id, desc);
119        state.content_paths.insert(content_id, path);
120        state.dirty.content_paths = true;
121
122        upsert_provider(&mut state, content_id, peer, now)?;
123
124        Ok(content_id)
125    }
126
127    /// Register a file as seedable content when the [`ChunkedContent`]
128    /// descriptor has already been computed (e.g. via streaming hashing).
129    ///
130    /// This avoids re-reading and re-hashing the file.
131    pub async fn register_content_precomputed(
132        &self,
133        peer: PeerAddr,
134        desc: crate::content::ChunkedContent,
135        path: PathBuf,
136    ) -> anyhow::Result<[u8; 32]> {
137        crate::blob_store::validate_no_traversal(&path)?;
138        let content_id = desc.content_id.0;
139        debug!(
140            content_id = hex::encode(content_id),
141            path = %path.display(),
142            "registering content (precomputed)"
143        );
144        let now = now_unix_secs()?;
145        let mut state = self.state.write().await;
146
147        state.content_catalog.insert(content_id, desc);
148        state.content_paths.insert(content_id, path);
149        state.dirty.content_paths = true;
150
151        upsert_provider(&mut state, content_id, peer, now)?;
152
153        Ok(content_id)
154    }
155
156    /// Register in-memory bytes as seedable content.
157    ///
158    /// Writes `content_bytes` to `{data_dir}/{hex_content_id}.dat` then
159    /// delegates to [`Self::register_content_by_path`].  Use this for small
160    /// payloads (e.g. text publishing) that are not already on disk.
161    pub async fn register_content_from_bytes(
162        &self,
163        peer: PeerAddr,
164        content_bytes: &[u8],
165        data_dir: &Path,
166    ) -> anyhow::Result<[u8; 32]> {
167        let desc = describe_content(content_bytes);
168        let content_id = desc.content_id.0;
169        std::fs::create_dir_all(data_dir)?;
170        let file_path = data_dir.join(format!("{}.dat", hex::encode(content_id)));
171        std::fs::write(&file_path, content_bytes)?;
172        self.register_content_by_path(peer, content_bytes, file_path)
173            .await
174    }
175
176    /// Publish one or more files from disk as a single share.
177    ///
178    /// Each file becomes an `ItemV1` in the manifest.  If `base_dir` is
179    /// `Some`, then `ItemV1.path` is set to the path of the file relative to
180    /// `base_dir`; otherwise `path` is `None` and `name` is the plain
181    /// filename.
182    #[allow(clippy::too_many_arguments)]
183    pub async fn publish_files(
184        &self,
185        files: &[PathBuf],
186        base_dir: Option<&Path>,
187        title: &str,
188        description: Option<&str>,
189        visibility: ShareVisibility,
190        communities: &[[u8; 32]],
191        provider: PeerAddr,
192        publisher: &ShareKeypair,
193    ) -> anyhow::Result<[u8; 32]> {
194        let now = now_unix_secs()?;
195        let next_seq = self
196            .published_share_head(publisher.share_id())
197            .await
198            .map(|h| h.latest_seq.saturating_add(1))
199            .unwrap_or(1);
200
201        let total = files.len();
202        let mut items = Vec::with_capacity(total);
203        for (idx, file_path) in files.iter().enumerate() {
204            // Stream the file in 256 KiB chunks — never holds the full file
205            // in memory.  Returns the precomputed ChunkedContent descriptor.
206            let (desc, file_size) = describe_content_file(file_path)
207                .await
208                .with_context(|| format!("hash {}", file_path.display()))?;
209
210            let file_name = file_path
211                .file_name()
212                .unwrap_or_default()
213                .to_string_lossy()
214                .to_string();
215            let rel_path = match base_dir {
216                Some(base) => {
217                    let raw = file_path
218                        .strip_prefix(base)
219                        .with_context(|| {
220                            format!(
221                                "file {} is not under base {}",
222                                file_path.display(),
223                                base.display()
224                            )
225                        })?
226                        .to_string_lossy();
227                    Some(normalize_item_path(&raw)?)
228                }
229                None => None,
230            };
231            let mime = mime_from_extension(&file_name);
232            items.push(ItemV1 {
233                content_id: desc.content_id.0,
234                size: file_size,
235                name: file_name,
236                path: rel_path,
237                mime,
238                tags: vec![],
239                chunk_count: desc.chunk_count,
240                chunk_list_hash: desc.chunk_list_hash,
241            });
242            // Register using the already-computed descriptor — no re-read.
243            self.register_content_precomputed(provider.clone(), desc, file_path.to_path_buf())
244                .await?;
245
246            if total > 50 && (idx + 1) % 500 == 0 {
247                info!(
248                    progress = idx + 1,
249                    total,
250                    "publishing: hashed {}/{} files",
251                    idx + 1,
252                    total,
253                );
254            }
255        }
256
257        let manifest = ManifestV1 {
258            version: 1,
259            share_pubkey: publisher.verifying_key().to_bytes(),
260            share_id: publisher.share_id().0,
261            seq: next_seq,
262            created_at: now,
263            expires_at: None,
264            title: Some(title.to_owned()),
265            description: description.map(|d| d.to_owned()),
266            visibility,
267            communities: communities.to_vec(),
268            items,
269            recommended_shares: vec![],
270            signature: None,
271        };
272        self.publish_share(manifest, publisher).await
273    }
274
275    /// Publish an entire folder tree as a new share revision.
276    ///
277    /// Every file under `folder` is recursively collected, hashed, and
278    /// registered as a local provider.  Each item carries a `path`
279    /// relative to `folder`.
280    #[allow(clippy::too_many_arguments)]
281    pub async fn publish_folder(
282        &self,
283        folder: &Path,
284        title: &str,
285        description: Option<&str>,
286        visibility: ShareVisibility,
287        communities: &[[u8; 32]],
288        provider: PeerAddr,
289        publisher: &ShareKeypair,
290    ) -> anyhow::Result<[u8; 32]> {
291        let files = collect_files_recursive(folder).await?;
292        if files.is_empty() {
293            anyhow::bail!("no files found under {}", folder.display());
294        }
295        self.publish_files(
296            &files,
297            Some(folder),
298            title,
299            description,
300            visibility,
301            communities,
302            provider,
303            publisher,
304        )
305        .await
306    }
307
308    /// List all items in a share manifest.
309    pub async fn list_share_items(&self, share_id: [u8; 32]) -> anyhow::Result<Vec<ShareItemInfo>> {
310        let state = self.state.read().await;
311        let head = state.published_share_heads.get(&share_id);
312
313        // Try to find the manifest from published heads first, then from
314        // the manifest cache keyed by any known manifest_id for this share.
315        let manifest = if let Some(head) = head {
316            state.manifest_cache.get(&head.latest_manifest_id)
317        } else {
318            // Walk subscriptions to find the manifest.
319            let sub = state.subscriptions.get(&share_id);
320            sub.and_then(|s| s.latest_manifest_id)
321                .and_then(|mid| state.manifest_cache.get(&mid))
322        };
323
324        let manifest = manifest.ok_or_else(|| {
325            anyhow::anyhow!(
326                "no manifest found for share {} — try syncing first",
327                hex::encode(share_id)
328            )
329        })?;
330
331        Ok(manifest
332            .items
333            .iter()
334            .map(|item| ShareItemInfo {
335                content_id: item.content_id,
336                size: item.size,
337                name: item.name.clone(),
338                path: item.path.clone(),
339                mime: item.mime.clone(),
340            })
341            .collect())
342    }
343
344    /// Re-announce DHT provider entries for all content in `content_paths`.
345    ///
346    /// Called after relay tunnel registration so that provider entries
347    /// contain the relayed address, enabling NAT-traversed downloads.
348    pub async fn reannounce_content_providers(&self, self_addr: PeerAddr) -> anyhow::Result<usize> {
349        let now = now_unix_secs()?;
350        let mut state = self.state.write().await;
351
352        // Collect content IDs from persisted content_paths.
353        let content_ids: Vec<[u8; 32]> = state.content_paths.keys().copied().collect();
354        let mut count = 0usize;
355
356        for content_id in content_ids {
357            if let Err(e) = upsert_provider(&mut state, content_id, self_addr.clone(), now) {
358                debug!(
359                    content_id = %hex::encode(content_id),
360                    error = %e,
361                    "reannounce_content_providers: failed to update provider"
362                );
363            } else {
364                count += 1;
365            }
366        }
367
368        if count > 0 {
369            debug!(count, "reannounce_content_providers: updated");
370        }
371
372        Ok(count)
373    }
374}
375
376/// Insert or update a DHT provider entry for `content_id`.
377fn upsert_provider(
378    state: &mut NodeState,
379    content_id: [u8; 32],
380    peer: PeerAddr,
381    now: u64,
382) -> anyhow::Result<()> {
383    let mut providers: Providers = state
384        .dht
385        .find_value(content_provider_key(&content_id), now)
386        .and_then(|v| crate::cbor::from_slice(&v.value).ok())
387        .unwrap_or(Providers {
388            content_id,
389            providers: vec![],
390            updated_at: now,
391        });
392
393    if !providers.providers.contains(&peer) {
394        providers.providers.push(peer);
395    }
396    providers.updated_at = now;
397
398    state.dht.store(
399        content_provider_key(&content_id),
400        crate::cbor::to_vec(&providers)?,
401        DEFAULT_TTL_SECS,
402        now,
403    )?;
404
405    Ok(())
406}