Skip to main content

hashtree_cli/socialgraph/
mod.rs

1pub mod access;
2pub mod crawler;
3pub mod local_lists;
4pub mod snapshot;
5
6pub use access::SocialGraphAccessControl;
7pub use crawler::SocialGraphCrawler;
8pub use local_lists::{
9    read_local_list_file_state, sync_local_list_files_force, sync_local_list_files_if_changed,
10    LocalListFileState, LocalListSyncOutcome,
11};
12
13mod index_buckets;
14
15use index_buckets::{
16    dedupe_events, latest_metadata_events_by_pubkey, EventIndexBucket, ProfileIndexBucket,
17};
18
19use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
20use std::path::{Path, PathBuf};
21use std::sync::{Arc, Mutex as StdMutex};
22
23use anyhow::{Context, Result};
24use bytes::Bytes;
25use futures::executor::block_on;
26use hashtree_core::{nhash_encode_full, Cid, HashTree, HashTreeConfig, NHashData};
27use hashtree_index::BTree;
28use hashtree_nostr::{
29    is_parameterized_replaceable_kind, is_replaceable_kind, ListEventsOptions, NostrEventStore,
30    NostrEventStoreError, ProfileGuard as NostrProfileGuard, StoredNostrEvent,
31};
32#[cfg(test)]
33use hashtree_nostr::{
34    reset_profile as reset_nostr_profile, set_profile_enabled as set_nostr_profile_enabled,
35    take_profile as take_nostr_profile,
36};
37use nostr::{Event, Filter, JsonUtil, Kind, SingleLetterTag};
38use nostr_social_graph::{
39    BinaryBudget, GraphStats, NostrEvent as GraphEvent, SocialGraph,
40    SocialGraphBackend as NostrSocialGraphBackend,
41};
42use nostr_social_graph_heed::HeedSocialGraph;
43
44use crate::storage::{LocalStore, StorageRouter};
45
46#[cfg(test)]
47use std::sync::{Mutex, MutexGuard, OnceLock};
48#[cfg(test)]
49use std::time::Instant;
50
/// Ordered set of 32-byte user keys, as returned by
/// [`SocialGraphBackend::followed_targets`].
pub type UserSet = BTreeSet<[u8; 32]>;
52
/// All-zero 64-character hex key used as the placeholder graph root when the heed backend is
/// opened; `default_storage_class_for` never treats it as a real root author.
const DEFAULT_ROOT_HEX: &str = "0000000000000000000000000000000000000000000000000000000000000000";
/// File name (inside the socialgraph directory) holding the public event index root.
const EVENTS_ROOT_FILE: &str = "events-root.msgpack";
/// File name (inside the socialgraph directory) holding the ambient event index root.
const AMBIENT_EVENTS_ROOT_FILE: &str = "events-root-ambient.msgpack";
/// Sub-directory of the socialgraph directory used for the ambient blob store.
const AMBIENT_EVENTS_BLOB_DIR: &str = "ambient-blobs";
/// File name holding the profile search index root.
const PROFILE_SEARCH_ROOT_FILE: &str = "profile-search-root.msgpack";
/// File name holding the profiles-by-pubkey index root.
const PROFILES_BY_PUBKEY_ROOT_FILE: &str = "profiles-by-pubkey-root.msgpack";
/// Sentinel distance that `SocialGraphStore::follow_distance` maps to `None`.
const UNKNOWN_FOLLOW_DISTANCE: u32 = 1000;
/// 64 MiB. NOTE(review): presumably the fallback LMDB map size; its use is outside this excerpt.
const DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES: u64 = 64 * 1024 * 1024;
/// NOTE(review): presumably the LMDB max-dbs setting; its use is outside this excerpt.
const SOCIALGRAPH_MAX_DBS: u32 = 16;
/// B-tree order passed to the profile search index.
const PROFILE_SEARCH_INDEX_ORDER: usize = 64;
/// NOTE(review): presumably the key prefix for profile search entries; confirm at the use site.
const PROFILE_SEARCH_PREFIX: &str = "p:";
/// NOTE(review): presumably the cap applied to indexed profile names; confirm at the use site.
const PROFILE_NAME_MAX_LENGTH: usize = 100;
65
/// Selects which event bucket of a [`SocialGraphStore`] an ingested event belongs to.
///
/// When the caller does not choose, `SocialGraphStore::default_storage_class_for` picks
/// `Public` for events authored by the (non-placeholder) graph root and `Ambient` otherwise.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventStorageClass {
    /// Stored in the `public_events` bucket.
    Public,
    /// Stored in the `ambient_events` bucket.
    Ambient,
}
71
/// Scope argument for scoped event queries; `SocialGraphStore::query_events` uses `All`.
///
/// NOTE(review): presumably `PublicOnly` / `AmbientOnly` restrict a query to the matching
/// event bucket — the consumer (`query_events_in_scope`) is outside this excerpt.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum EventQueryScope {
    PublicOnly,
    AmbientOnly,
    All,
}
79
/// Serde-serializable pair of a 32-byte hash and an optional 32-byte key.
///
/// NOTE(review): presumably the on-disk form of a `Cid` written to the `*-root.msgpack`
/// files — the (de)serialization code is outside this excerpt.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct StoredCid {
    hash: [u8; 32],
    key: Option<[u8; 32]>,
}
85
/// Serialized value of one profile search index entry, as returned by
/// `SocialGraphStore::profile_search_entries_for_prefix`.
///
/// `aliases` and `nip05` fall back to their defaults when absent from the serialized form.
// NOTE(review): field meanings below are inferred from names; the code that fills them is
// outside this excerpt.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct StoredProfileSearchEntry {
    pub pubkey: String,
    pub name: String,
    #[serde(default)]
    pub aliases: Vec<String>,
    #[serde(default)]
    pub nip05: Option<String>,
    pub created_at: u64,
    pub event_nhash: String,
}
97
/// Aggregate statistics about the social graph, as produced by
/// `SocialGraphStore::build_distance_cache`.
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct SocialGraphStats {
    /// Sum of all `size_by_distance` values.
    pub total_users: usize,
    /// Root of the exported graph state, when known.
    pub root: Option<String>,
    /// Sum of the lengths of every user's followed-target collection.
    pub total_follows: usize,
    /// Largest follow distance present in `size_by_distance` (0 when empty).
    pub max_depth: u32,
    /// Number of decodable users at each follow distance.
    pub size_by_distance: BTreeMap<u32, usize>,
    /// Set to `true` by `build_distance_cache`; `Default` leaves it `false`.
    pub enabled: bool,
}
107
/// Snapshot derived from one exported graph state, cached inside [`SocialGraphStore`] until
/// `invalidate_distance_cache` clears it.
#[derive(Debug, Clone)]
struct DistanceCache {
    /// Statistics computed from the same exported state.
    stats: SocialGraphStats,
    /// Decoded 32-byte user keys grouped by follow distance.
    users_by_distance: BTreeMap<u32, Vec<[u8; 32]>>,
}
113
/// Error wrapping a plain message string; its `Display` output is that message verbatim.
///
/// NOTE(review): presumably carries errors from the upstream graph backend — the
/// construction sites are outside this excerpt.
#[derive(Debug, thiserror::Error)]
#[error("{0}")]
pub struct UpstreamGraphBackendError(String);
117
/// Social graph store combining the heed-backed graph, two event index buckets and the
/// profile index.
pub struct SocialGraphStore {
    /// Heed-backed graph; every access goes through this mutex.
    graph: StdMutex<HeedSocialGraph>,
    /// Lazily built distance/statistics snapshot; `None` after invalidation.
    distance_cache: StdMutex<Option<DistanceCache>>,
    /// Event index for [`EventStorageClass::Public`] events.
    public_events: EventIndexBucket,
    /// Event index for [`EventStorageClass::Ambient`] events.
    ambient_events: EventIndexBucket,
    /// Profile lookup (by pubkey) and profile search indexes.
    profile_index: ProfileIndexBucket,
    /// Overmute threshold applied when maintaining the profile index (initialised to 1.0).
    profile_index_overmute_threshold: StdMutex<f64>,
}
126
127pub trait SocialGraphBackend: Send + Sync {
128    fn stats(&self) -> Result<SocialGraphStats>;
129    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>>;
130    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>>;
131    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>>;
132    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet>;
133    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool>;
134    fn profile_search_root(&self) -> Result<Option<Cid>> {
135        Ok(None)
136    }
137    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>>;
138    fn ingest_event(&self, event: &Event) -> Result<()>;
139    fn ingest_event_with_storage_class(
140        &self,
141        event: &Event,
142        storage_class: EventStorageClass,
143    ) -> Result<()> {
144        let _ = storage_class;
145        self.ingest_event(event)
146    }
147    fn ingest_events(&self, events: &[Event]) -> Result<()> {
148        for event in events {
149            self.ingest_event(event)?;
150        }
151        Ok(())
152    }
153    fn ingest_events_with_storage_class(
154        &self,
155        events: &[Event],
156        storage_class: EventStorageClass,
157    ) -> Result<()> {
158        for event in events {
159            self.ingest_event_with_storage_class(event, storage_class)?;
160        }
161        Ok(())
162    }
163    fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
164        self.ingest_events(events)
165    }
166    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>>;
167}
168
/// Guard type handed out by [`test_lock`].
#[cfg(test)]
pub type TestLockGuard = MutexGuard<'static, ()>;

