1pub mod access;
2pub mod crawler;
3pub mod local_lists;
4pub mod snapshot;
5
6pub use access::SocialGraphAccessControl;
7pub use crawler::SocialGraphCrawler;
8pub use local_lists::{
9 read_local_list_file_state, sync_local_list_files_force, sync_local_list_files_if_changed,
10 LocalListFileState, LocalListSyncOutcome,
11};
12
13use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
14use std::path::{Path, PathBuf};
15use std::sync::{Arc, Mutex as StdMutex};
16
17use anyhow::{Context, Result};
18use bytes::Bytes;
19use futures::executor::block_on;
20use hashtree_core::{nhash_encode_full, Cid, HashTree, HashTreeConfig, NHashData};
21use hashtree_index::BTree;
22use hashtree_nostr::{
23 is_parameterized_replaceable_kind, is_replaceable_kind, ListEventsOptions, NostrEventStore,
24 NostrEventStoreError, ProfileGuard as NostrProfileGuard, StoredNostrEvent,
25};
26#[cfg(test)]
27use hashtree_nostr::{
28 reset_profile as reset_nostr_profile, set_profile_enabled as set_nostr_profile_enabled,
29 take_profile as take_nostr_profile,
30};
31use nostr::{Event, Filter, JsonUtil, Kind, SingleLetterTag};
32use nostr_social_graph::{
33 BinaryBudget, GraphStats, NostrEvent as GraphEvent, SocialGraph,
34 SocialGraphBackend as NostrSocialGraphBackend,
35};
36use nostr_social_graph_heed::HeedSocialGraph;
37
38use crate::storage::{LocalStore, StorageRouter};
39
40#[cfg(test)]
41use std::sync::{Mutex, MutexGuard, OnceLock};
42#[cfg(test)]
43use std::time::Instant;
44
/// Ordered set of 32-byte public keys (BTreeSet gives deterministic iteration).
pub type UserSet = BTreeSet<[u8; 32]>;

/// All-zero hex pubkey used as a placeholder root until a real root is set.
const DEFAULT_ROOT_HEX: &str = "0000000000000000000000000000000000000000000000000000000000000000";
/// File persisting the root CID of the public event index.
const EVENTS_ROOT_FILE: &str = "events-root.msgpack";
/// File persisting the root CID of the ambient event index.
const AMBIENT_EVENTS_ROOT_FILE: &str = "events-root-ambient.msgpack";
/// Subdirectory holding the ambient bucket's dedicated blob store.
const AMBIENT_EVENTS_BLOB_DIR: &str = "ambient-blobs";
/// File persisting the root CID of the profile search index.
const PROFILE_SEARCH_ROOT_FILE: &str = "profile-search-root.msgpack";
/// File persisting the root CID of the profiles-by-pubkey tree.
const PROFILES_BY_PUBKEY_ROOT_FILE: &str = "profiles-by-pubkey-root.msgpack";
/// Sentinel distance the graph backend returns for users with no known path.
const UNKNOWN_FOLLOW_DISTANCE: u32 = 1000;
// NOTE(review): the four constants below are not referenced in this chunk;
// descriptions are inferred from their names — confirm against the rest of
// the module.
const DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES: u64 = 64 * 1024 * 1024;
const SOCIALGRAPH_MAX_DBS: u32 = 16;
/// Branching order passed to the profile search BTree (see store setup).
const PROFILE_SEARCH_INDEX_ORDER: usize = 64;
const PROFILE_SEARCH_PREFIX: &str = "p:";
const PROFILE_NAME_MAX_LENGTH: usize = 100;
59
/// Which event bucket an event is written to: `Public` for events by the
/// configured root user, `Ambient` for everything else (see
/// `default_storage_class_for`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventStorageClass {
    Public,
    Ambient,
}
65
/// Which buckets a query consults: only the public index, only the ambient
/// index, or both (the default used by `query_events`).
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum EventQueryScope {
    PublicOnly,
    AmbientOnly,
    All,
}
73
/// One event index: a content-addressed event store plus the file where its
/// current root CID is persisted.
struct EventIndexBucket {
    event_store: NostrEventStore<StorageRouter>,
    root_path: PathBuf,
}
78
/// Profile lookup state: a hash tree keyed by pubkey, a BTree search index,
/// and the files persisting each structure's root CID.
struct ProfileIndexBucket {
    tree: HashTree<StorageRouter>,
    index: BTree<StorageRouter>,
    by_pubkey_root_path: PathBuf,
    search_root_path: PathBuf,
}
85
/// Serialized form of a CID: its hash plus an optional encryption key.
/// NOTE(review): serialization format is consumed by the root files — confirm
/// compatibility before changing field order.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct StoredCid {
    hash: [u8; 32],
    key: Option<[u8; 32]>,
}
91
/// A profile entry stored in the search index: display name plus optional
/// aliases/NIP-05 identifier, with the source metadata event's timestamp and
/// nhash for provenance. `#[serde(default)]` keeps old entries readable.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct StoredProfileSearchEntry {
    pub pubkey: String,
    pub name: String,
    #[serde(default)]
    pub aliases: Vec<String>,
    #[serde(default)]
    pub nip05: Option<String>,
    pub created_at: u64,
    pub event_nhash: String,
}
103
/// Aggregate graph statistics derived from the exported graph state
/// (see `build_distance_cache`): user/follow counts, the current root (hex),
/// and per-distance population sizes.
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct SocialGraphStats {
    pub total_users: usize,
    pub root: Option<String>,
    pub total_follows: usize,
    pub max_depth: u32,
    pub size_by_distance: BTreeMap<u32, usize>,
    pub enabled: bool,
}
113
/// Lazily built cache of graph stats and decoded per-distance user lists;
/// invalidated whenever the graph or its root changes.
#[derive(Debug, Clone)]
struct DistanceCache {
    stats: SocialGraphStats,
    users_by_distance: BTreeMap<u32, Vec<[u8; 32]>>,
}
119
/// Opaque error wrapper for failures reported by an upstream graph backend;
/// displays the wrapped message verbatim.
#[derive(Debug, thiserror::Error)]
#[error("{0}")]
pub struct UpstreamGraphBackendError(String);
123
/// Central store combining the heed-backed follow graph, the public and
/// ambient event buckets, and the profile index. Interior mutability via
/// std mutexes; the distance cache is rebuilt on demand.
pub struct SocialGraphStore {
    graph: StdMutex<HeedSocialGraph>,
    // None until first read; cleared by invalidate_distance_cache().
    distance_cache: StdMutex<Option<DistanceCache>>,
    public_events: EventIndexBucket,
    ambient_events: EventIndexBucket,
    profile_index: ProfileIndexBucket,
    // Mute-ratio threshold applied when indexing profiles (default 1.0).
    profile_index_overmute_threshold: StdMutex<f64>,
}
132
/// Abstraction over a social-graph data source. `SocialGraphStore` is the
/// local implementation; remote/upstream backends can implement it too.
/// Default methods delegate so simple backends only implement the core set.
pub trait SocialGraphBackend: Send + Sync {
    /// Aggregate user/follow statistics for the current root.
    fn stats(&self) -> Result<SocialGraphStats>;
    /// All users at exactly `distance` hops from the root.
    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>>;
    /// Follow distance of `pk_bytes` from the root, `None` if unknown.
    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>>;
    /// Timestamp of `owner`'s latest follow list, if any.
    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>>;
    /// The set of users `owner` follows.
    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet>;
    /// Whether `user_pk` exceeds the mute-ratio `threshold`.
    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool>;
    /// Root of the profile search index, if the backend maintains one.
    fn profile_search_root(&self) -> Result<Option<Cid>> {
        Ok(None)
    }
    /// Binary snapshot chunks of the graph rooted at `root`, bounded by `options`.
    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>>;
    /// Ingest a single event (storage class chosen by the backend).
    fn ingest_event(&self, event: &Event) -> Result<()>;
    /// Ingest with an explicit storage class; default ignores the class.
    fn ingest_event_with_storage_class(
        &self,
        event: &Event,
        storage_class: EventStorageClass,
    ) -> Result<()> {
        let _ = storage_class;
        self.ingest_event(event)
    }
    /// Batch ingest; default is sequential single-event ingestion.
    fn ingest_events(&self, events: &[Event]) -> Result<()> {
        for event in events {
            self.ingest_event(event)?;
        }
        Ok(())
    }
    /// Batch ingest with an explicit storage class.
    fn ingest_events_with_storage_class(
        &self,
        events: &[Event],
        storage_class: EventStorageClass,
    ) -> Result<()> {
        for event in events {
            self.ingest_event_with_storage_class(event, storage_class)?;
        }
        Ok(())
    }
    /// Ingest events that only affect the follow graph; default treats them
    /// like regular ingestion (local store overrides this — see
    /// `apply_graph_events_only`).
    fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
        self.ingest_events(events)
    }
    /// Query stored events matching `filter`, capped at `limit`.
    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>>;
}
174
// Test-only global lock serializing tests that share on-disk state.
#[cfg(test)]
pub type TestLockGuard = MutexGuard<'static, ()>;

#[cfg(test)]
static NDB_TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();

/// Acquires the global test lock; hold the guard for the test's duration.
#[cfg(test)]
pub fn test_lock() -> TestLockGuard {
    NDB_TEST_LOCK.get_or_init(|| Mutex::new(())).lock().unwrap()
}
185
/// Opens the store under `data_dir` with the default LMDB map size.
pub fn open_social_graph_store(data_dir: &Path) -> Result<Arc<SocialGraphStore>> {
    open_social_graph_store_with_mapsize(data_dir, None)
}
189
/// Opens the store in `data_dir/socialgraph`, optionally overriding the LMDB
/// map size.
pub fn open_social_graph_store_with_mapsize(
    data_dir: &Path,
    mapsize_bytes: Option<u64>,
) -> Result<Arc<SocialGraphStore>> {
    let db_dir = data_dir.join("socialgraph");
    open_social_graph_store_at_path(&db_dir, mapsize_bytes)
}
197
/// Opens the store in `data_dir/socialgraph` using a caller-supplied storage
/// router for the public bucket (an ambient-only local store is still created).
pub fn open_social_graph_store_with_storage(
    data_dir: &Path,
    store: Arc<StorageRouter>,
    mapsize_bytes: Option<u64>,
) -> Result<Arc<SocialGraphStore>> {
    let db_dir = data_dir.join("socialgraph");
    open_social_graph_store_at_path_with_storage(&db_dir, store, mapsize_bytes)
}
206
/// Opens the store at an explicit directory, creating a local blob store in
/// `db_dir/blobs` using the backend named in the loaded configuration.
pub fn open_social_graph_store_at_path(
    db_dir: &Path,
    mapsize_bytes: Option<u64>,
) -> Result<Arc<SocialGraphStore>> {
    let config = hashtree_config::Config::load_or_default();
    let backend = &config.storage.backend;
    let local_store = Arc::new(
        LocalStore::new_with_lmdb_map_size(db_dir.join("blobs"), backend, mapsize_bytes)
            .map_err(|err| anyhow::anyhow!("Failed to create social graph blob store: {err}"))?,
    );
    let store = Arc::new(StorageRouter::new(local_store));
    open_social_graph_store_at_path_with_storage(db_dir, store, mapsize_bytes)
}
220
/// Opens the store with the given router for public data, creating a separate
/// local blob store (same backend kind) for ambient events so ambient data can
/// be purged independently of public data.
pub fn open_social_graph_store_at_path_with_storage(
    db_dir: &Path,
    store: Arc<StorageRouter>,
    mapsize_bytes: Option<u64>,
) -> Result<Arc<SocialGraphStore>> {
    let ambient_backend = store.local_store().backend();
    let ambient_local = Arc::new(
        LocalStore::new_with_lmdb_map_size(
            db_dir.join(AMBIENT_EVENTS_BLOB_DIR),
            &ambient_backend,
            mapsize_bytes,
        )
        .map_err(|err| {
            anyhow::anyhow!("Failed to create social graph ambient blob store: {err}")
        })?,
    );
    let ambient_store = Arc::new(StorageRouter::new(ambient_local));
    open_social_graph_store_at_path_with_storage_split(db_dir, store, ambient_store, mapsize_bytes)
}
240
/// Lowest-level constructor: wires explicit public and ambient routers into a
/// `SocialGraphStore`, creating `db_dir` and opening the heed graph with the
/// placeholder root.
pub fn open_social_graph_store_at_path_with_storage_split(
    db_dir: &Path,
    public_store: Arc<StorageRouter>,
    ambient_store: Arc<StorageRouter>,
    mapsize_bytes: Option<u64>,
) -> Result<Arc<SocialGraphStore>> {
    std::fs::create_dir_all(db_dir)?;
    if let Some(size) = mapsize_bytes {
        // Grow/check the LMDB map before opening the graph env.
        ensure_social_graph_mapsize(db_dir, size)?;
    }
    let graph = HeedSocialGraph::open(db_dir, DEFAULT_ROOT_HEX)
        .context("open nostr-social-graph heed backend")?;

    Ok(Arc::new(SocialGraphStore {
        graph: StdMutex::new(graph),
        distance_cache: StdMutex::new(None),
        public_events: EventIndexBucket {
            event_store: NostrEventStore::new(Arc::clone(&public_store)),
            root_path: db_dir.join(EVENTS_ROOT_FILE),
        },
        ambient_events: EventIndexBucket {
            event_store: NostrEventStore::new(ambient_store),
            root_path: db_dir.join(AMBIENT_EVENTS_ROOT_FILE),
        },
        profile_index: ProfileIndexBucket {
            // The profile tree and search index both live in the public store.
            tree: HashTree::new(HashTreeConfig::new(Arc::clone(&public_store))),
            index: BTree::new(
                public_store,
                hashtree_index::BTreeOptions {
                    order: Some(PROFILE_SEARCH_INDEX_ORDER),
                },
            ),
            by_pubkey_root_path: db_dir.join(PROFILES_BY_PUBKEY_ROOT_FILE),
            search_root_path: db_dir.join(PROFILE_SEARCH_ROOT_FILE),
        },
        profile_index_overmute_threshold: StdMutex::new(1.0),
    }))
}
279
280pub fn set_social_graph_root(store: &SocialGraphStore, pk_bytes: &[u8; 32]) {
281 if let Err(err) = store.set_root(pk_bytes) {
282 tracing::warn!("Failed to set social graph root: {err}");
283 }
284}
285
286pub fn get_follow_distance(
287 backend: &(impl SocialGraphBackend + ?Sized),
288 pk_bytes: &[u8; 32],
289) -> Option<u32> {
290 backend.follow_distance(pk_bytes).ok().flatten()
291}
292
293pub fn get_follows(
294 backend: &(impl SocialGraphBackend + ?Sized),
295 pk_bytes: &[u8; 32],
296) -> Vec<[u8; 32]> {
297 match backend.followed_targets(pk_bytes) {
298 Ok(set) => set.into_iter().collect(),
299 Err(_) => Vec::new(),
300 }
301}
302
303pub fn is_overmuted(
304 backend: &(impl SocialGraphBackend + ?Sized),
305 _root_pk: &[u8; 32],
306 user_pk: &[u8; 32],
307 threshold: f64,
308) -> bool {
309 backend
310 .is_overmuted_user(user_pk, threshold)
311 .unwrap_or(false)
312}
313
314pub fn ingest_event(backend: &(impl SocialGraphBackend + ?Sized), _sub_id: &str, event_json: &str) {
315 let event = match Event::from_json(event_json) {
316 Ok(event) => event,
317 Err(_) => return,
318 };
319
320 if let Err(err) = backend.ingest_event(&event) {
321 tracing::warn!("Failed to ingest social graph event: {err}");
322 }
323}
324
/// Ingests an already-parsed event; thin delegation to the backend.
pub fn ingest_parsed_event(
    backend: &(impl SocialGraphBackend + ?Sized),
    event: &Event,
) -> Result<()> {
    backend.ingest_event(event)
}
331
/// Ingests a parsed event into an explicit storage class; thin delegation.
pub fn ingest_parsed_event_with_storage_class(
    backend: &(impl SocialGraphBackend + ?Sized),
    event: &Event,
    storage_class: EventStorageClass,
) -> Result<()> {
    backend.ingest_event_with_storage_class(event, storage_class)
}
339
/// Batch-ingests parsed events; thin delegation to the backend.
pub fn ingest_parsed_events(
    backend: &(impl SocialGraphBackend + ?Sized),
    events: &[Event],
) -> Result<()> {
    backend.ingest_events(events)
}
346
/// Batch-ingests parsed events into an explicit storage class; thin delegation.
pub fn ingest_parsed_events_with_storage_class(
    backend: &(impl SocialGraphBackend + ?Sized),
    events: &[Event],
    storage_class: EventStorageClass,
) -> Result<()> {
    backend.ingest_events_with_storage_class(events, storage_class)
}
354
/// Ingests events that only update the follow graph (no event storage in the
/// local backend); thin delegation.
pub fn ingest_graph_parsed_events(
    backend: &(impl SocialGraphBackend + ?Sized),
    events: &[Event],
) -> Result<()> {
    backend.ingest_graph_events(events)
}
361
362pub fn query_events(
363 backend: &(impl SocialGraphBackend + ?Sized),
364 filter: &Filter,
365 limit: usize,
366) -> Vec<Event> {
367 backend.query_events(filter, limit).unwrap_or_default()
368}
369
370impl SocialGraphStore {
371 pub fn set_profile_index_overmute_threshold(&self, threshold: f64) {
372 *self
373 .profile_index_overmute_threshold
374 .lock()
375 .expect("profile index overmute threshold") = threshold;
376 }
377
378 fn profile_index_overmute_threshold(&self) -> f64 {
379 *self
380 .profile_index_overmute_threshold
381 .lock()
382 .expect("profile index overmute threshold")
383 }
384
    /// Drops the cached distance data; the next read rebuilds it from the graph.
    fn invalidate_distance_cache(&self) {
        *self.distance_cache.lock().unwrap() = None;
    }
388
    /// Builds stats and per-distance user lists from an exported graph state.
    ///
    /// `state.unique_ids` maps hex pubkeys to internal numeric ids; it is
    /// inverted here so the per-distance id lists can be decoded back into
    /// 32-byte keys. Ids with no matching pubkey entry are silently dropped.
    fn build_distance_cache(state: nostr_social_graph::SocialGraphState) -> Result<DistanceCache> {
        let unique_ids = state
            .unique_ids
            .into_iter()
            .map(|(pubkey, id)| decode_pubkey(&pubkey).map(|decoded| (id, decoded)))
            .collect::<Result<HashMap<_, _>>>()?;

        let mut users_by_distance = BTreeMap::new();
        let mut size_by_distance = BTreeMap::new();
        for (distance, users) in state.users_by_follow_distance {
            let decoded = users
                .into_iter()
                .filter_map(|id| unique_ids.get(&id).copied())
                .collect::<Vec<_>>();
            size_by_distance.insert(distance, decoded.len());
            users_by_distance.insert(distance, decoded);
        }

        // Total follows = sum of every user's follow-list size.
        let total_follows = state
            .followed_by_user
            .iter()
            .map(|(_, targets)| targets.len())
            .sum::<usize>();
        // Total users counts only users reachable at some distance.
        let total_users = size_by_distance.values().copied().sum();
        let max_depth = size_by_distance.keys().copied().max().unwrap_or_default();

        Ok(DistanceCache {
            stats: SocialGraphStats {
                total_users,
                root: Some(state.root),
                total_follows,
                max_depth,
                size_by_distance,
                enabled: true,
            },
            users_by_distance,
        })
    }
427
    /// Returns the distance cache, rebuilding it from the graph if empty.
    ///
    /// NOTE(review): the cache lock is not held across the rebuild, so two
    /// threads may both export and rebuild concurrently; last write wins.
    /// Results are equivalent, only the work is duplicated.
    fn load_distance_cache(&self) -> Result<DistanceCache> {
        if let Some(cache) = self.distance_cache.lock().unwrap().clone() {
            return Ok(cache);
        }

        let state = {
            let graph = self.graph.lock().unwrap();
            graph.export_state().context("export social graph state")?
        };
        let cache = Self::build_distance_cache(state)?;
        *self.distance_cache.lock().unwrap() = Some(cache.clone());
        Ok(cache)
    }
441
    /// Moves the follow graph's root to `root` and invalidates cached stats.
    ///
    /// If the graph still carries the all-zero placeholder root, its state is
    /// replaced wholesale with a fresh graph; otherwise the root is moved in
    /// place, preserving accumulated follow data.
    fn set_root(&self, root: &[u8; 32]) -> Result<()> {
        let root_hex = hex::encode(root);
        {
            let mut graph = self.graph.lock().unwrap();
            if should_replace_placeholder_root(&graph)? {
                let fresh = SocialGraph::new(&root_hex);
                graph
                    .replace_state(&fresh.export_state())
                    .context("replace placeholder social graph root")?;
            } else {
                graph
                    .set_root(&root_hex)
                    .context("set nostr-social-graph root")?;
            }
        }
        self.invalidate_distance_cache();
        Ok(())
    }
460
    /// Aggregate graph statistics, served from the distance cache.
    fn stats(&self) -> Result<SocialGraphStats> {
        Ok(self.load_distance_cache()?.stats)
    }
464
    /// Follow distance from the root; the backend's sentinel value
    /// (`UNKNOWN_FOLLOW_DISTANCE`) is translated to `None`.
    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
        let graph = self.graph.lock().unwrap();
        let distance = graph
            .get_follow_distance(&hex::encode(pk_bytes))
            .context("read social graph follow distance")?;
        Ok((distance != UNKNOWN_FOLLOW_DISTANCE).then_some(distance))
    }
