1pub mod access;
2pub mod crawler;
3pub mod local_lists;
4pub mod snapshot;
5
6pub use access::SocialGraphAccessControl;
7pub use crawler::SocialGraphCrawler;
8pub use local_lists::{
9 read_local_list_file_state, sync_local_list_files_force, sync_local_list_files_if_changed,
10 LocalListFileState, LocalListSyncOutcome,
11};
12
13mod index_buckets;
14
15use index_buckets::{
16 dedupe_events, latest_metadata_events_by_pubkey, EventIndexBucket, ProfileIndexBucket,
17};
18
19use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
20use std::path::{Path, PathBuf};
21use std::sync::{Arc, Mutex as StdMutex};
22
23use anyhow::{Context, Result};
24use bytes::Bytes;
25use futures::executor::block_on;
26use hashtree_core::{nhash_encode_full, Cid, HashTree, HashTreeConfig, NHashData};
27use hashtree_index::BTree;
28use hashtree_nostr::{
29 is_parameterized_replaceable_kind, is_replaceable_kind, ListEventsOptions, NostrEventStore,
30 NostrEventStoreError, ProfileGuard as NostrProfileGuard, StoredNostrEvent,
31};
32#[cfg(test)]
33use hashtree_nostr::{
34 reset_profile as reset_nostr_profile, set_profile_enabled as set_nostr_profile_enabled,
35 take_profile as take_nostr_profile,
36};
37use nostr::{Event, Filter, JsonUtil, Kind, SingleLetterTag};
38use nostr_social_graph::{
39 BinaryBudget, GraphStats, NostrEvent as GraphEvent, SocialGraph,
40 SocialGraphBackend as NostrSocialGraphBackend,
41};
42use nostr_social_graph_heed::HeedSocialGraph;
43
44use crate::storage::{LocalStore, StorageRouter};
45
46#[cfg(test)]
47use std::sync::{Mutex, MutexGuard, OnceLock};
48#[cfg(test)]
49use std::time::Instant;
50
/// Ordered set of raw 32-byte public keys.
pub type UserSet = BTreeSet<[u8; 32]>;

/// All-zero hex key the heed graph is opened with; treated as "no real root" when
/// choosing a storage class for an event.
const DEFAULT_ROOT_HEX: &str = "0000000000000000000000000000000000000000000000000000000000000000";
/// File (inside the db directory) holding the public event index root.
const EVENTS_ROOT_FILE: &str = "events-root.msgpack";
/// File (inside the db directory) holding the ambient event index root.
const AMBIENT_EVENTS_ROOT_FILE: &str = "events-root-ambient.msgpack";
/// Sub-directory (inside the db directory) of the ambient blob store.
const AMBIENT_EVENTS_BLOB_DIR: &str = "ambient-blobs";
/// File (inside the db directory) holding the profile search index root.
const PROFILE_SEARCH_ROOT_FILE: &str = "profile-search-root.msgpack";
/// File (inside the db directory) holding the profiles-by-pubkey index root.
const PROFILES_BY_PUBKEY_ROOT_FILE: &str = "profiles-by-pubkey-root.msgpack";
/// Distance value the graph backend reports for a key it has no distance for;
/// mapped to `None` by `SocialGraphStore::follow_distance`.
const UNKNOWN_FOLLOW_DISTANCE: u32 = 1000;
/// 64 MiB. NOTE(review): presumably the fallback LMDB map size — consumer is outside this view.
const DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES: u64 = 64 * 1024 * 1024;
/// NOTE(review): presumably the LMDB `max_dbs` setting — consumer is outside this view.
const SOCIALGRAPH_MAX_DBS: u32 = 16;
/// B-tree order used for the profile search index.
const PROFILE_SEARCH_INDEX_ORDER: usize = 64;
/// NOTE(review): presumably the key prefix of profile search entries — consumer is outside this view.
const PROFILE_SEARCH_PREFIX: &str = "p:";
/// NOTE(review): presumably the maximum indexed profile-name length — consumer is outside this view.
const PROFILE_NAME_MAX_LENGTH: usize = 100;
65
/// Selects which event index bucket of a `SocialGraphStore` an event is written to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventStorageClass {
    /// Stored in the public event bucket.
    Public,
    /// Stored in the ambient event bucket.
    Ambient,
}
71
/// Selects which event buckets a query reads from.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum EventQueryScope {
    /// Query the public bucket only.
    PublicOnly,
    /// Query the ambient bucket only.
    AmbientOnly,
    /// Query the public bucket first, then the ambient bucket.
    All,
}
79
80#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
81struct StoredCid {
82 hash: [u8; 32],
83 key: Option<[u8; 32]>,
84}
85
86#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
87pub struct StoredProfileSearchEntry {
88 pub pubkey: String,
89 pub name: String,
90 #[serde(default)]
91 pub aliases: Vec<String>,
92 #[serde(default)]
93 pub nip05: Option<String>,
94 pub created_at: u64,
95 pub event_nhash: String,
96}
97
98#[derive(Debug, Clone, Default, serde::Serialize)]
99pub struct SocialGraphStats {
100 pub total_users: usize,
101 pub root: Option<String>,
102 pub total_follows: usize,
103 pub max_depth: u32,
104 pub size_by_distance: BTreeMap<u32, usize>,
105 pub enabled: bool,
106}
107
108#[derive(Debug, Clone)]
109struct DistanceCache {
110 stats: SocialGraphStats,
111 users_by_distance: BTreeMap<u32, Vec<[u8; 32]>>,
112}
113
114#[derive(Debug, thiserror::Error)]
115#[error("{0}")]
116pub struct UpstreamGraphBackendError(String);
117
118pub struct SocialGraphStore {
119 graph: StdMutex<HeedSocialGraph>,
120 distance_cache: StdMutex<Option<DistanceCache>>,
121 public_events: EventIndexBucket,
122 ambient_events: EventIndexBucket,
123 profile_index: ProfileIndexBucket,
124 profile_index_overmute_threshold: StdMutex<f64>,
125}
126
127pub trait SocialGraphBackend: Send + Sync {
128 fn stats(&self) -> Result<SocialGraphStats>;
129 fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>>;
130 fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>>;
131 fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>>;
132 fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet>;
133 fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool>;
134 fn profile_search_root(&self) -> Result<Option<Cid>> {
135 Ok(None)
136 }
137 fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>>;
138 fn ingest_event(&self, event: &Event) -> Result<()>;
139 fn ingest_event_with_storage_class(
140 &self,
141 event: &Event,
142 storage_class: EventStorageClass,
143 ) -> Result<()> {
144 let _ = storage_class;
145 self.ingest_event(event)
146 }
147 fn ingest_events(&self, events: &[Event]) -> Result<()> {
148 for event in events {
149 self.ingest_event(event)?;
150 }
151 Ok(())
152 }
153 fn ingest_events_with_storage_class(
154 &self,
155 events: &[Event],
156 storage_class: EventStorageClass,
157 ) -> Result<()> {
158 for event in events {
159 self.ingest_event_with_storage_class(event, storage_class)?;
160 }
161 Ok(())
162 }
163 fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
164 self.ingest_events(events)
165 }
166 fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>>;
167}
168
/// Guard returned by [`test_lock`]; the lock is held until it is dropped.
#[cfg(test)]
pub type TestLockGuard = MutexGuard<'static, ()>;

