// hashtree_cli/socialgraph/mod.rs

1pub mod access;
2pub mod crawler;
3pub mod local_lists;
4pub mod snapshot;
5
6pub use access::SocialGraphAccessControl;
7pub use crawler::SocialGraphCrawler;
8pub use local_lists::{
9    read_local_list_file_state, sync_local_list_files_force, sync_local_list_files_if_changed,
10    LocalListFileState, LocalListSyncOutcome,
11};
12
13use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
14use std::path::{Path, PathBuf};
15use std::sync::{Arc, Mutex as StdMutex};
16
17use anyhow::{Context, Result};
18use bytes::Bytes;
19use futures::executor::block_on;
20use hashtree_core::{nhash_encode_full, Cid, HashTree, HashTreeConfig, NHashData};
21use hashtree_index::BTree;
22use hashtree_nostr::{
23    is_parameterized_replaceable_kind, is_replaceable_kind, ListEventsOptions, NostrEventStore,
24    NostrEventStoreError, ProfileGuard as NostrProfileGuard, StoredNostrEvent,
25};
26#[cfg(test)]
27use hashtree_nostr::{
28    reset_profile as reset_nostr_profile, set_profile_enabled as set_nostr_profile_enabled,
29    take_profile as take_nostr_profile,
30};
31use nostr::{Event, Filter, JsonUtil, Kind, SingleLetterTag};
32use nostr_social_graph::{
33    BinaryBudget, GraphStats, NostrEvent as GraphEvent, SocialGraph,
34    SocialGraphBackend as NostrSocialGraphBackend,
35};
36use nostr_social_graph_heed::HeedSocialGraph;
37
38use crate::storage::{LocalStore, StorageRouter};
39
40#[cfg(test)]
41use std::sync::{Mutex, MutexGuard, OnceLock};
42#[cfg(test)]
43use std::time::Instant;
44
/// Set of raw 32-byte pubkeys; `BTreeSet` gives deterministic iteration order.
pub type UserSet = BTreeSet<[u8; 32]>;

// All-zero hex pubkey used as a placeholder root until a real root is set.
const DEFAULT_ROOT_HEX: &str = "0000000000000000000000000000000000000000000000000000000000000000";
// File names (relative to the store's db dir) holding the persisted root CIDs.
const EVENTS_ROOT_FILE: &str = "events-root.msgpack";
const AMBIENT_EVENTS_ROOT_FILE: &str = "events-root-ambient.msgpack";
// Subdirectory for the separate ambient-event blob store.
const AMBIENT_EVENTS_BLOB_DIR: &str = "ambient-blobs";
const PROFILE_SEARCH_ROOT_FILE: &str = "profile-search-root.msgpack";
const PROFILES_BY_PUBKEY_ROOT_FILE: &str = "profiles-by-pubkey-root.msgpack";
// Sentinel distance the graph backend returns for users it cannot reach.
const UNKNOWN_FOLLOW_DISTANCE: u32 = 1000;
const DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES: u64 = 64 * 1024 * 1024;
const SOCIALGRAPH_MAX_DBS: u32 = 16;
// B-tree order for the profile search index.
const PROFILE_SEARCH_INDEX_ORDER: usize = 64;
// Key prefix for profile entries in the search index.
const PROFILE_SEARCH_PREFIX: &str = "p:";
const PROFILE_NAME_MAX_LENGTH: usize = 100;
59
/// Which event bucket an ingested event lands in: `Public` for the root
/// user's own events, `Ambient` for everything else (see
/// `SocialGraphStore::default_storage_class_for`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventStorageClass {
    Public,
    Ambient,
}
65
/// Restricts a query to one event bucket, or spans both (`All`).
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum EventQueryScope {
    PublicOnly,
    AmbientOnly,
    All,
}
73
/// One event index: a content-addressed event store plus the file where its
/// current root CID is persisted between runs.
struct EventIndexBucket {
    event_store: NostrEventStore<StorageRouter>,
    root_path: PathBuf,
}
78
/// Profile lookup state: a hash tree keyed by pubkey and a B-tree search
/// index, with the on-disk locations of their persisted root CIDs.
struct ProfileIndexBucket {
    tree: HashTree<StorageRouter>,
    index: BTree<StorageRouter>,
    by_pubkey_root_path: PathBuf,
    search_root_path: PathBuf,
}
85
/// Serialized form of a CID: the content hash plus an optional 32-byte key.
/// NOTE(review): presumably the decryption/encoding key for the blob —
/// confirm against the root-file reader/writer helpers.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct StoredCid {
    hash: [u8; 32],
    key: Option<[u8; 32]>,
}
91
/// One entry in the profile search index. `aliases` and `nip05` default to
/// empty so older serialized entries without those fields still deserialize.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct StoredProfileSearchEntry {
    pub pubkey: String,
    pub name: String,
    #[serde(default)]
    pub aliases: Vec<String>,
    #[serde(default)]
    pub nip05: Option<String>,
    // `created_at` of the metadata event this entry was derived from.
    pub created_at: u64,
    // nhash of the underlying profile event.
    pub event_nhash: String,
}
103
/// Aggregate statistics derived from a full graph-state export
/// (see `SocialGraphStore::build_distance_cache`).
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct SocialGraphStats {
    pub total_users: usize,
    // Hex pubkey of the current root, when known.
    pub root: Option<String>,
    pub total_follows: usize,
    // Largest follow distance present in the graph.
    pub max_depth: u32,
    // Number of users at each follow distance.
    pub size_by_distance: BTreeMap<u32, usize>,
    pub enabled: bool,
}
113
/// Memoized result of exporting the graph state: stats plus the decoded
/// users grouped by follow distance. Invalidated on every graph mutation.
#[derive(Debug, Clone)]
struct DistanceCache {
    stats: SocialGraphStats,
    users_by_distance: BTreeMap<u32, Vec<[u8; 32]>>,
}
119
/// Opaque error wrapper for failures reported by an upstream graph backend.
#[derive(Debug, thiserror::Error)]
#[error("{0}")]
pub struct UpstreamGraphBackendError(String);
123
/// On-disk social graph store: the heed/LMDB-backed graph, a lazily built
/// distance cache, the public/ambient event buckets, and the profile index.
///
/// Locking: `graph` and `distance_cache` are separate mutexes; methods take
/// `graph` first, release it, then touch `distance_cache` — keep that order.
pub struct SocialGraphStore {
    graph: StdMutex<HeedSocialGraph>,
    distance_cache: StdMutex<Option<DistanceCache>>,
    public_events: EventIndexBucket,
    ambient_events: EventIndexBucket,
    profile_index: ProfileIndexBucket,
}
131
/// Read/write interface over a social graph plus its event indexes.
///
/// Default method bodies delegate to the required methods so simple backends
/// work out of the box; storage-class-aware backends override them.
pub trait SocialGraphBackend: Send + Sync {
    /// Aggregate graph statistics.
    fn stats(&self) -> Result<SocialGraphStats>;
    /// All users at exactly `distance` hops from the root.
    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>>;
    /// Follow distance of one pubkey; `None` when unknown.
    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>>;
    /// `created_at` of the owner's ingested follow list, if any.
    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>>;
    /// Pubkeys the owner follows.
    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet>;
    /// Whether `user_pk` exceeds the mute `threshold`.
    /// NOTE(review): exact mute-ratio semantics are backend-defined — confirm.
    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool>;
    /// Root CID of the profile search index; `None` when the backend has none.
    fn profile_search_root(&self) -> Result<Option<Cid>> {
        Ok(None)
    }
    /// Binary snapshot of the graph rooted at `root`, chunked per `options`.
    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>>;
    /// Ingests one event, backend picks the storage class.
    fn ingest_event(&self, event: &Event) -> Result<()>;
    /// Ingests one event into a caller-chosen storage class; the default
    /// ignores the class and falls back to `ingest_event`.
    fn ingest_event_with_storage_class(
        &self,
        event: &Event,
        storage_class: EventStorageClass,
    ) -> Result<()> {
        let _ = storage_class;
        self.ingest_event(event)
    }
    /// Ingests a batch; default is sequential, stopping at the first error.
    fn ingest_events(&self, events: &[Event]) -> Result<()> {
        for event in events {
            self.ingest_event(event)?;
        }
        Ok(())
    }
    /// Batch ingest with an explicit storage class.
    fn ingest_events_with_storage_class(
        &self,
        events: &[Event],
        storage_class: EventStorageClass,
    ) -> Result<()> {
        for event in events {
            self.ingest_event_with_storage_class(event, storage_class)?;
        }
        Ok(())
    }
    /// Ingests events into the graph only (no event-store writes by default).
    fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
        self.ingest_events(events)
    }
    /// Returns up to `limit` events matching `filter`.
    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>>;
}
173
#[cfg(test)]
pub type TestLockGuard = MutexGuard<'static, ()>;

#[cfg(test)]
static NDB_TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();

/// Serializes tests that share the on-disk social-graph state.
///
/// Recovers from mutex poisoning (`PoisonError::into_inner`): the plain
/// `unwrap()` used previously meant one panicking test poisoned the lock and
/// made every subsequent test that takes it panic too.
#[cfg(test)]
pub fn test_lock() -> TestLockGuard {
    NDB_TEST_LOCK
        .get_or_init(|| Mutex::new(()))
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
}
184
185pub fn open_social_graph_store(data_dir: &Path) -> Result<Arc<SocialGraphStore>> {
186    open_social_graph_store_with_mapsize(data_dir, None)
187}
188
189pub fn open_social_graph_store_with_mapsize(
190    data_dir: &Path,
191    mapsize_bytes: Option<u64>,
192) -> Result<Arc<SocialGraphStore>> {
193    let db_dir = data_dir.join("socialgraph");
194    open_social_graph_store_at_path(&db_dir, mapsize_bytes)
195}
196
197pub fn open_social_graph_store_with_storage(
198    data_dir: &Path,
199    store: Arc<StorageRouter>,
200    mapsize_bytes: Option<u64>,
201) -> Result<Arc<SocialGraphStore>> {
202    let db_dir = data_dir.join("socialgraph");
203    open_social_graph_store_at_path_with_storage(&db_dir, store, mapsize_bytes)
204}
205
206pub fn open_social_graph_store_at_path(
207    db_dir: &Path,
208    mapsize_bytes: Option<u64>,
209) -> Result<Arc<SocialGraphStore>> {
210    let config = hashtree_config::Config::load_or_default();
211    let backend = &config.storage.backend;
212    let local_store = Arc::new(
213        LocalStore::new_with_lmdb_map_size(db_dir.join("blobs"), backend, mapsize_bytes)
214            .map_err(|err| anyhow::anyhow!("Failed to create social graph blob store: {err}"))?,
215    );
216    let store = Arc::new(StorageRouter::new(local_store));
217    open_social_graph_store_at_path_with_storage(db_dir, store, mapsize_bytes)
218}
219
220pub fn open_social_graph_store_at_path_with_storage(
221    db_dir: &Path,
222    store: Arc<StorageRouter>,
223    mapsize_bytes: Option<u64>,
224) -> Result<Arc<SocialGraphStore>> {
225    let ambient_backend = store.local_store().backend();
226    let ambient_local = Arc::new(
227        LocalStore::new_with_lmdb_map_size(
228            db_dir.join(AMBIENT_EVENTS_BLOB_DIR),
229            &ambient_backend,
230            mapsize_bytes,
231        )
232        .map_err(|err| {
233            anyhow::anyhow!("Failed to create social graph ambient blob store: {err}")
234        })?,
235    );
236    let ambient_store = Arc::new(StorageRouter::new(ambient_local));
237    open_social_graph_store_at_path_with_storage_split(db_dir, store, ambient_store, mapsize_bytes)
238}
239
240pub fn open_social_graph_store_at_path_with_storage_split(
241    db_dir: &Path,
242    public_store: Arc<StorageRouter>,
243    ambient_store: Arc<StorageRouter>,
244    mapsize_bytes: Option<u64>,
245) -> Result<Arc<SocialGraphStore>> {
246    std::fs::create_dir_all(db_dir)?;
247    if let Some(size) = mapsize_bytes {
248        ensure_social_graph_mapsize(db_dir, size)?;
249    }
250    let graph = HeedSocialGraph::open(db_dir, DEFAULT_ROOT_HEX)
251        .context("open nostr-social-graph heed backend")?;
252
253    Ok(Arc::new(SocialGraphStore {
254        graph: StdMutex::new(graph),
255        distance_cache: StdMutex::new(None),
256        public_events: EventIndexBucket {
257            event_store: NostrEventStore::new(Arc::clone(&public_store)),
258            root_path: db_dir.join(EVENTS_ROOT_FILE),
259        },
260        ambient_events: EventIndexBucket {
261            event_store: NostrEventStore::new(ambient_store),
262            root_path: db_dir.join(AMBIENT_EVENTS_ROOT_FILE),
263        },
264        profile_index: ProfileIndexBucket {
265            tree: HashTree::new(HashTreeConfig::new(Arc::clone(&public_store))),
266            index: BTree::new(
267                public_store,
268                hashtree_index::BTreeOptions {
269                    order: Some(PROFILE_SEARCH_INDEX_ORDER),
270                },
271            ),
272            by_pubkey_root_path: db_dir.join(PROFILES_BY_PUBKEY_ROOT_FILE),
273            search_root_path: db_dir.join(PROFILE_SEARCH_ROOT_FILE),
274        },
275    }))
276}
277
278pub fn set_social_graph_root(store: &SocialGraphStore, pk_bytes: &[u8; 32]) {
279    if let Err(err) = store.set_root(pk_bytes) {
280        tracing::warn!("Failed to set social graph root: {err}");
281    }
282}
283
284pub fn get_follow_distance(
285    backend: &(impl SocialGraphBackend + ?Sized),
286    pk_bytes: &[u8; 32],
287) -> Option<u32> {
288    backend.follow_distance(pk_bytes).ok().flatten()
289}
290
291pub fn get_follows(
292    backend: &(impl SocialGraphBackend + ?Sized),
293    pk_bytes: &[u8; 32],
294) -> Vec<[u8; 32]> {
295    match backend.followed_targets(pk_bytes) {
296        Ok(set) => set.into_iter().collect(),
297        Err(_) => Vec::new(),
298    }
299}
300
301pub fn is_overmuted(
302    backend: &(impl SocialGraphBackend + ?Sized),
303    _root_pk: &[u8; 32],
304    user_pk: &[u8; 32],
305    threshold: f64,
306) -> bool {
307    backend
308        .is_overmuted_user(user_pk, threshold)
309        .unwrap_or(false)
310}
311
312pub fn ingest_event(backend: &(impl SocialGraphBackend + ?Sized), _sub_id: &str, event_json: &str) {
313    let event = match Event::from_json(event_json) {
314        Ok(event) => event,
315        Err(_) => return,
316    };
317
318    if let Err(err) = backend.ingest_event(&event) {
319        tracing::warn!("Failed to ingest social graph event: {err}");
320    }
321}
322
323pub fn ingest_parsed_event(
324    backend: &(impl SocialGraphBackend + ?Sized),
325    event: &Event,
326) -> Result<()> {
327    backend.ingest_event(event)
328}
329
330pub fn ingest_parsed_event_with_storage_class(
331    backend: &(impl SocialGraphBackend + ?Sized),
332    event: &Event,
333    storage_class: EventStorageClass,
334) -> Result<()> {
335    backend.ingest_event_with_storage_class(event, storage_class)
336}
337
338pub fn ingest_parsed_events(
339    backend: &(impl SocialGraphBackend + ?Sized),
340    events: &[Event],
341) -> Result<()> {
342    backend.ingest_events(events)
343}
344
345pub fn ingest_parsed_events_with_storage_class(
346    backend: &(impl SocialGraphBackend + ?Sized),
347    events: &[Event],
348    storage_class: EventStorageClass,
349) -> Result<()> {
350    backend.ingest_events_with_storage_class(events, storage_class)
351}
352
353pub fn ingest_graph_parsed_events(
354    backend: &(impl SocialGraphBackend + ?Sized),
355    events: &[Event],
356) -> Result<()> {
357    backend.ingest_graph_events(events)
358}
359
360pub fn query_events(
361    backend: &(impl SocialGraphBackend + ?Sized),
362    filter: &Filter,
363    limit: usize,
364) -> Vec<Event> {
365    backend.query_events(filter, limit).unwrap_or_default()
366}
367
impl SocialGraphStore {
    /// Drops the memoized distance cache; the next reader recomputes it from
    /// a fresh graph-state export.
    fn invalidate_distance_cache(&self) {
        *self.distance_cache.lock().unwrap() = None;
    }

    /// Builds the distance cache from a full state export: resolves internal
    /// numeric user ids back to raw pubkeys, groups decoded users by follow
    /// distance, and derives the aggregate stats.
    fn build_distance_cache(state: nostr_social_graph::SocialGraphState) -> Result<DistanceCache> {
        // internal id -> decoded 32-byte pubkey; any undecodable pubkey
        // aborts the build with an error.
        let unique_ids = state
            .unique_ids
            .into_iter()
            .map(|(pubkey, id)| decode_pubkey(&pubkey).map(|decoded| (id, decoded)))
            .collect::<Result<HashMap<_, _>>>()?;

        let mut users_by_distance = BTreeMap::new();
        let mut size_by_distance = BTreeMap::new();
        for (distance, users) in state.users_by_follow_distance {
            // Ids with no pubkey mapping are silently dropped.
            let decoded = users
                .into_iter()
                .filter_map(|id| unique_ids.get(&id).copied())
                .collect::<Vec<_>>();
            size_by_distance.insert(distance, decoded.len());
            users_by_distance.insert(distance, decoded);
        }

        // Total follow edges across all users.
        let total_follows = state
            .followed_by_user
            .iter()
            .map(|(_, targets)| targets.len())
            .sum::<usize>();
        let total_users = size_by_distance.values().copied().sum();
        let max_depth = size_by_distance.keys().copied().max().unwrap_or_default();

        Ok(DistanceCache {
            stats: SocialGraphStats {
                total_users,
                root: Some(state.root),
                total_follows,
                max_depth,
                size_by_distance,
                enabled: true,
            },
            users_by_distance,
        })
    }

    /// Returns the distance cache, building and memoizing it on first use.
    /// The graph lock is released before the cache lock is re-taken, so a
    /// concurrent invalidation between the two can cause a redundant rebuild
    /// (benign).
    fn load_distance_cache(&self) -> Result<DistanceCache> {
        if let Some(cache) = self.distance_cache.lock().unwrap().clone() {
            return Ok(cache);
        }

        let state = {
            let graph = self.graph.lock().unwrap();
            graph.export_state().context("export social graph state")?
        };
        let cache = Self::build_distance_cache(state)?;
        *self.distance_cache.lock().unwrap() = Some(cache.clone());
        Ok(cache)
    }

    /// Moves the graph root to `root`. If the store still holds the all-zero
    /// placeholder root, the whole state is replaced with a fresh graph
    /// instead of re-rooting, then the distance cache is invalidated.
    fn set_root(&self, root: &[u8; 32]) -> Result<()> {
        let root_hex = hex::encode(root);
        {
            let mut graph = self.graph.lock().unwrap();
            if should_replace_placeholder_root(&graph)? {
                let fresh = SocialGraph::new(&root_hex);
                graph
                    .replace_state(&fresh.export_state())
                    .context("replace placeholder social graph root")?;
            } else {
                graph
                    .set_root(&root_hex)
                    .context("set nostr-social-graph root")?;
            }
        }
        self.invalidate_distance_cache();
        Ok(())
    }

