1pub mod access;
2pub mod crawler;
3pub mod local_lists;
4pub mod snapshot;
5
6pub use access::SocialGraphAccessControl;
7pub use crawler::SocialGraphCrawler;
8pub use local_lists::{
9 read_local_list_file_state, sync_local_list_files_force, sync_local_list_files_if_changed,
10 LocalListFileState, LocalListSyncOutcome,
11};
12
13use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
14use std::path::{Path, PathBuf};
15use std::sync::{Arc, Mutex as StdMutex};
16
17use anyhow::{Context, Result};
18use bytes::Bytes;
19use futures::executor::block_on;
20use hashtree_core::{nhash_encode_full, Cid, HashTree, HashTreeConfig, NHashData};
21use hashtree_index::BTree;
22use hashtree_nostr::{
23 is_parameterized_replaceable_kind, is_replaceable_kind, ListEventsOptions, NostrEventStore,
24 NostrEventStoreError, ProfileGuard as NostrProfileGuard, StoredNostrEvent,
25};
26#[cfg(test)]
27use hashtree_nostr::{
28 reset_profile as reset_nostr_profile, set_profile_enabled as set_nostr_profile_enabled,
29 take_profile as take_nostr_profile,
30};
31use nostr::{Event, Filter, JsonUtil, Kind, SingleLetterTag};
32use nostr_social_graph::{
33 BinaryBudget, GraphStats, NostrEvent as GraphEvent, SocialGraph,
34 SocialGraphBackend as NostrSocialGraphBackend,
35};
36use nostr_social_graph_heed::HeedSocialGraph;
37
38use crate::storage::{LocalStore, StorageRouter};
39
40#[cfg(test)]
41use std::sync::{Mutex, MutexGuard, OnceLock};
42#[cfg(test)]
43use std::time::Instant;
44
/// Ordered set of raw 32-byte public keys (deterministic iteration order).
pub type UserSet = BTreeSet<[u8; 32]>;

/// All-zero placeholder root used before a real root pubkey is configured.
const DEFAULT_ROOT_HEX: &str = "0000000000000000000000000000000000000000000000000000000000000000";
// Files (inside the socialgraph db dir) persisting the various index roots.
const EVENTS_ROOT_FILE: &str = "events-root.msgpack";
const AMBIENT_EVENTS_ROOT_FILE: &str = "events-root-ambient.msgpack";
/// Subdirectory holding the ambient event bucket's blob store.
const AMBIENT_EVENTS_BLOB_DIR: &str = "ambient-blobs";
const PROFILE_SEARCH_ROOT_FILE: &str = "profile-search-root.msgpack";
const PROFILES_BY_PUBKEY_ROOT_FILE: &str = "profiles-by-pubkey-root.msgpack";
/// Sentinel distance reported by the graph backend for unknown users
/// (mapped to `None` in `SocialGraphStore::follow_distance`).
const UNKNOWN_FOLLOW_DISTANCE: u32 = 1000;
// LMDB sizing defaults — presumably consumed by `ensure_social_graph_mapsize`
// (defined elsewhere in this file); TODO confirm.
const DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES: u64 = 64 * 1024 * 1024;
const SOCIALGRAPH_MAX_DBS: u32 = 16;
/// B-tree order for the profile search index.
const PROFILE_SEARCH_INDEX_ORDER: usize = 64;
// Profile-search key conventions — used by index code outside this chunk.
const PROFILE_SEARCH_PREFIX: &str = "p:";
const PROFILE_NAME_MAX_LENGTH: usize = 100;
59
/// Which persistent bucket an ingested event lands in.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventStorageClass {
    /// Events authored by the configured (non-placeholder) root user.
    Public,
    /// All other events, kept in a separate bucket/blob store.
    Ambient,
}
65
/// Which event bucket(s) a query should consult.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum EventQueryScope {
    /// Only the public (root-authored) bucket.
    PublicOnly,
    /// Only the ambient (third-party) bucket.
    AmbientOnly,
    /// Both buckets, results merged and de-duplicated.
    All,
}
73
/// One event index: a hashtree-backed Nostr event store plus the file path
/// where its current root CID is persisted.
struct EventIndexBucket {
    event_store: NostrEventStore<StorageRouter>,
    root_path: PathBuf,
}
78
/// Profile (metadata) indexes: a hashtree keyed by pubkey and a B-tree for
/// name search, each with the file path where its root is persisted.
struct ProfileIndexBucket {
    tree: HashTree<StorageRouter>,
    index: BTree<StorageRouter>,
    by_pubkey_root_path: PathBuf,
    search_root_path: PathBuf,
}
85
/// Serializable CID form: a 32-byte hash plus an optional 32-byte key
/// (presumably encryption key material mirroring `Cid` — TODO confirm).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct StoredCid {
    hash: [u8; 32],
    key: Option<[u8; 32]>,
}
91
/// Value stored in the profile search index for one profile.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct StoredProfileSearchEntry {
    // Hex-encoded author pubkey.
    pub pubkey: String,
    // Display name indexed for search.
    pub name: String,
    // Additional searchable names; optional in serialized form.
    #[serde(default)]
    pub aliases: Vec<String>,
    #[serde(default)]
    pub nip05: Option<String>,
    // Timestamp of the metadata event this entry was built from.
    pub created_at: u64,
    // nhash reference to the source metadata event.
    pub event_nhash: String,
}
103
/// Aggregate statistics over the social graph, derived from the exported
/// graph state in `SocialGraphStore::build_distance_cache`.
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct SocialGraphStats {
    // Sum of per-distance user counts.
    pub total_users: usize,
    // Hex root pubkey, when known.
    pub root: Option<String>,
    // Total number of follow edges.
    pub total_follows: usize,
    // Largest follow distance present in the graph.
    pub max_depth: u32,
    // Number of users at each follow distance.
    pub size_by_distance: BTreeMap<u32, usize>,
    pub enabled: bool,
}
113
/// Memoized view of the exported graph state: aggregate stats plus the user
/// pubkeys grouped by follow distance. Invalidated on every graph mutation.
#[derive(Debug, Clone)]
struct DistanceCache {
    stats: SocialGraphStats,
    users_by_distance: BTreeMap<u32, Vec<[u8; 32]>>,
}
119
/// String-wrapped error (via `thiserror`) for failures reported by an
/// upstream graph backend.
#[derive(Debug, thiserror::Error)]
#[error("{0}")]
pub struct UpstreamGraphBackendError(String);
123
/// On-disk social graph plus event/profile indexes.
///
/// The heed-backed graph and the distance cache are each behind a `Mutex`;
/// the cache is cleared whenever the graph changes. Public and ambient
/// events live in separate buckets backed by separate storage routers.
pub struct SocialGraphStore {
    graph: StdMutex<HeedSocialGraph>,
    distance_cache: StdMutex<Option<DistanceCache>>,
    public_events: EventIndexBucket,
    ambient_events: EventIndexBucket,
    profile_index: ProfileIndexBucket,
}
131
/// Abstraction over a social graph + event store backend, letting callers
/// work against either the local `SocialGraphStore` or alternative
/// implementations.
pub trait SocialGraphBackend: Send + Sync {
    /// Aggregate graph statistics.
    fn stats(&self) -> Result<SocialGraphStats>;
    /// All users at exactly `distance` hops from the root.
    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>>;
    /// Follow distance of a user; `None` when unknown.
    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>>;
    /// `created_at` of the owner's latest follow list, if any.
    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>>;
    /// Set of pubkeys the owner follows.
    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet>;
    /// Whether the user exceeds the backend's mute threshold.
    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool>;
    /// Root CID of the profile search index; defaults to none.
    fn profile_search_root(&self) -> Result<Option<Cid>> {
        Ok(None)
    }
    /// Serialize the graph rooted at `root` into budget-bounded binary chunks.
    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>>;
    /// Ingest a single event; the backend picks the storage class.
    fn ingest_event(&self, event: &Event) -> Result<()>;
    /// Ingest a single event into an explicit storage class. The default
    /// implementation ignores the class and delegates to `ingest_event`.
    fn ingest_event_with_storage_class(
        &self,
        event: &Event,
        storage_class: EventStorageClass,
    ) -> Result<()> {
        let _ = storage_class;
        self.ingest_event(event)
    }
    /// Batch ingest; default is one-by-one, stopping at the first error.
    fn ingest_events(&self, events: &[Event]) -> Result<()> {
        for event in events {
            self.ingest_event(event)?;
        }
        Ok(())
    }
    /// Batch ingest into an explicit storage class; default is one-by-one.
    fn ingest_events_with_storage_class(
        &self,
        events: &[Event],
        storage_class: EventStorageClass,
    ) -> Result<()> {
        for event in events {
            self.ingest_event_with_storage_class(event, storage_class)?;
        }
        Ok(())
    }
    /// Apply events to the follow graph; the default also stores them.
    fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
        self.ingest_events(events)
    }
    /// Stored events matching `filter`, at most `limit` of them.
    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>>;
}
173
// Test-only global lock — NOTE(review): appears to serialize tests that share
// on-disk/global state; confirm against the test suite.
#[cfg(test)]
pub type TestLockGuard = MutexGuard<'static, ()>;

#[cfg(test)]
static NDB_TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();

/// Acquire the global test lock, initializing it on first use.
#[cfg(test)]
pub fn test_lock() -> TestLockGuard {
    NDB_TEST_LOCK.get_or_init(|| Mutex::new(())).lock().unwrap()
}
184
/// Open (or create) the social graph store under `data_dir` using the
/// default LMDB map size.
pub fn open_social_graph_store(data_dir: &Path) -> Result<Arc<SocialGraphStore>> {
    open_social_graph_store_with_mapsize(data_dir, None)
}
188
189pub fn open_social_graph_store_with_mapsize(
190 data_dir: &Path,
191 mapsize_bytes: Option<u64>,
192) -> Result<Arc<SocialGraphStore>> {
193 let db_dir = data_dir.join("socialgraph");
194 open_social_graph_store_at_path(&db_dir, mapsize_bytes)
195}
196
/// Open the store under `data_dir/socialgraph`, reusing a caller-provided
/// public storage router (an ambient router is still created internally).
pub fn open_social_graph_store_with_storage(
    data_dir: &Path,
    store: Arc<StorageRouter>,
    mapsize_bytes: Option<u64>,
) -> Result<Arc<SocialGraphStore>> {
    let db_dir = data_dir.join("socialgraph");
    open_social_graph_store_at_path_with_storage(&db_dir, store, mapsize_bytes)
}
205
/// Open the store at an explicit `db_dir`, constructing a local blob store
/// (backend taken from the global hashtree config) and wrapping it in a
/// `StorageRouter`.
pub fn open_social_graph_store_at_path(
    db_dir: &Path,
    mapsize_bytes: Option<u64>,
) -> Result<Arc<SocialGraphStore>> {
    let config = hashtree_config::Config::load_or_default();
    let backend = &config.storage.backend;
    let local_store = Arc::new(
        LocalStore::new_with_lmdb_map_size(db_dir.join("blobs"), backend, mapsize_bytes)
            .map_err(|err| anyhow::anyhow!("Failed to create social graph blob store: {err}"))?,
    );
    let store = Arc::new(StorageRouter::new(local_store));
    open_social_graph_store_at_path_with_storage(db_dir, store, mapsize_bytes)
}
219
/// Open the store with a caller-provided public router; a separate ambient
/// blob store (same backend as the public local store) is created under
/// `db_dir/ambient-blobs` so ambient events never share the public blobs.
pub fn open_social_graph_store_at_path_with_storage(
    db_dir: &Path,
    store: Arc<StorageRouter>,
    mapsize_bytes: Option<u64>,
) -> Result<Arc<SocialGraphStore>> {
    let ambient_backend = store.local_store().backend();
    let ambient_local = Arc::new(
        LocalStore::new_with_lmdb_map_size(
            db_dir.join(AMBIENT_EVENTS_BLOB_DIR),
            &ambient_backend,
            mapsize_bytes,
        )
        .map_err(|err| {
            anyhow::anyhow!("Failed to create social graph ambient blob store: {err}")
        })?,
    );
    let ambient_store = Arc::new(StorageRouter::new(ambient_local));
    open_social_graph_store_at_path_with_storage_split(db_dir, store, ambient_store, mapsize_bytes)
}
239
/// Fully-explicit constructor: caller supplies both the public and ambient
/// storage routers. Creates `db_dir`, optionally grows the LMDB map, opens
/// the heed graph with the placeholder root, and wires up all index buckets.
pub fn open_social_graph_store_at_path_with_storage_split(
    db_dir: &Path,
    public_store: Arc<StorageRouter>,
    ambient_store: Arc<StorageRouter>,
    mapsize_bytes: Option<u64>,
) -> Result<Arc<SocialGraphStore>> {
    std::fs::create_dir_all(db_dir)?;
    if let Some(size) = mapsize_bytes {
        ensure_social_graph_mapsize(db_dir, size)?;
    }
    let graph = HeedSocialGraph::open(db_dir, DEFAULT_ROOT_HEX)
        .context("open nostr-social-graph heed backend")?;

    Ok(Arc::new(SocialGraphStore {
        graph: StdMutex::new(graph),
        distance_cache: StdMutex::new(None),
        public_events: EventIndexBucket {
            event_store: NostrEventStore::new(Arc::clone(&public_store)),
            root_path: db_dir.join(EVENTS_ROOT_FILE),
        },
        ambient_events: EventIndexBucket {
            event_store: NostrEventStore::new(ambient_store),
            root_path: db_dir.join(AMBIENT_EVENTS_ROOT_FILE),
        },
        // The profile hashtree, search index, and public event store all share
        // the public storage router.
        profile_index: ProfileIndexBucket {
            tree: HashTree::new(HashTreeConfig::new(Arc::clone(&public_store))),
            index: BTree::new(
                public_store,
                hashtree_index::BTreeOptions {
                    order: Some(PROFILE_SEARCH_INDEX_ORDER),
                },
            ),
            by_pubkey_root_path: db_dir.join(PROFILES_BY_PUBKEY_ROOT_FILE),
            search_root_path: db_dir.join(PROFILE_SEARCH_ROOT_FILE),
        },
    }))
}
277
278pub fn set_social_graph_root(store: &SocialGraphStore, pk_bytes: &[u8; 32]) {
279 if let Err(err) = store.set_root(pk_bytes) {
280 tracing::warn!("Failed to set social graph root: {err}");
281 }
282}
283
284pub fn get_follow_distance(
285 backend: &(impl SocialGraphBackend + ?Sized),
286 pk_bytes: &[u8; 32],
287) -> Option<u32> {
288 backend.follow_distance(pk_bytes).ok().flatten()
289}
290
291pub fn get_follows(
292 backend: &(impl SocialGraphBackend + ?Sized),
293 pk_bytes: &[u8; 32],
294) -> Vec<[u8; 32]> {
295 match backend.followed_targets(pk_bytes) {
296 Ok(set) => set.into_iter().collect(),
297 Err(_) => Vec::new(),
298 }
299}
300
301pub fn is_overmuted(
302 backend: &(impl SocialGraphBackend + ?Sized),
303 _root_pk: &[u8; 32],
304 user_pk: &[u8; 32],
305 threshold: f64,
306) -> bool {
307 backend
308 .is_overmuted_user(user_pk, threshold)
309 .unwrap_or(false)
310}
311
312pub fn ingest_event(backend: &(impl SocialGraphBackend + ?Sized), _sub_id: &str, event_json: &str) {
313 let event = match Event::from_json(event_json) {
314 Ok(event) => event,
315 Err(_) => return,
316 };
317
318 if let Err(err) = backend.ingest_event(&event) {
319 tracing::warn!("Failed to ingest social graph event: {err}");
320 }
321}
322
/// Ingest an already-parsed event, propagating any backend error.
pub fn ingest_parsed_event(
    backend: &(impl SocialGraphBackend + ?Sized),
    event: &Event,
) -> Result<()> {
    backend.ingest_event(event)
}
329
/// Ingest an already-parsed event into an explicit storage class.
pub fn ingest_parsed_event_with_storage_class(
    backend: &(impl SocialGraphBackend + ?Sized),
    event: &Event,
    storage_class: EventStorageClass,
) -> Result<()> {
    backend.ingest_event_with_storage_class(event, storage_class)
}
337
/// Batch-ingest already-parsed events, propagating any backend error.
pub fn ingest_parsed_events(
    backend: &(impl SocialGraphBackend + ?Sized),
    events: &[Event],
) -> Result<()> {
    backend.ingest_events(events)
}
344
/// Batch-ingest already-parsed events into an explicit storage class.
pub fn ingest_parsed_events_with_storage_class(
    backend: &(impl SocialGraphBackend + ?Sized),
    events: &[Event],
    storage_class: EventStorageClass,
) -> Result<()> {
    backend.ingest_events_with_storage_class(events, storage_class)
}
352
/// Feed events to the follow graph via the backend's graph-only path.
pub fn ingest_graph_parsed_events(
    backend: &(impl SocialGraphBackend + ?Sized),
    events: &[Event],
) -> Result<()> {
    backend.ingest_graph_events(events)
}
359
360pub fn query_events(
361 backend: &(impl SocialGraphBackend + ?Sized),
362 filter: &Filter,
363 limit: usize,
364) -> Vec<Event> {
365 backend.query_events(filter, limit).unwrap_or_default()
366}
367
impl SocialGraphStore {
    /// Drop the memoized distance cache; called after every graph mutation.
    fn invalidate_distance_cache(&self) {
        *self.distance_cache.lock().unwrap() = None;
    }

    /// Build aggregate stats plus per-distance user lists from an exported
    /// graph state, decoding hex pubkeys through the state's id table.
    fn build_distance_cache(state: nostr_social_graph::SocialGraphState) -> Result<DistanceCache> {
        // Map internal numeric ids back to raw 32-byte pubkeys.
        let unique_ids = state
            .unique_ids
            .into_iter()
            .map(|(pubkey, id)| decode_pubkey(&pubkey).map(|decoded| (id, decoded)))
            .collect::<Result<HashMap<_, _>>>()?;

        let mut users_by_distance = BTreeMap::new();
        let mut size_by_distance = BTreeMap::new();
        for (distance, users) in state.users_by_follow_distance {
            // Ids without a pubkey mapping are silently dropped.
            let decoded = users
                .into_iter()
                .filter_map(|id| unique_ids.get(&id).copied())
                .collect::<Vec<_>>();
            size_by_distance.insert(distance, decoded.len());
            users_by_distance.insert(distance, decoded);
        }

        let total_follows = state
            .followed_by_user
            .iter()
            .map(|(_, targets)| targets.len())
            .sum::<usize>();
        let total_users = size_by_distance.values().copied().sum();
        let max_depth = size_by_distance.keys().copied().max().unwrap_or_default();

        Ok(DistanceCache {
            stats: SocialGraphStats {
                total_users,
                root: Some(state.root),
                total_follows,
                max_depth,
                size_by_distance,
                enabled: true,
            },
            users_by_distance,
        })
    }

