Skip to main content

scp2p_core/
store.rs

1// Copyright (c) 2024-2026 Vanyo Vanev / Tech Art Ltd
2// SPDX-License-Identifier: MPL-2.0
3//
4// This Source Code Form is subject to the terms of the Mozilla Public
5// License, v. 2.0. If a copy of the MPL was not distributed with this
6// file, You can obtain one at https://mozilla.org/MPL/2.0/.
7use std::{
8    collections::{HashMap, HashSet},
9    path::{Path, PathBuf},
10    sync::Arc,
11};
12
13use async_trait::async_trait;
14use chacha20poly1305::{
15    Key, XChaCha20Poly1305, XNonce,
16    aead::{Aead, KeyInit},
17};
18use pbkdf2::pbkdf2_hmac;
19use rand::RngCore;
20use rusqlite::{Connection, OptionalExtension, params};
21use serde::{Deserialize, Serialize};
22use tokio::sync::RwLock;
23
24use crate::{
25    api::{BlocklistRules, SubscriptionTrustLevel},
26    manifest::{ManifestV1, ShareHead},
27    peer::PeerAddr,
28    peer_db::PeerRecord,
29    search::SearchIndexSnapshot,
30};
31
/// PBKDF2 iteration count for passphrase-derived keys.  Presumably fed
/// to the `pbkdf2_hmac` import above — the derivation code itself is
/// outside this part of the file (TODO confirm at the call site).
const KEY_KDF_ITERATIONS: u32 = 600_000;

/// Bump when making schema changes; migrations are applied in order.
const CURRENT_SCHEMA_VERSION: u32 = 2;
36
/// The complete node state as serialized to disk.
///
/// `MemoryStore` holds one of these directly; `SqliteStore` maps each
/// section either to its own table or to a CBOR blob in the `metadata`
/// table (see `save_state_sync`).  Fields marked `#[serde(default)]`
/// were added after the first release, so older state files still
/// decode.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct PersistedState {
    /// Known peers, restored from the `peers` table.
    pub peers: Vec<PeerRecord>,
    /// Shares this node follows, with the latest observed head info.
    pub subscriptions: Vec<PersistedSubscription>,
    /// Communities this node has created or joined.
    #[serde(default)]
    pub communities: Vec<PersistedCommunity>,
    /// Local publishing identities; secrets may be stored encrypted.
    #[serde(default)]
    pub publisher_identities: Vec<PersistedPublisherIdentity>,
    /// Cached manifests, keyed by manifest id.
    pub manifests: HashMap<[u8; 32], ManifestV1>,
    /// Latest known head per share, keyed by share id.
    pub share_heads: HashMap<[u8; 32], ShareHead>,
    /// Per-share weight, keyed by share id.  The semantics of the weight
    /// are defined by whatever consumes it — outside this module.
    pub share_weights: HashMap<[u8; 32], f32>,
    /// Snapshot of the in-memory search index.  On SQLite (schema v2+)
    /// search data lives in the `search_fts` table instead; this field is
    /// rebuilt from it at load time (see `load_state_sync`).
    pub search_index: Option<SearchIndexSnapshot>,
    /// Resume state for unfinished downloads, keyed by content id.
    pub partial_downloads: HashMap<[u8; 32], PersistedPartialDownload>,
    /// Plaintext Ed25519 node identity key.  When no passphrase
    /// encryption is active, this is the primary storage.  When
    /// encrypted, `encrypted_node_key` holds the ciphertext and this
    /// field is `None`.
    #[serde(default)]
    pub node_key: Option<[u8; 32]>,
    /// Passphrase-encrypted node identity key; see `node_key`.
    pub encrypted_node_key: Option<EncryptedSecret>,
    /// Share ids whose blocklists are currently enabled.
    #[serde(default)]
    pub enabled_blocklist_shares: Vec<[u8; 32]>,
    /// Parsed blocklist rules, keyed by the share that published them.
    #[serde(default)]
    pub blocklist_rules_by_share: HashMap<[u8; 32], BlocklistRules>,
    /// Maps content_id → file path for path-based seeding.
    /// Instead of keeping blob copies, chunks are served from the original
    /// file (publisher) or the downloaded file (subscriber).
    #[serde(default)]
    pub content_paths: HashMap<[u8; 32], PathBuf>,
    /// TOFU-pinned public keys for bootstrap peers.
    ///
    /// Key is the peer address string (e.g. "tcp://1.2.3.4:7001"),
    /// value is the first-observed Ed25519 public key.  Once pinned,
    /// subsequent connections to the same address that present a
    /// different key are rejected.
    #[serde(default)]
    pub pinned_bootstrap_keys: HashMap<String, [u8; 32]>,
}
75
/// Per-share subscription bookkeeping, persisted as typed columns in the
/// `subscriptions` table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersistedSubscription {
    /// Share identifier.
    pub share_id: [u8; 32],
    /// Public key associated with the share, once learned.
    pub share_pubkey: Option<[u8; 32]>,
    /// Highest manifest sequence number observed for this share.
    pub latest_seq: u64,
    /// Manifest id corresponding to `latest_seq`, if known.
    pub latest_manifest_id: Option<[u8; 32]>,
    /// Local trust classification.  `#[serde(default)]` keeps state
    /// files written before this field existed loadable; the SQL column
    /// likewise defaults to 'normal'.
    #[serde(default)]
    pub trust_level: SubscriptionTrustLevel,
}
85
/// A community this node participates in, persisted as a CBOR blob under
/// the `communities` metadata key.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PersistedCommunity {
    /// Community share identifier.
    pub share_id: [u8; 32],
    /// Public key of the community share.
    pub share_pubkey: [u8; 32],
    /// Optional cryptographic membership token signed by the community
    /// publisher key (§4.2).  When present, community membership is
    /// verifiable by any peer; when absent, membership is self-asserted.
    #[serde(default)]
    pub membership_token: Option<Vec<u8>>,
    /// Local human-readable label set at creation or join time.
    #[serde(default)]
    pub name: Option<String>,
}
99
/// A publishing identity owned by this node.  Exactly one of the three
/// secret fields is expected to be populated at a time, depending on the
/// protection mode in effect.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PersistedPublisherIdentity {
    /// Human-readable label chosen by the user for this identity.
    pub label: String,
    /// Plaintext secret — present when no passphrase-based encryption is active.
    /// When encryption is enabled this field is zeroed out in persisted form.
    #[serde(default)]
    pub share_secret: Option<[u8; 32]>,
    /// Encrypted secret — present when the publisher identity has been
    /// locked with a passphrase via [`encrypt_publisher_identities`].
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub encrypted_share_secret: Option<EncryptedSecret>,
    /// Encrypted secret protected with the node's stable identity key.
    ///
    /// Used by the auto-protect-at-rest feature (AV-07): when
    /// `NodeConfig::auto_protect_publisher_keys` is true and the node key
    /// is available, publisher secrets are encrypted with a fast blake3-derived
    /// key instead of storing them in plaintext.  At startup, if the node key
    /// is present in persisted state, these are automatically decrypted.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub node_key_encrypted_share_secret: Option<EncryptedSecret>,
}
121
/// Resume state for an in-progress download, persisted per content id.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PersistedPartialDownload {
    /// Content being downloaded.
    pub content_id: [u8; 32],
    /// Destination path the downloaded data is written to.
    pub target_path: String,
    /// Total number of chunks the content is split into.
    pub total_chunks: u32,
    /// Indices of chunks that have already completed.
    pub completed_chunks: Vec<u32>,
}
129
/// An encrypted secret together with the parameters needed to decrypt it.
///
/// Field sizes match the crypto imports at the top of the file: a
/// 16-byte salt (presumably for the PBKDF2 key derivation — the
/// encrypt/decrypt routines are outside this chunk, confirm there), a
/// 24-byte `XNonce` for XChaCha20-Poly1305, and the AEAD ciphertext.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct EncryptedSecret {
    pub salt: [u8; 16],
    pub nonce: [u8; 24],
    pub ciphertext: Vec<u8>,
}
136
/// Tracks which sections of persisted state have been mutated since the
/// last successful save.  When all flags are `false`, `persist_state`
/// short-circuits without cloning or writing anything.
///
/// Some flags cover more than one `PersistedState` field (see
/// `save_state_sync`): `peers` also flushes `pinned_bootstrap_keys`,
/// `node_key` covers both the plaintext and encrypted key, and
/// `blocklist` covers the enabled-shares list plus per-share rules.
#[derive(Debug, Clone, Copy, Default)]
pub struct DirtyFlags {
    pub peers: bool,
    pub subscriptions: bool,
    pub communities: bool,
    pub publisher_identities: bool,
    pub manifests: bool,
    pub share_heads: bool,
    pub share_weights: bool,
    pub partial_downloads: bool,
    pub node_key: bool,
    pub blocklist: bool,
    pub content_paths: bool,
}
154
155impl DirtyFlags {
156    /// Return a flags set with every section marked dirty.
157    pub fn all() -> Self {
158        Self {
159            peers: true,
160            subscriptions: true,
161            communities: true,
162            publisher_identities: true,
163            manifests: true,
164            share_heads: true,
165            share_weights: true,
166            partial_downloads: true,
167            node_key: true,
168            blocklist: true,
169            content_paths: true,
170        }
171    }
172
173    /// `true` when at least one section is dirty.
174    pub fn any(&self) -> bool {
175        self.peers
176            || self.subscriptions
177            || self.communities
178            || self.publisher_identities
179            || self.manifests
180            || self.share_heads
181            || self.share_weights
182            || self.partial_downloads
183            || self.node_key
184            || self.blocklist
185            || self.content_paths
186    }
187}
188
/// Persistence backend abstraction.
///
/// Implemented by [`MemoryStore`] (tests, no durability) and
/// [`SqliteStore`] (on-disk database).
#[async_trait]
pub trait Store: Send + Sync {
    /// Load the complete persisted state from the backend.
    async fn load_state(&self) -> anyhow::Result<PersistedState>;
    /// Persist the complete state, replacing whatever was stored before.
    async fn save_state(&self, state: &PersistedState) -> anyhow::Result<()>;