/// Process-wide mutex behind [`test_lock`], created on first use.
#[cfg(test)]
static NDB_TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();

/// Acquires the process-wide test lock.
///
/// A poisoned lock (a previous holder panicked) is recovered rather than propagated, so
/// one failing test does not cascade into every later test.
#[cfg(test)]
pub fn test_lock() -> TestLockGuard {
    let lock = NDB_TEST_LOCK.get_or_init(|| Mutex::new(()));
    match lock.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
182
/// Opens the social graph store under `data_dir` without an explicit map size.
///
/// Equivalent to [`open_social_graph_store_with_mapsize`] with `None`.
pub fn open_social_graph_store(data_dir: &Path) -> Result<Arc<SocialGraphStore>> {
    open_social_graph_store_with_mapsize(data_dir, None)
}
186
187pub fn open_social_graph_store_with_mapsize(
188    data_dir: &Path,
189    mapsize_bytes: Option<u64>,
190) -> Result<Arc<SocialGraphStore>> {
191    let db_dir = data_dir.join("socialgraph");
192    open_social_graph_store_at_path(&db_dir, mapsize_bytes)
193}
194
195pub fn open_social_graph_store_with_storage(
196    data_dir: &Path,
197    store: Arc<StorageRouter>,
198    mapsize_bytes: Option<u64>,
199) -> Result<Arc<SocialGraphStore>> {
200    let db_dir = data_dir.join("socialgraph");
201    open_social_graph_store_at_path_with_storage(&db_dir, store, mapsize_bytes)
202}
203
204pub fn open_social_graph_store_at_path(
205    db_dir: &Path,
206    mapsize_bytes: Option<u64>,
207) -> Result<Arc<SocialGraphStore>> {
208    let config = hashtree_config::Config::load_or_default();
209    let backend = &config.storage.backend;
210    let local_store = Arc::new(
211        LocalStore::new_with_lmdb_map_size(db_dir.join("blobs"), backend, mapsize_bytes)
212            .map_err(|err| anyhow::anyhow!("Failed to create social graph blob store: {err}"))?,
213    );
214    let store = Arc::new(StorageRouter::new(local_store));
215    open_social_graph_store_at_path_with_storage(db_dir, store, mapsize_bytes)
216}
217
218pub fn open_social_graph_store_at_path_with_storage(
219    db_dir: &Path,
220    store: Arc<StorageRouter>,
221    mapsize_bytes: Option<u64>,
222) -> Result<Arc<SocialGraphStore>> {
223    let ambient_backend = store.local_store().backend();
224    let ambient_local = Arc::new(
225        LocalStore::new_with_lmdb_map_size(
226            db_dir.join(AMBIENT_EVENTS_BLOB_DIR),
227            &ambient_backend,
228            mapsize_bytes,
229        )
230        .map_err(|err| {
231            anyhow::anyhow!("Failed to create social graph ambient blob store: {err}")
232        })?,
233    );
234    let ambient_store = Arc::new(StorageRouter::new(ambient_local));
235    open_social_graph_store_at_path_with_storage_split(db_dir, store, ambient_store, mapsize_bytes)
236}
237
238pub fn open_social_graph_store_at_path_with_storage_split(
239    db_dir: &Path,
240    public_store: Arc<StorageRouter>,
241    ambient_store: Arc<StorageRouter>,
242    mapsize_bytes: Option<u64>,
243) -> Result<Arc<SocialGraphStore>> {
244    std::fs::create_dir_all(db_dir)?;
245    if let Some(size) = mapsize_bytes {
246        ensure_social_graph_mapsize(db_dir, size)?;
247    }
248    let graph = HeedSocialGraph::open(db_dir, DEFAULT_ROOT_HEX)
249        .context("open nostr-social-graph heed backend")?;
250
251    Ok(Arc::new(SocialGraphStore {
252        graph: StdMutex::new(graph),
253        distance_cache: StdMutex::new(None),
254        public_events: EventIndexBucket {
255            event_store: NostrEventStore::new(Arc::clone(&public_store)),
256            root_path: db_dir.join(EVENTS_ROOT_FILE),
257        },
258        ambient_events: EventIndexBucket {
259            event_store: NostrEventStore::new(ambient_store),
260            root_path: db_dir.join(AMBIENT_EVENTS_ROOT_FILE),
261        },
262        profile_index: ProfileIndexBucket {
263            tree: HashTree::new(HashTreeConfig::new(Arc::clone(&public_store))),
264            index: BTree::new(
265                public_store,
266                hashtree_index::BTreeOptions {
267                    order: Some(PROFILE_SEARCH_INDEX_ORDER),
268                },
269            ),
270            by_pubkey_root_path: db_dir.join(PROFILES_BY_PUBKEY_ROOT_FILE),
271            search_root_path: db_dir.join(PROFILE_SEARCH_ROOT_FILE),
272        },
273        profile_index_overmute_threshold: StdMutex::new(1.0),
274    }))
275}
276
277pub fn set_social_graph_root(store: &SocialGraphStore, pk_bytes: &[u8; 32]) {
278    if let Err(err) = store.set_root(pk_bytes) {
279        tracing::warn!("Failed to set social graph root: {err}");
280    }
281}
282
283pub fn get_follow_distance(
284    backend: &(impl SocialGraphBackend + ?Sized),
285    pk_bytes: &[u8; 32],
286) -> Option<u32> {
287    backend.follow_distance(pk_bytes).ok().flatten()
288}
289
290pub fn get_follows(
291    backend: &(impl SocialGraphBackend + ?Sized),
292    pk_bytes: &[u8; 32],
293) -> Vec<[u8; 32]> {
294    match backend.followed_targets(pk_bytes) {
295        Ok(set) => set.into_iter().collect(),
296        Err(_) => Vec::new(),
297    }
298}
299
300pub fn is_overmuted(
301    backend: &(impl SocialGraphBackend + ?Sized),
302    _root_pk: &[u8; 32],
303    user_pk: &[u8; 32],
304    threshold: f64,
305) -> bool {
306    backend
307        .is_overmuted_user(user_pk, threshold)
308        .unwrap_or(false)
309}
310
311pub fn ingest_event(backend: &(impl SocialGraphBackend + ?Sized), _sub_id: &str, event_json: &str) {
312    let event = match Event::from_json(event_json) {
313        Ok(event) => event,
314        Err(_) => return,
315    };
316
317    if let Err(err) = backend.ingest_event(&event) {
318        tracing::warn!("Failed to ingest social graph event: {err}");
319    }
320}
321
/// Ingests an already-parsed event via [`SocialGraphBackend::ingest_event`], propagating
/// any error.
pub fn ingest_parsed_event(
    backend: &(impl SocialGraphBackend + ?Sized),
    event: &Event,
) -> Result<()> {
    backend.ingest_event(event)
}
328
/// Ingests an already-parsed event with an explicit storage class via
/// [`SocialGraphBackend::ingest_event_with_storage_class`], propagating any error.
pub fn ingest_parsed_event_with_storage_class(
    backend: &(impl SocialGraphBackend + ?Sized),
    event: &Event,
    storage_class: EventStorageClass,
) -> Result<()> {
    backend.ingest_event_with_storage_class(event, storage_class)
}
336
/// Ingests a batch of already-parsed events via [`SocialGraphBackend::ingest_events`],
/// propagating any error.
pub fn ingest_parsed_events(
    backend: &(impl SocialGraphBackend + ?Sized),
    events: &[Event],
) -> Result<()> {
    backend.ingest_events(events)
}
343
/// Ingests a batch of already-parsed events with one explicit storage class via
/// [`SocialGraphBackend::ingest_events_with_storage_class`], propagating any error.
pub fn ingest_parsed_events_with_storage_class(
    backend: &(impl SocialGraphBackend + ?Sized),
    events: &[Event],
    storage_class: EventStorageClass,
) -> Result<()> {
    backend.ingest_events_with_storage_class(events, storage_class)
}
351
/// Ingests a batch of already-parsed events via
/// [`SocialGraphBackend::ingest_graph_events`], propagating any error.
pub fn ingest_graph_parsed_events(
    backend: &(impl SocialGraphBackend + ?Sized),
    events: &[Event],
) -> Result<()> {
    backend.ingest_graph_events(events)
}
358
359pub fn query_events(
360    backend: &(impl SocialGraphBackend + ?Sized),
361    filter: &Filter,
362    limit: usize,
363) -> Vec<Event> {
364    backend.query_events(filter, limit).unwrap_or_default()
365}
366
367impl SocialGraphStore {
368    pub fn set_profile_index_overmute_threshold(&self, threshold: f64) {
369        *self
370            .profile_index_overmute_threshold
371            .lock()
372            .expect("profile index overmute threshold") = threshold;
373    }
374
375    fn profile_index_overmute_threshold(&self) -> f64 {
376        *self
377            .profile_index_overmute_threshold
378            .lock()
379            .expect("profile index overmute threshold")
380    }
381
382    fn invalidate_distance_cache(&self) {
383        *self.distance_cache.lock().unwrap() = None;
384    }
385
386    fn build_distance_cache(state: nostr_social_graph::SocialGraphState) -> Result<DistanceCache> {
387        let unique_ids = state
388            .unique_ids
389            .into_iter()
390            .map(|(pubkey, id)| decode_pubkey(&pubkey).map(|decoded| (id, decoded)))
391            .collect::<Result<HashMap<_, _>>>()?;
392
393        let mut users_by_distance = BTreeMap::new();
394        let mut size_by_distance = BTreeMap::new();
395        for (distance, users) in state.users_by_follow_distance {
396            let decoded = users
397                .into_iter()
398                .filter_map(|id| unique_ids.get(&id).copied())
399                .collect::<Vec<_>>();
400            size_by_distance.insert(distance, decoded.len());
401            users_by_distance.insert(distance, decoded);
402        }
403
404        let total_follows = state
405            .followed_by_user
406            .iter()
407            .map(|(_, targets)| targets.len())
408            .sum::<usize>();
409        let total_users = size_by_distance.values().copied().sum();
410        let max_depth = size_by_distance.keys().copied().max().unwrap_or_default();
411
412        Ok(DistanceCache {
413            stats: SocialGraphStats {
414                total_users,
415                root: Some(state.root),
416                total_follows,
417                max_depth,
418                size_by_distance,
419                enabled: true,
420            },
421            users_by_distance,
422        })
423    }
424
425    fn load_distance_cache(&self) -> Result<DistanceCache> {
426        if let Some(cache) = self.distance_cache.lock().unwrap().clone() {
427            return Ok(cache);
428        }
429
430        let state = {
431            let graph = self.graph.lock().unwrap();
432            graph.export_state().context("export social graph state")?
433        };
434        let cache = Self::build_distance_cache(state)?;
435        *self.distance_cache.lock().unwrap() = Some(cache.clone());
436        Ok(cache)
437    }
438
439    fn set_root(&self, root: &[u8; 32]) -> Result<()> {
440        let root_hex = hex::encode(root);
441        {
442            let mut graph = self.graph.lock().unwrap();
443            if should_replace_placeholder_root(&graph)? {
444                let fresh = SocialGraph::new(&root_hex);
445                graph
446                    .replace_state(&fresh.export_state())
447                    .context("replace placeholder social graph root")?;
448            } else {
449                graph
450                    .set_root(&root_hex)
451                    .context("set nostr-social-graph root")?;
452            }
453        }
454        self.invalidate_distance_cache();
455        Ok(())
456    }
457
458    fn stats(&self) -> Result<SocialGraphStats> {
459        Ok(self.load_distance_cache()?.stats)
460    }
461
462    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
463        let graph = self.graph.lock().unwrap();
464        let distance = graph
465            .get_follow_distance(&hex::encode(pk_bytes))
466            .context("read social graph follow distance")?;
467        Ok((distance != UNKNOWN_FOLLOW_DISTANCE).then_some(distance))
468    }
469
470    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
471        Ok(self
472            .load_distance_cache()?
473            .users_by_distance
474            .get(&distance)
475            .cloned()
476            .unwrap_or_default())
477    }
478
479    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
480        let graph = self.graph.lock().unwrap();
481        graph
482            .get_follow_list_created_at(&hex::encode(owner))
483            .context("read social graph follow list timestamp")
484    }
485
486    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
487        let graph = self.graph.lock().unwrap();
488        decode_pubkey_set(
489            graph
490                .get_followed_by_user(&hex::encode(owner))
491                .context("read followed targets")?,
492        )
493    }
494
495    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
496        if threshold <= 0.0 {
497            return Ok(false);
498        }
499        let graph = self.graph.lock().unwrap();
500        graph
501            .is_overmuted(&hex::encode(user_pk), threshold)
502            .context("check social graph overmute")
503    }
504
    /// Returns the profile index's search root, as reported by `ProfileIndexBucket::search_root`.
    #[cfg_attr(not(test), allow(dead_code))]
    pub fn profile_search_root(&self) -> Result<Option<Cid>> {
        self.profile_index.search_root()
    }
