Skip to main content

hashtree_cli/socialgraph/
mod.rs

1pub mod access;
2pub mod crawler;
3pub mod local_lists;
4pub mod snapshot;
5
6pub use access::SocialGraphAccessControl;
7pub use crawler::SocialGraphCrawler;
8pub use local_lists::{
9    read_local_list_file_state, sync_local_list_files_force, sync_local_list_files_if_changed,
10    LocalListFileState, LocalListSyncOutcome,
11};
12
13use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
14use std::path::{Path, PathBuf};
15use std::sync::{Arc, Mutex as StdMutex};
16
17use anyhow::{Context, Result};
18use bytes::Bytes;
19use futures::executor::block_on;
20use hashtree_core::{nhash_encode_full, Cid, HashTree, HashTreeConfig, NHashData};
21use hashtree_index::BTree;
22use hashtree_nostr::{
23    is_parameterized_replaceable_kind, is_replaceable_kind, ListEventsOptions, NostrEventStore,
24    NostrEventStoreError, ProfileGuard as NostrProfileGuard, StoredNostrEvent,
25};
26#[cfg(test)]
27use hashtree_nostr::{
28    reset_profile as reset_nostr_profile, set_profile_enabled as set_nostr_profile_enabled,
29    take_profile as take_nostr_profile,
30};
31use nostr::{Event, Filter, JsonUtil, Kind, SingleLetterTag};
32use nostr_social_graph::{
33    BinaryBudget, GraphStats, NostrEvent as GraphEvent, SocialGraph,
34    SocialGraphBackend as NostrSocialGraphBackend,
35};
36use nostr_social_graph_heed::HeedSocialGraph;
37
38use crate::storage::{LocalStore, StorageRouter};
39
40#[cfg(test)]
41use std::sync::{Mutex, MutexGuard, OnceLock};
42#[cfg(test)]
43use std::time::Instant;
44
/// Set of raw 32-byte public keys, kept in byte order.
pub type UserSet = BTreeSet<[u8; 32]>;

/// Placeholder graph root (all-zero hex) used to open the graph before a real root is set.
const DEFAULT_ROOT_HEX: &str = "0000000000000000000000000000000000000000000000000000000000000000";
/// File inside the social graph directory that persists the public events index root.
const EVENTS_ROOT_FILE: &str = "events-root.msgpack";
/// File inside the social graph directory that persists the ambient events index root.
const AMBIENT_EVENTS_ROOT_FILE: &str = "events-root-ambient.msgpack";
/// Subdirectory holding the separate local blob store used for ambient events.
const AMBIENT_EVENTS_BLOB_DIR: &str = "ambient-blobs";
/// File that persists the profile search index root.
const PROFILE_SEARCH_ROOT_FILE: &str = "profile-search-root.msgpack";
/// File that persists the profiles-by-pubkey index root.
const PROFILES_BY_PUBKEY_ROOT_FILE: &str = "profiles-by-pubkey-root.msgpack";
/// Follow distance treated as "unknown"; `follow_distance` reports it as `None`.
const UNKNOWN_FOLLOW_DISTANCE: u32 = 1000;
/// 64 MiB. NOTE(review): presumably the default LMDB map size; not referenced in this chunk.
const DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES: u64 = 64 * 1024 * 1024;
/// NOTE(review): presumably the max number of named LMDB databases; not referenced in this chunk.
const SOCIALGRAPH_MAX_DBS: u32 = 16;
/// Branching order of the profile search B-tree.
const PROFILE_SEARCH_INDEX_ORDER: usize = 64;
/// NOTE(review): presumably the key prefix of profile search entries; confirm at its use site.
const PROFILE_SEARCH_PREFIX: &str = "p:";
/// NOTE(review): upper bound on indexed profile name length; confirm whether chars or bytes.
const PROFILE_NAME_MAX_LENGTH: usize = 100;
59
/// Which event index bucket an event is written to.
///
/// `SocialGraphStore::default_storage_class_for` classifies events authored by the
/// (non-placeholder) graph root as `Public`; every other event is `Ambient`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventStorageClass {
    /// Stored in the public events bucket, whose blob store is also used by the profile index.
    Public,
    /// Stored in the ambient events bucket, backed by its own blob store.
    Ambient,
}
65
/// Which event buckets a query reads from.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum EventQueryScope {
    /// Only the public events bucket.
    PublicOnly,
    /// Only the ambient events bucket.
    AmbientOnly,
    /// Both buckets, public first.
    All,
}
73
/// A nostr event index plus the file that persists its current root.
struct EventIndexBucket {
    /// Content-addressed event store the index lives in.
    event_store: NostrEventStore<StorageRouter>,
    /// Path of the root file read/written by `events_root` / `write_events_root`.
    root_path: PathBuf,
}
78
/// Profile indexes (latest profile by pubkey, and name search) over the public blob store.
struct ProfileIndexBucket {
    /// Hash tree over the public store.
    tree: HashTree<StorageRouter>,
    /// B-tree (order `PROFILE_SEARCH_INDEX_ORDER`) over the public store.
    index: BTree<StorageRouter>,
    /// File persisting the profiles-by-pubkey index root.
    by_pubkey_root_path: PathBuf,
    /// File persisting the profile search index root.
    search_root_path: PathBuf,
}
85
/// Serializable form of a `Cid`.
///
/// NOTE(review): presumably what the msgpack root files contain; confirm against
/// `read_root_file` / `write_root_file`. Field order may matter for the on-disk encoding,
/// so do not reorder fields.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct StoredCid {
    /// Content hash.
    hash: [u8; 32],
    /// Optional encryption key for the content.
    key: Option<[u8; 32]>,
}
91
/// One entry of the profile search index.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct StoredProfileSearchEntry {
    /// Author public key (presumably hex-encoded — confirm at the writer).
    pub pubkey: String,
    /// Primary display name.
    pub name: String,
    /// Alternative names; empty when absent from the serialized form.
    #[serde(default)]
    pub aliases: Vec<String>,
    /// NIP-05 identifier, if any; `None` when absent from the serialized form.
    #[serde(default)]
    pub nip05: Option<String>,
    /// `created_at` of the metadata event this entry was built from.
    pub created_at: u64,
    /// nhash reference to the source metadata event.
    pub event_nhash: String,
}
103
/// Summary statistics of the social graph, as built by `build_distance_cache`.
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct SocialGraphStats {
    /// Sum of `size_by_distance` values.
    pub total_users: usize,
    /// Hex root pubkey of the graph.
    pub root: Option<String>,
    /// Total number of follow edges.
    pub total_follows: usize,
    /// Largest distance present in `size_by_distance` (0 when empty).
    pub max_depth: u32,
    /// Number of users at each follow distance.
    pub size_by_distance: BTreeMap<u32, usize>,
    /// `true` for stats built from a live graph; `false` in the `Default` value.
    pub enabled: bool,
}
113
/// Lazily built stats and users-by-distance index.
///
/// Filled by `load_distance_cache` and cleared by `invalidate_distance_cache`.
#[derive(Debug, Clone)]
struct DistanceCache {
    /// Aggregate stats for the graph at build time.
    stats: SocialGraphStats,
    /// Decoded pubkeys grouped by follow distance.
    users_by_distance: BTreeMap<u32, Vec<[u8; 32]>>,
}
119
/// Error carrying a plain message; `Display` prints the message unchanged.
///
/// NOTE(review): not constructed in this chunk; presumably wraps errors from the upstream
/// graph backend.
#[derive(Debug, thiserror::Error)]
#[error("{0}")]
pub struct UpstreamGraphBackendError(String);
123
/// Social graph plus the event and profile indexes built from ingested nostr events.
pub struct SocialGraphStore {
    /// nostr-social-graph state opened through the heed backend.
    graph: StdMutex<HeedSocialGraph>,
    /// Cached stats/distance index; `None` until first use or after invalidation.
    distance_cache: StdMutex<Option<DistanceCache>>,
    /// Index for events stored as `EventStorageClass::Public`.
    public_events: EventIndexBucket,
    /// Index for events stored as `EventStorageClass::Ambient`.
    ambient_events: EventIndexBucket,
    /// Profile-by-pubkey and profile search indexes.
    profile_index: ProfileIndexBucket,
    /// Overmute threshold applied when indexing profiles (opened with 1.0).
    profile_index_overmute_threshold: StdMutex<f64>,
}
132
133pub trait SocialGraphBackend: Send + Sync {
134    fn stats(&self) -> Result<SocialGraphStats>;
135    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>>;
136    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>>;
137    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>>;
138    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet>;
139    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool>;
140    fn profile_search_root(&self) -> Result<Option<Cid>> {
141        Ok(None)
142    }
143    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>>;
144    fn ingest_event(&self, event: &Event) -> Result<()>;
145    fn ingest_event_with_storage_class(
146        &self,
147        event: &Event,
148        storage_class: EventStorageClass,
149    ) -> Result<()> {
150        let _ = storage_class;
151        self.ingest_event(event)
152    }
153    fn ingest_events(&self, events: &[Event]) -> Result<()> {
154        for event in events {
155            self.ingest_event(event)?;
156        }
157        Ok(())
158    }
159    fn ingest_events_with_storage_class(
160        &self,
161        events: &[Event],
162        storage_class: EventStorageClass,
163    ) -> Result<()> {
164        for event in events {
165            self.ingest_event_with_storage_class(event, storage_class)?;
166        }
167        Ok(())
168    }
169    fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
170        self.ingest_events(events)
171    }
172    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>>;
173}
174
/// Guard returned by [`test_lock`]; the lock is held until the guard is dropped.
#[cfg(test)]
pub type TestLockGuard = MutexGuard<'static, ()>;

/// Process-wide lock, created on first use.
#[cfg(test)]
static NDB_TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();