472
473 fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
474 Ok(self
475 .load_distance_cache()?
476 .users_by_distance
477 .get(&distance)
478 .cloned()
479 .unwrap_or_default())
480 }
481
    /// Timestamp of `owner`'s most recent follow list, if the graph has one.
    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_follow_list_created_at(&hex::encode(owner))
            .context("read social graph follow list timestamp")
    }
488
    /// The set of users `owner` follows, decoded from hex to 32-byte keys.
    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
        let graph = self.graph.lock().unwrap();
        decode_pubkey_set(
            graph
                .get_followed_by_user(&hex::encode(owner))
                .context("read followed targets")?,
        )
    }
497
    /// Whether `user_pk` exceeds the mute-ratio `threshold`.
    ///
    /// A non-positive threshold disables the check entirely.
    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
        if threshold <= 0.0 {
            return Ok(false);
        }
        let graph = self.graph.lock().unwrap();
        graph
            .is_overmuted(&hex::encode(user_pk), threshold)
            .context("check social graph overmute")
    }
507
    /// Root CID of the profile search index, if one has been built.
    #[cfg_attr(not(test), allow(dead_code))]
    pub fn profile_search_root(&self) -> Result<Option<Cid>> {
        self.profile_index.search_root()
    }
512
    /// Root CID of the public event index, if any events are stored.
    pub(crate) fn public_events_root(&self) -> Result<Option<Cid>> {
        self.public_events.events_root()
    }
516
    /// Persists (or clears, when `None`) the public event index root.
    pub(crate) fn write_public_events_root(&self, root: Option<&Cid>) -> Result<()> {
        self.public_events.write_events_root(root)
    }
520
    /// Latest stored metadata event for `pubkey_hex`, if indexed.
    #[cfg_attr(not(test), allow(dead_code))]
    pub fn latest_profile_event(&self, pubkey_hex: &str) -> Result<Option<Event>> {
        self.profile_index.profile_event_for_pubkey(pubkey_hex)
    }
525
    /// Search-index entries whose key starts with `prefix`, as (key, entry) pairs.
    #[cfg_attr(not(test), allow(dead_code))]
    pub fn profile_search_entries_for_prefix(
        &self,
        prefix: &str,
    ) -> Result<Vec<(String, StoredProfileSearchEntry)>> {
        self.profile_index.search_entries_for_prefix(prefix)
    }
533
    /// Public entry point for incrementally updating the profile index from
    /// a batch of events.
    pub fn sync_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
        self.update_profile_index_for_events(events)
    }
537
    /// Rebuilds the profile index from scratch using only the newest
    /// non-overmuted metadata event per pubkey in `events`, then persists
    /// both new roots.
    pub(crate) fn rebuild_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
        let latest_by_pubkey = self.filtered_latest_metadata_events_by_pubkey(events)?;
        let (by_pubkey_root, search_root) = self
            .profile_index
            .rebuild_profile_events(latest_by_pubkey.into_values())?;
        self.profile_index
            .write_by_pubkey_root(by_pubkey_root.as_ref())?;
        self.profile_index.write_search_root(search_root.as_ref())?;
        Ok(())
    }
548
    /// Rebuilds the profile index from all metadata events already stored in
    /// the public and ambient buckets; returns the number of distinct
    /// profiles indexed. With no stored events both roots are cleared.
    pub fn rebuild_profile_index_from_stored_events(&self) -> Result<usize> {
        let public_events_root = self.public_events.events_root()?;
        let ambient_events_root = self.ambient_events.events_root()?;
        if public_events_root.is_none() && ambient_events_root.is_none() {
            // Nothing stored: reset both profile roots to empty.
            self.profile_index.write_by_pubkey_root(None)?;
            self.profile_index.write_search_root(None)?;
            return Ok(0);
        }

        // Collect kind-0 (metadata) events from both buckets.
        let mut events = Vec::new();
        for (bucket, root) in [
            (&self.public_events, public_events_root),
            (&self.ambient_events, ambient_events_root),
        ] {
            let Some(root) = root else {
                continue;
            };
            let stored = block_on(bucket.event_store.list_by_kind_lossy(
                Some(&root),
                Kind::Metadata.as_u16() as u32,
                ListEventsOptions::default(),
            ))
            .map_err(map_event_store_error)?;
            events.extend(
                stored
                    .into_iter()
                    .map(nostr_event_from_stored)
                    .collect::<Result<Vec<_>>>()?,
            );
        }

        // Count distinct, non-overmuted profiles before handing off the
        // rebuild (which filters the same way internally).
        let latest_count = self
            .filtered_latest_metadata_events_by_pubkey(&events)?
            .len();
        self.rebuild_profile_index_for_events(&events)?;
        Ok(latest_count)
    }
586
    /// Incrementally applies metadata events to the profile index.
    ///
    /// For each pubkey's newest metadata event: overmuted users are removed
    /// from the index, everyone else is upserted. Roots are persisted only if
    /// at least one operation reported a change.
    fn update_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
        let latest_by_pubkey = latest_metadata_events_by_pubkey(events);
        let threshold = self.profile_index_overmute_threshold();

        if latest_by_pubkey.is_empty() {
            return Ok(());
        }

        let mut by_pubkey_root = self.profile_index.by_pubkey_root()?;
        let mut search_root = self.profile_index.search_root()?;
        let mut changed = false;

        for event in latest_by_pubkey.into_values() {
            let overmuted = self.is_overmuted_user(&event.pubkey.to_bytes(), threshold)?;
            let (next_by_pubkey_root, next_search_root, updated) = if overmuted {
                // Overmuted profiles are evicted from the search index.
                self.profile_index.remove_profile_event(
                    by_pubkey_root.as_ref(),
                    search_root.as_ref(),
                    &event.pubkey.to_hex(),
                )?
            } else {
                self.profile_index.update_profile_event(
                    by_pubkey_root.as_ref(),
                    search_root.as_ref(),
                    event,
                )?
            };
            // Only advance the roots when the operation actually changed state.
            if updated {
                by_pubkey_root = next_by_pubkey_root;
                search_root = next_search_root;
                changed = true;
            }
        }

        if changed {
            self.profile_index
                .write_by_pubkey_root(by_pubkey_root.as_ref())?;
            self.profile_index.write_search_root(search_root.as_ref())?;
        }

        Ok(())
    }
629
    /// Newest metadata (kind-0) event per pubkey, skipping overmuted users.
    ///
    /// `compare_nostr_events` decides "newest"; on a tie or older event the
    /// existing entry is kept.
    fn filtered_latest_metadata_events_by_pubkey<'a>(
        &self,
        events: &'a [Event],
    ) -> Result<BTreeMap<String, &'a Event>> {
        let threshold = self.profile_index_overmute_threshold();
        let mut latest_by_pubkey = BTreeMap::<String, &Event>::new();
        for event in events.iter().filter(|event| event.kind == Kind::Metadata) {
            if self.is_overmuted_user(&event.pubkey.to_bytes(), threshold)? {
                continue;
            }
            let pubkey = event.pubkey.to_hex();
            match latest_by_pubkey.get(&pubkey) {
                Some(current) if compare_nostr_events(event, current).is_le() => {}
                _ => {
                    latest_by_pubkey.insert(pubkey, event);
                }
            }
        }
        Ok(latest_by_pubkey)
    }
650
    /// Encodes a binary snapshot of the graph as seen from `root`.
    ///
    /// Works on an in-memory copy of the exported state so re-rooting for the
    /// snapshot never mutates the persistent graph.
    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
        let state = {
            let graph = self.graph.lock().unwrap();
            graph.export_state().context("export social graph state")?
        };
        let mut graph = SocialGraph::from_state(state).context("rebuild social graph state")?;
        let root_hex = hex::encode(root);
        if graph.get_root() != root_hex {
            graph
                .set_root(&root_hex)
                .context("set snapshot social graph root")?;
        }
        let chunks = graph
            .to_binary_chunks_with_budget(*options)
            .context("encode social graph snapshot")?;
        Ok(chunks.into_iter().map(Bytes::from).collect())
    }
668
    /// Ingests one event, routing it to the bucket chosen by
    /// `default_storage_class_for` (public for the root user, else ambient).
    fn ingest_event(&self, event: &Event) -> Result<()> {
        self.ingest_event_with_storage_class(event, self.default_storage_class_for(event)?)
    }
672
    /// Batch ingest: partitions events by storage class, then performs at
    /// most one batched write per bucket.
    ///
    /// NOTE(review): events are cloned into the partitions because the
    /// batched path takes owned slices; fine for typical batch sizes.
    fn ingest_events(&self, events: &[Event]) -> Result<()> {
        if events.is_empty() {
            return Ok(());
        }

        let mut public = Vec::new();
        let mut ambient = Vec::new();
        for event in events {
            match self.default_storage_class_for(event)? {
                EventStorageClass::Public => public.push(event.clone()),
                EventStorageClass::Ambient => ambient.push(event.clone()),
            }
        }

        if !public.is_empty() {
            self.ingest_events_with_storage_class(&public, EventStorageClass::Public)?;
        }
        if !ambient.is_empty() {
            self.ingest_events_with_storage_class(&ambient, EventStorageClass::Ambient)?;
        }

        Ok(())
    }
696
    /// Applies follow-graph events to the graph WITHOUT storing them in any
    /// event bucket or touching the profile index.
    ///
    /// The batch is replayed against an in-memory copy of the state and the
    /// result swapped in wholesale, so a mid-batch failure cannot leave the
    /// persistent graph half-updated.
    fn apply_graph_events_only(&self, events: &[Event]) -> Result<()> {
        let graph_events = events
            .iter()
            .filter(|event| is_social_graph_event(event.kind))
            .collect::<Vec<_>>();
        if graph_events.is_empty() {
            return Ok(());
        }

        {
            let mut graph = self.graph.lock().unwrap();
            let mut snapshot = SocialGraph::from_state(
                graph
                    .export_state()
                    .context("export social graph state for graph-only ingest")?,
            )
            .context("rebuild social graph state for graph-only ingest")?;
            for event in graph_events {
                snapshot.handle_event(&graph_event_from_nostr(event), true, 0.0);
            }
            graph
                .replace_state(&snapshot.export_state())
                .context("replace graph-only social graph state")?;
        }
        self.invalidate_distance_cache();
        Ok(())
    }
724
    /// Queries across both buckets (public and ambient).
    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
        self.query_events_in_scope(filter, limit, EventQueryScope::All)
    }
728
    /// Storage class for `event`: `Public` only when a real (non-placeholder)
    /// root is configured and the event's author IS that root; otherwise
    /// `Ambient`.
    fn default_storage_class_for(&self, event: &Event) -> Result<EventStorageClass> {
        let graph = self.graph.lock().unwrap();
        let root_hex = graph.get_root().context("read social graph root")?;
        if root_hex != DEFAULT_ROOT_HEX && root_hex == event.pubkey.to_hex() {
            return Ok(EventStorageClass::Public);
        }
        Ok(EventStorageClass::Ambient)
    }
737
    /// Maps a storage class to its backing event bucket.
    fn bucket(&self, storage_class: EventStorageClass) -> &EventIndexBucket {
        match storage_class {
            EventStorageClass::Public => &self.public_events,
            EventStorageClass::Ambient => &self.ambient_events,
        }
    }
744
745 fn ingest_event_with_storage_class(
746 &self,
747 event: &Event,
748 storage_class: EventStorageClass,
749 ) -> Result<()> {
750 let current_root = self.bucket(storage_class).events_root()?;
751 let next_root = self
752 .bucket(storage_class)
753 .store_event(current_root.as_ref(), event)?;
754 self.bucket(storage_class)
755 .write_events_root(Some(&next_root))?;
756
757 self.update_profile_index_for_events(std::slice::from_ref(event))?;
758
759 if is_social_graph_event(event.kind) {
760 {
761 let mut graph = self.graph.lock().unwrap();
762 graph
763 .handle_event(&graph_event_from_nostr(event), true, 0.0)
764 .context("ingest social graph event into nostr-social-graph")?;
765 }
766 self.invalidate_distance_cache();
767 }
768
769 Ok(())
770 }
771
    /// Batched variant of `ingest_event_with_storage_class`: one event-store
    /// build for the whole batch, one profile-index pass, and a single
    /// replay-and-swap of the follow graph for any graph events present.
    fn ingest_events_with_storage_class(
        &self,
        events: &[Event],
        storage_class: EventStorageClass,
    ) -> Result<()> {
        if events.is_empty() {
            return Ok(());
        }

        let bucket = self.bucket(storage_class);
        let current_root = bucket.events_root()?;
        let stored_events = events
            .iter()
            .map(stored_event_from_nostr)
            .collect::<Vec<_>>();
        let next_root = block_on(
            bucket
                .event_store
                .build(current_root.as_ref(), stored_events),
        )
        .map_err(map_event_store_error)?;
        bucket.write_events_root(next_root.as_ref())?;

        self.update_profile_index_for_events(events)?;

        let graph_events = events
            .iter()
            .filter(|event| is_social_graph_event(event.kind))
            .collect::<Vec<_>>();
        if graph_events.is_empty() {
            return Ok(());
        }

        // Replay graph events against a copy and swap the state in wholesale,
        // mirroring apply_graph_events_only.
        {
            let mut graph = self.graph.lock().unwrap();
            let mut snapshot = SocialGraph::from_state(
                graph
                    .export_state()
                    .context("export social graph state for batch ingest")?,
            )
            .context("rebuild social graph state for batch ingest")?;
            for event in graph_events {
                snapshot.handle_event(&graph_event_from_nostr(event), true, 0.0);
            }
            graph
                .replace_state(&snapshot.export_state())
                .context("replace batched social graph state")?;
        }
        self.invalidate_distance_cache();

        Ok(())
    }
824
    /// Queries the buckets selected by `scope`, merges candidates, removes
    /// duplicates, re-applies the full filter (bucket lookups may overmatch),
    /// and truncates to `limit`.
    pub(crate) fn query_events_in_scope(
        &self,
        filter: &Filter,
        limit: usize,
        scope: EventQueryScope,
    ) -> Result<Vec<Event>> {
        if limit == 0 {
            return Ok(Vec::new());
        }

        let buckets: &[&EventIndexBucket] = match scope {
            EventQueryScope::PublicOnly => &[&self.public_events],
            EventQueryScope::AmbientOnly => &[&self.ambient_events],
            EventQueryScope::All => &[&self.public_events, &self.ambient_events],
        };

        let mut candidates = Vec::new();
        for bucket in buckets {
            // Each bucket is asked for up to `limit` events; the combined set
            // is trimmed after dedup/filtering below.
            candidates.extend(bucket.query_events(filter, limit)?);
        }

        let mut deduped = dedupe_events(candidates);
        deduped.retain(|event| filter.match_event(event));
        deduped.truncate(limit);
        Ok(deduped)
    }
