Skip to main content

scp2p_core/api/
mod.rs

1// Copyright (c) 2024-2026 Vanyo Vanev / Tech Art Ltd
2// SPDX-License-Identifier: MPL-2.0
3//
4// This Source Code Form is subject to the terms of the Mozilla Public
5// License, v. 2.0. If a copy of the MPL was not distributed with this
6// file, You can obtain one at https://mozilla.org/MPL/2.0/.
7mod helpers;
8mod node_dht;
9mod node_net;
10mod node_publish;
11mod node_relay;
12#[cfg(test)]
13mod tests;
14
15use helpers::*;
16
17use std::{
18    collections::{HashMap, HashSet},
19    net::SocketAddr,
20    path::PathBuf,
21    sync::Arc,
22    time::SystemTime,
23};
24
25use ed25519_dalek::{Signer, SigningKey, Verifier, VerifyingKey};
26use serde::{Deserialize, Serialize};
27use tokio::sync::RwLock;
28
29use crate::{
30    config::NodeConfig,
31    content::ChunkedContent,
32    dht::{DEFAULT_TTL_SECS, Dht},
33    dht_keys::share_head_key,
34    ids::{ContentId, ShareId},
35    manifest::{ManifestV1, PublicShareSummary, ShareHead, ShareKeypair, ShareVisibility},
36    net_fetch::RequestTransport,
37    peer::{PeerAddr, TransportProtocol},
38    peer_db::PeerDb,
39    relay::{RelayManager, RelayTunnelRegistry},
40    search::SearchIndex,
41    store::{
42        DirtyFlags, EncryptedSecret, MemoryStore, PersistedCommunity, PersistedPartialDownload,
43        PersistedPublisherIdentity, PersistedState, PersistedSubscription, Store,
44        decrypt_secret_with_key, encrypt_secret_with_key,
45    },
46    wire::{PexOffer, PexRequest},
47};
48
/// Plain-text search request.
///
/// Converts into a [`SearchPageQuery`] through `From`, which starts at
/// offset 0, uses the default page limit and disables snippets.
#[derive(Debug, Clone)]
pub struct SearchQuery {
    /// Query text.
    pub text: String,
}
53
/// Paged search request.
#[derive(Debug, Clone)]
pub struct SearchPageQuery {
    /// Query text.
    pub text: String,
    /// Requested starting offset into the result list.
    pub offset: usize,
    /// Requested page size; see [`SearchPageQuery::normalized`] for clamping.
    pub limit: usize,
    /// Whether the caller asked for snippets in the results.
    pub include_snippets: bool,
}

impl SearchPageQuery {
    /// Page size used when a query is built from a bare [`SearchQuery`].
    const DEFAULT_LIMIT: usize = 20;
    /// Largest page size `normalized` lets through.
    const MAX_LIMIT: usize = 200;

