1pub mod access;
2pub mod crawler;
3pub mod local_lists;
4pub mod snapshot;
5
6pub use access::SocialGraphAccessControl;
7pub use crawler::SocialGraphCrawler;
8pub use local_lists::{
9 read_local_list_file_state, sync_local_list_files_force, sync_local_list_files_if_changed,
10 LocalListFileState, LocalListSyncOutcome,
11};
12
13mod index_buckets;
14
15use index_buckets::{
16 dedupe_events, latest_metadata_events_by_pubkey, EventIndexBucket, ProfileIndexBucket,
17};
18
19use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
20use std::path::{Path, PathBuf};
21use std::sync::{Arc, Mutex as StdMutex};
22
23use anyhow::{Context, Result};
24use bytes::Bytes;
25use futures::executor::block_on;
26use hashtree_core::{nhash_encode_full, Cid, HashTree, HashTreeConfig, NHashData};
27use hashtree_index::BTree;
28use hashtree_nostr::{
29 is_parameterized_replaceable_kind, is_replaceable_kind, ListEventsOptions, NostrEventStore,
30 NostrEventStoreError, ProfileGuard as NostrProfileGuard, StoredNostrEvent,
31};
32#[cfg(test)]
33use hashtree_nostr::{
34 reset_profile as reset_nostr_profile, set_profile_enabled as set_nostr_profile_enabled,
35 take_profile as take_nostr_profile,
36};
37use nostr::{Event, Filter, JsonUtil, Kind, SingleLetterTag};
38use nostr_social_graph::{
39 BinaryBudget, GraphStats, NostrEvent as GraphEvent, SocialGraph,
40 SocialGraphBackend as NostrSocialGraphBackend,
41};
42use nostr_social_graph_heed::HeedSocialGraph;
43
44use crate::storage::{LocalStore, StorageRouter};
45
46#[cfg(test)]
47use std::sync::{Mutex, MutexGuard, OnceLock};
48#[cfg(test)]
49use std::time::Instant;
50
/// Ordered set of raw 32-byte public keys.
pub type UserSet = BTreeSet<[u8; 32]>;

/// All-zero hex key passed to `HeedSocialGraph::open` as the initial root; an
/// event is never classified as root-authored while the graph still has it.
const DEFAULT_ROOT_HEX: &str = "0000000000000000000000000000000000000000000000000000000000000000";
/// File (inside the database directory) holding the public event index root.
const EVENTS_ROOT_FILE: &str = "events-root.msgpack";
/// File (inside the database directory) holding the ambient event index root.
const AMBIENT_EVENTS_ROOT_FILE: &str = "events-root-ambient.msgpack";
/// Sub-directory of the database directory used for the ambient blob store.
const AMBIENT_EVENTS_BLOB_DIR: &str = "ambient-blobs";
/// File (inside the database directory) holding the profile search index root.
const PROFILE_SEARCH_ROOT_FILE: &str = "profile-search-root.msgpack";
/// File (inside the database directory) holding the profiles-by-pubkey root.
const PROFILES_BY_PUBKEY_ROOT_FILE: &str = "profiles-by-pubkey-root.msgpack";
/// Distance value that `SocialGraphStore::follow_distance` reports as `None`.
const UNKNOWN_FOLLOW_DISTANCE: u32 = 1000;
/// 64 MiB. NOTE(review): not referenced in the visible part of this file;
/// presumably the fallback LMDB map size — confirm at the use site.
const DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES: u64 = 64 * 1024 * 1024;
/// NOTE(review): not referenced in the visible part of this file; presumably
/// the LMDB named-database limit — confirm at the use site.
const SOCIALGRAPH_MAX_DBS: u32 = 16;
/// `order` option given to the profile search `BTree`.
const PROFILE_SEARCH_INDEX_ORDER: usize = 64;
/// NOTE(review): not referenced in the visible part of this file; presumably
/// the key prefix of profile search entries — confirm at the use site.
const PROFILE_SEARCH_PREFIX: &str = "p:";
/// NOTE(review): not referenced in the visible part of this file; presumably a
/// cap on indexed profile name length — confirm at the use site.
const PROFILE_NAME_MAX_LENGTH: usize = 100;
65
/// Selects which event index bucket an ingested event is written to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventStorageClass {
    /// Stored in the store's `public_events` bucket.
    Public,
    /// Stored in the store's `ambient_events` bucket.
    Ambient,
}
71
/// Chooses which event buckets `query_events_in_scope` reads from.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum EventQueryScope {
    /// Query only the public bucket.
    PublicOnly,
    /// Query only the ambient bucket.
    AmbientOnly,
    /// Query the public bucket first, then the ambient bucket.
    All,
}
79
80#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
81struct StoredCid {
82 hash: [u8; 32],
83 key: Option<[u8; 32]>,
84}
85
86#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
87pub struct StoredProfileSearchEntry {
88 pub pubkey: String,
89 pub name: String,
90 #[serde(default)]
91 pub aliases: Vec<String>,
92 #[serde(default)]
93 pub nip05: Option<String>,
94 #[serde(default, skip_serializing_if = "Option::is_none")]
95 pub follow_distance: Option<u32>,
96 pub created_at: u64,
97 pub event_nhash: String,
98}
99
100#[derive(Debug, Clone, Default, serde::Serialize)]
101pub struct SocialGraphStats {
102 pub total_users: usize,
103 pub root: Option<String>,
104 pub total_follows: usize,
105 pub max_depth: u32,
106 pub size_by_distance: BTreeMap<u32, usize>,
107 pub enabled: bool,
108}
109
110#[derive(Debug, Clone)]
111struct DistanceCache {
112 stats: SocialGraphStats,
113 users_by_distance: BTreeMap<u32, Vec<[u8; 32]>>,
114}
115
116#[derive(Debug, thiserror::Error)]
117#[error("{0}")]
118pub struct UpstreamGraphBackendError(String);
119
120pub struct SocialGraphStore {
121 graph: StdMutex<HeedSocialGraph>,
122 distance_cache: StdMutex<Option<DistanceCache>>,
123 public_events: EventIndexBucket,
124 ambient_events: EventIndexBucket,
125 profile_index: ProfileIndexBucket,
126 profile_index_overmute_threshold: StdMutex<f64>,
127}
128
129pub trait SocialGraphBackend: Send + Sync {
130 fn stats(&self) -> Result<SocialGraphStats>;
131 fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>>;
132 fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>>;
133 fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>>;
134 fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet>;
135 fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool>;
136 fn profile_search_root(&self) -> Result<Option<Cid>> {
137 Ok(None)
138 }
139 fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>>;
140 fn ingest_event(&self, event: &Event) -> Result<()>;
141 fn ingest_event_with_storage_class(
142 &self,
143 event: &Event,
144 storage_class: EventStorageClass,
145 ) -> Result<()> {
146 let _ = storage_class;
147 self.ingest_event(event)
148 }
149 fn ingest_events(&self, events: &[Event]) -> Result<()> {
150 for event in events {
151 self.ingest_event(event)?;
152 }
153 Ok(())
154 }
155 fn ingest_events_with_storage_class(
156 &self,
157 events: &[Event],
158 storage_class: EventStorageClass,
159 ) -> Result<()> {
160 for event in events {
161 self.ingest_event_with_storage_class(event, storage_class)?;
162 }
163 Ok(())
164 }
165 fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
166 self.ingest_events(events)
167 }
168 fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>>;
169}
170
/// Guard type returned by [`test_lock`].
#[cfg(test)]
pub type TestLockGuard = MutexGuard<'static, ()>;