851}
852
853impl EventIndexBucket {
    /// Reads this bucket's persisted index root (profiled for diagnostics).
    fn events_root(&self) -> Result<Option<Cid>> {
        let _profile = NostrProfileGuard::new("socialgraph.events_root.read");
        read_root_file(&self.root_path)
    }
858
    /// Persists (or clears, when `None`) this bucket's index root.
    fn write_events_root(&self, root: Option<&Cid>) -> Result<()> {
        let _profile = NostrProfileGuard::new("socialgraph.events_root.write");
        write_root_file(&self.root_path, root)
    }
863
    /// Appends one event to the store and returns the new index root.
    /// Callers persist the returned root themselves.
    fn store_event(&self, root: Option<&Cid>, event: &Event) -> Result<Cid> {
        let stored = stored_event_from_nostr(event);
        let _profile = NostrProfileGuard::new("socialgraph.event_store.add");
        block_on(self.event_store.add(root, stored)).map_err(map_event_store_error)
    }
869
    /// Looks up a single event by its id under the given index root.
    fn load_event_by_id(&self, root: &Cid, event_id: &str) -> Result<Option<Event>> {
        let stored = block_on(self.event_store.get_by_id(Some(root), event_id))
            .map_err(map_event_store_error)?;
        stored.map(nostr_event_from_stored).transpose()
    }
875
    /// Loads events by `author`, narrowing to the author+kind index when the
    /// filter names exactly one kind.
    ///
    /// NOTE(review): `exact` is forwarded via `filter_list_options` —
    /// presumably it tells the store the index fully covers the filter so no
    /// post-filtering margin is needed; confirm against that helper.
    fn load_events_for_author(
        &self,
        root: &Cid,
        author: &nostr::PublicKey,
        filter: &Filter,
        limit: usize,
        exact: bool,
    ) -> Result<Vec<Event>> {
        // Use the combined author+kind index only for a single-kind filter.
        let kind_filter = filter.kinds.as_ref().and_then(|kinds| {
            if kinds.len() == 1 {
                kinds.iter().next().map(|kind| kind.as_u16() as u32)
            } else {
                None
            }
        });
        let author_hex = author.to_hex();
        let options = filter_list_options(filter, limit, exact);
        let stored = match kind_filter {
            Some(kind) => block_on(self.event_store.list_by_author_and_kind(
                Some(root),
                &author_hex,
                kind,
                options.clone(),
            ))
            .map_err(map_event_store_error)?,
            None => block_on(
                self.event_store
                    .list_by_author(Some(root), &author_hex, options),
            )
            .map_err(map_event_store_error)?,
        };
        stored
            .into_iter()
            .map(nostr_event_from_stored)
            .collect::<Result<Vec<_>>>()
    }
912
    /// Loads events of one kind from the kind index under `root`.
    fn load_events_for_kind(
        &self,
        root: &Cid,
        kind: Kind,
        filter: &Filter,
        limit: usize,
        exact: bool,
    ) -> Result<Vec<Event>> {
        let stored = block_on(self.event_store.list_by_kind(
            Some(root),
            kind.as_u16() as u32,
            filter_list_options(filter, limit, exact),
        ))
        .map_err(map_event_store_error)?;
        stored
            .into_iter()
            .map(nostr_event_from_stored)
            .collect::<Result<Vec<_>>>()
    }
932
    /// Loads the most recent events under `root` (fallback when no more
    /// specific index applies to the filter).
    fn load_recent_events(
        &self,
        root: &Cid,
        filter: &Filter,
        limit: usize,
        exact: bool,
    ) -> Result<Vec<Event>> {
        let stored = block_on(
            self.event_store
                .list_recent(Some(root), filter_list_options(filter, limit, exact)),
        )
        .map_err(map_event_store_error)?;
        stored
            .into_iter()
            .map(nostr_event_from_stored)
            .collect::<Result<Vec<_>>>()
    }
950
    /// Loads events indexed under `tag_name` for each value in `values`,
    /// concatenated and deduplicated. Each value's lookup is capped at
    /// `limit`, so the combined result may exceed it; callers truncate.
    fn load_events_for_tag(
        &self,
        root: &Cid,
        tag_name: &str,
        values: &[String],
        filter: &Filter,
        limit: usize,
        exact: bool,
    ) -> Result<Vec<Event>> {
        let mut events = Vec::new();
        let options = filter_list_options(filter, limit, exact);
        for value in values {
            let stored = block_on(self.event_store.list_by_tag(
                Some(root),
                tag_name,
                value,
                options.clone(),
            ))
            .map_err(map_event_store_error)?;
            events.extend(
                stored
                    .into_iter()
                    .map(nostr_event_from_stored)
                    .collect::<Result<Vec<_>>>()?,
            );
        }
        Ok(dedupe_events(events))
    }
979
    /// Picks the generic tag with the fewest values as the most selective
    /// index scan source for the filter.
    ///
    /// NOTE(review): the tag letter is lowercased here — this assumes the tag
    /// index is keyed by lowercase letters; confirm that uppercase
    /// single-letter tags (e.g. `#P`) are indexed the same way.
    fn choose_tag_source(&self, filter: &Filter) -> Option<(String, Vec<String>)> {
        filter
            .generic_tags
            .iter()
            .min_by_key(|(_, values)| values.len())
            .map(|(tag, values)| {
                (
                    tag.as_char().to_ascii_lowercase().to_string(),
                    values.iter().cloned().collect(),
                )
            })
    }
992
    /// Chooses the most selective index for `filter` and loads candidates.
    ///
    /// Priority: direct replaceable lookups, then the narrowest tag index,
    /// then author/kind combinations, then author-only, then kind-only.
    /// Returns `Ok(None)` when no index applies (caller falls back to a
    /// recent-events scan). `exact` is set only when the chosen index fully
    /// covers the filter's constraints.
    fn load_major_index_candidates(
        &self,
        root: &Cid,
        filter: &Filter,
        limit: usize,
    ) -> Result<Option<Vec<Event>>> {
        if let Some(events) = self.load_direct_replaceable_candidates(root, filter)? {
            return Ok(Some(events));
        }

        if let Some((tag_name, values)) = self.choose_tag_source(filter) {
            let exact = filter.authors.is_none()
                && filter.kinds.is_none()
                && filter.search.is_none()
                && filter.generic_tags.len() == 1;
            return Ok(Some(self.load_events_for_tag(
                root, &tag_name, &values, filter, limit, exact,
            )?));
        }

        if let (Some(authors), Some(kinds)) = (filter.authors.as_ref(), filter.kinds.as_ref()) {
            // Single author + single kind: the combined index answers directly.
            if authors.len() == 1 && kinds.len() == 1 {
                let author = authors.iter().next().expect("checked single author");
                let exact = filter.generic_tags.is_empty() && filter.search.is_none();
                return Ok(Some(
                    self.load_events_for_author(root, author, filter, limit, exact)?,
                ));
            }

            // Otherwise iterate whichever dimension has fewer entries.
            if kinds.len() < authors.len() {
                let mut events = Vec::new();
                for kind in kinds {
                    events.extend(self.load_events_for_kind(root, *kind, filter, limit, false)?);
                }
                return Ok(Some(dedupe_events(events)));
            }

            let mut events = Vec::new();
            for author in authors {
                events.extend(self.load_events_for_author(root, author, filter, limit, false)?);
            }
            return Ok(Some(dedupe_events(events)));
        }

        if let Some(authors) = filter.authors.as_ref() {
            let mut events = Vec::new();
            let exact = filter.generic_tags.is_empty() && filter.search.is_none();
            for author in authors {
                events.extend(self.load_events_for_author(root, author, filter, limit, exact)?);
            }
            return Ok(Some(dedupe_events(events)));
        }

        if let Some(kinds) = filter.kinds.as_ref() {
            let mut events = Vec::new();
            let exact = filter.authors.is_none()
                && filter.generic_tags.is_empty()
                && filter.search.is_none();
            for kind in kinds {
                events.extend(self.load_events_for_kind(root, *kind, filter, limit, exact)?);
            }
            return Ok(Some(dedupe_events(events)));
        }

        Ok(None)
    }
1059
    /// Fast path for filters that address replaceable storage directly:
    /// `authors` plus exactly one replaceable kind (and, for parameterized
    /// kinds, explicit `#d` tag values).
    ///
    /// Returns `Ok(None)` when the filter does not have that shape, so the
    /// caller can fall back to broader index scans; otherwise returns the
    /// latest stored version per (author, kind[, d]) slot, deduplicated.
    fn load_direct_replaceable_candidates(
        &self,
        root: &Cid,
        filter: &Filter,
    ) -> Result<Option<Vec<Event>>> {
        // Both an author set and exactly one kind are required to address a
        // replaceable slot directly.
        let Some(authors) = filter.authors.as_ref() else {
            return Ok(None);
        };
        let Some(kinds) = filter.kinds.as_ref() else {
            return Ok(None);
        };
        if kinds.len() != 1 {
            return Ok(None);
        }

        let kind = kinds.iter().next().expect("checked single kind").as_u16() as u32;

        if is_parameterized_replaceable_kind(kind) {
            // Parameterized-replaceable events are keyed by (author, kind, d);
            // without explicit `#d` values there is no direct lookup.
            let d_tag = SingleLetterTag::lowercase(nostr::Alphabet::D);
            let Some(d_values) = filter.generic_tags.get(&d_tag) else {
                return Ok(None);
            };
            let mut events = Vec::new();
            for author in authors {
                let author_hex = author.to_hex();
                for d_value in d_values {
                    if let Some(stored) = block_on(self.event_store.get_parameterized_replaceable(
                        Some(root),
                        &author_hex,
                        kind,
                        d_value,
                    ))
                    .map_err(map_event_store_error)?
                    {
                        events.push(nostr_event_from_stored(stored)?);
                    }
                }
            }
            return Ok(Some(dedupe_events(events)));
        }

        if is_replaceable_kind(kind) {
            // Plain replaceable events are keyed by (author, kind) only.
            let mut events = Vec::new();
            for author in authors {
                if let Some(stored) = block_on(self.event_store.get_replaceable(
                    Some(root),
                    &author.to_hex(),
                    kind,
                ))
                .map_err(map_event_store_error)?
                {
                    events.push(nostr_event_from_stored(stored)?);
                }
            }
            return Ok(Some(dedupe_events(events)));
        }

        // Non-replaceable kind: no direct slot to read from.
        Ok(None)
    }
1119
    /// Answers a nostr `Filter` against the persisted event indexes.
    ///
    /// Strategy: explicit-id filters are served as point lookups; otherwise
    /// the most selective available index (replaceable slots / tags / author
    /// / kind via `load_major_index_candidates`, else the recency scan)
    /// supplies candidates. Candidates are always re-checked against the full
    /// filter, deduplicated by id, sorted newest-first (ties broken by
    /// ascending id), and truncated to `limit`.
    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
        if limit == 0 {
            return Ok(Vec::new());
        }

        // No events root means nothing has been stored yet.
        let events_root = self.events_root()?;
        let Some(root) = events_root.as_ref() else {
            return Ok(Vec::new());
        };
        let mut candidates = Vec::new();
        let mut seen: HashSet<[u8; 32]> = HashSet::new();

        if let Some(ids) = filter.ids.as_ref() {
            // Id filter: point lookups, still validated against the rest of
            // the filter (kinds, authors, time window, ...).
            for id in ids {
                let id_bytes = id.to_bytes();
                if !seen.insert(id_bytes) {
                    continue;
                }
                if let Some(event) = self.load_event_by_id(root, &id.to_hex())? {
                    if filter.match_event(&event) {
                        candidates.push(event);
                    }
                }
                if candidates.len() >= limit {
                    break;
                }
            }
        } else {
            // Prefer a major index; fall back to the recency scan. The final
            // bool tells the scan whether it alone satisfies the filter
            // exactly (no author/kind/tag/search narrowing left to do).
            let base_events = match self.load_major_index_candidates(root, filter, limit)? {
                Some(events) => events,
                None => self.load_recent_events(
                    root,
                    filter,
                    limit,
                    filter.authors.is_none()
                        && filter.kinds.is_none()
                        && filter.generic_tags.is_empty()
                        && filter.search.is_none(),
                )?,
            };

            for event in base_events {
                let id_bytes = event.id.to_bytes();
                if !seen.insert(id_bytes) {
                    continue;
                }
                if filter.match_event(&event) {
                    candidates.push(event);
                }
                if candidates.len() >= limit {
                    break;
                }
            }
        }

        // Newest first; created_at ties broken by ascending event id so the
        // result order is deterministic.
        candidates.sort_by(|a, b| {
            b.created_at
                .as_u64()
                .cmp(&a.created_at.as_u64())
                .then_with(|| a.id.cmp(&b.id))
        });
        candidates.truncate(limit);
        Ok(candidates)
    }
1184}
1185
// Storage bucket for mirrored profile (kind-0 metadata) events. It maintains
// two persistent indexes over a hashtree: pubkey -> mirrored-event CID, and a
// keyword search index keyed "{PROFILE_SEARCH_PREFIX}{term}:{pubkey}". Each
// index root is checkpointed to its own small file on disk.
impl ProfileIndexBucket {
    /// Current root of the pubkey -> profile-event-CID index (`None` until
    /// the first write).
    fn by_pubkey_root(&self) -> Result<Option<Cid>> {
        let _profile = NostrProfileGuard::new("socialgraph.profile_by_pubkey_root.read");
        read_root_file(&self.by_pubkey_root_path)
    }

    /// Current root of the profile keyword search index.
    fn search_root(&self) -> Result<Option<Cid>> {
        let _profile = NostrProfileGuard::new("socialgraph.profile_search_root.read");
        read_root_file(&self.search_root_path)
    }

    /// Persists (or clears, when `root` is `None`) the by-pubkey index root.
    fn write_by_pubkey_root(&self, root: Option<&Cid>) -> Result<()> {
        let _profile = NostrProfileGuard::new("socialgraph.profile_by_pubkey_root.write");
        write_root_file(&self.by_pubkey_root_path, root)
    }

    /// Persists (or clears, when `root` is `None`) the search index root.
    fn write_search_root(&self, root: Option<&Cid>) -> Result<()> {
        let _profile = NostrProfileGuard::new("socialgraph.profile_search_root.write");
        write_root_file(&self.search_root_path, root)
    }

    /// Stores the event's canonical JSON as a hashtree file and returns its
    /// content id.
    fn mirror_profile_event(&self, event: &Event) -> Result<Cid> {
        let bytes = event.as_json().into_bytes();
        block_on(self.tree.put_file(&bytes))
            .map(|(cid, _size)| cid)
            .context("store mirrored profile event")
    }

    /// Loads a mirrored profile event back from the hashtree; `Ok(None)` when
    /// the blob is absent.
    fn load_profile_event(&self, cid: &Cid) -> Result<Option<Event>> {
        let bytes = block_on(self.tree.get(cid, None)).context("read mirrored profile event")?;
        let Some(bytes) = bytes else {
            return Ok(None);
        };
        let json = String::from_utf8(bytes).context("decode mirrored profile event as utf-8")?;
        Ok(Some(
            Event::from_json(json).context("decode mirrored profile event json")?,
        ))
    }

    /// Convenience lookup: pubkey -> mirrored event, via the by-pubkey index.
    #[cfg_attr(not(test), allow(dead_code))]
    fn profile_event_for_pubkey(&self, pubkey_hex: &str) -> Result<Option<Event>> {
        let root = self.by_pubkey_root()?;
        let Some(cid) = block_on(self.index.get_link(root.as_ref(), pubkey_hex))
            .context("read mirrored profile event cid by pubkey")?
        else {
            return Ok(None);
        };
        self.load_profile_event(&cid)
    }

    /// Scans the search index for keys starting with `prefix` and decodes the
    /// stored JSON entries.
    #[cfg_attr(not(test), allow(dead_code))]
    fn search_entries_for_prefix(
        &self,
        prefix: &str,
    ) -> Result<Vec<(String, StoredProfileSearchEntry)>> {
        let Some(root) = self.search_root()? else {
            return Ok(Vec::new());
        };
        let entries =
            block_on(self.index.prefix(&root, prefix)).context("query profile search prefix")?;
        entries
            .into_iter()
            .map(|(key, value)| {
                let entry = serde_json::from_str(&value)
                    .context("decode stored profile search entry json")?;
                Ok((key, entry))
            })
            .collect()
    }

    /// Rebuilds both indexes from scratch for the given events and returns
    /// the new (by_pubkey_root, search_root) pair. Presumably callers pass one
    /// latest event per pubkey (cf. `latest_metadata_events_by_pubkey`) —
    /// duplicate-pubkey behavior depends on the index builder.
    fn rebuild_profile_events<'a, I>(&self, events: I) -> Result<(Option<Cid>, Option<Cid>)>
    where
        I: IntoIterator<Item = &'a Event>,
    {
        let mut by_pubkey_entries = Vec::<(String, Cid)>::new();
        let mut search_entries = Vec::<(String, String)>::new();

        for event in events {
            let pubkey = event.pubkey.to_hex();
            let mirrored_cid = self.mirror_profile_event(event)?;
            let search_value =
                serialize_profile_search_entry(&build_profile_search_entry(event, &mirrored_cid)?)?;
            by_pubkey_entries.push((pubkey.clone(), mirrored_cid.clone()));
            // One search-index row per keyword, all sharing the same value.
            for term in profile_search_terms_for_event(event) {
                search_entries.push((
                    format!("{PROFILE_SEARCH_PREFIX}{term}:{pubkey}"),
                    search_value.clone(),
                ));
            }
        }

        let by_pubkey_root = block_on(self.index.build_links(by_pubkey_entries))
            .context("bulk build mirrored profile-by-pubkey index")?;
        let search_root = block_on(self.index.build(search_entries))
            .context("bulk build mirrored profile search index")?;
        Ok((by_pubkey_root, search_root))
    }

    /// Inserts/updates one profile event. Returns the (possibly unchanged)
    /// roots plus a flag for whether anything was written; the event is a
    /// no-op when the stored one is at least as new (`created_at`, then id —
    /// see `compare_nostr_events`).
    fn update_profile_event(
        &self,
        by_pubkey_root: Option<&Cid>,
        search_root: Option<&Cid>,
        event: &Event,
    ) -> Result<(Option<Cid>, Option<Cid>, bool)> {
        let pubkey = event.pubkey.to_hex();
        let existing_cid = block_on(self.index.get_link(by_pubkey_root, &pubkey))
            .context("lookup existing mirrored profile event")?;

        let existing_event = match existing_cid.as_ref() {
            Some(cid) => self.load_profile_event(cid)?,
            None => None,
        };

        // Keep the newest event per pubkey; equal-or-older updates are no-ops.
        if existing_event
            .as_ref()
            .is_some_and(|current| compare_nostr_events(event, current).is_le())
        {
            return Ok((by_pubkey_root.cloned(), search_root.cloned(), false));
        }

        let mirrored_cid = self.mirror_profile_event(event)?;
        let next_by_pubkey_root = Some(
            block_on(
                self.index
                    .insert_link(by_pubkey_root, &pubkey, &mirrored_cid),
            )
            .context("write mirrored profile event index")?,
        );

        // Drop the superseded event's search terms before indexing the new
        // ones; a vanished root means the search index is already empty.
        let mut next_search_root = search_root.cloned();
        if let Some(current) = existing_event.as_ref() {
            for term in profile_search_terms_for_event(current) {
                let Some(root) = next_search_root.as_ref() else {
                    break;
                };
                next_search_root = block_on(
                    self.index
                        .delete(root, &format!("{PROFILE_SEARCH_PREFIX}{term}:{pubkey}")),
                )
                .context("remove stale profile search term")?;
            }
        }

        let search_value =
            serialize_profile_search_entry(&build_profile_search_entry(event, &mirrored_cid)?)?;
        for term in profile_search_terms_for_event(event) {
            next_search_root = Some(
                block_on(self.index.insert(
                    next_search_root.as_ref(),
                    &format!("{PROFILE_SEARCH_PREFIX}{term}:{pubkey}"),
                    &search_value,
                ))
                .context("write profile search term")?,
            );
        }

        Ok((next_by_pubkey_root, next_search_root, true))
    }

    /// Removes a pubkey's mirrored profile event and its search terms.
    /// Returns the new roots plus whether an entry actually existed.
    fn remove_profile_event(
        &self,
        by_pubkey_root: Option<&Cid>,
        search_root: Option<&Cid>,
        pubkey: &str,
    ) -> Result<(Option<Cid>, Option<Cid>, bool)> {
        let existing_cid = block_on(self.index.get_link(by_pubkey_root, pubkey))
            .context("lookup mirrored profile event for removal")?;
        let Some(existing_cid) = existing_cid else {
            return Ok((by_pubkey_root.cloned(), search_root.cloned(), false));
        };

        let existing_event = self.load_profile_event(&existing_cid)?;
        let next_by_pubkey_root = match by_pubkey_root {
            Some(root) => block_on(self.index.delete(root, pubkey))
                .context("remove mirrored profile-by-pubkey entry")?,
            None => None,
        };

        let mut next_search_root = search_root.cloned();
        if let Some(current) = existing_event.as_ref() {
            for term in profile_search_terms_for_event(current) {
                let Some(root) = next_search_root.as_ref() else {
                    break;
                };
                // NOTE(review): the context string says "overmuted" but this
                // path runs for any removal — wording likely meant "stale".
                next_search_root = block_on(
                    self.index
                        .delete(root, &format!("{PROFILE_SEARCH_PREFIX}{term}:{pubkey}")),
                )
                .context("remove overmuted profile search term")?;
            }
        }

        Ok((next_by_pubkey_root, next_search_root, true))
    }
}
1381
1382fn latest_metadata_events_by_pubkey<'a>(events: &'a [Event]) -> BTreeMap<String, &'a Event> {
1383 let mut latest_by_pubkey = BTreeMap::<String, &Event>::new();
1384 for event in events.iter().filter(|event| event.kind == Kind::Metadata) {
1385 let pubkey = event.pubkey.to_hex();
1386 match latest_by_pubkey.get(&pubkey) {
1387 Some(current) if compare_nostr_events(event, current).is_le() => {}
1388 _ => {
1389 latest_by_pubkey.insert(pubkey, event);
1390 }
1391 }
1392 }
1393 latest_by_pubkey
1394}
1395
1396fn serialize_profile_search_entry(entry: &StoredProfileSearchEntry) -> Result<String> {
1397 serde_json::to_string(entry).context("encode stored profile search entry json")
1398}
1399
1400fn cid_to_nhash(cid: &Cid) -> Result<String> {
1401 nhash_encode_full(&NHashData {
1402 hash: cid.hash,
1403 decrypt_key: cid.key,
1404 })
1405 .context("encode mirrored profile event nhash")
1406}
1407
1408fn build_profile_search_entry(
1409 event: &Event,
1410 mirrored_cid: &Cid,
1411) -> Result<StoredProfileSearchEntry> {
1412 let profile = match serde_json::from_str::<serde_json::Value>(&event.content) {
1413 Ok(serde_json::Value::Object(profile)) => profile,
1414 _ => serde_json::Map::new(),
1415 };
1416 let names = extract_profile_names(&profile);
1417 let primary_name = names.first().cloned();
1418 let nip05 = normalize_profile_nip05(&profile, primary_name.as_deref());
1419 let name = primary_name
1420 .clone()
1421 .or_else(|| nip05.clone())
1422 .unwrap_or_else(|| event.pubkey.to_hex());
1423
1424 Ok(StoredProfileSearchEntry {
1425 pubkey: event.pubkey.to_hex(),
1426 name,
1427 aliases: names.into_iter().skip(1).collect(),
1428 nip05,
1429 created_at: event.created_at.as_u64(),
1430 event_nhash: cid_to_nhash(mirrored_cid)?,
1431 })
1432}
1433
1434fn filter_list_options(filter: &Filter, limit: usize, exact: bool) -> ListEventsOptions {
1435 ListEventsOptions {
1436 limit: exact.then_some(limit.max(1)),
1437 since: filter.since.map(|timestamp| timestamp.as_u64()),
1438 until: filter.until.map(|timestamp| timestamp.as_u64()),
1439 }
1440}
1441
1442fn dedupe_events(events: Vec<Event>) -> Vec<Event> {
1443 let mut seen = HashSet::new();
1444 let mut deduped = Vec::new();
1445 for event in events {
1446 if seen.insert(event.id.to_bytes()) {
1447 deduped.push(event);
1448 }
1449 }
1450 deduped.sort_by(|a, b| {
1451 b.created_at
1452 .as_u64()
1453 .cmp(&a.created_at.as_u64())
1454 .then_with(|| a.id.cmp(&b.id))
1455 });
1456 deduped
1457}
1458
// Trait plumbing: every `SocialGraphBackend` method forwards to the inherent
// `SocialGraphStore` method of the same name. The only renamed mapping is
// `ingest_graph_events` -> `apply_graph_events_only`.
impl SocialGraphBackend for SocialGraphStore {
    fn stats(&self) -> Result<SocialGraphStats> {
        SocialGraphStore::stats(self)
    }

    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
        SocialGraphStore::users_by_follow_distance(self, distance)
    }

    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
        SocialGraphStore::follow_distance(self, pk_bytes)
    }

    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
        SocialGraphStore::follow_list_created_at(self, owner)
    }

    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
        SocialGraphStore::followed_targets(self, owner)
    }

    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
        SocialGraphStore::is_overmuted_user(self, user_pk, threshold)
    }

    fn profile_search_root(&self) -> Result<Option<Cid>> {
        SocialGraphStore::profile_search_root(self)
    }

    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
        SocialGraphStore::snapshot_chunks(self, root, options)
    }

    fn ingest_event(&self, event: &Event) -> Result<()> {
        SocialGraphStore::ingest_event(self, event)
    }

    fn ingest_event_with_storage_class(
        &self,
        event: &Event,
        storage_class: EventStorageClass,
    ) -> Result<()> {
        SocialGraphStore::ingest_event_with_storage_class(self, event, storage_class)
    }

    fn ingest_events(&self, events: &[Event]) -> Result<()> {
        SocialGraphStore::ingest_events(self, events)
    }

    fn ingest_events_with_storage_class(
        &self,
        events: &[Event],
        storage_class: EventStorageClass,
    ) -> Result<()> {
        SocialGraphStore::ingest_events_with_storage_class(self, events, storage_class)
    }

    /// Graph-only ingestion: updates follow/mute edges without touching the
    /// event indexes.
    fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
        SocialGraphStore::apply_graph_events_only(self, events)
    }

    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
        SocialGraphStore::query_events(self, filter, limit)
    }
}
1524
// Adapter onto the upstream `nostr_social_graph` backend trait. Each call
// locks the inner heed-backed graph for its duration and flattens any error
// into the string-carrying `UpstreamGraphBackendError`.
//
// NOTE(review): `.lock().unwrap()` panics if the mutex was poisoned by a
// panicking writer; acceptable only if graph mutations cannot panic
// mid-update — confirm.
impl NostrSocialGraphBackend for SocialGraphStore {
    type Error = UpstreamGraphBackendError;

