1pub mod access;
2pub mod crawler;
3pub mod local_lists;
4pub mod snapshot;
5
6pub use access::SocialGraphAccessControl;
7pub use crawler::SocialGraphCrawler;
8pub use local_lists::{
9 read_local_list_file_state, sync_local_list_files_force, sync_local_list_files_if_changed,
10 LocalListFileState, LocalListSyncOutcome,
11};
12
13mod index_buckets;
14
15use index_buckets::{
16 dedupe_events, latest_metadata_events_by_pubkey, EventIndexBucket, ProfileIndexBucket,
17};
18
19use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
20use std::path::{Path, PathBuf};
21use std::sync::{Arc, Mutex as StdMutex};
22
23use anyhow::{Context, Result};
24use bytes::Bytes;
25use futures::executor::block_on;
26use hashtree_core::{nhash_encode_full, Cid, HashTree, HashTreeConfig, NHashData};
27use hashtree_index::BTree;
28use hashtree_nostr::{
29 is_parameterized_replaceable_kind, is_replaceable_kind, ListEventsOptions, NostrEventStore,
30 NostrEventStoreError, ProfileGuard as NostrProfileGuard, StoredNostrEvent,
31};
32#[cfg(test)]
33use hashtree_nostr::{
34 reset_profile as reset_nostr_profile, set_profile_enabled as set_nostr_profile_enabled,
35 take_profile as take_nostr_profile,
36};
37use nostr::{Event, Filter, JsonUtil, Kind, SingleLetterTag};
38use nostr_social_graph::{
39 BinaryBudget, GraphStats, NostrEvent as GraphEvent, SocialGraph,
40 SocialGraphBackend as NostrSocialGraphBackend,
41};
42use nostr_social_graph_heed::HeedSocialGraph;
43
44use crate::storage::{LocalStore, StorageRouter};
45
46#[cfg(test)]
47use std::sync::{Mutex, MutexGuard, OnceLock};
48#[cfg(test)]
49use std::time::Instant;
50
/// Sorted set of raw 32-byte public keys.
pub type UserSet = BTreeSet<[u8; 32]>;

/// All-zero placeholder root the graph is opened with before a real root is configured.
const DEFAULT_ROOT_HEX: &str = "0000000000000000000000000000000000000000000000000000000000000000";

// Names (joined onto the social graph directory) of persisted index roots and blob dirs.
const EVENTS_ROOT_FILE: &str = "events-root.msgpack";
const AMBIENT_EVENTS_ROOT_FILE: &str = "events-root-ambient.msgpack";
const AMBIENT_EVENTS_BLOB_DIR: &str = "ambient-blobs";
const PROFILE_SEARCH_ROOT_FILE: &str = "profile-search-root.msgpack";
const PROFILES_BY_PUBKEY_ROOT_FILE: &str = "profiles-by-pubkey-root.msgpack";

/// Distance the graph backend reports for users it cannot place; surfaced as `None`.
const UNKNOWN_FOLLOW_DISTANCE: u32 = 1000;
/// 64 MiB. NOTE(review): presumably the default graph DB map size; used outside this view.
const DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES: u64 = 64 << 20;
/// NOTE(review): presumably the max named-database count for the graph DB; used outside this view.
const SOCIALGRAPH_MAX_DBS: u32 = 16;
/// Branching order of the profile search B-tree.
const PROFILE_SEARCH_INDEX_ORDER: usize = 64;
/// NOTE(review): key prefix for profile search entries; used outside this view.
const PROFILE_SEARCH_PREFIX: &str = "p:";
/// NOTE(review): presumably the max indexed profile name length; used outside this view.
const PROFILE_NAME_MAX_LENGTH: usize = 100;
65
/// Which event index bucket (and backing blob store) an ingested event is written to.
///
/// When no class is given explicitly, only events authored by the configured graph root are
/// classified as `Public`; everything else is `Ambient`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EventStorageClass {
    /// Stored in the public event index.
    Public,
    /// Stored in the separate ambient event index.
    Ambient,
}
71
/// Which event index buckets a query reads from.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum EventQueryScope {
    /// Only the public bucket.
    PublicOnly,
    /// Only the ambient bucket.
    AmbientOnly,
    /// Both buckets, with results deduplicated.
    All,
}
79
80#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
81struct StoredCid {
82 hash: [u8; 32],
83 key: Option<[u8; 32]>,
84}
85
86#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
87pub struct StoredProfileSearchEntry {
88 pub pubkey: String,
89 pub name: String,
90 #[serde(default)]
91 pub aliases: Vec<String>,
92 #[serde(default)]
93 pub nip05: Option<String>,
94 #[serde(default, skip_serializing_if = "Option::is_none")]
95 pub follow_distance: Option<u32>,
96 pub created_at: u64,
97 pub event_nhash: String,
98}
99
100#[derive(Debug, Clone, Default, serde::Serialize)]
101pub struct SocialGraphStats {
102 pub total_users: usize,
103 pub root: Option<String>,
104 pub total_follows: usize,
105 pub max_depth: u32,
106 pub size_by_distance: BTreeMap<u32, usize>,
107 pub enabled: bool,
108}
109
110#[derive(Debug, Clone)]
111struct DistanceCache {
112 stats: SocialGraphStats,
113 users_by_distance: BTreeMap<u32, Vec<[u8; 32]>>,
114}
115
116#[derive(Debug, thiserror::Error)]
117#[error("{0}")]
118pub struct UpstreamGraphBackendError(String);
119
120pub struct SocialGraphStore {
121 graph: StdMutex<HeedSocialGraph>,
122 distance_cache: StdMutex<Option<DistanceCache>>,
123 public_events: EventIndexBucket,
124 ambient_events: EventIndexBucket,
125 profile_index: ProfileIndexBucket,
126 profile_index_overmute_threshold: StdMutex<f64>,
127}
128
129pub trait SocialGraphBackend: Send + Sync {
130 fn stats(&self) -> Result<SocialGraphStats>;
131 fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>>;
132 fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>>;
133 fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>>;
134 fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet>;
135 fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool>;
136 fn profile_search_root(&self) -> Result<Option<Cid>> {
137 Ok(None)
138 }
139 fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>>;
140 fn ingest_event(&self, event: &Event) -> Result<()>;
141 fn ingest_event_with_storage_class(
142 &self,
143 event: &Event,
144 storage_class: EventStorageClass,
145 ) -> Result<()> {
146 let _ = storage_class;
147 self.ingest_event(event)
148 }
149 fn ingest_events(&self, events: &[Event]) -> Result<()> {
150 for event in events {
151 self.ingest_event(event)?;
152 }
153 Ok(())
154 }
155 fn ingest_events_with_storage_class(
156 &self,
157 events: &[Event],
158 storage_class: EventStorageClass,
159 ) -> Result<()> {
160 for event in events {
161 self.ingest_event_with_storage_class(event, storage_class)?;
162 }
163 Ok(())
164 }
165 fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
166 self.ingest_events(events)
167 }
168 fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>>;
169}
170
/// Guard returned by [`test_lock`]; the lock is held until it is dropped.
#[cfg(test)]
pub type TestLockGuard = MutexGuard<'static, ()>;

#[cfg(test)]
static NDB_TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();

