1use std::{
8 collections::{HashMap, HashSet},
9 path::{Path, PathBuf},
10 sync::Arc,
11};
12
13use async_trait::async_trait;
14use chacha20poly1305::{
15 Key, XChaCha20Poly1305, XNonce,
16 aead::{Aead, KeyInit},
17};
18use pbkdf2::pbkdf2_hmac;
19use rand::RngCore;
20use rusqlite::{Connection, OptionalExtension, params};
21use serde::{Deserialize, Serialize};
22use tokio::sync::RwLock;
23
24use crate::{
25 api::{BlocklistRules, SubscriptionTrustLevel},
26 manifest::{ManifestV1, ShareHead},
27 peer::PeerAddr,
28 peer_db::PeerRecord,
29 search::SearchIndexSnapshot,
30};
31
32const KEY_KDF_ITERATIONS: u32 = 600_000;
33
34const CURRENT_SCHEMA_VERSION: u32 = 2;
36
37#[derive(Debug, Clone, Default, Serialize, Deserialize)]
38pub struct PersistedState {
39 pub peers: Vec<PeerRecord>,
40 pub subscriptions: Vec<PersistedSubscription>,
41 #[serde(default)]
42 pub communities: Vec<PersistedCommunity>,
43 #[serde(default)]
44 pub publisher_identities: Vec<PersistedPublisherIdentity>,
45 pub manifests: HashMap<[u8; 32], ManifestV1>,
46 pub share_heads: HashMap<[u8; 32], ShareHead>,
47 pub share_weights: HashMap<[u8; 32], f32>,
48 pub search_index: Option<SearchIndexSnapshot>,
49 pub partial_downloads: HashMap<[u8; 32], PersistedPartialDownload>,
50 #[serde(default)]
55 pub node_key: Option<[u8; 32]>,
56 pub encrypted_node_key: Option<EncryptedSecret>,
57 #[serde(default)]
58 pub enabled_blocklist_shares: Vec<[u8; 32]>,
59 #[serde(default)]
60 pub blocklist_rules_by_share: HashMap<[u8; 32], BlocklistRules>,
61 #[serde(default)]
65 pub content_paths: HashMap<[u8; 32], PathBuf>,
66 #[serde(default)]
73 pub pinned_bootstrap_keys: HashMap<String, [u8; 32]>,
74}
75
76#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct PersistedSubscription {
78 pub share_id: [u8; 32],
79 pub share_pubkey: Option<[u8; 32]>,
80 pub latest_seq: u64,
81 pub latest_manifest_id: Option<[u8; 32]>,
82 #[serde(default)]
83 pub trust_level: SubscriptionTrustLevel,
84}
85
86#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
87pub struct PersistedCommunity {
88 pub share_id: [u8; 32],
89 pub share_pubkey: [u8; 32],
90 #[serde(default)]
94 pub membership_token: Option<Vec<u8>>,
95 #[serde(default)]
97 pub name: Option<String>,
98}
99
100#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
101pub struct PersistedPublisherIdentity {
102 pub label: String,
103 #[serde(default)]
106 pub share_secret: Option<[u8; 32]>,
107 #[serde(default, skip_serializing_if = "Option::is_none")]
110 pub encrypted_share_secret: Option<EncryptedSecret>,
111 #[serde(default, skip_serializing_if = "Option::is_none")]
119 pub node_key_encrypted_share_secret: Option<EncryptedSecret>,
120}
121
122#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
123pub struct PersistedPartialDownload {
124 pub content_id: [u8; 32],
125 pub target_path: String,
126 pub total_chunks: u32,
127 pub completed_chunks: Vec<u32>,
128}
129
130#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
131pub struct EncryptedSecret {
132 pub salt: [u8; 16],
133 pub nonce: [u8; 24],
134 pub ciphertext: Vec<u8>,
135}
136
137#[derive(Debug, Clone, Copy, Default)]
141pub struct DirtyFlags {
142 pub peers: bool,
143 pub subscriptions: bool,
144 pub communities: bool,
145 pub publisher_identities: bool,
146 pub manifests: bool,
147 pub share_heads: bool,
148 pub share_weights: bool,
149 pub partial_downloads: bool,
150 pub node_key: bool,
151 pub blocklist: bool,
152 pub content_paths: bool,
153}
154
155impl DirtyFlags {
156 pub fn all() -> Self {
158 Self {
159 peers: true,
160 subscriptions: true,
161 communities: true,
162 publisher_identities: true,
163 manifests: true,
164 share_heads: true,
165 share_weights: true,
166 partial_downloads: true,
167 node_key: true,
168 blocklist: true,
169 content_paths: true,
170 }
171 }
172
173 pub fn any(&self) -> bool {
175 self.peers
176 || self.subscriptions
177 || self.communities
178 || self.publisher_identities
179 || self.manifests
180 || self.share_heads
181 || self.share_weights
182 || self.partial_downloads
183 || self.node_key
184 || self.blocklist
185 || self.content_paths
186 }
187}
188
189#[async_trait]
190pub trait Store: Send + Sync {
191 async fn load_state(&self) -> anyhow::Result<PersistedState>;
192 async fn save_state(&self, state: &PersistedState) -> anyhow::Result<()>;
193
194 async fn save_incremental(
199 &self,
200 state: &PersistedState,
201 dirty: &DirtyFlags,
202 ) -> anyhow::Result<()> {
203 if dirty.any() {
204 self.save_state(state).await
205 } else {
206 Ok(())
207 }
208 }
209
210 async fn index_manifest_for_search(&self, _manifest: &ManifestV1) -> anyhow::Result<()> {
219 Ok(())
220 }
221
222 async fn remove_share_from_search(&self, _share_id: [u8; 32]) -> anyhow::Result<()> {
227 Ok(())
228 }
229}
230
231#[derive(Default)]
232pub struct MemoryStore {
233 state: RwLock<PersistedState>,
234}
235
236impl MemoryStore {
237 pub fn new() -> Arc<Self> {
238 Arc::new(Self::default())
239 }
240}
241
242pub struct SqliteStore {
243 path: PathBuf,
244}
245
246impl SqliteStore {
247 pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Arc<Self>> {
248 let store = Arc::new(Self {
249 path: path.as_ref().to_path_buf(),
250 });
251 store.ensure_schema()?;
252 Ok(store)
253 }
254
255 fn open_connection(&self) -> anyhow::Result<Connection> {
256 Ok(Connection::open(&self.path)?)
257 }
258
259 fn ensure_schema(&self) -> anyhow::Result<()> {
260 let conn = self.open_connection()?;
261 conn.execute_batch(
263 "CREATE TABLE IF NOT EXISTS peers (
264 addr_key TEXT PRIMARY KEY,
265 payload BLOB NOT NULL
266 );
267 CREATE TABLE IF NOT EXISTS subscriptions (
268 share_id BLOB PRIMARY KEY,
269 share_pubkey BLOB,
270 latest_seq INTEGER NOT NULL,
271 latest_manifest_id BLOB,
272 trust_level TEXT NOT NULL DEFAULT 'normal'
273 );
274 CREATE TABLE IF NOT EXISTS manifests (
275 manifest_id BLOB PRIMARY KEY,
276 payload BLOB NOT NULL
277 );
278 CREATE TABLE IF NOT EXISTS share_weights (
279 share_id BLOB PRIMARY KEY,
280 weight REAL NOT NULL
281 );
282 CREATE TABLE IF NOT EXISTS partial_downloads (
283 content_id BLOB PRIMARY KEY,
284 payload BLOB NOT NULL
285 );
286 CREATE TABLE IF NOT EXISTS metadata (
287 key TEXT PRIMARY KEY,
288 payload BLOB NOT NULL
289 );",
290 )?;
291
292 let current_version: u32 = conn
294 .query_row(
295 "SELECT payload FROM metadata WHERE key = 'schema_version'",
296 [],
297 |row| {
298 let blob: Vec<u8> = row.get(0)?;
299 Ok(u32::from_le_bytes(blob.try_into().unwrap_or([0, 0, 0, 0])))
300 },
301 )
302 .unwrap_or(0);
303
304 if current_version < 2 {
306 conn.execute_batch(
307 "CREATE VIRTUAL TABLE IF NOT EXISTS search_fts USING fts5(
308 share_id UNINDEXED,
309 content_id UNINDEXED,
310 name,
311 tags,
312 title,
313 description,
314 tokenize = 'unicode61'
315 );",
316 )?;
317 conn.execute("DELETE FROM metadata WHERE key = 'search_index'", [])?;
319 }
320
321 if current_version != CURRENT_SCHEMA_VERSION {
323 conn.execute(
324 "INSERT INTO metadata(key, payload) VALUES('schema_version', ?1)
325 ON CONFLICT(key) DO UPDATE SET payload = excluded.payload",
326 params![CURRENT_SCHEMA_VERSION.to_le_bytes().to_vec()],
327 )?;
328 }
329 Ok(())
330 }
331}
332
333#[async_trait]
334impl Store for MemoryStore {
335 async fn load_state(&self) -> anyhow::Result<PersistedState> {
336 Ok(self.state.read().await.clone())
337 }
338
339 async fn save_state(&self, state: &PersistedState) -> anyhow::Result<()> {
340 *self.state.write().await = state.clone();
341 Ok(())
342 }
343}
344
345#[async_trait]
346impl Store for SqliteStore {
347 async fn load_state(&self) -> anyhow::Result<PersistedState> {
348 let path = self.path.clone();
349 tokio::task::spawn_blocking(move || {
350 let store = SqliteStore { path };
351 store.ensure_schema()?;
352 let conn = store.open_connection()?;
353 load_state_sync(&conn)
354 })
355 .await?
356 }
357
358 async fn save_state(&self, state: &PersistedState) -> anyhow::Result<()> {
359 let path = self.path.clone();
360 let state = state.clone();
361 tokio::task::spawn_blocking(move || {
362 let store = SqliteStore { path };
363 store.ensure_schema()?;
364 let mut conn = store.open_connection()?;
365 save_state_sync(&mut conn, &state, &DirtyFlags::all())
366 })
367 .await?
368 }
369
370 async fn save_incremental(
371 &self,
372 state: &PersistedState,
373 dirty: &DirtyFlags,
374 ) -> anyhow::Result<()> {
375 if !dirty.any() {
376 return Ok(());
377 }
378 let path = self.path.clone();
379 let state = state.clone();
380 let dirty = *dirty;
381 tokio::task::spawn_blocking(move || {
382 let store = SqliteStore { path };
383 store.ensure_schema()?;
384 let mut conn = store.open_connection()?;
385 save_state_sync(&mut conn, &state, &dirty)
386 })
387 .await?
388 }
389
390 async fn index_manifest_for_search(&self, manifest: &ManifestV1) -> anyhow::Result<()> {
391 let path = self.path.clone();
392 let share_id_hex = hex::encode(manifest.share_id);
393 let items: Vec<(String, String, String, String, String)> = manifest
394 .items
395 .iter()
396 .map(|item| {
397 (
398 hex::encode(item.content_id),
399 item.name.clone(),
400 item.tags.join("\t"),
401 manifest.title.as_deref().unwrap_or("").to_owned(),
402 manifest.description.as_deref().unwrap_or("").to_owned(),
403 )
404 })
405 .collect();
406 tokio::task::spawn_blocking(move || {
407 let mut conn = Connection::open(&path)?;
408 let tx = conn.transaction()?;
409 tx.execute(
411 "DELETE FROM search_fts WHERE share_id = ?1",
412 params![share_id_hex],
413 )?;
414 let mut stmt = tx.prepare(
415 "INSERT INTO search_fts(share_id, content_id, name, tags, title, description) \
416 VALUES(?1, ?2, ?3, ?4, ?5, ?6)",
417 )?;
418 for (content_id_hex, name, tags, title, description) in &items {
419 stmt.execute(params![
420 share_id_hex,
421 content_id_hex,
422 name,
423 tags,
424 title,
425 description
426 ])?;
427 }
428 drop(stmt);
429 tx.commit()?;
430 Ok::<_, anyhow::Error>(())
431 })
432 .await?
433 }
434
435 async fn remove_share_from_search(&self, share_id: [u8; 32]) -> anyhow::Result<()> {
436 let path = self.path.clone();
437 let share_id_hex = hex::encode(share_id);
438 tokio::task::spawn_blocking(move || {
439 let conn = Connection::open(&path)?;
440 conn.execute(
441 "DELETE FROM search_fts WHERE share_id = ?1",
442 params![share_id_hex],
443 )?;
444 Ok::<_, anyhow::Error>(())
445 })
446 .await?
447 }
448}
449
450fn load_state_sync(conn: &Connection) -> anyhow::Result<PersistedState> {
452 let mut state = PersistedState::default();
453
454 {
455 let mut stmt = conn.prepare("SELECT payload FROM peers")?;
456 let rows = stmt.query_map([], |row| row.get::<_, Vec<u8>>(0))?;
457 for row in rows {
458 let record: PeerRecord = crate::cbor::from_slice(&row?)?;
459 state.peers.push(record);
460 }
461 }
462
463 {
464 let mut stmt = conn.prepare(
465 "SELECT share_id, share_pubkey, latest_seq, latest_manifest_id, trust_level FROM subscriptions",
466 )?;
467 let rows = stmt.query_map([], |row| {
468 Ok((
469 row.get::<_, Vec<u8>>(0)?,
470 row.get::<_, Option<Vec<u8>>>(1)?,
471 row.get::<_, u64>(2)?,
472 row.get::<_, Option<Vec<u8>>>(3)?,
473 row.get::<_, String>(4)?,
474 ))
475 })?;
476 for row in rows {
477 let (share_id, share_pubkey, latest_seq, latest_manifest_id, trust_level) = row?;
478 state.subscriptions.push(PersistedSubscription {
479 share_id: blob_to_array::<32>(&share_id, "subscriptions.share_id")?,
480 share_pubkey: share_pubkey
481 .map(|v| blob_to_array::<32>(&v, "subscriptions.share_pubkey"))
482 .transpose()?,
483 latest_seq,
484 latest_manifest_id: latest_manifest_id
485 .map(|v| blob_to_array::<32>(&v, "subscriptions.latest_manifest_id"))
486 .transpose()?,
487 trust_level: parse_trust_level(&trust_level),
488 });
489 }
490 }
491
492 state.communities = load_metadata_cbor(conn, "communities")?.unwrap_or_default();
493 state.publisher_identities =
494 load_metadata_cbor(conn, "publisher_identities")?.unwrap_or_default();
495
496 {
497 let mut stmt = conn.prepare("SELECT manifest_id, payload FROM manifests")?;
498 let rows = stmt.query_map([], |row| {
499 Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, Vec<u8>>(1)?))
500 })?;
501 for row in rows {
502 let (manifest_id, payload) = row?;
503 let manifest: ManifestV1 = crate::cbor::from_slice(&payload)?;
504 state.manifests.insert(
505 blob_to_array::<32>(&manifest_id, "manifests.manifest_id")?,
506 manifest,
507 );
508 }
509 }
510
511 {
512 let mut stmt = conn.prepare("SELECT share_id, weight FROM share_weights")?;
513 let rows = stmt.query_map([], |row| {
514 Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, f32>(1)?))
515 })?;
516 for row in rows {
517 let (share_id, weight) = row?;
518 state.share_weights.insert(
519 blob_to_array::<32>(&share_id, "share_weights.share_id")?,
520 weight,
521 );
522 }
523 }
524
525 {
526 let mut stmt = conn.prepare("SELECT content_id, payload FROM partial_downloads")?;
527 let rows = stmt.query_map([], |row| {
528 Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, Vec<u8>>(1)?))
529 })?;
530 for row in rows {
531 let (content_id, payload) = row?;
532 let partial: PersistedPartialDownload = crate::cbor::from_slice(&payload)?;
533 state.partial_downloads.insert(
534 blob_to_array::<32>(&content_id, "partial_downloads.content_id")?,
535 partial,
536 );
537 }
538 }
539
540 {
542 let has_fts = conn
545 .prepare("SELECT name FROM sqlite_master WHERE type='table' AND name='search_fts'")
546 .and_then(|mut s| s.query_row([], |_| Ok(())))
547 .is_ok();
548
549 if has_fts {
550 let mut stmt = conn.prepare(
551 "SELECT share_id, content_id, name, tags, title, description FROM search_fts",
552 )?;
553 let rows = stmt.query_map([], |row| {
554 Ok((
555 row.get::<_, String>(0)?,
556 row.get::<_, String>(1)?,
557 row.get::<_, String>(2)?,
558 row.get::<_, String>(3)?,
559 row.get::<_, String>(4)?,
560 row.get::<_, String>(5)?,
561 ))
562 })?;
563 let mut items = Vec::new();
564 for row in rows {
565 let (share_id_hex, content_id_hex, name, tags_str, title, description) = row?;
566 let share_id = hex_to_array::<32>(&share_id_hex, "search_fts.share_id")?;
567 let content_id = hex_to_array::<32>(&content_id_hex, "search_fts.content_id")?;
568 let tags: Vec<String> = tags_str
569 .split('\t')
570 .filter(|t| !t.is_empty())
571 .map(ToOwned::to_owned)
572 .collect();
573 let title = if title.is_empty() { None } else { Some(title) };
574 let description = if description.is_empty() {
575 None
576 } else {
577 Some(description)
578 };
579 items.push(crate::search::IndexedItem {
580 share_id,
581 content_id,
582 name,
583 tags,
584 title,
585 description,
586 });
587 }
588 if !items.is_empty() {
589 state.search_index = Some(crate::search::SearchIndex::from_items(items).snapshot());
590 }
591 } else {
592 state.search_index = load_metadata_cbor(conn, "search_index")?;
593 }
594 }
595
596 state.share_heads = load_metadata_cbor(conn, "share_heads")?.unwrap_or_default();
597 state.node_key = load_metadata_cbor(conn, "node_key")?;
598 state.encrypted_node_key = load_metadata_cbor(conn, "encrypted_node_key")?;
599 state.enabled_blocklist_shares =
600 load_metadata_cbor(conn, "enabled_blocklist_shares")?.unwrap_or_default();
601 state.blocklist_rules_by_share =
602 load_metadata_cbor(conn, "blocklist_rules_by_share")?.unwrap_or_default();
603 state.content_paths = load_metadata_cbor(conn, "content_paths")?.unwrap_or_default();
604 state.pinned_bootstrap_keys =
605 load_metadata_cbor(conn, "pinned_bootstrap_keys")?.unwrap_or_default();
606
607 Ok(state)
608}
609
610fn load_metadata_cbor<T: serde::de::DeserializeOwned>(
612 conn: &Connection,
613 key: &str,
614) -> anyhow::Result<Option<T>> {
615 conn.query_row(
616 "SELECT payload FROM metadata WHERE key = ?1",
617 params![key],
618 |row| row.get::<_, Vec<u8>>(0),
619 )
620 .optional()?
621 .map(|payload| crate::cbor::from_slice(&payload).map_err(Into::into))
622 .transpose()
623}
624
625fn save_state_sync(
631 conn: &mut Connection,
632 state: &PersistedState,
633 dirty: &DirtyFlags,
634) -> anyhow::Result<()> {
635 let tx = conn.transaction()?;
636
637 if dirty.peers {
639 let mut live_peer_keys: HashSet<String> = HashSet::new();
640 for peer in &state.peers {
641 let addr_key = format!(
642 "{}:{}:{:?}",
643 peer.addr.ip, peer.addr.port, peer.addr.transport
644 );
645 tx.execute(
646 "INSERT INTO peers(addr_key, payload) VALUES(?1, ?2)
647 ON CONFLICT(addr_key) DO UPDATE SET payload = excluded.payload",
648 params![addr_key, crate::cbor::to_vec(peer)?],
649 )?;
650 live_peer_keys.insert(addr_key);
651 }
652 delete_stale_text_keys(&tx, "peers", "addr_key", &live_peer_keys)?;
653 upsert_metadata_cbor(&tx, "pinned_bootstrap_keys", &state.pinned_bootstrap_keys)?;
654 }
655
656 if dirty.subscriptions {
658 let mut live_sub_keys: HashSet<Vec<u8>> = HashSet::new();
659 for sub in &state.subscriptions {
660 tx.execute(
661 "INSERT INTO subscriptions(share_id, share_pubkey, latest_seq, latest_manifest_id, trust_level)
662 VALUES(?1, ?2, ?3, ?4, ?5)
663 ON CONFLICT(share_id) DO UPDATE SET
664 share_pubkey = excluded.share_pubkey,
665 latest_seq = excluded.latest_seq,
666 latest_manifest_id = excluded.latest_manifest_id,
667 trust_level = excluded.trust_level",
668 params![
669 sub.share_id.to_vec(),
670 sub.share_pubkey.map(|v| v.to_vec()),
671 sub.latest_seq,
672 sub.latest_manifest_id.map(|v| v.to_vec()),
673 trust_level_str(sub.trust_level),
674 ],
675 )?;
676 live_sub_keys.insert(sub.share_id.to_vec());
677 }
678 delete_stale_blob_keys(&tx, "subscriptions", "share_id", &live_sub_keys)?;
679 }
680
681 if dirty.communities {
683 upsert_metadata_cbor(&tx, "communities", &state.communities)?;
684 }
685 if dirty.publisher_identities {
686 upsert_metadata_cbor(&tx, "publisher_identities", &state.publisher_identities)?;
687 }
688
689 if dirty.manifests {
691 let mut live_manifest_keys: HashSet<Vec<u8>> = HashSet::new();
692 for (manifest_id, manifest) in &state.manifests {
693 tx.execute(
694 "INSERT INTO manifests(manifest_id, payload) VALUES(?1, ?2)
695 ON CONFLICT(manifest_id) DO UPDATE SET payload = excluded.payload",
696 params![manifest_id.to_vec(), crate::cbor::to_vec(manifest)?],
697 )?;
698 live_manifest_keys.insert(manifest_id.to_vec());
699 }
700 delete_stale_blob_keys(&tx, "manifests", "manifest_id", &live_manifest_keys)?;
701 }
702
703 if dirty.share_weights {
705 let mut live_weight_keys: HashSet<Vec<u8>> = HashSet::new();
706 for (share_id, weight) in &state.share_weights {
707 tx.execute(
708 "INSERT INTO share_weights(share_id, weight) VALUES(?1, ?2)
709 ON CONFLICT(share_id) DO UPDATE SET weight = excluded.weight",
710 params![share_id.to_vec(), weight],
711 )?;
712 live_weight_keys.insert(share_id.to_vec());
713 }
714 delete_stale_blob_keys(&tx, "share_weights", "share_id", &live_weight_keys)?;
715 }
716
717 if dirty.partial_downloads {
719 let mut live_partial_keys: HashSet<Vec<u8>> = HashSet::new();
720 for (content_id, partial) in &state.partial_downloads {
721 tx.execute(
722 "INSERT INTO partial_downloads(content_id, payload) VALUES(?1, ?2)
723 ON CONFLICT(content_id) DO UPDATE SET payload = excluded.payload",
724 params![content_id.to_vec(), crate::cbor::to_vec(partial)?],
725 )?;
726 live_partial_keys.insert(content_id.to_vec());
727 }
728 delete_stale_blob_keys(&tx, "partial_downloads", "content_id", &live_partial_keys)?;
729 }
730
731 if dirty.share_heads {
735 upsert_metadata_cbor(&tx, "share_heads", &state.share_heads)?;
736 }
737 if dirty.node_key {
738 upsert_metadata_cbor_opt(&tx, "node_key", &state.node_key)?;
739 upsert_metadata_cbor_opt(&tx, "encrypted_node_key", &state.encrypted_node_key)?;
740 }
741 if dirty.blocklist {
742 upsert_metadata_cbor(
743 &tx,
744 "enabled_blocklist_shares",
745 &state.enabled_blocklist_shares,
746 )?;
747 upsert_metadata_cbor(
748 &tx,
749 "blocklist_rules_by_share",
750 &state.blocklist_rules_by_share,
751 )?;
752 }
753 if dirty.content_paths {
754 upsert_metadata_cbor(&tx, "content_paths", &state.content_paths)?;
755 }
756
757 tx.commit()?;
758 Ok(())
759}
760
761fn upsert_metadata_cbor<T: Serialize>(
764 tx: &rusqlite::Transaction<'_>,
765 key: &str,
766 value: &T,
767) -> anyhow::Result<()> {
768 let bytes = crate::cbor::to_vec(value)?;
769 tx.execute(
770 "INSERT INTO metadata(key, payload) VALUES(?1, ?2)
771 ON CONFLICT(key) DO UPDATE SET payload = excluded.payload",
772 params![key, bytes],
773 )?;
774 Ok(())
775}
776
777fn upsert_metadata_cbor_opt<T: Serialize>(
779 tx: &rusqlite::Transaction<'_>,
780 key: &str,
781 value: &Option<T>,
782) -> anyhow::Result<()> {
783 match value {
784 Some(v) => {
785 let bytes = crate::cbor::to_vec(v)?;
786 tx.execute(
787 "INSERT INTO metadata(key, payload) VALUES(?1, ?2)
788 ON CONFLICT(key) DO UPDATE SET payload = excluded.payload",
789 params![key, bytes],
790 )?;
791 }
792 None => {
793 tx.execute("DELETE FROM metadata WHERE key = ?1", params![key])?;
794 }
795 }
796 Ok(())
797}
798
799fn validate_sql_identifier(ident: &str) -> anyhow::Result<()> {
802 if ident.is_empty() || !ident.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') {
803 anyhow::bail!("invalid SQL identifier: {ident:?}");
804 }
805 Ok(())
806}
807
808fn delete_stale_text_keys(
809 tx: &rusqlite::Transaction<'_>,
810 table: &str,
811 pk_col: &str,
812 live_keys: &HashSet<String>,
813) -> anyhow::Result<()> {
814 validate_sql_identifier(table)?;
815 validate_sql_identifier(pk_col)?;
816 let mut stmt = tx.prepare(&format!("SELECT {pk_col} FROM {table}"))?;
817 let existing: Vec<String> = stmt
818 .query_map([], |row| row.get::<_, String>(0))?
819 .filter_map(|r| r.ok())
820 .collect();
821 for key in existing {
822 if !live_keys.contains(&key) {
823 tx.execute(
824 &format!("DELETE FROM {table} WHERE {pk_col} = ?1"),
825 params![key],
826 )?;
827 }
828 }
829 Ok(())
830}
831
832fn delete_stale_blob_keys(
834 tx: &rusqlite::Transaction<'_>,
835 table: &str,
836 pk_col: &str,
837 live_keys: &HashSet<Vec<u8>>,
838) -> anyhow::Result<()> {
839 validate_sql_identifier(table)?;
840 validate_sql_identifier(pk_col)?;
841 let mut stmt = tx.prepare(&format!("SELECT {pk_col} FROM {table}"))?;
842 let existing: Vec<Vec<u8>> = stmt
843 .query_map([], |row| row.get::<_, Vec<u8>>(0))?
844 .filter_map(|r| r.ok())
845 .collect();
846 for key in existing {
847 if !live_keys.contains(&key) {
848 tx.execute(
849 &format!("DELETE FROM {table} WHERE {pk_col} = ?1"),
850 params![key],
851 )?;
852 }
853 }
854 Ok(())
855}
856
857fn parse_trust_level(value: &str) -> SubscriptionTrustLevel {
858 match value {
859 "trusted" => SubscriptionTrustLevel::Trusted,
860 "untrusted" => SubscriptionTrustLevel::Untrusted,
861 _ => SubscriptionTrustLevel::Normal,
862 }
863}
864
865fn trust_level_str(level: SubscriptionTrustLevel) -> &'static str {
866 match level {
867 SubscriptionTrustLevel::Trusted => "trusted",
868 SubscriptionTrustLevel::Normal => "normal",
869 SubscriptionTrustLevel::Untrusted => "untrusted",
870 }
871}
872
873fn blob_to_array<const N: usize>(blob: &[u8], field: &str) -> anyhow::Result<[u8; N]> {
874 if blob.len() != N {
875 anyhow::bail!(
876 "invalid {} length: expected {}, got {}",
877 field,
878 N,
879 blob.len()
880 );
881 }
882 let mut out = [0u8; N];
883 out.copy_from_slice(blob);
884 Ok(out)
885}
886
887fn hex_to_array<const N: usize>(hex_str: &str, field: &str) -> anyhow::Result<[u8; N]> {
888 let bytes =
889 hex::decode(hex_str).map_err(|e| anyhow::anyhow!("invalid hex in {}: {}", field, e))?;
890 blob_to_array::<N>(&bytes, field)
891}
892
893pub fn peer_record(addr: PeerAddr, last_seen_unix: u64) -> PeerRecord {
894 PeerRecord {
895 addr,
896 last_seen_unix,
897 capabilities: None,
898 capabilities_seen_at: None,
899 reputation_score: 0,
900 }
901}
902
903pub fn encrypt_secret(secret: &[u8], passphrase: &str) -> anyhow::Result<EncryptedSecret> {
904 let mut salt = [0u8; 16];
905 let mut nonce = [0u8; 24];
906 let mut rng = rand::rngs::OsRng;
907 rng.fill_bytes(&mut salt);
908 rng.fill_bytes(&mut nonce);
909
910 let mut key_bytes = [0u8; 32];
911 pbkdf2_hmac::<sha2::Sha256>(
912 passphrase.as_bytes(),
913 &salt,
914 KEY_KDF_ITERATIONS,
915 &mut key_bytes,
916 );
917 let cipher = XChaCha20Poly1305::new(Key::from_slice(&key_bytes));
918 let ciphertext = cipher
919 .encrypt(XNonce::from_slice(&nonce), secret)
920 .map_err(|_| anyhow::anyhow!("failed to encrypt secret"))?;
921
922 Ok(EncryptedSecret {
923 salt,
924 nonce,
925 ciphertext,
926 })
927}
928
929pub fn decrypt_secret(secret: &EncryptedSecret, passphrase: &str) -> anyhow::Result<Vec<u8>> {
930 let mut key_bytes = [0u8; 32];
931 pbkdf2_hmac::<sha2::Sha256>(
932 passphrase.as_bytes(),
933 &secret.salt,
934 KEY_KDF_ITERATIONS,
935 &mut key_bytes,
936 );
937 let cipher = XChaCha20Poly1305::new(Key::from_slice(&key_bytes));
938 let plaintext = cipher
939 .decrypt(
940 XNonce::from_slice(&secret.nonce),
941 secret.ciphertext.as_ref(),
942 )
943 .map_err(|_| anyhow::anyhow!("failed to decrypt secret"))?;
944 Ok(plaintext)
945}
946
947pub fn encrypt_secret_with_key(
954 secret: &[u8],
955 node_key: &[u8; 32],
956) -> anyhow::Result<EncryptedSecret> {
957 let derived = blake3::derive_key("scp2p publisher-identity v1", node_key);
958 let cipher = XChaCha20Poly1305::new(Key::from_slice(&derived));
959 let mut nonce = [0u8; 24];
960 rand::rngs::OsRng.fill_bytes(&mut nonce);
961 let ciphertext = cipher
962 .encrypt(XNonce::from_slice(&nonce), secret)
963 .map_err(|_| anyhow::anyhow!("failed to encrypt secret with key"))?;
964 Ok(EncryptedSecret {
965 salt: [0u8; 16], nonce,
967 ciphertext,
968 })
969}
970
971pub fn decrypt_secret_with_key(
973 secret: &EncryptedSecret,
974 node_key: &[u8; 32],
975) -> anyhow::Result<Vec<u8>> {
976 let derived = blake3::derive_key("scp2p publisher-identity v1", node_key);
977 let cipher = XChaCha20Poly1305::new(Key::from_slice(&derived));
978 cipher
979 .decrypt(
980 XNonce::from_slice(&secret.nonce),
981 secret.ciphertext.as_ref(),
982 )
983 .map_err(|_| anyhow::anyhow!("failed to decrypt secret with key"))
984}
985
986#[cfg(test)]
987mod tests {
988 use super::*;
989 use crate::peer::TransportProtocol;
990
991 #[tokio::test]
992 async fn memory_store_roundtrip() {
993 let store = MemoryStore::new();
994 let mut initial = PersistedState::default();
995 initial.peers.push(peer_record(
996 PeerAddr {
997 ip: "127.0.0.1".parse().expect("valid ip"),
998 port: 7000,
999 transport: TransportProtocol::Tcp,
1000 pubkey_hint: None,
1001 relay_via: None,
1002 },
1003 42,
1004 ));
1005 initial.subscriptions.push(PersistedSubscription {
1006 share_id: [1u8; 32],
1007 share_pubkey: None,
1008 latest_seq: 7,
1009 latest_manifest_id: Some([2u8; 32]),
1010 trust_level: SubscriptionTrustLevel::Normal,
1011 });
1012 initial.share_weights.insert([1u8; 32], 1.5);
1013 initial.enabled_blocklist_shares.push([3u8; 32]);
1014 initial.blocklist_rules_by_share.insert(
1015 [3u8; 32],
1016 BlocklistRules {
1017 blocked_share_ids: vec![[4u8; 32]],
1018 blocked_content_ids: vec![[5u8; 32]],
1019 },
1020 );
1021
1022 store.save_state(&initial).await.expect("save");
1023 let loaded = store.load_state().await.expect("load");
1024
1025 assert_eq!(loaded.peers.len(), 1);
1026 assert_eq!(loaded.subscriptions.len(), 1);
1027 assert_eq!(loaded.share_weights.get(&[1u8; 32]), Some(&1.5));
1028 assert_eq!(loaded.enabled_blocklist_shares, vec![[3u8; 32]]);
1029 assert_eq!(loaded.blocklist_rules_by_share.len(), 1);
1030 }
1031
1032 #[tokio::test]
1033 async fn sqlite_store_roundtrip() {
1034 let mut path = std::env::temp_dir();
1035 path.push(format!(
1036 "scp2p_store_test_{}.db",
1037 std::time::SystemTime::now()
1038 .duration_since(std::time::UNIX_EPOCH)
1039 .expect("now")
1040 .as_nanos()
1041 ));
1042 let store = SqliteStore::open(&path).expect("open sqlite");
1043 let mut initial = PersistedState::default();
1044 initial.subscriptions.push(PersistedSubscription {
1045 share_id: [7u8; 32],
1046 share_pubkey: Some([9u8; 32]),
1047 latest_seq: 3,
1048 latest_manifest_id: Some([5u8; 32]),
1049 trust_level: SubscriptionTrustLevel::Trusted,
1050 });
1051 initial.partial_downloads.insert(
1052 [8u8; 32],
1053 PersistedPartialDownload {
1054 content_id: [8u8; 32],
1055 target_path: "tmp.bin".into(),
1056 total_chunks: 4,
1057 completed_chunks: vec![1, 3],
1058 },
1059 );
1060 initial.encrypted_node_key = Some(encrypt_secret(b"k", "pw").expect("encrypt"));
1061 initial.enabled_blocklist_shares.push([7u8; 32]);
1062 initial.blocklist_rules_by_share.insert(
1063 [7u8; 32],
1064 BlocklistRules {
1065 blocked_share_ids: vec![[1u8; 32]],
1066 blocked_content_ids: vec![[2u8; 32]],
1067 },
1068 );
1069 initial
1073 .content_paths
1074 .insert([0xCCu8; 32], PathBuf::from("/some/file.bin"));
1075
1076 store.save_state(&initial).await.expect("save");
1077 let loaded = store.load_state().await.expect("load");
1078 assert_eq!(loaded.subscriptions.len(), 1);
1079 assert_eq!(loaded.subscriptions[0].latest_seq, 3);
1080 assert_eq!(
1081 loaded.subscriptions[0].trust_level,
1082 SubscriptionTrustLevel::Trusted
1083 );
1084 assert_eq!(loaded.partial_downloads.len(), 1);
1085 assert!(loaded.encrypted_node_key.is_some());
1086 assert_eq!(loaded.enabled_blocklist_shares, vec![[7u8; 32]]);
1087 assert_eq!(loaded.blocklist_rules_by_share.len(), 1);
1088 assert_eq!(
1089 loaded.content_paths.get(&[0xCCu8; 32]),
1090 Some(&PathBuf::from("/some/file.bin")),
1091 "content_paths must be persisted so chunk hashes survive restart"
1092 );
1093
1094 let _ = std::fs::remove_file(path);
1095 }
1096
1097 #[test]
1098 fn encrypted_secret_roundtrip() {
1099 let secret = b"super-secret-material";
1100 let encrypted = encrypt_secret(secret, "passphrase").expect("encrypt");
1101 let decrypted = decrypt_secret(&encrypted, "passphrase").expect("decrypt");
1102 assert_eq!(decrypted, secret);
1103 }
1104
1105 #[tokio::test]
1106 async fn sqlite_fts5_search_index_roundtrip() {
1107 use crate::manifest::{ItemV1, ManifestV1, ShareVisibility};
1108 use crate::search::SearchIndex;
1109
1110 let mut path = std::env::temp_dir();
1111 path.push(format!(
1112 "scp2p_fts5_test_{}.db",
1113 std::time::SystemTime::now()
1114 .duration_since(std::time::UNIX_EPOCH)
1115 .expect("now")
1116 .as_nanos()
1117 ));
1118 let store = SqliteStore::open(&path).expect("open sqlite");
1119
1120 let share_id = [1u8; 32];
1123 let manifest = ManifestV1 {
1124 version: 1,
1125 share_pubkey: [0u8; 32],
1126 share_id,
1127 seq: 1,
1128 created_at: 1,
1129 expires_at: None,
1130 title: Some("Linux Downloads".into()),
1131 description: Some("Open source Linux distributions".into()),
1132 visibility: ShareVisibility::Public,
1133 communities: vec![],
1134 items: vec![
1135 ItemV1 {
1136 content_id: [2u8; 32],
1137 size: 1_000,
1138 name: "Ubuntu ISO".into(),
1139 path: None,
1140 mime: None,
1141 tags: vec!["linux".into(), "ubuntu".into()],
1142 chunk_count: 0,
1143 chunk_list_hash: [0u8; 32],
1144 },
1145 ItemV1 {
1146 content_id: [3u8; 32],
1147 size: 2_000,
1148 name: "Fedora DVD".into(),
1149 path: None,
1150 mime: None,
1151 tags: vec!["linux".into(), "fedora".into()],
1152 chunk_count: 0,
1153 chunk_list_hash: [0u8; 32],
1154 },
1155 ],
1156 recommended_shares: vec![],
1157 signature: None,
1158 };
1159
1160 store
1161 .index_manifest_for_search(&manifest)
1162 .await
1163 .expect("index_manifest_for_search");
1164
1165 let loaded = store.load_state().await.expect("load");
1167 let loaded_snapshot = loaded.search_index.expect("search_index should be Some");
1168 let loaded_items: Vec<_> = loaded_snapshot.items().collect();
1169 assert_eq!(loaded_items.len(), 2, "should load 2 FTS5 items");
1170
1171 let ubuntu = loaded_items
1173 .iter()
1174 .find(|i| i.name == "Ubuntu ISO")
1175 .expect("ubuntu item");
1176 assert_eq!(ubuntu.tags, vec!["linux", "ubuntu"]);
1177 assert_eq!(ubuntu.title.as_deref(), Some("Linux Downloads"));
1178 assert_eq!(
1179 ubuntu.description.as_deref(),
1180 Some("Open source Linux distributions")
1181 );
1182
1183 let fedora = loaded_items
1184 .iter()
1185 .find(|i| i.name == "Fedora DVD")
1186 .expect("fedora item");
1187 assert_eq!(fedora.tags, vec!["linux", "fedora"]);
1188 assert!(fedora.description.is_some());
1190
1191 let reloaded_index = SearchIndex::from_snapshot(loaded_snapshot);
1193 let mut subs = std::collections::HashSet::new();
1194 subs.insert(share_id);
1195 let hits = reloaded_index.search("ubuntu", &subs, &std::collections::HashMap::new());
1196 assert_eq!(hits.len(), 1);
1197 assert_eq!(hits[0].0.name, "Ubuntu ISO");
1198
1199 store
1201 .remove_share_from_search(share_id)
1202 .await
1203 .expect("remove_share_from_search");
1204 let after_remove = store.load_state().await.expect("load after remove");
1205 assert!(
1206 after_remove.search_index.is_none(),
1207 "search_index should be None after removing the only share"
1208 );
1209
1210 let _ = std::fs::remove_file(path);
1211 }
1212}