    /// Aggregate stats from the (possibly freshly built) distance cache.
    fn stats(&self) -> Result<SocialGraphStats> {
        Ok(self.load_distance_cache()?.stats)
    }

    /// Follow distance for a pubkey, mapping the backend's sentinel
    /// `UNKNOWN_FOLLOW_DISTANCE` to `None`.
    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
        let graph = self.graph.lock().unwrap();
        let distance = graph
            .get_follow_distance(&hex::encode(pk_bytes))
            .context("read social graph follow distance")?;
        Ok((distance != UNKNOWN_FOLLOW_DISTANCE).then_some(distance))
    }

    /// Users at exactly `distance` hops, from the cached grouping.
    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
        Ok(self
            .load_distance_cache()?
            .users_by_distance
            .get(&distance)
            .cloned()
            .unwrap_or_default())
    }

    /// `created_at` of the owner's follow list, straight from the graph.
    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_follow_list_created_at(&hex::encode(owner))
            .context("read social graph follow list timestamp")
    }

    /// Decoded set of pubkeys the owner follows.
    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
        let graph = self.graph.lock().unwrap();
        decode_pubkey_set(
            graph
                .get_followed_by_user(&hex::encode(owner))
                .context("read followed targets")?,
        )
    }

    /// Overmute check; a non-positive threshold disables it entirely.
    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
        if threshold <= 0.0 {
            return Ok(false);
        }
        let graph = self.graph.lock().unwrap();
        graph
            .is_overmuted(&hex::encode(user_pk), threshold)
            .context("check social graph overmute")
    }

    /// Root CID of the persisted profile search index, if any.
    #[cfg_attr(not(test), allow(dead_code))]
    pub fn profile_search_root(&self) -> Result<Option<Cid>> {
        self.profile_index.search_root()
    }

    /// Root CID of the public event index, if any.
    pub(crate) fn public_events_root(&self) -> Result<Option<Cid>> {
        self.public_events.events_root()
    }

    /// Persists (or clears, with `None`) the public event index root.
    pub(crate) fn write_public_events_root(&self, root: Option<&Cid>) -> Result<()> {
        self.public_events.write_events_root(root)
    }

    /// Most recent stored profile (metadata) event for a pubkey, if indexed.
    #[cfg_attr(not(test), allow(dead_code))]
    pub fn latest_profile_event(&self, pubkey_hex: &str) -> Result<Option<Event>> {
        self.profile_index.profile_event_for_pubkey(pubkey_hex)
    }

    /// Search-index entries whose key starts with `prefix`.
    #[cfg_attr(not(test), allow(dead_code))]
    pub fn profile_search_entries_for_prefix(
        &self,
        prefix: &str,
    ) -> Result<Vec<(String, StoredProfileSearchEntry)>> {
        self.profile_index.search_entries_for_prefix(prefix)
    }

    /// Incrementally updates the profile index from `events`.
    pub fn sync_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
        self.update_profile_index_for_events(events)
    }

    /// Rebuilds the profile index from scratch using only the newest
    /// metadata event per pubkey found in `events`, then persists both roots.
    pub(crate) fn rebuild_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
        let latest_by_pubkey = latest_metadata_events_by_pubkey(events);
        let (by_pubkey_root, search_root) = self
            .profile_index
            .rebuild_profile_events(latest_by_pubkey.into_values())?;
        self.profile_index
            .write_by_pubkey_root(by_pubkey_root.as_ref())?;
        self.profile_index.write_search_root(search_root.as_ref())?;
        Ok(())
    }

    /// Rebuilds the profile index from the metadata events already stored in
    /// the public and ambient buckets. Returns the number of distinct
    /// pubkeys indexed; clears both roots when no events exist at all.
    pub fn rebuild_profile_index_from_stored_events(&self) -> Result<usize> {
        let public_events_root = self.public_events.events_root()?;
        let ambient_events_root = self.ambient_events.events_root()?;
        if public_events_root.is_none() && ambient_events_root.is_none() {
            self.profile_index.write_by_pubkey_root(None)?;
            self.profile_index.write_search_root(None)?;
            return Ok(0);
        }

        let mut events = Vec::new();
        for (bucket, root) in [
            (&self.public_events, public_events_root),
            (&self.ambient_events, ambient_events_root),
        ] {
            let Some(root) = root else {
                continue;
            };
            // "lossy" listing: undecodable stored events are skipped by the
            // store rather than failing the whole rebuild.
            let stored = block_on(bucket.event_store.list_by_kind_lossy(
                Some(&root),
                Kind::Metadata.as_u16() as u32,
                ListEventsOptions::default(),
            ))
            .map_err(map_event_store_error)?;
            events.extend(
                stored
                    .into_iter()
                    .map(nostr_event_from_stored)
                    .collect::<Result<Vec<_>>>()?,
            );
        }

        let latest_count = latest_metadata_events_by_pubkey(&events).len();
        self.rebuild_profile_index_for_events(&events)?;
        Ok(latest_count)
    }

    /// Applies the newest metadata event per pubkey to the profile index,
    /// persisting the new roots only if something actually changed.
    fn update_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
        let latest_by_pubkey = latest_metadata_events_by_pubkey(events);

        if latest_by_pubkey.is_empty() {
            return Ok(());
        }

        let mut by_pubkey_root = self.profile_index.by_pubkey_root()?;
        let mut search_root = self.profile_index.search_root()?;
        let mut changed = false;

        for event in latest_by_pubkey.into_values() {
            let (next_by_pubkey_root, next_search_root, updated) = self
                .profile_index
                .update_profile_event(by_pubkey_root.as_ref(), search_root.as_ref(), event)?;
            if updated {
                by_pubkey_root = next_by_pubkey_root;
                search_root = next_search_root;
                changed = true;
            }
        }

        if changed {
            self.profile_index
                .write_by_pubkey_root(by_pubkey_root.as_ref())?;
            self.profile_index.write_search_root(search_root.as_ref())?;
        }

        Ok(())
    }

    /// Encodes a binary snapshot of the graph as seen from `root`. Works on
    /// an in-memory copy so the persistent store's root is never disturbed.
    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
        let state = {
            let graph = self.graph.lock().unwrap();
            graph.export_state().context("export social graph state")?
        };
        let mut graph = SocialGraph::from_state(state).context("rebuild social graph state")?;
        let root_hex = hex::encode(root);
        if graph.get_root() != root_hex {
            graph
                .set_root(&root_hex)
                .context("set snapshot social graph root")?;
        }
        let chunks = graph
            .to_binary_chunks_with_budget(*options)
            .context("encode social graph snapshot")?;
        Ok(chunks.into_iter().map(Bytes::from).collect())
    }

    /// Single-event ingest; the storage class is derived from the event's
    /// author (root user -> Public, otherwise Ambient).
    fn ingest_event(&self, event: &Event) -> Result<()> {
        self.ingest_event_with_storage_class(event, self.default_storage_class_for(event)?)
    }

    /// Batch ingest: partitions events by derived storage class, then runs
    /// one batched ingest per non-empty bucket.
    fn ingest_events(&self, events: &[Event]) -> Result<()> {
        if events.is_empty() {
            return Ok(());
        }

        let mut public = Vec::new();
        let mut ambient = Vec::new();
        for event in events {
            match self.default_storage_class_for(event)? {
                EventStorageClass::Public => public.push(event.clone()),
                EventStorageClass::Ambient => ambient.push(event.clone()),
            }
        }

        if !public.is_empty() {
            self.ingest_events_with_storage_class(&public, EventStorageClass::Public)?;
        }
        if !ambient.is_empty() {
            self.ingest_events_with_storage_class(&ambient, EventStorageClass::Ambient)?;
        }

        Ok(())
    }

    /// Feeds graph-relevant events into the graph WITHOUT touching the event
    /// stores or profile index: exports the state, replays the events on an
    /// in-memory copy, then swaps the state back in atomically.
    fn apply_graph_events_only(&self, events: &[Event]) -> Result<()> {
        let graph_events = events
            .iter()
            .filter(|event| is_social_graph_event(event.kind))
            .collect::<Vec<_>>();
        if graph_events.is_empty() {
            return Ok(());
        }

        {
            let mut graph = self.graph.lock().unwrap();
            let mut snapshot = SocialGraph::from_state(
                graph
                    .export_state()
                    .context("export social graph state for graph-only ingest")?,
            )
            .context("rebuild social graph state for graph-only ingest")?;
            for event in graph_events {
                // NOTE(review): the `true`/`0.0` arguments mirror the other
                // handle_event call sites; their exact semantics are defined
                // by nostr_social_graph — confirm before changing.
                snapshot.handle_event(&graph_event_from_nostr(event), true, 0.0);
            }
            graph
                .replace_state(&snapshot.export_state())
                .context("replace graph-only social graph state")?;
        }
        self.invalidate_distance_cache();
        Ok(())
    }

    /// Query across both buckets (public + ambient).
    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
        self.query_events_in_scope(filter, limit, EventQueryScope::All)
    }

    /// Public when the event author IS the (non-placeholder) root user,
    /// Ambient otherwise.
    fn default_storage_class_for(&self, event: &Event) -> Result<EventStorageClass> {
        let graph = self.graph.lock().unwrap();
        let root_hex = graph.get_root().context("read social graph root")?;
        if root_hex != DEFAULT_ROOT_HEX && root_hex == event.pubkey.to_hex() {
            return Ok(EventStorageClass::Public);
        }
        Ok(EventStorageClass::Ambient)
    }

    /// Bucket corresponding to a storage class.
    fn bucket(&self, storage_class: EventStorageClass) -> &EventIndexBucket {
        match storage_class {
            EventStorageClass::Public => &self.public_events,
            EventStorageClass::Ambient => &self.ambient_events,
        }
    }

    /// Stores one event in its bucket, persists the new root, refreshes the
    /// profile index, and (for graph-relevant kinds) feeds the event into
    /// the graph before invalidating the distance cache.
    fn ingest_event_with_storage_class(
        &self,
        event: &Event,
        storage_class: EventStorageClass,
    ) -> Result<()> {
        let current_root = self.bucket(storage_class).events_root()?;
        let next_root = self
            .bucket(storage_class)
            .store_event(current_root.as_ref(), event)?;
        self.bucket(storage_class)
            .write_events_root(Some(&next_root))?;

        self.update_profile_index_for_events(std::slice::from_ref(event))?;

        if is_social_graph_event(event.kind) {
            {
                let mut graph = self.graph.lock().unwrap();
                graph
                    .handle_event(&graph_event_from_nostr(event), true, 0.0)
                    .context("ingest social graph event into nostr-social-graph")?;
            }
            self.invalidate_distance_cache();
        }

        Ok(())
    }

    /// Batched variant: builds the bucket's index in one pass, refreshes the
    /// profile index, then replays graph-relevant events on an in-memory
    /// graph copy and swaps the state back (one replace per batch instead of
    /// one write per event).
    fn ingest_events_with_storage_class(
        &self,
        events: &[Event],
        storage_class: EventStorageClass,
    ) -> Result<()> {
        if events.is_empty() {
            return Ok(());
        }

        let bucket = self.bucket(storage_class);
        let current_root = bucket.events_root()?;
        let stored_events = events
            .iter()
            .map(stored_event_from_nostr)
            .collect::<Vec<_>>();
        let next_root = block_on(
            bucket
                .event_store
                .build(current_root.as_ref(), stored_events),
        )
        .map_err(map_event_store_error)?;
        bucket.write_events_root(next_root.as_ref())?;

        self.update_profile_index_for_events(events)?;

        let graph_events = events
            .iter()
            .filter(|event| is_social_graph_event(event.kind))
            .collect::<Vec<_>>();
        if graph_events.is_empty() {
            return Ok(());
        }

        {
            let mut graph = self.graph.lock().unwrap();
            let mut snapshot = SocialGraph::from_state(
                graph
                    .export_state()
                    .context("export social graph state for batch ingest")?,
            )
            .context("rebuild social graph state for batch ingest")?;
            for event in graph_events {
                snapshot.handle_event(&graph_event_from_nostr(event), true, 0.0);
            }
            graph
                .replace_state(&snapshot.export_state())
                .context("replace batched social graph state")?;
        }
        self.invalidate_distance_cache();

        Ok(())
    }

    /// Runs `filter` against the buckets selected by `scope`, dedupes the
    /// combined candidates, re-applies the filter exactly, and truncates to
    /// `limit`. `limit == 0` short-circuits to an empty result.
    pub(crate) fn query_events_in_scope(
        &self,
        filter: &Filter,
        limit: usize,
        scope: EventQueryScope,
    ) -> Result<Vec<Event>> {
        if limit == 0 {
            return Ok(Vec::new());
        }

        let buckets: &[&EventIndexBucket] = match scope {
            EventQueryScope::PublicOnly => &[&self.public_events],
            EventQueryScope::AmbientOnly => &[&self.ambient_events],
            EventQueryScope::All => &[&self.public_events, &self.ambient_events],
        };

        let mut candidates = Vec::new();
        for bucket in buckets {
            candidates.extend(bucket.query_events(filter, limit)?);
        }

        let mut deduped = dedupe_events(candidates);
        deduped.retain(|event| filter.match_event(event));
        deduped.truncate(limit);
        Ok(deduped)
    }
}
801
802impl EventIndexBucket {
    /// Reads this bucket's persisted root CID. The guard profiles the whole
    /// read (it must stay alive for the duration of the call).
    fn events_root(&self) -> Result<Option<Cid>> {
        let _profile = NostrProfileGuard::new("socialgraph.events_root.read");
        read_root_file(&self.root_path)
    }
807
    /// Persists (or clears, with `None`) this bucket's root CID, profiled
    /// under a dedicated label.
    fn write_events_root(&self, root: Option<&Cid>) -> Result<()> {
        let _profile = NostrProfileGuard::new("socialgraph.events_root.write");
        write_root_file(&self.root_path, root)
    }
812
813    fn store_event(&self, root: Option<&Cid>, event: &Event) -> Result<Cid> {
814        let stored = stored_event_from_nostr(event);
815        let _profile = NostrProfileGuard::new("socialgraph.event_store.add");
816        block_on(self.event_store.add(root, stored)).map_err(map_event_store_error)
817    }
818
819    fn load_event_by_id(&self, root: &Cid, event_id: &str) -> Result<Option<Event>> {
820        let stored = block_on(self.event_store.get_by_id(Some(root), event_id))
821            .map_err(map_event_store_error)?;
822        stored.map(nostr_event_from_stored).transpose()
823    }
824
825    fn load_events_for_author(
826        &self,
827        root: &Cid,
828        author: &nostr::PublicKey,
829        filter: &Filter,
830        limit: usize,
831        exact: bool,
832    ) -> Result<Vec<Event>> {
833        let kind_filter = filter.kinds.as_ref().and_then(|kinds| {
834            if kinds.len() == 1 {
835                kinds.iter().next().map(|kind| kind.as_u16() as u32)
836            } else {
837                None
838            }
839        });
840        let author_hex = author.to_hex();
841        let options = filter_list_options(filter, limit, exact);
842        let stored = match kind_filter {
843            Some(kind) => block_on(self.event_store.list_by_author_and_kind(
844                Some(root),
845                &author_hex,
846                kind,
847                options.clone(),
848            ))
849            .map_err(map_event_store_error)?,
850            None => block_on(
851                self.event_store
852                    .list_by_author(Some(root), &author_hex, options),
853            )
854            .map_err(map_event_store_error)?,
855        };
856        stored
857            .into_iter()
858            .map(nostr_event_from_stored)
859            .collect::<Result<Vec<_>>>()
860    }
861
862    fn load_events_for_kind(
863        &self,
864        root: &Cid,
865        kind: Kind,
866        filter: &Filter,
867        limit: usize,
868        exact: bool,
869    ) -> Result<Vec<Event>> {
870        let stored = block_on(self.event_store.list_by_kind(
871            Some(root),
872            kind.as_u16() as u32,
873            filter_list_options(filter, limit, exact),
874        ))
875        .map_err(map_event_store_error)?;
876        stored
877            .into_iter()
878            .map(nostr_event_from_stored)
879            .collect::<Result<Vec<_>>>()
880    }
881
882    fn load_recent_events(
883        &self,
884        root: &Cid,
885        filter: &Filter,
886        limit: usize,
887        exact: bool,
888    ) -> Result<Vec<Event>> {
889        let stored = block_on(
890            self.event_store
891                .list_recent(Some(root), filter_list_options(filter, limit, exact)),
892        )
893        .map_err(map_event_store_error)?;
894        stored
895            .into_iter()
896            .map(nostr_event_from_stored)
897            .collect::<Result<Vec<_>>>()
898    }
899
900    fn load_events_for_tag(
901        &self,
902        root: &Cid,
903        tag_name: &str,
904        values: &[String],
905        filter: &Filter,
906        limit: usize,
907        exact: bool,
908    ) -> Result<Vec<Event>> {
909        let mut events = Vec::new();
910        let options = filter_list_options(filter, limit, exact);
911        for value in values {
912            let stored = block_on(self.event_store.list_by_tag(
913                Some(root),
914                tag_name,
915                value,
916                options.clone(),
917            ))
918            .map_err(map_event_store_error)?;
919            events.extend(
920                stored
921                    .into_iter()
922                    .map(nostr_event_from_stored)
923                    .collect::<Result<Vec<_>>>()?,
924            );
925        }
926        Ok(dedupe_events(events))
927    }
928
929    fn choose_tag_source(&self, filter: &Filter) -> Option<(String, Vec<String>)> {
930        filter
931            .generic_tags
932            .iter()
933            .min_by_key(|(_, values)| values.len())
934            .map(|(tag, values)| {
935                (
936                    tag.as_char().to_ascii_lowercase().to_string(),
937                    values.iter().cloned().collect(),
938                )
939            })
940    }
941
    /// Pick the most selective index available for `filter` and load candidate
    /// events from it; `Ok(None)` means no major index applies and the caller
    /// should fall back to a recent-events scan.
    ///
    /// The `exact` flag passed to the loaders records whether the index result
    /// already answers the full filter (so `limit` may be applied at the
    /// index) or is an over-approximation the caller must still filter.
    fn load_major_index_candidates(
        &self,
        root: &Cid,
        filter: &Filter,
        limit: usize,
    ) -> Result<Option<Vec<Event>>> {
        // Cheapest path first: (parameterized-)replaceable lookups yield at
        // most one event per (author, kind[, d-tag]) combination.
        if let Some(events) = self.load_direct_replaceable_candidates(root, filter)? {
            return Ok(Some(events));
        }

        // Next preference: a tag index, scanning the tag with the fewest
        // values. Exact only when the tag is the filter's sole constraint.
        if let Some((tag_name, values)) = self.choose_tag_source(filter) {
            let exact = filter.authors.is_none()
                && filter.kinds.is_none()
                && filter.search.is_none()
                && filter.generic_tags.len() == 1;
            return Ok(Some(self.load_events_for_tag(
                root, &tag_name, &values, filter, limit, exact,
            )?));
        }

        if let (Some(authors), Some(kinds)) = (filter.authors.as_ref(), filter.kinds.as_ref()) {
            // Single author + single kind hits the combined index directly.
            if authors.len() == 1 && kinds.len() == 1 {
                let author = authors.iter().next().expect("checked single author");
                let exact = filter.generic_tags.is_empty() && filter.search.is_none();
                return Ok(Some(
                    self.load_events_for_author(root, author, filter, limit, exact)?,
                ));
            }

            // Otherwise iterate whichever dimension needs fewer scans.
            if kinds.len() < authors.len() {
                let mut events = Vec::new();
                for kind in kinds {
                    events.extend(self.load_events_for_kind(root, *kind, filter, limit, false)?);
                }
                return Ok(Some(dedupe_events(events)));
            }

            let mut events = Vec::new();
            for author in authors {
                events.extend(self.load_events_for_author(root, author, filter, limit, false)?);
            }
            return Ok(Some(dedupe_events(events)));
        }

        // Author-only filter: one scan per author.
        if let Some(authors) = filter.authors.as_ref() {
            let mut events = Vec::new();
            let exact = filter.generic_tags.is_empty() && filter.search.is_none();
            for author in authors {
                events.extend(self.load_events_for_author(root, author, filter, limit, exact)?);
            }
            return Ok(Some(dedupe_events(events)));
        }

        // Kind-only filter: one scan per kind.
        if let Some(kinds) = filter.kinds.as_ref() {
            let mut events = Vec::new();
            let exact = filter.authors.is_none()
                && filter.generic_tags.is_empty()
                && filter.search.is_none();
            for kind in kinds {
                events.extend(self.load_events_for_kind(root, *kind, filter, limit, exact)?);
            }
            return Ok(Some(dedupe_events(events)));
        }

        Ok(None)
    }
1008
    /// Resolve filters that target (parameterized-)replaceable events by
    /// direct latest-event lookups instead of index scans.
    ///
    /// Applies only when the filter names authors plus exactly one kind that
    /// is replaceable; parameterized kinds additionally require explicit
    /// `d`-tag values. Returns `Ok(None)` when this fast path does not apply.
    fn load_direct_replaceable_candidates(
        &self,
        root: &Cid,
        filter: &Filter,
    ) -> Result<Option<Vec<Event>>> {
        let Some(authors) = filter.authors.as_ref() else {
            return Ok(None);
        };
        let Some(kinds) = filter.kinds.as_ref() else {
            return Ok(None);
        };
        if kinds.len() != 1 {
            return Ok(None);
        }

        let kind = kinds.iter().next().expect("checked single kind").as_u16() as u32;

        if is_parameterized_replaceable_kind(kind) {
            // Without concrete d-tag values there is nothing to look up
            // directly; let the generic index paths handle it.
            let d_tag = SingleLetterTag::lowercase(nostr::Alphabet::D);
            let Some(d_values) = filter.generic_tags.get(&d_tag) else {
                return Ok(None);
            };
            // One point lookup per (author, d-value) pair.
            let mut events = Vec::new();
            for author in authors {
                let author_hex = author.to_hex();
                for d_value in d_values {
                    if let Some(stored) = block_on(self.event_store.get_parameterized_replaceable(
                        Some(root),
                        &author_hex,
                        kind,
                        d_value,
                    ))
                    .map_err(map_event_store_error)?
                    {
                        events.push(nostr_event_from_stored(stored)?);
                    }
                }
            }
            return Ok(Some(dedupe_events(events)));
        }

        if is_replaceable_kind(kind) {
            // One point lookup per author for the latest event of this kind.
            let mut events = Vec::new();
            for author in authors {
                if let Some(stored) = block_on(self.event_store.get_replaceable(
                    Some(root),
                    &author.to_hex(),
                    kind,
                ))
                .map_err(map_event_store_error)?
                {
                    events.push(nostr_event_from_stored(stored)?);
                }
            }
            return Ok(Some(dedupe_events(events)));
        }

        Ok(None)
    }
1068
    /// Answer a nostr REQ-style `filter`, returning at most `limit` events
    /// ordered newest-first with ascending event id as the tiebreaker.
    ///
    /// Explicit id filters become point lookups; otherwise the most selective
    /// available index supplies candidates, falling back to a recent-events
    /// scan. Every candidate is re-checked with `filter.match_event` before
    /// it counts against `limit`.
    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
        if limit == 0 {
            return Ok(Vec::new());
        }

        let events_root = self.events_root()?;
        // No events root yet: nothing has been ingested.
        let Some(root) = events_root.as_ref() else {
            return Ok(Vec::new());
        };
        let mut candidates = Vec::new();
        // Ids already considered, so a duplicate never counts twice.
        let mut seen: HashSet<[u8; 32]> = HashSet::new();

        if let Some(ids) = filter.ids.as_ref() {
            for id in ids {
                let id_bytes = id.to_bytes();
                if !seen.insert(id_bytes) {
                    continue;
                }
                // All other filter fields still apply via match_event.
                if let Some(event) = self.load_event_by_id(root, &id.to_hex())? {
                    if filter.match_event(&event) {
                        candidates.push(event);
                    }
                }
                if candidates.len() >= limit {
                    break;
                }
            }
        } else {
            let base_events = match self.load_major_index_candidates(root, filter, limit)? {
                Some(events) => events,
                // Recent-events scan is exact only when the filter has no
                // narrowing fields at all.
                None => self.load_recent_events(
                    root,
                    filter,
                    limit,
                    filter.authors.is_none()
                        && filter.kinds.is_none()
                        && filter.generic_tags.is_empty()
                        && filter.search.is_none(),
                )?,
            };

            for event in base_events {
                let id_bytes = event.id.to_bytes();
                if !seen.insert(id_bytes) {
                    continue;
                }
                if filter.match_event(&event) {
                    candidates.push(event);
                }
                if candidates.len() >= limit {
                    break;
                }
            }
        }

        // Final ordering: newest first, ties broken by ascending id.
        candidates.sort_by(|a, b| {
            b.created_at
                .as_u64()
                .cmp(&a.created_at.as_u64())
                .then_with(|| a.id.cmp(&b.id))
        });
        candidates.truncate(limit);
        Ok(candidates)
    }