/// Acquire the process-wide test lock so that callers run one at a time.
///
/// # Panics
/// Panics if the lock is poisoned (a previous holder panicked while holding it).
#[cfg(test)]
pub fn test_lock() -> TestLockGuard {
    let shared: &'static Mutex<()> = NDB_TEST_LOCK.get_or_init(Mutex::default);
    shared.lock().unwrap()
}
185
186pub fn open_social_graph_store(data_dir: &Path) -> Result<Arc<SocialGraphStore>> {
187    open_social_graph_store_with_mapsize(data_dir, None)
188}
189
190pub fn open_social_graph_store_with_mapsize(
191    data_dir: &Path,
192    mapsize_bytes: Option<u64>,
193) -> Result<Arc<SocialGraphStore>> {
194    let db_dir = data_dir.join("socialgraph");
195    open_social_graph_store_at_path(&db_dir, mapsize_bytes)
196}
197
198pub fn open_social_graph_store_with_storage(
199    data_dir: &Path,
200    store: Arc<StorageRouter>,
201    mapsize_bytes: Option<u64>,
202) -> Result<Arc<SocialGraphStore>> {
203    let db_dir = data_dir.join("socialgraph");
204    open_social_graph_store_at_path_with_storage(&db_dir, store, mapsize_bytes)
205}
206
207pub fn open_social_graph_store_at_path(
208    db_dir: &Path,
209    mapsize_bytes: Option<u64>,
210) -> Result<Arc<SocialGraphStore>> {
211    let config = hashtree_config::Config::load_or_default();
212    let backend = &config.storage.backend;
213    let local_store = Arc::new(
214        LocalStore::new_with_lmdb_map_size(db_dir.join("blobs"), backend, mapsize_bytes)
215            .map_err(|err| anyhow::anyhow!("Failed to create social graph blob store: {err}"))?,
216    );
217    let store = Arc::new(StorageRouter::new(local_store));
218    open_social_graph_store_at_path_with_storage(db_dir, store, mapsize_bytes)
219}
220
221pub fn open_social_graph_store_at_path_with_storage(
222    db_dir: &Path,
223    store: Arc<StorageRouter>,
224    mapsize_bytes: Option<u64>,
225) -> Result<Arc<SocialGraphStore>> {
226    let ambient_backend = store.local_store().backend();
227    let ambient_local = Arc::new(
228        LocalStore::new_with_lmdb_map_size(
229            db_dir.join(AMBIENT_EVENTS_BLOB_DIR),
230            &ambient_backend,
231            mapsize_bytes,
232        )
233        .map_err(|err| {
234            anyhow::anyhow!("Failed to create social graph ambient blob store: {err}")
235        })?,
236    );
237    let ambient_store = Arc::new(StorageRouter::new(ambient_local));
238    open_social_graph_store_at_path_with_storage_split(db_dir, store, ambient_store, mapsize_bytes)
239}
240
241pub fn open_social_graph_store_at_path_with_storage_split(
242    db_dir: &Path,
243    public_store: Arc<StorageRouter>,
244    ambient_store: Arc<StorageRouter>,
245    mapsize_bytes: Option<u64>,
246) -> Result<Arc<SocialGraphStore>> {
247    std::fs::create_dir_all(db_dir)?;
248    if let Some(size) = mapsize_bytes {
249        ensure_social_graph_mapsize(db_dir, size)?;
250    }
251    let graph = HeedSocialGraph::open(db_dir, DEFAULT_ROOT_HEX)
252        .context("open nostr-social-graph heed backend")?;
253
254    Ok(Arc::new(SocialGraphStore {
255        graph: StdMutex::new(graph),
256        distance_cache: StdMutex::new(None),
257        public_events: EventIndexBucket {
258            event_store: NostrEventStore::new(Arc::clone(&public_store)),
259            root_path: db_dir.join(EVENTS_ROOT_FILE),
260        },
261        ambient_events: EventIndexBucket {
262            event_store: NostrEventStore::new(ambient_store),
263            root_path: db_dir.join(AMBIENT_EVENTS_ROOT_FILE),
264        },
265        profile_index: ProfileIndexBucket {
266            tree: HashTree::new(HashTreeConfig::new(Arc::clone(&public_store))),
267            index: BTree::new(
268                public_store,
269                hashtree_index::BTreeOptions {
270                    order: Some(PROFILE_SEARCH_INDEX_ORDER),
271                },
272            ),
273            by_pubkey_root_path: db_dir.join(PROFILES_BY_PUBKEY_ROOT_FILE),
274            search_root_path: db_dir.join(PROFILE_SEARCH_ROOT_FILE),
275        },
276        profile_index_overmute_threshold: StdMutex::new(1.0),
277    }))
278}
279
280pub fn set_social_graph_root(store: &SocialGraphStore, pk_bytes: &[u8; 32]) {
281    if let Err(err) = store.set_root(pk_bytes) {
282        tracing::warn!("Failed to set social graph root: {err}");
283    }
284}
285
286pub fn get_follow_distance(
287    backend: &(impl SocialGraphBackend + ?Sized),
288    pk_bytes: &[u8; 32],
289) -> Option<u32> {
290    backend.follow_distance(pk_bytes).ok().flatten()
291}
292
293pub fn get_follows(
294    backend: &(impl SocialGraphBackend + ?Sized),
295    pk_bytes: &[u8; 32],
296) -> Vec<[u8; 32]> {
297    match backend.followed_targets(pk_bytes) {
298        Ok(set) => set.into_iter().collect(),
299        Err(_) => Vec::new(),
300    }
301}
302
303pub fn is_overmuted(
304    backend: &(impl SocialGraphBackend + ?Sized),
305    _root_pk: &[u8; 32],
306    user_pk: &[u8; 32],
307    threshold: f64,
308) -> bool {
309    backend
310        .is_overmuted_user(user_pk, threshold)
311        .unwrap_or(false)
312}
313
314pub fn ingest_event(backend: &(impl SocialGraphBackend + ?Sized), _sub_id: &str, event_json: &str) {
315    let event = match Event::from_json(event_json) {
316        Ok(event) => event,
317        Err(_) => return,
318    };
319
320    if let Err(err) = backend.ingest_event(&event) {
321        tracing::warn!("Failed to ingest social graph event: {err}");
322    }
323}
324
325pub fn ingest_parsed_event(
326    backend: &(impl SocialGraphBackend + ?Sized),
327    event: &Event,
328) -> Result<()> {
329    backend.ingest_event(event)
330}
331
332pub fn ingest_parsed_event_with_storage_class(
333    backend: &(impl SocialGraphBackend + ?Sized),
334    event: &Event,
335    storage_class: EventStorageClass,
336) -> Result<()> {
337    backend.ingest_event_with_storage_class(event, storage_class)
338}
339
340pub fn ingest_parsed_events(
341    backend: &(impl SocialGraphBackend + ?Sized),
342    events: &[Event],
343) -> Result<()> {
344    backend.ingest_events(events)
345}
346
347pub fn ingest_parsed_events_with_storage_class(
348    backend: &(impl SocialGraphBackend + ?Sized),
349    events: &[Event],
350    storage_class: EventStorageClass,
351) -> Result<()> {
352    backend.ingest_events_with_storage_class(events, storage_class)
353}
354
355pub fn ingest_graph_parsed_events(
356    backend: &(impl SocialGraphBackend + ?Sized),
357    events: &[Event],
358) -> Result<()> {
359    backend.ingest_graph_events(events)
360}
361
362pub fn query_events(
363    backend: &(impl SocialGraphBackend + ?Sized),
364    filter: &Filter,
365    limit: usize,
366) -> Vec<Event> {
367    backend.query_events(filter, limit).unwrap_or_default()
368}
369
370impl SocialGraphStore {
371    pub fn set_profile_index_overmute_threshold(&self, threshold: f64) {
372        *self
373            .profile_index_overmute_threshold
374            .lock()
375            .expect("profile index overmute threshold") = threshold;
376    }
377
378    fn profile_index_overmute_threshold(&self) -> f64 {
379        *self
380            .profile_index_overmute_threshold
381            .lock()
382            .expect("profile index overmute threshold")
383    }
384
385    fn invalidate_distance_cache(&self) {
386        *self.distance_cache.lock().unwrap() = None;
387    }
388
389    fn build_distance_cache(state: nostr_social_graph::SocialGraphState) -> Result<DistanceCache> {
390        let unique_ids = state
391            .unique_ids
392            .into_iter()
393            .map(|(pubkey, id)| decode_pubkey(&pubkey).map(|decoded| (id, decoded)))
394            .collect::<Result<HashMap<_, _>>>()?;
395
396        let mut users_by_distance = BTreeMap::new();
397        let mut size_by_distance = BTreeMap::new();
398        for (distance, users) in state.users_by_follow_distance {
399            let decoded = users
400                .into_iter()
401                .filter_map(|id| unique_ids.get(&id).copied())
402                .collect::<Vec<_>>();
403            size_by_distance.insert(distance, decoded.len());
404            users_by_distance.insert(distance, decoded);
405        }
406
407        let total_follows = state
408            .followed_by_user
409            .iter()
410            .map(|(_, targets)| targets.len())
411            .sum::<usize>();
412        let total_users = size_by_distance.values().copied().sum();
413        let max_depth = size_by_distance.keys().copied().max().unwrap_or_default();
414
415        Ok(DistanceCache {
416            stats: SocialGraphStats {
417                total_users,
418                root: Some(state.root),
419                total_follows,
420                max_depth,
421                size_by_distance,
422                enabled: true,
423            },
424            users_by_distance,
425        })
426    }
427
428    fn load_distance_cache(&self) -> Result<DistanceCache> {
429        if let Some(cache) = self.distance_cache.lock().unwrap().clone() {
430            return Ok(cache);
431        }
432
433        let state = {
434            let graph = self.graph.lock().unwrap();
435            graph.export_state().context("export social graph state")?
436        };
437        let cache = Self::build_distance_cache(state)?;
438        *self.distance_cache.lock().unwrap() = Some(cache.clone());
439        Ok(cache)
440    }
441
442    fn set_root(&self, root: &[u8; 32]) -> Result<()> {
443        let root_hex = hex::encode(root);
444        {
445            let mut graph = self.graph.lock().unwrap();
446            if should_replace_placeholder_root(&graph)? {
447                let fresh = SocialGraph::new(&root_hex);
448                graph
449                    .replace_state(&fresh.export_state())
450                    .context("replace placeholder social graph root")?;
451            } else {
452                graph
453                    .set_root(&root_hex)
454                    .context("set nostr-social-graph root")?;
455            }
456        }
457        self.invalidate_distance_cache();
458        Ok(())
459    }
460
461    fn stats(&self) -> Result<SocialGraphStats> {
462        Ok(self.load_distance_cache()?.stats)
463    }
464
465    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
466        let graph = self.graph.lock().unwrap();
467        let distance = graph
468            .get_follow_distance(&hex::encode(pk_bytes))
469            .context("read social graph follow distance")?;
470        Ok((distance != UNKNOWN_FOLLOW_DISTANCE).then_some(distance))
471    }
472
473    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
474        Ok(self
475            .load_distance_cache()?
476            .users_by_distance
477            .get(&distance)
478            .cloned()
479            .unwrap_or_default())
480    }
481
482    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
483        let graph = self.graph.lock().unwrap();
484        graph
485            .get_follow_list_created_at(&hex::encode(owner))
486            .context("read social graph follow list timestamp")
487    }
488
489    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
490        let graph = self.graph.lock().unwrap();
491        decode_pubkey_set(
492            graph
493                .get_followed_by_user(&hex::encode(owner))
494                .context("read followed targets")?,
495        )
496    }
497
498    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
499        if threshold <= 0.0 {
500            return Ok(false);
501        }
502        let graph = self.graph.lock().unwrap();
503        graph
504            .is_overmuted(&hex::encode(user_pk), threshold)
505            .context("check social graph overmute")
506    }
507
508    #[cfg_attr(not(test), allow(dead_code))]
509    pub fn profile_search_root(&self) -> Result<Option<Cid>> {
510        self.profile_index.search_root()
511    }
512
513    pub(crate) fn public_events_root(&self) -> Result<Option<Cid>> {
514        self.public_events.events_root()
515    }
516
517    pub(crate) fn write_public_events_root(&self, root: Option<&Cid>) -> Result<()> {
518        self.public_events.write_events_root(root)
519    }
520
521    #[cfg_attr(not(test), allow(dead_code))]
522    pub fn latest_profile_event(&self, pubkey_hex: &str) -> Result<Option<Event>> {
523        self.profile_index.profile_event_for_pubkey(pubkey_hex)
524    }
525
526    #[cfg_attr(not(test), allow(dead_code))]
527    pub fn profile_search_entries_for_prefix(
528        &self,
529        prefix: &str,
530    ) -> Result<Vec<(String, StoredProfileSearchEntry)>> {
531        self.profile_index.search_entries_for_prefix(prefix)
532    }
533
534    pub fn sync_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
535        self.update_profile_index_for_events(events)
536    }
537
538    pub(crate) fn rebuild_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
539        let latest_by_pubkey = self.filtered_latest_metadata_events_by_pubkey(events)?;
540        let (by_pubkey_root, search_root) = self
541            .profile_index
542            .rebuild_profile_events(latest_by_pubkey.into_values())?;
543        self.profile_index
544            .write_by_pubkey_root(by_pubkey_root.as_ref())?;
545        self.profile_index.write_search_root(search_root.as_ref())?;
546        Ok(())
547    }
548
549    pub fn rebuild_profile_index_from_stored_events(&self) -> Result<usize> {
550        let public_events_root = self.public_events.events_root()?;
551        let ambient_events_root = self.ambient_events.events_root()?;
552        if public_events_root.is_none() && ambient_events_root.is_none() {
553            self.profile_index.write_by_pubkey_root(None)?;
554            self.profile_index.write_search_root(None)?;
555            return Ok(0);
556        }
557
558        let mut events = Vec::new();
559        for (bucket, root) in [
560            (&self.public_events, public_events_root),
561            (&self.ambient_events, ambient_events_root),
562        ] {
563            let Some(root) = root else {
564                continue;
565            };
566            let stored = block_on(bucket.event_store.list_by_kind_lossy(
567                Some(&root),
568                Kind::Metadata.as_u16() as u32,
569                ListEventsOptions::default(),
570            ))
571            .map_err(map_event_store_error)?;
572            events.extend(
573                stored
574                    .into_iter()
575                    .map(nostr_event_from_stored)
576                    .collect::<Result<Vec<_>>>()?,
577            );
578        }
579
580        let latest_count = self.filtered_latest_metadata_events_by_pubkey(&events)?.len();
581        self.rebuild_profile_index_for_events(&events)?;
582        Ok(latest_count)
583    }
584
585    fn update_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
586        let latest_by_pubkey = latest_metadata_events_by_pubkey(events);
587        let threshold = self.profile_index_overmute_threshold();
588
589        if latest_by_pubkey.is_empty() {
590            return Ok(());
591        }
592
593        let mut by_pubkey_root = self.profile_index.by_pubkey_root()?;
594        let mut search_root = self.profile_index.search_root()?;
595        let mut changed = false;
596
597        for event in latest_by_pubkey.into_values() {
598            let overmuted = self.is_overmuted_user(&event.pubkey.to_bytes(), threshold)?;
599            let (next_by_pubkey_root, next_search_root, updated) = if overmuted {
600                self.profile_index.remove_profile_event(
601                    by_pubkey_root.as_ref(),
602                    search_root.as_ref(),
603                    &event.pubkey.to_hex(),
604                )?
605            } else {
606                self.profile_index
607                    .update_profile_event(by_pubkey_root.as_ref(), search_root.as_ref(), event)?
608            };
609            if updated {
610                by_pubkey_root = next_by_pubkey_root;
611                search_root = next_search_root;
612                changed = true;
613            }
614        }
615
616        if changed {
617            self.profile_index
618                .write_by_pubkey_root(by_pubkey_root.as_ref())?;
619            self.profile_index.write_search_root(search_root.as_ref())?;
620        }
621
622        Ok(())
623    }
624
625    fn filtered_latest_metadata_events_by_pubkey<'a>(
626        &self,
627        events: &'a [Event],
628    ) -> Result<BTreeMap<String, &'a Event>> {
629        let threshold = self.profile_index_overmute_threshold();
630        let mut latest_by_pubkey = BTreeMap::<String, &Event>::new();
631        for event in events.iter().filter(|event| event.kind == Kind::Metadata) {
632            if self.is_overmuted_user(&event.pubkey.to_bytes(), threshold)? {
633                continue;
634            }
635            let pubkey = event.pubkey.to_hex();
636            match latest_by_pubkey.get(&pubkey) {
637                Some(current) if compare_nostr_events(event, current).is_le() => {}
638                _ => {
639                    latest_by_pubkey.insert(pubkey, event);
640                }
641            }
642        }
643        Ok(latest_by_pubkey)
644    }
645
646    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
647        let state = {
648            let graph = self.graph.lock().unwrap();
649            graph.export_state().context("export social graph state")?
650        };
651        let mut graph = SocialGraph::from_state(state).context("rebuild social graph state")?;
652        let root_hex = hex::encode(root);
653        if graph.get_root() != root_hex {
654            graph
655                .set_root(&root_hex)
656                .context("set snapshot social graph root")?;
657        }
658        let chunks = graph
659            .to_binary_chunks_with_budget(*options)
660            .context("encode social graph snapshot")?;
661        Ok(chunks.into_iter().map(Bytes::from).collect())
662    }
663
664    fn ingest_event(&self, event: &Event) -> Result<()> {
665        self.ingest_event_with_storage_class(event, self.default_storage_class_for(event)?)
666    }
667
668    fn ingest_events(&self, events: &[Event]) -> Result<()> {
669        if events.is_empty() {
670            return Ok(());
671        }
672
673        let mut public = Vec::new();
674        let mut ambient = Vec::new();
675        for event in events {
676            match self.default_storage_class_for(event)? {
677                EventStorageClass::Public => public.push(event.clone()),
678                EventStorageClass::Ambient => ambient.push(event.clone()),
679            }
680        }
681
682        if !public.is_empty() {
683            self.ingest_events_with_storage_class(&public, EventStorageClass::Public)?;
684        }
685        if !ambient.is_empty() {
686            self.ingest_events_with_storage_class(&ambient, EventStorageClass::Ambient)?;
687        }
688
689        Ok(())
690    }
691
692    fn apply_graph_events_only(&self, events: &[Event]) -> Result<()> {
693        let graph_events = events
694            .iter()
695            .filter(|event| is_social_graph_event(event.kind))
696            .collect::<Vec<_>>();
697        if graph_events.is_empty() {
698            return Ok(());
699        }
700
701        {
702            let mut graph = self.graph.lock().unwrap();
703            let mut snapshot = SocialGraph::from_state(
704                graph
705                    .export_state()
706                    .context("export social graph state for graph-only ingest")?,
707            )
708            .context("rebuild social graph state for graph-only ingest")?;
709            for event in graph_events {
710                snapshot.handle_event(&graph_event_from_nostr(event), true, 0.0);
711            }
712            graph
713                .replace_state(&snapshot.export_state())
714                .context("replace graph-only social graph state")?;
715        }
716        self.invalidate_distance_cache();
717        Ok(())
718    }
719
720    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
721        self.query_events_in_scope(filter, limit, EventQueryScope::All)
722    }
723
724    fn default_storage_class_for(&self, event: &Event) -> Result<EventStorageClass> {
725        let graph = self.graph.lock().unwrap();
726        let root_hex = graph.get_root().context("read social graph root")?;
727        if root_hex != DEFAULT_ROOT_HEX && root_hex == event.pubkey.to_hex() {
728            return Ok(EventStorageClass::Public);
729        }
730        Ok(EventStorageClass::Ambient)
731    }
732
733    fn bucket(&self, storage_class: EventStorageClass) -> &EventIndexBucket {
734        match storage_class {
735            EventStorageClass::Public => &self.public_events,
736            EventStorageClass::Ambient => &self.ambient_events,
737        }
738    }
739
740    fn ingest_event_with_storage_class(
741        &self,
742        event: &Event,
743        storage_class: EventStorageClass,
744    ) -> Result<()> {
745        let current_root = self.bucket(storage_class).events_root()?;
746        let next_root = self
747            .bucket(storage_class)
748            .store_event(current_root.as_ref(), event)?;
749        self.bucket(storage_class)
750            .write_events_root(Some(&next_root))?;
751
752        self.update_profile_index_for_events(std::slice::from_ref(event))?;
753
754        if is_social_graph_event(event.kind) {
755            {
756                let mut graph = self.graph.lock().unwrap();
757                graph
758                    .handle_event(&graph_event_from_nostr(event), true, 0.0)
759                    .context("ingest social graph event into nostr-social-graph")?;
760            }
761            self.invalidate_distance_cache();
762        }
763
764        Ok(())
765    }
766
767    fn ingest_events_with_storage_class(
768        &self,
769        events: &[Event],
770        storage_class: EventStorageClass,
771    ) -> Result<()> {
772        if events.is_empty() {
773            return Ok(());
774        }
775
776        let bucket = self.bucket(storage_class);
777        let current_root = bucket.events_root()?;
778        let stored_events = events
779            .iter()
780            .map(stored_event_from_nostr)
781            .collect::<Vec<_>>();
782        let next_root = block_on(
783            bucket
784                .event_store
785                .build(current_root.as_ref(), stored_events),
786        )
787        .map_err(map_event_store_error)?;
788        bucket.write_events_root(next_root.as_ref())?;
789
790        self.update_profile_index_for_events(events)?;
791
792        let graph_events = events
793            .iter()
794            .filter(|event| is_social_graph_event(event.kind))
795            .collect::<Vec<_>>();
796        if graph_events.is_empty() {
797            return Ok(());
798        }
799
800        {
801            let mut graph = self.graph.lock().unwrap();
802            let mut snapshot = SocialGraph::from_state(
803                graph
804                    .export_state()
805                    .context("export social graph state for batch ingest")?,
806            )
807            .context("rebuild social graph state for batch ingest")?;
808            for event in graph_events {
809                snapshot.handle_event(&graph_event_from_nostr(event), true, 0.0);
810            }
811            graph
812                .replace_state(&snapshot.export_state())
813                .context("replace batched social graph state")?;
814        }
815        self.invalidate_distance_cache();
816
817        Ok(())
818    }
819
820    pub(crate) fn query_events_in_scope(
821        &self,
822        filter: &Filter,
823        limit: usize,
824        scope: EventQueryScope,
825    ) -> Result<Vec<Event>> {
826        if limit == 0 {
827            return Ok(Vec::new());
828        }
829
830        let buckets: &[&EventIndexBucket] = match scope {
831            EventQueryScope::PublicOnly => &[&self.public_events],
832            EventQueryScope::AmbientOnly => &[&self.ambient_events],
833            EventQueryScope::All => &[&self.public_events, &self.ambient_events],
834        };
835
836        let mut candidates = Vec::new();
837        for bucket in buckets {
838            candidates.extend(bucket.query_events(filter, limit)?);
839        }
840
841        let mut deduped = dedupe_events(candidates);
842        deduped.retain(|event| filter.match_event(event));
843        deduped.truncate(limit);
844        Ok(deduped)
845    }
846}
847
848impl EventIndexBucket {
849    fn events_root(&self) -> Result<Option<Cid>> {
850        let _profile = NostrProfileGuard::new("socialgraph.events_root.read");
851        read_root_file(&self.root_path)
852    }
853
854    fn write_events_root(&self, root: Option<&Cid>) -> Result<()> {
855        let _profile = NostrProfileGuard::new("socialgraph.events_root.write");
856        write_root_file(&self.root_path, root)
857    }
858
859    fn store_event(&self, root: Option<&Cid>, event: &Event) -> Result<Cid> {
860        let stored = stored_event_from_nostr(event);
861        let _profile = NostrProfileGuard::new("socialgraph.event_store.add");
862        block_on(self.event_store.add(root, stored)).map_err(map_event_store_error)
863    }
864
865    fn load_event_by_id(&self, root: &Cid, event_id: &str) -> Result<Option<Event>> {
866        let stored = block_on(self.event_store.get_by_id(Some(root), event_id))
867            .map_err(map_event_store_error)?;
868        stored.map(nostr_event_from_stored).transpose()
869    }
870
871    fn load_events_for_author(
872        &self,
873        root: &Cid,
874        author: &nostr::PublicKey,
875        filter: &Filter,
876        limit: usize,
877        exact: bool,
878    ) -> Result<Vec<Event>> {
879        let kind_filter = filter.kinds.as_ref().and_then(|kinds| {
880            if kinds.len() == 1 {
881                kinds.iter().next().map(|kind| kind.as_u16() as u32)
882            } else {
883                None
884            }
885        });
886        let author_hex = author.to_hex();
887        let options = filter_list_options(filter, limit, exact);
888        let stored = match kind_filter {
889            Some(kind) => block_on(self.event_store.list_by_author_and_kind(
890                Some(root),
891                &author_hex,
892                kind,
893                options.clone(),
894            ))
895            .map_err(map_event_store_error)?,
896            None => block_on(
897                self.event_store
898                    .list_by_author(Some(root), &author_hex, options),
899            )
900            .map_err(map_event_store_error)?,
901        };
902        stored
903            .into_iter()
904            .map(nostr_event_from_stored)
905            .collect::<Result<Vec<_>>>()
906    }
907
908    fn load_events_for_kind(
909        &self,
910        root: &Cid,
911        kind: Kind,
912        filter: &Filter,
913        limit: usize,
914        exact: bool,
915    ) -> Result<Vec<Event>> {
916        let stored = block_on(self.event_store.list_by_kind(
917            Some(root),
918            kind.as_u16() as u32,
919            filter_list_options(filter, limit, exact),
920        ))
921        .map_err(map_event_store_error)?;
922        stored
923            .into_iter()
924            .map(nostr_event_from_stored)
925            .collect::<Result<Vec<_>>>()
926    }
927
928    fn load_recent_events(
929        &self,
930        root: &Cid,
931        filter: &Filter,
932        limit: usize,
933        exact: bool,
934    ) -> Result<Vec<Event>> {
935        let stored = block_on(
936            self.event_store
937                .list_recent(Some(root), filter_list_options(filter, limit, exact)),
938        )
939        .map_err(map_event_store_error)?;
940        stored
941            .into_iter()
942            .map(nostr_event_from_stored)
943            .collect::<Result<Vec<_>>>()
944    }
945
946    fn load_events_for_tag(
947        &self,
948        root: &Cid,
949        tag_name: &str,
950        values: &[String],
951        filter: &Filter,
952        limit: usize,
953        exact: bool,
954    ) -> Result<Vec<Event>> {
955        let mut events = Vec::new();
956        let options = filter_list_options(filter, limit, exact);
957        for value in values {
958            let stored = block_on(self.event_store.list_by_tag(
959                Some(root),
960                tag_name,
961                value,
962                options.clone(),
963            ))
964            .map_err(map_event_store_error)?;
965            events.extend(
966                stored
967                    .into_iter()
968                    .map(nostr_event_from_stored)
969                    .collect::<Result<Vec<_>>>()?,
970            );
971        }
972        Ok(dedupe_events(events))
973    }
974
975    fn choose_tag_source(&self, filter: &Filter) -> Option<(String, Vec<String>)> {
976        filter
977            .generic_tags
978            .iter()
979            .min_by_key(|(_, values)| values.len())
980            .map(|(tag, values)| {
981                (
982                    tag.as_char().to_ascii_lowercase().to_string(),
983                    values.iter().cloned().collect(),
984                )
985            })
986    }
987
988    fn load_major_index_candidates(
989        &self,
990        root: &Cid,
991        filter: &Filter,
992        limit: usize,
993    ) -> Result<Option<Vec<Event>>> {
994        if let Some(events) = self.load_direct_replaceable_candidates(root, filter)? {
995            return Ok(Some(events));
996        }
997
998        if let Some((tag_name, values)) = self.choose_tag_source(filter) {
999            let exact = filter.authors.is_none()
1000                && filter.kinds.is_none()
1001                && filter.search.is_none()
1002                && filter.generic_tags.len() == 1;
1003            return Ok(Some(self.load_events_for_tag(
1004                root, &tag_name, &values, filter, limit, exact,
1005            )?));
1006        }
1007
1008        if let (Some(authors), Some(kinds)) = (filter.authors.as_ref(), filter.kinds.as_ref()) {
1009            if authors.len() == 1 && kinds.len() == 1 {
1010                let author = authors.iter().next().expect("checked single author");
1011                let exact = filter.generic_tags.is_empty() && filter.search.is_none();
1012                return Ok(Some(
1013                    self.load_events_for_author(root, author, filter, limit, exact)?,
1014                ));
1015            }
1016
1017            if kinds.len() < authors.len() {
1018                let mut events = Vec::new();
1019                for kind in kinds {
1020                    events.extend(self.load_events_for_kind(root, *kind, filter, limit, false)?);
1021                }
1022                return Ok(Some(dedupe_events(events)));
1023            }
1024
1025            let mut events = Vec::new();
1026            for author in authors {
1027                events.extend(self.load_events_for_author(root, author, filter, limit, false)?);
1028            }
1029            return Ok(Some(dedupe_events(events)));
1030        }
1031
1032        if let Some(authors) = filter.authors.as_ref() {
1033            let mut events = Vec::new();
1034            let exact = filter.generic_tags.is_empty() && filter.search.is_none();
1035            for author in authors {
1036                events.extend(self.load_events_for_author(root, author, filter, limit, exact)?);
1037            }
1038            return Ok(Some(dedupe_events(events)));
1039        }
1040
1041        if let Some(kinds) = filter.kinds.as_ref() {
1042            let mut events = Vec::new();
1043            let exact = filter.authors.is_none()
1044                && filter.generic_tags.is_empty()
1045                && filter.search.is_none();
1046            for kind in kinds {
1047                events.extend(self.load_events_for_kind(root, *kind, filter, limit, exact)?);
1048            }
1049            return Ok(Some(dedupe_events(events)));
1050        }
1051
1052        Ok(None)
1053    }
1054
1055    fn load_direct_replaceable_candidates(
1056        &self,
1057        root: &Cid,
1058        filter: &Filter,
1059    ) -> Result<Option<Vec<Event>>> {
1060        let Some(authors) = filter.authors.as_ref() else {
1061            return Ok(None);
1062        };
1063        let Some(kinds) = filter.kinds.as_ref() else {
1064            return Ok(None);
1065        };
1066        if kinds.len() != 1 {
1067            return Ok(None);
1068        }
1069
1070        let kind = kinds.iter().next().expect("checked single kind").as_u16() as u32;
1071
1072        if is_parameterized_replaceable_kind(kind) {
1073            let d_tag = SingleLetterTag::lowercase(nostr::Alphabet::D);
1074            let Some(d_values) = filter.generic_tags.get(&d_tag) else {
1075                return Ok(None);
1076            };
1077            let mut events = Vec::new();
1078            for author in authors {
1079                let author_hex = author.to_hex();
1080                for d_value in d_values {
1081                    if let Some(stored) = block_on(self.event_store.get_parameterized_replaceable(
1082                        Some(root),
1083                        &author_hex,
1084                        kind,
1085                        d_value,
1086                    ))
1087                    .map_err(map_event_store_error)?
1088                    {
1089                        events.push(nostr_event_from_stored(stored)?);
1090                    }
1091                }
1092            }
1093            return Ok(Some(dedupe_events(events)));
1094        }
1095
1096        if is_replaceable_kind(kind) {
1097            let mut events = Vec::new();
1098            for author in authors {
1099                if let Some(stored) = block_on(self.event_store.get_replaceable(
1100                    Some(root),
1101                    &author.to_hex(),
1102                    kind,
1103                ))
1104                .map_err(map_event_store_error)?
1105                {
1106                    events.push(nostr_event_from_stored(stored)?);
1107                }
1108            }
1109            return Ok(Some(dedupe_events(events)));
1110        }
1111
1112        Ok(None)
1113    }
1114
1115    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
1116        if limit == 0 {
1117            return Ok(Vec::new());
1118        }
1119
1120        let events_root = self.events_root()?;
1121        let Some(root) = events_root.as_ref() else {
1122            return Ok(Vec::new());
1123        };
1124        let mut candidates = Vec::new();
1125        let mut seen: HashSet<[u8; 32]> = HashSet::new();
1126
1127        if let Some(ids) = filter.ids.as_ref() {
1128            for id in ids {
1129                let id_bytes = id.to_bytes();
1130                if !seen.insert(id_bytes) {
1131                    continue;
1132                }
1133                if let Some(event) = self.load_event_by_id(root, &id.to_hex())? {
1134                    if filter.match_event(&event) {
1135                        candidates.push(event);
1136                    }
1137                }
1138                if candidates.len() >= limit {
1139                    break;
1140                }
1141            }
1142        } else {
1143            let base_events = match self.load_major_index_candidates(root, filter, limit)? {
1144                Some(events) => events,
1145                None => self.load_recent_events(
1146                    root,
1147                    filter,
1148                    limit,
1149                    filter.authors.is_none()
1150                        && filter.kinds.is_none()
1151                        && filter.generic_tags.is_empty()
1152                        && filter.search.is_none(),
1153                )?,
1154            };
1155
1156            for event in base_events {
1157                let id_bytes = event.id.to_bytes();
1158                if !seen.insert(id_bytes) {
1159                    continue;
1160                }
1161                if filter.match_event(&event) {
1162                    candidates.push(event);
1163                }
1164                if candidates.len() >= limit {
1165                    break;
1166                }
1167            }
1168        }
1169
1170        candidates.sort_by(|a, b| {
1171            b.created_at
1172                .as_u64()
1173                .cmp(&a.created_at.as_u64())
1174                .then_with(|| a.id.cmp(&b.id))
1175        });
1176        candidates.truncate(limit);
1177        Ok(candidates)
1178    }
1179}
1180
1181impl ProfileIndexBucket {
1182    fn by_pubkey_root(&self) -> Result<Option<Cid>> {
1183        let _profile = NostrProfileGuard::new("socialgraph.profile_by_pubkey_root.read");
1184        read_root_file(&self.by_pubkey_root_path)
1185    }
1186
1187    fn search_root(&self) -> Result<Option<Cid>> {
1188        let _profile = NostrProfileGuard::new("socialgraph.profile_search_root.read");
1189        read_root_file(&self.search_root_path)
1190    }
1191
1192    fn write_by_pubkey_root(&self, root: Option<&Cid>) -> Result<()> {
1193        let _profile = NostrProfileGuard::new("socialgraph.profile_by_pubkey_root.write");
1194        write_root_file(&self.by_pubkey_root_path, root)
1195    }
1196
1197    fn write_search_root(&self, root: Option<&Cid>) -> Result<()> {
1198        let _profile = NostrProfileGuard::new("socialgraph.profile_search_root.write");
1199        write_root_file(&self.search_root_path, root)
1200    }
1201
1202    fn mirror_profile_event(&self, event: &Event) -> Result<Cid> {
1203        let bytes = event.as_json().into_bytes();
1204        block_on(self.tree.put_file(&bytes))
1205            .map(|(cid, _size)| cid)
1206            .context("store mirrored profile event")
1207    }
1208
1209    fn load_profile_event(&self, cid: &Cid) -> Result<Option<Event>> {
1210        let bytes = block_on(self.tree.get(cid, None)).context("read mirrored profile event")?;
1211        let Some(bytes) = bytes else {
1212            return Ok(None);
1213        };
1214        let json = String::from_utf8(bytes).context("decode mirrored profile event as utf-8")?;
1215        Ok(Some(
1216            Event::from_json(json).context("decode mirrored profile event json")?,
1217        ))
1218    }
1219
1220    #[cfg_attr(not(test), allow(dead_code))]
1221    fn profile_event_for_pubkey(&self, pubkey_hex: &str) -> Result<Option<Event>> {
1222        let root = self.by_pubkey_root()?;
1223        let Some(cid) = block_on(self.index.get_link(root.as_ref(), pubkey_hex))
1224            .context("read mirrored profile event cid by pubkey")?
1225        else {
1226            return Ok(None);
1227        };
1228        self.load_profile_event(&cid)
1229    }
1230
1231    #[cfg_attr(not(test), allow(dead_code))]
1232    fn search_entries_for_prefix(
1233        &self,
1234        prefix: &str,
1235    ) -> Result<Vec<(String, StoredProfileSearchEntry)>> {
1236        let Some(root) = self.search_root()? else {
1237            return Ok(Vec::new());
1238        };
1239        let entries =
1240            block_on(self.index.prefix(&root, prefix)).context("query profile search prefix")?;
1241        entries
1242            .into_iter()
1243            .map(|(key, value)| {
1244                let entry = serde_json::from_str(&value)
1245                    .context("decode stored profile search entry json")?;
1246                Ok((key, entry))
1247            })
1248            .collect()
1249    }
1250
1251    fn rebuild_profile_events<'a, I>(&self, events: I) -> Result<(Option<Cid>, Option<Cid>)>
1252    where
1253        I: IntoIterator<Item = &'a Event>,
1254    {
1255        let mut by_pubkey_entries = Vec::<(String, Cid)>::new();
1256        let mut search_entries = Vec::<(String, String)>::new();
1257
1258        for event in events {
1259            let pubkey = event.pubkey.to_hex();
1260            let mirrored_cid = self.mirror_profile_event(event)?;
1261            let search_value =
1262                serialize_profile_search_entry(&build_profile_search_entry(event, &mirrored_cid)?)?;
1263            by_pubkey_entries.push((pubkey.clone(), mirrored_cid.clone()));
1264            for term in profile_search_terms_for_event(event) {
1265                search_entries.push((
1266                    format!("{PROFILE_SEARCH_PREFIX}{term}:{pubkey}"),
1267                    search_value.clone(),
1268                ));
1269            }
1270        }
1271
1272        let by_pubkey_root = block_on(self.index.build_links(by_pubkey_entries))
1273            .context("bulk build mirrored profile-by-pubkey index")?;
1274        let search_root = block_on(self.index.build(search_entries))
1275            .context("bulk build mirrored profile search index")?;
1276        Ok((by_pubkey_root, search_root))
1277    }
1278
1279    fn update_profile_event(
1280        &self,
1281        by_pubkey_root: Option<&Cid>,
1282        search_root: Option<&Cid>,
1283        event: &Event,
1284    ) -> Result<(Option<Cid>, Option<Cid>, bool)> {
1285        let pubkey = event.pubkey.to_hex();
1286        let existing_cid = block_on(self.index.get_link(by_pubkey_root, &pubkey))
1287            .context("lookup existing mirrored profile event")?;
1288
1289        let existing_event = match existing_cid.as_ref() {
1290            Some(cid) => self.load_profile_event(cid)?,
1291            None => None,
1292        };
1293
1294        if existing_event
1295            .as_ref()
1296            .is_some_and(|current| compare_nostr_events(event, current).is_le())
1297        {
1298            return Ok((by_pubkey_root.cloned(), search_root.cloned(), false));
1299        }
1300
1301        let mirrored_cid = self.mirror_profile_event(event)?;
1302        let next_by_pubkey_root = Some(
1303            block_on(
1304                self.index
1305                    .insert_link(by_pubkey_root, &pubkey, &mirrored_cid),
1306            )
1307            .context("write mirrored profile event index")?,
1308        );
1309
1310        let mut next_search_root = search_root.cloned();
1311        if let Some(current) = existing_event.as_ref() {
1312            for term in profile_search_terms_for_event(current) {
1313                let Some(root) = next_search_root.as_ref() else {
1314                    break;
1315                };
1316                next_search_root = block_on(
1317                    self.index
1318                        .delete(root, &format!("{PROFILE_SEARCH_PREFIX}{term}:{pubkey}")),
1319                )
1320                .context("remove stale profile search term")?;
1321            }
1322        }
1323
1324        let search_value =
1325            serialize_profile_search_entry(&build_profile_search_entry(event, &mirrored_cid)?)?;
1326        for term in profile_search_terms_for_event(event) {
1327            next_search_root = Some(
1328                block_on(self.index.insert(
1329                    next_search_root.as_ref(),
1330                    &format!("{PROFILE_SEARCH_PREFIX}{term}:{pubkey}"),
1331                    &search_value,
1332                ))
1333                .context("write profile search term")?,
1334            );
1335        }
1336
1337        Ok((next_by_pubkey_root, next_search_root, true))
1338    }
1339
1340    fn remove_profile_event(
1341        &self,
1342        by_pubkey_root: Option<&Cid>,
1343        search_root: Option<&Cid>,
1344        pubkey: &str,
1345    ) -> Result<(Option<Cid>, Option<Cid>, bool)> {
1346        let existing_cid = block_on(self.index.get_link(by_pubkey_root, pubkey))
1347            .context("lookup mirrored profile event for removal")?;
1348        let Some(existing_cid) = existing_cid else {
1349            return Ok((by_pubkey_root.cloned(), search_root.cloned(), false));
1350        };
1351
1352        let existing_event = self.load_profile_event(&existing_cid)?;
1353        let next_by_pubkey_root = match by_pubkey_root {
1354            Some(root) => block_on(self.index.delete(root, pubkey))
1355                .context("remove mirrored profile-by-pubkey entry")?,
1356            None => None,
1357        };
1358
1359        let mut next_search_root = search_root.cloned();
1360        if let Some(current) = existing_event.as_ref() {
1361            for term in profile_search_terms_for_event(current) {
1362                let Some(root) = next_search_root.as_ref() else {
1363                    break;
1364                };
1365                next_search_root = block_on(
1366                    self.index
1367                        .delete(root, &format!("{PROFILE_SEARCH_PREFIX}{term}:{pubkey}")),
1368                )
1369                .context("remove overmuted profile search term")?;
1370            }
1371        }
1372
1373        Ok((next_by_pubkey_root, next_search_root, true))
1374    }
1375}
1376
1377fn latest_metadata_events_by_pubkey<'a>(events: &'a [Event]) -> BTreeMap<String, &'a Event> {
1378    let mut latest_by_pubkey = BTreeMap::<String, &Event>::new();
1379    for event in events.iter().filter(|event| event.kind == Kind::Metadata) {
1380        let pubkey = event.pubkey.to_hex();
1381        match latest_by_pubkey.get(&pubkey) {
1382            Some(current) if compare_nostr_events(event, current).is_le() => {}
1383            _ => {
1384                latest_by_pubkey.insert(pubkey, event);
1385            }
1386        }
1387    }
1388    latest_by_pubkey
1389}
1390
1391fn serialize_profile_search_entry(entry: &StoredProfileSearchEntry) -> Result<String> {
1392    serde_json::to_string(entry).context("encode stored profile search entry json")
1393}
1394
1395fn cid_to_nhash(cid: &Cid) -> Result<String> {
1396    nhash_encode_full(&NHashData {
1397        hash: cid.hash,
1398        decrypt_key: cid.key,
1399    })
1400    .context("encode mirrored profile event nhash")
1401}
1402
1403fn build_profile_search_entry(
1404    event: &Event,
1405    mirrored_cid: &Cid,
1406) -> Result<StoredProfileSearchEntry> {
1407    let profile = match serde_json::from_str::<serde_json::Value>(&event.content) {
1408        Ok(serde_json::Value::Object(profile)) => profile,
1409        _ => serde_json::Map::new(),
1410    };
1411    let names = extract_profile_names(&profile);
1412    let primary_name = names.first().cloned();
1413    let nip05 = normalize_profile_nip05(&profile, primary_name.as_deref());
1414    let name = primary_name
1415        .clone()
1416        .or_else(|| nip05.clone())
1417        .unwrap_or_else(|| event.pubkey.to_hex());
1418
1419    Ok(StoredProfileSearchEntry {
1420        pubkey: event.pubkey.to_hex(),
1421        name,
1422        aliases: names.into_iter().skip(1).collect(),
1423        nip05,
1424        created_at: event.created_at.as_u64(),
1425        event_nhash: cid_to_nhash(mirrored_cid)?,
1426    })
1427}
1428
1429fn filter_list_options(filter: &Filter, limit: usize, exact: bool) -> ListEventsOptions {
1430    ListEventsOptions {
1431        limit: exact.then_some(limit.max(1)),
1432        since: filter.since.map(|timestamp| timestamp.as_u64()),
1433        until: filter.until.map(|timestamp| timestamp.as_u64()),
1434    }
1435}
1436
1437fn dedupe_events(events: Vec<Event>) -> Vec<Event> {
1438    let mut seen = HashSet::new();
1439    let mut deduped = Vec::new();
1440    for event in events {
1441        if seen.insert(event.id.to_bytes()) {
1442            deduped.push(event);
1443        }
1444    }
1445    deduped.sort_by(|a, b| {
1446        b.created_at
1447            .as_u64()
1448            .cmp(&a.created_at.as_u64())
1449            .then_with(|| a.id.cmp(&b.id))
1450    });
1451    deduped
1452}
1453
1454impl SocialGraphBackend for SocialGraphStore {
1455    fn stats(&self) -> Result<SocialGraphStats> {
1456        SocialGraphStore::stats(self)
1457    }
1458
1459    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
1460        SocialGraphStore::users_by_follow_distance(self, distance)
1461    }
1462
1463    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
1464        SocialGraphStore::follow_distance(self, pk_bytes)
1465    }
1466
1467    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
1468        SocialGraphStore::follow_list_created_at(self, owner)
1469    }
1470
1471    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
1472        SocialGraphStore::followed_targets(self, owner)
1473    }
1474
1475    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
1476        SocialGraphStore::is_overmuted_user(self, user_pk, threshold)
1477    }
1478
1479    fn profile_search_root(&self) -> Result<Option<Cid>> {
1480        SocialGraphStore::profile_search_root(self)
1481    }
1482
1483    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
1484        SocialGraphStore::snapshot_chunks(self, root, options)
1485    }
1486
1487    fn ingest_event(&self, event: &Event) -> Result<()> {
1488        SocialGraphStore::ingest_event(self, event)
1489    }
1490
1491    fn ingest_event_with_storage_class(
1492        &self,
1493        event: &Event,
1494        storage_class: EventStorageClass,
1495    ) -> Result<()> {
1496        SocialGraphStore::ingest_event_with_storage_class(self, event, storage_class)
1497    }
1498
1499    fn ingest_events(&self, events: &[Event]) -> Result<()> {
1500        SocialGraphStore::ingest_events(self, events)
1501    }
1502
1503    fn ingest_events_with_storage_class(
1504        &self,
1505        events: &[Event],
1506        storage_class: EventStorageClass,
1507    ) -> Result<()> {
1508        SocialGraphStore::ingest_events_with_storage_class(self, events, storage_class)
1509    }
1510
1511    fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
1512        SocialGraphStore::apply_graph_events_only(self, events)
1513    }
1514
1515    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
1516        SocialGraphStore::query_events(self, filter, limit)
1517    }
1518}
1519
1520impl NostrSocialGraphBackend for SocialGraphStore {
1521    type Error = UpstreamGraphBackendError;
1522
1523    fn get_root(&self) -> std::result::Result<String, Self::Error> {
1524        let graph = self.graph.lock().unwrap();
1525        graph
1526            .get_root()
1527            .context("read social graph root")
1528            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1529    }
1530
1531    fn set_root(&mut self, root: &str) -> std::result::Result<(), Self::Error> {
1532        let root_bytes =
1533            decode_pubkey(root).map_err(|err| UpstreamGraphBackendError(err.to_string()))?;
1534        SocialGraphStore::set_root(self, &root_bytes)
1535            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1536    }
1537
1538    fn handle_event(
1539        &mut self,
1540        event: &GraphEvent,
1541        allow_unknown_authors: bool,
1542        overmute_threshold: f64,
1543    ) -> std::result::Result<(), Self::Error> {
1544        {
1545            let mut graph = self.graph.lock().unwrap();
1546            graph
1547                .handle_event(event, allow_unknown_authors, overmute_threshold)
1548                .context("ingest social graph event into heed backend")
1549                .map_err(|err| UpstreamGraphBackendError(err.to_string()))?;
1550        }
1551        self.invalidate_distance_cache();
1552        Ok(())
1553    }
1554
1555    fn get_follow_distance(&self, user: &str) -> std::result::Result<u32, Self::Error> {
1556        let graph = self.graph.lock().unwrap();
1557        graph
1558            .get_follow_distance(user)
1559            .context("read social graph follow distance")
1560            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1561    }
1562
1563    fn is_following(
1564        &self,
1565        follower: &str,
1566        followed_user: &str,
1567    ) -> std::result::Result<bool, Self::Error> {
1568        let graph = self.graph.lock().unwrap();
1569        graph
1570            .is_following(follower, followed_user)
1571            .context("read social graph following edge")
1572            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1573    }
1574
1575    fn get_followed_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
1576        let graph = self.graph.lock().unwrap();
1577        graph
1578            .get_followed_by_user(user)
1579            .context("read followed-by-user list")
1580            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1581    }
1582
1583    fn get_followers_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
1584        let graph = self.graph.lock().unwrap();
1585        graph
1586            .get_followers_by_user(user)
1587            .context("read followers-by-user list")
1588            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1589    }
1590
1591    fn get_muted_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
1592        let graph = self.graph.lock().unwrap();
1593        graph
1594            .get_muted_by_user(user)
1595            .context("read muted-by-user list")
1596            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1597    }
1598
1599    fn get_user_muted_by(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
1600        let graph = self.graph.lock().unwrap();
1601        graph
1602            .get_user_muted_by(user)
1603            .context("read user-muted-by list")
1604            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1605    }
1606
1607    fn get_follow_list_created_at(
1608        &self,
1609        user: &str,
1610    ) -> std::result::Result<Option<u64>, Self::Error> {
1611        let graph = self.graph.lock().unwrap();
1612        graph
1613            .get_follow_list_created_at(user)
1614            .context("read social graph follow list timestamp")
1615            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1616    }
1617
1618    fn get_mute_list_created_at(
1619        &self,
1620        user: &str,
1621    ) -> std::result::Result<Option<u64>, Self::Error> {
1622        let graph = self.graph.lock().unwrap();
1623        graph
1624            .get_mute_list_created_at(user)
1625            .context("read social graph mute list timestamp")
1626            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1627    }
1628
1629    fn is_overmuted(&self, user: &str, threshold: f64) -> std::result::Result<bool, Self::Error> {
1630        let graph = self.graph.lock().unwrap();
1631        graph
1632            .is_overmuted(user, threshold)
1633            .context("check social graph overmute")
1634            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
1635    }
1636}
1637
1638impl<T> SocialGraphBackend for Arc<T>
1639where
1640    T: SocialGraphBackend + ?Sized,
1641{
1642    fn stats(&self) -> Result<SocialGraphStats> {
1643        self.as_ref().stats()
1644    }
1645
1646    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
1647        self.as_ref().users_by_follow_distance(distance)
1648    }
1649
1650    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
1651        self.as_ref().follow_distance(pk_bytes)
1652    }
1653
1654    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
1655        self.as_ref().follow_list_created_at(owner)
1656    }
1657
1658    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
1659        self.as_ref().followed_targets(owner)
1660    }
1661
1662    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
1663        self.as_ref().is_overmuted_user(user_pk, threshold)
1664    }
1665
1666    fn profile_search_root(&self) -> Result<Option<Cid>> {
1667        self.as_ref().profile_search_root()
1668    }
1669
1670    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
1671        self.as_ref().snapshot_chunks(root, options)
1672    }
1673
1674    fn ingest_event(&self, event: &Event) -> Result<()> {
1675        self.as_ref().ingest_event(event)
1676    }
1677
1678    fn ingest_event_with_storage_class(
1679        &self,
1680        event: &Event,
1681        storage_class: EventStorageClass,
1682    ) -> Result<()> {
1683        self.as_ref()
1684            .ingest_event_with_storage_class(event, storage_class)
1685    }
1686
1687    fn ingest_events(&self, events: &[Event]) -> Result<()> {
1688        self.as_ref().ingest_events(events)
1689    }
1690
1691    fn ingest_events_with_storage_class(
1692        &self,
1693        events: &[Event],
1694        storage_class: EventStorageClass,
1695    ) -> Result<()> {
1696        self.as_ref()
1697            .ingest_events_with_storage_class(events, storage_class)
1698    }
1699
1700    fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
1701        self.as_ref().ingest_graph_events(events)
1702    }
1703
    /// Forwards to `query_events` on the backend returned by `self.as_ref()`.
    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
        self.as_ref().query_events(filter, limit)
    }