509
    /// Returns the profile index's by-pubkey root, as reported by
    /// `ProfileIndexBucket::by_pubkey_root`.
    #[cfg_attr(not(test), allow(dead_code))]
    pub fn profiles_by_pubkey_root(&self) -> Result<Option<Cid>> {
        self.profile_index.by_pubkey_root()
    }
514
    /// Returns the root of the public event index, as reported by
    /// `EventIndexBucket::events_root`.
    pub fn public_events_root(&self) -> Result<Option<Cid>> {
        self.public_events.events_root()
    }
518
    /// Returns the public event index root via `EventIndexBucket::events_root_for_write`.
    // NOTE(review): how the "for write" variant differs from `events_root` is defined in
    // `index_buckets` and not visible here.
    #[cfg_attr(test, allow(dead_code))]
    pub(crate) fn public_events_root_for_write(&self) -> Result<Option<Cid>> {
        self.public_events.events_root_for_write()
    }
523
    /// Records `root` as the public event index root via
    /// `EventIndexBucket::write_events_root`.
    pub(crate) fn write_public_events_root(&self, root: Option<&Cid>) -> Result<()> {
        self.public_events.write_events_root(root)
    }
527
    /// Looks up the profile event stored in the profile index for `pubkey_hex`, if any.
    #[cfg_attr(not(test), allow(dead_code))]
    pub fn latest_profile_event(&self, pubkey_hex: &str) -> Result<Option<Event>> {
        self.profile_index.profile_event_for_pubkey(pubkey_hex)
    }