1133}
1134
// ProfileIndexBucket maintains two persistent indexes over mirrored kind-0
// profile events: pubkey -> mirrored event CID, and a flat search index keyed
// by "<PROFILE_SEARCH_PREFIX><term>:<pubkey>". Event bodies live in the hash
// tree; both indexes store only pointers/entries referencing them.
impl ProfileIndexBucket {
    /// Read the pubkey -> mirrored-event index root, if one is persisted.
    fn by_pubkey_root(&self) -> Result<Option<Cid>> {
        let _profile = NostrProfileGuard::new("socialgraph.profile_by_pubkey_root.read");
        read_root_file(&self.by_pubkey_root_path)
    }

    /// Read the profile search index root, if one is persisted.
    fn search_root(&self) -> Result<Option<Cid>> {
        let _profile = NostrProfileGuard::new("socialgraph.profile_search_root.read");
        read_root_file(&self.search_root_path)
    }

    /// Persist (or clear, when `None`) the pubkey index root.
    fn write_by_pubkey_root(&self, root: Option<&Cid>) -> Result<()> {
        let _profile = NostrProfileGuard::new("socialgraph.profile_by_pubkey_root.write");
        write_root_file(&self.by_pubkey_root_path, root)
    }

    /// Persist (or clear, when `None`) the search index root.
    fn write_search_root(&self, root: Option<&Cid>) -> Result<()> {
        let _profile = NostrProfileGuard::new("socialgraph.profile_search_root.write");
        write_root_file(&self.search_root_path, root)
    }

    /// Store the event's canonical JSON in the hash tree and return the CID
    /// both indexes will point at.
    fn mirror_profile_event(&self, event: &Event) -> Result<Cid> {
        let bytes = event.as_json().into_bytes();
        block_on(self.tree.put_file(&bytes))
            .map(|(cid, _size)| cid)
            .context("store mirrored profile event")
    }

    /// Load a mirrored profile event back out of the hash tree; `Ok(None)`
    /// when the blob is absent.
    fn load_profile_event(&self, cid: &Cid) -> Result<Option<Event>> {
        let bytes = block_on(self.tree.get(cid, None)).context("read mirrored profile event")?;
        let Some(bytes) = bytes else {
            return Ok(None);
        };
        let json = String::from_utf8(bytes).context("decode mirrored profile event as utf-8")?;
        Ok(Some(
            Event::from_json(json).context("decode mirrored profile event json")?,
        ))
    }

    /// Look up the mirrored profile event for `pubkey_hex` via the pubkey
    /// index; `Ok(None)` when no entry (or no index root) exists.
    #[cfg_attr(not(test), allow(dead_code))]
    fn profile_event_for_pubkey(&self, pubkey_hex: &str) -> Result<Option<Event>> {
        let root = self.by_pubkey_root()?;
        let Some(cid) = block_on(self.index.get_link(root.as_ref(), pubkey_hex))
            .context("read mirrored profile event cid by pubkey")?
        else {
            return Ok(None);
        };
        self.load_profile_event(&cid)
    }

    /// List search-index entries whose key starts with `prefix`, decoding the
    /// stored JSON values into `StoredProfileSearchEntry`.
    #[cfg_attr(not(test), allow(dead_code))]
    fn search_entries_for_prefix(
        &self,
        prefix: &str,
    ) -> Result<Vec<(String, StoredProfileSearchEntry)>> {
        let Some(root) = self.search_root()? else {
            return Ok(Vec::new());
        };
        let entries = block_on(self.index.prefix(&root, prefix)).context("query profile search prefix")?;
        entries
            .into_iter()
            .map(|(key, value)| {
                let entry = serde_json::from_str(&value)
                    .context("decode stored profile search entry json")?;
                Ok((key, entry))
            })
            .collect()
    }

    /// Rebuild both indexes from scratch from `events` using bulk builds,
    /// returning the new (by-pubkey root, search root) pair.
    ///
    /// NOTE(review): assumes the caller passes at most one event per pubkey
    /// (e.g. via `latest_metadata_events_by_pubkey`) — duplicates would
    /// produce multiple entries for the same key. TODO confirm at call sites.
    fn rebuild_profile_events<'a, I>(&self, events: I) -> Result<(Option<Cid>, Option<Cid>)>
    where
        I: IntoIterator<Item = &'a Event>,
    {
        let mut by_pubkey_entries = Vec::<(String, Cid)>::new();
        let mut search_entries = Vec::<(String, String)>::new();

        for event in events {
            let pubkey = event.pubkey.to_hex();
            let mirrored_cid = self.mirror_profile_event(event)?;
            let search_value =
                serialize_profile_search_entry(&build_profile_search_entry(event, &mirrored_cid)?)?;
            by_pubkey_entries.push((pubkey.clone(), mirrored_cid.clone()));
            // One search entry per term, all sharing the same value payload.
            for term in profile_search_terms_for_event(event) {
                search_entries.push((format!("{PROFILE_SEARCH_PREFIX}{term}:{pubkey}"), search_value.clone()));
            }
        }

        let by_pubkey_root = block_on(self.index.build_links(by_pubkey_entries))
            .context("bulk build mirrored profile-by-pubkey index")?;
        let search_root = block_on(self.index.build(search_entries))
            .context("bulk build mirrored profile search index")?;
        Ok((by_pubkey_root, search_root))
    }

    /// Incrementally apply one profile `event` against the given index roots.
    ///
    /// Keeps only the newest event per pubkey (per `compare_nostr_events`);
    /// when the event wins, stale search terms derived from the previous
    /// event are deleted before the new terms are written. Returns the new
    /// roots plus a flag saying whether anything changed.
    fn update_profile_event(
        &self,
        by_pubkey_root: Option<&Cid>,
        search_root: Option<&Cid>,
        event: &Event,
    ) -> Result<(Option<Cid>, Option<Cid>, bool)> {
        let pubkey = event.pubkey.to_hex();
        let existing_cid = block_on(self.index.get_link(by_pubkey_root, &pubkey))
            .context("lookup existing mirrored profile event")?;

        let existing_event = match existing_cid.as_ref() {
            Some(cid) => self.load_profile_event(cid)?,
            None => None,
        };

        // Incoming event is not strictly newer: keep everything as-is.
        if existing_event
            .as_ref()
            .is_some_and(|current| compare_nostr_events(event, current).is_le())
        {
            return Ok((by_pubkey_root.cloned(), search_root.cloned(), false));
        }

        let mirrored_cid = self.mirror_profile_event(event)?;
        let next_by_pubkey_root = Some(
            block_on(
                self.index
                    .insert_link(by_pubkey_root, &pubkey, &mirrored_cid),
            )
            .context("write mirrored profile event index")?,
        );

        // Remove search terms belonging to the superseded event; a delete
        // that empties the index yields None, ending the loop early.
        let mut next_search_root = search_root.cloned();
        if let Some(current) = existing_event.as_ref() {
            for term in profile_search_terms_for_event(current) {
                let Some(root) = next_search_root.as_ref() else {
                    break;
                };
                next_search_root = block_on(
                    self.index
                        .delete(root, &format!("{PROFILE_SEARCH_PREFIX}{term}:{pubkey}")),
                )
                .context("remove stale profile search term")?;
            }
        }

        let search_value =
            serialize_profile_search_entry(&build_profile_search_entry(event, &mirrored_cid)?)?;
        for term in profile_search_terms_for_event(event) {
            next_search_root = Some(
                block_on(self.index.insert(
                    next_search_root.as_ref(),
                    &format!("{PROFILE_SEARCH_PREFIX}{term}:{pubkey}"),
                    &search_value,
                ))
                .context("write profile search term")?,
            );
        }

        Ok((next_by_pubkey_root, next_search_root, true))
    }
}
1290
1291fn latest_metadata_events_by_pubkey<'a>(events: &'a [Event]) -> BTreeMap<String, &'a Event> {
1292    let mut latest_by_pubkey = BTreeMap::<String, &Event>::new();
1293    for event in events.iter().filter(|event| event.kind == Kind::Metadata) {
1294        let pubkey = event.pubkey.to_hex();
1295        match latest_by_pubkey.get(&pubkey) {
1296            Some(current) if compare_nostr_events(event, current).is_le() => {}
1297            _ => {
1298                latest_by_pubkey.insert(pubkey, event);
1299            }
1300        }
1301    }
1302    latest_by_pubkey
1303}
1304
/// Encode a search entry as the JSON string stored in the search index.
fn serialize_profile_search_entry(entry: &StoredProfileSearchEntry) -> Result<String> {
    serde_json::to_string(entry).context("encode stored profile search entry json")
}
1308
1309fn cid_to_nhash(cid: &Cid) -> Result<String> {
1310    nhash_encode_full(&NHashData {
1311        hash: cid.hash,
1312        decrypt_key: cid.key,
1313    })
1314    .context("encode mirrored profile event nhash")
1315}
1316
1317fn build_profile_search_entry(event: &Event, mirrored_cid: &Cid) -> Result<StoredProfileSearchEntry> {
1318    let profile = match serde_json::from_str::<serde_json::Value>(&event.content) {
1319        Ok(serde_json::Value::Object(profile)) => profile,
1320        _ => serde_json::Map::new(),
1321    };
1322    let names = extract_profile_names(&profile);
1323    let primary_name = names.first().cloned();
1324    let nip05 = normalize_profile_nip05(&profile, primary_name.as_deref());
1325    let name = primary_name
1326        .clone()
1327        .or_else(|| nip05.clone())
1328        .unwrap_or_else(|| event.pubkey.to_hex());
1329
1330    Ok(StoredProfileSearchEntry {
1331        pubkey: event.pubkey.to_hex(),
1332        name,
1333        aliases: names.into_iter().skip(1).collect(),
1334        nip05,
1335        created_at: event.created_at.as_u64(),
1336        event_nhash: cid_to_nhash(mirrored_cid)?,
1337    })
1338}
1339
1340fn filter_list_options(filter: &Filter, limit: usize, exact: bool) -> ListEventsOptions {
1341    ListEventsOptions {
1342        limit: exact.then_some(limit.max(1)),
1343        since: filter.since.map(|timestamp| timestamp.as_u64()),
1344        until: filter.until.map(|timestamp| timestamp.as_u64()),
1345    }
1346}
1347
1348fn dedupe_events(events: Vec<Event>) -> Vec<Event> {
1349    let mut seen = HashSet::new();
1350    let mut deduped = Vec::new();
1351    for event in events {
1352        if seen.insert(event.id.to_bytes()) {
1353            deduped.push(event);
1354        }
1355    }
1356    deduped.sort_by(|a, b| {
1357        b.created_at
1358            .as_u64()
1359            .cmp(&a.created_at.as_u64())
1360            .then_with(|| a.id.cmp(&b.id))
1361    });
1362    deduped
1363}
1364
// Trait surface over the concrete store: every method forwards to the
// inherent method of the same name on SocialGraphStore, except
// ingest_graph_events, which maps to apply_graph_events_only (graph edges
// only, no event-store write).
impl SocialGraphBackend for SocialGraphStore {
    fn stats(&self) -> Result<SocialGraphStats> {
        SocialGraphStore::stats(self)
    }

    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
        SocialGraphStore::users_by_follow_distance(self, distance)
    }

    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
        SocialGraphStore::follow_distance(self, pk_bytes)
    }

    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
        SocialGraphStore::follow_list_created_at(self, owner)
    }

    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
        SocialGraphStore::followed_targets(self, owner)
    }

    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
        SocialGraphStore::is_overmuted_user(self, user_pk, threshold)
    }

    fn profile_search_root(&self) -> Result<Option<Cid>> {
        SocialGraphStore::profile_search_root(self)
    }

    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
        SocialGraphStore::snapshot_chunks(self, root, options)
    }

    fn ingest_event(&self, event: &Event) -> Result<()> {
        SocialGraphStore::ingest_event(self, event)
    }

    fn ingest_event_with_storage_class(
        &self,
        event: &Event,
        storage_class: EventStorageClass,
    ) -> Result<()> {
        SocialGraphStore::ingest_event_with_storage_class(self, event, storage_class)
    }

    fn ingest_events(&self, events: &[Event]) -> Result<()> {
        SocialGraphStore::ingest_events(self, events)
    }

    fn ingest_events_with_storage_class(
        &self,
        events: &[Event],
        storage_class: EventStorageClass,
    ) -> Result<()> {
        SocialGraphStore::ingest_events_with_storage_class(self, events, storage_class)
    }

    // Intentionally delegates to apply_graph_events_only: graph-only ingest
    // updates follow/mute edges without persisting the events themselves.
    fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
        SocialGraphStore::apply_graph_events_only(self, events)
    }

    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
        SocialGraphStore::query_events(self, filter, limit)
    }
}
1430
// Adapter for the upstream nostr_social_graph backend trait. Every method
// locks the heed-backed graph mutex, delegates, and flattens any error into
// a stringly UpstreamGraphBackendError (the trait's associated error type).
// NOTE(review): .lock().unwrap() panics if the mutex is poisoned; this
// mirrors the crate-wide convention for this mutex.
impl NostrSocialGraphBackend for SocialGraphStore {
    type Error = UpstreamGraphBackendError;