    /// Save only the sections indicated by `dirty`.
    ///
    /// Default implementation falls back to `save_state`.  `SqliteStore`
    /// overrides this to skip unchanged tables.
    async fn save_incremental(
        &self,
        state: &PersistedState,
        dirty: &DirtyFlags,
    ) -> anyhow::Result<()> {
        if dirty.any() {
            self.save_state(state).await
        } else {
            Ok(())
        }
    }

    /// Write-through: UPSERT all items from `manifest` into the persistent
    /// full-text search index immediately, outside any state lock.  This
    /// eliminates the need for the O(N) snapshot+clear+repopulate on every
    /// `persist_state` call.
    ///
    /// The default implementation is a no-op so `MemoryStore` (used in tests)
    /// does not need an FTS5 implementation; the in-process `SearchIndex`
    /// handles in-memory queries for tests.
    async fn index_manifest_for_search(&self, _manifest: &ManifestV1) -> anyhow::Result<()> {
        Ok(())
    }

    /// Remove all search-index entries for `share_id` from the persistent
    /// store.  Called on unsubscribe and share deletion.
    ///
    /// Default is a no-op.
    async fn remove_share_from_search(&self, _share_id: [u8; 32]) -> anyhow::Result<()> {
        Ok(())
    }
}
230
/// In-memory `Store` (used in tests): a single `PersistedState` behind
/// an async `RwLock`, with no durability.
#[derive(Default)]
pub struct MemoryStore {
    state: RwLock<PersistedState>,
}
235
236impl MemoryStore {
237    pub fn new() -> Arc<Self> {
238        Arc::new(Self::default())
239    }
240}
241
/// SQLite-backed `Store`.  Holds only the database path; each operation
/// opens its own short-lived `Connection` on a blocking thread.
pub struct SqliteStore {
    path: PathBuf,
}
245
impl SqliteStore {
    /// Open (or create) the database file at `path` and bring its schema
    /// up to [`CURRENT_SCHEMA_VERSION`].
    pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Arc<Self>> {
        let store = Arc::new(Self {
            path: path.as_ref().to_path_buf(),
        });
        store.ensure_schema()?;
        Ok(store)
    }

    /// Open a fresh connection to the database file.
    fn open_connection(&self) -> anyhow::Result<Connection> {
        Ok(Connection::open(&self.path)?)
    }

