1pub mod access;
2pub mod crawler;
3pub mod local_lists;
4pub mod snapshot;
5
6pub use access::SocialGraphAccessControl;
7pub use crawler::SocialGraphCrawler;
8pub use local_lists::{
9 read_local_list_file_state, sync_local_list_files_force, sync_local_list_files_if_changed,
10 LocalListFileState, LocalListSyncOutcome,
11};
12
13use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
14use std::path::{Path, PathBuf};
15use std::sync::{Arc, Mutex as StdMutex};
16
17use anyhow::{Context, Result};
18use bytes::Bytes;
19use futures::executor::block_on;
20use hashtree_core::{nhash_encode_full, Cid, HashTree, HashTreeConfig, NHashData};
21use hashtree_index::BTree;
22use hashtree_nostr::{
23 is_parameterized_replaceable_kind, is_replaceable_kind, ListEventsOptions, NostrEventStore,
24 NostrEventStoreError, ProfileGuard as NostrProfileGuard, StoredNostrEvent,
25};
26#[cfg(test)]
27use hashtree_nostr::{
28 reset_profile as reset_nostr_profile, set_profile_enabled as set_nostr_profile_enabled,
29 take_profile as take_nostr_profile,
30};
31use nostr::{Event, Filter, JsonUtil, Kind, SingleLetterTag};
32use nostr_social_graph::{
33 BinaryBudget, GraphStats, NostrEvent as GraphEvent, SocialGraph,
34 SocialGraphBackend as NostrSocialGraphBackend,
35};
36use nostr_social_graph_heed::HeedSocialGraph;
37
38use crate::storage::{LocalStore, StorageRouter};
39
40#[cfg(test)]
41use std::sync::{Mutex, MutexGuard, OnceLock};
42#[cfg(test)]
43use std::time::Instant;
44
/// Set of raw 32-byte pubkeys; `BTreeSet` gives deterministic ordering.
pub type UserSet = BTreeSet<[u8; 32]>;

/// All-zero placeholder root used before a real root pubkey is configured.
const DEFAULT_ROOT_HEX: &str = "0000000000000000000000000000000000000000000000000000000000000000";
// Filenames (relative to the socialgraph db dir) where index root CIDs are
// persisted, plus the subdirectory for the ambient event blob store.
const EVENTS_ROOT_FILE: &str = "events-root.msgpack";
const AMBIENT_EVENTS_ROOT_FILE: &str = "events-root-ambient.msgpack";
const AMBIENT_EVENTS_BLOB_DIR: &str = "ambient-blobs";
const PROFILE_SEARCH_ROOT_FILE: &str = "profile-search-root.msgpack";
const PROFILES_BY_PUBKEY_ROOT_FILE: &str = "profiles-by-pubkey-root.msgpack";
/// Sentinel distance the graph backend reports for users it does not know.
const UNKNOWN_FOLLOW_DISTANCE: u32 = 1000;
// LMDB environment sizing defaults (consumers are outside this chunk).
const DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES: u64 = 64 * 1024 * 1024;
const SOCIALGRAPH_MAX_DBS: u32 = 16;
/// Branching order for the profile search B-tree index.
const PROFILE_SEARCH_INDEX_ORDER: usize = 64;
// Key prefix and name-length cap for profile search entries (used elsewhere
// in this file).
const PROFILE_SEARCH_PREFIX: &str = "p:";
const PROFILE_NAME_MAX_LENGTH: usize = 100;
59
/// Which bucket an ingested event is persisted into: the root user's public
/// bucket, or the ambient bucket for everyone else.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventStorageClass {
    Public,
    Ambient,
}
65
/// Which event buckets a query should consult.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum EventQueryScope {
    PublicOnly,
    AmbientOnly,
    All,
}
73
/// One persisted event index: a nostr event store plus the file where its
/// current root CID is saved.
struct EventIndexBucket {
    event_store: NostrEventStore<StorageRouter>,
    root_path: PathBuf,
}
78
/// Profile lookup state: a hashtree keyed by pubkey, a B-tree search index,
/// and the files persisting each structure's root CID.
struct ProfileIndexBucket {
    tree: HashTree<StorageRouter>,
    index: BTree<StorageRouter>,
    by_pubkey_root_path: PathBuf,
    search_root_path: PathBuf,
}
85
/// Serializable form of a CID for root files.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct StoredCid {
    hash: [u8; 32],
    // NOTE(review): purpose of `key` not visible in this chunk — presumably
    // an optional encryption key for the referenced blob; confirm.
    key: Option<[u8; 32]>,
}
91
/// One profile-search index entry, as persisted in the B-tree index.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct StoredProfileSearchEntry {
    pub pubkey: String,
    pub name: String,
    // `default` keeps deserialization compatible with entries written before
    // these optional fields existed.
    #[serde(default)]
    pub aliases: Vec<String>,
    #[serde(default)]
    pub nip05: Option<String>,
    pub created_at: u64,
    pub event_nhash: String,
}
103
/// Aggregate statistics over the follow graph, derived from one exported
/// graph state (see `SocialGraphStore::build_distance_cache`).
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct SocialGraphStats {
    pub total_users: usize,
    pub root: Option<String>,
    pub total_follows: usize,
    pub max_depth: u32,
    pub size_by_distance: BTreeMap<u32, usize>,
    pub enabled: bool,
}
113
/// Cached per-distance user lists plus the stats computed from the same
/// graph export; invalidated whenever the graph mutates.
#[derive(Debug, Clone)]
struct DistanceCache {
    stats: SocialGraphStats,
    users_by_distance: BTreeMap<u32, Vec<[u8; 32]>>,
}
119
/// String-wrapping error reported by an upstream graph backend.
#[derive(Debug, thiserror::Error)]
#[error("{0}")]
pub struct UpstreamGraphBackendError(String);
123
/// Local social graph store: a heed-backed follow graph, two event buckets
/// (public + ambient), and a profile index, all guarded by std mutexes.
pub struct SocialGraphStore {
    graph: StdMutex<HeedSocialGraph>,
    // Lazily built; `None` after any graph mutation.
    distance_cache: StdMutex<Option<DistanceCache>>,
    public_events: EventIndexBucket,
    ambient_events: EventIndexBucket,
    profile_index: ProfileIndexBucket,
    profile_index_overmute_threshold: StdMutex<f64>,
}
132
/// Abstraction over a social-graph/event backend so callers can work against
/// either the local `SocialGraphStore` or another implementation.
pub trait SocialGraphBackend: Send + Sync {
    /// Aggregate graph statistics.
    fn stats(&self) -> Result<SocialGraphStats>;
    /// All users at exactly `distance` hops from the root.
    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>>;
    /// Follow distance of a user from the root; `None` when unknown.
    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>>;
    /// `created_at` of the owner's follow list, when known.
    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>>;
    /// Set of pubkeys followed by `owner`.
    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet>;
    /// Whether the user is considered overmuted at `threshold`.
    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool>;
    /// Root CID of the profile search index; defaults to `None` for
    /// backends without one.
    fn profile_search_root(&self) -> Result<Option<Cid>> {
        Ok(None)
    }
    /// Binary snapshot chunks of the graph rooted at `root`.
    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>>;
    /// Ingests a single event.
    fn ingest_event(&self, event: &Event) -> Result<()>;
    /// Ingests one event with an explicit storage class; the default
    /// implementation ignores the class and delegates to `ingest_event`.
    fn ingest_event_with_storage_class(
        &self,
        event: &Event,
        storage_class: EventStorageClass,
    ) -> Result<()> {
        let _ = storage_class;
        self.ingest_event(event)
    }
    /// Batch ingest; default is one-by-one, stopping at the first error.
    fn ingest_events(&self, events: &[Event]) -> Result<()> {
        for event in events {
            self.ingest_event(event)?;
        }
        Ok(())
    }
    /// Batch ingest with an explicit storage class; same default semantics.
    fn ingest_events_with_storage_class(
        &self,
        events: &[Event],
        storage_class: EventStorageClass,
    ) -> Result<()> {
        for event in events {
            self.ingest_event_with_storage_class(event, storage_class)?;
        }
        Ok(())
    }
    /// Ingests events intended for graph updates; the default delegates to
    /// the full `ingest_events` path.
    fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
        self.ingest_events(events)
    }
    /// Returns up to `limit` stored events matching `filter`.
    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>>;
}
174
/// Guard type returned by [`test_lock`].
#[cfg(test)]
pub type TestLockGuard = MutexGuard<'static, ()>;

// Process-global mutex serializing tests that share global state.
#[cfg(test)]
static NDB_TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();

/// Acquires the global test lock, initializing it on first use.
#[cfg(test)]
pub fn test_lock() -> TestLockGuard {
    NDB_TEST_LOCK.get_or_init(|| Mutex::new(())).lock().unwrap()
}
185
/// Opens the social graph store under `data_dir` with default LMDB sizing.
pub fn open_social_graph_store(data_dir: &Path) -> Result<Arc<SocialGraphStore>> {
    open_social_graph_store_with_mapsize(data_dir, None)
}
189
/// Same as [`open_social_graph_store`], with an optional LMDB map size.
pub fn open_social_graph_store_with_mapsize(
    data_dir: &Path,
    mapsize_bytes: Option<u64>,
) -> Result<Arc<SocialGraphStore>> {
    // All social graph state lives under `<data_dir>/socialgraph`.
    let db_dir = data_dir.join("socialgraph");
    open_social_graph_store_at_path(&db_dir, mapsize_bytes)
}
197
/// Opens the store under `data_dir` using a caller-supplied storage router
/// for public data.
pub fn open_social_graph_store_with_storage(
    data_dir: &Path,
    store: Arc<StorageRouter>,
    mapsize_bytes: Option<u64>,
) -> Result<Arc<SocialGraphStore>> {
    let db_dir = data_dir.join("socialgraph");
    open_social_graph_store_at_path_with_storage(&db_dir, store, mapsize_bytes)
}
206
/// Opens the store at an explicit db directory, creating a local blob store
/// from the globally configured storage backend.
pub fn open_social_graph_store_at_path(
    db_dir: &Path,
    mapsize_bytes: Option<u64>,
) -> Result<Arc<SocialGraphStore>> {
    let config = hashtree_config::Config::load_or_default();
    let backend = &config.storage.backend;
    let local_store = Arc::new(
        LocalStore::new_with_lmdb_map_size(db_dir.join("blobs"), backend, mapsize_bytes)
            .map_err(|err| anyhow::anyhow!("Failed to create social graph blob store: {err}"))?,
    );
    let store = Arc::new(StorageRouter::new(local_store));
    open_social_graph_store_at_path_with_storage(db_dir, store, mapsize_bytes)
}
220
/// Opens the store with a caller-supplied public storage router, deriving a
/// separate local store (same backend flavor) for ambient events.
pub fn open_social_graph_store_at_path_with_storage(
    db_dir: &Path,
    store: Arc<StorageRouter>,
    mapsize_bytes: Option<u64>,
) -> Result<Arc<SocialGraphStore>> {
    let ambient_backend = store.local_store().backend();
    let ambient_local = Arc::new(
        LocalStore::new_with_lmdb_map_size(
            db_dir.join(AMBIENT_EVENTS_BLOB_DIR),
            &ambient_backend,
            mapsize_bytes,
        )
        .map_err(|err| {
            anyhow::anyhow!("Failed to create social graph ambient blob store: {err}")
        })?,
    );
    let ambient_store = Arc::new(StorageRouter::new(ambient_local));
    open_social_graph_store_at_path_with_storage_split(db_dir, store, ambient_store, mapsize_bytes)
}
240
/// Fully-split constructor: distinct storage routers for public and ambient
/// events. The other `open_*` helpers all funnel into this.
pub fn open_social_graph_store_at_path_with_storage_split(
    db_dir: &Path,
    public_store: Arc<StorageRouter>,
    ambient_store: Arc<StorageRouter>,
    mapsize_bytes: Option<u64>,
) -> Result<Arc<SocialGraphStore>> {
    std::fs::create_dir_all(db_dir)?;
    if let Some(size) = mapsize_bytes {
        ensure_social_graph_mapsize(db_dir, size)?;
    }
    // Graph starts rooted at the all-zero placeholder until `set_root` runs.
    let graph = HeedSocialGraph::open(db_dir, DEFAULT_ROOT_HEX)
        .context("open nostr-social-graph heed backend")?;

    Ok(Arc::new(SocialGraphStore {
        graph: StdMutex::new(graph),
        distance_cache: StdMutex::new(None),
        public_events: EventIndexBucket {
            event_store: NostrEventStore::new(Arc::clone(&public_store)),
            root_path: db_dir.join(EVENTS_ROOT_FILE),
        },
        ambient_events: EventIndexBucket {
            event_store: NostrEventStore::new(ambient_store),
            root_path: db_dir.join(AMBIENT_EVENTS_ROOT_FILE),
        },
        // Profile tree and search index share the public storage router.
        profile_index: ProfileIndexBucket {
            tree: HashTree::new(HashTreeConfig::new(Arc::clone(&public_store))),
            index: BTree::new(
                public_store,
                hashtree_index::BTreeOptions {
                    order: Some(PROFILE_SEARCH_INDEX_ORDER),
                },
            ),
            by_pubkey_root_path: db_dir.join(PROFILES_BY_PUBKEY_ROOT_FILE),
            search_root_path: db_dir.join(PROFILE_SEARCH_ROOT_FILE),
        },
        profile_index_overmute_threshold: StdMutex::new(1.0),
    }))
}
279
280pub fn set_social_graph_root(store: &SocialGraphStore, pk_bytes: &[u8; 32]) {
281 if let Err(err) = store.set_root(pk_bytes) {
282 tracing::warn!("Failed to set social graph root: {err}");
283 }
284}
285
286pub fn get_follow_distance(
287 backend: &(impl SocialGraphBackend + ?Sized),
288 pk_bytes: &[u8; 32],
289) -> Option<u32> {
290 backend.follow_distance(pk_bytes).ok().flatten()
291}
292
293pub fn get_follows(
294 backend: &(impl SocialGraphBackend + ?Sized),
295 pk_bytes: &[u8; 32],
296) -> Vec<[u8; 32]> {
297 match backend.followed_targets(pk_bytes) {
298 Ok(set) => set.into_iter().collect(),
299 Err(_) => Vec::new(),
300 }
301}
302
303pub fn is_overmuted(
304 backend: &(impl SocialGraphBackend + ?Sized),
305 _root_pk: &[u8; 32],
306 user_pk: &[u8; 32],
307 threshold: f64,
308) -> bool {
309 backend
310 .is_overmuted_user(user_pk, threshold)
311 .unwrap_or(false)
312}
313
314pub fn ingest_event(backend: &(impl SocialGraphBackend + ?Sized), _sub_id: &str, event_json: &str) {
315 let event = match Event::from_json(event_json) {
316 Ok(event) => event,
317 Err(_) => return,
318 };
319
320 if let Err(err) = backend.ingest_event(&event) {
321 tracing::warn!("Failed to ingest social graph event: {err}");
322 }
323}
324
/// Ingests an already-parsed event, propagating backend errors.
pub fn ingest_parsed_event(
    backend: &(impl SocialGraphBackend + ?Sized),
    event: &Event,
) -> Result<()> {
    backend.ingest_event(event)
}
331
/// Ingests a parsed event into an explicit storage class.
pub fn ingest_parsed_event_with_storage_class(
    backend: &(impl SocialGraphBackend + ?Sized),
    event: &Event,
    storage_class: EventStorageClass,
) -> Result<()> {
    backend.ingest_event_with_storage_class(event, storage_class)
}
339
/// Batch ingest of parsed events, propagating backend errors.
pub fn ingest_parsed_events(
    backend: &(impl SocialGraphBackend + ?Sized),
    events: &[Event],
) -> Result<()> {
    backend.ingest_events(events)
}
346
/// Batch ingest of parsed events into an explicit storage class.
pub fn ingest_parsed_events_with_storage_class(
    backend: &(impl SocialGraphBackend + ?Sized),
    events: &[Event],
    storage_class: EventStorageClass,
) -> Result<()> {
    backend.ingest_events_with_storage_class(events, storage_class)
}
354
/// Ingests events via the backend's graph-events path.
pub fn ingest_graph_parsed_events(
    backend: &(impl SocialGraphBackend + ?Sized),
    events: &[Event],
) -> Result<()> {
    backend.ingest_graph_events(events)
}
361
362pub fn query_events(
363 backend: &(impl SocialGraphBackend + ?Sized),
364 filter: &Filter,
365 limit: usize,
366) -> Vec<Event> {
367 backend.query_events(filter, limit).unwrap_or_default()
368}
369
// Inherent methods backing `SocialGraphStore`'s backend behavior: graph
// access, distance caching, event ingestion, and profile index upkeep.
impl SocialGraphStore {
    /// Sets the overmute threshold used when filtering profiles out of the
    /// profile index.
    pub fn set_profile_index_overmute_threshold(&self, threshold: f64) {
        *self
            .profile_index_overmute_threshold
            .lock()
            .expect("profile index overmute threshold") = threshold;
    }

    /// Reads the currently configured overmute threshold.
    fn profile_index_overmute_threshold(&self) -> f64 {
        *self
            .profile_index_overmute_threshold
            .lock()
            .expect("profile index overmute threshold")
    }

    /// Drops the cached distance data; it is rebuilt lazily on next access.
    fn invalidate_distance_cache(&self) {
        *self.distance_cache.lock().unwrap() = None;
    }

    /// Builds the distance cache (per-distance user lists plus aggregate
    /// stats) from an exported graph state snapshot.
    fn build_distance_cache(state: nostr_social_graph::SocialGraphState) -> Result<DistanceCache> {
        // Map internal numeric ids back to raw 32-byte pubkeys; a bad hex
        // pubkey fails the whole build.
        let unique_ids = state
            .unique_ids
            .into_iter()
            .map(|(pubkey, id)| decode_pubkey(&pubkey).map(|decoded| (id, decoded)))
            .collect::<Result<HashMap<_, _>>>()?;

        let mut users_by_distance = BTreeMap::new();
        let mut size_by_distance = BTreeMap::new();
        for (distance, users) in state.users_by_follow_distance {
            // Ids with no pubkey mapping are skipped silently.
            let decoded = users
                .into_iter()
                .filter_map(|id| unique_ids.get(&id).copied())
                .collect::<Vec<_>>();
            size_by_distance.insert(distance, decoded.len());
            users_by_distance.insert(distance, decoded);
        }

        let total_follows = state
            .followed_by_user
            .iter()
            .map(|(_, targets)| targets.len())
            .sum::<usize>();
        let total_users = size_by_distance.values().copied().sum();
        let max_depth = size_by_distance.keys().copied().max().unwrap_or_default();

        Ok(DistanceCache {
            stats: SocialGraphStats {
                total_users,
                root: Some(state.root),
                total_follows,
                max_depth,
                size_by_distance,
                enabled: true,
            },
            users_by_distance,
        })
    }

    /// Returns the distance cache, computing and storing it if absent.
    fn load_distance_cache(&self) -> Result<DistanceCache> {
        if let Some(cache) = self.distance_cache.lock().unwrap().clone() {
            return Ok(cache);
        }

        // Export under the graph lock, then build the cache outside it.
        let state = {
            let graph = self.graph.lock().unwrap();
            graph.export_state().context("export social graph state")?
        };
        let cache = Self::build_distance_cache(state)?;
        *self.distance_cache.lock().unwrap() = Some(cache.clone());
        Ok(cache)
    }