    fn get_root(&self) -> std::result::Result<String, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_root()
            .context("read social graph root")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn set_root(&mut self, root: &str) -> std::result::Result<(), Self::Error> {
        let root_bytes =
            decode_pubkey(root).map_err(|err| UpstreamGraphBackendError(err.to_string()))?;
        SocialGraphStore::set_root(self, &root_bytes)
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn handle_event(
        &mut self,
        event: &GraphEvent,
        allow_unknown_authors: bool,
        overmute_threshold: f64,
    ) -> std::result::Result<(), Self::Error> {
        // Scope the lock so it is released before invalidating the distance
        // cache, which may take its own locks.
        {
            let mut graph = self.graph.lock().unwrap();
            graph
                .handle_event(event, allow_unknown_authors, overmute_threshold)
                .context("ingest social graph event into heed backend")
                .map_err(|err| UpstreamGraphBackendError(err.to_string()))?;
        }
        self.invalidate_distance_cache();
        Ok(())
    }

    fn get_follow_distance(&self, user: &str) -> std::result::Result<u32, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_follow_distance(user)
            .context("read social graph follow distance")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn is_following(
        &self,
        follower: &str,
        followed_user: &str,
    ) -> std::result::Result<bool, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .is_following(follower, followed_user)
            .context("read social graph following edge")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_followed_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_followed_by_user(user)
            .context("read followed-by-user list")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_followers_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_followers_by_user(user)
            .context("read followers-by-user list")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_muted_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_muted_by_user(user)
            .context("read muted-by-user list")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_user_muted_by(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_user_muted_by(user)
            .context("read user-muted-by list")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_follow_list_created_at(
        &self,
        user: &str,
    ) -> std::result::Result<Option<u64>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_follow_list_created_at(user)
            .context("read social graph follow list timestamp")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_mute_list_created_at(
        &self,
        user: &str,
    ) -> std::result::Result<Option<u64>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_mute_list_created_at(user)
            .context("read social graph mute list timestamp")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn is_overmuted(&self, user: &str, threshold: f64) -> std::result::Result<bool, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .is_overmuted(user, threshold)
            .context("check social graph overmute")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }
}
1548
// Blanket impl so an Arc-wrapped backend (including trait objects, via
// ?Sized) can be used wherever a SocialGraphBackend is expected; every
// method forwards through as_ref().
impl<T> SocialGraphBackend for Arc<T>
where
    T: SocialGraphBackend + ?Sized,
{
    fn stats(&self) -> Result<SocialGraphStats> {
        self.as_ref().stats()
    }

    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
        self.as_ref().users_by_follow_distance(distance)
    }

    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
        self.as_ref().follow_distance(pk_bytes)
    }

    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
        self.as_ref().follow_list_created_at(owner)
    }

    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
        self.as_ref().followed_targets(owner)
    }

    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
        self.as_ref().is_overmuted_user(user_pk, threshold)
    }

    fn profile_search_root(&self) -> Result<Option<Cid>> {
        self.as_ref().profile_search_root()
    }

    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
        self.as_ref().snapshot_chunks(root, options)
    }

    fn ingest_event(&self, event: &Event) -> Result<()> {
        self.as_ref().ingest_event(event)
    }

    fn ingest_event_with_storage_class(
        &self,
        event: &Event,
        storage_class: EventStorageClass,
    ) -> Result<()> {
        self.as_ref()
            .ingest_event_with_storage_class(event, storage_class)
    }

    fn ingest_events(&self, events: &[Event]) -> Result<()> {
        self.as_ref().ingest_events(events)
    }

    fn ingest_events_with_storage_class(
        &self,
        events: &[Event],
        storage_class: EventStorageClass,
    ) -> Result<()> {
        self.as_ref()
            .ingest_events_with_storage_class(events, storage_class)
    }

    fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
        self.as_ref().ingest_graph_events(events)
    }

    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
        self.as_ref().query_events(filter, limit)
    }
}
1619
1620fn should_replace_placeholder_root(graph: &HeedSocialGraph) -> Result<bool> {
1621    if graph.get_root().context("read current social graph root")? != DEFAULT_ROOT_HEX {
1622        return Ok(false);
1623    }
1624
1625    let GraphStats {
1626        users,
1627        follows,
1628        mutes,
1629        ..
1630    } = graph.size().context("size social graph")?;
1631    Ok(users <= 1 && follows == 0 && mutes == 0)
1632}
1633
1634fn decode_pubkey_set(values: Vec<String>) -> Result<UserSet> {
1635    let mut set = UserSet::new();
1636    for value in values {
1637        set.insert(decode_pubkey(&value)?);
1638    }
1639    Ok(set)
1640}
1641
1642fn decode_pubkey(value: &str) -> Result<[u8; 32]> {
1643    let mut bytes = [0u8; 32];
1644    hex::decode_to_slice(value, &mut bytes)
1645        .with_context(|| format!("decode social graph pubkey {value}"))?;
1646    Ok(bytes)
1647}
1648
1649fn is_social_graph_event(kind: Kind) -> bool {
1650    kind == Kind::ContactList || kind == Kind::MuteList
1651}
1652
1653fn graph_event_from_nostr(event: &Event) -> GraphEvent {
1654    GraphEvent {
1655        created_at: event.created_at.as_u64(),
1656        content: event.content.clone(),
1657        tags: event
1658            .tags
1659            .iter()
1660            .map(|tag| tag.as_slice().to_vec())
1661            .collect(),
1662        kind: event.kind.as_u16() as u32,
1663        pubkey: event.pubkey.to_hex(),
1664        id: event.id.to_hex(),
1665        sig: event.sig.to_string(),
1666    }
1667}
1668
1669fn stored_event_from_nostr(event: &Event) -> StoredNostrEvent {
1670    StoredNostrEvent {
1671        id: event.id.to_hex(),
1672        pubkey: event.pubkey.to_hex(),
1673        created_at: event.created_at.as_u64(),
1674        kind: event.kind.as_u16() as u32,
1675        tags: event
1676            .tags
1677            .iter()
1678            .map(|tag| tag.as_slice().to_vec())
1679            .collect(),
1680        content: event.content.clone(),
1681        sig: event.sig.to_string(),
1682    }
1683}
1684
1685fn nostr_event_from_stored(event: StoredNostrEvent) -> Result<Event> {
1686    let value = serde_json::json!({
1687        "id": event.id,
1688        "pubkey": event.pubkey,
1689        "created_at": event.created_at,
1690        "kind": event.kind,
1691        "tags": event.tags,
1692        "content": event.content,
1693        "sig": event.sig,
1694    });
1695    Event::from_json(value.to_string()).context("decode stored nostr event")
1696}
1697
/// Crate-visible alias for [`nostr_event_from_stored`].
pub(crate) fn stored_event_to_nostr_event(event: StoredNostrEvent) -> Result<Event> {
    nostr_event_from_stored(event)
}
1701
1702fn encode_cid(cid: &Cid) -> Result<Vec<u8>> {
1703    rmp_serde::to_vec_named(&StoredCid {
1704        hash: cid.hash,
1705        key: cid.key,
1706    })
1707    .context("encode social graph events root")
1708}
1709
1710fn decode_cid(bytes: &[u8]) -> Result<Option<Cid>> {
1711    let stored: StoredCid =
1712        rmp_serde::from_slice(bytes).context("decode social graph events root")?;
1713    Ok(Some(Cid {
1714        hash: stored.hash,
1715        key: stored.key,
1716    }))
1717}
1718
1719fn read_root_file(path: &Path) -> Result<Option<Cid>> {
1720    let Ok(bytes) = std::fs::read(path) else {
1721        return Ok(None);
1722    };
1723    decode_cid(&bytes)
1724}
1725
1726fn write_root_file(path: &Path, root: Option<&Cid>) -> Result<()> {
1727    let Some(root) = root else {
1728        if path.exists() {
1729            std::fs::remove_file(path)?;
1730        }
1731        return Ok(());
1732    };
1733
1734    let encoded = encode_cid(root)?;
1735    let tmp_path = path.with_extension("tmp");
1736    std::fs::write(&tmp_path, encoded)?;
1737    std::fs::rename(tmp_path, path)?;
1738    Ok(())
1739}
1740
1741fn normalize_profile_name(value: &serde_json::Value) -> Option<String> {
1742    let raw = value.as_str()?;
1743    let trimmed = raw.split_whitespace().collect::<Vec<_>>().join(" ");
1744    if trimmed.is_empty() {
1745        return None;
1746    }
1747    Some(trimmed.chars().take(PROFILE_NAME_MAX_LENGTH).collect())
1748}
1749
1750fn extract_profile_names(profile: &serde_json::Map<String, serde_json::Value>) -> Vec<String> {
1751    let mut names = Vec::new();
1752    let mut seen = HashSet::new();
1753
1754    for key in ["display_name", "displayName", "name", "username"] {
1755        let Some(value) = profile.get(key).and_then(normalize_profile_name) else {
1756            continue;
1757        };
1758        let lowered = value.to_lowercase();
1759        if seen.insert(lowered) {
1760            names.push(value);
1761        }
1762    }
1763
1764    names
1765}
1766
/// Decides whether a nip05 local part adds no search value and should be
/// skipped when indexing a profile.
///
/// Rejects single-character local parts (counted in characters, not bytes,
/// so a lone multi-byte character is also rejected), npub-style handles, and
/// local parts already contained in the primary name (compared lowercase
/// with all whitespace removed).
fn should_reject_profile_nip05(local_part: &str, primary_name: &str) -> bool {
    if local_part.chars().count() == 1 || local_part.starts_with("npub1") {
        return true;
    }

    primary_name
        .to_lowercase()
        .split_whitespace()
        .collect::<String>()
        .contains(local_part)
}
1778
1779fn normalize_profile_nip05(
1780    profile: &serde_json::Map<String, serde_json::Value>,
1781    primary_name: Option<&str>,
1782) -> Option<String> {
1783    let raw = profile.get("nip05")?.as_str()?;
1784    let local_part = raw.split('@').next()?.trim().to_lowercase();
1785    if local_part.is_empty() {
1786        return None;
1787    }
1788    let truncated: String = local_part.chars().take(PROFILE_NAME_MAX_LENGTH).collect();
1789    if truncated.is_empty() {
1790        return None;
1791    }
1792    if primary_name.is_some_and(|name| should_reject_profile_nip05(&truncated, name)) {
1793        return None;
1794    }
1795    Some(truncated)
1796}
1797
/// Reports whether `word` is a common English word excluded from the profile
/// search index.
fn is_search_stop_word(word: &str) -> bool {
    // Flat table checked linearly; the list is small enough that a set is
    // not worth the setup cost.
    const STOP_WORDS: &[&str] = &[
        "a", "an", "the", "and", "or", "but", "in", "on", "at", "to", "for",
        "of", "with", "by", "from", "is", "it", "as", "be", "was", "are",
        "this", "that", "these", "those", "i", "you", "he", "she", "we",
        "they", "my", "your", "his", "her", "its", "our", "their", "what",
        "which", "who", "whom", "how", "when", "where", "why", "will",
        "would", "could", "should", "can", "may", "might", "must", "have",
        "has", "had", "do", "does", "did", "been", "being", "get", "got",
        "just", "now", "then", "so", "if", "not", "no", "yes", "all", "any",
        "some", "more", "most", "other", "into", "over", "after", "before",
        "about", "up", "down", "out", "off", "through", "during", "under",
        "again", "further", "once",
    ];
    STOP_WORDS.contains(&word)
}
1895
/// Reports whether `word` is an all-digit token that should be dropped from
/// search keywords. Four-digit values in 1900..=2099 are kept because they
/// likely denote years.
fn is_pure_search_number(word: &str) -> bool {
    let all_digits = word.chars().all(|ch| ch.is_ascii_digit());
    if !all_digits {
        return false;
    }
    let looks_like_year = word.len() == 4
        && matches!(word.parse::<u16>(), Ok(year) if (1900..=2099).contains(&year));
    !looks_like_year
}
1905
/// Splits a compound token into searchable fragments at case and digit
/// boundaries, e.g. `"XMLHttpRequest42"` -> `["XML", "Http", "Request", "42"]`.
///
/// A boundary is inserted at lower->upper transitions, between digits and
/// letters (either direction), and before the last capital of an acronym run
/// when it starts a capitalized word.
fn split_compound_search_word(word: &str) -> Vec<String> {
    let chars: Vec<char> = word.chars().collect();
    let mut segments: Vec<String> = Vec::new();
    let mut buffer = String::new();

    for (index, &ch) in chars.iter().enumerate() {
        let boundary = match buffer.chars().last() {
            None => false,
            Some(prev) => {
                (prev.is_lowercase() && ch.is_uppercase())
                    || (prev.is_ascii_digit() && ch.is_alphabetic())
                    || (prev.is_alphabetic() && ch.is_ascii_digit())
                    // Acronym end: "XMLH" + 't' splits as "XML" | "Ht...".
                    || (prev.is_uppercase()
                        && ch.is_uppercase()
                        && matches!(chars.get(index + 1), Some(next) if next.is_lowercase()))
            }
        };

        if boundary {
            segments.push(std::mem::take(&mut buffer));
        }
        buffer.push(ch);
    }

    if !buffer.is_empty() {
        segments.push(buffer);
    }

    segments
}
1934
1935fn parse_search_keywords(text: &str) -> Vec<String> {
1936    let mut keywords = Vec::new();
1937    let mut seen = HashSet::new();
1938
1939    for word in text
1940        .split(|ch: char| !ch.is_alphanumeric())
1941        .filter(|word| !word.is_empty())
1942    {
1943        let mut variants = Vec::with_capacity(1 + word.len() / 4);
1944        variants.push(word.to_lowercase());
1945        variants.extend(
1946            split_compound_search_word(word)
1947                .into_iter()
1948                .map(|part| part.to_lowercase()),
1949        );
1950
1951        for lowered in variants {
1952            if lowered.chars().count() < 2
1953                || is_search_stop_word(&lowered)
1954                || is_pure_search_number(&lowered)
1955            {
1956                continue;
1957            }
1958            if seen.insert(lowered.clone()) {
1959                keywords.push(lowered);
1960            }
1961        }
1962    }
1963
1964    keywords
1965}
1966
/// Builds the search keywords indexed for a kind-0 (metadata) event.
///
/// Parts are assembled in priority order — primary name, nip05 local part,
/// author pubkey hex, then remaining alias names — and joined before keyword
/// extraction, so earlier parts win first-occurrence dedup ordering.
fn profile_search_terms_for_event(event: &Event) -> Vec<String> {
    // Anything that is not a JSON object is treated as an empty profile;
    // only pubkey-derived keywords remain in that case.
    let profile = match serde_json::from_str::<serde_json::Value>(&event.content) {
        Ok(serde_json::Value::Object(profile)) => profile,
        _ => serde_json::Map::new(),
    };
    let names = extract_profile_names(&profile);
    let primary_name = names.first().map(String::as_str);
    let mut parts = Vec::new();
    if let Some(name) = primary_name {
        parts.push(name.to_string());
    }
    if let Some(nip05) = normalize_profile_nip05(&profile, primary_name) {
        parts.push(nip05);
    }
    parts.push(event.pubkey.to_hex());
    // Aliases (everything after the primary name) come last.
    if names.len() > 1 {
        parts.extend(names.into_iter().skip(1));
    }
    parse_search_keywords(&parts.join(" "))
}
1987
1988fn compare_nostr_events(left: &Event, right: &Event) -> std::cmp::Ordering {
1989    left.created_at
1990        .as_u64()
1991        .cmp(&right.created_at.as_u64())
1992        .then_with(|| left.id.to_hex().cmp(&right.id.to_hex()))
1993}
1994
/// Converts an event-store error into `anyhow::Error` with a uniform prefix,
/// since `NostrEventStoreError` is adapted rather than wrapped via `From`.
fn map_event_store_error(err: NostrEventStoreError) -> anyhow::Error {
    anyhow::anyhow!("nostr event store error: {err}")
}
1998
/// Grows the social-graph LMDB map size to at least `requested_bytes`
/// (never below `DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES`), rounded up to a
/// whole number of OS pages. The map is only ever grown, never shrunk.
fn ensure_social_graph_mapsize(db_dir: &Path, requested_bytes: u64) -> Result<()> {
    let requested = requested_bytes.max(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES);
    let page_size = page_size_bytes() as u64;
    // Round up to a page boundary; if the add would overflow u64, fall back
    // to the unrounded request.
    let rounded = requested
        .checked_add(page_size.saturating_sub(1))
        .map(|size| size / page_size * page_size)
        .unwrap_or(requested);
    let map_size = usize::try_from(rounded).context("social graph mapsize exceeds usize")?;

    // SAFETY: heed marks env open/resize unsafe because concurrent mappings
    // of the same env must not be resized underneath each other; assumes no
    // other live handle to this env here — TODO confirm at call sites.
    let env = unsafe {
        heed::EnvOpenOptions::new()
            .map_size(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES as usize)
            .max_dbs(SOCIALGRAPH_MAX_DBS)
            .open(db_dir)
    }
    .context("open social graph LMDB env for resize")?;
    if env.info().map_size < map_size {
        unsafe { env.resize(map_size) }.context("resize social graph LMDB env")?;
    }

    Ok(())
}
2021
/// Returns the OS allocation granularity used for LMDB map-size rounding.
fn page_size_bytes() -> usize {
    page_size::get_granularity()
}
2025
2026#[cfg(test)]
2027mod tests {
2028    use super::*;
2029    use async_trait::async_trait;
2030    use hashtree_config::StorageBackend;
2031    use hashtree_core::{Hash, MemoryStore, Store, StoreError};
2032    use hashtree_nostr::NostrEventStoreOptions;
2033    use std::collections::HashSet;
2034    use std::fs::{self, File};
2035    use std::io::{BufRead, BufReader};
2036    use std::path::{Path, PathBuf};
2037    use std::process::{Command, Stdio};
2038    use std::sync::Mutex;
2039    use std::time::Duration;
2040
2041    use nostr::{EventBuilder, JsonUtil, Keys, Tag, Timestamp};
2042    use tempfile::TempDir;
2043
    /// Public fixture of ~500k early nostr events (bzip2-compressed JSONL),
    /// downloaded by the benchmark-style ingest tests.
    const WELLORDER_FIXTURE_URL: &str =
        "https://wellorder.xyz/nostr/nostr-wellorder-early-500k-v1.jsonl.bz2";
2046
    /// Point-in-time copy of the read counters gathered by a tracing store.
    #[derive(Debug, Clone, Default)]
    struct ReadTraceSnapshot {
        get_calls: u64,
        total_bytes: u64,
        unique_blocks: usize,
        unique_bytes: u64,
        cache_hits: u64,
        remote_fetches: u64,
        remote_bytes: u64,
    }
2057
    /// Mutable counters behind a mutex; `unique_hashes` records which block
    /// hashes have been seen so unique-byte totals count each block once.
    #[derive(Debug, Default)]
    struct ReadTraceState {
        get_calls: u64,
        total_bytes: u64,
        unique_hashes: HashSet<Hash>,
        unique_bytes: u64,
        cache_hits: u64,
        remote_fetches: u64,
        remote_bytes: u64,
    }
2068
    /// Store wrapper that delegates to `base` while counting `get` traffic.
    #[derive(Debug)]
    struct CountingStore<S: Store> {
        base: Arc<S>,
        state: Mutex<ReadTraceState>,
    }
2074
    impl<S: Store> CountingStore<S> {
        /// Wraps `base` with zeroed read counters.
        fn new(base: Arc<S>) -> Self {
            Self {
                base,
                state: Mutex::new(ReadTraceState::default()),
            }
        }

        /// Clears all accumulated counters.
        fn reset(&self) {
            *self.state.lock().unwrap() = ReadTraceState::default();
        }

        /// Copies the current counters out of the mutex.
        fn snapshot(&self) -> ReadTraceSnapshot {
            let state = self.state.lock().unwrap();
            ReadTraceSnapshot {
                get_calls: state.get_calls,
                total_bytes: state.total_bytes,
                unique_blocks: state.unique_hashes.len(),
                unique_bytes: state.unique_bytes,
                cache_hits: state.cache_hits,
                remote_fetches: state.remote_fetches,
                remote_bytes: state.remote_bytes,
            }
        }

        /// Accounts for one successful read; `unique_bytes` only grows the
        /// first time a given hash is observed.
        fn record_read(&self, hash: &Hash, bytes: usize) {
            let mut state = self.state.lock().unwrap();
            state.get_calls += 1;
            state.total_bytes += bytes as u64;
            if state.unique_hashes.insert(*hash) {
                state.unique_bytes += bytes as u64;
            }
        }
    }
2109
    #[async_trait]
    impl<S: Store> Store for CountingStore<S> {
        async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
            self.base.put(hash, data).await
        }

        async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
            self.base.put_many(items).await
        }

        // Only `get` is traced; the other operations delegate untouched.
        async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
            let data = self.base.get(hash).await?;
            if let Some(bytes) = data.as_ref() {
                // Misses (Ok(None)) are intentionally not counted.
                self.record_read(hash, bytes.len());
            }
            Ok(data)
        }

        async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
            self.base.has(hash).await
        }

        async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
            self.base.delete(hash).await
        }
    }