    fn get_root(&self) -> std::result::Result<String, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_root()
            .context("read social graph root")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn set_root(&mut self, root: &str) -> std::result::Result<(), Self::Error> {
        // Upstream passes the root as hex; decode before delegating.
        let root_bytes =
            decode_pubkey(root).map_err(|err| UpstreamGraphBackendError(err.to_string()))?;
        SocialGraphStore::set_root(self, &root_bytes)
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    /// Ingests one graph event, then invalidates the follow-distance cache.
    /// The inner block scopes the lock so it is released before invalidation.
    fn handle_event(
        &mut self,
        event: &GraphEvent,
        allow_unknown_authors: bool,
        overmute_threshold: f64,
    ) -> std::result::Result<(), Self::Error> {
        {
            let mut graph = self.graph.lock().unwrap();
            graph
                .handle_event(event, allow_unknown_authors, overmute_threshold)
                .context("ingest social graph event into heed backend")
                .map_err(|err| UpstreamGraphBackendError(err.to_string()))?;
        }
        self.invalidate_distance_cache();
        Ok(())
    }

    fn get_follow_distance(&self, user: &str) -> std::result::Result<u32, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_follow_distance(user)
            .context("read social graph follow distance")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn is_following(
        &self,
        follower: &str,
        followed_user: &str,
    ) -> std::result::Result<bool, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .is_following(follower, followed_user)
            .context("read social graph following edge")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_followed_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_followed_by_user(user)
            .context("read followed-by-user list")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_followers_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_followers_by_user(user)
            .context("read followers-by-user list")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_muted_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_muted_by_user(user)
            .context("read muted-by-user list")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_user_muted_by(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_user_muted_by(user)
            .context("read user-muted-by list")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_follow_list_created_at(
        &self,
        user: &str,
    ) -> std::result::Result<Option<u64>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_follow_list_created_at(user)
            .context("read social graph follow list timestamp")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_mute_list_created_at(
        &self,
        user: &str,
    ) -> std::result::Result<Option<u64>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_mute_list_created_at(user)
            .context("read social graph mute list timestamp")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn is_overmuted(&self, user: &str, threshold: f64) -> std::result::Result<bool, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .is_overmuted(user, threshold)
            .context("check social graph overmute")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }
}
1642
// Blanket impl: lets an `Arc<B>` (including `Arc<dyn SocialGraphBackend>`,
// thanks to `?Sized`) be used wherever a `SocialGraphBackend` is expected.
// Every method is pure delegation through `as_ref()`.
impl<T> SocialGraphBackend for Arc<T>
where
    T: SocialGraphBackend + ?Sized,
{
    fn stats(&self) -> Result<SocialGraphStats> {
        self.as_ref().stats()
    }

    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
        self.as_ref().users_by_follow_distance(distance)
    }

    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
        self.as_ref().follow_distance(pk_bytes)
    }

    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
        self.as_ref().follow_list_created_at(owner)
    }

    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
        self.as_ref().followed_targets(owner)
    }

    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
        self.as_ref().is_overmuted_user(user_pk, threshold)
    }

    fn profile_search_root(&self) -> Result<Option<Cid>> {
        self.as_ref().profile_search_root()
    }

    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
        self.as_ref().snapshot_chunks(root, options)
    }

    fn ingest_event(&self, event: &Event) -> Result<()> {
        self.as_ref().ingest_event(event)
    }

    fn ingest_event_with_storage_class(
        &self,
        event: &Event,
        storage_class: EventStorageClass,
    ) -> Result<()> {
        self.as_ref()
            .ingest_event_with_storage_class(event, storage_class)
    }

    fn ingest_events(&self, events: &[Event]) -> Result<()> {
        self.as_ref().ingest_events(events)
    }

    fn ingest_events_with_storage_class(
        &self,
        events: &[Event],
        storage_class: EventStorageClass,
    ) -> Result<()> {
        self.as_ref()
            .ingest_events_with_storage_class(events, storage_class)
    }

    fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
        self.as_ref().ingest_graph_events(events)
    }

    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
        self.as_ref().query_events(filter, limit)
    }
}
1713
1714fn should_replace_placeholder_root(graph: &HeedSocialGraph) -> Result<bool> {
1715 if graph.get_root().context("read current social graph root")? != DEFAULT_ROOT_HEX {
1716 return Ok(false);
1717 }
1718
1719 let GraphStats {
1720 users,
1721 follows,
1722 mutes,
1723 ..
1724 } = graph.size().context("size social graph")?;
1725 Ok(users <= 1 && follows == 0 && mutes == 0)
1726}
1727
1728fn decode_pubkey_set(values: Vec<String>) -> Result<UserSet> {
1729 let mut set = UserSet::new();
1730 for value in values {
1731 set.insert(decode_pubkey(&value)?);
1732 }
1733 Ok(set)
1734}
1735
1736fn decode_pubkey(value: &str) -> Result<[u8; 32]> {
1737 let mut bytes = [0u8; 32];
1738 hex::decode_to_slice(value, &mut bytes)
1739 .with_context(|| format!("decode social graph pubkey {value}"))?;
1740 Ok(bytes)
1741}
1742
1743fn is_social_graph_event(kind: Kind) -> bool {
1744 kind == Kind::ContactList || kind == Kind::MuteList
1745}
1746
1747fn graph_event_from_nostr(event: &Event) -> GraphEvent {
1748 GraphEvent {
1749 created_at: event.created_at.as_u64(),
1750 content: event.content.clone(),
1751 tags: event
1752 .tags
1753 .iter()
1754 .map(|tag| tag.as_slice().to_vec())
1755 .collect(),
1756 kind: event.kind.as_u16() as u32,
1757 pubkey: event.pubkey.to_hex(),
1758 id: event.id.to_hex(),
1759 sig: event.sig.to_string(),
1760 }
1761}
1762
1763fn stored_event_from_nostr(event: &Event) -> StoredNostrEvent {
1764 StoredNostrEvent {
1765 id: event.id.to_hex(),
1766 pubkey: event.pubkey.to_hex(),
1767 created_at: event.created_at.as_u64(),
1768 kind: event.kind.as_u16() as u32,
1769 tags: event
1770 .tags
1771 .iter()
1772 .map(|tag| tag.as_slice().to_vec())
1773 .collect(),
1774 content: event.content.clone(),
1775 sig: event.sig.to_string(),
1776 }
1777}
1778
1779fn nostr_event_from_stored(event: StoredNostrEvent) -> Result<Event> {
1780 let value = serde_json::json!({
1781 "id": event.id,
1782 "pubkey": event.pubkey,
1783 "created_at": event.created_at,
1784 "kind": event.kind,
1785 "tags": event.tags,
1786 "content": event.content,
1787 "sig": event.sig,
1788 });
1789 Event::from_json(value.to_string()).context("decode stored nostr event")
1790}
1791
/// Crate-visible wrapper around `nostr_event_from_stored` so sibling modules
/// can convert stored events without widening the private helper's visibility.
pub(crate) fn stored_event_to_nostr_event(event: StoredNostrEvent) -> Result<Event> {
    nostr_event_from_stored(event)
}
1795
1796fn encode_cid(cid: &Cid) -> Result<Vec<u8>> {
1797 rmp_serde::to_vec_named(&StoredCid {
1798 hash: cid.hash,
1799 key: cid.key,
1800 })
1801 .context("encode social graph events root")
1802}
1803
1804fn decode_cid(bytes: &[u8]) -> Result<Option<Cid>> {
1805 let stored: StoredCid =
1806 rmp_serde::from_slice(bytes).context("decode social graph events root")?;
1807 Ok(Some(Cid {
1808 hash: stored.hash,
1809 key: stored.key,
1810 }))
1811}
1812
1813fn read_root_file(path: &Path) -> Result<Option<Cid>> {
1814 let Ok(bytes) = std::fs::read(path) else {
1815 return Ok(None);
1816 };
1817 decode_cid(&bytes)
1818}
1819
1820fn write_root_file(path: &Path, root: Option<&Cid>) -> Result<()> {
1821 let Some(root) = root else {
1822 if path.exists() {
1823 std::fs::remove_file(path)?;
1824 }
1825 return Ok(());
1826 };
1827
1828 let encoded = encode_cid(root)?;
1829 let tmp_path = path.with_extension("tmp");
1830 std::fs::write(&tmp_path, encoded)?;
1831 std::fs::rename(tmp_path, path)?;
1832 Ok(())
1833}
1834
1835fn normalize_profile_name(value: &serde_json::Value) -> Option<String> {
1836 let raw = value.as_str()?;
1837 let trimmed = raw.split_whitespace().collect::<Vec<_>>().join(" ");
1838 if trimmed.is_empty() {
1839 return None;
1840 }
1841 Some(trimmed.chars().take(PROFILE_NAME_MAX_LENGTH).collect())
1842}
1843
1844fn extract_profile_names(profile: &serde_json::Map<String, serde_json::Value>) -> Vec<String> {
1845 let mut names = Vec::new();
1846 let mut seen = HashSet::new();
1847
1848 for key in ["display_name", "displayName", "name", "username"] {
1849 let Some(value) = profile.get(key).and_then(normalize_profile_name) else {
1850 continue;
1851 };
1852 let lowered = value.to_lowercase();
1853 if seen.insert(lowered) {
1854 names.push(value);
1855 }
1856 }
1857
1858 names
1859}
1860
/// Heuristic for nip05 local parts that add no search value: single
/// characters, raw npub strings, and substrings of the (whitespace-squashed,
/// lowercased) primary name. `local_part` is expected lowercase already.
fn should_reject_profile_nip05(local_part: &str, primary_name: &str) -> bool {
    if local_part.len() == 1 || local_part.starts_with("npub1") {
        return true;
    }

    let squashed_name: String = primary_name.to_lowercase().split_whitespace().collect();
    squashed_name.contains(local_part)
}
1872
1873fn normalize_profile_nip05(
1874 profile: &serde_json::Map<String, serde_json::Value>,
1875 primary_name: Option<&str>,
1876) -> Option<String> {
1877 let raw = profile.get("nip05")?.as_str()?;
1878 let local_part = raw.split('@').next()?.trim().to_lowercase();
1879 if local_part.is_empty() {
1880 return None;
1881 }
1882 let truncated: String = local_part.chars().take(PROFILE_NAME_MAX_LENGTH).collect();
1883 if truncated.is_empty() {
1884 return None;
1885 }
1886 if primary_name.is_some_and(|name| should_reject_profile_nip05(&truncated, name)) {
1887 return None;
1888 }
1889 Some(truncated)
1890}
1891
/// English stop words excluded from profile search keywords.
const SEARCH_STOP_WORDS: &[&str] = &[
    "a", "an", "the", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by",
    "from", "is", "it", "as", "be", "was", "are", "this", "that", "these", "those", "i", "you",
    "he", "she", "we", "they", "my", "your", "his", "her", "its", "our", "their", "what",
    "which", "who", "whom", "how", "when", "where", "why", "will", "would", "could", "should",
    "can", "may", "might", "must", "have", "has", "had", "do", "does", "did", "been", "being",
    "get", "got", "just", "now", "then", "so", "if", "not", "no", "yes", "all", "any", "some",
    "more", "most", "other", "into", "over", "after", "before", "about", "up", "down", "out",
    "off", "through", "during", "under", "again", "further", "once",
];

/// True when `word` (already lowercased by the caller) is a common English
/// stop word that would pollute the profile search index.
fn is_search_stop_word(word: &str) -> bool {
    SEARCH_STOP_WORDS.contains(&word)
}
1989
/// True for all-digit tokens, except four-digit values in 1900..=2099 which
/// look like years and stay searchable. (An empty string vacuously counts as
/// all-digits; the caller filters tokens shorter than 2 chars first.)
fn is_pure_search_number(word: &str) -> bool {
    let all_digits = word.bytes().all(|byte| byte.is_ascii_digit());
    if !all_digits {
        return false;
    }
    let looks_like_year = word.len() == 4
        && matches!(word.parse::<u16>(), Ok(year) if (1900..=2099).contains(&year));
    !looks_like_year
}
1999
/// Splits a compound token into component words at camelCase, acronym-end,
/// and letter/digit boundaries: "HTMLParser" -> ["HTML", "Parser"],
/// "abc123" -> ["abc", "123"].
fn split_compound_search_word(word: &str) -> Vec<String> {
    let chars: Vec<char> = word.chars().collect();
    let mut segments: Vec<String> = Vec::new();
    let mut segment = String::new();

    for (idx, &ch) in chars.iter().enumerate() {
        let boundary = match segment.chars().last() {
            None => false,
            Some(prev) => {
                let camel = prev.is_lowercase() && ch.is_uppercase();
                let digit_to_alpha = prev.is_ascii_digit() && ch.is_alphabetic();
                let alpha_to_digit = prev.is_alphabetic() && ch.is_ascii_digit();
                // "HTMLParser": split before the last capital of an acronym
                // when the following char is lowercase.
                let acronym_end = prev.is_uppercase()
                    && ch.is_uppercase()
                    && chars.get(idx + 1).is_some_and(|next| next.is_lowercase());
                camel || digit_to_alpha || alpha_to_digit || acronym_end
            }
        };

        if boundary {
            segments.push(std::mem::take(&mut segment));
        }
        segment.push(ch);
    }

    if !segment.is_empty() {
        segments.push(segment);
    }

    segments
}
2028
2029fn parse_search_keywords(text: &str) -> Vec<String> {
2030 let mut keywords = Vec::new();
2031 let mut seen = HashSet::new();
2032
2033 for word in text
2034 .split(|ch: char| !ch.is_alphanumeric())
2035 .filter(|word| !word.is_empty())
2036 {
2037 let mut variants = Vec::with_capacity(1 + word.len() / 4);
2038 variants.push(word.to_lowercase());
2039 variants.extend(
2040 split_compound_search_word(word)
2041 .into_iter()
2042 .map(|part| part.to_lowercase()),
2043 );
2044
2045 for lowered in variants {
2046 if lowered.chars().count() < 2
2047 || is_search_stop_word(&lowered)
2048 || is_pure_search_number(&lowered)
2049 {
2050 continue;
2051 }
2052 if seen.insert(lowered.clone()) {
2053 keywords.push(lowered);
2054 }
2055 }
2056 }
2057
2058 keywords
2059}
2060
2061fn profile_search_terms_for_event(event: &Event) -> Vec<String> {
2062 let profile = match serde_json::from_str::<serde_json::Value>(&event.content) {
2063 Ok(serde_json::Value::Object(profile)) => profile,
2064 _ => serde_json::Map::new(),
2065 };
2066 let names = extract_profile_names(&profile);
2067 let primary_name = names.first().map(String::as_str);
2068 let mut parts = Vec::new();
2069 if let Some(name) = primary_name {
2070 parts.push(name.to_string());
2071 }
2072 if let Some(nip05) = normalize_profile_nip05(&profile, primary_name) {
2073 parts.push(nip05);
2074 }
2075 parts.push(event.pubkey.to_hex());
2076 if names.len() > 1 {
2077 parts.extend(names.into_iter().skip(1));
2078 }
2079 parse_search_keywords(&parts.join(" "))
2080}
2081
2082fn compare_nostr_events(left: &Event, right: &Event) -> std::cmp::Ordering {
2083 left.created_at
2084 .as_u64()
2085 .cmp(&right.created_at.as_u64())
2086 .then_with(|| left.id.to_hex().cmp(&right.id.to_hex()))
2087}
2088
2089fn map_event_store_error(err: NostrEventStoreError) -> anyhow::Error {
2090 anyhow::anyhow!("nostr event store error: {err}")
2091}
2092
/// Ensures the social-graph LMDB environment's map size is at least
/// `requested_bytes` (never below the compiled-in default), rounding the
/// target up to a whole number of OS pages before resizing. Only ever grows
/// the map; a smaller request is a no-op.
fn ensure_social_graph_mapsize(db_dir: &Path, requested_bytes: u64) -> Result<()> {
    // Never shrink below the default map size the env is normally opened with.
    let requested = requested_bytes.max(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES);
    let page_size = page_size_bytes() as u64;
    // Round up to a page multiple; on (theoretical) u64 overflow fall back to
    // the unrounded request rather than failing outright.
    let rounded = requested
        .checked_add(page_size.saturating_sub(1))
        .map(|size| size / page_size * page_size)
        .unwrap_or(requested);
    let map_size = usize::try_from(rounded).context("social graph mapsize exceeds usize")?;

    // NOTE(review): heed marks open/resize `unsafe` because concurrent users
    // of the same env must coordinate around a resize; this helper appears to
    // assume exclusive access while it runs — confirm at call sites.
    let env = unsafe {
        heed::EnvOpenOptions::new()
            .map_size(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES as usize)
            .max_dbs(SOCIALGRAPH_MAX_DBS)
            .open(db_dir)
    }
    .context("open social graph LMDB env for resize")?;
    // Opening already applied the default size, so only grow beyond it.
    if env.info().map_size < map_size {
        unsafe { env.resize(map_size) }.context("resize social graph LMDB env")?;
    }

    Ok(())
}
2115
/// OS allocation granularity in bytes, used to page-align LMDB map sizes.
fn page_size_bytes() -> usize {
    page_size::get_granularity()
}
2119
2120#[cfg(test)]
2121mod tests {
2122 use super::*;
2123 use async_trait::async_trait;
2124 use hashtree_config::StorageBackend;
2125 use hashtree_core::{Hash, MemoryStore, Store, StoreError};
2126 use hashtree_nostr::NostrEventStoreOptions;
2127 use std::collections::HashSet;
2128 use std::fs::{self, File};
2129 use std::io::{BufRead, BufReader};
2130 use std::path::{Path, PathBuf};
2131 use std::process::{Command, Stdio};
2132 use std::sync::Mutex;
2133 use std::time::Duration;
2134
2135 use nostr::{EventBuilder, JsonUtil, Keys, Tag, Timestamp};
2136 use tempfile::TempDir;
2137
2138 const WELLORDER_FIXTURE_URL: &str =
2139 "https://wellorder.xyz/nostr/nostr-wellorder-early-500k-v1.jsonl.bz2";
2140
    /// Immutable copy of the read-trace counters, taken under the trace lock.
    #[derive(Debug, Clone, Default)]
    struct ReadTraceSnapshot {
        /// `Store::get` invocations recorded by the tracing store.
        get_calls: u64,
        /// Total bytes returned across all traced reads (repeats included).
        total_bytes: u64,
        /// Number of distinct block hashes read at least once.
        unique_blocks: usize,
        /// Bytes counted only the first time each hash was read.
        unique_bytes: u64,
        /// Reads served from the cache layer (used by `ReadThroughStore`).
        cache_hits: u64,
        /// Reads that fell through to the remote layer (`ReadThroughStore`).
        remote_fetches: u64,
        /// Bytes fetched from the remote layer (`ReadThroughStore`).
        remote_bytes: u64,
    }
2151
    /// Mutable read-trace accumulator kept behind a `Mutex`; mirrors
    /// `ReadTraceSnapshot`, except the unique hashes are kept as a set so
    /// first-time reads can be detected.
    #[derive(Debug, Default)]
    struct ReadTraceState {
        get_calls: u64,
        total_bytes: u64,
        // Hashes seen so far; `insert` returning true marks a first read.
        unique_hashes: HashSet<Hash>,
        unique_bytes: u64,
        cache_hits: u64,
        remote_fetches: u64,
        remote_bytes: u64,
    }
2162
    /// Store wrapper that delegates everything to `base` while recording read
    /// statistics for each `get` that returns data.
    #[derive(Debug)]
    struct CountingStore<S: Store> {
        base: Arc<S>,
        state: Mutex<ReadTraceState>,
    }
2168
2169 impl<S: Store> CountingStore<S> {
2170 fn new(base: Arc<S>) -> Self {
2171 Self {
2172 base,
2173 state: Mutex::new(ReadTraceState::default()),
2174 }
2175 }
2176
2177 fn reset(&self) {
2178 *self.state.lock().unwrap() = ReadTraceState::default();
2179 }
2180
2181 fn snapshot(&self) -> ReadTraceSnapshot {
2182 let state = self.state.lock().unwrap();
2183 ReadTraceSnapshot {
2184 get_calls: state.get_calls,
2185 total_bytes: state.total_bytes,
2186 unique_blocks: state.unique_hashes.len(),
2187 unique_bytes: state.unique_bytes,
2188 cache_hits: state.cache_hits,
2189 remote_fetches: state.remote_fetches,
2190 remote_bytes: state.remote_bytes,
2191 }
2192 }
2193
2194 fn record_read(&self, hash: &Hash, bytes: usize) {
2195 let mut state = self.state.lock().unwrap();
2196 state.get_calls += 1;
2197 state.total_bytes += bytes as u64;
2198 if state.unique_hashes.insert(*hash) {
2199 state.unique_bytes += bytes as u64;
2200 }
2201 }
2202 }
2203
2204 #[async_trait]
2205 impl<S: Store> Store for CountingStore<S> {
2206 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
2207 self.base.put(hash, data).await
2208 }
2209
2210 async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
2211 self.base.put_many(items).await
2212 }
2213
2214 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
2215 let data = self.base.get(hash).await?;
2216 if let Some(bytes) = data.as_ref() {
2217 self.record_read(hash, bytes.len());
2218 }
2219 Ok(data)
2220 }
2221
2222 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
2223 self.base.has(hash).await
2224 }
2225
2226 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
2227 self.base.delete(hash).await
2228 }
2229 }
2230
    /// Two-layer store: reads consult the in-memory `cache` first and fall
    /// back to `remote` on a miss (populating the cache); writes go to the
    /// cache only. All reads are folded into a shared trace.
    #[derive(Debug)]
    struct ReadThroughStore<R: Store> {
        cache: Arc<MemoryStore>,
        remote: Arc<R>,
        state: Mutex<ReadTraceState>,
    }
