//! hashtree_cli/socialgraph/mod.rs

1pub mod access;
2pub mod crawler;
3pub mod local_lists;
4pub mod snapshot;
5
6pub use access::SocialGraphAccessControl;
7pub use crawler::SocialGraphCrawler;
8pub use local_lists::{
9    read_local_list_file_state, sync_local_list_files_force, sync_local_list_files_if_changed,
10    LocalListFileState, LocalListSyncOutcome,
11};
12
13use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
14use std::path::{Path, PathBuf};
15use std::sync::{Arc, Mutex as StdMutex};
16
17use anyhow::{Context, Result};
18use bytes::Bytes;
19use futures::executor::block_on;
20use hashtree_core::{nhash_encode_full, Cid, HashTree, HashTreeConfig, NHashData};
21use hashtree_index::BTree;
22use hashtree_nostr::{
23    is_parameterized_replaceable_kind, is_replaceable_kind, ListEventsOptions, NostrEventStore,
24    NostrEventStoreError, ProfileGuard as NostrProfileGuard, StoredNostrEvent,
25};
26#[cfg(test)]
27use hashtree_nostr::{
28    reset_profile as reset_nostr_profile, set_profile_enabled as set_nostr_profile_enabled,
29    take_profile as take_nostr_profile,
30};
31use nostr::{Event, Filter, JsonUtil, Kind, SingleLetterTag};
32use nostr_social_graph::{
33    BinaryBudget, GraphStats, NostrEvent as GraphEvent, SocialGraph,
34    SocialGraphBackend as NostrSocialGraphBackend,
35};
36use nostr_social_graph_heed::HeedSocialGraph;
37
38use crate::storage::{LocalStore, StorageRouter};
39
40#[cfg(test)]
41use std::sync::{Mutex, MutexGuard, OnceLock};
42#[cfg(test)]
43use std::time::Instant;
44
/// Set of 32-byte public keys; BTreeSet gives deterministic iteration order.
pub type UserSet = BTreeSet<[u8; 32]>;

/// All-zero placeholder root used before a real root pubkey is configured.
const DEFAULT_ROOT_HEX: &str = "0000000000000000000000000000000000000000000000000000000000000000";
// Files (relative to the socialgraph db dir) persisting the current root CIDs.
const EVENTS_ROOT_FILE: &str = "events-root.msgpack";
const AMBIENT_EVENTS_ROOT_FILE: &str = "events-root-ambient.msgpack";
// Sub-directory holding the ambient event bucket's separate blob store.
const AMBIENT_EVENTS_BLOB_DIR: &str = "ambient-blobs";
const PROFILE_SEARCH_ROOT_FILE: &str = "profile-search-root.msgpack";
const PROFILES_BY_PUBKEY_ROOT_FILE: &str = "profiles-by-pubkey-root.msgpack";
/// Sentinel distance reported by the graph backend for users it cannot reach;
/// mapped to `None` by `follow_distance`.
const UNKNOWN_FOLLOW_DISTANCE: u32 = 1000;
// LMDB sizing defaults (consumed by helpers outside this view — e.g.
// ensure_social_graph_mapsize; confirm there).
const DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES: u64 = 64 * 1024 * 1024;
const SOCIALGRAPH_MAX_DBS: u32 = 16;
/// Branching order of the profile-search BTree index.
const PROFILE_SEARCH_INDEX_ORDER: usize = 64;
// Profile-search key prefix / name-length cap (used by index code outside this view).
const PROFILE_SEARCH_PREFIX: &str = "p:";
const PROFILE_NAME_MAX_LENGTH: usize = 100;
59
/// Which bucket an ingested Nostr event is written to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventStorageClass {
    /// Events attributed to the configured root user (see
    /// `default_storage_class_for`).
    Public,
    /// Everything else; stored in the separate ambient blob store.
    Ambient,
}
65
/// Which event buckets a query should consult (see `query_events_in_scope`).
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum EventQueryScope {
    PublicOnly,
    AmbientOnly,
    /// Search both the public and ambient buckets.
    All,
}
73
/// One event index: a Nostr event store plus the file path where its current
/// root CID is persisted between runs.
struct EventIndexBucket {
    event_store: NostrEventStore<StorageRouter>,
    // msgpack file read/written by events_root()/write_events_root().
    root_path: PathBuf,
}
78
/// Profile storage: a hash tree keyed by pubkey plus a BTree search index,
/// each with a persisted root file.
struct ProfileIndexBucket {
    tree: HashTree<StorageRouter>,
    index: BTree<StorageRouter>,
    // Root files for the by-pubkey tree and the name-search index respectively.
    by_pubkey_root_path: PathBuf,
    search_root_path: PathBuf,
}
85
/// Serialized CID reference: content hash plus an optional 32-byte key.
/// NOTE(review): the semantics of `key` (presumably an encryption key for the
/// blob) are defined where StoredCid is produced/consumed, outside this view.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct StoredCid {
    hash: [u8; 32],
    key: Option<[u8; 32]>,
}
91
/// One entry in the profile search index, serialized with serde.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct StoredProfileSearchEntry {
    pub pubkey: String,
    pub name: String,
    // Defaulted so entries written before these fields existed still decode.
    #[serde(default)]
    pub aliases: Vec<String>,
    #[serde(default)]
    pub nip05: Option<String>,
    pub created_at: u64,
    // nhash of the metadata event this entry was derived from.
    pub event_nhash: String,
}
103
/// Aggregate statistics over the social graph (see `build_distance_cache`).
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct SocialGraphStats {
    /// Total users summed over all follow distances.
    pub total_users: usize,
    /// Hex pubkey of the current root, if any.
    pub root: Option<String>,
    /// Total number of follow edges.
    pub total_follows: usize,
    /// Largest follow distance present in the graph.
    pub max_depth: u32,
    /// User count per follow distance.
    pub size_by_distance: BTreeMap<u32, usize>,
    pub enabled: bool,
}
113
/// Lazily rebuilt snapshot of graph stats plus users grouped by follow
/// distance; invalidated whenever the underlying graph changes.
#[derive(Debug, Clone)]
struct DistanceCache {
    stats: SocialGraphStats,
    users_by_distance: BTreeMap<u32, Vec<[u8; 32]>>,
}
119
/// Opaque error carrying a message from an upstream graph backend.
#[derive(Debug, thiserror::Error)]
#[error("{0}")]
pub struct UpstreamGraphBackendError(String);
123
/// On-disk social graph store: the heed-backed graph, a cached distance
/// snapshot, separate public/ambient event buckets, and the profile index.
pub struct SocialGraphStore {
    graph: StdMutex<HeedSocialGraph>,
    // None until rebuilt by load_distance_cache(); cleared on graph mutation.
    distance_cache: StdMutex<Option<DistanceCache>>,
    public_events: EventIndexBucket,
    ambient_events: EventIndexBucket,
    profile_index: ProfileIndexBucket,
    // Overmute ratio above which a profile is excluded from the index.
    profile_index_overmute_threshold: StdMutex<f64>,
}
132
/// Interface over social-graph queries, event ingestion, and event queries.
/// `SocialGraphStore` provides the local implementation.
pub trait SocialGraphBackend: Send + Sync {
    /// Aggregate statistics for the current graph.
    fn stats(&self) -> Result<SocialGraphStats>;
    /// All users at exactly `distance` hops from the root.
    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>>;
    /// Follow distance of `pk_bytes` from the root; `None` when unknown.
    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>>;
    /// Timestamp of `owner`'s latest follow list, if one was seen.
    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>>;
    /// Set of pubkeys `owner` follows.
    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet>;
    /// Whether `user_pk` exceeds the mute ratio `threshold`.
    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool>;
    /// Root CID of the profile search index; defaults to none.
    fn profile_search_root(&self) -> Result<Option<Cid>> {
        Ok(None)
    }
    /// Binary snapshot of the graph rooted at `root`, split per `options`.
    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>>;
    /// Ingest a single event into graph and event storage.
    fn ingest_event(&self, event: &Event) -> Result<()>;
    /// Ingest with an explicit storage class; by default the class is ignored
    /// and this delegates to `ingest_event`.
    fn ingest_event_with_storage_class(
        &self,
        event: &Event,
        storage_class: EventStorageClass,
    ) -> Result<()> {
        let _ = storage_class;
        self.ingest_event(event)
    }
    /// Ingest a batch; default is one-by-one (implementations may batch).
    fn ingest_events(&self, events: &[Event]) -> Result<()> {
        for event in events {
            self.ingest_event(event)?;
        }
        Ok(())
    }
    /// Batch ingest with an explicit storage class; default is one-by-one.
    fn ingest_events_with_storage_class(
        &self,
        events: &[Event],
        storage_class: EventStorageClass,
    ) -> Result<()> {
        for event in events {
            self.ingest_event_with_storage_class(event, storage_class)?;
        }
        Ok(())
    }
    /// Ingest events affecting only the graph; default is full ingestion.
    fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
        self.ingest_events(events)
    }
    /// Query stored events matching `filter`, returning at most `limit`.
    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>>;
}
174
/// Guard returned by [`test_lock`]; dropping it releases the lock.
#[cfg(test)]
pub type TestLockGuard = MutexGuard<'static, ()>;

// Process-wide mutex serialising tests that share the on-disk database.
#[cfg(test)]
static NDB_TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();

/// Acquire the global test lock (panics if a previous holder panicked and
/// poisoned the mutex).
#[cfg(test)]
pub fn test_lock() -> TestLockGuard {
    NDB_TEST_LOCK.get_or_init(|| Mutex::new(())).lock().unwrap()
}
185
/// Open the social graph store under `data_dir` with the default map size.
pub fn open_social_graph_store(data_dir: &Path) -> Result<Arc<SocialGraphStore>> {
    open_social_graph_store_with_mapsize(data_dir, None)
}
189
/// Open the store under `data_dir/socialgraph`, optionally overriding the
/// LMDB map size in bytes.
pub fn open_social_graph_store_with_mapsize(
    data_dir: &Path,
    mapsize_bytes: Option<u64>,
) -> Result<Arc<SocialGraphStore>> {
    let db_dir = data_dir.join("socialgraph");
    open_social_graph_store_at_path(&db_dir, mapsize_bytes)
}
197
/// Open the store under `data_dir/socialgraph` using a caller-supplied
/// storage router for public events.
pub fn open_social_graph_store_with_storage(
    data_dir: &Path,
    store: Arc<StorageRouter>,
    mapsize_bytes: Option<u64>,
) -> Result<Arc<SocialGraphStore>> {
    let db_dir = data_dir.join("socialgraph");
    open_social_graph_store_at_path_with_storage(&db_dir, store, mapsize_bytes)
}
206
/// Open the store rooted at `db_dir`, building the public blob store (and
/// its router) from the process-wide storage configuration.
pub fn open_social_graph_store_at_path(
    db_dir: &Path,
    mapsize_bytes: Option<u64>,
) -> Result<Arc<SocialGraphStore>> {
    // The blob backend kind comes from the global config (or its defaults).
    let config = hashtree_config::Config::load_or_default();
    let backend = &config.storage.backend;
    let local_store = Arc::new(
        LocalStore::new_with_lmdb_map_size(db_dir.join("blobs"), backend, mapsize_bytes)
            .map_err(|err| anyhow::anyhow!("Failed to create social graph blob store: {err}"))?,
    );
    let store = Arc::new(StorageRouter::new(local_store));
    open_social_graph_store_at_path_with_storage(db_dir, store, mapsize_bytes)
}
220
/// Open the store at `db_dir` with the given router for public events; a
/// second blob store (same backend kind) is created for ambient events.
pub fn open_social_graph_store_at_path_with_storage(
    db_dir: &Path,
    store: Arc<StorageRouter>,
    mapsize_bytes: Option<u64>,
) -> Result<Arc<SocialGraphStore>> {
    // Ambient events get their own blob store so they can be pruned/sized
    // independently of the public bucket.
    let ambient_backend = store.local_store().backend();
    let ambient_local = Arc::new(
        LocalStore::new_with_lmdb_map_size(
            db_dir.join(AMBIENT_EVENTS_BLOB_DIR),
            &ambient_backend,
            mapsize_bytes,
        )
        .map_err(|err| {
            anyhow::anyhow!("Failed to create social graph ambient blob store: {err}")
        })?,
    );
    let ambient_store = Arc::new(StorageRouter::new(ambient_local));
    open_social_graph_store_at_path_with_storage_split(db_dir, store, ambient_store, mapsize_bytes)
}
240
/// Lowest-level constructor: open the store at `db_dir` with distinct
/// routers for the public and ambient event buckets. All other `open_*`
/// functions funnel into this one.
pub fn open_social_graph_store_at_path_with_storage_split(
    db_dir: &Path,
    public_store: Arc<StorageRouter>,
    ambient_store: Arc<StorageRouter>,
    mapsize_bytes: Option<u64>,
) -> Result<Arc<SocialGraphStore>> {
    std::fs::create_dir_all(db_dir)?;
    if let Some(size) = mapsize_bytes {
        // Adjust the LMDB map size before opening the heed environment
        // (helper defined elsewhere in this module).
        ensure_social_graph_mapsize(db_dir, size)?;
    }
    // Graph starts rooted at the all-zero placeholder until a real root is set.
    let graph = HeedSocialGraph::open(db_dir, DEFAULT_ROOT_HEX)
        .context("open nostr-social-graph heed backend")?;

    Ok(Arc::new(SocialGraphStore {
        graph: StdMutex::new(graph),
        distance_cache: StdMutex::new(None),
        public_events: EventIndexBucket {
            event_store: NostrEventStore::new(Arc::clone(&public_store)),
            root_path: db_dir.join(EVENTS_ROOT_FILE),
        },
        ambient_events: EventIndexBucket {
            event_store: NostrEventStore::new(ambient_store),
            root_path: db_dir.join(AMBIENT_EVENTS_ROOT_FILE),
        },
        profile_index: ProfileIndexBucket {
            // Profile tree and search index share the public storage router.
            tree: HashTree::new(HashTreeConfig::new(Arc::clone(&public_store))),
            index: BTree::new(
                public_store,
                hashtree_index::BTreeOptions {
                    order: Some(PROFILE_SEARCH_INDEX_ORDER),
                },
            ),
            by_pubkey_root_path: db_dir.join(PROFILES_BY_PUBKEY_ROOT_FILE),
            search_root_path: db_dir.join(PROFILE_SEARCH_ROOT_FILE),
        },
        profile_index_overmute_threshold: StdMutex::new(1.0),
    }))
}
279
280pub fn set_social_graph_root(store: &SocialGraphStore, pk_bytes: &[u8; 32]) {
281    if let Err(err) = store.set_root(pk_bytes) {
282        tracing::warn!("Failed to set social graph root: {err}");
283    }
284}
285
286pub fn get_follow_distance(
287    backend: &(impl SocialGraphBackend + ?Sized),
288    pk_bytes: &[u8; 32],
289) -> Option<u32> {
290    backend.follow_distance(pk_bytes).ok().flatten()
291}
292
293pub fn get_follows(
294    backend: &(impl SocialGraphBackend + ?Sized),
295    pk_bytes: &[u8; 32],
296) -> Vec<[u8; 32]> {
297    match backend.followed_targets(pk_bytes) {
298        Ok(set) => set.into_iter().collect(),
299        Err(_) => Vec::new(),
300    }
301}
302
303pub fn is_overmuted(
304    backend: &(impl SocialGraphBackend + ?Sized),
305    _root_pk: &[u8; 32],
306    user_pk: &[u8; 32],
307    threshold: f64,
308) -> bool {
309    backend
310        .is_overmuted_user(user_pk, threshold)
311        .unwrap_or(false)
312}
313
314pub fn ingest_event(backend: &(impl SocialGraphBackend + ?Sized), _sub_id: &str, event_json: &str) {
315    let event = match Event::from_json(event_json) {
316        Ok(event) => event,
317        Err(_) => return,
318    };
319
320    if let Err(err) = backend.ingest_event(&event) {
321        tracing::warn!("Failed to ingest social graph event: {err}");
322    }
323}
324
/// Ingest a single already-parsed event, propagating any error.
pub fn ingest_parsed_event(
    backend: &(impl SocialGraphBackend + ?Sized),
    event: &Event,
) -> Result<()> {
    backend.ingest_event(event)
}
331
/// Ingest a parsed event into an explicit storage class.
pub fn ingest_parsed_event_with_storage_class(
    backend: &(impl SocialGraphBackend + ?Sized),
    event: &Event,
    storage_class: EventStorageClass,
) -> Result<()> {
    backend.ingest_event_with_storage_class(event, storage_class)
}
339
/// Ingest a batch of parsed events.
pub fn ingest_parsed_events(
    backend: &(impl SocialGraphBackend + ?Sized),
    events: &[Event],
) -> Result<()> {
    backend.ingest_events(events)
}
346
/// Ingest a batch of parsed events into an explicit storage class.
pub fn ingest_parsed_events_with_storage_class(
    backend: &(impl SocialGraphBackend + ?Sized),
    events: &[Event],
    storage_class: EventStorageClass,
) -> Result<()> {
    backend.ingest_events_with_storage_class(events, storage_class)
}
354
/// Ingest events that should only affect the follow graph (not event storage,
/// for backends that distinguish the two).
pub fn ingest_graph_parsed_events(
    backend: &(impl SocialGraphBackend + ?Sized),
    events: &[Event],
) -> Result<()> {
    backend.ingest_graph_events(events)
}
361
/// Query at most `limit` events matching `filter`; backend errors yield an
/// empty result.
pub fn query_events(
    backend: &(impl SocialGraphBackend + ?Sized),
    filter: &Filter,
    limit: usize,
) -> Vec<Event> {
    backend.query_events(filter, limit).unwrap_or_default()
}
369
impl SocialGraphStore {
    /// Set the overmute threshold used when admitting profiles to the
    /// profile index.
    pub fn set_profile_index_overmute_threshold(&self, threshold: f64) {
        *self
            .profile_index_overmute_threshold
            .lock()
            .expect("profile index overmute threshold") = threshold;
    }

    /// Read the current profile-index overmute threshold.
    fn profile_index_overmute_threshold(&self) -> f64 {
        *self
            .profile_index_overmute_threshold
            .lock()
            .expect("profile index overmute threshold")
    }

    /// Drop the cached distance snapshot; it is rebuilt lazily on the next
    /// read. Called after every mutation of the underlying graph.
    fn invalidate_distance_cache(&self) {
        *self.distance_cache.lock().unwrap() = None;
    }

    /// Build stats and per-distance user lists from an exported graph state.
    fn build_distance_cache(state: nostr_social_graph::SocialGraphState) -> Result<DistanceCache> {
        // Invert the pubkey->id map so distance buckets (lists of ids) can be
        // decoded back to raw 32-byte pubkeys.
        let unique_ids = state
            .unique_ids
            .into_iter()
            .map(|(pubkey, id)| decode_pubkey(&pubkey).map(|decoded| (id, decoded)))
            .collect::<Result<HashMap<_, _>>>()?;

        let mut users_by_distance = BTreeMap::new();
        let mut size_by_distance = BTreeMap::new();
        for (distance, users) in state.users_by_follow_distance {
            // Ids missing from unique_ids are silently skipped.
            let decoded = users
                .into_iter()
                .filter_map(|id| unique_ids.get(&id).copied())
                .collect::<Vec<_>>();
            size_by_distance.insert(distance, decoded.len());
            users_by_distance.insert(distance, decoded);
        }

        let total_follows = state
            .followed_by_user
            .iter()
            .map(|(_, targets)| targets.len())
            .sum::<usize>();
        let total_users = size_by_distance.values().copied().sum();
        let max_depth = size_by_distance.keys().copied().max().unwrap_or_default();

        Ok(DistanceCache {
            stats: SocialGraphStats {
                total_users,
                root: Some(state.root),
                total_follows,
                max_depth,
                size_by_distance,
                enabled: true,
            },
            users_by_distance,
        })
    }