2136
    /// Two-tier store: writes land in the in-memory `cache`; reads fall
    /// through to `remote` on a miss and populate the cache, with traffic
    /// recorded in `state`.
    #[derive(Debug)]
    struct ReadThroughStore<R: Store> {
        cache: Arc<MemoryStore>,
        remote: Arc<R>,
        state: Mutex<ReadTraceState>,
    }
2143
    impl<R: Store> ReadThroughStore<R> {
        /// Combines an in-memory cache layer with a remote backing store.
        fn new(cache: Arc<MemoryStore>, remote: Arc<R>) -> Self {
            Self {
                cache,
                remote,
                state: Mutex::new(ReadTraceState::default()),
            }
        }

        /// Copies the current counters out of the mutex.
        fn snapshot(&self) -> ReadTraceSnapshot {
            let state = self.state.lock().unwrap();
            ReadTraceSnapshot {
                get_calls: state.get_calls,
                total_bytes: state.total_bytes,
                unique_blocks: state.unique_hashes.len(),
                unique_bytes: state.unique_bytes,
                cache_hits: state.cache_hits,
                remote_fetches: state.remote_fetches,
                remote_bytes: state.remote_bytes,
            }
        }
    }
2166
    #[async_trait]
    impl<R: Store> Store for ReadThroughStore<R> {
        // Writes only touch the cache tier; the remote is read-only here.
        async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
            self.cache.put(hash, data).await
        }

        async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
            self.cache.put_many(items).await
        }

        async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
            // Count the call up front, in its own lock scope so the guard is
            // never held across an `.await`.
            {
                let mut state = self.state.lock().unwrap();
                state.get_calls += 1;
            }

            if let Some(bytes) = self.cache.get(hash).await? {
                let mut state = self.state.lock().unwrap();
                state.cache_hits += 1;
                state.total_bytes += bytes.len() as u64;
                if state.unique_hashes.insert(*hash) {
                    state.unique_bytes += bytes.len() as u64;
                }
                return Ok(Some(bytes));
            }

            // Cache miss: fetch from remote, populate the cache, record the
            // remote traffic.
            let data = self.remote.get(hash).await?;
            if let Some(bytes) = data.as_ref() {
                let _ = self.cache.put(*hash, bytes.clone()).await?;
                let mut state = self.state.lock().unwrap();
                state.remote_fetches += 1;
                state.remote_bytes += bytes.len() as u64;
                state.total_bytes += bytes.len() as u64;
                if state.unique_hashes.insert(*hash) {
                    state.unique_bytes += bytes.len() as u64;
                }
            }
            Ok(data)
        }

        async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
            if self.cache.has(hash).await? {
                return Ok(true);
            }
            self.remote.has(hash).await
        }

        async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
            // Delete from both tiers; success if either held the block.
            let cache_deleted = self.cache.delete(hash).await?;
            let remote_deleted = self.remote.delete(hash).await?;
            Ok(cache_deleted || remote_deleted)
        }
    }
2220
    /// One query shape exercised by the event-store read benchmarks, with
    /// the concrete parameters (ids, pubkeys, kinds, limits) baked in.
    #[derive(Debug, Clone)]
    enum BenchmarkQueryCase {
        ById {
            id: String,
        },
        ByAuthor {
            pubkey: String,
            limit: usize,
        },
        ByAuthorKind {
            pubkey: String,
            kind: u32,
            limit: usize,
        },
        ByKind {
            kind: u32,
            limit: usize,
        },
        ByTag {
            tag_name: String,
            tag_value: String,
            limit: usize,
        },
        Recent {
            limit: usize,
        },
        Replaceable {
            pubkey: String,
            kind: u32,
        },
        ParameterizedReplaceable {
            pubkey: String,
            kind: u32,
            d_tag: String,
        },
    }
2257
    impl BenchmarkQueryCase {
        /// Stable label for this case, used when reporting benchmark results.
        fn name(&self) -> &'static str {
            match self {
                BenchmarkQueryCase::ById { .. } => "by_id",
                BenchmarkQueryCase::ByAuthor { .. } => "by_author",
                BenchmarkQueryCase::ByAuthorKind { .. } => "by_author_kind",
                BenchmarkQueryCase::ByKind { .. } => "by_kind",
                BenchmarkQueryCase::ByTag { .. } => "by_tag",
                BenchmarkQueryCase::Recent { .. } => "recent",
                BenchmarkQueryCase::Replaceable { .. } => "replaceable",
                BenchmarkQueryCase::ParameterizedReplaceable { .. } => "parameterized_replaceable",
            }
        }

        /// Runs this query against `store` rooted at `root`, returning only
        /// the number of events yielded (results themselves are discarded).
        async fn execute<S: Store>(
            &self,
            store: &NostrEventStore<S>,
            root: &Cid,
        ) -> Result<usize, NostrEventStoreError> {
            match self {
                BenchmarkQueryCase::ById { id } => {
                    Ok(store.get_by_id(Some(root), id).await?.into_iter().count())
                }
                BenchmarkQueryCase::ByAuthor { pubkey, limit } => Ok(store
                    .list_by_author(
                        Some(root),
                        pubkey,
                        ListEventsOptions {
                            limit: Some(*limit),
                            ..Default::default()
                        },
                    )
                    .await?
                    .len()),
                BenchmarkQueryCase::ByAuthorKind {
                    pubkey,
                    kind,
                    limit,
                } => Ok(store
                    .list_by_author_and_kind(
                        Some(root),
                        pubkey,
                        *kind,
                        ListEventsOptions {
                            limit: Some(*limit),
                            ..Default::default()
                        },
                    )
                    .await?
                    .len()),
                BenchmarkQueryCase::ByKind { kind, limit } => Ok(store
                    .list_by_kind(
                        Some(root),
                        *kind,
                        ListEventsOptions {
                            limit: Some(*limit),
                            ..Default::default()
                        },
                    )
                    .await?
                    .len()),
                BenchmarkQueryCase::ByTag {
                    tag_name,
                    tag_value,
                    limit,
                } => Ok(store
                    .list_by_tag(
                        Some(root),
                        tag_name,
                        tag_value,
                        ListEventsOptions {
                            limit: Some(*limit),
                            ..Default::default()
                        },
                    )
                    .await?
                    .len()),
                BenchmarkQueryCase::Recent { limit } => Ok(store
                    .list_recent(
                        Some(root),
                        ListEventsOptions {
                            limit: Some(*limit),
                            ..Default::default()
                        },
                    )
                    .await?
                    .len()),
                BenchmarkQueryCase::Replaceable { pubkey, kind } => Ok(store
                    .get_replaceable(Some(root), pubkey, *kind)
                    .await?
                    .into_iter()
                    .count()),
                BenchmarkQueryCase::ParameterizedReplaceable {
                    pubkey,
                    kind,
                    d_tag,
                } => Ok(store
                    .get_parameterized_replaceable(Some(root), pubkey, *kind, d_tag)
                    .await?
                    .into_iter()
                    .count()),
            }
        }
    }
2362
    /// Simulated link parameters used to model remote-read latency.
    #[derive(Debug, Clone, Copy)]
    struct NetworkModel {
        name: &'static str,
        rtt_ms: f64,
        bandwidth_mib_per_s: f64,
    }
2369
    /// Timing summary plus the read-trace counters for one benchmark case.
    #[derive(Debug, Clone)]
    struct QueryBenchmarkResult {
        average_duration: Duration,
        p95_duration: Duration,
        reads: ReadTraceSnapshot,
    }
2376
    /// Three representative network profiles (fast LAN, typical WAN, slow
    /// link) used when projecting remote read costs.
    const NETWORK_MODELS: [NetworkModel; 3] = [
        NetworkModel {
            name: "lan",
            rtt_ms: 2.0,
            bandwidth_mib_per_s: 100.0,
        },
        NetworkModel {
            name: "wan",
            rtt_ms: 40.0,
            bandwidth_mib_per_s: 20.0,
        },
        NetworkModel {
            name: "slow",
            rtt_ms: 120.0,
            bandwidth_mib_per_s: 5.0,
        },
    ];
2394
    /// A freshly opened store yields a single-owner `Arc` handle.
    #[test]
    fn test_open_social_graph_store() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        assert_eq!(Arc::strong_count(&graph_store), 1);
    }
2402
    /// Setting the graph root gives that pubkey follow distance zero.
    #[test]
    fn test_set_root_and_get_follow_distance() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let root_pk = [1u8; 32];
        set_social_graph_root(&graph_store, &root_pk);
        assert_eq!(get_follow_distance(&graph_store, &root_pk), Some(0));
    }
2412
    /// Ingesting a contact-list event makes the followed key distance 1 from
    /// the root, and ingesting a mute-list event flags the muted key as
    /// overmuted at threshold 1.0.
    #[test]
    fn test_ingest_event_updates_follows_and_mutes() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();

        let root_keys = Keys::generate();
        let alice_keys = Keys::generate();
        let bob_keys = Keys::generate();

        let root_pk = root_keys.public_key().to_bytes();
        set_social_graph_root(&graph_store, &root_pk);

        // Root follows alice (kind-3 contact list).
        let follow = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![Tag::public_key(alice_keys.public_key())],
        )
        .custom_created_at(Timestamp::from_secs(10))
        .to_event(&root_keys)
        .unwrap();
        ingest_event(&graph_store, "follow", &follow.as_json());

        // Root mutes bob (kind-10000 mute list).
        let mute = EventBuilder::new(
            Kind::MuteList,
            "",
            vec![Tag::public_key(bob_keys.public_key())],
        )
        .custom_created_at(Timestamp::from_secs(11))
        .to_event(&root_keys)
        .unwrap();
        ingest_event(&graph_store, "mute", &mute.as_json());

        assert_eq!(
            get_follow_distance(&graph_store, &alice_keys.public_key().to_bytes()),
            Some(1)
        );
        assert!(is_overmuted(
            &graph_store,
            &root_pk,
            &bob_keys.public_key().to_bytes(),
            1.0
        ));
    }
2457
    /// A kind-0 ingest populates the `p:<term>:<pubkey>` search index (name,
    /// aliases, nip05 local part) and mirrors the latest profile event; a
    /// newer kind-0 for the same pubkey fully replaces the old terms.
    #[test]
    fn test_metadata_ingest_builds_profile_search_index_and_replaces_old_terms() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let keys = Keys::generate();

        let older = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "sirius",
                "name": "Martti Malmi",
                "username": "mmalmi",
                "nip05": "siriusdev@iris.to",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&keys)
        .unwrap();
        let newer = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "bird",
                "nip05": "birdman@iris.to",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(6))
        .to_event(&keys)
        .unwrap();

        ingest_parsed_event(&graph_store, &older).unwrap();

        // Older profile: primary name, nip05 local part, and aliases indexed.
        let pubkey = keys.public_key().to_hex();
        let entries = graph_store
            .profile_search_entries_for_prefix("p:sirius")
            .unwrap();
        assert!(entries
            .iter()
            .any(|(key, entry)| key == &format!("p:sirius:{pubkey}") && entry.name == "sirius"));
        assert!(entries.iter().any(|(key, entry)| {
            key == &format!("p:siriusdev:{pubkey}")
                && entry.nip05.as_deref() == Some("siriusdev")
                && entry.aliases == vec!["Martti Malmi".to_string(), "mmalmi".to_string()]
                && entry.event_nhash.starts_with("nhash1")
        }));
        assert!(entries
            .iter()
            .all(|(_, entry)| entry.pubkey == pubkey));
        assert_eq!(
            graph_store
                .latest_profile_event(&pubkey)
                .unwrap()
                .expect("latest mirrored profile")
                .id,
            older.id
        );

        ingest_parsed_event(&graph_store, &newer).unwrap();

        // Newer profile replaces the old terms wholesale.
        assert!(graph_store
            .profile_search_entries_for_prefix("p:sirius")
            .unwrap()
            .is_empty());
        let bird_entries = graph_store
            .profile_search_entries_for_prefix("p:bird")
            .unwrap();
        assert_eq!(bird_entries.len(), 2);
        assert!(bird_entries
            .iter()
            .any(|(key, entry)| key == &format!("p:bird:{pubkey}") && entry.name == "bird"));
        assert!(bird_entries
            .iter()
            .any(|(key, entry)| {
                key == &format!("p:birdman:{pubkey}")
                    && entry.nip05.as_deref() == Some("birdman")
                    && entry.aliases.is_empty()
            }));
        assert_eq!(
            graph_store
                .latest_profile_event(&pubkey)
                .unwrap()
                .expect("latest mirrored profile")
                .id,
            newer.id
        );
    }
2548
    /// Kind-0 events ingested with the Ambient storage class still feed the
    /// public profile mirror and search index.
    #[test]
    fn test_ambient_metadata_events_are_mirrored_into_public_profile_index() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let keys = Keys::generate();

        let profile = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "ambient bird",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&keys)
        .unwrap();

        ingest_parsed_event_with_storage_class(&graph_store, &profile, EventStorageClass::Ambient)
            .unwrap();

        let pubkey = keys.public_key().to_hex();
        let mirrored = graph_store
            .latest_profile_event(&pubkey)
            .unwrap()
            .expect("mirrored ambient profile");
        assert_eq!(mirrored.id, profile.id);
        assert_eq!(
            graph_store
                .profile_search_entries_for_prefix("p:ambient")
                .unwrap()
                .len(),
            1
        );
    }
2585
    /// Compound names are indexed under both the whole token and each
    /// camelCase/digit-boundary fragment (e.g. "SirLibre" -> sirlibre, libre).
    #[test]
    fn test_metadata_ingest_splits_compound_profile_terms_without_losing_whole_token() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let keys = Keys::generate();

        let profile = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "SirLibre",
                "username": "XMLHttpRequest42",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&keys)
        .unwrap();

        ingest_parsed_event(&graph_store, &profile).unwrap();

        let pubkey = keys.public_key().to_hex();
        assert!(graph_store
            .profile_search_entries_for_prefix("p:sirlibre")
            .unwrap()
            .iter()
            .any(|(key, entry)| key == &format!("p:sirlibre:{pubkey}") && entry.name == "SirLibre"));
        assert!(graph_store
            .profile_search_entries_for_prefix("p:libre")
            .unwrap()
            .iter()
            .any(|(key, entry)| key == &format!("p:libre:{pubkey}") && entry.name == "SirLibre"));
        assert!(graph_store
            .profile_search_entries_for_prefix("p:xml")
            .unwrap()
            .iter()
            .any(|(key, entry)| {
                key == &format!("p:xml:{pubkey}")
                    && entry.aliases == vec!["XMLHttpRequest42".to_string()]
            }));
        assert!(graph_store
            .profile_search_entries_for_prefix("p:request")
            .unwrap()
            .iter()
            .any(|(key, entry)| {
                key == &format!("p:request:{pubkey}")
                    && entry.aliases == vec!["XMLHttpRequest42".to_string()]
            }));
    }
2636
    /// The profile search root, mirrored profile, and index entries survive
    /// closing and reopening the store on the same directory.
    #[test]
    fn test_profile_search_index_persists_across_reopen() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let keys = Keys::generate();
        let pubkey = keys.public_key().to_hex();

        // First session: ingest one profile, confirm a search root exists,
        // then drop the store.
        {
            let graph_store = open_social_graph_store(tmp.path()).unwrap();
            let profile = EventBuilder::new(
                Kind::Metadata,
                serde_json::json!({
                    "display_name": "reopen user",
                })
                .to_string(),
                [],
            )
            .custom_created_at(Timestamp::from_secs(5))
            .to_event(&keys)
            .unwrap();
            ingest_parsed_event(&graph_store, &profile).unwrap();
            assert!(graph_store.profile_search_root().unwrap().is_some());
        }

        // Second session: everything must still be readable.
        let reopened = open_social_graph_store(tmp.path()).unwrap();
        assert!(reopened.profile_search_root().unwrap().is_some());
        assert_eq!(
            reopened
                .latest_profile_event(&pubkey)
                .unwrap()
                .expect("mirrored profile after reopen")
                .pubkey,
            keys.public_key()
        );
        let links = reopened
            .profile_search_entries_for_prefix("p:reopen")
            .unwrap();
        assert_eq!(links.len(), 1);
        assert_eq!(links[0].0, format!("p:reopen:{pubkey}"));
        assert_eq!(links[0].1.name, "reopen user");
    }
