1mod helpers;
8mod node_dht;
9mod node_net;
10mod node_publish;
11mod node_relay;
12#[cfg(test)]
13mod tests;
14
15use helpers::*;
16
17use std::{
18 collections::{HashMap, HashSet},
19 net::SocketAddr,
20 path::PathBuf,
21 sync::Arc,
22 time::SystemTime,
23};
24
25use ed25519_dalek::{Signer, SigningKey, Verifier, VerifyingKey};
26use serde::{Deserialize, Serialize};
27use tokio::sync::RwLock;
28
29use crate::{
30 config::NodeConfig,
31 content::ChunkedContent,
32 dht::{DEFAULT_TTL_SECS, Dht},
33 dht_keys::share_head_key,
34 ids::{ContentId, ShareId},
35 manifest::{ManifestV1, PublicShareSummary, ShareHead, ShareKeypair, ShareVisibility},
36 net_fetch::RequestTransport,
37 peer::{PeerAddr, TransportProtocol},
38 peer_db::PeerDb,
39 relay::{RelayManager, RelayTunnelRegistry},
40 search::SearchIndex,
41 store::{
42 DirtyFlags, EncryptedSecret, MemoryStore, PersistedCommunity, PersistedPartialDownload,
43 PersistedPublisherIdentity, PersistedState, PersistedSubscription, Store,
44 decrypt_secret_with_key, encrypt_secret_with_key,
45 },
46 wire::{PexOffer, PexRequest},
47};
48
/// A plain text search request without paging options.
///
/// Can be converted into a [`SearchPageQuery`] through the `From` impl in this
/// file, which fills in offset 0, the default page limit, and no snippets.
#[derive(Debug, Clone)]
pub struct SearchQuery {
    /// Raw query text.
    pub text: String,
}
53
/// A search request with paging controls.
#[derive(Debug, Clone)]
pub struct SearchPageQuery {
    /// Raw query text.
    pub text: String,
    /// Number of leading results to skip.
    pub offset: usize,
    /// Requested page size; `normalized` clamps it to `1..=MAX_LIMIT`.
    pub limit: usize,
    /// Whether the caller wants snippets included in the results.
    pub include_snippets: bool,
}

impl SearchPageQuery {
    /// Page size used when converting from a plain `SearchQuery`.
    const DEFAULT_LIMIT: usize = 20;
    /// Largest page size `normalized` will let through.
    const MAX_LIMIT: usize = 200;