/// Process-wide mutex behind [`test_lock`], created on first use.
#[cfg(test)]
static NDB_TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();

/// Acquires the process-wide test lock.
///
/// A poisoned lock (a previous holder panicked) is recovered rather than
/// propagated, so one failing test does not break the others.
#[cfg(test)]
pub fn test_lock() -> TestLockGuard {
    let mutex = NDB_TEST_LOCK.get_or_init(|| Mutex::new(()));
    match mutex.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
182
183pub fn open_social_graph_store(data_dir: &Path) -> Result<Arc<SocialGraphStore>> {
184 open_social_graph_store_with_mapsize(data_dir, None)
185}
186
187pub fn open_social_graph_store_with_mapsize(
188 data_dir: &Path,
189 mapsize_bytes: Option<u64>,
190) -> Result<Arc<SocialGraphStore>> {
191 let db_dir = data_dir.join("socialgraph");
192 open_social_graph_store_at_path(&db_dir, mapsize_bytes)
193}
194
195pub fn open_social_graph_store_with_storage(
196 data_dir: &Path,
197 store: Arc<StorageRouter>,
198 mapsize_bytes: Option<u64>,
199) -> Result<Arc<SocialGraphStore>> {
200 let db_dir = data_dir.join("socialgraph");
201 open_social_graph_store_at_path_with_storage(&db_dir, store, mapsize_bytes)
202}
203
204pub fn open_social_graph_store_at_path(
205 db_dir: &Path,
206 mapsize_bytes: Option<u64>,
207) -> Result<Arc<SocialGraphStore>> {
208 let config = hashtree_config::Config::load_or_default();
209 let backend = &config.storage.backend;
210 let local_store = Arc::new(
211 LocalStore::new_with_lmdb_map_size(db_dir.join("blobs"), backend, mapsize_bytes)
212 .map_err(|err| anyhow::anyhow!("Failed to create social graph blob store: {err}"))?,
213 );
214 let store = Arc::new(StorageRouter::new(local_store));
215 open_social_graph_store_at_path_with_storage(db_dir, store, mapsize_bytes)
216}
217
218pub fn open_social_graph_store_at_path_with_storage(
219 db_dir: &Path,
220 store: Arc<StorageRouter>,
221 mapsize_bytes: Option<u64>,
222) -> Result<Arc<SocialGraphStore>> {
223 let ambient_backend = store.local_store().backend();
224 let ambient_local = Arc::new(
225 LocalStore::new_with_lmdb_map_size(
226 db_dir.join(AMBIENT_EVENTS_BLOB_DIR),
227 &ambient_backend,
228 mapsize_bytes,
229 )
230 .map_err(|err| {
231 anyhow::anyhow!("Failed to create social graph ambient blob store: {err}")
232 })?,
233 );
234 let ambient_store = Arc::new(StorageRouter::new(ambient_local));
235 open_social_graph_store_at_path_with_storage_split(db_dir, store, ambient_store, mapsize_bytes)
236}
237
238pub fn open_social_graph_store_at_path_with_storage_split(
239 db_dir: &Path,
240 public_store: Arc<StorageRouter>,
241 ambient_store: Arc<StorageRouter>,
242 mapsize_bytes: Option<u64>,
243) -> Result<Arc<SocialGraphStore>> {
244 std::fs::create_dir_all(db_dir)?;
245 if let Some(size) = mapsize_bytes {
246 ensure_social_graph_mapsize(db_dir, size)?;
247 }
248 let graph = HeedSocialGraph::open(db_dir, DEFAULT_ROOT_HEX)
249 .context("open nostr-social-graph heed backend")?;
250
251 Ok(Arc::new(SocialGraphStore {
252 graph: StdMutex::new(graph),
253 distance_cache: StdMutex::new(None),
254 public_events: EventIndexBucket {
255 event_store: NostrEventStore::new(Arc::clone(&public_store)),
256 root_path: db_dir.join(EVENTS_ROOT_FILE),
257 },
258 ambient_events: EventIndexBucket {
259 event_store: NostrEventStore::new(ambient_store),
260 root_path: db_dir.join(AMBIENT_EVENTS_ROOT_FILE),
261 },
262 profile_index: ProfileIndexBucket {
263 tree: HashTree::new(HashTreeConfig::new(Arc::clone(&public_store))),
264 index: BTree::new(
265 public_store,
266 hashtree_index::BTreeOptions {
267 order: Some(PROFILE_SEARCH_INDEX_ORDER),
268 },
269 ),
270 by_pubkey_root_path: db_dir.join(PROFILES_BY_PUBKEY_ROOT_FILE),
271 search_root_path: db_dir.join(PROFILE_SEARCH_ROOT_FILE),
272 },
273 profile_index_overmute_threshold: StdMutex::new(1.0),
274 }))
275}
276
277pub fn set_social_graph_root(store: &SocialGraphStore, pk_bytes: &[u8; 32]) {
278 if let Err(err) = store.set_root(pk_bytes) {
279 tracing::warn!("Failed to set social graph root: {err}");
280 }
281}
282
283pub fn get_follow_distance(
284 backend: &(impl SocialGraphBackend + ?Sized),
285 pk_bytes: &[u8; 32],
286) -> Option<u32> {
287 backend.follow_distance(pk_bytes).ok().flatten()
288}
289
290pub fn get_follows(
291 backend: &(impl SocialGraphBackend + ?Sized),
292 pk_bytes: &[u8; 32],
293) -> Vec<[u8; 32]> {
294 match backend.followed_targets(pk_bytes) {
295 Ok(set) => set.into_iter().collect(),
296 Err(_) => Vec::new(),
297 }
298}
299
300pub fn is_overmuted(
301 backend: &(impl SocialGraphBackend + ?Sized),
302 _root_pk: &[u8; 32],
303 user_pk: &[u8; 32],
304 threshold: f64,
305) -> bool {
306 backend
307 .is_overmuted_user(user_pk, threshold)
308 .unwrap_or(false)
309}
310
311pub fn ingest_event(backend: &(impl SocialGraphBackend + ?Sized), _sub_id: &str, event_json: &str) {
312 let event = match Event::from_json(event_json) {
313 Ok(event) => event,
314 Err(_) => return,
315 };
316
317 if let Err(err) = backend.ingest_event(&event) {
318 tracing::warn!("Failed to ingest social graph event: {err}");
319 }
320}
321
322pub fn ingest_parsed_event(
323 backend: &(impl SocialGraphBackend + ?Sized),
324 event: &Event,
325) -> Result<()> {
326 backend.ingest_event(event)
327}
328
329pub fn ingest_parsed_event_with_storage_class(
330 backend: &(impl SocialGraphBackend + ?Sized),
331 event: &Event,
332 storage_class: EventStorageClass,
333) -> Result<()> {
334 backend.ingest_event_with_storage_class(event, storage_class)
335}
336
337pub fn ingest_parsed_events(
338 backend: &(impl SocialGraphBackend + ?Sized),
339 events: &[Event],
340) -> Result<()> {
341 backend.ingest_events(events)
342}
343
344pub fn ingest_parsed_events_with_storage_class(
345 backend: &(impl SocialGraphBackend + ?Sized),
346 events: &[Event],
347 storage_class: EventStorageClass,
348) -> Result<()> {
349 backend.ingest_events_with_storage_class(events, storage_class)
350}
351
352pub fn ingest_graph_parsed_events(
353 backend: &(impl SocialGraphBackend + ?Sized),
354 events: &[Event],
355) -> Result<()> {
356 backend.ingest_graph_events(events)
357}
358
359pub fn query_events(
360 backend: &(impl SocialGraphBackend + ?Sized),
361 filter: &Filter,
362 limit: usize,
363) -> Vec<Event> {
364 backend.query_events(filter, limit).unwrap_or_default()
365}
366
367impl SocialGraphStore {
368 pub fn set_profile_index_overmute_threshold(&self, threshold: f64) {
369 *self
370 .profile_index_overmute_threshold
371 .lock()
372 .expect("profile index overmute threshold") = threshold;
373 }
374
375 fn profile_index_overmute_threshold(&self) -> f64 {
376 *self
377 .profile_index_overmute_threshold
378 .lock()
379 .expect("profile index overmute threshold")
380 }
381
382 fn invalidate_distance_cache(&self) {
383 *self.distance_cache.lock().unwrap() = None;
384 }
385
386 fn build_distance_cache(state: nostr_social_graph::SocialGraphState) -> Result<DistanceCache> {
387 let unique_ids = state
388 .unique_ids
389 .into_iter()
390 .map(|(pubkey, id)| decode_pubkey(&pubkey).map(|decoded| (id, decoded)))
391 .collect::<Result<HashMap<_, _>>>()?;
392
393 let mut users_by_distance = BTreeMap::new();
394 let mut size_by_distance = BTreeMap::new();
395 for (distance, users) in state.users_by_follow_distance {
396 let decoded = users
397 .into_iter()
398 .filter_map(|id| unique_ids.get(&id).copied())
399 .collect::<Vec<_>>();
400 size_by_distance.insert(distance, decoded.len());
401 users_by_distance.insert(distance, decoded);
402 }
403
404 let total_follows = state
405 .followed_by_user
406 .iter()
407 .map(|(_, targets)| targets.len())
408 .sum::<usize>();
409 let total_users = size_by_distance.values().copied().sum();
410 let max_depth = size_by_distance.keys().copied().max().unwrap_or_default();
411
412 Ok(DistanceCache {
413 stats: SocialGraphStats {
414 total_users,
415 root: Some(state.root),
416 total_follows,
417 max_depth,
418 size_by_distance,
419 enabled: true,
420 },
421 users_by_distance,
422 })
423 }
424
425 fn load_distance_cache(&self) -> Result<DistanceCache> {
426 if let Some(cache) = self.distance_cache.lock().unwrap().clone() {
427 return Ok(cache);
428 }
429
430 let state = {
431 let graph = self.graph.lock().unwrap();
432 graph.export_state().context("export social graph state")?
433 };
434 let cache = Self::build_distance_cache(state)?;
435 *self.distance_cache.lock().unwrap() = Some(cache.clone());
436 Ok(cache)
437 }
438
439 fn set_root(&self, root: &[u8; 32]) -> Result<()> {
440 let root_hex = hex::encode(root);
441 {
442 let mut graph = self.graph.lock().unwrap();
443 if should_replace_placeholder_root(&graph)? {
444 let fresh = SocialGraph::new(&root_hex);
445 graph
446 .replace_state(&fresh.export_state())
447 .context("replace placeholder social graph root")?;
448 } else {
449 graph
450 .set_root(&root_hex)
451 .context("set nostr-social-graph root")?;
452 }
453 }
454 self.invalidate_distance_cache();
455 Ok(())
456 }
457
458 fn stats(&self) -> Result<SocialGraphStats> {
459 Ok(self.load_distance_cache()?.stats)
460 }
461
462 fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
463 let graph = self.graph.lock().unwrap();
464 let distance = graph
465 .get_follow_distance(&hex::encode(pk_bytes))
466 .context("read social graph follow distance")?;
467 Ok((distance != UNKNOWN_FOLLOW_DISTANCE).then_some(distance))
468 }
469
470 fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
471 Ok(self
472 .load_distance_cache()?
473 .users_by_distance
474 .get(&distance)
475 .cloned()
476 .unwrap_or_default())
477 }
478
479 fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
480 let graph = self.graph.lock().unwrap();
481 graph
482 .get_follow_list_created_at(&hex::encode(owner))
483 .context("read social graph follow list timestamp")
484 }
485
486 fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
487 let graph = self.graph.lock().unwrap();
488 decode_pubkey_set(
489 graph
490 .get_followed_by_user(&hex::encode(owner))
491 .context("read followed targets")?,
492 )
493 }
494
495 fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
496 if threshold <= 0.0 {
497 return Ok(false);
498 }
499 let graph = self.graph.lock().unwrap();
500 graph
501 .is_overmuted(&hex::encode(user_pk), threshold)
502 .context("check social graph overmute")
503 }
504
505 #[cfg_attr(not(test), allow(dead_code))]
506 pub fn profile_search_root(&self) -> Result<Option<Cid>> {
507 self.profile_index.search_root()
508 }
509
510 #[cfg_attr(not(test), allow(dead_code))]
511 pub fn profiles_by_pubkey_root(&self) -> Result<Option<Cid>> {
512 self.profile_index.by_pubkey_root()
513 }
514
515 pub fn public_events_root(&self) -> Result<Option<Cid>> {
516 self.public_events.events_root()
517 }
518
519 #[cfg_attr(test, allow(dead_code))]
520 pub(crate) fn public_events_root_for_write(&self) -> Result<Option<Cid>> {
521 self.public_events.events_root_for_write()
522 }
523
524 pub(crate) fn write_public_events_root(&self, root: Option<&Cid>) -> Result<()> {
525 self.public_events.write_events_root(root)
526 }
527
528 #[cfg_attr(not(test), allow(dead_code))]
529 pub fn latest_profile_event(&self, pubkey_hex: &str) -> Result<Option<Event>> {
530 self.profile_index.profile_event_for_pubkey(pubkey_hex)
531 }
532
533 #[cfg_attr(not(test), allow(dead_code))]
534 pub fn profile_search_entries_for_prefix(
535 &self,
536 prefix: &str,
537 ) -> Result<Vec<(String, StoredProfileSearchEntry)>> {
538 self.profile_index.search_entries_for_prefix(prefix)
539 }
540
541 pub fn sync_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
542 self.update_profile_index_for_events(events)
543 }
544
545 pub(crate) fn rebuild_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
546 let latest_by_pubkey = self.filtered_latest_metadata_events_by_pubkey(events)?;
547 let (by_pubkey_root, search_root) = self
548 .profile_index
549 .rebuild_profile_events(latest_by_pubkey.into_values())?;
550 self.profile_index
551 .write_by_pubkey_root(by_pubkey_root.as_ref())?;
552 self.profile_index.write_search_root(search_root.as_ref())?;
553 Ok(())
554 }
555
556 pub(crate) async fn rebuild_profile_index_for_events_async(
557 &self,
558 events: &[Event],
559 ) -> Result<()> {
560 let latest_by_pubkey = self.filtered_latest_metadata_events_by_pubkey(events)?;
561 let (by_pubkey_root, search_root) = self
562 .profile_index
563 .rebuild_profile_events_async(latest_by_pubkey.into_values())
564 .await?;
565 self.profile_index
566 .write_by_pubkey_root(by_pubkey_root.as_ref())?;
567 self.profile_index.write_search_root(search_root.as_ref())?;
568 Ok(())
569 }
570
571 pub fn rebuild_profile_index_from_stored_events(&self) -> Result<usize> {
572 let public_events_root = self.public_events.events_root()?;
573 let ambient_events_root = self.ambient_events.events_root()?;
574 if public_events_root.is_none() && ambient_events_root.is_none() {
575 self.profile_index.write_by_pubkey_root(None)?;
576 self.profile_index.write_search_root(None)?;
577 return Ok(0);
578 }
579
580 let mut events = Vec::new();
581 for (bucket, root) in [
582 (&self.public_events, public_events_root),
583 (&self.ambient_events, ambient_events_root),
584 ] {
585 let Some(root) = root else {
586 continue;
587 };
588 let stored = block_on(bucket.event_store.list_by_kind_lossy(
589 Some(&root),
590 Kind::Metadata.as_u16() as u32,
591 ListEventsOptions::default(),
592 ))
593 .map_err(map_event_store_error)?;
594 events.extend(
595 stored
596 .into_iter()
597 .map(nostr_event_from_stored)
598 .collect::<Result<Vec<_>>>()?,
599 );
600 }
601
602 let latest_count = self
603 .filtered_latest_metadata_events_by_pubkey(&events)?
604 .len();
605 self.rebuild_profile_index_for_events(&events)?;
606 Ok(latest_count)
607 }
608
609 pub async fn rebuild_profile_index_from_stored_events_async(&self) -> Result<usize> {
610 let public_events_root = self.public_events.events_root()?;
611 let ambient_events_root = self.ambient_events.events_root()?;
612 if public_events_root.is_none() && ambient_events_root.is_none() {
613 self.profile_index.write_by_pubkey_root(None)?;
614 self.profile_index.write_search_root(None)?;
615 return Ok(0);
616 }
617
618 let mut events = Vec::new();
619 for (bucket, root) in [
620 (&self.public_events, public_events_root),
621 (&self.ambient_events, ambient_events_root),
622 ] {
623 let Some(root) = root else {
624 continue;
625 };
626 let stored = bucket
627 .event_store
628 .list_by_kind_lossy(
629 Some(&root),
630 Kind::Metadata.as_u16() as u32,
631 ListEventsOptions::default(),
632 )
633 .await
634 .map_err(map_event_store_error)?;
635 events.extend(
636 stored
637 .into_iter()
638 .map(nostr_event_from_stored)
639 .collect::<Result<Vec<_>>>()?,
640 );
641 }
642
643 let latest_count = self
644 .filtered_latest_metadata_events_by_pubkey(&events)?
645 .len();
646 self.rebuild_profile_index_for_events_async(&events).await?;
647 Ok(latest_count)
648 }
649
650 pub fn rebuild_event_indexes_from_stored_events(&self) -> Result<(usize, usize)> {
651 let public_count =
652 self.rebuild_event_index_bucket_from_stored_events(&self.public_events)?;
653 let ambient_count =
654 self.rebuild_event_index_bucket_from_stored_events(&self.ambient_events)?;
655 self.rebuild_profile_index_from_stored_events()?;
656 Ok((public_count, ambient_count))
657 }
658
659 pub async fn rebuild_event_indexes_from_stored_events_async(&self) -> Result<(usize, usize)> {
660 let public_count = self
661 .rebuild_event_index_bucket_from_stored_events_async(&self.public_events)
662 .await?;
663 let ambient_count = self
664 .rebuild_event_index_bucket_from_stored_events_async(&self.ambient_events)
665 .await?;
666 self.rebuild_profile_index_from_stored_events_async()
667 .await?;
668 Ok((public_count, ambient_count))
669 }
670
671 fn rebuild_event_index_bucket_from_stored_events(
672 &self,
673 bucket: &EventIndexBucket,
674 ) -> Result<usize> {
675 let Some(root) = bucket.events_root()? else {
676 bucket.write_events_root(None)?;
677 return Ok(0);
678 };
679
680 let manifest = match block_on(bucket.event_store.get_manifest(Some(&root))) {
681 Ok(manifest) => manifest,
682 Err(err) => {
683 tracing::warn!(
684 "Clearing invalid social graph event index root {} before rebuild: {}",
685 hex::encode(root.hash),
686 err
687 );
688 bucket.write_events_root(None)?;
689 return Ok(0);
690 }
691 };
692 if manifest.by_kind_time_author.is_none() {
693 let next_root = block_on(bucket.event_store.upgrade_manifest_indexes(Some(&root)))
694 .map_err(map_event_store_error)?;
695 if next_root.as_ref() != Some(&root) {
696 bucket.write_events_root(next_root.as_ref())?;
697 return Ok(0);
698 }
699 }
700
701 let stored = block_on(
702 bucket
703 .event_store
704 .list_recent_lossy(Some(&root), ListEventsOptions::default()),
705 )
706 .map_err(map_event_store_error)?;
707 let count = stored.len();
708 let next_root =
709 block_on(bucket.event_store.build(None, stored)).map_err(map_event_store_error)?;
710 bucket.write_events_root(next_root.as_ref())?;
711 Ok(count)
712 }
713
714 async fn rebuild_event_index_bucket_from_stored_events_async(
715 &self,
716 bucket: &EventIndexBucket,
717 ) -> Result<usize> {
718 let Some(root) = bucket.events_root()? else {
719 bucket.write_events_root(None)?;
720 return Ok(0);
721 };
722
723 let manifest = match bucket.event_store.get_manifest(Some(&root)).await {
724 Ok(manifest) => manifest,
725 Err(err) => {
726 tracing::warn!(
727 "Clearing invalid social graph event index root {} before rebuild: {}",
728 hex::encode(root.hash),
729 err
730 );
731 bucket.write_events_root(None)?;
732 return Ok(0);
733 }
734 };
735 if manifest.by_kind_time_author.is_none() {
736 let next_root = bucket
737 .event_store
738 .upgrade_manifest_indexes(Some(&root))
739 .await
740 .map_err(map_event_store_error)?;
741 if next_root.as_ref() != Some(&root) {
742 bucket.write_events_root(next_root.as_ref())?;
743 return Ok(0);
744 }
745 }
746
747 let stored = bucket
748 .event_store
749 .list_recent_lossy(Some(&root), ListEventsOptions::default())
750 .await
751 .map_err(map_event_store_error)?;
752 let count = stored.len();
753 let next_root = bucket
754 .event_store
755 .build(None, stored)
756 .await
757 .map_err(map_event_store_error)?;
758 bucket.write_events_root(next_root.as_ref())?;
759 Ok(count)
760 }
761
762 fn update_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
763 let latest_by_pubkey = latest_metadata_events_by_pubkey(events);
764 let threshold = self.profile_index_overmute_threshold();
765
766 if latest_by_pubkey.is_empty() {
767 return Ok(());
768 }
769
770 let mut by_pubkey_root = self.profile_index.by_pubkey_root()?;
771 let mut search_root = self.profile_index.search_root()?;
772 let mut changed = false;
773
774 for event in latest_by_pubkey.into_values() {
775 let overmuted = self.is_overmuted_user(&event.pubkey.to_bytes(), threshold)?;
776 let (next_by_pubkey_root, next_search_root, updated) = if overmuted {
777 self.profile_index.remove_profile_event(
778 by_pubkey_root.as_ref(),
779 search_root.as_ref(),
780 &event.pubkey.to_hex(),
781 )?
782 } else {
783 self.profile_index.update_profile_event(
784 by_pubkey_root.as_ref(),
785 search_root.as_ref(),
786 event,
787 )?
788 };
789 if updated {
790 by_pubkey_root = next_by_pubkey_root;
791 search_root = next_search_root;
792 changed = true;
793 }
794 }
795
796 if changed {
797 self.profile_index
798 .write_by_pubkey_root(by_pubkey_root.as_ref())?;
799 self.profile_index.write_search_root(search_root.as_ref())?;
800 }
801
802 Ok(())
803 }
804
805 fn filtered_latest_metadata_events_by_pubkey<'a>(
806 &self,
807 events: &'a [Event],
808 ) -> Result<BTreeMap<String, &'a Event>> {
809 let threshold = self.profile_index_overmute_threshold();
810 let mut latest_by_pubkey = BTreeMap::<String, &Event>::new();
811 for event in events.iter().filter(|event| event.kind == Kind::Metadata) {
812 if self.is_overmuted_user(&event.pubkey.to_bytes(), threshold)? {
813 continue;
814 }
815 let pubkey = event.pubkey.to_hex();
816 match latest_by_pubkey.get(&pubkey) {
817 Some(current) if compare_nostr_events(event, current).is_le() => {}
818 _ => {
819 latest_by_pubkey.insert(pubkey, event);
820 }
821 }
822 }
823 Ok(latest_by_pubkey)
824 }
825
826 fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
827 let state = {
828 let graph = self.graph.lock().unwrap();
829 graph.export_state().context("export social graph state")?
830 };
831 let mut graph = SocialGraph::from_state(state).context("rebuild social graph state")?;
832 let root_hex = hex::encode(root);
833 if graph.get_root() != root_hex {
834 graph
835 .set_root(&root_hex)
836 .context("set snapshot social graph root")?;
837 }
838 let chunks = graph
839 .to_binary_chunks_with_budget(*options)
840 .context("encode social graph snapshot")?;
841 Ok(chunks.into_iter().map(Bytes::from).collect())
842 }
843
844 fn ingest_event(&self, event: &Event) -> Result<()> {
845 self.ingest_event_with_storage_class(event, self.default_storage_class_for(event)?)
846 }
847
848 fn ingest_events(&self, events: &[Event]) -> Result<()> {
849 if events.is_empty() {
850 return Ok(());
851 }
852
853 let mut public = Vec::new();
854 let mut ambient = Vec::new();
855 for event in events {
856 match self.default_storage_class_for(event)? {
857 EventStorageClass::Public => public.push(event.clone()),
858 EventStorageClass::Ambient => ambient.push(event.clone()),
859 }
860 }
861
862 if !public.is_empty() {
863 self.ingest_events_with_storage_class(&public, EventStorageClass::Public)?;
864 }
865 if !ambient.is_empty() {
866 self.ingest_events_with_storage_class(&ambient, EventStorageClass::Ambient)?;
867 }
868
869 Ok(())
870 }
871
872 fn apply_graph_events_only(&self, events: &[Event]) -> Result<()> {
873 let graph_events = events
874 .iter()
875 .filter(|event| is_social_graph_event(event.kind))
876 .collect::<Vec<_>>();
877 if graph_events.is_empty() {
878 return Ok(());
879 }
880
881 {
882 let mut graph = self.graph.lock().unwrap();
883 let mut snapshot = SocialGraph::from_state(
884 graph
885 .export_state()
886 .context("export social graph state for graph-only ingest")?,
887 )
888 .context("rebuild social graph state for graph-only ingest")?;
889 for event in graph_events {
890 snapshot.handle_event(&graph_event_from_nostr(event), true, 0.0);
891 }
892 graph
893 .replace_state(&snapshot.export_state())
894 .context("replace graph-only social graph state")?;
895 }
896 self.invalidate_distance_cache();
897 Ok(())
898 }
899
900 fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
901 self.query_events_in_scope(filter, limit, EventQueryScope::All)
902 }
903
904 fn default_storage_class_for(&self, event: &Event) -> Result<EventStorageClass> {
905 let graph = self.graph.lock().unwrap();
906 let root_hex = graph.get_root().context("read social graph root")?;
907 if root_hex != DEFAULT_ROOT_HEX && root_hex == event.pubkey.to_hex() {
908 return Ok(EventStorageClass::Public);
909 }
910 Ok(EventStorageClass::Ambient)
911 }
912
913 fn bucket(&self, storage_class: EventStorageClass) -> &EventIndexBucket {
914 match storage_class {
915 EventStorageClass::Public => &self.public_events,
916 EventStorageClass::Ambient => &self.ambient_events,
917 }
918 }
919
920 fn ingest_event_with_storage_class(
921 &self,
922 event: &Event,
923 storage_class: EventStorageClass,
924 ) -> Result<()> {
925 let current_root = self.bucket(storage_class).events_root_for_write()?;
926 let next_root = self
927 .bucket(storage_class)
928 .store_event(current_root.as_ref(), event)?;
929 self.bucket(storage_class)
930 .write_events_root(Some(&next_root))?;
931
932 self.update_profile_index_for_events(std::slice::from_ref(event))?;
933
934 if is_social_graph_event(event.kind) {
935 {
936 let mut graph = self.graph.lock().unwrap();
937 graph
938 .handle_event(&graph_event_from_nostr(event), true, 0.0)
939 .context("ingest social graph event into nostr-social-graph")?;
940 }
941 self.invalidate_distance_cache();
942 }
943
944 Ok(())
945 }
946
947 fn ingest_events_with_storage_class(
948 &self,
949 events: &[Event],
950 storage_class: EventStorageClass,
951 ) -> Result<()> {
952 if events.is_empty() {
953 return Ok(());
954 }
955
956 let bucket = self.bucket(storage_class);
957 let current_root = bucket.events_root_for_write()?;
958 let stored_events = events
959 .iter()
960 .map(stored_event_from_nostr)
961 .collect::<Vec<_>>();
962 let next_root = block_on(
963 bucket
964 .event_store
965 .build(current_root.as_ref(), stored_events),
966 )
967 .map_err(map_event_store_error)?;
968 bucket.write_events_root(next_root.as_ref())?;
969
970 self.update_profile_index_for_events(events)?;
971
972 let graph_events = events
973 .iter()
974 .filter(|event| is_social_graph_event(event.kind))
975 .collect::<Vec<_>>();
976 if graph_events.is_empty() {
977 return Ok(());
978 }
979
980 {
981 let mut graph = self.graph.lock().unwrap();
982 let mut snapshot = SocialGraph::from_state(
983 graph
984 .export_state()
985 .context("export social graph state for batch ingest")?,
986 )
987 .context("rebuild social graph state for batch ingest")?;
988 for event in graph_events {
989 snapshot.handle_event(&graph_event_from_nostr(event), true, 0.0);
990 }
991 graph
992 .replace_state(&snapshot.export_state())
993 .context("replace batched social graph state")?;
994 }
995 self.invalidate_distance_cache();
996
997 Ok(())
998 }
999
1000 pub(crate) fn query_events_in_scope(
1001 &self,
1002 filter: &Filter,
1003 limit: usize,
1004 scope: EventQueryScope,
1005 ) -> Result<Vec<Event>> {
1006 if limit == 0 {
1007 return Ok(Vec::new());
1008 }
1009
1010 let buckets: &[&EventIndexBucket] = match scope {
1011 EventQueryScope::PublicOnly => &[&self.public_events],
1012 EventQueryScope::AmbientOnly => &[&self.ambient_events],
1013 EventQueryScope::All => &[&self.public_events, &self.ambient_events],
1014 };
1015
1016 let mut candidates = Vec::new();
1017 for bucket in buckets {
1018 candidates.extend(bucket.query_events(filter, limit)?);
1019 }
1020
1021 let mut deduped = dedupe_events(candidates);
1022 deduped.retain(|event| filter.match_event(event));
1023 deduped.truncate(limit);
1024 Ok(deduped)
1025 }
1026}
1027
1028impl SocialGraphBackend for SocialGraphStore {
1029 fn stats(&self) -> Result<SocialGraphStats> {
1030 SocialGraphStore::stats(self)
1031 }
1032
1033 fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
1034 SocialGraphStore::users_by_follow_distance(self, distance)
1035 }
1036
1037 fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
1038 SocialGraphStore::follow_distance(self, pk_bytes)
1039 }
1040
1041 fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
1042 SocialGraphStore::follow_list_created_at(self, owner)
1043 }
1044
1045 fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
1046 SocialGraphStore::followed_targets(self, owner)
1047 }
1048
1049 fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
1050 SocialGraphStore::is_overmuted_user(self, user_pk, threshold)
1051 }
1052
1053 fn profile_search_root(&self) -> Result<Option<Cid>> {
1054 SocialGraphStore::profile_search_root(self)
1055 }
1056
1057 fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
1058 SocialGraphStore::snapshot_chunks(self, root, options)
1059 }
1060
1061 fn ingest_event(&self, event: &Event) -> Result<()> {
1062 SocialGraphStore::ingest_event(self, event)
1063 }
1064
1065 fn ingest_event_with_storage_class(
1066 &self,
1067 event: &Event,
1068 storage_class: EventStorageClass,
1069 ) -> Result<()> {
1070 SocialGraphStore::ingest_event_with_storage_class(self, event, storage_class)
1071 }
1072
1073 fn ingest_events(&self, events: &[Event]) -> Result<()> {
1074 SocialGraphStore::ingest_events(self, events)
1075 }
1076
1077 fn ingest_events_with_storage_class(
1078 &self,
1079 events: &[Event],
1080 storage_class: EventStorageClass,
1081 ) -> Result<()> {
1082 SocialGraphStore::ingest_events_with_storage_class(self, events, storage_class)
1083 }
1084
1085 fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
1086 SocialGraphStore::apply_graph_events_only(self, events)
1087 }
1088
1089 fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
1090 SocialGraphStore::query_events(self, filter, limit)
1091 }
1092}
1093
1094impl NostrSocialGraphBackend for SocialGraphStore {
1095 type Error = UpstreamGraphBackendError;
1096
1097 fn get_root(&self) -> std::result::Result<String, Self::Error> {
1098 let graph = self.graph.lock().unwrap();
1099 graph
1100 .get_root()
1101 .context("read social graph root")
1102 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1103 }
1104
1105 fn set_root(&mut self, root: &str) -> std::result::Result<(), Self::Error> {
1106 let root_bytes =
1107 decode_pubkey(root).map_err(|err| UpstreamGraphBackendError(err.to_string()))?;
1108 SocialGraphStore::set_root(self, &root_bytes)
1109 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1110 }
1111
1112 fn handle_event(
1113 &mut self,
1114 event: &GraphEvent,
1115 allow_unknown_authors: bool,
1116 overmute_threshold: f64,
1117 ) -> std::result::Result<(), Self::Error> {
1118 {
1119 let mut graph = self.graph.lock().unwrap();
1120 graph
1121 .handle_event(event, allow_unknown_authors, overmute_threshold)
1122 .context("ingest social graph event into heed backend")
1123 .map_err(|err| UpstreamGraphBackendError(err.to_string()))?;
1124 }
1125 self.invalidate_distance_cache();
1126 Ok(())
1127 }
1128
1129 fn get_follow_distance(&self, user: &str) -> std::result::Result<u32, Self::Error> {
1130 let graph = self.graph.lock().unwrap();
1131 graph
1132 .get_follow_distance(user)
1133 .context("read social graph follow distance")
1134 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1135 }
1136
1137 fn is_following(
1138 &self,
1139 follower: &str,
1140 followed_user: &str,
1141 ) -> std::result::Result<bool, Self::Error> {
1142 let graph = self.graph.lock().unwrap();
1143 graph
1144 .is_following(follower, followed_user)
1145 .context("read social graph following edge")
1146 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1147 }
1148
1149 fn get_followed_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
1150 let graph = self.graph.lock().unwrap();
1151 graph
1152 .get_followed_by_user(user)
1153 .context("read followed-by-user list")
1154 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1155 }
1156
1157 fn get_followers_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
1158 let graph = self.graph.lock().unwrap();
1159 graph
1160 .get_followers_by_user(user)
1161 .context("read followers-by-user list")
1162 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1163 }
1164
1165 fn get_muted_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
1166 let graph = self.graph.lock().unwrap();
1167 graph
1168 .get_muted_by_user(user)
1169 .context("read muted-by-user list")
1170 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1171 }
1172
1173 fn get_user_muted_by(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
1174 let graph = self.graph.lock().unwrap();
1175 graph
1176 .get_user_muted_by(user)
1177 .context("read user-muted-by list")
1178 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1179 }
1180
1181 fn get_follow_list_created_at(
1182 &self,
1183 user: &str,
1184 ) -> std::result::Result<Option<u64>, Self::Error> {
1185 let graph = self.graph.lock().unwrap();
1186 graph
1187 .get_follow_list_created_at(user)
1188 .context("read social graph follow list timestamp")
1189 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1190 }
1191
1192 fn get_mute_list_created_at(
1193 &self,
1194 user: &str,
1195 ) -> std::result::Result<Option<u64>, Self::Error> {
1196 let graph = self.graph.lock().unwrap();
1197 graph
1198 .get_mute_list_created_at(user)
1199 .context("read social graph mute list timestamp")
1200 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1201 }
1202
1203 fn is_overmuted(&self, user: &str, threshold: f64) -> std::result::Result<bool, Self::Error> {
1204 let graph = self.graph.lock().unwrap();
1205 graph
1206 .is_overmuted(user, threshold)
1207 .context("check social graph overmute")
1208 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1209 }
1210}
1211
1212impl<T> SocialGraphBackend for Arc<T>
1213where
1214 T: SocialGraphBackend + ?Sized,
1215{
1216 fn stats(&self) -> Result<SocialGraphStats> {
1217 self.as_ref().stats()
1218 }
1219
1220 fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
1221 self.as_ref().users_by_follow_distance(distance)
1222 }
1223
1224 fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
1225 self.as_ref().follow_distance(pk_bytes)
1226 }
1227
1228 fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
1229 self.as_ref().follow_list_created_at(owner)
1230 }
1231
1232 fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
1233 self.as_ref().followed_targets(owner)
1234 }
1235
1236 fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
1237 self.as_ref().is_overmuted_user(user_pk, threshold)
1238 }
1239
1240 fn profile_search_root(&self) -> Result<Option<Cid>> {
1241 self.as_ref().profile_search_root()
1242 }
1243
1244 fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
1245 self.as_ref().snapshot_chunks(root, options)
1246 }
1247
1248 fn ingest_event(&self, event: &Event) -> Result<()> {
1249 self.as_ref().ingest_event(event)
1250 }
1251
1252 fn ingest_event_with_storage_class(
1253 &self,
1254 event: &Event,
1255 storage_class: EventStorageClass,
1256 ) -> Result<()> {
1257 self.as_ref()
1258 .ingest_event_with_storage_class(event, storage_class)
1259 }
1260
1261 fn ingest_events(&self, events: &[Event]) -> Result<()> {
1262 self.as_ref().ingest_events(events)
1263 }
1264
1265 fn ingest_events_with_storage_class(
1266 &self,
1267 events: &[Event],
1268 storage_class: EventStorageClass,
1269 ) -> Result<()> {
1270 self.as_ref()
1271 .ingest_events_with_storage_class(events, storage_class)
1272 }
1273
1274 fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
1275 self.as_ref().ingest_graph_events(events)
1276 }
1277
1278 fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
1279 self.as_ref().query_events(filter, limit)
1280 }
1281}
1282
1283fn should_replace_placeholder_root(graph: &HeedSocialGraph) -> Result<bool> {
1284 if graph.get_root().context("read current social graph root")? != DEFAULT_ROOT_HEX {
1285 return Ok(false);
1286 }
1287
1288 let GraphStats {
1289 users,
1290 follows,
1291 mutes,
1292 ..
1293 } = graph.size().context("size social graph")?;
1294 Ok(users <= 1 && follows == 0 && mutes == 0)
1295}
1296
1297fn decode_pubkey_set(values: Vec<String>) -> Result<UserSet> {
1298 let mut set = UserSet::new();
1299 for value in values {
1300 set.insert(decode_pubkey(&value)?);
1301 }
1302 Ok(set)
1303}
1304
1305fn decode_pubkey(value: &str) -> Result<[u8; 32]> {
1306 let mut bytes = [0u8; 32];
1307 hex::decode_to_slice(value, &mut bytes)
1308 .with_context(|| format!("decode social graph pubkey {value}"))?;
1309 Ok(bytes)
1310}
1311
1312fn is_social_graph_event(kind: Kind) -> bool {
1313 kind == Kind::ContactList || kind == Kind::MuteList
1314}
1315
1316fn graph_event_from_nostr(event: &Event) -> GraphEvent {
1317 GraphEvent {
1318 created_at: event.created_at.as_u64(),
1319 content: event.content.clone(),
1320 tags: event
1321 .tags
1322 .iter()
1323 .map(|tag| tag.as_slice().to_vec())
1324 .collect(),
1325 kind: event.kind.as_u16() as u32,
1326 pubkey: event.pubkey.to_hex(),
1327 id: event.id.to_hex(),
1328 sig: event.sig.to_string(),
1329 }
1330}
1331
1332fn stored_event_from_nostr(event: &Event) -> StoredNostrEvent {
1333 StoredNostrEvent {
1334 id: event.id.to_hex(),
1335 pubkey: event.pubkey.to_hex(),
1336 created_at: event.created_at.as_u64(),
1337 kind: event.kind.as_u16() as u32,
1338 tags: event
1339 .tags
1340 .iter()
1341 .map(|tag| tag.as_slice().to_vec())
1342 .collect(),
1343 content: event.content.clone(),
1344 sig: event.sig.to_string(),
1345 }
1346}
1347
1348fn nostr_event_from_stored(event: StoredNostrEvent) -> Result<Event> {
1349 let value = serde_json::json!({
1350 "id": event.id,
1351 "pubkey": event.pubkey,
1352 "created_at": event.created_at,
1353 "kind": event.kind,
1354 "tags": event.tags,
1355 "content": event.content,
1356 "sig": event.sig,
1357 });
1358 Event::from_json(value.to_string()).context("decode stored nostr event")
1359}
1360
1361pub(crate) fn stored_event_to_nostr_event(event: StoredNostrEvent) -> Result<Event> {
1362 nostr_event_from_stored(event)
1363}
1364
1365fn encode_cid(cid: &Cid) -> Result<Vec<u8>> {
1366 rmp_serde::to_vec_named(&StoredCid {
1367 hash: cid.hash,
1368 key: cid.key,
1369 })
1370 .context("encode social graph events root")
1371}
1372
1373fn decode_cid(bytes: &[u8]) -> Result<Option<Cid>> {
1374 let stored: StoredCid =
1375 rmp_serde::from_slice(bytes).context("decode social graph events root")?;
1376 Ok(Some(Cid {
1377 hash: stored.hash,
1378 key: stored.key,
1379 }))
1380}
1381
1382fn read_root_file(path: &Path) -> Result<Option<Cid>> {
1383 let Ok(bytes) = std::fs::read(path) else {
1384 return Ok(None);
1385 };
1386 decode_cid(&bytes)
1387}
1388
1389fn write_root_file(path: &Path, root: Option<&Cid>) -> Result<()> {
1390 let Some(root) = root else {
1391 if path.exists() {
1392 std::fs::remove_file(path)?;
1393 }
1394 return Ok(());
1395 };
1396
1397 let encoded = encode_cid(root)?;
1398 let tmp_path = path.with_extension("tmp");
1399 std::fs::write(&tmp_path, encoded)?;
1400 std::fs::rename(tmp_path, path)?;
1401 Ok(())
1402}
1403
1404fn normalize_profile_name(value: &serde_json::Value) -> Option<String> {
1405 let raw = value.as_str()?;
1406 let trimmed = raw.split_whitespace().collect::<Vec<_>>().join(" ");
1407 if trimmed.is_empty() {
1408 return None;
1409 }
1410 Some(trimmed.chars().take(PROFILE_NAME_MAX_LENGTH).collect())
1411}
1412
1413fn extract_profile_names(profile: &serde_json::Map<String, serde_json::Value>) -> Vec<String> {
1414 let mut names = Vec::new();
1415 let mut seen = HashSet::new();
1416
1417 for key in ["display_name", "displayName", "name", "username"] {
1418 let Some(value) = profile.get(key).and_then(normalize_profile_name) else {
1419 continue;
1420 };
1421 let lowered = value.to_lowercase();
1422 if seen.insert(lowered) {
1423 names.push(value);
1424 }
1425 }
1426
1427 names
1428}
1429
/// Decides whether a NIP-05 local part should be rejected as a search term.
///
/// A local part is rejected when it
/// - is exactly one character long,
/// - starts with `npub1`, or
/// - already occurs inside `primary_name` once that name is lowercased and stripped of all
///   whitespace.
///
/// `local_part` is compared as given; the visible caller lowercases it beforehand.
fn should_reject_profile_nip05(local_part: &str, primary_name: &str) -> bool {
    // Count characters rather than bytes so that a single non-ASCII character (for example
    // "é", two bytes in UTF-8) is also recognized as a one-character local part.
    let is_single_char = local_part.chars().count() == 1;
    if is_single_char || local_part.starts_with("npub1") {
        return true;
    }

    let squashed_name: String = primary_name.to_lowercase().split_whitespace().collect();
    squashed_name.contains(local_part)
}
1441
1442fn normalize_profile_nip05(
1443 profile: &serde_json::Map<String, serde_json::Value>,
1444 primary_name: Option<&str>,
1445) -> Option<String> {
1446 let raw = profile.get("nip05")?.as_str()?;
1447 let local_part = raw.split('@').next()?.trim().to_lowercase();
1448 if local_part.is_empty() {
1449 return None;
1450 }
1451 let truncated: String = local_part.chars().take(PROFILE_NAME_MAX_LENGTH).collect();
1452 if truncated.is_empty() {
1453 return None;
1454 }
1455 if primary_name.is_some_and(|name| should_reject_profile_nip05(&truncated, name)) {
1456 return None;
1457 }
1458 Some(truncated)
1459}
1460
/// Returns `true` when `word` is one of the English stop words excluded from search keywords.
///
/// The comparison is exact and case-sensitive; every entry in the table is lowercase.
fn is_search_stop_word(word: &str) -> bool {
    const STOP_WORDS: &[&str] = &[
        "a", "an", "the", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by",
        "from", "is", "it", "as", "be", "was", "are", "this", "that", "these", "those", "i",
        "you", "he", "she", "we", "they", "my", "your", "his", "her", "its", "our", "their",
        "what", "which", "who", "whom", "how", "when", "where", "why", "will", "would", "could",
        "should", "can", "may", "might", "must", "have", "has", "had", "do", "does", "did",
        "been", "being", "get", "got", "just", "now", "then", "so", "if", "not", "no", "yes",
        "all", "any", "some", "more", "most", "other", "into", "over", "after", "before",
        "about", "up", "down", "out", "off", "through", "during", "under", "again", "further",
        "once",
    ];

    STOP_WORDS.contains(&word)
}
1558
/// Returns `true` when `word` consists solely of ASCII digits and is not a four-digit year in
/// the range 1900..=2099.
///
/// An empty string contains no non-digit characters and therefore also yields `true`.
fn is_pure_search_number(word: &str) -> bool {
    let only_digits = word.bytes().all(|byte| byte.is_ascii_digit());
    if !only_digits {
        return false;
    }

    let looks_like_year = word.len() == 4 && matches!(word.parse::<u16>(), Ok(1900..=2099));
    !looks_like_year
}
1568
/// Splits a compound identifier-like word into its parts, keeping the original casing.
///
/// A new part starts between two adjacent characters when
/// - a lowercase character is followed by an uppercase one (`fooBar` -> `foo`, `Bar`),
/// - an ASCII digit is followed by an alphabetic character, or the reverse
///   (`abc123def` -> `abc`, `123`, `def`),
/// - an uppercase run ends before a capitalized word (`HTTPServer` -> `HTTP`, `Server`).
///
/// An empty input produces an empty vector.
fn split_compound_search_word(word: &str) -> Vec<String> {
    let chars: Vec<char> = word.chars().collect();
    let mut pieces = Vec::new();
    let mut piece_start = 0;

    for index in 1..chars.len() {
        let prev = chars[index - 1];
        let ch = chars[index];

        let case_hump = prev.is_lowercase() && ch.is_uppercase();
        let digit_to_letter = prev.is_ascii_digit() && ch.is_alphabetic();
        let letter_to_digit = prev.is_alphabetic() && ch.is_ascii_digit();
        let acronym_end = prev.is_uppercase()
            && ch.is_uppercase()
            && chars.get(index + 1).is_some_and(|next| next.is_lowercase());

        if case_hump || digit_to_letter || letter_to_digit || acronym_end {
            pieces.push(chars[piece_start..index].iter().collect::<String>());
            piece_start = index;
        }
    }

    if piece_start < chars.len() {
        pieces.push(chars[piece_start..].iter().collect::<String>());
    }

    pieces
}
1597
1598fn parse_search_keywords(text: &str) -> Vec<String> {
1599 let mut keywords = Vec::new();
1600 let mut seen = HashSet::new();
1601
1602 for word in text
1603 .split(|ch: char| !ch.is_alphanumeric())
1604 .filter(|word| !word.is_empty())
1605 {
1606 let mut variants = Vec::with_capacity(1 + word.len() / 4);
1607 variants.push(word.to_lowercase());
1608 variants.extend(
1609 split_compound_search_word(word)
1610 .into_iter()
1611 .map(|part| part.to_lowercase()),
1612 );
1613
1614 for lowered in variants {
1615 if lowered.chars().count() < 2
1616 || is_search_stop_word(&lowered)
1617 || is_pure_search_number(&lowered)
1618 {
1619 continue;
1620 }
1621 if seen.insert(lowered.clone()) {
1622 keywords.push(lowered);
1623 }
1624 }
1625 }
1626
1627 keywords
1628}
1629
1630fn profile_search_terms_for_event(event: &Event) -> Vec<String> {
1631 let profile = match serde_json::from_str::<serde_json::Value>(&event.content) {
1632 Ok(serde_json::Value::Object(profile)) => profile,
1633 _ => serde_json::Map::new(),
1634 };
1635 let names = extract_profile_names(&profile);
1636 let primary_name = names.first().map(String::as_str);
1637 let mut parts = Vec::new();
1638 if let Some(name) = primary_name {
1639 parts.push(name.to_string());
1640 }
1641 if let Some(nip05) = normalize_profile_nip05(&profile, primary_name) {
1642 parts.push(nip05);
1643 }
1644 parts.push(event.pubkey.to_hex());
1645 if names.len() > 1 {
1646 parts.extend(names.into_iter().skip(1));
1647 }
1648 parse_search_keywords(&parts.join(" "))
1649}
1650
1651fn compare_nostr_events(left: &Event, right: &Event) -> std::cmp::Ordering {
1652 left.created_at
1653 .as_u64()
1654 .cmp(&right.created_at.as_u64())
1655 .then_with(|| left.id.to_hex().cmp(&right.id.to_hex()))
1656}
1657
1658fn map_event_store_error(err: NostrEventStoreError) -> anyhow::Error {
1659 anyhow::anyhow!("nostr event store error: {err}")
1660}
1661
1662fn ensure_social_graph_mapsize(db_dir: &Path, requested_bytes: u64) -> Result<()> {
1663 let requested = requested_bytes.max(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES);
1664 let page_size = page_size_bytes() as u64;
1665 let rounded = requested
1666 .checked_add(page_size.saturating_sub(1))
1667 .map(|size| size / page_size * page_size)
1668 .unwrap_or(requested);
1669 let map_size = usize::try_from(rounded).context("social graph mapsize exceeds usize")?;
1670
1671 let env = unsafe {
1672 heed::EnvOpenOptions::new()
1673 .map_size(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES as usize)
1674 .max_dbs(SOCIALGRAPH_MAX_DBS)
1675 .open(db_dir)
1676 }
1677 .context("open social graph LMDB env for resize")?;
1678 if env.info().map_size < map_size {
1679 unsafe { env.resize(map_size) }.context("resize social graph LMDB env")?;
1680 }
1681
1682 Ok(())
1683}
1684
1685fn page_size_bytes() -> usize {
1686 page_size::get_granularity()
1687}
1688
1689#[cfg(test)]
1690mod tests;