    /// Points the graph at a new root pubkey and invalidates cached stats.
    fn set_root(&self, root: &[u8; 32]) -> Result<()> {
        let root_hex = hex::encode(root);
        {
            let mut graph = self.graph.lock().unwrap();
            if should_replace_placeholder_root(&graph)? {
                // Still on the placeholder root: start from a fresh graph
                // rather than re-rooting placeholder state.
                let fresh = SocialGraph::new(&root_hex);
                graph
                    .replace_state(&fresh.export_state())
                    .context("replace placeholder social graph root")?;
            } else {
                graph
                    .set_root(&root_hex)
                    .context("set nostr-social-graph root")?;
            }
        }
        self.invalidate_distance_cache();
        Ok(())
    }

    /// Aggregate stats, served from the distance cache.
    fn stats(&self) -> Result<SocialGraphStats> {
        Ok(self.load_distance_cache()?.stats)
    }

    /// Follow distance of `pk_bytes` from the root; `None` when the backend
    /// reports the `UNKNOWN_FOLLOW_DISTANCE` sentinel.
    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
        let graph = self.graph.lock().unwrap();
        let distance = graph
            .get_follow_distance(&hex::encode(pk_bytes))
            .context("read social graph follow distance")?;
        Ok((distance != UNKNOWN_FOLLOW_DISTANCE).then_some(distance))
    }

    /// All users at exactly `distance` hops, from the distance cache.
    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
        Ok(self
            .load_distance_cache()?
            .users_by_distance
            .get(&distance)
            .cloned()
            .unwrap_or_default())
    }

    /// Timestamp of the owner's follow list, when known.
    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_follow_list_created_at(&hex::encode(owner))
            .context("read social graph follow list timestamp")
    }

    /// Pubkeys followed by `owner`, decoded into raw bytes.
    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
        let graph = self.graph.lock().unwrap();
        decode_pubkey_set(
            graph
                .get_followed_by_user(&hex::encode(owner))
                .context("read followed targets")?,
        )
    }

    /// Overmute check; a non-positive threshold disables it entirely.
    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
        if threshold <= 0.0 {
            return Ok(false);
        }
        let graph = self.graph.lock().unwrap();
        graph
            .is_overmuted(&hex::encode(user_pk), threshold)
            .context("check social graph overmute")
    }

    /// Current root CID of the profile search index.
    #[cfg_attr(not(test), allow(dead_code))]
    pub fn profile_search_root(&self) -> Result<Option<Cid>> {
        self.profile_index.search_root()
    }

    /// Current root CID of the public events bucket.
    pub(crate) fn public_events_root(&self) -> Result<Option<Cid>> {
        self.public_events.events_root()
    }

    /// Persists (or clears) the public events root.
    pub(crate) fn write_public_events_root(&self, root: Option<&Cid>) -> Result<()> {
        self.public_events.write_events_root(root)
    }

    /// Latest stored metadata event for a pubkey, if indexed.
    #[cfg_attr(not(test), allow(dead_code))]
    pub fn latest_profile_event(&self, pubkey_hex: &str) -> Result<Option<Event>> {
        self.profile_index.profile_event_for_pubkey(pubkey_hex)
    }

    /// Profile search entries whose keys start with `prefix`.
    #[cfg_attr(not(test), allow(dead_code))]
    pub fn profile_search_entries_for_prefix(
        &self,
        prefix: &str,
    ) -> Result<Vec<(String, StoredProfileSearchEntry)>> {
        self.profile_index.search_entries_for_prefix(prefix)
    }

    /// Incrementally folds `events` into the profile index.
    pub fn sync_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
        self.update_profile_index_for_events(events)
    }

    /// Rebuilds the profile index from scratch using only `events`, writing
    /// both roots unconditionally (unlike the incremental update path).
    pub(crate) fn rebuild_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
        let latest_by_pubkey = self.filtered_latest_metadata_events_by_pubkey(events)?;
        let (by_pubkey_root, search_root) = self
            .profile_index
            .rebuild_profile_events(latest_by_pubkey.into_values())?;
        self.profile_index
            .write_by_pubkey_root(by_pubkey_root.as_ref())?;
        self.profile_index.write_search_root(search_root.as_ref())?;
        Ok(())
    }

    /// Rebuilds the profile index from every stored metadata event (public
    /// and ambient buckets). Returns the number of distinct profiles kept.
    pub fn rebuild_profile_index_from_stored_events(&self) -> Result<usize> {
        let public_events_root = self.public_events.events_root()?;
        let ambient_events_root = self.ambient_events.events_root()?;
        if public_events_root.is_none() && ambient_events_root.is_none() {
            // No stored events at all: clear both index roots.
            self.profile_index.write_by_pubkey_root(None)?;
            self.profile_index.write_search_root(None)?;
            return Ok(0);
        }

        let mut events = Vec::new();
        for (bucket, root) in [
            (&self.public_events, public_events_root),
            (&self.ambient_events, ambient_events_root),
        ] {
            let Some(root) = root else {
                continue;
            };
            let stored = block_on(bucket.event_store.list_by_kind_lossy(
                Some(&root),
                Kind::Metadata.as_u16() as u32,
                ListEventsOptions::default(),
            ))
            .map_err(map_event_store_error)?;
            events.extend(
                stored
                    .into_iter()
                    .map(nostr_event_from_stored)
                    .collect::<Result<Vec<_>>>()?,
            );
        }

        // NOTE(review): the filtered map is computed here for the count and
        // again inside `rebuild_profile_index_for_events`; duplicated work
        // on a rebuild path — consider deduplicating.
        let latest_count = self.filtered_latest_metadata_events_by_pubkey(&events)?.len();
        self.rebuild_profile_index_for_events(&events)?;
        Ok(latest_count)
    }

    /// Incrementally folds metadata events into the profile index, writing
    /// new roots only when something actually changed.
    fn update_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
        let latest_by_pubkey = latest_metadata_events_by_pubkey(events);
        let threshold = self.profile_index_overmute_threshold();

        if latest_by_pubkey.is_empty() {
            return Ok(());
        }

        let mut by_pubkey_root = self.profile_index.by_pubkey_root()?;
        let mut search_root = self.profile_index.search_root()?;
        let mut changed = false;

        for event in latest_by_pubkey.into_values() {
            // Overmuted users are removed from the index rather than updated.
            let overmuted = self.is_overmuted_user(&event.pubkey.to_bytes(), threshold)?;
            let (next_by_pubkey_root, next_search_root, updated) = if overmuted {
                self.profile_index.remove_profile_event(
                    by_pubkey_root.as_ref(),
                    search_root.as_ref(),
                    &event.pubkey.to_hex(),
                )?
            } else {
                self.profile_index
                    .update_profile_event(by_pubkey_root.as_ref(), search_root.as_ref(), event)?
            };
            if updated {
                by_pubkey_root = next_by_pubkey_root;
                search_root = next_search_root;
                changed = true;
            }
        }

        if changed {
            self.profile_index
                .write_by_pubkey_root(by_pubkey_root.as_ref())?;
            self.profile_index.write_search_root(search_root.as_ref())?;
        }

        Ok(())
    }

    /// Picks the newest metadata event per pubkey, skipping overmuted users.
    fn filtered_latest_metadata_events_by_pubkey<'a>(
        &self,
        events: &'a [Event],
    ) -> Result<BTreeMap<String, &'a Event>> {
        let threshold = self.profile_index_overmute_threshold();
        let mut latest_by_pubkey = BTreeMap::<String, &Event>::new();
        for event in events.iter().filter(|event| event.kind == Kind::Metadata) {
            if self.is_overmuted_user(&event.pubkey.to_bytes(), threshold)? {
                continue;
            }
            let pubkey = event.pubkey.to_hex();
            match latest_by_pubkey.get(&pubkey) {
                // Keep the existing entry when it compares newer-or-equal.
                Some(current) if compare_nostr_events(event, current).is_le() => {}
                _ => {
                    latest_by_pubkey.insert(pubkey, event);
                }
            }
        }
        Ok(latest_by_pubkey)
    }

    /// Serializes the graph (re-rooted at `root` if needed) into binary
    /// snapshot chunks bounded by `options`.
    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
        let state = {
            let graph = self.graph.lock().unwrap();
            graph.export_state().context("export social graph state")?
        };
        // Work on a detached copy so re-rooting does not mutate self.graph.
        let mut graph = SocialGraph::from_state(state).context("rebuild social graph state")?;
        let root_hex = hex::encode(root);
        if graph.get_root() != root_hex {
            graph
                .set_root(&root_hex)
                .context("set snapshot social graph root")?;
        }
        let chunks = graph
            .to_binary_chunks_with_budget(*options)
            .context("encode social graph snapshot")?;
        Ok(chunks.into_iter().map(Bytes::from).collect())
    }

    /// Single-event ingest: storage class chosen from the current root.
    fn ingest_event(&self, event: &Event) -> Result<()> {
        self.ingest_event_with_storage_class(event, self.default_storage_class_for(event)?)
    }

    /// Splits `events` by storage class and ingests each group as a batch.
    fn ingest_events(&self, events: &[Event]) -> Result<()> {
        if events.is_empty() {
            return Ok(());
        }

        let mut public = Vec::new();
        let mut ambient = Vec::new();
        for event in events {
            match self.default_storage_class_for(event)? {
                EventStorageClass::Public => public.push(event.clone()),
                EventStorageClass::Ambient => ambient.push(event.clone()),
            }
        }

        if !public.is_empty() {
            self.ingest_events_with_storage_class(&public, EventStorageClass::Public)?;
        }
        if !ambient.is_empty() {
            self.ingest_events_with_storage_class(&ambient, EventStorageClass::Ambient)?;
        }

        Ok(())
    }

    /// Applies graph-relevant events to the follow graph only, without
    /// touching the event stores or profile index.
    fn apply_graph_events_only(&self, events: &[Event]) -> Result<()> {
        let graph_events = events
            .iter()
            .filter(|event| is_social_graph_event(event.kind))
            .collect::<Vec<_>>();
        if graph_events.is_empty() {
            return Ok(());
        }

        {
            // Batch pattern: export once, apply everything to an in-memory
            // copy, then swap the state back in a single write.
            let mut graph = self.graph.lock().unwrap();
            let mut snapshot = SocialGraph::from_state(
                graph
                    .export_state()
                    .context("export social graph state for graph-only ingest")?,
            )
            .context("rebuild social graph state for graph-only ingest")?;
            for event in graph_events {
                snapshot.handle_event(&graph_event_from_nostr(event), true, 0.0);
            }
            graph
                .replace_state(&snapshot.export_state())
                .context("replace graph-only social graph state")?;
        }
        self.invalidate_distance_cache();
        Ok(())
    }

    /// Query across both buckets.
    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
        self.query_events_in_scope(filter, limit, EventQueryScope::All)
    }

    /// Events authored by the configured (non-placeholder) root go to the
    /// public bucket; everything else is ambient.
    fn default_storage_class_for(&self, event: &Event) -> Result<EventStorageClass> {
        let graph = self.graph.lock().unwrap();
        let root_hex = graph.get_root().context("read social graph root")?;
        if root_hex != DEFAULT_ROOT_HEX && root_hex == event.pubkey.to_hex() {
            return Ok(EventStorageClass::Public);
        }
        Ok(EventStorageClass::Ambient)
    }

    /// Maps a storage class to its event bucket.
    fn bucket(&self, storage_class: EventStorageClass) -> &EventIndexBucket {
        match storage_class {
            EventStorageClass::Public => &self.public_events,
            EventStorageClass::Ambient => &self.ambient_events,
        }
    }

    /// Stores one event in the chosen bucket, then updates the profile index
    /// and — for graph-relevant kinds — the follow graph itself.
    fn ingest_event_with_storage_class(
        &self,
        event: &Event,
        storage_class: EventStorageClass,
    ) -> Result<()> {
        let current_root = self.bucket(storage_class).events_root()?;
        let next_root = self
            .bucket(storage_class)
            .store_event(current_root.as_ref(), event)?;
        self.bucket(storage_class)
            .write_events_root(Some(&next_root))?;

        self.update_profile_index_for_events(std::slice::from_ref(event))?;

        if is_social_graph_event(event.kind) {
            {
                let mut graph = self.graph.lock().unwrap();
                graph
                    .handle_event(&graph_event_from_nostr(event), true, 0.0)
                    .context("ingest social graph event into nostr-social-graph")?;
            }
            self.invalidate_distance_cache();
        }

        Ok(())
    }

    /// Batch variant of `ingest_event_with_storage_class`: one event-store
    /// build, one profile-index pass, one graph state swap.
    fn ingest_events_with_storage_class(
        &self,
        events: &[Event],
        storage_class: EventStorageClass,
    ) -> Result<()> {
        if events.is_empty() {
            return Ok(());
        }

        let bucket = self.bucket(storage_class);
        let current_root = bucket.events_root()?;
        let stored_events = events
            .iter()
            .map(stored_event_from_nostr)
            .collect::<Vec<_>>();
        let next_root = block_on(
            bucket
                .event_store
                .build(current_root.as_ref(), stored_events),
        )
        .map_err(map_event_store_error)?;
        bucket.write_events_root(next_root.as_ref())?;

        self.update_profile_index_for_events(events)?;

        let graph_events = events
            .iter()
            .filter(|event| is_social_graph_event(event.kind))
            .collect::<Vec<_>>();
        if graph_events.is_empty() {
            return Ok(());
        }

        {
            // Same export/apply/replace batching as apply_graph_events_only.
            let mut graph = self.graph.lock().unwrap();
            let mut snapshot = SocialGraph::from_state(
                graph
                    .export_state()
                    .context("export social graph state for batch ingest")?,
            )
            .context("rebuild social graph state for batch ingest")?;
            for event in graph_events {
                snapshot.handle_event(&graph_event_from_nostr(event), true, 0.0);
            }
            graph
                .replace_state(&snapshot.export_state())
                .context("replace batched social graph state")?;
        }
        self.invalidate_distance_cache();

        Ok(())
    }

    /// Queries the buckets selected by `scope`, dedupes the combined
    /// candidates, re-applies the full filter, and truncates to `limit`.
    pub(crate) fn query_events_in_scope(
        &self,
        filter: &Filter,
        limit: usize,
        scope: EventQueryScope,
    ) -> Result<Vec<Event>> {
        if limit == 0 {
            return Ok(Vec::new());
        }

        let buckets: &[&EventIndexBucket] = match scope {
            EventQueryScope::PublicOnly => &[&self.public_events],
            EventQueryScope::AmbientOnly => &[&self.ambient_events],
            EventQueryScope::All => &[&self.public_events, &self.ambient_events],
        };

        let mut candidates = Vec::new();
        for bucket in buckets {
            candidates.extend(bucket.query_events(filter, limit)?);
        }

        let mut deduped = dedupe_events(candidates);
        deduped.retain(|event| filter.match_event(event));
        deduped.truncate(limit);
        Ok(deduped)
    }
}
847
848impl EventIndexBucket {
    /// Reads this bucket's persisted events root CID, if any.
    fn events_root(&self) -> Result<Option<Cid>> {
        let _profile = NostrProfileGuard::new("socialgraph.events_root.read");
        read_root_file(&self.root_path)
    }
853
    /// Persists (or clears, when `root` is `None`) this bucket's root CID.
    fn write_events_root(&self, root: Option<&Cid>) -> Result<()> {
        let _profile = NostrProfileGuard::new("socialgraph.events_root.write");
        write_root_file(&self.root_path, root)
    }
858
    /// Adds one event to the store under `root`, returning the new root CID.
    fn store_event(&self, root: Option<&Cid>, event: &Event) -> Result<Cid> {
        let stored = stored_event_from_nostr(event);
        let _profile = NostrProfileGuard::new("socialgraph.event_store.add");
        block_on(self.event_store.add(root, stored)).map_err(map_event_store_error)
    }
864
    /// Looks up a single event by id under `root`.
    fn load_event_by_id(&self, root: &Cid, event_id: &str) -> Result<Option<Event>> {
        let stored = block_on(self.event_store.get_by_id(Some(root), event_id))
            .map_err(map_event_store_error)?;
        stored.map(nostr_event_from_stored).transpose()
    }
870
    /// Lists events by `author`, using the (author, kind) index when the
    /// filter pins exactly one kind, otherwise the author-only index.
    fn load_events_for_author(
        &self,
        root: &Cid,
        author: &nostr::PublicKey,
        filter: &Filter,
        limit: usize,
        exact: bool,
    ) -> Result<Vec<Event>> {
        // Only a single-kind filter can be pushed down to the store.
        let kind_filter = filter.kinds.as_ref().and_then(|kinds| {
            if kinds.len() == 1 {
                kinds.iter().next().map(|kind| kind.as_u16() as u32)
            } else {
                None
            }
        });
        let author_hex = author.to_hex();
        let options = filter_list_options(filter, limit, exact);
        let stored = match kind_filter {
            Some(kind) => block_on(self.event_store.list_by_author_and_kind(
                Some(root),
                &author_hex,
                kind,
                options.clone(),
            ))
            .map_err(map_event_store_error)?,
            None => block_on(
                self.event_store
                    .list_by_author(Some(root), &author_hex, options),
            )
            .map_err(map_event_store_error)?,
        };
        stored
            .into_iter()
            .map(nostr_event_from_stored)
            .collect::<Result<Vec<_>>>()
    }
907
    /// Lists events of `kind` under `root` via the kind index.
    fn load_events_for_kind(
        &self,
        root: &Cid,
        kind: Kind,
        filter: &Filter,
        limit: usize,
        exact: bool,
    ) -> Result<Vec<Event>> {
        let stored = block_on(self.event_store.list_by_kind(
            Some(root),
            kind.as_u16() as u32,
            filter_list_options(filter, limit, exact),
        ))
        .map_err(map_event_store_error)?;
        stored
            .into_iter()
            .map(nostr_event_from_stored)
            .collect::<Result<Vec<_>>>()
    }
927
    /// Lists the most recent events under `root` (no index restriction).
    fn load_recent_events(
        &self,
        root: &Cid,
        filter: &Filter,
        limit: usize,
        exact: bool,
    ) -> Result<Vec<Event>> {
        let stored = block_on(
            self.event_store
                .list_recent(Some(root), filter_list_options(filter, limit, exact)),
        )
        .map_err(map_event_store_error)?;
        stored
            .into_iter()
            .map(nostr_event_from_stored)
            .collect::<Result<Vec<_>>>()
    }
945
    /// Lists events matching any of `values` for `tag_name`, deduped across
    /// the per-value lookups.
    fn load_events_for_tag(
        &self,
        root: &Cid,
        tag_name: &str,
        values: &[String],
        filter: &Filter,
        limit: usize,
        exact: bool,
    ) -> Result<Vec<Event>> {
        let mut events = Vec::new();
        let options = filter_list_options(filter, limit, exact);
        for value in values {
            let stored = block_on(self.event_store.list_by_tag(
                Some(root),
                tag_name,
                value,
                options.clone(),
            ))
            .map_err(map_event_store_error)?;
            events.extend(
                stored
                    .into_iter()
                    .map(nostr_event_from_stored)
                    .collect::<Result<Vec<_>>>()?,
            );
        }
        Ok(dedupe_events(events))
    }
974
    /// Picks the generic tag with the fewest values as the cheapest index
    /// source for the query.
    ///
    /// NOTE(review): `to_ascii_lowercase` conflates upper- and lowercase
    /// single-letter tags (e.g. `P` vs `p`) — confirm the tag index is
    /// intentionally keyed on lowercase names.
    fn choose_tag_source(&self, filter: &Filter) -> Option<(String, Vec<String>)> {
        filter
            .generic_tags
            .iter()
            .min_by_key(|(_, values)| values.len())
            .map(|(tag, values)| {
                (
                    tag.as_char().to_ascii_lowercase().to_string(),
                    values.iter().cloned().collect(),
                )
            })
    }
987
    /// Chooses the most selective available index for `filter` and loads
    /// candidate events from it; `None` when no index applies and the
    /// caller must fall back to another strategy.
    ///
    /// The `exact` flag passed down is `true` only when the chosen index
    /// covers every dimension present in the filter.
    fn load_major_index_candidates(
        &self,
        root: &Cid,
        filter: &Filter,
        limit: usize,
    ) -> Result<Option<Vec<Event>>> {
        // Replaceable-event point lookups are cheapest when they apply.
        if let Some(events) = self.load_direct_replaceable_candidates(root, filter)? {
            return Ok(Some(events));
        }

        if let Some((tag_name, values)) = self.choose_tag_source(filter) {
            let exact = filter.authors.is_none()
                && filter.kinds.is_none()
                && filter.search.is_none()
                && filter.generic_tags.len() == 1;
            return Ok(Some(self.load_events_for_tag(
                root, &tag_name, &values, filter, limit, exact,
            )?));
        }

        if let (Some(authors), Some(kinds)) = (filter.authors.as_ref(), filter.kinds.as_ref()) {
            if authors.len() == 1 && kinds.len() == 1 {
                let author = authors.iter().next().expect("checked single author");
                let exact = filter.generic_tags.is_empty() && filter.search.is_none();
                return Ok(Some(
                    self.load_events_for_author(root, author, filter, limit, exact)?,
                ));
            }

            // Iterate over whichever dimension requires fewer index scans.
            if kinds.len() < authors.len() {
                let mut events = Vec::new();
                for kind in kinds {
                    events.extend(self.load_events_for_kind(root, *kind, filter, limit, false)?);
                }
                return Ok(Some(dedupe_events(events)));
            }

            let mut events = Vec::new();
            for author in authors {
                events.extend(self.load_events_for_author(root, author, filter, limit, false)?);
            }
            return Ok(Some(dedupe_events(events)));
        }

        if let Some(authors) = filter.authors.as_ref() {
            let mut events = Vec::new();
            let exact = filter.generic_tags.is_empty() && filter.search.is_none();
            for author in authors {
                events.extend(self.load_events_for_author(root, author, filter, limit, exact)?);
            }
            return Ok(Some(dedupe_events(events)));
        }

        if let Some(kinds) = filter.kinds.as_ref() {
            let mut events = Vec::new();
            let exact = filter.authors.is_none()
                && filter.generic_tags.is_empty()
                && filter.search.is_none();
            for kind in kinds {
                events.extend(self.load_events_for_kind(root, *kind, filter, limit, exact)?);
            }
            return Ok(Some(dedupe_events(events)));
        }

        Ok(None)
    }
1054
    /// Fast path for replaceable kinds: when `filter` pins authors plus
    /// exactly one kind, fetch the current (parameterized-)replaceable event
    /// per author straight from the event store instead of walking an index.
    ///
    /// Returns `Ok(None)` when the filter does not have that shape, so the
    /// caller can fall through to the generic index lookups.
    fn load_direct_replaceable_candidates(
        &self,
        root: &Cid,
        filter: &Filter,
    ) -> Result<Option<Vec<Event>>> {
        let Some(authors) = filter.authors.as_ref() else {
            return Ok(None);
        };
        let Some(kinds) = filter.kinds.as_ref() else {
            return Ok(None);
        };
        if kinds.len() != 1 {
            return Ok(None);
        }

        let kind = kinds.iter().next().expect("checked single kind").as_u16() as u32;

        if is_parameterized_replaceable_kind(kind) {
            // Parameterized replaceables are addressed by (author, kind, d-tag);
            // without explicit `d` values there is nothing to look up directly.
            let d_tag = SingleLetterTag::lowercase(nostr::Alphabet::D);
            let Some(d_values) = filter.generic_tags.get(&d_tag) else {
                return Ok(None);
            };
            let mut events = Vec::new();
            for author in authors {
                let author_hex = author.to_hex();
                for d_value in d_values {
                    if let Some(stored) = block_on(self.event_store.get_parameterized_replaceable(
                        Some(root),
                        &author_hex,
                        kind,
                        d_value,
                    ))
                    .map_err(map_event_store_error)?
                    {
                        events.push(nostr_event_from_stored(stored)?);
                    }
                }
            }
            return Ok(Some(dedupe_events(events)));
        }

        if is_replaceable_kind(kind) {
            // Plain replaceable: at most one current event per author.
            let mut events = Vec::new();
            for author in authors {
                if let Some(stored) = block_on(self.event_store.get_replaceable(
                    Some(root),
                    &author.to_hex(),
                    kind,
                ))
                .map_err(map_event_store_error)?
                {
                    events.push(nostr_event_from_stored(stored)?);
                }
            }
            return Ok(Some(dedupe_events(events)));
        }

        Ok(None)
    }
1114
    /// Returns up to `limit` events matching `filter`, sorted newest-first
    /// (ties broken by ascending event id).
    ///
    /// Resolution order: explicit ids are loaded directly; otherwise the
    /// major-index fast path is attempted, falling back to a scan of recent
    /// events. Every candidate is re-checked with `filter.match_event`
    /// before being counted toward `limit`.
    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
        if limit == 0 {
            return Ok(Vec::new());
        }

        // No persisted events root means nothing has been stored yet.
        let events_root = self.events_root()?;
        let Some(root) = events_root.as_ref() else {
            return Ok(Vec::new());
        };
        let mut candidates = Vec::new();
        // Tracks ids already considered so duplicates are processed once.
        let mut seen: HashSet<[u8; 32]> = HashSet::new();

        if let Some(ids) = filter.ids.as_ref() {
            // Direct id lookups; stop once `limit` matches are collected.
            for id in ids {
                let id_bytes = id.to_bytes();
                if !seen.insert(id_bytes) {
                    continue;
                }
                if let Some(event) = self.load_event_by_id(root, &id.to_hex())? {
                    if filter.match_event(&event) {
                        candidates.push(event);
                    }
                }
                if candidates.len() >= limit {
                    break;
                }
            }
        } else {
            // Index fast path first; otherwise scan recent events. The final
            // boolean reports that the filter has no author/kind/tag/search
            // constraints — presumably an "exact" hint for the scan; see
            // load_recent_events for its precise meaning.
            let base_events = match self.load_major_index_candidates(root, filter, limit)? {
                Some(events) => events,
                None => self.load_recent_events(
                    root,
                    filter,
                    limit,
                    filter.authors.is_none()
                        && filter.kinds.is_none()
                        && filter.generic_tags.is_empty()
                        && filter.search.is_none(),
                )?,
            };

            for event in base_events {
                let id_bytes = event.id.to_bytes();
                if !seen.insert(id_bytes) {
                    continue;
                }
                if filter.match_event(&event) {
                    candidates.push(event);
                }
                if candidates.len() >= limit {
                    break;
                }
            }
        }

        // Newest first; ascending id is a deterministic tiebreaker.
        candidates.sort_by(|a, b| {
            b.created_at
                .as_u64()
                .cmp(&a.created_at.as_u64())
                .then_with(|| a.id.cmp(&b.id))
        });
        candidates.truncate(limit);
        Ok(candidates)
    }