    /// Return the distance cache, rebuilding it from the heed graph on a miss.
    fn load_distance_cache(&self) -> Result<DistanceCache> {
        if let Some(cache) = self.distance_cache.lock().unwrap().clone() {
            return Ok(cache);
        }

        // Export under the graph lock, then build the cache without holding it.
        let state = {
            let graph = self.graph.lock().unwrap();
            graph.export_state().context("export social graph state")?
        };
        let cache = Self::build_distance_cache(state)?;
        *self.distance_cache.lock().unwrap() = Some(cache.clone());
        Ok(cache)
    }

    /// Point the graph at a new root pubkey. When the current state is still
    /// the placeholder, the whole state is replaced with a fresh graph;
    /// otherwise only the root is changed.
    fn set_root(&self, root: &[u8; 32]) -> Result<()> {
        let root_hex = hex::encode(root);
        {
            let mut graph = self.graph.lock().unwrap();
            if should_replace_placeholder_root(&graph)? {
                let fresh = SocialGraph::new(&root_hex);
                graph
                    .replace_state(&fresh.export_state())
                    .context("replace placeholder social graph root")?;
            } else {
                graph
                    .set_root(&root_hex)
                    .context("set nostr-social-graph root")?;
            }
        }
        self.invalidate_distance_cache();
        Ok(())
    }

    fn stats(&self) -> Result<SocialGraphStats> {
        Ok(self.load_distance_cache()?.stats)
    }

    /// Follow distance for a pubkey; the backend's "unknown" sentinel maps
    /// to `None`.
    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
        let graph = self.graph.lock().unwrap();
        let distance = graph
            .get_follow_distance(&hex::encode(pk_bytes))
            .context("read social graph follow distance")?;
        Ok((distance != UNKNOWN_FOLLOW_DISTANCE).then_some(distance))
    }

    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
        Ok(self
            .load_distance_cache()?
            .users_by_distance
            .get(&distance)
            .cloned()
            .unwrap_or_default())
    }

    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_follow_list_created_at(&hex::encode(owner))
            .context("read social graph follow list timestamp")
    }

    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
        let graph = self.graph.lock().unwrap();
        decode_pubkey_set(
            graph
                .get_followed_by_user(&hex::encode(owner))
                .context("read followed targets")?,
        )
    }

    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
        // Non-positive thresholds disable the overmute check entirely.
        if threshold <= 0.0 {
            return Ok(false);
        }
        let graph = self.graph.lock().unwrap();
        graph
            .is_overmuted(&hex::encode(user_pk), threshold)
            .context("check social graph overmute")
    }

    /// Current root of the profile search index, if one has been persisted.
    #[cfg_attr(not(test), allow(dead_code))]
    pub fn profile_search_root(&self) -> Result<Option<Cid>> {
        self.profile_index.search_root()
    }

    pub(crate) fn public_events_root(&self) -> Result<Option<Cid>> {
        self.public_events.events_root()
    }

    pub(crate) fn write_public_events_root(&self, root: Option<&Cid>) -> Result<()> {
        self.public_events.write_events_root(root)
    }

    /// Latest indexed metadata event for a pubkey, if any.
    #[cfg_attr(not(test), allow(dead_code))]
    pub fn latest_profile_event(&self, pubkey_hex: &str) -> Result<Option<Event>> {
        self.profile_index.profile_event_for_pubkey(pubkey_hex)
    }

    /// Profile-search entries whose key starts with `prefix`.
    #[cfg_attr(not(test), allow(dead_code))]
    pub fn profile_search_entries_for_prefix(
        &self,
        prefix: &str,
    ) -> Result<Vec<(String, StoredProfileSearchEntry)>> {
        self.profile_index.search_entries_for_prefix(prefix)
    }

    /// Incrementally fold the given events into the profile indexes.
    pub fn sync_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
        self.update_profile_index_for_events(events)
    }

    /// Rebuild both profile indexes from scratch out of `events`, keeping the
    /// newest metadata event per pubkey, then persist the new roots.
    pub(crate) fn rebuild_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
        let latest_by_pubkey = latest_metadata_events_by_pubkey(events);
        let (by_pubkey_root, search_root) = self
            .profile_index
            .rebuild_profile_events(latest_by_pubkey.into_values())?;
        self.profile_index
            .write_by_pubkey_root(by_pubkey_root.as_ref())?;
        self.profile_index.write_search_root(search_root.as_ref())?;
        Ok(())
    }

    /// Rebuild the profile indexes from all metadata events stored in the
    /// public and ambient buckets. Returns the number of distinct pubkeys
    /// retained in the rebuilt index.
    pub fn rebuild_profile_index_from_stored_events(&self) -> Result<usize> {
        let public_events_root = self.public_events.events_root()?;
        let ambient_events_root = self.ambient_events.events_root()?;
        // No event roots at all: clear both profile indexes.
        if public_events_root.is_none() && ambient_events_root.is_none() {
            self.profile_index.write_by_pubkey_root(None)?;
            self.profile_index.write_search_root(None)?;
            return Ok(0);
        }

        let mut events = Vec::new();
        for (bucket, root) in [
            (&self.public_events, public_events_root),
            (&self.ambient_events, ambient_events_root),
        ] {
            let Some(root) = root else {
                continue;
            };
            let stored = block_on(bucket.event_store.list_by_kind_lossy(
                Some(&root),
                Kind::Metadata.as_u16() as u32,
                ListEventsOptions::default(),
            ))
            .map_err(map_event_store_error)?;
            events.extend(
                stored
                    .into_iter()
                    .map(nostr_event_from_stored)
                    .collect::<Result<Vec<_>>>()?,
            );
        }

        let latest_count = latest_metadata_events_by_pubkey(&events).len();
        self.rebuild_profile_index_for_events(&events)?;
        Ok(latest_count)
    }

    /// Merge the newest metadata event per pubkey into the profile indexes,
    /// persisting the new roots only when something actually changed.
    fn update_profile_index_for_events(&self, events: &[Event]) -> Result<()> {
        let latest_by_pubkey = latest_metadata_events_by_pubkey(events);

        if latest_by_pubkey.is_empty() {
            return Ok(());
        }

        let mut by_pubkey_root = self.profile_index.by_pubkey_root()?;
        let mut search_root = self.profile_index.search_root()?;
        let mut changed = false;

        for event in latest_by_pubkey.into_values() {
            let (next_by_pubkey_root, next_search_root, updated) = self
                .profile_index
                .update_profile_event(by_pubkey_root.as_ref(), search_root.as_ref(), event)?;
            if updated {
                by_pubkey_root = next_by_pubkey_root;
                search_root = next_search_root;
                changed = true;
            }
        }

        if changed {
            self.profile_index
                .write_by_pubkey_root(by_pubkey_root.as_ref())?;
            self.profile_index.write_search_root(search_root.as_ref())?;
        }

        Ok(())
    }

    /// Export the graph, re-root an in-memory copy at `root` if needed, and
    /// encode it into budget-bounded binary chunks.
    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
        let state = {
            let graph = self.graph.lock().unwrap();
            graph.export_state().context("export social graph state")?
        };
        // Work on an in-memory copy so re-rooting never mutates stored state.
        let mut graph = SocialGraph::from_state(state).context("rebuild social graph state")?;
        let root_hex = hex::encode(root);
        if graph.get_root() != root_hex {
            graph
                .set_root(&root_hex)
                .context("set snapshot social graph root")?;
        }
        let chunks = graph
            .to_binary_chunks_with_budget(*options)
            .context("encode social graph snapshot")?;
        Ok(chunks.into_iter().map(Bytes::from).collect())
    }

    fn ingest_event(&self, event: &Event) -> Result<()> {
        self.ingest_event_with_storage_class(event, self.default_storage_class_for(event)?)
    }

    /// Split a batch by storage class, then ingest each group in bulk.
    fn ingest_events(&self, events: &[Event]) -> Result<()> {
        if events.is_empty() {
            return Ok(());
        }

        let mut public = Vec::new();
        let mut ambient = Vec::new();
        for event in events {
            match self.default_storage_class_for(event)? {
                EventStorageClass::Public => public.push(event.clone()),
                EventStorageClass::Ambient => ambient.push(event.clone()),
            }
        }

        if !public.is_empty() {
            self.ingest_events_with_storage_class(&public, EventStorageClass::Public)?;
        }
        if !ambient.is_empty() {
            self.ingest_events_with_storage_class(&ambient, EventStorageClass::Ambient)?;
        }

        Ok(())
    }

    /// Apply follow-graph-relevant events to the graph only, without storing
    /// them in any event bucket.
    fn apply_graph_events_only(&self, events: &[Event]) -> Result<()> {
        let graph_events = events
            .iter()
            .filter(|event| is_social_graph_event(event.kind))
            .collect::<Vec<_>>();
        if graph_events.is_empty() {
            return Ok(());
        }

        {
            let mut graph = self.graph.lock().unwrap();
            // Batch through an in-memory snapshot, then swap the state back in
            // with a single replace.
            let mut snapshot = SocialGraph::from_state(
                graph
                    .export_state()
                    .context("export social graph state for graph-only ingest")?,
            )
            .context("rebuild social graph state for graph-only ingest")?;
            for event in graph_events {
                snapshot.handle_event(&graph_event_from_nostr(event), true, 0.0);
            }
            graph
                .replace_state(&snapshot.export_state())
                .context("replace graph-only social graph state")?;
        }
        self.invalidate_distance_cache();
        Ok(())
    }

    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
        self.query_events_in_scope(filter, limit, EventQueryScope::All)
    }

    /// Events authored by the configured (non-placeholder) root go to the
    /// public bucket; everything else is ambient.
    fn default_storage_class_for(&self, event: &Event) -> Result<EventStorageClass> {
        let graph = self.graph.lock().unwrap();
        let root_hex = graph.get_root().context("read social graph root")?;
        if root_hex != DEFAULT_ROOT_HEX && root_hex == event.pubkey.to_hex() {
            return Ok(EventStorageClass::Public);
        }
        Ok(EventStorageClass::Ambient)
    }

    fn bucket(&self, storage_class: EventStorageClass) -> &EventIndexBucket {
        match storage_class {
            EventStorageClass::Public => &self.public_events,
            EventStorageClass::Ambient => &self.ambient_events,
        }
    }

    /// Store one event in the chosen bucket, update the profile indexes, and
    /// feed it to the follow graph when its kind is graph-relevant.
    fn ingest_event_with_storage_class(
        &self,
        event: &Event,
        storage_class: EventStorageClass,
    ) -> Result<()> {
        let current_root = self.bucket(storage_class).events_root()?;
        let next_root = self
            .bucket(storage_class)
            .store_event(current_root.as_ref(), event)?;
        self.bucket(storage_class)
            .write_events_root(Some(&next_root))?;

        self.update_profile_index_for_events(std::slice::from_ref(event))?;

        if is_social_graph_event(event.kind) {
            {
                let mut graph = self.graph.lock().unwrap();
                graph
                    .handle_event(&graph_event_from_nostr(event), true, 0.0)
                    .context("ingest social graph event into nostr-social-graph")?;
            }
            self.invalidate_distance_cache();
        }

        Ok(())
    }

    /// Bulk variant of `ingest_event_with_storage_class`: one event-store
    /// build, one profile-index pass, and a single graph state swap.
    fn ingest_events_with_storage_class(
        &self,
        events: &[Event],
        storage_class: EventStorageClass,
    ) -> Result<()> {
        if events.is_empty() {
            return Ok(());
        }

        let bucket = self.bucket(storage_class);
        let current_root = bucket.events_root()?;
        let stored_events = events
            .iter()
            .map(stored_event_from_nostr)
            .collect::<Vec<_>>();
        let next_root = block_on(
            bucket
                .event_store
                .build(current_root.as_ref(), stored_events),
        )
        .map_err(map_event_store_error)?;
        bucket.write_events_root(next_root.as_ref())?;

        self.update_profile_index_for_events(events)?;

        let graph_events = events
            .iter()
            .filter(|event| is_social_graph_event(event.kind))
            .collect::<Vec<_>>();
        if graph_events.is_empty() {
            return Ok(());
        }

        {
            let mut graph = self.graph.lock().unwrap();
            // Same snapshot-and-replace batching as `apply_graph_events_only`.
            let mut snapshot = SocialGraph::from_state(
                graph
                    .export_state()
                    .context("export social graph state for batch ingest")?,
            )
            .context("rebuild social graph state for batch ingest")?;
            for event in graph_events {
                snapshot.handle_event(&graph_event_from_nostr(event), true, 0.0);
            }
            graph
                .replace_state(&snapshot.export_state())
                .context("replace batched social graph state")?;
        }
        self.invalidate_distance_cache();

        Ok(())
    }

    /// Query the selected bucket(s), de-duplicate, re-check the filter, and
    /// cap the combined result at `limit`.
    pub(crate) fn query_events_in_scope(
        &self,
        filter: &Filter,
        limit: usize,
        scope: EventQueryScope,
    ) -> Result<Vec<Event>> {
        if limit == 0 {
            return Ok(Vec::new());
        }

        let buckets: &[&EventIndexBucket] = match scope {
            EventQueryScope::PublicOnly => &[&self.public_events],
            EventQueryScope::AmbientOnly => &[&self.ambient_events],
            EventQueryScope::All => &[&self.public_events, &self.ambient_events],
        };

        let mut candidates = Vec::new();
        for bucket in buckets {
            candidates.extend(bucket.query_events(filter, limit)?);
        }

        let mut deduped = dedupe_events(candidates);
        deduped.retain(|event| filter.match_event(event));
        deduped.truncate(limit);
        Ok(deduped)
    }
}
801
802impl EventIndexBucket {
803 fn events_root(&self) -> Result<Option<Cid>> {
804 let _profile = NostrProfileGuard::new("socialgraph.events_root.read");
805 read_root_file(&self.root_path)
806 }
807
808 fn write_events_root(&self, root: Option<&Cid>) -> Result<()> {
809 let _profile = NostrProfileGuard::new("socialgraph.events_root.write");
810 write_root_file(&self.root_path, root)
811 }
812
813 fn store_event(&self, root: Option<&Cid>, event: &Event) -> Result<Cid> {
814 let stored = stored_event_from_nostr(event);
815 let _profile = NostrProfileGuard::new("socialgraph.event_store.add");
816 block_on(self.event_store.add(root, stored)).map_err(map_event_store_error)
817 }
818
819 fn load_event_by_id(&self, root: &Cid, event_id: &str) -> Result<Option<Event>> {
820 let stored = block_on(self.event_store.get_by_id(Some(root), event_id))
821 .map_err(map_event_store_error)?;
822 stored.map(nostr_event_from_stored).transpose()
823 }
824
825 fn load_events_for_author(
826 &self,
827 root: &Cid,
828 author: &nostr::PublicKey,
829 filter: &Filter,
830 limit: usize,
831 exact: bool,
832 ) -> Result<Vec<Event>> {
833 let kind_filter = filter.kinds.as_ref().and_then(|kinds| {
834 if kinds.len() == 1 {
835 kinds.iter().next().map(|kind| kind.as_u16() as u32)
836 } else {
837 None
838 }
839 });
840 let author_hex = author.to_hex();
841 let options = filter_list_options(filter, limit, exact);
842 let stored = match kind_filter {
843 Some(kind) => block_on(self.event_store.list_by_author_and_kind(
844 Some(root),
845 &author_hex,
846 kind,
847 options.clone(),
848 ))
849 .map_err(map_event_store_error)?,
850 None => block_on(
851 self.event_store
852 .list_by_author(Some(root), &author_hex, options),
853 )
854 .map_err(map_event_store_error)?,
855 };
856 stored
857 .into_iter()
858 .map(nostr_event_from_stored)
859 .collect::<Result<Vec<_>>>()
860 }
861
862 fn load_events_for_kind(
863 &self,
864 root: &Cid,
865 kind: Kind,
866 filter: &Filter,
867 limit: usize,
868 exact: bool,
869 ) -> Result<Vec<Event>> {
870 let stored = block_on(self.event_store.list_by_kind(
871 Some(root),
872 kind.as_u16() as u32,
873 filter_list_options(filter, limit, exact),
874 ))
875 .map_err(map_event_store_error)?;
876 stored
877 .into_iter()
878 .map(nostr_event_from_stored)
879 .collect::<Result<Vec<_>>>()
880 }
881
882 fn load_recent_events(
883 &self,
884 root: &Cid,
885 filter: &Filter,
886 limit: usize,
887 exact: bool,
888 ) -> Result<Vec<Event>> {
889 let stored = block_on(
890 self.event_store
891 .list_recent(Some(root), filter_list_options(filter, limit, exact)),
892 )
893 .map_err(map_event_store_error)?;
894 stored
895 .into_iter()
896 .map(nostr_event_from_stored)
897 .collect::<Result<Vec<_>>>()
898 }
899
900 fn load_events_for_tag(
901 &self,
902 root: &Cid,
903 tag_name: &str,
904 values: &[String],
905 filter: &Filter,
906 limit: usize,
907 exact: bool,
908 ) -> Result<Vec<Event>> {
909 let mut events = Vec::new();
910 let options = filter_list_options(filter, limit, exact);
911 for value in values {
912 let stored = block_on(self.event_store.list_by_tag(
913 Some(root),
914 tag_name,
915 value,
916 options.clone(),
917 ))
918 .map_err(map_event_store_error)?;
919 events.extend(
920 stored
921 .into_iter()
922 .map(nostr_event_from_stored)
923 .collect::<Result<Vec<_>>>()?,
924 );
925 }
926 Ok(dedupe_events(events))
927 }
928
929 fn choose_tag_source(&self, filter: &Filter) -> Option<(String, Vec<String>)> {
930 filter
931 .generic_tags
932 .iter()
933 .min_by_key(|(_, values)| values.len())
934 .map(|(tag, values)| {
935 (
936 tag.as_char().to_ascii_lowercase().to_string(),
937 values.iter().cloned().collect(),
938 )
939 })
940 }
941
942 fn load_major_index_candidates(
943 &self,
944 root: &Cid,
945 filter: &Filter,
946 limit: usize,
947 ) -> Result<Option<Vec<Event>>> {
948 if let Some(events) = self.load_direct_replaceable_candidates(root, filter)? {
949 return Ok(Some(events));
950 }
951
952 if let Some((tag_name, values)) = self.choose_tag_source(filter) {
953 let exact = filter.authors.is_none()
954 && filter.kinds.is_none()
955 && filter.search.is_none()
956 && filter.generic_tags.len() == 1;
957 return Ok(Some(self.load_events_for_tag(
958 root, &tag_name, &values, filter, limit, exact,
959 )?));
960 }
961
962 if let (Some(authors), Some(kinds)) = (filter.authors.as_ref(), filter.kinds.as_ref()) {
963 if authors.len() == 1 && kinds.len() == 1 {
964 let author = authors.iter().next().expect("checked single author");
965 let exact = filter.generic_tags.is_empty() && filter.search.is_none();
966 return Ok(Some(
967 self.load_events_for_author(root, author, filter, limit, exact)?,
968 ));
969 }
970
971 if kinds.len() < authors.len() {
972 let mut events = Vec::new();
973 for kind in kinds {
974 events.extend(self.load_events_for_kind(root, *kind, filter, limit, false)?);
975 }
976 return Ok(Some(dedupe_events(events)));
977 }
978
979 let mut events = Vec::new();
980 for author in authors {
981 events.extend(self.load_events_for_author(root, author, filter, limit, false)?);
982 }
983 return Ok(Some(dedupe_events(events)));
984 }
985
986 if let Some(authors) = filter.authors.as_ref() {
987 let mut events = Vec::new();
988 let exact = filter.generic_tags.is_empty() && filter.search.is_none();
989 for author in authors {
990 events.extend(self.load_events_for_author(root, author, filter, limit, exact)?);
991 }
992 return Ok(Some(dedupe_events(events)));
993 }
994
995 if let Some(kinds) = filter.kinds.as_ref() {
996 let mut events = Vec::new();
997 let exact = filter.authors.is_none()
998 && filter.generic_tags.is_empty()
999 && filter.search.is_none();
1000 for kind in kinds {
1001 events.extend(self.load_events_for_kind(root, *kind, filter, limit, exact)?);
1002 }
1003 return Ok(Some(dedupe_events(events)));
1004 }
1005
1006 Ok(None)
1007 }
1008
1009 fn load_direct_replaceable_candidates(
1010 &self,
1011 root: &Cid,
1012 filter: &Filter,
1013 ) -> Result<Option<Vec<Event>>> {
1014 let Some(authors) = filter.authors.as_ref() else {
1015 return Ok(None);
1016 };
1017 let Some(kinds) = filter.kinds.as_ref() else {
1018 return Ok(None);
1019 };
1020 if kinds.len() != 1 {
1021 return Ok(None);
1022 }
1023
1024 let kind = kinds.iter().next().expect("checked single kind").as_u16() as u32;
1025
1026 if is_parameterized_replaceable_kind(kind) {
1027 let d_tag = SingleLetterTag::lowercase(nostr::Alphabet::D);
1028 let Some(d_values) = filter.generic_tags.get(&d_tag) else {
1029 return Ok(None);
1030 };
1031 let mut events = Vec::new();
1032 for author in authors {
1033 let author_hex = author.to_hex();
1034 for d_value in d_values {
1035 if let Some(stored) = block_on(self.event_store.get_parameterized_replaceable(
1036 Some(root),
1037 &author_hex,
1038 kind,
1039 d_value,
1040 ))
1041 .map_err(map_event_store_error)?
1042 {
1043 events.push(nostr_event_from_stored(stored)?);
1044 }
1045 }
1046 }
1047 return Ok(Some(dedupe_events(events)));
1048 }
1049
1050 if is_replaceable_kind(kind) {
1051 let mut events = Vec::new();
1052 for author in authors {
1053 if let Some(stored) = block_on(self.event_store.get_replaceable(
1054 Some(root),
1055 &author.to_hex(),
1056 kind,
1057 ))
1058 .map_err(map_event_store_error)?
1059 {
1060 events.push(nostr_event_from_stored(stored)?);
1061 }
1062 }
1063 return Ok(Some(dedupe_events(events)));
1064 }
1065
1066 Ok(None)
1067 }
1068
    /// Query stored events matching `filter`, newest first, capped at `limit`.
    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
        if limit == 0 {
            return Ok(Vec::new());
        }

        // No events root yet means nothing has been ingested.
        let events_root = self.events_root()?;
        let Some(root) = events_root.as_ref() else {
            return Ok(Vec::new());
        };
        let mut candidates = Vec::new();
        let mut seen: HashSet<[u8; 32]> = HashSet::new();

        if let Some(ids) = filter.ids.as_ref() {
            // Direct id lookups; each loaded event is still re-checked against
            // the full filter before being accepted.
            for id in ids {
                let id_bytes = id.to_bytes();
                if !seen.insert(id_bytes) {
                    continue;
                }
                if let Some(event) = self.load_event_by_id(root, &id.to_hex())? {
                    if filter.match_event(&event) {
                        candidates.push(event);
                    }
                }
                if candidates.len() >= limit {
                    break;
                }
            }
        } else {
            // Prefer an author/kind index scan; fall back to the recency scan.
            // The boolean flag tells the loader whether the index alone fully
            // satisfies the filter, so its internal limit can be treated as exact.
            let base_events = match self.load_major_index_candidates(root, filter, limit)? {
                Some(events) => events,
                None => self.load_recent_events(
                    root,
                    filter,
                    limit,
                    filter.authors.is_none()
                        && filter.kinds.is_none()
                        && filter.generic_tags.is_empty()
                        && filter.search.is_none(),
                )?,
            };

            for event in base_events {
                let id_bytes = event.id.to_bytes();
                if !seen.insert(id_bytes) {
                    continue;
                }
                if filter.match_event(&event) {
                    candidates.push(event);
                }
                if candidates.len() >= limit {
                    break;
                }
            }
        }

        // Newest first; ties broken by event id for a deterministic order.
        candidates.sort_by(|a, b| {
            b.created_at
                .as_u64()
                .cmp(&a.created_at.as_u64())
                .then_with(|| a.id.cmp(&b.id))
        });
        candidates.truncate(limit);
        Ok(candidates)
    }