2237
2238 impl<R: Store> ReadThroughStore<R> {
2239 fn new(cache: Arc<MemoryStore>, remote: Arc<R>) -> Self {
2240 Self {
2241 cache,
2242 remote,
2243 state: Mutex::new(ReadTraceState::default()),
2244 }
2245 }
2246
2247 fn snapshot(&self) -> ReadTraceSnapshot {
2248 let state = self.state.lock().unwrap();
2249 ReadTraceSnapshot {
2250 get_calls: state.get_calls,
2251 total_bytes: state.total_bytes,
2252 unique_blocks: state.unique_hashes.len(),
2253 unique_bytes: state.unique_bytes,
2254 cache_hits: state.cache_hits,
2255 remote_fetches: state.remote_fetches,
2256 remote_bytes: state.remote_bytes,
2257 }
2258 }
2259 }
2260
    #[async_trait]
    impl<R: Store> Store for ReadThroughStore<R> {
        // Writes land in the cache only; the remote side is never written here.
        async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
            self.cache.put(hash, data).await
        }

        async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
            self.cache.put_many(items).await
        }

        // Cache-first read. Every call bumps `get_calls`; a hit is tallied
        // under `cache_hits`, a miss falls through to the remote store and the
        // fetched block is inserted into the cache before being counted.
        async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
            {
                // Scoped so the std mutex guard is dropped before any await.
                let mut state = self.state.lock().unwrap();
                state.get_calls += 1;
            }

            if let Some(bytes) = self.cache.get(hash).await? {
                let mut state = self.state.lock().unwrap();
                state.cache_hits += 1;
                state.total_bytes += bytes.len() as u64;
                if state.unique_hashes.insert(*hash) {
                    state.unique_bytes += bytes.len() as u64;
                }
                return Ok(Some(bytes));
            }

            let data = self.remote.get(hash).await?;
            if let Some(bytes) = data.as_ref() {
                // Populate the cache so a repeat read becomes a hit.
                let _ = self.cache.put(*hash, bytes.clone()).await?;
                let mut state = self.state.lock().unwrap();
                state.remote_fetches += 1;
                state.remote_bytes += bytes.len() as u64;
                state.total_bytes += bytes.len() as u64;
                if state.unique_hashes.insert(*hash) {
                    state.unique_bytes += bytes.len() as u64;
                }
            }
            Ok(data)
        }

        // Existence checks are not traced.
        async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
            if self.cache.has(hash).await? {
                return Ok(true);
            }
            self.remote.has(hash).await
        }

        // Deletes from both layers; true if either side held the block.
        async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
            let cache_deleted = self.cache.delete(hash).await?;
            let remote_deleted = self.remote.delete(hash).await?;
            Ok(cache_deleted || remote_deleted)
        }
    }
2314
    /// One benchmark query shape to run against a `NostrEventStore`; each
    /// variant carries the concrete arguments for its query method.
    #[derive(Debug, Clone)]
    enum BenchmarkQueryCase {
        /// Point lookup of a single event by hex id.
        ById {
            id: String,
        },
        /// Latest `limit` events from one author.
        ByAuthor {
            pubkey: String,
            limit: usize,
        },
        /// Latest `limit` events from one author restricted to a kind.
        ByAuthorKind {
            pubkey: String,
            kind: u32,
            limit: usize,
        },
        /// Latest `limit` events of a kind across all authors.
        ByKind {
            kind: u32,
            limit: usize,
        },
        /// Latest `limit` events carrying a given tag name/value pair.
        ByTag {
            tag_name: String,
            tag_value: String,
            limit: usize,
        },
        /// Most recent `limit` events overall.
        Recent {
            limit: usize,
        },
        /// Replaceable-event lookup (author + kind).
        Replaceable {
            pubkey: String,
            kind: u32,
        },
        /// Parameterized-replaceable lookup (author + kind + `d` tag).
        ParameterizedReplaceable {
            pubkey: String,
            kind: u32,
            d_tag: String,
        },
    }
2351
2352 impl BenchmarkQueryCase {
2353 fn name(&self) -> &'static str {
2354 match self {
2355 BenchmarkQueryCase::ById { .. } => "by_id",
2356 BenchmarkQueryCase::ByAuthor { .. } => "by_author",
2357 BenchmarkQueryCase::ByAuthorKind { .. } => "by_author_kind",
2358 BenchmarkQueryCase::ByKind { .. } => "by_kind",
2359 BenchmarkQueryCase::ByTag { .. } => "by_tag",
2360 BenchmarkQueryCase::Recent { .. } => "recent",
2361 BenchmarkQueryCase::Replaceable { .. } => "replaceable",
2362 BenchmarkQueryCase::ParameterizedReplaceable { .. } => "parameterized_replaceable",
2363 }
2364 }
2365
2366 async fn execute<S: Store>(
2367 &self,
2368 store: &NostrEventStore<S>,
2369 root: &Cid,
2370 ) -> Result<usize, NostrEventStoreError> {
2371 match self {
2372 BenchmarkQueryCase::ById { id } => {
2373 Ok(store.get_by_id(Some(root), id).await?.into_iter().count())
2374 }
2375 BenchmarkQueryCase::ByAuthor { pubkey, limit } => Ok(store
2376 .list_by_author(
2377 Some(root),
2378 pubkey,
2379 ListEventsOptions {
2380 limit: Some(*limit),
2381 ..Default::default()
2382 },
2383 )
2384 .await?
2385 .len()),
2386 BenchmarkQueryCase::ByAuthorKind {
2387 pubkey,
2388 kind,
2389 limit,
2390 } => Ok(store
2391 .list_by_author_and_kind(
2392 Some(root),
2393 pubkey,
2394 *kind,
2395 ListEventsOptions {
2396 limit: Some(*limit),
2397 ..Default::default()
2398 },
2399 )
2400 .await?
2401 .len()),
2402 BenchmarkQueryCase::ByKind { kind, limit } => Ok(store
2403 .list_by_kind(
2404 Some(root),
2405 *kind,
2406 ListEventsOptions {
2407 limit: Some(*limit),
2408 ..Default::default()
2409 },
2410 )
2411 .await?
2412 .len()),
2413 BenchmarkQueryCase::ByTag {
2414 tag_name,
2415 tag_value,
2416 limit,
2417 } => Ok(store
2418 .list_by_tag(
2419 Some(root),
2420 tag_name,
2421 tag_value,
2422 ListEventsOptions {
2423 limit: Some(*limit),
2424 ..Default::default()
2425 },
2426 )
2427 .await?
2428 .len()),
2429 BenchmarkQueryCase::Recent { limit } => Ok(store
2430 .list_recent(
2431 Some(root),
2432 ListEventsOptions {
2433 limit: Some(*limit),
2434 ..Default::default()
2435 },
2436 )
2437 .await?
2438 .len()),
2439 BenchmarkQueryCase::Replaceable { pubkey, kind } => Ok(store
2440 .get_replaceable(Some(root), pubkey, *kind)
2441 .await?
2442 .into_iter()
2443 .count()),
2444 BenchmarkQueryCase::ParameterizedReplaceable {
2445 pubkey,
2446 kind,
2447 d_tag,
2448 } => Ok(store
2449 .get_parameterized_replaceable(Some(root), pubkey, *kind, d_tag)
2450 .await?
2451 .into_iter()
2452 .count()),
2453 }
2454 }
2455 }
2456
    /// Synthetic link profile for translating read traces into estimated
    /// network cost — presumably combined as per-fetch latency plus
    /// bytes/bandwidth by the benchmark reporting code (not visible in this
    /// chunk; confirm there).
    #[derive(Debug, Clone, Copy)]
    struct NetworkModel {
        name: &'static str,
        /// Round-trip time in milliseconds.
        rtt_ms: f64,
        /// Throughput in MiB per second.
        bandwidth_mib_per_s: f64,
    }
2463
    /// Outcome of repeatedly running one benchmark query case: timing
    /// aggregates plus the read-trace recorded during the runs.
    #[derive(Debug, Clone)]
    struct QueryBenchmarkResult {
        average_duration: Duration,
        /// 95th-percentile duration across iterations.
        p95_duration: Duration,
        reads: ReadTraceSnapshot,
    }
2470
    /// Three representative link profiles used by the query benchmarks.
    const NETWORK_MODELS: [NetworkModel; 3] = [
        // Local network: negligible latency, high throughput.
        NetworkModel {
            name: "lan",
            rtt_ms: 2.0,
            bandwidth_mib_per_s: 100.0,
        },
        // Typical internet path.
        NetworkModel {
            name: "wan",
            rtt_ms: 40.0,
            bandwidth_mib_per_s: 20.0,
        },
        // High-latency, constrained link.
        NetworkModel {
            name: "slow",
            rtt_ms: 120.0,
            bandwidth_mib_per_s: 5.0,
        },
    ];
2488
2489 #[test]
2490 fn test_open_social_graph_store() {
2491 let _guard = test_lock();
2492 let tmp = TempDir::new().unwrap();
2493 let graph_store = open_social_graph_store(tmp.path()).unwrap();
2494 assert_eq!(Arc::strong_count(&graph_store), 1);
2495 }
2496
2497 #[test]
2498 fn test_set_root_and_get_follow_distance() {
2499 let _guard = test_lock();
2500 let tmp = TempDir::new().unwrap();
2501 let graph_store = open_social_graph_store(tmp.path()).unwrap();
2502 let root_pk = [1u8; 32];
2503 set_social_graph_root(&graph_store, &root_pk);
2504 assert_eq!(get_follow_distance(&graph_store, &root_pk), Some(0));
2505 }
2506
    /// A contact-list event from the root puts the followed key at distance 1,
    /// and a mute-list event from the root makes the target overmuted.
    #[test]
    fn test_ingest_event_updates_follows_and_mutes() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();

        let root_keys = Keys::generate();
        let alice_keys = Keys::generate();
        let bob_keys = Keys::generate();

        let root_pk = root_keys.public_key().to_bytes();
        set_social_graph_root(&graph_store, &root_pk);

        // Root follows alice (kind 3 contact list).
        let follow = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![Tag::public_key(alice_keys.public_key())],
        )
        .custom_created_at(Timestamp::from_secs(10))
        .to_event(&root_keys)
        .unwrap();
        ingest_event(&graph_store, "follow", &follow.as_json());

        // Root mutes bob (kind 10000 mute list).
        let mute = EventBuilder::new(
            Kind::MuteList,
            "",
            vec![Tag::public_key(bob_keys.public_key())],
        )
        .custom_created_at(Timestamp::from_secs(11))
        .to_event(&root_keys)
        .unwrap();
        ingest_event(&graph_store, "mute", &mute.as_json());

        assert_eq!(
            get_follow_distance(&graph_store, &alice_keys.public_key().to_bytes()),
            Some(1)
        );
        assert!(is_overmuted(
            &graph_store,
            &root_pk,
            &bob_keys.public_key().to_bytes(),
            1.0
        ));
    }
2551
    /// Ingesting a kind-0 profile indexes its name/nip05 terms for search and
    /// mirrors the latest event; a newer profile fully replaces the old terms.
    #[test]
    fn test_metadata_ingest_builds_profile_search_index_and_replaces_old_terms() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let keys = Keys::generate();

        let older = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "sirius",
                "name": "Martti Malmi",
                "username": "mmalmi",
                "nip05": "siriusdev@iris.to",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&keys)
        .unwrap();
        let newer = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "bird",
                "nip05": "birdman@iris.to",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(6))
        .to_event(&keys)
        .unwrap();

        ingest_parsed_event(&graph_store, &older).unwrap();

        let pubkey = keys.public_key().to_hex();
        // Terms are keyed "p:<term>:<pubkey>"; display name and nip05 local
        // part each produce an entry, other name fields become aliases.
        let entries = graph_store
            .profile_search_entries_for_prefix("p:sirius")
            .unwrap();
        assert!(entries
            .iter()
            .any(|(key, entry)| key == &format!("p:sirius:{pubkey}") && entry.name == "sirius"));
        assert!(entries.iter().any(|(key, entry)| {
            key == &format!("p:siriusdev:{pubkey}")
                && entry.nip05.as_deref() == Some("siriusdev")
                && entry.aliases == vec!["Martti Malmi".to_string(), "mmalmi".to_string()]
                && entry.event_nhash.starts_with("nhash1")
        }));
        assert!(entries.iter().all(|(_, entry)| entry.pubkey == pubkey));
        assert_eq!(
            graph_store
                .latest_profile_event(&pubkey)
                .unwrap()
                .expect("latest mirrored profile")
                .id,
            older.id
        );

        ingest_parsed_event(&graph_store, &newer).unwrap();

        // The newer profile must wipe the superseded "sirius" terms entirely.
        assert!(graph_store
            .profile_search_entries_for_prefix("p:sirius")
            .unwrap()
            .is_empty());
        let bird_entries = graph_store
            .profile_search_entries_for_prefix("p:bird")
            .unwrap();
        assert_eq!(bird_entries.len(), 2);
        assert!(bird_entries
            .iter()
            .any(|(key, entry)| key == &format!("p:bird:{pubkey}") && entry.name == "bird"));
        assert!(bird_entries.iter().any(|(key, entry)| {
            key == &format!("p:birdman:{pubkey}")
                && entry.nip05.as_deref() == Some("birdman")
                && entry.aliases.is_empty()
        }));
        assert_eq!(
            graph_store
                .latest_profile_event(&pubkey)
                .unwrap()
                .expect("latest mirrored profile")
                .id,
            newer.id
        );
    }
2638
    /// Kind-0 events stored under the Ambient class still feed the shared
    /// profile mirror and search index.
    #[test]
    fn test_ambient_metadata_events_are_mirrored_into_public_profile_index() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let keys = Keys::generate();

        let profile = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "ambient bird",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&keys)
        .unwrap();

        // Explicitly ingest into the ambient storage class.
        ingest_parsed_event_with_storage_class(&graph_store, &profile, EventStorageClass::Ambient)
            .unwrap();

        let pubkey = keys.public_key().to_hex();
        let mirrored = graph_store
            .latest_profile_event(&pubkey)
            .unwrap()
            .expect("mirrored ambient profile");
        assert_eq!(mirrored.id, profile.id);
        assert_eq!(
            graph_store
                .profile_search_entries_for_prefix("p:ambient")
                .unwrap()
                .len(),
            1
        );
    }
2675
    /// CamelCase profile names are indexed both as the whole lowercased token
    /// and as their individual word segments.
    #[test]
    fn test_metadata_ingest_splits_compound_profile_terms_without_losing_whole_token() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let keys = Keys::generate();

        let profile = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "SirLibre",
                "username": "XMLHttpRequest42",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&keys)
        .unwrap();

        ingest_parsed_event(&graph_store, &profile).unwrap();

        let pubkey = keys.public_key().to_hex();
        // Whole token "sirlibre" is preserved...
        assert!(
            graph_store
                .profile_search_entries_for_prefix("p:sirlibre")
                .unwrap()
                .iter()
                .any(|(key, entry)| key == &format!("p:sirlibre:{pubkey}")
                    && entry.name == "SirLibre")
        );
        // ...and the camel-case segment "libre" is indexed on its own.
        assert!(graph_store
            .profile_search_entries_for_prefix("p:libre")
            .unwrap()
            .iter()
            .any(|(key, entry)| key == &format!("p:libre:{pubkey}") && entry.name == "SirLibre"));
        assert!(graph_store
            .profile_search_entries_for_prefix("p:xml")
            .unwrap()
            .iter()
            .any(|(key, entry)| {
                key == &format!("p:xml:{pubkey}")
                    && entry.aliases == vec!["XMLHttpRequest42".to_string()]
            }));
        assert!(graph_store
            .profile_search_entries_for_prefix("p:request")
            .unwrap()
            .iter()
            .any(|(key, entry)| {
                key == &format!("p:request:{pubkey}")
                    && entry.aliases == vec!["XMLHttpRequest42".to_string()]
            }));
    }