1179}
1180
/// Persistence helpers for the mirrored profile indexes: a pubkey -> event
/// mapping and a keyword search index, each tracked by a root CID file on
/// disk and mutated through the hashtree B-tree `index`.
impl ProfileIndexBucket {
    /// Reads the root CID of the profile-by-pubkey index from disk.
    fn by_pubkey_root(&self) -> Result<Option<Cid>> {
        let _profile = NostrProfileGuard::new("socialgraph.profile_by_pubkey_root.read");
        read_root_file(&self.by_pubkey_root_path)
    }

    /// Reads the root CID of the profile search index from disk.
    fn search_root(&self) -> Result<Option<Cid>> {
        let _profile = NostrProfileGuard::new("socialgraph.profile_search_root.read");
        read_root_file(&self.search_root_path)
    }

    /// Persists (or clears, when `root` is `None`) the profile-by-pubkey root.
    fn write_by_pubkey_root(&self, root: Option<&Cid>) -> Result<()> {
        let _profile = NostrProfileGuard::new("socialgraph.profile_by_pubkey_root.write");
        write_root_file(&self.by_pubkey_root_path, root)
    }

    /// Persists (or clears, when `root` is `None`) the profile search root.
    fn write_search_root(&self, root: Option<&Cid>) -> Result<()> {
        let _profile = NostrProfileGuard::new("socialgraph.profile_search_root.write");
        write_root_file(&self.search_root_path, root)
    }

    /// Stores the event's JSON in the hashtree and returns the blob's CID.
    fn mirror_profile_event(&self, event: &Event) -> Result<Cid> {
        let bytes = event.as_json().into_bytes();
        block_on(self.tree.put_file(&bytes))
            .map(|(cid, _size)| cid)
            .context("store mirrored profile event")
    }

    /// Loads a previously mirrored profile event blob and decodes it back
    /// into an `Event`; `Ok(None)` when the blob is absent from the tree.
    fn load_profile_event(&self, cid: &Cid) -> Result<Option<Event>> {
        let bytes = block_on(self.tree.get(cid, None)).context("read mirrored profile event")?;
        let Some(bytes) = bytes else {
            return Ok(None);
        };
        let json = String::from_utf8(bytes).context("decode mirrored profile event as utf-8")?;
        Ok(Some(
            Event::from_json(json).context("decode mirrored profile event json")?,
        ))
    }

    /// Looks up the mirrored profile event for `pubkey_hex` via the
    /// by-pubkey index; `Ok(None)` when no entry exists.
    #[cfg_attr(not(test), allow(dead_code))]
    fn profile_event_for_pubkey(&self, pubkey_hex: &str) -> Result<Option<Event>> {
        let root = self.by_pubkey_root()?;
        let Some(cid) = block_on(self.index.get_link(root.as_ref(), pubkey_hex))
            .context("read mirrored profile event cid by pubkey")?
        else {
            return Ok(None);
        };
        self.load_profile_event(&cid)
    }

    /// Returns all search-index entries whose key starts with `prefix`,
    /// decoding each stored value from JSON.
    #[cfg_attr(not(test), allow(dead_code))]
    fn search_entries_for_prefix(
        &self,
        prefix: &str,
    ) -> Result<Vec<(String, StoredProfileSearchEntry)>> {
        let Some(root) = self.search_root()? else {
            return Ok(Vec::new());
        };
        let entries =
            block_on(self.index.prefix(&root, prefix)).context("query profile search prefix")?;
        entries
            .into_iter()
            .map(|(key, value)| {
                let entry = serde_json::from_str(&value)
                    .context("decode stored profile search entry json")?;
                Ok((key, entry))
            })
            .collect()
    }

    /// Rebuilds both indexes from scratch out of `events`, mirroring each
    /// event blob and bulk-building the B-trees. Returns the new
    /// (by-pubkey, search) roots without persisting them.
    fn rebuild_profile_events<'a, I>(&self, events: I) -> Result<(Option<Cid>, Option<Cid>)>
    where
        I: IntoIterator<Item = &'a Event>,
    {
        let mut by_pubkey_entries = Vec::<(String, Cid)>::new();
        let mut search_entries = Vec::<(String, String)>::new();

        for event in events {
            let pubkey = event.pubkey.to_hex();
            let mirrored_cid = self.mirror_profile_event(event)?;
            let search_value =
                serialize_profile_search_entry(&build_profile_search_entry(event, &mirrored_cid)?)?;
            by_pubkey_entries.push((pubkey.clone(), mirrored_cid.clone()));
            // One search entry per keyword, keyed "<prefix><term>:<pubkey>".
            for term in profile_search_terms_for_event(event) {
                search_entries.push((
                    format!("{PROFILE_SEARCH_PREFIX}{term}:{pubkey}"),
                    search_value.clone(),
                ));
            }
        }

        let by_pubkey_root = block_on(self.index.build_links(by_pubkey_entries))
            .context("bulk build mirrored profile-by-pubkey index")?;
        let search_root = block_on(self.index.build(search_entries))
            .context("bulk build mirrored profile search index")?;
        Ok((by_pubkey_root, search_root))
    }

    /// Inserts or refreshes a single profile event in both indexes.
    ///
    /// Returns the (possibly unchanged) new roots plus a flag indicating
    /// whether anything was written. The existing event wins ties: updates
    /// only apply when `event` compares strictly newer.
    fn update_profile_event(
        &self,
        by_pubkey_root: Option<&Cid>,
        search_root: Option<&Cid>,
        event: &Event,
    ) -> Result<(Option<Cid>, Option<Cid>, bool)> {
        let pubkey = event.pubkey.to_hex();
        let existing_cid = block_on(self.index.get_link(by_pubkey_root, &pubkey))
            .context("lookup existing mirrored profile event")?;

        let existing_event = match existing_cid.as_ref() {
            Some(cid) => self.load_profile_event(cid)?,
            None => None,
        };

        // Keep the stored event when it is at least as new as the incoming one.
        if existing_event
            .as_ref()
            .is_some_and(|current| compare_nostr_events(event, current).is_le())
        {
            return Ok((by_pubkey_root.cloned(), search_root.cloned(), false));
        }

        let mirrored_cid = self.mirror_profile_event(event)?;
        let next_by_pubkey_root = Some(
            block_on(
                self.index
                    .insert_link(by_pubkey_root, &pubkey, &mirrored_cid),
            )
            .context("write mirrored profile event index")?,
        );

        // Drop the previous event's search terms before inserting the new
        // ones; stop early if deletions empty the search tree entirely.
        let mut next_search_root = search_root.cloned();
        if let Some(current) = existing_event.as_ref() {
            for term in profile_search_terms_for_event(current) {
                let Some(root) = next_search_root.as_ref() else {
                    break;
                };
                next_search_root = block_on(
                    self.index
                        .delete(root, &format!("{PROFILE_SEARCH_PREFIX}{term}:{pubkey}")),
                )
                .context("remove stale profile search term")?;
            }
        }

        let search_value =
            serialize_profile_search_entry(&build_profile_search_entry(event, &mirrored_cid)?)?;
        for term in profile_search_terms_for_event(event) {
            next_search_root = Some(
                block_on(self.index.insert(
                    next_search_root.as_ref(),
                    &format!("{PROFILE_SEARCH_PREFIX}{term}:{pubkey}"),
                    &search_value,
                ))
                .context("write profile search term")?,
            );
        }

        Ok((next_by_pubkey_root, next_search_root, true))
    }

    /// Removes a pubkey's mirrored event from both indexes.
    ///
    /// Returns the new roots plus `true` when an entry existed and was
    /// removed; a no-op (`false`) otherwise.
    fn remove_profile_event(
        &self,
        by_pubkey_root: Option<&Cid>,
        search_root: Option<&Cid>,
        pubkey: &str,
    ) -> Result<(Option<Cid>, Option<Cid>, bool)> {
        let existing_cid = block_on(self.index.get_link(by_pubkey_root, pubkey))
            .context("lookup mirrored profile event for removal")?;
        let Some(existing_cid) = existing_cid else {
            return Ok((by_pubkey_root.cloned(), search_root.cloned(), false));
        };

        let existing_event = self.load_profile_event(&existing_cid)?;
        let next_by_pubkey_root = match by_pubkey_root {
            Some(root) => block_on(self.index.delete(root, pubkey))
                .context("remove mirrored profile-by-pubkey entry")?,
            None => None,
        };

        let mut next_search_root = search_root.cloned();
        if let Some(current) = existing_event.as_ref() {
            for term in profile_search_terms_for_event(current) {
                let Some(root) = next_search_root.as_ref() else {
                    break;
                };
                // NOTE(review): context says "overmuted" but this removal path
                // is generic — confirm whether removal only happens for
                // overmuted users, otherwise the message is misleading.
                next_search_root = block_on(
                    self.index
                        .delete(root, &format!("{PROFILE_SEARCH_PREFIX}{term}:{pubkey}")),
                )
                .context("remove overmuted profile search term")?;
            }
        }

        Ok((next_by_pubkey_root, next_search_root, true))
    }
}
1376
1377fn latest_metadata_events_by_pubkey<'a>(events: &'a [Event]) -> BTreeMap<String, &'a Event> {
1378 let mut latest_by_pubkey = BTreeMap::<String, &Event>::new();
1379 for event in events.iter().filter(|event| event.kind == Kind::Metadata) {
1380 let pubkey = event.pubkey.to_hex();
1381 match latest_by_pubkey.get(&pubkey) {
1382 Some(current) if compare_nostr_events(event, current).is_le() => {}
1383 _ => {
1384 latest_by_pubkey.insert(pubkey, event);
1385 }
1386 }
1387 }
1388 latest_by_pubkey
1389}
1390
/// Encodes a profile search entry as the JSON string stored in the search index.
fn serialize_profile_search_entry(entry: &StoredProfileSearchEntry) -> Result<String> {
    serde_json::to_string(entry).context("encode stored profile search entry json")
}
1394
1395fn cid_to_nhash(cid: &Cid) -> Result<String> {
1396 nhash_encode_full(&NHashData {
1397 hash: cid.hash,
1398 decrypt_key: cid.key,
1399 })
1400 .context("encode mirrored profile event nhash")
1401}
1402
1403fn build_profile_search_entry(
1404 event: &Event,
1405 mirrored_cid: &Cid,
1406) -> Result<StoredProfileSearchEntry> {
1407 let profile = match serde_json::from_str::<serde_json::Value>(&event.content) {
1408 Ok(serde_json::Value::Object(profile)) => profile,
1409 _ => serde_json::Map::new(),
1410 };
1411 let names = extract_profile_names(&profile);
1412 let primary_name = names.first().cloned();
1413 let nip05 = normalize_profile_nip05(&profile, primary_name.as_deref());
1414 let name = primary_name
1415 .clone()
1416 .or_else(|| nip05.clone())
1417 .unwrap_or_else(|| event.pubkey.to_hex());
1418
1419 Ok(StoredProfileSearchEntry {
1420 pubkey: event.pubkey.to_hex(),
1421 name,
1422 aliases: names.into_iter().skip(1).collect(),
1423 nip05,
1424 created_at: event.created_at.as_u64(),
1425 event_nhash: cid_to_nhash(mirrored_cid)?,
1426 })
1427}
1428
1429fn filter_list_options(filter: &Filter, limit: usize, exact: bool) -> ListEventsOptions {
1430 ListEventsOptions {
1431 limit: exact.then_some(limit.max(1)),
1432 since: filter.since.map(|timestamp| timestamp.as_u64()),
1433 until: filter.until.map(|timestamp| timestamp.as_u64()),
1434 }
1435}
1436
1437fn dedupe_events(events: Vec<Event>) -> Vec<Event> {
1438 let mut seen = HashSet::new();
1439 let mut deduped = Vec::new();
1440 for event in events {
1441 if seen.insert(event.id.to_bytes()) {
1442 deduped.push(event);
1443 }
1444 }
1445 deduped.sort_by(|a, b| {
1446 b.created_at
1447 .as_u64()
1448 .cmp(&a.created_at.as_u64())
1449 .then_with(|| a.id.cmp(&b.id))
1450 });
1451 deduped
1452}
1453
// Trait plumbing: every method forwards to the inherent implementation on
// `SocialGraphStore`. The only non-obvious mapping is `ingest_graph_events`,
// which forwards to `apply_graph_events_only` (defined elsewhere in this
// file).
impl SocialGraphBackend for SocialGraphStore {
    fn stats(&self) -> Result<SocialGraphStats> {
        SocialGraphStore::stats(self)
    }

    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
        SocialGraphStore::users_by_follow_distance(self, distance)
    }

    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
        SocialGraphStore::follow_distance(self, pk_bytes)
    }

    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
        SocialGraphStore::follow_list_created_at(self, owner)
    }

    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
        SocialGraphStore::followed_targets(self, owner)
    }

    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
        SocialGraphStore::is_overmuted_user(self, user_pk, threshold)
    }

    fn profile_search_root(&self) -> Result<Option<Cid>> {
        SocialGraphStore::profile_search_root(self)
    }

    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
        SocialGraphStore::snapshot_chunks(self, root, options)
    }

    fn ingest_event(&self, event: &Event) -> Result<()> {
        SocialGraphStore::ingest_event(self, event)
    }

    fn ingest_event_with_storage_class(
        &self,
        event: &Event,
        storage_class: EventStorageClass,
    ) -> Result<()> {
        SocialGraphStore::ingest_event_with_storage_class(self, event, storage_class)
    }

    fn ingest_events(&self, events: &[Event]) -> Result<()> {
        SocialGraphStore::ingest_events(self, events)
    }

    fn ingest_events_with_storage_class(
        &self,
        events: &[Event],
        storage_class: EventStorageClass,
    ) -> Result<()> {
        SocialGraphStore::ingest_events_with_storage_class(self, events, storage_class)
    }

    // Graph-only ingestion: routed to apply_graph_events_only rather than a
    // same-named inherent method.
    fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
        SocialGraphStore::apply_graph_events_only(self, events)
    }

    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
        SocialGraphStore::query_events(self, filter, limit)
    }
}
1519
// Adapter exposing the heed-backed graph through the upstream
// `nostr-social-graph` backend trait. Every method locks `self.graph` and
// stringifies failures into `UpstreamGraphBackendError`. Note that
// `lock().unwrap()` panics if the mutex was poisoned by a prior panic.
impl NostrSocialGraphBackend for SocialGraphStore {
    type Error = UpstreamGraphBackendError;

