Skip to main content

hashtree_cli/socialgraph/
mod.rs

1pub mod access;
2pub mod crawler;
3pub mod local_lists;
4pub mod snapshot;
5
6pub use access::SocialGraphAccessControl;
7pub use crawler::SocialGraphCrawler;
8pub use local_lists::{
9    read_local_list_file_state, sync_local_list_files_force, sync_local_list_files_if_changed,
10    LocalListFileState, LocalListSyncOutcome,
11};
12
13mod index_buckets;
14
15use index_buckets::{
16    dedupe_events, latest_metadata_events_by_pubkey, EventIndexBucket, ProfileIndexBucket,
17};
18
19use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
20use std::path::{Path, PathBuf};
21use std::sync::{Arc, Mutex as StdMutex};
22
23use anyhow::{Context, Result};
24use bytes::Bytes;
25use futures::executor::block_on;
26use hashtree_core::{nhash_encode_full, Cid, HashTree, HashTreeConfig, NHashData};
27use hashtree_index::BTree;
28use hashtree_nostr::{
29    is_parameterized_replaceable_kind, is_replaceable_kind, ListEventsOptions, NostrEventStore,
30    NostrEventStoreError, ProfileGuard as NostrProfileGuard, StoredNostrEvent,
31};
32#[cfg(test)]
33use hashtree_nostr::{
34    reset_profile as reset_nostr_profile, set_profile_enabled as set_nostr_profile_enabled,
35    take_profile as take_nostr_profile,
36};
37use nostr::{Event, Filter, JsonUtil, Kind, SingleLetterTag};
38use nostr_social_graph::{
39    BinaryBudget, GraphStats, NostrEvent as GraphEvent, SocialGraph,
40    SocialGraphBackend as NostrSocialGraphBackend,
41};
42use nostr_social_graph_heed::HeedSocialGraph;
43
44use crate::storage::{LocalStore, StorageRouter};
45
46#[cfg(test)]
47use std::sync::{Mutex, MutexGuard, OnceLock};
48#[cfg(test)]
49use std::time::Instant;
50
/// Ordered set of 32-byte user identifiers (e.g. the result of `followed_targets`).
51pub type UserSet = BTreeSet<[u8; 32]>;
52
/// Root (64 hex zeros) handed to `HeedSocialGraph::open` when a store is opened.
/// NOTE(review): presumably the placeholder that `set_root` later replaces — confirm.
53const DEFAULT_ROOT_HEX: &str = "0000000000000000000000000000000000000000000000000000000000000000";
/// File name, inside the store directory, used as the public event bucket's `root_path`.
54const EVENTS_ROOT_FILE: &str = "events-root.msgpack";
/// File name, inside the store directory, used as the ambient event bucket's `root_path`.
55const AMBIENT_EVENTS_ROOT_FILE: &str = "events-root-ambient.msgpack";
/// Sub-directory of the store directory that holds the ambient blob store.
56const AMBIENT_EVENTS_BLOB_DIR: &str = "ambient-blobs";
/// File name used as the profile index's `search_root_path`.
57const PROFILE_SEARCH_ROOT_FILE: &str = "profile-search-root.msgpack";
/// File name used as the profile index's `by_pubkey_root_path`.
58const PROFILES_BY_PUBKEY_ROOT_FILE: &str = "profiles-by-pubkey-root.msgpack";
/// Distance value that `SocialGraphStore::follow_distance` reports as `None`.
59const UNKNOWN_FOLLOW_DISTANCE: u32 = 1000;
/// 64 MiB. NOTE(review): not referenced in the visible part of the file; presumably the
/// fallback map size when none is configured — confirm at the use site.
60const DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES: u64 = 64 * 1024 * 1024;
/// NOTE(review): not referenced in the visible part of the file; presumably the database
/// count limit used when opening the environment — confirm at the use site.
61const SOCIALGRAPH_MAX_DBS: u32 = 16;
/// `order` passed to the profile search `BTree` via `BTreeOptions`.
62const PROFILE_SEARCH_INDEX_ORDER: usize = 64;
/// NOTE(review): not referenced in the visible part of the file; presumably the key prefix
/// of profile search index entries — confirm at the use site.
63const PROFILE_SEARCH_PREFIX: &str = "p:";
/// NOTE(review): not referenced in the visible part of the file; presumably a cap applied
/// to indexed profile names — confirm unit (bytes vs. chars) at the use site.
64const PROFILE_NAME_MAX_LENGTH: usize = 100;
65
/// Storage class attached to an event when it is ingested.
///
/// `SocialGraphStore::ingest_events` partitions its input by this class and ingests each
/// group separately. NOTE(review): presumably the classes map onto the store's
/// `public_events` / `ambient_events` buckets — confirm in the ingest implementation.
66#[derive(Debug, Clone, Copy, PartialEq, Eq)]
67pub enum EventStorageClass {
    /// Event belongs to the public class.
68    Public,
    /// Event belongs to the ambient class.
69    Ambient,
70}
71
/// Scope handed to `query_events_in_scope`; `SocialGraphStore::query_events` uses `All`.
// dead_code is allowed outside test builds; presumably some variants are only constructed
// by tests — confirm before removing the attribute.
72#[cfg_attr(not(test), allow(dead_code))]
73#[derive(Debug, Clone, Copy, PartialEq, Eq)]
74pub(crate) enum EventQueryScope {
    /// NOTE(review): presumably restricts a query to the public bucket — confirm.
75    PublicOnly,
    /// NOTE(review): presumably restricts a query to the ambient bucket — confirm.
76    AmbientOnly,
    /// Scope used by the default `query_events` entry point.
77    All,
78}
79
/// Serde-serializable pair of a 32-byte hash and an optional 32-byte key.
///
/// NOTE(review): presumably the persisted form of a `Cid` written to the `*.msgpack` root
/// files — confirm against the code that reads/writes it. Field order may matter for the
/// serialized layout, so do not reorder without checking.
80#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
81struct StoredCid {
    /// 32-byte hash.
82    hash: [u8; 32],
    /// Optional 32-byte key; `None` when absent.
83    key: Option<[u8; 32]>,
84}
85
/// Serialized profile search entry, as returned by `profile_search_entries_for_prefix`.
86#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
87pub struct StoredProfileSearchEntry {
    /// Profile owner's public key as a string (presumably hex — confirm at the writer).
88    pub pubkey: String,
    /// Profile name.
89    pub name: String,
    /// Additional names; deserializes to an empty list when the field is missing.
90    #[serde(default)]
91    pub aliases: Vec<String>,
    /// NIP-05 identifier, if any; deserializes to `None` when the field is missing.
92    #[serde(default)]
93    pub nip05: Option<String>,
    /// Follow distance, if known; omitted from the serialized form when `None`.
94    #[serde(default, skip_serializing_if = "Option::is_none")]
95    pub follow_distance: Option<u32>,
    /// Creation timestamp (presumably of the source metadata event — confirm).
96    pub created_at: u64,
    /// `nhash` string (presumably identifying the stored source event — confirm).
97    pub event_nhash: String,
98}
99
/// Summary statistics of the social graph, as produced by `build_distance_cache`.
100#[derive(Debug, Clone, Default, serde::Serialize)]
101pub struct SocialGraphStats {
    /// Sum of all `size_by_distance` values.
102    pub total_users: usize,
    /// Root of the exported graph state, when available.
103    pub root: Option<String>,
    /// Sum of the lengths of every user's followed-target collection.
104    pub total_follows: usize,
    /// Largest distance key present in `size_by_distance` (0 when empty).
105    pub max_depth: u32,
    /// Number of decoded users at each follow distance.
106    pub size_by_distance: BTreeMap<u32, usize>,
    /// Set to `true` by `build_distance_cache`; `false` in the `Default` value.
107    pub enabled: bool,
108}
109
/// Snapshot derived from an exported graph state, cached until the graph changes.
110#[derive(Debug, Clone)]
111struct DistanceCache {
    /// Statistics computed from the same exported state.
112    stats: SocialGraphStats,
    /// Decoded user public keys grouped by follow distance.
113    users_by_distance: BTreeMap<u32, Vec<[u8; 32]>>,
114}
115
/// Error wrapping a message string; its `Display` output is exactly that string.
116#[derive(Debug, thiserror::Error)]
117#[error("{0}")]
118pub struct UpstreamGraphBackendError(String);
119
/// Social graph backend together with the event and profile indexes kept next to it.
120pub struct SocialGraphStore {
    /// Heed-backed graph; all reads and writes go through this mutex.
121    graph: StdMutex<HeedSocialGraph>,
    /// Lazily built distance snapshot; reset to `None` whenever the graph is modified.
122    distance_cache: StdMutex<Option<DistanceCache>>,
    /// Event index bucket whose root is persisted in the public events root file.
123    public_events: EventIndexBucket,
    /// Event index bucket whose root is persisted in the ambient events root file.
124    ambient_events: EventIndexBucket,
    /// Profile indexes (by pubkey and search) built on the public storage router.
125    profile_index: ProfileIndexBucket,
    /// Overmute threshold applied when maintaining the profile index; starts at `1.0`.
126    profile_index_overmute_threshold: StdMutex<f64>,
127}
128
129pub trait SocialGraphBackend: Send + Sync {
130    fn stats(&self) -> Result<SocialGraphStats>;
131    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>>;
132    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>>;
133    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>>;
134    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet>;
135    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool>;
136    fn profile_search_root(&self) -> Result<Option<Cid>> {
137        Ok(None)
138    }
139    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>>;
140    fn ingest_event(&self, event: &Event) -> Result<()>;
141    fn ingest_event_with_storage_class(
142        &self,
143        event: &Event,
144        storage_class: EventStorageClass,
145    ) -> Result<()> {
146        let _ = storage_class;
147        self.ingest_event(event)
148    }
149    fn ingest_events(&self, events: &[Event]) -> Result<()> {
150        for event in events {
151            self.ingest_event(event)?;
152        }
153        Ok(())
154    }
155    fn ingest_events_with_storage_class(
156        &self,
157        events: &[Event],
158        storage_class: EventStorageClass,
159    ) -> Result<()> {
160        for event in events {
161            self.ingest_event_with_storage_class(event, storage_class)?;
162        }
163        Ok(())
164    }
165    fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
166        self.ingest_events(events)
167    }
168    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>>;
169}
170
/// Guard returned by [`test_lock`]; the lock is released when it is dropped.
#[cfg(test)]
pub type TestLockGuard = MutexGuard<'static, ()>;