2678
2679    #[test]
2680    fn test_profile_search_index_with_shared_hashtree_storage() {
2681        let _guard = test_lock();
2682        let tmp = TempDir::new().unwrap();
2683        let store =
2684            crate::storage::HashtreeStore::with_options(tmp.path(), None, 1024 * 1024 * 1024)
2685                .unwrap();
2686        let graph_store =
2687            open_social_graph_store_with_storage(tmp.path(), store.store_arc(), None).unwrap();
2688        let keys = Keys::generate();
2689        let pubkey = keys.public_key().to_hex();
2690
2691        let profile = EventBuilder::new(
2692            Kind::Metadata,
2693            serde_json::json!({
2694                "display_name": "shared storage user",
2695                "nip05": "shareduser@example.com",
2696            })
2697            .to_string(),
2698            [],
2699        )
2700        .custom_created_at(Timestamp::from_secs(5))
2701        .to_event(&keys)
2702        .unwrap();
2703
2704        graph_store
2705            .sync_profile_index_for_events(std::slice::from_ref(&profile))
2706            .unwrap();
2707        assert!(graph_store.profile_search_root().unwrap().is_some());
2708        assert!(graph_store.profile_search_root().unwrap().is_some());
2709        let links = graph_store
2710            .profile_search_entries_for_prefix("p:shared")
2711            .unwrap();
2712        assert_eq!(links.len(), 2);
2713        assert!(links
2714            .iter()
2715            .any(|(key, entry)| key == &format!("p:shared:{pubkey}") && entry.name == "shared storage user"));
2716        assert!(links
2717            .iter()
2718            .any(|(key, entry)| key == &format!("p:shareduser:{pubkey}") && entry.nip05.as_deref() == Some("shareduser")));
2719    }
2720
    #[test]
    fn test_rebuild_profile_index_from_stored_events_uses_ambient_and_public_metadata() {
        // After the profile index roots are wiped, a rebuild must rescan the
        // stored events in BOTH the public and ambient scopes, keeping only
        // the newest kind-0 metadata per pubkey.
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let public_keys = Keys::generate();
        let ambient_keys = Keys::generate();
        let public_pubkey = public_keys.public_key().to_hex();
        let ambient_pubkey = ambient_keys.public_key().to_hex();

        // Two metadata events for the public author: the older one (t=5)
        // should be superseded by the newer one (t=6) during the rebuild.
        let older = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "petri old",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&public_keys)
        .unwrap();
        let newer = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "petri",
                "name": "Petri Example",
                "nip05": "petri@example.com",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(6))
        .to_event(&public_keys)
        .unwrap();
        // One metadata event from a second author, stored in the ambient scope.
        let ambient = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "ambient petri",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(7))
        .to_event(&ambient_keys)
        .unwrap();

        ingest_parsed_event_with_storage_class(&graph_store, &older, EventStorageClass::Public)
            .unwrap();
        ingest_parsed_event_with_storage_class(&graph_store, &newer, EventStorageClass::Public)
            .unwrap();
        ingest_parsed_event_with_storage_class(&graph_store, &ambient, EventStorageClass::Ambient)
            .unwrap();

        // Simulate a lost/corrupted index by clearing both persisted roots.
        graph_store.profile_index.write_by_pubkey_root(None).unwrap();
        graph_store.profile_index.write_search_root(None).unwrap();

        // Rebuild counts unique profiles, not events: 2 authors, not 3 events.
        let rebuilt = graph_store
            .rebuild_profile_index_from_stored_events()
            .unwrap();
        assert_eq!(rebuilt, 2);

        let entries = graph_store
            .profile_search_entries_for_prefix("p:petri")
            .unwrap();
        assert_eq!(entries.len(), 2);
        // Public author: indexed from the NEWER event ("petri", alias from
        // the "name" field); nip05 is not carried into this entry.
        assert!(entries.iter().any(|(key, entry)| {
            key == &format!("p:petri:{public_pubkey}")
                && entry.name == "petri"
                && entry.aliases == vec!["Petri Example".to_string()]
                && entry.nip05.is_none()
        }));
        // Ambient author: indexed from its only event, no aliases or nip05.
        assert!(entries.iter().any(|(key, entry)| {
            key == &format!("p:petri:{ambient_pubkey}")
                && entry.name == "ambient petri"
                && entry.aliases.is_empty()
                && entry.nip05.is_none()
        }));
    }
2799
2800    #[test]
2801    fn test_query_events_by_author() {
2802        let _guard = test_lock();
2803        let tmp = TempDir::new().unwrap();
2804        let graph_store = open_social_graph_store(tmp.path()).unwrap();
2805        let keys = Keys::generate();
2806
2807        let older = EventBuilder::new(Kind::TextNote, "older", [])
2808            .custom_created_at(Timestamp::from_secs(5))
2809            .to_event(&keys)
2810            .unwrap();
2811        let newer = EventBuilder::new(Kind::TextNote, "newer", [])
2812            .custom_created_at(Timestamp::from_secs(6))
2813            .to_event(&keys)
2814            .unwrap();
2815
2816        ingest_parsed_event(&graph_store, &older).unwrap();
2817        ingest_parsed_event(&graph_store, &newer).unwrap();
2818
2819        let filter = Filter::new().author(keys.public_key()).kind(Kind::TextNote);
2820        let events = query_events(&graph_store, &filter, 10);
2821        assert_eq!(events.len(), 2);
2822        assert_eq!(events[0].id, newer.id);
2823        assert_eq!(events[1].id, older.id);
2824    }
2825
    #[test]
    fn test_query_events_by_kind() {
        // A kind-only filter spans all authors, returns newest-first, and
        // excludes events of other kinds.
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let first_keys = Keys::generate();
        let second_keys = Keys::generate();

        let older = EventBuilder::new(Kind::TextNote, "older", [])
            .custom_created_at(Timestamp::from_secs(5))
            .to_event(&first_keys)
            .unwrap();
        let newer = EventBuilder::new(Kind::TextNote, "newer", [])
            .custom_created_at(Timestamp::from_secs(6))
            .to_event(&second_keys)
            .unwrap();
        // Decoy of a different kind (and even newer) that must be filtered out.
        let other_kind = EventBuilder::new(Kind::Metadata, "profile", [])
            .custom_created_at(Timestamp::from_secs(7))
            .to_event(&second_keys)
            .unwrap();

        ingest_parsed_event(&graph_store, &older).unwrap();
        ingest_parsed_event(&graph_store, &newer).unwrap();
        ingest_parsed_event(&graph_store, &other_kind).unwrap();

        let filter = Filter::new().kind(Kind::TextNote);
        let events = query_events(&graph_store, &filter, 10);
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].id, newer.id);
        assert_eq!(events[1].id, older.id);
    }
2857
2858    #[test]
2859    fn test_query_events_by_id() {
2860        let _guard = test_lock();
2861        let tmp = TempDir::new().unwrap();
2862        let graph_store = open_social_graph_store(tmp.path()).unwrap();
2863        let keys = Keys::generate();
2864
2865        let first = EventBuilder::new(Kind::TextNote, "first", [])
2866            .custom_created_at(Timestamp::from_secs(5))
2867            .to_event(&keys)
2868            .unwrap();
2869        let target = EventBuilder::new(Kind::TextNote, "target", [])
2870            .custom_created_at(Timestamp::from_secs(6))
2871            .to_event(&keys)
2872            .unwrap();
2873
2874        ingest_parsed_event(&graph_store, &first).unwrap();
2875        ingest_parsed_event(&graph_store, &target).unwrap();
2876
2877        let filter = Filter::new().id(target.id).kind(Kind::TextNote);
2878        let events = query_events(&graph_store, &filter, 10);
2879        assert_eq!(events.len(), 1);
2880        assert_eq!(events[0].id, target.id);
2881    }
2882
    #[test]
    fn test_query_events_search_is_case_insensitive() {
        // A lowercase search term must match mixed-case content
        // ("nostr search" matches "Hello Nostr Search").
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let keys = Keys::generate();
        let other_keys = Keys::generate();

        let matching = EventBuilder::new(Kind::TextNote, "Hello Nostr Search", [])
            .custom_created_at(Timestamp::from_secs(5))
            .to_event(&keys)
            .unwrap();
        // Newer decoy with unrelated content must be excluded by the search.
        let other = EventBuilder::new(Kind::TextNote, "goodbye world", [])
            .custom_created_at(Timestamp::from_secs(6))
            .to_event(&other_keys)
            .unwrap();

        ingest_parsed_event(&graph_store, &matching).unwrap();
        ingest_parsed_event(&graph_store, &other).unwrap();

        let filter = Filter::new().kind(Kind::TextNote).search("nostr search");
        let events = query_events(&graph_store, &filter, 10);
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].id, matching.id);
    }
2908
2909    #[test]
2910    fn test_query_events_since_until_are_inclusive() {
2911        let _guard = test_lock();
2912        let tmp = TempDir::new().unwrap();
2913        let graph_store = open_social_graph_store(tmp.path()).unwrap();
2914        let keys = Keys::generate();
2915
2916        let before = EventBuilder::new(Kind::TextNote, "before", [])
2917            .custom_created_at(Timestamp::from_secs(5))
2918            .to_event(&keys)
2919            .unwrap();
2920        let start = EventBuilder::new(Kind::TextNote, "start", [])
2921            .custom_created_at(Timestamp::from_secs(6))
2922            .to_event(&keys)
2923            .unwrap();
2924        let end = EventBuilder::new(Kind::TextNote, "end", [])
2925            .custom_created_at(Timestamp::from_secs(10))
2926            .to_event(&keys)
2927            .unwrap();
2928        let after = EventBuilder::new(Kind::TextNote, "after", [])
2929            .custom_created_at(Timestamp::from_secs(11))
2930            .to_event(&keys)
2931            .unwrap();
2932
2933        ingest_parsed_event(&graph_store, &before).unwrap();
2934        ingest_parsed_event(&graph_store, &start).unwrap();
2935        ingest_parsed_event(&graph_store, &end).unwrap();
2936        ingest_parsed_event(&graph_store, &after).unwrap();
2937
2938        let filter = Filter::new()
2939            .kind(Kind::TextNote)
2940            .since(Timestamp::from_secs(6))
2941            .until(Timestamp::from_secs(10));
2942        let events = query_events(&graph_store, &filter, 10);
2943        let ids = events.into_iter().map(|event| event.id).collect::<Vec<_>>();
2944        assert_eq!(ids, vec![end.id, start.id]);
2945    }
2946
    #[test]
    fn test_query_events_replaceable_kind_returns_latest_winner() {
        // Kind 10000 (mute list) is replaceable: after ingesting two events
        // from the same author, only the newest one is returned.
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let keys = Keys::generate();

        let older = EventBuilder::new(Kind::Custom(10_000), "older mute list", [])
            .custom_created_at(Timestamp::from_secs(5))
            .to_event(&keys)
            .unwrap();
        let newer = EventBuilder::new(Kind::Custom(10_000), "newer mute list", [])
            .custom_created_at(Timestamp::from_secs(6))
            .to_event(&keys)
            .unwrap();

        ingest_parsed_event(&graph_store, &older).unwrap();
        ingest_parsed_event(&graph_store, &newer).unwrap();

        let filter = Filter::new()
            .author(keys.public_key())
            .kind(Kind::Custom(10_000));
        // Exactly one winner: the newer event replaces the older one.
        let events = query_events(&graph_store, &filter, 10);
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].id, newer.id);
    }
2973
2974    #[test]
2975    fn test_query_events_kind_41_replaceable_returns_latest_winner() {
2976        let _guard = test_lock();
2977        let tmp = TempDir::new().unwrap();
2978        let graph_store = open_social_graph_store(tmp.path()).unwrap();
2979        let keys = Keys::generate();
2980
2981        let older = EventBuilder::new(Kind::Custom(41), "older channel metadata", [])
2982            .custom_created_at(Timestamp::from_secs(5))
2983            .to_event(&keys)
2984            .unwrap();
2985        let newer = EventBuilder::new(Kind::Custom(41), "newer channel metadata", [])
2986            .custom_created_at(Timestamp::from_secs(6))
2987            .to_event(&keys)
2988            .unwrap();
2989
2990        ingest_parsed_event(&graph_store, &older).unwrap();
2991        ingest_parsed_event(&graph_store, &newer).unwrap();
2992
2993        let filter = Filter::new()
2994            .author(keys.public_key())
2995            .kind(Kind::Custom(41));
2996        let events = query_events(&graph_store, &filter, 10);
2997        assert_eq!(events.len(), 1);
2998        assert_eq!(events[0].id, newer.id);
2999    }
3000
    #[test]
    fn test_public_and_ambient_indexes_stay_separate() {
        // Events ingested under the Public and Ambient storage classes must
        // land in separate indexes: scoped queries see only their own class,
        // while the All scope sees both.
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let public_keys = Keys::generate();
        let ambient_keys = Keys::generate();

        let public_event = EventBuilder::new(Kind::TextNote, "public", [])
            .custom_created_at(Timestamp::from_secs(5))
            .to_event(&public_keys)
            .unwrap();
        let ambient_event = EventBuilder::new(Kind::TextNote, "ambient", [])
            .custom_created_at(Timestamp::from_secs(6))
            .to_event(&ambient_keys)
            .unwrap();

        ingest_parsed_event_with_storage_class(
            &graph_store,
            &public_event,
            EventStorageClass::Public,
        )
        .unwrap();
        ingest_parsed_event_with_storage_class(
            &graph_store,
            &ambient_event,
            EventStorageClass::Ambient,
        )
        .unwrap();

        let filter = Filter::new().kind(Kind::TextNote);
        // All scope: union of both classes.
        let all_events = graph_store
            .query_events_in_scope(&filter, 10, EventQueryScope::All)
            .unwrap();
        assert_eq!(all_events.len(), 2);

        // PublicOnly: just the public event.
        let public_events = graph_store
            .query_events_in_scope(&filter, 10, EventQueryScope::PublicOnly)
            .unwrap();
        assert_eq!(public_events.len(), 1);
        assert_eq!(public_events[0].id, public_event.id);

        // AmbientOnly: just the ambient event.
        let ambient_events = graph_store
            .query_events_in_scope(&filter, 10, EventQueryScope::AmbientOnly)
            .unwrap();
        assert_eq!(ambient_events.len(), 1);
        assert_eq!(ambient_events[0].id, ambient_event.id);
    }
3049
    #[test]
    fn test_default_ingest_classifies_root_author_as_public() {
        // With a social-graph root configured, the default ingest path (no
        // explicit storage class) should file the root author's events as
        // Public and everyone else's as Ambient.
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let root_keys = Keys::generate();
        let other_keys = Keys::generate();
        set_social_graph_root(&graph_store, &root_keys.public_key().to_bytes());

        let root_event = EventBuilder::new(Kind::TextNote, "root", [])
            .custom_created_at(Timestamp::from_secs(5))
            .to_event(&root_keys)
            .unwrap();
        let other_event = EventBuilder::new(Kind::TextNote, "other", [])
            .custom_created_at(Timestamp::from_secs(6))
            .to_event(&other_keys)
            .unwrap();

        // Both go through the default (class-inferring) ingest path.
        ingest_parsed_event(&graph_store, &root_event).unwrap();
        ingest_parsed_event(&graph_store, &other_event).unwrap();

        let filter = Filter::new().kind(Kind::TextNote);
        let public_events = graph_store
            .query_events_in_scope(&filter, 10, EventQueryScope::PublicOnly)
            .unwrap();
        assert_eq!(public_events.len(), 1);
        assert_eq!(public_events[0].id, root_event.id);

        let ambient_events = graph_store
            .query_events_in_scope(&filter, 10, EventQueryScope::AmbientOnly)
            .unwrap();
        assert_eq!(ambient_events.len(), 1);
        assert_eq!(ambient_events[0].id, other_event.id);
    }
3084
    #[test]
    fn test_query_events_survives_reopen() {
        // Ingested events must be durably queryable: after dropping the store
        // and reopening the same directory, both author-scoped queries and
        // the global recency ordering still work.
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let db_dir = tmp.path().join("socialgraph-store");
        let keys = Keys::generate();
        let other_keys = Keys::generate();

        // First session: ingest three notes (two from `keys`, one newer note
        // from `other_keys`), then drop the store at end of scope.
        {
            let graph_store = open_social_graph_store_at_path(&db_dir, None).unwrap();
            let older = EventBuilder::new(Kind::TextNote, "older", [])
                .custom_created_at(Timestamp::from_secs(5))
                .to_event(&keys)
                .unwrap();
            let newer = EventBuilder::new(Kind::TextNote, "newer", [])
                .custom_created_at(Timestamp::from_secs(6))
                .to_event(&keys)
                .unwrap();
            let latest = EventBuilder::new(Kind::TextNote, "latest", [])
                .custom_created_at(Timestamp::from_secs(7))
                .to_event(&other_keys)
                .unwrap();

            ingest_parsed_event(&graph_store, &older).unwrap();
            ingest_parsed_event(&graph_store, &newer).unwrap();
            ingest_parsed_event(&graph_store, &latest).unwrap();
        }

        let reopened = open_social_graph_store_at_path(&db_dir, None).unwrap();

        // Author query: only `keys`' two notes, newest first.
        let author_filter = Filter::new().author(keys.public_key()).kind(Kind::TextNote);
        let author_events = query_events(&reopened, &author_filter, 10);
        assert_eq!(author_events.len(), 2);
        assert_eq!(author_events[0].content, "newer");
        assert_eq!(author_events[1].content, "older");

        // Global query with limit 2: the two most recent notes overall.
        let recent_filter = Filter::new().kind(Kind::TextNote);
        let recent_events = query_events(&reopened, &recent_filter, 2);
        assert_eq!(recent_events.len(), 2);
        assert_eq!(recent_events[0].content, "latest");
        assert_eq!(recent_events[1].content, "newer");
    }
3127
    #[test]
    fn test_query_events_parameterized_replaceable_by_d_tag() {
        // Kind 30078 is parameterized-replaceable: replacement happens per
        // (author, kind, d-tag). Two events with d="video" must collapse to
        // the newer one, while d="files" stays independent.
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let keys = Keys::generate();

        // d="video", t=5: superseded by the next event.
        let older = EventBuilder::new(
            Kind::Custom(30078),
            "",
            vec![
                Tag::identifier("video"),
                Tag::parse(&["l", "hashtree"]).unwrap(),
                Tag::parse(&["hash", &"11".repeat(32)]).unwrap(),
            ],
        )
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&keys)
        .unwrap();
        // d="video", t=6: the expected winner for this identifier.
        let newer = EventBuilder::new(
            Kind::Custom(30078),
            "",
            vec![
                Tag::identifier("video"),
                Tag::parse(&["l", "hashtree"]).unwrap(),
                Tag::parse(&["hash", &"22".repeat(32)]).unwrap(),
            ],
        )
        .custom_created_at(Timestamp::from_secs(6))
        .to_event(&keys)
        .unwrap();
        // d="files": a different identifier that must not match the filter.
        let other_tree = EventBuilder::new(
            Kind::Custom(30078),
            "",
            vec![
                Tag::identifier("files"),
                Tag::parse(&["l", "hashtree"]).unwrap(),
                Tag::parse(&["hash", &"33".repeat(32)]).unwrap(),
            ],
        )
        .custom_created_at(Timestamp::from_secs(7))
        .to_event(&keys)
        .unwrap();

        ingest_parsed_event(&graph_store, &older).unwrap();
        ingest_parsed_event(&graph_store, &newer).unwrap();
        ingest_parsed_event(&graph_store, &other_tree).unwrap();

        let filter = Filter::new()
            .author(keys.public_key())
            .kind(Kind::Custom(30078))
            .identifier("video");
        let events = query_events(&graph_store, &filter, 10);
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].id, newer.id);
    }