    fn get_root(&self) -> std::result::Result<String, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_root()
            .context("read social graph root")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn set_root(&mut self, root: &str) -> std::result::Result<(), Self::Error> {
        // Hex-decode before delegating so bad input surfaces as a backend error.
        let root_bytes =
            decode_pubkey(root).map_err(|err| UpstreamGraphBackendError(err.to_string()))?;
        SocialGraphStore::set_root(self, &root_bytes)
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn handle_event(
        &mut self,
        event: &GraphEvent,
        allow_unknown_authors: bool,
        overmute_threshold: f64,
    ) -> std::result::Result<(), Self::Error> {
        // Scope the lock so it is released before invalidating the cache.
        {
            let mut graph = self.graph.lock().unwrap();
            graph
                .handle_event(event, allow_unknown_authors, overmute_threshold)
                .context("ingest social graph event into heed backend")
                .map_err(|err| UpstreamGraphBackendError(err.to_string()))?;
        }
        // Graph mutation may change follow distances; drop the cached values.
        self.invalidate_distance_cache();
        Ok(())
    }

    fn get_follow_distance(&self, user: &str) -> std::result::Result<u32, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_follow_distance(user)
            .context("read social graph follow distance")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn is_following(
        &self,
        follower: &str,
        followed_user: &str,
    ) -> std::result::Result<bool, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .is_following(follower, followed_user)
            .context("read social graph following edge")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_followed_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_followed_by_user(user)
            .context("read followed-by-user list")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_followers_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_followers_by_user(user)
            .context("read followers-by-user list")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_muted_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_muted_by_user(user)
            .context("read muted-by-user list")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_user_muted_by(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_user_muted_by(user)
            .context("read user-muted-by list")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_follow_list_created_at(
        &self,
        user: &str,
    ) -> std::result::Result<Option<u64>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_follow_list_created_at(user)
            .context("read social graph follow list timestamp")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_mute_list_created_at(
        &self,
        user: &str,
    ) -> std::result::Result<Option<u64>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_mute_list_created_at(user)
            .context("read social graph mute list timestamp")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn is_overmuted(&self, user: &str, threshold: f64) -> std::result::Result<bool, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .is_overmuted(user, threshold)
            .context("check social graph overmute")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }
}
1637
// Blanket impl so an `Arc`-wrapped backend (including unsized trait objects,
// via `?Sized`) can be used wherever a `SocialGraphBackend` is expected;
// every method delegates to the inner value.
impl<T> SocialGraphBackend for Arc<T>
where
    T: SocialGraphBackend + ?Sized,
{
    fn stats(&self) -> Result<SocialGraphStats> {
        self.as_ref().stats()
    }

    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
        self.as_ref().users_by_follow_distance(distance)
    }

    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
        self.as_ref().follow_distance(pk_bytes)
    }

    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
        self.as_ref().follow_list_created_at(owner)
    }

    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
        self.as_ref().followed_targets(owner)
    }

    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
        self.as_ref().is_overmuted_user(user_pk, threshold)
    }

    fn profile_search_root(&self) -> Result<Option<Cid>> {
        self.as_ref().profile_search_root()
    }

    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
        self.as_ref().snapshot_chunks(root, options)
    }

    fn ingest_event(&self, event: &Event) -> Result<()> {
        self.as_ref().ingest_event(event)
    }

    fn ingest_event_with_storage_class(
        &self,
        event: &Event,
        storage_class: EventStorageClass,
    ) -> Result<()> {
        self.as_ref()
            .ingest_event_with_storage_class(event, storage_class)
    }

    fn ingest_events(&self, events: &[Event]) -> Result<()> {
        self.as_ref().ingest_events(events)
    }

    fn ingest_events_with_storage_class(
        &self,
        events: &[Event],
        storage_class: EventStorageClass,
    ) -> Result<()> {
        self.as_ref()
            .ingest_events_with_storage_class(events, storage_class)
    }

    fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
        self.as_ref().ingest_graph_events(events)
    }

    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
        self.as_ref().query_events(filter, limit)
    }
}
1708
1709fn should_replace_placeholder_root(graph: &HeedSocialGraph) -> Result<bool> {
1710 if graph.get_root().context("read current social graph root")? != DEFAULT_ROOT_HEX {
1711 return Ok(false);
1712 }
1713
1714 let GraphStats {
1715 users,
1716 follows,
1717 mutes,
1718 ..
1719 } = graph.size().context("size social graph")?;
1720 Ok(users <= 1 && follows == 0 && mutes == 0)
1721}
1722
1723fn decode_pubkey_set(values: Vec<String>) -> Result<UserSet> {
1724 let mut set = UserSet::new();
1725 for value in values {
1726 set.insert(decode_pubkey(&value)?);
1727 }
1728 Ok(set)
1729}
1730
1731fn decode_pubkey(value: &str) -> Result<[u8; 32]> {
1732 let mut bytes = [0u8; 32];
1733 hex::decode_to_slice(value, &mut bytes)
1734 .with_context(|| format!("decode social graph pubkey {value}"))?;
1735 Ok(bytes)
1736}
1737
1738fn is_social_graph_event(kind: Kind) -> bool {
1739 kind == Kind::ContactList || kind == Kind::MuteList
1740}
1741
1742fn graph_event_from_nostr(event: &Event) -> GraphEvent {
1743 GraphEvent {
1744 created_at: event.created_at.as_u64(),
1745 content: event.content.clone(),
1746 tags: event
1747 .tags
1748 .iter()
1749 .map(|tag| tag.as_slice().to_vec())
1750 .collect(),
1751 kind: event.kind.as_u16() as u32,
1752 pubkey: event.pubkey.to_hex(),
1753 id: event.id.to_hex(),
1754 sig: event.sig.to_string(),
1755 }
1756}
1757
1758fn stored_event_from_nostr(event: &Event) -> StoredNostrEvent {
1759 StoredNostrEvent {
1760 id: event.id.to_hex(),
1761 pubkey: event.pubkey.to_hex(),
1762 created_at: event.created_at.as_u64(),
1763 kind: event.kind.as_u16() as u32,
1764 tags: event
1765 .tags
1766 .iter()
1767 .map(|tag| tag.as_slice().to_vec())
1768 .collect(),
1769 content: event.content.clone(),
1770 sig: event.sig.to_string(),
1771 }
1772}
1773
1774fn nostr_event_from_stored(event: StoredNostrEvent) -> Result<Event> {
1775 let value = serde_json::json!({
1776 "id": event.id,
1777 "pubkey": event.pubkey,
1778 "created_at": event.created_at,
1779 "kind": event.kind,
1780 "tags": event.tags,
1781 "content": event.content,
1782 "sig": event.sig,
1783 });
1784 Event::from_json(value.to_string()).context("decode stored nostr event")
1785}
1786
/// Crate-visible alias for [`nostr_event_from_stored`] so other modules can
/// convert stored events without reaching into this module's private helpers.
pub(crate) fn stored_event_to_nostr_event(event: StoredNostrEvent) -> Result<Event> {
    nostr_event_from_stored(event)
}
1790
1791fn encode_cid(cid: &Cid) -> Result<Vec<u8>> {
1792 rmp_serde::to_vec_named(&StoredCid {
1793 hash: cid.hash,
1794 key: cid.key,
1795 })
1796 .context("encode social graph events root")
1797}
1798
1799fn decode_cid(bytes: &[u8]) -> Result<Option<Cid>> {
1800 let stored: StoredCid =
1801 rmp_serde::from_slice(bytes).context("decode social graph events root")?;
1802 Ok(Some(Cid {
1803 hash: stored.hash,
1804 key: stored.key,
1805 }))
1806}
1807
1808fn read_root_file(path: &Path) -> Result<Option<Cid>> {
1809 let Ok(bytes) = std::fs::read(path) else {
1810 return Ok(None);
1811 };
1812 decode_cid(&bytes)
1813}
1814
1815fn write_root_file(path: &Path, root: Option<&Cid>) -> Result<()> {
1816 let Some(root) = root else {
1817 if path.exists() {
1818 std::fs::remove_file(path)?;
1819 }
1820 return Ok(());
1821 };
1822
1823 let encoded = encode_cid(root)?;
1824 let tmp_path = path.with_extension("tmp");
1825 std::fs::write(&tmp_path, encoded)?;
1826 std::fs::rename(tmp_path, path)?;
1827 Ok(())
1828}
1829
1830fn normalize_profile_name(value: &serde_json::Value) -> Option<String> {
1831 let raw = value.as_str()?;
1832 let trimmed = raw.split_whitespace().collect::<Vec<_>>().join(" ");
1833 if trimmed.is_empty() {
1834 return None;
1835 }
1836 Some(trimmed.chars().take(PROFILE_NAME_MAX_LENGTH).collect())
1837}
1838
1839fn extract_profile_names(profile: &serde_json::Map<String, serde_json::Value>) -> Vec<String> {
1840 let mut names = Vec::new();
1841 let mut seen = HashSet::new();
1842
1843 for key in ["display_name", "displayName", "name", "username"] {
1844 let Some(value) = profile.get(key).and_then(normalize_profile_name) else {
1845 continue;
1846 };
1847 let lowered = value.to_lowercase();
1848 if seen.insert(lowered) {
1849 names.push(value);
1850 }
1851 }
1852
1853 names
1854}
1855
/// Decides whether a nip05 local part should be dropped from search: single
/// characters and npub-looking values are always rejected, as is any local
/// part already contained in the (lowercased, whitespace-stripped) primary
/// display name.
fn should_reject_profile_nip05(local_part: &str, primary_name: &str) -> bool {
    let trivially_bad = local_part.len() == 1 || local_part.starts_with("npub1");
    if trivially_bad {
        return true;
    }

    // Squash the display name into one lowercase token and test containment.
    let mut squashed = String::with_capacity(primary_name.len());
    for chunk in primary_name.to_lowercase().split_whitespace() {
        squashed.push_str(chunk);
    }
    squashed.contains(local_part)
}
1867
1868fn normalize_profile_nip05(
1869 profile: &serde_json::Map<String, serde_json::Value>,
1870 primary_name: Option<&str>,
1871) -> Option<String> {
1872 let raw = profile.get("nip05")?.as_str()?;
1873 let local_part = raw.split('@').next()?.trim().to_lowercase();
1874 if local_part.is_empty() {
1875 return None;
1876 }
1877 let truncated: String = local_part.chars().take(PROFILE_NAME_MAX_LENGTH).collect();
1878 if truncated.is_empty() {
1879 return None;
1880 }
1881 if primary_name.is_some_and(|name| should_reject_profile_nip05(&truncated, name)) {
1882 return None;
1883 }
1884 Some(truncated)
1885}
1886
/// Returns `true` for common English filler words that carry no search
/// value and are excluded from the profile keyword index. Case-sensitive:
/// callers lowercase words before querying.
fn is_search_stop_word(word: &str) -> bool {
    // Same word set as before, expressed as a slice lookup instead of one
    // `matches!` arm per word.
    const STOP_WORDS: &[&str] = &[
        "a", "an", "the", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by",
        "from", "is", "it", "as", "be", "was", "are", "this", "that", "these", "those", "i",
        "you", "he", "she", "we", "they", "my", "your", "his", "her", "its", "our", "their",
        "what", "which", "who", "whom", "how", "when", "where", "why", "will", "would", "could",
        "should", "can", "may", "might", "must", "have", "has", "had", "do", "does", "did",
        "been", "being", "get", "got", "just", "now", "then", "so", "if", "not", "no", "yes",
        "all", "any", "some", "more", "most", "other", "into", "over", "after", "before",
        "about", "up", "down", "out", "off", "through", "during", "under", "again", "further",
        "once",
    ];
    STOP_WORDS.contains(&word)
}
1984
/// True when `word` is all ASCII digits, except for 4-digit values in
/// 1900-2099, which are kept as likely years. (An empty string vacuously
/// counts as all-digits, matching the previous behavior; callers filter
/// short tokens beforehand.)
fn is_pure_search_number(word: &str) -> bool {
    if word.chars().any(|ch| !ch.is_ascii_digit()) {
        return false;
    }
    let is_plausible_year = word.len() == 4
        && matches!(word.parse::<u16>(), Ok(year) if (1900..=2099).contains(&year));
    !is_plausible_year
}
1994
/// Splits a compound token into parts at camelCase transitions, digit/letter
/// boundaries, and the end of an uppercase acronym run (e.g. "HTTPServer"
/// -> ["HTTP", "Server"]).
fn split_compound_search_word(word: &str) -> Vec<String> {
    let chars: Vec<char> = word.chars().collect();
    let mut parts = Vec::new();
    let mut current = String::new();

    for (index, &ch) in chars.iter().enumerate() {
        if let Some(prev) = current.chars().last() {
            let case_boundary = prev.is_lowercase() && ch.is_uppercase();
            let digit_to_alpha = prev.is_ascii_digit() && ch.is_alphabetic();
            let alpha_to_digit = prev.is_alphabetic() && ch.is_ascii_digit();
            // An acronym ends where an uppercase letter is followed by a
            // lowercase one: that uppercase letter starts the next word.
            let acronym_end = prev.is_uppercase()
                && ch.is_uppercase()
                && chars.get(index + 1).is_some_and(|next| next.is_lowercase());

            if case_boundary || digit_to_alpha || alpha_to_digit || acronym_end {
                parts.push(std::mem::take(&mut current));
            }
        }
        current.push(ch);
    }

    if !current.is_empty() {
        parts.push(current);
    }

    parts
}
2023
2024fn parse_search_keywords(text: &str) -> Vec<String> {
2025 let mut keywords = Vec::new();
2026 let mut seen = HashSet::new();
2027
2028 for word in text
2029 .split(|ch: char| !ch.is_alphanumeric())
2030 .filter(|word| !word.is_empty())
2031 {
2032 let mut variants = Vec::with_capacity(1 + word.len() / 4);
2033 variants.push(word.to_lowercase());
2034 variants.extend(
2035 split_compound_search_word(word)
2036 .into_iter()
2037 .map(|part| part.to_lowercase()),
2038 );
2039
2040 for lowered in variants {
2041 if lowered.chars().count() < 2
2042 || is_search_stop_word(&lowered)
2043 || is_pure_search_number(&lowered)
2044 {
2045 continue;
2046 }
2047 if seen.insert(lowered.clone()) {
2048 keywords.push(lowered);
2049 }
2050 }
2051 }
2052
2053 keywords
2054}
2055
2056fn profile_search_terms_for_event(event: &Event) -> Vec<String> {
2057 let profile = match serde_json::from_str::<serde_json::Value>(&event.content) {
2058 Ok(serde_json::Value::Object(profile)) => profile,
2059 _ => serde_json::Map::new(),
2060 };
2061 let names = extract_profile_names(&profile);
2062 let primary_name = names.first().map(String::as_str);
2063 let mut parts = Vec::new();
2064 if let Some(name) = primary_name {
2065 parts.push(name.to_string());
2066 }
2067 if let Some(nip05) = normalize_profile_nip05(&profile, primary_name) {
2068 parts.push(nip05);
2069 }
2070 parts.push(event.pubkey.to_hex());
2071 if names.len() > 1 {
2072 parts.extend(names.into_iter().skip(1));
2073 }
2074 parse_search_keywords(&parts.join(" "))
2075}
2076
2077fn compare_nostr_events(left: &Event, right: &Event) -> std::cmp::Ordering {
2078 left.created_at
2079 .as_u64()
2080 .cmp(&right.created_at.as_u64())
2081 .then_with(|| left.id.to_hex().cmp(&right.id.to_hex()))
2082}
2083
/// Wraps a `NostrEventStoreError` in an `anyhow::Error` with a uniform prefix.
fn map_event_store_error(err: NostrEventStoreError) -> anyhow::Error {
    anyhow::anyhow!("nostr event store error: {err}")
}
2087
/// Ensures the social-graph LMDB environment's map size is at least
/// `requested_bytes` (never below `DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES`),
/// rounded up to a whole number of OS pages. The map is only ever grown,
/// never shrunk.
fn ensure_social_graph_mapsize(db_dir: &Path, requested_bytes: u64) -> Result<()> {
    let requested = requested_bytes.max(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES);
    let page_size = page_size_bytes() as u64;
    // Round up to a page multiple; on (unrealistic) u64 overflow fall back
    // to the unrounded request instead of wrapping.
    let rounded = requested
        .checked_add(page_size.saturating_sub(1))
        .map(|size| size / page_size * page_size)
        .unwrap_or(requested);
    let map_size = usize::try_from(rounded).context("social graph mapsize exceeds usize")?;

    // SAFETY: heed's `EnvOpenOptions::open` is unsafe because the caller must
    // guarantee the env is not opened elsewhere with incompatible options —
    // presumably this process is the sole opener of `db_dir`; TODO confirm.
    let env = unsafe {
        heed::EnvOpenOptions::new()
            .map_size(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES as usize)
            .max_dbs(SOCIALGRAPH_MAX_DBS)
            .open(db_dir)
    }
    .context("open social graph LMDB env for resize")?;
    if env.info().map_size < map_size {
        // SAFETY: `Env::resize` requires no concurrently open transactions on
        // this env — it was opened just above solely for resizing; verify no
        // other handle to the same directory is live at this point.
        unsafe { env.resize(map_size) }.context("resize social graph LMDB env")?;
    }

    Ok(())
}
2110
/// Returns the OS allocation granularity used to round LMDB map sizes.
fn page_size_bytes() -> usize {
    page_size::get_granularity()
}
2114
2115#[cfg(test)]
2116mod tests {
2117 use super::*;
2118 use async_trait::async_trait;
2119 use hashtree_config::StorageBackend;
2120 use hashtree_core::{Hash, MemoryStore, Store, StoreError};
2121 use hashtree_nostr::NostrEventStoreOptions;
2122 use std::collections::HashSet;
2123 use std::fs::{self, File};
2124 use std::io::{BufRead, BufReader};
2125 use std::path::{Path, PathBuf};
2126 use std::process::{Command, Stdio};
2127 use std::sync::Mutex;
2128 use std::time::Duration;
2129
2130 use nostr::{EventBuilder, JsonUtil, Keys, Tag, Timestamp};
2131 use tempfile::TempDir;
2132
2133 const WELLORDER_FIXTURE_URL: &str =
2134 "https://wellorder.xyz/nostr/nostr-wellorder-early-500k-v1.jsonl.bz2";
2135
    /// Point-in-time copy of [`ReadTraceState`] with the unique-hash set
    /// collapsed to a count; cheap to clone for test assertions.
    #[derive(Debug, Clone, Default)]
    struct ReadTraceSnapshot {
        // Total Store::get invocations observed.
        get_calls: u64,
        // Bytes returned across all reads, duplicates included.
        total_bytes: u64,
        // Number of distinct block hashes read.
        unique_blocks: usize,
        // Bytes counted once per distinct block hash.
        unique_bytes: u64,
        // NOTE(review): the three fields below are copied from state but are
        // not incremented in the visible code — presumably updated by tests
        // further down; confirm.
        cache_hits: u64,
        remote_fetches: u64,
        remote_bytes: u64,
    }
2146
    /// Mutable read counters accumulated behind a `Mutex` by the tracing stores.
    #[derive(Debug, Default)]
    struct ReadTraceState {
        get_calls: u64,               // number of `get` calls observed
        total_bytes: u64,             // bytes returned across all reads
        unique_hashes: HashSet<Hash>, // hashes seen so far, for unique accounting
        unique_bytes: u64,            // bytes counted only on a hash's first read
        cache_hits: u64,              // reads satisfied by the cache layer
        remote_fetches: u64,          // reads that fell through to the remote store
        remote_bytes: u64,            // bytes fetched from the remote store
    }
2157
    /// Store decorator that forwards every operation to `base` while tallying
    /// read traffic for benchmark reporting.
    #[derive(Debug)]
    struct CountingStore<S: Store> {
        base: Arc<S>,                 // underlying store all operations delegate to
        state: Mutex<ReadTraceState>, // read counters, guarded for shared access
    }
2163
2164 impl<S: Store> CountingStore<S> {
2165 fn new(base: Arc<S>) -> Self {
2166 Self {
2167 base,
2168 state: Mutex::new(ReadTraceState::default()),
2169 }
2170 }
2171
2172 fn reset(&self) {
2173 *self.state.lock().unwrap() = ReadTraceState::default();
2174 }
2175
2176 fn snapshot(&self) -> ReadTraceSnapshot {
2177 let state = self.state.lock().unwrap();
2178 ReadTraceSnapshot {
2179 get_calls: state.get_calls,
2180 total_bytes: state.total_bytes,
2181 unique_blocks: state.unique_hashes.len(),
2182 unique_bytes: state.unique_bytes,
2183 cache_hits: state.cache_hits,
2184 remote_fetches: state.remote_fetches,
2185 remote_bytes: state.remote_bytes,
2186 }
2187 }
2188
2189 fn record_read(&self, hash: &Hash, bytes: usize) {
2190 let mut state = self.state.lock().unwrap();
2191 state.get_calls += 1;
2192 state.total_bytes += bytes as u64;
2193 if state.unique_hashes.insert(*hash) {
2194 state.unique_bytes += bytes as u64;
2195 }
2196 }
2197 }
2198
2199 #[async_trait]
2200 impl<S: Store> Store for CountingStore<S> {
2201 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
2202 self.base.put(hash, data).await
2203 }
2204
2205 async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
2206 self.base.put_many(items).await
2207 }
2208
2209 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
2210 let data = self.base.get(hash).await?;
2211 if let Some(bytes) = data.as_ref() {
2212 self.record_read(hash, bytes.len());
2213 }
2214 Ok(data)
2215 }
2216
2217 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
2218 self.base.has(hash).await
2219 }
2220
2221 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
2222 self.base.delete(hash).await
2223 }
2224 }
2225
    /// Store that serves reads from an in-memory cache, falling back to (and
    /// populating from) a remote store, while tracing read traffic.
    #[derive(Debug)]
    struct ReadThroughStore<R: Store> {
        cache: Arc<MemoryStore>,      // local cache; also the target of all writes
        remote: Arc<R>,               // store consulted on cache miss
        state: Mutex<ReadTraceState>, // read counters, guarded for shared access
    }