2729
    /// The profile search root and mirrored events survive closing and
    /// reopening the store on the same directory.
    #[test]
    fn test_profile_search_index_persists_across_reopen() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let keys = Keys::generate();
        let pubkey = keys.public_key().to_hex();

        // First session: ingest one profile, then drop the store handle.
        {
            let graph_store = open_social_graph_store(tmp.path()).unwrap();
            let profile = EventBuilder::new(
                Kind::Metadata,
                serde_json::json!({
                    "display_name": "reopen user",
                })
                .to_string(),
                [],
            )
            .custom_created_at(Timestamp::from_secs(5))
            .to_event(&keys)
            .unwrap();
            ingest_parsed_event(&graph_store, &profile).unwrap();
            assert!(graph_store.profile_search_root().unwrap().is_some());
        }

        // Second session on the same path must see everything.
        let reopened = open_social_graph_store(tmp.path()).unwrap();
        assert!(reopened.profile_search_root().unwrap().is_some());
        assert_eq!(
            reopened
                .latest_profile_event(&pubkey)
                .unwrap()
                .expect("mirrored profile after reopen")
                .pubkey,
            keys.public_key()
        );
        let links = reopened
            .profile_search_entries_for_prefix("p:reopen")
            .unwrap();
        assert_eq!(links.len(), 1);
        assert_eq!(links[0].0, format!("p:reopen:{pubkey}"));
        assert_eq!(links[0].1.name, "reopen user");
    }
2771
2772 #[test]
2773 fn test_profile_search_index_with_shared_hashtree_storage() {
2774 let _guard = test_lock();
2775 let tmp = TempDir::new().unwrap();
2776 let store =
2777 crate::storage::HashtreeStore::with_options(tmp.path(), None, 1024 * 1024 * 1024)
2778 .unwrap();
2779 let graph_store =
2780 open_social_graph_store_with_storage(tmp.path(), store.store_arc(), None).unwrap();
2781 let keys = Keys::generate();
2782 let pubkey = keys.public_key().to_hex();
2783
2784 let profile = EventBuilder::new(
2785 Kind::Metadata,
2786 serde_json::json!({
2787 "display_name": "shared storage user",
2788 "nip05": "shareduser@example.com",
2789 })
2790 .to_string(),
2791 [],
2792 )
2793 .custom_created_at(Timestamp::from_secs(5))
2794 .to_event(&keys)
2795 .unwrap();
2796
2797 graph_store
2798 .sync_profile_index_for_events(std::slice::from_ref(&profile))
2799 .unwrap();
2800 assert!(graph_store.profile_search_root().unwrap().is_some());
2801 assert!(graph_store.profile_search_root().unwrap().is_some());
2802 let links = graph_store
2803 .profile_search_entries_for_prefix("p:shared")
2804 .unwrap();
2805 assert_eq!(links.len(), 2);
2806 assert!(links
2807 .iter()
2808 .any(|(key, entry)| key == &format!("p:shared:{pubkey}")
2809 && entry.name == "shared storage user"));
2810 assert!(links
2811 .iter()
2812 .any(|(key, entry)| key == &format!("p:shareduser:{pubkey}")
2813 && entry.nip05.as_deref() == Some("shareduser")));
2814 }
2815
    /// After wiping both profile-index roots, rebuilding from stored events
    /// restores one entry per pubkey using the newest kind-0 from either the
    /// public or ambient index.
    #[test]
    fn test_rebuild_profile_index_from_stored_events_uses_ambient_and_public_metadata() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let public_keys = Keys::generate();
        let ambient_keys = Keys::generate();
        let public_pubkey = public_keys.public_key().to_hex();
        let ambient_pubkey = ambient_keys.public_key().to_hex();

        let older = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "petri old",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&public_keys)
        .unwrap();
        let newer = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "petri",
                "name": "Petri Example",
                "nip05": "petri@example.com",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(6))
        .to_event(&public_keys)
        .unwrap();
        let ambient = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "ambient petri",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(7))
        .to_event(&ambient_keys)
        .unwrap();

        ingest_parsed_event_with_storage_class(&graph_store, &older, EventStorageClass::Public)
            .unwrap();
        ingest_parsed_event_with_storage_class(&graph_store, &newer, EventStorageClass::Public)
            .unwrap();
        ingest_parsed_event_with_storage_class(&graph_store, &ambient, EventStorageClass::Ambient)
            .unwrap();

        // Simulate index loss by clearing both persisted roots.
        graph_store
            .profile_index
            .write_by_pubkey_root(None)
            .unwrap();
        graph_store.profile_index.write_search_root(None).unwrap();

        // Two pubkeys => two rebuilt profiles (the older public event loses to
        // the newer one from the same author).
        let rebuilt = graph_store
            .rebuild_profile_index_from_stored_events()
            .unwrap();
        assert_eq!(rebuilt, 2);

        let entries = graph_store
            .profile_search_entries_for_prefix("p:petri")
            .unwrap();
        assert_eq!(entries.len(), 2);
        assert!(entries.iter().any(|(key, entry)| {
            key == &format!("p:petri:{public_pubkey}")
                && entry.name == "petri"
                && entry.aliases == vec!["Petri Example".to_string()]
                && entry.nip05.is_none()
        }));
        assert!(entries.iter().any(|(key, entry)| {
            key == &format!("p:petri:{ambient_pubkey}")
                && entry.name == "ambient petri"
                && entry.aliases.is_empty()
                && entry.nip05.is_none()
        }));
    }
2897
    /// Rebuilding the profile index drops users who are overmuted relative to
    /// the configured threshold, even if they were indexed before the mute.
    #[test]
    fn test_rebuild_profile_index_excludes_overmuted_users() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let root_keys = Keys::generate();
        let muted_keys = Keys::generate();
        let muted_pubkey = muted_keys.public_key().to_hex();

        set_social_graph_root(&graph_store, &root_keys.public_key().to_bytes());
        graph_store.set_profile_index_overmute_threshold(1.0);

        // Profile is indexed first, while the author is not yet muted.
        let profile = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "muted petri",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&muted_keys)
        .unwrap();
        ingest_parsed_event(&graph_store, &profile).unwrap();
        assert!(graph_store
            .latest_profile_event(&muted_pubkey)
            .unwrap()
            .is_some());

        // Root mutes the author, pushing them over the threshold.
        let mute = EventBuilder::new(
            Kind::MuteList,
            "",
            vec![Tag::public_key(muted_keys.public_key())],
        )
        .custom_created_at(Timestamp::from_secs(6))
        .to_event(&root_keys)
        .unwrap();
        ingest_parsed_event(&graph_store, &mute).unwrap();
        assert!(graph_store
            .is_overmuted_user(&muted_keys.public_key().to_bytes(), 1.0)
            .unwrap());

        // A rebuild must now exclude the muted author entirely.
        let rebuilt = graph_store
            .rebuild_profile_index_from_stored_events()
            .unwrap();
        assert_eq!(rebuilt, 0);
        assert!(graph_store
            .latest_profile_event(&muted_pubkey)
            .unwrap()
            .is_none());
        assert!(graph_store
            .profile_search_entries_for_prefix("p:muted")
            .unwrap()
            .is_empty());
    }
2953
2954 #[test]
2955 fn test_query_events_by_author() {
2956 let _guard = test_lock();
2957 let tmp = TempDir::new().unwrap();
2958 let graph_store = open_social_graph_store(tmp.path()).unwrap();
2959 let keys = Keys::generate();
2960
2961 let older = EventBuilder::new(Kind::TextNote, "older", [])
2962 .custom_created_at(Timestamp::from_secs(5))
2963 .to_event(&keys)
2964 .unwrap();
2965 let newer = EventBuilder::new(Kind::TextNote, "newer", [])
2966 .custom_created_at(Timestamp::from_secs(6))
2967 .to_event(&keys)
2968 .unwrap();
2969
2970 ingest_parsed_event(&graph_store, &older).unwrap();
2971 ingest_parsed_event(&graph_store, &newer).unwrap();
2972
2973 let filter = Filter::new().author(keys.public_key()).kind(Kind::TextNote);
2974 let events = query_events(&graph_store, &filter, 10);
2975 assert_eq!(events.len(), 2);
2976 assert_eq!(events[0].id, newer.id);
2977 assert_eq!(events[1].id, older.id);
2978 }
2979
2980 #[test]
2981 fn test_query_events_by_kind() {
2982 let _guard = test_lock();
2983 let tmp = TempDir::new().unwrap();
2984 let graph_store = open_social_graph_store(tmp.path()).unwrap();
2985 let first_keys = Keys::generate();
2986 let second_keys = Keys::generate();
2987
2988 let older = EventBuilder::new(Kind::TextNote, "older", [])
2989 .custom_created_at(Timestamp::from_secs(5))
2990 .to_event(&first_keys)
2991 .unwrap();
2992 let newer = EventBuilder::new(Kind::TextNote, "newer", [])
2993 .custom_created_at(Timestamp::from_secs(6))
2994 .to_event(&second_keys)
2995 .unwrap();
2996 let other_kind = EventBuilder::new(Kind::Metadata, "profile", [])
2997 .custom_created_at(Timestamp::from_secs(7))
2998 .to_event(&second_keys)
2999 .unwrap();
3000
3001 ingest_parsed_event(&graph_store, &older).unwrap();
3002 ingest_parsed_event(&graph_store, &newer).unwrap();
3003 ingest_parsed_event(&graph_store, &other_kind).unwrap();
3004
3005 let filter = Filter::new().kind(Kind::TextNote);
3006 let events = query_events(&graph_store, &filter, 10);
3007 assert_eq!(events.len(), 2);
3008 assert_eq!(events[0].id, newer.id);
3009 assert_eq!(events[1].id, older.id);
3010 }
3011
3012 #[test]
3013 fn test_query_events_by_id() {
3014 let _guard = test_lock();
3015 let tmp = TempDir::new().unwrap();
3016 let graph_store = open_social_graph_store(tmp.path()).unwrap();
3017 let keys = Keys::generate();
3018
3019 let first = EventBuilder::new(Kind::TextNote, "first", [])
3020 .custom_created_at(Timestamp::from_secs(5))
3021 .to_event(&keys)
3022 .unwrap();
3023 let target = EventBuilder::new(Kind::TextNote, "target", [])
3024 .custom_created_at(Timestamp::from_secs(6))
3025 .to_event(&keys)
3026 .unwrap();
3027
3028 ingest_parsed_event(&graph_store, &first).unwrap();
3029 ingest_parsed_event(&graph_store, &target).unwrap();
3030
3031 let filter = Filter::new().id(target.id).kind(Kind::TextNote);
3032 let events = query_events(&graph_store, &filter, 10);
3033 assert_eq!(events.len(), 1);
3034 assert_eq!(events[0].id, target.id);
3035 }
3036
3037 #[test]
3038 fn test_query_events_search_is_case_insensitive() {
3039 let _guard = test_lock();
3040 let tmp = TempDir::new().unwrap();
3041 let graph_store = open_social_graph_store(tmp.path()).unwrap();
3042 let keys = Keys::generate();
3043 let other_keys = Keys::generate();
3044
3045 let matching = EventBuilder::new(Kind::TextNote, "Hello Nostr Search", [])
3046 .custom_created_at(Timestamp::from_secs(5))
3047 .to_event(&keys)
3048 .unwrap();
3049 let other = EventBuilder::new(Kind::TextNote, "goodbye world", [])
3050 .custom_created_at(Timestamp::from_secs(6))
3051 .to_event(&other_keys)
3052 .unwrap();
3053
3054 ingest_parsed_event(&graph_store, &matching).unwrap();
3055 ingest_parsed_event(&graph_store, &other).unwrap();
3056
3057 let filter = Filter::new().kind(Kind::TextNote).search("nostr search");
3058 let events = query_events(&graph_store, &filter, 10);
3059 assert_eq!(events.len(), 1);
3060 assert_eq!(events[0].id, matching.id);
3061 }
3062
3063 #[test]
3064 fn test_query_events_since_until_are_inclusive() {
3065 let _guard = test_lock();
3066 let tmp = TempDir::new().unwrap();
3067 let graph_store = open_social_graph_store(tmp.path()).unwrap();
3068 let keys = Keys::generate();
3069
3070 let before = EventBuilder::new(Kind::TextNote, "before", [])
3071 .custom_created_at(Timestamp::from_secs(5))
3072 .to_event(&keys)
3073 .unwrap();
3074 let start = EventBuilder::new(Kind::TextNote, "start", [])
3075 .custom_created_at(Timestamp::from_secs(6))
3076 .to_event(&keys)
3077 .unwrap();
3078 let end = EventBuilder::new(Kind::TextNote, "end", [])
3079 .custom_created_at(Timestamp::from_secs(10))
3080 .to_event(&keys)
3081 .unwrap();
3082 let after = EventBuilder::new(Kind::TextNote, "after", [])
3083 .custom_created_at(Timestamp::from_secs(11))
3084 .to_event(&keys)
3085 .unwrap();
3086
3087 ingest_parsed_event(&graph_store, &before).unwrap();
3088 ingest_parsed_event(&graph_store, &start).unwrap();
3089 ingest_parsed_event(&graph_store, &end).unwrap();
3090 ingest_parsed_event(&graph_store, &after).unwrap();
3091
3092 let filter = Filter::new()
3093 .kind(Kind::TextNote)
3094 .since(Timestamp::from_secs(6))
3095 .until(Timestamp::from_secs(10));
3096 let events = query_events(&graph_store, &filter, 10);
3097 let ids = events.into_iter().map(|event| event.id).collect::<Vec<_>>();
3098 assert_eq!(ids, vec![end.id, start.id]);
3099 }
3100
3101 #[test]
3102 fn test_query_events_replaceable_kind_returns_latest_winner() {
3103 let _guard = test_lock();
3104 let tmp = TempDir::new().unwrap();
3105 let graph_store = open_social_graph_store(tmp.path()).unwrap();
3106 let keys = Keys::generate();
3107
3108 let older = EventBuilder::new(Kind::Custom(10_000), "older mute list", [])
3109 .custom_created_at(Timestamp::from_secs(5))
3110 .to_event(&keys)
3111 .unwrap();
3112 let newer = EventBuilder::new(Kind::Custom(10_000), "newer mute list", [])
3113 .custom_created_at(Timestamp::from_secs(6))
3114 .to_event(&keys)
3115 .unwrap();
3116
3117 ingest_parsed_event(&graph_store, &older).unwrap();
3118 ingest_parsed_event(&graph_store, &newer).unwrap();
3119
3120 let filter = Filter::new()
3121 .author(keys.public_key())
3122 .kind(Kind::Custom(10_000));
3123 let events = query_events(&graph_store, &filter, 10);
3124 assert_eq!(events.len(), 1);
3125 assert_eq!(events[0].id, newer.id);
3126 }
3127
3128 #[test]
3129 fn test_query_events_kind_41_replaceable_returns_latest_winner() {
3130 let _guard = test_lock();
3131 let tmp = TempDir::new().unwrap();
3132 let graph_store = open_social_graph_store(tmp.path()).unwrap();
3133 let keys = Keys::generate();
3134
3135 let older = EventBuilder::new(Kind::Custom(41), "older channel metadata", [])
3136 .custom_created_at(Timestamp::from_secs(5))
3137 .to_event(&keys)
3138 .unwrap();
3139 let newer = EventBuilder::new(Kind::Custom(41), "newer channel metadata", [])
3140 .custom_created_at(Timestamp::from_secs(6))
3141 .to_event(&keys)
3142 .unwrap();
3143
3144 ingest_parsed_event(&graph_store, &older).unwrap();
3145 ingest_parsed_event(&graph_store, &newer).unwrap();
3146
3147 let filter = Filter::new()
3148 .author(keys.public_key())
3149 .kind(Kind::Custom(41));
3150 let events = query_events(&graph_store, &filter, 10);
3151 assert_eq!(events.len(), 1);
3152 assert_eq!(events[0].id, newer.id);
3153 }
3154
    /// Events ingested under different storage classes are visible in the
    /// combined scope but isolated in the scope-restricted queries.
    #[test]
    fn test_public_and_ambient_indexes_stay_separate() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let public_keys = Keys::generate();
        let ambient_keys = Keys::generate();

        let public_event = EventBuilder::new(Kind::TextNote, "public", [])
            .custom_created_at(Timestamp::from_secs(5))
            .to_event(&public_keys)
            .unwrap();
        let ambient_event = EventBuilder::new(Kind::TextNote, "ambient", [])
            .custom_created_at(Timestamp::from_secs(6))
            .to_event(&ambient_keys)
            .unwrap();

        ingest_parsed_event_with_storage_class(
            &graph_store,
            &public_event,
            EventStorageClass::Public,
        )
        .unwrap();
        ingest_parsed_event_with_storage_class(
            &graph_store,
            &ambient_event,
            EventStorageClass::Ambient,
        )
        .unwrap();

        let filter = Filter::new().kind(Kind::TextNote);
        // `All` sees both classes...
        let all_events = graph_store
            .query_events_in_scope(&filter, 10, EventQueryScope::All)
            .unwrap();
        assert_eq!(all_events.len(), 2);

        // ...while each restricted scope sees exactly its own event.
        let public_events = graph_store
            .query_events_in_scope(&filter, 10, EventQueryScope::PublicOnly)
            .unwrap();
        assert_eq!(public_events.len(), 1);
        assert_eq!(public_events[0].id, public_event.id);

        let ambient_events = graph_store
            .query_events_in_scope(&filter, 10, EventQueryScope::AmbientOnly)
            .unwrap();
        assert_eq!(ambient_events.len(), 1);
        assert_eq!(ambient_events[0].id, ambient_event.id);
    }
3203
    /// Without an explicit storage class, events authored by the configured
    /// root land in the public index and everyone else's in the ambient one.
    #[test]
    fn test_default_ingest_classifies_root_author_as_public() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let root_keys = Keys::generate();
        let other_keys = Keys::generate();
        set_social_graph_root(&graph_store, &root_keys.public_key().to_bytes());

        let root_event = EventBuilder::new(Kind::TextNote, "root", [])
            .custom_created_at(Timestamp::from_secs(5))
            .to_event(&root_keys)
            .unwrap();
        let other_event = EventBuilder::new(Kind::TextNote, "other", [])
            .custom_created_at(Timestamp::from_secs(6))
            .to_event(&other_keys)
            .unwrap();

        // No storage class passed — classification comes from authorship.
        ingest_parsed_event(&graph_store, &root_event).unwrap();
        ingest_parsed_event(&graph_store, &other_event).unwrap();

        let filter = Filter::new().kind(Kind::TextNote);
        let public_events = graph_store
            .query_events_in_scope(&filter, 10, EventQueryScope::PublicOnly)
            .unwrap();
        assert_eq!(public_events.len(), 1);
        assert_eq!(public_events[0].id, root_event.id);

        let ambient_events = graph_store
            .query_events_in_scope(&filter, 10, EventQueryScope::AmbientOnly)
            .unwrap();
        assert_eq!(ambient_events.len(), 1);
        assert_eq!(ambient_events[0].id, other_event.id);
    }