3184
3185    #[test]
3186    fn test_query_events_by_hashtag_uses_tag_index() {
3187        let _guard = test_lock();
3188        let tmp = TempDir::new().unwrap();
3189        let graph_store = open_social_graph_store(tmp.path()).unwrap();
3190        let keys = Keys::generate();
3191        let other_keys = Keys::generate();
3192
3193        let first = EventBuilder::new(
3194            Kind::TextNote,
3195            "first",
3196            vec![Tag::parse(&["t", "hashtree"]).unwrap()],
3197        )
3198        .custom_created_at(Timestamp::from_secs(5))
3199        .to_event(&keys)
3200        .unwrap();
3201        let second = EventBuilder::new(
3202            Kind::TextNote,
3203            "second",
3204            vec![Tag::parse(&["t", "hashtree"]).unwrap()],
3205        )
3206        .custom_created_at(Timestamp::from_secs(6))
3207        .to_event(&other_keys)
3208        .unwrap();
3209        let unrelated = EventBuilder::new(
3210            Kind::TextNote,
3211            "third",
3212            vec![Tag::parse(&["t", "other"]).unwrap()],
3213        )
3214        .custom_created_at(Timestamp::from_secs(7))
3215        .to_event(&other_keys)
3216        .unwrap();
3217
3218        ingest_parsed_event(&graph_store, &first).unwrap();
3219        ingest_parsed_event(&graph_store, &second).unwrap();
3220        ingest_parsed_event(&graph_store, &unrelated).unwrap();
3221
3222        let filter = Filter::new().hashtag("hashtree");
3223        let events = query_events(&graph_store, &filter, 10);
3224        assert_eq!(events.len(), 2);
3225        assert_eq!(events[0].id, second.id);
3226        assert_eq!(events[1].id, first.id);
3227    }
3228
    #[test]
    fn test_query_events_combines_indexes_then_applies_search_filter() {
        // A query that has both an indexed component (hashtag) and a search
        // term should narrow the index results with the search filter: both
        // events carry #hashtree, but only one contains "video".
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let keys = Keys::generate();
        let other_keys = Keys::generate();

        let matching = EventBuilder::new(
            Kind::TextNote,
            "hashtree video release",
            vec![Tag::parse(&["t", "hashtree"]).unwrap()],
        )
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&keys)
        .unwrap();
        // Same hashtag, but content does not contain the search term.
        let non_matching = EventBuilder::new(
            Kind::TextNote,
            "plain text note",
            vec![Tag::parse(&["t", "hashtree"]).unwrap()],
        )
        .custom_created_at(Timestamp::from_secs(6))
        .to_event(&other_keys)
        .unwrap();

        ingest_parsed_event(&graph_store, &matching).unwrap();
        ingest_parsed_event(&graph_store, &non_matching).unwrap();

        let filter = Filter::new().hashtag("hashtree").search("video");
        let events = query_events(&graph_store, &filter, 10);
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].id, matching.id);
    }
3262
3263    fn benchmark_dataset_path() -> Option<PathBuf> {
3264        std::env::var_os("HASHTREE_BENCH_DATASET_PATH").map(PathBuf::from)
3265    }
3266
3267    fn benchmark_dataset_url() -> String {
3268        std::env::var("HASHTREE_BENCH_DATASET_URL")
3269            .ok()
3270            .filter(|value| !value.is_empty())
3271            .unwrap_or_else(|| WELLORDER_FIXTURE_URL.to_string())
3272    }
3273
3274    fn benchmark_stream_warmup_events(measured_events: usize) -> usize {
3275        std::env::var("HASHTREE_BENCH_WARMUP_EVENTS")
3276            .ok()
3277            .and_then(|value| value.parse::<usize>().ok())
3278            .unwrap_or_else(|| measured_events.clamp(1, 200))
3279    }
3280
    /// Downloads the benchmark dataset from `url` to `path` if it is not
    /// already present. The download is written to a `.tmp` sibling and then
    /// renamed into place so a partial download never looks complete.
    fn ensure_benchmark_dataset(path: &Path, url: &str) -> Result<()> {
        if path.exists() {
            return Ok(());
        }

        let parent = path
            .parent()
            .context("benchmark dataset path has no parent directory")?;
        fs::create_dir_all(parent).context("create benchmark dataset directory")?;

        // Download to a temp file first; rename is the commit step.
        let tmp = path.with_extension("tmp");
        let mut response = reqwest::blocking::get(url)
            .context("download benchmark dataset")?
            .error_for_status()
            .context("benchmark dataset request failed")?;
        let mut file = File::create(&tmp).context("create temporary benchmark dataset file")?;
        std::io::copy(&mut response, &mut file).context("write benchmark dataset")?;
        fs::rename(&tmp, path).context("move benchmark dataset into place")?;

        Ok(())
    }
3302
3303    fn load_benchmark_dataset(path: &Path, max_events: usize) -> Result<Vec<Event>> {
3304        if max_events == 0 {
3305            return Ok(Vec::new());
3306        }
3307
3308        let mut child = Command::new("bzip2")
3309            .args(["-dc", &path.to_string_lossy()])
3310            .stdout(Stdio::piped())
3311            .spawn()
3312            .context("spawn bzip2 for benchmark dataset")?;
3313        let stdout = child
3314            .stdout
3315            .take()
3316            .context("benchmark dataset stdout missing")?;
3317        let mut events = Vec::with_capacity(max_events);
3318
3319        {
3320            let reader = BufReader::new(stdout);
3321            for line in reader.lines() {
3322                if events.len() >= max_events {
3323                    break;
3324                }
3325                let line = line.context("read benchmark dataset line")?;
3326                let trimmed = line.trim();
3327                if trimmed.is_empty() {
3328                    continue;
3329                }
3330                if let Ok(event) = Event::from_json(trimmed.to_string()) {
3331                    events.push(event);
3332                }
3333            }
3334        }
3335
3336        if events.len() < max_events {
3337            let status = child.wait().context("wait for benchmark dataset reader")?;
3338            anyhow::ensure!(
3339                status.success(),
3340                "benchmark dataset reader exited with status {status}"
3341            );
3342        } else {
3343            let _ = child.kill();
3344            let _ = child.wait();
3345        }
3346
3347        Ok(events)
3348    }
3349
3350    fn build_synthetic_benchmark_events(event_count: usize, author_count: usize) -> Vec<Event> {
3351        let authors = (0..author_count)
3352            .map(|_| Keys::generate())
3353            .collect::<Vec<_>>();
3354        let mut events = Vec::with_capacity(event_count);
3355        for i in 0..event_count {
3356            let kind = if i % 8 < 5 {
3357                Kind::TextNote
3358            } else {
3359                Kind::Custom(30_023)
3360            };
3361            let mut tags = Vec::new();
3362            if kind == Kind::TextNote && i % 16 == 0 {
3363                tags.push(Tag::parse(&["t", "hashtree"]).unwrap());
3364            }
3365            let content = if kind == Kind::TextNote && i % 32 == 0 {
3366                format!("benchmark target event {i}")
3367            } else {
3368                format!("benchmark event {i}")
3369            };
3370            let event = EventBuilder::new(kind, content, tags)
3371                .custom_created_at(Timestamp::from_secs(1_700_000_000 + i as u64))
3372                .to_event(&authors[i % author_count])
3373                .unwrap();
3374            events.push(event);
3375        }
3376        events
3377    }
3378
3379    fn load_benchmark_events(
3380        event_count: usize,
3381        author_count: usize,
3382    ) -> Result<(String, Vec<Event>)> {
3383        if let Some(path) = benchmark_dataset_path() {
3384            let url = benchmark_dataset_url();
3385            ensure_benchmark_dataset(&path, &url)?;
3386            let events = load_benchmark_dataset(&path, event_count)?;
3387            return Ok((format!("dataset:{}", path.display()), events));
3388        }
3389
3390        Ok((
3391            format!("synthetic:{author_count}-authors"),
3392            build_synthetic_benchmark_events(event_count, author_count),
3393        ))
3394    }
3395
    /// Builds a custom-tag filter from the event's first single-letter,
    /// lowercase-ASCII tag with a non-empty value, or `None` if there is none.
    fn first_tag_filter(event: &Event) -> Option<Filter> {
        event.tags.iter().find_map(|tag| match tag.as_slice() {
            // Guard: tag name is exactly one byte and a lowercase ASCII
            // letter (indexable single-letter tag), and the value is set.
            [name, value, ..]
                if name.len() == 1
                    && !value.is_empty()
                    && name.as_bytes()[0].is_ascii_lowercase() =>
            {
                let letter = SingleLetterTag::from_char(name.chars().next()?).ok()?;
                Some(Filter::new().custom_tag(letter, [value.to_string()]))
            }
            _ => None,
        })
    }
3409
3410    fn first_search_term(event: &Event) -> Option<String> {
3411        event
3412            .content
3413            .split(|ch: char| !ch.is_alphanumeric())
3414            .find(|token| token.len() >= 4)
3415            .map(|token| token.to_ascii_lowercase())
3416    }
3417
3418    fn benchmark_match_count(events: &[Event], filter: &Filter, limit: usize) -> usize {
3419        events
3420            .iter()
3421            .filter(|event| filter.match_event(event))
3422            .count()
3423            .min(limit)
3424    }
3425
3426    fn benchmark_btree_orders() -> Vec<usize> {
3427        std::env::var("HASHTREE_BTREE_ORDERS")
3428            .ok()
3429            .map(|value| {
3430                value
3431                    .split(',')
3432                    .filter_map(|part| part.trim().parse::<usize>().ok())
3433                    .filter(|order| *order >= 2)
3434                    .collect::<Vec<_>>()
3435            })
3436            .filter(|orders| !orders.is_empty())
3437            .unwrap_or_else(|| vec![16, 24, 32, 48, 64])
3438    }
3439
3440    fn benchmark_read_iterations() -> usize {
3441        std::env::var("HASHTREE_BENCH_READ_ITERATIONS")
3442            .ok()
3443            .and_then(|value| value.parse::<usize>().ok())
3444            .unwrap_or(5)
3445            .max(1)
3446    }
3447
3448    fn average_duration(samples: &[Duration]) -> Duration {
3449        if samples.is_empty() {
3450            return Duration::ZERO;
3451        }
3452
3453        Duration::from_secs_f64(
3454            samples.iter().map(Duration::as_secs_f64).sum::<f64>() / samples.len() as f64,
3455        )
3456    }
3457
3458    fn average_read_trace(samples: &[ReadTraceSnapshot]) -> ReadTraceSnapshot {
3459        if samples.is_empty() {
3460            return ReadTraceSnapshot::default();
3461        }
3462
3463        let len = samples.len() as u64;
3464        ReadTraceSnapshot {
3465            get_calls: samples.iter().map(|sample| sample.get_calls).sum::<u64>() / len,
3466            total_bytes: samples.iter().map(|sample| sample.total_bytes).sum::<u64>() / len,
3467            unique_blocks: (samples
3468                .iter()
3469                .map(|sample| sample.unique_blocks as u64)
3470                .sum::<u64>()
3471                / len) as usize,
3472            unique_bytes: samples
3473                .iter()
3474                .map(|sample| sample.unique_bytes)
3475                .sum::<u64>()
3476                / len,
3477            cache_hits: samples.iter().map(|sample| sample.cache_hits).sum::<u64>() / len,
3478            remote_fetches: samples
3479                .iter()
3480                .map(|sample| sample.remote_fetches)
3481                .sum::<u64>()
3482                / len,
3483            remote_bytes: samples
3484                .iter()
3485                .map(|sample| sample.remote_bytes)
3486                .sum::<u64>()
3487                / len,
3488        }
3489    }
3490
3491    fn estimate_serialized_remote_ms(snapshot: &ReadTraceSnapshot, model: NetworkModel) -> f64 {
3492        let transfer_ms = if model.bandwidth_mib_per_s <= 0.0 {
3493            0.0
3494        } else {
3495            (snapshot.remote_bytes as f64 / (model.bandwidth_mib_per_s * 1024.0 * 1024.0)) * 1000.0
3496        };
3497        snapshot.remote_fetches as f64 * model.rtt_ms + transfer_ms
3498    }
3499
    /// Inputs for the B-tree index benchmark: the event corpus plus handles
    /// (pubkeys, kinds, tag values) to synthetic events appended by
    /// `load_index_benchmark_dataset`, so each query case is guaranteed at
    /// least one match.
    #[derive(Debug, Clone)]
    struct IndexBenchmarkDataset {
        // Provenance label, e.g. "dataset:<path>" or "synthetic:<n>-authors".
        source: String,
        // Full corpus, including the appended synthetic events.
        events: Vec<Event>,
        // Single-letter tag name/value carried by the synthetic tagged note.
        guaranteed_tag_name: String,
        guaranteed_tag_value: String,
        // Author (hex) and kind of the synthetic replaceable event pair.
        replaceable_pubkey: String,
        replaceable_kind: u32,
        // Author (hex), kind, and `d` tag of the synthetic
        // parameterized-replaceable event pair.
        parameterized_pubkey: String,
        parameterized_kind: u32,
        parameterized_d_tag: String,
    }
3512
    /// Loads the benchmark corpus and appends five synthetic events so every
    /// case built by `build_btree_query_cases` has at least one match:
    /// - a text note tagged `t=btreebench`,
    /// - an old/new replaceable pair (kind 10000, same author),
    /// - an old/new parameterized-replaceable pair (kind 30023, same author
    ///   and `d` tag).
    fn load_index_benchmark_dataset(
        event_count: usize,
        author_count: usize,
    ) -> Result<IndexBenchmarkDataset> {
        let (source, mut events) = load_benchmark_events(event_count, author_count)?;
        // One second past the newest corpus event, so the synthetic events
        // below get strictly increasing, corpus-newest timestamps.
        let base_timestamp = events
            .iter()
            .map(|event| event.created_at.as_u64())
            .max()
            .unwrap_or(1_700_000_000)
            + 1;

        // Fresh keypairs so the synthetic authors cannot collide with corpus
        // authors.
        let replaceable_keys = Keys::generate();
        let parameterized_keys = Keys::generate();
        let tagged_keys = Keys::generate();
        let guaranteed_tag_name = "t".to_string();
        let guaranteed_tag_value = "btreebench".to_string();
        let replaceable_kind = 10_000u32;
        let parameterized_kind = 30_023u32;
        let parameterized_d_tag = "btree-bench".to_string();

        let tagged = EventBuilder::new(
            Kind::TextNote,
            "btree benchmark tagged note",
            vec![Tag::parse(&["t", &guaranteed_tag_value]).unwrap()],
        )
        .custom_created_at(Timestamp::from_secs(base_timestamp))
        .to_event(&tagged_keys)
        .unwrap();
        // Replaceable pair: same author + kind; "new" has a later created_at
        // than "old". (Kind::Custom takes u16 — 10_000 fits, so try_into
        // cannot fail here.)
        let replaceable_old = EventBuilder::new(
            Kind::Custom(replaceable_kind.try_into().unwrap()),
            "replaceable old",
            [],
        )
        .custom_created_at(Timestamp::from_secs(base_timestamp + 1))
        .to_event(&replaceable_keys)
        .unwrap();
        let replaceable_new = EventBuilder::new(
            Kind::Custom(replaceable_kind.try_into().unwrap()),
            "replaceable new",
            [],
        )
        .custom_created_at(Timestamp::from_secs(base_timestamp + 2))
        .to_event(&replaceable_keys)
        .unwrap();
        // Parameterized-replaceable pair: same author, kind, and `d`
        // identifier; again "new" is later than "old".
        let parameterized_old = EventBuilder::new(
            Kind::Custom(parameterized_kind.try_into().unwrap()),
            "",
            vec![Tag::identifier(&parameterized_d_tag)],
        )
        .custom_created_at(Timestamp::from_secs(base_timestamp + 3))
        .to_event(&parameterized_keys)
        .unwrap();
        let parameterized_new = EventBuilder::new(
            Kind::Custom(parameterized_kind.try_into().unwrap()),
            "",
            vec![Tag::identifier(&parameterized_d_tag)],
        )
        .custom_created_at(Timestamp::from_secs(base_timestamp + 4))
        .to_event(&parameterized_keys)
        .unwrap();

        events.extend([
            tagged,
            replaceable_old,
            replaceable_new,
            parameterized_old,
            parameterized_new,
        ]);

        Ok(IndexBenchmarkDataset {
            source,
            events,
            guaranteed_tag_name,
            guaranteed_tag_value,
            replaceable_pubkey: replaceable_keys.public_key().to_hex(),
            replaceable_kind,
            parameterized_pubkey: parameterized_keys.public_key().to_hex(),
            parameterized_kind,
            parameterized_d_tag,
        })
    }
3595
    /// Builds the benchmark query workload from the dataset: id point lookup,
    /// author / author+kind / kind scans, tag lookup, recent-events scan, and
    /// the replaceable / parameterized-replaceable lookups whose matches were
    /// guaranteed by `load_index_benchmark_dataset`.
    fn build_btree_query_cases(dataset: &IndexBenchmarkDataset) -> Vec<BenchmarkQueryCase> {
        // Prefer text notes as the "primary" kind; otherwise fall back to the
        // first event's kind. Panics if the dataset is empty.
        let primary_kind = dataset
            .events
            .iter()
            .find(|event| event.kind == Kind::TextNote)
            .map(|event| event.kind)
            .or_else(|| dataset.events.first().map(|event| event.kind))
            .expect("benchmark requires at least one event");
        let primary_kind_u32 = primary_kind.as_u16() as u32;

        // Pick the most prolific author of the primary kind so the author
        // queries have plenty of matches.
        let author_pubkey = dataset
            .events
            .iter()
            .filter(|event| event.kind == primary_kind)
            .fold(HashMap::<String, usize>::new(), |mut counts, event| {
                *counts.entry(event.pubkey.to_hex()).or_default() += 1;
                counts
            })
            .into_iter()
            .max_by_key(|(_, count)| *count)
            .map(|(pubkey, _)| pubkey)
            .expect("benchmark requires an author for the selected kind");

        // Arbitrary-but-deterministic target for the point lookup: the middle
        // event of the corpus.
        let by_id_id = dataset.events[dataset.events.len() / 2].id.to_hex();

        vec![
            BenchmarkQueryCase::ById { id: by_id_id },
            BenchmarkQueryCase::ByAuthor {
                pubkey: author_pubkey.clone(),
                limit: 50,
            },
            BenchmarkQueryCase::ByAuthorKind {
                pubkey: author_pubkey,
                kind: primary_kind_u32,
                limit: 50,
            },
            BenchmarkQueryCase::ByKind {
                kind: primary_kind_u32,
                limit: 200,
            },
            BenchmarkQueryCase::ByTag {
                tag_name: dataset.guaranteed_tag_name.clone(),
                tag_value: dataset.guaranteed_tag_value.clone(),
                limit: 100,
            },
            BenchmarkQueryCase::Recent { limit: 100 },
            BenchmarkQueryCase::Replaceable {
                pubkey: dataset.replaceable_pubkey.clone(),
                kind: dataset.replaceable_kind,
            },
            BenchmarkQueryCase::ParameterizedReplaceable {
                pubkey: dataset.parameterized_pubkey.clone(),
                kind: dataset.parameterized_kind,
                d_tag: dataset.parameterized_d_tag.clone(),
            },
        ]
    }