    /// Create baseline tables and apply pending migrations.
    ///
    /// Idempotent: table creation uses `IF NOT EXISTS` and each
    /// migration is gated on the persisted `schema_version`.
    fn ensure_schema(&self) -> anyhow::Result<()> {
        let conn = self.open_connection()?;
        // Always create the baseline tables (idempotent).
        conn.execute_batch(
            "CREATE TABLE IF NOT EXISTS peers (
                addr_key TEXT PRIMARY KEY,
                payload BLOB NOT NULL
            );
            CREATE TABLE IF NOT EXISTS subscriptions (
                share_id BLOB PRIMARY KEY,
                share_pubkey BLOB,
                latest_seq INTEGER NOT NULL,
                latest_manifest_id BLOB,
                trust_level TEXT NOT NULL DEFAULT 'normal'
            );
            CREATE TABLE IF NOT EXISTS manifests (
                manifest_id BLOB PRIMARY KEY,
                payload BLOB NOT NULL
            );
            CREATE TABLE IF NOT EXISTS share_weights (
                share_id BLOB PRIMARY KEY,
                weight REAL NOT NULL
            );
            CREATE TABLE IF NOT EXISTS partial_downloads (
                content_id BLOB PRIMARY KEY,
                payload BLOB NOT NULL
            );
            CREATE TABLE IF NOT EXISTS metadata (
                key TEXT PRIMARY KEY,
                payload BLOB NOT NULL
            );",
        )?;

        // Read (or initialise) schema version.  Any failure here — a
        // missing row on a fresh database, or a blob of the wrong size
        // (mapped to [0,0,0,0]) — is treated as version 0 so every
        // migration below runs.
        let current_version: u32 = conn
            .query_row(
                "SELECT payload FROM metadata WHERE key = 'schema_version'",
                [],
                |row| {
                    let blob: Vec<u8> = row.get(0)?;
                    Ok(u32::from_le_bytes(blob.try_into().unwrap_or([0, 0, 0, 0])))
                },
            )
            .unwrap_or(0);

        // --- Schema v2: FTS5 search index ---
        if current_version < 2 {
            conn.execute_batch(
                "CREATE VIRTUAL TABLE IF NOT EXISTS search_fts USING fts5(
                    share_id UNINDEXED,
                    content_id UNINDEXED,
                    name,
                    tags,
                    title,
                    description,
                    tokenize = 'unicode61'
                );",
            )?;
            // Drop the legacy CBOR search_index blob if present.
            conn.execute("DELETE FROM metadata WHERE key = 'search_index'", [])?;
        }

        // Persist the schema version (skipped when already current, so
        // repeated calls on an up-to-date database write nothing).
        if current_version != CURRENT_SCHEMA_VERSION {
            conn.execute(
                "INSERT INTO metadata(key, payload) VALUES('schema_version', ?1)
                 ON CONFLICT(key) DO UPDATE SET payload = excluded.payload",
                params![CURRENT_SCHEMA_VERSION.to_le_bytes().to_vec()],
            )?;
        }
        Ok(())
    }
}
332
333#[async_trait]
334impl Store for MemoryStore {
335    async fn load_state(&self) -> anyhow::Result<PersistedState> {
336        Ok(self.state.read().await.clone())
337    }
338
339    async fn save_state(&self, state: &PersistedState) -> anyhow::Result<()> {
340        *self.state.write().await = state.clone();
341        Ok(())
342    }
343}
344
#[async_trait]
impl Store for SqliteStore {
    async fn load_state(&self) -> anyhow::Result<PersistedState> {
        // spawn_blocking needs a 'static closure, so clone the path and
        // rebuild a throwaway handle inside it.  ensure_schema() is
        // redundant after `open` but harmless — it is idempotent.
        let path = self.path.clone();
        tokio::task::spawn_blocking(move || {
            let store = SqliteStore { path };
            store.ensure_schema()?;
            let conn = store.open_connection()?;
            load_state_sync(&conn)
        })
        .await?
    }

    async fn save_state(&self, state: &PersistedState) -> anyhow::Result<()> {
        // Full save: every section is written (DirtyFlags::all()).
        let path = self.path.clone();
        let state = state.clone();
        tokio::task::spawn_blocking(move || {
            let store = SqliteStore { path };
            store.ensure_schema()?;
            let mut conn = store.open_connection()?;
            save_state_sync(&mut conn, &state, &DirtyFlags::all())
        })
        .await?
    }

    async fn save_incremental(
        &self,
        state: &PersistedState,
        dirty: &DirtyFlags,
    ) -> anyhow::Result<()> {
        if !dirty.any() {
            return Ok(());
        }
        // NOTE(review): the whole state is still cloned for the blocking
        // thread even when only one section is dirty; only the SQL writes
        // for clean sections are skipped.
        let path = self.path.clone();
        let state = state.clone();
        let dirty = *dirty;
        tokio::task::spawn_blocking(move || {
            let store = SqliteStore { path };
            store.ensure_schema()?;
            let mut conn = store.open_connection()?;
            save_state_sync(&mut conn, &state, &dirty)
        })
        .await?
    }

    async fn index_manifest_for_search(&self, manifest: &ManifestV1) -> anyhow::Result<()> {
        // Flatten the manifest into owned tuples first so the closure
        // moved into spawn_blocking borrows nothing.  The manifest-level
        // title/description are duplicated into every item row so FTS can
        // match them per item.
        let path = self.path.clone();
        let share_id_hex = hex::encode(manifest.share_id);
        let items: Vec<(String, String, String, String, String)> = manifest
            .items
            .iter()
            .map(|item| {
                (
                    hex::encode(item.content_id),
                    item.name.clone(),
                    // Tags are joined with '\t'; load_state_sync splits on
                    // the same separator when rebuilding the index.
                    item.tags.join("\t"),
                    manifest.title.as_deref().unwrap_or("").to_owned(),
                    manifest.description.as_deref().unwrap_or("").to_owned(),
                )
            })
            .collect();
        tokio::task::spawn_blocking(move || {
            let mut conn = Connection::open(&path)?;
            let tx = conn.transaction()?;
            // Delete stale entries for this share, then insert current items.
            tx.execute(
                "DELETE FROM search_fts WHERE share_id = ?1",
                params![share_id_hex],
            )?;
            let mut stmt = tx.prepare(
                "INSERT INTO search_fts(share_id, content_id, name, tags, title, description) \
                 VALUES(?1, ?2, ?3, ?4, ?5, ?6)",
            )?;
            for (content_id_hex, name, tags, title, description) in &items {
                stmt.execute(params![
                    share_id_hex,
                    content_id_hex,
                    name,
                    tags,
                    title,
                    description
                ])?;
            }
            // The statement borrows `tx`; drop it before committing.
            drop(stmt);
            tx.commit()?;
            Ok::<_, anyhow::Error>(())
        })
        .await?
    }