3238
    #[test]
    fn test_query_events_survives_reopen() {
        // Ingested events must remain queryable after the store is dropped
        // (end of the inner scope) and reopened from the same directory,
        // including author-scoped and recency-ordered queries.
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let db_dir = tmp.path().join("socialgraph-store");
        let keys = Keys::generate();
        let other_keys = Keys::generate();

        // Ingest inside a scope so the store is closed before reopening.
        {
            let graph_store = open_social_graph_store_at_path(&db_dir, None).unwrap();
            let older = EventBuilder::new(Kind::TextNote, "older", [])
                .custom_created_at(Timestamp::from_secs(5))
                .to_event(&keys)
                .unwrap();
            let newer = EventBuilder::new(Kind::TextNote, "newer", [])
                .custom_created_at(Timestamp::from_secs(6))
                .to_event(&keys)
                .unwrap();
            let latest = EventBuilder::new(Kind::TextNote, "latest", [])
                .custom_created_at(Timestamp::from_secs(7))
                .to_event(&other_keys)
                .unwrap();

            ingest_parsed_event(&graph_store, &older).unwrap();
            ingest_parsed_event(&graph_store, &newer).unwrap();
            ingest_parsed_event(&graph_store, &latest).unwrap();
        }

        let reopened = open_social_graph_store_at_path(&db_dir, None).unwrap();

        // Author query returns only `keys`' events, newest first.
        let author_filter = Filter::new().author(keys.public_key()).kind(Kind::TextNote);
        let author_events = query_events(&reopened, &author_filter, 10);
        assert_eq!(author_events.len(), 2);
        assert_eq!(author_events[0].content, "newer");
        assert_eq!(author_events[1].content, "older");

        // Recency query with limit 2 returns the two newest events overall.
        let recent_filter = Filter::new().kind(Kind::TextNote);
        let recent_events = query_events(&reopened, &recent_filter, 2);
        assert_eq!(recent_events.len(), 2);
        assert_eq!(recent_events[0].content, "latest");
        assert_eq!(recent_events[1].content, "newer");
    }
3281
    #[test]
    fn test_query_events_parameterized_replaceable_by_d_tag() {
        // For parameterized-replaceable kinds (30078), a newer event with the
        // same author/kind/`d` tag replaces the older one, while a different
        // `d` tag ("files") is an independent record that must not match.
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let keys = Keys::generate();

        let older = EventBuilder::new(
            Kind::Custom(30078),
            "",
            vec![
                Tag::identifier("video"),
                Tag::parse(&["l", "hashtree"]).unwrap(),
                Tag::parse(&["hash", &"11".repeat(32)]).unwrap(),
            ],
        )
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&keys)
        .unwrap();
        let newer = EventBuilder::new(
            Kind::Custom(30078),
            "",
            vec![
                Tag::identifier("video"),
                Tag::parse(&["l", "hashtree"]).unwrap(),
                Tag::parse(&["hash", &"22".repeat(32)]).unwrap(),
            ],
        )
        .custom_created_at(Timestamp::from_secs(6))
        .to_event(&keys)
        .unwrap();
        let other_tree = EventBuilder::new(
            Kind::Custom(30078),
            "",
            vec![
                Tag::identifier("files"),
                Tag::parse(&["l", "hashtree"]).unwrap(),
                Tag::parse(&["hash", &"33".repeat(32)]).unwrap(),
            ],
        )
        .custom_created_at(Timestamp::from_secs(7))
        .to_event(&keys)
        .unwrap();

        ingest_parsed_event(&graph_store, &older).unwrap();
        ingest_parsed_event(&graph_store, &newer).unwrap();
        ingest_parsed_event(&graph_store, &other_tree).unwrap();

        // Only the latest "video" revision survives the d-tag lookup.
        let filter = Filter::new()
            .author(keys.public_key())
            .kind(Kind::Custom(30078))
            .identifier("video");
        let events = query_events(&graph_store, &filter, 10);
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].id, newer.id);
    }
3338
3339 #[test]
3340 fn test_query_events_by_hashtag_uses_tag_index() {
3341 let _guard = test_lock();
3342 let tmp = TempDir::new().unwrap();
3343 let graph_store = open_social_graph_store(tmp.path()).unwrap();
3344 let keys = Keys::generate();
3345 let other_keys = Keys::generate();
3346
3347 let first = EventBuilder::new(
3348 Kind::TextNote,
3349 "first",
3350 vec![Tag::parse(&["t", "hashtree"]).unwrap()],
3351 )
3352 .custom_created_at(Timestamp::from_secs(5))
3353 .to_event(&keys)
3354 .unwrap();
3355 let second = EventBuilder::new(
3356 Kind::TextNote,
3357 "second",
3358 vec![Tag::parse(&["t", "hashtree"]).unwrap()],
3359 )
3360 .custom_created_at(Timestamp::from_secs(6))
3361 .to_event(&other_keys)
3362 .unwrap();
3363 let unrelated = EventBuilder::new(
3364 Kind::TextNote,
3365 "third",
3366 vec![Tag::parse(&["t", "other"]).unwrap()],
3367 )
3368 .custom_created_at(Timestamp::from_secs(7))
3369 .to_event(&other_keys)
3370 .unwrap();
3371
3372 ingest_parsed_event(&graph_store, &first).unwrap();
3373 ingest_parsed_event(&graph_store, &second).unwrap();
3374 ingest_parsed_event(&graph_store, &unrelated).unwrap();
3375
3376 let filter = Filter::new().hashtag("hashtree");
3377 let events = query_events(&graph_store, &filter, 10);
3378 assert_eq!(events.len(), 2);
3379 assert_eq!(events[0].id, second.id);
3380 assert_eq!(events[1].id, first.id);
3381 }
3382
    #[test]
    fn test_query_events_combines_indexes_then_applies_search_filter() {
        // Both events carry the `t=hashtree` tag, but only one contains the
        // search term "video" in its content; combining the tag index with
        // the search filter must return just that one.
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let keys = Keys::generate();
        let other_keys = Keys::generate();

        let matching = EventBuilder::new(
            Kind::TextNote,
            "hashtree video release",
            vec![Tag::parse(&["t", "hashtree"]).unwrap()],
        )
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&keys)
        .unwrap();
        let non_matching = EventBuilder::new(
            Kind::TextNote,
            "plain text note",
            vec![Tag::parse(&["t", "hashtree"]).unwrap()],
        )
        .custom_created_at(Timestamp::from_secs(6))
        .to_event(&other_keys)
        .unwrap();

        ingest_parsed_event(&graph_store, &matching).unwrap();
        ingest_parsed_event(&graph_store, &non_matching).unwrap();

        let filter = Filter::new().hashtag("hashtree").search("video");
        let events = query_events(&graph_store, &filter, 10);
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].id, matching.id);
    }
3416
3417 fn benchmark_dataset_path() -> Option<PathBuf> {
3418 std::env::var_os("HASHTREE_BENCH_DATASET_PATH").map(PathBuf::from)
3419 }
3420
3421 fn benchmark_dataset_url() -> String {
3422 std::env::var("HASHTREE_BENCH_DATASET_URL")
3423 .ok()
3424 .filter(|value| !value.is_empty())
3425 .unwrap_or_else(|| WELLORDER_FIXTURE_URL.to_string())
3426 }
3427
3428 fn benchmark_stream_warmup_events(measured_events: usize) -> usize {
3429 std::env::var("HASHTREE_BENCH_WARMUP_EVENTS")
3430 .ok()
3431 .and_then(|value| value.parse::<usize>().ok())
3432 .unwrap_or_else(|| measured_events.clamp(1, 200))
3433 }
3434
3435 fn ensure_benchmark_dataset(path: &Path, url: &str) -> Result<()> {
3436 if path.exists() {
3437 return Ok(());
3438 }
3439
3440 let parent = path
3441 .parent()
3442 .context("benchmark dataset path has no parent directory")?;
3443 fs::create_dir_all(parent).context("create benchmark dataset directory")?;
3444
3445 let tmp = path.with_extension("tmp");
3446 let mut response = reqwest::blocking::get(url)
3447 .context("download benchmark dataset")?
3448 .error_for_status()
3449 .context("benchmark dataset request failed")?;
3450 let mut file = File::create(&tmp).context("create temporary benchmark dataset file")?;
3451 std::io::copy(&mut response, &mut file).context("write benchmark dataset")?;
3452 fs::rename(&tmp, path).context("move benchmark dataset into place")?;
3453
3454 Ok(())
3455 }
3456
    /// Stream up to `max_events` JSON events out of the bzip2-compressed
    /// dataset at `path`, decompressing via an external `bzip2 -dc` child
    /// process. Lines that fail to parse as events are silently skipped.
    ///
    /// Requires a `bzip2` binary on PATH.
    fn load_benchmark_dataset(path: &Path, max_events: usize) -> Result<Vec<Event>> {
        if max_events == 0 {
            return Ok(Vec::new());
        }

        let mut child = Command::new("bzip2")
            .args(["-dc", &path.to_string_lossy()])
            .stdout(Stdio::piped())
            .spawn()
            .context("spawn bzip2 for benchmark dataset")?;
        let stdout = child
            .stdout
            .take()
            .context("benchmark dataset stdout missing")?;
        let mut events = Vec::with_capacity(max_events);

        // Scope the reader so the pipe is dropped before we kill/wait below.
        {
            let reader = BufReader::new(stdout);
            for line in reader.lines() {
                if events.len() >= max_events {
                    break;
                }
                let line = line.context("read benchmark dataset line")?;
                let trimmed = line.trim();
                if trimmed.is_empty() {
                    continue;
                }
                // Unparseable lines are tolerated; only valid events count.
                if let Ok(event) = Event::from_json(trimmed.to_string()) {
                    events.push(event);
                }
            }
        }

        if events.len() < max_events {
            // We consumed the whole stream: the decompressor must have
            // exited cleanly for the dataset to be considered complete.
            let status = child.wait().context("wait for benchmark dataset reader")?;
            anyhow::ensure!(
                status.success(),
                "benchmark dataset reader exited with status {status}"
            );
        } else {
            // We stopped early at the cap; terminate the child and reap it,
            // ignoring errors since partial output is expected here.
            let _ = child.kill();
            let _ = child.wait();
        }

        Ok(events)
    }
3503
3504 fn build_synthetic_benchmark_events(event_count: usize, author_count: usize) -> Vec<Event> {
3505 let authors = (0..author_count)
3506 .map(|_| Keys::generate())
3507 .collect::<Vec<_>>();
3508 let mut events = Vec::with_capacity(event_count);
3509 for i in 0..event_count {
3510 let kind = if i % 8 < 5 {
3511 Kind::TextNote
3512 } else {
3513 Kind::Custom(30_023)
3514 };
3515 let mut tags = Vec::new();
3516 if kind == Kind::TextNote && i % 16 == 0 {
3517 tags.push(Tag::parse(&["t", "hashtree"]).unwrap());
3518 }
3519 let content = if kind == Kind::TextNote && i % 32 == 0 {
3520 format!("benchmark target event {i}")
3521 } else {
3522 format!("benchmark event {i}")
3523 };
3524 let event = EventBuilder::new(kind, content, tags)
3525 .custom_created_at(Timestamp::from_secs(1_700_000_000 + i as u64))
3526 .to_event(&authors[i % author_count])
3527 .unwrap();
3528 events.push(event);
3529 }
3530 events
3531 }
3532
3533 fn load_benchmark_events(
3534 event_count: usize,
3535 author_count: usize,
3536 ) -> Result<(String, Vec<Event>)> {
3537 if let Some(path) = benchmark_dataset_path() {
3538 let url = benchmark_dataset_url();
3539 ensure_benchmark_dataset(&path, &url)?;
3540 let events = load_benchmark_dataset(&path, event_count)?;
3541 return Ok((format!("dataset:{}", path.display()), events));
3542 }
3543
3544 Ok((
3545 format!("synthetic:{author_count}-authors"),
3546 build_synthetic_benchmark_events(event_count, author_count),
3547 ))
3548 }
3549
3550 fn first_tag_filter(event: &Event) -> Option<Filter> {
3551 event.tags.iter().find_map(|tag| match tag.as_slice() {
3552 [name, value, ..]
3553 if name.len() == 1
3554 && !value.is_empty()
3555 && name.as_bytes()[0].is_ascii_lowercase() =>
3556 {
3557 let letter = SingleLetterTag::from_char(name.chars().next()?).ok()?;
3558 Some(Filter::new().custom_tag(letter, [value.to_string()]))
3559 }
3560 _ => None,
3561 })
3562 }
3563
3564 fn first_search_term(event: &Event) -> Option<String> {
3565 event
3566 .content
3567 .split(|ch: char| !ch.is_alphanumeric())
3568 .find(|token| token.len() >= 4)
3569 .map(|token| token.to_ascii_lowercase())
3570 }
3571
3572 fn benchmark_match_count(events: &[Event], filter: &Filter, limit: usize) -> usize {
3573 events
3574 .iter()
3575 .filter(|event| filter.match_event(event))
3576 .count()
3577 .min(limit)
3578 }
3579
3580 fn benchmark_btree_orders() -> Vec<usize> {
3581 std::env::var("HASHTREE_BTREE_ORDERS")
3582 .ok()
3583 .map(|value| {
3584 value
3585 .split(',')
3586 .filter_map(|part| part.trim().parse::<usize>().ok())
3587 .filter(|order| *order >= 2)
3588 .collect::<Vec<_>>()
3589 })
3590 .filter(|orders| !orders.is_empty())
3591 .unwrap_or_else(|| vec![16, 24, 32, 48, 64])
3592 }
3593
3594 fn benchmark_read_iterations() -> usize {
3595 std::env::var("HASHTREE_BENCH_READ_ITERATIONS")
3596 .ok()
3597 .and_then(|value| value.parse::<usize>().ok())
3598 .unwrap_or(5)
3599 .max(1)
3600 }
3601
3602 fn average_duration(samples: &[Duration]) -> Duration {
3603 if samples.is_empty() {
3604 return Duration::ZERO;
3605 }
3606
3607 Duration::from_secs_f64(
3608 samples.iter().map(Duration::as_secs_f64).sum::<f64>() / samples.len() as f64,
3609 )
3610 }
3611
    /// Field-wise integer mean of read-trace snapshots; default (all-zero)
    /// for an empty slice. Each counter is summed then divided by the sample
    /// count, so averages truncate toward zero.
    fn average_read_trace(samples: &[ReadTraceSnapshot]) -> ReadTraceSnapshot {
        if samples.is_empty() {
            return ReadTraceSnapshot::default();
        }

        let len = samples.len() as u64;
        ReadTraceSnapshot {
            get_calls: samples.iter().map(|sample| sample.get_calls).sum::<u64>() / len,
            total_bytes: samples.iter().map(|sample| sample.total_bytes).sum::<u64>() / len,
            // unique_blocks is usize in the snapshot; widen to u64 for the
            // sum, then narrow the truncated average back.
            unique_blocks: (samples
                .iter()
                .map(|sample| sample.unique_blocks as u64)
                .sum::<u64>()
                / len) as usize,
            unique_bytes: samples
                .iter()
                .map(|sample| sample.unique_bytes)
                .sum::<u64>()
                / len,
            cache_hits: samples.iter().map(|sample| sample.cache_hits).sum::<u64>() / len,
            remote_fetches: samples
                .iter()
                .map(|sample| sample.remote_fetches)
                .sum::<u64>()
                / len,
            remote_bytes: samples
                .iter()
                .map(|sample| sample.remote_bytes)
                .sum::<u64>()
                / len,
        }
    }
3644
3645 fn estimate_serialized_remote_ms(snapshot: &ReadTraceSnapshot, model: NetworkModel) -> f64 {
3646 let transfer_ms = if model.bandwidth_mib_per_s <= 0.0 {
3647 0.0
3648 } else {
3649 (snapshot.remote_bytes as f64 / (model.bandwidth_mib_per_s * 1024.0 * 1024.0)) * 1000.0
3650 };
3651 snapshot.remote_fetches as f64 * model.rtt_ms + transfer_ms
3652 }
3653
    /// Dataset for the B-tree order benchmark: the base event stream plus a
    /// handful of appended events guaranteeing every query case has a match.
    #[derive(Debug, Clone)]
    struct IndexBenchmarkDataset {
        // Label of where the events came from (dataset path or "synthetic:…").
        source: String,
        // All benchmark events, including the appended guaranteed-match ones.
        events: Vec<Event>,
        // Tag name guaranteed to appear on at least one event ("t").
        guaranteed_tag_name: String,
        // Matching tag value for the guaranteed tagged event.
        guaranteed_tag_value: String,
        // Hex pubkey of the author of the appended replaceable events.
        replaceable_pubkey: String,
        // Kind used for the appended replaceable events.
        replaceable_kind: u32,
        // Hex pubkey of the author of the appended parameterized events.
        parameterized_pubkey: String,
        // Kind used for the appended parameterized-replaceable events.
        parameterized_kind: u32,
        // `d` tag shared by the appended parameterized-replaceable events.
        parameterized_d_tag: String,
    }
3666
    /// Load the base benchmark events and append five synthetic events that
    /// guarantee every benchmark query case (tag, replaceable, parameterized
    /// replaceable) has at least one match. Appended events use timestamps
    /// strictly after the newest base event so replacement ordering is fixed.
    fn load_index_benchmark_dataset(
        event_count: usize,
        author_count: usize,
    ) -> Result<IndexBenchmarkDataset> {
        let (source, mut events) = load_benchmark_events(event_count, author_count)?;
        // One past the newest base timestamp (fallback for an empty stream).
        let base_timestamp = events
            .iter()
            .map(|event| event.created_at.as_u64())
            .max()
            .unwrap_or(1_700_000_000)
            + 1;

        let replaceable_keys = Keys::generate();
        let parameterized_keys = Keys::generate();
        let tagged_keys = Keys::generate();
        let guaranteed_tag_name = "t".to_string();
        let guaranteed_tag_value = "btreebench".to_string();
        let replaceable_kind = 10_000u32;
        let parameterized_kind = 30_023u32;
        let parameterized_d_tag = "btree-bench".to_string();

        let tagged = EventBuilder::new(
            Kind::TextNote,
            "btree benchmark tagged note",
            vec![Tag::parse(&["t", &guaranteed_tag_value]).unwrap()],
        )
        .custom_created_at(Timestamp::from_secs(base_timestamp))
        .to_event(&tagged_keys)
        .unwrap();
        // Old/new pairs exercise replaceable semantics: only the newer event
        // should survive a replaceable lookup.
        let replaceable_old = EventBuilder::new(
            Kind::Custom(replaceable_kind.try_into().unwrap()),
            "replaceable old",
            [],
        )
        .custom_created_at(Timestamp::from_secs(base_timestamp + 1))
        .to_event(&replaceable_keys)
        .unwrap();
        let replaceable_new = EventBuilder::new(
            Kind::Custom(replaceable_kind.try_into().unwrap()),
            "replaceable new",
            [],
        )
        .custom_created_at(Timestamp::from_secs(base_timestamp + 2))
        .to_event(&replaceable_keys)
        .unwrap();
        let parameterized_old = EventBuilder::new(
            Kind::Custom(parameterized_kind.try_into().unwrap()),
            "",
            vec![Tag::identifier(&parameterized_d_tag)],
        )
        .custom_created_at(Timestamp::from_secs(base_timestamp + 3))
        .to_event(&parameterized_keys)
        .unwrap();
        let parameterized_new = EventBuilder::new(
            Kind::Custom(parameterized_kind.try_into().unwrap()),
            "",
            vec![Tag::identifier(&parameterized_d_tag)],
        )
        .custom_created_at(Timestamp::from_secs(base_timestamp + 4))
        .to_event(&parameterized_keys)
        .unwrap();

        events.extend([
            tagged,
            replaceable_old,
            replaceable_new,
            parameterized_old,
            parameterized_new,
        ]);

        Ok(IndexBenchmarkDataset {
            source,
            events,
            guaranteed_tag_name,
            guaranteed_tag_value,
            replaceable_pubkey: replaceable_keys.public_key().to_hex(),
            replaceable_kind,
            parameterized_pubkey: parameterized_keys.public_key().to_hex(),
            parameterized_kind,
            parameterized_d_tag,
        })
    }