1133}
1134
// Mirrors kind-0 (profile) events into two persistent indexes:
// - by-pubkey: pubkey hex -> CID of the mirrored event JSON
// - search: "{prefix}{term}:{pubkey}" -> serialized StoredProfileSearchEntry
// Both index roots are persisted as small root files on disk.
impl ProfileIndexBucket {
    fn by_pubkey_root(&self) -> Result<Option<Cid>> {
        let _profile = NostrProfileGuard::new("socialgraph.profile_by_pubkey_root.read");
        read_root_file(&self.by_pubkey_root_path)
    }

    fn search_root(&self) -> Result<Option<Cid>> {
        let _profile = NostrProfileGuard::new("socialgraph.profile_search_root.read");
        read_root_file(&self.search_root_path)
    }

    fn write_by_pubkey_root(&self, root: Option<&Cid>) -> Result<()> {
        let _profile = NostrProfileGuard::new("socialgraph.profile_by_pubkey_root.write");
        write_root_file(&self.by_pubkey_root_path, root)
    }

    fn write_search_root(&self, root: Option<&Cid>) -> Result<()> {
        let _profile = NostrProfileGuard::new("socialgraph.profile_search_root.write");
        write_root_file(&self.search_root_path, root)
    }

    // Store the event's JSON as a file in the hash tree; returns its CID.
    fn mirror_profile_event(&self, event: &Event) -> Result<Cid> {
        let bytes = event.as_json().into_bytes();
        block_on(self.tree.put_file(&bytes))
            .map(|(cid, _size)| cid)
            .context("store mirrored profile event")
    }

    // Load a previously mirrored event back from the hash tree.
    // `Ok(None)` when the CID has no data (e.g. not yet replicated locally).
    fn load_profile_event(&self, cid: &Cid) -> Result<Option<Event>> {
        let bytes = block_on(self.tree.get(cid, None)).context("read mirrored profile event")?;
        let Some(bytes) = bytes else {
            return Ok(None);
        };
        let json = String::from_utf8(bytes).context("decode mirrored profile event as utf-8")?;
        Ok(Some(
            Event::from_json(json).context("decode mirrored profile event json")?,
        ))
    }

    // Convenience lookup: by-pubkey root -> CID -> event.
    #[cfg_attr(not(test), allow(dead_code))]
    fn profile_event_for_pubkey(&self, pubkey_hex: &str) -> Result<Option<Event>> {
        let root = self.by_pubkey_root()?;
        let Some(cid) = block_on(self.index.get_link(root.as_ref(), pubkey_hex))
            .context("read mirrored profile event cid by pubkey")?
        else {
            return Ok(None);
        };
        self.load_profile_event(&cid)
    }

    // All search-index entries whose key starts with `prefix`, decoded.
    #[cfg_attr(not(test), allow(dead_code))]
    fn search_entries_for_prefix(
        &self,
        prefix: &str,
    ) -> Result<Vec<(String, StoredProfileSearchEntry)>> {
        let Some(root) = self.search_root()? else {
            return Ok(Vec::new());
        };
        let entries = block_on(self.index.prefix(&root, prefix)).context("query profile search prefix")?;
        entries
            .into_iter()
            .map(|(key, value)| {
                let entry = serde_json::from_str(&value)
                    .context("decode stored profile search entry json")?;
                Ok((key, entry))
            })
            .collect()
    }

    // Rebuild both indexes from scratch for the given events (bulk build path).
    // Callers are expected to pass at most one event per pubkey; duplicates
    // would produce duplicate keys in the bulk build.
    fn rebuild_profile_events<'a, I>(&self, events: I) -> Result<(Option<Cid>, Option<Cid>)>
    where
        I: IntoIterator<Item = &'a Event>,
    {
        let mut by_pubkey_entries = Vec::<(String, Cid)>::new();
        let mut search_entries = Vec::<(String, String)>::new();

        for event in events {
            let pubkey = event.pubkey.to_hex();
            let mirrored_cid = self.mirror_profile_event(event)?;
            let search_value =
                serialize_profile_search_entry(&build_profile_search_entry(event, &mirrored_cid)?)?;
            by_pubkey_entries.push((pubkey.clone(), mirrored_cid.clone()));
            // One search entry per derived keyword, all pointing at the same value.
            for term in profile_search_terms_for_event(event) {
                search_entries.push((format!("{PROFILE_SEARCH_PREFIX}{term}:{pubkey}"), search_value.clone()));
            }
        }

        let by_pubkey_root = block_on(self.index.build_links(by_pubkey_entries))
            .context("bulk build mirrored profile-by-pubkey index")?;
        let search_root = block_on(self.index.build(search_entries))
            .context("bulk build mirrored profile search index")?;
        Ok((by_pubkey_root, search_root))
    }

    // Incrementally apply one profile event against the given roots.
    // Returns the (possibly unchanged) new roots plus a flag saying whether
    // anything was written. Older-or-equal events are ignored.
    fn update_profile_event(
        &self,
        by_pubkey_root: Option<&Cid>,
        search_root: Option<&Cid>,
        event: &Event,
    ) -> Result<(Option<Cid>, Option<Cid>, bool)> {
        let pubkey = event.pubkey.to_hex();
        let existing_cid = block_on(self.index.get_link(by_pubkey_root, &pubkey))
            .context("lookup existing mirrored profile event")?;

        let existing_event = match existing_cid.as_ref() {
            Some(cid) => self.load_profile_event(cid)?,
            None => None,
        };

        // Keep the stored event if it is at least as new as the incoming one.
        if existing_event
            .as_ref()
            .is_some_and(|current| compare_nostr_events(event, current).is_le())
        {
            return Ok((by_pubkey_root.cloned(), search_root.cloned(), false));
        }

        let mirrored_cid = self.mirror_profile_event(event)?;
        let next_by_pubkey_root = Some(
            block_on(
                self.index
                    .insert_link(by_pubkey_root, &pubkey, &mirrored_cid),
            )
            .context("write mirrored profile event index")?,
        );

        // Remove the previous event's search terms before inserting the new
        // ones; each delete threads the updated root into the next operation.
        let mut next_search_root = search_root.cloned();
        if let Some(current) = existing_event.as_ref() {
            for term in profile_search_terms_for_event(current) {
                let Some(root) = next_search_root.as_ref() else {
                    break;
                };
                next_search_root = block_on(
                    self.index
                        .delete(root, &format!("{PROFILE_SEARCH_PREFIX}{term}:{pubkey}")),
                )
                .context("remove stale profile search term")?;
            }
        }

        let search_value =
            serialize_profile_search_entry(&build_profile_search_entry(event, &mirrored_cid)?)?;
        for term in profile_search_terms_for_event(event) {
            next_search_root = Some(
                block_on(self.index.insert(
                    next_search_root.as_ref(),
                    &format!("{PROFILE_SEARCH_PREFIX}{term}:{pubkey}"),
                    &search_value,
                ))
                .context("write profile search term")?,
            );
        }

        Ok((next_by_pubkey_root, next_search_root, true))
    }
}
1290
1291fn latest_metadata_events_by_pubkey<'a>(events: &'a [Event]) -> BTreeMap<String, &'a Event> {
1292 let mut latest_by_pubkey = BTreeMap::<String, &Event>::new();
1293 for event in events.iter().filter(|event| event.kind == Kind::Metadata) {
1294 let pubkey = event.pubkey.to_hex();
1295 match latest_by_pubkey.get(&pubkey) {
1296 Some(current) if compare_nostr_events(event, current).is_le() => {}
1297 _ => {
1298 latest_by_pubkey.insert(pubkey, event);
1299 }
1300 }
1301 }
1302 latest_by_pubkey
1303}
1304
1305fn serialize_profile_search_entry(entry: &StoredProfileSearchEntry) -> Result<String> {
1306 serde_json::to_string(entry).context("encode stored profile search entry json")
1307}
1308
1309fn cid_to_nhash(cid: &Cid) -> Result<String> {
1310 nhash_encode_full(&NHashData {
1311 hash: cid.hash,
1312 decrypt_key: cid.key,
1313 })
1314 .context("encode mirrored profile event nhash")
1315}
1316
1317fn build_profile_search_entry(event: &Event, mirrored_cid: &Cid) -> Result<StoredProfileSearchEntry> {
1318 let profile = match serde_json::from_str::<serde_json::Value>(&event.content) {
1319 Ok(serde_json::Value::Object(profile)) => profile,
1320 _ => serde_json::Map::new(),
1321 };
1322 let names = extract_profile_names(&profile);
1323 let primary_name = names.first().cloned();
1324 let nip05 = normalize_profile_nip05(&profile, primary_name.as_deref());
1325 let name = primary_name
1326 .clone()
1327 .or_else(|| nip05.clone())
1328 .unwrap_or_else(|| event.pubkey.to_hex());
1329
1330 Ok(StoredProfileSearchEntry {
1331 pubkey: event.pubkey.to_hex(),
1332 name,
1333 aliases: names.into_iter().skip(1).collect(),
1334 nip05,
1335 created_at: event.created_at.as_u64(),
1336 event_nhash: cid_to_nhash(mirrored_cid)?,
1337 })
1338}
1339
1340fn filter_list_options(filter: &Filter, limit: usize, exact: bool) -> ListEventsOptions {
1341 ListEventsOptions {
1342 limit: exact.then_some(limit.max(1)),
1343 since: filter.since.map(|timestamp| timestamp.as_u64()),
1344 until: filter.until.map(|timestamp| timestamp.as_u64()),
1345 }
1346}
1347
1348fn dedupe_events(events: Vec<Event>) -> Vec<Event> {
1349 let mut seen = HashSet::new();
1350 let mut deduped = Vec::new();
1351 for event in events {
1352 if seen.insert(event.id.to_bytes()) {
1353 deduped.push(event);
1354 }
1355 }
1356 deduped.sort_by(|a, b| {
1357 b.created_at
1358 .as_u64()
1359 .cmp(&a.created_at.as_u64())
1360 .then_with(|| a.id.cmp(&b.id))
1361 });
1362 deduped
1363}
1364
// Trait plumbing: every method forwards 1:1 to the inherent method of the same
// name on `SocialGraphStore` (the only exception is `ingest_graph_events`,
// which maps to `apply_graph_events_only`). Keep this impl free of logic.
impl SocialGraphBackend for SocialGraphStore {
    fn stats(&self) -> Result<SocialGraphStats> {
        SocialGraphStore::stats(self)
    }

    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
        SocialGraphStore::users_by_follow_distance(self, distance)
    }

    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
        SocialGraphStore::follow_distance(self, pk_bytes)
    }

    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
        SocialGraphStore::follow_list_created_at(self, owner)
    }

    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
        SocialGraphStore::followed_targets(self, owner)
    }

    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
        SocialGraphStore::is_overmuted_user(self, user_pk, threshold)
    }

    fn profile_search_root(&self) -> Result<Option<Cid>> {
        SocialGraphStore::profile_search_root(self)
    }

    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
        SocialGraphStore::snapshot_chunks(self, root, options)
    }

    fn ingest_event(&self, event: &Event) -> Result<()> {
        SocialGraphStore::ingest_event(self, event)
    }

    fn ingest_event_with_storage_class(
        &self,
        event: &Event,
        storage_class: EventStorageClass,
    ) -> Result<()> {
        SocialGraphStore::ingest_event_with_storage_class(self, event, storage_class)
    }

    fn ingest_events(&self, events: &[Event]) -> Result<()> {
        SocialGraphStore::ingest_events(self, events)
    }

    fn ingest_events_with_storage_class(
        &self,
        events: &[Event],
        storage_class: EventStorageClass,
    ) -> Result<()> {
        SocialGraphStore::ingest_events_with_storage_class(self, events, storage_class)
    }

    // Note: intentionally delegates to `apply_graph_events_only`, which skips
    // event-store persistence and only updates the graph.
    fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
        SocialGraphStore::apply_graph_events_only(self, events)
    }

    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
        SocialGraphStore::query_events(self, filter, limit)
    }
}
1430
// Adapter for the upstream `nostr_social_graph` backend trait: each method
// locks the inner heed-backed graph, delegates, and flattens any error into a
// stringly `UpstreamGraphBackendError` (the upstream trait owns the error type).
// `lock().unwrap()` panics if another thread panicked while holding the lock
// (mutex poisoning) — treated as unrecoverable here.
impl NostrSocialGraphBackend for SocialGraphStore {
    type Error = UpstreamGraphBackendError;