    /// Return the cached distance snapshot, rebuilding it from an exported
    /// graph state when the cache is empty.
    fn load_distance_cache(&self) -> Result<DistanceCache> {
        if let Some(cache) = self.distance_cache.lock().unwrap().clone() {
            return Ok(cache);
        }

        // Export under the graph lock, then build the cache without holding it.
        let state = {
            let graph = self.graph.lock().unwrap();
            graph.export_state().context("export social graph state")?
        };
        let cache = Self::build_distance_cache(state)?;
        *self.distance_cache.lock().unwrap() = Some(cache.clone());
        Ok(cache)
    }

    /// Re-root the graph at `root`. If the store still holds the all-zero
    /// placeholder root, the whole state is replaced with a fresh graph
    /// instead of re-rooting in place.
    fn set_root(&self, root: &[u8; 32]) -> Result<()> {
        let root_hex = hex::encode(root);
        {
            let mut graph = self.graph.lock().unwrap();
            if should_replace_placeholder_root(&graph)? {
                let fresh = SocialGraph::new(&root_hex);
                graph
                    .replace_state(&fresh.export_state())
                    .context("replace placeholder social graph root")?;
            } else {
                graph
                    .set_root(&root_hex)
                    .context("set nostr-social-graph root")?;
            }
        }
        self.invalidate_distance_cache();
        Ok(())
    }

    /// Aggregate graph statistics (served from the distance cache).
    fn stats(&self) -> Result<SocialGraphStats> {
        Ok(self.load_distance_cache()?.stats)
    }

    /// Follow distance of `pk_bytes`; the backend's sentinel value for
    /// unknown users is mapped to `None`.
    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
        let graph = self.graph.lock().unwrap();
        let distance = graph
            .get_follow_distance(&hex::encode(pk_bytes))
            .context("read social graph follow distance")?;
        Ok((distance != UNKNOWN_FOLLOW_DISTANCE).then_some(distance))
    }

    /// Users at exactly `distance` hops (served from the distance cache).
    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
        Ok(self
            .load_distance_cache()?
            .users_by_distance
            .get(&distance)
            .cloned()
            .unwrap_or_default())
    }

    /// Timestamp of `owner`'s latest follow list, if recorded.
    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_follow_list_created_at(&hex::encode(owner))
            .context("read social graph follow list timestamp")
    }

    /// Set of pubkeys followed by `owner`, decoded to raw bytes.
    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
        let graph = self.graph.lock().unwrap();
        decode_pubkey_set(
            graph
                .get_followed_by_user(&hex::encode(owner))
                .context("read followed targets")?,
        )
    }

    /// Whether `user_pk` exceeds mute ratio `threshold`; a non-positive
    /// threshold disables the check entirely.
    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
        if threshold <= 0.0 {
            return Ok(false);
        }
        let graph = self.graph.lock().unwrap();
        graph
            .is_overmuted(&hex::encode(user_pk), threshold)
            .context("check social graph overmute")
    }

    /// Root CID of the profile search index, if one has been written.
    #[cfg_attr(not(test), allow(dead_code))]
    pub fn profile_search_root(&self) -> Result<Option<Cid>> {
        self.profile_index.search_root()
    }

    /// Root CID of the public event index, if any.
    pub(crate) fn public_events_root(&self) -> Result<Option<Cid>> {
        self.public_events.events_root()
    }

    /// Persist (or clear, with `None`) the public event index root.
    pub(crate) fn write_public_events_root(&self, root: Option<&Cid>) -> Result<()> {
        self.public_events.write_events_root(root)
    }

    /// Latest stored metadata event for `pubkey_hex`, if indexed.
    #[cfg_attr(not(test), allow(dead_code))]
    pub fn latest_profile_event(&self, pubkey_hex: &str) -> Result<Option<Event>> {
        self.profile_index.profile_event_for_pubkey(pubkey_hex)
    }

    /// Profile search entries whose index key starts with `prefix`.
    #[cfg_attr(not(test), allow(dead_code))]
    pub fn profile_search_entries_for_prefix(
        &self,
        prefix: &str,
    ) -> Result<Vec<(String, StoredProfileSearchEntry)>> {
        self.profile_index.search_entries_for_prefix(prefix)
    }

    /// Incrementally update the profile index for `events`.
    pub fn sync_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
        self.update_profile_index_for_events(events)
    }

    /// Rebuild the profile index from scratch using only `events`, replacing
    /// both persisted roots.
    pub(crate) fn rebuild_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
        let latest_by_pubkey = self.filtered_latest_metadata_events_by_pubkey(events)?;
        let (by_pubkey_root, search_root) = self
            .profile_index
            .rebuild_profile_events(latest_by_pubkey.into_values())?;
        self.profile_index
            .write_by_pubkey_root(by_pubkey_root.as_ref())?;
        self.profile_index.write_search_root(search_root.as_ref())?;
        Ok(())
    }

    /// Rebuild the profile index from all metadata events in both event
    /// buckets. Returns how many distinct (non-overmuted) profiles were kept.
    pub fn rebuild_profile_index_from_stored_events(&self) -> Result<usize> {
        let public_events_root = self.public_events.events_root()?;
        let ambient_events_root = self.ambient_events.events_root()?;
        // Nothing stored at all: clear both profile roots and report zero.
        if public_events_root.is_none() && ambient_events_root.is_none() {
            self.profile_index.write_by_pubkey_root(None)?;
            self.profile_index.write_search_root(None)?;
            return Ok(0);
        }

        // Collect kind-0 (metadata) events from whichever buckets have a root.
        let mut events = Vec::new();
        for (bucket, root) in [
            (&self.public_events, public_events_root),
            (&self.ambient_events, ambient_events_root),
        ] {
            let Some(root) = root else {
                continue;
            };
            let stored = block_on(bucket.event_store.list_by_kind_lossy(
                Some(&root),
                Kind::Metadata.as_u16() as u32,
                ListEventsOptions::default(),
            ))
            .map_err(map_event_store_error)?;
            events.extend(
                stored
                    .into_iter()
                    .map(nostr_event_from_stored)
                    .collect::<Result<Vec<_>>>()?,
            );
        }

        let latest_count = self
            .filtered_latest_metadata_events_by_pubkey(&events)?
            .len();
        self.rebuild_profile_index_for_events(&events)?;
        Ok(latest_count)
    }

    /// Apply `events` to the profile index incrementally: the newest metadata
    /// event per author is upserted, and overmuted authors are removed.
    /// Roots are only rewritten when something actually changed.
    fn update_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
        // Newest metadata event per author (module-level helper).
        let latest_by_pubkey = latest_metadata_events_by_pubkey(events);
        let threshold = self.profile_index_overmute_threshold();

        if latest_by_pubkey.is_empty() {
            return Ok(());
        }

        let mut by_pubkey_root = self.profile_index.by_pubkey_root()?;
        let mut search_root = self.profile_index.search_root()?;
        let mut changed = false;

        for event in latest_by_pubkey.into_values() {
            let overmuted = self.is_overmuted_user(&event.pubkey.to_bytes(), threshold)?;
            let (next_by_pubkey_root, next_search_root, updated) = if overmuted {
                // Overmuted authors are evicted from both index structures.
                self.profile_index.remove_profile_event(
                    by_pubkey_root.as_ref(),
                    search_root.as_ref(),
                    &event.pubkey.to_hex(),
                )?
            } else {
                self.profile_index.update_profile_event(
                    by_pubkey_root.as_ref(),
                    search_root.as_ref(),
                    event,
                )?
            };
            if updated {
                by_pubkey_root = next_by_pubkey_root;
                search_root = next_search_root;
                changed = true;
            }
        }

        if changed {
            self.profile_index
                .write_by_pubkey_root(by_pubkey_root.as_ref())?;
            self.profile_index.write_search_root(search_root.as_ref())?;
        }

        Ok(())
    }

    /// Newest metadata (kind 0) event per author, skipping overmuted authors.
    fn filtered_latest_metadata_events_by_pubkey<'a>(
        &self,
        events: &'a [Event],
    ) -> Result<BTreeMap<String, &'a Event>> {
        let threshold = self.profile_index_overmute_threshold();
        let mut latest_by_pubkey = BTreeMap::<String, &Event>::new();
        for event in events.iter().filter(|event| event.kind == Kind::Metadata) {
            if self.is_overmuted_user(&event.pubkey.to_bytes(), threshold)? {
                continue;
            }
            let pubkey = event.pubkey.to_hex();
            match latest_by_pubkey.get(&pubkey) {
                // Keep the existing entry when the candidate is not newer.
                Some(current) if compare_nostr_events(event, current).is_le() => {}
                _ => {
                    latest_by_pubkey.insert(pubkey, event);
                }
            }
        }
        Ok(latest_by_pubkey)
    }

    /// Encode the graph (re-rooted at `root` if necessary) as binary snapshot
    /// chunks sized by `options`. Works on an exported copy, so the stored
    /// graph is never re-rooted as a side effect.
    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
        let state = {
            let graph = self.graph.lock().unwrap();
            graph.export_state().context("export social graph state")?
        };
        let mut graph = SocialGraph::from_state(state).context("rebuild social graph state")?;
        let root_hex = hex::encode(root);
        if graph.get_root() != root_hex {
            graph
                .set_root(&root_hex)
                .context("set snapshot social graph root")?;
        }
        let chunks = graph
            .to_binary_chunks_with_budget(*options)
            .context("encode social graph snapshot")?;
        Ok(chunks.into_iter().map(Bytes::from).collect())
    }

    /// Ingest one event, routing it by the default storage class rule.
    fn ingest_event(&self, event: &Event) -> Result<()> {
        self.ingest_event_with_storage_class(event, self.default_storage_class_for(event)?)
    }

    /// Ingest a batch, splitting it into public/ambient sub-batches first so
    /// each bucket gets one batched write.
    fn ingest_events(&self, events: &[Event]) -> Result<()> {
        if events.is_empty() {
            return Ok(());
        }

        let mut public = Vec::new();
        let mut ambient = Vec::new();
        for event in events {
            match self.default_storage_class_for(event)? {
                EventStorageClass::Public => public.push(event.clone()),
                EventStorageClass::Ambient => ambient.push(event.clone()),
            }
        }

        if !public.is_empty() {
            self.ingest_events_with_storage_class(&public, EventStorageClass::Public)?;
        }
        if !ambient.is_empty() {
            self.ingest_events_with_storage_class(&ambient, EventStorageClass::Ambient)?;
        }

        Ok(())
    }

    /// Apply only the graph-relevant events to the follow graph, without
    /// touching event storage or the profile index. Replays onto an exported
    /// copy and swaps the state back in under the lock.
    fn apply_graph_events_only(&self, events: &[Event]) -> Result<()> {
        let graph_events = events
            .iter()
            .filter(|event| is_social_graph_event(event.kind))
            .collect::<Vec<_>>();
        if graph_events.is_empty() {
            return Ok(());
        }

        {
            let mut graph = self.graph.lock().unwrap();
            let mut snapshot = SocialGraph::from_state(
                graph
                    .export_state()
                    .context("export social graph state for graph-only ingest")?,
            )
            .context("rebuild social graph state for graph-only ingest")?;
            for event in graph_events {
                snapshot.handle_event(&graph_event_from_nostr(event), true, 0.0);
            }
            graph
                .replace_state(&snapshot.export_state())
                .context("replace graph-only social graph state")?;
        }
        self.invalidate_distance_cache();
        Ok(())
    }

    /// Query both buckets (public + ambient).
    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
        self.query_events_in_scope(filter, limit, EventQueryScope::All)
    }

    /// Events authored by the configured (non-placeholder) root are Public;
    /// everything else is Ambient.
    fn default_storage_class_for(&self, event: &Event) -> Result<EventStorageClass> {
        let graph = self.graph.lock().unwrap();
        let root_hex = graph.get_root().context("read social graph root")?;
        if root_hex != DEFAULT_ROOT_HEX && root_hex == event.pubkey.to_hex() {
            return Ok(EventStorageClass::Public);
        }
        Ok(EventStorageClass::Ambient)
    }

    /// Bucket corresponding to a storage class.
    fn bucket(&self, storage_class: EventStorageClass) -> &EventIndexBucket {
        match storage_class {
            EventStorageClass::Public => &self.public_events,
            EventStorageClass::Ambient => &self.ambient_events,
        }
    }

    /// Store one event in the chosen bucket, persist the new root, refresh the
    /// profile index, and (for graph-relevant kinds) update the follow graph.
    fn ingest_event_with_storage_class(
        &self,
        event: &Event,
        storage_class: EventStorageClass,
    ) -> Result<()> {
        let current_root = self.bucket(storage_class).events_root()?;
        let next_root = self
            .bucket(storage_class)
            .store_event(current_root.as_ref(), event)?;
        self.bucket(storage_class)
            .write_events_root(Some(&next_root))?;

        self.update_profile_index_for_events(std::slice::from_ref(event))?;

        if is_social_graph_event(event.kind) {
            {
                let mut graph = self.graph.lock().unwrap();
                graph
                    .handle_event(&graph_event_from_nostr(event), true, 0.0)
                    .context("ingest social graph event into nostr-social-graph")?;
            }
            self.invalidate_distance_cache();
        }

        Ok(())
    }

    /// Batched variant: one event-store build per bucket, one profile-index
    /// pass, and a single graph-state replace for all graph-relevant events.
    fn ingest_events_with_storage_class(
        &self,
        events: &[Event],
        storage_class: EventStorageClass,
    ) -> Result<()> {
        if events.is_empty() {
            return Ok(());
        }

        let bucket = self.bucket(storage_class);
        let current_root = bucket.events_root()?;
        let stored_events = events
            .iter()
            .map(stored_event_from_nostr)
            .collect::<Vec<_>>();
        let next_root = block_on(
            bucket
                .event_store
                .build(current_root.as_ref(), stored_events),
        )
        .map_err(map_event_store_error)?;
        bucket.write_events_root(next_root.as_ref())?;

        self.update_profile_index_for_events(events)?;

        let graph_events = events
            .iter()
            .filter(|event| is_social_graph_event(event.kind))
            .collect::<Vec<_>>();
        if graph_events.is_empty() {
            return Ok(());
        }

        // Replay graph events on an exported copy, then swap the state back
        // in under the lock (same pattern as apply_graph_events_only).
        {
            let mut graph = self.graph.lock().unwrap();
            let mut snapshot = SocialGraph::from_state(
                graph
                    .export_state()
                    .context("export social graph state for batch ingest")?,
            )
            .context("rebuild social graph state for batch ingest")?;
            for event in graph_events {
                snapshot.handle_event(&graph_event_from_nostr(event), true, 0.0);
            }
            graph
                .replace_state(&snapshot.export_state())
                .context("replace batched social graph state")?;
        }
        self.invalidate_distance_cache();

        Ok(())
    }

    /// Query up to `limit` events from the buckets selected by `scope`,
    /// deduplicating and re-filtering the merged candidates.
    pub(crate) fn query_events_in_scope(
        &self,
        filter: &Filter,
        limit: usize,
        scope: EventQueryScope,
    ) -> Result<Vec<Event>> {
        if limit == 0 {
            return Ok(Vec::new());
        }

        let buckets: &[&EventIndexBucket] = match scope {
            EventQueryScope::PublicOnly => &[&self.public_events],
            EventQueryScope::AmbientOnly => &[&self.ambient_events],
            EventQueryScope::All => &[&self.public_events, &self.ambient_events],
        };

        let mut candidates = Vec::new();
        for bucket in buckets {
            candidates.extend(bucket.query_events(filter, limit)?);
        }

        // Buckets may over-return; dedupe, re-check the filter, then cap.
        let mut deduped = dedupe_events(candidates);
        deduped.retain(|event| filter.match_event(event));
        deduped.truncate(limit);
        Ok(deduped)
    }
}
852
853impl EventIndexBucket {
    /// Read this bucket's persisted root CID (None if never written).
    fn events_root(&self) -> Result<Option<Cid>> {
        let _profile = NostrProfileGuard::new("socialgraph.events_root.read");
        read_root_file(&self.root_path)
    }
858
    /// Persist (or clear, with `None`) this bucket's root CID.
    fn write_events_root(&self, root: Option<&Cid>) -> Result<()> {
        let _profile = NostrProfileGuard::new("socialgraph.events_root.write");
        write_root_file(&self.root_path, root)
    }
863
    /// Add one event to the store under `root`, returning the new root CID.
    fn store_event(&self, root: Option<&Cid>, event: &Event) -> Result<Cid> {
        let stored = stored_event_from_nostr(event);
        let _profile = NostrProfileGuard::new("socialgraph.event_store.add");
        block_on(self.event_store.add(root, stored)).map_err(map_event_store_error)
    }
869
    /// Fetch a stored event by id and decode it back to a nostr Event.
    fn load_event_by_id(&self, root: &Cid, event_id: &str) -> Result<Option<Event>> {
        let stored = block_on(self.event_store.get_by_id(Some(root), event_id))
            .map_err(map_event_store_error)?;
        stored.map(nostr_event_from_stored).transpose()
    }
875
876    fn load_events_for_author(
877        &self,
878        root: &Cid,
879        author: &nostr::PublicKey,
880        filter: &Filter,
881        limit: usize,
882        exact: bool,
883    ) -> Result<Vec<Event>> {
884        let kind_filter = filter.kinds.as_ref().and_then(|kinds| {
885            if kinds.len() == 1 {
886                kinds.iter().next().map(|kind| kind.as_u16() as u32)
887            } else {
888                None
889            }
890        });
891        let author_hex = author.to_hex();
892        let options = filter_list_options(filter, limit, exact);
893        let stored = match kind_filter {
894            Some(kind) => block_on(self.event_store.list_by_author_and_kind(
895                Some(root),
896                &author_hex,
897                kind,
898                options.clone(),
899            ))
900            .map_err(map_event_store_error)?,
901            None => block_on(
902                self.event_store
903                    .list_by_author(Some(root), &author_hex, options),
904            )
905            .map_err(map_event_store_error)?,
906        };
907        stored
908            .into_iter()
909            .map(nostr_event_from_stored)
910            .collect::<Result<Vec<_>>>()
911    }
912
913    fn load_events_for_kind(
914        &self,
915        root: &Cid,
916        kind: Kind,
917        filter: &Filter,
918        limit: usize,
919        exact: bool,
920    ) -> Result<Vec<Event>> {
921        let stored = block_on(self.event_store.list_by_kind(
922            Some(root),
923            kind.as_u16() as u32,
924            filter_list_options(filter, limit, exact),
925        ))
926        .map_err(map_event_store_error)?;
927        stored
928            .into_iter()
929            .map(nostr_event_from_stored)
930            .collect::<Result<Vec<_>>>()
931    }
932
933    fn load_recent_events(
934        &self,
935        root: &Cid,
936        filter: &Filter,
937        limit: usize,
938        exact: bool,
939    ) -> Result<Vec<Event>> {
940        let stored = block_on(
941            self.event_store
942                .list_recent(Some(root), filter_list_options(filter, limit, exact)),
943        )
944        .map_err(map_event_store_error)?;
945        stored
946            .into_iter()
947            .map(nostr_event_from_stored)
948            .collect::<Result<Vec<_>>>()
949    }
950
951    fn load_events_for_tag(
952        &self,
953        root: &Cid,
954        tag_name: &str,
955        values: &[String],
956        filter: &Filter,
957        limit: usize,
958        exact: bool,
959    ) -> Result<Vec<Event>> {
960        let mut events = Vec::new();
961        let options = filter_list_options(filter, limit, exact);
962        for value in values {
963            let stored = block_on(self.event_store.list_by_tag(
964                Some(root),
965                tag_name,
966                value,
967                options.clone(),
968            ))
969            .map_err(map_event_store_error)?;
970            events.extend(
971                stored
972                    .into_iter()
973                    .map(nostr_event_from_stored)
974                    .collect::<Result<Vec<_>>>()?,
975            );
976        }
977        Ok(dedupe_events(events))
978    }
979
980    fn choose_tag_source(&self, filter: &Filter) -> Option<(String, Vec<String>)> {
981        filter
982            .generic_tags
983            .iter()
984            .min_by_key(|(_, values)| values.len())
985            .map(|(tag, values)| {
986                (
987                    tag.as_char().to_ascii_lowercase().to_string(),
988                    values.iter().cloned().collect(),
989                )
990            })
991    }
992
993    fn load_major_index_candidates(
994        &self,
995        root: &Cid,
996        filter: &Filter,
997        limit: usize,
998    ) -> Result<Option<Vec<Event>>> {
999        if let Some(events) = self.load_direct_replaceable_candidates(root, filter)? {
1000            return Ok(Some(events));
1001        }
1002
1003        if let Some((tag_name, values)) = self.choose_tag_source(filter) {
1004            let exact = filter.authors.is_none()
1005                && filter.kinds.is_none()
1006                && filter.search.is_none()
1007                && filter.generic_tags.len() == 1;
1008            return Ok(Some(self.load_events_for_tag(
1009                root, &tag_name, &values, filter, limit, exact,
1010            )?));
1011        }
1012
1013        if let (Some(authors), Some(kinds)) = (filter.authors.as_ref(), filter.kinds.as_ref()) {
1014            if authors.len() == 1 && kinds.len() == 1 {
1015                let author = authors.iter().next().expect("checked single author");
1016                let exact = filter.generic_tags.is_empty() && filter.search.is_none();
1017                return Ok(Some(
1018                    self.load_events_for_author(root, author, filter, limit, exact)?,
1019                ));
1020            }
1021
1022            if kinds.len() < authors.len() {
1023                let mut events = Vec::new();
1024                for kind in kinds {
1025                    events.extend(self.load_events_for_kind(root, *kind, filter, limit, false)?);
1026                }
1027                return Ok(Some(dedupe_events(events)));
1028            }
1029
1030            let mut events = Vec::new();
1031            for author in authors {
1032                events.extend(self.load_events_for_author(root, author, filter, limit, false)?);
1033            }
1034            return Ok(Some(dedupe_events(events)));
1035        }
1036
1037        if let Some(authors) = filter.authors.as_ref() {
1038            let mut events = Vec::new();
1039            let exact = filter.generic_tags.is_empty() && filter.search.is_none();
1040            for author in authors {
1041                events.extend(self.load_events_for_author(root, author, filter, limit, exact)?);
1042            }
1043            return Ok(Some(dedupe_events(events)));
1044        }
1045
1046        if let Some(kinds) = filter.kinds.as_ref() {
1047            let mut events = Vec::new();
1048            let exact = filter.authors.is_none()
1049                && filter.generic_tags.is_empty()
1050                && filter.search.is_none();
1051            for kind in kinds {
1052                events.extend(self.load_events_for_kind(root, *kind, filter, limit, exact)?);
1053            }
1054            return Ok(Some(dedupe_events(events)));
1055        }
1056
1057        Ok(None)
1058    }
1059
1060    fn load_direct_replaceable_candidates(
1061        &self,
1062        root: &Cid,
1063        filter: &Filter,
1064    ) -> Result<Option<Vec<Event>>> {
1065        let Some(authors) = filter.authors.as_ref() else {
1066            return Ok(None);
1067        };
1068        let Some(kinds) = filter.kinds.as_ref() else {
1069            return Ok(None);
1070        };
1071        if kinds.len() != 1 {
1072            return Ok(None);
1073        }
1074
1075        let kind = kinds.iter().next().expect("checked single kind").as_u16() as u32;
1076
1077        if is_parameterized_replaceable_kind(kind) {
1078            let d_tag = SingleLetterTag::lowercase(nostr::Alphabet::D);
1079            let Some(d_values) = filter.generic_tags.get(&d_tag) else {
1080                return Ok(None);
1081            };
1082            let mut events = Vec::new();
1083            for author in authors {
1084                let author_hex = author.to_hex();
1085                for d_value in d_values {
1086                    if let Some(stored) = block_on(self.event_store.get_parameterized_replaceable(
1087                        Some(root),
1088                        &author_hex,
1089                        kind,
1090                        d_value,
1091                    ))
1092                    .map_err(map_event_store_error)?
1093                    {
1094                        events.push(nostr_event_from_stored(stored)?);
1095                    }
1096                }
1097            }
1098            return Ok(Some(dedupe_events(events)));
1099        }
1100
1101        if is_replaceable_kind(kind) {
1102            let mut events = Vec::new();
1103            for author in authors {
1104                if let Some(stored) = block_on(self.event_store.get_replaceable(
1105                    Some(root),
1106                    &author.to_hex(),
1107                    kind,
1108                ))
1109                .map_err(map_event_store_error)?
1110                {
1111                    events.push(nostr_event_from_stored(stored)?);
1112                }
1113            }
1114            return Ok(Some(dedupe_events(events)));
1115        }
1116
1117        Ok(None)
1118    }
1119
1120    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
1121        if limit == 0 {
1122            return Ok(Vec::new());
1123        }
1124
1125        let events_root = self.events_root()?;
1126        let Some(root) = events_root.as_ref() else {
1127            return Ok(Vec::new());
1128        };
1129        let mut candidates = Vec::new();
1130        let mut seen: HashSet<[u8; 32]> = HashSet::new();
1131
1132        if let Some(ids) = filter.ids.as_ref() {
1133            for id in ids {
1134                let id_bytes = id.to_bytes();
1135                if !seen.insert(id_bytes) {
1136                    continue;
1137                }
1138                if let Some(event) = self.load_event_by_id(root, &id.to_hex())? {
1139                    if filter.match_event(&event) {
1140                        candidates.push(event);
1141                    }
1142                }
1143                if candidates.len() >= limit {
1144                    break;
1145                }
1146            }
1147        } else {
1148            let base_events = match self.load_major_index_candidates(root, filter, limit)? {
1149                Some(events) => events,
1150                None => self.load_recent_events(
1151                    root,
1152                    filter,
1153                    limit,
1154                    filter.authors.is_none()
1155                        && filter.kinds.is_none()
1156                        && filter.generic_tags.is_empty()
1157                        && filter.search.is_none(),
1158                )?,
1159            };
1160
1161            for event in base_events {
1162                let id_bytes = event.id.to_bytes();
1163                if !seen.insert(id_bytes) {
1164                    continue;
1165                }
1166                if filter.match_event(&event) {
1167                    candidates.push(event);
1168                }
1169                if candidates.len() >= limit {
1170                    break;
1171                }
1172            }
1173        }
1174
1175        candidates.sort_by(|a, b| {
1176            b.created_at
1177                .as_u64()
1178                .cmp(&a.created_at.as_u64())
1179                .then_with(|| a.id.cmp(&b.id))
1180        });
1181        candidates.truncate(limit);
1182        Ok(candidates)
1183    }
1184}
1185
impl ProfileIndexBucket {
    /// Reads the root CID of the pubkey -> mirrored-profile-event index.
    fn by_pubkey_root(&self) -> Result<Option<Cid>> {
        // Profiling guard records timing for this operation while in scope.
        let _profile = NostrProfileGuard::new("socialgraph.profile_by_pubkey_root.read");
        read_root_file(&self.by_pubkey_root_path)
    }