2232
2233 impl<R: Store> ReadThroughStore<R> {
2234 fn new(cache: Arc<MemoryStore>, remote: Arc<R>) -> Self {
2235 Self {
2236 cache,
2237 remote,
2238 state: Mutex::new(ReadTraceState::default()),
2239 }
2240 }
2241
2242 fn snapshot(&self) -> ReadTraceSnapshot {
2243 let state = self.state.lock().unwrap();
2244 ReadTraceSnapshot {
2245 get_calls: state.get_calls,
2246 total_bytes: state.total_bytes,
2247 unique_blocks: state.unique_hashes.len(),
2248 unique_bytes: state.unique_bytes,
2249 cache_hits: state.cache_hits,
2250 remote_fetches: state.remote_fetches,
2251 remote_bytes: state.remote_bytes,
2252 }
2253 }
2254 }
2255
2256 #[async_trait]
2257 impl<R: Store> Store for ReadThroughStore<R> {
2258 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
2259 self.cache.put(hash, data).await
2260 }
2261
2262 async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
2263 self.cache.put_many(items).await
2264 }
2265
2266 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
2267 {
2268 let mut state = self.state.lock().unwrap();
2269 state.get_calls += 1;
2270 }
2271
2272 if let Some(bytes) = self.cache.get(hash).await? {
2273 let mut state = self.state.lock().unwrap();
2274 state.cache_hits += 1;
2275 state.total_bytes += bytes.len() as u64;
2276 if state.unique_hashes.insert(*hash) {
2277 state.unique_bytes += bytes.len() as u64;
2278 }
2279 return Ok(Some(bytes));
2280 }
2281
2282 let data = self.remote.get(hash).await?;
2283 if let Some(bytes) = data.as_ref() {
2284 let _ = self.cache.put(*hash, bytes.clone()).await?;
2285 let mut state = self.state.lock().unwrap();
2286 state.remote_fetches += 1;
2287 state.remote_bytes += bytes.len() as u64;
2288 state.total_bytes += bytes.len() as u64;
2289 if state.unique_hashes.insert(*hash) {
2290 state.unique_bytes += bytes.len() as u64;
2291 }
2292 }
2293 Ok(data)
2294 }
2295
2296 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
2297 if self.cache.has(hash).await? {
2298 return Ok(true);
2299 }
2300 self.remote.has(hash).await
2301 }
2302
2303 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
2304 let cache_deleted = self.cache.delete(hash).await?;
2305 let remote_deleted = self.remote.delete(hash).await?;
2306 Ok(cache_deleted || remote_deleted)
2307 }
2308 }
2309
    /// One parameterized query shape exercised by the read benchmarks.
    #[derive(Debug, Clone)]
    enum BenchmarkQueryCase {
        // Point lookup by event id (hex string).
        ById {
            id: String,
        },
        // Events from one author, capped at `limit`.
        ByAuthor {
            pubkey: String,
            limit: usize,
        },
        // Events from one author restricted to a single kind.
        ByAuthorKind {
            pubkey: String,
            kind: u32,
            limit: usize,
        },
        // Events of one kind across all authors.
        ByKind {
            kind: u32,
            limit: usize,
        },
        // Events carrying a specific tag name/value pair.
        ByTag {
            tag_name: String,
            tag_value: String,
            limit: usize,
        },
        // Most recent events regardless of author or kind.
        Recent {
            limit: usize,
        },
        // Current winner of a replaceable (author, kind) slot.
        Replaceable {
            pubkey: String,
            kind: u32,
        },
        // Current winner of a parameterized-replaceable (author, kind, d-tag) slot.
        ParameterizedReplaceable {
            pubkey: String,
            kind: u32,
            d_tag: String,
        },
    }
2346
2347 impl BenchmarkQueryCase {
2348 fn name(&self) -> &'static str {
2349 match self {
2350 BenchmarkQueryCase::ById { .. } => "by_id",
2351 BenchmarkQueryCase::ByAuthor { .. } => "by_author",
2352 BenchmarkQueryCase::ByAuthorKind { .. } => "by_author_kind",
2353 BenchmarkQueryCase::ByKind { .. } => "by_kind",
2354 BenchmarkQueryCase::ByTag { .. } => "by_tag",
2355 BenchmarkQueryCase::Recent { .. } => "recent",
2356 BenchmarkQueryCase::Replaceable { .. } => "replaceable",
2357 BenchmarkQueryCase::ParameterizedReplaceable { .. } => "parameterized_replaceable",
2358 }
2359 }
2360
2361 async fn execute<S: Store>(
2362 &self,
2363 store: &NostrEventStore<S>,
2364 root: &Cid,
2365 ) -> Result<usize, NostrEventStoreError> {
2366 match self {
2367 BenchmarkQueryCase::ById { id } => {
2368 Ok(store.get_by_id(Some(root), id).await?.into_iter().count())
2369 }
2370 BenchmarkQueryCase::ByAuthor { pubkey, limit } => Ok(store
2371 .list_by_author(
2372 Some(root),
2373 pubkey,
2374 ListEventsOptions {
2375 limit: Some(*limit),
2376 ..Default::default()
2377 },
2378 )
2379 .await?
2380 .len()),
2381 BenchmarkQueryCase::ByAuthorKind {
2382 pubkey,
2383 kind,
2384 limit,
2385 } => Ok(store
2386 .list_by_author_and_kind(
2387 Some(root),
2388 pubkey,
2389 *kind,
2390 ListEventsOptions {
2391 limit: Some(*limit),
2392 ..Default::default()
2393 },
2394 )
2395 .await?
2396 .len()),
2397 BenchmarkQueryCase::ByKind { kind, limit } => Ok(store
2398 .list_by_kind(
2399 Some(root),
2400 *kind,
2401 ListEventsOptions {
2402 limit: Some(*limit),
2403 ..Default::default()
2404 },
2405 )
2406 .await?
2407 .len()),
2408 BenchmarkQueryCase::ByTag {
2409 tag_name,
2410 tag_value,
2411 limit,
2412 } => Ok(store
2413 .list_by_tag(
2414 Some(root),
2415 tag_name,
2416 tag_value,
2417 ListEventsOptions {
2418 limit: Some(*limit),
2419 ..Default::default()
2420 },
2421 )
2422 .await?
2423 .len()),
2424 BenchmarkQueryCase::Recent { limit } => Ok(store
2425 .list_recent(
2426 Some(root),
2427 ListEventsOptions {
2428 limit: Some(*limit),
2429 ..Default::default()
2430 },
2431 )
2432 .await?
2433 .len()),
2434 BenchmarkQueryCase::Replaceable { pubkey, kind } => Ok(store
2435 .get_replaceable(Some(root), pubkey, *kind)
2436 .await?
2437 .into_iter()
2438 .count()),
2439 BenchmarkQueryCase::ParameterizedReplaceable {
2440 pubkey,
2441 kind,
2442 d_tag,
2443 } => Ok(store
2444 .get_parameterized_replaceable(Some(root), pubkey, *kind, d_tag)
2445 .await?
2446 .into_iter()
2447 .count()),
2448 }
2449 }
2450 }
2451
    /// Simulated link profile used to estimate what traced remote reads would
    /// cost over a real network.
    #[derive(Debug, Clone, Copy)]
    struct NetworkModel {
        name: &'static str,          // label used when reporting results
        rtt_ms: f64,                 // round-trip latency in milliseconds
        bandwidth_mib_per_s: f64,    // sustained throughput in MiB/s
    }
2458
    /// Timing plus read-trace stats gathered for one benchmark query case.
    #[derive(Debug, Clone)]
    struct QueryBenchmarkResult {
        average_duration: Duration, // mean of the measured query durations
        p95_duration: Duration,     // 95th-percentile query duration
        reads: ReadTraceSnapshot,   // storage read counters for the run
    }
2465
    // Three representative link profiles: a fast LAN, a typical WAN, and a
    // slow high-latency connection.
    const NETWORK_MODELS: [NetworkModel; 3] = [
        NetworkModel {
            name: "lan",
            rtt_ms: 2.0,
            bandwidth_mib_per_s: 100.0,
        },
        NetworkModel {
            name: "wan",
            rtt_ms: 40.0,
            bandwidth_mib_per_s: 20.0,
        },
        NetworkModel {
            name: "slow",
            rtt_ms: 120.0,
            bandwidth_mib_per_s: 5.0,
        },
    ];
2483
2484 #[test]
2485 fn test_open_social_graph_store() {
2486 let _guard = test_lock();
2487 let tmp = TempDir::new().unwrap();
2488 let graph_store = open_social_graph_store(tmp.path()).unwrap();
2489 assert_eq!(Arc::strong_count(&graph_store), 1);
2490 }
2491
2492 #[test]
2493 fn test_set_root_and_get_follow_distance() {
2494 let _guard = test_lock();
2495 let tmp = TempDir::new().unwrap();
2496 let graph_store = open_social_graph_store(tmp.path()).unwrap();
2497 let root_pk = [1u8; 32];
2498 set_social_graph_root(&graph_store, &root_pk);
2499 assert_eq!(get_follow_distance(&graph_store, &root_pk), Some(0));
2500 }
2501
    // Ingesting a contact list from the root puts the tagged user at follow
    // distance 1; ingesting a mute list makes the muted user overmuted.
    #[test]
    fn test_ingest_event_updates_follows_and_mutes() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();

        let root_keys = Keys::generate();
        let alice_keys = Keys::generate();
        let bob_keys = Keys::generate();

        let root_pk = root_keys.public_key().to_bytes();
        set_social_graph_root(&graph_store, &root_pk);

        // Root follows alice via a kind-3 contact list.
        let follow = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![Tag::public_key(alice_keys.public_key())],
        )
        .custom_created_at(Timestamp::from_secs(10))
        .to_event(&root_keys)
        .unwrap();
        ingest_event(&graph_store, "follow", &follow.as_json());

        // Root mutes bob via a mute-list event.
        let mute = EventBuilder::new(
            Kind::MuteList,
            "",
            vec![Tag::public_key(bob_keys.public_key())],
        )
        .custom_created_at(Timestamp::from_secs(11))
        .to_event(&root_keys)
        .unwrap();
        ingest_event(&graph_store, "mute", &mute.as_json());

        assert_eq!(
            get_follow_distance(&graph_store, &alice_keys.public_key().to_bytes()),
            Some(1)
        );
        assert!(is_overmuted(
            &graph_store,
            &root_pk,
            &bob_keys.public_key().to_bytes(),
            1.0
        ));
    }
2546
    // A kind-0 metadata event populates the profile search index (display
    // name, nip05 local part, aliases); a newer metadata event from the same
    // pubkey fully replaces the old search terms and the mirrored profile.
    #[test]
    fn test_metadata_ingest_builds_profile_search_index_and_replaces_old_terms() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let keys = Keys::generate();

        let older = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "sirius",
                "name": "Martti Malmi",
                "username": "mmalmi",
                "nip05": "siriusdev@iris.to",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&keys)
        .unwrap();
        let newer = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "bird",
                "nip05": "birdman@iris.to",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(6))
        .to_event(&keys)
        .unwrap();

        ingest_parsed_event(&graph_store, &older).unwrap();

        // The older profile indexes both the display name and the nip05 term.
        let pubkey = keys.public_key().to_hex();
        let entries = graph_store
            .profile_search_entries_for_prefix("p:sirius")
            .unwrap();
        assert!(entries
            .iter()
            .any(|(key, entry)| key == &format!("p:sirius:{pubkey}") && entry.name == "sirius"));
        assert!(entries.iter().any(|(key, entry)| {
            key == &format!("p:siriusdev:{pubkey}")
                && entry.nip05.as_deref() == Some("siriusdev")
                && entry.aliases == vec!["Martti Malmi".to_string(), "mmalmi".to_string()]
                && entry.event_nhash.starts_with("nhash1")
        }));
        assert!(entries.iter().all(|(_, entry)| entry.pubkey == pubkey));
        assert_eq!(
            graph_store
                .latest_profile_event(&pubkey)
                .unwrap()
                .expect("latest mirrored profile")
                .id,
            older.id
        );

        ingest_parsed_event(&graph_store, &newer).unwrap();

        // The newer profile removes the old terms and installs its own.
        assert!(graph_store
            .profile_search_entries_for_prefix("p:sirius")
            .unwrap()
            .is_empty());
        let bird_entries = graph_store
            .profile_search_entries_for_prefix("p:bird")
            .unwrap();
        assert_eq!(bird_entries.len(), 2);
        assert!(bird_entries
            .iter()
            .any(|(key, entry)| key == &format!("p:bird:{pubkey}") && entry.name == "bird"));
        assert!(bird_entries.iter().any(|(key, entry)| {
            key == &format!("p:birdman:{pubkey}")
                && entry.nip05.as_deref() == Some("birdman")
                && entry.aliases.is_empty()
        }));
        assert_eq!(
            graph_store
                .latest_profile_event(&pubkey)
                .unwrap()
                .expect("latest mirrored profile")
                .id,
            newer.id
        );
    }
2633
    // Metadata ingested with the Ambient storage class is still mirrored into
    // the public profile index and searchable by name prefix.
    #[test]
    fn test_ambient_metadata_events_are_mirrored_into_public_profile_index() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let keys = Keys::generate();

        let profile = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "ambient bird",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&keys)
        .unwrap();

        ingest_parsed_event_with_storage_class(&graph_store, &profile, EventStorageClass::Ambient)
            .unwrap();

        let pubkey = keys.public_key().to_hex();
        let mirrored = graph_store
            .latest_profile_event(&pubkey)
            .unwrap()
            .expect("mirrored ambient profile");
        assert_eq!(mirrored.id, profile.id);
        assert_eq!(
            graph_store
                .profile_search_entries_for_prefix("p:ambient")
                .unwrap()
                .len(),
            1
        );
    }
2670
    // CamelCase profile names are split into sub-terms ("SirLibre" -> "libre",
    // "XMLHttpRequest42" -> "xml"/"request") while the whole token stays
    // searchable too.
    #[test]
    fn test_metadata_ingest_splits_compound_profile_terms_without_losing_whole_token() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let keys = Keys::generate();

        let profile = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "SirLibre",
                "username": "XMLHttpRequest42",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&keys)
        .unwrap();

        ingest_parsed_event(&graph_store, &profile).unwrap();

        let pubkey = keys.public_key().to_hex();
        assert!(
            graph_store
                .profile_search_entries_for_prefix("p:sirlibre")
                .unwrap()
                .iter()
                .any(|(key, entry)| key == &format!("p:sirlibre:{pubkey}")
                    && entry.name == "SirLibre")
        );
        assert!(graph_store
            .profile_search_entries_for_prefix("p:libre")
            .unwrap()
            .iter()
            .any(|(key, entry)| key == &format!("p:libre:{pubkey}") && entry.name == "SirLibre"));
        assert!(graph_store
            .profile_search_entries_for_prefix("p:xml")
            .unwrap()
            .iter()
            .any(|(key, entry)| {
                key == &format!("p:xml:{pubkey}")
                    && entry.aliases == vec!["XMLHttpRequest42".to_string()]
            }));
        assert!(graph_store
            .profile_search_entries_for_prefix("p:request")
            .unwrap()
            .iter()
            .any(|(key, entry)| {
                key == &format!("p:request:{pubkey}")
                    && entry.aliases == vec!["XMLHttpRequest42".to_string()]
            }));
    }