    fn get_root(&self) -> std::result::Result<String, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_root()
            .context("read social graph root")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn set_root(&mut self, root: &str) -> std::result::Result<(), Self::Error> {
        let root_bytes =
            decode_pubkey(root).map_err(|err| UpstreamGraphBackendError(err.to_string()))?;
        SocialGraphStore::set_root(self, &root_bytes)
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn handle_event(
        &mut self,
        event: &GraphEvent,
        allow_unknown_authors: bool,
        overmute_threshold: f64,
    ) -> std::result::Result<(), Self::Error> {
        // Inner scope drops the graph lock before invalidating the distance
        // cache, so the cache refresh never runs under the graph mutex.
        {
            let mut graph = self.graph.lock().unwrap();
            graph
                .handle_event(event, allow_unknown_authors, overmute_threshold)
                .context("ingest social graph event into heed backend")
                .map_err(|err| UpstreamGraphBackendError(err.to_string()))?;
        }
        self.invalidate_distance_cache();
        Ok(())
    }

    fn get_follow_distance(&self, user: &str) -> std::result::Result<u32, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_follow_distance(user)
            .context("read social graph follow distance")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn is_following(
        &self,
        follower: &str,
        followed_user: &str,
    ) -> std::result::Result<bool, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .is_following(follower, followed_user)
            .context("read social graph following edge")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_followed_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_followed_by_user(user)
            .context("read followed-by-user list")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_followers_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_followers_by_user(user)
            .context("read followers-by-user list")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_muted_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_muted_by_user(user)
            .context("read muted-by-user list")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_user_muted_by(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_user_muted_by(user)
            .context("read user-muted-by list")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_follow_list_created_at(
        &self,
        user: &str,
    ) -> std::result::Result<Option<u64>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_follow_list_created_at(user)
            .context("read social graph follow list timestamp")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn get_mute_list_created_at(
        &self,
        user: &str,
    ) -> std::result::Result<Option<u64>, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .get_mute_list_created_at(user)
            .context("read social graph mute list timestamp")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }

    fn is_overmuted(&self, user: &str, threshold: f64) -> std::result::Result<bool, Self::Error> {
        let graph = self.graph.lock().unwrap();
        graph
            .is_overmuted(user, threshold)
            .context("check social graph overmute")
            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
    }
}
1548
// Blanket impl so an `Arc<dyn SocialGraphBackend>` (or `Arc<ConcreteBackend>`)
// can be used wherever the trait is expected; every method simply forwards to
// the inner value.
impl<T> SocialGraphBackend for Arc<T>
where
    T: SocialGraphBackend + ?Sized,
{
    fn stats(&self) -> Result<SocialGraphStats> {
        self.as_ref().stats()
    }

    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
        self.as_ref().users_by_follow_distance(distance)
    }

    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
        self.as_ref().follow_distance(pk_bytes)
    }

    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
        self.as_ref().follow_list_created_at(owner)
    }

    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
        self.as_ref().followed_targets(owner)
    }

    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
        self.as_ref().is_overmuted_user(user_pk, threshold)
    }

    fn profile_search_root(&self) -> Result<Option<Cid>> {
        self.as_ref().profile_search_root()
    }

    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
        self.as_ref().snapshot_chunks(root, options)
    }

    fn ingest_event(&self, event: &Event) -> Result<()> {
        self.as_ref().ingest_event(event)
    }

    fn ingest_event_with_storage_class(
        &self,
        event: &Event,
        storage_class: EventStorageClass,
    ) -> Result<()> {
        self.as_ref()
            .ingest_event_with_storage_class(event, storage_class)
    }

    fn ingest_events(&self, events: &[Event]) -> Result<()> {
        self.as_ref().ingest_events(events)
    }

    fn ingest_events_with_storage_class(
        &self,
        events: &[Event],
        storage_class: EventStorageClass,
    ) -> Result<()> {
        self.as_ref()
            .ingest_events_with_storage_class(events, storage_class)
    }

    fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
        self.as_ref().ingest_graph_events(events)
    }

    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
        self.as_ref().query_events(filter, limit)
    }
}
1619
1620fn should_replace_placeholder_root(graph: &HeedSocialGraph) -> Result<bool> {
1621 if graph.get_root().context("read current social graph root")? != DEFAULT_ROOT_HEX {
1622 return Ok(false);
1623 }
1624
1625 let GraphStats {
1626 users,
1627 follows,
1628 mutes,
1629 ..
1630 } = graph.size().context("size social graph")?;
1631 Ok(users <= 1 && follows == 0 && mutes == 0)
1632}
1633
1634fn decode_pubkey_set(values: Vec<String>) -> Result<UserSet> {
1635 let mut set = UserSet::new();
1636 for value in values {
1637 set.insert(decode_pubkey(&value)?);
1638 }
1639 Ok(set)
1640}
1641
1642fn decode_pubkey(value: &str) -> Result<[u8; 32]> {
1643 let mut bytes = [0u8; 32];
1644 hex::decode_to_slice(value, &mut bytes)
1645 .with_context(|| format!("decode social graph pubkey {value}"))?;
1646 Ok(bytes)
1647}
1648
1649fn is_social_graph_event(kind: Kind) -> bool {
1650 kind == Kind::ContactList || kind == Kind::MuteList
1651}
1652
1653fn graph_event_from_nostr(event: &Event) -> GraphEvent {
1654 GraphEvent {
1655 created_at: event.created_at.as_u64(),
1656 content: event.content.clone(),
1657 tags: event
1658 .tags
1659 .iter()
1660 .map(|tag| tag.as_slice().to_vec())
1661 .collect(),
1662 kind: event.kind.as_u16() as u32,
1663 pubkey: event.pubkey.to_hex(),
1664 id: event.id.to_hex(),
1665 sig: event.sig.to_string(),
1666 }
1667}
1668
1669fn stored_event_from_nostr(event: &Event) -> StoredNostrEvent {
1670 StoredNostrEvent {
1671 id: event.id.to_hex(),
1672 pubkey: event.pubkey.to_hex(),
1673 created_at: event.created_at.as_u64(),
1674 kind: event.kind.as_u16() as u32,
1675 tags: event
1676 .tags
1677 .iter()
1678 .map(|tag| tag.as_slice().to_vec())
1679 .collect(),
1680 content: event.content.clone(),
1681 sig: event.sig.to_string(),
1682 }
1683}
1684
1685fn nostr_event_from_stored(event: StoredNostrEvent) -> Result<Event> {
1686 let value = serde_json::json!({
1687 "id": event.id,
1688 "pubkey": event.pubkey,
1689 "created_at": event.created_at,
1690 "kind": event.kind,
1691 "tags": event.tags,
1692 "content": event.content,
1693 "sig": event.sig,
1694 });
1695 Event::from_json(value.to_string()).context("decode stored nostr event")
1696}
1697
/// Crate-visible wrapper around `nostr_event_from_stored` for callers outside
/// this module.
pub(crate) fn stored_event_to_nostr_event(event: StoredNostrEvent) -> Result<Event> {
    nostr_event_from_stored(event)
}
1701
1702fn encode_cid(cid: &Cid) -> Result<Vec<u8>> {
1703 rmp_serde::to_vec_named(&StoredCid {
1704 hash: cid.hash,
1705 key: cid.key,
1706 })
1707 .context("encode social graph events root")
1708}
1709
1710fn decode_cid(bytes: &[u8]) -> Result<Option<Cid>> {
1711 let stored: StoredCid =
1712 rmp_serde::from_slice(bytes).context("decode social graph events root")?;
1713 Ok(Some(Cid {
1714 hash: stored.hash,
1715 key: stored.key,
1716 }))
1717}
1718
1719fn read_root_file(path: &Path) -> Result<Option<Cid>> {
1720 let Ok(bytes) = std::fs::read(path) else {
1721 return Ok(None);
1722 };
1723 decode_cid(&bytes)
1724}
1725
1726fn write_root_file(path: &Path, root: Option<&Cid>) -> Result<()> {
1727 let Some(root) = root else {
1728 if path.exists() {
1729 std::fs::remove_file(path)?;
1730 }
1731 return Ok(());
1732 };
1733
1734 let encoded = encode_cid(root)?;
1735 let tmp_path = path.with_extension("tmp");
1736 std::fs::write(&tmp_path, encoded)?;
1737 std::fs::rename(tmp_path, path)?;
1738 Ok(())
1739}
1740
1741fn normalize_profile_name(value: &serde_json::Value) -> Option<String> {
1742 let raw = value.as_str()?;
1743 let trimmed = raw.split_whitespace().collect::<Vec<_>>().join(" ");
1744 if trimmed.is_empty() {
1745 return None;
1746 }
1747 Some(trimmed.chars().take(PROFILE_NAME_MAX_LENGTH).collect())
1748}
1749
1750fn extract_profile_names(profile: &serde_json::Map<String, serde_json::Value>) -> Vec<String> {
1751 let mut names = Vec::new();
1752 let mut seen = HashSet::new();
1753
1754 for key in ["display_name", "displayName", "name", "username"] {
1755 let Some(value) = profile.get(key).and_then(normalize_profile_name) else {
1756 continue;
1757 };
1758 let lowered = value.to_lowercase();
1759 if seen.insert(lowered) {
1760 names.push(value);
1761 }
1762 }
1763
1764 names
1765}
1766
/// Decide whether a nip05 local part adds no search value: single characters,
/// npub-shaped strings, and anything already contained in the primary name
/// (compared lowercase with whitespace squashed) are rejected.
fn should_reject_profile_nip05(local_part: &str, primary_name: &str) -> bool {
    if local_part.starts_with("npub1") || local_part.len() == 1 {
        return true;
    }

    let squashed_name: String = primary_name.to_lowercase().split_whitespace().collect();
    squashed_name.contains(local_part)
}
1778
1779fn normalize_profile_nip05(
1780 profile: &serde_json::Map<String, serde_json::Value>,
1781 primary_name: Option<&str>,
1782) -> Option<String> {
1783 let raw = profile.get("nip05")?.as_str()?;
1784 let local_part = raw.split('@').next()?.trim().to_lowercase();
1785 if local_part.is_empty() {
1786 return None;
1787 }
1788 let truncated: String = local_part.chars().take(PROFILE_NAME_MAX_LENGTH).collect();
1789 if truncated.is_empty() {
1790 return None;
1791 }
1792 if primary_name.is_some_and(|name| should_reject_profile_nip05(&truncated, name)) {
1793 return None;
1794 }
1795 Some(truncated)
1796}
1797
/// True for lowercase English stop words that are excluded from profile
/// search keywords.
fn is_search_stop_word(word: &str) -> bool {
    const STOP_WORDS: &[&str] = &[
        "a", "an", "the", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by",
        "from", "is", "it", "as", "be", "was", "are", "this", "that", "these", "those", "i",
        "you", "he", "she", "we", "they", "my", "your", "his", "her", "its", "our", "their",
        "what", "which", "who", "whom", "how", "when", "where", "why", "will", "would", "could",
        "should", "can", "may", "might", "must", "have", "has", "had", "do", "does", "did",
        "been", "being", "get", "got", "just", "now", "then", "so", "if", "not", "no", "yes",
        "all", "any", "some", "more", "most", "other", "into", "over", "after", "before",
        "about", "up", "down", "out", "off", "through", "during", "under", "again", "further",
        "once",
    ];
    STOP_WORDS.contains(&word)
}
1895
/// True for all-digit tokens, EXCEPT four-digit values in 1900..=2099 which
/// are kept as likely year references.
fn is_pure_search_number(word: &str) -> bool {
    let all_digits = word.bytes().all(|b| b.is_ascii_digit());
    if !all_digits {
        return false;
    }
    let looks_like_year = word.len() == 4
        && matches!(word.parse::<u16>(), Ok(year) if (1900..=2099).contains(&year));
    !looks_like_year
}
1905
/// Split a compound token into parts at camelCase humps, digit/letter
/// boundaries, and acronym-to-word transitions (e.g. "HTTPServer").
fn split_compound_search_word(word: &str) -> Vec<String> {
    let chars: Vec<char> = word.chars().collect();
    let mut parts = Vec::new();
    let mut current = String::new();

    for (index, &ch) in chars.iter().enumerate() {
        if let Some(prev) = current.chars().last() {
            let boundary = (prev.is_lowercase() && ch.is_uppercase())
                || (prev.is_ascii_digit() && ch.is_alphabetic())
                || (prev.is_alphabetic() && ch.is_ascii_digit())
                // Acronym end: "...PS" + lowercase next splits before the 'S'.
                || (prev.is_uppercase()
                    && ch.is_uppercase()
                    && matches!(chars.get(index + 1), Some(next) if next.is_lowercase()));
            if boundary {
                parts.push(std::mem::take(&mut current));
            }
        }
        current.push(ch);
    }

    if !current.is_empty() {
        parts.push(current);
    }

    parts
}
1934
1935fn parse_search_keywords(text: &str) -> Vec<String> {
1936 let mut keywords = Vec::new();
1937 let mut seen = HashSet::new();
1938
1939 for word in text
1940 .split(|ch: char| !ch.is_alphanumeric())
1941 .filter(|word| !word.is_empty())
1942 {
1943 let mut variants = Vec::with_capacity(1 + word.len() / 4);
1944 variants.push(word.to_lowercase());
1945 variants.extend(
1946 split_compound_search_word(word)
1947 .into_iter()
1948 .map(|part| part.to_lowercase()),
1949 );
1950
1951 for lowered in variants {
1952 if lowered.chars().count() < 2
1953 || is_search_stop_word(&lowered)
1954 || is_pure_search_number(&lowered)
1955 {
1956 continue;
1957 }
1958 if seen.insert(lowered.clone()) {
1959 keywords.push(lowered);
1960 }
1961 }
1962 }
1963
1964 keywords
1965}
1966
1967fn profile_search_terms_for_event(event: &Event) -> Vec<String> {
1968 let profile = match serde_json::from_str::<serde_json::Value>(&event.content) {
1969 Ok(serde_json::Value::Object(profile)) => profile,
1970 _ => serde_json::Map::new(),
1971 };
1972 let names = extract_profile_names(&profile);
1973 let primary_name = names.first().map(String::as_str);
1974 let mut parts = Vec::new();
1975 if let Some(name) = primary_name {
1976 parts.push(name.to_string());
1977 }
1978 if let Some(nip05) = normalize_profile_nip05(&profile, primary_name) {
1979 parts.push(nip05);
1980 }
1981 parts.push(event.pubkey.to_hex());
1982 if names.len() > 1 {
1983 parts.extend(names.into_iter().skip(1));
1984 }
1985 parse_search_keywords(&parts.join(" "))
1986}
1987
1988fn compare_nostr_events(left: &Event, right: &Event) -> std::cmp::Ordering {
1989 left.created_at
1990 .as_u64()
1991 .cmp(&right.created_at.as_u64())
1992 .then_with(|| left.id.to_hex().cmp(&right.id.to_hex()))
1993}
1994
// Adapt `NostrEventStoreError` into an `anyhow::Error` so event-store calls can
// use `?` inside this module's `Result` functions.
fn map_event_store_error(err: NostrEventStoreError) -> anyhow::Error {
    anyhow::anyhow!("nostr event store error: {err}")
}
1998
// Ensure the LMDB environment backing the social graph can grow to at least
// `requested_bytes` (never below the compiled-in default). The map size is
// rounded up to an OS page-size multiple; the env is only ever grown, never
// shrunk.
fn ensure_social_graph_mapsize(db_dir: &Path, requested_bytes: u64) -> Result<()> {
    let requested = requested_bytes.max(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES);
    let page_size = page_size_bytes() as u64;
    // Round up to the next page-size multiple; on u64 overflow fall back to the
    // unrounded request as a best effort.
    let rounded = requested
        .checked_add(page_size.saturating_sub(1))
        .map(|size| size / page_size * page_size)
        .unwrap_or(requested);
    let map_size = usize::try_from(rounded).context("social graph mapsize exceeds usize")?;

    // NOTE(review): heed marks env opening/resizing as unsafe (the same env must
    // not be open twice in one process, and no transactions may be live during
    // resize) — presumably upheld by the callers of this function; confirm
    // before reusing elsewhere.
    let env = unsafe {
        heed::EnvOpenOptions::new()
            .map_size(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES as usize)
            .max_dbs(SOCIALGRAPH_MAX_DBS)
            .open(db_dir)
    }
    .context("open social graph LMDB env for resize")?;
    if env.info().map_size < map_size {
        unsafe { env.resize(map_size) }.context("resize social graph LMDB env")?;
    }

    Ok(())
}
2021
// OS allocation granularity used for rounding LMDB map sizes (delegates to the
// `page_size` crate).
fn page_size_bytes() -> usize {
    page_size::get_granularity()
}
2025
2026#[cfg(test)]
2027mod tests {
2028 use super::*;
2029 use async_trait::async_trait;
2030 use hashtree_config::StorageBackend;
2031 use hashtree_core::{Hash, MemoryStore, Store, StoreError};
2032 use hashtree_nostr::NostrEventStoreOptions;
2033 use std::collections::HashSet;
2034 use std::fs::{self, File};
2035 use std::io::{BufRead, BufReader};
2036 use std::path::{Path, PathBuf};
2037 use std::process::{Command, Stdio};
2038 use std::sync::Mutex;
2039 use std::time::Duration;
2040
2041 use nostr::{EventBuilder, JsonUtil, Keys, Tag, Timestamp};
2042 use tempfile::TempDir;
2043
    // Public fixture archive (~500k early nostr events, bzip2-compressed JSONL)
    // used by network-dependent tests below.
    const WELLORDER_FIXTURE_URL: &str =
        "https://wellorder.xyz/nostr/nostr-wellorder-early-500k-v1.jsonl.bz2";
2046
    // Point-in-time copy of `ReadTraceState`, safe to hold outside the mutex.
    #[derive(Debug, Clone, Default)]
    struct ReadTraceSnapshot {
        get_calls: u64,       // total `get` invocations observed
        total_bytes: u64,     // bytes returned across all reads (repeats counted)
        unique_blocks: usize, // number of distinct block hashes read
        unique_bytes: u64,    // bytes counted once per distinct hash
        cache_hits: u64,      // reads served from the local cache
        remote_fetches: u64,  // reads that fell through to the remote store
        remote_bytes: u64,    // bytes fetched from the remote store
    }
2057
    // Mutable read-accounting held behind a mutex by the tracing stores; the
    // hash set is what lets `unique_bytes` count each block only once.
    #[derive(Debug, Default)]
    struct ReadTraceState {
        get_calls: u64,
        total_bytes: u64,
        unique_hashes: HashSet<Hash>,
        unique_bytes: u64,
        cache_hits: u64,
        remote_fetches: u64,
        remote_bytes: u64,
    }
2068
    // Store decorator that counts successful reads against the wrapped `base`
    // store; writes pass through unrecorded.
    #[derive(Debug)]
    struct CountingStore<S: Store> {
        base: Arc<S>,
        state: Mutex<ReadTraceState>,
    }
2074
    impl<S: Store> CountingStore<S> {
        // Wrap `base` with zeroed counters.
        fn new(base: Arc<S>) -> Self {
            Self {
                base,
                state: Mutex::new(ReadTraceState::default()),
            }
        }

        // Zero all counters (e.g. between measurement phases).
        fn reset(&self) {
            *self.state.lock().unwrap() = ReadTraceState::default();
        }

        // Copy the counters out of the mutex for assertions.
        fn snapshot(&self) -> ReadTraceSnapshot {
            let state = self.state.lock().unwrap();
            ReadTraceSnapshot {
                get_calls: state.get_calls,
                total_bytes: state.total_bytes,
                unique_blocks: state.unique_hashes.len(),
                unique_bytes: state.unique_bytes,
                cache_hits: state.cache_hits,
                remote_fetches: state.remote_fetches,
                remote_bytes: state.remote_bytes,
            }
        }

        // Account one successful read; `unique_bytes` grows only the first time
        // a given hash is seen.
        fn record_read(&self, hash: &Hash, bytes: usize) {
            let mut state = self.state.lock().unwrap();
            state.get_calls += 1;
            state.total_bytes += bytes as u64;
            if state.unique_hashes.insert(*hash) {
                state.unique_bytes += bytes as u64;
            }
        }
    }
2109
    // Pass-through `Store` impl; only successful `get`s (where data was found)
    // are recorded.
    #[async_trait]
    impl<S: Store> Store for CountingStore<S> {
        async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
            self.base.put(hash, data).await
        }

        async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
            self.base.put_many(items).await
        }

        async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
            let data = self.base.get(hash).await?;
            if let Some(bytes) = data.as_ref() {
                self.record_read(hash, bytes.len());
            }
            Ok(data)
        }

        async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
            self.base.has(hash).await
        }

        async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
            self.base.delete(hash).await
        }
    }