    /// Reads the root CID of the profile search-term index.
    fn search_root(&self) -> Result<Option<Cid>> {
        let _profile = NostrProfileGuard::new("socialgraph.profile_search_root.read");
        read_root_file(&self.search_root_path)
    }

    /// Persists (or clears, when `None`) the pubkey index root.
    fn write_by_pubkey_root(&self, root: Option<&Cid>) -> Result<()> {
        let _profile = NostrProfileGuard::new("socialgraph.profile_by_pubkey_root.write");
        write_root_file(&self.by_pubkey_root_path, root)
    }

    /// Persists (or clears, when `None`) the search index root.
    fn write_search_root(&self, root: Option<&Cid>) -> Result<()> {
        let _profile = NostrProfileGuard::new("socialgraph.profile_search_root.write");
        write_root_file(&self.search_root_path, root)
    }

    /// Stores the event's JSON serialization in the hash tree and returns the
    /// content CID of the mirrored copy.
    fn mirror_profile_event(&self, event: &Event) -> Result<Cid> {
        let bytes = event.as_json().into_bytes();
        block_on(self.tree.put_file(&bytes))
            .map(|(cid, _size)| cid)
            .context("store mirrored profile event")
    }

    /// Loads a mirrored profile event back from the hash tree by CID.
    /// Returns `Ok(None)` when the tree has no content for that CID.
    fn load_profile_event(&self, cid: &Cid) -> Result<Option<Event>> {
        let bytes = block_on(self.tree.get(cid, None)).context("read mirrored profile event")?;
        let Some(bytes) = bytes else {
            return Ok(None);
        };
        let json = String::from_utf8(bytes).context("decode mirrored profile event as utf-8")?;
        Ok(Some(
            Event::from_json(json).context("decode mirrored profile event json")?,
        ))
    }

    // Only exercised from tests; kept compilable in non-test builds.
    #[cfg_attr(not(test), allow(dead_code))]
    fn profile_event_for_pubkey(&self, pubkey_hex: &str) -> Result<Option<Event>> {
        let root = self.by_pubkey_root()?;
        let Some(cid) = block_on(self.index.get_link(root.as_ref(), pubkey_hex))
            .context("read mirrored profile event cid by pubkey")?
        else {
            return Ok(None);
        };
        self.load_profile_event(&cid)
    }

    // Only exercised from tests; kept compilable in non-test builds.
    #[cfg_attr(not(test), allow(dead_code))]
    fn search_entries_for_prefix(
        &self,
        prefix: &str,
    ) -> Result<Vec<(String, StoredProfileSearchEntry)>> {
        let Some(root) = self.search_root()? else {
            return Ok(Vec::new());
        };
        let entries =
            block_on(self.index.prefix(&root, prefix)).context("query profile search prefix")?;
        entries
            .into_iter()
            .map(|(key, value)| {
                let entry = serde_json::from_str(&value)
                    .context("decode stored profile search entry json")?;
                Ok((key, entry))
            })
            .collect()
    }

    /// Rebuilds both profile indexes from scratch out of `events`, returning
    /// the new `(by_pubkey_root, search_root)` pair. Uses bulk index builds
    /// instead of per-entry inserts.
    fn rebuild_profile_events<'a, I>(&self, events: I) -> Result<(Option<Cid>, Option<Cid>)>
    where
        I: IntoIterator<Item = &'a Event>,
    {
        let mut by_pubkey_entries = Vec::<(String, Cid)>::new();
        let mut search_entries = Vec::<(String, String)>::new();

        for event in events {
            let pubkey = event.pubkey.to_hex();
            let mirrored_cid = self.mirror_profile_event(event)?;
            let search_value =
                serialize_profile_search_entry(&build_profile_search_entry(event, &mirrored_cid)?)?;
            by_pubkey_entries.push((pubkey.clone(), mirrored_cid.clone()));
            // One search-index entry per derived term, all pointing at the
            // same serialized entry payload.
            for term in profile_search_terms_for_event(event) {
                search_entries.push((
                    format!("{PROFILE_SEARCH_PREFIX}{term}:{pubkey}"),
                    search_value.clone(),
                ));
            }
        }

        let by_pubkey_root = block_on(self.index.build_links(by_pubkey_entries))
            .context("bulk build mirrored profile-by-pubkey index")?;
        let search_root = block_on(self.index.build(search_entries))
            .context("bulk build mirrored profile search index")?;
        Ok((by_pubkey_root, search_root))
    }

    /// Inserts or refreshes the mirrored profile for `event`'s author on top
    /// of the given roots. Returns the (possibly unchanged) new roots plus a
    /// flag saying whether anything was written.
    ///
    /// An event that is not strictly newer than the stored one (per
    /// `compare_nostr_events`) is a no-op.
    fn update_profile_event(
        &self,
        by_pubkey_root: Option<&Cid>,
        search_root: Option<&Cid>,
        event: &Event,
    ) -> Result<(Option<Cid>, Option<Cid>, bool)> {
        let pubkey = event.pubkey.to_hex();
        let existing_cid = block_on(self.index.get_link(by_pubkey_root, &pubkey))
            .context("lookup existing mirrored profile event")?;

        let existing_event = match existing_cid.as_ref() {
            Some(cid) => self.load_profile_event(cid)?,
            None => None,
        };

        // Keep the stored event when it is newer than or equal to the incoming one.
        if existing_event
            .as_ref()
            .is_some_and(|current| compare_nostr_events(event, current).is_le())
        {
            return Ok((by_pubkey_root.cloned(), search_root.cloned(), false));
        }

        let mirrored_cid = self.mirror_profile_event(event)?;
        let next_by_pubkey_root = Some(
            block_on(
                self.index
                    .insert_link(by_pubkey_root, &pubkey, &mirrored_cid),
            )
            .context("write mirrored profile event index")?,
        );

        // Remove search terms derived from the superseded event. Each delete
        // threads the new root forward; a `None` root means the index is empty
        // and nothing more can be deleted.
        let mut next_search_root = search_root.cloned();
        if let Some(current) = existing_event.as_ref() {
            for term in profile_search_terms_for_event(current) {
                let Some(root) = next_search_root.as_ref() else {
                    break;
                };
                next_search_root = block_on(
                    self.index
                        .delete(root, &format!("{PROFILE_SEARCH_PREFIX}{term}:{pubkey}")),
                )
                .context("remove stale profile search term")?;
            }
        }

        // Insert the fresh terms pointing at the new mirrored copy.
        let search_value =
            serialize_profile_search_entry(&build_profile_search_entry(event, &mirrored_cid)?)?;
        for term in profile_search_terms_for_event(event) {
            next_search_root = Some(
                block_on(self.index.insert(
                    next_search_root.as_ref(),
                    &format!("{PROFILE_SEARCH_PREFIX}{term}:{pubkey}"),
                    &search_value,
                ))
                .context("write profile search term")?,
            );
        }

        Ok((next_by_pubkey_root, next_search_root, true))
    }

    /// Removes the mirrored profile (and its search terms) for `pubkey`.
    /// Returns the new roots plus a flag saying whether an entry existed.
    fn remove_profile_event(
        &self,
        by_pubkey_root: Option<&Cid>,
        search_root: Option<&Cid>,
        pubkey: &str,
    ) -> Result<(Option<Cid>, Option<Cid>, bool)> {
        let existing_cid = block_on(self.index.get_link(by_pubkey_root, pubkey))
            .context("lookup mirrored profile event for removal")?;
        let Some(existing_cid) = existing_cid else {
            // Nothing stored for this pubkey — roots stay as they were.
            return Ok((by_pubkey_root.cloned(), search_root.cloned(), false));
        };

        // Load the old event first so its search terms can be derived below.
        let existing_event = self.load_profile_event(&existing_cid)?;
        let next_by_pubkey_root = match by_pubkey_root {
            Some(root) => block_on(self.index.delete(root, pubkey))
                .context("remove mirrored profile-by-pubkey entry")?,
            None => None,
        };

        let mut next_search_root = search_root.cloned();
        if let Some(current) = existing_event.as_ref() {
            for term in profile_search_terms_for_event(current) {
                let Some(root) = next_search_root.as_ref() else {
                    break;
                };
                // NOTE(review): context says "overmuted" where the update path
                // says "stale" — looks like a copy/paste from the overmute
                // flow; confirm the wording is intentional.
                next_search_root = block_on(
                    self.index
                        .delete(root, &format!("{PROFILE_SEARCH_PREFIX}{term}:{pubkey}")),
                )
                .context("remove overmuted profile search term")?;
            }
        }

        Ok((next_by_pubkey_root, next_search_root, true))
    }
}
1381
1382fn latest_metadata_events_by_pubkey<'a>(events: &'a [Event]) -> BTreeMap<String, &'a Event> {
1383    let mut latest_by_pubkey = BTreeMap::<String, &Event>::new();
1384    for event in events.iter().filter(|event| event.kind == Kind::Metadata) {
1385        let pubkey = event.pubkey.to_hex();
1386        match latest_by_pubkey.get(&pubkey) {
1387            Some(current) if compare_nostr_events(event, current).is_le() => {}
1388            _ => {
1389                latest_by_pubkey.insert(pubkey, event);
1390            }
1391        }
1392    }
1393    latest_by_pubkey
1394}
1395
1396fn serialize_profile_search_entry(entry: &StoredProfileSearchEntry) -> Result<String> {
1397    serde_json::to_string(entry).context("encode stored profile search entry json")
1398}
1399
1400fn cid_to_nhash(cid: &Cid) -> Result<String> {
1401    nhash_encode_full(&NHashData {
1402        hash: cid.hash,
1403        decrypt_key: cid.key,
1404    })
1405    .context("encode mirrored profile event nhash")
1406}
1407
1408fn build_profile_search_entry(
1409    event: &Event,
1410    mirrored_cid: &Cid,
1411) -> Result<StoredProfileSearchEntry> {
1412    let profile = match serde_json::from_str::<serde_json::Value>(&event.content) {
1413        Ok(serde_json::Value::Object(profile)) => profile,
1414        _ => serde_json::Map::new(),
1415    };
1416    let names = extract_profile_names(&profile);
1417    let primary_name = names.first().cloned();
1418    let nip05 = normalize_profile_nip05(&profile, primary_name.as_deref());
1419    let name = primary_name
1420        .clone()
1421        .or_else(|| nip05.clone())
1422        .unwrap_or_else(|| event.pubkey.to_hex());
1423
1424    Ok(StoredProfileSearchEntry {
1425        pubkey: event.pubkey.to_hex(),
1426        name,
1427        aliases: names.into_iter().skip(1).collect(),
1428        nip05,
1429        created_at: event.created_at.as_u64(),
1430        event_nhash: cid_to_nhash(mirrored_cid)?,
1431    })
1432}
1433
1434fn filter_list_options(filter: &Filter, limit: usize, exact: bool) -> ListEventsOptions {
1435    ListEventsOptions {
1436        limit: exact.then_some(limit.max(1)),
1437        since: filter.since.map(|timestamp| timestamp.as_u64()),
1438        until: filter.until.map(|timestamp| timestamp.as_u64()),
1439    }
1440}
1441
1442fn dedupe_events(events: Vec<Event>) -> Vec<Event> {
1443    let mut seen = HashSet::new();
1444    let mut deduped = Vec::new();
1445    for event in events {
1446        if seen.insert(event.id.to_bytes()) {
1447            deduped.push(event);
1448        }
1449    }
1450    deduped.sort_by(|a, b| {
1451        b.created_at
1452            .as_u64()
1453            .cmp(&a.created_at.as_u64())
1454            .then_with(|| a.id.cmp(&b.id))
1455    });
1456    deduped
1457}
1458
// Trait facade: every method forwards to the corresponding inherent method on
// `SocialGraphStore` (defined elsewhere in this module), so the store can be
// used behind the `SocialGraphBackend` abstraction without duplication.
impl SocialGraphBackend for SocialGraphStore {
    fn stats(&self) -> Result<SocialGraphStats> {
        SocialGraphStore::stats(self)
    }

    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
        SocialGraphStore::users_by_follow_distance(self, distance)
    }

    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
        SocialGraphStore::follow_distance(self, pk_bytes)
    }

    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
        SocialGraphStore::follow_list_created_at(self, owner)
    }

    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
        SocialGraphStore::followed_targets(self, owner)
    }

    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
        SocialGraphStore::is_overmuted_user(self, user_pk, threshold)
    }

    fn profile_search_root(&self) -> Result<Option<Cid>> {
        SocialGraphStore::profile_search_root(self)
    }

    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
        SocialGraphStore::snapshot_chunks(self, root, options)
    }

    fn ingest_event(&self, event: &Event) -> Result<()> {
        SocialGraphStore::ingest_event(self, event)
    }

    fn ingest_event_with_storage_class(
        &self,
        event: &Event,
        storage_class: EventStorageClass,
    ) -> Result<()> {
        SocialGraphStore::ingest_event_with_storage_class(self, event, storage_class)
    }

    fn ingest_events(&self, events: &[Event]) -> Result<()> {
        SocialGraphStore::ingest_events(self, events)
    }

    fn ingest_events_with_storage_class(
        &self,
        events: &[Event],
        storage_class: EventStorageClass,
    ) -> Result<()> {
        SocialGraphStore::ingest_events_with_storage_class(self, events, storage_class)
    }

    // Note the asymmetric mapping: the trait's `ingest_graph_events` is backed
    // by the inherent `apply_graph_events_only`.
    fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
        SocialGraphStore::apply_graph_events_only(self, events)
    }

    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
        SocialGraphStore::query_events(self, filter, limit)
    }
}
1524
// Adapter to the upstream `nostr_social_graph` backend trait. Each method
// locks the inner heed-backed graph, delegates, attaches context, and flattens
// the anyhow error into the string-based `UpstreamGraphBackendError` the
// upstream trait expects. The `.unwrap()` on the mutex lock panics only on
// poisoning (a prior panic while holding the lock).
impl NostrSocialGraphBackend for SocialGraphStore {
    type Error = UpstreamGraphBackendError;