/// Process-wide mutex backing [`test_lock`], created on first use.
#[cfg(test)]
static NDB_TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();

/// Acquires the global test lock.
///
/// A poisoned lock (a previous holder panicked) is recovered instead of
/// propagating the panic.
#[cfg(test)]
pub fn test_lock() -> TestLockGuard {
    let mutex = NDB_TEST_LOCK.get_or_init(Mutex::default);
    match mutex.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
184
185pub fn open_social_graph_store(data_dir: &Path) -> Result<Arc<SocialGraphStore>> {
186 open_social_graph_store_with_mapsize(data_dir, None)
187}
188
189pub fn open_social_graph_store_with_mapsize(
190 data_dir: &Path,
191 mapsize_bytes: Option<u64>,
192) -> Result<Arc<SocialGraphStore>> {
193 let db_dir = data_dir.join("socialgraph");
194 open_social_graph_store_at_path(&db_dir, mapsize_bytes)
195}
196
197pub fn open_social_graph_store_with_storage(
198 data_dir: &Path,
199 store: Arc<StorageRouter>,
200 mapsize_bytes: Option<u64>,
201) -> Result<Arc<SocialGraphStore>> {
202 let db_dir = data_dir.join("socialgraph");
203 open_social_graph_store_at_path_with_storage(&db_dir, store, mapsize_bytes)
204}
205
206pub fn open_social_graph_store_at_path(
207 db_dir: &Path,
208 mapsize_bytes: Option<u64>,
209) -> Result<Arc<SocialGraphStore>> {
210 let config = hashtree_config::Config::load_or_default();
211 let backend = &config.storage.backend;
212 let local_store = Arc::new(
213 LocalStore::new_with_lmdb_map_size(db_dir.join("blobs"), backend, mapsize_bytes)
214 .map_err(|err| anyhow::anyhow!("Failed to create social graph blob store: {err}"))?,
215 );
216 let store = Arc::new(StorageRouter::new(local_store));
217 open_social_graph_store_at_path_with_storage(db_dir, store, mapsize_bytes)
218}
219
220pub fn open_social_graph_store_at_path_with_storage(
221 db_dir: &Path,
222 store: Arc<StorageRouter>,
223 mapsize_bytes: Option<u64>,
224) -> Result<Arc<SocialGraphStore>> {
225 let ambient_backend = store.local_store().backend();
226 let ambient_local = Arc::new(
227 LocalStore::new_with_lmdb_map_size(
228 db_dir.join(AMBIENT_EVENTS_BLOB_DIR),
229 &ambient_backend,
230 mapsize_bytes,
231 )
232 .map_err(|err| {
233 anyhow::anyhow!("Failed to create social graph ambient blob store: {err}")
234 })?,
235 );
236 let ambient_store = Arc::new(StorageRouter::new(ambient_local));
237 open_social_graph_store_at_path_with_storage_split(db_dir, store, ambient_store, mapsize_bytes)
238}
239
240pub fn open_social_graph_store_at_path_with_storage_split(
241 db_dir: &Path,
242 public_store: Arc<StorageRouter>,
243 ambient_store: Arc<StorageRouter>,
244 mapsize_bytes: Option<u64>,
245) -> Result<Arc<SocialGraphStore>> {
246 std::fs::create_dir_all(db_dir)?;
247 if let Some(size) = mapsize_bytes {
248 ensure_social_graph_mapsize(db_dir, size)?;
249 }
250 let graph = HeedSocialGraph::open(db_dir, DEFAULT_ROOT_HEX)
251 .context("open nostr-social-graph heed backend")?;
252
253 Ok(Arc::new(SocialGraphStore {
254 graph: StdMutex::new(graph),
255 distance_cache: StdMutex::new(None),
256 public_events: EventIndexBucket {
257 event_store: NostrEventStore::new(Arc::clone(&public_store)),
258 root_path: db_dir.join(EVENTS_ROOT_FILE),
259 },
260 ambient_events: EventIndexBucket {
261 event_store: NostrEventStore::new(ambient_store),
262 root_path: db_dir.join(AMBIENT_EVENTS_ROOT_FILE),
263 },
264 profile_index: ProfileIndexBucket {
265 tree: HashTree::new(HashTreeConfig::new(Arc::clone(&public_store))),
266 index: BTree::new(
267 public_store,
268 hashtree_index::BTreeOptions {
269 order: Some(PROFILE_SEARCH_INDEX_ORDER),
270 },
271 ),
272 by_pubkey_root_path: db_dir.join(PROFILES_BY_PUBKEY_ROOT_FILE),
273 search_root_path: db_dir.join(PROFILE_SEARCH_ROOT_FILE),
274 },
275 profile_index_overmute_threshold: StdMutex::new(1.0),
276 }))
277}
278
279pub fn set_social_graph_root(store: &SocialGraphStore, pk_bytes: &[u8; 32]) {
280 if let Err(err) = store.set_root(pk_bytes) {
281 tracing::warn!("Failed to set social graph root: {err}");
282 }
283}
284
285pub fn get_follow_distance(
286 backend: &(impl SocialGraphBackend + ?Sized),
287 pk_bytes: &[u8; 32],
288) -> Option<u32> {
289 backend.follow_distance(pk_bytes).ok().flatten()
290}
291
292pub fn get_follows(
293 backend: &(impl SocialGraphBackend + ?Sized),
294 pk_bytes: &[u8; 32],
295) -> Vec<[u8; 32]> {
296 match backend.followed_targets(pk_bytes) {
297 Ok(set) => set.into_iter().collect(),
298 Err(_) => Vec::new(),
299 }
300}
301
302pub fn is_overmuted(
303 backend: &(impl SocialGraphBackend + ?Sized),
304 _root_pk: &[u8; 32],
305 user_pk: &[u8; 32],
306 threshold: f64,
307) -> bool {
308 backend
309 .is_overmuted_user(user_pk, threshold)
310 .unwrap_or(false)
311}
312
313pub fn ingest_event(backend: &(impl SocialGraphBackend + ?Sized), _sub_id: &str, event_json: &str) {
314 let event = match Event::from_json(event_json) {
315 Ok(event) => event,
316 Err(_) => return,
317 };
318
319 if let Err(err) = backend.ingest_event(&event) {
320 tracing::warn!("Failed to ingest social graph event: {err}");
321 }
322}
323
324pub fn ingest_parsed_event(
325 backend: &(impl SocialGraphBackend + ?Sized),
326 event: &Event,
327) -> Result<()> {
328 backend.ingest_event(event)
329}
330
331pub fn ingest_parsed_event_with_storage_class(
332 backend: &(impl SocialGraphBackend + ?Sized),
333 event: &Event,
334 storage_class: EventStorageClass,
335) -> Result<()> {
336 backend.ingest_event_with_storage_class(event, storage_class)
337}
338
339pub fn ingest_parsed_events(
340 backend: &(impl SocialGraphBackend + ?Sized),
341 events: &[Event],
342) -> Result<()> {
343 backend.ingest_events(events)
344}
345
346pub fn ingest_parsed_events_with_storage_class(
347 backend: &(impl SocialGraphBackend + ?Sized),
348 events: &[Event],
349 storage_class: EventStorageClass,
350) -> Result<()> {
351 backend.ingest_events_with_storage_class(events, storage_class)
352}
353
354pub fn ingest_graph_parsed_events(
355 backend: &(impl SocialGraphBackend + ?Sized),
356 events: &[Event],
357) -> Result<()> {
358 backend.ingest_graph_events(events)
359}
360
361pub fn query_events(
362 backend: &(impl SocialGraphBackend + ?Sized),
363 filter: &Filter,
364 limit: usize,
365) -> Vec<Event> {
366 backend.query_events(filter, limit).unwrap_or_default()
367}
368
369impl SocialGraphStore {
370 pub fn set_profile_index_overmute_threshold(&self, threshold: f64) {
371 *self
372 .profile_index_overmute_threshold
373 .lock()
374 .expect("profile index overmute threshold") = threshold;
375 }
376
377 fn profile_index_overmute_threshold(&self) -> f64 {
378 *self
379 .profile_index_overmute_threshold
380 .lock()
381 .expect("profile index overmute threshold")
382 }
383
384 fn invalidate_distance_cache(&self) {
385 *self.distance_cache.lock().unwrap() = None;
386 }
387
388 fn build_distance_cache(state: nostr_social_graph::SocialGraphState) -> Result<DistanceCache> {
389 let unique_ids = state
390 .unique_ids
391 .into_iter()
392 .map(|(pubkey, id)| decode_pubkey(&pubkey).map(|decoded| (id, decoded)))
393 .collect::<Result<HashMap<_, _>>>()?;
394
395 let mut users_by_distance = BTreeMap::new();
396 let mut size_by_distance = BTreeMap::new();
397 for (distance, users) in state.users_by_follow_distance {
398 let decoded = users
399 .into_iter()
400 .filter_map(|id| unique_ids.get(&id).copied())
401 .collect::<Vec<_>>();
402 size_by_distance.insert(distance, decoded.len());
403 users_by_distance.insert(distance, decoded);
404 }
405
406 let total_follows = state
407 .followed_by_user
408 .iter()
409 .map(|(_, targets)| targets.len())
410 .sum::<usize>();
411 let total_users = size_by_distance.values().copied().sum();
412 let max_depth = size_by_distance.keys().copied().max().unwrap_or_default();
413
414 Ok(DistanceCache {
415 stats: SocialGraphStats {
416 total_users,
417 root: Some(state.root),
418 total_follows,
419 max_depth,
420 size_by_distance,
421 enabled: true,
422 },
423 users_by_distance,
424 })
425 }
426
427 fn load_distance_cache(&self) -> Result<DistanceCache> {
428 if let Some(cache) = self.distance_cache.lock().unwrap().clone() {
429 return Ok(cache);
430 }
431
432 let state = {
433 let graph = self.graph.lock().unwrap();
434 graph.export_state().context("export social graph state")?
435 };
436 let cache = Self::build_distance_cache(state)?;
437 *self.distance_cache.lock().unwrap() = Some(cache.clone());
438 Ok(cache)
439 }
440
441 fn set_root(&self, root: &[u8; 32]) -> Result<()> {
442 let root_hex = hex::encode(root);
443 {
444 let mut graph = self.graph.lock().unwrap();
445 if should_replace_placeholder_root(&graph)? {
446 let fresh = SocialGraph::new(&root_hex);
447 graph
448 .replace_state(&fresh.export_state())
449 .context("replace placeholder social graph root")?;
450 } else {
451 graph
452 .set_root(&root_hex)
453 .context("set nostr-social-graph root")?;
454 }
455 }
456 self.invalidate_distance_cache();
457 Ok(())
458 }
459
460 fn stats(&self) -> Result<SocialGraphStats> {
461 Ok(self.load_distance_cache()?.stats)
462 }
463
464 fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
465 let graph = self.graph.lock().unwrap();
466 let distance = graph
467 .get_follow_distance(&hex::encode(pk_bytes))
468 .context("read social graph follow distance")?;
469 Ok((distance != UNKNOWN_FOLLOW_DISTANCE).then_some(distance))
470 }
471
472 fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
473 Ok(self
474 .load_distance_cache()?
475 .users_by_distance
476 .get(&distance)
477 .cloned()
478 .unwrap_or_default())
479 }
480
481 fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
482 let graph = self.graph.lock().unwrap();
483 graph
484 .get_follow_list_created_at(&hex::encode(owner))
485 .context("read social graph follow list timestamp")
486 }
487
488 fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
489 let graph = self.graph.lock().unwrap();
490 decode_pubkey_set(
491 graph
492 .get_followed_by_user(&hex::encode(owner))
493 .context("read followed targets")?,
494 )
495 }
496
497 fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
498 if threshold <= 0.0 {
499 return Ok(false);
500 }
501 let graph = self.graph.lock().unwrap();
502 graph
503 .is_overmuted(&hex::encode(user_pk), threshold)
504 .context("check social graph overmute")
505 }
506
507 #[cfg_attr(not(test), allow(dead_code))]
508 pub fn profile_search_root(&self) -> Result<Option<Cid>> {
509 self.profile_index.search_root()
510 }
511
512 #[cfg_attr(not(test), allow(dead_code))]
513 pub fn profiles_by_pubkey_root(&self) -> Result<Option<Cid>> {
514 self.profile_index.by_pubkey_root()
515 }
516
517 pub fn public_events_root(&self) -> Result<Option<Cid>> {
518 self.public_events.events_root()
519 }
520
521 #[cfg_attr(test, allow(dead_code))]
522 pub(crate) fn public_events_root_for_write(&self) -> Result<Option<Cid>> {
523 self.public_events.events_root_for_write()
524 }
525
526 pub(crate) fn write_public_events_root(&self, root: Option<&Cid>) -> Result<()> {
527 self.public_events.write_events_root(root)
528 }
529
530 #[cfg_attr(not(test), allow(dead_code))]
531 pub fn latest_profile_event(&self, pubkey_hex: &str) -> Result<Option<Event>> {
532 self.profile_index.profile_event_for_pubkey(pubkey_hex)
533 }
534
535 #[cfg_attr(not(test), allow(dead_code))]
536 pub fn profile_search_entries_for_prefix(
537 &self,
538 prefix: &str,
539 ) -> Result<Vec<(String, StoredProfileSearchEntry)>> {
540 self.profile_index.search_entries_for_prefix(prefix)
541 }
542
543 pub fn sync_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
544 self.update_profile_index_for_events(events)
545 }
546
547 pub(crate) fn rebuild_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
548 let latest_by_pubkey = self.filtered_latest_metadata_events_by_pubkey(events)?;
549 let (by_pubkey_root, search_root) = self
550 .profile_index
551 .rebuild_profile_events_with_distances(latest_by_pubkey.into_values(), |event| {
552 self.follow_distance(&event.pubkey.to_bytes())
553 })?;
554 self.profile_index
555 .write_by_pubkey_root(by_pubkey_root.as_ref())?;
556 self.profile_index.write_search_root(search_root.as_ref())?;
557 Ok(())
558 }
559
560 pub(crate) async fn rebuild_profile_index_for_events_async(
561 &self,
562 events: &[Event],
563 ) -> Result<()> {
564 let latest_by_pubkey = self.filtered_latest_metadata_events_by_pubkey(events)?;
565 let (by_pubkey_root, search_root) = self
566 .profile_index
567 .rebuild_profile_events_async_with_distances(
568 latest_by_pubkey.into_values(),
569 |event| self.follow_distance(&event.pubkey.to_bytes()),
570 )
571 .await?;
572 self.profile_index
573 .write_by_pubkey_root(by_pubkey_root.as_ref())?;
574 self.profile_index.write_search_root(search_root.as_ref())?;
575 Ok(())
576 }
577
578 pub fn rebuild_profile_index_from_stored_events(&self) -> Result<usize> {
579 let public_events_root = self.public_events.events_root()?;
580 let ambient_events_root = self.ambient_events.events_root()?;
581 if public_events_root.is_none() && ambient_events_root.is_none() {
582 self.profile_index.write_by_pubkey_root(None)?;
583 self.profile_index.write_search_root(None)?;
584 return Ok(0);
585 }
586
587 let mut events = Vec::new();
588 for (bucket, root) in [
589 (&self.public_events, public_events_root),
590 (&self.ambient_events, ambient_events_root),
591 ] {
592 let Some(root) = root else {
593 continue;
594 };
595 let stored = block_on(bucket.event_store.list_by_kind_lossy(
596 Some(&root),
597 Kind::Metadata.as_u16() as u32,
598 ListEventsOptions::default(),
599 ))
600 .map_err(map_event_store_error)?;
601 events.extend(
602 stored
603 .into_iter()
604 .map(nostr_event_from_stored)
605 .collect::<Result<Vec<_>>>()?,
606 );
607 }
608
609 let latest_count = self
610 .filtered_latest_metadata_events_by_pubkey(&events)?
611 .len();
612 self.rebuild_profile_index_for_events(&events)?;
613 Ok(latest_count)
614 }
615
616 pub async fn rebuild_profile_index_from_stored_events_async(&self) -> Result<usize> {
617 let public_events_root = self.public_events.events_root()?;
618 let ambient_events_root = self.ambient_events.events_root()?;
619 if public_events_root.is_none() && ambient_events_root.is_none() {
620 self.profile_index.write_by_pubkey_root(None)?;
621 self.profile_index.write_search_root(None)?;
622 return Ok(0);
623 }
624
625 let mut events = Vec::new();
626 for (bucket, root) in [
627 (&self.public_events, public_events_root),
628 (&self.ambient_events, ambient_events_root),
629 ] {
630 let Some(root) = root else {
631 continue;
632 };
633 let stored = bucket
634 .event_store
635 .list_by_kind_lossy(
636 Some(&root),
637 Kind::Metadata.as_u16() as u32,
638 ListEventsOptions::default(),
639 )
640 .await
641 .map_err(map_event_store_error)?;
642 events.extend(
643 stored
644 .into_iter()
645 .map(nostr_event_from_stored)
646 .collect::<Result<Vec<_>>>()?,
647 );
648 }
649
650 let latest_count = self
651 .filtered_latest_metadata_events_by_pubkey(&events)?
652 .len();
653 self.rebuild_profile_index_for_events_async(&events).await?;
654 Ok(latest_count)
655 }
656
657 pub fn rebuild_event_indexes_from_stored_events(&self) -> Result<(usize, usize)> {
658 let public_count =
659 self.rebuild_event_index_bucket_from_stored_events(&self.public_events)?;
660 let ambient_count =
661 self.rebuild_event_index_bucket_from_stored_events(&self.ambient_events)?;
662 self.rebuild_profile_index_from_stored_events()?;
663 Ok((public_count, ambient_count))
664 }
665
666 pub async fn rebuild_event_indexes_from_stored_events_async(&self) -> Result<(usize, usize)> {
667 let public_count = self
668 .rebuild_event_index_bucket_from_stored_events_async(&self.public_events)
669 .await?;
670 let ambient_count = self
671 .rebuild_event_index_bucket_from_stored_events_async(&self.ambient_events)
672 .await?;
673 self.rebuild_profile_index_from_stored_events_async()
674 .await?;
675 Ok((public_count, ambient_count))
676 }
677
678 fn rebuild_event_index_bucket_from_stored_events(
679 &self,
680 bucket: &EventIndexBucket,
681 ) -> Result<usize> {
682 let Some(root) = bucket.events_root()? else {
683 bucket.write_events_root(None)?;
684 return Ok(0);
685 };
686
687 let manifest = match block_on(bucket.event_store.get_manifest(Some(&root))) {
688 Ok(manifest) => manifest,
689 Err(err) => {
690 tracing::warn!(
691 "Clearing invalid social graph event index root {} before rebuild: {}",
692 hex::encode(root.hash),
693 err
694 );
695 bucket.write_events_root(None)?;
696 return Ok(0);
697 }
698 };
699 if manifest.by_kind_time_author.is_none() {
700 let next_root = block_on(bucket.event_store.upgrade_manifest_indexes(Some(&root)))
701 .map_err(map_event_store_error)?;
702 if next_root.as_ref() != Some(&root) {
703 bucket.write_events_root(next_root.as_ref())?;
704 return Ok(0);
705 }
706 }
707
708 let stored = block_on(
709 bucket
710 .event_store
711 .list_recent_lossy(Some(&root), ListEventsOptions::default()),
712 )
713 .map_err(map_event_store_error)?;
714 let count = stored.len();
715 let next_root =
716 block_on(bucket.event_store.build(None, stored)).map_err(map_event_store_error)?;
717 bucket.write_events_root(next_root.as_ref())?;
718 Ok(count)
719 }
720
721 async fn rebuild_event_index_bucket_from_stored_events_async(
722 &self,
723 bucket: &EventIndexBucket,
724 ) -> Result<usize> {
725 let Some(root) = bucket.events_root()? else {
726 bucket.write_events_root(None)?;
727 return Ok(0);
728 };
729
730 let manifest = match bucket.event_store.get_manifest(Some(&root)).await {
731 Ok(manifest) => manifest,
732 Err(err) => {
733 tracing::warn!(
734 "Clearing invalid social graph event index root {} before rebuild: {}",
735 hex::encode(root.hash),
736 err
737 );
738 bucket.write_events_root(None)?;
739 return Ok(0);
740 }
741 };
742 if manifest.by_kind_time_author.is_none() {
743 let next_root = bucket
744 .event_store
745 .upgrade_manifest_indexes(Some(&root))
746 .await
747 .map_err(map_event_store_error)?;
748 if next_root.as_ref() != Some(&root) {
749 bucket.write_events_root(next_root.as_ref())?;
750 return Ok(0);
751 }
752 }
753
754 let stored = bucket
755 .event_store
756 .list_recent_lossy(Some(&root), ListEventsOptions::default())
757 .await
758 .map_err(map_event_store_error)?;
759 let count = stored.len();
760 let next_root = bucket
761 .event_store
762 .build(None, stored)
763 .await
764 .map_err(map_event_store_error)?;
765 bucket.write_events_root(next_root.as_ref())?;
766 Ok(count)
767 }
768
769 fn update_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
770 let latest_by_pubkey = latest_metadata_events_by_pubkey(events);
771 let threshold = self.profile_index_overmute_threshold();
772
773 if latest_by_pubkey.is_empty() {
774 return Ok(());
775 }
776
777 let mut by_pubkey_root = self.profile_index.by_pubkey_root()?;
778 let mut search_root = self.profile_index.search_root()?;
779 let mut changed = false;
780
781 for event in latest_by_pubkey.into_values() {
782 let overmuted = self.is_overmuted_user(&event.pubkey.to_bytes(), threshold)?;
783 let (next_by_pubkey_root, next_search_root, updated) = if overmuted {
784 self.profile_index.remove_profile_event(
785 by_pubkey_root.as_ref(),
786 search_root.as_ref(),
787 &event.pubkey.to_hex(),
788 )?
789 } else {
790 self.profile_index.update_profile_event(
791 by_pubkey_root.as_ref(),
792 search_root.as_ref(),
793 event,
794 self.follow_distance(&event.pubkey.to_bytes())?,
795 )?
796 };
797 if updated {
798 by_pubkey_root = next_by_pubkey_root;
799 search_root = next_search_root;
800 changed = true;
801 }
802 }
803
804 if changed {
805 self.profile_index
806 .write_by_pubkey_root(by_pubkey_root.as_ref())?;
807 self.profile_index.write_search_root(search_root.as_ref())?;
808 }
809
810 Ok(())
811 }
812
813 fn filtered_latest_metadata_events_by_pubkey<'a>(
814 &self,
815 events: &'a [Event],
816 ) -> Result<BTreeMap<String, &'a Event>> {
817 let threshold = self.profile_index_overmute_threshold();
818 let mut latest_by_pubkey = BTreeMap::<String, &Event>::new();
819 for event in events.iter().filter(|event| event.kind == Kind::Metadata) {
820 if self.is_overmuted_user(&event.pubkey.to_bytes(), threshold)? {
821 continue;
822 }
823 let pubkey = event.pubkey.to_hex();
824 match latest_by_pubkey.get(&pubkey) {
825 Some(current) if compare_nostr_events(event, current).is_le() => {}
826 _ => {
827 latest_by_pubkey.insert(pubkey, event);
828 }
829 }
830 }
831 Ok(latest_by_pubkey)
832 }
833
834 fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
835 let state = {
836 let graph = self.graph.lock().unwrap();
837 graph.export_state().context("export social graph state")?
838 };
839 let mut graph = SocialGraph::from_state(state).context("rebuild social graph state")?;
840 let root_hex = hex::encode(root);
841 if graph.get_root() != root_hex {
842 graph
843 .set_root(&root_hex)
844 .context("set snapshot social graph root")?;
845 }
846 let chunks = graph
847 .to_binary_chunks_with_budget(*options)
848 .context("encode social graph snapshot")?;
849 Ok(chunks.into_iter().map(Bytes::from).collect())
850 }
851
852 fn ingest_event(&self, event: &Event) -> Result<()> {
853 self.ingest_event_with_storage_class(event, self.default_storage_class_for(event)?)
854 }
855
856 fn ingest_events(&self, events: &[Event]) -> Result<()> {
857 if events.is_empty() {
858 return Ok(());
859 }
860
861 let mut public = Vec::new();
862 let mut ambient = Vec::new();
863 for event in events {
864 match self.default_storage_class_for(event)? {
865 EventStorageClass::Public => public.push(event.clone()),
866 EventStorageClass::Ambient => ambient.push(event.clone()),
867 }
868 }
869
870 if !public.is_empty() {
871 self.ingest_events_with_storage_class(&public, EventStorageClass::Public)?;
872 }
873 if !ambient.is_empty() {
874 self.ingest_events_with_storage_class(&ambient, EventStorageClass::Ambient)?;
875 }
876
877 Ok(())
878 }
879
880 fn apply_graph_events_only(&self, events: &[Event]) -> Result<()> {
881 let graph_events = events
882 .iter()
883 .filter(|event| is_social_graph_event(event.kind))
884 .collect::<Vec<_>>();
885 if graph_events.is_empty() {
886 return Ok(());
887 }
888
889 {
890 let mut graph = self.graph.lock().unwrap();
891 let mut snapshot = SocialGraph::from_state(
892 graph
893 .export_state()
894 .context("export social graph state for graph-only ingest")?,
895 )
896 .context("rebuild social graph state for graph-only ingest")?;
897 for event in graph_events {
898 snapshot.handle_event(&graph_event_from_nostr(event), true, 0.0);
899 }
900 graph
901 .replace_state(&snapshot.export_state())
902 .context("replace graph-only social graph state")?;
903 }
904 self.invalidate_distance_cache();
905 Ok(())
906 }
907
908 fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
909 self.query_events_in_scope(filter, limit, EventQueryScope::All)
910 }
911
912 fn default_storage_class_for(&self, event: &Event) -> Result<EventStorageClass> {
913 let graph = self.graph.lock().unwrap();
914 let root_hex = graph.get_root().context("read social graph root")?;
915 if root_hex != DEFAULT_ROOT_HEX && root_hex == event.pubkey.to_hex() {
916 return Ok(EventStorageClass::Public);
917 }
918 Ok(EventStorageClass::Ambient)
919 }
920
921 fn bucket(&self, storage_class: EventStorageClass) -> &EventIndexBucket {
922 match storage_class {
923 EventStorageClass::Public => &self.public_events,
924 EventStorageClass::Ambient => &self.ambient_events,
925 }
926 }
927
928 fn ingest_event_with_storage_class(
929 &self,
930 event: &Event,
931 storage_class: EventStorageClass,
932 ) -> Result<()> {
933 let current_root = self.bucket(storage_class).events_root_for_write()?;
934 let next_root = self
935 .bucket(storage_class)
936 .store_event(current_root.as_ref(), event)?;
937 self.bucket(storage_class)
938 .write_events_root(Some(&next_root))?;
939
940 self.update_profile_index_for_events(std::slice::from_ref(event))?;
941
942 if is_social_graph_event(event.kind) {
943 {
944 let mut graph = self.graph.lock().unwrap();
945 graph
946 .handle_event(&graph_event_from_nostr(event), true, 0.0)
947 .context("ingest social graph event into nostr-social-graph")?;
948 }
949 self.invalidate_distance_cache();
950 }
951
952 Ok(())
953 }
954
955 fn ingest_events_with_storage_class(
956 &self,
957 events: &[Event],
958 storage_class: EventStorageClass,
959 ) -> Result<()> {
960 if events.is_empty() {
961 return Ok(());
962 }
963
964 let bucket = self.bucket(storage_class);
965 let current_root = bucket.events_root_for_write()?;
966 let stored_events = events
967 .iter()
968 .map(stored_event_from_nostr)
969 .collect::<Vec<_>>();
970 let next_root = block_on(
971 bucket
972 .event_store
973 .build(current_root.as_ref(), stored_events),
974 )
975 .map_err(map_event_store_error)?;
976 bucket.write_events_root(next_root.as_ref())?;
977
978 self.update_profile_index_for_events(events)?;
979
980 let graph_events = events
981 .iter()
982 .filter(|event| is_social_graph_event(event.kind))
983 .collect::<Vec<_>>();
984 if graph_events.is_empty() {
985 return Ok(());
986 }
987
988 {
989 let mut graph = self.graph.lock().unwrap();
990 let mut snapshot = SocialGraph::from_state(
991 graph
992 .export_state()
993 .context("export social graph state for batch ingest")?,
994 )
995 .context("rebuild social graph state for batch ingest")?;
996 for event in graph_events {
997 snapshot.handle_event(&graph_event_from_nostr(event), true, 0.0);
998 }
999 graph
1000 .replace_state(&snapshot.export_state())
1001 .context("replace batched social graph state")?;
1002 }
1003 self.invalidate_distance_cache();
1004
1005 Ok(())
1006 }
1007
1008 pub(crate) fn query_events_in_scope(
1009 &self,
1010 filter: &Filter,
1011 limit: usize,
1012 scope: EventQueryScope,
1013 ) -> Result<Vec<Event>> {
1014 if limit == 0 {
1015 return Ok(Vec::new());
1016 }
1017
1018 let buckets: &[&EventIndexBucket] = match scope {
1019 EventQueryScope::PublicOnly => &[&self.public_events],
1020 EventQueryScope::AmbientOnly => &[&self.ambient_events],
1021 EventQueryScope::All => &[&self.public_events, &self.ambient_events],
1022 };
1023
1024 let mut candidates = Vec::new();
1025 for bucket in buckets {
1026 candidates.extend(bucket.query_events(filter, limit)?);
1027 }
1028
1029 let mut deduped = dedupe_events(candidates);
1030 deduped.retain(|event| filter.match_event(event));
1031 deduped.truncate(limit);
1032 Ok(deduped)
1033 }
1034}
1035
1036impl SocialGraphBackend for SocialGraphStore {
1037 fn stats(&self) -> Result<SocialGraphStats> {
1038 SocialGraphStore::stats(self)
1039 }
1040
1041 fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
1042 SocialGraphStore::users_by_follow_distance(self, distance)
1043 }
1044
1045 fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
1046 SocialGraphStore::follow_distance(self, pk_bytes)
1047 }
1048
1049 fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
1050 SocialGraphStore::follow_list_created_at(self, owner)
1051 }
1052
1053 fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
1054 SocialGraphStore::followed_targets(self, owner)
1055 }
1056
1057 fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
1058 SocialGraphStore::is_overmuted_user(self, user_pk, threshold)
1059 }
1060
1061 fn profile_search_root(&self) -> Result<Option<Cid>> {
1062 SocialGraphStore::profile_search_root(self)
1063 }
1064
1065 fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
1066 SocialGraphStore::snapshot_chunks(self, root, options)
1067 }
1068
1069 fn ingest_event(&self, event: &Event) -> Result<()> {
1070 SocialGraphStore::ingest_event(self, event)
1071 }
1072
1073 fn ingest_event_with_storage_class(
1074 &self,
1075 event: &Event,
1076 storage_class: EventStorageClass,
1077 ) -> Result<()> {
1078 SocialGraphStore::ingest_event_with_storage_class(self, event, storage_class)
1079 }
1080
1081 fn ingest_events(&self, events: &[Event]) -> Result<()> {
1082 SocialGraphStore::ingest_events(self, events)
1083 }
1084
1085 fn ingest_events_with_storage_class(
1086 &self,
1087 events: &[Event],
1088 storage_class: EventStorageClass,
1089 ) -> Result<()> {
1090 SocialGraphStore::ingest_events_with_storage_class(self, events, storage_class)
1091 }
1092
1093 fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
1094 SocialGraphStore::apply_graph_events_only(self, events)
1095 }
1096
1097 fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
1098 SocialGraphStore::query_events(self, filter, limit)
1099 }
1100}
1101
1102impl NostrSocialGraphBackend for SocialGraphStore {
1103 type Error = UpstreamGraphBackendError;
1104
1105 fn get_root(&self) -> std::result::Result<String, Self::Error> {
1106 let graph = self.graph.lock().unwrap();
1107 graph
1108 .get_root()
1109 .context("read social graph root")
1110 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1111 }
1112
1113 fn set_root(&mut self, root: &str) -> std::result::Result<(), Self::Error> {
1114 let root_bytes =
1115 decode_pubkey(root).map_err(|err| UpstreamGraphBackendError(err.to_string()))?;
1116 SocialGraphStore::set_root(self, &root_bytes)
1117 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1118 }
1119
1120 fn handle_event(
1121 &mut self,
1122 event: &GraphEvent,
1123 allow_unknown_authors: bool,
1124 overmute_threshold: f64,
1125 ) -> std::result::Result<(), Self::Error> {
1126 {
1127 let mut graph = self.graph.lock().unwrap();
1128 graph
1129 .handle_event(event, allow_unknown_authors, overmute_threshold)
1130 .context("ingest social graph event into heed backend")
1131 .map_err(|err| UpstreamGraphBackendError(err.to_string()))?;
1132 }
1133 self.invalidate_distance_cache();
1134 Ok(())
1135 }
1136
1137 fn get_follow_distance(&self, user: &str) -> std::result::Result<u32, Self::Error> {
1138 let graph = self.graph.lock().unwrap();
1139 graph
1140 .get_follow_distance(user)
1141 .context("read social graph follow distance")
1142 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1143 }
1144
1145 fn is_following(
1146 &self,
1147 follower: &str,
1148 followed_user: &str,
1149 ) -> std::result::Result<bool, Self::Error> {
1150 let graph = self.graph.lock().unwrap();
1151 graph
1152 .is_following(follower, followed_user)
1153 .context("read social graph following edge")
1154 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1155 }
1156
1157 fn get_followed_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
1158 let graph = self.graph.lock().unwrap();
1159 graph
1160 .get_followed_by_user(user)
1161 .context("read followed-by-user list")
1162 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1163 }
1164
1165 fn get_followers_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
1166 let graph = self.graph.lock().unwrap();
1167 graph
1168 .get_followers_by_user(user)
1169 .context("read followers-by-user list")
1170 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1171 }
1172
1173 fn get_muted_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
1174 let graph = self.graph.lock().unwrap();
1175 graph
1176 .get_muted_by_user(user)
1177 .context("read muted-by-user list")
1178 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1179 }
1180
1181 fn get_user_muted_by(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
1182 let graph = self.graph.lock().unwrap();
1183 graph
1184 .get_user_muted_by(user)
1185 .context("read user-muted-by list")
1186 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1187 }
1188
1189 fn get_follow_list_created_at(
1190 &self,
1191 user: &str,
1192 ) -> std::result::Result<Option<u64>, Self::Error> {
1193 let graph = self.graph.lock().unwrap();
1194 graph
1195 .get_follow_list_created_at(user)
1196 .context("read social graph follow list timestamp")
1197 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1198 }
1199
1200 fn get_mute_list_created_at(
1201 &self,
1202 user: &str,
1203 ) -> std::result::Result<Option<u64>, Self::Error> {
1204 let graph = self.graph.lock().unwrap();
1205 graph
1206 .get_mute_list_created_at(user)
1207 .context("read social graph mute list timestamp")
1208 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1209 }
1210
1211 fn is_overmuted(&self, user: &str, threshold: f64) -> std::result::Result<bool, Self::Error> {
1212 let graph = self.graph.lock().unwrap();
1213 graph
1214 .is_overmuted(user, threshold)
1215 .context("check social graph overmute")
1216 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1217 }
1218}
1219
1220impl<T> SocialGraphBackend for Arc<T>
1221where
1222 T: SocialGraphBackend + ?Sized,
1223{
1224 fn stats(&self) -> Result<SocialGraphStats> {
1225 self.as_ref().stats()
1226 }
1227
1228 fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
1229 self.as_ref().users_by_follow_distance(distance)
1230 }
1231
1232 fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
1233 self.as_ref().follow_distance(pk_bytes)
1234 }
1235
1236 fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
1237 self.as_ref().follow_list_created_at(owner)
1238 }
1239
1240 fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
1241 self.as_ref().followed_targets(owner)
1242 }
1243
1244 fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
1245 self.as_ref().is_overmuted_user(user_pk, threshold)
1246 }
1247
1248 fn profile_search_root(&self) -> Result<Option<Cid>> {
1249 self.as_ref().profile_search_root()
1250 }
1251
1252 fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
1253 self.as_ref().snapshot_chunks(root, options)
1254 }
1255
1256 fn ingest_event(&self, event: &Event) -> Result<()> {
1257 self.as_ref().ingest_event(event)
1258 }
1259
1260 fn ingest_event_with_storage_class(
1261 &self,
1262 event: &Event,
1263 storage_class: EventStorageClass,
1264 ) -> Result<()> {
1265 self.as_ref()
1266 .ingest_event_with_storage_class(event, storage_class)
1267 }
1268
1269 fn ingest_events(&self, events: &[Event]) -> Result<()> {
1270 self.as_ref().ingest_events(events)
1271 }
1272
1273 fn ingest_events_with_storage_class(
1274 &self,
1275 events: &[Event],
1276 storage_class: EventStorageClass,
1277 ) -> Result<()> {
1278 self.as_ref()
1279 .ingest_events_with_storage_class(events, storage_class)
1280 }
1281
1282 fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
1283 self.as_ref().ingest_graph_events(events)
1284 }
1285
1286 fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
1287 self.as_ref().query_events(filter, limit)
1288 }
1289}
1290
1291fn should_replace_placeholder_root(graph: &HeedSocialGraph) -> Result<bool> {
1292 if graph.get_root().context("read current social graph root")? != DEFAULT_ROOT_HEX {
1293 return Ok(false);
1294 }
1295
1296 let GraphStats {
1297 users,
1298 follows,
1299 mutes,
1300 ..
1301 } = graph.size().context("size social graph")?;
1302 Ok(users <= 1 && follows == 0 && mutes == 0)
1303}
1304
1305fn decode_pubkey_set(values: Vec<String>) -> Result<UserSet> {
1306 let mut set = UserSet::new();
1307 for value in values {
1308 set.insert(decode_pubkey(&value)?);
1309 }
1310 Ok(set)
1311}
1312
1313fn decode_pubkey(value: &str) -> Result<[u8; 32]> {
1314 let mut bytes = [0u8; 32];
1315 hex::decode_to_slice(value, &mut bytes)
1316 .with_context(|| format!("decode social graph pubkey {value}"))?;
1317 Ok(bytes)
1318}
1319
1320fn is_social_graph_event(kind: Kind) -> bool {
1321 kind == Kind::ContactList || kind == Kind::MuteList
1322}
1323
1324fn graph_event_from_nostr(event: &Event) -> GraphEvent {
1325 GraphEvent {
1326 created_at: event.created_at.as_u64(),
1327 content: event.content.clone(),
1328 tags: event
1329 .tags
1330 .iter()
1331 .map(|tag| tag.as_slice().to_vec())
1332 .collect(),
1333 kind: event.kind.as_u16() as u32,
1334 pubkey: event.pubkey.to_hex(),
1335 id: event.id.to_hex(),
1336 sig: event.sig.to_string(),
1337 }
1338}
1339
1340fn stored_event_from_nostr(event: &Event) -> StoredNostrEvent {
1341 StoredNostrEvent {
1342 id: event.id.to_hex(),
1343 pubkey: event.pubkey.to_hex(),
1344 created_at: event.created_at.as_u64(),
1345 kind: event.kind.as_u16() as u32,
1346 tags: event
1347 .tags
1348 .iter()
1349 .map(|tag| tag.as_slice().to_vec())
1350 .collect(),
1351 content: event.content.clone(),
1352 sig: event.sig.to_string(),
1353 }
1354}
1355
1356fn nostr_event_from_stored(event: StoredNostrEvent) -> Result<Event> {
1357 let value = serde_json::json!({
1358 "id": event.id,
1359 "pubkey": event.pubkey,
1360 "created_at": event.created_at,
1361 "kind": event.kind,
1362 "tags": event.tags,
1363 "content": event.content,
1364 "sig": event.sig,
1365 });
1366 Event::from_json(value.to_string()).context("decode stored nostr event")
1367}
1368
1369pub(crate) fn stored_event_to_nostr_event(event: StoredNostrEvent) -> Result<Event> {
1370 nostr_event_from_stored(event)
1371}
1372
1373fn encode_cid(cid: &Cid) -> Result<Vec<u8>> {
1374 rmp_serde::to_vec_named(&StoredCid {
1375 hash: cid.hash,
1376 key: cid.key,
1377 })
1378 .context("encode social graph events root")
1379}
1380
1381fn decode_cid(bytes: &[u8]) -> Result<Option<Cid>> {
1382 let stored: StoredCid =
1383 rmp_serde::from_slice(bytes).context("decode social graph events root")?;
1384 Ok(Some(Cid {
1385 hash: stored.hash,
1386 key: stored.key,
1387 }))
1388}
1389
1390fn read_root_file(path: &Path) -> Result<Option<Cid>> {
1391 let Ok(bytes) = std::fs::read(path) else {
1392 return Ok(None);
1393 };
1394 decode_cid(&bytes)
1395}
1396
1397fn write_root_file(path: &Path, root: Option<&Cid>) -> Result<()> {
1398 let Some(root) = root else {
1399 if path.exists() {
1400 std::fs::remove_file(path)?;
1401 }
1402 return Ok(());
1403 };
1404
1405 let encoded = encode_cid(root)?;
1406 let tmp_path = path.with_extension("tmp");
1407 std::fs::write(&tmp_path, encoded)?;
1408 std::fs::rename(tmp_path, path)?;
1409 Ok(())
1410}
1411
1412fn normalize_profile_name(value: &serde_json::Value) -> Option<String> {
1413 let raw = value.as_str()?;
1414 let trimmed = raw.split_whitespace().collect::<Vec<_>>().join(" ");
1415 if trimmed.is_empty() {
1416 return None;
1417 }
1418 Some(trimmed.chars().take(PROFILE_NAME_MAX_LENGTH).collect())
1419}
1420
1421fn extract_profile_names(profile: &serde_json::Map<String, serde_json::Value>) -> Vec<String> {
1422 let mut names = Vec::new();
1423 let mut seen = HashSet::new();
1424
1425 for key in ["display_name", "displayName", "name", "username"] {
1426 let Some(value) = profile.get(key).and_then(normalize_profile_name) else {
1427 continue;
1428 };
1429 let lowered = value.to_lowercase();
1430 if seen.insert(lowered) {
1431 names.push(value);
1432 }
1433 }
1434
1435 names
1436}
1437
/// Decides whether a NIP-05 local part adds nothing useful next to the
/// profile's primary name.
///
/// The local part is rejected when it
/// - consists of exactly one character (counted in Unicode scalar values, so
///   a single non-ASCII character is treated like a single ASCII one),
/// - starts with `npub1`, or
/// - already occurs inside the primary name once that name is lowercased and
///   stripped of all whitespace.
///
/// `local_part` is compared as given; callers are expected to pass it
/// lowercased.
fn should_reject_profile_nip05(local_part: &str, primary_name: &str) -> bool {
    // Count characters rather than bytes: `len()` would report 2+ for a
    // single multi-byte character and let it slip through.
    let mut chars = local_part.chars();
    let is_single_char = chars.next().is_some() && chars.next().is_none();
    if is_single_char || local_part.starts_with("npub1") {
        return true;
    }

    let squashed_name: String = primary_name.to_lowercase().split_whitespace().collect();
    squashed_name.contains(local_part)
}
1449
1450fn normalize_profile_nip05(
1451 profile: &serde_json::Map<String, serde_json::Value>,
1452 primary_name: Option<&str>,
1453) -> Option<String> {
1454 let raw = profile.get("nip05")?.as_str()?;
1455 let local_part = raw.split('@').next()?.trim().to_lowercase();
1456 if local_part.is_empty() {
1457 return None;
1458 }
1459 let truncated: String = local_part.chars().take(PROFILE_NAME_MAX_LENGTH).collect();
1460 if truncated.is_empty() {
1461 return None;
1462 }
1463 if primary_name.is_some_and(|name| should_reject_profile_nip05(&truncated, name)) {
1464 return None;
1465 }
1466 Some(truncated)
1467}
1468
/// Returns `true` when `word` is one of the English stop words that are
/// excluded from search keywords.
///
/// The comparison is exact and case-sensitive; every entry in the list is
/// lowercase.
fn is_search_stop_word(word: &str) -> bool {
    const STOP_WORDS: &[&str] = &[
        "a", "an", "the", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by",
        "from", "is", "it", "as", "be", "was", "are", "this", "that", "these", "those", "i",
        "you", "he", "she", "we", "they", "my", "your", "his", "her", "its", "our", "their",
        "what", "which", "who", "whom", "how", "when", "where", "why", "will", "would", "could",
        "should", "can", "may", "might", "must", "have", "has", "had", "do", "does", "did",
        "been", "being", "get", "got", "just", "now", "then", "so", "if", "not", "no", "yes",
        "all", "any", "some", "more", "most", "other", "into", "over", "after", "before",
        "about", "up", "down", "out", "off", "through", "during", "under", "again", "further",
        "once",
    ];

    STOP_WORDS.contains(&word)
}
1566
/// Returns `true` when `word` consists solely of ASCII digits and is not a
/// four-digit year between 1900 and 2099 (inclusive).
///
/// Words containing any other character yield `false`. The empty string has
/// no non-digit character and is therefore reported as a pure number.
fn is_pure_search_number(word: &str) -> bool {
    let digits_only = word.bytes().all(|byte| byte.is_ascii_digit());
    if !digits_only {
        return false;
    }

    let looks_like_year = word.len() == 4 && matches!(word.parse::<u16>(), Ok(1900..=2099));
    !looks_like_year
}
1576
/// Splits a compound word into its parts at case and digit boundaries.
///
/// A new part starts before a character when, relative to the character
/// before it,
/// - a lowercase letter is followed by an uppercase one (`camelCase`),
/// - a digit is followed by a letter or a letter by a digit (`abc123`), or
/// - two uppercase letters are followed by a lowercase one, in which case
///   the split happens before the second uppercase letter (`HTTPServer`
///   becomes `HTTP` + `Server`).
///
/// The original casing of each part is preserved. An empty input yields an
/// empty vector.
fn split_compound_search_word(word: &str) -> Vec<String> {
    let letters: Vec<char> = word.chars().collect();
    let mut segments: Vec<String> = Vec::new();

    for (pos, &ch) in letters.iter().enumerate() {
        let starts_new_segment = pos > 0 && {
            let prev = letters[pos - 1];
            let lower_to_upper = prev.is_lowercase() && ch.is_uppercase();
            let digit_to_letter = prev.is_ascii_digit() && ch.is_alphabetic();
            let letter_to_digit = prev.is_alphabetic() && ch.is_ascii_digit();
            let acronym_end = prev.is_uppercase()
                && ch.is_uppercase()
                && letters.get(pos + 1).is_some_and(|next| next.is_lowercase());
            lower_to_upper || digit_to_letter || letter_to_digit || acronym_end
        };

        match segments.last_mut() {
            Some(open_segment) if !starts_new_segment => open_segment.push(ch),
            // Either the very first character or a detected boundary.
            _ => segments.push(String::from(ch)),
        }
    }

    segments
}
1605
1606fn parse_search_keywords(text: &str) -> Vec<String> {
1607 let mut keywords = Vec::new();
1608 let mut seen = HashSet::new();
1609
1610 for word in text
1611 .split(|ch: char| !ch.is_alphanumeric())
1612 .filter(|word| !word.is_empty())
1613 {
1614 let mut variants = Vec::with_capacity(1 + word.len() / 4);
1615 variants.push(word.to_lowercase());
1616 variants.extend(
1617 split_compound_search_word(word)
1618 .into_iter()
1619 .map(|part| part.to_lowercase()),
1620 );
1621
1622 for lowered in variants {
1623 if lowered.chars().count() < 2
1624 || is_search_stop_word(&lowered)
1625 || is_pure_search_number(&lowered)
1626 {
1627 continue;
1628 }
1629 if seen.insert(lowered.clone()) {
1630 keywords.push(lowered);
1631 }
1632 }
1633 }
1634
1635 keywords
1636}
1637
1638fn profile_search_terms_for_event(event: &Event) -> Vec<String> {
1639 let profile = match serde_json::from_str::<serde_json::Value>(&event.content) {
1640 Ok(serde_json::Value::Object(profile)) => profile,
1641 _ => serde_json::Map::new(),
1642 };
1643 let names = extract_profile_names(&profile);
1644 let primary_name = names.first().map(String::as_str);
1645 let mut parts = Vec::new();
1646 if let Some(name) = primary_name {
1647 parts.push(name.to_string());
1648 }
1649 if let Some(nip05) = normalize_profile_nip05(&profile, primary_name) {
1650 parts.push(nip05);
1651 }
1652 parts.push(event.pubkey.to_hex());
1653 if names.len() > 1 {
1654 parts.extend(names.into_iter().skip(1));
1655 }
1656 parse_search_keywords(&parts.join(" "))
1657}
1658
1659fn compare_nostr_events(left: &Event, right: &Event) -> std::cmp::Ordering {
1660 left.created_at
1661 .as_u64()
1662 .cmp(&right.created_at.as_u64())
1663 .then_with(|| left.id.to_hex().cmp(&right.id.to_hex()))
1664}
1665
1666fn map_event_store_error(err: NostrEventStoreError) -> anyhow::Error {
1667 anyhow::anyhow!("nostr event store error: {err}")
1668}
1669
1670fn ensure_social_graph_mapsize(db_dir: &Path, requested_bytes: u64) -> Result<()> {
1671 let requested = requested_bytes.max(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES);
1672 let page_size = page_size_bytes() as u64;
1673 let rounded = requested
1674 .checked_add(page_size.saturating_sub(1))
1675 .map(|size| size / page_size * page_size)
1676 .unwrap_or(requested);
1677 let map_size = usize::try_from(rounded).context("social graph mapsize exceeds usize")?;
1678
1679 let env = unsafe {
1680 heed::EnvOpenOptions::new()
1681 .map_size(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES as usize)
1682 .max_dbs(SOCIALGRAPH_MAX_DBS)
1683 .open(db_dir)
1684 }
1685 .context("open social graph LMDB env for resize")?;
1686 if env.info().map_size < map_size {
1687 unsafe { env.resize(map_size) }.context("resize social graph LMDB env")?;
1688 }
1689
1690 Ok(())
1691}
1692
1693fn page_size_bytes() -> usize {
1694 page_size::get_granularity()
1695}
1696
1697#[cfg(test)]
1698mod tests;