2724
    // The profile search index and the mirrored profile survive closing and
    // reopening the store on the same directory.
    #[test]
    fn test_profile_search_index_persists_across_reopen() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let keys = Keys::generate();
        let pubkey = keys.public_key().to_hex();

        // First session: ingest one profile, then drop the store.
        {
            let graph_store = open_social_graph_store(tmp.path()).unwrap();
            let profile = EventBuilder::new(
                Kind::Metadata,
                serde_json::json!({
                    "display_name": "reopen user",
                })
                .to_string(),
                [],
            )
            .custom_created_at(Timestamp::from_secs(5))
            .to_event(&keys)
            .unwrap();
            ingest_parsed_event(&graph_store, &profile).unwrap();
            assert!(graph_store.profile_search_root().unwrap().is_some());
        }

        // Second session: everything must still be there.
        let reopened = open_social_graph_store(tmp.path()).unwrap();
        assert!(reopened.profile_search_root().unwrap().is_some());
        assert_eq!(
            reopened
                .latest_profile_event(&pubkey)
                .unwrap()
                .expect("mirrored profile after reopen")
                .pubkey,
            keys.public_key()
        );
        let links = reopened
            .profile_search_entries_for_prefix("p:reopen")
            .unwrap();
        assert_eq!(links.len(), 1);
        assert_eq!(links[0].0, format!("p:reopen:{pubkey}"));
        assert_eq!(links[0].1.name, "reopen user");
    }
2766
2767 #[test]
2768 fn test_profile_search_index_with_shared_hashtree_storage() {
2769 let _guard = test_lock();
2770 let tmp = TempDir::new().unwrap();
2771 let store =
2772 crate::storage::HashtreeStore::with_options(tmp.path(), None, 1024 * 1024 * 1024)
2773 .unwrap();
2774 let graph_store =
2775 open_social_graph_store_with_storage(tmp.path(), store.store_arc(), None).unwrap();
2776 let keys = Keys::generate();
2777 let pubkey = keys.public_key().to_hex();
2778
2779 let profile = EventBuilder::new(
2780 Kind::Metadata,
2781 serde_json::json!({
2782 "display_name": "shared storage user",
2783 "nip05": "shareduser@example.com",
2784 })
2785 .to_string(),
2786 [],
2787 )
2788 .custom_created_at(Timestamp::from_secs(5))
2789 .to_event(&keys)
2790 .unwrap();
2791
2792 graph_store
2793 .sync_profile_index_for_events(std::slice::from_ref(&profile))
2794 .unwrap();
2795 assert!(graph_store.profile_search_root().unwrap().is_some());
2796 assert!(graph_store.profile_search_root().unwrap().is_some());
2797 let links = graph_store
2798 .profile_search_entries_for_prefix("p:shared")
2799 .unwrap();
2800 assert_eq!(links.len(), 2);
2801 assert!(links
2802 .iter()
2803 .any(|(key, entry)| key == &format!("p:shared:{pubkey}")
2804 && entry.name == "shared storage user"));
2805 assert!(links
2806 .iter()
2807 .any(|(key, entry)| key == &format!("p:shareduser:{pubkey}")
2808 && entry.nip05.as_deref() == Some("shareduser")));
2809 }
2810
    // Rebuilding the profile index from stored events uses the latest public
    // metadata per pubkey and also includes ambient metadata, after the roots
    // have been cleared.
    #[test]
    fn test_rebuild_profile_index_from_stored_events_uses_ambient_and_public_metadata() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let public_keys = Keys::generate();
        let ambient_keys = Keys::generate();
        let public_pubkey = public_keys.public_key().to_hex();
        let ambient_pubkey = ambient_keys.public_key().to_hex();

        let older = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "petri old",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&public_keys)
        .unwrap();
        let newer = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "petri",
                "name": "Petri Example",
                "nip05": "petri@example.com",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(6))
        .to_event(&public_keys)
        .unwrap();
        let ambient = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "ambient petri",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(7))
        .to_event(&ambient_keys)
        .unwrap();

        ingest_parsed_event_with_storage_class(&graph_store, &older, EventStorageClass::Public)
            .unwrap();
        ingest_parsed_event_with_storage_class(&graph_store, &newer, EventStorageClass::Public)
            .unwrap();
        ingest_parsed_event_with_storage_class(&graph_store, &ambient, EventStorageClass::Ambient)
            .unwrap();

        // Wipe both index roots so the rebuild starts from scratch.
        graph_store
            .profile_index
            .write_by_pubkey_root(None)
            .unwrap();
        graph_store.profile_index.write_search_root(None).unwrap();

        let rebuilt = graph_store
            .rebuild_profile_index_from_stored_events()
            .unwrap();
        // One rebuilt profile per pubkey: the newer public one and the ambient one.
        assert_eq!(rebuilt, 2);

        let entries = graph_store
            .profile_search_entries_for_prefix("p:petri")
            .unwrap();
        assert_eq!(entries.len(), 2);
        assert!(entries.iter().any(|(key, entry)| {
            key == &format!("p:petri:{public_pubkey}")
                && entry.name == "petri"
                && entry.aliases == vec!["Petri Example".to_string()]
                && entry.nip05.is_none()
        }));
        assert!(entries.iter().any(|(key, entry)| {
            key == &format!("p:petri:{ambient_pubkey}")
                && entry.name == "ambient petri"
                && entry.aliases.is_empty()
                && entry.nip05.is_none()
        }));
    }
2892
    // Once a user becomes overmuted, rebuilding the profile index drops their
    // mirrored profile and search entries entirely.
    #[test]
    fn test_rebuild_profile_index_excludes_overmuted_users() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let root_keys = Keys::generate();
        let muted_keys = Keys::generate();
        let muted_pubkey = muted_keys.public_key().to_hex();

        set_social_graph_root(&graph_store, &root_keys.public_key().to_bytes());
        graph_store.set_profile_index_overmute_threshold(1.0);

        // Profile is indexed normally before any mute exists.
        let profile = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "muted petri",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&muted_keys)
        .unwrap();
        ingest_parsed_event(&graph_store, &profile).unwrap();
        assert!(graph_store.latest_profile_event(&muted_pubkey).unwrap().is_some());

        // Root mutes the user, pushing them over the overmute threshold.
        let mute = EventBuilder::new(
            Kind::MuteList,
            "",
            vec![Tag::public_key(muted_keys.public_key())],
        )
        .custom_created_at(Timestamp::from_secs(6))
        .to_event(&root_keys)
        .unwrap();
        ingest_parsed_event(&graph_store, &mute).unwrap();
        assert!(graph_store
            .is_overmuted_user(&muted_keys.public_key().to_bytes(), 1.0)
            .unwrap());

        let rebuilt = graph_store
            .rebuild_profile_index_from_stored_events()
            .unwrap();
        assert_eq!(rebuilt, 0);
        assert!(graph_store.latest_profile_event(&muted_pubkey).unwrap().is_none());
        assert!(graph_store
            .profile_search_entries_for_prefix("p:muted")
            .unwrap()
            .is_empty());
    }
2942
2943 #[test]
2944 fn test_query_events_by_author() {
2945 let _guard = test_lock();
2946 let tmp = TempDir::new().unwrap();
2947 let graph_store = open_social_graph_store(tmp.path()).unwrap();
2948 let keys = Keys::generate();
2949
2950 let older = EventBuilder::new(Kind::TextNote, "older", [])
2951 .custom_created_at(Timestamp::from_secs(5))
2952 .to_event(&keys)
2953 .unwrap();
2954 let newer = EventBuilder::new(Kind::TextNote, "newer", [])
2955 .custom_created_at(Timestamp::from_secs(6))
2956 .to_event(&keys)
2957 .unwrap();
2958
2959 ingest_parsed_event(&graph_store, &older).unwrap();
2960 ingest_parsed_event(&graph_store, &newer).unwrap();
2961
2962 let filter = Filter::new().author(keys.public_key()).kind(Kind::TextNote);
2963 let events = query_events(&graph_store, &filter, 10);
2964 assert_eq!(events.len(), 2);
2965 assert_eq!(events[0].id, newer.id);
2966 assert_eq!(events[1].id, older.id);
2967 }
2968
    // Kind-filtered queries return matching events newest-first and exclude
    // other kinds.
    #[test]
    fn test_query_events_by_kind() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let first_keys = Keys::generate();
        let second_keys = Keys::generate();

        let older = EventBuilder::new(Kind::TextNote, "older", [])
            .custom_created_at(Timestamp::from_secs(5))
            .to_event(&first_keys)
            .unwrap();
        let newer = EventBuilder::new(Kind::TextNote, "newer", [])
            .custom_created_at(Timestamp::from_secs(6))
            .to_event(&second_keys)
            .unwrap();
        // Different kind: must not appear in the TextNote query below.
        let other_kind = EventBuilder::new(Kind::Metadata, "profile", [])
            .custom_created_at(Timestamp::from_secs(7))
            .to_event(&second_keys)
            .unwrap();

        ingest_parsed_event(&graph_store, &older).unwrap();
        ingest_parsed_event(&graph_store, &newer).unwrap();
        ingest_parsed_event(&graph_store, &other_kind).unwrap();

        let filter = Filter::new().kind(Kind::TextNote);
        let events = query_events(&graph_store, &filter, 10);
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].id, newer.id);
        assert_eq!(events[1].id, older.id);
    }
3000
3001 #[test]
3002 fn test_query_events_by_id() {
3003 let _guard = test_lock();
3004 let tmp = TempDir::new().unwrap();
3005 let graph_store = open_social_graph_store(tmp.path()).unwrap();
3006 let keys = Keys::generate();
3007
3008 let first = EventBuilder::new(Kind::TextNote, "first", [])
3009 .custom_created_at(Timestamp::from_secs(5))
3010 .to_event(&keys)
3011 .unwrap();
3012 let target = EventBuilder::new(Kind::TextNote, "target", [])
3013 .custom_created_at(Timestamp::from_secs(6))
3014 .to_event(&keys)
3015 .unwrap();
3016
3017 ingest_parsed_event(&graph_store, &first).unwrap();
3018 ingest_parsed_event(&graph_store, &target).unwrap();
3019
3020 let filter = Filter::new().id(target.id).kind(Kind::TextNote);
3021 let events = query_events(&graph_store, &filter, 10);
3022 assert_eq!(events.len(), 1);
3023 assert_eq!(events[0].id, target.id);
3024 }
3025
    // A lowercase search string matches mixed-case content.
    #[test]
    fn test_query_events_search_is_case_insensitive() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let keys = Keys::generate();
        let other_keys = Keys::generate();

        let matching = EventBuilder::new(Kind::TextNote, "Hello Nostr Search", [])
            .custom_created_at(Timestamp::from_secs(5))
            .to_event(&keys)
            .unwrap();
        let other = EventBuilder::new(Kind::TextNote, "goodbye world", [])
            .custom_created_at(Timestamp::from_secs(6))
            .to_event(&other_keys)
            .unwrap();

        ingest_parsed_event(&graph_store, &matching).unwrap();
        ingest_parsed_event(&graph_store, &other).unwrap();

        let filter = Filter::new().kind(Kind::TextNote).search("nostr search");
        let events = query_events(&graph_store, &filter, 10);
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].id, matching.id);
    }
3051
    // Both the `since` and `until` bounds are inclusive: events at exactly
    // those timestamps are returned, events just outside are not.
    #[test]
    fn test_query_events_since_until_are_inclusive() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let keys = Keys::generate();

        let before = EventBuilder::new(Kind::TextNote, "before", [])
            .custom_created_at(Timestamp::from_secs(5))
            .to_event(&keys)
            .unwrap();
        let start = EventBuilder::new(Kind::TextNote, "start", [])
            .custom_created_at(Timestamp::from_secs(6))
            .to_event(&keys)
            .unwrap();
        let end = EventBuilder::new(Kind::TextNote, "end", [])
            .custom_created_at(Timestamp::from_secs(10))
            .to_event(&keys)
            .unwrap();
        let after = EventBuilder::new(Kind::TextNote, "after", [])
            .custom_created_at(Timestamp::from_secs(11))
            .to_event(&keys)
            .unwrap();

        ingest_parsed_event(&graph_store, &before).unwrap();
        ingest_parsed_event(&graph_store, &start).unwrap();
        ingest_parsed_event(&graph_store, &end).unwrap();
        ingest_parsed_event(&graph_store, &after).unwrap();

        let filter = Filter::new()
            .kind(Kind::TextNote)
            .since(Timestamp::from_secs(6))
            .until(Timestamp::from_secs(10));
        let events = query_events(&graph_store, &filter, 10);
        let ids = events.into_iter().map(|event| event.id).collect::<Vec<_>>();
        assert_eq!(ids, vec![end.id, start.id]);
    }
3089
    // For a replaceable kind (10000 range), only the newest event per author
    // is returned.
    #[test]
    fn test_query_events_replaceable_kind_returns_latest_winner() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let keys = Keys::generate();

        let older = EventBuilder::new(Kind::Custom(10_000), "older mute list", [])
            .custom_created_at(Timestamp::from_secs(5))
            .to_event(&keys)
            .unwrap();
        let newer = EventBuilder::new(Kind::Custom(10_000), "newer mute list", [])
            .custom_created_at(Timestamp::from_secs(6))
            .to_event(&keys)
            .unwrap();

        ingest_parsed_event(&graph_store, &older).unwrap();
        ingest_parsed_event(&graph_store, &newer).unwrap();

        let filter = Filter::new()
            .author(keys.public_key())
            .kind(Kind::Custom(10_000));
        let events = query_events(&graph_store, &filter, 10);
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].id, newer.id);
    }
3116
    // Kind 41 (channel metadata) is treated as replaceable: only the newest
    // event per author is returned.
    #[test]
    fn test_query_events_kind_41_replaceable_returns_latest_winner() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let keys = Keys::generate();

        let older = EventBuilder::new(Kind::Custom(41), "older channel metadata", [])
            .custom_created_at(Timestamp::from_secs(5))
            .to_event(&keys)
            .unwrap();
        let newer = EventBuilder::new(Kind::Custom(41), "newer channel metadata", [])
            .custom_created_at(Timestamp::from_secs(6))
            .to_event(&keys)
            .unwrap();

        ingest_parsed_event(&graph_store, &older).unwrap();
        ingest_parsed_event(&graph_store, &newer).unwrap();

        let filter = Filter::new()
            .author(keys.public_key())
            .kind(Kind::Custom(41));
        let events = query_events(&graph_store, &filter, 10);
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].id, newer.id);
    }
3143
    // Events stored with Public vs Ambient storage classes are queryable
    // together via `All` but remain isolated under the scoped query modes.
    #[test]
    fn test_public_and_ambient_indexes_stay_separate() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let public_keys = Keys::generate();
        let ambient_keys = Keys::generate();

        let public_event = EventBuilder::new(Kind::TextNote, "public", [])
            .custom_created_at(Timestamp::from_secs(5))
            .to_event(&public_keys)
            .unwrap();
        let ambient_event = EventBuilder::new(Kind::TextNote, "ambient", [])
            .custom_created_at(Timestamp::from_secs(6))
            .to_event(&ambient_keys)
            .unwrap();

        ingest_parsed_event_with_storage_class(
            &graph_store,
            &public_event,
            EventStorageClass::Public,
        )
        .unwrap();
        ingest_parsed_event_with_storage_class(
            &graph_store,
            &ambient_event,
            EventStorageClass::Ambient,
        )
        .unwrap();

        let filter = Filter::new().kind(Kind::TextNote);
        let all_events = graph_store
            .query_events_in_scope(&filter, 10, EventQueryScope::All)
            .unwrap();
        assert_eq!(all_events.len(), 2);

        let public_events = graph_store
            .query_events_in_scope(&filter, 10, EventQueryScope::PublicOnly)
            .unwrap();
        assert_eq!(public_events.len(), 1);
        assert_eq!(public_events[0].id, public_event.id);

        let ambient_events = graph_store
            .query_events_in_scope(&filter, 10, EventQueryScope::AmbientOnly)
            .unwrap();
        assert_eq!(ambient_events.len(), 1);
        assert_eq!(ambient_events[0].id, ambient_event.id);
    }
3192
    // The default ingest path routes the root author's events into the public
    // scope and everyone else's into the ambient scope.
    #[test]
    fn test_default_ingest_classifies_root_author_as_public() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let root_keys = Keys::generate();
        let other_keys = Keys::generate();
        set_social_graph_root(&graph_store, &root_keys.public_key().to_bytes());

        let root_event = EventBuilder::new(Kind::TextNote, "root", [])
            .custom_created_at(Timestamp::from_secs(5))
            .to_event(&root_keys)
            .unwrap();
        let other_event = EventBuilder::new(Kind::TextNote, "other", [])
            .custom_created_at(Timestamp::from_secs(6))
            .to_event(&other_keys)
            .unwrap();

        ingest_parsed_event(&graph_store, &root_event).unwrap();
        ingest_parsed_event(&graph_store, &other_event).unwrap();

        let filter = Filter::new().kind(Kind::TextNote);
        let public_events = graph_store
            .query_events_in_scope(&filter, 10, EventQueryScope::PublicOnly)
            .unwrap();
        assert_eq!(public_events.len(), 1);
        assert_eq!(public_events[0].id, root_event.id);

        let ambient_events = graph_store
            .query_events_in_scope(&filter, 10, EventQueryScope::AmbientOnly)
            .unwrap();
        assert_eq!(ambient_events.len(), 1);
        assert_eq!(ambient_events[0].id, other_event.id);
    }
3227
    #[test]
    fn test_query_events_survives_reopen() {
        // Events ingested into an on-disk store must remain queryable after the
        // store is closed and reopened from the same path.
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let db_dir = tmp.path().join("socialgraph-store");
        let keys = Keys::generate();
        let other_keys = Keys::generate();

        // Scope the first store so it is dropped before reopening below.
        {
            let graph_store = open_social_graph_store_at_path(&db_dir, None).unwrap();
            let older = EventBuilder::new(Kind::TextNote, "older", [])
                .custom_created_at(Timestamp::from_secs(5))
                .to_event(&keys)
                .unwrap();
            let newer = EventBuilder::new(Kind::TextNote, "newer", [])
                .custom_created_at(Timestamp::from_secs(6))
                .to_event(&keys)
                .unwrap();
            let latest = EventBuilder::new(Kind::TextNote, "latest", [])
                .custom_created_at(Timestamp::from_secs(7))
                .to_event(&other_keys)
                .unwrap();

            ingest_parsed_event(&graph_store, &older).unwrap();
            ingest_parsed_event(&graph_store, &newer).unwrap();
            ingest_parsed_event(&graph_store, &latest).unwrap();
        }

        let reopened = open_social_graph_store_at_path(&db_dir, None).unwrap();

        // Author-scoped query returns only that author's notes, newest first.
        let author_filter = Filter::new().author(keys.public_key()).kind(Kind::TextNote);
        let author_events = query_events(&reopened, &author_filter, 10);
        assert_eq!(author_events.len(), 2);
        assert_eq!(author_events[0].content, "newer");
        assert_eq!(author_events[1].content, "older");

        // The limit is applied after newest-first ordering across all authors.
        let recent_filter = Filter::new().kind(Kind::TextNote);
        let recent_events = query_events(&reopened, &recent_filter, 2);
        assert_eq!(recent_events.len(), 2);
        assert_eq!(recent_events[0].content, "latest");
        assert_eq!(recent_events[1].content, "newer");
    }