2136
    // Two-tier store for tests: reads try the in-memory `cache` first, then
    // fall back to `remote`, populating the cache on the way back; per-tier
    // read statistics are tracked in `state`.
    #[derive(Debug)]
    struct ReadThroughStore<R: Store> {
        cache: Arc<MemoryStore>,
        remote: Arc<R>,
        state: Mutex<ReadTraceState>,
    }
2143
    impl<R: Store> ReadThroughStore<R> {
        // Build a read-through pair with zeroed counters.
        fn new(cache: Arc<MemoryStore>, remote: Arc<R>) -> Self {
            Self {
                cache,
                remote,
                state: Mutex::new(ReadTraceState::default()),
            }
        }

        // Copy the counters out of the mutex for assertions.
        fn snapshot(&self) -> ReadTraceSnapshot {
            let state = self.state.lock().unwrap();
            ReadTraceSnapshot {
                get_calls: state.get_calls,
                total_bytes: state.total_bytes,
                unique_blocks: state.unique_hashes.len(),
                unique_bytes: state.unique_bytes,
                cache_hits: state.cache_hits,
                remote_fetches: state.remote_fetches,
                remote_bytes: state.remote_bytes,
            }
        }
    }
2166
2167 #[async_trait]
2168 impl<R: Store> Store for ReadThroughStore<R> {
2169 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
2170 self.cache.put(hash, data).await
2171 }
2172
2173 async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
2174 self.cache.put_many(items).await
2175 }
2176
2177 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
2178 {
2179 let mut state = self.state.lock().unwrap();
2180 state.get_calls += 1;
2181 }
2182
2183 if let Some(bytes) = self.cache.get(hash).await? {
2184 let mut state = self.state.lock().unwrap();
2185 state.cache_hits += 1;
2186 state.total_bytes += bytes.len() as u64;
2187 if state.unique_hashes.insert(*hash) {
2188 state.unique_bytes += bytes.len() as u64;
2189 }
2190 return Ok(Some(bytes));
2191 }
2192
2193 let data = self.remote.get(hash).await?;
2194 if let Some(bytes) = data.as_ref() {
2195 let _ = self.cache.put(*hash, bytes.clone()).await?;
2196 let mut state = self.state.lock().unwrap();
2197 state.remote_fetches += 1;
2198 state.remote_bytes += bytes.len() as u64;
2199 state.total_bytes += bytes.len() as u64;
2200 if state.unique_hashes.insert(*hash) {
2201 state.unique_bytes += bytes.len() as u64;
2202 }
2203 }
2204 Ok(data)
2205 }
2206
2207 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
2208 if self.cache.has(hash).await? {
2209 return Ok(true);
2210 }
2211 self.remote.has(hash).await
2212 }
2213
2214 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
2215 let cache_deleted = self.cache.delete(hash).await?;
2216 let remote_deleted = self.remote.delete(hash).await?;
2217 Ok(cache_deleted || remote_deleted)
2218 }
2219 }
2220
/// One synthetic query shape exercised by the read benchmark; each variant
/// maps onto a corresponding `NostrEventStore` lookup API in `execute`.
#[derive(Debug, Clone)]
enum BenchmarkQueryCase {
    /// Point lookup of a single event by hex id.
    ById {
        id: String,
    },
    /// Newest events for one author, capped at `limit`.
    ByAuthor {
        pubkey: String,
        limit: usize,
    },
    /// Author + kind combination query.
    ByAuthorKind {
        pubkey: String,
        kind: u32,
        limit: usize,
    },
    /// All events of one kind.
    ByKind {
        kind: u32,
        limit: usize,
    },
    /// Tag-index lookup (tag name + value, e.g. hashtags).
    ByTag {
        tag_name: String,
        tag_value: String,
        limit: usize,
    },
    /// Global newest-first listing.
    Recent {
        limit: usize,
    },
    /// Latest winner for a replaceable (author, kind) pair.
    Replaceable {
        pubkey: String,
        kind: u32,
    },
    /// Latest winner for a parameterized-replaceable (author, kind, d-tag) triple.
    ParameterizedReplaceable {
        pubkey: String,
        kind: u32,
        d_tag: String,
    },
}
2257
2258 impl BenchmarkQueryCase {
2259 fn name(&self) -> &'static str {
2260 match self {
2261 BenchmarkQueryCase::ById { .. } => "by_id",
2262 BenchmarkQueryCase::ByAuthor { .. } => "by_author",
2263 BenchmarkQueryCase::ByAuthorKind { .. } => "by_author_kind",
2264 BenchmarkQueryCase::ByKind { .. } => "by_kind",
2265 BenchmarkQueryCase::ByTag { .. } => "by_tag",
2266 BenchmarkQueryCase::Recent { .. } => "recent",
2267 BenchmarkQueryCase::Replaceable { .. } => "replaceable",
2268 BenchmarkQueryCase::ParameterizedReplaceable { .. } => "parameterized_replaceable",
2269 }
2270 }
2271
2272 async fn execute<S: Store>(
2273 &self,
2274 store: &NostrEventStore<S>,
2275 root: &Cid,
2276 ) -> Result<usize, NostrEventStoreError> {
2277 match self {
2278 BenchmarkQueryCase::ById { id } => {
2279 Ok(store.get_by_id(Some(root), id).await?.into_iter().count())
2280 }
2281 BenchmarkQueryCase::ByAuthor { pubkey, limit } => Ok(store
2282 .list_by_author(
2283 Some(root),
2284 pubkey,
2285 ListEventsOptions {
2286 limit: Some(*limit),
2287 ..Default::default()
2288 },
2289 )
2290 .await?
2291 .len()),
2292 BenchmarkQueryCase::ByAuthorKind {
2293 pubkey,
2294 kind,
2295 limit,
2296 } => Ok(store
2297 .list_by_author_and_kind(
2298 Some(root),
2299 pubkey,
2300 *kind,
2301 ListEventsOptions {
2302 limit: Some(*limit),
2303 ..Default::default()
2304 },
2305 )
2306 .await?
2307 .len()),
2308 BenchmarkQueryCase::ByKind { kind, limit } => Ok(store
2309 .list_by_kind(
2310 Some(root),
2311 *kind,
2312 ListEventsOptions {
2313 limit: Some(*limit),
2314 ..Default::default()
2315 },
2316 )
2317 .await?
2318 .len()),
2319 BenchmarkQueryCase::ByTag {
2320 tag_name,
2321 tag_value,
2322 limit,
2323 } => Ok(store
2324 .list_by_tag(
2325 Some(root),
2326 tag_name,
2327 tag_value,
2328 ListEventsOptions {
2329 limit: Some(*limit),
2330 ..Default::default()
2331 },
2332 )
2333 .await?
2334 .len()),
2335 BenchmarkQueryCase::Recent { limit } => Ok(store
2336 .list_recent(
2337 Some(root),
2338 ListEventsOptions {
2339 limit: Some(*limit),
2340 ..Default::default()
2341 },
2342 )
2343 .await?
2344 .len()),
2345 BenchmarkQueryCase::Replaceable { pubkey, kind } => Ok(store
2346 .get_replaceable(Some(root), pubkey, *kind)
2347 .await?
2348 .into_iter()
2349 .count()),
2350 BenchmarkQueryCase::ParameterizedReplaceable {
2351 pubkey,
2352 kind,
2353 d_tag,
2354 } => Ok(store
2355 .get_parameterized_replaceable(Some(root), pubkey, *kind, d_tag)
2356 .await?
2357 .into_iter()
2358 .count()),
2359 }
2360 }
2361 }
2362
/// Latency/bandwidth profile for a simulated network link.
/// NOTE(review): usage is outside this chunk — presumably applied to the
/// traced read counts to estimate wall-clock query cost; confirm at the call site.
#[derive(Debug, Clone, Copy)]
struct NetworkModel {
    name: &'static str,
    // Round-trip time, in milliseconds.
    rtt_ms: f64,
    // Sustained transfer rate, in MiB per second.
    bandwidth_mib_per_s: f64,
}
2369
/// Timing plus read-trace results for one benchmark query case.
#[derive(Debug, Clone)]
struct QueryBenchmarkResult {
    average_duration: Duration,
    // Presumably the 95th-percentile latency across iterations — confirm at producer.
    p95_duration: Duration,
    reads: ReadTraceSnapshot,
}
2376
/// Representative link profiles: a fast LAN, a typical WAN, and a slow connection.
const NETWORK_MODELS: [NetworkModel; 3] = [
    NetworkModel {
        name: "lan",
        rtt_ms: 2.0,
        bandwidth_mib_per_s: 100.0,
    },
    NetworkModel {
        name: "wan",
        rtt_ms: 40.0,
        bandwidth_mib_per_s: 20.0,
    },
    NetworkModel {
        name: "slow",
        rtt_ms: 120.0,
        bandwidth_mib_per_s: 5.0,
    },
];
2394
#[test]
fn test_open_social_graph_store() {
    // A freshly opened store hands back the only Arc reference.
    let _guard = test_lock();
    let dir = TempDir::new().unwrap();
    let store = open_social_graph_store(dir.path()).unwrap();
    assert_eq!(Arc::strong_count(&store), 1);
}
2402
#[test]
fn test_set_root_and_get_follow_distance() {
    // The configured root pubkey sits at follow distance zero.
    let _guard = test_lock();
    let dir = TempDir::new().unwrap();
    let store = open_social_graph_store(dir.path()).unwrap();
    let root = [1u8; 32];
    set_social_graph_root(&store, &root);
    assert_eq!(get_follow_distance(&store, &root), Some(0));
}
2412
#[test]
fn test_ingest_event_updates_follows_and_mutes() {
    // Ingesting a contact list and a mute list signed by the root should put
    // the followed key at distance 1 and mark the muted key as overmuted.
    let _guard = test_lock();
    let tmp = TempDir::new().unwrap();
    let graph_store = open_social_graph_store(tmp.path()).unwrap();

    let root_keys = Keys::generate();
    let alice_keys = Keys::generate();
    let bob_keys = Keys::generate();

    let root_pk = root_keys.public_key().to_bytes();
    set_social_graph_root(&graph_store, &root_pk);

    // Root follows alice via a contact-list (kind 3) event.
    let follow = EventBuilder::new(
        Kind::ContactList,
        "",
        vec![Tag::public_key(alice_keys.public_key())],
    )
    .custom_created_at(Timestamp::from_secs(10))
    .to_event(&root_keys)
    .unwrap();
    ingest_event(&graph_store, "follow", &follow.as_json());

    // Root mutes bob via a mute-list event.
    let mute = EventBuilder::new(
        Kind::MuteList,
        "",
        vec![Tag::public_key(bob_keys.public_key())],
    )
    .custom_created_at(Timestamp::from_secs(11))
    .to_event(&root_keys)
    .unwrap();
    ingest_event(&graph_store, "mute", &mute.as_json());

    assert_eq!(
        get_follow_distance(&graph_store, &alice_keys.public_key().to_bytes()),
        Some(1)
    );
    assert!(is_overmuted(
        &graph_store,
        &root_pk,
        &bob_keys.public_key().to_bytes(),
        1.0
    ));
}
2457
#[test]
fn test_metadata_ingest_builds_profile_search_index_and_replaces_old_terms() {
    // A newer kind-0 metadata event must fully replace the search terms
    // indexed from the older one, and the mirrored "latest profile" must
    // track whichever event is newest.
    let _guard = test_lock();
    let tmp = TempDir::new().unwrap();
    let graph_store = open_social_graph_store(tmp.path()).unwrap();
    let keys = Keys::generate();

    let older = EventBuilder::new(
        Kind::Metadata,
        serde_json::json!({
            "display_name": "sirius",
            "name": "Martti Malmi",
            "username": "mmalmi",
            "nip05": "siriusdev@iris.to",
        })
        .to_string(),
        [],
    )
    .custom_created_at(Timestamp::from_secs(5))
    .to_event(&keys)
    .unwrap();
    let newer = EventBuilder::new(
        Kind::Metadata,
        serde_json::json!({
            "display_name": "bird",
            "nip05": "birdman@iris.to",
        })
        .to_string(),
        [],
    )
    .custom_created_at(Timestamp::from_secs(6))
    .to_event(&keys)
    .unwrap();

    ingest_parsed_event(&graph_store, &older).unwrap();

    let pubkey = keys.public_key().to_hex();
    // Index keys take the form "p:<term>:<pubkey>".
    let entries = graph_store
        .profile_search_entries_for_prefix("p:sirius")
        .unwrap();
    assert!(entries
        .iter()
        .any(|(key, entry)| key == &format!("p:sirius:{pubkey}") && entry.name == "sirius"));
    assert!(entries.iter().any(|(key, entry)| {
        key == &format!("p:siriusdev:{pubkey}")
            && entry.nip05.as_deref() == Some("siriusdev")
            && entry.aliases == vec!["Martti Malmi".to_string(), "mmalmi".to_string()]
            && entry.event_nhash.starts_with("nhash1")
    }));
    assert!(entries
        .iter()
        .all(|(_, entry)| entry.pubkey == pubkey));
    assert_eq!(
        graph_store
            .latest_profile_event(&pubkey)
            .unwrap()
            .expect("latest mirrored profile")
            .id,
        older.id
    );

    ingest_parsed_event(&graph_store, &newer).unwrap();

    // Old terms are gone once the newer metadata is ingested.
    assert!(graph_store
        .profile_search_entries_for_prefix("p:sirius")
        .unwrap()
        .is_empty());
    let bird_entries = graph_store
        .profile_search_entries_for_prefix("p:bird")
        .unwrap();
    assert_eq!(bird_entries.len(), 2);
    assert!(bird_entries
        .iter()
        .any(|(key, entry)| key == &format!("p:bird:{pubkey}") && entry.name == "bird"));
    assert!(bird_entries
        .iter()
        .any(|(key, entry)| {
            key == &format!("p:birdman:{pubkey}")
                && entry.nip05.as_deref() == Some("birdman")
                && entry.aliases.is_empty()
        }));
    assert_eq!(
        graph_store
            .latest_profile_event(&pubkey)
            .unwrap()
            .expect("latest mirrored profile")
            .id,
        newer.id
    );
}
2548
#[test]
fn test_ambient_metadata_events_are_mirrored_into_public_profile_index() {
    // Metadata ingested with the Ambient storage class must still be mirrored
    // into the shared profile index and searchable by prefix.
    let _guard = test_lock();
    let tmp = TempDir::new().unwrap();
    let graph_store = open_social_graph_store(tmp.path()).unwrap();
    let keys = Keys::generate();

    let profile = EventBuilder::new(
        Kind::Metadata,
        serde_json::json!({
            "display_name": "ambient bird",
        })
        .to_string(),
        [],
    )
    .custom_created_at(Timestamp::from_secs(5))
    .to_event(&keys)
    .unwrap();

    ingest_parsed_event_with_storage_class(&graph_store, &profile, EventStorageClass::Ambient)
        .unwrap();

    let pubkey = keys.public_key().to_hex();
    let mirrored = graph_store
        .latest_profile_event(&pubkey)
        .unwrap()
        .expect("mirrored ambient profile");
    assert_eq!(mirrored.id, profile.id);
    assert_eq!(
        graph_store
            .profile_search_entries_for_prefix("p:ambient")
            .unwrap()
            .len(),
        1
    );
}
2585
#[test]
fn test_metadata_ingest_splits_compound_profile_terms_without_losing_whole_token() {
    // CamelCase profile names must be indexed both whole ("sirlibre") and by
    // their sub-tokens ("libre", "xml", "request"), all lowercased.
    let _guard = test_lock();
    let tmp = TempDir::new().unwrap();
    let graph_store = open_social_graph_store(tmp.path()).unwrap();
    let keys = Keys::generate();

    let profile = EventBuilder::new(
        Kind::Metadata,
        serde_json::json!({
            "display_name": "SirLibre",
            "username": "XMLHttpRequest42",
        })
        .to_string(),
        [],
    )
    .custom_created_at(Timestamp::from_secs(5))
    .to_event(&keys)
    .unwrap();

    ingest_parsed_event(&graph_store, &profile).unwrap();

    let pubkey = keys.public_key().to_hex();
    assert!(graph_store
        .profile_search_entries_for_prefix("p:sirlibre")
        .unwrap()
        .iter()
        .any(|(key, entry)| key == &format!("p:sirlibre:{pubkey}") && entry.name == "SirLibre"));
    assert!(graph_store
        .profile_search_entries_for_prefix("p:libre")
        .unwrap()
        .iter()
        .any(|(key, entry)| key == &format!("p:libre:{pubkey}") && entry.name == "SirLibre"));
    assert!(graph_store
        .profile_search_entries_for_prefix("p:xml")
        .unwrap()
        .iter()
        .any(|(key, entry)| {
            key == &format!("p:xml:{pubkey}")
                && entry.aliases == vec!["XMLHttpRequest42".to_string()]
        }));
    assert!(graph_store
        .profile_search_entries_for_prefix("p:request")
        .unwrap()
        .iter()
        .any(|(key, entry)| {
            key == &format!("p:request:{pubkey}")
                && entry.aliases == vec!["XMLHttpRequest42".to_string()]
        }));
}
2636
#[test]
fn test_profile_search_index_persists_across_reopen() {
    // The profile search root and mirrored profile must survive dropping the
    // store and reopening it from the same directory.
    let _guard = test_lock();
    let tmp = TempDir::new().unwrap();
    let keys = Keys::generate();
    let pubkey = keys.public_key().to_hex();

    // Scope ensures the first store is fully closed before reopening.
    {
        let graph_store = open_social_graph_store(tmp.path()).unwrap();
        let profile = EventBuilder::new(
            Kind::Metadata,
            serde_json::json!({
                "display_name": "reopen user",
            })
            .to_string(),
            [],
        )
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&keys)
        .unwrap();
        ingest_parsed_event(&graph_store, &profile).unwrap();
        assert!(graph_store.profile_search_root().unwrap().is_some());
    }

    let reopened = open_social_graph_store(tmp.path()).unwrap();
    assert!(reopened.profile_search_root().unwrap().is_some());
    assert_eq!(
        reopened
            .latest_profile_event(&pubkey)
            .unwrap()
            .expect("mirrored profile after reopen")
            .pubkey,
        keys.public_key()
    );
    let links = reopened
        .profile_search_entries_for_prefix("p:reopen")
        .unwrap();
    assert_eq!(links.len(), 1);
    assert_eq!(links[0].0, format!("p:reopen:{pubkey}"));
    assert_eq!(links[0].1.name, "reopen user");
}
2678
#[test]
fn test_profile_search_index_with_shared_hashtree_storage() {
    // Building the profile index on top of a shared hashtree store must
    // produce both a name entry and a nip05 entry for the profile.
    let _guard = test_lock();
    let tmp = TempDir::new().unwrap();
    let store =
        crate::storage::HashtreeStore::with_options(tmp.path(), None, 1024 * 1024 * 1024)
            .unwrap();
    let graph_store =
        open_social_graph_store_with_storage(tmp.path(), store.store_arc(), None).unwrap();
    let keys = Keys::generate();
    let pubkey = keys.public_key().to_hex();

    let profile = EventBuilder::new(
        Kind::Metadata,
        serde_json::json!({
            "display_name": "shared storage user",
            "nip05": "shareduser@example.com",
        })
        .to_string(),
        [],
    )
    .custom_created_at(Timestamp::from_secs(5))
    .to_event(&keys)
    .unwrap();

    graph_store
        .sync_profile_index_for_events(std::slice::from_ref(&profile))
        .unwrap();
    // Fix: the original asserted the search root twice back-to-back
    // (copy-paste duplicate); one check is sufficient.
    assert!(graph_store.profile_search_root().unwrap().is_some());
    let links = graph_store
        .profile_search_entries_for_prefix("p:shared")
        .unwrap();
    assert_eq!(links.len(), 2);
    assert!(links
        .iter()
        .any(|(key, entry)| key == &format!("p:shared:{pubkey}")
            && entry.name == "shared storage user"));
    assert!(links
        .iter()
        .any(|(key, entry)| key == &format!("p:shareduser:{pubkey}")
            && entry.nip05.as_deref() == Some("shareduser")));
}
2720
#[test]
fn test_rebuild_profile_index_from_stored_events_uses_ambient_and_public_metadata() {
    // After wiping the index roots, rebuilding from stored events must
    // re-index only the latest metadata per pubkey, drawing from both
    // public and ambient storage classes.
    let _guard = test_lock();
    let tmp = TempDir::new().unwrap();
    let graph_store = open_social_graph_store(tmp.path()).unwrap();
    let public_keys = Keys::generate();
    let ambient_keys = Keys::generate();
    let public_pubkey = public_keys.public_key().to_hex();
    let ambient_pubkey = ambient_keys.public_key().to_hex();

    let older = EventBuilder::new(
        Kind::Metadata,
        serde_json::json!({
            "display_name": "petri old",
        })
        .to_string(),
        [],
    )
    .custom_created_at(Timestamp::from_secs(5))
    .to_event(&public_keys)
    .unwrap();
    let newer = EventBuilder::new(
        Kind::Metadata,
        serde_json::json!({
            "display_name": "petri",
            "name": "Petri Example",
            "nip05": "petri@example.com",
        })
        .to_string(),
        [],
    )
    .custom_created_at(Timestamp::from_secs(6))
    .to_event(&public_keys)
    .unwrap();
    let ambient = EventBuilder::new(
        Kind::Metadata,
        serde_json::json!({
            "display_name": "ambient petri",
        })
        .to_string(),
        [],
    )
    .custom_created_at(Timestamp::from_secs(7))
    .to_event(&ambient_keys)
    .unwrap();

    ingest_parsed_event_with_storage_class(&graph_store, &older, EventStorageClass::Public)
        .unwrap();
    ingest_parsed_event_with_storage_class(&graph_store, &newer, EventStorageClass::Public)
        .unwrap();
    ingest_parsed_event_with_storage_class(&graph_store, &ambient, EventStorageClass::Ambient)
        .unwrap();

    // Simulate a lost index by clearing both index roots before the rebuild.
    graph_store.profile_index.write_by_pubkey_root(None).unwrap();
    graph_store.profile_index.write_search_root(None).unwrap();

    let rebuilt = graph_store
        .rebuild_profile_index_from_stored_events()
        .unwrap();
    // Two profiles: only the newest event per pubkey is indexed.
    assert_eq!(rebuilt, 2);

    let entries = graph_store
        .profile_search_entries_for_prefix("p:petri")
        .unwrap();
    assert_eq!(entries.len(), 2);
    assert!(entries.iter().any(|(key, entry)| {
        key == &format!("p:petri:{public_pubkey}")
            && entry.name == "petri"
            && entry.aliases == vec!["Petri Example".to_string()]
            && entry.nip05.is_none()
    }));
    assert!(entries.iter().any(|(key, entry)| {
        key == &format!("p:petri:{ambient_pubkey}")
            && entry.name == "ambient petri"
            && entry.aliases.is_empty()
            && entry.nip05.is_none()
    }));
}
2799
#[test]
fn test_query_events_by_author() {
    // Author-filtered queries return that author's notes, newest first.
    let _guard = test_lock();
    let tmp = TempDir::new().unwrap();
    let graph_store = open_social_graph_store(tmp.path()).unwrap();
    let keys = Keys::generate();

    let earlier = EventBuilder::new(Kind::TextNote, "older", [])
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&keys)
        .unwrap();
    let later = EventBuilder::new(Kind::TextNote, "newer", [])
        .custom_created_at(Timestamp::from_secs(6))
        .to_event(&keys)
        .unwrap();

    for event in [&earlier, &later] {
        ingest_parsed_event(&graph_store, event).unwrap();
    }

    let filter = Filter::new().author(keys.public_key()).kind(Kind::TextNote);
    let events = query_events(&graph_store, &filter, 10);
    let ids: Vec<_> = events.iter().map(|event| event.id).collect();
    assert_eq!(ids, vec![later.id, earlier.id]);
}
2825
#[test]
fn test_query_events_by_kind() {
    // Kind-filtered queries return only TextNotes, newest first.
    let _guard = test_lock();
    let tmp = TempDir::new().unwrap();
    let graph_store = open_social_graph_store(tmp.path()).unwrap();
    let first_keys = Keys::generate();
    let second_keys = Keys::generate();

    let earlier = EventBuilder::new(Kind::TextNote, "older", [])
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&first_keys)
        .unwrap();
    let later = EventBuilder::new(Kind::TextNote, "newer", [])
        .custom_created_at(Timestamp::from_secs(6))
        .to_event(&second_keys)
        .unwrap();
    // A Metadata event of a different kind must be excluded.
    let excluded = EventBuilder::new(Kind::Metadata, "profile", [])
        .custom_created_at(Timestamp::from_secs(7))
        .to_event(&second_keys)
        .unwrap();

    for event in [&earlier, &later, &excluded] {
        ingest_parsed_event(&graph_store, event).unwrap();
    }

    let events = query_events(&graph_store, &Filter::new().kind(Kind::TextNote), 10);
    let ids: Vec<_> = events.iter().map(|event| event.id).collect();
    assert_eq!(ids, vec![later.id, earlier.id]);
}
2857
#[test]
fn test_query_events_by_id() {
    // An id filter pins the result to exactly the matching stored event.
    let _guard = test_lock();
    let tmp = TempDir::new().unwrap();
    let graph_store = open_social_graph_store(tmp.path()).unwrap();
    let keys = Keys::generate();

    let decoy = EventBuilder::new(Kind::TextNote, "first", [])
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&keys)
        .unwrap();
    let target = EventBuilder::new(Kind::TextNote, "target", [])
        .custom_created_at(Timestamp::from_secs(6))
        .to_event(&keys)
        .unwrap();

    for event in [&decoy, &target] {
        ingest_parsed_event(&graph_store, event).unwrap();
    }

    let filter = Filter::new().id(target.id).kind(Kind::TextNote);
    let events = query_events(&graph_store, &filter, 10);
    assert_eq!(events.len(), 1);
    assert_eq!(events[0].id, target.id);
}
2882
#[test]
fn test_query_events_search_is_case_insensitive() {
    // A lowercase search phrase must match mixed-case event content.
    let _guard = test_lock();
    let tmp = TempDir::new().unwrap();
    let graph_store = open_social_graph_store(tmp.path()).unwrap();
    let keys = Keys::generate();
    let other_keys = Keys::generate();

    let hit = EventBuilder::new(Kind::TextNote, "Hello Nostr Search", [])
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&keys)
        .unwrap();
    let miss = EventBuilder::new(Kind::TextNote, "goodbye world", [])
        .custom_created_at(Timestamp::from_secs(6))
        .to_event(&other_keys)
        .unwrap();

    for event in [&hit, &miss] {
        ingest_parsed_event(&graph_store, event).unwrap();
    }

    let filter = Filter::new().kind(Kind::TextNote).search("nostr search");
    let events = query_events(&graph_store, &filter, 10);
    assert_eq!(events.len(), 1);
    assert_eq!(events[0].id, hit.id);
}
2908
#[test]
fn test_query_events_since_until_are_inclusive() {
    // Events exactly at the since/until timestamps must be included;
    // events strictly outside the window must not.
    let _guard = test_lock();
    let tmp = TempDir::new().unwrap();
    let graph_store = open_social_graph_store(tmp.path()).unwrap();
    let keys = Keys::generate();

    let before = EventBuilder::new(Kind::TextNote, "before", [])
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&keys)
        .unwrap();
    let start = EventBuilder::new(Kind::TextNote, "start", [])
        .custom_created_at(Timestamp::from_secs(6))
        .to_event(&keys)
        .unwrap();
    let end = EventBuilder::new(Kind::TextNote, "end", [])
        .custom_created_at(Timestamp::from_secs(10))
        .to_event(&keys)
        .unwrap();
    let after = EventBuilder::new(Kind::TextNote, "after", [])
        .custom_created_at(Timestamp::from_secs(11))
        .to_event(&keys)
        .unwrap();

    ingest_parsed_event(&graph_store, &before).unwrap();
    ingest_parsed_event(&graph_store, &start).unwrap();
    ingest_parsed_event(&graph_store, &end).unwrap();
    ingest_parsed_event(&graph_store, &after).unwrap();

    let filter = Filter::new()
        .kind(Kind::TextNote)
        .since(Timestamp::from_secs(6))
        .until(Timestamp::from_secs(10));
    let events = query_events(&graph_store, &filter, 10);
    // Newest-first: the window's end boundary comes before its start boundary.
    let ids = events.into_iter().map(|event| event.id).collect::<Vec<_>>();
    assert_eq!(ids, vec![end.id, start.id]);
}
2946
#[test]
fn test_query_events_replaceable_kind_returns_latest_winner() {
    // Kind 10000 is replaceable: only the newest event per author survives.
    let _guard = test_lock();
    let tmp = TempDir::new().unwrap();
    let graph_store = open_social_graph_store(tmp.path()).unwrap();
    let keys = Keys::generate();

    let superseded = EventBuilder::new(Kind::Custom(10_000), "older mute list", [])
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&keys)
        .unwrap();
    let winner = EventBuilder::new(Kind::Custom(10_000), "newer mute list", [])
        .custom_created_at(Timestamp::from_secs(6))
        .to_event(&keys)
        .unwrap();

    for event in [&superseded, &winner] {
        ingest_parsed_event(&graph_store, event).unwrap();
    }

    let filter = Filter::new()
        .author(keys.public_key())
        .kind(Kind::Custom(10_000));
    let events = query_events(&graph_store, &filter, 10);
    assert_eq!(events.len(), 1);
    assert_eq!(events[0].id, winner.id);
}
2973
#[test]
fn test_query_events_kind_41_replaceable_returns_latest_winner() {
    // Kind 41 (channel metadata) is replaceable: only the newest wins.
    let _guard = test_lock();
    let tmp = TempDir::new().unwrap();
    let graph_store = open_social_graph_store(tmp.path()).unwrap();
    let keys = Keys::generate();

    let superseded = EventBuilder::new(Kind::Custom(41), "older channel metadata", [])
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&keys)
        .unwrap();
    let winner = EventBuilder::new(Kind::Custom(41), "newer channel metadata", [])
        .custom_created_at(Timestamp::from_secs(6))
        .to_event(&keys)
        .unwrap();

    for event in [&superseded, &winner] {
        ingest_parsed_event(&graph_store, event).unwrap();
    }

    let filter = Filter::new()
        .author(keys.public_key())
        .kind(Kind::Custom(41));
    let events = query_events(&graph_store, &filter, 10);
    assert_eq!(events.len(), 1);
    assert_eq!(events[0].id, winner.id);
}
3000
#[test]
fn test_public_and_ambient_indexes_stay_separate() {
    // Scoped queries must only see events of the matching storage class,
    // while the All scope sees both.
    let _guard = test_lock();
    let tmp = TempDir::new().unwrap();
    let graph_store = open_social_graph_store(tmp.path()).unwrap();
    let public_keys = Keys::generate();
    let ambient_keys = Keys::generate();

    let public_event = EventBuilder::new(Kind::TextNote, "public", [])
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&public_keys)
        .unwrap();
    let ambient_event = EventBuilder::new(Kind::TextNote, "ambient", [])
        .custom_created_at(Timestamp::from_secs(6))
        .to_event(&ambient_keys)
        .unwrap();

    ingest_parsed_event_with_storage_class(
        &graph_store,
        &public_event,
        EventStorageClass::Public,
    )
    .unwrap();
    ingest_parsed_event_with_storage_class(
        &graph_store,
        &ambient_event,
        EventStorageClass::Ambient,
    )
    .unwrap();

    let filter = Filter::new().kind(Kind::TextNote);
    let all_events = graph_store
        .query_events_in_scope(&filter, 10, EventQueryScope::All)
        .unwrap();
    assert_eq!(all_events.len(), 2);

    let public_events = graph_store
        .query_events_in_scope(&filter, 10, EventQueryScope::PublicOnly)
        .unwrap();
    assert_eq!(public_events.len(), 1);
    assert_eq!(public_events[0].id, public_event.id);

    let ambient_events = graph_store
        .query_events_in_scope(&filter, 10, EventQueryScope::AmbientOnly)
        .unwrap();
    assert_eq!(ambient_events.len(), 1);
    assert_eq!(ambient_events[0].id, ambient_event.id);
}
3049
#[test]
fn test_default_ingest_classifies_root_author_as_public() {
    // With a root configured, `ingest_parsed_event` must classify events
    // authored by the root as Public and everything else as Ambient.
    let _guard = test_lock();
    let tmp = TempDir::new().unwrap();
    let graph_store = open_social_graph_store(tmp.path()).unwrap();
    let root_keys = Keys::generate();
    let other_keys = Keys::generate();
    set_social_graph_root(&graph_store, &root_keys.public_key().to_bytes());

    let root_event = EventBuilder::new(Kind::TextNote, "root", [])
        .custom_created_at(Timestamp::from_secs(5))
        .to_event(&root_keys)
        .unwrap();
    let other_event = EventBuilder::new(Kind::TextNote, "other", [])
        .custom_created_at(Timestamp::from_secs(6))
        .to_event(&other_keys)
        .unwrap();

    ingest_parsed_event(&graph_store, &root_event).unwrap();
    ingest_parsed_event(&graph_store, &other_event).unwrap();

    let filter = Filter::new().kind(Kind::TextNote);
    let public_events = graph_store
        .query_events_in_scope(&filter, 10, EventQueryScope::PublicOnly)
        .unwrap();
    assert_eq!(public_events.len(), 1);
    assert_eq!(public_events[0].id, root_event.id);

    let ambient_events = graph_store
        .query_events_in_scope(&filter, 10, EventQueryScope::AmbientOnly)
        .unwrap();
    assert_eq!(ambient_events.len(), 1);
    assert_eq!(ambient_events[0].id, other_event.id);
}
3084
#[test]
fn test_query_events_survives_reopen() {
    // Ingested events must still be queryable (by author and by recency)
    // after closing the store and reopening from the same path.
    let _guard = test_lock();
    let tmp = TempDir::new().unwrap();
    let db_dir = tmp.path().join("socialgraph-store");
    let keys = Keys::generate();
    let other_keys = Keys::generate();

    // Scope ensures the first store handle is dropped before reopening.
    {
        let graph_store = open_social_graph_store_at_path(&db_dir, None).unwrap();
        let older = EventBuilder::new(Kind::TextNote, "older", [])
            .custom_created_at(Timestamp::from_secs(5))
            .to_event(&keys)
            .unwrap();
        let newer = EventBuilder::new(Kind::TextNote, "newer", [])
            .custom_created_at(Timestamp::from_secs(6))
            .to_event(&keys)
            .unwrap();
        let latest = EventBuilder::new(Kind::TextNote, "latest", [])
            .custom_created_at(Timestamp::from_secs(7))
            .to_event(&other_keys)
            .unwrap();

        ingest_parsed_event(&graph_store, &older).unwrap();
        ingest_parsed_event(&graph_store, &newer).unwrap();
        ingest_parsed_event(&graph_store, &latest).unwrap();
    }

    let reopened = open_social_graph_store_at_path(&db_dir, None).unwrap();

    let author_filter = Filter::new().author(keys.public_key()).kind(Kind::TextNote);
    let author_events = query_events(&reopened, &author_filter, 10);
    assert_eq!(author_events.len(), 2);
    assert_eq!(author_events[0].content, "newer");
    assert_eq!(author_events[1].content, "older");

    // Recent query with limit 2 returns only the two newest across authors.
    let recent_filter = Filter::new().kind(Kind::TextNote);
    let recent_events = query_events(&reopened, &recent_filter, 2);
    assert_eq!(recent_events.len(), 2);
    assert_eq!(recent_events[0].content, "latest");
    assert_eq!(recent_events[1].content, "newer");
}
3127
#[test]
fn test_query_events_parameterized_replaceable_by_d_tag() {
    // For a parameterized-replaceable kind (30078), only the newest event
    // per (author, kind, d-tag) wins; other d-tags are unaffected.
    let _guard = test_lock();
    let tmp = TempDir::new().unwrap();
    let graph_store = open_social_graph_store(tmp.path()).unwrap();
    let keys = Keys::generate();

    let older = EventBuilder::new(
        Kind::Custom(30078),
        "",
        vec![
            Tag::identifier("video"),
            Tag::parse(&["l", "hashtree"]).unwrap(),
            Tag::parse(&["hash", &"11".repeat(32)]).unwrap(),
        ],
    )
    .custom_created_at(Timestamp::from_secs(5))
    .to_event(&keys)
    .unwrap();
    let newer = EventBuilder::new(
        Kind::Custom(30078),
        "",
        vec![
            Tag::identifier("video"),
            Tag::parse(&["l", "hashtree"]).unwrap(),
            Tag::parse(&["hash", &"22".repeat(32)]).unwrap(),
        ],
    )
    .custom_created_at(Timestamp::from_secs(6))
    .to_event(&keys)
    .unwrap();
    // Same author and kind but a different d-tag ("files"): separate slot.
    let other_tree = EventBuilder::new(
        Kind::Custom(30078),
        "",
        vec![
            Tag::identifier("files"),
            Tag::parse(&["l", "hashtree"]).unwrap(),
            Tag::parse(&["hash", &"33".repeat(32)]).unwrap(),
        ],
    )
    .custom_created_at(Timestamp::from_secs(7))
    .to_event(&keys)
    .unwrap();

    ingest_parsed_event(&graph_store, &older).unwrap();
    ingest_parsed_event(&graph_store, &newer).unwrap();
    ingest_parsed_event(&graph_store, &other_tree).unwrap();

    let filter = Filter::new()
        .author(keys.public_key())
        .kind(Kind::Custom(30078))
        .identifier("video");
    let events = query_events(&graph_store, &filter, 10);
    assert_eq!(events.len(), 1);
    assert_eq!(events[0].id, newer.id);
}
3184
#[test]
fn test_query_events_by_hashtag_uses_tag_index() {
    // Hashtag queries must return only events carrying the matching "t" tag,
    // newest first, and skip events with other hashtags.
    let _guard = test_lock();
    let tmp = TempDir::new().unwrap();
    let graph_store = open_social_graph_store(tmp.path()).unwrap();
    let keys = Keys::generate();
    let other_keys = Keys::generate();

    let first = EventBuilder::new(
        Kind::TextNote,
        "first",
        vec![Tag::parse(&["t", "hashtree"]).unwrap()],
    )
    .custom_created_at(Timestamp::from_secs(5))
    .to_event(&keys)
    .unwrap();
    let second = EventBuilder::new(
        Kind::TextNote,
        "second",
        vec![Tag::parse(&["t", "hashtree"]).unwrap()],
    )
    .custom_created_at(Timestamp::from_secs(6))
    .to_event(&other_keys)
    .unwrap();
    // Tagged "other": must not appear in "hashtree" results.
    let unrelated = EventBuilder::new(
        Kind::TextNote,
        "third",
        vec![Tag::parse(&["t", "other"]).unwrap()],
    )
    .custom_created_at(Timestamp::from_secs(7))
    .to_event(&other_keys)
    .unwrap();

    ingest_parsed_event(&graph_store, &first).unwrap();
    ingest_parsed_event(&graph_store, &second).unwrap();
    ingest_parsed_event(&graph_store, &unrelated).unwrap();

    let filter = Filter::new().hashtag("hashtree");
    let events = query_events(&graph_store, &filter, 10);
    assert_eq!(events.len(), 2);
    assert_eq!(events[0].id, second.id);
    assert_eq!(events[1].id, first.id);
}
3228
3229 #[test]
3230 fn test_query_events_combines_indexes_then_applies_search_filter() {
3231 let _guard = test_lock();
3232 let tmp = TempDir::new().unwrap();
3233 let graph_store = open_social_graph_store(tmp.path()).unwrap();
3234 let keys = Keys::generate();
3235 let other_keys = Keys::generate();
3236
3237 let matching = EventBuilder::new(
3238 Kind::TextNote,
3239 "hashtree video release",
3240 vec![Tag::parse(&["t", "hashtree"]).unwrap()],
3241 )
3242 .custom_created_at(Timestamp::from_secs(5))
3243 .to_event(&keys)
3244 .unwrap();
3245 let non_matching = EventBuilder::new(
3246 Kind::TextNote,
3247 "plain text note",
3248 vec![Tag::parse(&["t", "hashtree"]).unwrap()],
3249 )
3250 .custom_created_at(Timestamp::from_secs(6))
3251 .to_event(&other_keys)
3252 .unwrap();
3253
3254 ingest_parsed_event(&graph_store, &matching).unwrap();
3255 ingest_parsed_event(&graph_store, &non_matching).unwrap();
3256
3257 let filter = Filter::new().hashtag("hashtree").search("video");
3258 let events = query_events(&graph_store, &filter, 10);
3259 assert_eq!(events.len(), 1);
3260 assert_eq!(events[0].id, matching.id);
3261 }
3262
3263 fn benchmark_dataset_path() -> Option<PathBuf> {
3264 std::env::var_os("HASHTREE_BENCH_DATASET_PATH").map(PathBuf::from)
3265 }
3266
3267 fn benchmark_dataset_url() -> String {
3268 std::env::var("HASHTREE_BENCH_DATASET_URL")
3269 .ok()
3270 .filter(|value| !value.is_empty())
3271 .unwrap_or_else(|| WELLORDER_FIXTURE_URL.to_string())
3272 }
3273
3274 fn benchmark_stream_warmup_events(measured_events: usize) -> usize {
3275 std::env::var("HASHTREE_BENCH_WARMUP_EVENTS")
3276 .ok()
3277 .and_then(|value| value.parse::<usize>().ok())
3278 .unwrap_or_else(|| measured_events.clamp(1, 200))
3279 }
3280
    /// Downloads the benchmark dataset from `url` into `path` unless the file
    /// already exists; returns without touching the network when it does.
    fn ensure_benchmark_dataset(path: &Path, url: &str) -> Result<()> {
        if path.exists() {
            return Ok(());
        }

        let parent = path
            .parent()
            .context("benchmark dataset path has no parent directory")?;
        fs::create_dir_all(parent).context("create benchmark dataset directory")?;

        // Download into a sibling `.tmp` file and rename it into place so a
        // partial transfer never masquerades as a complete dataset.
        let tmp = path.with_extension("tmp");
        let mut response = reqwest::blocking::get(url)
            .context("download benchmark dataset")?
            .error_for_status()
            .context("benchmark dataset request failed")?;
        let mut file = File::create(&tmp).context("create temporary benchmark dataset file")?;
        std::io::copy(&mut response, &mut file).context("write benchmark dataset")?;
        fs::rename(&tmp, path).context("move benchmark dataset into place")?;

        Ok(())
    }