/// Process-wide mutex used to serialize tests; created on first use.
#[cfg(test)]
static NDB_TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();

/// Acquires the process-wide test lock.
///
/// A poisoned lock (a previous holder panicked) is still handed out, so one failing test
/// does not cascade into every later test.
#[cfg(test)]
pub fn test_lock() -> TestLockGuard {
    let mutex = NDB_TEST_LOCK.get_or_init(|| Mutex::new(()));
    match mutex.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
184
185pub fn open_social_graph_store(data_dir: &Path) -> Result<Arc<SocialGraphStore>> {
186    open_social_graph_store_with_mapsize(data_dir, None)
187}
188
189pub fn open_social_graph_store_with_mapsize(
190    data_dir: &Path,
191    mapsize_bytes: Option<u64>,
192) -> Result<Arc<SocialGraphStore>> {
193    let db_dir = data_dir.join("socialgraph");
194    open_social_graph_store_at_path(&db_dir, mapsize_bytes)
195}
196
197pub fn open_social_graph_store_with_storage(
198    data_dir: &Path,
199    store: Arc<StorageRouter>,
200    mapsize_bytes: Option<u64>,
201) -> Result<Arc<SocialGraphStore>> {
202    let db_dir = data_dir.join("socialgraph");
203    open_social_graph_store_at_path_with_storage(&db_dir, store, mapsize_bytes)
204}
205
206pub fn open_social_graph_store_at_path(
207    db_dir: &Path,
208    mapsize_bytes: Option<u64>,
209) -> Result<Arc<SocialGraphStore>> {
210    let config = hashtree_config::Config::load_or_default();
211    let backend = &config.storage.backend;
212    let local_store = Arc::new(
213        LocalStore::new_with_lmdb_map_size(db_dir.join("blobs"), backend, mapsize_bytes)
214            .map_err(|err| anyhow::anyhow!("Failed to create social graph blob store: {err}"))?,
215    );
216    let store = Arc::new(StorageRouter::new(local_store));
217    open_social_graph_store_at_path_with_storage(db_dir, store, mapsize_bytes)
218}
219
220pub fn open_social_graph_store_at_path_with_storage(
221    db_dir: &Path,
222    store: Arc<StorageRouter>,
223    mapsize_bytes: Option<u64>,
224) -> Result<Arc<SocialGraphStore>> {
225    let ambient_backend = store.local_store().backend();
226    let ambient_local = Arc::new(
227        LocalStore::new_with_lmdb_map_size(
228            db_dir.join(AMBIENT_EVENTS_BLOB_DIR),
229            &ambient_backend,
230            mapsize_bytes,
231        )
232        .map_err(|err| {
233            anyhow::anyhow!("Failed to create social graph ambient blob store: {err}")
234        })?,
235    );
236    let ambient_store = Arc::new(StorageRouter::new(ambient_local));
237    open_social_graph_store_at_path_with_storage_split(db_dir, store, ambient_store, mapsize_bytes)
238}
239
240pub fn open_social_graph_store_at_path_with_storage_split(
241    db_dir: &Path,
242    public_store: Arc<StorageRouter>,
243    ambient_store: Arc<StorageRouter>,
244    mapsize_bytes: Option<u64>,
245) -> Result<Arc<SocialGraphStore>> {
246    std::fs::create_dir_all(db_dir)?;
247    if let Some(size) = mapsize_bytes {
248        ensure_social_graph_mapsize(db_dir, size)?;
249    }
250    let graph = HeedSocialGraph::open(db_dir, DEFAULT_ROOT_HEX)
251        .context("open nostr-social-graph heed backend")?;
252
253    Ok(Arc::new(SocialGraphStore {
254        graph: StdMutex::new(graph),
255        distance_cache: StdMutex::new(None),
256        public_events: EventIndexBucket {
257            event_store: NostrEventStore::new(Arc::clone(&public_store)),
258            root_path: db_dir.join(EVENTS_ROOT_FILE),
259        },
260        ambient_events: EventIndexBucket {
261            event_store: NostrEventStore::new(ambient_store),
262            root_path: db_dir.join(AMBIENT_EVENTS_ROOT_FILE),
263        },
264        profile_index: ProfileIndexBucket {
265            tree: HashTree::new(HashTreeConfig::new(Arc::clone(&public_store))),
266            index: BTree::new(
267                public_store,
268                hashtree_index::BTreeOptions {
269                    order: Some(PROFILE_SEARCH_INDEX_ORDER),
270                },
271            ),
272            by_pubkey_root_path: db_dir.join(PROFILES_BY_PUBKEY_ROOT_FILE),
273            search_root_path: db_dir.join(PROFILE_SEARCH_ROOT_FILE),
274        },
275        profile_index_overmute_threshold: StdMutex::new(1.0),
276    }))
277}
278
279pub fn set_social_graph_root(store: &SocialGraphStore, pk_bytes: &[u8; 32]) {
280    if let Err(err) = store.set_root(pk_bytes) {
281        tracing::warn!("Failed to set social graph root: {err}");
282    }
283}
284
285pub fn get_follow_distance(
286    backend: &(impl SocialGraphBackend + ?Sized),
287    pk_bytes: &[u8; 32],
288) -> Option<u32> {
289    backend.follow_distance(pk_bytes).ok().flatten()
290}
291
292pub fn get_follows(
293    backend: &(impl SocialGraphBackend + ?Sized),
294    pk_bytes: &[u8; 32],
295) -> Vec<[u8; 32]> {
296    match backend.followed_targets(pk_bytes) {
297        Ok(set) => set.into_iter().collect(),
298        Err(_) => Vec::new(),
299    }
300}
301
302pub fn is_overmuted(
303    backend: &(impl SocialGraphBackend + ?Sized),
304    _root_pk: &[u8; 32],
305    user_pk: &[u8; 32],
306    threshold: f64,
307) -> bool {
308    backend
309        .is_overmuted_user(user_pk, threshold)
310        .unwrap_or(false)
311}
312
313pub fn ingest_event(backend: &(impl SocialGraphBackend + ?Sized), _sub_id: &str, event_json: &str) {
314    let event = match Event::from_json(event_json) {
315        Ok(event) => event,
316        Err(_) => return,
317    };
318
319    if let Err(err) = backend.ingest_event(&event) {
320        tracing::warn!("Failed to ingest social graph event: {err}");
321    }
322}
323
324pub fn ingest_parsed_event(
325    backend: &(impl SocialGraphBackend + ?Sized),
326    event: &Event,
327) -> Result<()> {
328    backend.ingest_event(event)
329}
330
331pub fn ingest_parsed_event_with_storage_class(
332    backend: &(impl SocialGraphBackend + ?Sized),
333    event: &Event,
334    storage_class: EventStorageClass,
335) -> Result<()> {
336    backend.ingest_event_with_storage_class(event, storage_class)
337}
338
339pub fn ingest_parsed_events(
340    backend: &(impl SocialGraphBackend + ?Sized),
341    events: &[Event],
342) -> Result<()> {
343    backend.ingest_events(events)
344}
345
346pub fn ingest_parsed_events_with_storage_class(
347    backend: &(impl SocialGraphBackend + ?Sized),
348    events: &[Event],
349    storage_class: EventStorageClass,
350) -> Result<()> {
351    backend.ingest_events_with_storage_class(events, storage_class)
352}
353
354pub fn ingest_graph_parsed_events(
355    backend: &(impl SocialGraphBackend + ?Sized),
356    events: &[Event],
357) -> Result<()> {
358    backend.ingest_graph_events(events)
359}
360
361pub fn query_events(
362    backend: &(impl SocialGraphBackend + ?Sized),
363    filter: &Filter,
364    limit: usize,
365) -> Vec<Event> {
366    backend.query_events(filter, limit).unwrap_or_default()
367}
368
369impl SocialGraphStore {
370    pub fn set_profile_index_overmute_threshold(&self, threshold: f64) {
371        *self
372            .profile_index_overmute_threshold
373            .lock()
374            .expect("profile index overmute threshold") = threshold;
375    }
376
377    fn profile_index_overmute_threshold(&self) -> f64 {
378        *self
379            .profile_index_overmute_threshold
380            .lock()
381            .expect("profile index overmute threshold")
382    }
383
384    fn invalidate_distance_cache(&self) {
385        *self.distance_cache.lock().unwrap() = None;
386    }
387
388    fn build_distance_cache(state: nostr_social_graph::SocialGraphState) -> Result<DistanceCache> {
389        let unique_ids = state
390            .unique_ids
391            .into_iter()
392            .map(|(pubkey, id)| decode_pubkey(&pubkey).map(|decoded| (id, decoded)))
393            .collect::<Result<HashMap<_, _>>>()?;
394
395        let mut users_by_distance = BTreeMap::new();
396        let mut size_by_distance = BTreeMap::new();
397        for (distance, users) in state.users_by_follow_distance {
398            let decoded = users
399                .into_iter()
400                .filter_map(|id| unique_ids.get(&id).copied())
401                .collect::<Vec<_>>();
402            size_by_distance.insert(distance, decoded.len());
403            users_by_distance.insert(distance, decoded);
404        }
405
406        let total_follows = state
407            .followed_by_user
408            .iter()
409            .map(|(_, targets)| targets.len())
410            .sum::<usize>();
411        let total_users = size_by_distance.values().copied().sum();
412        let max_depth = size_by_distance.keys().copied().max().unwrap_or_default();
413
414        Ok(DistanceCache {
415            stats: SocialGraphStats {
416                total_users,
417                root: Some(state.root),
418                total_follows,
419                max_depth,
420                size_by_distance,
421                enabled: true,
422            },
423            users_by_distance,
424        })
425    }
426
427    fn load_distance_cache(&self) -> Result<DistanceCache> {
428        if let Some(cache) = self.distance_cache.lock().unwrap().clone() {
429            return Ok(cache);
430        }
431
432        let state = {
433            let graph = self.graph.lock().unwrap();
434            graph.export_state().context("export social graph state")?
435        };
436        let cache = Self::build_distance_cache(state)?;
437        *self.distance_cache.lock().unwrap() = Some(cache.clone());
438        Ok(cache)
439    }
440
441    fn set_root(&self, root: &[u8; 32]) -> Result<()> {
442        let root_hex = hex::encode(root);
443        {
444            let mut graph = self.graph.lock().unwrap();
445            if should_replace_placeholder_root(&graph)? {
446                let fresh = SocialGraph::new(&root_hex);
447                graph
448                    .replace_state(&fresh.export_state())
449                    .context("replace placeholder social graph root")?;
450            } else {
451                graph
452                    .set_root(&root_hex)
453                    .context("set nostr-social-graph root")?;
454            }
455        }
456        self.invalidate_distance_cache();
457        Ok(())
458    }
459
460    fn stats(&self) -> Result<SocialGraphStats> {
461        Ok(self.load_distance_cache()?.stats)
462    }
463
464    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
465        let graph = self.graph.lock().unwrap();
466        let distance = graph
467            .get_follow_distance(&hex::encode(pk_bytes))
468            .context("read social graph follow distance")?;
469        Ok((distance != UNKNOWN_FOLLOW_DISTANCE).then_some(distance))
470    }
471
472    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
473        Ok(self
474            .load_distance_cache()?
475            .users_by_distance
476            .get(&distance)
477            .cloned()
478            .unwrap_or_default())
479    }
480
481    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
482        let graph = self.graph.lock().unwrap();
483        graph
484            .get_follow_list_created_at(&hex::encode(owner))
485            .context("read social graph follow list timestamp")
486    }
487
488    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
489        let graph = self.graph.lock().unwrap();
490        decode_pubkey_set(
491            graph
492                .get_followed_by_user(&hex::encode(owner))
493                .context("read followed targets")?,
494        )
495    }
496
497    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
498        if threshold <= 0.0 {
499            return Ok(false);
500        }
501        let graph = self.graph.lock().unwrap();
502        graph
503            .is_overmuted(&hex::encode(user_pk), threshold)
504            .context("check social graph overmute")
505    }
506
507    #[cfg_attr(not(test), allow(dead_code))]
508    pub fn profile_search_root(&self) -> Result<Option<Cid>> {
509        self.profile_index.search_root()
510    }
511
512    #[cfg_attr(not(test), allow(dead_code))]
513    pub fn profiles_by_pubkey_root(&self) -> Result<Option<Cid>> {
514        self.profile_index.by_pubkey_root()
515    }
516
517    pub fn public_events_root(&self) -> Result<Option<Cid>> {
518        self.public_events.events_root()
519    }
520
521    #[cfg_attr(test, allow(dead_code))]
522    pub(crate) fn public_events_root_for_write(&self) -> Result<Option<Cid>> {
523        self.public_events.events_root_for_write()
524    }
525
526    pub(crate) fn write_public_events_root(&self, root: Option<&Cid>) -> Result<()> {
527        self.public_events.write_events_root(root)
528    }
529
530    #[cfg_attr(not(test), allow(dead_code))]
531    pub fn latest_profile_event(&self, pubkey_hex: &str) -> Result<Option<Event>> {
532        self.profile_index.profile_event_for_pubkey(pubkey_hex)
533    }
534
535    #[cfg_attr(not(test), allow(dead_code))]
536    pub fn profile_search_entries_for_prefix(
537        &self,
538        prefix: &str,
539    ) -> Result<Vec<(String, StoredProfileSearchEntry)>> {
540        self.profile_index.search_entries_for_prefix(prefix)
541    }
542
543    pub fn sync_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
544        self.update_profile_index_for_events(events)
545    }
546
547    pub(crate) fn rebuild_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
548        let latest_by_pubkey = self.filtered_latest_metadata_events_by_pubkey(events)?;
549        let (by_pubkey_root, search_root) = self
550            .profile_index
551            .rebuild_profile_events_with_distances(latest_by_pubkey.into_values(), |event| {
552                self.follow_distance(&event.pubkey.to_bytes())
553            })?;
554        self.profile_index
555            .write_by_pubkey_root(by_pubkey_root.as_ref())?;
556        self.profile_index.write_search_root(search_root.as_ref())?;
557        Ok(())
558    }
559
560    pub(crate) async fn rebuild_profile_index_for_events_async(
561        &self,
562        events: &[Event],
563    ) -> Result<()> {
564        let latest_by_pubkey = self.filtered_latest_metadata_events_by_pubkey(events)?;
565        let (by_pubkey_root, search_root) = self
566            .profile_index
567            .rebuild_profile_events_async_with_distances(
568                latest_by_pubkey.into_values(),
569                |event| self.follow_distance(&event.pubkey.to_bytes()),
570            )
571            .await?;
572        self.profile_index
573            .write_by_pubkey_root(by_pubkey_root.as_ref())?;
574        self.profile_index.write_search_root(search_root.as_ref())?;
575        Ok(())
576    }
577
578    pub fn rebuild_profile_index_from_stored_events(&self) -> Result<usize> {
579        let public_events_root = self.public_events.events_root()?;
580        let ambient_events_root = self.ambient_events.events_root()?;
581        if public_events_root.is_none() && ambient_events_root.is_none() {
582            self.profile_index.write_by_pubkey_root(None)?;
583            self.profile_index.write_search_root(None)?;
584            return Ok(0);
585        }
586
587        let mut events = Vec::new();
588        for (bucket, root) in [
589            (&self.public_events, public_events_root),
590            (&self.ambient_events, ambient_events_root),
591        ] {
592            let Some(root) = root else {
593                continue;
594            };
595            let stored = block_on(bucket.event_store.list_by_kind_lossy(
596                Some(&root),
597                Kind::Metadata.as_u16() as u32,
598                ListEventsOptions::default(),
599            ))
600            .map_err(map_event_store_error)?;
601            events.extend(
602                stored
603                    .into_iter()
604                    .map(nostr_event_from_stored)
605                    .collect::<Result<Vec<_>>>()?,
606            );
607        }
608
609        let latest_count = self
610            .filtered_latest_metadata_events_by_pubkey(&events)?
611            .len();
612        self.rebuild_profile_index_for_events(&events)?;
613        Ok(latest_count)
614    }
615
616    pub async fn rebuild_profile_index_from_stored_events_async(&self) -> Result<usize> {
617        let public_events_root = self.public_events.events_root()?;
618        let ambient_events_root = self.ambient_events.events_root()?;
619        if public_events_root.is_none() && ambient_events_root.is_none() {
620            self.profile_index.write_by_pubkey_root(None)?;
621            self.profile_index.write_search_root(None)?;
622            return Ok(0);
623        }
624
625        let mut events = Vec::new();
626        for (bucket, root) in [
627            (&self.public_events, public_events_root),
628            (&self.ambient_events, ambient_events_root),
629        ] {
630            let Some(root) = root else {
631                continue;
632            };
633            let stored = bucket
634                .event_store
635                .list_by_kind_lossy(
636                    Some(&root),
637                    Kind::Metadata.as_u16() as u32,
638                    ListEventsOptions::default(),
639                )
640                .await
641                .map_err(map_event_store_error)?;
642            events.extend(
643                stored
644                    .into_iter()
645                    .map(nostr_event_from_stored)
646                    .collect::<Result<Vec<_>>>()?,
647            );
648        }
649
650        let latest_count = self
651            .filtered_latest_metadata_events_by_pubkey(&events)?
652            .len();
653        self.rebuild_profile_index_for_events_async(&events).await?;
654        Ok(latest_count)
655    }
656
657    pub fn rebuild_event_indexes_from_stored_events(&self) -> Result<(usize, usize)> {
658        let public_count =
659            self.rebuild_event_index_bucket_from_stored_events(&self.public_events)?;
660        let ambient_count =
661            self.rebuild_event_index_bucket_from_stored_events(&self.ambient_events)?;
662        self.rebuild_profile_index_from_stored_events()?;
663        Ok((public_count, ambient_count))
664    }
665
666    pub async fn rebuild_event_indexes_from_stored_events_async(&self) -> Result<(usize, usize)> {
667        let public_count = self
668            .rebuild_event_index_bucket_from_stored_events_async(&self.public_events)
669            .await?;
670        let ambient_count = self
671            .rebuild_event_index_bucket_from_stored_events_async(&self.ambient_events)
672            .await?;
673        self.rebuild_profile_index_from_stored_events_async()
674            .await?;
675        Ok((public_count, ambient_count))
676    }
677
678    fn rebuild_event_index_bucket_from_stored_events(
679        &self,
680        bucket: &EventIndexBucket,
681    ) -> Result<usize> {
682        let Some(root) = bucket.events_root()? else {
683            bucket.write_events_root(None)?;
684            return Ok(0);
685        };
686
687        let manifest = match block_on(bucket.event_store.get_manifest(Some(&root))) {
688            Ok(manifest) => manifest,
689            Err(err) => {
690                tracing::warn!(
691                    "Clearing invalid social graph event index root {} before rebuild: {}",
692                    hex::encode(root.hash),
693                    err
694                );
695                bucket.write_events_root(None)?;
696                return Ok(0);
697            }
698        };
699        if manifest.by_kind_time_author.is_none() {
700            let next_root = block_on(bucket.event_store.upgrade_manifest_indexes(Some(&root)))
701                .map_err(map_event_store_error)?;
702            if next_root.as_ref() != Some(&root) {
703                bucket.write_events_root(next_root.as_ref())?;
704                return Ok(0);
705            }
706        }
707
708        let stored = block_on(
709            bucket
710                .event_store
711                .list_recent_lossy(Some(&root), ListEventsOptions::default()),
712        )
713        .map_err(map_event_store_error)?;
714        let count = stored.len();
715        let next_root =
716            block_on(bucket.event_store.build(None, stored)).map_err(map_event_store_error)?;
717        bucket.write_events_root(next_root.as_ref())?;
718        Ok(count)
719    }
720
721    async fn rebuild_event_index_bucket_from_stored_events_async(
722        &self,
723        bucket: &EventIndexBucket,
724    ) -> Result<usize> {
725        let Some(root) = bucket.events_root()? else {
726            bucket.write_events_root(None)?;
727            return Ok(0);
728        };
729
730        let manifest = match bucket.event_store.get_manifest(Some(&root)).await {
731            Ok(manifest) => manifest,
732            Err(err) => {
733                tracing::warn!(
734                    "Clearing invalid social graph event index root {} before rebuild: {}",
735                    hex::encode(root.hash),
736                    err
737                );
738                bucket.write_events_root(None)?;
739                return Ok(0);
740            }
741        };
742        if manifest.by_kind_time_author.is_none() {
743            let next_root = bucket
744                .event_store
745                .upgrade_manifest_indexes(Some(&root))
746                .await
747                .map_err(map_event_store_error)?;
748            if next_root.as_ref() != Some(&root) {
749                bucket.write_events_root(next_root.as_ref())?;
750                return Ok(0);
751            }
752        }
753
754        let stored = bucket
755            .event_store
756            .list_recent_lossy(Some(&root), ListEventsOptions::default())
757            .await
758            .map_err(map_event_store_error)?;
759        let count = stored.len();
760        let next_root = bucket
761            .event_store
762            .build(None, stored)
763            .await
764            .map_err(map_event_store_error)?;
765        bucket.write_events_root(next_root.as_ref())?;
766        Ok(count)
767    }
768
769    fn update_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
770        let latest_by_pubkey = latest_metadata_events_by_pubkey(events);
771        let threshold = self.profile_index_overmute_threshold();
772
773        if latest_by_pubkey.is_empty() {
774            return Ok(());
775        }
776
777        let mut by_pubkey_root = self.profile_index.by_pubkey_root()?;
778        let mut search_root = self.profile_index.search_root()?;
779        let mut changed = false;
780
781        for event in latest_by_pubkey.into_values() {
782            let overmuted = self.is_overmuted_user(&event.pubkey.to_bytes(), threshold)?;
783            let (next_by_pubkey_root, next_search_root, updated) = if overmuted {
784                self.profile_index.remove_profile_event(
785                    by_pubkey_root.as_ref(),
786                    search_root.as_ref(),
787                    &event.pubkey.to_hex(),
788                )?
789            } else {
790                self.profile_index.update_profile_event(
791                    by_pubkey_root.as_ref(),
792                    search_root.as_ref(),
793                    event,
794                    self.follow_distance(&event.pubkey.to_bytes())?,
795                )?
796            };
797            if updated {
798                by_pubkey_root = next_by_pubkey_root;
799                search_root = next_search_root;
800                changed = true;
801            }
802        }
803
804        if changed {
805            self.profile_index
806                .write_by_pubkey_root(by_pubkey_root.as_ref())?;
807            self.profile_index.write_search_root(search_root.as_ref())?;
808        }
809
810        Ok(())
811    }
812
813    fn filtered_latest_metadata_events_by_pubkey<'a>(
814        &self,
815        events: &'a [Event],
816    ) -> Result<BTreeMap<String, &'a Event>> {
817        let threshold = self.profile_index_overmute_threshold();
818        let mut latest_by_pubkey = BTreeMap::<String, &Event>::new();
819        for event in events.iter().filter(|event| event.kind == Kind::Metadata) {
820            if self.is_overmuted_user(&event.pubkey.to_bytes(), threshold)? {
821                continue;
822            }
823            let pubkey = event.pubkey.to_hex();
824            match latest_by_pubkey.get(&pubkey) {
825                Some(current) if compare_nostr_events(event, current).is_le() => {}
826                _ => {
827                    latest_by_pubkey.insert(pubkey, event);
828                }
829            }
830        }
831        Ok(latest_by_pubkey)
832    }
833
834    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
835        let state = {
836            let graph = self.graph.lock().unwrap();
837            graph.export_state().context("export social graph state")?
838        };
839        let mut graph = SocialGraph::from_state(state).context("rebuild social graph state")?;
840        let root_hex = hex::encode(root);
841        if graph.get_root() != root_hex {
842            graph
843                .set_root(&root_hex)
844                .context("set snapshot social graph root")?;
845        }
846        let chunks = graph
847            .to_binary_chunks_with_budget(*options)
848            .context("encode social graph snapshot")?;
849        Ok(chunks.into_iter().map(Bytes::from).collect())
850    }
851
852    fn ingest_event(&self, event: &Event) -> Result<()> {
853        self.ingest_event_with_storage_class(event, self.default_storage_class_for(event)?)
854    }
855
856    fn ingest_events(&self, events: &[Event]) -> Result<()> {
857        if events.is_empty() {
858            return Ok(());
859        }
860
861        let mut public = Vec::new();
862        let mut ambient = Vec::new();
863        for event in events {
864            match self.default_storage_class_for(event)? {
865                EventStorageClass::Public => public.push(event.clone()),
866                EventStorageClass::Ambient => ambient.push(event.clone()),
867            }
868        }
869
870        if !public.is_empty() {
871            self.ingest_events_with_storage_class(&public, EventStorageClass::Public)?;
872        }
873        if !ambient.is_empty() {
874            self.ingest_events_with_storage_class(&ambient, EventStorageClass::Ambient)?;
875        }
876
877        Ok(())
878    }
879
880    fn apply_graph_events_only(&self, events: &[Event]) -> Result<()> {
881        let graph_events = events
882            .iter()
883            .filter(|event| is_social_graph_event(event.kind))
884            .collect::<Vec<_>>();
885        if graph_events.is_empty() {
886            return Ok(());
887        }
888
889        {
890            let mut graph = self.graph.lock().unwrap();
891            let mut snapshot = SocialGraph::from_state(
892                graph
893                    .export_state()
894                    .context("export social graph state for graph-only ingest")?,
895            )
896            .context("rebuild social graph state for graph-only ingest")?;
897            for event in graph_events {
898                snapshot.handle_event(&graph_event_from_nostr(event), true, 0.0);
899            }
900            graph
901                .replace_state(&snapshot.export_state())
902                .context("replace graph-only social graph state")?;
903        }
904        self.invalidate_distance_cache();
905        Ok(())
906    }
907
908    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
909        self.query_events_in_scope(filter, limit, EventQueryScope::All)
910    }
911
912    fn default_storage_class_for(&self, event: &Event) -> Result<EventStorageClass> {
913        let graph = self.graph.lock().unwrap();
914        let root_hex = graph.get_root().context("read social graph root")?;
915        if root_hex != DEFAULT_ROOT_HEX && root_hex == event.pubkey.to_hex() {
916            return Ok(EventStorageClass::Public);
917        }
918        Ok(EventStorageClass::Ambient)
919    }
920
921    fn bucket(&self, storage_class: EventStorageClass) -> &EventIndexBucket {
922        match storage_class {
923            EventStorageClass::Public => &self.public_events,
924            EventStorageClass::Ambient => &self.ambient_events,
925        }
926    }
927
928    fn ingest_event_with_storage_class(
929        &self,
930        event: &Event,
931        storage_class: EventStorageClass,
932    ) -> Result<()> {
933        let current_root = self.bucket(storage_class).events_root_for_write()?;
934        let next_root = self
935            .bucket(storage_class)
936            .store_event(current_root.as_ref(), event)?;
937        self.bucket(storage_class)
938            .write_events_root(Some(&next_root))?;
939
940        self.update_profile_index_for_events(std::slice::from_ref(event))?;
941
942        if is_social_graph_event(event.kind) {
943            {
944                let mut graph = self.graph.lock().unwrap();
945                graph
946                    .handle_event(&graph_event_from_nostr(event), true, 0.0)
947                    .context("ingest social graph event into nostr-social-graph")?;
948            }
949            self.invalidate_distance_cache();
950        }
951
952        Ok(())
953    }
954
955    fn ingest_events_with_storage_class(
956        &self,
957        events: &[Event],
958        storage_class: EventStorageClass,
959    ) -> Result<()> {
960        if events.is_empty() {
961            return Ok(());
962        }
963
964        let bucket = self.bucket(storage_class);
965        let current_root = bucket.events_root_for_write()?;
966        let stored_events = events
967            .iter()
968            .map(stored_event_from_nostr)
969            .collect::<Vec<_>>();
970        let next_root = block_on(
971            bucket
972                .event_store
973                .build(current_root.as_ref(), stored_events),
974        )
975        .map_err(map_event_store_error)?;
976        bucket.write_events_root(next_root.as_ref())?;
977
978        self.update_profile_index_for_events(events)?;
979
980        let graph_events = events
981            .iter()
982            .filter(|event| is_social_graph_event(event.kind))
983            .collect::<Vec<_>>();
984        if graph_events.is_empty() {
985            return Ok(());
986        }
987
988        {
989            let mut graph = self.graph.lock().unwrap();
990            let mut snapshot = SocialGraph::from_state(
991                graph
992                    .export_state()
993                    .context("export social graph state for batch ingest")?,
994            )
995            .context("rebuild social graph state for batch ingest")?;
996            for event in graph_events {
997                snapshot.handle_event(&graph_event_from_nostr(event), true, 0.0);
998            }
999            graph
1000                .replace_state(&snapshot.export_state())
1001                .context("replace batched social graph state")?;
1002        }
1003        self.invalidate_distance_cache();
1004
1005        Ok(())
1006    }
1007
1008    pub(crate) fn query_events_in_scope(
1009        &self,
1010        filter: &Filter,
1011        limit: usize,
1012        scope: EventQueryScope,
1013    ) -> Result<Vec<Event>> {
1014        if limit == 0 {
1015            return Ok(Vec::new());
1016        }
1017
1018        let buckets: &[&EventIndexBucket] = match scope {
1019            EventQueryScope::PublicOnly => &[&self.public_events],
1020            EventQueryScope::AmbientOnly => &[&self.ambient_events],
1021            EventQueryScope::All => &[&self.public_events, &self.ambient_events],
1022        };
1023
1024        let mut candidates = Vec::new();
1025        for bucket in buckets {
1026            candidates.extend(bucket.query_events(filter, limit)?);
1027        }
1028
1029        let mut deduped = dedupe_events(candidates);
1030        deduped.retain(|event| filter.match_event(event));
1031        deduped.truncate(limit);
1032        Ok(deduped)
1033    }
1034}
1035
1036impl SocialGraphBackend for SocialGraphStore {
1037    fn stats(&self) -> Result<SocialGraphStats> {
1038        SocialGraphStore::stats(self)
1039    }
1040
1041    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
1042        SocialGraphStore::users_by_follow_distance(self, distance)
1043    }
1044
1045    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
1046        SocialGraphStore::follow_distance(self, pk_bytes)
1047    }
1048
1049    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
1050        SocialGraphStore::follow_list_created_at(self, owner)
1051    }
1052
1053    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
1054        SocialGraphStore::followed_targets(self, owner)
1055    }
1056
1057    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
1058        SocialGraphStore::is_overmuted_user(self, user_pk, threshold)
1059    }
1060
1061    fn profile_search_root(&self) -> Result<Option<Cid>> {
1062        SocialGraphStore::profile_search_root(self)
1063    }
1064
1065    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
1066        SocialGraphStore::snapshot_chunks(self, root, options)
1067    }
1068
1069    fn ingest_event(&self, event: &Event) -> Result<()> {
1070        SocialGraphStore::ingest_event(self, event)
1071    }
1072
1073    fn ingest_event_with_storage_class(
1074        &self,
1075        event: &Event,
1076        storage_class: EventStorageClass,
1077    ) -> Result<()> {
1078        SocialGraphStore::ingest_event_with_storage_class(self, event, storage_class)
1079    }
1080
1081    fn ingest_events(&self, events: &[Event]) -> Result<()> {
1082        SocialGraphStore::ingest_events(self, events)
1083    }
1084
1085    fn ingest_events_with_storage_class(
1086        &self,
1087        events: &[Event],
1088        storage_class: EventStorageClass,
1089    ) -> Result<()> {
1090        SocialGraphStore::ingest_events_with_storage_class(self, events, storage_class)
1091    }
1092
1093    fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
1094        SocialGraphStore::apply_graph_events_only(self, events)
1095    }
1096
1097    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
1098        SocialGraphStore::query_events(self, filter, limit)
1099    }
1100}
1101
1102impl NostrSocialGraphBackend for SocialGraphStore {
1103    type Error = UpstreamGraphBackendError;
1104
1105    fn get_root(&self) -> std::result::Result<String, Self::Error> {
1106        let graph = self.graph.lock().unwrap();
1107        graph
1108            .get_root()
1109            .context("read social graph root")
1110            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1111    }
1112
1113    fn set_root(&mut self, root: &str) -> std::result::Result<(), Self::Error> {
1114        let root_bytes =
1115            decode_pubkey(root).map_err(|err| UpstreamGraphBackendError(err.to_string()))?;
1116        SocialGraphStore::set_root(self, &root_bytes)
1117            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1118    }
1119
1120    fn handle_event(
1121        &mut self,
1122        event: &GraphEvent,
1123        allow_unknown_authors: bool,
1124        overmute_threshold: f64,
1125    ) -> std::result::Result<(), Self::Error> {
1126        {
1127            let mut graph = self.graph.lock().unwrap();
1128            graph
1129                .handle_event(event, allow_unknown_authors, overmute_threshold)
1130                .context("ingest social graph event into heed backend")
1131                .map_err(|err| UpstreamGraphBackendError(err.to_string()))?;
1132        }
1133        self.invalidate_distance_cache();
1134        Ok(())
1135    }
1136
1137    fn get_follow_distance(&self, user: &str) -> std::result::Result<u32, Self::Error> {
1138        let graph = self.graph.lock().unwrap();
1139        graph
1140            .get_follow_distance(user)
1141            .context("read social graph follow distance")
1142            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1143    }
1144
1145    fn is_following(
1146        &self,
1147        follower: &str,
1148        followed_user: &str,
1149    ) -> std::result::Result<bool, Self::Error> {
1150        let graph = self.graph.lock().unwrap();
1151        graph
1152            .is_following(follower, followed_user)
1153            .context("read social graph following edge")
1154            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1155    }
1156
1157    fn get_followed_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
1158        let graph = self.graph.lock().unwrap();
1159        graph
1160            .get_followed_by_user(user)
1161            .context("read followed-by-user list")
1162            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1163    }
1164
1165    fn get_followers_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
1166        let graph = self.graph.lock().unwrap();
1167        graph
1168            .get_followers_by_user(user)
1169            .context("read followers-by-user list")
1170            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1171    }
1172
1173    fn get_muted_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
1174        let graph = self.graph.lock().unwrap();
1175        graph
1176            .get_muted_by_user(user)
1177            .context("read muted-by-user list")
1178            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1179    }
1180
1181    fn get_user_muted_by(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
1182        let graph = self.graph.lock().unwrap();
1183        graph
1184            .get_user_muted_by(user)
1185            .context("read user-muted-by list")
1186            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1187    }
1188
1189    fn get_follow_list_created_at(
1190        &self,
1191        user: &str,
1192    ) -> std::result::Result<Option<u64>, Self::Error> {
1193        let graph = self.graph.lock().unwrap();
1194        graph
1195            .get_follow_list_created_at(user)
1196            .context("read social graph follow list timestamp")
1197            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1198    }
1199
1200    fn get_mute_list_created_at(
1201        &self,
1202        user: &str,
1203    ) -> std::result::Result<Option<u64>, Self::Error> {
1204        let graph = self.graph.lock().unwrap();
1205        graph
1206            .get_mute_list_created_at(user)
1207            .context("read social graph mute list timestamp")
1208            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1209    }
1210
1211    fn is_overmuted(&self, user: &str, threshold: f64) -> std::result::Result<bool, Self::Error> {
1212        let graph = self.graph.lock().unwrap();
1213        graph
1214            .is_overmuted(user, threshold)
1215            .context("check social graph overmute")
1216            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1217    }
1218}
1219
1220impl<T> SocialGraphBackend for Arc<T>
1221where
1222    T: SocialGraphBackend + ?Sized,
1223{
1224    fn stats(&self) -> Result<SocialGraphStats> {
1225        self.as_ref().stats()
1226    }
1227
1228    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
1229        self.as_ref().users_by_follow_distance(distance)
1230    }
1231
1232    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
1233        self.as_ref().follow_distance(pk_bytes)
1234    }
1235
1236    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
1237        self.as_ref().follow_list_created_at(owner)
1238    }
1239
1240    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
1241        self.as_ref().followed_targets(owner)
1242    }
1243
1244    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
1245        self.as_ref().is_overmuted_user(user_pk, threshold)
1246    }
1247
1248    fn profile_search_root(&self) -> Result<Option<Cid>> {
1249        self.as_ref().profile_search_root()
1250    }
1251
1252    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
1253        self.as_ref().snapshot_chunks(root, options)
1254    }
1255
1256    fn ingest_event(&self, event: &Event) -> Result<()> {
1257        self.as_ref().ingest_event(event)
1258    }
1259
1260    fn ingest_event_with_storage_class(
1261        &self,
1262        event: &Event,
1263        storage_class: EventStorageClass,
1264    ) -> Result<()> {
1265        self.as_ref()
1266            .ingest_event_with_storage_class(event, storage_class)
1267    }
1268
1269    fn ingest_events(&self, events: &[Event]) -> Result<()> {
1270        self.as_ref().ingest_events(events)
1271    }
1272
1273    fn ingest_events_with_storage_class(
1274        &self,
1275        events: &[Event],
1276        storage_class: EventStorageClass,
1277    ) -> Result<()> {
1278        self.as_ref()
1279            .ingest_events_with_storage_class(events, storage_class)
1280    }
1281
1282    fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
1283        self.as_ref().ingest_graph_events(events)
1284    }
1285
1286    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
1287        self.as_ref().query_events(filter, limit)
1288    }
1289}
1290
1291fn should_replace_placeholder_root(graph: &HeedSocialGraph) -> Result<bool> {
1292    if graph.get_root().context("read current social graph root")? != DEFAULT_ROOT_HEX {
1293        return Ok(false);
1294    }
1295
1296    let GraphStats {
1297        users,
1298        follows,
1299        mutes,
1300        ..
1301    } = graph.size().context("size social graph")?;
1302    Ok(users <= 1 && follows == 0 && mutes == 0)
1303}
1304
1305fn decode_pubkey_set(values: Vec<String>) -> Result<UserSet> {
1306    let mut set = UserSet::new();
1307    for value in values {
1308        set.insert(decode_pubkey(&value)?);
1309    }
1310    Ok(set)
1311}
1312
1313fn decode_pubkey(value: &str) -> Result<[u8; 32]> {
1314    let mut bytes = [0u8; 32];
1315    hex::decode_to_slice(value, &mut bytes)
1316        .with_context(|| format!("decode social graph pubkey {value}"))?;
1317    Ok(bytes)
1318}
1319
1320fn is_social_graph_event(kind: Kind) -> bool {
1321    kind == Kind::ContactList || kind == Kind::MuteList
1322}
1323
1324fn graph_event_from_nostr(event: &Event) -> GraphEvent {
1325    GraphEvent {
1326        created_at: event.created_at.as_u64(),
1327        content: event.content.clone(),
1328        tags: event
1329            .tags
1330            .iter()
1331            .map(|tag| tag.as_slice().to_vec())
1332            .collect(),
1333        kind: event.kind.as_u16() as u32,
1334        pubkey: event.pubkey.to_hex(),
1335        id: event.id.to_hex(),
1336        sig: event.sig.to_string(),
1337    }
1338}
1339
1340fn stored_event_from_nostr(event: &Event) -> StoredNostrEvent {
1341    StoredNostrEvent {
1342        id: event.id.to_hex(),
1343        pubkey: event.pubkey.to_hex(),
1344        created_at: event.created_at.as_u64(),
1345        kind: event.kind.as_u16() as u32,
1346        tags: event
1347            .tags
1348            .iter()
1349            .map(|tag| tag.as_slice().to_vec())
1350            .collect(),
1351        content: event.content.clone(),
1352        sig: event.sig.to_string(),
1353    }
1354}
1355
1356fn nostr_event_from_stored(event: StoredNostrEvent) -> Result<Event> {
1357    let value = serde_json::json!({
1358        "id": event.id,
1359        "pubkey": event.pubkey,
1360        "created_at": event.created_at,
1361        "kind": event.kind,
1362        "tags": event.tags,
1363        "content": event.content,
1364        "sig": event.sig,
1365    });
1366    Event::from_json(value.to_string()).context("decode stored nostr event")
1367}
1368
1369pub(crate) fn stored_event_to_nostr_event(event: StoredNostrEvent) -> Result<Event> {
1370    nostr_event_from_stored(event)
1371}
1372
1373fn encode_cid(cid: &Cid) -> Result<Vec<u8>> {
1374    rmp_serde::to_vec_named(&StoredCid {
1375        hash: cid.hash,
1376        key: cid.key,
1377    })
1378    .context("encode social graph events root")
1379}
1380
1381fn decode_cid(bytes: &[u8]) -> Result<Option<Cid>> {
1382    let stored: StoredCid =
1383        rmp_serde::from_slice(bytes).context("decode social graph events root")?;
1384    Ok(Some(Cid {
1385        hash: stored.hash,
1386        key: stored.key,
1387    }))
1388}
1389
1390fn read_root_file(path: &Path) -> Result<Option<Cid>> {
1391    let Ok(bytes) = std::fs::read(path) else {
1392        return Ok(None);
1393    };
1394    decode_cid(&bytes)
1395}
1396
1397fn write_root_file(path: &Path, root: Option<&Cid>) -> Result<()> {
1398    let Some(root) = root else {
1399        if path.exists() {
1400            std::fs::remove_file(path)?;
1401        }
1402        return Ok(());
1403    };
1404
1405    let encoded = encode_cid(root)?;
1406    let tmp_path = path.with_extension("tmp");
1407    std::fs::write(&tmp_path, encoded)?;
1408    std::fs::rename(tmp_path, path)?;
1409    Ok(())
1410}
1411
1412fn normalize_profile_name(value: &serde_json::Value) -> Option<String> {
1413    let raw = value.as_str()?;
1414    let trimmed = raw.split_whitespace().collect::<Vec<_>>().join(" ");
1415    if trimmed.is_empty() {
1416        return None;
1417    }
1418    Some(trimmed.chars().take(PROFILE_NAME_MAX_LENGTH).collect())
1419}
1420
1421fn extract_profile_names(profile: &serde_json::Map<String, serde_json::Value>) -> Vec<String> {
1422    let mut names = Vec::new();
1423    let mut seen = HashSet::new();
1424
1425    for key in ["display_name", "displayName", "name", "username"] {
1426        let Some(value) = profile.get(key).and_then(normalize_profile_name) else {
1427            continue;
1428        };
1429        let lowered = value.to_lowercase();
1430        if seen.insert(lowered) {
1431            names.push(value);
1432        }
1433    }
1434
1435    names
1436}
1437
/// Decides whether a NIP-05 local part adds nothing useful to a profile's search
/// terms.
///
/// The local part is rejected when it is a single character, when it starts with
/// `npub1`, or when it already occurs inside `primary_name` once that name has
/// been lowercased and stripped of whitespace. `local_part` is compared as
/// given, so callers are expected to pass it lowercased.
///
/// Length is measured in characters rather than bytes, so a single non-ASCII
/// character (for example `é`) is rejected just like a single ASCII one.
fn should_reject_profile_nip05(local_part: &str, primary_name: &str) -> bool {
    let is_single_char = local_part.chars().count() == 1;
    if is_single_char || local_part.starts_with("npub1") {
        return true;
    }

    primary_name
        .to_lowercase()
        .split_whitespace()
        .collect::<String>()
        .contains(local_part)
}
1449
1450fn normalize_profile_nip05(
1451    profile: &serde_json::Map<String, serde_json::Value>,
1452    primary_name: Option<&str>,
1453) -> Option<String> {
1454    let raw = profile.get("nip05")?.as_str()?;
1455    let local_part = raw.split('@').next()?.trim().to_lowercase();
1456    if local_part.is_empty() {
1457        return None;
1458    }
1459    let truncated: String = local_part.chars().take(PROFILE_NAME_MAX_LENGTH).collect();
1460    if truncated.is_empty() {
1461        return None;
1462    }
1463    if primary_name.is_some_and(|name| should_reject_profile_nip05(&truncated, name)) {
1464        return None;
1465    }
1466    Some(truncated)
1467}
1468
/// Returns `true` when `word` is one of the English stop words excluded from
/// search keywords.
///
/// The comparison is exact and case-sensitive; every entry is lowercase, so
/// callers are expected to lowercase `word` first.
fn is_search_stop_word(word: &str) -> bool {
    #[rustfmt::skip]
    const STOP_WORDS: &[&str] = &[
        "a", "an", "the", "and", "or", "but", "in", "on", "at", "to",
        "for", "of", "with", "by", "from", "is", "it", "as", "be", "was",
        "are", "this", "that", "these", "those", "i", "you", "he", "she", "we",
        "they", "my", "your", "his", "her", "its", "our", "their", "what", "which",
        "who", "whom", "how", "when", "where", "why", "will", "would", "could", "should",
        "can", "may", "might", "must", "have", "has", "had", "do", "does", "did",
        "been", "being", "get", "got", "just", "now", "then", "so", "if", "not",
        "no", "yes", "all", "any", "some", "more", "most", "other", "into", "over",
        "after", "before", "about", "up", "down", "out", "off", "through", "during", "under",
        "again", "further", "once",
    ];

    STOP_WORDS.contains(&word)
}
1566
/// Returns `true` when `word` consists solely of ASCII digits and does not look
/// like a year.
///
/// A four-digit value between 1900 and 2099 inclusive counts as a year and is
/// therefore not a "pure number". An empty string contains no non-digit
/// character and so is reported as a pure number.
fn is_pure_search_number(word: &str) -> bool {
    if word.chars().any(|ch| !ch.is_ascii_digit()) {
        return false;
    }

    let looks_like_year = word.len() == 4
        && matches!(word.parse::<u16>(), Ok(year) if (1900..=2099).contains(&year));
    !looks_like_year
}
1576
/// Splits a compound word into its parts at case and digit boundaries.
///
/// A new part starts before a character when, relative to the previous
/// character, any of these holds:
/// - lowercase followed by uppercase (`camelCase` -> `camel`, `Case`);
/// - digit followed by a letter, or a letter followed by a digit
///   (`abc123def` -> `abc`, `123`, `def`);
/// - uppercase followed by uppercase that is itself followed by lowercase, which
///   ends an acronym (`HTTPServer` -> `HTTP`, `Server`).
///
/// Casing is preserved. An empty input yields an empty list.
fn split_compound_search_word(word: &str) -> Vec<String> {
    let chars: Vec<char> = word.chars().collect();
    let mut parts = Vec::new();
    let mut current = String::new();
    let mut previous: Option<char> = None;

    for (index, &ch) in chars.iter().enumerate() {
        let at_boundary = match previous {
            None => false,
            Some(prev) => {
                let lower_to_upper = prev.is_lowercase() && ch.is_uppercase();
                let digit_to_letter = prev.is_ascii_digit() && ch.is_alphabetic();
                let letter_to_digit = prev.is_alphabetic() && ch.is_ascii_digit();
                let acronym_end = prev.is_uppercase()
                    && ch.is_uppercase()
                    && chars.get(index + 1).is_some_and(|next| next.is_lowercase());
                lower_to_upper || digit_to_letter || letter_to_digit || acronym_end
            }
        };

        if at_boundary {
            parts.push(std::mem::take(&mut current));
        }

        current.push(ch);
        previous = Some(ch);
    }

    if !current.is_empty() {
        parts.push(current);
    }

    parts
}
1605
1606fn parse_search_keywords(text: &str) -> Vec<String> {
1607    let mut keywords = Vec::new();
1608    let mut seen = HashSet::new();
1609
1610    for word in text
1611        .split(|ch: char| !ch.is_alphanumeric())
1612        .filter(|word| !word.is_empty())
1613    {
1614        let mut variants = Vec::with_capacity(1 + word.len() / 4);
1615        variants.push(word.to_lowercase());
1616        variants.extend(
1617            split_compound_search_word(word)
1618                .into_iter()
1619                .map(|part| part.to_lowercase()),
1620        );
1621
1622        for lowered in variants {
1623            if lowered.chars().count() < 2
1624                || is_search_stop_word(&lowered)
1625                || is_pure_search_number(&lowered)
1626            {
1627                continue;
1628            }
1629            if seen.insert(lowered.clone()) {
1630                keywords.push(lowered);
1631            }
1632        }
1633    }
1634
1635    keywords
1636}
1637
1638fn profile_search_terms_for_event(event: &Event) -> Vec<String> {
1639    let profile = match serde_json::from_str::<serde_json::Value>(&event.content) {
1640        Ok(serde_json::Value::Object(profile)) => profile,
1641        _ => serde_json::Map::new(),
1642    };
1643    let names = extract_profile_names(&profile);
1644    let primary_name = names.first().map(String::as_str);
1645    let mut parts = Vec::new();
1646    if let Some(name) = primary_name {
1647        parts.push(name.to_string());
1648    }
1649    if let Some(nip05) = normalize_profile_nip05(&profile, primary_name) {
1650        parts.push(nip05);
1651    }
1652    parts.push(event.pubkey.to_hex());
1653    if names.len() > 1 {
1654        parts.extend(names.into_iter().skip(1));
1655    }
1656    parse_search_keywords(&parts.join(" "))
1657}
1658
1659fn compare_nostr_events(left: &Event, right: &Event) -> std::cmp::Ordering {
1660    left.created_at
1661        .as_u64()
1662        .cmp(&right.created_at.as_u64())
1663        .then_with(|| left.id.to_hex().cmp(&right.id.to_hex()))
1664}
1665
1666fn map_event_store_error(err: NostrEventStoreError) -> anyhow::Error {
1667    anyhow::anyhow!("nostr event store error: {err}")
1668}
1669
1670fn ensure_social_graph_mapsize(db_dir: &Path, requested_bytes: u64) -> Result<()> {
1671    let requested = requested_bytes.max(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES);
1672    let page_size = page_size_bytes() as u64;
1673    let rounded = requested
1674        .checked_add(page_size.saturating_sub(1))
1675        .map(|size| size / page_size * page_size)
1676        .unwrap_or(requested);
1677    let map_size = usize::try_from(rounded).context("social graph mapsize exceeds usize")?;
1678
1679    let env = unsafe {
1680        heed::EnvOpenOptions::new()
1681            .map_size(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES as usize)
1682            .max_dbs(SOCIALGRAPH_MAX_DBS)
1683            .open(db_dir)
1684    }
1685    .context("open social graph LMDB env for resize")?;
1686    if env.info().map_size < map_size {
1687        unsafe { env.resize(map_size) }.context("resize social graph LMDB env")?;
1688    }
1689
1690    Ok(())
1691}
1692
1693fn page_size_bytes() -> usize {
1694    page_size::get_granularity()
1695}
1696
1697#[cfg(test)]
1698mod tests;