1use std::path::{Path, PathBuf};
10
11use anyhow::Context as _;
12use tracing::{debug, info};
13
14use crate::{
15 content::{describe_content, describe_content_file},
16 dht::DEFAULT_TTL_SECS,
17 dht_keys::{content_provider_key, manifest_loc_key, share_head_key},
18 ids::{ManifestId, ShareId},
19 manifest::{ItemV1, ManifestV1, ShareHead, ShareKeypair, ShareVisibility},
20 peer::PeerAddr,
21 wire::Providers,
22};
23
24use super::{
25 NodeHandle, NodeState, ShareItemInfo,
26 helpers::{
27 collect_files_recursive, mime_from_extension, normalize_item_path, now_unix_secs,
28 persist_state,
29 },
30};
31
32impl NodeHandle {
33 pub async fn publish_share(
34 &self,
35 mut manifest: ManifestV1,
36 publisher: &ShareKeypair,
37 ) -> anyhow::Result<[u8; 32]> {
38 manifest.check_limits()?;
39 manifest.sign(publisher)?;
40 manifest.verify()?;
41 let manifest_id = manifest.manifest_id()?.0;
42 let share_id = ShareId(manifest.share_id);
43 info!(
44 share_id = hex::encode(share_id.0),
45 manifest_id = hex::encode(manifest_id),
46 seq = manifest.seq,
47 items = manifest.items.len(),
48 title = manifest.title.as_deref().unwrap_or(""),
49 "publishing share"
50 );
51
52 let head = ShareHead::new_signed(
53 share_id.0,
54 manifest.seq,
55 manifest_id,
56 now_unix_secs()?,
57 publisher,
58 )?;
59
60 let manifest_id = {
61 let mut state = self.state.write().await;
62 let now = now_unix_secs()?;
63 state.dht.store(
65 share_head_key(&share_id),
66 crate::cbor::to_vec(&head)?,
67 DEFAULT_TTL_SECS,
68 now,
69 )?;
70 let manifest_bytes = crate::cbor::to_vec(&manifest)?;
73 state.dht.store(
74 manifest_loc_key(&ManifestId(manifest_id)),
75 manifest_bytes,
76 DEFAULT_TTL_SECS,
77 now,
78 )?;
79 state.manifest_cache.insert(manifest_id, manifest);
80 state.published_share_heads.insert(share_id.0, head.clone());
81 state.dirty.manifests = true;
82 state.dirty.share_heads = true;
83 manifest_id
84 };
85 persist_state(self).await?;
86
87 info!(
88 share_id = hex::encode(share_id.0),
89 manifest_id = hex::encode(manifest_id),
90 "share published and persisted"
91 );
92
93 Ok(manifest_id)
94 }
95
96 pub async fn register_content_by_path(
101 &self,
102 peer: PeerAddr,
103 content_bytes: &[u8],
104 path: PathBuf,
105 ) -> anyhow::Result<[u8; 32]> {
106 crate::blob_store::validate_no_traversal(&path)?;
107 let desc = describe_content(content_bytes);
108 let content_id = desc.content_id.0;
109 debug!(
110 content_id = hex::encode(content_id),
111 size = content_bytes.len(),
112 path = %path.display(),
113 "registering content by path"
114 );
115 let now = now_unix_secs()?;
116 let mut state = self.state.write().await;
117
118 state.content_catalog.insert(content_id, desc);
119 state.content_paths.insert(content_id, path);
120 state.dirty.content_paths = true;
121
122 upsert_provider(&mut state, content_id, peer, now)?;
123
124 Ok(content_id)
125 }
126
127 pub async fn register_content_precomputed(
132 &self,
133 peer: PeerAddr,
134 desc: crate::content::ChunkedContent,
135 path: PathBuf,
136 ) -> anyhow::Result<[u8; 32]> {
137 crate::blob_store::validate_no_traversal(&path)?;
138 let content_id = desc.content_id.0;
139 debug!(
140 content_id = hex::encode(content_id),
141 path = %path.display(),
142 "registering content (precomputed)"
143 );
144 let now = now_unix_secs()?;
145 let mut state = self.state.write().await;
146
147 state.content_catalog.insert(content_id, desc);
148 state.content_paths.insert(content_id, path);
149 state.dirty.content_paths = true;
150
151 upsert_provider(&mut state, content_id, peer, now)?;
152
153 Ok(content_id)
154 }
155
156 pub async fn register_content_from_bytes(
162 &self,
163 peer: PeerAddr,
164 content_bytes: &[u8],
165 data_dir: &Path,
166 ) -> anyhow::Result<[u8; 32]> {
167 let desc = describe_content(content_bytes);
168 let content_id = desc.content_id.0;
169 std::fs::create_dir_all(data_dir)?;
170 let file_path = data_dir.join(format!("{}.dat", hex::encode(content_id)));
171 std::fs::write(&file_path, content_bytes)?;
172 self.register_content_by_path(peer, content_bytes, file_path)
173 .await
174 }
175
176 #[allow(clippy::too_many_arguments)]
183 pub async fn publish_files(
184 &self,
185 files: &[PathBuf],
186 base_dir: Option<&Path>,
187 title: &str,
188 description: Option<&str>,
189 visibility: ShareVisibility,
190 communities: &[[u8; 32]],
191 provider: PeerAddr,
192 publisher: &ShareKeypair,
193 ) -> anyhow::Result<[u8; 32]> {
194 let now = now_unix_secs()?;
195 let next_seq = self
196 .published_share_head(publisher.share_id())
197 .await
198 .map(|h| h.latest_seq.saturating_add(1))
199 .unwrap_or(1);
200
201 let total = files.len();
202 let mut items = Vec::with_capacity(total);
203 for (idx, file_path) in files.iter().enumerate() {
204 let (desc, file_size) = describe_content_file(file_path)
207 .await
208 .with_context(|| format!("hash {}", file_path.display()))?;
209
210 let file_name = file_path
211 .file_name()
212 .unwrap_or_default()
213 .to_string_lossy()
214 .to_string();
215 let rel_path = match base_dir {
216 Some(base) => {
217 let raw = file_path
218 .strip_prefix(base)
219 .with_context(|| {
220 format!(
221 "file {} is not under base {}",
222 file_path.display(),
223 base.display()
224 )
225 })?
226 .to_string_lossy();
227 Some(normalize_item_path(&raw)?)
228 }
229 None => None,
230 };
231 let mime = mime_from_extension(&file_name);
232 items.push(ItemV1 {
233 content_id: desc.content_id.0,
234 size: file_size,
235 name: file_name,
236 path: rel_path,
237 mime,
238 tags: vec![],
239 chunk_count: desc.chunk_count,
240 chunk_list_hash: desc.chunk_list_hash,
241 });
242 self.register_content_precomputed(provider.clone(), desc, file_path.to_path_buf())
244 .await?;
245
246 if total > 50 && (idx + 1) % 500 == 0 {
247 info!(
248 progress = idx + 1,
249 total,
250 "publishing: hashed {}/{} files",
251 idx + 1,
252 total,
253 );
254 }
255 }
256
257 let manifest = ManifestV1 {
258 version: 1,
259 share_pubkey: publisher.verifying_key().to_bytes(),
260 share_id: publisher.share_id().0,
261 seq: next_seq,
262 created_at: now,
263 expires_at: None,
264 title: Some(title.to_owned()),
265 description: description.map(|d| d.to_owned()),
266 visibility,
267 communities: communities.to_vec(),
268 items,
269 recommended_shares: vec![],
270 signature: None,
271 };
272 self.publish_share(manifest, publisher).await
273 }
274
275 #[allow(clippy::too_many_arguments)]
281 pub async fn publish_folder(
282 &self,
283 folder: &Path,
284 title: &str,
285 description: Option<&str>,
286 visibility: ShareVisibility,
287 communities: &[[u8; 32]],
288 provider: PeerAddr,
289 publisher: &ShareKeypair,
290 ) -> anyhow::Result<[u8; 32]> {
291 let files = collect_files_recursive(folder).await?;
292 if files.is_empty() {
293 anyhow::bail!("no files found under {}", folder.display());
294 }
295 self.publish_files(
296 &files,
297 Some(folder),
298 title,
299 description,
300 visibility,
301 communities,
302 provider,
303 publisher,
304 )
305 .await
306 }
307
308 pub async fn list_share_items(&self, share_id: [u8; 32]) -> anyhow::Result<Vec<ShareItemInfo>> {
310 let state = self.state.read().await;
311 let head = state.published_share_heads.get(&share_id);
312
313 let manifest = if let Some(head) = head {
316 state.manifest_cache.get(&head.latest_manifest_id)
317 } else {
318 let sub = state.subscriptions.get(&share_id);
320 sub.and_then(|s| s.latest_manifest_id)
321 .and_then(|mid| state.manifest_cache.get(&mid))
322 };
323
324 let manifest = manifest.ok_or_else(|| {
325 anyhow::anyhow!(
326 "no manifest found for share {} — try syncing first",
327 hex::encode(share_id)
328 )
329 })?;
330
331 Ok(manifest
332 .items
333 .iter()
334 .map(|item| ShareItemInfo {
335 content_id: item.content_id,
336 size: item.size,
337 name: item.name.clone(),
338 path: item.path.clone(),
339 mime: item.mime.clone(),
340 })
341 .collect())
342 }
343
344 pub async fn reannounce_content_providers(&self, self_addr: PeerAddr) -> anyhow::Result<usize> {
349 let now = now_unix_secs()?;
350 let mut state = self.state.write().await;
351
352 let content_ids: Vec<[u8; 32]> = state.content_paths.keys().copied().collect();
354 let mut count = 0usize;
355
356 for content_id in content_ids {
357 if let Err(e) = upsert_provider(&mut state, content_id, self_addr.clone(), now) {
358 debug!(
359 content_id = %hex::encode(content_id),
360 error = %e,
361 "reannounce_content_providers: failed to update provider"
362 );
363 } else {
364 count += 1;
365 }
366 }
367
368 if count > 0 {
369 debug!(count, "reannounce_content_providers: updated");
370 }
371
372 Ok(count)
373 }
374}
375
376fn upsert_provider(
378 state: &mut NodeState,
379 content_id: [u8; 32],
380 peer: PeerAddr,
381 now: u64,
382) -> anyhow::Result<()> {
383 let mut providers: Providers = state
384 .dht
385 .find_value(content_provider_key(&content_id), now)
386 .and_then(|v| crate::cbor::from_slice(&v.value).ok())
387 .unwrap_or(Providers {
388 content_id,
389 providers: vec![],
390 updated_at: now,
391 });
392
393 if !providers.providers.contains(&peer) {
394 providers.providers.push(peer);
395 }
396 providers.updated_at = now;
397
398 state.dht.store(
399 content_provider_key(&content_id),
400 crate::cbor::to_vec(&providers)?,
401 DEFAULT_TTL_SECS,
402 now,
403 )?;
404
405 Ok(())
406}