    fn get_root(&self) -> std::result::Result<String, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_root()
            .context("read social graph root")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn set_root(&mut self, root: &str) -> std::result::Result<(), Self::Error> {
        // The upstream trait passes the root as hex; decode before delegating
        // to the inherent byte-array `set_root`.
        let root_bytes =
            decode_pubkey(root).map_err(|err| UpstreamGraphBackendError(err.to_string()))?;
        SocialGraphStore::set_root(self, &root_bytes)
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn handle_event(
        &mut self,
        event: &GraphEvent,
        allow_unknown_authors: bool,
        overmute_threshold: f64,
    ) -> std::result::Result<(), Self::Error> {
        // Inner block scopes the mutex guard so it is released before the
        // cache invalidation below runs.
        {
            let mut graph = self.graph.lock().unwrap();
            graph
                .handle_event(event, allow_unknown_authors, overmute_threshold)
                .context("ingest social graph event into heed backend")
                .map_err(|err| UpstreamGraphBackendError(err.to_string()))?;
        }
        // Graph edges may have changed, so cached follow distances are stale.
        self.invalidate_distance_cache();
        Ok(())
    }

    fn get_follow_distance(&self, user: &str) -> std::result::Result<u32, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_follow_distance(user)
            .context("read social graph follow distance")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn is_following(
        &self,
        follower: &str,
        followed_user: &str,
    ) -> std::result::Result<bool, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .is_following(follower, followed_user)
            .context("read social graph following edge")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_followed_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_followed_by_user(user)
            .context("read followed-by-user list")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_followers_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_followers_by_user(user)
            .context("read followers-by-user list")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_muted_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_muted_by_user(user)
            .context("read muted-by-user list")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_user_muted_by(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_user_muted_by(user)
            .context("read user-muted-by list")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_follow_list_created_at(
        &self,
        user: &str,
    ) -> std::result::Result<Option<u64>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_follow_list_created_at(user)
            .context("read social graph follow list timestamp")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_mute_list_created_at(
        &self,
        user: &str,
    ) -> std::result::Result<Option<u64>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_mute_list_created_at(user)
            .context("read social graph mute list timestamp")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn is_overmuted(&self, user: &str, threshold: f64) -> std::result::Result<bool, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .is_overmuted(user, threshold)
            .context("check social graph overmute")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }
}
1642
// Blanket impl so an `Arc<T>` (including `Arc<dyn SocialGraphBackend>`, via
// `?Sized`) can be used wherever a backend is expected; every method simply
// forwards to the wrapped value.
impl<T> SocialGraphBackend for Arc<T>
where
    T: SocialGraphBackend + ?Sized,
{
    fn stats(&self) -> Result<SocialGraphStats> {
        self.as_ref().stats()
    }

    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
        self.as_ref().users_by_follow_distance(distance)
    }

    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
        self.as_ref().follow_distance(pk_bytes)
    }

    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
        self.as_ref().follow_list_created_at(owner)
    }

    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
        self.as_ref().followed_targets(owner)
    }

    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
        self.as_ref().is_overmuted_user(user_pk, threshold)
    }

    fn profile_search_root(&self) -> Result<Option<Cid>> {
        self.as_ref().profile_search_root()
    }

    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
        self.as_ref().snapshot_chunks(root, options)
    }

    fn ingest_event(&self, event: &Event) -> Result<()> {
        self.as_ref().ingest_event(event)
    }

    fn ingest_event_with_storage_class(
        &self,
        event: &Event,
        storage_class: EventStorageClass,
    ) -> Result<()> {
        self.as_ref()
            .ingest_event_with_storage_class(event, storage_class)
    }

    fn ingest_events(&self, events: &[Event]) -> Result<()> {
        self.as_ref().ingest_events(events)
    }

    fn ingest_events_with_storage_class(
        &self,
        events: &[Event],
        storage_class: EventStorageClass,
    ) -> Result<()> {
        self.as_ref()
            .ingest_events_with_storage_class(events, storage_class)
    }

    fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
        self.as_ref().ingest_graph_events(events)
    }

    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
        self.as_ref().query_events(filter, limit)
    }
}
1713
1714fn should_replace_placeholder_root(graph: &HeedSocialGraph) -> Result<bool> {
1715    if graph.get_root().context("read current social graph root")? != DEFAULT_ROOT_HEX {
1716        return Ok(false);
1717    }
1718
1719    let GraphStats {
1720        users,
1721        follows,
1722        mutes,
1723        ..
1724    } = graph.size().context("size social graph")?;
1725    Ok(users <= 1 && follows == 0 && mutes == 0)
1726}
1727
1728fn decode_pubkey_set(values: Vec<String>) -> Result<UserSet> {
1729    let mut set = UserSet::new();
1730    for value in values {
1731        set.insert(decode_pubkey(&value)?);
1732    }
1733    Ok(set)
1734}
1735
1736fn decode_pubkey(value: &str) -> Result<[u8; 32]> {
1737    let mut bytes = [0u8; 32];
1738    hex::decode_to_slice(value, &mut bytes)
1739        .with_context(|| format!("decode social graph pubkey {value}"))?;
1740    Ok(bytes)
1741}
1742
1743fn is_social_graph_event(kind: Kind) -> bool {
1744    kind == Kind::ContactList || kind == Kind::MuteList
1745}
1746
1747fn graph_event_from_nostr(event: &Event) -> GraphEvent {
1748    GraphEvent {
1749        created_at: event.created_at.as_u64(),
1750        content: event.content.clone(),
1751        tags: event
1752            .tags
1753            .iter()
1754            .map(|tag| tag.as_slice().to_vec())
1755            .collect(),
1756        kind: event.kind.as_u16() as u32,
1757        pubkey: event.pubkey.to_hex(),
1758        id: event.id.to_hex(),
1759        sig: event.sig.to_string(),
1760    }
1761}
1762
1763fn stored_event_from_nostr(event: &Event) -> StoredNostrEvent {
1764    StoredNostrEvent {
1765        id: event.id.to_hex(),
1766        pubkey: event.pubkey.to_hex(),
1767        created_at: event.created_at.as_u64(),
1768        kind: event.kind.as_u16() as u32,
1769        tags: event
1770            .tags
1771            .iter()
1772            .map(|tag| tag.as_slice().to_vec())
1773            .collect(),
1774        content: event.content.clone(),
1775        sig: event.sig.to_string(),
1776    }
1777}
1778
1779fn nostr_event_from_stored(event: StoredNostrEvent) -> Result<Event> {
1780    let value = serde_json::json!({
1781        "id": event.id,
1782        "pubkey": event.pubkey,
1783        "created_at": event.created_at,
1784        "kind": event.kind,
1785        "tags": event.tags,
1786        "content": event.content,
1787        "sig": event.sig,
1788    });
1789    Event::from_json(value.to_string()).context("decode stored nostr event")
1790}
1791
/// Crate-visible wrapper around [`nostr_event_from_stored`].
pub(crate) fn stored_event_to_nostr_event(event: StoredNostrEvent) -> Result<Event> {
    nostr_event_from_stored(event)
}
1795
1796fn encode_cid(cid: &Cid) -> Result<Vec<u8>> {
1797    rmp_serde::to_vec_named(&StoredCid {
1798        hash: cid.hash,
1799        key: cid.key,
1800    })
1801    .context("encode social graph events root")
1802}
1803
1804fn decode_cid(bytes: &[u8]) -> Result<Option<Cid>> {
1805    let stored: StoredCid =
1806        rmp_serde::from_slice(bytes).context("decode social graph events root")?;
1807    Ok(Some(Cid {
1808        hash: stored.hash,
1809        key: stored.key,
1810    }))
1811}
1812
1813fn read_root_file(path: &Path) -> Result<Option<Cid>> {
1814    let Ok(bytes) = std::fs::read(path) else {
1815        return Ok(None);
1816    };
1817    decode_cid(&bytes)
1818}
1819
1820fn write_root_file(path: &Path, root: Option<&Cid>) -> Result<()> {
1821    let Some(root) = root else {
1822        if path.exists() {
1823            std::fs::remove_file(path)?;
1824        }
1825        return Ok(());
1826    };
1827
1828    let encoded = encode_cid(root)?;
1829    let tmp_path = path.with_extension("tmp");
1830    std::fs::write(&tmp_path, encoded)?;
1831    std::fs::rename(tmp_path, path)?;
1832    Ok(())
1833}
1834
1835fn normalize_profile_name(value: &serde_json::Value) -> Option<String> {
1836    let raw = value.as_str()?;
1837    let trimmed = raw.split_whitespace().collect::<Vec<_>>().join(" ");
1838    if trimmed.is_empty() {
1839        return None;
1840    }
1841    Some(trimmed.chars().take(PROFILE_NAME_MAX_LENGTH).collect())
1842}
1843
1844fn extract_profile_names(profile: &serde_json::Map<String, serde_json::Value>) -> Vec<String> {
1845    let mut names = Vec::new();
1846    let mut seen = HashSet::new();
1847
1848    for key in ["display_name", "displayName", "name", "username"] {
1849        let Some(value) = profile.get(key).and_then(normalize_profile_name) else {
1850            continue;
1851        };
1852        let lowered = value.to_lowercase();
1853        if seen.insert(lowered) {
1854            names.push(value);
1855        }
1856    }
1857
1858    names
1859}
1860
/// Heuristic filter for NIP-05 local parts that add no search value:
/// single-character names, raw `npub1…` strings, and local parts already
/// contained in the primary display name (compared lowercased with all
/// whitespace removed).
fn should_reject_profile_nip05(local_part: &str, primary_name: &str) -> bool {
    if local_part.len() == 1 || local_part.starts_with("npub1") {
        return true;
    }

    let squashed: String = primary_name.to_lowercase().split_whitespace().collect();
    squashed.contains(local_part)
}
1872
1873fn normalize_profile_nip05(
1874    profile: &serde_json::Map<String, serde_json::Value>,
1875    primary_name: Option<&str>,
1876) -> Option<String> {
1877    let raw = profile.get("nip05")?.as_str()?;
1878    let local_part = raw.split('@').next()?.trim().to_lowercase();
1879    if local_part.is_empty() {
1880        return None;
1881    }
1882    let truncated: String = local_part.chars().take(PROFILE_NAME_MAX_LENGTH).collect();
1883    if truncated.is_empty() {
1884        return None;
1885    }
1886    if primary_name.is_some_and(|name| should_reject_profile_nip05(&truncated, name)) {
1887        return None;
1888    }
1889    Some(truncated)
1890}
1891
/// English stop words excluded from profile-search keyword indexing.
const SEARCH_STOP_WORDS: &[&str] = &[
    "a", "an", "the", "and", "or", "but", "in", "on", "at", "to", "for", "of",
    "with", "by", "from", "is", "it", "as", "be", "was", "are", "this", "that",
    "these", "those", "i", "you", "he", "she", "we", "they", "my", "your",
    "his", "her", "its", "our", "their", "what", "which", "who", "whom", "how",
    "when", "where", "why", "will", "would", "could", "should", "can", "may",
    "might", "must", "have", "has", "had", "do", "does", "did", "been",
    "being", "get", "got", "just", "now", "then", "so", "if", "not", "no",
    "yes", "all", "any", "some", "more", "most", "other", "into", "over",
    "after", "before", "about", "up", "down", "out", "off", "through",
    "during", "under", "again", "further", "once",
];

/// True when `word` is a common English stop word not worth indexing.
fn is_search_stop_word(word: &str) -> bool {
    SEARCH_STOP_WORDS.contains(&word)
}
1989
/// True for all-digit tokens, EXCEPT plausible four-digit years (1900–2099),
/// which are kept as searchable terms. Non-digit input returns false.
fn is_pure_search_number(word: &str) -> bool {
    if word.bytes().any(|b| !b.is_ascii_digit()) {
        return false;
    }
    let looks_like_year = word.len() == 4
        && word
            .parse::<u16>()
            .is_ok_and(|year| (1900..=2099).contains(&year));
    !looks_like_year
}
1999
/// Splits camelCase/PascalCase and letter↔digit compounds into parts,
/// e.g. "XMLHttpRequest42" -> ["XML", "Http", "Request", "42"].
fn split_compound_search_word(word: &str) -> Vec<String> {
    let chars: Vec<char> = word.chars().collect();
    let mut parts = Vec::new();
    let mut current = String::new();

    for (index, &ch) in chars.iter().enumerate() {
        // A boundary requires a previous char, so `None` never splits.
        let boundary = match current.chars().last() {
            None => false,
            Some(prev) => {
                (prev.is_lowercase() && ch.is_uppercase())
                    || (prev.is_ascii_digit() && ch.is_alphabetic())
                    || (prev.is_alphabetic() && ch.is_ascii_digit())
                    // Acronym end: "XMLh" splits before the final uppercase
                    // that starts a lowercase run.
                    || (prev.is_uppercase()
                        && ch.is_uppercase()
                        && chars.get(index + 1).is_some_and(|next| next.is_lowercase()))
            }
        };

        if boundary {
            parts.push(std::mem::take(&mut current));
        }
        current.push(ch);
    }

    if !current.is_empty() {
        parts.push(current);
    }

    parts
}
2028
2029fn parse_search_keywords(text: &str) -> Vec<String> {
2030    let mut keywords = Vec::new();
2031    let mut seen = HashSet::new();
2032
2033    for word in text
2034        .split(|ch: char| !ch.is_alphanumeric())
2035        .filter(|word| !word.is_empty())
2036    {
2037        let mut variants = Vec::with_capacity(1 + word.len() / 4);
2038        variants.push(word.to_lowercase());
2039        variants.extend(
2040            split_compound_search_word(word)
2041                .into_iter()
2042                .map(|part| part.to_lowercase()),
2043        );
2044
2045        for lowered in variants {
2046            if lowered.chars().count() < 2
2047                || is_search_stop_word(&lowered)
2048                || is_pure_search_number(&lowered)
2049            {
2050                continue;
2051            }
2052            if seen.insert(lowered.clone()) {
2053                keywords.push(lowered);
2054            }
2055        }
2056    }
2057
2058    keywords
2059}
2060
2061fn profile_search_terms_for_event(event: &Event) -> Vec<String> {
2062    let profile = match serde_json::from_str::<serde_json::Value>(&event.content) {
2063        Ok(serde_json::Value::Object(profile)) => profile,
2064        _ => serde_json::Map::new(),
2065    };
2066    let names = extract_profile_names(&profile);
2067    let primary_name = names.first().map(String::as_str);
2068    let mut parts = Vec::new();
2069    if let Some(name) = primary_name {
2070        parts.push(name.to_string());
2071    }
2072    if let Some(nip05) = normalize_profile_nip05(&profile, primary_name) {
2073        parts.push(nip05);
2074    }
2075    parts.push(event.pubkey.to_hex());
2076    if names.len() > 1 {
2077        parts.extend(names.into_iter().skip(1));
2078    }
2079    parse_search_keywords(&parts.join(" "))
2080}
2081
2082fn compare_nostr_events(left: &Event, right: &Event) -> std::cmp::Ordering {
2083    left.created_at
2084        .as_u64()
2085        .cmp(&right.created_at.as_u64())
2086        .then_with(|| left.id.to_hex().cmp(&right.id.to_hex()))
2087}
2088
2089fn map_event_store_error(err: NostrEventStoreError) -> anyhow::Error {
2090    anyhow::anyhow!("nostr event store error: {err}")
2091}
2092
/// Ensures the social-graph LMDB environment's map size is at least
/// `requested_bytes` (never below the default), rounded up to a whole page.
/// The map is only ever grown, never shrunk.
fn ensure_social_graph_mapsize(db_dir: &Path, requested_bytes: u64) -> Result<()> {
    let requested = requested_bytes.max(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES);
    let page_size = page_size_bytes() as u64;
    // Round up to a page-size multiple; on (practically impossible) u64
    // overflow, fall back to the unrounded request.
    let rounded = requested
        .checked_add(page_size.saturating_sub(1))
        .map(|size| size / page_size * page_size)
        .unwrap_or(requested);
    let map_size = usize::try_from(rounded).context("social graph mapsize exceeds usize")?;

    // SAFETY: heed marks env opening unsafe because the caller must guarantee
    // the env file is not concurrently opened with incompatible options by
    // another process. NOTE(review): assumed single-process access — confirm
    // against the other open sites in this module.
    let env = unsafe {
        heed::EnvOpenOptions::new()
            .map_size(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES as usize)
            .max_dbs(SOCIALGRAPH_MAX_DBS)
            .open(db_dir)
    }
    .context("open social graph LMDB env for resize")?;
    // Grow only when the on-disk env is smaller than the computed target.
    if env.info().map_size < map_size {
        // SAFETY: resize is unsafe in heed; NOTE(review): assumed no live
        // transactions on this freshly opened env — confirm.
        unsafe { env.resize(map_size) }.context("resize social graph LMDB env")?;
    }

    Ok(())
}
2115
/// System allocation granularity, used to round LMDB map sizes up to whole
/// pages.
fn page_size_bytes() -> usize {
    page_size::get_granularity()
}
2119
2120#[cfg(test)]
2121mod tests {
2122    use super::*;
2123    use async_trait::async_trait;
2124    use hashtree_config::StorageBackend;
2125    use hashtree_core::{Hash, MemoryStore, Store, StoreError};
2126    use hashtree_nostr::NostrEventStoreOptions;
2127    use std::collections::HashSet;
2128    use std::fs::{self, File};
2129    use std::io::{BufRead, BufReader};
2130    use std::path::{Path, PathBuf};
2131    use std::process::{Command, Stdio};
2132    use std::sync::Mutex;
2133    use std::time::Duration;
2134
2135    use nostr::{EventBuilder, JsonUtil, Keys, Tag, Timestamp};
2136    use tempfile::TempDir;
2137
    // URL of a bz2-compressed JSONL fixture of early nostr events (~500k),
    // presumably downloaded by fixture-driven tests/benchmarks below — see
    // the usage sites for the exact download path.
    const WELLORDER_FIXTURE_URL: &str =
        "https://wellorder.xyz/nostr/nostr-wellorder-early-500k-v1.jsonl.bz2";
2140
    // Point-in-time copy of `ReadTraceState` counters, safe to hold after the
    // state mutex is released.
    #[derive(Debug, Clone, Default)]
    struct ReadTraceSnapshot {
        get_calls: u64,       // total get() invocations observed
        total_bytes: u64,     // bytes returned across all successful reads
        unique_blocks: usize, // distinct hashes read at least once
        unique_bytes: u64,    // bytes counted once per distinct hash
        cache_hits: u64,      // reads served by the cache layer
        remote_fetches: u64,  // reads that fell through to the remote store
        remote_bytes: u64,    // bytes fetched from the remote store
    }
2151
    // Mutable read-trace accumulator; guarded by a Mutex in the stores below.
    // `unique_hashes` tracks which hashes have been seen so `unique_bytes`
    // counts each block only once.
    #[derive(Debug, Default)]
    struct ReadTraceState {
        get_calls: u64,
        total_bytes: u64,
        unique_hashes: HashSet<Hash>,
        unique_bytes: u64,
        cache_hits: u64,
        remote_fetches: u64,
        remote_bytes: u64,
    }
2162
    // Store wrapper that instruments reads against a wrapped base store.
    #[derive(Debug)]
    struct CountingStore<S: Store> {
        base: Arc<S>,
        state: Mutex<ReadTraceState>, // read counters; updated in record_read()
    }
2168
2169    impl<S: Store> CountingStore<S> {
2170        fn new(base: Arc<S>) -> Self {
2171            Self {
2172                base,
2173                state: Mutex::new(ReadTraceState::default()),
2174            }
2175        }
2176
2177        fn reset(&self) {
2178            *self.state.lock().unwrap() = ReadTraceState::default();
2179        }
2180
2181        fn snapshot(&self) -> ReadTraceSnapshot {
2182            let state = self.state.lock().unwrap();
2183            ReadTraceSnapshot {
2184                get_calls: state.get_calls,
2185                total_bytes: state.total_bytes,
2186                unique_blocks: state.unique_hashes.len(),
2187                unique_bytes: state.unique_bytes,
2188                cache_hits: state.cache_hits,
2189                remote_fetches: state.remote_fetches,
2190                remote_bytes: state.remote_bytes,
2191            }
2192        }
2193
2194        fn record_read(&self, hash: &Hash, bytes: usize) {
2195            let mut state = self.state.lock().unwrap();
2196            state.get_calls += 1;
2197            state.total_bytes += bytes as u64;
2198            if state.unique_hashes.insert(*hash) {
2199                state.unique_bytes += bytes as u64;
2200            }
2201        }
2202    }
2203
    // Store impl: writes/has/delete pass straight through; only successful
    // reads are instrumented.
    #[async_trait]
    impl<S: Store> Store for CountingStore<S> {
        async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
            self.base.put(hash, data).await
        }

        async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
            self.base.put_many(items).await
        }

        async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
            let data = self.base.get(hash).await?;
            // Only hits count; misses return Ok(None) and are not recorded.
            if let Some(bytes) = data.as_ref() {
                self.record_read(hash, bytes.len());
            }
            Ok(data)
        }

        async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
            self.base.has(hash).await
        }

        async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
            self.base.delete(hash).await
        }
    }