3749
    /// Build the set of representative query cases for the B-tree order
    /// benchmark: by-id, by-author, author+kind, by-kind, by-tag, recency,
    /// replaceable, and parameterized-replaceable lookups.
    ///
    /// Panics if the dataset is empty or has no author for the chosen kind,
    /// since every case must return at least one match.
    fn build_btree_query_cases(dataset: &IndexBenchmarkDataset) -> Vec<BenchmarkQueryCase> {
        // Prefer TextNote as the benchmark kind, else whatever came first.
        let primary_kind = dataset
            .events
            .iter()
            .find(|event| event.kind == Kind::TextNote)
            .map(|event| event.kind)
            .or_else(|| dataset.events.first().map(|event| event.kind))
            .expect("benchmark requires at least one event");
        let primary_kind_u32 = primary_kind.as_u16() as u32;

        // Pick the most prolific author of that kind so the author queries
        // have enough matches to be meaningful.
        let author_pubkey = dataset
            .events
            .iter()
            .filter(|event| event.kind == primary_kind)
            .fold(HashMap::<String, usize>::new(), |mut counts, event| {
                *counts.entry(event.pubkey.to_hex()).or_default() += 1;
                counts
            })
            .into_iter()
            .max_by_key(|(_, count)| *count)
            .map(|(pubkey, _)| pubkey)
            .expect("benchmark requires an author for the selected kind");

        // Use a mid-stream event for the point lookup.
        let by_id_id = dataset.events[dataset.events.len() / 2].id.to_hex();

        vec![
            BenchmarkQueryCase::ById { id: by_id_id },
            BenchmarkQueryCase::ByAuthor {
                pubkey: author_pubkey.clone(),
                limit: 50,
            },
            BenchmarkQueryCase::ByAuthorKind {
                pubkey: author_pubkey,
                kind: primary_kind_u32,
                limit: 50,
            },
            BenchmarkQueryCase::ByKind {
                kind: primary_kind_u32,
                limit: 200,
            },
            BenchmarkQueryCase::ByTag {
                tag_name: dataset.guaranteed_tag_name.clone(),
                tag_value: dataset.guaranteed_tag_value.clone(),
                limit: 100,
            },
            BenchmarkQueryCase::Recent { limit: 100 },
            BenchmarkQueryCase::Replaceable {
                pubkey: dataset.replaceable_pubkey.clone(),
                kind: dataset.replaceable_kind,
            },
            BenchmarkQueryCase::ParameterizedReplaceable {
                pubkey: dataset.parameterized_pubkey.clone(),
                kind: dataset.parameterized_kind,
                d_tag: dataset.parameterized_d_tag.clone(),
            },
        ]
    }
3807
    /// Time a query case against a warm store: the same underlying store is
    /// reused for every iteration (only the read counters are reset), so OS
    /// and store caches stay populated. Panics if any iteration returns no
    /// matches, since an empty result would make the timing meaningless.
    fn benchmark_warm_query_case<S: Store + 'static>(
        base: Arc<S>,
        root: &Cid,
        order: usize,
        case: &BenchmarkQueryCase,
        iterations: usize,
    ) -> QueryBenchmarkResult {
        // CountingStore records per-iteration read statistics.
        let trace_store = Arc::new(CountingStore::new(base));
        let event_store = NostrEventStore::with_options(
            Arc::clone(&trace_store),
            NostrEventStoreOptions {
                btree_order: Some(order),
            },
        );
        let mut durations = Vec::with_capacity(iterations);
        let mut traces = Vec::with_capacity(iterations);
        for _ in 0..iterations {
            trace_store.reset();
            let started = Instant::now();
            let matches = block_on(case.execute(&event_store, root)).unwrap();
            durations.push(started.elapsed());
            traces.push(trace_store.snapshot());
            assert!(
                matches > 0,
                "benchmark query {} returned no matches",
                case.name()
            );
        }
        let mut sorted = durations.clone();
        sorted.sort_unstable();
        QueryBenchmarkResult {
            average_duration: average_duration(&durations),
            p95_duration: duration_percentile(&sorted, 95, 100),
            reads: average_read_trace(&traces),
        }
    }
3844
    /// Time a query case against a cold cache: every iteration gets a fresh
    /// in-memory cache in front of `remote`, so all block reads must go
    /// through to the remote store. Panics if any iteration returns no
    /// matches, since an empty result would make the timing meaningless.
    fn benchmark_cold_query_case<S: Store + 'static>(
        remote: Arc<S>,
        root: &Cid,
        order: usize,
        case: &BenchmarkQueryCase,
        iterations: usize,
    ) -> QueryBenchmarkResult {
        let mut durations = Vec::with_capacity(iterations);
        let mut traces = Vec::with_capacity(iterations);
        for _ in 0..iterations {
            // Fresh empty cache per iteration keeps every read "cold".
            let cache = Arc::new(MemoryStore::new());
            let trace_store = Arc::new(ReadThroughStore::new(cache, Arc::clone(&remote)));
            let event_store = NostrEventStore::with_options(
                Arc::clone(&trace_store),
                NostrEventStoreOptions {
                    btree_order: Some(order),
                },
            );
            let started = Instant::now();
            let matches = block_on(case.execute(&event_store, root)).unwrap();
            durations.push(started.elapsed());
            traces.push(trace_store.snapshot());
            assert!(
                matches > 0,
                "benchmark query {} returned no matches",
                case.name()
            );
        }
        let mut sorted = durations.clone();
        sorted.sort_unstable();
        QueryBenchmarkResult {
            average_duration: average_duration(&durations),
            p95_duration: duration_percentile(&sorted, 95, 100),
            reads: average_read_trace(&traces),
        }
    }
3881
3882 fn duration_percentile(
3883 sorted: &[std::time::Duration],
3884 numerator: usize,
3885 denominator: usize,
3886 ) -> std::time::Duration {
3887 if sorted.is_empty() {
3888 return std::time::Duration::ZERO;
3889 }
3890 let index = ((sorted.len() - 1) * numerator) / denominator;
3891 sorted[index]
3892 }
3893
    /// Steady-state ingest/query benchmark; excluded from normal runs, run
    /// with `cargo test -- --ignored`. Event counts are tunable via the
    /// HASHTREE_BENCH_EVENTS / HASHTREE_BENCH_WARMUP_EVENTS env vars.
    /// Results are validated against a brute-force scan of the events.
    #[test]
    #[ignore = "benchmark"]
    fn benchmark_query_events_large_dataset() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store =
            open_social_graph_store_with_mapsize(tmp.path(), Some(512 * 1024 * 1024)).unwrap();
        // Enable the hashtree_nostr ingest profiler for this run only.
        set_nostr_profile_enabled(true);
        reset_nostr_profile();

        let author_count = 64usize;
        let measured_event_count = std::env::var("HASHTREE_BENCH_EVENTS")
            .ok()
            .and_then(|value| value.parse::<usize>().ok())
            .unwrap_or(600usize);
        let warmup_event_count = benchmark_stream_warmup_events(measured_event_count);
        let total_event_count = warmup_event_count + measured_event_count;
        let (source, events) = load_benchmark_events(total_event_count, author_count).unwrap();
        let loaded_event_count = events.len();
        // The dataset may contain fewer events than requested; keep at least
        // one event in the measured slice.
        let warmup_event_count = warmup_event_count.min(loaded_event_count.saturating_sub(1));
        let (warmup_events, measured_events) = events.split_at(warmup_event_count);

        println!(
            "starting steady-state dataset benchmark with {} warmup events and {} measured stream events from {}",
            warmup_events.len(),
            measured_events.len(),
            source
        );
        // Warm up the store before per-event latency is measured.
        if !warmup_events.is_empty() {
            ingest_parsed_events(&graph_store, warmup_events).unwrap();
        }

        // Measure wall-clock latency of each individually ingested event.
        let stream_start = Instant::now();
        let mut per_event_latencies = Vec::with_capacity(measured_events.len());
        for event in measured_events {
            let event_start = Instant::now();
            ingest_parsed_event(&graph_store, event).unwrap();
            per_event_latencies.push(event_start.elapsed());
        }
        let ingest_duration = stream_start.elapsed();

        let mut sorted_latencies = per_event_latencies.clone();
        sorted_latencies.sort_unstable();
        let average_latency = if per_event_latencies.is_empty() {
            std::time::Duration::ZERO
        } else {
            std::time::Duration::from_secs_f64(
                per_event_latencies
                    .iter()
                    .map(std::time::Duration::as_secs_f64)
                    .sum::<f64>()
                    / per_event_latencies.len() as f64,
            )
        };
        let ingest_capacity_eps = if ingest_duration.is_zero() {
            f64::INFINITY
        } else {
            measured_events.len() as f64 / ingest_duration.as_secs_f64()
        };
        println!(
            "benchmark steady-state ingest complete in {:?} (avg={:?} p50={:?} p95={:?} p99={:?} capacity={:.2} events/s)",
            ingest_duration,
            average_latency,
            duration_percentile(&sorted_latencies, 50, 100),
            duration_percentile(&sorted_latencies, 95, 100),
            duration_percentile(&sorted_latencies, 99, 100),
            ingest_capacity_eps
        );
        // Dump the per-label ingest profile, heaviest first.
        let mut profile = take_nostr_profile();
        profile.sort_by(|left, right| right.total.cmp(&left.total));
        for stat in profile {
            let pct = if ingest_duration.is_zero() {
                0.0
            } else {
                (stat.total.as_secs_f64() / ingest_duration.as_secs_f64()) * 100.0
            };
            let average = if stat.count == 0 {
                std::time::Duration::ZERO
            } else {
                std::time::Duration::from_secs_f64(stat.total.as_secs_f64() / stat.count as f64)
            };
            println!(
                "ingest profile: label={} total={:?} pct={:.1}% count={} avg={:?} max={:?}",
                stat.label, stat.total, pct, stat.count, average, stat.max
            );
        }
        set_nostr_profile_enabled(false);

        // --- Query benchmarks, each validated against a brute-force scan ---
        let kind = events
            .iter()
            .find(|event| event.kind == Kind::TextNote)
            .map(|event| event.kind)
            .or_else(|| events.first().map(|event| event.kind))
            .expect("benchmark requires at least one event");
        let kind_filter = Filter::new().kind(kind);
        let kind_start = Instant::now();
        let kind_events = query_events(&graph_store, &kind_filter, 200);
        let kind_duration = kind_start.elapsed();
        assert_eq!(
            kind_events.len(),
            benchmark_match_count(&events, &kind_filter, 200)
        );
        // Results must come back newest-first.
        assert!(kind_events
            .windows(2)
            .all(|window| window[0].created_at >= window[1].created_at));

        let author_pubkey = events
            .iter()
            .find(|event| event.kind == kind)
            .map(|event| event.pubkey)
            .expect("benchmark requires an author for the selected kind");
        let author_filter = Filter::new().author(author_pubkey).kind(kind);
        let author_start = Instant::now();
        let author_events = query_events(&graph_store, &author_filter, 50);
        let author_duration = author_start.elapsed();
        assert_eq!(
            author_events.len(),
            benchmark_match_count(&events, &author_filter, 50)
        );

        let tag_filter = events
            .iter()
            .find_map(first_tag_filter)
            .expect("benchmark requires at least one tagged event");
        let tag_start = Instant::now();
        let tag_events = query_events(&graph_store, &tag_filter, 100);
        let tag_duration = tag_start.elapsed();
        assert_eq!(
            tag_events.len(),
            benchmark_match_count(&events, &tag_filter, 100)
        );

        let search_source = events
            .iter()
            .find_map(|event| first_search_term(event).map(|term| (event.kind, term)))
            .expect("benchmark requires at least one searchable event");
        let search_filter = Filter::new().kind(search_source.0).search(search_source.1);
        let search_start = Instant::now();
        let search_events = query_events(&graph_store, &search_filter, 100);
        let search_duration = search_start.elapsed();
        assert_eq!(
            search_events.len(),
            benchmark_match_count(&events, &search_filter, 100)
        );

        println!(
            "steady-state benchmark: source={} warmup_events={} stream_events={} ingest={:?} avg={:?} p50={:?} p95={:?} p99={:?} capacity_eps={:.2} kind={:?} author={:?} tag={:?} search={:?}",
            source,
            warmup_events.len(),
            measured_events.len(),
            ingest_duration,
            average_latency,
            duration_percentile(&sorted_latencies, 50, 100),
            duration_percentile(&sorted_latencies, 95, 100),
            duration_percentile(&sorted_latencies, 99, 100),
            ingest_capacity_eps,
            kind_duration,
            author_duration,
            tag_duration,
            search_duration
        );
    }
4056
    /// B-tree order tradeoff benchmark; excluded from normal runs, run with
    /// `cargo test -- --ignored`. For every configured order it builds the
    /// same event set and measures each query case warm (shared store) and
    /// cold (fresh cache), plus serialized network-cost estimates per model.
    #[test]
    #[ignore = "benchmark"]
    fn benchmark_nostr_btree_query_tradeoffs() {
        let _guard = test_lock();
        let event_count = std::env::var("HASHTREE_BENCH_EVENTS")
            .ok()
            .and_then(|value| value.parse::<usize>().ok())
            .unwrap_or(2_000usize);
        let iterations = benchmark_read_iterations();
        let orders = benchmark_btree_orders();
        let dataset = load_index_benchmark_dataset(event_count, 64).unwrap();
        let cases = build_btree_query_cases(&dataset);
        let stored_events = dataset
            .events
            .iter()
            .map(stored_event_from_nostr)
            .collect::<Vec<_>>();

        println!(
            "btree-order benchmark: source={} events={} iterations={} orders={:?}",
            dataset.source,
            stored_events.len(),
            iterations,
            orders
        );
        println!(
            "network models are serialized fetch estimates: {}",
            NETWORK_MODELS
                .iter()
                .map(|model| format!(
                    "{}={}ms_rtt/{}MiBps",
                    model.name, model.rtt_ms, model.bandwidth_mib_per_s
                ))
                .collect::<Vec<_>>()
                .join(", ")
        );

        for order in orders {
            // Fresh store per order so builds don't interfere with each other.
            let tmp = TempDir::new().unwrap();
            let local_store =
                Arc::new(LocalStore::new(tmp.path().join("blobs"), &StorageBackend::Lmdb).unwrap());
            let event_store = NostrEventStore::with_options(
                Arc::clone(&local_store),
                NostrEventStoreOptions {
                    btree_order: Some(order),
                },
            );
            let root = block_on(event_store.build(None, stored_events.clone()))
                .unwrap()
                .expect("benchmark build root");

            println!("btree-order={} root={}", order, hex::encode(root.hash));
            let mut warm_total_ms = 0.0f64;
            // Accumulated serialized-cost estimate per network model.
            let mut model_totals = NETWORK_MODELS
                .iter()
                .map(|model| (model.name, 0.0f64))
                .collect::<HashMap<_, _>>();

            for case in &cases {
                let warm = benchmark_warm_query_case(
                    Arc::clone(&local_store),
                    &root,
                    order,
                    case,
                    iterations,
                );
                let cold = benchmark_cold_query_case(
                    Arc::clone(&local_store),
                    &root,
                    order,
                    case,
                    iterations,
                );
                warm_total_ms += warm.average_duration.as_secs_f64() * 1000.0;

                let model_estimates = NETWORK_MODELS
                    .iter()
                    .map(|model| {
                        let estimate = estimate_serialized_remote_ms(&cold.reads, *model);
                        *model_totals.get_mut(model.name).unwrap() += estimate;
                        format!("{}={:.2}ms", model.name, estimate)
                    })
                    .collect::<Vec<_>>()
                    .join(" ");

                println!(
                    "btree-order={} query={} warm_avg={:?} warm_p95={:?} warm_blocks={} warm_unique_bytes={} cold_fetches={} cold_bytes={} cold_local_avg={:?} {}",
                    order,
                    case.name(),
                    warm.average_duration,
                    warm.p95_duration,
                    warm.reads.unique_blocks,
                    warm.reads.unique_bytes,
                    cold.reads.remote_fetches,
                    cold.reads.remote_bytes,
                    cold.average_duration,
                    model_estimates
                );
            }

            // Per-order summary averaged (unweighted) across the query cases.
            println!(
                "btree-order={} summary unweighted_warm_avg_ms={:.3} {}",
                order,
                warm_total_ms / cases.len() as f64,
                NETWORK_MODELS
                    .iter()
                    .map(|model| format!(
                        "{}={:.2}ms",
                        model.name,
                        model_totals[model.name] / cases.len() as f64
                    ))
                    .collect::<Vec<_>>()
                    .join(" ")
            );
        }
    }
4173
    #[test]
    fn test_ensure_social_graph_mapsize_rounds_and_applies() {
        // Growing the mapsize after an initial default-size setup must take
        // effect on disk and be rounded to a whole number of OS pages.
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        ensure_social_graph_mapsize(tmp.path(), DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES).unwrap();
        let requested = 70 * 1024 * 1024;
        ensure_social_graph_mapsize(tmp.path(), requested).unwrap();
        // Reopen with the default size and read back the effective map size.
        // SAFETY(review): heed::EnvOpenOptions::open is unsafe per heed's
        // API; the directory is a fresh TempDir not opened elsewhere.
        let env = unsafe {
            heed::EnvOpenOptions::new()
                .map_size(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES as usize)
                .max_dbs(SOCIALGRAPH_MAX_DBS)
                .open(tmp.path())
        }
        .unwrap();
        assert!(env.info().map_size >= requested as usize);
        assert_eq!(env.info().map_size % page_size_bytes(), 0);
    }
4191
    #[test]
    fn test_ingest_events_batches_graph_updates() {
        // Batch-ingesting two contact lists (root→alice, alice→bob) must
        // update follow distances transitively (1 and 2) and also index
        // both events for querying.
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();

        let root_keys = Keys::generate();
        let alice_keys = Keys::generate();
        let bob_keys = Keys::generate();

        let root_pk = root_keys.public_key().to_bytes();
        set_social_graph_root(&graph_store, &root_pk);

        let root_follows_alice = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![Tag::public_key(alice_keys.public_key())],
        )
        .custom_created_at(Timestamp::from_secs(10))
        .to_event(&root_keys)
        .unwrap();
        let alice_follows_bob = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![Tag::public_key(bob_keys.public_key())],
        )
        .custom_created_at(Timestamp::from_secs(11))
        .to_event(&alice_keys)
        .unwrap();

        // Both contact lists go in as one batch.
        ingest_parsed_events(
            &graph_store,
            &[root_follows_alice.clone(), alice_follows_bob.clone()],
        )
        .unwrap();

        // Direct follow → distance 1; follow-of-follow → distance 2.
        assert_eq!(
            get_follow_distance(&graph_store, &alice_keys.public_key().to_bytes()),
            Some(1)
        );
        assert_eq!(
            get_follow_distance(&graph_store, &bob_keys.public_key().to_bytes()),
            Some(2)
        );

        // Both contact-list events are also retrievable via query.
        let filter = Filter::new().kind(Kind::ContactList);
        let stored = query_events(&graph_store, &filter, 10);
        let ids = stored.into_iter().map(|event| event.id).collect::<Vec<_>>();
        assert!(ids.contains(&root_follows_alice.id));
        assert!(ids.contains(&alice_follows_bob.id));
    }
4243
    #[test]
    fn test_ingest_graph_events_updates_graph_without_indexing_events() {
        // The graph-only ingest path must update follow distances but leave
        // the event index untouched: the contact list itself must not be
        // returned by queries afterwards.
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();

        let root_keys = Keys::generate();
        let alice_keys = Keys::generate();

        let root_pk = root_keys.public_key().to_bytes();
        set_social_graph_root(&graph_store, &root_pk);

        let root_follows_alice = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![Tag::public_key(alice_keys.public_key())],
        )
        .custom_created_at(Timestamp::from_secs(10))
        .to_event(&root_keys)
        .unwrap();

        ingest_graph_parsed_events(&graph_store, std::slice::from_ref(&root_follows_alice))
            .unwrap();

        // Graph updated: alice is at distance 1 from the root...
        assert_eq!(
            get_follow_distance(&graph_store, &alice_keys.public_key().to_bytes()),
            Some(1)
        );
        // ...but the event itself was not indexed for queries.
        let filter = Filter::new().kind(Kind::ContactList);
        assert!(query_events(&graph_store, &filter, 10).is_empty());
    }
4275}