3653
3654    fn benchmark_warm_query_case<S: Store + 'static>(
3655        base: Arc<S>,
3656        root: &Cid,
3657        order: usize,
3658        case: &BenchmarkQueryCase,
3659        iterations: usize,
3660    ) -> QueryBenchmarkResult {
3661        let trace_store = Arc::new(CountingStore::new(base));
3662        let event_store = NostrEventStore::with_options(
3663            Arc::clone(&trace_store),
3664            NostrEventStoreOptions {
3665                btree_order: Some(order),
3666            },
3667        );
3668        let mut durations = Vec::with_capacity(iterations);
3669        let mut traces = Vec::with_capacity(iterations);
3670        for _ in 0..iterations {
3671            trace_store.reset();
3672            let started = Instant::now();
3673            let matches = block_on(case.execute(&event_store, root)).unwrap();
3674            durations.push(started.elapsed());
3675            traces.push(trace_store.snapshot());
3676            assert!(
3677                matches > 0,
3678                "benchmark query {} returned no matches",
3679                case.name()
3680            );
3681        }
3682        let mut sorted = durations.clone();
3683        sorted.sort_unstable();
3684        QueryBenchmarkResult {
3685            average_duration: average_duration(&durations),
3686            p95_duration: duration_percentile(&sorted, 95, 100),
3687            reads: average_read_trace(&traces),
3688        }
3689    }
3690
3691    fn benchmark_cold_query_case<S: Store + 'static>(
3692        remote: Arc<S>,
3693        root: &Cid,
3694        order: usize,
3695        case: &BenchmarkQueryCase,
3696        iterations: usize,
3697    ) -> QueryBenchmarkResult {
3698        let mut durations = Vec::with_capacity(iterations);
3699        let mut traces = Vec::with_capacity(iterations);
3700        for _ in 0..iterations {
3701            let cache = Arc::new(MemoryStore::new());
3702            let trace_store = Arc::new(ReadThroughStore::new(cache, Arc::clone(&remote)));
3703            let event_store = NostrEventStore::with_options(
3704                Arc::clone(&trace_store),
3705                NostrEventStoreOptions {
3706                    btree_order: Some(order),
3707                },
3708            );
3709            let started = Instant::now();
3710            let matches = block_on(case.execute(&event_store, root)).unwrap();
3711            durations.push(started.elapsed());
3712            traces.push(trace_store.snapshot());
3713            assert!(
3714                matches > 0,
3715                "benchmark query {} returned no matches",
3716                case.name()
3717            );
3718        }
3719        let mut sorted = durations.clone();
3720        sorted.sort_unstable();
3721        QueryBenchmarkResult {
3722            average_duration: average_duration(&durations),
3723            p95_duration: duration_percentile(&sorted, 95, 100),
3724            reads: average_read_trace(&traces),
3725        }
3726    }
3727
3728    fn duration_percentile(
3729        sorted: &[std::time::Duration],
3730        numerator: usize,
3731        denominator: usize,
3732    ) -> std::time::Duration {
3733        if sorted.is_empty() {
3734            return std::time::Duration::ZERO;
3735        }
3736        let index = ((sorted.len() - 1) * numerator) / denominator;
3737        sorted[index]
3738    }
3739
    /// Steady-state ingest + query benchmark over a large dataset.
    ///
    /// Warms the store with an initial batch, then streams the measured events
    /// one at a time to record per-event ingest latency, prints latency
    /// percentiles plus the collected ingest profile, and finally verifies
    /// that kind / author / tag / search queries return exactly what a linear
    /// scan of the corpus yields (capped by each query's limit).
    ///
    /// Measured stream size is overridable via `HASHTREE_BENCH_EVENTS`
    /// (default 600). Run explicitly with `cargo test -- --ignored`.
    #[test]
    #[ignore = "benchmark"]
    fn benchmark_query_events_large_dataset() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        // Oversized (512 MiB) map so the store can hold the whole corpus.
        let graph_store =
            open_social_graph_store_with_mapsize(tmp.path(), Some(512 * 1024 * 1024)).unwrap();
        // Enable per-label ingest profiling for this run; drained and
        // disabled again after the measured stream below.
        set_nostr_profile_enabled(true);
        reset_nostr_profile();

        let author_count = 64usize;
        let measured_event_count = std::env::var("HASHTREE_BENCH_EVENTS")
            .ok()
            .and_then(|value| value.parse::<usize>().ok())
            .unwrap_or(600usize);
        let warmup_event_count = benchmark_stream_warmup_events(measured_event_count);
        let total_event_count = warmup_event_count + measured_event_count;
        let (source, events) = load_benchmark_events(total_event_count, author_count).unwrap();
        let loaded_event_count = events.len();
        // The loader may return fewer events than requested; shrink the warmup
        // so at least one event remains in the measured stream.
        let warmup_event_count = warmup_event_count.min(loaded_event_count.saturating_sub(1));
        let (warmup_events, measured_events) = events.split_at(warmup_event_count);

        println!(
            "starting steady-state dataset benchmark with {} warmup events and {} measured stream events from {}",
            warmup_events.len(),
            measured_events.len(),
            source
        );
        // Warmup is ingested as one unmeasured batch.
        if !warmup_events.is_empty() {
            ingest_parsed_events(&graph_store, warmup_events).unwrap();
        }

        // Measured phase: one event at a time, timing each ingest.
        let stream_start = Instant::now();
        let mut per_event_latencies = Vec::with_capacity(measured_events.len());
        for event in measured_events {
            let event_start = Instant::now();
            ingest_parsed_event(&graph_store, event).unwrap();
            per_event_latencies.push(event_start.elapsed());
        }
        let ingest_duration = stream_start.elapsed();

        // Latency distribution (mean plus p50/p95/p99 via duration_percentile).
        let mut sorted_latencies = per_event_latencies.clone();
        sorted_latencies.sort_unstable();
        let average_latency = if per_event_latencies.is_empty() {
            std::time::Duration::ZERO
        } else {
            std::time::Duration::from_secs_f64(
                per_event_latencies
                    .iter()
                    .map(std::time::Duration::as_secs_f64)
                    .sum::<f64>()
                    / per_event_latencies.len() as f64,
            )
        };
        let ingest_capacity_eps = if ingest_duration.is_zero() {
            f64::INFINITY
        } else {
            measured_events.len() as f64 / ingest_duration.as_secs_f64()
        };
        println!(
            "benchmark steady-state ingest complete in {:?} (avg={:?} p50={:?} p95={:?} p99={:?} capacity={:.2} events/s)",
            ingest_duration,
            average_latency,
            duration_percentile(&sorted_latencies, 50, 100),
            duration_percentile(&sorted_latencies, 95, 100),
            duration_percentile(&sorted_latencies, 99, 100),
            ingest_capacity_eps
        );
        // Dump the ingest profile, biggest total time first, with each
        // label's share of the measured wall-clock.
        let mut profile = take_nostr_profile();
        profile.sort_by(|left, right| right.total.cmp(&left.total));
        for stat in profile {
            let pct = if ingest_duration.is_zero() {
                0.0
            } else {
                (stat.total.as_secs_f64() / ingest_duration.as_secs_f64()) * 100.0
            };
            let average = if stat.count == 0 {
                std::time::Duration::ZERO
            } else {
                std::time::Duration::from_secs_f64(stat.total.as_secs_f64() / stat.count as f64)
            };
            println!(
                "ingest profile: label={} total={:?} pct={:.1}% count={} avg={:?} max={:?}",
                stat.label, stat.total, pct, stat.count, average, stat.max
            );
        }
        set_nostr_profile_enabled(false);

        // Query phase. Prefer text notes as the benchmark kind, else fall
        // back to the first event's kind.
        let kind = events
            .iter()
            .find(|event| event.kind == Kind::TextNote)
            .map(|event| event.kind)
            .or_else(|| events.first().map(|event| event.kind))
            .expect("benchmark requires at least one event");
        let kind_filter = Filter::new().kind(kind);
        let kind_start = Instant::now();
        let kind_events = query_events(&graph_store, &kind_filter, 200);
        let kind_duration = kind_start.elapsed();
        // Result count must match a limited linear scan of the corpus …
        assert_eq!(
            kind_events.len(),
            benchmark_match_count(&events, &kind_filter, 200)
        );
        // … and results must come back newest-first.
        assert!(kind_events
            .windows(2)
            .all(|window| window[0].created_at >= window[1].created_at));

        // Author+kind query for the first author seen with the selected kind.
        let author_pubkey = events
            .iter()
            .find(|event| event.kind == kind)
            .map(|event| event.pubkey)
            .expect("benchmark requires an author for the selected kind");
        let author_filter = Filter::new().author(author_pubkey).kind(kind);
        let author_start = Instant::now();
        let author_events = query_events(&graph_store, &author_filter, 50);
        let author_duration = author_start.elapsed();
        assert_eq!(
            author_events.len(),
            benchmark_match_count(&events, &author_filter, 50)
        );

        // Tag query built from the first single-letter-tagged event.
        let tag_filter = events
            .iter()
            .find_map(first_tag_filter)
            .expect("benchmark requires at least one tagged event");
        let tag_start = Instant::now();
        let tag_events = query_events(&graph_store, &tag_filter, 100);
        let tag_duration = tag_start.elapsed();
        assert_eq!(
            tag_events.len(),
            benchmark_match_count(&events, &tag_filter, 100)
        );

        // Full-text search query using the first usable token in any content.
        let search_source = events
            .iter()
            .find_map(|event| first_search_term(event).map(|term| (event.kind, term)))
            .expect("benchmark requires at least one searchable event");
        let search_filter = Filter::new().kind(search_source.0).search(search_source.1);
        let search_start = Instant::now();
        let search_events = query_events(&graph_store, &search_filter, 100);
        let search_duration = search_start.elapsed();
        assert_eq!(
            search_events.len(),
            benchmark_match_count(&events, &search_filter, 100)
        );

        // Final one-line summary of ingest + all four query timings.
        println!(
            "steady-state benchmark: source={} warmup_events={} stream_events={} ingest={:?} avg={:?} p50={:?} p95={:?} p99={:?} capacity_eps={:.2} kind={:?} author={:?} tag={:?} search={:?}",
            source,
            warmup_events.len(),
            measured_events.len(),
            ingest_duration,
            average_latency,
            duration_percentile(&sorted_latencies, 50, 100),
            duration_percentile(&sorted_latencies, 95, 100),
            duration_percentile(&sorted_latencies, 99, 100),
            ingest_capacity_eps,
            kind_duration,
            author_duration,
            tag_duration,
            search_duration
        );
    }
3902
    /// Compares query latency and read amplification across B-tree orders.
    ///
    /// For each order (see `benchmark_btree_orders`) it builds a fresh event
    /// index, then runs every query case twice: warm (shared counting store)
    /// and cold (fresh cache per iteration), printing per-case measurements
    /// and, for the cold traces, serialized remote-fetch estimates under each
    /// entry of `NETWORK_MODELS`.
    ///
    /// Tunables: HASHTREE_BENCH_EVENTS, HASHTREE_BENCH_READ_ITERATIONS,
    /// HASHTREE_BTREE_ORDERS. Run explicitly with `cargo test -- --ignored`.
    #[test]
    #[ignore = "benchmark"]
    fn benchmark_nostr_btree_query_tradeoffs() {
        let _guard = test_lock();
        let event_count = std::env::var("HASHTREE_BENCH_EVENTS")
            .ok()
            .and_then(|value| value.parse::<usize>().ok())
            .unwrap_or(2_000usize);
        let iterations = benchmark_read_iterations();
        let orders = benchmark_btree_orders();
        let dataset = load_index_benchmark_dataset(event_count, 64).unwrap();
        let cases = build_btree_query_cases(&dataset);
        let stored_events = dataset
            .events
            .iter()
            .map(stored_event_from_nostr)
            .collect::<Vec<_>>();

        println!(
            "btree-order benchmark: source={} events={} iterations={} orders={:?}",
            dataset.source,
            stored_events.len(),
            iterations,
            orders
        );
        println!(
            "network models are serialized fetch estimates: {}",
            NETWORK_MODELS
                .iter()
                .map(|model| format!(
                    "{}={}ms_rtt/{}MiBps",
                    model.name, model.rtt_ms, model.bandwidth_mib_per_s
                ))
                .collect::<Vec<_>>()
                .join(", ")
        );

        for order in orders {
            // A fresh LMDB-backed store and index per order, so each order's
            // numbers are independent.
            let tmp = TempDir::new().unwrap();
            let local_store =
                Arc::new(LocalStore::new(tmp.path().join("blobs"), &StorageBackend::Lmdb).unwrap());
            let event_store = NostrEventStore::with_options(
                Arc::clone(&local_store),
                NostrEventStoreOptions {
                    btree_order: Some(order),
                },
            );
            let root = block_on(event_store.build(None, stored_events.clone()))
                .unwrap()
                .expect("benchmark build root");

            println!("btree-order={} root={}", order, hex::encode(root.hash));
            // Accumulators for the per-order summary line printed below.
            let mut warm_total_ms = 0.0f64;
            let mut model_totals = NETWORK_MODELS
                .iter()
                .map(|model| (model.name, 0.0f64))
                .collect::<HashMap<_, _>>();

            for case in &cases {
                let warm = benchmark_warm_query_case(
                    Arc::clone(&local_store),
                    &root,
                    order,
                    case,
                    iterations,
                );
                let cold = benchmark_cold_query_case(
                    Arc::clone(&local_store),
                    &root,
                    order,
                    case,
                    iterations,
                );
                warm_total_ms += warm.average_duration.as_secs_f64() * 1000.0;

                // Per-model serialized-fetch estimate for this case; also
                // folded into the per-order totals.
                let model_estimates = NETWORK_MODELS
                    .iter()
                    .map(|model| {
                        let estimate = estimate_serialized_remote_ms(&cold.reads, *model);
                        *model_totals.get_mut(model.name).unwrap() += estimate;
                        format!("{}={:.2}ms", model.name, estimate)
                    })
                    .collect::<Vec<_>>()
                    .join(" ");

                println!(
                    "btree-order={} query={} warm_avg={:?} warm_p95={:?} warm_blocks={} warm_unique_bytes={} cold_fetches={} cold_bytes={} cold_local_avg={:?} {}",
                    order,
                    case.name(),
                    warm.average_duration,
                    warm.p95_duration,
                    warm.reads.unique_blocks,
                    warm.reads.unique_bytes,
                    cold.reads.remote_fetches,
                    cold.reads.remote_bytes,
                    cold.average_duration,
                    model_estimates
                );
            }

            // Unweighted per-case averages for this order.
            println!(
                "btree-order={} summary unweighted_warm_avg_ms={:.3} {}",
                order,
                warm_total_ms / cases.len() as f64,
                NETWORK_MODELS
                    .iter()
                    .map(|model| format!(
                        "{}={:.2}ms",
                        model.name,
                        model_totals[model.name] / cases.len() as f64
                    ))
                    .collect::<Vec<_>>()
                    .join(" ")
            );
        }
    }
4019
    /// `ensure_social_graph_mapsize` must grow the LMDB map to at least the
    /// requested size, rounded to a whole number of OS pages.
    #[test]
    fn test_ensure_social_graph_mapsize_rounds_and_applies() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        // Initialize at the default size, then request 70 MiB.
        ensure_social_graph_mapsize(tmp.path(), DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES).unwrap();
        let requested = 70 * 1024 * 1024;
        ensure_social_graph_mapsize(tmp.path(), requested).unwrap();
        // Reopen the env directly to inspect the effective map size.
        // SAFETY (heed API): opening an env is unsafe; the path is a TempDir
        // used only by this test while the lock guard is held.
        let env = unsafe {
            heed::EnvOpenOptions::new()
                .map_size(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES as usize)
                .max_dbs(SOCIALGRAPH_MAX_DBS)
                .open(tmp.path())
        }
        .unwrap();
        // The grow request took effect even though we reopened with the
        // (smaller) default map_size …
        assert!(env.info().map_size >= requested as usize);
        // … and the resulting size is page-aligned.
        assert_eq!(env.info().map_size % page_size_bytes(), 0);
    }
4037
    /// A batched `ingest_parsed_events` call must both update the social graph
    /// (follow distances from the configured root) and index the raw events so
    /// they are queryable afterwards.
    #[test]
    fn test_ingest_events_batches_graph_updates() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();

        let root_keys = Keys::generate();
        let alice_keys = Keys::generate();
        let bob_keys = Keys::generate();

        // Follow distances below are measured from this root key.
        let root_pk = root_keys.public_key().to_bytes();
        set_social_graph_root(&graph_store, &root_pk);

        // root follows alice (kind-3 contact list) …
        let root_follows_alice = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![Tag::public_key(alice_keys.public_key())],
        )
        .custom_created_at(Timestamp::from_secs(10))
        .to_event(&root_keys)
        .unwrap();
        // … and alice follows bob, putting bob two hops from root.
        let alice_follows_bob = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![Tag::public_key(bob_keys.public_key())],
        )
        .custom_created_at(Timestamp::from_secs(11))
        .to_event(&alice_keys)
        .unwrap();

        // Both contact lists ingested in a single batch.
        ingest_parsed_events(
            &graph_store,
            &[root_follows_alice.clone(), alice_follows_bob.clone()],
        )
        .unwrap();

        // Graph side: root -> alice = 1 hop, root -> alice -> bob = 2 hops.
        assert_eq!(
            get_follow_distance(&graph_store, &alice_keys.public_key().to_bytes()),
            Some(1)
        );
        assert_eq!(
            get_follow_distance(&graph_store, &bob_keys.public_key().to_bytes()),
            Some(2)
        );

        // Index side: both events must be returned by a ContactList query.
        let filter = Filter::new().kind(Kind::ContactList);
        let stored = query_events(&graph_store, &filter, 10);
        let ids = stored.into_iter().map(|event| event.id).collect::<Vec<_>>();
        assert!(ids.contains(&root_follows_alice.id));
        assert!(ids.contains(&alice_follows_bob.id));
    }
4089
    /// `ingest_graph_parsed_events` must update follow distances in the social
    /// graph WITHOUT adding the events to the queryable event index.
    #[test]
    fn test_ingest_graph_events_updates_graph_without_indexing_events() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();

        let root_keys = Keys::generate();
        let alice_keys = Keys::generate();

        // Follow distances are measured from this root key.
        let root_pk = root_keys.public_key().to_bytes();
        set_social_graph_root(&graph_store, &root_pk);

        // root follows alice via a kind-3 contact list.
        let root_follows_alice = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![Tag::public_key(alice_keys.public_key())],
        )
        .custom_created_at(Timestamp::from_secs(10))
        .to_event(&root_keys)
        .unwrap();

        // Graph-only ingestion path.
        ingest_graph_parsed_events(&graph_store, std::slice::from_ref(&root_follows_alice))
            .unwrap();

        // The graph learned the follow edge (root -> alice = 1 hop) …
        assert_eq!(
            get_follow_distance(&graph_store, &alice_keys.public_key().to_bytes()),
            Some(1)
        );
        // … but the event was NOT indexed, so queries return nothing.
        let filter = Filter::new().kind(Kind::ContactList);
        assert!(query_events(&graph_store, &filter, 10).is_empty());
    }
4121}