2230
    // Two-tier store: reads try `cache` first and fall back to `remote`,
    // populating the cache on a miss; traffic is tallied in `state`.
    #[derive(Debug)]
    struct ReadThroughStore<R: Store> {
        cache: Arc<MemoryStore>,
        remote: Arc<R>,
        state: Mutex<ReadTraceState>,
    }
2237
2238    impl<R: Store> ReadThroughStore<R> {
2239        fn new(cache: Arc<MemoryStore>, remote: Arc<R>) -> Self {
2240            Self {
2241                cache,
2242                remote,
2243                state: Mutex::new(ReadTraceState::default()),
2244            }
2245        }
2246
2247        fn snapshot(&self) -> ReadTraceSnapshot {
2248            let state = self.state.lock().unwrap();
2249            ReadTraceSnapshot {
2250                get_calls: state.get_calls,
2251                total_bytes: state.total_bytes,
2252                unique_blocks: state.unique_hashes.len(),
2253                unique_bytes: state.unique_bytes,
2254                cache_hits: state.cache_hits,
2255                remote_fetches: state.remote_fetches,
2256                remote_bytes: state.remote_bytes,
2257            }
2258        }
2259    }
2260
    // Store impl. Writes go only to the cache; `get` is the read-through path.
    // The state mutex is intentionally taken in short, separate scopes so the
    // guard is never held across an `.await`.
    #[async_trait]
    impl<R: Store> Store for ReadThroughStore<R> {
        async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
            self.cache.put(hash, data).await
        }

        async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
            self.cache.put_many(items).await
        }

        async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
            // Count the call up front, whether it hits, misses, or errors later.
            {
                let mut state = self.state.lock().unwrap();
                state.get_calls += 1;
            }

            if let Some(bytes) = self.cache.get(hash).await? {
                let mut state = self.state.lock().unwrap();
                state.cache_hits += 1;
                state.total_bytes += bytes.len() as u64;
                if state.unique_hashes.insert(*hash) {
                    state.unique_bytes += bytes.len() as u64;
                }
                return Ok(Some(bytes));
            }

            // Cache miss: fetch from remote and populate the cache.
            let data = self.remote.get(hash).await?;
            if let Some(bytes) = data.as_ref() {
                let _ = self.cache.put(*hash, bytes.clone()).await?;
                let mut state = self.state.lock().unwrap();
                state.remote_fetches += 1;
                state.remote_bytes += bytes.len() as u64;
                state.total_bytes += bytes.len() as u64;
                if state.unique_hashes.insert(*hash) {
                    state.unique_bytes += bytes.len() as u64;
                }
            }
            Ok(data)
        }

        async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
            if self.cache.has(hash).await? {
                return Ok(true);
            }
            self.remote.has(hash).await
        }

        async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
            // Delete from both tiers; true if either actually removed data.
            let cache_deleted = self.cache.delete(hash).await?;
            let remote_deleted = self.remote.delete(hash).await?;
            Ok(cache_deleted || remote_deleted)
        }
    }
2314
    // One benchmark query shape against the nostr event store; each variant
    // maps to one store API call in `execute`.
    #[derive(Debug, Clone)]
    enum BenchmarkQueryCase {
        // Point lookup by event id.
        ById {
            id: String,
        },
        // Recent events from one author.
        ByAuthor {
            pubkey: String,
            limit: usize,
        },
        // Author + kind combination.
        ByAuthorKind {
            pubkey: String,
            kind: u32,
            limit: usize,
        },
        // All events of a kind.
        ByKind {
            kind: u32,
            limit: usize,
        },
        // Tag-name/tag-value lookup.
        ByTag {
            tag_name: String,
            tag_value: String,
            limit: usize,
        },
        // Global recency scan.
        Recent {
            limit: usize,
        },
        // Latest replaceable event for author+kind.
        Replaceable {
            pubkey: String,
            kind: u32,
        },
        // Latest parameterized-replaceable event for author+kind+d-tag.
        ParameterizedReplaceable {
            pubkey: String,
            kind: u32,
            d_tag: String,
        },
    }
2351
    impl BenchmarkQueryCase {
        /// Stable label for benchmark reporting.
        fn name(&self) -> &'static str {
            match self {
                BenchmarkQueryCase::ById { .. } => "by_id",
                BenchmarkQueryCase::ByAuthor { .. } => "by_author",
                BenchmarkQueryCase::ByAuthorKind { .. } => "by_author_kind",
                BenchmarkQueryCase::ByKind { .. } => "by_kind",
                BenchmarkQueryCase::ByTag { .. } => "by_tag",
                BenchmarkQueryCase::Recent { .. } => "recent",
                BenchmarkQueryCase::Replaceable { .. } => "replaceable",
                BenchmarkQueryCase::ParameterizedReplaceable { .. } => "parameterized_replaceable",
            }
        }

        /// Runs the query against `store` at `root` and returns the number of
        /// events it produced (0 or 1 for the point/replaceable lookups).
        async fn execute<S: Store>(
            &self,
            store: &NostrEventStore<S>,
            root: &Cid,
        ) -> Result<usize, NostrEventStoreError> {
            match self {
                BenchmarkQueryCase::ById { id } => {
                    Ok(store.get_by_id(Some(root), id).await?.into_iter().count())
                }
                BenchmarkQueryCase::ByAuthor { pubkey, limit } => Ok(store
                    .list_by_author(
                        Some(root),
                        pubkey,
                        ListEventsOptions {
                            limit: Some(*limit),
                            ..Default::default()
                        },
                    )
                    .await?
                    .len()),
                BenchmarkQueryCase::ByAuthorKind {
                    pubkey,
                    kind,
                    limit,
                } => Ok(store
                    .list_by_author_and_kind(
                        Some(root),
                        pubkey,
                        *kind,
                        ListEventsOptions {
                            limit: Some(*limit),
                            ..Default::default()
                        },
                    )
                    .await?
                    .len()),
                BenchmarkQueryCase::ByKind { kind, limit } => Ok(store
                    .list_by_kind(
                        Some(root),
                        *kind,
                        ListEventsOptions {
                            limit: Some(*limit),
                            ..Default::default()
                        },
                    )
                    .await?
                    .len()),
                BenchmarkQueryCase::ByTag {
                    tag_name,
                    tag_value,
                    limit,
                } => Ok(store
                    .list_by_tag(
                        Some(root),
                        tag_name,
                        tag_value,
                        ListEventsOptions {
                            limit: Some(*limit),
                            ..Default::default()
                        },
                    )
                    .await?
                    .len()),
                BenchmarkQueryCase::Recent { limit } => Ok(store
                    .list_recent(
                        Some(root),
                        ListEventsOptions {
                            limit: Some(*limit),
                            ..Default::default()
                        },
                    )
                    .await?
                    .len()),
                BenchmarkQueryCase::Replaceable { pubkey, kind } => Ok(store
                    .get_replaceable(Some(root), pubkey, *kind)
                    .await?
                    .into_iter()
                    .count()),
                BenchmarkQueryCase::ParameterizedReplaceable {
                    pubkey,
                    kind,
                    d_tag,
                } => Ok(store
                    .get_parameterized_replaceable(Some(root), pubkey, *kind, d_tag)
                    .await?
                    .into_iter()
                    .count()),
            }
        }
    }
2456
    // Synthetic network profile (round-trip latency + bandwidth) for turning
    // read traces into estimated transfer cost.
    #[derive(Debug, Clone, Copy)]
    struct NetworkModel {
        name: &'static str,
        rtt_ms: f64,
        bandwidth_mib_per_s: f64,
    }
2463
    // Timing summary plus read-trace counters for one benchmark query case.
    #[derive(Debug, Clone)]
    struct QueryBenchmarkResult {
        average_duration: Duration,
        p95_duration: Duration,
        reads: ReadTraceSnapshot,
    }
2470
    // Three representative network profiles, from fast LAN to a slow link.
    const NETWORK_MODELS: [NetworkModel; 3] = [
        NetworkModel {
            name: "lan",
            rtt_ms: 2.0,
            bandwidth_mib_per_s: 100.0,
        },
        NetworkModel {
            name: "wan",
            rtt_ms: 40.0,
            bandwidth_mib_per_s: 20.0,
        },
        NetworkModel {
            name: "slow",
            rtt_ms: 120.0,
            bandwidth_mib_per_s: 5.0,
        },
    ];
2488
    // Smoke test: opening a fresh store succeeds and yields a sole Arc owner.
    #[test]
    fn test_open_social_graph_store() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        assert_eq!(Arc::strong_count(&graph_store), 1);
    }
2496
    // Setting the root user gives that user follow distance 0.
    #[test]
    fn test_set_root_and_get_follow_distance() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let root_pk = [1u8; 32];
        set_social_graph_root(&graph_store, &root_pk);
        assert_eq!(get_follow_distance(&graph_store, &root_pk), Some(0));
    }
2506
    // Ingesting a contact-list event makes the followee distance 1 from the
    // root, and a mute-list event marks its target overmuted at threshold 1.0.
    #[test]
    fn test_ingest_event_updates_follows_and_mutes() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();

        let root_keys = Keys::generate();
        let alice_keys = Keys::generate();
        let bob_keys = Keys::generate();

        let root_pk = root_keys.public_key().to_bytes();
        set_social_graph_root(&graph_store, &root_pk);

        // Root follows Alice.
        let follow = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![Tag::public_key(alice_keys.public_key())],
        )
        .custom_created_at(Timestamp::from_secs(10))
        .to_event(&root_keys)
        .unwrap();
        ingest_event(&graph_store, "follow", &follow.as_json());

        // Root mutes Bob.
        let mute = EventBuilder::new(
            Kind::MuteList,
            "",
            vec![Tag::public_key(bob_keys.public_key())],
        )
        .custom_created_at(Timestamp::from_secs(11))
        .to_event(&root_keys)
        .unwrap();
        ingest_event(&graph_store, "mute", &mute.as_json());

        assert_eq!(
            get_follow_distance(&graph_store, &alice_keys.public_key().to_bytes()),
            Some(1)
        );
        assert!(is_overmuted(
            &graph_store,
            &root_pk,
            &bob_keys.public_key().to_bytes(),
            1.0
        ));
    }
2551
    // A kind-0 event populates the "p:<term>:<pubkey>" search index; a newer
    // kind-0 from the same author fully replaces the old terms and the
    // mirrored latest-profile event.
    #[test]
    fn test_metadata_ingest_builds_profile_search_index_and_replaces_old_terms() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let keys = Keys::generate();

        let older = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "sirius",
                "name": "Martti Malmi",
                "username": "mmalmi",
                "nip05": "siriusdev@iris.to",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&keys)
        .unwrap();
        let newer = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "bird",
                "nip05": "birdman@iris.to",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(6))
        .to_event(&keys)
        .unwrap();

        ingest_parsed_event(&graph_store, &older).unwrap();

        // Older profile: name, nip05 local part, and aliases are indexed.
        let pubkey = keys.public_key().to_hex();
        let entries = graph_store
            .profile_search_entries_for_prefix("p:sirius")
            .unwrap();
        assert!(entries
            .iter()
            .any(|(key, entry)| key == &format!("p:sirius:{pubkey}") && entry.name == "sirius"));
        assert!(entries.iter().any(|(key, entry)| {
            key == &format!("p:siriusdev:{pubkey}")
                && entry.nip05.as_deref() == Some("siriusdev")
                && entry.aliases == vec!["Martti Malmi".to_string(), "mmalmi".to_string()]
                && entry.event_nhash.starts_with("nhash1")
        }));
        assert!(entries.iter().all(|(_, entry)| entry.pubkey == pubkey));
        assert_eq!(
            graph_store
                .latest_profile_event(&pubkey)
                .unwrap()
                .expect("latest mirrored profile")
                .id,
            older.id
        );

        ingest_parsed_event(&graph_store, &newer).unwrap();

        // Newer profile replaces the old index entries entirely.
        assert!(graph_store
            .profile_search_entries_for_prefix("p:sirius")
            .unwrap()
            .is_empty());
        let bird_entries = graph_store
            .profile_search_entries_for_prefix("p:bird")
            .unwrap();
        assert_eq!(bird_entries.len(), 2);
        assert!(bird_entries
            .iter()
            .any(|(key, entry)| key == &format!("p:bird:{pubkey}") && entry.name == "bird"));
        assert!(bird_entries.iter().any(|(key, entry)| {
            key == &format!("p:birdman:{pubkey}")
                && entry.nip05.as_deref() == Some("birdman")
                && entry.aliases.is_empty()
        }));
        assert_eq!(
            graph_store
                .latest_profile_event(&pubkey)
                .unwrap()
                .expect("latest mirrored profile")
                .id,
            newer.id
        );
    }
2638
    // Metadata ingested under the Ambient storage class still lands in the
    // public profile mirror and search index.
    #[test]
    fn test_ambient_metadata_events_are_mirrored_into_public_profile_index() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let keys = Keys::generate();

        let profile = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "ambient bird",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&keys)
        .unwrap();

        ingest_parsed_event_with_storage_class(&graph_store, &profile, EventStorageClass::Ambient)
            .unwrap();

        let pubkey = keys.public_key().to_hex();
        let mirrored = graph_store
            .latest_profile_event(&pubkey)
            .unwrap()
            .expect("mirrored ambient profile");
        assert_eq!(mirrored.id, profile.id);
        assert_eq!(
            graph_store
                .profile_search_entries_for_prefix("p:ambient")
                .unwrap()
                .len(),
            1
        );
    }
2675
2676    #[test]
2677    fn test_metadata_ingest_splits_compound_profile_terms_without_losing_whole_token() {
2678        let _guard = test_lock();
2679        let tmp = TempDir::new().unwrap();
2680        let graph_store = open_social_graph_store(tmp.path()).unwrap();
2681        let keys = Keys::generate();
2682
2683        let profile = EventBuilder::new(
2684            Kind::Metadata,
2685            serde_json::json!({
2686                "display_name": "SirLibre",
2687                "username": "XMLHttpRequest42",
2688            })
2689            .to_string(),
2690            [],
2691        )
2692        .custom_created_at(Timestamp::from_secs(5))
2693        .to_event(&keys)
2694        .unwrap();
2695
2696        ingest_parsed_event(&graph_store, &profile).unwrap();
2697
2698        let pubkey = keys.public_key().to_hex();
2699        assert!(
2700            graph_store
2701                .profile_search_entries_for_prefix("p:sirlibre")
2702                .unwrap()
2703                .iter()
2704                .any(|(key, entry)| key == &format!("p:sirlibre:{pubkey}")
2705                    && entry.name == "SirLibre")
2706        );
2707        assert!(graph_store
2708            .profile_search_entries_for_prefix("p:libre")
2709            .unwrap()
2710            .iter()
2711            .any(|(key, entry)| key == &format!("p:libre:{pubkey}") && entry.name == "SirLibre"));
2712        assert!(graph_store
2713            .profile_search_entries_for_prefix("p:xml")
2714            .unwrap()
2715            .iter()
2716            .any(|(key, entry)| {
2717                key == &format!("p:xml:{pubkey}")
2718                    && entry.aliases == vec!["XMLHttpRequest42".to_string()]
2719            }));
2720        assert!(graph_store
2721            .profile_search_entries_for_prefix("p:request")
2722            .unwrap()
2723            .iter()
2724            .any(|(key, entry)| {
2725                key == &format!("p:request:{pubkey}")
2726                    && entry.aliases == vec!["XMLHttpRequest42".to_string()]
2727            }));
2728    }
2729
2730    #[test]
2731    fn test_profile_search_index_persists_across_reopen() {
2732        let _guard = test_lock();
2733        let tmp = TempDir::new().unwrap();
2734        let keys = Keys::generate();
2735        let pubkey = keys.public_key().to_hex();
2736
2737        {
2738            let graph_store = open_social_graph_store(tmp.path()).unwrap();
2739            let profile = EventBuilder::new(
2740                Kind::Metadata,
2741                serde_json::json!({
2742                    "display_name": "reopen user",
2743                })
2744                .to_string(),
2745                [],
2746            )
2747            .custom_created_at(Timestamp::from_secs(5))
2748            .to_event(&keys)
2749            .unwrap();
2750            ingest_parsed_event(&graph_store, &profile).unwrap();
2751            assert!(graph_store.profile_search_root().unwrap().is_some());
2752        }
2753
2754        let reopened = open_social_graph_store(tmp.path()).unwrap();
2755        assert!(reopened.profile_search_root().unwrap().is_some());
2756        assert_eq!(
2757            reopened
2758                .latest_profile_event(&pubkey)
2759                .unwrap()
2760                .expect("mirrored profile after reopen")
2761                .pubkey,
2762            keys.public_key()
2763        );
2764        let links = reopened
2765            .profile_search_entries_for_prefix("p:reopen")
2766            .unwrap();
2767        assert_eq!(links.len(), 1);
2768        assert_eq!(links[0].0, format!("p:reopen:{pubkey}"));
2769        assert_eq!(links[0].1.name, "reopen user");
2770    }
2771
2772    #[test]
2773    fn test_profile_search_index_with_shared_hashtree_storage() {
2774        let _guard = test_lock();
2775        let tmp = TempDir::new().unwrap();
2776        let store =
2777            crate::storage::HashtreeStore::with_options(tmp.path(), None, 1024 * 1024 * 1024)
2778                .unwrap();
2779        let graph_store =
2780            open_social_graph_store_with_storage(tmp.path(), store.store_arc(), None).unwrap();
2781        let keys = Keys::generate();
2782        let pubkey = keys.public_key().to_hex();
2783
2784        let profile = EventBuilder::new(
2785            Kind::Metadata,
2786            serde_json::json!({
2787                "display_name": "shared storage user",
2788                "nip05": "shareduser@example.com",
2789            })
2790            .to_string(),
2791            [],
2792        )
2793        .custom_created_at(Timestamp::from_secs(5))
2794        .to_event(&keys)
2795        .unwrap();
2796
2797        graph_store
2798            .sync_profile_index_for_events(std::slice::from_ref(&profile))
2799            .unwrap();
2800        assert!(graph_store.profile_search_root().unwrap().is_some());
2801        assert!(graph_store.profile_search_root().unwrap().is_some());
2802        let links = graph_store
2803            .profile_search_entries_for_prefix("p:shared")
2804            .unwrap();
2805        assert_eq!(links.len(), 2);
2806        assert!(links
2807            .iter()
2808            .any(|(key, entry)| key == &format!("p:shared:{pubkey}")
2809                && entry.name == "shared storage user"));
2810        assert!(links
2811            .iter()
2812            .any(|(key, entry)| key == &format!("p:shareduser:{pubkey}")
2813                && entry.nip05.as_deref() == Some("shareduser")));
2814    }
2815
    #[test]
    fn test_rebuild_profile_index_from_stored_events_uses_ambient_and_public_metadata() {
        // Rebuilding the profile index from stored events must draw on both
        // public- and ambient-class metadata, keeping only the newest profile
        // per author.
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let public_keys = Keys::generate();
        let ambient_keys = Keys::generate();
        let public_pubkey = public_keys.public_key().to_hex();
        let ambient_pubkey = ambient_keys.public_key().to_hex();

        // Two metadata events from the public author; the newer one (t=6)
        // should win during the rebuild.
        let older = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "petri old",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&public_keys)
        .unwrap();
        let newer = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "petri",
                "name": "Petri Example",
                "nip05": "petri@example.com",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(6))
        .to_event(&public_keys)
        .unwrap();
        // A second author whose only metadata is stored ambient-class.
        let ambient = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "ambient petri",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(7))
        .to_event(&ambient_keys)
        .unwrap();

        ingest_parsed_event_with_storage_class(&graph_store, &older, EventStorageClass::Public)
            .unwrap();
        ingest_parsed_event_with_storage_class(&graph_store, &newer, EventStorageClass::Public)
            .unwrap();
        ingest_parsed_event_with_storage_class(&graph_store, &ambient, EventStorageClass::Ambient)
            .unwrap();

        // Wipe both profile-index roots so the rebuild has to start from the
        // stored events alone rather than the incrementally-built index.
        graph_store
            .profile_index
            .write_by_pubkey_root(None)
            .unwrap();
        graph_store.profile_index.write_search_root(None).unwrap();

        // One rebuilt entry per author: the newer public profile plus the
        // ambient author's profile.
        let rebuilt = graph_store
            .rebuild_profile_index_from_stored_events()
            .unwrap();
        assert_eq!(rebuilt, 2);

        let entries = graph_store
            .profile_search_entries_for_prefix("p:petri")
            .unwrap();
        assert_eq!(entries.len(), 2);
        // Public author: the t=6 event wins ("petri" with alias, no nip05 in
        // the entry despite the event carrying one — matches current output).
        assert!(entries.iter().any(|(key, entry)| {
            key == &format!("p:petri:{public_pubkey}")
                && entry.name == "petri"
                && entry.aliases == vec!["Petri Example".to_string()]
                && entry.nip05.is_none()
        }));
        // Ambient author: indexed from the ambient-class event.
        assert!(entries.iter().any(|(key, entry)| {
            key == &format!("p:petri:{ambient_pubkey}")
                && entry.name == "ambient petri"
                && entry.aliases.is_empty()
                && entry.nip05.is_none()
        }));
    }