    /// Returns a copy of this query with `limit` forced into
    /// `1..=MAX_LIMIT`. All other fields are copied unchanged.
    fn normalized(&self) -> Self {
        let mut page = self.clone();
        page.limit = page.limit.clamp(1, Self::MAX_LIMIT);
        page
    }
}
75
76impl From<SearchQuery> for SearchPageQuery {
77    fn from(value: SearchQuery) -> Self {
78        Self {
79            text: value.text,
80            offset: 0,
81            limit: SearchPageQuery::DEFAULT_LIMIT,
82            include_snippets: false,
83        }
84    }
85}
86
/// Trust classification stored with each subscription.
///
/// Serialized with snake_case variant names. `Normal` is the `Default`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum SubscriptionTrustLevel {
    /// Accepted by `SearchTrustFilter::TrustedOnly`, `TrustedAndNormal` and `All`.
    Trusted,
    /// Default level; accepted by `SearchTrustFilter::NormalOnly`,
    /// `TrustedAndNormal` and `All`.
    #[default]
    Normal,
    /// Accepted only by `SearchTrustFilter::UntrustedOnly` and `All`.
    Untrusted,
}
95
/// Identifiers listed as blocked by one blocklist.
///
/// `NodeState` keeps one of these per blocklist share in
/// `blocklist_rules_by_share`. How the rules are enforced is not part of
/// this type.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BlocklistRules {
    /// Blocked share identifiers (32 bytes each).
    pub blocked_share_ids: Vec<[u8; 32]>,
    /// Blocked content identifiers (32 bytes each).
    pub blocked_content_ids: Vec<[u8; 32]>,
}
101
102// ── Community membership token (§4.2) ────────────────────────────────
103//
104// A membership token is issued by the **community publisher** (holder of
105// the community share signing key) to authorize a specific node's
106// membership.  The token is cryptographically bound to the community
107// share_id and member node pubkey:
108//
109//   token = { community_share_id, member_node_pubkey, issued_at,
110//             expires_at, signature }
111//
112// The signature covers the CBOR-canonical encoding of all fields except
113// `signature` itself, signed by the community's Ed25519 key.
114//
115// In v0.1 membership tokens are **optional** — nodes may still join
116// communities without a token for convenience.  Future protocol versions
117// will require a valid token for community-scoped operations.
118
/// A signed token authorizing `member_node_pubkey` as a member of the
/// community identified by `community_share_id`.
///
/// Issued by the community share publisher and verifiable by any
/// peer that knows the community's public key.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CommunityMembershipToken {
    /// Share ID of the community. `verify` requires this to equal the ID
    /// derived from the community public key it is given.
    pub community_share_id: [u8; 32],
    /// Public key of the node this token is bound to.
    pub member_node_pubkey: [u8; 32],
    /// Issuance timestamp supplied by the issuer. Covered by the signature;
    /// `verify` does not otherwise check it.
    pub issued_at: u64,
    /// Expiry timestamp. `verify` rejects the token when the supplied
    /// `now_unix` is strictly greater than this value.
    pub expires_at: u64,
    /// Ed25519 signature over the CBOR encoding of the four fields above.
    /// `verify` requires exactly 64 bytes.
    #[serde(with = "serde_bytes")]
    pub signature: Vec<u8>,
}
133
/// Signable portion of a membership token (all fields except `signature`).
///
/// Tuple positions, as filled in by `issue` and `verify`:
/// `(community_share_id, member_node_pubkey, issued_at, expires_at)`.
#[derive(Serialize)]
struct MembershipTokenSignable([u8; 32], [u8; 32], u64, u64);
137
138impl CommunityMembershipToken {
139    /// Issue a new community membership token.
140    ///
141    /// `community_signing_key` must be the Ed25519 signing key whose
142    /// verifying key derives the community `share_id`.
143    pub fn issue(
144        community_signing_key: &SigningKey,
145        member_node_pubkey: [u8; 32],
146        issued_at: u64,
147        expires_at: u64,
148    ) -> anyhow::Result<Self> {
149        let community_pubkey = community_signing_key.verifying_key().to_bytes();
150        let community_share_id =
151            ShareId::from_pubkey(&VerifyingKey::from_bytes(&community_pubkey)?).0;
152
153        let signable = MembershipTokenSignable(
154            community_share_id,
155            member_node_pubkey,
156            issued_at,
157            expires_at,
158        );
159        let sig = community_signing_key.sign(&crate::cbor::to_vec(&signable)?);
160
161        Ok(Self {
162            community_share_id,
163            member_node_pubkey,
164            issued_at,
165            expires_at,
166            signature: sig.to_bytes().to_vec(),
167        })
168    }
169
170    /// Verify this token against a community's public key and an
171    /// optional `now_unix` timestamp (for expiry checking).
172    pub fn verify(&self, community_pubkey: &[u8; 32], now_unix: Option<u64>) -> anyhow::Result<()> {
173        // Verify the share_id matches the pubkey.
174        let vk = VerifyingKey::from_bytes(community_pubkey)?;
175        let expected_id = ShareId::from_pubkey(&vk).0;
176        if expected_id != self.community_share_id {
177            anyhow::bail!("community_share_id does not match community_pubkey");
178        }
179
180        // Verify expiry.
181        if let Some(now) = now_unix
182            && now > self.expires_at
183        {
184            anyhow::bail!("community membership token expired");
185        }
186
187        // Verify signature.
188        if self.signature.len() != 64 {
189            anyhow::bail!("membership token signature must be 64 bytes");
190        }
191        let signable = MembershipTokenSignable(
192            self.community_share_id,
193            self.member_node_pubkey,
194            self.issued_at,
195            self.expires_at,
196        );
197        let mut sig_arr = [0u8; 64];
198        sig_arr.copy_from_slice(&self.signature);
199        vk.verify(
200            &crate::cbor::to_vec(&signable)?,
201            &ed25519_dalek::Signature::from_bytes(&sig_arr),
202        )?;
203        Ok(())
204    }
205}
206
/// Internal community membership record storing the pubkey and optional token.
///
/// Stored in `NodeState::communities`, keyed by the community share ID.
#[derive(Debug, Clone)]
struct CommunityMembership {
    /// Community share public key (persisted as `PersistedCommunity::share_pubkey`).
    pubkey: [u8; 32],
    /// Membership token, when one is held. On load, a persisted token that
    /// fails to decode is dropped and this is `None`.
    token: Option<CommunityMembershipToken>,
    /// Local human-readable label.
    name: Option<String>,
}
215
216#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
217pub enum SearchTrustFilter {
218    #[default]
219    TrustedAndNormal,
220    TrustedOnly,
221    NormalOnly,
222    UntrustedOnly,
223    All,
224}
225
226impl SearchTrustFilter {
227    fn allows(self, trust_level: SubscriptionTrustLevel) -> bool {
228        match self {
229            Self::TrustedAndNormal => {
230                matches!(
231                    trust_level,
232                    SubscriptionTrustLevel::Trusted | SubscriptionTrustLevel::Normal
233                )
234            }
235            Self::TrustedOnly => matches!(trust_level, SubscriptionTrustLevel::Trusted),
236            Self::NormalOnly => matches!(trust_level, SubscriptionTrustLevel::Normal),
237            Self::UntrustedOnly => matches!(trust_level, SubscriptionTrustLevel::Untrusted),
238            Self::All => true,
239        }
240    }
241}
242
/// One search hit.
#[derive(Debug, Clone)]
pub struct SearchResult {
    /// Share the hit belongs to.
    pub share_id: ShareId,
    /// Content identifier of the matched item.
    pub content_id: [u8; 32],
    /// Name of the matched item.
    pub name: String,
    /// Optional snippet text.
    // NOTE(review): presumably only set when the query has
    // `include_snippets = true` — confirm against the search code.
    pub snippet: Option<String>,
    /// Score assigned by the search code (scale and ordering are defined there).
    pub score: f32,
}
251
/// One page of search results.
#[derive(Debug, Clone)]
pub struct SearchPage {
    /// Result count reported alongside the page.
    // NOTE(review): presumably the total number of matches before
    // offset/limit are applied — confirm against the search code.
    pub total: usize,
    /// Results included in this page.
    pub results: Vec<SearchResult>,
}
257
/// Item metadata exposed to UIs for share browsing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShareItemInfo {
    /// Content identifier of the item.
    pub content_id: [u8; 32],
    /// Item size.
    // NOTE(review): presumably in bytes — confirm against the manifest item type.
    pub size: u64,
    /// Item name.
    pub name: String,
    /// Optional path associated with the item.
    pub path: Option<String>,
    /// Optional MIME type.
    pub mime: Option<String>,
}
267
/// Full record for a locally published share, including the signing secret.
/// Returned by [`NodeHandle::list_owned_shares`].
///
/// The derived `Debug` output includes `share_secret`; avoid logging
/// whole records.
#[derive(Debug, Clone)]
pub struct OwnedShareRecord {
    /// Share identifier.
    pub share_id: [u8; 32],
    /// Share public key.
    pub share_pubkey: [u8; 32],
    /// Raw Ed25519 signing-key bytes (32 bytes).  Keep this confidential.
    pub share_secret: [u8; 32],
    /// Latest sequence number recorded for the share.
    pub latest_seq: u64,
    /// Identifier of the share's manifest.
    pub manifest_id: [u8; 32],
    /// Optional share title.
    pub title: Option<String>,
    /// Optional share description.
    pub description: Option<String>,
    /// Visibility setting of the share.
    pub visibility: ShareVisibility,
    /// Number of items in the share.
    pub item_count: usize,
    /// Community share IDs associated with this share.
    pub community_ids: Vec<[u8; 32]>,
}
284
/// Cloneable handle to a running node.
///
/// Clones share the same [`NodeState`] because it is held behind
/// `Arc<RwLock<_>>`.
#[derive(Clone)]
pub struct NodeHandle {
    /// Shared node state, guarded by an async read/write lock.
    state: Arc<RwLock<NodeState>>,
    /// Shared registry of active relay tunnels.
    ///
    /// When this node acts as a relay, firewalled peers register slots
    /// here; incoming requests for those slots are forwarded through
    /// the tunnel to the firewalled node's persistent connection.
    pub(crate) relay_tunnels: RelayTunnelRegistry,
}
295
/// All mutable state of a node, held behind the lock in [`NodeHandle`].
///
/// Built by `from_persisted` and snapshotted by `to_persisted`. Fields that
/// `to_persisted` does not read (for example `relay`, `relay_scores`,
/// `abuse_counters`, `active_relay_slots`) start fresh on every load.
struct NodeState {
    /// Configuration the node was started with.
    runtime_config: NodeConfig,
    /// Subscriptions keyed by share ID.
    subscriptions: HashMap<[u8; 32], SubscriptionState>,
    /// Maps community share_id → (share_pubkey, optional membership token).
    communities: HashMap<[u8; 32], CommunityMembership>,
    /// Publisher label → plaintext 32-byte share secret held in memory.
    publisher_identities: HashMap<String, [u8; 32]>,
    /// Encrypted publisher identity secrets, populated by
    /// [`encrypt_publisher_identities`].  When present for a label,
    /// [`to_persisted`] writes the encrypted form and omits plaintext.
    encrypted_publisher_secrets: HashMap<String, EncryptedSecret>,
    /// Publisher secrets encrypted with the node identity key (fast blake3 path).
    /// Populated by `auto_protect_publisher_identities` and `ensure_publisher_identity`
    /// when `NodeConfig::auto_protect_publisher_keys` is true.
    node_key_encrypted_publisher_secrets: HashMap<String, EncryptedSecret>,
    /// Known peer records.
    peer_db: PeerDb,
    /// Local DHT store; seeded on load with the persisted share heads.
    dht: Dht,
    /// Manifests keyed by a 32-byte ID, loaded from the persisted state.
    manifest_cache: HashMap<[u8; 32], ManifestV1>,
    /// Share heads keyed by share ID.
    published_share_heads: HashMap<[u8; 32], ShareHead>,
    /// Search index: restored from a persisted snapshot or rebuilt from manifests.
    search_index: SearchIndex,
    /// Per-share weights keyed by share ID.
    share_weights: HashMap<[u8; 32], f32>,
    /// Content metadata keyed by content ID; filled from manifest items on load.
    content_catalog: HashMap<[u8; 32], ChunkedContent>,
    /// Maps content_id → file path for path-based seeding.
    /// Chunks are served directly from the file at this path.
    content_paths: HashMap<[u8; 32], PathBuf>,
    /// Relay manager; reset to its default on load.
    relay: RelayManager,
    /// Relay scores keyed by string; empty on load.
    relay_scores: HashMap<String, i32>,
    /// Cursor used for relay rotation; starts at 0 on load.
    relay_rotation_cursor: usize,
    /// Request counters keyed by the `relay_peer_key` of the remote peer.
    abuse_counters: HashMap<String, AbuseCounter>,
    /// Limits applied by `enforce_request_limits`.
    abuse_limits: AbuseLimits,
    /// Partial downloads keyed by a 32-byte ID; persisted across restarts.
    partial_downloads: HashMap<[u8; 32], PersistedPartialDownload>,
    /// Encrypted form of the node identity key, if stored.
    encrypted_node_key: Option<EncryptedSecret>,
    /// Plaintext node identity key (loaded from store).
    node_key: Option<[u8; 32]>,
    /// Share IDs of the blocklists that are enabled.
    enabled_blocklist_shares: HashSet<[u8; 32]>,
    /// Blocklist rules keyed by blocklist share ID.
    blocklist_rules_by_share: HashMap<[u8; 32], BlocklistRules>,
    /// TOFU-pinned Ed25519 public keys for bootstrap peers.
    pinned_bootstrap_keys: HashMap<String, [u8; 32]>,
    /// Active relay slot when this node is firewalled and using a relay.
    /// Supports multiple relays for redundancy.
    active_relay_slots: Vec<ActiveRelaySlot>,
    /// Backing store the state was loaded from.
    store: Arc<dyn Store>,
    /// Tracks which sections have been mutated since the last persist.
    dirty: DirtyFlags,
}
340
/// Tracks an active relay registration for a firewalled node.
///
/// Held in `NodeState::active_relay_slots`, which is empty after loading
/// state from the store.
#[derive(Debug, Clone)]
pub struct ActiveRelaySlot {
    /// The relay node we registered on.
    pub relay_addr: PeerAddr,
    /// The slot ID assigned by the relay.
    pub slot_id: u64,
    /// When the slot expires (unix secs).
    pub expires_at: u64,
}
351
/// In-memory state of one subscription.
///
/// Mirrors `PersistedSubscription` minus the share ID, which is the key of
/// `NodeState::subscriptions`.
#[derive(Debug, Clone)]
struct SubscriptionState {
    /// Public key of the subscribed share, when known.
    share_pubkey: Option<[u8; 32]>,
    /// Latest sequence number recorded for the share.
    latest_seq: u64,
    /// Manifest ID recorded with the latest sequence number, when known.
    latest_manifest_id: Option<[u8; 32]>,
    /// Trust level assigned to this subscription.
    trust_level: SubscriptionTrustLevel,
}
359
/// Per-peer request limits, counted over a fixed-length window.
#[derive(Debug, Clone)]
pub struct AbuseLimits {
    /// Window length in seconds.
    pub window_secs: u64,
    /// Cap on all counted requests in one window.
    pub max_total_requests_per_window: u32,
    /// Cap on DHT requests in one window.
    pub max_dht_requests_per_window: u32,
    /// Cap on fetch requests in one window.
    pub max_fetch_requests_per_window: u32,
    /// Cap on relay requests in one window.
    pub max_relay_requests_per_window: u32,
    /// Maximum chunk data requests per window.  Each chunk is up to
    /// `CHUNK_SIZE` (256 KiB), so this effectively caps transfer at
    /// `max_chunk_requests_per_window × 256 KiB` per window period.
    pub max_chunk_requests_per_window: u32,
}