    /// Returns a copy of this query whose `limit` is forced into
    /// `1..=MAX_LIMIT`; every other field is carried over unchanged.
    fn normalized(&self) -> Self {
        let mut page = self.clone();
        page.limit = page.limit.clamp(1, Self::MAX_LIMIT);
        page
    }
}
75
76impl From<SearchQuery> for SearchPageQuery {
77 fn from(value: SearchQuery) -> Self {
78 Self {
79 text: value.text,
80 offset: 0,
81 limit: SearchPageQuery::DEFAULT_LIMIT,
82 include_snippets: false,
83 }
84 }
85}
86
/// Trust classification attached to a subscription.
///
/// Serialized with snake_case variant names; `Normal` is the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum SubscriptionTrustLevel {
    Trusted,
    #[default]
    Normal,
    Untrusted,
}
95
/// Blocklist entries: 32-byte share ids and 32-byte content ids.
///
/// The default value has both lists empty.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BlocklistRules {
    /// Share ids that are blocked.
    pub blocked_share_ids: Vec<[u8; 32]>,
    /// Content ids that are blocked.
    pub blocked_content_ids: Vec<[u8; 32]>,
}
101
/// A signed statement that a node is a member of a community.
///
/// Produced by `issue` and checked by `verify`; the signature covers the
/// first four fields.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CommunityMembershipToken {
    /// Share id derived from the community's public key.
    pub community_share_id: [u8; 32],
    /// Public key of the member node this token was issued to.
    pub member_node_pubkey: [u8; 32],
    /// Issue time supplied by the issuer (compared against Unix seconds in `verify`).
    pub issued_at: u64,
    /// Expiry time; `verify` rejects the token once `now > expires_at`.
    pub expires_at: u64,
    /// Signature bytes; `verify` requires exactly 64 bytes.
    #[serde(with = "serde_bytes")]
    pub signature: Vec<u8>,
}
133
/// The tuple that is CBOR-encoded and signed for a membership token, in the
/// order `(community_share_id, member_node_pubkey, issued_at, expires_at)`.
#[derive(Serialize)]
struct MembershipTokenSignable([u8; 32], [u8; 32], u64, u64);
137
138impl CommunityMembershipToken {
139 pub fn issue(
144 community_signing_key: &SigningKey,
145 member_node_pubkey: [u8; 32],
146 issued_at: u64,
147 expires_at: u64,
148 ) -> anyhow::Result<Self> {
149 let community_pubkey = community_signing_key.verifying_key().to_bytes();
150 let community_share_id =
151 ShareId::from_pubkey(&VerifyingKey::from_bytes(&community_pubkey)?).0;
152
153 let signable = MembershipTokenSignable(
154 community_share_id,
155 member_node_pubkey,
156 issued_at,
157 expires_at,
158 );
159 let sig = community_signing_key.sign(&crate::cbor::to_vec(&signable)?);
160
161 Ok(Self {
162 community_share_id,
163 member_node_pubkey,
164 issued_at,
165 expires_at,
166 signature: sig.to_bytes().to_vec(),
167 })
168 }
169
170 pub fn verify(&self, community_pubkey: &[u8; 32], now_unix: Option<u64>) -> anyhow::Result<()> {
173 let vk = VerifyingKey::from_bytes(community_pubkey)?;
175 let expected_id = ShareId::from_pubkey(&vk).0;
176 if expected_id != self.community_share_id {
177 anyhow::bail!("community_share_id does not match community_pubkey");
178 }
179
180 if let Some(now) = now_unix
182 && now > self.expires_at
183 {
184 anyhow::bail!("community membership token expired");
185 }
186
187 if self.signature.len() != 64 {
189 anyhow::bail!("membership token signature must be 64 bytes");
190 }
191 let signable = MembershipTokenSignable(
192 self.community_share_id,
193 self.member_node_pubkey,
194 self.issued_at,
195 self.expires_at,
196 );
197 let mut sig_arr = [0u8; 64];
198 sig_arr.copy_from_slice(&self.signature);
199 vk.verify(
200 &crate::cbor::to_vec(&signable)?,
201 &ed25519_dalek::Signature::from_bytes(&sig_arr),
202 )?;
203 Ok(())
204 }
205}
206
/// In-memory record of a community known to this node.
#[derive(Debug, Clone)]
struct CommunityMembership {
    /// The community's share public key.
    pubkey: [u8; 32],
    /// Membership token, if one is held and could be decoded when loaded.
    token: Option<CommunityMembershipToken>,
    /// Optional community name.
    name: Option<String>,
}
215
/// Selects which subscription trust levels a search may draw results from.
///
/// The default is `TrustedAndNormal`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SearchTrustFilter {
    /// Allow `Trusted` and `Normal`, exclude `Untrusted`.
    #[default]
    TrustedAndNormal,
    /// Allow only `Trusted`.
    TrustedOnly,
    /// Allow only `Normal`.
    NormalOnly,
    /// Allow only `Untrusted`.
    UntrustedOnly,
    /// Allow every trust level.
    All,
}
225
226impl SearchTrustFilter {
227 fn allows(self, trust_level: SubscriptionTrustLevel) -> bool {
228 match self {
229 Self::TrustedAndNormal => {
230 matches!(
231 trust_level,
232 SubscriptionTrustLevel::Trusted | SubscriptionTrustLevel::Normal
233 )
234 }
235 Self::TrustedOnly => matches!(trust_level, SubscriptionTrustLevel::Trusted),
236 Self::NormalOnly => matches!(trust_level, SubscriptionTrustLevel::Normal),
237 Self::UntrustedOnly => matches!(trust_level, SubscriptionTrustLevel::Untrusted),
238 Self::All => true,
239 }
240 }
241}
242
/// A single search hit.
#[derive(Debug, Clone)]
pub struct SearchResult {
    /// Share the hit belongs to.
    pub share_id: ShareId,
    /// 32-byte content id of the matching item.
    pub content_id: [u8; 32],
    /// Item name.
    pub name: String,
    /// Optional snippet text for the hit.
    pub snippet: Option<String>,
    /// Relevance score.
    // NOTE(review): presumably higher means more relevant — confirm against the ranking code.
    pub score: f32,
}
251
/// One page of search results.
#[derive(Debug, Clone)]
pub struct SearchPage {
    /// Result count reported alongside the page.
    // NOTE(review): presumably the total number of matches before paging — confirm.
    pub total: usize,
    /// Results contained in this page.
    pub results: Vec<SearchResult>,
}
257
/// Serializable description of one item in a share.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShareItemInfo {
    /// 32-byte content id of the item.
    pub content_id: [u8; 32],
    /// Item size.
    // NOTE(review): presumably in bytes — confirm.
    pub size: u64,
    /// Item name.
    pub name: String,
    /// Optional path of the item.
    pub path: Option<String>,
    /// Optional MIME type.
    pub mime: Option<String>,
}
267
/// Summary of a published share for which this node holds the signing secret.
///
/// Built by `NodeHandle::list_owned_shares` from the publisher secret, the
/// published share head, and the cached manifest.
///
/// NOTE(review): this type derives `Debug` and contains `share_secret`, so
/// formatting it with `{:?}` prints the secret key material.
#[derive(Debug, Clone)]
pub struct OwnedShareRecord {
    /// Share id derived from the share public key.
    pub share_id: [u8; 32],
    /// Public (verifying) key of the share.
    pub share_pubkey: [u8; 32],
    /// Secret signing key bytes of the share.
    pub share_secret: [u8; 32],
    /// Latest sequence number from the published share head.
    pub latest_seq: u64,
    /// Id of the latest manifest from the published share head.
    pub manifest_id: [u8; 32],
    /// Manifest title, if any.
    pub title: Option<String>,
    /// Manifest description, if any.
    pub description: Option<String>,
    /// Manifest visibility.
    pub visibility: ShareVisibility,
    /// Number of items in the manifest.
    pub item_count: usize,
    /// Community ids listed in the manifest.
    pub community_ids: Vec<[u8; 32]>,
}
284
/// Cloneable handle to a running node.
///
/// Clones share the same `NodeState` through the `Arc<RwLock<_>>`.
#[derive(Clone)]
pub struct NodeHandle {
    /// Shared, lock-protected node state.
    state: Arc<RwLock<NodeState>>,
    /// Registry of relay tunnels; created fresh in `Node::start_with_store`.
    pub(crate) relay_tunnels: RelayTunnelRegistry,
}
295
/// All mutable state of a node, guarded by the lock inside `NodeHandle`.
///
/// Built by `NodeState::from_persisted` and written back through
/// `NodeState::to_persisted`.
struct NodeState {
    /// Configuration the node was started with.
    runtime_config: NodeConfig,
    /// Subscriptions keyed by share id.
    subscriptions: HashMap<[u8; 32], SubscriptionState>,
    /// Known communities keyed by community share id.
    communities: HashMap<[u8; 32], CommunityMembership>,
    /// Publisher label -> plaintext 32-byte share secret.
    publisher_identities: HashMap<String, [u8; 32]>,
    /// Publisher label -> encrypted share secret. Starts empty on load; when an
    /// entry exists, `to_persisted` writes it and omits the plaintext secret.
    encrypted_publisher_secrets: HashMap<String, EncryptedSecret>,
    /// Publisher label -> share secret encrypted with the node key.
    node_key_encrypted_publisher_secrets: HashMap<String, EncryptedSecret>,
    /// Known peers.
    peer_db: PeerDb,
    /// Local DHT; seeded on load with the persisted share heads.
    dht: Dht,
    /// Manifests keyed by manifest id.
    manifest_cache: HashMap<[u8; 32], ManifestV1>,
    /// Published share heads keyed by share id.
    published_share_heads: HashMap<[u8; 32], ShareHead>,
    /// Search index over cached manifests.
    search_index: SearchIndex,
    /// Per-share weights.
    // NOTE(review): presumably keyed by share id — confirm.
    share_weights: HashMap<[u8; 32], f32>,
    /// Chunk metadata keyed by content id.
    content_catalog: HashMap<[u8; 32], ChunkedContent>,
    /// Local file paths; entries whose path no longer exists are dropped on load.
    // NOTE(review): presumably keyed by content id — confirm.
    content_paths: HashMap<[u8; 32], PathBuf>,
    /// Relay manager; reset to default on load.
    relay: RelayManager,
    /// Relay scores; start empty on load.
    relay_scores: HashMap<String, i32>,
    /// Relay rotation cursor; starts at 0 on load.
    relay_rotation_cursor: usize,
    /// Per-peer request counters keyed by `relay_peer_key(peer)`.
    abuse_counters: HashMap<String, AbuseCounter>,
    /// Request limits enforced by `enforce_request_limits`.
    abuse_limits: AbuseLimits,
    /// Persisted partial downloads.
    partial_downloads: HashMap<[u8; 32], PersistedPartialDownload>,
    /// Encrypted form of the node key, if any.
    encrypted_node_key: Option<EncryptedSecret>,
    /// Plaintext node key bytes, if available.
    node_key: Option<[u8; 32]>,
    /// Shares whose blocklists are enabled.
    enabled_blocklist_shares: HashSet<[u8; 32]>,
    /// Blocklist rules keyed by share.
    blocklist_rules_by_share: HashMap<[u8; 32], BlocklistRules>,
    /// Pinned bootstrap peer public keys keyed by peer address string.
    pinned_bootstrap_keys: HashMap<String, [u8; 32]>,
    /// Currently held relay slots; starts empty and is not part of `to_persisted`.
    active_relay_slots: Vec<ActiveRelaySlot>,
    /// Backing store used for persistence.
    store: Arc<dyn Store>,
    /// Flags marking which parts of the state changed since the last persist.
    dirty: DirtyFlags,
}
340
/// A relay slot currently held by this node.
#[derive(Debug, Clone)]
pub struct ActiveRelaySlot {
    /// Address of the relay that granted the slot.
    pub relay_addr: PeerAddr,
    /// Identifier of the slot.
    pub slot_id: u64,
    /// Expiry of the slot.
    // NOTE(review): presumably Unix seconds, like other timestamps in this file — confirm.
    pub expires_at: u64,
}
351
/// In-memory state of one subscription; mirrors `PersistedSubscription`
/// without the share id, which is the key in `NodeState::subscriptions`.
#[derive(Debug, Clone)]
struct SubscriptionState {
    /// Share public key, if known.
    share_pubkey: Option<[u8; 32]>,
    /// Latest sequence number recorded for the share.
    latest_seq: u64,
    /// Id of the latest manifest recorded for the share, if any.
    latest_manifest_id: Option<[u8; 32]>,
    /// Trust level assigned to the subscription.
    trust_level: SubscriptionTrustLevel,
}
359
/// Per-peer request rate limits, applied per time window.
#[derive(Debug, Clone)]
pub struct AbuseLimits {
    /// Length of one counting window, in seconds.
    pub window_secs: u64,
    /// Maximum requests of any class per window.
    pub max_total_requests_per_window: u32,
    /// Maximum DHT requests per window.
    pub max_dht_requests_per_window: u32,
    /// Maximum fetch requests per window.
    pub max_fetch_requests_per_window: u32,
    /// Maximum relay requests per window.
    pub max_relay_requests_per_window: u32,
    /// Maximum chunk requests per window.
    pub max_chunk_requests_per_window: u32,
}