3270
    #[test]
    fn test_query_events_parameterized_replaceable_by_d_tag() {
        // For a parameterized-replaceable kind (here 30078), only the newest
        // event per (author, kind, d-tag) should be returned; events with a
        // different d-tag are tracked independently.
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let keys = Keys::generate();

        // Two versions under the same d-tag ("video")...
        let older = EventBuilder::new(
            Kind::Custom(30078),
            "",
            vec![
                Tag::identifier("video"),
                Tag::parse(&["l", "hashtree"]).unwrap(),
                Tag::parse(&["hash", &"11".repeat(32)]).unwrap(),
            ],
        )
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&keys)
        .unwrap();
        let newer = EventBuilder::new(
            Kind::Custom(30078),
            "",
            vec![
                Tag::identifier("video"),
                Tag::parse(&["l", "hashtree"]).unwrap(),
                Tag::parse(&["hash", &"22".repeat(32)]).unwrap(),
            ],
        )
        .custom_created_at(Timestamp::from_secs(6))
        .to_event(&keys)
        .unwrap();
        // ...and one under a different d-tag ("files") that must not interfere.
        let other_tree = EventBuilder::new(
            Kind::Custom(30078),
            "",
            vec![
                Tag::identifier("files"),
                Tag::parse(&["l", "hashtree"]).unwrap(),
                Tag::parse(&["hash", &"33".repeat(32)]).unwrap(),
            ],
        )
        .custom_created_at(Timestamp::from_secs(7))
        .to_event(&keys)
        .unwrap();

        ingest_parsed_event(&graph_store, &older).unwrap();
        ingest_parsed_event(&graph_store, &newer).unwrap();
        ingest_parsed_event(&graph_store, &other_tree).unwrap();

        // Querying the "video" identifier returns only the latest version.
        let filter = Filter::new()
            .author(keys.public_key())
            .kind(Kind::Custom(30078))
            .identifier("video");
        let events = query_events(&graph_store, &filter, 10);
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].id, newer.id);
    }
3327
3328 #[test]
3329 fn test_query_events_by_hashtag_uses_tag_index() {
3330 let _guard = test_lock();
3331 let tmp = TempDir::new().unwrap();
3332 let graph_store = open_social_graph_store(tmp.path()).unwrap();
3333 let keys = Keys::generate();
3334 let other_keys = Keys::generate();
3335
3336 let first = EventBuilder::new(
3337 Kind::TextNote,
3338 "first",
3339 vec![Tag::parse(&["t", "hashtree"]).unwrap()],
3340 )
3341 .custom_created_at(Timestamp::from_secs(5))
3342 .to_event(&keys)
3343 .unwrap();
3344 let second = EventBuilder::new(
3345 Kind::TextNote,
3346 "second",
3347 vec![Tag::parse(&["t", "hashtree"]).unwrap()],
3348 )
3349 .custom_created_at(Timestamp::from_secs(6))
3350 .to_event(&other_keys)
3351 .unwrap();
3352 let unrelated = EventBuilder::new(
3353 Kind::TextNote,
3354 "third",
3355 vec![Tag::parse(&["t", "other"]).unwrap()],
3356 )
3357 .custom_created_at(Timestamp::from_secs(7))
3358 .to_event(&other_keys)
3359 .unwrap();
3360
3361 ingest_parsed_event(&graph_store, &first).unwrap();
3362 ingest_parsed_event(&graph_store, &second).unwrap();
3363 ingest_parsed_event(&graph_store, &unrelated).unwrap();
3364
3365 let filter = Filter::new().hashtag("hashtree");
3366 let events = query_events(&graph_store, &filter, 10);
3367 assert_eq!(events.len(), 2);
3368 assert_eq!(events[0].id, second.id);
3369 assert_eq!(events[1].id, first.id);
3370 }
3371
3372 #[test]
3373 fn test_query_events_combines_indexes_then_applies_search_filter() {
3374 let _guard = test_lock();
3375 let tmp = TempDir::new().unwrap();
3376 let graph_store = open_social_graph_store(tmp.path()).unwrap();
3377 let keys = Keys::generate();
3378 let other_keys = Keys::generate();
3379
3380 let matching = EventBuilder::new(
3381 Kind::TextNote,
3382 "hashtree video release",
3383 vec![Tag::parse(&["t", "hashtree"]).unwrap()],
3384 )
3385 .custom_created_at(Timestamp::from_secs(5))
3386 .to_event(&keys)
3387 .unwrap();
3388 let non_matching = EventBuilder::new(
3389 Kind::TextNote,
3390 "plain text note",
3391 vec![Tag::parse(&["t", "hashtree"]).unwrap()],
3392 )
3393 .custom_created_at(Timestamp::from_secs(6))
3394 .to_event(&other_keys)
3395 .unwrap();
3396
3397 ingest_parsed_event(&graph_store, &matching).unwrap();
3398 ingest_parsed_event(&graph_store, &non_matching).unwrap();
3399
3400 let filter = Filter::new().hashtag("hashtree").search("video");
3401 let events = query_events(&graph_store, &filter, 10);
3402 assert_eq!(events.len(), 1);
3403 assert_eq!(events[0].id, matching.id);
3404 }
3405
3406 fn benchmark_dataset_path() -> Option<PathBuf> {
3407 std::env::var_os("HASHTREE_BENCH_DATASET_PATH").map(PathBuf::from)
3408 }
3409
3410 fn benchmark_dataset_url() -> String {
3411 std::env::var("HASHTREE_BENCH_DATASET_URL")
3412 .ok()
3413 .filter(|value| !value.is_empty())
3414 .unwrap_or_else(|| WELLORDER_FIXTURE_URL.to_string())
3415 }
3416
3417 fn benchmark_stream_warmup_events(measured_events: usize) -> usize {
3418 std::env::var("HASHTREE_BENCH_WARMUP_EVENTS")
3419 .ok()
3420 .and_then(|value| value.parse::<usize>().ok())
3421 .unwrap_or_else(|| measured_events.clamp(1, 200))
3422 }
3423
3424 fn ensure_benchmark_dataset(path: &Path, url: &str) -> Result<()> {
3425 if path.exists() {
3426 return Ok(());
3427 }
3428
3429 let parent = path
3430 .parent()
3431 .context("benchmark dataset path has no parent directory")?;
3432 fs::create_dir_all(parent).context("create benchmark dataset directory")?;
3433
3434 let tmp = path.with_extension("tmp");
3435 let mut response = reqwest::blocking::get(url)
3436 .context("download benchmark dataset")?
3437 .error_for_status()
3438 .context("benchmark dataset request failed")?;
3439 let mut file = File::create(&tmp).context("create temporary benchmark dataset file")?;
3440 std::io::copy(&mut response, &mut file).context("write benchmark dataset")?;
3441 fs::rename(&tmp, path).context("move benchmark dataset into place")?;
3442
3443 Ok(())
3444 }
3445
3446 fn load_benchmark_dataset(path: &Path, max_events: usize) -> Result<Vec<Event>> {
3447 if max_events == 0 {
3448 return Ok(Vec::new());
3449 }
3450
3451 let mut child = Command::new("bzip2")
3452 .args(["-dc", &path.to_string_lossy()])
3453 .stdout(Stdio::piped())
3454 .spawn()
3455 .context("spawn bzip2 for benchmark dataset")?;
3456 let stdout = child
3457 .stdout
3458 .take()
3459 .context("benchmark dataset stdout missing")?;
3460 let mut events = Vec::with_capacity(max_events);
3461
3462 {
3463 let reader = BufReader::new(stdout);
3464 for line in reader.lines() {
3465 if events.len() >= max_events {
3466 break;
3467 }
3468 let line = line.context("read benchmark dataset line")?;
3469 let trimmed = line.trim();
3470 if trimmed.is_empty() {
3471 continue;
3472 }
3473 if let Ok(event) = Event::from_json(trimmed.to_string()) {
3474 events.push(event);
3475 }
3476 }
3477 }
3478
3479 if events.len() < max_events {
3480 let status = child.wait().context("wait for benchmark dataset reader")?;
3481 anyhow::ensure!(
3482 status.success(),
3483 "benchmark dataset reader exited with status {status}"
3484 );
3485 } else {
3486 let _ = child.kill();
3487 let _ = child.wait();
3488 }
3489
3490 Ok(events)
3491 }
3492
3493 fn build_synthetic_benchmark_events(event_count: usize, author_count: usize) -> Vec<Event> {
3494 let authors = (0..author_count)
3495 .map(|_| Keys::generate())
3496 .collect::<Vec<_>>();
3497 let mut events = Vec::with_capacity(event_count);
3498 for i in 0..event_count {
3499 let kind = if i % 8 < 5 {
3500 Kind::TextNote
3501 } else {
3502 Kind::Custom(30_023)
3503 };
3504 let mut tags = Vec::new();
3505 if kind == Kind::TextNote && i % 16 == 0 {
3506 tags.push(Tag::parse(&["t", "hashtree"]).unwrap());
3507 }
3508 let content = if kind == Kind::TextNote && i % 32 == 0 {
3509 format!("benchmark target event {i}")
3510 } else {
3511 format!("benchmark event {i}")
3512 };
3513 let event = EventBuilder::new(kind, content, tags)
3514 .custom_created_at(Timestamp::from_secs(1_700_000_000 + i as u64))
3515 .to_event(&authors[i % author_count])
3516 .unwrap();
3517 events.push(event);
3518 }
3519 events
3520 }
3521
3522 fn load_benchmark_events(
3523 event_count: usize,
3524 author_count: usize,
3525 ) -> Result<(String, Vec<Event>)> {
3526 if let Some(path) = benchmark_dataset_path() {
3527 let url = benchmark_dataset_url();
3528 ensure_benchmark_dataset(&path, &url)?;
3529 let events = load_benchmark_dataset(&path, event_count)?;
3530 return Ok((format!("dataset:{}", path.display()), events));
3531 }
3532
3533 Ok((
3534 format!("synthetic:{author_count}-authors"),
3535 build_synthetic_benchmark_events(event_count, author_count),
3536 ))
3537 }
3538
3539 fn first_tag_filter(event: &Event) -> Option<Filter> {
3540 event.tags.iter().find_map(|tag| match tag.as_slice() {
3541 [name, value, ..]
3542 if name.len() == 1
3543 && !value.is_empty()
3544 && name.as_bytes()[0].is_ascii_lowercase() =>
3545 {
3546 let letter = SingleLetterTag::from_char(name.chars().next()?).ok()?;
3547 Some(Filter::new().custom_tag(letter, [value.to_string()]))
3548 }
3549 _ => None,
3550 })
3551 }
3552
3553 fn first_search_term(event: &Event) -> Option<String> {
3554 event
3555 .content
3556 .split(|ch: char| !ch.is_alphanumeric())
3557 .find(|token| token.len() >= 4)
3558 .map(|token| token.to_ascii_lowercase())
3559 }
3560
3561 fn benchmark_match_count(events: &[Event], filter: &Filter, limit: usize) -> usize {
3562 events
3563 .iter()
3564 .filter(|event| filter.match_event(event))
3565 .count()
3566 .min(limit)
3567 }
3568
3569 fn benchmark_btree_orders() -> Vec<usize> {
3570 std::env::var("HASHTREE_BTREE_ORDERS")
3571 .ok()
3572 .map(|value| {
3573 value
3574 .split(',')
3575 .filter_map(|part| part.trim().parse::<usize>().ok())
3576 .filter(|order| *order >= 2)
3577 .collect::<Vec<_>>()
3578 })
3579 .filter(|orders| !orders.is_empty())
3580 .unwrap_or_else(|| vec![16, 24, 32, 48, 64])
3581 }
3582
3583 fn benchmark_read_iterations() -> usize {
3584 std::env::var("HASHTREE_BENCH_READ_ITERATIONS")
3585 .ok()
3586 .and_then(|value| value.parse::<usize>().ok())
3587 .unwrap_or(5)
3588 .max(1)
3589 }
3590
3591 fn average_duration(samples: &[Duration]) -> Duration {
3592 if samples.is_empty() {
3593 return Duration::ZERO;
3594 }
3595
3596 Duration::from_secs_f64(
3597 samples.iter().map(Duration::as_secs_f64).sum::<f64>() / samples.len() as f64,
3598 )
3599 }
3600
3601 fn average_read_trace(samples: &[ReadTraceSnapshot]) -> ReadTraceSnapshot {
3602 if samples.is_empty() {
3603 return ReadTraceSnapshot::default();
3604 }
3605
3606 let len = samples.len() as u64;
3607 ReadTraceSnapshot {
3608 get_calls: samples.iter().map(|sample| sample.get_calls).sum::<u64>() / len,
3609 total_bytes: samples.iter().map(|sample| sample.total_bytes).sum::<u64>() / len,
3610 unique_blocks: (samples
3611 .iter()
3612 .map(|sample| sample.unique_blocks as u64)
3613 .sum::<u64>()
3614 / len) as usize,
3615 unique_bytes: samples
3616 .iter()
3617 .map(|sample| sample.unique_bytes)
3618 .sum::<u64>()
3619 / len,
3620 cache_hits: samples.iter().map(|sample| sample.cache_hits).sum::<u64>() / len,
3621 remote_fetches: samples
3622 .iter()
3623 .map(|sample| sample.remote_fetches)
3624 .sum::<u64>()
3625 / len,
3626 remote_bytes: samples
3627 .iter()
3628 .map(|sample| sample.remote_bytes)
3629 .sum::<u64>()
3630 / len,
3631 }
3632 }
3633
3634 fn estimate_serialized_remote_ms(snapshot: &ReadTraceSnapshot, model: NetworkModel) -> f64 {
3635 let transfer_ms = if model.bandwidth_mib_per_s <= 0.0 {
3636 0.0
3637 } else {
3638 (snapshot.remote_bytes as f64 / (model.bandwidth_mib_per_s * 1024.0 * 1024.0)) * 1000.0
3639 };
3640 snapshot.remote_fetches as f64 * model.rtt_ms + transfer_ms
3641 }
3642
    // A benchmark corpus plus the fixtures guaranteed to be present in it, so
    // query cases can be built that are certain to match at least one event.
    #[derive(Debug, Clone)]
    struct IndexBenchmarkDataset {
        // Human-readable label of where the events came from
        // ("dataset:<path>" or "synthetic:<n>-authors").
        source: String,
        // Full corpus: loaded or synthetic events plus the injected fixtures.
        events: Vec<Event>,
        // Tag name/value guaranteed to match at least one event.
        guaranteed_tag_name: String,
        guaranteed_tag_value: String,
        // Author (hex pubkey) and kind of the injected replaceable-event pair.
        replaceable_pubkey: String,
        replaceable_kind: u32,
        // Author, kind, and d-tag of the injected parameterized-replaceable pair.
        parameterized_pubkey: String,
        parameterized_kind: u32,
        parameterized_d_tag: String,
    }
3655
    // Load the benchmark corpus and append fixture events (a tagged note, a
    // replaceable pair, and a parameterized-replaceable pair) so every query
    // case has a guaranteed match.
    fn load_index_benchmark_dataset(
        event_count: usize,
        author_count: usize,
    ) -> Result<IndexBenchmarkDataset> {
        let (source, mut events) = load_benchmark_events(event_count, author_count)?;
        // Fixtures are timestamped strictly after everything already in the
        // corpus so "newest" orderings are deterministic.
        let base_timestamp = events
            .iter()
            .map(|event| event.created_at.as_u64())
            .max()
            .unwrap_or(1_700_000_000)
            + 1;

        let replaceable_keys = Keys::generate();
        let parameterized_keys = Keys::generate();
        let tagged_keys = Keys::generate();
        let guaranteed_tag_name = "t".to_string();
        let guaranteed_tag_value = "btreebench".to_string();
        let replaceable_kind = 10_000u32;
        let parameterized_kind = 30_023u32;
        let parameterized_d_tag = "btree-bench".to_string();

        // Guaranteed hit for the tag-index query case.
        let tagged = EventBuilder::new(
            Kind::TextNote,
            "btree benchmark tagged note",
            vec![Tag::parse(&["t", &guaranteed_tag_value]).unwrap()],
        )
        .custom_created_at(Timestamp::from_secs(base_timestamp))
        .to_event(&tagged_keys)
        .unwrap();
        // Old/new pair under a replaceable kind: only the newer should win.
        let replaceable_old = EventBuilder::new(
            Kind::Custom(replaceable_kind.try_into().unwrap()),
            "replaceable old",
            [],
        )
        .custom_created_at(Timestamp::from_secs(base_timestamp + 1))
        .to_event(&replaceable_keys)
        .unwrap();
        let replaceable_new = EventBuilder::new(
            Kind::Custom(replaceable_kind.try_into().unwrap()),
            "replaceable new",
            [],
        )
        .custom_created_at(Timestamp::from_secs(base_timestamp + 2))
        .to_event(&replaceable_keys)
        .unwrap();
        // Old/new pair under a parameterized-replaceable kind sharing a d-tag.
        let parameterized_old = EventBuilder::new(
            Kind::Custom(parameterized_kind.try_into().unwrap()),
            "",
            vec![Tag::identifier(&parameterized_d_tag)],
        )
        .custom_created_at(Timestamp::from_secs(base_timestamp + 3))
        .to_event(&parameterized_keys)
        .unwrap();
        let parameterized_new = EventBuilder::new(
            Kind::Custom(parameterized_kind.try_into().unwrap()),
            "",
            vec![Tag::identifier(&parameterized_d_tag)],
        )
        .custom_created_at(Timestamp::from_secs(base_timestamp + 4))
        .to_event(&parameterized_keys)
        .unwrap();

        events.extend([
            tagged,
            replaceable_old,
            replaceable_new,
            parameterized_old,
            parameterized_new,
        ]);

        Ok(IndexBenchmarkDataset {
            source,
            events,
            guaranteed_tag_name,
            guaranteed_tag_value,
            replaceable_pubkey: replaceable_keys.public_key().to_hex(),
            replaceable_kind,
            parameterized_pubkey: parameterized_keys.public_key().to_hex(),
            parameterized_kind,
            parameterized_d_tag,
        })
    }