1707}
1708
1709fn should_replace_placeholder_root(graph: &HeedSocialGraph) -> Result<bool> {
1710    if graph.get_root().context("read current social graph root")? != DEFAULT_ROOT_HEX {
1711        return Ok(false);
1712    }
1713
1714    let GraphStats {
1715        users,
1716        follows,
1717        mutes,
1718        ..
1719    } = graph.size().context("size social graph")?;
1720    Ok(users <= 1 && follows == 0 && mutes == 0)
1721}
1722
1723fn decode_pubkey_set(values: Vec<String>) -> Result<UserSet> {
1724    let mut set = UserSet::new();
1725    for value in values {
1726        set.insert(decode_pubkey(&value)?);
1727    }
1728    Ok(set)
1729}
1730
1731fn decode_pubkey(value: &str) -> Result<[u8; 32]> {
1732    let mut bytes = [0u8; 32];
1733    hex::decode_to_slice(value, &mut bytes)
1734        .with_context(|| format!("decode social graph pubkey {value}"))?;
1735    Ok(bytes)
1736}
1737
1738fn is_social_graph_event(kind: Kind) -> bool {
1739    kind == Kind::ContactList || kind == Kind::MuteList
1740}
1741
1742fn graph_event_from_nostr(event: &Event) -> GraphEvent {
1743    GraphEvent {
1744        created_at: event.created_at.as_u64(),
1745        content: event.content.clone(),
1746        tags: event
1747            .tags
1748            .iter()
1749            .map(|tag| tag.as_slice().to_vec())
1750            .collect(),
1751        kind: event.kind.as_u16() as u32,
1752        pubkey: event.pubkey.to_hex(),
1753        id: event.id.to_hex(),
1754        sig: event.sig.to_string(),
1755    }
1756}
1757
1758fn stored_event_from_nostr(event: &Event) -> StoredNostrEvent {
1759    StoredNostrEvent {
1760        id: event.id.to_hex(),
1761        pubkey: event.pubkey.to_hex(),
1762        created_at: event.created_at.as_u64(),
1763        kind: event.kind.as_u16() as u32,
1764        tags: event
1765            .tags
1766            .iter()
1767            .map(|tag| tag.as_slice().to_vec())
1768            .collect(),
1769        content: event.content.clone(),
1770        sig: event.sig.to_string(),
1771    }
1772}
1773
1774fn nostr_event_from_stored(event: StoredNostrEvent) -> Result<Event> {
1775    let value = serde_json::json!({
1776        "id": event.id,
1777        "pubkey": event.pubkey,
1778        "created_at": event.created_at,
1779        "kind": event.kind,
1780        "tags": event.tags,
1781        "content": event.content,
1782        "sig": event.sig,
1783    });
1784    Event::from_json(value.to_string()).context("decode stored nostr event")
1785}
1786
/// Crate-visible entry point for converting a stored event back into a
/// `nostr::Event`; delegates to the private `nostr_event_from_stored`.
pub(crate) fn stored_event_to_nostr_event(event: StoredNostrEvent) -> Result<Event> {
    nostr_event_from_stored(event)
}
1790
1791fn encode_cid(cid: &Cid) -> Result<Vec<u8>> {
1792    rmp_serde::to_vec_named(&StoredCid {
1793        hash: cid.hash,
1794        key: cid.key,
1795    })
1796    .context("encode social graph events root")
1797}
1798
1799fn decode_cid(bytes: &[u8]) -> Result<Option<Cid>> {
1800    let stored: StoredCid =
1801        rmp_serde::from_slice(bytes).context("decode social graph events root")?;
1802    Ok(Some(Cid {
1803        hash: stored.hash,
1804        key: stored.key,
1805    }))
1806}
1807
1808fn read_root_file(path: &Path) -> Result<Option<Cid>> {
1809    let Ok(bytes) = std::fs::read(path) else {
1810        return Ok(None);
1811    };
1812    decode_cid(&bytes)
1813}
1814
1815fn write_root_file(path: &Path, root: Option<&Cid>) -> Result<()> {
1816    let Some(root) = root else {
1817        if path.exists() {
1818            std::fs::remove_file(path)?;
1819        }
1820        return Ok(());
1821    };
1822
1823    let encoded = encode_cid(root)?;
1824    let tmp_path = path.with_extension("tmp");
1825    std::fs::write(&tmp_path, encoded)?;
1826    std::fs::rename(tmp_path, path)?;
1827    Ok(())
1828}
1829
1830fn normalize_profile_name(value: &serde_json::Value) -> Option<String> {
1831    let raw = value.as_str()?;
1832    let trimmed = raw.split_whitespace().collect::<Vec<_>>().join(" ");
1833    if trimmed.is_empty() {
1834        return None;
1835    }
1836    Some(trimmed.chars().take(PROFILE_NAME_MAX_LENGTH).collect())
1837}
1838
1839fn extract_profile_names(profile: &serde_json::Map<String, serde_json::Value>) -> Vec<String> {
1840    let mut names = Vec::new();
1841    let mut seen = HashSet::new();
1842
1843    for key in ["display_name", "displayName", "name", "username"] {
1844        let Some(value) = profile.get(key).and_then(normalize_profile_name) else {
1845            continue;
1846        };
1847        let lowered = value.to_lowercase();
1848        if seen.insert(lowered) {
1849            names.push(value);
1850        }
1851    }
1852
1853    names
1854}
1855
/// Returns `true` when a NIP-05 local part adds no useful search signal.
///
/// A local part is rejected when any of the following holds:
/// - it is a single character, counted in Unicode scalar values (like the other
///   `chars()`-based length rules in this file), so `"é"` is treated like `"e"`;
/// - it starts with `npub1`;
/// - it already occurs inside the primary name, compared after lowercasing the
///   name and removing its whitespace.
///
/// `local_part` itself is not lowercased here. The caller in this file
/// (`normalize_profile_nip05`) passes an already-lowercased value.
fn should_reject_profile_nip05(local_part: &str, primary_name: &str) -> bool {
    let mut chars = local_part.chars();
    let is_single_char = chars.next().is_some() && chars.next().is_none();
    if is_single_char || local_part.starts_with("npub1") {
        return true;
    }

    let compact_name: String = primary_name.to_lowercase().split_whitespace().collect();
    compact_name.contains(local_part)
}
1867
1868fn normalize_profile_nip05(
1869    profile: &serde_json::Map<String, serde_json::Value>,
1870    primary_name: Option<&str>,
1871) -> Option<String> {
1872    let raw = profile.get("nip05")?.as_str()?;
1873    let local_part = raw.split('@').next()?.trim().to_lowercase();
1874    if local_part.is_empty() {
1875        return None;
1876    }
1877    let truncated: String = local_part.chars().take(PROFILE_NAME_MAX_LENGTH).collect();
1878    if truncated.is_empty() {
1879        return None;
1880    }
1881    if primary_name.is_some_and(|name| should_reject_profile_nip05(&truncated, name)) {
1882        return None;
1883    }
1884    Some(truncated)
1885}
1886
/// Returns `true` when `word` is a common English stop word that should not be
/// indexed as a profile search keyword.
///
/// Matching is exact and case-sensitive; callers pass lowercased words.
fn is_search_stop_word(word: &str) -> bool {
    const STOP_WORDS: &[&str] = &[
        "a", "an", "the", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by",
        "from", "is", "it", "as", "be", "was", "are", "this", "that", "these", "those", "i",
        "you", "he", "she", "we", "they", "my", "your", "his", "her", "its", "our", "their",
        "what", "which", "who", "whom", "how", "when", "where", "why", "will", "would", "could",
        "should", "can", "may", "might", "must", "have", "has", "had", "do", "does", "did",
        "been", "being", "get", "got", "just", "now", "then", "so", "if", "not", "no", "yes",
        "all", "any", "some", "more", "most", "other", "into", "over", "after", "before",
        "about", "up", "down", "out", "off", "through", "during", "under", "again", "further",
        "once",
    ];
    STOP_WORDS.contains(&word)
}
1984
/// Returns `true` for all-ASCII-digit words that should be dropped from the
/// search index.
///
/// Four-digit values from 1900 through 2099 are treated as years and kept
/// (return `false`). An empty string counts as all digits and returns `true`.
fn is_pure_search_number(word: &str) -> bool {
    let all_digits = word.bytes().all(|byte| byte.is_ascii_digit());
    if !all_digits {
        return false;
    }
    let looks_like_year = word.len() == 4 && matches!(word.parse::<u16>(), Ok(1900..=2099));
    !looks_like_year
}
1994
/// Splits a compound token into its camel-case, acronym, and letter/digit parts.
///
/// A boundary is placed before a character when:
/// - a lowercase letter is followed by an uppercase one (`sirLibre`);
/// - an ASCII digit meets a letter, or a letter meets an ASCII digit (`v2`, `42abc`);
/// - an uppercase letter is followed by an uppercase letter that itself precedes
///   a lowercase letter, so `XMLHttp` splits as `XML` + `Http`.
///
/// An empty input yields an empty vector.
fn split_compound_search_word(word: &str) -> Vec<String> {
    let chars: Vec<char> = word.chars().collect();
    let mut parts: Vec<String> = Vec::new();
    let mut segment = String::new();

    for (i, &ch) in chars.iter().enumerate() {
        // After the first character, the segment always ends with chars[i - 1],
        // so that character is the "previous" one for boundary detection.
        if let Some(&prev) = i.checked_sub(1).and_then(|j| chars.get(j)) {
            let next_is_lower = chars.get(i + 1).is_some_and(|next| next.is_lowercase());
            let boundary = (prev.is_lowercase() && ch.is_uppercase())
                || (prev.is_ascii_digit() && ch.is_alphabetic())
                || (prev.is_alphabetic() && ch.is_ascii_digit())
                || (prev.is_uppercase() && ch.is_uppercase() && next_is_lower);
            if boundary {
                parts.push(std::mem::take(&mut segment));
            }
        }
        segment.push(ch);
    }

    if !segment.is_empty() {
        parts.push(segment);
    }
    parts
}
2023
2024fn parse_search_keywords(text: &str) -> Vec<String> {
2025    let mut keywords = Vec::new();
2026    let mut seen = HashSet::new();
2027
2028    for word in text
2029        .split(|ch: char| !ch.is_alphanumeric())
2030        .filter(|word| !word.is_empty())
2031    {
2032        let mut variants = Vec::with_capacity(1 + word.len() / 4);
2033        variants.push(word.to_lowercase());
2034        variants.extend(
2035            split_compound_search_word(word)
2036                .into_iter()
2037                .map(|part| part.to_lowercase()),
2038        );
2039
2040        for lowered in variants {
2041            if lowered.chars().count() < 2
2042                || is_search_stop_word(&lowered)
2043                || is_pure_search_number(&lowered)
2044            {
2045                continue;
2046            }
2047            if seen.insert(lowered.clone()) {
2048                keywords.push(lowered);
2049            }
2050        }
2051    }
2052
2053    keywords
2054}
2055
2056fn profile_search_terms_for_event(event: &Event) -> Vec<String> {
2057    let profile = match serde_json::from_str::<serde_json::Value>(&event.content) {
2058        Ok(serde_json::Value::Object(profile)) => profile,
2059        _ => serde_json::Map::new(),
2060    };
2061    let names = extract_profile_names(&profile);
2062    let primary_name = names.first().map(String::as_str);
2063    let mut parts = Vec::new();
2064    if let Some(name) = primary_name {
2065        parts.push(name.to_string());
2066    }
2067    if let Some(nip05) = normalize_profile_nip05(&profile, primary_name) {
2068        parts.push(nip05);
2069    }
2070    parts.push(event.pubkey.to_hex());
2071    if names.len() > 1 {
2072        parts.extend(names.into_iter().skip(1));
2073    }
2074    parse_search_keywords(&parts.join(" "))
2075}
2076
2077fn compare_nostr_events(left: &Event, right: &Event) -> std::cmp::Ordering {
2078    left.created_at
2079        .as_u64()
2080        .cmp(&right.created_at.as_u64())
2081        .then_with(|| left.id.to_hex().cmp(&right.id.to_hex()))
2082}
2083
2084fn map_event_store_error(err: NostrEventStoreError) -> anyhow::Error {
2085    anyhow::anyhow!("nostr event store error: {err}")
2086}
2087
/// Grows the social graph LMDB map size at `db_dir` to at least `requested_bytes`.
///
/// The target size is computed as follows:
/// - it is raised to at least `DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES`;
/// - it is rounded up to a multiple of `page_size_bytes()`;
/// - if that rounding would overflow `u64`, the unrounded value is used.
///
/// The environment is opened with the default map size and resized only when
/// its reported map size is smaller than the target. It is never shrunk.
///
/// # Errors
/// Fails when the target does not fit in `usize`, or when opening or resizing
/// the LMDB environment fails.
fn ensure_social_graph_mapsize(db_dir: &Path, requested_bytes: u64) -> Result<()> {
    let requested = requested_bytes.max(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES);
    let page_size = page_size_bytes() as u64;
    // Round up to the next page multiple: (n + page - 1) / page * page.
    let rounded = requested
        .checked_add(page_size.saturating_sub(1))
        .map(|size| size / page_size * page_size)
        .unwrap_or(requested);
    let map_size = usize::try_from(rounded).context("social graph mapsize exceeds usize")?;

    // SAFETY(review): heed marks opening an environment as `unsafe`. Soundness
    // presumably requires that this env is not already open elsewhere in the
    // process while this temporary handle lives — confirm at call sites.
    let env = unsafe {
        heed::EnvOpenOptions::new()
            .map_size(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES as usize)
            .max_dbs(SOCIALGRAPH_MAX_DBS)
            .open(db_dir)
    }
    .context("open social graph LMDB env for resize")?;
    // No transactions are started on `env` in this function before resizing.
    // SAFETY(review): heed's `resize` is `unsafe`; confirm that no other handle or
    // transaction on this environment can be active when this runs.
    if env.info().map_size < map_size {
        unsafe { env.resize(map_size) }.context("resize social graph LMDB env")?;
    }

    Ok(())
}
2110
/// Returns the value of `page_size::get_granularity()`, which this file uses as
/// the rounding unit for the LMDB map size.
fn page_size_bytes() -> usize {
    page_size::get_granularity()
}
2114
2115#[cfg(test)]
2116mod tests {
2117    use super::*;
2118    use async_trait::async_trait;
2119    use hashtree_config::StorageBackend;
2120    use hashtree_core::{Hash, MemoryStore, Store, StoreError};
2121    use hashtree_nostr::NostrEventStoreOptions;
2122    use std::collections::HashSet;
2123    use std::fs::{self, File};
2124    use std::io::{BufRead, BufReader};
2125    use std::path::{Path, PathBuf};
2126    use std::process::{Command, Stdio};
2127    use std::sync::Mutex;
2128    use std::time::Duration;
2129
2130    use nostr::{EventBuilder, JsonUtil, Keys, Tag, Timestamp};
2131    use tempfile::TempDir;
2132
    // Remote nostr event fixture (bz2-compressed JSONL, judging by the file name).
    // Presumably fetched by benchmark tests further down this module — not visible here.
    const WELLORDER_FIXTURE_URL: &str =
        "https://wellorder.xyz/nostr/nostr-wellorder-early-500k-v1.jsonl.bz2";
2135
    /// Point-in-time copy of the read counters kept by the tracing test stores.
    #[derive(Debug, Clone, Default)]
    struct ReadTraceSnapshot {
        /// Number of recorded `get` calls.
        get_calls: u64,
        /// Bytes returned across all recorded reads, counting repeats.
        total_bytes: u64,
        /// Number of distinct block hashes read.
        unique_blocks: usize,
        /// Bytes returned the first time each distinct hash was read.
        unique_bytes: u64,
        /// Reads served from a local cache (read-through store only).
        cache_hits: u64,
        /// Reads that had to go to the remote store (read-through store only).
        remote_fetches: u64,
        /// Bytes fetched from the remote store.
        remote_bytes: u64,
    }
2146
    /// Mutable read counters behind the tracing stores' mutex; `unique_hashes`
    /// tracks which blocks have been seen so `unique_bytes` counts each once.
    #[derive(Debug, Default)]
    struct ReadTraceState {
        get_calls: u64,
        total_bytes: u64,
        unique_hashes: HashSet<Hash>,
        unique_bytes: u64,
        cache_hits: u64,
        remote_fetches: u64,
        remote_bytes: u64,
    }
2157
    /// Test `Store` wrapper that forwards every operation to `base` and records
    /// statistics for `get` calls that return data.
    #[derive(Debug)]
    struct CountingStore<S: Store> {
        base: Arc<S>,
        state: Mutex<ReadTraceState>,
    }
2163
2164    impl<S: Store> CountingStore<S> {
2165        fn new(base: Arc<S>) -> Self {
2166            Self {
2167                base,
2168                state: Mutex::new(ReadTraceState::default()),
2169            }
2170        }
2171
2172        fn reset(&self) {
2173            *self.state.lock().unwrap() = ReadTraceState::default();
2174        }
2175
2176        fn snapshot(&self) -> ReadTraceSnapshot {
2177            let state = self.state.lock().unwrap();
2178            ReadTraceSnapshot {
2179                get_calls: state.get_calls,
2180                total_bytes: state.total_bytes,
2181                unique_blocks: state.unique_hashes.len(),
2182                unique_bytes: state.unique_bytes,
2183                cache_hits: state.cache_hits,
2184                remote_fetches: state.remote_fetches,
2185                remote_bytes: state.remote_bytes,
2186            }
2187        }
2188
2189        fn record_read(&self, hash: &Hash, bytes: usize) {
2190            let mut state = self.state.lock().unwrap();
2191            state.get_calls += 1;
2192            state.total_bytes += bytes as u64;
2193            if state.unique_hashes.insert(*hash) {
2194                state.unique_bytes += bytes as u64;
2195            }
2196        }
2197    }
2198
    // `Store` impl that delegates everything to `base`. Only `get` is
    // instrumented, and only reads that return `Some` are recorded.
    #[async_trait]
    impl<S: Store> Store for CountingStore<S> {
        async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
            self.base.put(hash, data).await
        }

        async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
            self.base.put_many(items).await
        }

        async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
            let data = self.base.get(hash).await?;
            // Misses (`None`) are not counted as reads.
            if let Some(bytes) = data.as_ref() {
                self.record_read(hash, bytes.len());
            }
            Ok(data)
        }

        async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
            self.base.has(hash).await
        }

        async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
            self.base.delete(hash).await
        }
    }