532
    /// Returns the profile search index entries for `prefix`, as produced by
    /// `ProfileIndexBucket::search_entries_for_prefix`.
    // NOTE(review): presumably the `String` of each pair is the index key — confirm in
    // `index_buckets`.
    #[cfg_attr(not(test), allow(dead_code))]
    pub fn profile_search_entries_for_prefix(
        &self,
        prefix: &str,
    ) -> Result<Vec<(String, StoredProfileSearchEntry)>> {
        self.profile_index.search_entries_for_prefix(prefix)
    }
540
    /// Public entry point for incrementally updating the profile index from `events`;
    /// forwards to `update_profile_index_for_events`.
    pub fn sync_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
        self.update_profile_index_for_events(events)
    }
544
545    pub(crate) fn rebuild_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
546        let latest_by_pubkey = self.filtered_latest_metadata_events_by_pubkey(events)?;
547        let (by_pubkey_root, search_root) = self
548            .profile_index
549            .rebuild_profile_events(latest_by_pubkey.into_values())?;
550        self.profile_index
551            .write_by_pubkey_root(by_pubkey_root.as_ref())?;
552        self.profile_index.write_search_root(search_root.as_ref())?;
553        Ok(())
554    }
555
556    pub(crate) async fn rebuild_profile_index_for_events_async(
557        &self,
558        events: &[Event],
559    ) -> Result<()> {
560        let latest_by_pubkey = self.filtered_latest_metadata_events_by_pubkey(events)?;
561        let (by_pubkey_root, search_root) = self
562            .profile_index
563            .rebuild_profile_events_async(latest_by_pubkey.into_values())
564            .await?;
565        self.profile_index
566            .write_by_pubkey_root(by_pubkey_root.as_ref())?;
567        self.profile_index.write_search_root(search_root.as_ref())?;
568        Ok(())
569    }
570
571    pub fn rebuild_profile_index_from_stored_events(&self) -> Result<usize> {
572        let public_events_root = self.public_events.events_root()?;
573        let ambient_events_root = self.ambient_events.events_root()?;
574        if public_events_root.is_none() && ambient_events_root.is_none() {
575            self.profile_index.write_by_pubkey_root(None)?;
576            self.profile_index.write_search_root(None)?;
577            return Ok(0);
578        }
579
580        let mut events = Vec::new();
581        for (bucket, root) in [
582            (&self.public_events, public_events_root),
583            (&self.ambient_events, ambient_events_root),
584        ] {
585            let Some(root) = root else {
586                continue;
587            };
588            let stored = block_on(bucket.event_store.list_by_kind_lossy(
589                Some(&root),
590                Kind::Metadata.as_u16() as u32,
591                ListEventsOptions::default(),
592            ))
593            .map_err(map_event_store_error)?;
594            events.extend(
595                stored
596                    .into_iter()
597                    .map(nostr_event_from_stored)
598                    .collect::<Result<Vec<_>>>()?,
599            );
600        }
601
602        let latest_count = self
603            .filtered_latest_metadata_events_by_pubkey(&events)?
604            .len();
605        self.rebuild_profile_index_for_events(&events)?;
606        Ok(latest_count)
607    }
608
609    pub async fn rebuild_profile_index_from_stored_events_async(&self) -> Result<usize> {
610        let public_events_root = self.public_events.events_root()?;
611        let ambient_events_root = self.ambient_events.events_root()?;
612        if public_events_root.is_none() && ambient_events_root.is_none() {
613            self.profile_index.write_by_pubkey_root(None)?;
614            self.profile_index.write_search_root(None)?;
615            return Ok(0);
616        }
617
618        let mut events = Vec::new();
619        for (bucket, root) in [
620            (&self.public_events, public_events_root),
621            (&self.ambient_events, ambient_events_root),
622        ] {
623            let Some(root) = root else {
624                continue;
625            };
626            let stored = bucket
627                .event_store
628                .list_by_kind_lossy(
629                    Some(&root),
630                    Kind::Metadata.as_u16() as u32,
631                    ListEventsOptions::default(),
632                )
633                .await
634                .map_err(map_event_store_error)?;
635            events.extend(
636                stored
637                    .into_iter()
638                    .map(nostr_event_from_stored)
639                    .collect::<Result<Vec<_>>>()?,
640            );
641        }
642
643        let latest_count = self
644            .filtered_latest_metadata_events_by_pubkey(&events)?
645            .len();
646        self.rebuild_profile_index_for_events_async(&events).await?;
647        Ok(latest_count)
648    }
649
650    pub fn rebuild_event_indexes_from_stored_events(&self) -> Result<(usize, usize)> {
651        let public_count =
652            self.rebuild_event_index_bucket_from_stored_events(&self.public_events)?;
653        let ambient_count =
654            self.rebuild_event_index_bucket_from_stored_events(&self.ambient_events)?;
655        self.rebuild_profile_index_from_stored_events()?;
656        Ok((public_count, ambient_count))
657    }
658
659    pub async fn rebuild_event_indexes_from_stored_events_async(&self) -> Result<(usize, usize)> {
660        let public_count = self
661            .rebuild_event_index_bucket_from_stored_events_async(&self.public_events)
662            .await?;
663        let ambient_count = self
664            .rebuild_event_index_bucket_from_stored_events_async(&self.ambient_events)
665            .await?;
666        self.rebuild_profile_index_from_stored_events_async()
667            .await?;
668        Ok((public_count, ambient_count))
669    }
670
    /// Rebuilds one event bucket's index from the events reachable from its current root.
    ///
    /// Returns the number of events fed into the rebuild. `0` is returned, without a
    /// full rebuild, when the bucket has no root, when the root's manifest cannot be
    /// read (the root is then cleared), or when upgrading the manifest indexes already
    /// produced a new root (which is persisted instead).
    ///
    /// Blocks the calling thread on the event store futures.
    fn rebuild_event_index_bucket_from_stored_events(
        &self,
        bucket: &EventIndexBucket,
    ) -> Result<usize> {
        // Nothing stored: make sure the persisted root says so.
        let Some(root) = bucket.events_root()? else {
            bucket.write_events_root(None)?;
            return Ok(0);
        };

        let manifest = match block_on(bucket.event_store.get_manifest(Some(&root))) {
            Ok(manifest) => manifest,
            Err(err) => {
                // An unreadable manifest is not propagated: the root is dropped instead.
                tracing::warn!(
                    "Clearing invalid social graph event index root {} before rebuild: {}",
                    hex::encode(root.hash),
                    err
                );
                bucket.write_events_root(None)?;
                return Ok(0);
            }
        };
        // Manifest lacks the by-kind/time/author index: try an in-place upgrade first.
        if manifest.by_kind_time_author.is_none() {
            let next_root = block_on(bucket.event_store.upgrade_manifest_indexes(Some(&root)))
                .map_err(map_event_store_error)?;
            // The upgrade changed the root, so persist it and skip the full rebuild.
            if next_root.as_ref() != Some(&root) {
                bucket.write_events_root(next_root.as_ref())?;
                return Ok(0);
            }
        }

        // Full rebuild: list what the old root can still yield, then build from scratch.
        let stored = block_on(
            bucket
                .event_store
                .list_recent_lossy(Some(&root), ListEventsOptions::default()),
        )
        .map_err(map_event_store_error)?;
        let count = stored.len();
        let next_root =
            block_on(bucket.event_store.build(None, stored)).map_err(map_event_store_error)?;
        bucket.write_events_root(next_root.as_ref())?;
        Ok(count)
    }