2897
    #[test]
    fn test_rebuild_profile_index_excludes_overmuted_users() {
        // A rebuild must drop profiles of users muted beyond the configured
        // overmute threshold, even if they were indexed before the mute.
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let root_keys = Keys::generate();
        let muted_keys = Keys::generate();
        let muted_pubkey = muted_keys.public_key().to_hex();

        // Root user defines the graph perspective; threshold 1.0 means a
        // single mute is enough to classify a user as overmuted.
        set_social_graph_root(&graph_store, &root_keys.public_key().to_bytes());
        graph_store.set_profile_index_overmute_threshold(1.0);

        // The soon-to-be-muted user's profile is indexed at first.
        let profile = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "muted petri",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&muted_keys)
        .unwrap();
        ingest_parsed_event(&graph_store, &profile).unwrap();
        assert!(graph_store
            .latest_profile_event(&muted_pubkey)
            .unwrap()
            .is_some());

        // The root publishes a mute list targeting the user, which pushes
        // them over the overmute threshold.
        let mute = EventBuilder::new(
            Kind::MuteList,
            "",
            vec![Tag::public_key(muted_keys.public_key())],
        )
        .custom_created_at(Timestamp::from_secs(6))
        .to_event(&root_keys)
        .unwrap();
        ingest_parsed_event(&graph_store, &mute).unwrap();
        assert!(graph_store
            .is_overmuted_user(&muted_keys.public_key().to_bytes(), 1.0)
            .unwrap());

        // Rebuild: nothing qualifies, and the previously-indexed profile is
        // gone from both the mirror and the search index.
        let rebuilt = graph_store
            .rebuild_profile_index_from_stored_events()
            .unwrap();
        assert_eq!(rebuilt, 0);
        assert!(graph_store
            .latest_profile_event(&muted_pubkey)
            .unwrap()
            .is_none());
        assert!(graph_store
            .profile_search_entries_for_prefix("p:muted")
            .unwrap()
            .is_empty());
    }
2953
2954    #[test]
2955    fn test_query_events_by_author() {
2956        let _guard = test_lock();
2957        let tmp = TempDir::new().unwrap();
2958        let graph_store = open_social_graph_store(tmp.path()).unwrap();
2959        let keys = Keys::generate();
2960
2961        let older = EventBuilder::new(Kind::TextNote, "older", [])
2962            .custom_created_at(Timestamp::from_secs(5))
2963            .to_event(&keys)
2964            .unwrap();
2965        let newer = EventBuilder::new(Kind::TextNote, "newer", [])
2966            .custom_created_at(Timestamp::from_secs(6))
2967            .to_event(&keys)
2968            .unwrap();
2969
2970        ingest_parsed_event(&graph_store, &older).unwrap();
2971        ingest_parsed_event(&graph_store, &newer).unwrap();
2972
2973        let filter = Filter::new().author(keys.public_key()).kind(Kind::TextNote);
2974        let events = query_events(&graph_store, &filter, 10);
2975        assert_eq!(events.len(), 2);
2976        assert_eq!(events[0].id, newer.id);
2977        assert_eq!(events[1].id, older.id);
2978    }
2979
2980    #[test]
2981    fn test_query_events_by_kind() {
2982        let _guard = test_lock();
2983        let tmp = TempDir::new().unwrap();
2984        let graph_store = open_social_graph_store(tmp.path()).unwrap();
2985        let first_keys = Keys::generate();
2986        let second_keys = Keys::generate();
2987
2988        let older = EventBuilder::new(Kind::TextNote, "older", [])
2989            .custom_created_at(Timestamp::from_secs(5))
2990            .to_event(&first_keys)
2991            .unwrap();
2992        let newer = EventBuilder::new(Kind::TextNote, "newer", [])
2993            .custom_created_at(Timestamp::from_secs(6))
2994            .to_event(&second_keys)
2995            .unwrap();
2996        let other_kind = EventBuilder::new(Kind::Metadata, "profile", [])
2997            .custom_created_at(Timestamp::from_secs(7))
2998            .to_event(&second_keys)
2999            .unwrap();
3000
3001        ingest_parsed_event(&graph_store, &older).unwrap();
3002        ingest_parsed_event(&graph_store, &newer).unwrap();
3003        ingest_parsed_event(&graph_store, &other_kind).unwrap();
3004
3005        let filter = Filter::new().kind(Kind::TextNote);
3006        let events = query_events(&graph_store, &filter, 10);
3007        assert_eq!(events.len(), 2);
3008        assert_eq!(events[0].id, newer.id);
3009        assert_eq!(events[1].id, older.id);
3010    }
3011
3012    #[test]
3013    fn test_query_events_by_id() {
3014        let _guard = test_lock();
3015        let tmp = TempDir::new().unwrap();
3016        let graph_store = open_social_graph_store(tmp.path()).unwrap();
3017        let keys = Keys::generate();
3018
3019        let first = EventBuilder::new(Kind::TextNote, "first", [])
3020            .custom_created_at(Timestamp::from_secs(5))
3021            .to_event(&keys)
3022            .unwrap();
3023        let target = EventBuilder::new(Kind::TextNote, "target", [])
3024            .custom_created_at(Timestamp::from_secs(6))
3025            .to_event(&keys)
3026            .unwrap();
3027
3028        ingest_parsed_event(&graph_store, &first).unwrap();
3029        ingest_parsed_event(&graph_store, &target).unwrap();
3030
3031        let filter = Filter::new().id(target.id).kind(Kind::TextNote);
3032        let events = query_events(&graph_store, &filter, 10);
3033        assert_eq!(events.len(), 1);
3034        assert_eq!(events[0].id, target.id);
3035    }
3036
3037    #[test]
3038    fn test_query_events_search_is_case_insensitive() {
3039        let _guard = test_lock();
3040        let tmp = TempDir::new().unwrap();
3041        let graph_store = open_social_graph_store(tmp.path()).unwrap();
3042        let keys = Keys::generate();
3043        let other_keys = Keys::generate();
3044
3045        let matching = EventBuilder::new(Kind::TextNote, "Hello Nostr Search", [])
3046            .custom_created_at(Timestamp::from_secs(5))
3047            .to_event(&keys)
3048            .unwrap();
3049        let other = EventBuilder::new(Kind::TextNote, "goodbye world", [])
3050            .custom_created_at(Timestamp::from_secs(6))
3051            .to_event(&other_keys)
3052            .unwrap();
3053
3054        ingest_parsed_event(&graph_store, &matching).unwrap();
3055        ingest_parsed_event(&graph_store, &other).unwrap();
3056
3057        let filter = Filter::new().kind(Kind::TextNote).search("nostr search");
3058        let events = query_events(&graph_store, &filter, 10);
3059        assert_eq!(events.len(), 1);
3060        assert_eq!(events[0].id, matching.id);
3061    }
3062
3063    #[test]
3064    fn test_query_events_since_until_are_inclusive() {
3065        let _guard = test_lock();
3066        let tmp = TempDir::new().unwrap();
3067        let graph_store = open_social_graph_store(tmp.path()).unwrap();
3068        let keys = Keys::generate();
3069
3070        let before = EventBuilder::new(Kind::TextNote, "before", [])
3071            .custom_created_at(Timestamp::from_secs(5))
3072            .to_event(&keys)
3073            .unwrap();
3074        let start = EventBuilder::new(Kind::TextNote, "start", [])
3075            .custom_created_at(Timestamp::from_secs(6))
3076            .to_event(&keys)
3077            .unwrap();
3078        let end = EventBuilder::new(Kind::TextNote, "end", [])
3079            .custom_created_at(Timestamp::from_secs(10))
3080            .to_event(&keys)
3081            .unwrap();
3082        let after = EventBuilder::new(Kind::TextNote, "after", [])
3083            .custom_created_at(Timestamp::from_secs(11))
3084            .to_event(&keys)
3085            .unwrap();
3086
3087        ingest_parsed_event(&graph_store, &before).unwrap();
3088        ingest_parsed_event(&graph_store, &start).unwrap();
3089        ingest_parsed_event(&graph_store, &end).unwrap();
3090        ingest_parsed_event(&graph_store, &after).unwrap();
3091
3092        let filter = Filter::new()
3093            .kind(Kind::TextNote)
3094            .since(Timestamp::from_secs(6))
3095            .until(Timestamp::from_secs(10));
3096        let events = query_events(&graph_store, &filter, 10);
3097        let ids = events.into_iter().map(|event| event.id).collect::<Vec<_>>();
3098        assert_eq!(ids, vec![end.id, start.id]);
3099    }
3100
3101    #[test]
3102    fn test_query_events_replaceable_kind_returns_latest_winner() {
3103        let _guard = test_lock();
3104        let tmp = TempDir::new().unwrap();
3105        let graph_store = open_social_graph_store(tmp.path()).unwrap();
3106        let keys = Keys::generate();
3107
3108        let older = EventBuilder::new(Kind::Custom(10_000), "older mute list", [])
3109            .custom_created_at(Timestamp::from_secs(5))
3110            .to_event(&keys)
3111            .unwrap();
3112        let newer = EventBuilder::new(Kind::Custom(10_000), "newer mute list", [])
3113            .custom_created_at(Timestamp::from_secs(6))
3114            .to_event(&keys)
3115            .unwrap();
3116
3117        ingest_parsed_event(&graph_store, &older).unwrap();
3118        ingest_parsed_event(&graph_store, &newer).unwrap();
3119
3120        let filter = Filter::new()
3121            .author(keys.public_key())
3122            .kind(Kind::Custom(10_000));
3123        let events = query_events(&graph_store, &filter, 10);
3124        assert_eq!(events.len(), 1);
3125        assert_eq!(events[0].id, newer.id);
3126    }
3127
3128    #[test]
3129    fn test_query_events_kind_41_replaceable_returns_latest_winner() {
3130        let _guard = test_lock();
3131        let tmp = TempDir::new().unwrap();
3132        let graph_store = open_social_graph_store(tmp.path()).unwrap();
3133        let keys = Keys::generate();
3134
3135        let older = EventBuilder::new(Kind::Custom(41), "older channel metadata", [])
3136            .custom_created_at(Timestamp::from_secs(5))
3137            .to_event(&keys)
3138            .unwrap();
3139        let newer = EventBuilder::new(Kind::Custom(41), "newer channel metadata", [])
3140            .custom_created_at(Timestamp::from_secs(6))
3141            .to_event(&keys)
3142            .unwrap();
3143
3144        ingest_parsed_event(&graph_store, &older).unwrap();
3145        ingest_parsed_event(&graph_store, &newer).unwrap();
3146
3147        let filter = Filter::new()
3148            .author(keys.public_key())
3149            .kind(Kind::Custom(41));
3150        let events = query_events(&graph_store, &filter, 10);
3151        assert_eq!(events.len(), 1);
3152        assert_eq!(events[0].id, newer.id);
3153    }
3154
3155    #[test]
3156    fn test_public_and_ambient_indexes_stay_separate() {
3157        let _guard = test_lock();
3158        let tmp = TempDir::new().unwrap();
3159        let graph_store = open_social_graph_store(tmp.path()).unwrap();
3160        let public_keys = Keys::generate();
3161        let ambient_keys = Keys::generate();
3162
3163        let public_event = EventBuilder::new(Kind::TextNote, "public", [])
3164            .custom_created_at(Timestamp::from_secs(5))
3165            .to_event(&public_keys)
3166            .unwrap();
3167        let ambient_event = EventBuilder::new(Kind::TextNote, "ambient", [])
3168            .custom_created_at(Timestamp::from_secs(6))
3169            .to_event(&ambient_keys)
3170            .unwrap();
3171
3172        ingest_parsed_event_with_storage_class(
3173            &graph_store,
3174            &public_event,
3175            EventStorageClass::Public,
3176        )
3177        .unwrap();
3178        ingest_parsed_event_with_storage_class(
3179            &graph_store,
3180            &ambient_event,
3181            EventStorageClass::Ambient,
3182        )
3183        .unwrap();
3184
3185        let filter = Filter::new().kind(Kind::TextNote);
3186        let all_events = graph_store
3187            .query_events_in_scope(&filter, 10, EventQueryScope::All)
3188            .unwrap();
3189        assert_eq!(all_events.len(), 2);
3190
3191        let public_events = graph_store
3192            .query_events_in_scope(&filter, 10, EventQueryScope::PublicOnly)
3193            .unwrap();
3194        assert_eq!(public_events.len(), 1);
3195        assert_eq!(public_events[0].id, public_event.id);
3196
3197        let ambient_events = graph_store
3198            .query_events_in_scope(&filter, 10, EventQueryScope::AmbientOnly)
3199            .unwrap();
3200        assert_eq!(ambient_events.len(), 1);
3201        assert_eq!(ambient_events[0].id, ambient_event.id);
3202    }
3203
3204    #[test]
3205    fn test_default_ingest_classifies_root_author_as_public() {
3206        let _guard = test_lock();
3207        let tmp = TempDir::new().unwrap();
3208        let graph_store = open_social_graph_store(tmp.path()).unwrap();
3209        let root_keys = Keys::generate();
3210        let other_keys = Keys::generate();
3211        set_social_graph_root(&graph_store, &root_keys.public_key().to_bytes());
3212
3213        let root_event = EventBuilder::new(Kind::TextNote, "root", [])
3214            .custom_created_at(Timestamp::from_secs(5))
3215            .to_event(&root_keys)
3216            .unwrap();
3217        let other_event = EventBuilder::new(Kind::TextNote, "other", [])
3218            .custom_created_at(Timestamp::from_secs(6))
3219            .to_event(&other_keys)
3220            .unwrap();
3221
3222        ingest_parsed_event(&graph_store, &root_event).unwrap();
3223        ingest_parsed_event(&graph_store, &other_event).unwrap();
3224
3225        let filter = Filter::new().kind(Kind::TextNote);
3226        let public_events = graph_store
3227            .query_events_in_scope(&filter, 10, EventQueryScope::PublicOnly)
3228            .unwrap();
3229        assert_eq!(public_events.len(), 1);
3230        assert_eq!(public_events[0].id, root_event.id);
3231
3232        let ambient_events = graph_store
3233            .query_events_in_scope(&filter, 10, EventQueryScope::AmbientOnly)
3234            .unwrap();
3235        assert_eq!(ambient_events.len(), 1);
3236        assert_eq!(ambient_events[0].id, other_event.id);
3237    }
3238
3239    #[test]
3240    fn test_query_events_survives_reopen() {
3241        let _guard = test_lock();
3242        let tmp = TempDir::new().unwrap();
3243        let db_dir = tmp.path().join("socialgraph-store");
3244        let keys = Keys::generate();
3245        let other_keys = Keys::generate();
3246
3247        {
3248            let graph_store = open_social_graph_store_at_path(&db_dir, None).unwrap();
3249            let older = EventBuilder::new(Kind::TextNote, "older", [])
3250                .custom_created_at(Timestamp::from_secs(5))
3251                .to_event(&keys)
3252                .unwrap();
3253            let newer = EventBuilder::new(Kind::TextNote, "newer", [])
3254                .custom_created_at(Timestamp::from_secs(6))
3255                .to_event(&keys)
3256                .unwrap();
3257            let latest = EventBuilder::new(Kind::TextNote, "latest", [])
3258                .custom_created_at(Timestamp::from_secs(7))
3259                .to_event(&other_keys)
3260                .unwrap();
3261
3262            ingest_parsed_event(&graph_store, &older).unwrap();
3263            ingest_parsed_event(&graph_store, &newer).unwrap();
3264            ingest_parsed_event(&graph_store, &latest).unwrap();
3265        }
3266
3267        let reopened = open_social_graph_store_at_path(&db_dir, None).unwrap();
3268
3269        let author_filter = Filter::new().author(keys.public_key()).kind(Kind::TextNote);
3270        let author_events = query_events(&reopened, &author_filter, 10);
3271        assert_eq!(author_events.len(), 2);
3272        assert_eq!(author_events[0].content, "newer");
3273        assert_eq!(author_events[1].content, "older");
3274
3275        let recent_filter = Filter::new().kind(Kind::TextNote);
3276        let recent_events = query_events(&reopened, &recent_filter, 2);
3277        assert_eq!(recent_events.len(), 2);
3278        assert_eq!(recent_events[0].content, "latest");
3279        assert_eq!(recent_events[1].content, "newer");
3280    }
3281
3282    #[test]
3283    fn test_query_events_parameterized_replaceable_by_d_tag() {
3284        let _guard = test_lock();
3285        let tmp = TempDir::new().unwrap();
3286        let graph_store = open_social_graph_store(tmp.path()).unwrap();
3287        let keys = Keys::generate();
3288
3289        let older = EventBuilder::new(
3290            Kind::Custom(30078),
3291            "",
3292            vec![
3293                Tag::identifier("video"),
3294                Tag::parse(&["l", "hashtree"]).unwrap(),
3295                Tag::parse(&["hash", &"11".repeat(32)]).unwrap(),
3296            ],
3297        )
3298        .custom_created_at(Timestamp::from_secs(5))
3299        .to_event(&keys)
3300        .unwrap();
3301        let newer = EventBuilder::new(
3302            Kind::Custom(30078),
3303            "",
3304            vec![
3305                Tag::identifier("video"),
3306                Tag::parse(&["l", "hashtree"]).unwrap(),
3307                Tag::parse(&["hash", &"22".repeat(32)]).unwrap(),
3308            ],
3309        )
3310        .custom_created_at(Timestamp::from_secs(6))
3311        .to_event(&keys)
3312        .unwrap();
3313        let other_tree = EventBuilder::new(
3314            Kind::Custom(30078),
3315            "",
3316            vec![
3317                Tag::identifier("files"),
3318                Tag::parse(&["l", "hashtree"]).unwrap(),
3319                Tag::parse(&["hash", &"33".repeat(32)]).unwrap(),
3320            ],
3321        )
3322        .custom_created_at(Timestamp::from_secs(7))
3323        .to_event(&keys)
3324        .unwrap();
3325
3326        ingest_parsed_event(&graph_store, &older).unwrap();
3327        ingest_parsed_event(&graph_store, &newer).unwrap();
3328        ingest_parsed_event(&graph_store, &other_tree).unwrap();
3329
3330        let filter = Filter::new()
3331            .author(keys.public_key())
3332            .kind(Kind::Custom(30078))
3333            .identifier("video");
3334        let events = query_events(&graph_store, &filter, 10);
3335        assert_eq!(events.len(), 1);
3336        assert_eq!(events[0].id, newer.id);
3337    }
3338
3339    #[test]
3340    fn test_query_events_by_hashtag_uses_tag_index() {
3341        let _guard = test_lock();
3342        let tmp = TempDir::new().unwrap();
3343        let graph_store = open_social_graph_store(tmp.path()).unwrap();
3344        let keys = Keys::generate();
3345        let other_keys = Keys::generate();
3346
3347        let first = EventBuilder::new(
3348            Kind::TextNote,
3349            "first",
3350            vec![Tag::parse(&["t", "hashtree"]).unwrap()],
3351        )
3352        .custom_created_at(Timestamp::from_secs(5))
3353        .to_event(&keys)
3354        .unwrap();
3355        let second = EventBuilder::new(
3356            Kind::TextNote,
3357            "second",
3358            vec![Tag::parse(&["t", "hashtree"]).unwrap()],
3359        )
3360        .custom_created_at(Timestamp::from_secs(6))
3361        .to_event(&other_keys)
3362        .unwrap();
3363        let unrelated = EventBuilder::new(
3364            Kind::TextNote,
3365            "third",
3366            vec![Tag::parse(&["t", "other"]).unwrap()],
3367        )
3368        .custom_created_at(Timestamp::from_secs(7))
3369        .to_event(&other_keys)
3370        .unwrap();
3371
3372        ingest_parsed_event(&graph_store, &first).unwrap();
3373        ingest_parsed_event(&graph_store, &second).unwrap();
3374        ingest_parsed_event(&graph_store, &unrelated).unwrap();
3375
3376        let filter = Filter::new().hashtag("hashtree");
3377        let events = query_events(&graph_store, &filter, 10);
3378        assert_eq!(events.len(), 2);
3379        assert_eq!(events[0].id, second.id);
3380        assert_eq!(events[1].id, first.id);
3381    }
3382
3383    #[test]
3384    fn test_query_events_combines_indexes_then_applies_search_filter() {
3385        let _guard = test_lock();
3386        let tmp = TempDir::new().unwrap();
3387        let graph_store = open_social_graph_store(tmp.path()).unwrap();
3388        let keys = Keys::generate();
3389        let other_keys = Keys::generate();
3390
3391        let matching = EventBuilder::new(
3392            Kind::TextNote,
3393            "hashtree video release",
3394            vec![Tag::parse(&["t", "hashtree"]).unwrap()],
3395        )
3396        .custom_created_at(Timestamp::from_secs(5))
3397        .to_event(&keys)
3398        .unwrap();
3399        let non_matching = EventBuilder::new(
3400            Kind::TextNote,
3401            "plain text note",
3402            vec![Tag::parse(&["t", "hashtree"]).unwrap()],
3403        )
3404        .custom_created_at(Timestamp::from_secs(6))
3405        .to_event(&other_keys)
3406        .unwrap();
3407
3408        ingest_parsed_event(&graph_store, &matching).unwrap();
3409        ingest_parsed_event(&graph_store, &non_matching).unwrap();
3410
3411        let filter = Filter::new().hashtag("hashtree").search("video");
3412        let events = query_events(&graph_store, &filter, 10);
3413        assert_eq!(events.len(), 1);
3414        assert_eq!(events[0].id, matching.id);
3415    }
3416
3417    fn benchmark_dataset_path() -> Option<PathBuf> {
3418        std::env::var_os("HASHTREE_BENCH_DATASET_PATH").map(PathBuf::from)
3419    }
3420
3421    fn benchmark_dataset_url() -> String {
3422        std::env::var("HASHTREE_BENCH_DATASET_URL")
3423            .ok()
3424            .filter(|value| !value.is_empty())
3425            .unwrap_or_else(|| WELLORDER_FIXTURE_URL.to_string())
3426    }
3427
3428    fn benchmark_stream_warmup_events(measured_events: usize) -> usize {
3429        std::env::var("HASHTREE_BENCH_WARMUP_EVENTS")
3430            .ok()
3431            .and_then(|value| value.parse::<usize>().ok())
3432            .unwrap_or_else(|| measured_events.clamp(1, 200))
3433    }
3434
3435    fn ensure_benchmark_dataset(path: &Path, url: &str) -> Result<()> {
3436        if path.exists() {
3437            return Ok(());
3438        }
3439
3440        let parent = path
3441            .parent()
3442            .context("benchmark dataset path has no parent directory")?;
3443        fs::create_dir_all(parent).context("create benchmark dataset directory")?;
3444
3445        let tmp = path.with_extension("tmp");
3446        let mut response = reqwest::blocking::get(url)
3447            .context("download benchmark dataset")?
3448            .error_for_status()
3449            .context("benchmark dataset request failed")?;
3450        let mut file = File::create(&tmp).context("create temporary benchmark dataset file")?;
3451        std::io::copy(&mut response, &mut file).context("write benchmark dataset")?;
3452        fs::rename(&tmp, path).context("move benchmark dataset into place")?;
3453
3454        Ok(())
3455    }
3456
    /// Streams up to `max_events` events from the bzip2-compressed JSONL
    /// dataset at `path`, decompressing via an external `bzip2 -dc` child
    /// process. Lines that are empty or fail to parse as events are skipped.
    ///
    /// Returns fewer than `max_events` events when the dataset is exhausted;
    /// in that case the decompressor must have exited successfully.
    fn load_benchmark_dataset(path: &Path, max_events: usize) -> Result<Vec<Event>> {
        if max_events == 0 {
            return Ok(Vec::new());
        }

        let mut child = Command::new("bzip2")
            .args(["-dc", &path.to_string_lossy()])
            .stdout(Stdio::piped())
            .spawn()
            .context("spawn bzip2 for benchmark dataset")?;
        let stdout = child
            .stdout
            .take()
            .context("benchmark dataset stdout missing")?;
        let mut events = Vec::with_capacity(max_events);

        {
            // Scope the reader so the pipe handle is dropped before the child
            // is reaped or killed below.
            let reader = BufReader::new(stdout);
            for line in reader.lines() {
                if events.len() >= max_events {
                    break;
                }
                let line = line.context("read benchmark dataset line")?;
                let trimmed = line.trim();
                if trimmed.is_empty() {
                    continue;
                }
                // Best-effort parse: malformed lines are silently skipped.
                if let Ok(event) = Event::from_json(trimmed.to_string()) {
                    events.push(event);
                }
            }
        }

        if events.len() < max_events {
            // The full stream was consumed; require a clean exit so a
            // truncated/corrupt archive is surfaced as an error.
            let status = child.wait().context("wait for benchmark dataset reader")?;
            anyhow::ensure!(
                status.success(),
                "benchmark dataset reader exited with status {status}"
            );
        } else {
            // Stopped early: terminate and reap the decompressor, ignoring
            // errors (it may already have exited, e.g. on EPIPE).
            let _ = child.kill();
            let _ = child.wait();
        }

        Ok(events)
    }