2225
    /// Test `Store` that serves reads from an in-memory `cache` and falls back to
    /// `remote`, populating the cache on a remote hit. Writes go only to the
    /// cache. Read statistics are kept in `state`.
    #[derive(Debug)]
    struct ReadThroughStore<R: Store> {
        cache: Arc<MemoryStore>,
        remote: Arc<R>,
        state: Mutex<ReadTraceState>,
    }
2232
2233    impl<R: Store> ReadThroughStore<R> {
2234        fn new(cache: Arc<MemoryStore>, remote: Arc<R>) -> Self {
2235            Self {
2236                cache,
2237                remote,
2238                state: Mutex::new(ReadTraceState::default()),
2239            }
2240        }
2241
2242        fn snapshot(&self) -> ReadTraceSnapshot {
2243            let state = self.state.lock().unwrap();
2244            ReadTraceSnapshot {
2245                get_calls: state.get_calls,
2246                total_bytes: state.total_bytes,
2247                unique_blocks: state.unique_hashes.len(),
2248                unique_bytes: state.unique_bytes,
2249                cache_hits: state.cache_hits,
2250                remote_fetches: state.remote_fetches,
2251                remote_bytes: state.remote_bytes,
2252            }
2253        }
2254    }
2255
    // `Store` impl for the read-through test store. Writes land in the cache only.
    // Reads try the cache first and otherwise fetch from `remote`, copying hits
    // into the cache. Every `get` call is counted, hit or miss.
    #[async_trait]
    impl<R: Store> Store for ReadThroughStore<R> {
        async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
            self.cache.put(hash, data).await
        }

        async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
            self.cache.put_many(items).await
        }

        async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
            // Scoped so the std `Mutex` guard is dropped before the `.await` below.
            {
                let mut state = self.state.lock().unwrap();
                state.get_calls += 1;
            }

            if let Some(bytes) = self.cache.get(hash).await? {
                let mut state = self.state.lock().unwrap();
                state.cache_hits += 1;
                state.total_bytes += bytes.len() as u64;
                if state.unique_hashes.insert(*hash) {
                    state.unique_bytes += bytes.len() as u64;
                }
                return Ok(Some(bytes));
            }

            let data = self.remote.get(hash).await?;
            if let Some(bytes) = data.as_ref() {
                // Populate the cache first (awaited), then take the lock to update stats.
                let _ = self.cache.put(*hash, bytes.clone()).await?;
                let mut state = self.state.lock().unwrap();
                state.remote_fetches += 1;
                state.remote_bytes += bytes.len() as u64;
                state.total_bytes += bytes.len() as u64;
                if state.unique_hashes.insert(*hash) {
                    state.unique_bytes += bytes.len() as u64;
                }
            }
            Ok(data)
        }

        async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
            if self.cache.has(hash).await? {
                return Ok(true);
            }
            self.remote.has(hash).await
        }

        // Deletes from both layers. A cache error returns early via `?`, before the
        // remote delete runs.
        async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
            let cache_deleted = self.cache.delete(hash).await?;
            let remote_deleted = self.remote.delete(hash).await?;
            Ok(cache_deleted || remote_deleted)
        }
    }