impl Default for AbuseLimits {
    /// Default limits over a 60-second window.
    fn default() -> Self {
        const WINDOW_SECS: u64 = 60;
        const TOTAL: u32 = 1200;
        const DHT: u32 = 600;
        const FETCH: u32 = 240;
        const RELAY: u32 = 180;
        // 600 chunks × 256 KiB ≈ 150 MiB per minute per peer.
        const CHUNK: u32 = 600;

        Self {
            window_secs: WINDOW_SECS,
            max_total_requests_per_window: TOTAL,
            max_dht_requests_per_window: DHT,
            max_fetch_requests_per_window: FETCH,
            max_relay_requests_per_window: RELAY,
            max_chunk_requests_per_window: CHUNK,
        }
    }
}
386
/// Category of an incoming request, used by `enforce_request_limits` to
/// pick which per-class counter to increment.
#[derive(Debug, Clone, Copy)]
enum RequestClass {
    /// Counted against `max_dht_requests_per_window`.
    Dht,
    /// Counted against `max_fetch_requests_per_window`.
    Fetch,
    /// Counted against `max_relay_requests_per_window`.
    Relay,
    /// Chunk data transfer requests (`GetChunk`).
    Chunk,
    /// Uncategorized requests: added to the running total but never rejected
    /// by `enforce_request_limits`.
    Other,
}
396
/// Request counts for one remote peer within the current window.
///
/// `enforce_request_limits` replaces the whole counter with a zeroed one
/// once the window length has elapsed since `window_start_unix`.
#[derive(Debug, Clone)]
struct AbuseCounter {
    /// Unix time at which the current window started.
    window_start_unix: u64,
    /// All requests seen in this window, including `RequestClass::Other`.
    total: u32,
    /// DHT requests in this window.
    dht: u32,
    /// Fetch requests in this window.
    fetch: u32,
    /// Relay requests in this window.
    relay: u32,
    /// Chunk data requests in this window.
    chunk: u32,
}
407
408pub struct Node;
409
410impl Node {
411    pub async fn start(config: NodeConfig) -> anyhow::Result<NodeHandle> {
412        Self::start_with_store(config, MemoryStore::new()).await
413    }
414
415    pub async fn start_with_store(
416        config: NodeConfig,
417        store: Arc<dyn Store>,
418    ) -> anyhow::Result<NodeHandle> {
419        let persisted = store.load_state().await?;
420        let state = NodeState::from_persisted(config, persisted, store)?;
421        Ok(NodeHandle {
422            state: Arc::new(RwLock::new(state)),
423            relay_tunnels: RelayTunnelRegistry::new(),
424        })
425    }
426}
427
428impl NodeState {
429    fn from_persisted(
430        runtime_config: NodeConfig,
431        persisted: PersistedState,
432        store: Arc<dyn Store>,
433    ) -> anyhow::Result<Self> {
434        let PersistedState {
435            peers,
436            subscriptions,
437            communities,
438            publisher_identities: publisher_identities_raw,
439            manifests,
440            share_heads,
441            share_weights,
442            search_index,
443            partial_downloads,
444            node_key,
445            encrypted_node_key,
446            enabled_blocklist_shares,
447            blocklist_rules_by_share,
448            content_paths: persisted_content_paths,
449            pinned_bootstrap_keys,
450        } = persisted;
451        let mut peer_db = PeerDb::default();
452        peer_db.replace_records(peers);
453        let subscriptions = subscriptions
454            .into_iter()
455            .map(|sub| {
456                (
457                    sub.share_id,
458                    SubscriptionState {
459                        share_pubkey: sub.share_pubkey,
460                        latest_seq: sub.latest_seq,
461                        latest_manifest_id: sub.latest_manifest_id,
462                        trust_level: sub.trust_level,
463                    },
464                )
465            })
466            .collect::<HashMap<_, _>>();
467        let communities = communities
468            .into_iter()
469            .map(|community| {
470                let token = community.membership_token.as_ref().and_then(|bytes| {
471                    crate::cbor::from_slice::<CommunityMembershipToken>(bytes).ok()
472                });
473                (
474                    community.share_id,
475                    CommunityMembership {
476                        pubkey: community.share_pubkey,
477                        token,
478                        name: community.name,
479                    },
480                )
481            })
482            .collect::<HashMap<_, _>>();
483        // Build publisher_identities map: prefer plaintext; also auto-unlock those
484        // encrypted with the node key (fast blake3, no PBKDF2) when auto-protect is on.
485        let mut publisher_identities: HashMap<String, [u8; 32]> = publisher_identities_raw
486            .iter()
487            .filter_map(|identity| {
488                identity
489                    .share_secret
490                    .map(|secret| (identity.label.clone(), secret))
491            })
492            .collect();
493        let mut node_key_encrypted_publisher_secrets: HashMap<String, EncryptedSecret> =
494            HashMap::new();
495        if runtime_config.auto_protect_publisher_keys
496            && let Some(ref nk) = node_key
497        {
498            for identity in &publisher_identities_raw {
499                if let Some(enc) = &identity.node_key_encrypted_share_secret {
500                    // Re-populate encrypted map always (even when plaintext is available).
501                    node_key_encrypted_publisher_secrets
502                        .insert(identity.label.clone(), enc.clone());
503                    // Auto-unlock if we don't already have the plaintext.
504                    if !publisher_identities.contains_key(&identity.label)
505                        && let Ok(plain) = decrypt_secret_with_key(enc, nk)
506                        && plain.len() == 32
507                    {
508                        let mut arr = [0u8; 32];
509                        arr.copy_from_slice(&plain);
510                        publisher_identities.insert(identity.label.clone(), arr);
511                    }
512                }
513            }
514        }
515
516        // Prune content_paths for files that no longer exist on disk.
517        let content_paths: HashMap<[u8; 32], PathBuf> = persisted_content_paths
518            .into_iter()
519            .filter(|(_, path)| path.exists())
520            .collect();
521
522        let mut rebuilt_search_index = SearchIndex::default();
523        let mut content_catalog = HashMap::new();
524        for manifest in manifests.values() {
525            rebuilt_search_index.index_manifest(manifest);
526            for item in &manifest.items {
527                // Register content metadata without reading files from disk.
528                // Chunk hashes are computed lazily on first request via
529                // `chunk_hash_list()`, avoiding O(total-content-size) startup
530                // cost.
531                content_catalog.insert(
532                    item.content_id,
533                    ChunkedContent {
534                        content_id: ContentId(item.content_id),
535                        chunks: vec![],
536                        chunk_count: item.chunk_count,
537                        chunk_list_hash: item.chunk_list_hash,
538                    },
539                );
540            }
541        }
542        let search_index = search_index
543            .map(SearchIndex::from_snapshot)
544            .unwrap_or(rebuilt_search_index);
545
546        let now = SystemTime::now()
547            .duration_since(SystemTime::UNIX_EPOCH)
548            .map(|d| d.as_secs())
549            .unwrap_or(0);
550        let mut dht = Dht::default();
551        for (share_id, head) in &share_heads {
552            if let Ok(encoded) = crate::cbor::to_vec(head) {
553                let _ = dht.store(
554                    share_head_key(&ShareId(*share_id)),
555                    encoded,
556                    DEFAULT_TTL_SECS,
557                    now,
558                );
559            }
560        }
561
562        Ok(Self {
563            runtime_config,
564            subscriptions,
565            communities,
566            publisher_identities,
567            encrypted_publisher_secrets: HashMap::new(),
568            node_key_encrypted_publisher_secrets,
569            peer_db,
570            dht,
571            manifest_cache: manifests,
572            published_share_heads: share_heads,
573            search_index,
574            share_weights,
575            content_catalog,
576            content_paths,
577            relay: RelayManager::default(),
578            relay_scores: HashMap::new(),
579            relay_rotation_cursor: 0,
580            abuse_counters: HashMap::new(),
581            abuse_limits: AbuseLimits::default(),
582            partial_downloads,
583            encrypted_node_key,
584            node_key,
585            enabled_blocklist_shares: enabled_blocklist_shares.into_iter().collect(),
586            blocklist_rules_by_share,
587            pinned_bootstrap_keys,
588            active_relay_slots: Vec::new(),
589            store,
590            dirty: DirtyFlags::default(),
591        })
592    }
593
594    fn to_persisted(&self) -> PersistedState {
595        let subscriptions = self
596            .subscriptions
597            .iter()
598            .map(|(share_id, sub)| PersistedSubscription {
599                share_id: *share_id,
600                share_pubkey: sub.share_pubkey,
601                latest_seq: sub.latest_seq,
602                latest_manifest_id: sub.latest_manifest_id,
603                trust_level: sub.trust_level,
604            })
605            .collect();
606        let communities = self
607            .communities
608            .iter()
609            .map(|(share_id, membership)| PersistedCommunity {
610                share_id: *share_id,
611                share_pubkey: membership.pubkey,
612                membership_token: membership
613                    .token
614                    .as_ref()
615                    .and_then(|t| crate::cbor::to_vec(t).ok()),
616                name: membership.name.clone(),
617            })
618            .collect();
619        let publisher_identities = self
620            .publisher_identities
621            .iter()
622            .map(|(label, share_secret)| {
623                if let Some(encrypted) = self.encrypted_publisher_secrets.get(label) {
624                    // Persist only the user-passphrase encrypted form – omit plaintext.
625                    PersistedPublisherIdentity {
626                        label: label.clone(),
627                        share_secret: None,
628                        encrypted_share_secret: Some(encrypted.clone()),
629                        node_key_encrypted_share_secret: self
630                            .node_key_encrypted_publisher_secrets
631                            .get(label)
632                            .cloned(),
633                    }
634                } else {
635                    PersistedPublisherIdentity {
636                        label: label.clone(),
637                        // Omit plaintext when a node-key-encrypted form is available.
638                        share_secret: if self
639                            .node_key_encrypted_publisher_secrets
640                            .contains_key(label)
641                        {
642                            None
643                        } else {
644                            Some(*share_secret)
645                        },
646                        encrypted_share_secret: None,
647                        node_key_encrypted_share_secret: self
648                            .node_key_encrypted_publisher_secrets
649                            .get(label)
650                            .cloned(),
651                    }
652                }
653            })
654            .collect();
655        PersistedState {
656            peers: self.peer_db.all_records(),
657            subscriptions,
658            communities,
659            publisher_identities,
660            manifests: self.manifest_cache.clone(),
661            share_heads: self.published_share_heads.clone(),
662            share_weights: self.share_weights.clone(),
663            search_index: None,
664            partial_downloads: self.partial_downloads.clone(),
665            node_key: self.node_key,
666            encrypted_node_key: self.encrypted_node_key.clone(),
667            enabled_blocklist_shares: self.enabled_blocklist_shares.iter().copied().collect(),
668            blocklist_rules_by_share: self.blocklist_rules_by_share.clone(),
669            content_paths: self.content_paths.clone(),
670            pinned_bootstrap_keys: self.pinned_bootstrap_keys.clone(),
671        }
672    }
673
674    fn enforce_request_limits(
675        &mut self,
676        remote_peer: &PeerAddr,
677        class: RequestClass,
678        now_unix: u64,
679    ) -> anyhow::Result<()> {
680        let key = relay_peer_key(remote_peer);
681        let window = self.abuse_limits.window_secs.max(1);
682        let counter = self
683            .abuse_counters
684            .entry(key)
685            .or_insert_with(|| AbuseCounter {
686                window_start_unix: now_unix,
687                total: 0,
688                dht: 0,
689                fetch: 0,
690                relay: 0,
691                chunk: 0,
692            });
693
694        if now_unix.saturating_sub(counter.window_start_unix) >= window {
695            *counter = AbuseCounter {
696                window_start_unix: now_unix,
697                total: 0,
698                dht: 0,
699                fetch: 0,
700                relay: 0,
701                chunk: 0,
702            };
703        }
704
705        counter.total = counter.total.saturating_add(1);
706        match class {
707            RequestClass::Dht => counter.dht = counter.dht.saturating_add(1),
708            RequestClass::Fetch => counter.fetch = counter.fetch.saturating_add(1),
709            RequestClass::Relay => counter.relay = counter.relay.saturating_add(1),
710            RequestClass::Chunk => counter.chunk = counter.chunk.saturating_add(1),
711            // Truly uncategorized requests are still exempt.
712            RequestClass::Other => return Ok(()),
713        }
714
715        if counter.total > self.abuse_limits.max_total_requests_per_window {
716            anyhow::bail!("request rate limit exceeded");
717        }
718        if counter.dht > self.abuse_limits.max_dht_requests_per_window {
719            anyhow::bail!("dht request rate limit exceeded");
720        }
721        if counter.fetch > self.abuse_limits.max_fetch_requests_per_window {
722            anyhow::bail!("fetch request rate limit exceeded");
723        }
724        if counter.relay > self.abuse_limits.max_relay_requests_per_window {
725            anyhow::bail!("relay request rate limit exceeded");
726        }
727        if counter.chunk > self.abuse_limits.max_chunk_requests_per_window {
728            anyhow::bail!("chunk request rate limit exceeded");
729        }
730        Ok(())
731    }
732}
733
734impl NodeHandle {
735    pub async fn runtime_config(&self) -> NodeConfig {
736        self.state.read().await.runtime_config.clone()
737    }
738
739    pub async fn configured_bootstrap_peers(&self) -> anyhow::Result<Vec<PeerAddr>> {
740        let state = self.state.read().await;
741        let config = state.runtime_config.clone();
742        let pinned = &state.pinned_bootstrap_keys;
743        let mut peers = config
744            .bootstrap_peers
745            .iter()
746            .map(|entry| {
747                let (transport, addr_part) = if let Some(rest) = entry.strip_prefix("quic://") {
748                    (TransportProtocol::Quic, rest)
749                } else if let Some(rest) = entry.strip_prefix("tcp://") {
750                    (TransportProtocol::Tcp, rest)
751                } else {
752                    (TransportProtocol::Tcp, entry.as_str())
753                };
754
755                // Parse optional pubkey suffix: "ip:port@<64-hex-chars>"
756                let (socket_str, explicit_pubkey) = if let Some(at_pos) = addr_part.rfind('@') {
757                    let hex_str = &addr_part[at_pos + 1..];
758                    if hex_str.len() == 64 {
759                        let mut key = [0u8; 32];
760                        let ok = hex_str.as_bytes().chunks(2).enumerate().all(|(i, pair)| {
761                            let hi = hex_nibble(pair[0]);
762                            let lo = hex_nibble(pair[1]);
763                            if let (Some(h), Some(l)) = (hi, lo) {
764                                key[i] = (h << 4) | l;
765                                true
766                            } else {
767                                false
768                            }
769                        });
770                        if ok {
771                            (&addr_part[..at_pos], Some(key))
772                        } else {
773                            (addr_part, None)
774                        }
775                    } else {
776                        (addr_part, None)
777                    }
778                } else {
779                    (addr_part, None)
780                };
781
782                let socket: SocketAddr = socket_str.parse()?;
783
784                // Use explicit key if present, else fall back to TOFU pinned key.
785                let pubkey_hint = explicit_pubkey.or_else(|| pinned.get(entry).copied());
786
787                Ok(PeerAddr {
788                    ip: socket.ip(),
789                    port: socket.port(),
790                    transport,
791                    pubkey_hint,
792                    relay_via: None,
793                })
794            })
795            .collect::<anyhow::Result<Vec<_>>>()?;
796
797        // For each bootstrap peer, also add the alternate transport variant
798        // (TCP ↔ QUIC) so DHT operations can fall back if one protocol is
799        // blocked by the network.  Convention: TCP port = QUIC port + 1.
800        let mut extra = Vec::new();
801        for peer in &peers {
802            let alt = match peer.transport {
803                TransportProtocol::Quic => PeerAddr {
804                    port: peer.port.saturating_add(1),
805                    transport: TransportProtocol::Tcp,
806                    ..peer.clone()
807                },
808                TransportProtocol::Tcp => PeerAddr {
809                    port: peer.port.saturating_sub(1),
810                    transport: TransportProtocol::Quic,
811                    ..peer.clone()
812                },
813            };
814            if !peers
815                .iter()
816                .any(|p| p.ip == alt.ip && p.port == alt.port && p.transport == alt.transport)
817            {
818                extra.push(alt);
819            }
820        }
821        peers.extend(extra);
822
823        // Sort TCP peers before QUIC so that operations succeed quickly
824        // when QUIC is blocked (common behind NAT / firewalls).  The QUIC
825        // variants remain in the list for fallback.
826        peers.sort_by_key(|p| match p.transport {
827            TransportProtocol::Tcp => 0,
828            TransportProtocol::Quic => 1,
829        });
830        Ok(peers)
831    }
832
833    /// Record a TOFU-pinned public key for a bootstrap peer.
834    ///
835    /// If the peer address already has a pinned key, verifies that the
836    /// new key matches; returns an error on mismatch (identity change).
837    /// If no key is pinned yet, stores it (first-seen trust).
838    pub async fn pin_bootstrap_key(
839        &self,
840        peer_addr_str: &str,
841        observed_pubkey: [u8; 32],
842    ) -> anyhow::Result<()> {
843        let mut state = self.state.write().await;
844        if let Some(existing) = state.pinned_bootstrap_keys.get(peer_addr_str) {
845            if *existing != observed_pubkey {
846                anyhow::bail!(
847                    "bootstrap peer identity mismatch for {peer_addr_str}: \
848                     pinned key differs from observed key"
849                );
850            }
851            // Key matches — nothing to do.
852            return Ok(());
853        }
854        state
855            .pinned_bootstrap_keys
856            .insert(peer_addr_str.to_string(), observed_pubkey);
857        state.dirty.peers = true;
858        Ok(())
859    }
860
861    /// Return the TOFU-pinned public key for a bootstrap peer, if any.
862    pub async fn pinned_bootstrap_key(&self, peer_addr_str: &str) -> Option<[u8; 32]> {
863        self.state
864            .read()
865            .await
866            .pinned_bootstrap_keys
867            .get(peer_addr_str)
868            .copied()
869    }
870
871    pub async fn peer_records(&self) -> Vec<crate::peer_db::PeerRecord> {
872        self.state.read().await.peer_db.all_records()
873    }
874
875    /// Record the outcome of a transfer interaction with a peer.
876    ///
877    /// Calls [`PeerDb::note_outcome`] to update the persistent reputation
878    /// score: `+1` for a successful interaction, `-2` for a failure.
879    /// The score is clamped to `[-10, 10]` and survives node restarts.
880    ///
881    /// This method is a no-op if the peer is not yet in the database.
882    pub async fn note_peer_outcome(&self, addr: &PeerAddr, success: bool) -> anyhow::Result<()> {
883        {
884            let mut state = self.state.write().await;
885            state.peer_db.note_outcome(addr, success);
886            state.dirty.peers = true;
887        }
888        helpers::persist_state(self).await
889    }
890
891    pub async fn subscriptions(&self) -> Vec<PersistedSubscription> {
892        let state = self.state.read().await;
893        state
894            .subscriptions
895            .iter()
896            .map(|(share_id, sub)| PersistedSubscription {
897                share_id: *share_id,
898                share_pubkey: sub.share_pubkey,
899                latest_seq: sub.latest_seq,
900                latest_manifest_id: sub.latest_manifest_id,
901                trust_level: sub.trust_level,
902            })
903            .collect()
904    }
905
906    /// Return the cached manifest title and description for a given manifest ID.
907    pub async fn cached_manifest_meta(
908        &self,
909        manifest_id: &[u8; 32],
910    ) -> (Option<String>, Option<String>) {
911        let state = self.state.read().await;
912        match state.manifest_cache.get(manifest_id) {
913            Some(m) => (m.title.clone(), m.description.clone()),
914            None => (None, None),
915        }
916    }
917
918    pub async fn communities(&self) -> Vec<PersistedCommunity> {
919        let state = self.state.read().await;
920        state
921            .communities
922            .iter()
923            .map(|(share_id, membership)| PersistedCommunity {
924                share_id: *share_id,
925                share_pubkey: membership.pubkey,
926                membership_token: membership
927                    .token
928                    .as_ref()
929                    .and_then(|t| crate::cbor::to_vec(t).ok()),
930                name: membership.name.clone(),
931            })
932            .collect()
933    }
934
935    pub async fn ensure_publisher_identity(&self, label: &str) -> anyhow::Result<ShareKeypair> {
936        let label = label.trim();
937        if label.is_empty() {
938            anyhow::bail!("publisher label must not be empty");
939        }
940
941        let mut needs_persist = false;
942        let secret = {
943            let mut state = self.state.write().await;
944            match state.publisher_identities.get(label).copied() {
945                Some(secret) => secret,
946                None => {
947                    let mut rng = rand::rngs::OsRng;
948                    let secret = SigningKey::generate(&mut rng).to_bytes();
949                    state.publisher_identities.insert(label.to_string(), secret);
950                    // Auto-protect with the node key when enabled.
951                    if state.runtime_config.auto_protect_publisher_keys
952                        && let Some(nk) = state.node_key
953                        && let Ok(enc) = encrypt_secret_with_key(secret.as_ref(), &nk)
954                    {
955                        state
956                            .node_key_encrypted_publisher_secrets
957                            .insert(label.to_string(), enc);
958                    }
959                    state.dirty.publisher_identities = true;
960                    needs_persist = true;
961                    secret
962                }
963            }
964        };
965        if needs_persist {
966            persist_state(self).await?;
967        }
968        Ok(ShareKeypair::new(SigningKey::from_bytes(&secret)))
969    }
970
971    /// Encrypt all existing plaintext publisher identities with the node key.
972    ///
973    /// Safe to call repeatedly; already-encrypted identities are skipped.
974    /// No-ops if `auto_protect_publisher_keys` is disabled or no node key is set.
975    pub async fn auto_protect_publisher_identities(&self) -> anyhow::Result<()> {
976        let changed = {
977            let mut state = self.state.write().await;
978            if !state.runtime_config.auto_protect_publisher_keys {
979                return Ok(());
980            }
981            let Some(nk) = state.node_key else {
982                return Ok(());
983            };
984            let mut changed = false;
985            let labels: Vec<_> = state.publisher_identities.keys().cloned().collect();
986            for label in labels {
987                if state
988                    .node_key_encrypted_publisher_secrets
989                    .contains_key(&label)
990                {
991                    continue;
992                }
993                let secret = state.publisher_identities[&label];
994                if let Ok(enc) = encrypt_secret_with_key(secret.as_ref(), &nk) {
995                    state
996                        .node_key_encrypted_publisher_secrets
997                        .insert(label, enc);
998                    changed = true;
999                }
1000            }
1001            if changed {
1002                state.dirty.publisher_identities = true;
1003            }
1004            changed
1005        };
1006        if changed {
1007            persist_state(self).await?;
1008        }
1009        Ok(())
1010    }
1011
1012    /// Return a stable Ed25519 node identity keypair.
1013    ///
1014    /// On first call the key is generated, persisted to the store, and
1015    /// returned.  Subsequent calls return the same key.  This avoids
1016    /// the previous pattern of generating a fresh ephemeral keypair on
1017    /// every node start, which made the node un-addressable by pubkey
1018    /// across restarts.
1019    pub async fn ensure_node_identity(&self) -> anyhow::Result<SigningKey> {
1020        let mut needs_persist = false;
1021        let secret = {
1022            let mut state = self.state.write().await;
1023            match state.node_key {
1024                Some(key) => key,
1025                None => {
1026                    let mut rng = rand::rngs::OsRng;
1027                    let key = SigningKey::generate(&mut rng).to_bytes();
1028                    state.node_key = Some(key);
1029                    state.dirty.node_key = true;
1030                    needs_persist = true;
1031                    key
1032                }
1033            }
1034        };
1035        if needs_persist {
1036            persist_state(self).await?;
1037        }
1038        Ok(SigningKey::from_bytes(&secret))
1039    }
1040
1041    pub async fn list_local_public_shares(
1042        &self,
1043        max_entries: usize,
1044    ) -> anyhow::Result<Vec<PublicShareSummary>> {
1045        let state = self.state.read().await;
1046        let mut heads = state
1047            .published_share_heads
1048            .values()
1049            .cloned()
1050            .collect::<Vec<_>>();
1051        heads.sort_by(|a, b| {
1052            b.updated_at
1053                .cmp(&a.updated_at)
1054                .then(b.latest_seq.cmp(&a.latest_seq))
1055                .then(a.share_id.cmp(&b.share_id))
1056        });
1057
1058        let mut shares = Vec::new();
1059        for head in heads {
1060            let Some(manifest) = state.manifest_cache.get(&head.latest_manifest_id) else {
1061                continue;
1062            };
1063            if manifest.visibility != ShareVisibility::Public {
1064                continue;
1065            }
1066            shares.push(PublicShareSummary {
1067                share_id: manifest.share_id,
1068                share_pubkey: manifest.share_pubkey,
1069                latest_seq: head.latest_seq,
1070                latest_manifest_id: head.latest_manifest_id,
1071                title: manifest.title.clone(),
1072                description: manifest.description.clone(),
1073            });
1074            if shares.len() >= max_entries {
1075                break;
1076            }
1077        }
1078        Ok(shares)
1079    }
1080
1081    pub async fn published_share_head(&self, share_id: ShareId) -> Option<ShareHead> {
1082        self.state
1083            .read()
1084            .await
1085            .published_share_heads
1086            .get(&share_id.0)
1087            .cloned()
1088    }
1089
1090    /// Return all publisher identities that have a current published share head,
1091    /// together with the signing secret so the caller can display the share keys.
1092    pub async fn list_owned_shares(&self) -> Vec<OwnedShareRecord> {
1093        let state = self.state.read().await;
1094        let mut records = Vec::new();
1095        for secret in state.publisher_identities.values() {
1096            let signing_key = SigningKey::from_bytes(secret);
1097            let verifying_key = signing_key.verifying_key();
1098            let share_id = ShareId::from_pubkey(&verifying_key);
1099            let Some(head) = state.published_share_heads.get(&share_id.0) else {
1100                continue;
1101            };
1102            let Some(manifest) = state.manifest_cache.get(&head.latest_manifest_id) else {
1103                continue;
1104            };
1105            records.push(OwnedShareRecord {
1106                share_id: share_id.0,
1107                share_pubkey: verifying_key.to_bytes(),
1108                share_secret: *secret,
1109                latest_seq: head.latest_seq,
1110                manifest_id: head.latest_manifest_id,
1111                title: manifest.title.clone(),
1112                description: manifest.description.clone(),
1113                visibility: manifest.visibility,
1114                item_count: manifest.items.len(),
1115                community_ids: manifest.communities.clone(),
1116            });
1117        }
1118        records
1119    }
1120
1121    /// Remove the published share head (and its manifest) for the given `share_id`.
1122    /// The publisher identity key is retained so the share can be re-published later.
1123    pub async fn delete_published_share(&self, share_id: ShareId) -> anyhow::Result<()> {
1124        let store = {
1125            let mut state = self.state.write().await;
1126            if let Some(head) = state.published_share_heads.remove(&share_id.0) {
1127                state.manifest_cache.remove(&head.latest_manifest_id);
1128            }
1129            state.dirty.manifests = true;
1130            state.dirty.share_heads = true;
1131            state.store.clone()
1132        };
1133        store.remove_share_from_search(share_id.0).await?;
1134        persist_state(self).await
1135    }
1136
1137    /// Re-publish the share with a new visibility setting, bumping the sequence number.
1138    pub async fn update_share_visibility(
1139        &self,
1140        share_id: ShareId,
1141        new_visibility: ShareVisibility,
1142    ) -> anyhow::Result<()> {
1143        let (manifest, keypair) = {
1144            let state = self.state.read().await;
1145            let head = state
1146                .published_share_heads
1147                .get(&share_id.0)
1148                .ok_or_else(|| anyhow::anyhow!("share not found in published heads"))?
1149                .clone();
1150            let manifest = state
1151                .manifest_cache
1152                .get(&head.latest_manifest_id)
1153                .ok_or_else(|| anyhow::anyhow!("manifest not found in cache"))?
1154                .clone();
1155            // Find the matching signing key among known publisher identities.
1156            let keypair = state
1157                .publisher_identities
1158                .values()
1159                .find_map(|secret| {
1160                    let sk = SigningKey::from_bytes(secret);
1161                    let vk = sk.verifying_key();
1162                    if ShareId::from_pubkey(&vk).0 == share_id.0 {
1163                        Some(ShareKeypair::new(sk))
1164                    } else {
1165                        None
1166                    }
1167                })
1168                .ok_or_else(|| anyhow::anyhow!("no signing key for share"))?;
1169            (manifest, keypair)
1170        };
1171        let mut new_manifest = manifest;
1172        new_manifest.seq = new_manifest.seq.saturating_add(1);
1173        new_manifest.visibility = new_visibility;
1174        new_manifest.signature = None;
1175        self.publish_share(new_manifest, &keypair).await?;
1176        Ok(())
1177    }
1178
1179    pub async fn list_local_community_public_shares(
1180        &self,
1181        community_share_id: ShareId,
1182        community_share_pubkey: [u8; 32],
1183        max_entries: usize,
1184        requester_node_pubkey: Option<[u8; 32]>,
1185        requester_membership_proof: Option<&[u8]>,
1186    ) -> anyhow::Result<Vec<PublicShareSummary>> {
1187        let pubkey = VerifyingKey::from_bytes(&community_share_pubkey)?;
1188        if ShareId::from_pubkey(&pubkey) != community_share_id {
1189            anyhow::bail!("community share_id does not match share_pubkey");
1190        }
1191
1192        let state = self.state.read().await;
1193        if !state.communities.contains_key(&community_share_id.0) {
1194            return Ok(Vec::new());
1195        }
1196
1197        // AV-06: enforce strict membership proof when enabled.
1198        if state.runtime_config.community_strict_mode {
1199            let rpk = requester_node_pubkey.ok_or_else(|| {
1200                anyhow::anyhow!("community strict mode: requester_node_pubkey required")
1201            })?;
1202            let proof_bytes = requester_membership_proof.ok_or_else(|| {
1203                anyhow::anyhow!("community strict mode: membership proof required")
1204            })?;
1205            let token: CommunityMembershipToken =
1206                crate::cbor::from_slice(proof_bytes).map_err(|_| {
1207                    anyhow::anyhow!("community strict mode: invalid membership proof encoding")
1208                })?;
1209            if token.member_node_pubkey != rpk {
1210                anyhow::bail!(
1211                    "community strict mode: proof member_node_pubkey does not match requester"
1212                );
1213            }
1214            let now = now_unix_secs()?;
1215            token.verify(&community_share_pubkey, Some(now))?;
1216        }
1217
1218        let mut heads = state
1219            .published_share_heads
1220            .values()
1221            .cloned()
1222            .collect::<Vec<_>>();
1223        heads.sort_by(|a, b| {
1224            b.updated_at
1225                .cmp(&a.updated_at)
1226                .then(b.latest_seq.cmp(&a.latest_seq))
1227                .then(a.share_id.cmp(&b.share_id))
1228        });
1229        let mut shares = Vec::new();
1230        for head in heads {
1231            let Some(manifest) = state.manifest_cache.get(&head.latest_manifest_id) else {
1232                continue;
1233            };
1234            if manifest.visibility != ShareVisibility::Public {
1235                continue;
1236            }
1237            if !manifest.communities.contains(&community_share_id.0) {
1238                continue;
1239            }
1240            shares.push(PublicShareSummary {
1241                share_id: manifest.share_id,
1242                share_pubkey: manifest.share_pubkey,
1243                latest_seq: head.latest_seq,
1244                latest_manifest_id: head.latest_manifest_id,
1245                title: manifest.title.clone(),
1246                description: manifest.description.clone(),
1247            });
1248            if shares.len() >= max_entries {
1249                break;
1250            }
1251        }
1252        Ok(shares)
1253    }
1254
1255    pub async fn fetch_public_shares_from_peer<T: RequestTransport + ?Sized>(
1256        &self,
1257        transport: &T,
1258        peer: &PeerAddr,
1259        max_entries: u16,
1260    ) -> anyhow::Result<Vec<PublicShareSummary>> {
1261        query_public_shares(transport, peer, max_entries).await
1262    }
1263
1264    pub async fn fetch_community_status_from_peer<T: RequestTransport + ?Sized>(
1265        &self,
1266        transport: &T,
1267        peer: &PeerAddr,
1268        share_id: ShareId,
1269        share_pubkey: [u8; 32],
1270    ) -> anyhow::Result<(bool, Option<String>)> {
1271        let result = query_community_status(transport, peer, share_id, share_pubkey).await?;
1272        Ok((result.joined, result.name))
1273    }
1274
1275    pub async fn fetch_community_public_shares_from_peer<T: RequestTransport + ?Sized>(
1276        &self,
1277        transport: &T,
1278        peer: &PeerAddr,
1279        community_share_id: ShareId,
1280        community_share_pubkey: [u8; 32],
1281        max_entries: u16,
1282    ) -> anyhow::Result<Vec<PublicShareSummary>> {
1283        // Build requester identity fields to support strict-mode servers.
1284        let (requester_node_pubkey, requester_membership_proof) = {
1285            let state = self.state.read().await;
1286            let node_pubkey = state
1287                .node_key
1288                .map(|k| SigningKey::from_bytes(&k).verifying_key().to_bytes());
1289            let proof = state
1290                .communities
1291                .get(&community_share_id.0)
1292                .and_then(|m| m.token.as_ref())
1293                .and_then(|t| crate::cbor::to_vec(t).ok());
1294            (node_pubkey, proof)
1295        };
1296        query_community_public_shares(
1297            transport,
1298            peer,
1299            community_share_id,
1300            community_share_pubkey,
1301            max_entries,
1302            requester_node_pubkey,
1303            requester_membership_proof,
1304        )
1305        .await
1306    }
1307
1308    pub async fn connect(&self, peer_addr: PeerAddr) -> anyhow::Result<()> {
1309        self.record_peer_seen(peer_addr).await
1310    }
1311
1312    pub async fn record_peer_seen(&self, peer_addr: PeerAddr) -> anyhow::Result<()> {
1313        {
1314            let mut state = self.state.write().await;
1315            state.peer_db.upsert_seen(peer_addr, now_unix_secs()?);
1316            state.dirty.peers = true;
1317        }
1318        persist_state(self).await
1319    }
1320
1321    /// Record that a peer was seen with specific capabilities.
1322    ///
1323    /// Call after a successful handshake to persist the remote peer's
1324    /// capabilities for future relay selection and capability-aware
1325    /// decisions.
1326    pub async fn record_peer_seen_with_capabilities(
1327        &self,
1328        peer_addr: PeerAddr,
1329        capabilities: crate::Capabilities,
1330    ) -> anyhow::Result<()> {
1331        {
1332            let mut state = self.state.write().await;
1333            state
1334                .peer_db
1335                .upsert_seen_with_capabilities(peer_addr, now_unix_secs()?, capabilities);
1336            state.dirty.peers = true;
1337        }
1338        persist_state(self).await
1339    }
1340
1341    pub async fn apply_pex_offer(&self, offer: PexOffer) -> anyhow::Result<usize> {
1342        let count = {
1343            let mut state = self.state.write().await;
1344            let now = now_unix_secs()?;
1345            for addr in offer.peers {
1346                state.peer_db.upsert_seen(addr, now);
1347            }
1348            state.dirty.peers = true;
1349            state.peer_db.total_known_peers()
1350        };
1351        persist_state(self).await?;
1352        Ok(count)
1353    }
1354
1355    pub async fn build_pex_offer(&self, req: PexRequest) -> anyhow::Result<PexOffer> {
1356        let state = self.state.read().await;
1357        let peers = state
1358            .peer_db
1359            .sample_fresh(now_unix_secs()?, usize::from(req.max_peers));
1360        Ok(PexOffer { peers })
1361    }
1362    pub async fn set_share_weight(&self, share_id: ShareId, weight: f32) -> anyhow::Result<()> {
1363        {
1364            let mut state = self.state.write().await;
1365            state.share_weights.insert(share_id.0, weight.max(0.0));
1366            state.dirty.share_weights = true;
1367        }
1368        persist_state(self).await
1369    }
1370
1371    pub async fn subscribe(&self, share_id: ShareId) -> anyhow::Result<()> {
1372        self.subscribe_with_options(share_id, None, SubscriptionTrustLevel::Normal)
1373            .await
1374    }
1375
1376    pub async fn join_community(
1377        &self,
1378        share_id: ShareId,
1379        share_pubkey: [u8; 32],
1380    ) -> anyhow::Result<()> {
1381        self.join_community_with_options(share_id, share_pubkey, None, None)
1382            .await
1383    }
1384
1385    /// Join a community with a human-readable name.
1386    pub async fn join_community_named(
1387        &self,
1388        share_id: ShareId,
1389        share_pubkey: [u8; 32],
1390        name: &str,
1391    ) -> anyhow::Result<()> {
1392        self.join_community_with_options(share_id, share_pubkey, None, Some(name.to_owned()))
1393            .await
1394    }
1395
1396    /// Join a community with an optional membership token.
1397    ///
1398    /// When a `CommunityMembershipToken` is provided, it is verified
1399    /// against `share_pubkey` before being stored.  In v0.1, tokens
1400    /// are optional; community membership without a token is
1401    /// self-asserted.
1402    pub async fn join_community_with_token(
1403        &self,
1404        share_id: ShareId,
1405        share_pubkey: [u8; 32],
1406        token: Option<CommunityMembershipToken>,
1407    ) -> anyhow::Result<()> {
1408        self.join_community_with_options(share_id, share_pubkey, token, None)
1409            .await
1410    }
1411
1412    /// Join a community with optional token and optional name.
1413    pub async fn join_community_with_options(
1414        &self,
1415        share_id: ShareId,
1416        share_pubkey: [u8; 32],
1417        token: Option<CommunityMembershipToken>,
1418        name: Option<String>,
1419    ) -> anyhow::Result<()> {
1420        let pubkey = VerifyingKey::from_bytes(&share_pubkey)?;
1421        let derived = ShareId::from_pubkey(&pubkey);
1422        if derived != share_id {
1423            anyhow::bail!("community share_id does not match share_pubkey");
1424        }
1425        // If a token is provided, verify it before storing.
1426        if let Some(ref tok) = token {
1427            if tok.community_share_id != share_id.0 {
1428                anyhow::bail!("membership token community_share_id mismatch");
1429            }
1430            tok.verify(&share_pubkey, None)?;
1431        }
1432        let mut state = self.state.write().await;
1433        state.communities.insert(
1434            share_id.0,
1435            CommunityMembership {
1436                pubkey: share_pubkey,
1437                token,
1438                name,
1439            },
1440        );
1441        state.dirty.communities = true;
1442        drop(state);
1443        persist_state(self).await
1444    }
1445
1446    pub async fn leave_community(&self, share_id: ShareId) -> anyhow::Result<()> {
1447        {
1448            let mut state = self.state.write().await;
1449            state.communities.remove(&share_id.0);
1450            state.dirty.communities = true;
1451        }
1452        persist_state(self).await
1453    }
1454
1455    /// Update the locally stored name for a community.
1456    ///
1457    /// Called when a remote peer reports the community name during browse.
1458    pub async fn update_community_name(&self, share_id: ShareId, name: &str) -> anyhow::Result<()> {
1459        let mut state = self.state.write().await;
1460        if let Some(membership) = state.communities.get_mut(&share_id.0)
1461            && membership.name.is_none()
1462        {
1463            membership.name = Some(name.to_owned());
1464            state.dirty.communities = true;
1465        }
1466        drop(state);
1467        persist_state(self).await
1468    }
1469
1470    pub async fn subscribe_with_pubkey(
1471        &self,
1472        share_id: ShareId,
1473        share_pubkey: Option<[u8; 32]>,
1474    ) -> anyhow::Result<()> {
1475        self.subscribe_with_options(share_id, share_pubkey, SubscriptionTrustLevel::Normal)
1476            .await
1477    }
1478
1479    pub async fn subscribe_with_trust(
1480        &self,
1481        share_id: ShareId,
1482        share_pubkey: Option<[u8; 32]>,
1483        trust_level: SubscriptionTrustLevel,
1484    ) -> anyhow::Result<()> {
1485        self.subscribe_with_options(share_id, share_pubkey, trust_level)
1486            .await
1487    }
1488
1489    pub async fn set_subscription_trust_level(
1490        &self,
1491        share_id: ShareId,
1492        trust_level: SubscriptionTrustLevel,
1493    ) -> anyhow::Result<()> {
1494        {
1495            let mut state = self.state.write().await;
1496            let Some(sub) = state.subscriptions.get_mut(&share_id.0) else {
1497                anyhow::bail!("subscription not found");
1498            };
1499            sub.trust_level = trust_level;
1500            state.dirty.subscriptions = true;
1501        }
1502        persist_state(self).await
1503    }
1504
1505    pub async fn set_blocklist_rules(
1506        &self,
1507        blocklist_share_id: ShareId,
1508        rules: BlocklistRules,
1509    ) -> anyhow::Result<()> {
1510        {
1511            let mut state = self.state.write().await;
1512            state
1513                .blocklist_rules_by_share
1514                .insert(blocklist_share_id.0, rules);
1515            state.dirty.blocklist = true;
1516        }
1517        persist_state(self).await
1518    }
1519
1520    pub async fn clear_blocklist_rules(&self, blocklist_share_id: ShareId) -> anyhow::Result<()> {
1521        {
1522            let mut state = self.state.write().await;
1523            state.blocklist_rules_by_share.remove(&blocklist_share_id.0);
1524            state.enabled_blocklist_shares.remove(&blocklist_share_id.0);
1525            state.dirty.blocklist = true;
1526        }
1527        persist_state(self).await
1528    }
1529
1530    pub async fn enable_blocklist_share(&self, blocklist_share_id: ShareId) -> anyhow::Result<()> {
1531        {
1532            let mut state = self.state.write().await;
1533            if !state.subscriptions.contains_key(&blocklist_share_id.0) {
1534                anyhow::bail!("blocklist share must be subscribed before enabling");
1535            }
1536            state.enabled_blocklist_shares.insert(blocklist_share_id.0);
1537            state.dirty.blocklist = true;
1538        }
1539        persist_state(self).await
1540    }
1541
1542    pub async fn disable_blocklist_share(&self, blocklist_share_id: ShareId) -> anyhow::Result<()> {
1543        {
1544            let mut state = self.state.write().await;
1545            state.enabled_blocklist_shares.remove(&blocklist_share_id.0);
1546            state.dirty.blocklist = true;
1547        }
1548        persist_state(self).await
1549    }
1550
1551    async fn subscribe_with_options(
1552        &self,
1553        share_id: ShareId,
1554        share_pubkey: Option<[u8; 32]>,
1555        trust_level: SubscriptionTrustLevel,
1556    ) -> anyhow::Result<()> {
1557        {
1558            let mut state = self.state.write().await;
1559            // Enforce subscription cap to prevent unbounded RAM/sync/persist growth.
1560            // Re-subscribing to an already-subscribed share does not count toward the cap.
1561            if !state.subscriptions.contains_key(&share_id.0)
1562                && state.subscriptions.len() >= state.runtime_config.max_subscriptions
1563            {
1564                anyhow::bail!(
1565                    "subscription limit of {} reached; unsubscribe from another share first",
1566                    state.runtime_config.max_subscriptions
1567                );
1568            }
1569            state
1570                .subscriptions
1571                .entry(share_id.0)
1572                .or_insert(SubscriptionState {
1573                    share_pubkey,
1574                    latest_seq: 0,
1575                    latest_manifest_id: None,
1576                    trust_level,
1577                });
1578            state.dirty.subscriptions = true;
1579        }
1580        persist_state(self).await
1581    }
1582
1583    pub async fn unsubscribe(&self, share_id: ShareId) -> anyhow::Result<()> {
1584        let store = {
1585            let mut state = self.state.write().await;
1586            state.subscriptions.remove(&share_id.0);
1587            state.search_index.remove_share(share_id.0);
1588            state.enabled_blocklist_shares.remove(&share_id.0);
1589            state.blocklist_rules_by_share.remove(&share_id.0);
1590            state.dirty.subscriptions = true;
1591            state.dirty.blocklist = true;
1592            state.store.clone()
1593        };
1594        store.remove_share_from_search(share_id.0).await?;
1595        persist_state(self).await
1596    }
1597}