/// Serializes tests that share process-global state.
///
/// A panic in a previous holder poisons the mutex; since it guards no data, the poison is
/// ignored and the guard is recovered instead.
#[cfg(test)]
pub fn test_lock() -> TestLockGuard {
    let lock = NDB_TEST_LOCK.get_or_init(Mutex::default);
    match lock.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
184
185pub fn open_social_graph_store(data_dir: &Path) -> Result<Arc<SocialGraphStore>> {
186 open_social_graph_store_with_mapsize(data_dir, None)
187}
188
189pub fn open_social_graph_store_with_mapsize(
190 data_dir: &Path,
191 mapsize_bytes: Option<u64>,
192) -> Result<Arc<SocialGraphStore>> {
193 let db_dir = data_dir.join("socialgraph");
194 open_social_graph_store_at_path(&db_dir, mapsize_bytes)
195}
196
197pub fn open_social_graph_store_with_storage(
198 data_dir: &Path,
199 store: Arc<StorageRouter>,
200 mapsize_bytes: Option<u64>,
201) -> Result<Arc<SocialGraphStore>> {
202 let db_dir = data_dir.join("socialgraph");
203 open_social_graph_store_at_path_with_storage(&db_dir, store, mapsize_bytes)
204}
205
206pub fn open_social_graph_store_at_path(
207 db_dir: &Path,
208 mapsize_bytes: Option<u64>,
209) -> Result<Arc<SocialGraphStore>> {
210 let config = hashtree_config::Config::load_or_default();
211 let backend = &config.storage.backend;
212 let local_store = Arc::new(
213 LocalStore::new_with_lmdb_map_size(db_dir.join("blobs"), backend, mapsize_bytes)
214 .map_err(|err| anyhow::anyhow!("Failed to create social graph blob store: {err}"))?,
215 );
216 let store = Arc::new(StorageRouter::new(local_store));
217 open_social_graph_store_at_path_with_storage(db_dir, store, mapsize_bytes)
218}
219
220pub fn open_social_graph_store_at_path_with_storage(
221 db_dir: &Path,
222 store: Arc<StorageRouter>,
223 mapsize_bytes: Option<u64>,
224) -> Result<Arc<SocialGraphStore>> {
225 let ambient_backend = store.local_store().backend();
226 let ambient_local = Arc::new(
227 LocalStore::new_with_lmdb_map_size(
228 db_dir.join(AMBIENT_EVENTS_BLOB_DIR),
229 &ambient_backend,
230 mapsize_bytes,
231 )
232 .map_err(|err| {
233 anyhow::anyhow!("Failed to create social graph ambient blob store: {err}")
234 })?,
235 );
236 let ambient_store = Arc::new(StorageRouter::new(ambient_local));
237 open_social_graph_store_at_path_with_storage_split(db_dir, store, ambient_store, mapsize_bytes)
238}
239
240pub fn open_social_graph_store_at_path_with_storage_split(
241 db_dir: &Path,
242 public_store: Arc<StorageRouter>,
243 ambient_store: Arc<StorageRouter>,
244 mapsize_bytes: Option<u64>,
245) -> Result<Arc<SocialGraphStore>> {
246 std::fs::create_dir_all(db_dir)?;
247 if let Some(size) = mapsize_bytes {
248 ensure_social_graph_mapsize(db_dir, size)?;
249 }
250 let graph = HeedSocialGraph::open(db_dir, DEFAULT_ROOT_HEX)
251 .context("open nostr-social-graph heed backend")?;
252
253 Ok(Arc::new(SocialGraphStore {
254 graph: StdMutex::new(graph),
255 distance_cache: StdMutex::new(None),
256 public_events: EventIndexBucket {
257 event_store: NostrEventStore::new(Arc::clone(&public_store)),
258 root_path: db_dir.join(EVENTS_ROOT_FILE),
259 },
260 ambient_events: EventIndexBucket {
261 event_store: NostrEventStore::new(ambient_store),
262 root_path: db_dir.join(AMBIENT_EVENTS_ROOT_FILE),
263 },
264 profile_index: ProfileIndexBucket {
265 tree: HashTree::new(HashTreeConfig::new(Arc::clone(&public_store))),
266 index: BTree::new(
267 public_store,
268 hashtree_index::BTreeOptions {
269 order: Some(PROFILE_SEARCH_INDEX_ORDER),
270 },
271 ),
272 by_pubkey_root_path: db_dir.join(PROFILES_BY_PUBKEY_ROOT_FILE),
273 search_root_path: db_dir.join(PROFILE_SEARCH_ROOT_FILE),
274 },
275 profile_index_overmute_threshold: StdMutex::new(1.0),
276 }))
277}
278
279pub fn set_social_graph_root(store: &SocialGraphStore, pk_bytes: &[u8; 32]) {
280 if let Err(err) = store.set_root(pk_bytes) {
281 tracing::warn!("Failed to set social graph root: {err}");
282 }
283}
284
285pub fn get_follow_distance(
286 backend: &(impl SocialGraphBackend + ?Sized),
287 pk_bytes: &[u8; 32],
288) -> Option<u32> {
289 backend.follow_distance(pk_bytes).ok().flatten()
290}
291
292pub fn get_follows(
293 backend: &(impl SocialGraphBackend + ?Sized),
294 pk_bytes: &[u8; 32],
295) -> Vec<[u8; 32]> {
296 match backend.followed_targets(pk_bytes) {
297 Ok(set) => set.into_iter().collect(),
298 Err(_) => Vec::new(),
299 }
300}
301
302pub fn is_overmuted(
303 backend: &(impl SocialGraphBackend + ?Sized),
304 _root_pk: &[u8; 32],
305 user_pk: &[u8; 32],
306 threshold: f64,
307) -> bool {
308 backend
309 .is_overmuted_user(user_pk, threshold)
310 .unwrap_or(false)
311}
312
313pub fn ingest_event(backend: &(impl SocialGraphBackend + ?Sized), _sub_id: &str, event_json: &str) {
314 let event = match Event::from_json(event_json) {
315 Ok(event) => event,
316 Err(_) => return,
317 };
318
319 if let Err(err) = backend.ingest_event(&event) {
320 tracing::warn!("Failed to ingest social graph event: {err}");
321 }
322}
323
324pub fn ingest_parsed_event(
325 backend: &(impl SocialGraphBackend + ?Sized),
326 event: &Event,
327) -> Result<()> {
328 backend.ingest_event(event)
329}
330
331pub fn ingest_parsed_event_with_storage_class(
332 backend: &(impl SocialGraphBackend + ?Sized),
333 event: &Event,
334 storage_class: EventStorageClass,
335) -> Result<()> {
336 backend.ingest_event_with_storage_class(event, storage_class)
337}
338
339pub fn ingest_parsed_events(
340 backend: &(impl SocialGraphBackend + ?Sized),
341 events: &[Event],
342) -> Result<()> {
343 backend.ingest_events(events)
344}
345
346pub fn ingest_parsed_events_with_storage_class(
347 backend: &(impl SocialGraphBackend + ?Sized),
348 events: &[Event],
349 storage_class: EventStorageClass,
350) -> Result<()> {
351 backend.ingest_events_with_storage_class(events, storage_class)
352}
353
354pub fn ingest_graph_parsed_events(
355 backend: &(impl SocialGraphBackend + ?Sized),
356 events: &[Event],
357) -> Result<()> {
358 backend.ingest_graph_events(events)
359}
360
361pub fn query_events(
362 backend: &(impl SocialGraphBackend + ?Sized),
363 filter: &Filter,
364 limit: usize,
365) -> Vec<Event> {
366 backend.query_events(filter, limit).unwrap_or_default()
367}
368
369impl SocialGraphStore {
370 pub fn set_profile_index_overmute_threshold(&self, threshold: f64) {
371 *self
372 .profile_index_overmute_threshold
373 .lock()
374 .expect("profile index overmute threshold") = threshold;
375 }
376
377 fn profile_index_overmute_threshold(&self) -> f64 {
378 *self
379 .profile_index_overmute_threshold
380 .lock()
381 .expect("profile index overmute threshold")
382 }
383
384 fn invalidate_distance_cache(&self) {
385 *self.distance_cache.lock().unwrap() = None;
386 }
387
388 fn build_distance_cache(state: nostr_social_graph::SocialGraphState) -> Result<DistanceCache> {
389 let unique_ids = state
390 .unique_ids
391 .into_iter()
392 .map(|(pubkey, id)| decode_pubkey(&pubkey).map(|decoded| (id, decoded)))
393 .collect::<Result<HashMap<_, _>>>()?;
394
395 let mut users_by_distance = BTreeMap::new();
396 let mut size_by_distance = BTreeMap::new();
397 for (distance, users) in state.users_by_follow_distance {
398 let decoded = users
399 .into_iter()
400 .filter_map(|id| unique_ids.get(&id).copied())
401 .collect::<Vec<_>>();
402 size_by_distance.insert(distance, decoded.len());
403 users_by_distance.insert(distance, decoded);
404 }
405
406 let total_follows = state
407 .followed_by_user
408 .iter()
409 .map(|(_, targets)| targets.len())
410 .sum::<usize>();
411 let total_users = size_by_distance.values().copied().sum();
412 let max_depth = size_by_distance.keys().copied().max().unwrap_or_default();
413
414 Ok(DistanceCache {
415 stats: SocialGraphStats {
416 total_users,
417 root: Some(state.root),
418 total_follows,
419 max_depth,
420 size_by_distance,
421 enabled: true,
422 },
423 users_by_distance,
424 })
425 }
426
427 fn load_distance_cache(&self) -> Result<DistanceCache> {
428 if let Some(cache) = self.distance_cache.lock().unwrap().clone() {
429 return Ok(cache);
430 }
431
432 let state = {
433 let graph = self.graph.lock().unwrap();
434 graph.export_state().context("export social graph state")?
435 };
436 let cache = Self::build_distance_cache(state)?;
437 *self.distance_cache.lock().unwrap() = Some(cache.clone());
438 Ok(cache)
439 }
440
441 fn set_root(&self, root: &[u8; 32]) -> Result<()> {
442 let root_hex = hex::encode(root);
443 {
444 let mut graph = self.graph.lock().unwrap();
445 if should_replace_placeholder_root(&graph)? {
446 let fresh = SocialGraph::new(&root_hex);
447 graph
448 .replace_state(&fresh.export_state())
449 .context("replace placeholder social graph root")?;
450 } else {
451 graph
452 .set_root(&root_hex)
453 .context("set nostr-social-graph root")?;
454 }
455 }
456 self.invalidate_distance_cache();
457 Ok(())
458 }
459
460 fn stats(&self) -> Result<SocialGraphStats> {
461 Ok(self.load_distance_cache()?.stats)
462 }
463
464 fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
465 let graph = self.graph.lock().unwrap();
466 let distance = graph
467 .get_follow_distance(&hex::encode(pk_bytes))
468 .context("read social graph follow distance")?;
469 Ok((distance != UNKNOWN_FOLLOW_DISTANCE).then_some(distance))
470 }
471
472 fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
473 Ok(self
474 .load_distance_cache()?
475 .users_by_distance
476 .get(&distance)
477 .cloned()
478 .unwrap_or_default())
479 }
480
481 fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
482 let graph = self.graph.lock().unwrap();
483 graph
484 .get_follow_list_created_at(&hex::encode(owner))
485 .context("read social graph follow list timestamp")
486 }
487
488 fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
489 let graph = self.graph.lock().unwrap();
490 decode_pubkey_set(
491 graph
492 .get_followed_by_user(&hex::encode(owner))
493 .context("read followed targets")?,
494 )
495 }
496
497 fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
498 if threshold <= 0.0 {
499 return Ok(false);
500 }
501 let graph = self.graph.lock().unwrap();
502 graph
503 .is_overmuted(&hex::encode(user_pk), threshold)
504 .context("check social graph overmute")
505 }
506
507 #[cfg_attr(not(test), allow(dead_code))]
508 pub fn profile_search_root(&self) -> Result<Option<Cid>> {
509 self.profile_index.search_root()
510 }
511
512 #[cfg_attr(not(test), allow(dead_code))]
513 pub fn profiles_by_pubkey_root(&self) -> Result<Option<Cid>> {
514 self.profile_index.by_pubkey_root()
515 }
516
517 pub fn public_events_root(&self) -> Result<Option<Cid>> {
518 self.public_events.events_root()
519 }
520
521 #[cfg_attr(test, allow(dead_code))]
522 pub(crate) fn public_events_root_for_write(&self) -> Result<Option<Cid>> {
523 self.public_events.events_root_for_write()
524 }
525
526 pub(crate) fn write_public_events_root(&self, root: Option<&Cid>) -> Result<()> {
527 self.public_events.write_events_root(root)
528 }
529
530 #[cfg_attr(not(test), allow(dead_code))]
531 pub fn latest_profile_event(&self, pubkey_hex: &str) -> Result<Option<Event>> {
532 self.profile_index.profile_event_for_pubkey(pubkey_hex)
533 }
534
535 #[cfg_attr(not(test), allow(dead_code))]
536 pub fn profile_search_entries_for_prefix(
537 &self,
538 prefix: &str,
539 ) -> Result<Vec<(String, StoredProfileSearchEntry)>> {
540 self.profile_index.search_entries_for_prefix(prefix)
541 }
542
543 pub fn sync_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
544 self.update_profile_index_for_events(events)
545 }
546
547 pub(crate) fn rebuild_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
548 let latest_by_pubkey = self.filtered_latest_metadata_events_by_pubkey(events)?;
549 let (by_pubkey_root, search_root) = self
550 .profile_index
551 .rebuild_profile_events_with_distances(latest_by_pubkey.into_values(), |event| {
552 self.follow_distance(&event.pubkey.to_bytes())
553 })?;
554 self.profile_index
555 .write_by_pubkey_root(by_pubkey_root.as_ref())?;
556 self.profile_index.write_search_root(search_root.as_ref())?;
557 Ok(())
558 }
559
560 pub(crate) async fn rebuild_profile_index_for_events_async(
561 &self,
562 events: &[Event],
563 ) -> Result<()> {
564 let latest_by_pubkey = self.filtered_latest_metadata_events_by_pubkey(events)?;
565 let (by_pubkey_root, search_root) = self
566 .profile_index
567 .rebuild_profile_events_async_with_distances(latest_by_pubkey.into_values(), |event| {
568 self.follow_distance(&event.pubkey.to_bytes())
569 })
570 .await?;
571 self.profile_index
572 .write_by_pubkey_root(by_pubkey_root.as_ref())?;
573 self.profile_index.write_search_root(search_root.as_ref())?;
574 Ok(())
575 }
576
577 pub fn rebuild_profile_index_from_stored_events(&self) -> Result<usize> {
578 let public_events_root = self.public_events.events_root()?;
579 let ambient_events_root = self.ambient_events.events_root()?;
580 if public_events_root.is_none() && ambient_events_root.is_none() {
581 self.profile_index.write_by_pubkey_root(None)?;
582 self.profile_index.write_search_root(None)?;
583 return Ok(0);
584 }
585
586 let mut events = Vec::new();
587 for (bucket, root) in [
588 (&self.public_events, public_events_root),
589 (&self.ambient_events, ambient_events_root),
590 ] {
591 let Some(root) = root else {
592 continue;
593 };
594 let stored = block_on(bucket.event_store.list_by_kind_lossy(
595 Some(&root),
596 Kind::Metadata.as_u16() as u32,
597 ListEventsOptions::default(),
598 ))
599 .map_err(map_event_store_error)?;
600 events.extend(
601 stored
602 .into_iter()
603 .map(nostr_event_from_stored)
604 .collect::<Result<Vec<_>>>()?,
605 );
606 }
607
608 let latest_count = self
609 .filtered_latest_metadata_events_by_pubkey(&events)?
610 .len();
611 self.rebuild_profile_index_for_events(&events)?;
612 Ok(latest_count)
613 }
614
615 pub async fn rebuild_profile_index_from_stored_events_async(&self) -> Result<usize> {
616 let public_events_root = self.public_events.events_root()?;
617 let ambient_events_root = self.ambient_events.events_root()?;
618 if public_events_root.is_none() && ambient_events_root.is_none() {
619 self.profile_index.write_by_pubkey_root(None)?;
620 self.profile_index.write_search_root(None)?;
621 return Ok(0);
622 }
623
624 let mut events = Vec::new();
625 for (bucket, root) in [
626 (&self.public_events, public_events_root),
627 (&self.ambient_events, ambient_events_root),
628 ] {
629 let Some(root) = root else {
630 continue;
631 };
632 let stored = bucket
633 .event_store
634 .list_by_kind_lossy(
635 Some(&root),
636 Kind::Metadata.as_u16() as u32,
637 ListEventsOptions::default(),
638 )
639 .await
640 .map_err(map_event_store_error)?;
641 events.extend(
642 stored
643 .into_iter()
644 .map(nostr_event_from_stored)
645 .collect::<Result<Vec<_>>>()?,
646 );
647 }
648
649 let latest_count = self
650 .filtered_latest_metadata_events_by_pubkey(&events)?
651 .len();
652 self.rebuild_profile_index_for_events_async(&events).await?;
653 Ok(latest_count)
654 }
655
656 pub fn rebuild_event_indexes_from_stored_events(&self) -> Result<(usize, usize)> {
657 let public_count =
658 self.rebuild_event_index_bucket_from_stored_events(&self.public_events)?;
659 let ambient_count =
660 self.rebuild_event_index_bucket_from_stored_events(&self.ambient_events)?;
661 self.rebuild_profile_index_from_stored_events()?;
662 Ok((public_count, ambient_count))
663 }
664
665 pub async fn rebuild_event_indexes_from_stored_events_async(&self) -> Result<(usize, usize)> {
666 let public_count = self
667 .rebuild_event_index_bucket_from_stored_events_async(&self.public_events)
668 .await?;
669 let ambient_count = self
670 .rebuild_event_index_bucket_from_stored_events_async(&self.ambient_events)
671 .await?;
672 self.rebuild_profile_index_from_stored_events_async()
673 .await?;
674 Ok((public_count, ambient_count))
675 }
676
677 fn rebuild_event_index_bucket_from_stored_events(
678 &self,
679 bucket: &EventIndexBucket,
680 ) -> Result<usize> {
681 let Some(root) = bucket.events_root()? else {
682 bucket.write_events_root(None)?;
683 return Ok(0);
684 };
685
686 let manifest = match block_on(bucket.event_store.get_manifest(Some(&root))) {
687 Ok(manifest) => manifest,
688 Err(err) => {
689 tracing::warn!(
690 "Clearing invalid social graph event index root {} before rebuild: {}",
691 hex::encode(root.hash),
692 err
693 );
694 bucket.write_events_root(None)?;
695 return Ok(0);
696 }
697 };
698 if manifest.by_kind_time_author.is_none() {
699 let next_root = block_on(bucket.event_store.upgrade_manifest_indexes(Some(&root)))
700 .map_err(map_event_store_error)?;
701 if next_root.as_ref() != Some(&root) {
702 bucket.write_events_root(next_root.as_ref())?;
703 return Ok(0);
704 }
705 }
706
707 let stored = block_on(
708 bucket
709 .event_store
710 .list_recent_lossy(Some(&root), ListEventsOptions::default()),
711 )
712 .map_err(map_event_store_error)?;
713 let count = stored.len();
714 let next_root =
715 block_on(bucket.event_store.build(None, stored)).map_err(map_event_store_error)?;
716 bucket.write_events_root(next_root.as_ref())?;
717 Ok(count)
718 }
719
720 async fn rebuild_event_index_bucket_from_stored_events_async(
721 &self,
722 bucket: &EventIndexBucket,
723 ) -> Result<usize> {
724 let Some(root) = bucket.events_root()? else {
725 bucket.write_events_root(None)?;
726 return Ok(0);
727 };
728
729 let manifest = match bucket.event_store.get_manifest(Some(&root)).await {
730 Ok(manifest) => manifest,
731 Err(err) => {
732 tracing::warn!(
733 "Clearing invalid social graph event index root {} before rebuild: {}",
734 hex::encode(root.hash),
735 err
736 );
737 bucket.write_events_root(None)?;
738 return Ok(0);
739 }
740 };
741 if manifest.by_kind_time_author.is_none() {
742 let next_root = bucket
743 .event_store
744 .upgrade_manifest_indexes(Some(&root))
745 .await
746 .map_err(map_event_store_error)?;
747 if next_root.as_ref() != Some(&root) {
748 bucket.write_events_root(next_root.as_ref())?;
749 return Ok(0);
750 }
751 }
752
753 let stored = bucket
754 .event_store
755 .list_recent_lossy(Some(&root), ListEventsOptions::default())
756 .await
757 .map_err(map_event_store_error)?;
758 let count = stored.len();
759 let next_root = bucket
760 .event_store
761 .build(None, stored)
762 .await
763 .map_err(map_event_store_error)?;
764 bucket.write_events_root(next_root.as_ref())?;
765 Ok(count)
766 }
767
768 fn update_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
769 let latest_by_pubkey = latest_metadata_events_by_pubkey(events);
770 let threshold = self.profile_index_overmute_threshold();
771
772 if latest_by_pubkey.is_empty() {
773 return Ok(());
774 }
775
776 let mut by_pubkey_root = self.profile_index.by_pubkey_root()?;
777 let mut search_root = self.profile_index.search_root()?;
778 let mut changed = false;
779
780 for event in latest_by_pubkey.into_values() {
781 let overmuted = self.is_overmuted_user(&event.pubkey.to_bytes(), threshold)?;
782 let (next_by_pubkey_root, next_search_root, updated) = if overmuted {
783 self.profile_index.remove_profile_event(
784 by_pubkey_root.as_ref(),
785 search_root.as_ref(),
786 &event.pubkey.to_hex(),
787 )?
788 } else {
789 self.profile_index.update_profile_event(
790 by_pubkey_root.as_ref(),
791 search_root.as_ref(),
792 event,
793 self.follow_distance(&event.pubkey.to_bytes())?,
794 )?
795 };
796 if updated {
797 by_pubkey_root = next_by_pubkey_root;
798 search_root = next_search_root;
799 changed = true;
800 }
801 }
802
803 if changed {
804 self.profile_index
805 .write_by_pubkey_root(by_pubkey_root.as_ref())?;
806 self.profile_index.write_search_root(search_root.as_ref())?;
807 }
808
809 Ok(())
810 }
811
812 fn filtered_latest_metadata_events_by_pubkey<'a>(
813 &self,
814 events: &'a [Event],
815 ) -> Result<BTreeMap<String, &'a Event>> {
816 let threshold = self.profile_index_overmute_threshold();
817 let mut latest_by_pubkey = BTreeMap::<String, &Event>::new();
818 for event in events.iter().filter(|event| event.kind == Kind::Metadata) {
819 if self.is_overmuted_user(&event.pubkey.to_bytes(), threshold)? {
820 continue;
821 }
822 let pubkey = event.pubkey.to_hex();
823 match latest_by_pubkey.get(&pubkey) {
824 Some(current) if compare_nostr_events(event, current).is_le() => {}
825 _ => {
826 latest_by_pubkey.insert(pubkey, event);
827 }
828 }
829 }
830 Ok(latest_by_pubkey)
831 }
832
833 fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
834 let state = {
835 let graph = self.graph.lock().unwrap();
836 graph.export_state().context("export social graph state")?
837 };
838 let mut graph = SocialGraph::from_state(state).context("rebuild social graph state")?;
839 let root_hex = hex::encode(root);
840 if graph.get_root() != root_hex {
841 graph
842 .set_root(&root_hex)
843 .context("set snapshot social graph root")?;
844 }
845 let chunks = graph
846 .to_binary_chunks_with_budget(*options)
847 .context("encode social graph snapshot")?;
848 Ok(chunks.into_iter().map(Bytes::from).collect())
849 }
850
851 fn ingest_event(&self, event: &Event) -> Result<()> {
852 self.ingest_event_with_storage_class(event, self.default_storage_class_for(event)?)
853 }
854
855 fn ingest_events(&self, events: &[Event]) -> Result<()> {
856 if events.is_empty() {
857 return Ok(());
858 }
859
860 let mut public = Vec::new();
861 let mut ambient = Vec::new();
862 for event in events {
863 match self.default_storage_class_for(event)? {
864 EventStorageClass::Public => public.push(event.clone()),
865 EventStorageClass::Ambient => ambient.push(event.clone()),
866 }
867 }
868
869 if !public.is_empty() {
870 self.ingest_events_with_storage_class(&public, EventStorageClass::Public)?;
871 }
872 if !ambient.is_empty() {
873 self.ingest_events_with_storage_class(&ambient, EventStorageClass::Ambient)?;
874 }
875
876 Ok(())
877 }
878
879 fn apply_graph_events_only(&self, events: &[Event]) -> Result<()> {
880 let graph_events = events
881 .iter()
882 .filter(|event| is_social_graph_event(event.kind))
883 .collect::<Vec<_>>();
884 if graph_events.is_empty() {
885 return Ok(());
886 }
887
888 {
889 let mut graph = self.graph.lock().unwrap();
890 let mut snapshot = SocialGraph::from_state(
891 graph
892 .export_state()
893 .context("export social graph state for graph-only ingest")?,
894 )
895 .context("rebuild social graph state for graph-only ingest")?;
896 for event in graph_events {
897 snapshot.handle_event(&graph_event_from_nostr(event), true, 0.0);
898 }
899 graph
900 .replace_state(&snapshot.export_state())
901 .context("replace graph-only social graph state")?;
902 }
903 self.invalidate_distance_cache();
904 Ok(())
905 }
906
907 fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
908 self.query_events_in_scope(filter, limit, EventQueryScope::All)
909 }
910
911 fn default_storage_class_for(&self, event: &Event) -> Result<EventStorageClass> {
912 let graph = self.graph.lock().unwrap();
913 let root_hex = graph.get_root().context("read social graph root")?;
914 if root_hex != DEFAULT_ROOT_HEX && root_hex == event.pubkey.to_hex() {
915 return Ok(EventStorageClass::Public);
916 }
917 Ok(EventStorageClass::Ambient)
918 }
919
920 fn bucket(&self, storage_class: EventStorageClass) -> &EventIndexBucket {
921 match storage_class {
922 EventStorageClass::Public => &self.public_events,
923 EventStorageClass::Ambient => &self.ambient_events,
924 }
925 }
926
927 fn ingest_event_with_storage_class(
928 &self,
929 event: &Event,
930 storage_class: EventStorageClass,
931 ) -> Result<()> {
932 let current_root = self.bucket(storage_class).events_root_for_write()?;
933 let next_root = self
934 .bucket(storage_class)
935 .store_event(current_root.as_ref(), event)?;
936 self.bucket(storage_class)
937 .write_events_root(Some(&next_root))?;
938
939 if is_social_graph_event(event.kind) {
940 {
941 let mut graph = self.graph.lock().unwrap();
942 graph
943 .handle_event(&graph_event_from_nostr(event), true, 0.0)
944 .context("ingest social graph event into nostr-social-graph")?;
945 }
946 self.invalidate_distance_cache();
947 }
948
949 self.update_profile_index_for_events(std::slice::from_ref(event))?;
950
951 Ok(())
952 }
953
954 fn ingest_events_with_storage_class(
955 &self,
956 events: &[Event],
957 storage_class: EventStorageClass,
958 ) -> Result<()> {
959 if events.is_empty() {
960 return Ok(());
961 }
962
963 let bucket = self.bucket(storage_class);
964 let current_root = bucket.events_root_for_write()?;
965 let stored_events = events
966 .iter()
967 .map(stored_event_from_nostr)
968 .collect::<Vec<_>>();
969 let next_root = block_on(
970 bucket
971 .event_store
972 .build(current_root.as_ref(), stored_events),
973 )
974 .map_err(map_event_store_error)?;
975 bucket.write_events_root(next_root.as_ref())?;
976
977 let graph_events = events
978 .iter()
979 .filter(|event| is_social_graph_event(event.kind))
980 .collect::<Vec<_>>();
981 if !graph_events.is_empty() {
982 let mut graph = self.graph.lock().unwrap();
983 let mut snapshot = SocialGraph::from_state(
984 graph
985 .export_state()
986 .context("export social graph state for batch ingest")?,
987 )
988 .context("rebuild social graph state for batch ingest")?;
989 for event in graph_events {
990 snapshot.handle_event(&graph_event_from_nostr(event), true, 0.0);
991 }
992 graph
993 .replace_state(&snapshot.export_state())
994 .context("replace batched social graph state")?;
995 self.invalidate_distance_cache();
996 }
997
998 self.update_profile_index_for_events(events)?;
999
1000 Ok(())
1001 }
1002
1003 pub(crate) fn query_events_in_scope(
1004 &self,
1005 filter: &Filter,
1006 limit: usize,
1007 scope: EventQueryScope,
1008 ) -> Result<Vec<Event>> {
1009 if limit == 0 {
1010 return Ok(Vec::new());
1011 }
1012
1013 let buckets: &[&EventIndexBucket] = match scope {
1014 EventQueryScope::PublicOnly => &[&self.public_events],
1015 EventQueryScope::AmbientOnly => &[&self.ambient_events],
1016 EventQueryScope::All => &[&self.public_events, &self.ambient_events],
1017 };
1018
1019 let mut candidates = Vec::new();
1020 for bucket in buckets {
1021 candidates.extend(bucket.query_events(filter, limit)?);
1022 }
1023
1024 let mut deduped = dedupe_events(candidates);
1025 deduped.retain(|event| filter.match_event(event, Default::default()));
1026 deduped.truncate(limit);
1027 Ok(deduped)
1028 }
1029}
1030
1031impl SocialGraphBackend for SocialGraphStore {
1032 fn stats(&self) -> Result<SocialGraphStats> {
1033 SocialGraphStore::stats(self)
1034 }
1035
1036 fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
1037 SocialGraphStore::users_by_follow_distance(self, distance)
1038 }
1039
1040 fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
1041 SocialGraphStore::follow_distance(self, pk_bytes)
1042 }
1043
1044 fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
1045 SocialGraphStore::follow_list_created_at(self, owner)
1046 }
1047
1048 fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
1049 SocialGraphStore::followed_targets(self, owner)
1050 }
1051
1052 fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
1053 SocialGraphStore::is_overmuted_user(self, user_pk, threshold)
1054 }
1055
1056 fn profile_search_root(&self) -> Result<Option<Cid>> {
1057 SocialGraphStore::profile_search_root(self)
1058 }
1059
1060 fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
1061 SocialGraphStore::snapshot_chunks(self, root, options)
1062 }
1063
1064 fn ingest_event(&self, event: &Event) -> Result<()> {
1065 SocialGraphStore::ingest_event(self, event)
1066 }
1067
1068 fn ingest_event_with_storage_class(
1069 &self,
1070 event: &Event,
1071 storage_class: EventStorageClass,
1072 ) -> Result<()> {
1073 SocialGraphStore::ingest_event_with_storage_class(self, event, storage_class)
1074 }
1075
1076 fn ingest_events(&self, events: &[Event]) -> Result<()> {
1077 SocialGraphStore::ingest_events(self, events)
1078 }
1079
1080 fn ingest_events_with_storage_class(
1081 &self,
1082 events: &[Event],
1083 storage_class: EventStorageClass,
1084 ) -> Result<()> {
1085 SocialGraphStore::ingest_events_with_storage_class(self, events, storage_class)
1086 }
1087
1088 fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
1089 SocialGraphStore::apply_graph_events_only(self, events)
1090 }
1091
1092 fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
1093 SocialGraphStore::query_events(self, filter, limit)
1094 }
1095}
1096
1097impl NostrSocialGraphBackend for SocialGraphStore {
1098 type Error = UpstreamGraphBackendError;
1099
1100 fn get_root(&self) -> std::result::Result<String, Self::Error> {
1101 let graph = self.graph.lock().unwrap();
1102 graph
1103 .get_root()
1104 .context("read social graph root")
1105 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1106 }
1107
1108 fn set_root(&mut self, root: &str) -> std::result::Result<(), Self::Error> {
1109 let root_bytes =
1110 decode_pubkey(root).map_err(|err| UpstreamGraphBackendError(err.to_string()))?;
1111 SocialGraphStore::set_root(self, &root_bytes)
1112 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1113 }
1114
1115 fn handle_event(
1116 &mut self,
1117 event: &GraphEvent,
1118 allow_unknown_authors: bool,
1119 overmute_threshold: f64,
1120 ) -> std::result::Result<(), Self::Error> {
1121 {
1122 let mut graph = self.graph.lock().unwrap();
1123 graph
1124 .handle_event(event, allow_unknown_authors, overmute_threshold)
1125 .context("ingest social graph event into heed backend")
1126 .map_err(|err| UpstreamGraphBackendError(err.to_string()))?;
1127 }
1128 self.invalidate_distance_cache();
1129 Ok(())
1130 }
1131
1132 fn get_follow_distance(&self, user: &str) -> std::result::Result<u32, Self::Error> {
1133 let graph = self.graph.lock().unwrap();
1134 graph
1135 .get_follow_distance(user)
1136 .context("read social graph follow distance")
1137 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1138 }
1139
1140 fn is_following(
1141 &self,
1142 follower: &str,
1143 followed_user: &str,
1144 ) -> std::result::Result<bool, Self::Error> {
1145 let graph = self.graph.lock().unwrap();
1146 graph
1147 .is_following(follower, followed_user)
1148 .context("read social graph following edge")
1149 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1150 }
1151
1152 fn get_followed_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
1153 let graph = self.graph.lock().unwrap();
1154 graph
1155 .get_followed_by_user(user)
1156 .context("read followed-by-user list")
1157 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1158 }
1159
1160 fn get_followers_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
1161 let graph = self.graph.lock().unwrap();
1162 graph
1163 .get_followers_by_user(user)
1164 .context("read followers-by-user list")
1165 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1166 }
1167
1168 fn get_muted_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
1169 let graph = self.graph.lock().unwrap();
1170 graph
1171 .get_muted_by_user(user)
1172 .context("read muted-by-user list")
1173 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1174 }
1175
1176 fn get_user_muted_by(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
1177 let graph = self.graph.lock().unwrap();
1178 graph
1179 .get_user_muted_by(user)
1180 .context("read user-muted-by list")
1181 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1182 }
1183
1184 fn get_follow_list_created_at(
1185 &self,
1186 user: &str,
1187 ) -> std::result::Result<Option<u64>, Self::Error> {
1188 let graph = self.graph.lock().unwrap();
1189 graph
1190 .get_follow_list_created_at(user)
1191 .context("read social graph follow list timestamp")
1192 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1193 }
1194
1195 fn get_mute_list_created_at(
1196 &self,
1197 user: &str,
1198 ) -> std::result::Result<Option<u64>, Self::Error> {
1199 let graph = self.graph.lock().unwrap();
1200 graph
1201 .get_mute_list_created_at(user)
1202 .context("read social graph mute list timestamp")
1203 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1204 }
1205
1206 fn is_overmuted(&self, user: &str, threshold: f64) -> std::result::Result<bool, Self::Error> {
1207 let graph = self.graph.lock().unwrap();
1208 graph
1209 .is_overmuted(user, threshold)
1210 .context("check social graph overmute")
1211 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1212 }
1213}
1214
1215impl<T> SocialGraphBackend for Arc<T>
1216where
1217 T: SocialGraphBackend + ?Sized,
1218{
1219 fn stats(&self) -> Result<SocialGraphStats> {
1220 self.as_ref().stats()
1221 }
1222
1223 fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
1224 self.as_ref().users_by_follow_distance(distance)
1225 }
1226
1227 fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
1228 self.as_ref().follow_distance(pk_bytes)
1229 }
1230
1231 fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
1232 self.as_ref().follow_list_created_at(owner)
1233 }
1234
1235 fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
1236 self.as_ref().followed_targets(owner)
1237 }
1238
1239 fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
1240 self.as_ref().is_overmuted_user(user_pk, threshold)
1241 }
1242
1243 fn profile_search_root(&self) -> Result<Option<Cid>> {
1244 self.as_ref().profile_search_root()
1245 }
1246
1247 fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
1248 self.as_ref().snapshot_chunks(root, options)
1249 }
1250
1251 fn ingest_event(&self, event: &Event) -> Result<()> {
1252 self.as_ref().ingest_event(event)
1253 }
1254
1255 fn ingest_event_with_storage_class(
1256 &self,
1257 event: &Event,
1258 storage_class: EventStorageClass,
1259 ) -> Result<()> {
1260 self.as_ref()
1261 .ingest_event_with_storage_class(event, storage_class)
1262 }
1263
1264 fn ingest_events(&self, events: &[Event]) -> Result<()> {
1265 self.as_ref().ingest_events(events)
1266 }
1267
1268 fn ingest_events_with_storage_class(
1269 &self,
1270 events: &[Event],
1271 storage_class: EventStorageClass,
1272 ) -> Result<()> {
1273 self.as_ref()
1274 .ingest_events_with_storage_class(events, storage_class)
1275 }
1276
1277 fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
1278 self.as_ref().ingest_graph_events(events)
1279 }
1280
1281 fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
1282 self.as_ref().query_events(filter, limit)
1283 }
1284}
1285
1286fn should_replace_placeholder_root(graph: &HeedSocialGraph) -> Result<bool> {
1287 if graph.get_root().context("read current social graph root")? != DEFAULT_ROOT_HEX {
1288 return Ok(false);
1289 }
1290
1291 let GraphStats {
1292 users,
1293 follows,
1294 mutes,
1295 ..
1296 } = graph.size().context("size social graph")?;
1297 Ok(users <= 1 && follows == 0 && mutes == 0)
1298}
1299
1300fn decode_pubkey_set(values: Vec<String>) -> Result<UserSet> {
1301 let mut set = UserSet::new();
1302 for value in values {
1303 set.insert(decode_pubkey(&value)?);
1304 }
1305 Ok(set)
1306}
1307
1308fn decode_pubkey(value: &str) -> Result<[u8; 32]> {
1309 let mut bytes = [0u8; 32];
1310 hex::decode_to_slice(value, &mut bytes)
1311 .with_context(|| format!("decode social graph pubkey {value}"))?;
1312 Ok(bytes)
1313}
1314
1315fn is_social_graph_event(kind: Kind) -> bool {
1316 kind == Kind::ContactList || kind == Kind::MuteList
1317}
1318
1319fn graph_event_from_nostr(event: &Event) -> GraphEvent {
1320 GraphEvent {
1321 created_at: event.created_at.as_secs(),
1322 content: event.content.clone(),
1323 tags: event
1324 .tags
1325 .iter()
1326 .map(|tag| tag.as_slice().to_vec())
1327 .collect(),
1328 kind: event.kind.as_u16() as u32,
1329 pubkey: event.pubkey.to_hex(),
1330 id: event.id.to_hex(),
1331 sig: event.sig.to_string(),
1332 }
1333}
1334
1335fn stored_event_from_nostr(event: &Event) -> StoredNostrEvent {
1336 StoredNostrEvent {
1337 id: event.id.to_hex(),
1338 pubkey: event.pubkey.to_hex(),
1339 created_at: event.created_at.as_secs(),
1340 kind: event.kind.as_u16() as u32,
1341 tags: event
1342 .tags
1343 .iter()
1344 .map(|tag| tag.as_slice().to_vec())
1345 .collect(),
1346 content: event.content.clone(),
1347 sig: event.sig.to_string(),
1348 }
1349}
1350
1351fn nostr_event_from_stored(event: StoredNostrEvent) -> Result<Event> {
1352 let value = serde_json::json!({
1353 "id": event.id,
1354 "pubkey": event.pubkey,
1355 "created_at": event.created_at,
1356 "kind": event.kind,
1357 "tags": event.tags,
1358 "content": event.content,
1359 "sig": event.sig,
1360 });
1361 Event::from_json(value.to_string()).context("decode stored nostr event")
1362}
1363
/// Crate-visible entry point for turning a `StoredNostrEvent` back into a `nostr::Event`.
///
/// Delegates to the private `nostr_event_from_stored`.
///
/// # Errors
/// Returns an error if the stored fields, re-encoded as JSON, are rejected by
/// `Event::from_json`.
pub(crate) fn stored_event_to_nostr_event(event: StoredNostrEvent) -> Result<Event> {
    nostr_event_from_stored(event)
}
1367
1368fn encode_cid(cid: &Cid) -> Result<Vec<u8>> {
1369 rmp_serde::to_vec_named(&StoredCid {
1370 hash: cid.hash,
1371 key: cid.key,
1372 })
1373 .context("encode social graph events root")
1374}
1375
1376fn decode_cid(bytes: &[u8]) -> Result<Option<Cid>> {
1377 let stored: StoredCid =
1378 rmp_serde::from_slice(bytes).context("decode social graph events root")?;
1379 Ok(Some(Cid {
1380 hash: stored.hash,
1381 key: stored.key,
1382 }))
1383}
1384
1385fn read_root_file(path: &Path) -> Result<Option<Cid>> {
1386 let Ok(bytes) = std::fs::read(path) else {
1387 return Ok(None);
1388 };
1389 decode_cid(&bytes)
1390}
1391
1392fn write_root_file(path: &Path, root: Option<&Cid>) -> Result<()> {
1393 let Some(root) = root else {
1394 if path.exists() {
1395 std::fs::remove_file(path)?;
1396 }
1397 return Ok(());
1398 };
1399
1400 let encoded = encode_cid(root)?;
1401 let tmp_path = path.with_extension("tmp");
1402 std::fs::write(&tmp_path, encoded)?;
1403 std::fs::rename(tmp_path, path)?;
1404 Ok(())
1405}
1406
1407fn normalize_profile_name(value: &serde_json::Value) -> Option<String> {
1408 let raw = value.as_str()?;
1409 let trimmed = raw.split_whitespace().collect::<Vec<_>>().join(" ");
1410 if trimmed.is_empty() {
1411 return None;
1412 }
1413 Some(trimmed.chars().take(PROFILE_NAME_MAX_LENGTH).collect())
1414}
1415
1416fn extract_profile_names(profile: &serde_json::Map<String, serde_json::Value>) -> Vec<String> {
1417 let mut names = Vec::new();
1418 let mut seen = HashSet::new();
1419
1420 for key in ["display_name", "displayName", "name", "username"] {
1421 let Some(value) = profile.get(key).and_then(normalize_profile_name) else {
1422 continue;
1423 };
1424 let lowered = value.to_lowercase();
1425 if seen.insert(lowered) {
1426 names.push(value);
1427 }
1428 }
1429
1430 names
1431}
1432
/// Decides whether a NIP-05 local part adds no search value and should be dropped.
///
/// The local part is rejected if any of these holds:
/// - it is a single character, counted in Unicode scalar values rather than UTF-8 bytes,
///   so `é` is treated the same as `a`;
/// - it starts with `npub1`;
/// - it appears inside `primary_name` after lowercasing and removing whitespace, so it
///   would duplicate terms already indexed from the name. An empty `local_part` therefore
///   always matches.
///
/// Expects `local_part` to already be lowercased.
fn should_reject_profile_nip05(local_part: &str, primary_name: &str) -> bool {
    if local_part.chars().count() == 1 || local_part.starts_with("npub1") {
        return true;
    }

    primary_name
        .to_lowercase()
        .split_whitespace()
        .collect::<String>()
        .contains(local_part)
}
1444
1445fn normalize_profile_nip05(
1446 profile: &serde_json::Map<String, serde_json::Value>,
1447 primary_name: Option<&str>,
1448) -> Option<String> {
1449 let raw = profile.get("nip05")?.as_str()?;
1450 let local_part = raw.split('@').next()?.trim().to_lowercase();
1451 if local_part.is_empty() {
1452 return None;
1453 }
1454 let truncated: String = local_part.chars().take(PROFILE_NAME_MAX_LENGTH).collect();
1455 if truncated.is_empty() {
1456 return None;
1457 }
1458 if primary_name.is_some_and(|name| should_reject_profile_nip05(&truncated, name)) {
1459 return None;
1460 }
1461 Some(truncated)
1462}
1463
/// Returns `true` if `word` is an English stop word excluded from profile search keywords.
///
/// The match is exact and case-sensitive, so callers must lowercase `word` first.
fn is_search_stop_word(word: &str) -> bool {
    const STOP_WORDS: &[&str] = &[
        "a", "an", "the", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by",
        "from", "is", "it", "as", "be", "was", "are", "this", "that", "these", "those", "i",
        "you", "he", "she", "we", "they", "my", "your", "his", "her", "its", "our", "their",
        "what", "which", "who", "whom", "how", "when", "where", "why", "will", "would", "could",
        "should", "can", "may", "might", "must", "have", "has", "had", "do", "does", "did",
        "been", "being", "get", "got", "just", "now", "then", "so", "if", "not", "no", "yes",
        "all", "any", "some", "more", "most", "other", "into", "over", "after", "before",
        "about", "up", "down", "out", "off", "through", "during", "under", "again", "further",
        "once",
    ];
    STOP_WORDS.contains(&word)
}
1561
/// Returns `true` if `word` consists solely of ASCII digits and is not a plausible year.
///
/// Four-digit values from 1900 through 2099 are kept as search terms, so they return
/// `false`. An empty string counts as all digits and returns `true`.
fn is_pure_search_number(word: &str) -> bool {
    let all_digits = word.bytes().all(|byte| byte.is_ascii_digit());
    if !all_digits {
        return false;
    }
    let looks_like_year = word.len() == 4 && matches!(word.parse::<u16>(), Ok(1900..=2099));
    !looks_like_year
}
1571
/// Splits a compound token into its camelCase / letter-digit components.
///
/// A new part starts before `ch` when the previous character `prev` and `ch` form one of
/// these transitions:
/// - lowercase → uppercase (`camelCase` → `camel`, `Case`);
/// - digit → letter or letter → digit (`abc123def` → `abc`, `123`, `def`);
/// - an uppercase run followed by a lowercase letter (`HTMLParser` → `HTML`, `Parser`).
///
/// An empty input yields no parts.
fn split_compound_search_word(word: &str) -> Vec<String> {
    let chars: Vec<char> = word.chars().collect();
    let mut parts = Vec::new();
    let mut segment = String::new();

    for (index, &ch) in chars.iter().enumerate() {
        let next = chars.get(index + 1).copied();
        // The previous character is always the last one pushed to `segment`, so a boundary
        // can only occur once `segment` is non-empty.
        let starts_new_part = index > 0 && {
            let prev = chars[index - 1];
            (prev.is_lowercase() && ch.is_uppercase())
                || (prev.is_ascii_digit() && ch.is_alphabetic())
                || (prev.is_alphabetic() && ch.is_ascii_digit())
                || (prev.is_uppercase()
                    && ch.is_uppercase()
                    && next.is_some_and(char::is_lowercase))
        };

        if starts_new_part {
            parts.push(std::mem::take(&mut segment));
        }
        segment.push(ch);
    }

    if !segment.is_empty() {
        parts.push(segment);
    }
    parts
}
1600
1601fn parse_search_keywords(text: &str) -> Vec<String> {
1602 let mut keywords = Vec::new();
1603 let mut seen = HashSet::new();
1604
1605 for word in text
1606 .split(|ch: char| !ch.is_alphanumeric())
1607 .filter(|word| !word.is_empty())
1608 {
1609 let mut variants = Vec::with_capacity(1 + word.len() / 4);
1610 variants.push(word.to_lowercase());
1611 variants.extend(
1612 split_compound_search_word(word)
1613 .into_iter()
1614 .map(|part| part.to_lowercase()),
1615 );
1616
1617 for lowered in variants {
1618 if lowered.chars().count() < 2
1619 || is_search_stop_word(&lowered)
1620 || is_pure_search_number(&lowered)
1621 {
1622 continue;
1623 }
1624 if seen.insert(lowered.clone()) {
1625 keywords.push(lowered);
1626 }
1627 }
1628 }
1629
1630 keywords
1631}
1632
1633fn profile_search_terms_for_event(event: &Event) -> Vec<String> {
1634 let profile = match serde_json::from_str::<serde_json::Value>(&event.content) {
1635 Ok(serde_json::Value::Object(profile)) => profile,
1636 _ => serde_json::Map::new(),
1637 };
1638 let names = extract_profile_names(&profile);
1639 let primary_name = names.first().map(String::as_str);
1640 let mut parts = Vec::new();
1641 if let Some(name) = primary_name {
1642 parts.push(name.to_string());
1643 }
1644 if let Some(nip05) = normalize_profile_nip05(&profile, primary_name) {
1645 parts.push(nip05);
1646 }
1647 parts.push(event.pubkey.to_hex());
1648 if names.len() > 1 {
1649 parts.extend(names.into_iter().skip(1));
1650 }
1651 parse_search_keywords(&parts.join(" "))
1652}
1653
1654fn compare_nostr_events(left: &Event, right: &Event) -> std::cmp::Ordering {
1655 left.created_at
1656 .as_secs()
1657 .cmp(&right.created_at.as_secs())
1658 .then_with(|| left.id.to_hex().cmp(&right.id.to_hex()))
1659}
1660
1661fn map_event_store_error(err: NostrEventStoreError) -> anyhow::Error {
1662 anyhow::anyhow!("nostr event store error: {err}")
1663}
1664
/// Grows the social-graph LMDB environment at `db_dir` so its map size is at least
/// `requested_bytes`.
///
/// The target is never below `DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES` and is rounded up to a
/// multiple of `page_size_bytes()`. The function opens a temporary `heed` environment,
/// resizes it only if its current map size is smaller than the target, and drops the handle
/// on return. It never shrinks the map.
///
/// # Errors
/// Fails if the rounded size does not fit in `usize`, or if opening or resizing the LMDB
/// environment fails.
fn ensure_social_graph_mapsize(db_dir: &Path, requested_bytes: u64) -> Result<()> {
    let requested = requested_bytes.max(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES);
    let page_size = page_size_bytes() as u64;
    // Round `requested` up to the next multiple of `page_size`. If the addition overflows
    // u64, fall back to the unrounded value.
    let rounded = requested
        .checked_add(page_size.saturating_sub(1))
        .map(|size| size / page_size * page_size)
        .unwrap_or(requested);
    let map_size = usize::try_from(rounded).context("social graph mapsize exceeds usize")?;

    // The env is opened with the default map size. The current effective size is then read
    // from `env.info()` below.
    // NOTE(review): this relies on LMDB adopting the larger map size already recorded in
    // an existing environment when opened with a smaller value. Confirm against the LMDB
    // `mdb_env_set_mapsize` docs.
    // SAFETY (review): heed marks `open` unsafe because the memory-mapped files must not be
    // modified or truncated behind LMDB's back while mapped. This function does not show how
    // that is guaranteed. Confirm that no other handle or process is mutating `db_dir`
    // concurrently.
    let env = unsafe {
        heed::EnvOpenOptions::new()
            .map_size(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES as usize)
            .max_dbs(SOCIALGRAPH_MAX_DBS)
            .open(db_dir)
    }
    .context("open social graph LMDB env for resize")?;
    if env.info().map_size < map_size {
        // SAFETY (review): heed requires that no transactions be active on this environment
        // in the current process while resizing. No transactions are created in this
        // function. Confirm that callers run this before any other handle to `db_dir` is
        // opened.
        unsafe { env.resize(map_size) }.context("resize social graph LMDB env")?;
    }

    Ok(())
}
1687
/// Returns the memory granularity reported by `page_size::get_granularity()`.
///
/// `ensure_social_graph_mapsize` uses this value to round LMDB map sizes.
/// NOTE(review): per the `page_size` crate this is the page size on Unix-like systems and
/// the (typically larger) allocation granularity on Windows. Confirm that this is the
/// intended rounding unit.
fn page_size_bytes() -> usize {
    page_size::get_granularity()
}
1691
1692#[cfg(test)]
1693mod tests;