3738
    // Derive a representative set of query cases from the benchmark corpus:
    // point lookup by id, author scans, kind scan, tag scan, recency scan, and
    // the replaceable/parameterized fixtures injected by
    // load_index_benchmark_dataset.
    fn build_btree_query_cases(dataset: &IndexBenchmarkDataset) -> Vec<BenchmarkQueryCase> {
        // Prefer TextNote as the primary kind; otherwise fall back to whatever
        // kind the first event has.
        let primary_kind = dataset
            .events
            .iter()
            .find(|event| event.kind == Kind::TextNote)
            .map(|event| event.kind)
            .or_else(|| dataset.events.first().map(|event| event.kind))
            .expect("benchmark requires at least one event");
        let primary_kind_u32 = primary_kind.as_u16() as u32;

        // Busiest author for the primary kind, so the author cases have depth.
        let author_pubkey = dataset
            .events
            .iter()
            .filter(|event| event.kind == primary_kind)
            .fold(HashMap::<String, usize>::new(), |mut counts, event| {
                *counts.entry(event.pubkey.to_hex()).or_default() += 1;
                counts
            })
            .into_iter()
            .max_by_key(|(_, count)| *count)
            .map(|(pubkey, _)| pubkey)
            .expect("benchmark requires an author for the selected kind");

        // An id from the middle of the corpus for the point-lookup case.
        let by_id_id = dataset.events[dataset.events.len() / 2].id.to_hex();

        vec![
            BenchmarkQueryCase::ById { id: by_id_id },
            BenchmarkQueryCase::ByAuthor {
                pubkey: author_pubkey.clone(),
                limit: 50,
            },
            BenchmarkQueryCase::ByAuthorKind {
                pubkey: author_pubkey,
                kind: primary_kind_u32,
                limit: 50,
            },
            BenchmarkQueryCase::ByKind {
                kind: primary_kind_u32,
                limit: 200,
            },
            BenchmarkQueryCase::ByTag {
                tag_name: dataset.guaranteed_tag_name.clone(),
                tag_value: dataset.guaranteed_tag_value.clone(),
                limit: 100,
            },
            BenchmarkQueryCase::Recent { limit: 100 },
            BenchmarkQueryCase::Replaceable {
                pubkey: dataset.replaceable_pubkey.clone(),
                kind: dataset.replaceable_kind,
            },
            BenchmarkQueryCase::ParameterizedReplaceable {
                pubkey: dataset.parameterized_pubkey.clone(),
                kind: dataset.parameterized_kind,
                d_tag: dataset.parameterized_d_tag.clone(),
            },
        ]
    }
3796
    // Time a query case against a warm local store.
    //
    // The base store is wrapped in a CountingStore so each iteration records
    // how many block reads/bytes the query touched; counters are reset before
    // every run so the traces are per-iteration.
    fn benchmark_warm_query_case<S: Store + 'static>(
        base: Arc<S>,
        root: &Cid,
        order: usize,
        case: &BenchmarkQueryCase,
        iterations: usize,
    ) -> QueryBenchmarkResult {
        let trace_store = Arc::new(CountingStore::new(base));
        let event_store = NostrEventStore::with_options(
            Arc::clone(&trace_store),
            NostrEventStoreOptions {
                btree_order: Some(order),
            },
        );
        let mut durations = Vec::with_capacity(iterations);
        let mut traces = Vec::with_capacity(iterations);
        for _ in 0..iterations {
            trace_store.reset();
            let started = Instant::now();
            let matches = block_on(case.execute(&event_store, root)).unwrap();
            durations.push(started.elapsed());
            traces.push(trace_store.snapshot());
            // Every case is built to match at least one event; zero matches
            // means the benchmark setup itself is broken.
            assert!(
                matches > 0,
                "benchmark query {} returned no matches",
                case.name()
            );
        }
        let mut sorted = durations.clone();
        sorted.sort_unstable();
        QueryBenchmarkResult {
            average_duration: average_duration(&durations),
            p95_duration: duration_percentile(&sorted, 95, 100),
            reads: average_read_trace(&traces),
        }
    }
3833
    // Time a query case against a cold cache.
    //
    // Each iteration starts with a fresh in-memory cache in front of the
    // remote store, so the recorded trace counts the remote fetches a brand
    // new client would need to answer the query.
    fn benchmark_cold_query_case<S: Store + 'static>(
        remote: Arc<S>,
        root: &Cid,
        order: usize,
        case: &BenchmarkQueryCase,
        iterations: usize,
    ) -> QueryBenchmarkResult {
        let mut durations = Vec::with_capacity(iterations);
        let mut traces = Vec::with_capacity(iterations);
        for _ in 0..iterations {
            let cache = Arc::new(MemoryStore::new());
            let trace_store = Arc::new(ReadThroughStore::new(cache, Arc::clone(&remote)));
            let event_store = NostrEventStore::with_options(
                Arc::clone(&trace_store),
                NostrEventStoreOptions {
                    btree_order: Some(order),
                },
            );
            let started = Instant::now();
            let matches = block_on(case.execute(&event_store, root)).unwrap();
            durations.push(started.elapsed());
            traces.push(trace_store.snapshot());
            // Every case is built to match at least one event; zero matches
            // means the benchmark setup itself is broken.
            assert!(
                matches > 0,
                "benchmark query {} returned no matches",
                case.name()
            );
        }
        let mut sorted = durations.clone();
        sorted.sort_unstable();
        QueryBenchmarkResult {
            average_duration: average_duration(&durations),
            p95_duration: duration_percentile(&sorted, 95, 100),
            reads: average_read_trace(&traces),
        }
    }
3870
3871 fn duration_percentile(
3872 sorted: &[std::time::Duration],
3873 numerator: usize,
3874 denominator: usize,
3875 ) -> std::time::Duration {
3876 if sorted.is_empty() {
3877 return std::time::Duration::ZERO;
3878 }
3879 let index = ((sorted.len() - 1) * numerator) / denominator;
3880 sorted[index]
3881 }
3882
    // Steady-state ingest + query benchmark: warms the store, streams events
    // one at a time while recording per-event ingest latency, dumps the
    // hashtree_nostr profiling counters, then times several representative
    // query shapes. Run explicitly via `cargo test -- --ignored`.
    #[test]
    #[ignore = "benchmark"]
    fn benchmark_query_events_large_dataset() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store =
            open_social_graph_store_with_mapsize(tmp.path(), Some(512 * 1024 * 1024)).unwrap();
        // Enable and clear the ingest profiler before the measured run.
        set_nostr_profile_enabled(true);
        reset_nostr_profile();

        let author_count = 64usize;
        let measured_event_count = std::env::var("HASHTREE_BENCH_EVENTS")
            .ok()
            .and_then(|value| value.parse::<usize>().ok())
            .unwrap_or(600usize);
        let warmup_event_count = benchmark_stream_warmup_events(measured_event_count);
        let total_event_count = warmup_event_count + measured_event_count;
        let (source, events) = load_benchmark_events(total_event_count, author_count).unwrap();
        let loaded_event_count = events.len();
        // The dataset may hold fewer events than requested; keep at least one
        // event in the measured portion.
        let warmup_event_count = warmup_event_count.min(loaded_event_count.saturating_sub(1));
        let (warmup_events, measured_events) = events.split_at(warmup_event_count);

        println!(
            "starting steady-state dataset benchmark with {} warmup events and {} measured stream events from {}",
            warmup_events.len(),
            measured_events.len(),
            source
        );
        // Warmup is ingested in one batch and excluded from the measurements.
        if !warmup_events.is_empty() {
            ingest_parsed_events(&graph_store, warmup_events).unwrap();
        }

        // Measured phase: ingest one event at a time, timing each.
        let stream_start = Instant::now();
        let mut per_event_latencies = Vec::with_capacity(measured_events.len());
        for event in measured_events {
            let event_start = Instant::now();
            ingest_parsed_event(&graph_store, event).unwrap();
            per_event_latencies.push(event_start.elapsed());
        }
        let ingest_duration = stream_start.elapsed();

        // Latency statistics over the measured stream.
        let mut sorted_latencies = per_event_latencies.clone();
        sorted_latencies.sort_unstable();
        let average_latency = if per_event_latencies.is_empty() {
            std::time::Duration::ZERO
        } else {
            std::time::Duration::from_secs_f64(
                per_event_latencies
                    .iter()
                    .map(std::time::Duration::as_secs_f64)
                    .sum::<f64>()
                    / per_event_latencies.len() as f64,
            )
        };
        let ingest_capacity_eps = if ingest_duration.is_zero() {
            f64::INFINITY
        } else {
            measured_events.len() as f64 / ingest_duration.as_secs_f64()
        };
        println!(
            "benchmark steady-state ingest complete in {:?} (avg={:?} p50={:?} p95={:?} p99={:?} capacity={:.2} events/s)",
            ingest_duration,
            average_latency,
            duration_percentile(&sorted_latencies, 50, 100),
            duration_percentile(&sorted_latencies, 95, 100),
            duration_percentile(&sorted_latencies, 99, 100),
            ingest_capacity_eps
        );
        // Dump the per-label ingest profile, largest total time first.
        let mut profile = take_nostr_profile();
        profile.sort_by(|left, right| right.total.cmp(&left.total));
        for stat in profile {
            let pct = if ingest_duration.is_zero() {
                0.0
            } else {
                (stat.total.as_secs_f64() / ingest_duration.as_secs_f64()) * 100.0
            };
            let average = if stat.count == 0 {
                std::time::Duration::ZERO
            } else {
                std::time::Duration::from_secs_f64(stat.total.as_secs_f64() / stat.count as f64)
            };
            println!(
                "ingest profile: label={} total={:?} pct={:.1}% count={} avg={:?} max={:?}",
                stat.label, stat.total, pct, stat.count, average, stat.max
            );
        }
        set_nostr_profile_enabled(false);

        // Query phase 1: by kind, cross-checked against a brute-force count.
        let kind = events
            .iter()
            .find(|event| event.kind == Kind::TextNote)
            .map(|event| event.kind)
            .or_else(|| events.first().map(|event| event.kind))
            .expect("benchmark requires at least one event");
        let kind_filter = Filter::new().kind(kind);
        let kind_start = Instant::now();
        let kind_events = query_events(&graph_store, &kind_filter, 200);
        let kind_duration = kind_start.elapsed();
        assert_eq!(
            kind_events.len(),
            benchmark_match_count(&events, &kind_filter, 200)
        );
        // Results must come back newest-first.
        assert!(kind_events
            .windows(2)
            .all(|window| window[0].created_at >= window[1].created_at));

        // Query phase 2: author + kind.
        let author_pubkey = events
            .iter()
            .find(|event| event.kind == kind)
            .map(|event| event.pubkey)
            .expect("benchmark requires an author for the selected kind");
        let author_filter = Filter::new().author(author_pubkey).kind(kind);
        let author_start = Instant::now();
        let author_events = query_events(&graph_store, &author_filter, 50);
        let author_duration = author_start.elapsed();
        assert_eq!(
            author_events.len(),
            benchmark_match_count(&events, &author_filter, 50)
        );

        // Query phase 3: first available single-letter tag filter.
        let tag_filter = events
            .iter()
            .find_map(first_tag_filter)
            .expect("benchmark requires at least one tagged event");
        let tag_start = Instant::now();
        let tag_events = query_events(&graph_store, &tag_filter, 100);
        let tag_duration = tag_start.elapsed();
        assert_eq!(
            tag_events.len(),
            benchmark_match_count(&events, &tag_filter, 100)
        );

        // Query phase 4: full-text search scoped to a kind.
        let search_source = events
            .iter()
            .find_map(|event| first_search_term(event).map(|term| (event.kind, term)))
            .expect("benchmark requires at least one searchable event");
        let search_filter = Filter::new().kind(search_source.0).search(search_source.1);
        let search_start = Instant::now();
        let search_events = query_events(&graph_store, &search_filter, 100);
        let search_duration = search_start.elapsed();
        assert_eq!(
            search_events.len(),
            benchmark_match_count(&events, &search_filter, 100)
        );

        // One-line machine-grepable summary of the whole run.
        println!(
            "steady-state benchmark: source={} warmup_events={} stream_events={} ingest={:?} avg={:?} p50={:?} p95={:?} p99={:?} capacity_eps={:.2} kind={:?} author={:?} tag={:?} search={:?}",
            source,
            warmup_events.len(),
            measured_events.len(),
            ingest_duration,
            average_latency,
            duration_percentile(&sorted_latencies, 50, 100),
            duration_percentile(&sorted_latencies, 95, 100),
            duration_percentile(&sorted_latencies, 99, 100),
            ingest_capacity_eps,
            kind_duration,
            author_duration,
            tag_duration,
            search_duration
        );
    }
4045
    // Compares warm- and cold-cache query cost across several B-tree orders,
    // printing per-case timings plus serialized network-cost estimates for
    // each configured network model. Run via `cargo test -- --ignored`.
    #[test]
    #[ignore = "benchmark"]
    fn benchmark_nostr_btree_query_tradeoffs() {
        let _guard = test_lock();
        let event_count = std::env::var("HASHTREE_BENCH_EVENTS")
            .ok()
            .and_then(|value| value.parse::<usize>().ok())
            .unwrap_or(2_000usize);
        let iterations = benchmark_read_iterations();
        let orders = benchmark_btree_orders();
        let dataset = load_index_benchmark_dataset(event_count, 64).unwrap();
        let cases = build_btree_query_cases(&dataset);
        let stored_events = dataset
            .events
            .iter()
            .map(stored_event_from_nostr)
            .collect::<Vec<_>>();

        println!(
            "btree-order benchmark: source={} events={} iterations={} orders={:?}",
            dataset.source,
            stored_events.len(),
            iterations,
            orders
        );
        println!(
            "network models are serialized fetch estimates: {}",
            NETWORK_MODELS
                .iter()
                .map(|model| format!(
                    "{}={}ms_rtt/{}MiBps",
                    model.name, model.rtt_ms, model.bandwidth_mib_per_s
                ))
                .collect::<Vec<_>>()
                .join(", ")
        );

        // Build one fresh store per order and run every case against it.
        for order in orders {
            let tmp = TempDir::new().unwrap();
            let local_store =
                Arc::new(LocalStore::new(tmp.path().join("blobs"), &StorageBackend::Lmdb).unwrap());
            let event_store = NostrEventStore::with_options(
                Arc::clone(&local_store),
                NostrEventStoreOptions {
                    btree_order: Some(order),
                },
            );
            let root = block_on(event_store.build(None, stored_events.clone()))
                .unwrap()
                .expect("benchmark build root");

            println!("btree-order={} root={}", order, hex::encode(root.hash));
            // Running totals for the per-order summary line.
            let mut warm_total_ms = 0.0f64;
            let mut model_totals = NETWORK_MODELS
                .iter()
                .map(|model| (model.name, 0.0f64))
                .collect::<HashMap<_, _>>();

            for case in &cases {
                let warm = benchmark_warm_query_case(
                    Arc::clone(&local_store),
                    &root,
                    order,
                    case,
                    iterations,
                );
                let cold = benchmark_cold_query_case(
                    Arc::clone(&local_store),
                    &root,
                    order,
                    case,
                    iterations,
                );
                warm_total_ms += warm.average_duration.as_secs_f64() * 1000.0;

                // Accumulate the per-model network estimates while rendering
                // them for the per-case line.
                let model_estimates = NETWORK_MODELS
                    .iter()
                    .map(|model| {
                        let estimate = estimate_serialized_remote_ms(&cold.reads, *model);
                        *model_totals.get_mut(model.name).unwrap() += estimate;
                        format!("{}={:.2}ms", model.name, estimate)
                    })
                    .collect::<Vec<_>>()
                    .join(" ");

                println!(
                    "btree-order={} query={} warm_avg={:?} warm_p95={:?} warm_blocks={} warm_unique_bytes={} cold_fetches={} cold_bytes={} cold_local_avg={:?} {}",
                    order,
                    case.name(),
                    warm.average_duration,
                    warm.p95_duration,
                    warm.reads.unique_blocks,
                    warm.reads.unique_bytes,
                    cold.reads.remote_fetches,
                    cold.reads.remote_bytes,
                    cold.average_duration,
                    model_estimates
                );
            }

            println!(
                "btree-order={} summary unweighted_warm_avg_ms={:.3} {}",
                order,
                warm_total_ms / cases.len() as f64,
                NETWORK_MODELS
                    .iter()
                    .map(|model| format!(
                        "{}={:.2}ms",
                        model.name,
                        model_totals[model.name] / cases.len() as f64
                    ))
                    .collect::<Vec<_>>()
                    .join(" ")
            );
        }
    }
4162
    #[test]
    fn test_ensure_social_graph_mapsize_rounds_and_applies() {
        // Growing the map size must persist: reopening with a smaller
        // requested size still reports at least the grown size, rounded to a
        // whole number of pages.
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        ensure_social_graph_mapsize(tmp.path(), DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES).unwrap();
        let requested = 70 * 1024 * 1024;
        ensure_social_graph_mapsize(tmp.path(), requested).unwrap();
        // SAFETY: heed marks environment opening unsafe; this path is a fresh
        // TempDir owned exclusively by this test — NOTE(review): confirm
        // heed's exact open contract.
        let env = unsafe {
            heed::EnvOpenOptions::new()
                .map_size(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES as usize)
                .max_dbs(SOCIALGRAPH_MAX_DBS)
                .open(tmp.path())
        }
        .unwrap();
        assert!(env.info().map_size >= requested as usize);
        assert_eq!(env.info().map_size % page_size_bytes(), 0);
    }
4180
    #[test]
    fn test_ingest_events_batches_graph_updates() {
        // A single batched ingest of two contact lists must both update
        // transitive follow distances and index the raw events.
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = open_social_graph_store(tmp.path()).unwrap();

        let root_keys = Keys::generate();
        let alice_keys = Keys::generate();
        let bob_keys = Keys::generate();

        let root_pk = root_keys.public_key().to_bytes();
        set_social_graph_root(&graph_store, &root_pk);

        // root -> alice and alice -> bob contact lists.
        let root_follows_alice = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![Tag::public_key(alice_keys.public_key())],
        )
        .custom_created_at(Timestamp::from_secs(10))
        .to_event(&root_keys)
        .unwrap();
        let alice_follows_bob = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![Tag::public_key(bob_keys.public_key())],
        )
        .custom_created_at(Timestamp::from_secs(11))
        .to_event(&alice_keys)
        .unwrap();

        // Both events are submitted in one batch.
        ingest_parsed_events(
            &graph_store,
            &[root_follows_alice.clone(), alice_follows_bob.clone()],
        )
        .unwrap();

        // Distances are transitive across the batch: alice=1 hop, bob=2 hops.
        assert_eq!(
            get_follow_distance(&graph_store, &alice_keys.public_key().to_bytes()),
            Some(1)
        );
        assert_eq!(
            get_follow_distance(&graph_store, &bob_keys.public_key().to_bytes()),
            Some(2)
        );

        // Both raw contact-list events are also queryable from the index.
        let filter = Filter::new().kind(Kind::ContactList);
        let stored = query_events(&graph_store, &filter, 10);
        let ids = stored.into_iter().map(|event| event.id).collect::<Vec<_>>();
        assert!(ids.contains(&root_follows_alice.id));
        assert!(ids.contains(&alice_follows_bob.id));
    }
4232
4233 #[test]
4234 fn test_ingest_graph_events_updates_graph_without_indexing_events() {
4235 let _guard = test_lock();
4236 let tmp = TempDir::new().unwrap();
4237 let graph_store = open_social_graph_store(tmp.path()).unwrap();
4238
4239 let root_keys = Keys::generate();
4240 let alice_keys = Keys::generate();
4241
4242 let root_pk = root_keys.public_key().to_bytes();
4243 set_social_graph_root(&graph_store, &root_pk);
4244
4245 let root_follows_alice = EventBuilder::new(
4246 Kind::ContactList,
4247 "",
4248 vec![Tag::public_key(alice_keys.public_key())],
4249 )
4250 .custom_created_at(Timestamp::from_secs(10))
4251 .to_event(&root_keys)
4252 .unwrap();
4253
4254 ingest_graph_parsed_events(&graph_store, std::slice::from_ref(&root_follows_alice))
4255 .unwrap();
4256
4257 assert_eq!(
4258 get_follow_distance(&graph_store, &alice_keys.public_key().to_bytes()),
4259 Some(1)
4260 );
4261 let filter = Filter::new().kind(Kind::ContactList);
4262 assert!(query_events(&graph_store, &filter, 10).is_empty());
4263 }
4264}