3302
    /// Streams up to `max_events` JSON events out of a bzip2-compressed,
    /// line-delimited dataset file.
    ///
    /// Decompression is delegated to an external `bzip2 -dc` child process so
    /// the archive never has to be fully decompressed on disk. Lines that fail
    /// to parse as events are skipped silently (best-effort ingestion).
    fn load_benchmark_dataset(path: &Path, max_events: usize) -> Result<Vec<Event>> {
        if max_events == 0 {
            return Ok(Vec::new());
        }

        // Requires a `bzip2` binary on PATH; decompressed JSON lines arrive on
        // the child's piped stdout.
        let mut child = Command::new("bzip2")
            .args(["-dc", &path.to_string_lossy()])
            .stdout(Stdio::piped())
            .spawn()
            .context("spawn bzip2 for benchmark dataset")?;
        let stdout = child
            .stdout
            .take()
            .context("benchmark dataset stdout missing")?;
        let mut events = Vec::with_capacity(max_events);

        // Scope the reader so the pipe handle is dropped before we wait on or
        // kill the child below.
        {
            let reader = BufReader::new(stdout);
            for line in reader.lines() {
                if events.len() >= max_events {
                    break;
                }
                let line = line.context("read benchmark dataset line")?;
                let trimmed = line.trim();
                if trimmed.is_empty() {
                    continue;
                }
                // Malformed lines are dropped, not fatal.
                if let Ok(event) = Event::from_json(trimmed.to_string()) {
                    events.push(event);
                }
            }
        }

        if events.len() < max_events {
            // The whole stream was drained, so the exit status is meaningful.
            let status = child.wait().context("wait for benchmark dataset reader")?;
            anyhow::ensure!(
                status.success(),
                "benchmark dataset reader exited with status {status}"
            );
        } else {
            // Stopped early; terminate the decompressor and ignore its status.
            let _ = child.kill();
            let _ = child.wait();
        }

        Ok(events)
    }