713
    /// Async variant: rebuilds one event bucket's index from the events reachable from
    /// its current root.
    ///
    /// Returns the number of events fed into the rebuild. `0` is returned, without a
    /// full rebuild, when the bucket has no root, when the root's manifest cannot be
    /// read (the root is then cleared), or when upgrading the manifest indexes already
    /// produced a new root (which is persisted instead).
    async fn rebuild_event_index_bucket_from_stored_events_async(
        &self,
        bucket: &EventIndexBucket,
    ) -> Result<usize> {
        // Nothing stored: make sure the persisted root says so.
        let Some(root) = bucket.events_root()? else {
            bucket.write_events_root(None)?;
            return Ok(0);
        };

        let manifest = match bucket.event_store.get_manifest(Some(&root)).await {
            Ok(manifest) => manifest,
            Err(err) => {
                // An unreadable manifest is not propagated: the root is dropped instead.
                tracing::warn!(
                    "Clearing invalid social graph event index root {} before rebuild: {}",
                    hex::encode(root.hash),
                    err
                );
                bucket.write_events_root(None)?;
                return Ok(0);
            }
        };
        // Manifest lacks the by-kind/time/author index: try an in-place upgrade first.
        if manifest.by_kind_time_author.is_none() {
            let next_root = bucket
                .event_store
                .upgrade_manifest_indexes(Some(&root))
                .await
                .map_err(map_event_store_error)?;
            // The upgrade changed the root, so persist it and skip the full rebuild.
            if next_root.as_ref() != Some(&root) {
                bucket.write_events_root(next_root.as_ref())?;
                return Ok(0);
            }
        }

        // Full rebuild: list what the old root can still yield, then build from scratch.
        let stored = bucket
            .event_store
            .list_recent_lossy(Some(&root), ListEventsOptions::default())
            .await
            .map_err(map_event_store_error)?;
        let count = stored.len();
        let next_root = bucket
            .event_store
            .build(None, stored)
            .await
            .map_err(map_event_store_error)?;
        bucket.write_events_root(next_root.as_ref())?;
        Ok(count)
    }