impl Default for AbuseLimits {
    /// Default limits: a 60-second window with 1200 total, 600 DHT, 240 fetch,
    /// 180 relay, and 600 chunk requests.
    fn default() -> Self {
        const WINDOW_SECS: u64 = 60;
        const TOTAL: u32 = 1200;
        const DHT: u32 = 600;
        const FETCH: u32 = 240;
        const RELAY: u32 = 180;
        const CHUNK: u32 = 600;

        Self {
            window_secs: WINDOW_SECS,
            max_total_requests_per_window: TOTAL,
            max_dht_requests_per_window: DHT,
            max_fetch_requests_per_window: FETCH,
            max_relay_requests_per_window: RELAY,
            max_chunk_requests_per_window: CHUNK,
        }
    }
}
386
/// Category of an incoming request, used by `enforce_request_limits` to pick
/// which per-class counter to increment.
#[derive(Debug, Clone, Copy)]
enum RequestClass {
    Dht,
    Fetch,
    Relay,
    Chunk,
    /// Any request without a dedicated per-class counter.
    Other,
}
396
/// Request counters for one peer within the current rate-limit window.
#[derive(Debug, Clone)]
struct AbuseCounter {
    /// Unix time (seconds) at which the current window started.
    window_start_unix: u64,
    /// Requests of any class seen in this window.
    total: u32,
    /// DHT requests seen in this window.
    dht: u32,
    /// Fetch requests seen in this window.
    fetch: u32,
    /// Relay requests seen in this window.
    relay: u32,
    /// Chunk requests seen in this window.
    chunk: u32,
}
407
/// Namespace type for starting a node; see `Node::start` and
/// `Node::start_with_store`, which return a `NodeHandle`.
pub struct Node;
409
410impl Node {
411 pub async fn start(config: NodeConfig) -> anyhow::Result<NodeHandle> {
412 Self::start_with_store(config, MemoryStore::new()).await
413 }
414
415 pub async fn start_with_store(
416 config: NodeConfig,
417 store: Arc<dyn Store>,
418 ) -> anyhow::Result<NodeHandle> {
419 let persisted = store.load_state().await?;
420 let state = NodeState::from_persisted(config, persisted, store)?;
421 Ok(NodeHandle {
422 state: Arc::new(RwLock::new(state)),
423 relay_tunnels: RelayTunnelRegistry::new(),
424 })
425 }
426}
427
428impl NodeState {
429 fn from_persisted(
430 runtime_config: NodeConfig,
431 persisted: PersistedState,
432 store: Arc<dyn Store>,
433 ) -> anyhow::Result<Self> {
434 let PersistedState {
435 peers,
436 subscriptions,
437 communities,
438 publisher_identities: publisher_identities_raw,
439 manifests,
440 share_heads,
441 share_weights,
442 search_index,
443 partial_downloads,
444 node_key,
445 encrypted_node_key,
446 enabled_blocklist_shares,
447 blocklist_rules_by_share,
448 content_paths: persisted_content_paths,
449 pinned_bootstrap_keys,
450 } = persisted;
451 let mut peer_db = PeerDb::default();
452 peer_db.replace_records(peers);
453 let subscriptions = subscriptions
454 .into_iter()
455 .map(|sub| {
456 (
457 sub.share_id,
458 SubscriptionState {
459 share_pubkey: sub.share_pubkey,
460 latest_seq: sub.latest_seq,
461 latest_manifest_id: sub.latest_manifest_id,
462 trust_level: sub.trust_level,
463 },
464 )
465 })
466 .collect::<HashMap<_, _>>();
467 let communities = communities
468 .into_iter()
469 .map(|community| {
470 let token = community.membership_token.as_ref().and_then(|bytes| {
471 crate::cbor::from_slice::<CommunityMembershipToken>(bytes).ok()
472 });
473 (
474 community.share_id,
475 CommunityMembership {
476 pubkey: community.share_pubkey,
477 token,
478 name: community.name,
479 },
480 )
481 })
482 .collect::<HashMap<_, _>>();
483 let mut publisher_identities: HashMap<String, [u8; 32]> = publisher_identities_raw
486 .iter()
487 .filter_map(|identity| {
488 identity
489 .share_secret
490 .map(|secret| (identity.label.clone(), secret))
491 })
492 .collect();
493 let mut node_key_encrypted_publisher_secrets: HashMap<String, EncryptedSecret> =
494 HashMap::new();
495 if runtime_config.auto_protect_publisher_keys
496 && let Some(ref nk) = node_key
497 {
498 for identity in &publisher_identities_raw {
499 if let Some(enc) = &identity.node_key_encrypted_share_secret {
500 node_key_encrypted_publisher_secrets
502 .insert(identity.label.clone(), enc.clone());
503 if !publisher_identities.contains_key(&identity.label)
505 && let Ok(plain) = decrypt_secret_with_key(enc, nk)
506 && plain.len() == 32
507 {
508 let mut arr = [0u8; 32];
509 arr.copy_from_slice(&plain);
510 publisher_identities.insert(identity.label.clone(), arr);
511 }
512 }
513 }
514 }
515
516 let content_paths: HashMap<[u8; 32], PathBuf> = persisted_content_paths
518 .into_iter()
519 .filter(|(_, path)| path.exists())
520 .collect();
521
522 let mut rebuilt_search_index = SearchIndex::default();
523 let mut content_catalog = HashMap::new();
524 for manifest in manifests.values() {
525 rebuilt_search_index.index_manifest(manifest);
526 for item in &manifest.items {
527 content_catalog.insert(
532 item.content_id,
533 ChunkedContent {
534 content_id: ContentId(item.content_id),
535 chunks: vec![],
536 chunk_count: item.chunk_count,
537 chunk_list_hash: item.chunk_list_hash,
538 },
539 );
540 }
541 }
542 let search_index = search_index
543 .map(SearchIndex::from_snapshot)
544 .unwrap_or(rebuilt_search_index);
545
546 let now = SystemTime::now()
547 .duration_since(SystemTime::UNIX_EPOCH)
548 .map(|d| d.as_secs())
549 .unwrap_or(0);
550 let mut dht = Dht::default();
551 for (share_id, head) in &share_heads {
552 if let Ok(encoded) = crate::cbor::to_vec(head) {
553 let _ = dht.store(
554 share_head_key(&ShareId(*share_id)),
555 encoded,
556 DEFAULT_TTL_SECS,
557 now,
558 );
559 }
560 }
561
562 Ok(Self {
563 runtime_config,
564 subscriptions,
565 communities,
566 publisher_identities,
567 encrypted_publisher_secrets: HashMap::new(),
568 node_key_encrypted_publisher_secrets,
569 peer_db,
570 dht,
571 manifest_cache: manifests,
572 published_share_heads: share_heads,
573 search_index,
574 share_weights,
575 content_catalog,
576 content_paths,
577 relay: RelayManager::default(),
578 relay_scores: HashMap::new(),
579 relay_rotation_cursor: 0,
580 abuse_counters: HashMap::new(),
581 abuse_limits: AbuseLimits::default(),
582 partial_downloads,
583 encrypted_node_key,
584 node_key,
585 enabled_blocklist_shares: enabled_blocklist_shares.into_iter().collect(),
586 blocklist_rules_by_share,
587 pinned_bootstrap_keys,
588 active_relay_slots: Vec::new(),
589 store,
590 dirty: DirtyFlags::default(),
591 })
592 }
593
594 fn to_persisted(&self) -> PersistedState {
595 let subscriptions = self
596 .subscriptions
597 .iter()
598 .map(|(share_id, sub)| PersistedSubscription {
599 share_id: *share_id,
600 share_pubkey: sub.share_pubkey,
601 latest_seq: sub.latest_seq,
602 latest_manifest_id: sub.latest_manifest_id,
603 trust_level: sub.trust_level,
604 })
605 .collect();
606 let communities = self
607 .communities
608 .iter()
609 .map(|(share_id, membership)| PersistedCommunity {
610 share_id: *share_id,
611 share_pubkey: membership.pubkey,
612 membership_token: membership
613 .token
614 .as_ref()
615 .and_then(|t| crate::cbor::to_vec(t).ok()),
616 name: membership.name.clone(),
617 })
618 .collect();
619 let publisher_identities = self
620 .publisher_identities
621 .iter()
622 .map(|(label, share_secret)| {
623 if let Some(encrypted) = self.encrypted_publisher_secrets.get(label) {
624 PersistedPublisherIdentity {
626 label: label.clone(),
627 share_secret: None,
628 encrypted_share_secret: Some(encrypted.clone()),
629 node_key_encrypted_share_secret: self
630 .node_key_encrypted_publisher_secrets
631 .get(label)
632 .cloned(),
633 }
634 } else {
635 PersistedPublisherIdentity {
636 label: label.clone(),
637 share_secret: if self
639 .node_key_encrypted_publisher_secrets
640 .contains_key(label)
641 {
642 None
643 } else {
644 Some(*share_secret)
645 },
646 encrypted_share_secret: None,
647 node_key_encrypted_share_secret: self
648 .node_key_encrypted_publisher_secrets
649 .get(label)
650 .cloned(),
651 }
652 }
653 })
654 .collect();
655 PersistedState {
656 peers: self.peer_db.all_records(),
657 subscriptions,
658 communities,
659 publisher_identities,
660 manifests: self.manifest_cache.clone(),
661 share_heads: self.published_share_heads.clone(),
662 share_weights: self.share_weights.clone(),
663 search_index: None,
664 partial_downloads: self.partial_downloads.clone(),
665 node_key: self.node_key,
666 encrypted_node_key: self.encrypted_node_key.clone(),
667 enabled_blocklist_shares: self.enabled_blocklist_shares.iter().copied().collect(),
668 blocklist_rules_by_share: self.blocklist_rules_by_share.clone(),
669 content_paths: self.content_paths.clone(),
670 pinned_bootstrap_keys: self.pinned_bootstrap_keys.clone(),
671 }
672 }
673
674 fn enforce_request_limits(
675 &mut self,
676 remote_peer: &PeerAddr,
677 class: RequestClass,
678 now_unix: u64,
679 ) -> anyhow::Result<()> {
680 let key = relay_peer_key(remote_peer);
681 let window = self.abuse_limits.window_secs.max(1);
682 let counter = self
683 .abuse_counters
684 .entry(key)
685 .or_insert_with(|| AbuseCounter {
686 window_start_unix: now_unix,
687 total: 0,
688 dht: 0,
689 fetch: 0,
690 relay: 0,
691 chunk: 0,
692 });
693
694 if now_unix.saturating_sub(counter.window_start_unix) >= window {
695 *counter = AbuseCounter {
696 window_start_unix: now_unix,
697 total: 0,
698 dht: 0,
699 fetch: 0,
700 relay: 0,
701 chunk: 0,
702 };
703 }
704
705 counter.total = counter.total.saturating_add(1);
706 match class {
707 RequestClass::Dht => counter.dht = counter.dht.saturating_add(1),
708 RequestClass::Fetch => counter.fetch = counter.fetch.saturating_add(1),
709 RequestClass::Relay => counter.relay = counter.relay.saturating_add(1),
710 RequestClass::Chunk => counter.chunk = counter.chunk.saturating_add(1),
711 RequestClass::Other => return Ok(()),
713 }
714
715 if counter.total > self.abuse_limits.max_total_requests_per_window {
716 anyhow::bail!("request rate limit exceeded");
717 }
718 if counter.dht > self.abuse_limits.max_dht_requests_per_window {
719 anyhow::bail!("dht request rate limit exceeded");
720 }
721 if counter.fetch > self.abuse_limits.max_fetch_requests_per_window {
722 anyhow::bail!("fetch request rate limit exceeded");
723 }
724 if counter.relay > self.abuse_limits.max_relay_requests_per_window {
725 anyhow::bail!("relay request rate limit exceeded");
726 }
727 if counter.chunk > self.abuse_limits.max_chunk_requests_per_window {
728 anyhow::bail!("chunk request rate limit exceeded");
729 }
730 Ok(())
731 }
732}
733
734impl NodeHandle {
735 pub async fn runtime_config(&self) -> NodeConfig {
736 self.state.read().await.runtime_config.clone()
737 }
738
739 pub async fn configured_bootstrap_peers(&self) -> anyhow::Result<Vec<PeerAddr>> {
740 let state = self.state.read().await;
741 let config = state.runtime_config.clone();
742 let pinned = &state.pinned_bootstrap_keys;
743 let mut peers = config
744 .bootstrap_peers
745 .iter()
746 .map(|entry| {
747 let (transport, addr_part) = if let Some(rest) = entry.strip_prefix("quic://") {
748 (TransportProtocol::Quic, rest)
749 } else if let Some(rest) = entry.strip_prefix("tcp://") {
750 (TransportProtocol::Tcp, rest)
751 } else {
752 (TransportProtocol::Tcp, entry.as_str())
753 };
754
755 let (socket_str, explicit_pubkey) = if let Some(at_pos) = addr_part.rfind('@') {
757 let hex_str = &addr_part[at_pos + 1..];
758 if hex_str.len() == 64 {
759 let mut key = [0u8; 32];
760 let ok = hex_str.as_bytes().chunks(2).enumerate().all(|(i, pair)| {
761 let hi = hex_nibble(pair[0]);
762 let lo = hex_nibble(pair[1]);
763 if let (Some(h), Some(l)) = (hi, lo) {
764 key[i] = (h << 4) | l;
765 true
766 } else {
767 false
768 }
769 });
770 if ok {
771 (&addr_part[..at_pos], Some(key))
772 } else {
773 (addr_part, None)
774 }
775 } else {
776 (addr_part, None)
777 }
778 } else {
779 (addr_part, None)
780 };
781
782 let socket: SocketAddr = socket_str.parse()?;
783
784 let pubkey_hint = explicit_pubkey.or_else(|| pinned.get(entry).copied());
786
787 Ok(PeerAddr {
788 ip: socket.ip(),
789 port: socket.port(),
790 transport,
791 pubkey_hint,
792 relay_via: None,
793 })
794 })
795 .collect::<anyhow::Result<Vec<_>>>()?;
796
797 let mut extra = Vec::new();
801 for peer in &peers {
802 let alt = match peer.transport {
803 TransportProtocol::Quic => PeerAddr {
804 port: peer.port.saturating_add(1),
805 transport: TransportProtocol::Tcp,
806 ..peer.clone()
807 },
808 TransportProtocol::Tcp => PeerAddr {
809 port: peer.port.saturating_sub(1),
810 transport: TransportProtocol::Quic,
811 ..peer.clone()
812 },
813 };
814 if !peers
815 .iter()
816 .any(|p| p.ip == alt.ip && p.port == alt.port && p.transport == alt.transport)
817 {
818 extra.push(alt);
819 }
820 }
821 peers.extend(extra);
822
823 peers.sort_by_key(|p| match p.transport {
827 TransportProtocol::Tcp => 0,
828 TransportProtocol::Quic => 1,
829 });
830 Ok(peers)
831 }
832
833 pub async fn pin_bootstrap_key(
839 &self,
840 peer_addr_str: &str,
841 observed_pubkey: [u8; 32],
842 ) -> anyhow::Result<()> {
843 let mut state = self.state.write().await;
844 if let Some(existing) = state.pinned_bootstrap_keys.get(peer_addr_str) {
845 if *existing != observed_pubkey {
846 anyhow::bail!(
847 "bootstrap peer identity mismatch for {peer_addr_str}: \
848 pinned key differs from observed key"
849 );
850 }
851 return Ok(());
853 }
854 state
855 .pinned_bootstrap_keys
856 .insert(peer_addr_str.to_string(), observed_pubkey);
857 state.dirty.peers = true;
858 Ok(())
859 }
860
861 pub async fn pinned_bootstrap_key(&self, peer_addr_str: &str) -> Option<[u8; 32]> {
863 self.state
864 .read()
865 .await
866 .pinned_bootstrap_keys
867 .get(peer_addr_str)
868 .copied()
869 }
870
871 pub async fn peer_records(&self) -> Vec<crate::peer_db::PeerRecord> {
872 self.state.read().await.peer_db.all_records()
873 }
874
875 pub async fn note_peer_outcome(&self, addr: &PeerAddr, success: bool) -> anyhow::Result<()> {
883 {
884 let mut state = self.state.write().await;
885 state.peer_db.note_outcome(addr, success);
886 state.dirty.peers = true;
887 }
888 helpers::persist_state(self).await
889 }
890
891 pub async fn subscriptions(&self) -> Vec<PersistedSubscription> {
892 let state = self.state.read().await;
893 state
894 .subscriptions
895 .iter()
896 .map(|(share_id, sub)| PersistedSubscription {
897 share_id: *share_id,
898 share_pubkey: sub.share_pubkey,
899 latest_seq: sub.latest_seq,
900 latest_manifest_id: sub.latest_manifest_id,
901 trust_level: sub.trust_level,
902 })
903 .collect()
904 }
905
906 pub async fn cached_manifest_meta(
908 &self,
909 manifest_id: &[u8; 32],
910 ) -> (Option<String>, Option<String>) {
911 let state = self.state.read().await;
912 match state.manifest_cache.get(manifest_id) {
913 Some(m) => (m.title.clone(), m.description.clone()),
914 None => (None, None),
915 }
916 }
917
918 pub async fn communities(&self) -> Vec<PersistedCommunity> {
919 let state = self.state.read().await;
920 state
921 .communities
922 .iter()
923 .map(|(share_id, membership)| PersistedCommunity {
924 share_id: *share_id,
925 share_pubkey: membership.pubkey,
926 membership_token: membership
927 .token
928 .as_ref()
929 .and_then(|t| crate::cbor::to_vec(t).ok()),
930 name: membership.name.clone(),
931 })
932 .collect()
933 }
934
935 pub async fn ensure_publisher_identity(&self, label: &str) -> anyhow::Result<ShareKeypair> {
936 let label = label.trim();
937 if label.is_empty() {
938 anyhow::bail!("publisher label must not be empty");
939 }
940
941 let mut needs_persist = false;
942 let secret = {
943 let mut state = self.state.write().await;
944 match state.publisher_identities.get(label).copied() {
945 Some(secret) => secret,
946 None => {
947 let mut rng = rand::rngs::OsRng;
948 let secret = SigningKey::generate(&mut rng).to_bytes();
949 state.publisher_identities.insert(label.to_string(), secret);
950 if state.runtime_config.auto_protect_publisher_keys
952 && let Some(nk) = state.node_key
953 && let Ok(enc) = encrypt_secret_with_key(secret.as_ref(), &nk)
954 {
955 state
956 .node_key_encrypted_publisher_secrets
957 .insert(label.to_string(), enc);
958 }
959 state.dirty.publisher_identities = true;
960 needs_persist = true;
961 secret
962 }
963 }
964 };
965 if needs_persist {
966 persist_state(self).await?;
967 }
968 Ok(ShareKeypair::new(SigningKey::from_bytes(&secret)))
969 }
970
971 pub async fn auto_protect_publisher_identities(&self) -> anyhow::Result<()> {
976 let changed = {
977 let mut state = self.state.write().await;
978 if !state.runtime_config.auto_protect_publisher_keys {
979 return Ok(());
980 }
981 let Some(nk) = state.node_key else {
982 return Ok(());
983 };
984 let mut changed = false;
985 let labels: Vec<_> = state.publisher_identities.keys().cloned().collect();
986 for label in labels {
987 if state
988 .node_key_encrypted_publisher_secrets
989 .contains_key(&label)
990 {
991 continue;
992 }
993 let secret = state.publisher_identities[&label];
994 if let Ok(enc) = encrypt_secret_with_key(secret.as_ref(), &nk) {
995 state
996 .node_key_encrypted_publisher_secrets
997 .insert(label, enc);
998 changed = true;
999 }
1000 }
1001 if changed {
1002 state.dirty.publisher_identities = true;
1003 }
1004 changed
1005 };
1006 if changed {
1007 persist_state(self).await?;
1008 }
1009 Ok(())
1010 }
1011
1012 pub async fn ensure_node_identity(&self) -> anyhow::Result<SigningKey> {
1020 let mut needs_persist = false;
1021 let secret = {
1022 let mut state = self.state.write().await;
1023 match state.node_key {
1024 Some(key) => key,
1025 None => {
1026 let mut rng = rand::rngs::OsRng;
1027 let key = SigningKey::generate(&mut rng).to_bytes();
1028 state.node_key = Some(key);
1029 state.dirty.node_key = true;
1030 needs_persist = true;
1031 key
1032 }
1033 }
1034 };
1035 if needs_persist {
1036 persist_state(self).await?;
1037 }
1038 Ok(SigningKey::from_bytes(&secret))
1039 }
1040
1041 pub async fn list_local_public_shares(
1042 &self,
1043 max_entries: usize,
1044 ) -> anyhow::Result<Vec<PublicShareSummary>> {
1045 let state = self.state.read().await;
1046 let mut heads = state
1047 .published_share_heads
1048 .values()
1049 .cloned()
1050 .collect::<Vec<_>>();
1051 heads.sort_by(|a, b| {
1052 b.updated_at
1053 .cmp(&a.updated_at)
1054 .then(b.latest_seq.cmp(&a.latest_seq))
1055 .then(a.share_id.cmp(&b.share_id))
1056 });
1057
1058 let mut shares = Vec::new();
1059 for head in heads {
1060 let Some(manifest) = state.manifest_cache.get(&head.latest_manifest_id) else {
1061 continue;
1062 };
1063 if manifest.visibility != ShareVisibility::Public {
1064 continue;
1065 }
1066 shares.push(PublicShareSummary {
1067 share_id: manifest.share_id,
1068 share_pubkey: manifest.share_pubkey,
1069 latest_seq: head.latest_seq,
1070 latest_manifest_id: head.latest_manifest_id,
1071 title: manifest.title.clone(),
1072 description: manifest.description.clone(),
1073 });
1074 if shares.len() >= max_entries {
1075 break;
1076 }
1077 }
1078 Ok(shares)
1079 }
1080
1081 pub async fn published_share_head(&self, share_id: ShareId) -> Option<ShareHead> {
1082 self.state
1083 .read()
1084 .await
1085 .published_share_heads
1086 .get(&share_id.0)
1087 .cloned()
1088 }
1089
1090 pub async fn list_owned_shares(&self) -> Vec<OwnedShareRecord> {
1093 let state = self.state.read().await;
1094 let mut records = Vec::new();
1095 for secret in state.publisher_identities.values() {
1096 let signing_key = SigningKey::from_bytes(secret);
1097 let verifying_key = signing_key.verifying_key();
1098 let share_id = ShareId::from_pubkey(&verifying_key);
1099 let Some(head) = state.published_share_heads.get(&share_id.0) else {
1100 continue;
1101 };
1102 let Some(manifest) = state.manifest_cache.get(&head.latest_manifest_id) else {
1103 continue;
1104 };
1105 records.push(OwnedShareRecord {
1106 share_id: share_id.0,
1107 share_pubkey: verifying_key.to_bytes(),
1108 share_secret: *secret,
1109 latest_seq: head.latest_seq,
1110 manifest_id: head.latest_manifest_id,
1111 title: manifest.title.clone(),
1112 description: manifest.description.clone(),
1113 visibility: manifest.visibility,
1114 item_count: manifest.items.len(),
1115 community_ids: manifest.communities.clone(),
1116 });
1117 }
1118 records
1119 }
1120
1121 pub async fn delete_published_share(&self, share_id: ShareId) -> anyhow::Result<()> {
1124 let store = {
1125 let mut state = self.state.write().await;
1126 if let Some(head) = state.published_share_heads.remove(&share_id.0) {
1127 state.manifest_cache.remove(&head.latest_manifest_id);
1128 }
1129 state.dirty.manifests = true;
1130 state.dirty.share_heads = true;
1131 state.store.clone()
1132 };
1133 store.remove_share_from_search(share_id.0).await?;
1134 persist_state(self).await
1135 }
1136
1137 pub async fn update_share_visibility(
1139 &self,
1140 share_id: ShareId,
1141 new_visibility: ShareVisibility,
1142 ) -> anyhow::Result<()> {
1143 let (manifest, keypair) = {
1144 let state = self.state.read().await;
1145 let head = state
1146 .published_share_heads
1147 .get(&share_id.0)
1148 .ok_or_else(|| anyhow::anyhow!("share not found in published heads"))?
1149 .clone();
1150 let manifest = state
1151 .manifest_cache
1152 .get(&head.latest_manifest_id)
1153 .ok_or_else(|| anyhow::anyhow!("manifest not found in cache"))?
1154 .clone();
1155 let keypair = state
1157 .publisher_identities
1158 .values()
1159 .find_map(|secret| {
1160 let sk = SigningKey::from_bytes(secret);
1161 let vk = sk.verifying_key();
1162 if ShareId::from_pubkey(&vk).0 == share_id.0 {
1163 Some(ShareKeypair::new(sk))
1164 } else {
1165 None
1166 }
1167 })
1168 .ok_or_else(|| anyhow::anyhow!("no signing key for share"))?;
1169 (manifest, keypair)
1170 };
1171 let mut new_manifest = manifest;
1172 new_manifest.seq = new_manifest.seq.saturating_add(1);
1173 new_manifest.visibility = new_visibility;
1174 new_manifest.signature = None;
1175 self.publish_share(new_manifest, &keypair).await?;
1176 Ok(())
1177 }
1178
1179 pub async fn list_local_community_public_shares(
1180 &self,
1181 community_share_id: ShareId,
1182 community_share_pubkey: [u8; 32],
1183 max_entries: usize,
1184 requester_node_pubkey: Option<[u8; 32]>,
1185 requester_membership_proof: Option<&[u8]>,
1186 ) -> anyhow::Result<Vec<PublicShareSummary>> {
1187 let pubkey = VerifyingKey::from_bytes(&community_share_pubkey)?;
1188 if ShareId::from_pubkey(&pubkey) != community_share_id {
1189 anyhow::bail!("community share_id does not match share_pubkey");
1190 }
1191
1192 let state = self.state.read().await;
1193 if !state.communities.contains_key(&community_share_id.0) {
1194 return Ok(Vec::new());
1195 }
1196
1197 if state.runtime_config.community_strict_mode {
1199 let rpk = requester_node_pubkey.ok_or_else(|| {
1200 anyhow::anyhow!("community strict mode: requester_node_pubkey required")
1201 })?;
1202 let proof_bytes = requester_membership_proof.ok_or_else(|| {
1203 anyhow::anyhow!("community strict mode: membership proof required")
1204 })?;
1205 let token: CommunityMembershipToken =
1206 crate::cbor::from_slice(proof_bytes).map_err(|_| {
1207 anyhow::anyhow!("community strict mode: invalid membership proof encoding")
1208 })?;
1209 if token.member_node_pubkey != rpk {
1210 anyhow::bail!(
1211 "community strict mode: proof member_node_pubkey does not match requester"
1212 );
1213 }
1214 let now = now_unix_secs()?;
1215 token.verify(&community_share_pubkey, Some(now))?;
1216 }
1217
1218 let mut heads = state
1219 .published_share_heads
1220 .values()
1221 .cloned()
1222 .collect::<Vec<_>>();
1223 heads.sort_by(|a, b| {
1224 b.updated_at
1225 .cmp(&a.updated_at)
1226 .then(b.latest_seq.cmp(&a.latest_seq))
1227 .then(a.share_id.cmp(&b.share_id))
1228 });
1229 let mut shares = Vec::new();
1230 for head in heads {
1231 let Some(manifest) = state.manifest_cache.get(&head.latest_manifest_id) else {
1232 continue;
1233 };
1234 if manifest.visibility != ShareVisibility::Public {
1235 continue;
1236 }
1237 if !manifest.communities.contains(&community_share_id.0) {
1238 continue;
1239 }
1240 shares.push(PublicShareSummary {
1241 share_id: manifest.share_id,
1242 share_pubkey: manifest.share_pubkey,
1243 latest_seq: head.latest_seq,
1244 latest_manifest_id: head.latest_manifest_id,
1245 title: manifest.title.clone(),
1246 description: manifest.description.clone(),
1247 });
1248 if shares.len() >= max_entries {
1249 break;
1250 }
1251 }
1252 Ok(shares)
1253 }
1254
1255 pub async fn fetch_public_shares_from_peer<T: RequestTransport + ?Sized>(
1256 &self,
1257 transport: &T,
1258 peer: &PeerAddr,
1259 max_entries: u16,
1260 ) -> anyhow::Result<Vec<PublicShareSummary>> {
1261 query_public_shares(transport, peer, max_entries).await
1262 }
1263
1264 pub async fn fetch_community_status_from_peer<T: RequestTransport + ?Sized>(
1265 &self,
1266 transport: &T,
1267 peer: &PeerAddr,
1268 share_id: ShareId,
1269 share_pubkey: [u8; 32],
1270 ) -> anyhow::Result<(bool, Option<String>)> {
1271 let result = query_community_status(transport, peer, share_id, share_pubkey).await?;
1272 Ok((result.joined, result.name))
1273 }
1274
1275 pub async fn fetch_community_public_shares_from_peer<T: RequestTransport + ?Sized>(
1276 &self,
1277 transport: &T,
1278 peer: &PeerAddr,
1279 community_share_id: ShareId,
1280 community_share_pubkey: [u8; 32],
1281 max_entries: u16,
1282 ) -> anyhow::Result<Vec<PublicShareSummary>> {
1283 let (requester_node_pubkey, requester_membership_proof) = {
1285 let state = self.state.read().await;
1286 let node_pubkey = state
1287 .node_key
1288 .map(|k| SigningKey::from_bytes(&k).verifying_key().to_bytes());
1289 let proof = state
1290 .communities
1291 .get(&community_share_id.0)
1292 .and_then(|m| m.token.as_ref())
1293 .and_then(|t| crate::cbor::to_vec(t).ok());
1294 (node_pubkey, proof)
1295 };
1296 query_community_public_shares(
1297 transport,
1298 peer,
1299 community_share_id,
1300 community_share_pubkey,
1301 max_entries,
1302 requester_node_pubkey,
1303 requester_membership_proof,
1304 )
1305 .await
1306 }
1307
    /// Registers `peer_addr` as seen.
    ///
    /// Delegates directly to `record_peer_seen`; no other work is done here.
    pub async fn connect(&self, peer_addr: PeerAddr) -> anyhow::Result<()> {
        self.record_peer_seen(peer_addr).await
    }
1311
1312 pub async fn record_peer_seen(&self, peer_addr: PeerAddr) -> anyhow::Result<()> {
1313 {
1314 let mut state = self.state.write().await;
1315 state.peer_db.upsert_seen(peer_addr, now_unix_secs()?);
1316 state.dirty.peers = true;
1317 }
1318 persist_state(self).await
1319 }
1320
1321 pub async fn record_peer_seen_with_capabilities(
1327 &self,
1328 peer_addr: PeerAddr,
1329 capabilities: crate::Capabilities,
1330 ) -> anyhow::Result<()> {
1331 {
1332 let mut state = self.state.write().await;
1333 state
1334 .peer_db
1335 .upsert_seen_with_capabilities(peer_addr, now_unix_secs()?, capabilities);
1336 state.dirty.peers = true;
1337 }
1338 persist_state(self).await
1339 }
1340
1341 pub async fn apply_pex_offer(&self, offer: PexOffer) -> anyhow::Result<usize> {
1342 let count = {
1343 let mut state = self.state.write().await;
1344 let now = now_unix_secs()?;
1345 for addr in offer.peers {
1346 state.peer_db.upsert_seen(addr, now);
1347 }
1348 state.dirty.peers = true;
1349 state.peer_db.total_known_peers()
1350 };
1351 persist_state(self).await?;
1352 Ok(count)
1353 }
1354
1355 pub async fn build_pex_offer(&self, req: PexRequest) -> anyhow::Result<PexOffer> {
1356 let state = self.state.read().await;
1357 let peers = state
1358 .peer_db
1359 .sample_fresh(now_unix_secs()?, usize::from(req.max_peers));
1360 Ok(PexOffer { peers })
1361 }
1362 pub async fn set_share_weight(&self, share_id: ShareId, weight: f32) -> anyhow::Result<()> {
1363 {
1364 let mut state = self.state.write().await;
1365 state.share_weights.insert(share_id.0, weight.max(0.0));
1366 state.dirty.share_weights = true;
1367 }
1368 persist_state(self).await
1369 }
1370
    /// Subscribes to `share_id` without a known share public key and with
    /// `SubscriptionTrustLevel::Normal`.
    ///
    /// Delegates to `subscribe_with_options`.
    pub async fn subscribe(&self, share_id: ShareId) -> anyhow::Result<()> {
        self.subscribe_with_options(share_id, None, SubscriptionTrustLevel::Normal)
            .await
    }
1375
    /// Joins the community identified by `share_id` / `share_pubkey` without
    /// a membership token and without a display name.
    ///
    /// Delegates to `join_community_with_options` with `None` for both
    /// optional arguments.
    pub async fn join_community(
        &self,
        share_id: ShareId,
        share_pubkey: [u8; 32],
    ) -> anyhow::Result<()> {
        self.join_community_with_options(share_id, share_pubkey, None, None)
            .await
    }
1384
    /// Joins the community identified by `share_id` / `share_pubkey`,
    /// recording `name` for it, without a membership token.
    ///
    /// Delegates to `join_community_with_options` with an owned copy of
    /// `name`.
    pub async fn join_community_named(
        &self,
        share_id: ShareId,
        share_pubkey: [u8; 32],
        name: &str,
    ) -> anyhow::Result<()> {
        self.join_community_with_options(share_id, share_pubkey, None, Some(name.to_owned()))
            .await
    }
1395
    /// Joins the community identified by `share_id` / `share_pubkey`,
    /// optionally attaching a membership `token`, without a display name.
    ///
    /// Delegates to `join_community_with_options`, which performs the
    /// validation of the key and of the token (when one is given).
    pub async fn join_community_with_token(
        &self,
        share_id: ShareId,
        share_pubkey: [u8; 32],
        token: Option<CommunityMembershipToken>,
    ) -> anyhow::Result<()> {
        self.join_community_with_options(share_id, share_pubkey, token, None)
            .await
    }
1411
1412 pub async fn join_community_with_options(
1414 &self,
1415 share_id: ShareId,
1416 share_pubkey: [u8; 32],
1417 token: Option<CommunityMembershipToken>,
1418 name: Option<String>,
1419 ) -> anyhow::Result<()> {
1420 let pubkey = VerifyingKey::from_bytes(&share_pubkey)?;
1421 let derived = ShareId::from_pubkey(&pubkey);
1422 if derived != share_id {
1423 anyhow::bail!("community share_id does not match share_pubkey");
1424 }
1425 if let Some(ref tok) = token {
1427 if tok.community_share_id != share_id.0 {
1428 anyhow::bail!("membership token community_share_id mismatch");
1429 }
1430 tok.verify(&share_pubkey, None)?;
1431 }
1432 let mut state = self.state.write().await;
1433 state.communities.insert(
1434 share_id.0,
1435 CommunityMembership {
1436 pubkey: share_pubkey,
1437 token,
1438 name,
1439 },
1440 );
1441 state.dirty.communities = true;
1442 drop(state);
1443 persist_state(self).await
1444 }
1445
1446 pub async fn leave_community(&self, share_id: ShareId) -> anyhow::Result<()> {
1447 {
1448 let mut state = self.state.write().await;
1449 state.communities.remove(&share_id.0);
1450 state.dirty.communities = true;
1451 }
1452 persist_state(self).await
1453 }
1454
1455 pub async fn update_community_name(&self, share_id: ShareId, name: &str) -> anyhow::Result<()> {
1459 let mut state = self.state.write().await;
1460 if let Some(membership) = state.communities.get_mut(&share_id.0)
1461 && membership.name.is_none()
1462 {
1463 membership.name = Some(name.to_owned());
1464 state.dirty.communities = true;
1465 }
1466 drop(state);
1467 persist_state(self).await
1468 }
1469
    /// Subscribes to `share_id` with an optional share public key and
    /// `SubscriptionTrustLevel::Normal`.
    ///
    /// Delegates to `subscribe_with_options`.
    pub async fn subscribe_with_pubkey(
        &self,
        share_id: ShareId,
        share_pubkey: Option<[u8; 32]>,
    ) -> anyhow::Result<()> {
        self.subscribe_with_options(share_id, share_pubkey, SubscriptionTrustLevel::Normal)
            .await
    }
1478
    /// Subscribes to `share_id` with an optional share public key and an
    /// explicit `trust_level`.
    ///
    /// Public entry point that forwards all arguments unchanged to the private
    /// `subscribe_with_options`.
    pub async fn subscribe_with_trust(
        &self,
        share_id: ShareId,
        share_pubkey: Option<[u8; 32]>,
        trust_level: SubscriptionTrustLevel,
    ) -> anyhow::Result<()> {
        self.subscribe_with_options(share_id, share_pubkey, trust_level)
            .await
    }
1488
1489 pub async fn set_subscription_trust_level(
1490 &self,
1491 share_id: ShareId,
1492 trust_level: SubscriptionTrustLevel,
1493 ) -> anyhow::Result<()> {
1494 {
1495 let mut state = self.state.write().await;
1496 let Some(sub) = state.subscriptions.get_mut(&share_id.0) else {
1497 anyhow::bail!("subscription not found");
1498 };
1499 sub.trust_level = trust_level;
1500 state.dirty.subscriptions = true;
1501 }
1502 persist_state(self).await
1503 }
1504
1505 pub async fn set_blocklist_rules(
1506 &self,
1507 blocklist_share_id: ShareId,
1508 rules: BlocklistRules,
1509 ) -> anyhow::Result<()> {
1510 {
1511 let mut state = self.state.write().await;
1512 state
1513 .blocklist_rules_by_share
1514 .insert(blocklist_share_id.0, rules);
1515 state.dirty.blocklist = true;
1516 }
1517 persist_state(self).await
1518 }
1519
1520 pub async fn clear_blocklist_rules(&self, blocklist_share_id: ShareId) -> anyhow::Result<()> {
1521 {
1522 let mut state = self.state.write().await;
1523 state.blocklist_rules_by_share.remove(&blocklist_share_id.0);
1524 state.enabled_blocklist_shares.remove(&blocklist_share_id.0);
1525 state.dirty.blocklist = true;
1526 }
1527 persist_state(self).await
1528 }
1529
1530 pub async fn enable_blocklist_share(&self, blocklist_share_id: ShareId) -> anyhow::Result<()> {
1531 {
1532 let mut state = self.state.write().await;
1533 if !state.subscriptions.contains_key(&blocklist_share_id.0) {
1534 anyhow::bail!("blocklist share must be subscribed before enabling");
1535 }
1536 state.enabled_blocklist_shares.insert(blocklist_share_id.0);
1537 state.dirty.blocklist = true;
1538 }
1539 persist_state(self).await
1540 }
1541
1542 pub async fn disable_blocklist_share(&self, blocklist_share_id: ShareId) -> anyhow::Result<()> {
1543 {
1544 let mut state = self.state.write().await;
1545 state.enabled_blocklist_shares.remove(&blocklist_share_id.0);
1546 state.dirty.blocklist = true;
1547 }
1548 persist_state(self).await
1549 }
1550
1551 async fn subscribe_with_options(
1552 &self,
1553 share_id: ShareId,
1554 share_pubkey: Option<[u8; 32]>,
1555 trust_level: SubscriptionTrustLevel,
1556 ) -> anyhow::Result<()> {
1557 {
1558 let mut state = self.state.write().await;
1559 if !state.subscriptions.contains_key(&share_id.0)
1562 && state.subscriptions.len() >= state.runtime_config.max_subscriptions
1563 {
1564 anyhow::bail!(
1565 "subscription limit of {} reached; unsubscribe from another share first",
1566 state.runtime_config.max_subscriptions
1567 );
1568 }
1569 state
1570 .subscriptions
1571 .entry(share_id.0)
1572 .or_insert(SubscriptionState {
1573 share_pubkey,
1574 latest_seq: 0,
1575 latest_manifest_id: None,
1576 trust_level,
1577 });
1578 state.dirty.subscriptions = true;
1579 }
1580 persist_state(self).await
1581 }
1582
1583 pub async fn unsubscribe(&self, share_id: ShareId) -> anyhow::Result<()> {
1584 let store = {
1585 let mut state = self.state.write().await;
1586 state.subscriptions.remove(&share_id.0);
1587 state.search_index.remove_share(share_id.0);
1588 state.enabled_blocklist_shares.remove(&share_id.0);
1589 state.blocklist_rules_by_share.remove(&share_id.0);
1590 state.dirty.subscriptions = true;
1591 state.dirty.blocklist = true;
1592 state.store.clone()
1593 };
1594 store.remove_share_from_search(share_id.0).await?;
1595 persist_state(self).await
1596 }
1597}