2309
    /// One query shape exercised by the event-store benchmarks. Each variant
    /// carries the parameters passed to the matching `NostrEventStore` query
    /// method in `execute`.
    #[derive(Debug, Clone)]
    enum BenchmarkQueryCase {
        ById {
            id: String,
        },
        ByAuthor {
            pubkey: String,
            limit: usize,
        },
        ByAuthorKind {
            pubkey: String,
            kind: u32,
            limit: usize,
        },
        ByKind {
            kind: u32,
            limit: usize,
        },
        ByTag {
            tag_name: String,
            tag_value: String,
            limit: usize,
        },
        Recent {
            limit: usize,
        },
        Replaceable {
            pubkey: String,
            kind: u32,
        },
        ParameterizedReplaceable {
            pubkey: String,
            kind: u32,
            d_tag: String,
        },
    }
2346
2347    impl BenchmarkQueryCase {
2348        fn name(&self) -> &'static str {
2349            match self {
2350                BenchmarkQueryCase::ById { .. } => "by_id",
2351                BenchmarkQueryCase::ByAuthor { .. } => "by_author",
2352                BenchmarkQueryCase::ByAuthorKind { .. } => "by_author_kind",
2353                BenchmarkQueryCase::ByKind { .. } => "by_kind",
2354                BenchmarkQueryCase::ByTag { .. } => "by_tag",
2355                BenchmarkQueryCase::Recent { .. } => "recent",
2356                BenchmarkQueryCase::Replaceable { .. } => "replaceable",
2357                BenchmarkQueryCase::ParameterizedReplaceable { .. } => "parameterized_replaceable",
2358            }
2359        }
2360
2361        async fn execute<S: Store>(
2362            &self,
2363            store: &NostrEventStore<S>,
2364            root: &Cid,
2365        ) -> Result<usize, NostrEventStoreError> {
2366            match self {
2367                BenchmarkQueryCase::ById { id } => {
2368                    Ok(store.get_by_id(Some(root), id).await?.into_iter().count())
2369                }
2370                BenchmarkQueryCase::ByAuthor { pubkey, limit } => Ok(store
2371                    .list_by_author(
2372                        Some(root),
2373                        pubkey,
2374                        ListEventsOptions {
2375                            limit: Some(*limit),
2376                            ..Default::default()
2377                        },
2378                    )
2379                    .await?
2380                    .len()),
2381                BenchmarkQueryCase::ByAuthorKind {
2382                    pubkey,
2383                    kind,
2384                    limit,
2385                } => Ok(store
2386                    .list_by_author_and_kind(
2387                        Some(root),
2388                        pubkey,
2389                        *kind,
2390                        ListEventsOptions {
2391                            limit: Some(*limit),
2392                            ..Default::default()
2393                        },
2394                    )
2395                    .await?
2396                    .len()),
2397                BenchmarkQueryCase::ByKind { kind, limit } => Ok(store
2398                    .list_by_kind(
2399                        Some(root),
2400                        *kind,
2401                        ListEventsOptions {
2402                            limit: Some(*limit),
2403                            ..Default::default()
2404                        },
2405                    )
2406                    .await?
2407                    .len()),
2408                BenchmarkQueryCase::ByTag {
2409                    tag_name,
2410                    tag_value,
2411                    limit,
2412                } => Ok(store
2413                    .list_by_tag(
2414                        Some(root),
2415                        tag_name,
2416                        tag_value,
2417                        ListEventsOptions {
2418                            limit: Some(*limit),
2419                            ..Default::default()
2420                        },
2421                    )
2422                    .await?
2423                    .len()),
2424                BenchmarkQueryCase::Recent { limit } => Ok(store
2425                    .list_recent(
2426                        Some(root),
2427                        ListEventsOptions {
2428                            limit: Some(*limit),
2429                            ..Default::default()
2430                        },
2431                    )
2432                    .await?
2433                    .len()),
2434                BenchmarkQueryCase::Replaceable { pubkey, kind } => Ok(store
2435                    .get_replaceable(Some(root), pubkey, *kind)
2436                    .await?
2437                    .into_iter()
2438                    .count()),
2439                BenchmarkQueryCase::ParameterizedReplaceable {
2440                    pubkey,
2441                    kind,
2442                    d_tag,
2443                } => Ok(store
2444                    .get_parameterized_replaceable(Some(root), pubkey, *kind, d_tag)
2445                    .await?
2446                    .into_iter()
2447                    .count()),
2448            }
2449        }
2450    }
2451
    /// Simulated network profile: a label, round-trip time in milliseconds and
    /// bandwidth in MiB per second (per the field names).
    #[derive(Debug, Clone, Copy)]
    struct NetworkModel {
        name: &'static str,
        rtt_ms: f64,
        bandwidth_mib_per_s: f64,
    }
2458
    /// Aggregated timing for one benchmarked query (mean and 95th-percentile
    /// duration) plus the block-read statistics captured while it ran.
    #[derive(Debug, Clone)]
    struct QueryBenchmarkResult {
        average_duration: Duration,
        p95_duration: Duration,
        reads: ReadTraceSnapshot,
    }
2465
    // Network profiles from fast to slow (LAN, WAN, slow link). Presumably used to
    // turn recorded remote reads into estimated latency — usage not visible here.
    const NETWORK_MODELS: [NetworkModel; 3] = [
        NetworkModel {
            name: "lan",
            rtt_ms: 2.0,
            bandwidth_mib_per_s: 100.0,
        },
        NetworkModel {
            name: "wan",
            rtt_ms: 40.0,
            bandwidth_mib_per_s: 20.0,
        },
        NetworkModel {
            name: "slow",
            rtt_ms: 120.0,
            bandwidth_mib_per_s: 5.0,
        },
    ];
2483
    /// Opening a store in a fresh temp dir succeeds, and the returned `Arc` has
    /// no other owners.
    #[test]
    fn test_open_social_graph_store() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        assert_eq!(Arc::strong_count(&graph_store), 1);
    }
2491
    /// The user installed as graph root is reported at follow distance 0.
    #[test]
    fn test_set_root_and_get_follow_distance() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let root_pk = [1u8; 32];
        set_social_graph_root(&graph_store, &root_pk);
        assert_eq!(get_follow_distance(&graph_store, &root_pk), Some(0));
    }
2501
    /// Ingesting the root's contact list places the followed user at distance 1.
    /// Ingesting the root's mute list makes the muted user overmuted at threshold 1.0.
    #[test]
    fn test_ingest_event_updates_follows_and_mutes() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();

        let root_keys = Keys::generate();
        let alice_keys = Keys::generate();
        let bob_keys = Keys::generate();

        let root_pk = root_keys.public_key().to_bytes();
        set_social_graph_root(&graph_store, &root_pk);

        let follow = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![Tag::public_key(alice_keys.public_key())],
        )
        .custom_created_at(Timestamp::from_secs(10))
        .to_event(&root_keys)
        .unwrap();
        ingest_event(&graph_store, "follow", &follow.as_json());

        let mute = EventBuilder::new(
            Kind::MuteList,
            "",
            vec![Tag::public_key(bob_keys.public_key())],
        )
        .custom_created_at(Timestamp::from_secs(11))
        .to_event(&root_keys)
        .unwrap();
        ingest_event(&graph_store, "mute", &mute.as_json());

        assert_eq!(
            get_follow_distance(&graph_store, &alice_keys.public_key().to_bytes()),
            Some(1)
        );
        assert!(is_overmuted(
            &graph_store,
            &root_pk,
            &bob_keys.public_key().to_bytes(),
            1.0
        ));
    }
2546
    /// Metadata ingest indexes the primary name and NIP-05 local part under
    /// `p:<term>:<pubkey>` keys, with the remaining names recorded as aliases.
    /// A newer profile for the same pubkey removes the older profile's terms and
    /// becomes the latest mirrored profile event.
    #[test]
    fn test_metadata_ingest_builds_profile_search_index_and_replaces_old_terms() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let keys = Keys::generate();

        let older = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "sirius",
                "name": "Martti Malmi",
                "username": "mmalmi",
                "nip05": "siriusdev@iris.to",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&keys)
        .unwrap();
        let newer = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "bird",
                "nip05": "birdman@iris.to",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(6))
        .to_event(&keys)
        .unwrap();

        ingest_parsed_event(&graph_store, &older).unwrap();

        // After the first profile: name and NIP-05 terms are indexed.
        let pubkey = keys.public_key().to_hex();
        let entries = graph_store
            .profile_search_entries_for_prefix("p:sirius")
            .unwrap();
        assert!(entries
            .iter()
            .any(|(key, entry)| key == &format!("p:sirius:{pubkey}") && entry.name == "sirius"));
        assert!(entries.iter().any(|(key, entry)| {
            key == &format!("p:siriusdev:{pubkey}")
                && entry.nip05.as_deref() == Some("siriusdev")
                && entry.aliases == vec!["Martti Malmi".to_string(), "mmalmi".to_string()]
                && entry.event_nhash.starts_with("nhash1")
        }));
        assert!(entries.iter().all(|(_, entry)| entry.pubkey == pubkey));
        assert_eq!(
            graph_store
                .latest_profile_event(&pubkey)
                .unwrap()
                .expect("latest mirrored profile")
                .id,
            older.id
        );

        ingest_parsed_event(&graph_store, &newer).unwrap();

        // After the newer profile: old terms are gone and only the new ones remain.
        assert!(graph_store
            .profile_search_entries_for_prefix("p:sirius")
            .unwrap()
            .is_empty());
        let bird_entries = graph_store
            .profile_search_entries_for_prefix("p:bird")
            .unwrap();
        assert_eq!(bird_entries.len(), 2);
        assert!(bird_entries
            .iter()
            .any(|(key, entry)| key == &format!("p:bird:{pubkey}") && entry.name == "bird"));
        assert!(bird_entries.iter().any(|(key, entry)| {
            key == &format!("p:birdman:{pubkey}")
                && entry.nip05.as_deref() == Some("birdman")
                && entry.aliases.is_empty()
        }));
        assert_eq!(
            graph_store
                .latest_profile_event(&pubkey)
                .unwrap()
                .expect("latest mirrored profile")
                .id,
            newer.id
        );
    }
2633
    /// Metadata ingested with `EventStorageClass::Ambient` is still mirrored as the
    /// latest profile event and indexed for profile search.
    #[test]
    fn test_ambient_metadata_events_are_mirrored_into_public_profile_index() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let keys = Keys::generate();

        let profile = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "ambient bird",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&keys)
        .unwrap();

        ingest_parsed_event_with_storage_class(&graph_store, &profile, EventStorageClass::Ambient)
            .unwrap();

        let pubkey = keys.public_key().to_hex();
        let mirrored = graph_store
            .latest_profile_event(&pubkey)
            .unwrap()
            .expect("mirrored ambient profile");
        assert_eq!(mirrored.id, profile.id);
        assert_eq!(
            graph_store
                .profile_search_entries_for_prefix("p:ambient")
                .unwrap()
                .len(),
            1
        );
    }