761
762    fn update_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
763        let latest_by_pubkey = latest_metadata_events_by_pubkey(events);
764        let threshold = self.profile_index_overmute_threshold();
765
766        if latest_by_pubkey.is_empty() {
767            return Ok(());
768        }
769
770        let mut by_pubkey_root = self.profile_index.by_pubkey_root()?;
771        let mut search_root = self.profile_index.search_root()?;
772        let mut changed = false;
773
774        for event in latest_by_pubkey.into_values() {
775            let overmuted = self.is_overmuted_user(&event.pubkey.to_bytes(), threshold)?;
776            let (next_by_pubkey_root, next_search_root, updated) = if overmuted {
777                self.profile_index.remove_profile_event(
778                    by_pubkey_root.as_ref(),
779                    search_root.as_ref(),
780                    &event.pubkey.to_hex(),
781                )?
782            } else {
783                self.profile_index.update_profile_event(
784                    by_pubkey_root.as_ref(),
785                    search_root.as_ref(),
786                    event,
787                )?
788            };
789            if updated {
790                by_pubkey_root = next_by_pubkey_root;
791                search_root = next_search_root;
792                changed = true;
793            }
794        }
795
796        if changed {
797            self.profile_index
798                .write_by_pubkey_root(by_pubkey_root.as_ref())?;
799            self.profile_index.write_search_root(search_root.as_ref())?;
800        }
801
802        Ok(())
803    }
804
805    fn filtered_latest_metadata_events_by_pubkey<'a>(
806        &self,
807        events: &'a [Event],
808    ) -> Result<BTreeMap<String, &'a Event>> {
809        let threshold = self.profile_index_overmute_threshold();
810        let mut latest_by_pubkey = BTreeMap::<String, &Event>::new();
811        for event in events.iter().filter(|event| event.kind == Kind::Metadata) {
812            if self.is_overmuted_user(&event.pubkey.to_bytes(), threshold)? {
813                continue;
814            }
815            let pubkey = event.pubkey.to_hex();
816            match latest_by_pubkey.get(&pubkey) {
817                Some(current) if compare_nostr_events(event, current).is_le() => {}
818                _ => {
819                    latest_by_pubkey.insert(pubkey, event);
820                }
821            }
822        }
823        Ok(latest_by_pubkey)
824    }
825
826    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
827        let state = {
828            let graph = self.graph.lock().unwrap();
829            graph.export_state().context("export social graph state")?
830        };
831        let mut graph = SocialGraph::from_state(state).context("rebuild social graph state")?;
832        let root_hex = hex::encode(root);
833        if graph.get_root() != root_hex {
834            graph
835                .set_root(&root_hex)
836                .context("set snapshot social graph root")?;
837        }
838        let chunks = graph
839            .to_binary_chunks_with_budget(*options)
840            .context("encode social graph snapshot")?;
841        Ok(chunks.into_iter().map(Bytes::from).collect())
842    }
843
844    fn ingest_event(&self, event: &Event) -> Result<()> {
845        self.ingest_event_with_storage_class(event, self.default_storage_class_for(event)?)
846    }
847
848    fn ingest_events(&self, events: &[Event]) -> Result<()> {
849        if events.is_empty() {
850            return Ok(());
851        }
852
853        let mut public = Vec::new();
854        let mut ambient = Vec::new();
855        for event in events {
856            match self.default_storage_class_for(event)? {
857                EventStorageClass::Public => public.push(event.clone()),
858                EventStorageClass::Ambient => ambient.push(event.clone()),
859            }
860        }
861
862        if !public.is_empty() {
863            self.ingest_events_with_storage_class(&public, EventStorageClass::Public)?;
864        }
865        if !ambient.is_empty() {
866            self.ingest_events_with_storage_class(&ambient, EventStorageClass::Ambient)?;
867        }
868
869        Ok(())
870    }
871
872    fn apply_graph_events_only(&self, events: &[Event]) -> Result<()> {
873        let graph_events = events
874            .iter()
875            .filter(|event| is_social_graph_event(event.kind))
876            .collect::<Vec<_>>();
877        if graph_events.is_empty() {
878            return Ok(());
879        }
880
881        {
882            let mut graph = self.graph.lock().unwrap();
883            let mut snapshot = SocialGraph::from_state(
884                graph
885                    .export_state()
886                    .context("export social graph state for graph-only ingest")?,
887            )
888            .context("rebuild social graph state for graph-only ingest")?;
889            for event in graph_events {
890                snapshot.handle_event(&graph_event_from_nostr(event), true, 0.0);
891            }
892            graph
893                .replace_state(&snapshot.export_state())
894                .context("replace graph-only social graph state")?;
895        }
896        self.invalidate_distance_cache();
897        Ok(())
898    }
899
900    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
901        self.query_events_in_scope(filter, limit, EventQueryScope::All)
902    }
903
904    fn default_storage_class_for(&self, event: &Event) -> Result<EventStorageClass> {
905        let graph = self.graph.lock().unwrap();
906        let root_hex = graph.get_root().context("read social graph root")?;
907        if root_hex != DEFAULT_ROOT_HEX && root_hex == event.pubkey.to_hex() {
908            return Ok(EventStorageClass::Public);
909        }
910        Ok(EventStorageClass::Ambient)
911    }
912
913    fn bucket(&self, storage_class: EventStorageClass) -> &EventIndexBucket {
914        match storage_class {
915            EventStorageClass::Public => &self.public_events,
916            EventStorageClass::Ambient => &self.ambient_events,
917        }
918    }
919
920    fn ingest_event_with_storage_class(
921        &self,
922        event: &Event,
923        storage_class: EventStorageClass,
924    ) -> Result<()> {
925        let current_root = self.bucket(storage_class).events_root_for_write()?;
926        let next_root = self
927            .bucket(storage_class)
928            .store_event(current_root.as_ref(), event)?;
929        self.bucket(storage_class)
930            .write_events_root(Some(&next_root))?;
931
932        self.update_profile_index_for_events(std::slice::from_ref(event))?;
933
934        if is_social_graph_event(event.kind) {
935            {
936                let mut graph = self.graph.lock().unwrap();
937                graph
938                    .handle_event(&graph_event_from_nostr(event), true, 0.0)
939                    .context("ingest social graph event into nostr-social-graph")?;
940            }
941            self.invalidate_distance_cache();
942        }
943
944        Ok(())
945    }
946
947    fn ingest_events_with_storage_class(
948        &self,
949        events: &[Event],
950        storage_class: EventStorageClass,
951    ) -> Result<()> {
952        if events.is_empty() {
953            return Ok(());
954        }
955
956        let bucket = self.bucket(storage_class);
957        let current_root = bucket.events_root_for_write()?;
958        let stored_events = events
959            .iter()
960            .map(stored_event_from_nostr)
961            .collect::<Vec<_>>();
962        let next_root = block_on(
963            bucket
964                .event_store
965                .build(current_root.as_ref(), stored_events),
966        )
967        .map_err(map_event_store_error)?;
968        bucket.write_events_root(next_root.as_ref())?;
969
970        self.update_profile_index_for_events(events)?;
971
972        let graph_events = events
973            .iter()
974            .filter(|event| is_social_graph_event(event.kind))
975            .collect::<Vec<_>>();
976        if graph_events.is_empty() {
977            return Ok(());
978        }
979
980        {
981            let mut graph = self.graph.lock().unwrap();
982            let mut snapshot = SocialGraph::from_state(
983                graph
984                    .export_state()
985                    .context("export social graph state for batch ingest")?,
986            )
987            .context("rebuild social graph state for batch ingest")?;
988            for event in graph_events {
989                snapshot.handle_event(&graph_event_from_nostr(event), true, 0.0);
990            }
991            graph
992                .replace_state(&snapshot.export_state())
993                .context("replace batched social graph state")?;
994        }
995        self.invalidate_distance_cache();
996
997        Ok(())
998    }
999
1000    pub(crate) fn query_events_in_scope(
1001        &self,
1002        filter: &Filter,
1003        limit: usize,
1004        scope: EventQueryScope,
1005    ) -> Result<Vec<Event>> {
1006        if limit == 0 {
1007            return Ok(Vec::new());
1008        }
1009
1010        let buckets: &[&EventIndexBucket] = match scope {
1011            EventQueryScope::PublicOnly => &[&self.public_events],
1012            EventQueryScope::AmbientOnly => &[&self.ambient_events],
1013            EventQueryScope::All => &[&self.public_events, &self.ambient_events],
1014        };
1015
1016        let mut candidates = Vec::new();
1017        for bucket in buckets {
1018            candidates.extend(bucket.query_events(filter, limit)?);
1019        }
1020
1021        let mut deduped = dedupe_events(candidates);
1022        deduped.retain(|event| filter.match_event(event));
1023        deduped.truncate(limit);
1024        Ok(deduped)
1025    }
1026}
1027
1028impl SocialGraphBackend for SocialGraphStore {
1029    fn stats(&self) -> Result<SocialGraphStats> {
1030        SocialGraphStore::stats(self)
1031    }
1032
1033    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
1034        SocialGraphStore::users_by_follow_distance(self, distance)
1035    }
1036
1037    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
1038        SocialGraphStore::follow_distance(self, pk_bytes)
1039    }
1040
1041    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
1042        SocialGraphStore::follow_list_created_at(self, owner)
1043    }
1044
1045    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
1046        SocialGraphStore::followed_targets(self, owner)
1047    }
1048
1049    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
1050        SocialGraphStore::is_overmuted_user(self, user_pk, threshold)
1051    }
1052
1053    fn profile_search_root(&self) -> Result<Option<Cid>> {
1054        SocialGraphStore::profile_search_root(self)
1055    }
1056
1057    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
1058        SocialGraphStore::snapshot_chunks(self, root, options)
1059    }
1060
1061    fn ingest_event(&self, event: &Event) -> Result<()> {
1062        SocialGraphStore::ingest_event(self, event)
1063    }
1064
1065    fn ingest_event_with_storage_class(
1066        &self,
1067        event: &Event,
1068        storage_class: EventStorageClass,
1069    ) -> Result<()> {
1070        SocialGraphStore::ingest_event_with_storage_class(self, event, storage_class)
1071    }
1072
1073    fn ingest_events(&self, events: &[Event]) -> Result<()> {
1074        SocialGraphStore::ingest_events(self, events)
1075    }
1076
1077    fn ingest_events_with_storage_class(
1078        &self,
1079        events: &[Event],
1080        storage_class: EventStorageClass,
1081    ) -> Result<()> {
1082        SocialGraphStore::ingest_events_with_storage_class(self, events, storage_class)
1083    }
1084
1085    fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
1086        SocialGraphStore::apply_graph_events_only(self, events)
1087    }
1088
1089    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
1090        SocialGraphStore::query_events(self, filter, limit)
1091    }
1092}
1093
1094impl NostrSocialGraphBackend for SocialGraphStore {
1095    type Error = UpstreamGraphBackendError;
1096
1097    fn get_root(&self) -> std::result::Result<String, Self::Error> {
1098        let graph = self.graph.lock().unwrap();
1099        graph
1100            .get_root()
1101            .context("read social graph root")
1102            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1103    }
1104
1105    fn set_root(&mut self, root: &str) -> std::result::Result<(), Self::Error> {
1106        let root_bytes =
1107            decode_pubkey(root).map_err(|err| UpstreamGraphBackendError(err.to_string()))?;
1108        SocialGraphStore::set_root(self, &root_bytes)
1109            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1110    }
1111
1112    fn handle_event(
1113        &mut self,
1114        event: &GraphEvent,
1115        allow_unknown_authors: bool,
1116        overmute_threshold: f64,
1117    ) -> std::result::Result<(), Self::Error> {
1118        {
1119            let mut graph = self.graph.lock().unwrap();
1120            graph
1121                .handle_event(event, allow_unknown_authors, overmute_threshold)
1122                .context("ingest social graph event into heed backend")
1123                .map_err(|err| UpstreamGraphBackendError(err.to_string()))?;
1124        }
1125        self.invalidate_distance_cache();
1126        Ok(())
1127    }
1128
1129    fn get_follow_distance(&self, user: &str) -> std::result::Result<u32, Self::Error> {
1130        let graph = self.graph.lock().unwrap();
1131        graph
1132            .get_follow_distance(user)
1133            .context("read social graph follow distance")
1134            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1135    }
1136
1137    fn is_following(
1138        &self,
1139        follower: &str,
1140        followed_user: &str,
1141    ) -> std::result::Result<bool, Self::Error> {
1142        let graph = self.graph.lock().unwrap();
1143        graph
1144            .is_following(follower, followed_user)
1145            .context("read social graph following edge")
1146            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1147    }
1148
1149    fn get_followed_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
1150        let graph = self.graph.lock().unwrap();
1151        graph
1152            .get_followed_by_user(user)
1153            .context("read followed-by-user list")
1154            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1155    }
1156
1157    fn get_followers_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
1158        let graph = self.graph.lock().unwrap();
1159        graph
1160            .get_followers_by_user(user)
1161            .context("read followers-by-user list")
1162            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1163    }
1164
1165    fn get_muted_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
1166        let graph = self.graph.lock().unwrap();
1167        graph
1168            .get_muted_by_user(user)
1169            .context("read muted-by-user list")
1170            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1171    }
1172
1173    fn get_user_muted_by(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
1174        let graph = self.graph.lock().unwrap();
1175        graph
1176            .get_user_muted_by(user)
1177            .context("read user-muted-by list")
1178            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1179    }
1180
1181    fn get_follow_list_created_at(
1182        &self,
1183        user: &str,
1184    ) -> std::result::Result<Option<u64>, Self::Error> {
1185        let graph = self.graph.lock().unwrap();
1186        graph
1187            .get_follow_list_created_at(user)
1188            .context("read social graph follow list timestamp")
1189            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1190    }
1191
1192    fn get_mute_list_created_at(
1193        &self,
1194        user: &str,
1195    ) -> std::result::Result<Option<u64>, Self::Error> {
1196        let graph = self.graph.lock().unwrap();
1197        graph
1198            .get_mute_list_created_at(user)
1199            .context("read social graph mute list timestamp")
1200            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1201    }
1202
1203    fn is_overmuted(&self, user: &str, threshold: f64) -> std::result::Result<bool, Self::Error> {
1204        let graph = self.graph.lock().unwrap();
1205        graph
1206            .is_overmuted(user, threshold)
1207            .context("check social graph overmute")
1208            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1209    }
1210}
1211
1212impl<T> SocialGraphBackend for Arc<T>
1213where
1214    T: SocialGraphBackend + ?Sized,
1215{
1216    fn stats(&self) -> Result<SocialGraphStats> {
1217        self.as_ref().stats()
1218    }
1219
1220    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
1221        self.as_ref().users_by_follow_distance(distance)
1222    }
1223
1224    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
1225        self.as_ref().follow_distance(pk_bytes)
1226    }
1227
1228    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
1229        self.as_ref().follow_list_created_at(owner)
1230    }
1231
1232    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
1233        self.as_ref().followed_targets(owner)
1234    }
1235
1236    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
1237        self.as_ref().is_overmuted_user(user_pk, threshold)
1238    }
1239
1240    fn profile_search_root(&self) -> Result<Option<Cid>> {
1241        self.as_ref().profile_search_root()
1242    }
1243
1244    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
1245        self.as_ref().snapshot_chunks(root, options)
1246    }
1247
1248    fn ingest_event(&self, event: &Event) -> Result<()> {
1249        self.as_ref().ingest_event(event)
1250    }
1251
1252    fn ingest_event_with_storage_class(
1253        &self,
1254        event: &Event,
1255        storage_class: EventStorageClass,
1256    ) -> Result<()> {
1257        self.as_ref()
1258            .ingest_event_with_storage_class(event, storage_class)
1259    }
1260
1261    fn ingest_events(&self, events: &[Event]) -> Result<()> {
1262        self.as_ref().ingest_events(events)
1263    }
1264
1265    fn ingest_events_with_storage_class(
1266        &self,
1267        events: &[Event],
1268        storage_class: EventStorageClass,
1269    ) -> Result<()> {
1270        self.as_ref()
1271            .ingest_events_with_storage_class(events, storage_class)
1272    }
1273
1274    fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
1275        self.as_ref().ingest_graph_events(events)
1276    }
1277
1278    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
1279        self.as_ref().query_events(filter, limit)
1280    }
1281}
1282
1283fn should_replace_placeholder_root(graph: &HeedSocialGraph) -> Result<bool> {
1284    if graph.get_root().context("read current social graph root")? != DEFAULT_ROOT_HEX {
1285        return Ok(false);
1286    }
1287
1288    let GraphStats {
1289        users,
1290        follows,
1291        mutes,
1292        ..
1293    } = graph.size().context("size social graph")?;
1294    Ok(users <= 1 && follows == 0 && mutes == 0)
1295}
1296
1297fn decode_pubkey_set(values: Vec<String>) -> Result<UserSet> {
1298    let mut set = UserSet::new();
1299    for value in values {
1300        set.insert(decode_pubkey(&value)?);
1301    }
1302    Ok(set)
1303}
1304
1305fn decode_pubkey(value: &str) -> Result<[u8; 32]> {
1306    let mut bytes = [0u8; 32];
1307    hex::decode_to_slice(value, &mut bytes)
1308        .with_context(|| format!("decode social graph pubkey {value}"))?;
1309    Ok(bytes)
1310}
1311
1312fn is_social_graph_event(kind: Kind) -> bool {
1313    kind == Kind::ContactList || kind == Kind::MuteList
1314}
1315
1316fn graph_event_from_nostr(event: &Event) -> GraphEvent {
1317    GraphEvent {
1318        created_at: event.created_at.as_u64(),
1319        content: event.content.clone(),
1320        tags: event
1321            .tags
1322            .iter()
1323            .map(|tag| tag.as_slice().to_vec())
1324            .collect(),
1325        kind: event.kind.as_u16() as u32,
1326        pubkey: event.pubkey.to_hex(),
1327        id: event.id.to_hex(),
1328        sig: event.sig.to_string(),
1329    }
1330}
1331
1332fn stored_event_from_nostr(event: &Event) -> StoredNostrEvent {
1333    StoredNostrEvent {
1334        id: event.id.to_hex(),
1335        pubkey: event.pubkey.to_hex(),
1336        created_at: event.created_at.as_u64(),
1337        kind: event.kind.as_u16() as u32,
1338        tags: event
1339            .tags
1340            .iter()
1341            .map(|tag| tag.as_slice().to_vec())
1342            .collect(),
1343        content: event.content.clone(),
1344        sig: event.sig.to_string(),
1345    }
1346}
1347
1348fn nostr_event_from_stored(event: StoredNostrEvent) -> Result<Event> {
1349    let value = serde_json::json!({
1350        "id": event.id,
1351        "pubkey": event.pubkey,
1352        "created_at": event.created_at,
1353        "kind": event.kind,
1354        "tags": event.tags,
1355        "content": event.content,
1356        "sig": event.sig,
1357    });
1358    Event::from_json(value.to_string()).context("decode stored nostr event")
1359}
1360
1361pub(crate) fn stored_event_to_nostr_event(event: StoredNostrEvent) -> Result<Event> {
1362    nostr_event_from_stored(event)
1363}
1364
1365fn encode_cid(cid: &Cid) -> Result<Vec<u8>> {
1366    rmp_serde::to_vec_named(&StoredCid {
1367        hash: cid.hash,
1368        key: cid.key,
1369    })
1370    .context("encode social graph events root")
1371}
1372
1373fn decode_cid(bytes: &[u8]) -> Result<Option<Cid>> {
1374    let stored: StoredCid =
1375        rmp_serde::from_slice(bytes).context("decode social graph events root")?;
1376    Ok(Some(Cid {
1377        hash: stored.hash,
1378        key: stored.key,
1379    }))
1380}
1381
1382fn read_root_file(path: &Path) -> Result<Option<Cid>> {
1383    let Ok(bytes) = std::fs::read(path) else {
1384        return Ok(None);
1385    };
1386    decode_cid(&bytes)
1387}
1388
1389fn write_root_file(path: &Path, root: Option<&Cid>) -> Result<()> {
1390    let Some(root) = root else {
1391        if path.exists() {
1392            std::fs::remove_file(path)?;
1393        }
1394        return Ok(());
1395    };
1396
1397    let encoded = encode_cid(root)?;
1398    let tmp_path = path.with_extension("tmp");
1399    std::fs::write(&tmp_path, encoded)?;
1400    std::fs::rename(tmp_path, path)?;
1401    Ok(())
1402}
1403
1404fn normalize_profile_name(value: &serde_json::Value) -> Option<String> {
1405    let raw = value.as_str()?;
1406    let trimmed = raw.split_whitespace().collect::<Vec<_>>().join(" ");
1407    if trimmed.is_empty() {
1408        return None;
1409    }
1410    Some(trimmed.chars().take(PROFILE_NAME_MAX_LENGTH).collect())
1411}
1412
1413fn extract_profile_names(profile: &serde_json::Map<String, serde_json::Value>) -> Vec<String> {
1414    let mut names = Vec::new();
1415    let mut seen = HashSet::new();
1416
1417    for key in ["display_name", "displayName", "name", "username"] {
1418        let Some(value) = profile.get(key).and_then(normalize_profile_name) else {
1419            continue;
1420        };
1421        let lowered = value.to_lowercase();
1422        if seen.insert(lowered) {
1423            names.push(value);
1424        }
1425    }
1426
1427    names
1428}
1429
/// Decides whether a NIP-05 local part should be left out of the search terms.
///
/// A local part is rejected when it is a single character, when it starts
/// with `npub1`, or when it already occurs in the primary name once that name
/// is lower-cased and stripped of whitespace.
///
/// The single-character test counts characters, not bytes, so a lone
/// non-ASCII character is rejected just like a lone ASCII one.
fn should_reject_profile_nip05(local_part: &str, primary_name: &str) -> bool {
    let mut chars = local_part.chars();
    let is_single_char = chars.next().is_some() && chars.next().is_none();
    if is_single_char || local_part.starts_with("npub1") {
        return true;
    }

    let squashed_name: String = primary_name.to_lowercase().split_whitespace().collect();
    squashed_name.contains(local_part)
}
1441
1442fn normalize_profile_nip05(
1443    profile: &serde_json::Map<String, serde_json::Value>,
1444    primary_name: Option<&str>,
1445) -> Option<String> {
1446    let raw = profile.get("nip05")?.as_str()?;
1447    let local_part = raw.split('@').next()?.trim().to_lowercase();
1448    if local_part.is_empty() {
1449        return None;
1450    }
1451    let truncated: String = local_part.chars().take(PROFILE_NAME_MAX_LENGTH).collect();
1452    if truncated.is_empty() {
1453        return None;
1454    }
1455    if primary_name.is_some_and(|name| should_reject_profile_nip05(&truncated, name)) {
1456        return None;
1457    }
1458    Some(truncated)
1459}
1460
/// Returns `true` when `word` is one of the fixed English stop words.
///
/// The comparison is exact and case-sensitive; every entry in the table is
/// lower-case.
fn is_search_stop_word(word: &str) -> bool {
    const STOP_WORDS: &[&str] = &[
        "a", "an", "the", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by",
        "from", "is", "it", "as", "be", "was", "are", "this", "that", "these", "those", "i",
        "you", "he", "she", "we", "they", "my", "your", "his", "her", "its", "our", "their",
        "what", "which", "who", "whom", "how", "when", "where", "why", "will", "would", "could",
        "should", "can", "may", "might", "must", "have", "has", "had", "do", "does", "did",
        "been", "being", "get", "got", "just", "now", "then", "so", "if", "not", "no", "yes",
        "all", "any", "some", "more", "most", "other", "into", "over", "after", "before",
        "about", "up", "down", "out", "off", "through", "during", "under", "again", "further",
        "once",
    ];

    STOP_WORDS.contains(&word)
}
1558
/// Returns `true` when `word` consists only of ASCII digits and is not a
/// four-digit year between 1900 and 2099 inclusive.
///
/// An empty string contains no non-digit and is therefore reported as `true`.
fn is_pure_search_number(word: &str) -> bool {
    let only_digits = word.bytes().all(|byte| byte.is_ascii_digit());
    if !only_digits {
        return false;
    }

    let looks_like_year = word.len() == 4 && matches!(word.parse::<u16>(), Ok(1900..=2099));
    !looks_like_year
}
1568
/// Splits a compound word into its parts.
///
/// A boundary is placed before a character when:
/// - a lower-case letter is followed by an upper-case one (`camelCase`),
/// - a digit is followed by a letter or a letter by a digit (`abc123`),
/// - an upper-case run ends and a capitalised word begins (`HTTPServer`
///   becomes `HTTP` + `Server`).
///
/// The parts keep their original casing; an empty input yields no parts.
fn split_compound_search_word(word: &str) -> Vec<String> {
    let mut pieces: Vec<String> = Vec::new();
    let mut buffer = String::new();
    let mut previous: Option<char> = None;
    let mut upcoming = word.chars().peekable();

    while let Some(ch) = upcoming.next() {
        let at_boundary = match previous {
            None => false,
            Some(prev) => {
                let next_is_lower = upcoming.peek().is_some_and(|next| next.is_lowercase());
                (prev.is_lowercase() && ch.is_uppercase())
                    || (prev.is_ascii_digit() && ch.is_alphabetic())
                    || (prev.is_alphabetic() && ch.is_ascii_digit())
                    || (prev.is_uppercase() && ch.is_uppercase() && next_is_lower)
            }
        };

        // `previous` is only set once a character is buffered, so the buffer
        // is never empty at a boundary.
        if at_boundary {
            pieces.push(std::mem::take(&mut buffer));
        }

        buffer.push(ch);
        previous = Some(ch);
    }

    if !buffer.is_empty() {
        pieces.push(buffer);
    }

    pieces
}
1597
1598fn parse_search_keywords(text: &str) -> Vec<String> {
1599    let mut keywords = Vec::new();
1600    let mut seen = HashSet::new();
1601
1602    for word in text
1603        .split(|ch: char| !ch.is_alphanumeric())
1604        .filter(|word| !word.is_empty())
1605    {
1606        let mut variants = Vec::with_capacity(1 + word.len() / 4);
1607        variants.push(word.to_lowercase());
1608        variants.extend(
1609            split_compound_search_word(word)
1610                .into_iter()
1611                .map(|part| part.to_lowercase()),
1612        );
1613
1614        for lowered in variants {
1615            if lowered.chars().count() < 2
1616                || is_search_stop_word(&lowered)
1617                || is_pure_search_number(&lowered)
1618            {
1619                continue;
1620            }
1621            if seen.insert(lowered.clone()) {
1622                keywords.push(lowered);
1623            }
1624        }
1625    }
1626
1627    keywords
1628}
1629
1630fn profile_search_terms_for_event(event: &Event) -> Vec<String> {
1631    let profile = match serde_json::from_str::<serde_json::Value>(&event.content) {
1632        Ok(serde_json::Value::Object(profile)) => profile,
1633        _ => serde_json::Map::new(),
1634    };
1635    let names = extract_profile_names(&profile);
1636    let primary_name = names.first().map(String::as_str);
1637    let mut parts = Vec::new();
1638    if let Some(name) = primary_name {
1639        parts.push(name.to_string());
1640    }
1641    if let Some(nip05) = normalize_profile_nip05(&profile, primary_name) {
1642        parts.push(nip05);
1643    }
1644    parts.push(event.pubkey.to_hex());
1645    if names.len() > 1 {
1646        parts.extend(names.into_iter().skip(1));
1647    }
1648    parse_search_keywords(&parts.join(" "))
1649}
1650
1651fn compare_nostr_events(left: &Event, right: &Event) -> std::cmp::Ordering {
1652    left.created_at
1653        .as_u64()
1654        .cmp(&right.created_at.as_u64())
1655        .then_with(|| left.id.to_hex().cmp(&right.id.to_hex()))
1656}
1657
1658fn map_event_store_error(err: NostrEventStoreError) -> anyhow::Error {
1659    anyhow::anyhow!("nostr event store error: {err}")
1660}
1661
1662fn ensure_social_graph_mapsize(db_dir: &Path, requested_bytes: u64) -> Result<()> {
1663    let requested = requested_bytes.max(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES);
1664    let page_size = page_size_bytes() as u64;
1665    let rounded = requested
1666        .checked_add(page_size.saturating_sub(1))
1667        .map(|size| size / page_size * page_size)
1668        .unwrap_or(requested);
1669    let map_size = usize::try_from(rounded).context("social graph mapsize exceeds usize")?;
1670
1671    let env = unsafe {
1672        heed::EnvOpenOptions::new()
1673            .map_size(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES as usize)
1674            .max_dbs(SOCIALGRAPH_MAX_DBS)
1675            .open(db_dir)
1676    }
1677    .context("open social graph LMDB env for resize")?;
1678    if env.info().map_size < map_size {
1679        unsafe { env.resize(map_size) }.context("resize social graph LMDB env")?;
1680    }
1681
1682    Ok(())
1683}
1684
1685fn page_size_bytes() -> usize {
1686    page_size::get_granularity()
1687}
1688
1689#[cfg(test)]
1690mod tests;