    async fn remove_share_from_search(&self, share_id: [u8; 32]) -> anyhow::Result<()> {
        let path = self.path.clone();
        let share_id_hex = hex::encode(share_id);
        tokio::task::spawn_blocking(move || {
            let conn = Connection::open(&path)?;
            conn.execute(
                "DELETE FROM search_fts WHERE share_id = ?1",
                params![share_id_hex],
            )?;
            Ok::<_, anyhow::Error>(())
        })
        .await?
    }
}
449
450/// All SQLite reads happen here, on a blocking thread.
451fn load_state_sync(conn: &Connection) -> anyhow::Result<PersistedState> {
452    let mut state = PersistedState::default();
453
454    {
455        let mut stmt = conn.prepare("SELECT payload FROM peers")?;
456        let rows = stmt.query_map([], |row| row.get::<_, Vec<u8>>(0))?;
457        for row in rows {
458            let record: PeerRecord = crate::cbor::from_slice(&row?)?;
459            state.peers.push(record);
460        }
461    }
462
463    {
464        let mut stmt = conn.prepare(
465            "SELECT share_id, share_pubkey, latest_seq, latest_manifest_id, trust_level FROM subscriptions",
466        )?;
467        let rows = stmt.query_map([], |row| {
468            Ok((
469                row.get::<_, Vec<u8>>(0)?,
470                row.get::<_, Option<Vec<u8>>>(1)?,
471                row.get::<_, u64>(2)?,
472                row.get::<_, Option<Vec<u8>>>(3)?,
473                row.get::<_, String>(4)?,
474            ))
475        })?;
476        for row in rows {
477            let (share_id, share_pubkey, latest_seq, latest_manifest_id, trust_level) = row?;
478            state.subscriptions.push(PersistedSubscription {
479                share_id: blob_to_array::<32>(&share_id, "subscriptions.share_id")?,
480                share_pubkey: share_pubkey
481                    .map(|v| blob_to_array::<32>(&v, "subscriptions.share_pubkey"))
482                    .transpose()?,
483                latest_seq,
484                latest_manifest_id: latest_manifest_id
485                    .map(|v| blob_to_array::<32>(&v, "subscriptions.latest_manifest_id"))
486                    .transpose()?,
487                trust_level: parse_trust_level(&trust_level),
488            });
489        }
490    }
491
492    state.communities = load_metadata_cbor(conn, "communities")?.unwrap_or_default();
493    state.publisher_identities =
494        load_metadata_cbor(conn, "publisher_identities")?.unwrap_or_default();
495
496    {
497        let mut stmt = conn.prepare("SELECT manifest_id, payload FROM manifests")?;
498        let rows = stmt.query_map([], |row| {
499            Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, Vec<u8>>(1)?))
500        })?;
501        for row in rows {
502            let (manifest_id, payload) = row?;
503            let manifest: ManifestV1 = crate::cbor::from_slice(&payload)?;
504            state.manifests.insert(
505                blob_to_array::<32>(&manifest_id, "manifests.manifest_id")?,
506                manifest,
507            );
508        }
509    }
510
511    {
512        let mut stmt = conn.prepare("SELECT share_id, weight FROM share_weights")?;
513        let rows = stmt.query_map([], |row| {
514            Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, f32>(1)?))
515        })?;
516        for row in rows {
517            let (share_id, weight) = row?;
518            state.share_weights.insert(
519                blob_to_array::<32>(&share_id, "share_weights.share_id")?,
520                weight,
521            );
522        }
523    }
524
525    {
526        let mut stmt = conn.prepare("SELECT content_id, payload FROM partial_downloads")?;
527        let rows = stmt.query_map([], |row| {
528            Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, Vec<u8>>(1)?))
529        })?;
530        for row in rows {
531            let (content_id, payload) = row?;
532            let partial: PersistedPartialDownload = crate::cbor::from_slice(&payload)?;
533            state.partial_downloads.insert(
534                blob_to_array::<32>(&content_id, "partial_downloads.content_id")?,
535                partial,
536            );
537        }
538    }
539
540    // --- FTS5 search index ---
541    {
542        // Try loading from the search_fts table (schema v2+).
543        // Falls back to legacy CBOR blob for databases not yet migrated.
544        let has_fts = conn
545            .prepare("SELECT name FROM sqlite_master WHERE type='table' AND name='search_fts'")
546            .and_then(|mut s| s.query_row([], |_| Ok(())))
547            .is_ok();
548
549        if has_fts {
550            let mut stmt = conn.prepare(
551                "SELECT share_id, content_id, name, tags, title, description FROM search_fts",
552            )?;
553            let rows = stmt.query_map([], |row| {
554                Ok((
555                    row.get::<_, String>(0)?,
556                    row.get::<_, String>(1)?,
557                    row.get::<_, String>(2)?,
558                    row.get::<_, String>(3)?,
559                    row.get::<_, String>(4)?,
560                    row.get::<_, String>(5)?,
561                ))
562            })?;
563            let mut items = Vec::new();
564            for row in rows {
565                let (share_id_hex, content_id_hex, name, tags_str, title, description) = row?;
566                let share_id = hex_to_array::<32>(&share_id_hex, "search_fts.share_id")?;
567                let content_id = hex_to_array::<32>(&content_id_hex, "search_fts.content_id")?;
568                let tags: Vec<String> = tags_str
569                    .split('\t')
570                    .filter(|t| !t.is_empty())
571                    .map(ToOwned::to_owned)
572                    .collect();
573                let title = if title.is_empty() { None } else { Some(title) };
574                let description = if description.is_empty() {
575                    None
576                } else {
577                    Some(description)
578                };
579                items.push(crate::search::IndexedItem {
580                    share_id,
581                    content_id,
582                    name,
583                    tags,
584                    title,
585                    description,
586                });
587            }
588            if !items.is_empty() {
589                state.search_index = Some(crate::search::SearchIndex::from_items(items).snapshot());
590            }
591        } else {
592            state.search_index = load_metadata_cbor(conn, "search_index")?;
593        }
594    }
595
596    state.share_heads = load_metadata_cbor(conn, "share_heads")?.unwrap_or_default();
597    state.node_key = load_metadata_cbor(conn, "node_key")?;
598    state.encrypted_node_key = load_metadata_cbor(conn, "encrypted_node_key")?;
599    state.enabled_blocklist_shares =
600        load_metadata_cbor(conn, "enabled_blocklist_shares")?.unwrap_or_default();
601    state.blocklist_rules_by_share =
602        load_metadata_cbor(conn, "blocklist_rules_by_share")?.unwrap_or_default();
603    state.content_paths = load_metadata_cbor(conn, "content_paths")?.unwrap_or_default();
604    state.pinned_bootstrap_keys =
605        load_metadata_cbor(conn, "pinned_bootstrap_keys")?.unwrap_or_default();
606
607    Ok(state)
608}
609
610/// Helper to load a single CBOR-encoded metadata value.
611fn load_metadata_cbor<T: serde::de::DeserializeOwned>(
612    conn: &Connection,
613    key: &str,
614) -> anyhow::Result<Option<T>> {
615    conn.query_row(
616        "SELECT payload FROM metadata WHERE key = ?1",
617        params![key],
618        |row| row.get::<_, Vec<u8>>(0),
619    )
620    .optional()?
621    .map(|payload| crate::cbor::from_slice(&payload).map_err(Into::into))
622    .transpose()
623}
624
/// All SQLite writes happen here, on a blocking thread.
/// Uses UPSERT (INSERT … ON CONFLICT DO UPDATE) so rows are written in
/// place.  Stale rows that no longer exist in `state` are deleted by
/// comparing keys.  When `dirty` indicates a section is unchanged, the
/// corresponding table writes are skipped entirely.  Everything runs in
/// a single transaction, so an interrupted save leaves the previously
/// committed state intact.
fn save_state_sync(
    conn: &mut Connection,
    state: &PersistedState,
    dirty: &DirtyFlags,
) -> anyhow::Result<()> {
    let tx = conn.transaction()?;

    // --- peers: UPSERT + prune stale ---
    if dirty.peers {
        let mut live_peer_keys: HashSet<String> = HashSet::new();
        for peer in &state.peers {
            // Composite text key built from ip/port/transport.  NOTE:
            // `{:?}` of the transport becomes part of the persistent key,
            // so renaming that enum's variants orphans (and prunes) old
            // rows.
            let addr_key = format!(
                "{}:{}:{:?}",
                peer.addr.ip, peer.addr.port, peer.addr.transport
            );
            tx.execute(
                "INSERT INTO peers(addr_key, payload) VALUES(?1, ?2)
                 ON CONFLICT(addr_key) DO UPDATE SET payload = excluded.payload",
                params![addr_key, crate::cbor::to_vec(peer)?],
            )?;
            live_peer_keys.insert(addr_key);
        }
        delete_stale_text_keys(&tx, "peers", "addr_key", &live_peer_keys)?;
        // Pinned bootstrap keys ride along with the peers dirty flag.
        upsert_metadata_cbor(&tx, "pinned_bootstrap_keys", &state.pinned_bootstrap_keys)?;
    }

    // --- subscriptions: UPSERT + prune stale ---
    if dirty.subscriptions {
        let mut live_sub_keys: HashSet<Vec<u8>> = HashSet::new();
        for sub in &state.subscriptions {
            tx.execute(
                "INSERT INTO subscriptions(share_id, share_pubkey, latest_seq, latest_manifest_id, trust_level)
                 VALUES(?1, ?2, ?3, ?4, ?5)
                 ON CONFLICT(share_id) DO UPDATE SET
                   share_pubkey = excluded.share_pubkey,
                   latest_seq = excluded.latest_seq,
                   latest_manifest_id = excluded.latest_manifest_id,
                   trust_level = excluded.trust_level",
                params![
                    sub.share_id.to_vec(),
                    sub.share_pubkey.map(|v| v.to_vec()),
                    sub.latest_seq,
                    sub.latest_manifest_id.map(|v| v.to_vec()),
                    trust_level_str(sub.trust_level),
                ],
            )?;
            live_sub_keys.insert(sub.share_id.to_vec());
        }
        delete_stale_blob_keys(&tx, "subscriptions", "share_id", &live_sub_keys)?;
    }

    // --- metadata blobs (communities, publisher_identities, etc.) ---
    if dirty.communities {
        upsert_metadata_cbor(&tx, "communities", &state.communities)?;
    }
    if dirty.publisher_identities {
        upsert_metadata_cbor(&tx, "publisher_identities", &state.publisher_identities)?;
    }

    // --- manifests: UPSERT + prune stale ---
    if dirty.manifests {
        let mut live_manifest_keys: HashSet<Vec<u8>> = HashSet::new();
        for (manifest_id, manifest) in &state.manifests {
            tx.execute(
                "INSERT INTO manifests(manifest_id, payload) VALUES(?1, ?2)
                 ON CONFLICT(manifest_id) DO UPDATE SET payload = excluded.payload",
                params![manifest_id.to_vec(), crate::cbor::to_vec(manifest)?],
            )?;
            live_manifest_keys.insert(manifest_id.to_vec());
        }
        delete_stale_blob_keys(&tx, "manifests", "manifest_id", &live_manifest_keys)?;
    }

    // --- share_weights: UPSERT + prune stale ---
    if dirty.share_weights {
        let mut live_weight_keys: HashSet<Vec<u8>> = HashSet::new();
        for (share_id, weight) in &state.share_weights {
            tx.execute(
                "INSERT INTO share_weights(share_id, weight) VALUES(?1, ?2)
                 ON CONFLICT(share_id) DO UPDATE SET weight = excluded.weight",
                params![share_id.to_vec(), weight],
            )?;
            live_weight_keys.insert(share_id.to_vec());
        }
        delete_stale_blob_keys(&tx, "share_weights", "share_id", &live_weight_keys)?;
    }

    // --- partial_downloads: UPSERT + prune stale ---
    if dirty.partial_downloads {
        let mut live_partial_keys: HashSet<Vec<u8>> = HashSet::new();
        for (content_id, partial) in &state.partial_downloads {
            tx.execute(
                "INSERT INTO partial_downloads(content_id, payload) VALUES(?1, ?2)
                 ON CONFLICT(content_id) DO UPDATE SET payload = excluded.payload",
                params![content_id.to_vec(), crate::cbor::to_vec(partial)?],
            )?;
            live_partial_keys.insert(content_id.to_vec());
        }
        delete_stale_blob_keys(&tx, "partial_downloads", "content_id", &live_partial_keys)?;
    }

    // --- remaining metadata ---
    // Note: search_index is maintained write-through via Store::index_manifest_for_search /
    // remove_share_from_search; it no longer needs a bulk clear-and-repopulate here.
    if dirty.share_heads {
        upsert_metadata_cbor(&tx, "share_heads", &state.share_heads)?;
    }
    if dirty.node_key {
        // Both representations are flushed together: exactly one is
        // expected to be Some, and the `_opt` helper deletes the row for
        // whichever is None.
        upsert_metadata_cbor_opt(&tx, "node_key", &state.node_key)?;
        upsert_metadata_cbor_opt(&tx, "encrypted_node_key", &state.encrypted_node_key)?;
    }
    if dirty.blocklist {
        upsert_metadata_cbor(
            &tx,
            "enabled_blocklist_shares",
            &state.enabled_blocklist_shares,
        )?;
        upsert_metadata_cbor(
            &tx,
            "blocklist_rules_by_share",
            &state.blocklist_rules_by_share,
        )?;
    }
    if dirty.content_paths {
        upsert_metadata_cbor(&tx, "content_paths", &state.content_paths)?;
    }

    tx.commit()?;
    Ok(())
}
760
/// UPSERT a CBOR-encoded value into the `metadata` table under `key`.
///
/// Note: a row is always written, even when `value` is empty/default (an
/// empty Vec or map still serialises to a small CBOR payload); rows are
/// never deleted here.  Use [`upsert_metadata_cbor_opt`] when `None`
/// should remove the row instead.
fn upsert_metadata_cbor<T: Serialize>(
    tx: &rusqlite::Transaction<'_>,
    key: &str,
    value: &T,
) -> anyhow::Result<()> {
    let bytes = crate::cbor::to_vec(value)?;
    tx.execute(
        "INSERT INTO metadata(key, payload) VALUES(?1, ?2)
         ON CONFLICT(key) DO UPDATE SET payload = excluded.payload",
        params![key, bytes],
    )?;
    Ok(())
}
776
777/// Like `upsert_metadata_cbor` but for `Option<T>` — deletes the row when `None`.
778fn upsert_metadata_cbor_opt<T: Serialize>(
779    tx: &rusqlite::Transaction<'_>,
780    key: &str,
781    value: &Option<T>,
782) -> anyhow::Result<()> {
783    match value {
784        Some(v) => {
785            let bytes = crate::cbor::to_vec(v)?;
786            tx.execute(
787                "INSERT INTO metadata(key, payload) VALUES(?1, ?2)
788                 ON CONFLICT(key) DO UPDATE SET payload = excluded.payload",
789                params![key, bytes],
790            )?;
791        }
792        None => {
793            tx.execute("DELETE FROM metadata WHERE key = ?1", params![key])?;
794        }
795    }
796    Ok(())
797}
798
799/// Delete rows whose TEXT primary-key is not in `live_keys`.
800/// Validate that a SQL identifier contains only safe characters (alphanumeric + underscore).
801fn validate_sql_identifier(ident: &str) -> anyhow::Result<()> {
802    if ident.is_empty() || !ident.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') {
803        anyhow::bail!("invalid SQL identifier: {ident:?}");
804    }
805    Ok(())
806}
807
808fn delete_stale_text_keys(
809    tx: &rusqlite::Transaction<'_>,
810    table: &str,
811    pk_col: &str,
812    live_keys: &HashSet<String>,
813) -> anyhow::Result<()> {
814    validate_sql_identifier(table)?;
815    validate_sql_identifier(pk_col)?;
816    let mut stmt = tx.prepare(&format!("SELECT {pk_col} FROM {table}"))?;
817    let existing: Vec<String> = stmt
818        .query_map([], |row| row.get::<_, String>(0))?
819        .filter_map(|r| r.ok())
820        .collect();
821    for key in existing {
822        if !live_keys.contains(&key) {
823            tx.execute(
824                &format!("DELETE FROM {table} WHERE {pk_col} = ?1"),
825                params![key],
826            )?;
827        }
828    }
829    Ok(())
830}
831
832/// Delete rows whose BLOB primary-key is not in `live_keys`.
833fn delete_stale_blob_keys(
834    tx: &rusqlite::Transaction<'_>,
835    table: &str,
836    pk_col: &str,
837    live_keys: &HashSet<Vec<u8>>,
838) -> anyhow::Result<()> {
839    validate_sql_identifier(table)?;
840    validate_sql_identifier(pk_col)?;
841    let mut stmt = tx.prepare(&format!("SELECT {pk_col} FROM {table}"))?;
842    let existing: Vec<Vec<u8>> = stmt
843        .query_map([], |row| row.get::<_, Vec<u8>>(0))?
844        .filter_map(|r| r.ok())
845        .collect();
846    for key in existing {
847        if !live_keys.contains(&key) {
848            tx.execute(
849                &format!("DELETE FROM {table} WHERE {pk_col} = ?1"),
850                params![key],
851            )?;
852        }
853    }
854    Ok(())
855}
856
857fn parse_trust_level(value: &str) -> SubscriptionTrustLevel {
858    match value {
859        "trusted" => SubscriptionTrustLevel::Trusted,
860        "untrusted" => SubscriptionTrustLevel::Untrusted,
861        _ => SubscriptionTrustLevel::Normal,
862    }
863}
864
865fn trust_level_str(level: SubscriptionTrustLevel) -> &'static str {
866    match level {
867        SubscriptionTrustLevel::Trusted => "trusted",
868        SubscriptionTrustLevel::Normal => "normal",
869        SubscriptionTrustLevel::Untrusted => "untrusted",
870    }
871}
872
873fn blob_to_array<const N: usize>(blob: &[u8], field: &str) -> anyhow::Result<[u8; N]> {
874    if blob.len() != N {
875        anyhow::bail!(
876            "invalid {} length: expected {}, got {}",
877            field,
878            N,
879            blob.len()
880        );
881    }
882    let mut out = [0u8; N];
883    out.copy_from_slice(blob);
884    Ok(out)
885}
886
887fn hex_to_array<const N: usize>(hex_str: &str, field: &str) -> anyhow::Result<[u8; N]> {
888    let bytes =
889        hex::decode(hex_str).map_err(|e| anyhow::anyhow!("invalid hex in {}: {}", field, e))?;
890    blob_to_array::<N>(&bytes, field)
891}
892
893pub fn peer_record(addr: PeerAddr, last_seen_unix: u64) -> PeerRecord {
894    PeerRecord {
895        addr,
896        last_seen_unix,
897        capabilities: None,
898        capabilities_seen_at: None,
899        reputation_score: 0,
900    }
901}
902
903pub fn encrypt_secret(secret: &[u8], passphrase: &str) -> anyhow::Result<EncryptedSecret> {
904    let mut salt = [0u8; 16];
905    let mut nonce = [0u8; 24];
906    let mut rng = rand::rngs::OsRng;
907    rng.fill_bytes(&mut salt);
908    rng.fill_bytes(&mut nonce);
909
910    let mut key_bytes = [0u8; 32];
911    pbkdf2_hmac::<sha2::Sha256>(
912        passphrase.as_bytes(),
913        &salt,
914        KEY_KDF_ITERATIONS,
915        &mut key_bytes,
916    );
917    let cipher = XChaCha20Poly1305::new(Key::from_slice(&key_bytes));
918    let ciphertext = cipher
919        .encrypt(XNonce::from_slice(&nonce), secret)
920        .map_err(|_| anyhow::anyhow!("failed to encrypt secret"))?;
921
922    Ok(EncryptedSecret {
923        salt,
924        nonce,
925        ciphertext,
926    })
927}
928
929pub fn decrypt_secret(secret: &EncryptedSecret, passphrase: &str) -> anyhow::Result<Vec<u8>> {
930    let mut key_bytes = [0u8; 32];
931    pbkdf2_hmac::<sha2::Sha256>(
932        passphrase.as_bytes(),
933        &secret.salt,
934        KEY_KDF_ITERATIONS,
935        &mut key_bytes,
936    );
937    let cipher = XChaCha20Poly1305::new(Key::from_slice(&key_bytes));
938    let plaintext = cipher
939        .decrypt(
940            XNonce::from_slice(&secret.nonce),
941            secret.ciphertext.as_ref(),
942        )
943        .map_err(|_| anyhow::anyhow!("failed to decrypt secret"))?;
944    Ok(plaintext)
945}
946
947/// Encrypt `secret` using a key derived from `node_key` via blake3.
948///
949/// Unlike [`encrypt_secret`] this does **not** run PBKDF2 iterations
950/// because `node_key` is already a 32-byte random value.  Domain
951/// separation is provided by blake3's `derive_key` with a fixed context.
952/// Used by the publisher-key auto-protect-at-rest feature.
953pub fn encrypt_secret_with_key(
954    secret: &[u8],
955    node_key: &[u8; 32],
956) -> anyhow::Result<EncryptedSecret> {
957    let derived = blake3::derive_key("scp2p publisher-identity v1", node_key);
958    let cipher = XChaCha20Poly1305::new(Key::from_slice(&derived));
959    let mut nonce = [0u8; 24];
960    rand::rngs::OsRng.fill_bytes(&mut nonce);
961    let ciphertext = cipher
962        .encrypt(XNonce::from_slice(&nonce), secret)
963        .map_err(|_| anyhow::anyhow!("failed to encrypt secret with key"))?;
964    Ok(EncryptedSecret {
965        salt: [0u8; 16], // unused for blake3 derivation; kept for struct compat
966        nonce,
967        ciphertext,
968    })
969}
970
971/// Decrypt a secret that was encrypted by [`encrypt_secret_with_key`].
972pub fn decrypt_secret_with_key(
973    secret: &EncryptedSecret,
974    node_key: &[u8; 32],
975) -> anyhow::Result<Vec<u8>> {
976    let derived = blake3::derive_key("scp2p publisher-identity v1", node_key);
977    let cipher = XChaCha20Poly1305::new(Key::from_slice(&derived));
978    cipher
979        .decrypt(
980            XNonce::from_slice(&secret.nonce),
981            secret.ciphertext.as_ref(),
982        )
983        .map_err(|_| anyhow::anyhow!("failed to decrypt secret with key"))
984}
985
/// Round-trip and regression tests for the persistence layer: in-memory
/// store, SQLite store, passphrase encryption, and the FTS5 search index.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::peer::TransportProtocol;

    // Saving then loading via the in-memory store must preserve peers,
    // subscriptions, share weights, and blocklist state verbatim.
    #[tokio::test]
    async fn memory_store_roundtrip() {
        let store = MemoryStore::new();
        let mut initial = PersistedState::default();
        initial.peers.push(peer_record(
            PeerAddr {
                ip: "127.0.0.1".parse().expect("valid ip"),
                port: 7000,
                transport: TransportProtocol::Tcp,
                pubkey_hint: None,
                relay_via: None,
            },
            42,
        ));
        initial.subscriptions.push(PersistedSubscription {
            share_id: [1u8; 32],
            share_pubkey: None,
            latest_seq: 7,
            latest_manifest_id: Some([2u8; 32]),
            trust_level: SubscriptionTrustLevel::Normal,
        });
        initial.share_weights.insert([1u8; 32], 1.5);
        initial.enabled_blocklist_shares.push([3u8; 32]);
        initial.blocklist_rules_by_share.insert(
            [3u8; 32],
            BlocklistRules {
                blocked_share_ids: vec![[4u8; 32]],
                blocked_content_ids: vec![[5u8; 32]],
            },
        );

        store.save_state(&initial).await.expect("save");
        let loaded = store.load_state().await.expect("load");

        assert_eq!(loaded.peers.len(), 1);
        assert_eq!(loaded.subscriptions.len(), 1);
        assert_eq!(loaded.share_weights.get(&[1u8; 32]), Some(&1.5));
        assert_eq!(loaded.enabled_blocklist_shares, vec![[3u8; 32]]);
        assert_eq!(loaded.blocklist_rules_by_share.len(), 1);
    }

    // Same round-trip through the SQLite backend, plus fields that only
    // exist there (partial downloads, encrypted node key, content paths).
    // Uses a nanosecond-stamped temp file so parallel test runs don't clash.
    #[tokio::test]
    async fn sqlite_store_roundtrip() {
        let mut path = std::env::temp_dir();
        path.push(format!(
            "scp2p_store_test_{}.db",
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .expect("now")
                .as_nanos()
        ));
        let store = SqliteStore::open(&path).expect("open sqlite");
        let mut initial = PersistedState::default();
        initial.subscriptions.push(PersistedSubscription {
            share_id: [7u8; 32],
            share_pubkey: Some([9u8; 32]),
            latest_seq: 3,
            latest_manifest_id: Some([5u8; 32]),
            trust_level: SubscriptionTrustLevel::Trusted,
        });
        initial.partial_downloads.insert(
            [8u8; 32],
            PersistedPartialDownload {
                content_id: [8u8; 32],
                target_path: "tmp.bin".into(),
                total_chunks: 4,
                completed_chunks: vec![1, 3],
            },
        );
        initial.encrypted_node_key = Some(encrypt_secret(b"k", "pw").expect("encrypt"));
        initial.enabled_blocklist_shares.push([7u8; 32]);
        initial.blocklist_rules_by_share.insert(
            [7u8; 32],
            BlocklistRules {
                blocked_share_ids: vec![[1u8; 32]],
                blocked_content_ids: vec![[2u8; 32]],
            },
        );
        // Regression: content_paths must survive a save/load cycle so that
        // chunk hashes can be rehydrated on restart and GetChunkHashes requests
        // succeed without re-registering files.
        initial
            .content_paths
            .insert([0xCCu8; 32], PathBuf::from("/some/file.bin"));

        store.save_state(&initial).await.expect("save");
        let loaded = store.load_state().await.expect("load");
        assert_eq!(loaded.subscriptions.len(), 1);
        assert_eq!(loaded.subscriptions[0].latest_seq, 3);
        assert_eq!(
            loaded.subscriptions[0].trust_level,
            SubscriptionTrustLevel::Trusted
        );
        assert_eq!(loaded.partial_downloads.len(), 1);
        assert!(loaded.encrypted_node_key.is_some());
        assert_eq!(loaded.enabled_blocklist_shares, vec![[7u8; 32]]);
        assert_eq!(loaded.blocklist_rules_by_share.len(), 1);
        assert_eq!(
            loaded.content_paths.get(&[0xCCu8; 32]),
            Some(&PathBuf::from("/some/file.bin")),
            "content_paths must be persisted so chunk hashes survive restart"
        );

        // Best-effort cleanup; skipped on earlier panics (temp dir absorbs it).
        let _ = std::fs::remove_file(path);
    }

    // encrypt_secret -> decrypt_secret with the same passphrase must return
    // the original plaintext.
    #[test]
    fn encrypted_secret_roundtrip() {
        let secret = b"super-secret-material";
        let encrypted = encrypt_secret(secret, "passphrase").expect("encrypt");
        let decrypted = decrypt_secret(&encrypted, "passphrase").expect("decrypt");
        assert_eq!(decrypted, secret);
    }

    // End-to-end FTS5 flow: index a manifest through the write-through path,
    // verify items persist and are searchable after reload, then verify
    // share removal cleans the index up.
    #[tokio::test]
    async fn sqlite_fts5_search_index_roundtrip() {
        use crate::manifest::{ItemV1, ManifestV1, ShareVisibility};
        use crate::search::SearchIndex;

        let mut path = std::env::temp_dir();
        path.push(format!(
            "scp2p_fts5_test_{}.db",
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .expect("now")
                .as_nanos()
        ));
        let store = SqliteStore::open(&path).expect("open sqlite");

        // Index two items via the write-through path rather than the legacy
        // save-snapshot path (which no longer writes to FTS5).
        let share_id = [1u8; 32];
        let manifest = ManifestV1 {
            version: 1,
            share_pubkey: [0u8; 32],
            share_id,
            seq: 1,
            created_at: 1,
            expires_at: None,
            title: Some("Linux Downloads".into()),
            description: Some("Open source Linux distributions".into()),
            visibility: ShareVisibility::Public,
            communities: vec![],
            items: vec![
                ItemV1 {
                    content_id: [2u8; 32],
                    size: 1_000,
                    name: "Ubuntu ISO".into(),
                    path: None,
                    mime: None,
                    tags: vec!["linux".into(), "ubuntu".into()],
                    chunk_count: 0,
                    chunk_list_hash: [0u8; 32],
                },
                ItemV1 {
                    content_id: [3u8; 32],
                    size: 2_000,
                    name: "Fedora DVD".into(),
                    path: None,
                    mime: None,
                    tags: vec!["linux".into(), "fedora".into()],
                    chunk_count: 0,
                    chunk_list_hash: [0u8; 32],
                },
            ],
            recommended_shares: vec![],
            signature: None,
        };

        store
            .index_manifest_for_search(&manifest)
            .await
            .expect("index_manifest_for_search");

        // Load state and verify FTS5 items were persisted.
        let loaded = store.load_state().await.expect("load");
        let loaded_snapshot = loaded.search_index.expect("search_index should be Some");
        let loaded_items: Vec<_> = loaded_snapshot.items().collect();
        assert_eq!(loaded_items.len(), 2, "should load 2 FTS5 items");

        // Verify item content.
        let ubuntu = loaded_items
            .iter()
            .find(|i| i.name == "Ubuntu ISO")
            .expect("ubuntu item");
        assert_eq!(ubuntu.tags, vec!["linux", "ubuntu"]);
        assert_eq!(ubuntu.title.as_deref(), Some("Linux Downloads"));
        assert_eq!(
            ubuntu.description.as_deref(),
            Some("Open source Linux distributions")
        );

        let fedora = loaded_items
            .iter()
            .find(|i| i.name == "Fedora DVD")
            .expect("fedora item");
        assert_eq!(fedora.tags, vec!["linux", "fedora"]);
        // Manifest-level description is inherited by both items (FTS5 column).
        assert!(fedora.description.is_some());

        // Rebuild the search index and verify search works.
        let reloaded_index = SearchIndex::from_snapshot(loaded_snapshot);
        let mut subs = std::collections::HashSet::new();
        subs.insert(share_id);
        let hits = reloaded_index.search("ubuntu", &subs, &std::collections::HashMap::new());
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].0.name, "Ubuntu ISO");

        // Verify remove_share_from_search cleans up FTS5.
        store
            .remove_share_from_search(share_id)
            .await
            .expect("remove_share_from_search");
        let after_remove = store.load_state().await.expect("load after remove");
        assert!(
            after_remove.search_index.is_none(),
            "search_index should be None after removing the only share"
        );

        let _ = std::fs::remove_file(path);
    }
}