3349
3350 fn build_synthetic_benchmark_events(event_count: usize, author_count: usize) -> Vec<Event> {
3351 let authors = (0..author_count)
3352 .map(|_| Keys::generate())
3353 .collect::<Vec<_>>();
3354 let mut events = Vec::with_capacity(event_count);
3355 for i in 0..event_count {
3356 let kind = if i % 8 < 5 {
3357 Kind::TextNote
3358 } else {
3359 Kind::Custom(30_023)
3360 };
3361 let mut tags = Vec::new();
3362 if kind == Kind::TextNote && i % 16 == 0 {
3363 tags.push(Tag::parse(&["t", "hashtree"]).unwrap());
3364 }
3365 let content = if kind == Kind::TextNote && i % 32 == 0 {
3366 format!("benchmark target event {i}")
3367 } else {
3368 format!("benchmark event {i}")
3369 };
3370 let event = EventBuilder::new(kind, content, tags)
3371 .custom_created_at(Timestamp::from_secs(1_700_000_000 + i as u64))
3372 .to_event(&authors[i % author_count])
3373 .unwrap();
3374 events.push(event);
3375 }
3376 events
3377 }
3378
3379 fn load_benchmark_events(
3380 event_count: usize,
3381 author_count: usize,
3382 ) -> Result<(String, Vec<Event>)> {
3383 if let Some(path) = benchmark_dataset_path() {
3384 let url = benchmark_dataset_url();
3385 ensure_benchmark_dataset(&path, &url)?;
3386 let events = load_benchmark_dataset(&path, event_count)?;
3387 return Ok((format!("dataset:{}", path.display()), events));
3388 }
3389
3390 Ok((
3391 format!("synthetic:{author_count}-authors"),
3392 build_synthetic_benchmark_events(event_count, author_count),
3393 ))
3394 }
3395
3396 fn first_tag_filter(event: &Event) -> Option<Filter> {
3397 event.tags.iter().find_map(|tag| match tag.as_slice() {
3398 [name, value, ..]
3399 if name.len() == 1
3400 && !value.is_empty()
3401 && name.as_bytes()[0].is_ascii_lowercase() =>
3402 {
3403 let letter = SingleLetterTag::from_char(name.chars().next()?).ok()?;
3404 Some(Filter::new().custom_tag(letter, [value.to_string()]))
3405 }
3406 _ => None,
3407 })
3408 }
3409
3410 fn first_search_term(event: &Event) -> Option<String> {
3411 event
3412 .content
3413 .split(|ch: char| !ch.is_alphanumeric())
3414 .find(|token| token.len() >= 4)
3415 .map(|token| token.to_ascii_lowercase())
3416 }
3417
3418 fn benchmark_match_count(events: &[Event], filter: &Filter, limit: usize) -> usize {
3419 events
3420 .iter()
3421 .filter(|event| filter.match_event(event))
3422 .count()
3423 .min(limit)
3424 }
3425
3426 fn benchmark_btree_orders() -> Vec<usize> {
3427 std::env::var("HASHTREE_BTREE_ORDERS")
3428 .ok()
3429 .map(|value| {
3430 value
3431 .split(',')
3432 .filter_map(|part| part.trim().parse::<usize>().ok())
3433 .filter(|order| *order >= 2)
3434 .collect::<Vec<_>>()
3435 })
3436 .filter(|orders| !orders.is_empty())
3437 .unwrap_or_else(|| vec![16, 24, 32, 48, 64])
3438 }
3439
3440 fn benchmark_read_iterations() -> usize {
3441 std::env::var("HASHTREE_BENCH_READ_ITERATIONS")
3442 .ok()
3443 .and_then(|value| value.parse::<usize>().ok())
3444 .unwrap_or(5)
3445 .max(1)
3446 }
3447
3448 fn average_duration(samples: &[Duration]) -> Duration {
3449 if samples.is_empty() {
3450 return Duration::ZERO;
3451 }
3452
3453 Duration::from_secs_f64(
3454 samples.iter().map(Duration::as_secs_f64).sum::<f64>() / samples.len() as f64,
3455 )
3456 }
3457
3458 fn average_read_trace(samples: &[ReadTraceSnapshot]) -> ReadTraceSnapshot {
3459 if samples.is_empty() {
3460 return ReadTraceSnapshot::default();
3461 }
3462
3463 let len = samples.len() as u64;
3464 ReadTraceSnapshot {
3465 get_calls: samples.iter().map(|sample| sample.get_calls).sum::<u64>() / len,
3466 total_bytes: samples.iter().map(|sample| sample.total_bytes).sum::<u64>() / len,
3467 unique_blocks: (samples
3468 .iter()
3469 .map(|sample| sample.unique_blocks as u64)
3470 .sum::<u64>()
3471 / len) as usize,
3472 unique_bytes: samples
3473 .iter()
3474 .map(|sample| sample.unique_bytes)
3475 .sum::<u64>()
3476 / len,
3477 cache_hits: samples.iter().map(|sample| sample.cache_hits).sum::<u64>() / len,
3478 remote_fetches: samples
3479 .iter()
3480 .map(|sample| sample.remote_fetches)
3481 .sum::<u64>()
3482 / len,
3483 remote_bytes: samples
3484 .iter()
3485 .map(|sample| sample.remote_bytes)
3486 .sum::<u64>()
3487 / len,
3488 }
3489 }
3490
3491 fn estimate_serialized_remote_ms(snapshot: &ReadTraceSnapshot, model: NetworkModel) -> f64 {
3492 let transfer_ms = if model.bandwidth_mib_per_s <= 0.0 {
3493 0.0
3494 } else {
3495 (snapshot.remote_bytes as f64 / (model.bandwidth_mib_per_s * 1024.0 * 1024.0)) * 1000.0
3496 };
3497 snapshot.remote_fetches as f64 * model.rtt_ms + transfer_ms
3498 }
3499
    /// Dataset for the B-tree query benchmark: the event corpus plus the keys
    /// and identifiers of deliberately planted events that guarantee each
    /// benchmark query case has at least one match.
    #[derive(Debug, Clone)]
    struct IndexBenchmarkDataset {
        /// Human-readable provenance label ("dataset:<path>" or "synthetic:<n>-authors").
        source: String,
        /// Full event corpus, including the planted guaranteed-match events.
        events: Vec<Event>,
        /// Single-letter tag name guaranteed to appear in the corpus ("t").
        guaranteed_tag_name: String,
        /// Tag value guaranteed to appear in the corpus ("btreebench").
        guaranteed_tag_value: String,
        /// Hex pubkey of the author of the planted replaceable event pair.
        replaceable_pubkey: String,
        /// Kind of the planted replaceable events (10000).
        replaceable_kind: u32,
        /// Hex pubkey of the author of the planted parameterized-replaceable pair.
        parameterized_pubkey: String,
        /// Kind of the planted parameterized-replaceable events (30023).
        parameterized_kind: u32,
        /// `d`-tag identifier shared by the planted parameterized events.
        parameterized_d_tag: String,
    }
3512
    /// Assembles the benchmark dataset: loads (or synthesizes) `event_count`
    /// events, then appends five planted events — one tagged note, an old/new
    /// replaceable pair, and an old/new parameterized-replaceable pair — so
    /// each benchmark query case has a guaranteed match.
    fn load_index_benchmark_dataset(
        event_count: usize,
        author_count: usize,
    ) -> Result<IndexBenchmarkDataset> {
        let (source, mut events) = load_benchmark_events(event_count, author_count)?;
        // Planted events are timestamped strictly after every loaded event so
        // newest-wins replaceable semantics resolve to the planted ones.
        let base_timestamp = events
            .iter()
            .map(|event| event.created_at.as_u64())
            .max()
            .unwrap_or(1_700_000_000)
            + 1;

        let replaceable_keys = Keys::generate();
        let parameterized_keys = Keys::generate();
        let tagged_keys = Keys::generate();
        let guaranteed_tag_name = "t".to_string();
        let guaranteed_tag_value = "btreebench".to_string();
        let replaceable_kind = 10_000u32;
        let parameterized_kind = 30_023u32;
        let parameterized_d_tag = "btree-bench".to_string();

        // Guaranteed hit for the tag-index query case.
        let tagged = EventBuilder::new(
            Kind::TextNote,
            "btree benchmark tagged note",
            vec![Tag::parse(&["t", &guaranteed_tag_value]).unwrap()],
        )
        .custom_created_at(Timestamp::from_secs(base_timestamp))
        .to_event(&tagged_keys)
        .unwrap();
        // Replaceable pair: the newer event should supersede the older one.
        let replaceable_old = EventBuilder::new(
            Kind::Custom(replaceable_kind.try_into().unwrap()),
            "replaceable old",
            [],
        )
        .custom_created_at(Timestamp::from_secs(base_timestamp + 1))
        .to_event(&replaceable_keys)
        .unwrap();
        let replaceable_new = EventBuilder::new(
            Kind::Custom(replaceable_kind.try_into().unwrap()),
            "replaceable new",
            [],
        )
        .custom_created_at(Timestamp::from_secs(base_timestamp + 2))
        .to_event(&replaceable_keys)
        .unwrap();
        // Parameterized-replaceable pair sharing the same `d` identifier.
        let parameterized_old = EventBuilder::new(
            Kind::Custom(parameterized_kind.try_into().unwrap()),
            "",
            vec![Tag::identifier(&parameterized_d_tag)],
        )
        .custom_created_at(Timestamp::from_secs(base_timestamp + 3))
        .to_event(&parameterized_keys)
        .unwrap();
        let parameterized_new = EventBuilder::new(
            Kind::Custom(parameterized_kind.try_into().unwrap()),
            "",
            vec![Tag::identifier(&parameterized_d_tag)],
        )
        .custom_created_at(Timestamp::from_secs(base_timestamp + 4))
        .to_event(&parameterized_keys)
        .unwrap();

        events.extend([
            tagged,
            replaceable_old,
            replaceable_new,
            parameterized_old,
            parameterized_new,
        ]);

        Ok(IndexBenchmarkDataset {
            source,
            events,
            guaranteed_tag_name,
            guaranteed_tag_value,
            replaceable_pubkey: replaceable_keys.public_key().to_hex(),
            replaceable_kind,
            parameterized_pubkey: parameterized_keys.public_key().to_hex(),
            parameterized_kind,
            parameterized_d_tag,
        })
    }
3595
    /// Derives the query cases exercised per B-tree order: lookups by id,
    /// author, author+kind, kind, tag, recency, and the two replaceable forms,
    /// each with a fixed result limit.
    fn build_btree_query_cases(dataset: &IndexBenchmarkDataset) -> Vec<BenchmarkQueryCase> {
        // Prefer plain text notes as the "primary" kind; fall back to the
        // first event's kind for datasets without any text notes.
        let primary_kind = dataset
            .events
            .iter()
            .find(|event| event.kind == Kind::TextNote)
            .map(|event| event.kind)
            .or_else(|| dataset.events.first().map(|event| event.kind))
            .expect("benchmark requires at least one event");
        let primary_kind_u32 = primary_kind.as_u16() as u32;

        // Pick the most prolific author of the primary kind so the by-author
        // cases have plenty of rows. NOTE(review): ties are broken arbitrarily
        // because HashMap iteration order is unspecified — acceptable for a
        // benchmark, but the chosen author can vary between runs.
        let author_pubkey = dataset
            .events
            .iter()
            .filter(|event| event.kind == primary_kind)
            .fold(HashMap::<String, usize>::new(), |mut counts, event| {
                *counts.entry(event.pubkey.to_hex()).or_default() += 1;
                counts
            })
            .into_iter()
            .max_by_key(|(_, count)| *count)
            .map(|(pubkey, _)| pubkey)
            .expect("benchmark requires an author for the selected kind");

        // A mid-corpus event id for the point-lookup case.
        let by_id_id = dataset.events[dataset.events.len() / 2].id.to_hex();

        vec![
            BenchmarkQueryCase::ById { id: by_id_id },
            BenchmarkQueryCase::ByAuthor {
                pubkey: author_pubkey.clone(),
                limit: 50,
            },
            BenchmarkQueryCase::ByAuthorKind {
                pubkey: author_pubkey,
                kind: primary_kind_u32,
                limit: 50,
            },
            BenchmarkQueryCase::ByKind {
                kind: primary_kind_u32,
                limit: 200,
            },
            BenchmarkQueryCase::ByTag {
                tag_name: dataset.guaranteed_tag_name.clone(),
                tag_value: dataset.guaranteed_tag_value.clone(),
                limit: 100,
            },
            BenchmarkQueryCase::Recent { limit: 100 },
            BenchmarkQueryCase::Replaceable {
                pubkey: dataset.replaceable_pubkey.clone(),
                kind: dataset.replaceable_kind,
            },
            BenchmarkQueryCase::ParameterizedReplaceable {
                pubkey: dataset.parameterized_pubkey.clone(),
                kind: dataset.parameterized_kind,
                d_tag: dataset.parameterized_d_tag.clone(),
            },
        ]
    }