2670
2671    #[test]
2672    fn test_metadata_ingest_splits_compound_profile_terms_without_losing_whole_token() {
2673        let _guard = test_lock();
2674        let tmp = TempDir::new().unwrap();
2675        let graph_store = open_social_graph_store(tmp.path()).unwrap();
2676        let keys = Keys::generate();
2677
2678        let profile = EventBuilder::new(
2679            Kind::Metadata,
2680            serde_json::json!({
2681                "display_name": "SirLibre",
2682                "username": "XMLHttpRequest42",
2683            })
2684            .to_string(),
2685            [],
2686        )
2687        .custom_created_at(Timestamp::from_secs(5))
2688        .to_event(&keys)
2689        .unwrap();
2690
2691        ingest_parsed_event(&graph_store, &profile).unwrap();
2692
2693        let pubkey = keys.public_key().to_hex();
2694        assert!(
2695            graph_store
2696                .profile_search_entries_for_prefix("p:sirlibre")
2697                .unwrap()
2698                .iter()
2699                .any(|(key, entry)| key == &format!("p:sirlibre:{pubkey}")
2700                    && entry.name == "SirLibre")
2701        );
2702        assert!(graph_store
2703            .profile_search_entries_for_prefix("p:libre")
2704            .unwrap()
2705            .iter()
2706            .any(|(key, entry)| key == &format!("p:libre:{pubkey}") && entry.name == "SirLibre"));
2707        assert!(graph_store
2708            .profile_search_entries_for_prefix("p:xml")
2709            .unwrap()
2710            .iter()
2711            .any(|(key, entry)| {
2712                key == &format!("p:xml:{pubkey}")
2713                    && entry.aliases == vec!["XMLHttpRequest42".to_string()]
2714            }));
2715        assert!(graph_store
2716            .profile_search_entries_for_prefix("p:request")
2717            .unwrap()
2718            .iter()
2719            .any(|(key, entry)| {
2720                key == &format!("p:request:{pubkey}")
2721                    && entry.aliases == vec!["XMLHttpRequest42".to_string()]
2722            }));
2723    }
2724
2725    #[test]
2726    fn test_profile_search_index_persists_across_reopen() {
2727        let _guard = test_lock();
2728        let tmp = TempDir::new().unwrap();
2729        let keys = Keys::generate();
2730        let pubkey = keys.public_key().to_hex();
2731
2732        {
2733            let graph_store = open_social_graph_store(tmp.path()).unwrap();
2734            let profile = EventBuilder::new(
2735                Kind::Metadata,
2736                serde_json::json!({
2737                    "display_name": "reopen user",
2738                })
2739                .to_string(),
2740                [],
2741            )
2742            .custom_created_at(Timestamp::from_secs(5))
2743            .to_event(&keys)
2744            .unwrap();
2745            ingest_parsed_event(&graph_store, &profile).unwrap();
2746            assert!(graph_store.profile_search_root().unwrap().is_some());
2747        }
2748
2749        let reopened = open_social_graph_store(tmp.path()).unwrap();
2750        assert!(reopened.profile_search_root().unwrap().is_some());
2751        assert_eq!(
2752            reopened
2753                .latest_profile_event(&pubkey)
2754                .unwrap()
2755                .expect("mirrored profile after reopen")
2756                .pubkey,
2757            keys.public_key()
2758        );
2759        let links = reopened
2760            .profile_search_entries_for_prefix("p:reopen")
2761            .unwrap();
2762        assert_eq!(links.len(), 1);
2763        assert_eq!(links[0].0, format!("p:reopen:{pubkey}"));
2764        assert_eq!(links[0].1.name, "reopen user");
2765    }
2766
2767    #[test]
2768    fn test_profile_search_index_with_shared_hashtree_storage() {
2769        let _guard = test_lock();
2770        let tmp = TempDir::new().unwrap();
2771        let store =
2772            crate::storage::HashtreeStore::with_options(tmp.path(), None, 1024 * 1024 * 1024)
2773                .unwrap();
2774        let graph_store =
2775            open_social_graph_store_with_storage(tmp.path(), store.store_arc(), None).unwrap();
2776        let keys = Keys::generate();
2777        let pubkey = keys.public_key().to_hex();
2778
2779        let profile = EventBuilder::new(
2780            Kind::Metadata,
2781            serde_json::json!({
2782                "display_name": "shared storage user",
2783                "nip05": "shareduser@example.com",
2784            })
2785            .to_string(),
2786            [],
2787        )
2788        .custom_created_at(Timestamp::from_secs(5))
2789        .to_event(&keys)
2790        .unwrap();
2791
2792        graph_store
2793            .sync_profile_index_for_events(std::slice::from_ref(&profile))
2794            .unwrap();
2795        assert!(graph_store.profile_search_root().unwrap().is_some());
2796        assert!(graph_store.profile_search_root().unwrap().is_some());
2797        let links = graph_store
2798            .profile_search_entries_for_prefix("p:shared")
2799            .unwrap();
2800        assert_eq!(links.len(), 2);
2801        assert!(links
2802            .iter()
2803            .any(|(key, entry)| key == &format!("p:shared:{pubkey}")
2804                && entry.name == "shared storage user"));
2805        assert!(links
2806            .iter()
2807            .any(|(key, entry)| key == &format!("p:shareduser:{pubkey}")
2808                && entry.nip05.as_deref() == Some("shareduser")));
2809    }
2810
2811    #[test]
2812    fn test_rebuild_profile_index_from_stored_events_uses_ambient_and_public_metadata() {
2813        let _guard = test_lock();
2814        let tmp = TempDir::new().unwrap();
2815        let graph_store = open_social_graph_store(tmp.path()).unwrap();
2816        let public_keys = Keys::generate();
2817        let ambient_keys = Keys::generate();
2818        let public_pubkey = public_keys.public_key().to_hex();
2819        let ambient_pubkey = ambient_keys.public_key().to_hex();
2820
2821        let older = EventBuilder::new(
2822            Kind::Metadata,
2823            serde_json::json!({
2824                "display_name": "petri old",
2825            })
2826            .to_string(),
2827            [],
2828        )
2829        .custom_created_at(Timestamp::from_secs(5))
2830        .to_event(&public_keys)
2831        .unwrap();
2832        let newer = EventBuilder::new(
2833            Kind::Metadata,
2834            serde_json::json!({
2835                "display_name": "petri",
2836                "name": "Petri Example",
2837                "nip05": "petri@example.com",
2838            })
2839            .to_string(),
2840            [],
2841        )
2842        .custom_created_at(Timestamp::from_secs(6))
2843        .to_event(&public_keys)
2844        .unwrap();
2845        let ambient = EventBuilder::new(
2846            Kind::Metadata,
2847            serde_json::json!({
2848                "display_name": "ambient petri",
2849            })
2850            .to_string(),
2851            [],
2852        )
2853        .custom_created_at(Timestamp::from_secs(7))
2854        .to_event(&ambient_keys)
2855        .unwrap();
2856
2857        ingest_parsed_event_with_storage_class(&graph_store, &older, EventStorageClass::Public)
2858            .unwrap();
2859        ingest_parsed_event_with_storage_class(&graph_store, &newer, EventStorageClass::Public)
2860            .unwrap();
2861        ingest_parsed_event_with_storage_class(&graph_store, &ambient, EventStorageClass::Ambient)
2862            .unwrap();
2863
2864        graph_store
2865            .profile_index
2866            .write_by_pubkey_root(None)
2867            .unwrap();
2868        graph_store.profile_index.write_search_root(None).unwrap();
2869
2870        let rebuilt = graph_store
2871            .rebuild_profile_index_from_stored_events()
2872            .unwrap();
2873        assert_eq!(rebuilt, 2);
2874
2875        let entries = graph_store
2876            .profile_search_entries_for_prefix("p:petri")
2877            .unwrap();
2878        assert_eq!(entries.len(), 2);
2879        assert!(entries.iter().any(|(key, entry)| {
2880            key == &format!("p:petri:{public_pubkey}")
2881                && entry.name == "petri"
2882                && entry.aliases == vec!["Petri Example".to_string()]
2883                && entry.nip05.is_none()
2884        }));
2885        assert!(entries.iter().any(|(key, entry)| {
2886            key == &format!("p:petri:{ambient_pubkey}")
2887                && entry.name == "ambient petri"
2888                && entry.aliases.is_empty()
2889                && entry.nip05.is_none()
2890        }));
2891    }
2892
2893    #[test]
2894    fn test_rebuild_profile_index_excludes_overmuted_users() {
2895        let _guard = test_lock();
2896        let tmp = TempDir::new().unwrap();
2897        let graph_store = open_social_graph_store(tmp.path()).unwrap();
2898        let root_keys = Keys::generate();
2899        let muted_keys = Keys::generate();
2900        let muted_pubkey = muted_keys.public_key().to_hex();
2901
2902        set_social_graph_root(&graph_store, &root_keys.public_key().to_bytes());
2903        graph_store.set_profile_index_overmute_threshold(1.0);
2904
2905        let profile = EventBuilder::new(
2906            Kind::Metadata,
2907            serde_json::json!({
2908                "display_name": "muted petri",
2909            })
2910            .to_string(),
2911            [],
2912        )
2913        .custom_created_at(Timestamp::from_secs(5))
2914        .to_event(&muted_keys)
2915        .unwrap();
2916        ingest_parsed_event(&graph_store, &profile).unwrap();
2917        assert!(graph_store.latest_profile_event(&muted_pubkey).unwrap().is_some());
2918
2919        let mute = EventBuilder::new(
2920            Kind::MuteList,
2921            "",
2922            vec![Tag::public_key(muted_keys.public_key())],
2923        )
2924        .custom_created_at(Timestamp::from_secs(6))
2925        .to_event(&root_keys)
2926        .unwrap();
2927        ingest_parsed_event(&graph_store, &mute).unwrap();
2928        assert!(graph_store
2929            .is_overmuted_user(&muted_keys.public_key().to_bytes(), 1.0)
2930            .unwrap());
2931
2932        let rebuilt = graph_store
2933            .rebuild_profile_index_from_stored_events()
2934            .unwrap();
2935        assert_eq!(rebuilt, 0);
2936        assert!(graph_store.latest_profile_event(&muted_pubkey).unwrap().is_none());
2937        assert!(graph_store
2938            .profile_search_entries_for_prefix("p:muted")
2939            .unwrap()
2940            .is_empty());
2941    }
2942
2943    #[test]
2944    fn test_query_events_by_author() {
2945        let _guard = test_lock();
2946        let tmp = TempDir::new().unwrap();
2947        let graph_store = open_social_graph_store(tmp.path()).unwrap();
2948        let keys = Keys::generate();
2949
2950        let older = EventBuilder::new(Kind::TextNote, "older", [])
2951            .custom_created_at(Timestamp::from_secs(5))
2952            .to_event(&keys)
2953            .unwrap();
2954        let newer = EventBuilder::new(Kind::TextNote, "newer", [])
2955            .custom_created_at(Timestamp::from_secs(6))
2956            .to_event(&keys)
2957            .unwrap();
2958
2959        ingest_parsed_event(&graph_store, &older).unwrap();
2960        ingest_parsed_event(&graph_store, &newer).unwrap();
2961
2962        let filter = Filter::new().author(keys.public_key()).kind(Kind::TextNote);
2963        let events = query_events(&graph_store, &filter, 10);
2964        assert_eq!(events.len(), 2);
2965        assert_eq!(events[0].id, newer.id);
2966        assert_eq!(events[1].id, older.id);
2967    }
2968
2969    #[test]
2970    fn test_query_events_by_kind() {
2971        let _guard = test_lock();
2972        let tmp = TempDir::new().unwrap();
2973        let graph_store = open_social_graph_store(tmp.path()).unwrap();
2974        let first_keys = Keys::generate();
2975        let second_keys = Keys::generate();
2976
2977        let older = EventBuilder::new(Kind::TextNote, "older", [])
2978            .custom_created_at(Timestamp::from_secs(5))
2979            .to_event(&first_keys)
2980            .unwrap();
2981        let newer = EventBuilder::new(Kind::TextNote, "newer", [])
2982            .custom_created_at(Timestamp::from_secs(6))
2983            .to_event(&second_keys)
2984            .unwrap();
2985        let other_kind = EventBuilder::new(Kind::Metadata, "profile", [])
2986            .custom_created_at(Timestamp::from_secs(7))
2987            .to_event(&second_keys)
2988            .unwrap();
2989
2990        ingest_parsed_event(&graph_store, &older).unwrap();
2991        ingest_parsed_event(&graph_store, &newer).unwrap();
2992        ingest_parsed_event(&graph_store, &other_kind).unwrap();
2993
2994        let filter = Filter::new().kind(Kind::TextNote);
2995        let events = query_events(&graph_store, &filter, 10);
2996        assert_eq!(events.len(), 2);
2997        assert_eq!(events[0].id, newer.id);
2998        assert_eq!(events[1].id, older.id);
2999    }
3000
3001    #[test]
3002    fn test_query_events_by_id() {
3003        let _guard = test_lock();
3004        let tmp = TempDir::new().unwrap();
3005        let graph_store = open_social_graph_store(tmp.path()).unwrap();
3006        let keys = Keys::generate();
3007
3008        let first = EventBuilder::new(Kind::TextNote, "first", [])
3009            .custom_created_at(Timestamp::from_secs(5))
3010            .to_event(&keys)
3011            .unwrap();
3012        let target = EventBuilder::new(Kind::TextNote, "target", [])
3013            .custom_created_at(Timestamp::from_secs(6))
3014            .to_event(&keys)
3015            .unwrap();
3016
3017        ingest_parsed_event(&graph_store, &first).unwrap();
3018        ingest_parsed_event(&graph_store, &target).unwrap();
3019
3020        let filter = Filter::new().id(target.id).kind(Kind::TextNote);
3021        let events = query_events(&graph_store, &filter, 10);
3022        assert_eq!(events.len(), 1);
3023        assert_eq!(events[0].id, target.id);
3024    }
3025
3026    #[test]
3027    fn test_query_events_search_is_case_insensitive() {
3028        let _guard = test_lock();
3029        let tmp = TempDir::new().unwrap();
3030        let graph_store = open_social_graph_store(tmp.path()).unwrap();
3031        let keys = Keys::generate();
3032        let other_keys = Keys::generate();
3033
3034        let matching = EventBuilder::new(Kind::TextNote, "Hello Nostr Search", [])
3035            .custom_created_at(Timestamp::from_secs(5))
3036            .to_event(&keys)
3037            .unwrap();
3038        let other = EventBuilder::new(Kind::TextNote, "goodbye world", [])
3039            .custom_created_at(Timestamp::from_secs(6))
3040            .to_event(&other_keys)
3041            .unwrap();
3042
3043        ingest_parsed_event(&graph_store, &matching).unwrap();
3044        ingest_parsed_event(&graph_store, &other).unwrap();
3045
3046        let filter = Filter::new().kind(Kind::TextNote).search("nostr search");
3047        let events = query_events(&graph_store, &filter, 10);
3048        assert_eq!(events.len(), 1);
3049        assert_eq!(events[0].id, matching.id);
3050    }
3051
3052    #[test]
3053    fn test_query_events_since_until_are_inclusive() {
3054        let _guard = test_lock();
3055        let tmp = TempDir::new().unwrap();
3056        let graph_store = open_social_graph_store(tmp.path()).unwrap();
3057        let keys = Keys::generate();
3058
3059        let before = EventBuilder::new(Kind::TextNote, "before", [])
3060            .custom_created_at(Timestamp::from_secs(5))
3061            .to_event(&keys)
3062            .unwrap();
3063        let start = EventBuilder::new(Kind::TextNote, "start", [])
3064            .custom_created_at(Timestamp::from_secs(6))
3065            .to_event(&keys)
3066            .unwrap();
3067        let end = EventBuilder::new(Kind::TextNote, "end", [])
3068            .custom_created_at(Timestamp::from_secs(10))
3069            .to_event(&keys)
3070            .unwrap();
3071        let after = EventBuilder::new(Kind::TextNote, "after", [])
3072            .custom_created_at(Timestamp::from_secs(11))
3073            .to_event(&keys)
3074            .unwrap();
3075
3076        ingest_parsed_event(&graph_store, &before).unwrap();
3077        ingest_parsed_event(&graph_store, &start).unwrap();
3078        ingest_parsed_event(&graph_store, &end).unwrap();
3079        ingest_parsed_event(&graph_store, &after).unwrap();
3080
3081        let filter = Filter::new()
3082            .kind(Kind::TextNote)
3083            .since(Timestamp::from_secs(6))
3084            .until(Timestamp::from_secs(10));
3085        let events = query_events(&graph_store, &filter, 10);
3086        let ids = events.into_iter().map(|event| event.id).collect::<Vec<_>>();
3087        assert_eq!(ids, vec![end.id, start.id]);
3088    }
3089
3090    #[test]
3091    fn test_query_events_replaceable_kind_returns_latest_winner() {
3092        let _guard = test_lock();
3093        let tmp = TempDir::new().unwrap();
3094        let graph_store = open_social_graph_store(tmp.path()).unwrap();
3095        let keys = Keys::generate();
3096
3097        let older = EventBuilder::new(Kind::Custom(10_000), "older mute list", [])
3098            .custom_created_at(Timestamp::from_secs(5))
3099            .to_event(&keys)
3100            .unwrap();
3101        let newer = EventBuilder::new(Kind::Custom(10_000), "newer mute list", [])
3102            .custom_created_at(Timestamp::from_secs(6))
3103            .to_event(&keys)
3104            .unwrap();
3105
3106        ingest_parsed_event(&graph_store, &older).unwrap();
3107        ingest_parsed_event(&graph_store, &newer).unwrap();
3108
3109        let filter = Filter::new()
3110            .author(keys.public_key())
3111            .kind(Kind::Custom(10_000));
3112        let events = query_events(&graph_store, &filter, 10);
3113        assert_eq!(events.len(), 1);
3114        assert_eq!(events[0].id, newer.id);
3115    }
3116
3117    #[test]
3118    fn test_query_events_kind_41_replaceable_returns_latest_winner() {
3119        let _guard = test_lock();
3120        let tmp = TempDir::new().unwrap();
3121        let graph_store = open_social_graph_store(tmp.path()).unwrap();
3122        let keys = Keys::generate();
3123
3124        let older = EventBuilder::new(Kind::Custom(41), "older channel metadata", [])
3125            .custom_created_at(Timestamp::from_secs(5))
3126            .to_event(&keys)
3127            .unwrap();
3128        let newer = EventBuilder::new(Kind::Custom(41), "newer channel metadata", [])
3129            .custom_created_at(Timestamp::from_secs(6))
3130            .to_event(&keys)
3131            .unwrap();
3132
3133        ingest_parsed_event(&graph_store, &older).unwrap();
3134        ingest_parsed_event(&graph_store, &newer).unwrap();
3135
3136        let filter = Filter::new()
3137            .author(keys.public_key())
3138            .kind(Kind::Custom(41));
3139        let events = query_events(&graph_store, &filter, 10);
3140        assert_eq!(events.len(), 1);
3141        assert_eq!(events[0].id, newer.id);
3142    }
3143
3144    #[test]
3145    fn test_public_and_ambient_indexes_stay_separate() {
3146        let _guard = test_lock();
3147        let tmp = TempDir::new().unwrap();
3148        let graph_store = open_social_graph_store(tmp.path()).unwrap();
3149        let public_keys = Keys::generate();
3150        let ambient_keys = Keys::generate();
3151
3152        let public_event = EventBuilder::new(Kind::TextNote, "public", [])
3153            .custom_created_at(Timestamp::from_secs(5))
3154            .to_event(&public_keys)
3155            .unwrap();
3156        let ambient_event = EventBuilder::new(Kind::TextNote, "ambient", [])
3157            .custom_created_at(Timestamp::from_secs(6))
3158            .to_event(&ambient_keys)
3159            .unwrap();
3160
3161        ingest_parsed_event_with_storage_class(
3162            &graph_store,
3163            &public_event,
3164            EventStorageClass::Public,
3165        )
3166        .unwrap();
3167        ingest_parsed_event_with_storage_class(
3168            &graph_store,
3169            &ambient_event,
3170            EventStorageClass::Ambient,
3171        )
3172        .unwrap();
3173
3174        let filter = Filter::new().kind(Kind::TextNote);
3175        let all_events = graph_store
3176            .query_events_in_scope(&filter, 10, EventQueryScope::All)
3177            .unwrap();
3178        assert_eq!(all_events.len(), 2);
3179
3180        let public_events = graph_store
3181            .query_events_in_scope(&filter, 10, EventQueryScope::PublicOnly)
3182            .unwrap();
3183        assert_eq!(public_events.len(), 1);
3184        assert_eq!(public_events[0].id, public_event.id);
3185
3186        let ambient_events = graph_store
3187            .query_events_in_scope(&filter, 10, EventQueryScope::AmbientOnly)
3188            .unwrap();
3189        assert_eq!(ambient_events.len(), 1);
3190        assert_eq!(ambient_events[0].id, ambient_event.id);
3191    }
3192
3193    #[test]
3194    fn test_default_ingest_classifies_root_author_as_public() {
3195        let _guard = test_lock();
3196        let tmp = TempDir::new().unwrap();
3197        let graph_store = open_social_graph_store(tmp.path()).unwrap();
3198        let root_keys = Keys::generate();
3199        let other_keys = Keys::generate();
3200        set_social_graph_root(&graph_store, &root_keys.public_key().to_bytes());
3201
3202        let root_event = EventBuilder::new(Kind::TextNote, "root", [])
3203            .custom_created_at(Timestamp::from_secs(5))
3204            .to_event(&root_keys)
3205            .unwrap();
3206        let other_event = EventBuilder::new(Kind::TextNote, "other", [])
3207            .custom_created_at(Timestamp::from_secs(6))
3208            .to_event(&other_keys)
3209            .unwrap();
3210
3211        ingest_parsed_event(&graph_store, &root_event).unwrap();
3212        ingest_parsed_event(&graph_store, &other_event).unwrap();
3213
3214        let filter = Filter::new().kind(Kind::TextNote);
3215        let public_events = graph_store
3216            .query_events_in_scope(&filter, 10, EventQueryScope::PublicOnly)
3217            .unwrap();
3218        assert_eq!(public_events.len(), 1);
3219        assert_eq!(public_events[0].id, root_event.id);
3220
3221        let ambient_events = graph_store
3222            .query_events_in_scope(&filter, 10, EventQueryScope::AmbientOnly)
3223            .unwrap();
3224        assert_eq!(ambient_events.len(), 1);
3225        assert_eq!(ambient_events[0].id, other_event.id);
3226    }
3227
3228    #[test]
3229    fn test_query_events_survives_reopen() {
3230        let _guard = test_lock();
3231        let tmp = TempDir::new().unwrap();
3232        let db_dir = tmp.path().join("socialgraph-store");
3233        let keys = Keys::generate();
3234        let other_keys = Keys::generate();
3235
3236        {
3237            let graph_store = open_social_graph_store_at_path(&db_dir, None).unwrap();
3238            let older = EventBuilder::new(Kind::TextNote, "older", [])
3239                .custom_created_at(Timestamp::from_secs(5))
3240                .to_event(&keys)
3241                .unwrap();
3242            let newer = EventBuilder::new(Kind::TextNote, "newer", [])
3243                .custom_created_at(Timestamp::from_secs(6))
3244                .to_event(&keys)
3245                .unwrap();
3246            let latest = EventBuilder::new(Kind::TextNote, "latest", [])
3247                .custom_created_at(Timestamp::from_secs(7))
3248                .to_event(&other_keys)
3249                .unwrap();
3250
3251            ingest_parsed_event(&graph_store, &older).unwrap();
3252            ingest_parsed_event(&graph_store, &newer).unwrap();
3253            ingest_parsed_event(&graph_store, &latest).unwrap();
3254        }
3255
3256        let reopened = open_social_graph_store_at_path(&db_dir, None).unwrap();
3257
3258        let author_filter = Filter::new().author(keys.public_key()).kind(Kind::TextNote);
3259        let author_events = query_events(&reopened, &author_filter, 10);
3260        assert_eq!(author_events.len(), 2);
3261        assert_eq!(author_events[0].content, "newer");
3262        assert_eq!(author_events[1].content, "older");
3263
3264        let recent_filter = Filter::new().kind(Kind::TextNote);
3265        let recent_events = query_events(&reopened, &recent_filter, 2);
3266        assert_eq!(recent_events.len(), 2);
3267        assert_eq!(recent_events[0].content, "latest");
3268        assert_eq!(recent_events[1].content, "newer");
3269    }
3270
3271    #[test]
3272    fn test_query_events_parameterized_replaceable_by_d_tag() {
3273        let _guard = test_lock();
3274        let tmp = TempDir::new().unwrap();
3275        let graph_store = open_social_graph_store(tmp.path()).unwrap();
3276        let keys = Keys::generate();
3277
3278        let older = EventBuilder::new(
3279            Kind::Custom(30078),
3280            "",
3281            vec![
3282                Tag::identifier("video"),
3283                Tag::parse(&["l", "hashtree"]).unwrap(),
3284                Tag::parse(&["hash", &"11".repeat(32)]).unwrap(),
3285            ],
3286        )
3287        .custom_created_at(Timestamp::from_secs(5))
3288        .to_event(&keys)
3289        .unwrap();
3290        let newer = EventBuilder::new(
3291            Kind::Custom(30078),
3292            "",
3293            vec![
3294                Tag::identifier("video"),
3295                Tag::parse(&["l", "hashtree"]).unwrap(),
3296                Tag::parse(&["hash", &"22".repeat(32)]).unwrap(),
3297            ],
3298        )
3299        .custom_created_at(Timestamp::from_secs(6))
3300        .to_event(&keys)
3301        .unwrap();
3302        let other_tree = EventBuilder::new(
3303            Kind::Custom(30078),
3304            "",
3305            vec![
3306                Tag::identifier("files"),
3307                Tag::parse(&["l", "hashtree"]).unwrap(),
3308                Tag::parse(&["hash", &"33".repeat(32)]).unwrap(),
3309            ],
3310        )
3311        .custom_created_at(Timestamp::from_secs(7))
3312        .to_event(&keys)
3313        .unwrap();
3314
3315        ingest_parsed_event(&graph_store, &older).unwrap();
3316        ingest_parsed_event(&graph_store, &newer).unwrap();
3317        ingest_parsed_event(&graph_store, &other_tree).unwrap();
3318
3319        let filter = Filter::new()
3320            .author(keys.public_key())
3321            .kind(Kind::Custom(30078))
3322            .identifier("video");
3323        let events = query_events(&graph_store, &filter, 10);
3324        assert_eq!(events.len(), 1);
3325        assert_eq!(events[0].id, newer.id);
3326    }
3327
3328    #[test]
3329    fn test_query_events_by_hashtag_uses_tag_index() {
3330        let _guard = test_lock();
3331        let tmp = TempDir::new().unwrap();
3332        let graph_store = open_social_graph_store(tmp.path()).unwrap();
3333        let keys = Keys::generate();
3334        let other_keys = Keys::generate();
3335
3336        let first = EventBuilder::new(
3337            Kind::TextNote,
3338            "first",
3339            vec![Tag::parse(&["t", "hashtree"]).unwrap()],
3340        )
3341        .custom_created_at(Timestamp::from_secs(5))
3342        .to_event(&keys)
3343        .unwrap();
3344        let second = EventBuilder::new(
3345            Kind::TextNote,
3346            "second",
3347            vec![Tag::parse(&["t", "hashtree"]).unwrap()],
3348        )
3349        .custom_created_at(Timestamp::from_secs(6))
3350        .to_event(&other_keys)
3351        .unwrap();
3352        let unrelated = EventBuilder::new(
3353            Kind::TextNote,
3354            "third",
3355            vec![Tag::parse(&["t", "other"]).unwrap()],
3356        )
3357        .custom_created_at(Timestamp::from_secs(7))
3358        .to_event(&other_keys)
3359        .unwrap();
3360
3361        ingest_parsed_event(&graph_store, &first).unwrap();
3362        ingest_parsed_event(&graph_store, &second).unwrap();
3363        ingest_parsed_event(&graph_store, &unrelated).unwrap();
3364
3365        let filter = Filter::new().hashtag("hashtree");
3366        let events = query_events(&graph_store, &filter, 10);
3367        assert_eq!(events.len(), 2);
3368        assert_eq!(events[0].id, second.id);
3369        assert_eq!(events[1].id, first.id);
3370    }
3371
3372    #[test]
3373    fn test_query_events_combines_indexes_then_applies_search_filter() {
3374        let _guard = test_lock();
3375        let tmp = TempDir::new().unwrap();
3376        let graph_store = open_social_graph_store(tmp.path()).unwrap();
3377        let keys = Keys::generate();
3378        let other_keys = Keys::generate();
3379
3380        let matching = EventBuilder::new(
3381            Kind::TextNote,
3382            "hashtree video release",
3383            vec![Tag::parse(&["t", "hashtree"]).unwrap()],
3384        )
3385        .custom_created_at(Timestamp::from_secs(5))
3386        .to_event(&keys)
3387        .unwrap();
3388        let non_matching = EventBuilder::new(
3389            Kind::TextNote,
3390            "plain text note",
3391            vec![Tag::parse(&["t", "hashtree"]).unwrap()],
3392        )
3393        .custom_created_at(Timestamp::from_secs(6))
3394        .to_event(&other_keys)
3395        .unwrap();
3396
3397        ingest_parsed_event(&graph_store, &matching).unwrap();
3398        ingest_parsed_event(&graph_store, &non_matching).unwrap();
3399
3400        let filter = Filter::new().hashtag("hashtree").search("video");
3401        let events = query_events(&graph_store, &filter, 10);
3402        assert_eq!(events.len(), 1);
3403        assert_eq!(events[0].id, matching.id);
3404    }
3405
3406    fn benchmark_dataset_path() -> Option<PathBuf> {
3407        std::env::var_os("HASHTREE_BENCH_DATASET_PATH").map(PathBuf::from)
3408    }
3409
3410    fn benchmark_dataset_url() -> String {
3411        std::env::var("HASHTREE_BENCH_DATASET_URL")
3412            .ok()
3413            .filter(|value| !value.is_empty())
3414            .unwrap_or_else(|| WELLORDER_FIXTURE_URL.to_string())
3415    }
3416
3417    fn benchmark_stream_warmup_events(measured_events: usize) -> usize {
3418        std::env::var("HASHTREE_BENCH_WARMUP_EVENTS")
3419            .ok()
3420            .and_then(|value| value.parse::<usize>().ok())
3421            .unwrap_or_else(|| measured_events.clamp(1, 200))
3422    }
3423
3424    fn ensure_benchmark_dataset(path: &Path, url: &str) -> Result<()> {
3425        if path.exists() {
3426            return Ok(());
3427        }
3428
3429        let parent = path
3430            .parent()
3431            .context("benchmark dataset path has no parent directory")?;
3432        fs::create_dir_all(parent).context("create benchmark dataset directory")?;
3433
3434        let tmp = path.with_extension("tmp");
3435        let mut response = reqwest::blocking::get(url)
3436            .context("download benchmark dataset")?
3437            .error_for_status()
3438            .context("benchmark dataset request failed")?;
3439        let mut file = File::create(&tmp).context("create temporary benchmark dataset file")?;
3440        std::io::copy(&mut response, &mut file).context("write benchmark dataset")?;
3441        fs::rename(&tmp, path).context("move benchmark dataset into place")?;
3442
3443        Ok(())
3444    }
3445
3446    fn load_benchmark_dataset(path: &Path, max_events: usize) -> Result<Vec<Event>> {
3447        if max_events == 0 {
3448            return Ok(Vec::new());
3449        }
3450
3451        let mut child = Command::new("bzip2")
3452            .args(["-dc", &path.to_string_lossy()])
3453            .stdout(Stdio::piped())
3454            .spawn()
3455            .context("spawn bzip2 for benchmark dataset")?;
3456        let stdout = child
3457            .stdout
3458            .take()
3459            .context("benchmark dataset stdout missing")?;
3460        let mut events = Vec::with_capacity(max_events);
3461
3462        {
3463            let reader = BufReader::new(stdout);
3464            for line in reader.lines() {
3465                if events.len() >= max_events {
3466                    break;
3467                }
3468                let line = line.context("read benchmark dataset line")?;
3469                let trimmed = line.trim();
3470                if trimmed.is_empty() {
3471                    continue;
3472                }
3473                if let Ok(event) = Event::from_json(trimmed.to_string()) {
3474                    events.push(event);
3475                }
3476            }
3477        }
3478
3479        if events.len() < max_events {
3480            let status = child.wait().context("wait for benchmark dataset reader")?;
3481            anyhow::ensure!(
3482                status.success(),
3483                "benchmark dataset reader exited with status {status}"
3484            );
3485        } else {
3486            let _ = child.kill();
3487            let _ = child.wait();
3488        }
3489
3490        Ok(events)
3491    }
3492
3493    fn build_synthetic_benchmark_events(event_count: usize, author_count: usize) -> Vec<Event> {
3494        let authors = (0..author_count)
3495            .map(|_| Keys::generate())
3496            .collect::<Vec<_>>();
3497        let mut events = Vec::with_capacity(event_count);
3498        for i in 0..event_count {
3499            let kind = if i % 8 < 5 {
3500                Kind::TextNote
3501            } else {
3502                Kind::Custom(30_023)
3503            };
3504            let mut tags = Vec::new();
3505            if kind == Kind::TextNote && i % 16 == 0 {
3506                tags.push(Tag::parse(&["t", "hashtree"]).unwrap());
3507            }
3508            let content = if kind == Kind::TextNote && i % 32 == 0 {
3509                format!("benchmark target event {i}")
3510            } else {
3511                format!("benchmark event {i}")
3512            };
3513            let event = EventBuilder::new(kind, content, tags)
3514                .custom_created_at(Timestamp::from_secs(1_700_000_000 + i as u64))
3515                .to_event(&authors[i % author_count])
3516                .unwrap();
3517            events.push(event);
3518        }
3519        events
3520    }
3521
3522    fn load_benchmark_events(
3523        event_count: usize,
3524        author_count: usize,
3525    ) -> Result<(String, Vec<Event>)> {
3526        if let Some(path) = benchmark_dataset_path() {
3527            let url = benchmark_dataset_url();
3528            ensure_benchmark_dataset(&path, &url)?;
3529            let events = load_benchmark_dataset(&path, event_count)?;
3530            return Ok((format!("dataset:{}", path.display()), events));
3531        }
3532
3533        Ok((
3534            format!("synthetic:{author_count}-authors"),
3535            build_synthetic_benchmark_events(event_count, author_count),
3536        ))
3537    }
3538
3539    fn first_tag_filter(event: &Event) -> Option<Filter> {
3540        event.tags.iter().find_map(|tag| match tag.as_slice() {
3541            [name, value, ..]
3542                if name.len() == 1
3543                    && !value.is_empty()
3544                    && name.as_bytes()[0].is_ascii_lowercase() =>
3545            {
3546                let letter = SingleLetterTag::from_char(name.chars().next()?).ok()?;
3547                Some(Filter::new().custom_tag(letter, [value.to_string()]))
3548            }
3549            _ => None,
3550        })
3551    }
3552
3553    fn first_search_term(event: &Event) -> Option<String> {
3554        event
3555            .content
3556            .split(|ch: char| !ch.is_alphanumeric())
3557            .find(|token| token.len() >= 4)
3558            .map(|token| token.to_ascii_lowercase())
3559    }
3560
3561    fn benchmark_match_count(events: &[Event], filter: &Filter, limit: usize) -> usize {
3562        events
3563            .iter()
3564            .filter(|event| filter.match_event(event))
3565            .count()
3566            .min(limit)
3567    }
3568
3569    fn benchmark_btree_orders() -> Vec<usize> {
3570        std::env::var("HASHTREE_BTREE_ORDERS")
3571            .ok()
3572            .map(|value| {
3573                value
3574                    .split(',')
3575                    .filter_map(|part| part.trim().parse::<usize>().ok())
3576                    .filter(|order| *order >= 2)
3577                    .collect::<Vec<_>>()
3578            })
3579            .filter(|orders| !orders.is_empty())
3580            .unwrap_or_else(|| vec![16, 24, 32, 48, 64])
3581    }
3582
3583    fn benchmark_read_iterations() -> usize {
3584        std::env::var("HASHTREE_BENCH_READ_ITERATIONS")
3585            .ok()
3586            .and_then(|value| value.parse::<usize>().ok())
3587            .unwrap_or(5)
3588            .max(1)
3589    }
3590
3591    fn average_duration(samples: &[Duration]) -> Duration {
3592        if samples.is_empty() {
3593            return Duration::ZERO;
3594        }
3595
3596        Duration::from_secs_f64(
3597            samples.iter().map(Duration::as_secs_f64).sum::<f64>() / samples.len() as f64,
3598        )
3599    }
3600
3601    fn average_read_trace(samples: &[ReadTraceSnapshot]) -> ReadTraceSnapshot {
3602        if samples.is_empty() {
3603            return ReadTraceSnapshot::default();
3604        }
3605
3606        let len = samples.len() as u64;
3607        ReadTraceSnapshot {
3608            get_calls: samples.iter().map(|sample| sample.get_calls).sum::<u64>() / len,
3609            total_bytes: samples.iter().map(|sample| sample.total_bytes).sum::<u64>() / len,
3610            unique_blocks: (samples
3611                .iter()
3612                .map(|sample| sample.unique_blocks as u64)
3613                .sum::<u64>()
3614                / len) as usize,
3615            unique_bytes: samples
3616                .iter()
3617                .map(|sample| sample.unique_bytes)
3618                .sum::<u64>()
3619                / len,
3620            cache_hits: samples.iter().map(|sample| sample.cache_hits).sum::<u64>() / len,
3621            remote_fetches: samples
3622                .iter()
3623                .map(|sample| sample.remote_fetches)
3624                .sum::<u64>()
3625                / len,
3626            remote_bytes: samples
3627                .iter()
3628                .map(|sample| sample.remote_bytes)
3629                .sum::<u64>()
3630                / len,
3631        }
3632    }
3633
3634    fn estimate_serialized_remote_ms(snapshot: &ReadTraceSnapshot, model: NetworkModel) -> f64 {
3635        let transfer_ms = if model.bandwidth_mib_per_s <= 0.0 {
3636            0.0
3637        } else {
3638            (snapshot.remote_bytes as f64 / (model.bandwidth_mib_per_s * 1024.0 * 1024.0)) * 1000.0
3639        };
3640        snapshot.remote_fetches as f64 * model.rtt_ms + transfer_ms
3641    }
3642
    /// Events for the B-tree order benchmark, plus the query parameters of the
    /// probe events that `load_index_benchmark_dataset` appends to `events`.
    #[derive(Debug, Clone)]
    struct IndexBenchmarkDataset {
        /// Label of where the events came from, as returned by `load_benchmark_events`.
        source: String,
        /// Loaded or synthetic events, followed by the appended probe events.
        events: Vec<Event>,
        /// Tag name carried by the appended tagged note.
        guaranteed_tag_name: String,
        /// Tag value carried by the appended tagged note.
        guaranteed_tag_value: String,
        /// Hex pubkey that signed both versions of the replaceable probe event.
        replaceable_pubkey: String,
        /// Kind number of the replaceable probe events.
        replaceable_kind: u32,
        /// Hex pubkey that signed both versions of the parameterized probe event.
        parameterized_pubkey: String,
        /// Kind number of the parameterized replaceable probe events.
        parameterized_kind: u32,
        /// `d` tag identifier shared by both parameterized probe events.
        parameterized_d_tag: String,
    }