3503
3504    fn build_synthetic_benchmark_events(event_count: usize, author_count: usize) -> Vec<Event> {
3505        let authors = (0..author_count)
3506            .map(|_| Keys::generate())
3507            .collect::<Vec<_>>();
3508        let mut events = Vec::with_capacity(event_count);
3509        for i in 0..event_count {
3510            let kind = if i % 8 < 5 {
3511                Kind::TextNote
3512            } else {
3513                Kind::Custom(30_023)
3514            };
3515            let mut tags = Vec::new();
3516            if kind == Kind::TextNote && i % 16 == 0 {
3517                tags.push(Tag::parse(&["t", "hashtree"]).unwrap());
3518            }
3519            let content = if kind == Kind::TextNote && i % 32 == 0 {
3520                format!("benchmark target event {i}")
3521            } else {
3522                format!("benchmark event {i}")
3523            };
3524            let event = EventBuilder::new(kind, content, tags)
3525                .custom_created_at(Timestamp::from_secs(1_700_000_000 + i as u64))
3526                .to_event(&authors[i % author_count])
3527                .unwrap();
3528            events.push(event);
3529        }
3530        events
3531    }
3532
3533    fn load_benchmark_events(
3534        event_count: usize,
3535        author_count: usize,
3536    ) -> Result<(String, Vec<Event>)> {
3537        if let Some(path) = benchmark_dataset_path() {
3538            let url = benchmark_dataset_url();
3539            ensure_benchmark_dataset(&path, &url)?;
3540            let events = load_benchmark_dataset(&path, event_count)?;
3541            return Ok((format!("dataset:{}", path.display()), events));
3542        }
3543
3544        Ok((
3545            format!("synthetic:{author_count}-authors"),
3546            build_synthetic_benchmark_events(event_count, author_count),
3547        ))
3548    }
3549
3550    fn first_tag_filter(event: &Event) -> Option<Filter> {
3551        event.tags.iter().find_map(|tag| match tag.as_slice() {
3552            [name, value, ..]
3553                if name.len() == 1
3554                    && !value.is_empty()
3555                    && name.as_bytes()[0].is_ascii_lowercase() =>
3556            {
3557                let letter = SingleLetterTag::from_char(name.chars().next()?).ok()?;
3558                Some(Filter::new().custom_tag(letter, [value.to_string()]))
3559            }
3560            _ => None,
3561        })
3562    }
3563
3564    fn first_search_term(event: &Event) -> Option<String> {
3565        event
3566            .content
3567            .split(|ch: char| !ch.is_alphanumeric())
3568            .find(|token| token.len() >= 4)
3569            .map(|token| token.to_ascii_lowercase())
3570    }
3571
3572    fn benchmark_match_count(events: &[Event], filter: &Filter, limit: usize) -> usize {
3573        events
3574            .iter()
3575            .filter(|event| filter.match_event(event))
3576            .count()
3577            .min(limit)
3578    }
3579
3580    fn benchmark_btree_orders() -> Vec<usize> {
3581        std::env::var("HASHTREE_BTREE_ORDERS")
3582            .ok()
3583            .map(|value| {
3584                value
3585                    .split(',')
3586                    .filter_map(|part| part.trim().parse::<usize>().ok())
3587                    .filter(|order| *order >= 2)
3588                    .collect::<Vec<_>>()
3589            })
3590            .filter(|orders| !orders.is_empty())
3591            .unwrap_or_else(|| vec![16, 24, 32, 48, 64])
3592    }
3593
3594    fn benchmark_read_iterations() -> usize {
3595        std::env::var("HASHTREE_BENCH_READ_ITERATIONS")
3596            .ok()
3597            .and_then(|value| value.parse::<usize>().ok())
3598            .unwrap_or(5)
3599            .max(1)
3600    }
3601
3602    fn average_duration(samples: &[Duration]) -> Duration {
3603        if samples.is_empty() {
3604            return Duration::ZERO;
3605        }
3606
3607        Duration::from_secs_f64(
3608            samples.iter().map(Duration::as_secs_f64).sum::<f64>() / samples.len() as f64,
3609        )
3610    }
3611
3612    fn average_read_trace(samples: &[ReadTraceSnapshot]) -> ReadTraceSnapshot {
3613        if samples.is_empty() {
3614            return ReadTraceSnapshot::default();
3615        }
3616
3617        let len = samples.len() as u64;
3618        ReadTraceSnapshot {
3619            get_calls: samples.iter().map(|sample| sample.get_calls).sum::<u64>() / len,
3620            total_bytes: samples.iter().map(|sample| sample.total_bytes).sum::<u64>() / len,
3621            unique_blocks: (samples
3622                .iter()
3623                .map(|sample| sample.unique_blocks as u64)
3624                .sum::<u64>()
3625                / len) as usize,
3626            unique_bytes: samples
3627                .iter()
3628                .map(|sample| sample.unique_bytes)
3629                .sum::<u64>()
3630                / len,
3631            cache_hits: samples.iter().map(|sample| sample.cache_hits).sum::<u64>() / len,
3632            remote_fetches: samples
3633                .iter()
3634                .map(|sample| sample.remote_fetches)
3635                .sum::<u64>()
3636                / len,
3637            remote_bytes: samples
3638                .iter()
3639                .map(|sample| sample.remote_bytes)
3640                .sum::<u64>()
3641                / len,
3642        }
3643    }
3644
3645    fn estimate_serialized_remote_ms(snapshot: &ReadTraceSnapshot, model: NetworkModel) -> f64 {
3646        let transfer_ms = if model.bandwidth_mib_per_s <= 0.0 {
3647            0.0
3648        } else {
3649            (snapshot.remote_bytes as f64 / (model.bandwidth_mib_per_s * 1024.0 * 1024.0)) * 1000.0
3650        };
3651        snapshot.remote_fetches as f64 * model.rtt_ms + transfer_ms
3652    }
3653
    /// Inputs for the B-tree order benchmark: the event corpus plus known
    /// identifiers that guarantee non-empty results for every query case.
    #[derive(Debug, Clone)]
    struct IndexBenchmarkDataset {
        /// Label describing where the events came from
        /// ("dataset:<path>" or "synthetic:<n>-authors").
        source: String,
        /// Full corpus, including the appended guaranteed events.
        events: Vec<Event>,
        /// Single-letter tag name ("t") guaranteed to exist on one event.
        guaranteed_tag_name: String,
        /// Tag value ("btreebench") guaranteed to exist on one event.
        guaranteed_tag_value: String,
        /// Hex pubkey that authored the replaceable-event pair.
        replaceable_pubkey: String,
        /// Kind (10000) used for the replaceable-event pair.
        replaceable_kind: u32,
        /// Hex pubkey that authored the parameterized-replaceable pair.
        parameterized_pubkey: String,
        /// Kind (30023) used for the parameterized-replaceable pair.
        parameterized_kind: u32,
        /// `d` tag identifier shared by the parameterized pair.
        parameterized_d_tag: String,
    }
3666
    /// Loads the benchmark corpus and appends five hand-built events whose
    /// queries are guaranteed to match: one tagged text note, an old/new
    /// replaceable pair (kind 10000), and an old/new parameterized
    /// replaceable pair (kind 30023 with a shared `d` tag). The appended
    /// events use timestamps strictly after the newest corpus event.
    fn load_index_benchmark_dataset(
        event_count: usize,
        author_count: usize,
    ) -> Result<IndexBenchmarkDataset> {
        let (source, mut events) = load_benchmark_events(event_count, author_count)?;
        // Base all appended timestamps one second past the corpus maximum so
        // the "new" replaceable versions deterministically win.
        let base_timestamp = events
            .iter()
            .map(|event| event.created_at.as_u64())
            .max()
            .unwrap_or(1_700_000_000)
            + 1;

        let replaceable_keys = Keys::generate();
        let parameterized_keys = Keys::generate();
        let tagged_keys = Keys::generate();
        let guaranteed_tag_name = "t".to_string();
        let guaranteed_tag_value = "btreebench".to_string();
        let replaceable_kind = 10_000u32;
        let parameterized_kind = 30_023u32;
        let parameterized_d_tag = "btree-bench".to_string();

        // Guaranteed hit for the by-tag query case.
        let tagged = EventBuilder::new(
            Kind::TextNote,
            "btree benchmark tagged note",
            vec![Tag::parse(&["t", &guaranteed_tag_value]).unwrap()],
        )
        .custom_created_at(Timestamp::from_secs(base_timestamp))
        .to_event(&tagged_keys)
        .unwrap();
        // Replaceable pair: the "new" version (later created_at) should
        // supersede the "old" one.
        let replaceable_old = EventBuilder::new(
            Kind::Custom(replaceable_kind.try_into().unwrap()),
            "replaceable old",
            [],
        )
        .custom_created_at(Timestamp::from_secs(base_timestamp + 1))
        .to_event(&replaceable_keys)
        .unwrap();
        let replaceable_new = EventBuilder::new(
            Kind::Custom(replaceable_kind.try_into().unwrap()),
            "replaceable new",
            [],
        )
        .custom_created_at(Timestamp::from_secs(base_timestamp + 2))
        .to_event(&replaceable_keys)
        .unwrap();
        // Parameterized replaceable pair sharing the same `d` identifier.
        let parameterized_old = EventBuilder::new(
            Kind::Custom(parameterized_kind.try_into().unwrap()),
            "",
            vec![Tag::identifier(&parameterized_d_tag)],
        )
        .custom_created_at(Timestamp::from_secs(base_timestamp + 3))
        .to_event(&parameterized_keys)
        .unwrap();
        let parameterized_new = EventBuilder::new(
            Kind::Custom(parameterized_kind.try_into().unwrap()),
            "",
            vec![Tag::identifier(&parameterized_d_tag)],
        )
        .custom_created_at(Timestamp::from_secs(base_timestamp + 4))
        .to_event(&parameterized_keys)
        .unwrap();

        events.extend([
            tagged,
            replaceable_old,
            replaceable_new,
            parameterized_old,
            parameterized_new,
        ]);

        Ok(IndexBenchmarkDataset {
            source,
            events,
            guaranteed_tag_name,
            guaranteed_tag_value,
            replaceable_pubkey: replaceable_keys.public_key().to_hex(),
            replaceable_kind,
            parameterized_pubkey: parameterized_keys.public_key().to_hex(),
            parameterized_kind,
            parameterized_d_tag,
        })
    }