3653
    /// Runs one query case `iterations` times against a warm (fully local)
    /// store wrapped in a counting tracer, reporting average/p95 latency and
    /// the averaged read trace.
    fn benchmark_warm_query_case<S: Store + 'static>(
        base: Arc<S>,
        root: &Cid,
        order: usize,
        case: &BenchmarkQueryCase,
        iterations: usize,
    ) -> QueryBenchmarkResult {
        // Wrap the base store so every block read is counted.
        let trace_store = Arc::new(CountingStore::new(base));
        let event_store = NostrEventStore::with_options(
            Arc::clone(&trace_store),
            NostrEventStoreOptions {
                btree_order: Some(order),
            },
        );
        let mut durations = Vec::with_capacity(iterations);
        let mut traces = Vec::with_capacity(iterations);
        for _ in 0..iterations {
            // Reset counters so each iteration's trace stands alone.
            trace_store.reset();
            let started = Instant::now();
            let matches = block_on(case.execute(&event_store, root)).unwrap();
            durations.push(started.elapsed());
            traces.push(trace_store.snapshot());
            // Every benchmark case is constructed to have at least one match.
            assert!(
                matches > 0,
                "benchmark query {} returned no matches",
                case.name()
            );
        }
        let mut sorted = durations.clone();
        sorted.sort_unstable();
        QueryBenchmarkResult {
            average_duration: average_duration(&durations),
            p95_duration: duration_percentile(&sorted, 95, 100),
            reads: average_read_trace(&traces),
        }
    }
3690
    /// Runs one query case `iterations` times simulating a cold client: each
    /// iteration starts from an empty in-memory cache reading through to
    /// `remote`, so the trace captures first-visit fetch counts and bytes.
    fn benchmark_cold_query_case<S: Store + 'static>(
        remote: Arc<S>,
        root: &Cid,
        order: usize,
        case: &BenchmarkQueryCase,
        iterations: usize,
    ) -> QueryBenchmarkResult {
        let mut durations = Vec::with_capacity(iterations);
        let mut traces = Vec::with_capacity(iterations);
        for _ in 0..iterations {
            // A fresh cache every iteration keeps each run genuinely cold.
            let cache = Arc::new(MemoryStore::new());
            let trace_store = Arc::new(ReadThroughStore::new(cache, Arc::clone(&remote)));
            let event_store = NostrEventStore::with_options(
                Arc::clone(&trace_store),
                NostrEventStoreOptions {
                    btree_order: Some(order),
                },
            );
            let started = Instant::now();
            let matches = block_on(case.execute(&event_store, root)).unwrap();
            durations.push(started.elapsed());
            traces.push(trace_store.snapshot());
            // Every benchmark case is constructed to have at least one match.
            assert!(
                matches > 0,
                "benchmark query {} returned no matches",
                case.name()
            );
        }
        let mut sorted = durations.clone();
        sorted.sort_unstable();
        QueryBenchmarkResult {
            average_duration: average_duration(&durations),
            p95_duration: duration_percentile(&sorted, 95, 100),
            reads: average_read_trace(&traces),
        }
    }
3727
3728 fn duration_percentile(
3729 sorted: &[std::time::Duration],
3730 numerator: usize,
3731 denominator: usize,
3732 ) -> std::time::Duration {
3733 if sorted.is_empty() {
3734 return std::time::Duration::ZERO;
3735 }
3736 let index = ((sorted.len() - 1) * numerator) / denominator;
3737 sorted[index]
3738 }
3739
    /// End-to-end steady-state benchmark: warm up the store, stream events one
    /// at a time while timing each ingest, dump the per-label ingest profile,
    /// then time four representative query shapes (kind, author+kind, tag,
    /// search), cross-checking every result count against a linear scan of the
    /// corpus. Opt-in via `--ignored`.
    #[test]
    #[ignore = "benchmark"]
    fn benchmark_query_events_large_dataset() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        // Larger LMDB map so a big ingest does not hit the default map size.
        let graph_store =
            open_social_graph_store_with_mapsize(tmp.path(), Some(512 * 1024 * 1024)).unwrap();
        // Enable the ingest profiler and clear counters left by other tests.
        set_nostr_profile_enabled(true);
        reset_nostr_profile();

        let author_count = 64usize;
        // Measured-event count is overridable via HASHTREE_BENCH_EVENTS.
        let measured_event_count = std::env::var("HASHTREE_BENCH_EVENTS")
            .ok()
            .and_then(|value| value.parse::<usize>().ok())
            .unwrap_or(600usize);
        let warmup_event_count = benchmark_stream_warmup_events(measured_event_count);
        let total_event_count = warmup_event_count + measured_event_count;
        let (source, events) = load_benchmark_events(total_event_count, author_count).unwrap();
        let loaded_event_count = events.len();
        // The dataset may hold fewer events than requested; shrink the warmup
        // so at least one event remains in the measured stream.
        let warmup_event_count = warmup_event_count.min(loaded_event_count.saturating_sub(1));
        let (warmup_events, measured_events) = events.split_at(warmup_event_count);

        println!(
            "starting steady-state dataset benchmark with {} warmup events and {} measured stream events from {}",
            warmup_events.len(),
            measured_events.len(),
            source
        );
        // Warmup is ingested as one batch and is not timed per event.
        if !warmup_events.is_empty() {
            ingest_parsed_events(&graph_store, warmup_events).unwrap();
        }

        // Measured phase: one ingest call per event, recording each latency.
        let stream_start = Instant::now();
        let mut per_event_latencies = Vec::with_capacity(measured_events.len());
        for event in measured_events {
            let event_start = Instant::now();
            ingest_parsed_event(&graph_store, event).unwrap();
            per_event_latencies.push(event_start.elapsed());
        }
        let ingest_duration = stream_start.elapsed();

        // Latency distribution over the measured stream.
        let mut sorted_latencies = per_event_latencies.clone();
        sorted_latencies.sort_unstable();
        let average_latency = if per_event_latencies.is_empty() {
            std::time::Duration::ZERO
        } else {
            std::time::Duration::from_secs_f64(
                per_event_latencies
                    .iter()
                    .map(std::time::Duration::as_secs_f64)
                    .sum::<f64>()
                    / per_event_latencies.len() as f64,
            )
        };
        // Sustained throughput; guard against a zero-duration run.
        let ingest_capacity_eps = if ingest_duration.is_zero() {
            f64::INFINITY
        } else {
            measured_events.len() as f64 / ingest_duration.as_secs_f64()
        };
        println!(
            "benchmark steady-state ingest complete in {:?} (avg={:?} p50={:?} p95={:?} p99={:?} capacity={:.2} events/s)",
            ingest_duration,
            average_latency,
            duration_percentile(&sorted_latencies, 50, 100),
            duration_percentile(&sorted_latencies, 95, 100),
            duration_percentile(&sorted_latencies, 99, 100),
            ingest_capacity_eps
        );
        // Print the per-label ingest profile, most expensive label first.
        let mut profile = take_nostr_profile();
        profile.sort_by(|left, right| right.total.cmp(&left.total));
        for stat in profile {
            let pct = if ingest_duration.is_zero() {
                0.0
            } else {
                (stat.total.as_secs_f64() / ingest_duration.as_secs_f64()) * 100.0
            };
            let average = if stat.count == 0 {
                std::time::Duration::ZERO
            } else {
                std::time::Duration::from_secs_f64(stat.total.as_secs_f64() / stat.count as f64)
            };
            println!(
                "ingest profile: label={} total={:?} pct={:.1}% count={} avg={:?} max={:?}",
                stat.label, stat.total, pct, stat.count, average, stat.max
            );
        }
        set_nostr_profile_enabled(false);

        // Query 1: by kind — verifies count and newest-first ordering.
        let kind = events
            .iter()
            .find(|event| event.kind == Kind::TextNote)
            .map(|event| event.kind)
            .or_else(|| events.first().map(|event| event.kind))
            .expect("benchmark requires at least one event");
        let kind_filter = Filter::new().kind(kind);
        let kind_start = Instant::now();
        let kind_events = query_events(&graph_store, &kind_filter, 200);
        let kind_duration = kind_start.elapsed();
        assert_eq!(
            kind_events.len(),
            benchmark_match_count(&events, &kind_filter, 200)
        );
        assert!(kind_events
            .windows(2)
            .all(|window| window[0].created_at >= window[1].created_at));

        // Query 2: author + kind.
        let author_pubkey = events
            .iter()
            .find(|event| event.kind == kind)
            .map(|event| event.pubkey)
            .expect("benchmark requires an author for the selected kind");
        let author_filter = Filter::new().author(author_pubkey).kind(kind);
        let author_start = Instant::now();
        let author_events = query_events(&graph_store, &author_filter, 50);
        let author_duration = author_start.elapsed();
        assert_eq!(
            author_events.len(),
            benchmark_match_count(&events, &author_filter, 50)
        );

        // Query 3: single-letter tag index.
        let tag_filter = events
            .iter()
            .find_map(first_tag_filter)
            .expect("benchmark requires at least one tagged event");
        let tag_start = Instant::now();
        let tag_events = query_events(&graph_store, &tag_filter, 100);
        let tag_duration = tag_start.elapsed();
        assert_eq!(
            tag_events.len(),
            benchmark_match_count(&events, &tag_filter, 100)
        );

        // Query 4: kind + full-text search term drawn from an event body.
        let search_source = events
            .iter()
            .find_map(|event| first_search_term(event).map(|term| (event.kind, term)))
            .expect("benchmark requires at least one searchable event");
        let search_filter = Filter::new().kind(search_source.0).search(search_source.1);
        let search_start = Instant::now();
        let search_events = query_events(&graph_store, &search_filter, 100);
        let search_duration = search_start.elapsed();
        assert_eq!(
            search_events.len(),
            benchmark_match_count(&events, &search_filter, 100)
        );

        // Single summary line for easy grepping across runs.
        println!(
            "steady-state benchmark: source={} warmup_events={} stream_events={} ingest={:?} avg={:?} p50={:?} p95={:?} p99={:?} capacity_eps={:.2} kind={:?} author={:?} tag={:?} search={:?}",
            source,
            warmup_events.len(),
            measured_events.len(),
            ingest_duration,
            average_latency,
            duration_percentile(&sorted_latencies, 50, 100),
            duration_percentile(&sorted_latencies, 95, 100),
            duration_percentile(&sorted_latencies, 99, 100),
            ingest_capacity_eps,
            kind_duration,
            author_duration,
            tag_duration,
            search_duration
        );
    }
3902
    /// Benchmarks query latency and read amplification across several B-tree
    /// orders. For every order it builds a fresh event store, runs each query
    /// case warm (local store) and cold (empty cache in front of a simulated
    /// remote), and prints per-case plus per-order summaries including
    /// serialized network-latency estimates. Opt-in via `--ignored`.
    #[test]
    #[ignore = "benchmark"]
    fn benchmark_nostr_btree_query_tradeoffs() {
        let _guard = test_lock();
        let event_count = std::env::var("HASHTREE_BENCH_EVENTS")
            .ok()
            .and_then(|value| value.parse::<usize>().ok())
            .unwrap_or(2_000usize);
        let iterations = benchmark_read_iterations();
        let orders = benchmark_btree_orders();
        let dataset = load_index_benchmark_dataset(event_count, 64).unwrap();
        let cases = build_btree_query_cases(&dataset);
        let stored_events = dataset
            .events
            .iter()
            .map(stored_event_from_nostr)
            .collect::<Vec<_>>();

        println!(
            "btree-order benchmark: source={} events={} iterations={} orders={:?}",
            dataset.source,
            stored_events.len(),
            iterations,
            orders
        );
        println!(
            "network models are serialized fetch estimates: {}",
            NETWORK_MODELS
                .iter()
                .map(|model| format!(
                    "{}={}ms_rtt/{}MiBps",
                    model.name, model.rtt_ms, model.bandwidth_mib_per_s
                ))
                .collect::<Vec<_>>()
                .join(", ")
        );

        for order in orders {
            // Fresh store per order so index layouts do not interfere.
            let tmp = TempDir::new().unwrap();
            let local_store =
                Arc::new(LocalStore::new(tmp.path().join("blobs"), &StorageBackend::Lmdb).unwrap());
            let event_store = NostrEventStore::with_options(
                Arc::clone(&local_store),
                NostrEventStoreOptions {
                    btree_order: Some(order),
                },
            );
            let root = block_on(event_store.build(None, stored_events.clone()))
                .unwrap()
                .expect("benchmark build root");

            println!("btree-order={} root={}", order, hex::encode(root.hash));
            // Accumulators for the per-order summary line below.
            let mut warm_total_ms = 0.0f64;
            let mut model_totals = NETWORK_MODELS
                .iter()
                .map(|model| (model.name, 0.0f64))
                .collect::<HashMap<_, _>>();

            for case in &cases {
                let warm = benchmark_warm_query_case(
                    Arc::clone(&local_store),
                    &root,
                    order,
                    case,
                    iterations,
                );
                let cold = benchmark_cold_query_case(
                    Arc::clone(&local_store),
                    &root,
                    order,
                    case,
                    iterations,
                );
                warm_total_ms += warm.average_duration.as_secs_f64() * 1000.0;

                // Format per-model latency estimates while also folding each
                // estimate into the running per-model totals.
                let model_estimates = NETWORK_MODELS
                    .iter()
                    .map(|model| {
                        let estimate = estimate_serialized_remote_ms(&cold.reads, *model);
                        *model_totals.get_mut(model.name).unwrap() += estimate;
                        format!("{}={:.2}ms", model.name, estimate)
                    })
                    .collect::<Vec<_>>()
                    .join(" ");

                println!(
                    "btree-order={} query={} warm_avg={:?} warm_p95={:?} warm_blocks={} warm_unique_bytes={} cold_fetches={} cold_bytes={} cold_local_avg={:?} {}",
                    order,
                    case.name(),
                    warm.average_duration,
                    warm.p95_duration,
                    warm.reads.unique_blocks,
                    warm.reads.unique_bytes,
                    cold.reads.remote_fetches,
                    cold.reads.remote_bytes,
                    cold.average_duration,
                    model_estimates
                );
            }

            // Unweighted per-order averages across all query cases.
            println!(
                "btree-order={} summary unweighted_warm_avg_ms={:.3} {}",
                order,
                warm_total_ms / cases.len() as f64,
                NETWORK_MODELS
                    .iter()
                    .map(|model| format!(
                        "{}={:.2}ms",
                        model.name,
                        model_totals[model.name] / cases.len() as f64
                    ))
                    .collect::<Vec<_>>()
                    .join(" ")
            );
        }
    }
4019
    #[test]
    fn test_ensure_social_graph_mapsize_rounds_and_applies() {
        let _guard = test_lock();
        let tmp = TempDir::new().unwrap();
        // Create the environment at the default map size, then request a
        // larger, non-page-aligned size and expect the map to be grown.
        ensure_social_graph_mapsize(tmp.path(), DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES).unwrap();
        let requested = 70 * 1024 * 1024;
        ensure_social_graph_mapsize(tmp.path(), requested).unwrap();
        // SAFETY: heed marks environment opening unsafe because the caller
        // must guarantee no conflicting concurrent open of the same path; the
        // tempdir here is private to this test.
        let env = unsafe {
            heed::EnvOpenOptions::new()
                .map_size(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES as usize)
                .max_dbs(SOCIALGRAPH_MAX_DBS)
                .open(tmp.path())
        }
        .unwrap();
        // The persisted map size must cover the request and stay page-aligned.
        assert!(env.info().map_size >= requested as usize);
        assert_eq!(env.info().map_size % page_size_bytes(), 0);
    }
4037
4038 #[test]
4039 fn test_ingest_events_batches_graph_updates() {
4040 let _guard = test_lock();
4041 let tmp = TempDir::new().unwrap();
4042 let graph_store = open_social_graph_store(tmp.path()).unwrap();
4043
4044 let root_keys = Keys::generate();
4045 let alice_keys = Keys::generate();
4046 let bob_keys = Keys::generate();
4047
4048 let root_pk = root_keys.public_key().to_bytes();
4049 set_social_graph_root(&graph_store, &root_pk);
4050
4051 let root_follows_alice = EventBuilder::new(
4052 Kind::ContactList,
4053 "",
4054 vec![Tag::public_key(alice_keys.public_key())],
4055 )
4056 .custom_created_at(Timestamp::from_secs(10))
4057 .to_event(&root_keys)
4058 .unwrap();
4059 let alice_follows_bob = EventBuilder::new(
4060 Kind::ContactList,
4061 "",
4062 vec![Tag::public_key(bob_keys.public_key())],
4063 )
4064 .custom_created_at(Timestamp::from_secs(11))
4065 .to_event(&alice_keys)
4066 .unwrap();
4067
4068 ingest_parsed_events(
4069 &graph_store,
4070 &[root_follows_alice.clone(), alice_follows_bob.clone()],
4071 )
4072 .unwrap();
4073
4074 assert_eq!(
4075 get_follow_distance(&graph_store, &alice_keys.public_key().to_bytes()),
4076 Some(1)
4077 );
4078 assert_eq!(
4079 get_follow_distance(&graph_store, &bob_keys.public_key().to_bytes()),
4080 Some(2)
4081 );
4082
4083 let filter = Filter::new().kind(Kind::ContactList);
4084 let stored = query_events(&graph_store, &filter, 10);
4085 let ids = stored.into_iter().map(|event| event.id).collect::<Vec<_>>();
4086 assert!(ids.contains(&root_follows_alice.id));
4087 assert!(ids.contains(&alice_follows_bob.id));
4088 }
4089
4090 #[test]
4091 fn test_ingest_graph_events_updates_graph_without_indexing_events() {
4092 let _guard = test_lock();
4093 let tmp = TempDir::new().unwrap();
4094 let graph_store = open_social_graph_store(tmp.path()).unwrap();
4095
4096 let root_keys = Keys::generate();
4097 let alice_keys = Keys::generate();
4098
4099 let root_pk = root_keys.public_key().to_bytes();
4100 set_social_graph_root(&graph_store, &root_pk);
4101
4102 let root_follows_alice = EventBuilder::new(
4103 Kind::ContactList,
4104 "",
4105 vec![Tag::public_key(alice_keys.public_key())],
4106 )
4107 .custom_created_at(Timestamp::from_secs(10))
4108 .to_event(&root_keys)
4109 .unwrap();
4110
4111 ingest_graph_parsed_events(&graph_store, std::slice::from_ref(&root_follows_alice))
4112 .unwrap();
4113
4114 assert_eq!(
4115 get_follow_distance(&graph_store, &alice_keys.public_key().to_bytes()),
4116 Some(1)
4117 );
4118 let filter = Filter::new().kind(Kind::ContactList);
4119 assert!(query_events(&graph_store, &filter, 10).is_empty());
4120 }
4121}