3655
3656    fn load_index_benchmark_dataset(
3657        event_count: usize,
3658        author_count: usize,
3659    ) -> Result<IndexBenchmarkDataset> {
3660        let (source, mut events) = load_benchmark_events(event_count, author_count)?;
3661        let base_timestamp = events
3662            .iter()
3663            .map(|event| event.created_at.as_u64())
3664            .max()
3665            .unwrap_or(1_700_000_000)
3666            + 1;
3667
3668        let replaceable_keys = Keys::generate();
3669        let parameterized_keys = Keys::generate();
3670        let tagged_keys = Keys::generate();
3671        let guaranteed_tag_name = "t".to_string();
3672        let guaranteed_tag_value = "btreebench".to_string();
3673        let replaceable_kind = 10_000u32;
3674        let parameterized_kind = 30_023u32;
3675        let parameterized_d_tag = "btree-bench".to_string();
3676
3677        let tagged = EventBuilder::new(
3678            Kind::TextNote,
3679            "btree benchmark tagged note",
3680            vec![Tag::parse(&["t", &guaranteed_tag_value]).unwrap()],
3681        )
3682        .custom_created_at(Timestamp::from_secs(base_timestamp))
3683        .to_event(&tagged_keys)
3684        .unwrap();
3685        let replaceable_old = EventBuilder::new(
3686            Kind::Custom(replaceable_kind.try_into().unwrap()),
3687            "replaceable old",
3688            [],
3689        )
3690        .custom_created_at(Timestamp::from_secs(base_timestamp + 1))
3691        .to_event(&replaceable_keys)
3692        .unwrap();
3693        let replaceable_new = EventBuilder::new(
3694            Kind::Custom(replaceable_kind.try_into().unwrap()),
3695            "replaceable new",
3696            [],
3697        )
3698        .custom_created_at(Timestamp::from_secs(base_timestamp + 2))
3699        .to_event(&replaceable_keys)
3700        .unwrap();
3701        let parameterized_old = EventBuilder::new(
3702            Kind::Custom(parameterized_kind.try_into().unwrap()),
3703            "",
3704            vec![Tag::identifier(&parameterized_d_tag)],
3705        )
3706        .custom_created_at(Timestamp::from_secs(base_timestamp + 3))
3707        .to_event(&parameterized_keys)
3708        .unwrap();
3709        let parameterized_new = EventBuilder::new(
3710            Kind::Custom(parameterized_kind.try_into().unwrap()),
3711            "",
3712            vec![Tag::identifier(&parameterized_d_tag)],
3713        )
3714        .custom_created_at(Timestamp::from_secs(base_timestamp + 4))
3715        .to_event(&parameterized_keys)
3716        .unwrap();
3717
3718        events.extend([
3719            tagged,
3720            replaceable_old,
3721            replaceable_new,
3722            parameterized_old,
3723            parameterized_new,
3724        ]);
3725
3726        Ok(IndexBenchmarkDataset {
3727            source,
3728            events,
3729            guaranteed_tag_name,
3730            guaranteed_tag_value,
3731            replaceable_pubkey: replaceable_keys.public_key().to_hex(),
3732            replaceable_kind,
3733            parameterized_pubkey: parameterized_keys.public_key().to_hex(),
3734            parameterized_kind,
3735            parameterized_d_tag,
3736        })
3737    }
3738
3739    fn build_btree_query_cases(dataset: &IndexBenchmarkDataset) -> Vec<BenchmarkQueryCase> {
3740        let primary_kind = dataset
3741            .events
3742            .iter()
3743            .find(|event| event.kind == Kind::TextNote)
3744            .map(|event| event.kind)
3745            .or_else(|| dataset.events.first().map(|event| event.kind))
3746            .expect("benchmark requires at least one event");
3747        let primary_kind_u32 = primary_kind.as_u16() as u32;
3748
3749        let author_pubkey = dataset
3750            .events
3751            .iter()
3752            .filter(|event| event.kind == primary_kind)
3753            .fold(HashMap::<String, usize>::new(), |mut counts, event| {
3754                *counts.entry(event.pubkey.to_hex()).or_default() += 1;
3755                counts
3756            })
3757            .into_iter()
3758            .max_by_key(|(_, count)| *count)
3759            .map(|(pubkey, _)| pubkey)
3760            .expect("benchmark requires an author for the selected kind");
3761
3762        let by_id_id = dataset.events[dataset.events.len() / 2].id.to_hex();
3763
3764        vec![
3765            BenchmarkQueryCase::ById { id: by_id_id },
3766            BenchmarkQueryCase::ByAuthor {
3767                pubkey: author_pubkey.clone(),
3768                limit: 50,
3769            },
3770            BenchmarkQueryCase::ByAuthorKind {
3771                pubkey: author_pubkey,
3772                kind: primary_kind_u32,
3773                limit: 50,
3774            },
3775            BenchmarkQueryCase::ByKind {
3776                kind: primary_kind_u32,
3777                limit: 200,
3778            },
3779            BenchmarkQueryCase::ByTag {
3780                tag_name: dataset.guaranteed_tag_name.clone(),
3781                tag_value: dataset.guaranteed_tag_value.clone(),
3782                limit: 100,
3783            },
3784            BenchmarkQueryCase::Recent { limit: 100 },
3785            BenchmarkQueryCase::Replaceable {
3786                pubkey: dataset.replaceable_pubkey.clone(),
3787                kind: dataset.replaceable_kind,
3788            },
3789            BenchmarkQueryCase::ParameterizedReplaceable {
3790                pubkey: dataset.parameterized_pubkey.clone(),
3791                kind: dataset.parameterized_kind,
3792                d_tag: dataset.parameterized_d_tag.clone(),
3793            },
3794        ]
3795    }
3796
3797    fn benchmark_warm_query_case<S: Store + 'static>(
3798        base: Arc<S>,
3799        root: &Cid,
3800        order: usize,
3801        case: &BenchmarkQueryCase,
3802        iterations: usize,
3803    ) -> QueryBenchmarkResult {
3804        let trace_store = Arc::new(CountingStore::new(base));
3805        let event_store = NostrEventStore::with_options(
3806            Arc::clone(&trace_store),
3807            NostrEventStoreOptions {
3808                btree_order: Some(order),
3809            },
3810        );
3811        let mut durations = Vec::with_capacity(iterations);
3812        let mut traces = Vec::with_capacity(iterations);
3813        for _ in 0..iterations {
3814            trace_store.reset();
3815            let started = Instant::now();
3816            let matches = block_on(case.execute(&event_store, root)).unwrap();
3817            durations.push(started.elapsed());
3818            traces.push(trace_store.snapshot());
3819            assert!(
3820                matches > 0,
3821                "benchmark query {} returned no matches",
3822                case.name()
3823            );
3824        }
3825        let mut sorted = durations.clone();
3826        sorted.sort_unstable();
3827        QueryBenchmarkResult {
3828            average_duration: average_duration(&durations),
3829            p95_duration: duration_percentile(&sorted, 95, 100),
3830            reads: average_read_trace(&traces),
3831        }
3832    }
3833
3834    fn benchmark_cold_query_case<S: Store + 'static>(
3835        remote: Arc<S>,
3836        root: &Cid,
3837        order: usize,
3838        case: &BenchmarkQueryCase,
3839        iterations: usize,
3840    ) -> QueryBenchmarkResult {
3841        let mut durations = Vec::with_capacity(iterations);
3842        let mut traces = Vec::with_capacity(iterations);
3843        for _ in 0..iterations {
3844            let cache = Arc::new(MemoryStore::new());
3845            let trace_store = Arc::new(ReadThroughStore::new(cache, Arc::clone(&remote)));
3846            let event_store = NostrEventStore::with_options(
3847                Arc::clone(&trace_store),
3848                NostrEventStoreOptions {
3849                    btree_order: Some(order),
3850                },
3851            );
3852            let started = Instant::now();
3853            let matches = block_on(case.execute(&event_store, root)).unwrap();
3854            durations.push(started.elapsed());
3855            traces.push(trace_store.snapshot());
3856            assert!(
3857                matches > 0,
3858                "benchmark query {} returned no matches",
3859                case.name()
3860            );
3861        }
3862        let mut sorted = durations.clone();
3863        sorted.sort_unstable();
3864        QueryBenchmarkResult {
3865            average_duration: average_duration(&durations),
3866            p95_duration: duration_percentile(&sorted, 95, 100),
3867            reads: average_read_trace(&traces),
3868        }
3869    }
3870
3871    fn duration_percentile(
3872        sorted: &[std::time::Duration],
3873        numerator: usize,
3874        denominator: usize,
3875    ) -> std::time::Duration {
3876        if sorted.is_empty() {
3877            return std::time::Duration::ZERO;
3878        }
3879        let index = ((sorted.len() - 1) * numerator) / denominator;
3880        sorted[index]
3881    }
3882
3883    #[test]
3884    #[ignore = "benchmark"]
3885    fn benchmark_query_events_large_dataset() {
3886        let _guard = test_lock();
3887        let tmp = TempDir::new().unwrap();
3888        let graph_store =
3889            open_social_graph_store_with_mapsize(tmp.path(), Some(512 * 1024 * 1024)).unwrap();
3890        set_nostr_profile_enabled(true);
3891        reset_nostr_profile();
3892
3893        let author_count = 64usize;
3894        let measured_event_count = std::env::var("HASHTREE_BENCH_EVENTS")
3895            .ok()
3896            .and_then(|value| value.parse::<usize>().ok())
3897            .unwrap_or(600usize);
3898        let warmup_event_count = benchmark_stream_warmup_events(measured_event_count);
3899        let total_event_count = warmup_event_count + measured_event_count;
3900        let (source, events) = load_benchmark_events(total_event_count, author_count).unwrap();
3901        let loaded_event_count = events.len();
3902        let warmup_event_count = warmup_event_count.min(loaded_event_count.saturating_sub(1));
3903        let (warmup_events, measured_events) = events.split_at(warmup_event_count);
3904
3905        println!(
3906            "starting steady-state dataset benchmark with {} warmup events and {} measured stream events from {}",
3907            warmup_events.len(),
3908            measured_events.len(),
3909            source
3910        );
3911        if !warmup_events.is_empty() {
3912            ingest_parsed_events(&graph_store, warmup_events).unwrap();
3913        }
3914
3915        let stream_start = Instant::now();
3916        let mut per_event_latencies = Vec::with_capacity(measured_events.len());
3917        for event in measured_events {
3918            let event_start = Instant::now();
3919            ingest_parsed_event(&graph_store, event).unwrap();
3920            per_event_latencies.push(event_start.elapsed());
3921        }
3922        let ingest_duration = stream_start.elapsed();
3923
3924        let mut sorted_latencies = per_event_latencies.clone();
3925        sorted_latencies.sort_unstable();
3926        let average_latency = if per_event_latencies.is_empty() {
3927            std::time::Duration::ZERO
3928        } else {
3929            std::time::Duration::from_secs_f64(
3930                per_event_latencies
3931                    .iter()
3932                    .map(std::time::Duration::as_secs_f64)
3933                    .sum::<f64>()
3934                    / per_event_latencies.len() as f64,
3935            )
3936        };
3937        let ingest_capacity_eps = if ingest_duration.is_zero() {
3938            f64::INFINITY
3939        } else {
3940            measured_events.len() as f64 / ingest_duration.as_secs_f64()
3941        };
3942        println!(
3943            "benchmark steady-state ingest complete in {:?} (avg={:?} p50={:?} p95={:?} p99={:?} capacity={:.2} events/s)",
3944            ingest_duration,
3945            average_latency,
3946            duration_percentile(&sorted_latencies, 50, 100),
3947            duration_percentile(&sorted_latencies, 95, 100),
3948            duration_percentile(&sorted_latencies, 99, 100),
3949            ingest_capacity_eps
3950        );
3951        let mut profile = take_nostr_profile();
3952        profile.sort_by(|left, right| right.total.cmp(&left.total));
3953        for stat in profile {
3954            let pct = if ingest_duration.is_zero() {
3955                0.0
3956            } else {
3957                (stat.total.as_secs_f64() / ingest_duration.as_secs_f64()) * 100.0
3958            };
3959            let average = if stat.count == 0 {
3960                std::time::Duration::ZERO
3961            } else {
3962                std::time::Duration::from_secs_f64(stat.total.as_secs_f64() / stat.count as f64)
3963            };
3964            println!(
3965                "ingest profile: label={} total={:?} pct={:.1}% count={} avg={:?} max={:?}",
3966                stat.label, stat.total, pct, stat.count, average, stat.max
3967            );
3968        }
3969        set_nostr_profile_enabled(false);
3970
3971        let kind = events
3972            .iter()
3973            .find(|event| event.kind == Kind::TextNote)
3974            .map(|event| event.kind)
3975            .or_else(|| events.first().map(|event| event.kind))
3976            .expect("benchmark requires at least one event");
3977        let kind_filter = Filter::new().kind(kind);
3978        let kind_start = Instant::now();
3979        let kind_events = query_events(&graph_store, &kind_filter, 200);
3980        let kind_duration = kind_start.elapsed();
3981        assert_eq!(
3982            kind_events.len(),
3983            benchmark_match_count(&events, &kind_filter, 200)
3984        );
3985        assert!(kind_events
3986            .windows(2)
3987            .all(|window| window[0].created_at >= window[1].created_at));
3988
3989        let author_pubkey = events
3990            .iter()
3991            .find(|event| event.kind == kind)
3992            .map(|event| event.pubkey)
3993            .expect("benchmark requires an author for the selected kind");
3994        let author_filter = Filter::new().author(author_pubkey).kind(kind);
3995        let author_start = Instant::now();
3996        let author_events = query_events(&graph_store, &author_filter, 50);
3997        let author_duration = author_start.elapsed();
3998        assert_eq!(
3999            author_events.len(),
4000            benchmark_match_count(&events, &author_filter, 50)
4001        );
4002
4003        let tag_filter = events
4004            .iter()
4005            .find_map(first_tag_filter)
4006            .expect("benchmark requires at least one tagged event");
4007        let tag_start = Instant::now();
4008        let tag_events = query_events(&graph_store, &tag_filter, 100);
4009        let tag_duration = tag_start.elapsed();
4010        assert_eq!(
4011            tag_events.len(),
4012            benchmark_match_count(&events, &tag_filter, 100)
4013        );
4014
4015        let search_source = events
4016            .iter()
4017            .find_map(|event| first_search_term(event).map(|term| (event.kind, term)))
4018            .expect("benchmark requires at least one searchable event");
4019        let search_filter = Filter::new().kind(search_source.0).search(search_source.1);
4020        let search_start = Instant::now();
4021        let search_events = query_events(&graph_store, &search_filter, 100);
4022        let search_duration = search_start.elapsed();
4023        assert_eq!(
4024            search_events.len(),
4025            benchmark_match_count(&events, &search_filter, 100)
4026        );
4027
4028        println!(
4029            "steady-state benchmark: source={} warmup_events={} stream_events={} ingest={:?} avg={:?} p50={:?} p95={:?} p99={:?} capacity_eps={:.2} kind={:?} author={:?} tag={:?} search={:?}",
4030            source,
4031            warmup_events.len(),
4032            measured_events.len(),
4033            ingest_duration,
4034            average_latency,
4035            duration_percentile(&sorted_latencies, 50, 100),
4036            duration_percentile(&sorted_latencies, 95, 100),
4037            duration_percentile(&sorted_latencies, 99, 100),
4038            ingest_capacity_eps,
4039            kind_duration,
4040            author_duration,
4041            tag_duration,
4042            search_duration
4043        );
4044    }
4045
4046    #[test]
4047    #[ignore = "benchmark"]
4048    fn benchmark_nostr_btree_query_tradeoffs() {
4049        let _guard = test_lock();
4050        let event_count = std::env::var("HASHTREE_BENCH_EVENTS")
4051            .ok()
4052            .and_then(|value| value.parse::<usize>().ok())
4053            .unwrap_or(2_000usize);
4054        let iterations = benchmark_read_iterations();
4055        let orders = benchmark_btree_orders();
4056        let dataset = load_index_benchmark_dataset(event_count, 64).unwrap();
4057        let cases = build_btree_query_cases(&dataset);
4058        let stored_events = dataset
4059            .events
4060            .iter()
4061            .map(stored_event_from_nostr)
4062            .collect::<Vec<_>>();
4063
4064        println!(
4065            "btree-order benchmark: source={} events={} iterations={} orders={:?}",
4066            dataset.source,
4067            stored_events.len(),
4068            iterations,
4069            orders
4070        );
4071        println!(
4072            "network models are serialized fetch estimates: {}",
4073            NETWORK_MODELS
4074                .iter()
4075                .map(|model| format!(
4076                    "{}={}ms_rtt/{}MiBps",
4077                    model.name, model.rtt_ms, model.bandwidth_mib_per_s
4078                ))
4079                .collect::<Vec<_>>()
4080                .join(", ")
4081        );
4082
4083        for order in orders {
4084            let tmp = TempDir::new().unwrap();
4085            let local_store =
4086                Arc::new(LocalStore::new(tmp.path().join("blobs"), &StorageBackend::Lmdb).unwrap());
4087            let event_store = NostrEventStore::with_options(
4088                Arc::clone(&local_store),
4089                NostrEventStoreOptions {
4090                    btree_order: Some(order),
4091                },
4092            );
4093            let root = block_on(event_store.build(None, stored_events.clone()))
4094                .unwrap()
4095                .expect("benchmark build root");
4096
4097            println!("btree-order={} root={}", order, hex::encode(root.hash));
4098            let mut warm_total_ms = 0.0f64;
4099            let mut model_totals = NETWORK_MODELS
4100                .iter()
4101                .map(|model| (model.name, 0.0f64))
4102                .collect::<HashMap<_, _>>();
4103
4104            for case in &cases {
4105                let warm = benchmark_warm_query_case(
4106                    Arc::clone(&local_store),
4107                    &root,
4108                    order,
4109                    case,
4110                    iterations,
4111                );
4112                let cold = benchmark_cold_query_case(
4113                    Arc::clone(&local_store),
4114                    &root,
4115                    order,
4116                    case,
4117                    iterations,
4118                );
4119                warm_total_ms += warm.average_duration.as_secs_f64() * 1000.0;
4120
4121                let model_estimates = NETWORK_MODELS
4122                    .iter()
4123                    .map(|model| {
4124                        let estimate = estimate_serialized_remote_ms(&cold.reads, *model);
4125                        *model_totals.get_mut(model.name).unwrap() += estimate;
4126                        format!("{}={:.2}ms", model.name, estimate)
4127                    })
4128                    .collect::<Vec<_>>()
4129                    .join(" ");
4130
4131                println!(
4132                    "btree-order={} query={} warm_avg={:?} warm_p95={:?} warm_blocks={} warm_unique_bytes={} cold_fetches={} cold_bytes={} cold_local_avg={:?} {}",
4133                    order,
4134                    case.name(),
4135                    warm.average_duration,
4136                    warm.p95_duration,
4137                    warm.reads.unique_blocks,
4138                    warm.reads.unique_bytes,
4139                    cold.reads.remote_fetches,
4140                    cold.reads.remote_bytes,
4141                    cold.average_duration,
4142                    model_estimates
4143                );
4144            }
4145
4146            println!(
4147                "btree-order={} summary unweighted_warm_avg_ms={:.3} {}",
4148                order,
4149                warm_total_ms / cases.len() as f64,
4150                NETWORK_MODELS
4151                    .iter()
4152                    .map(|model| format!(
4153                        "{}={:.2}ms",
4154                        model.name,
4155                        model_totals[model.name] / cases.len() as f64
4156                    ))
4157                    .collect::<Vec<_>>()
4158                    .join(" ")
4159            );
4160        }
4161    }
4162
4163    #[test]
4164    fn test_ensure_social_graph_mapsize_rounds_and_applies() {
4165        let _guard = test_lock();
4166        let tmp = TempDir::new().unwrap();
4167        ensure_social_graph_mapsize(tmp.path(), DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES).unwrap();
4168        let requested = 70 * 1024 * 1024;
4169        ensure_social_graph_mapsize(tmp.path(), requested).unwrap();
4170        let env = unsafe {
4171            heed::EnvOpenOptions::new()
4172                .map_size(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES as usize)
4173                .max_dbs(SOCIALGRAPH_MAX_DBS)
4174                .open(tmp.path())
4175        }
4176        .unwrap();
4177        assert!(env.info().map_size >= requested as usize);
4178        assert_eq!(env.info().map_size % page_size_bytes(), 0);
4179    }
4180
4181    #[test]
4182    fn test_ingest_events_batches_graph_updates() {
4183        let _guard = test_lock();
4184        let tmp = TempDir::new().unwrap();
4185        let graph_store = open_social_graph_store(tmp.path()).unwrap();
4186
4187        let root_keys = Keys::generate();
4188        let alice_keys = Keys::generate();
4189        let bob_keys = Keys::generate();
4190
4191        let root_pk = root_keys.public_key().to_bytes();
4192        set_social_graph_root(&graph_store, &root_pk);
4193
4194        let root_follows_alice = EventBuilder::new(
4195            Kind::ContactList,
4196            "",
4197            vec![Tag::public_key(alice_keys.public_key())],
4198        )
4199        .custom_created_at(Timestamp::from_secs(10))
4200        .to_event(&root_keys)
4201        .unwrap();
4202        let alice_follows_bob = EventBuilder::new(
4203            Kind::ContactList,
4204            "",
4205            vec![Tag::public_key(bob_keys.public_key())],
4206        )
4207        .custom_created_at(Timestamp::from_secs(11))
4208        .to_event(&alice_keys)
4209        .unwrap();
4210
4211        ingest_parsed_events(
4212            &graph_store,
4213            &[root_follows_alice.clone(), alice_follows_bob.clone()],
4214        )
4215        .unwrap();
4216
4217        assert_eq!(
4218            get_follow_distance(&graph_store, &alice_keys.public_key().to_bytes()),
4219            Some(1)
4220        );
4221        assert_eq!(
4222            get_follow_distance(&graph_store, &bob_keys.public_key().to_bytes()),
4223            Some(2)
4224        );
4225
4226        let filter = Filter::new().kind(Kind::ContactList);
4227        let stored = query_events(&graph_store, &filter, 10);
4228        let ids = stored.into_iter().map(|event| event.id).collect::<Vec<_>>();
4229        assert!(ids.contains(&root_follows_alice.id));
4230        assert!(ids.contains(&alice_follows_bob.id));
4231    }
4232
4233    #[test]
4234    fn test_ingest_graph_events_updates_graph_without_indexing_events() {
4235        let _guard = test_lock();
4236        let tmp = TempDir::new().unwrap();
4237        let graph_store = open_social_graph_store(tmp.path()).unwrap();
4238
4239        let root_keys = Keys::generate();
4240        let alice_keys = Keys::generate();
4241
4242        let root_pk = root_keys.public_key().to_bytes();
4243        set_social_graph_root(&graph_store, &root_pk);
4244
4245        let root_follows_alice = EventBuilder::new(
4246            Kind::ContactList,
4247            "",
4248            vec![Tag::public_key(alice_keys.public_key())],
4249        )
4250        .custom_created_at(Timestamp::from_secs(10))
4251        .to_event(&root_keys)
4252        .unwrap();
4253
4254        ingest_graph_parsed_events(&graph_store, std::slice::from_ref(&root_follows_alice))
4255            .unwrap();
4256
4257        assert_eq!(
4258            get_follow_distance(&graph_store, &alice_keys.public_key().to_bytes()),
4259            Some(1)
4260        );
4261        let filter = Filter::new().kind(Kind::ContactList);
4262        assert!(query_events(&graph_store, &filter, 10).is_empty());
4263    }
4264}