3749
3750    fn build_btree_query_cases(dataset: &IndexBenchmarkDataset) -> Vec<BenchmarkQueryCase> {
3751        let primary_kind = dataset
3752            .events
3753            .iter()
3754            .find(|event| event.kind == Kind::TextNote)
3755            .map(|event| event.kind)
3756            .or_else(|| dataset.events.first().map(|event| event.kind))
3757            .expect("benchmark requires at least one event");
3758        let primary_kind_u32 = primary_kind.as_u16() as u32;
3759
3760        let author_pubkey = dataset
3761            .events
3762            .iter()
3763            .filter(|event| event.kind == primary_kind)
3764            .fold(HashMap::<String, usize>::new(), |mut counts, event| {
3765                *counts.entry(event.pubkey.to_hex()).or_default() += 1;
3766                counts
3767            })
3768            .into_iter()
3769            .max_by_key(|(_, count)| *count)
3770            .map(|(pubkey, _)| pubkey)
3771            .expect("benchmark requires an author for the selected kind");
3772
3773        let by_id_id = dataset.events[dataset.events.len() / 2].id.to_hex();
3774
3775        vec![
3776            BenchmarkQueryCase::ById { id: by_id_id },
3777            BenchmarkQueryCase::ByAuthor {
3778                pubkey: author_pubkey.clone(),
3779                limit: 50,
3780            },
3781            BenchmarkQueryCase::ByAuthorKind {
3782                pubkey: author_pubkey,
3783                kind: primary_kind_u32,
3784                limit: 50,
3785            },
3786            BenchmarkQueryCase::ByKind {
3787                kind: primary_kind_u32,
3788                limit: 200,
3789            },
3790            BenchmarkQueryCase::ByTag {
3791                tag_name: dataset.guaranteed_tag_name.clone(),
3792                tag_value: dataset.guaranteed_tag_value.clone(),
3793                limit: 100,
3794            },
3795            BenchmarkQueryCase::Recent { limit: 100 },
3796            BenchmarkQueryCase::Replaceable {
3797                pubkey: dataset.replaceable_pubkey.clone(),
3798                kind: dataset.replaceable_kind,
3799            },
3800            BenchmarkQueryCase::ParameterizedReplaceable {
3801                pubkey: dataset.parameterized_pubkey.clone(),
3802                kind: dataset.parameterized_kind,
3803                d_tag: dataset.parameterized_d_tag.clone(),
3804            },
3805        ]
3806    }
3807
3808    fn benchmark_warm_query_case<S: Store + 'static>(
3809        base: Arc<S>,
3810        root: &Cid,
3811        order: usize,
3812        case: &BenchmarkQueryCase,
3813        iterations: usize,
3814    ) -> QueryBenchmarkResult {
3815        let trace_store = Arc::new(CountingStore::new(base));
3816        let event_store = NostrEventStore::with_options(
3817            Arc::clone(&trace_store),
3818            NostrEventStoreOptions {
3819                btree_order: Some(order),
3820            },
3821        );
3822        let mut durations = Vec::with_capacity(iterations);
3823        let mut traces = Vec::with_capacity(iterations);
3824        for _ in 0..iterations {
3825            trace_store.reset();
3826            let started = Instant::now();
3827            let matches = block_on(case.execute(&event_store, root)).unwrap();
3828            durations.push(started.elapsed());
3829            traces.push(trace_store.snapshot());
3830            assert!(
3831                matches > 0,
3832                "benchmark query {} returned no matches",
3833                case.name()
3834            );
3835        }
3836        let mut sorted = durations.clone();
3837        sorted.sort_unstable();
3838        QueryBenchmarkResult {
3839            average_duration: average_duration(&durations),
3840            p95_duration: duration_percentile(&sorted, 95, 100),
3841            reads: average_read_trace(&traces),
3842        }
3843    }
3844
3845    fn benchmark_cold_query_case<S: Store + 'static>(
3846        remote: Arc<S>,
3847        root: &Cid,
3848        order: usize,
3849        case: &BenchmarkQueryCase,
3850        iterations: usize,
3851    ) -> QueryBenchmarkResult {
3852        let mut durations = Vec::with_capacity(iterations);
3853        let mut traces = Vec::with_capacity(iterations);
3854        for _ in 0..iterations {
3855            let cache = Arc::new(MemoryStore::new());
3856            let trace_store = Arc::new(ReadThroughStore::new(cache, Arc::clone(&remote)));
3857            let event_store = NostrEventStore::with_options(
3858                Arc::clone(&trace_store),
3859                NostrEventStoreOptions {
3860                    btree_order: Some(order),
3861                },
3862            );
3863            let started = Instant::now();
3864            let matches = block_on(case.execute(&event_store, root)).unwrap();
3865            durations.push(started.elapsed());
3866            traces.push(trace_store.snapshot());
3867            assert!(
3868                matches > 0,
3869                "benchmark query {} returned no matches",
3870                case.name()
3871            );
3872        }
3873        let mut sorted = durations.clone();
3874        sorted.sort_unstable();
3875        QueryBenchmarkResult {
3876            average_duration: average_duration(&durations),
3877            p95_duration: duration_percentile(&sorted, 95, 100),
3878            reads: average_read_trace(&traces),
3879        }
3880    }
3881
3882    fn duration_percentile(
3883        sorted: &[std::time::Duration],
3884        numerator: usize,
3885        denominator: usize,
3886    ) -> std::time::Duration {
3887        if sorted.is_empty() {
3888            return std::time::Duration::ZERO;
3889        }
3890        let index = ((sorted.len() - 1) * numerator) / denominator;
3891        sorted[index]
3892    }
3893
    /// Benchmark (run with `--ignored`): measures steady-state ingest latency
    /// and query latency over a large corpus. Warmup events are ingested in
    /// one batch first; the measured stream is then ingested one event at a
    /// time with per-event timing. Prints latency percentiles, ingest
    /// capacity, a per-label ingest profile, and timings for kind, author,
    /// tag, and search queries, each cross-checked against a linear scan.
    #[test]
    #[ignore = "benchmark"]
    fn benchmark_query_events_large_dataset() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        // 512 MiB map size so the store can hold the full benchmark corpus.
        let graph_store =
            open_social_graph_store_with_mapsize(tmp.path(), Some(512 * 1024 * 1024)).unwrap();
        set_nostr_profile_enabled(true);
        reset_nostr_profile();

        let author_count = 64usize;
        let measured_event_count = std::env::var("HASHTREE_BENCH_EVENTS")
            .ok()
            .and_then(|value| value.parse::<usize>().ok())
            .unwrap_or(600usize);
        let warmup_event_count = benchmark_stream_warmup_events(measured_event_count);
        let total_event_count = warmup_event_count + measured_event_count;
        let (source, events) = load_benchmark_events(total_event_count, author_count).unwrap();
        let loaded_event_count = events.len();
        // The dataset may hold fewer events than requested; keep at least one
        // event in the measured stream.
        let warmup_event_count = warmup_event_count.min(loaded_event_count.saturating_sub(1));
        let (warmup_events, measured_events) = events.split_at(warmup_event_count);

        println!(
            "starting steady-state dataset benchmark with {} warmup events and {} measured stream events from {}",
            warmup_events.len(),
            measured_events.len(),
            source
        );
        if !warmup_events.is_empty() {
            ingest_parsed_events(&graph_store, warmup_events).unwrap();
        }

        // Measured stream: ingest one event at a time, timing each call.
        let stream_start = Instant::now();
        let mut per_event_latencies = Vec::with_capacity(measured_events.len());
        for event in measured_events {
            let event_start = Instant::now();
            ingest_parsed_event(&graph_store, event).unwrap();
            per_event_latencies.push(event_start.elapsed());
        }
        let ingest_duration = stream_start.elapsed();

        let mut sorted_latencies = per_event_latencies.clone();
        sorted_latencies.sort_unstable();
        let average_latency = if per_event_latencies.is_empty() {
            std::time::Duration::ZERO
        } else {
            std::time::Duration::from_secs_f64(
                per_event_latencies
                    .iter()
                    .map(std::time::Duration::as_secs_f64)
                    .sum::<f64>()
                    / per_event_latencies.len() as f64,
            )
        };
        let ingest_capacity_eps = if ingest_duration.is_zero() {
            f64::INFINITY
        } else {
            measured_events.len() as f64 / ingest_duration.as_secs_f64()
        };
        println!(
            "benchmark steady-state ingest complete in {:?} (avg={:?} p50={:?} p95={:?} p99={:?} capacity={:.2} events/s)",
            ingest_duration,
            average_latency,
            duration_percentile(&sorted_latencies, 50, 100),
            duration_percentile(&sorted_latencies, 95, 100),
            duration_percentile(&sorted_latencies, 99, 100),
            ingest_capacity_eps
        );
        // Dump the ingest profile, heaviest label first.
        let mut profile = take_nostr_profile();
        profile.sort_by(|left, right| right.total.cmp(&left.total));
        for stat in profile {
            let pct = if ingest_duration.is_zero() {
                0.0
            } else {
                (stat.total.as_secs_f64() / ingest_duration.as_secs_f64()) * 100.0
            };
            let average = if stat.count == 0 {
                std::time::Duration::ZERO
            } else {
                std::time::Duration::from_secs_f64(stat.total.as_secs_f64() / stat.count as f64)
            };
            println!(
                "ingest profile: label={} total={:?} pct={:.1}% count={} avg={:?} max={:?}",
                stat.label, stat.total, pct, stat.count, average, stat.max
            );
        }
        set_nostr_profile_enabled(false);

        // Query by kind; result count is validated against a linear scan and
        // ordering must be newest-first.
        let kind = events
            .iter()
            .find(|event| event.kind == Kind::TextNote)
            .map(|event| event.kind)
            .or_else(|| events.first().map(|event| event.kind))
            .expect("benchmark requires at least one event");
        let kind_filter = Filter::new().kind(kind);
        let kind_start = Instant::now();
        let kind_events = query_events(&graph_store, &kind_filter, 200);
        let kind_duration = kind_start.elapsed();
        assert_eq!(
            kind_events.len(),
            benchmark_match_count(&events, &kind_filter, 200)
        );
        assert!(kind_events
            .windows(2)
            .all(|window| window[0].created_at >= window[1].created_at));

        // Query by author + kind.
        let author_pubkey = events
            .iter()
            .find(|event| event.kind == kind)
            .map(|event| event.pubkey)
            .expect("benchmark requires an author for the selected kind");
        let author_filter = Filter::new().author(author_pubkey).kind(kind);
        let author_start = Instant::now();
        let author_events = query_events(&graph_store, &author_filter, 50);
        let author_duration = author_start.elapsed();
        assert_eq!(
            author_events.len(),
            benchmark_match_count(&events, &author_filter, 50)
        );

        // Query by single-letter tag.
        let tag_filter = events
            .iter()
            .find_map(first_tag_filter)
            .expect("benchmark requires at least one tagged event");
        let tag_start = Instant::now();
        let tag_events = query_events(&graph_store, &tag_filter, 100);
        let tag_duration = tag_start.elapsed();
        assert_eq!(
            tag_events.len(),
            benchmark_match_count(&events, &tag_filter, 100)
        );

        // Full-text search query scoped to a kind.
        let search_source = events
            .iter()
            .find_map(|event| first_search_term(event).map(|term| (event.kind, term)))
            .expect("benchmark requires at least one searchable event");
        let search_filter = Filter::new().kind(search_source.0).search(search_source.1);
        let search_start = Instant::now();
        let search_events = query_events(&graph_store, &search_filter, 100);
        let search_duration = search_start.elapsed();
        assert_eq!(
            search_events.len(),
            benchmark_match_count(&events, &search_filter, 100)
        );

        println!(
            "steady-state benchmark: source={} warmup_events={} stream_events={} ingest={:?} avg={:?} p50={:?} p95={:?} p99={:?} capacity_eps={:.2} kind={:?} author={:?} tag={:?} search={:?}",
            source,
            warmup_events.len(),
            measured_events.len(),
            ingest_duration,
            average_latency,
            duration_percentile(&sorted_latencies, 50, 100),
            duration_percentile(&sorted_latencies, 95, 100),
            duration_percentile(&sorted_latencies, 99, 100),
            ingest_capacity_eps,
            kind_duration,
            author_duration,
            tag_duration,
            search_duration
        );
    }
4056
    /// Benchmark (run with `--ignored`): compares query latency and read
    /// traffic across several B-tree orders. For each order the event store
    /// is rebuilt from scratch, then each query case is measured warm (all
    /// blocks local, read counters tracked) and cold (fresh cache per run, so
    /// every block counts as a remote fetch). Cold fetch counts are converted
    /// into serialized-latency estimates for each entry in `NETWORK_MODELS`.
    #[test]
    #[ignore = "benchmark"]
    fn benchmark_nostr_btree_query_tradeoffs() {
        let _guard = test_lock();
        let event_count = std::env::var("HASHTREE_BENCH_EVENTS")
            .ok()
            .and_then(|value| value.parse::<usize>().ok())
            .unwrap_or(2_000usize);
        let iterations = benchmark_read_iterations();
        let orders = benchmark_btree_orders();
        let dataset = load_index_benchmark_dataset(event_count, 64).unwrap();
        let cases = build_btree_query_cases(&dataset);
        let stored_events = dataset
            .events
            .iter()
            .map(stored_event_from_nostr)
            .collect::<Vec<_>>();

        println!(
            "btree-order benchmark: source={} events={} iterations={} orders={:?}",
            dataset.source,
            stored_events.len(),
            iterations,
            orders
        );
        println!(
            "network models are serialized fetch estimates: {}",
            NETWORK_MODELS
                .iter()
                .map(|model| format!(
                    "{}={}ms_rtt/{}MiBps",
                    model.name, model.rtt_ms, model.bandwidth_mib_per_s
                ))
                .collect::<Vec<_>>()
                .join(", ")
        );

        for order in orders {
            // Fresh store per order so index layout isn't influenced by the
            // previous build.
            let tmp = TempDir::new().unwrap();
            let local_store =
                Arc::new(LocalStore::new(tmp.path().join("blobs"), &StorageBackend::Lmdb).unwrap());
            let event_store = NostrEventStore::with_options(
                Arc::clone(&local_store),
                NostrEventStoreOptions {
                    btree_order: Some(order),
                },
            );
            let root = block_on(event_store.build(None, stored_events.clone()))
                .unwrap()
                .expect("benchmark build root");

            println!("btree-order={} root={}", order, hex::encode(root.hash));
            let mut warm_total_ms = 0.0f64;
            // Per-model running totals, accumulated across query cases.
            let mut model_totals = NETWORK_MODELS
                .iter()
                .map(|model| (model.name, 0.0f64))
                .collect::<HashMap<_, _>>();

            for case in &cases {
                let warm = benchmark_warm_query_case(
                    Arc::clone(&local_store),
                    &root,
                    order,
                    case,
                    iterations,
                );
                let cold = benchmark_cold_query_case(
                    Arc::clone(&local_store),
                    &root,
                    order,
                    case,
                    iterations,
                );
                warm_total_ms += warm.average_duration.as_secs_f64() * 1000.0;

                // Convert the cold read trace into per-model latency
                // estimates while accumulating the summary totals.
                let model_estimates = NETWORK_MODELS
                    .iter()
                    .map(|model| {
                        let estimate = estimate_serialized_remote_ms(&cold.reads, *model);
                        *model_totals.get_mut(model.name).unwrap() += estimate;
                        format!("{}={:.2}ms", model.name, estimate)
                    })
                    .collect::<Vec<_>>()
                    .join(" ");

                println!(
                    "btree-order={} query={} warm_avg={:?} warm_p95={:?} warm_blocks={} warm_unique_bytes={} cold_fetches={} cold_bytes={} cold_local_avg={:?} {}",
                    order,
                    case.name(),
                    warm.average_duration,
                    warm.p95_duration,
                    warm.reads.unique_blocks,
                    warm.reads.unique_bytes,
                    cold.reads.remote_fetches,
                    cold.reads.remote_bytes,
                    cold.average_duration,
                    model_estimates
                );
            }

            println!(
                "btree-order={} summary unweighted_warm_avg_ms={:.3} {}",
                order,
                warm_total_ms / cases.len() as f64,
                NETWORK_MODELS
                    .iter()
                    .map(|model| format!(
                        "{}={:.2}ms",
                        model.name,
                        model_totals[model.name] / cases.len() as f64
                    ))
                    .collect::<Vec<_>>()
                    .join(" ")
            );
        }
    }
4173
    /// Growing the social-graph map size must persist: after requesting a
    /// larger size, reopening the environment with the smaller default
    /// request still observes at least the grown, page-aligned map size.
    #[test]
    fn test_ensure_social_graph_mapsize_rounds_and_applies() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        // Create the environment at the default size, then request growth.
        ensure_social_graph_mapsize(tmp.path(), DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES).unwrap();
        let requested = 70 * 1024 * 1024;
        ensure_social_graph_mapsize(tmp.path(), requested).unwrap();
        // SAFETY: heed marks env opening `unsafe` because the caller must
        // uphold LMDB's environment-opening invariants; the temp dir here is
        // exclusive to this test and tests are serialized via `test_lock`.
        let env = unsafe {
            heed::EnvOpenOptions::new()
                .map_size(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES as usize)
                .max_dbs(SOCIALGRAPH_MAX_DBS)
                .open(tmp.path())
        }
        .unwrap();
        // The reopened env reports at least the requested size, rounded to a
        // whole number of pages.
        assert!(env.info().map_size >= requested as usize);
        assert_eq!(env.info().map_size % page_size_bytes(), 0);
    }
4191
4192    #[test]
4193    fn test_ingest_events_batches_graph_updates() {
4194        let _guard = test_lock();
4195        let tmp = TempDir::new().unwrap();
4196        let graph_store = open_social_graph_store(tmp.path()).unwrap();
4197
4198        let root_keys = Keys::generate();
4199        let alice_keys = Keys::generate();
4200        let bob_keys = Keys::generate();
4201
4202        let root_pk = root_keys.public_key().to_bytes();
4203        set_social_graph_root(&graph_store, &root_pk);
4204
4205        let root_follows_alice = EventBuilder::new(
4206            Kind::ContactList,
4207            "",
4208            vec![Tag::public_key(alice_keys.public_key())],
4209        )
4210        .custom_created_at(Timestamp::from_secs(10))
4211        .to_event(&root_keys)
4212        .unwrap();
4213        let alice_follows_bob = EventBuilder::new(
4214            Kind::ContactList,
4215            "",
4216            vec![Tag::public_key(bob_keys.public_key())],
4217        )
4218        .custom_created_at(Timestamp::from_secs(11))
4219        .to_event(&alice_keys)
4220        .unwrap();
4221
4222        ingest_parsed_events(
4223            &graph_store,
4224            &[root_follows_alice.clone(), alice_follows_bob.clone()],
4225        )
4226        .unwrap();
4227
4228        assert_eq!(
4229            get_follow_distance(&graph_store, &alice_keys.public_key().to_bytes()),
4230            Some(1)
4231        );
4232        assert_eq!(
4233            get_follow_distance(&graph_store, &bob_keys.public_key().to_bytes()),
4234            Some(2)
4235        );
4236
4237        let filter = Filter::new().kind(Kind::ContactList);
4238        let stored = query_events(&graph_store, &filter, 10);
4239        let ids = stored.into_iter().map(|event| event.id).collect::<Vec<_>>();
4240        assert!(ids.contains(&root_follows_alice.id));
4241        assert!(ids.contains(&alice_follows_bob.id));
4242    }
4243
    /// `ingest_graph_parsed_events` must update follow distances in the
    /// social graph while leaving the event index untouched (nothing becomes
    /// queryable).
    #[test]
    fn test_ingest_graph_events_updates_graph_without_indexing_events() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();

        let root_keys = Keys::generate();
        let alice_keys = Keys::generate();

        // Follow distances are measured from the configured root key.
        let root_pk = root_keys.public_key().to_bytes();
        set_social_graph_root(&graph_store, &root_pk);

        // Signed kind-3 contact list: root follows alice.
        let root_follows_alice = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![Tag::public_key(alice_keys.public_key())],
        )
        .custom_created_at(Timestamp::from_secs(10))
        .to_event(&root_keys)
        .unwrap();

        // Graph-only ingestion path (contrast with `ingest_parsed_events`,
        // which also indexes the events for querying).
        ingest_graph_parsed_events(&graph_store, std::slice::from_ref(&root_follows_alice))
            .unwrap();

        // Alice is directly followed by the root -> distance 1.
        assert_eq!(
            get_follow_distance(&graph_store, &alice_keys.public_key().to_bytes()),
            Some(1)
        );
        // The ingested event must NOT be queryable from the event store.
        let filter = Filter::new().kind(Kind::ContactList);
        assert!(query_events(&graph_store, &filter, 10).is_empty());
}
4275}