1pub mod access;
2pub mod crawler;
3pub mod local_lists;
4pub mod snapshot;
5
6pub use access::SocialGraphAccessControl;
7pub use crawler::SocialGraphCrawler;
8pub use local_lists::{
9 read_local_list_file_state, sync_local_list_files_force, sync_local_list_files_if_changed,
10 LocalListFileState, LocalListSyncOutcome,
11};
12
13use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
14use std::path::{Path, PathBuf};
15use std::sync::{Arc, Mutex as StdMutex};
16
17use anyhow::{Context, Result};
18use bytes::Bytes;
19use futures::executor::block_on;
20use hashtree_core::Cid;
21use hashtree_nostr::{ListEventsOptions, NostrEventStore, NostrEventStoreError, StoredNostrEvent};
22use nostr::{Event, Filter, JsonUtil, Kind};
23use nostr_social_graph::{
24 BinaryBudget, GraphStats, NostrEvent as GraphEvent, SocialGraph,
25 SocialGraphBackend as NostrSocialGraphBackend,
26};
27use nostr_social_graph_heed::HeedSocialGraph;
28
29use crate::storage::{LocalStore, StorageRouter};
30
31#[cfg(test)]
32use std::sync::{Mutex, MutexGuard, OnceLock};
33
/// Ordered set of raw 32-byte public keys.
pub type UserSet = BTreeSet<[u8; 32]>;

/// All-zero hex pubkey passed to the heed backend when the store is opened,
/// i.e. before a real root has been configured.
const DEFAULT_ROOT_HEX: &str = "0000000000000000000000000000000000000000000000000000000000000000";
/// Name of the file, inside the database directory, that records the events root.
const EVENTS_ROOT_FILE: &str = "events-root.msgpack";
/// Distance value that `SocialGraphStore::follow_distance` translates into `None`.
const UNKNOWN_FOLLOW_DISTANCE: u32 = 1000;
/// 64 MiB: the initial LMDB map size and the floor applied to requested map sizes.
const DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES: u64 = 64 * 1024 * 1024;
/// `max_dbs` setting used when opening the LMDB environment for a resize.
const SOCIALGRAPH_MAX_DBS: u32 = 16;
41
42#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
43struct StoredCid {
44 hash: [u8; 32],
45 key: Option<[u8; 32]>,
46}
47
48#[derive(Debug, Clone, Default, serde::Serialize)]
49pub struct SocialGraphStats {
50 pub total_users: usize,
51 pub root: Option<String>,
52 pub total_follows: usize,
53 pub max_depth: u32,
54 pub size_by_distance: BTreeMap<u32, usize>,
55 pub enabled: bool,
56}
57
58#[derive(Debug, Clone)]
59struct DistanceCache {
60 stats: SocialGraphStats,
61 users_by_distance: BTreeMap<u32, Vec<[u8; 32]>>,
62}
63
64#[derive(Debug, thiserror::Error)]
65#[error("{0}")]
66pub struct UpstreamGraphBackendError(String);
67
68pub struct SocialGraphStore {
69 graph: StdMutex<HeedSocialGraph>,
70 distance_cache: StdMutex<Option<DistanceCache>>,
71 event_store: NostrEventStore<StorageRouter>,
72 events_root_path: PathBuf,
73}
74
75pub trait SocialGraphBackend: Send + Sync {
76 fn stats(&self) -> Result<SocialGraphStats>;
77 fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>>;
78 fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>>;
79 fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>>;
80 fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet>;
81 fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool>;
82 fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>>;
83 fn ingest_event(&self, event: &Event) -> Result<()>;
84 fn ingest_events(&self, events: &[Event]) -> Result<()> {
85 for event in events {
86 self.ingest_event(event)?;
87 }
88 Ok(())
89 }
90 fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
91 self.ingest_events(events)
92 }
93 fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>>;
94}
95
/// Guard returned by [`test_lock`]; the lock is released when it is dropped.
#[cfg(test)]
pub type TestLockGuard = MutexGuard<'static, ()>;

/// Process-wide mutex used to serialize tests that open the database.
#[cfg(test)]
static NDB_TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();

/// Acquires the global test lock, creating it on first use.
///
/// A test that panics while holding the guard poisons the mutex. The lock
/// protects no data (`()`), so poisoning carries no information; it is ignored
/// here so that one failing test does not make every later test fail too.
#[cfg(test)]
pub fn test_lock() -> TestLockGuard {
    NDB_TEST_LOCK
        .get_or_init(|| Mutex::new(()))
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
}
106
107pub fn open_social_graph_store(data_dir: &Path) -> Result<Arc<SocialGraphStore>> {
108 open_social_graph_store_with_mapsize(data_dir, None)
109}
110
111pub fn open_social_graph_store_with_mapsize(
112 data_dir: &Path,
113 mapsize_bytes: Option<u64>,
114) -> Result<Arc<SocialGraphStore>> {
115 let db_dir = data_dir.join("socialgraph");
116 open_social_graph_store_at_path(&db_dir, mapsize_bytes)
117}
118
119pub fn open_social_graph_store_with_storage(
120 data_dir: &Path,
121 store: Arc<StorageRouter>,
122 mapsize_bytes: Option<u64>,
123) -> Result<Arc<SocialGraphStore>> {
124 let db_dir = data_dir.join("socialgraph");
125 open_social_graph_store_at_path_with_storage(&db_dir, store, mapsize_bytes)
126}
127
128pub fn open_social_graph_store_at_path(
129 db_dir: &Path,
130 mapsize_bytes: Option<u64>,
131) -> Result<Arc<SocialGraphStore>> {
132 let config = hashtree_config::Config::load_or_default();
133 let backend = &config.storage.backend;
134 let local_store = Arc::new(
135 LocalStore::new(db_dir.join("blobs"), backend)
136 .map_err(|err| anyhow::anyhow!("Failed to create social graph blob store: {err}"))?,
137 );
138 let store = Arc::new(StorageRouter::new(local_store));
139 open_social_graph_store_at_path_with_storage(db_dir, store, mapsize_bytes)
140}
141
142pub fn open_social_graph_store_at_path_with_storage(
143 db_dir: &Path,
144 store: Arc<StorageRouter>,
145 mapsize_bytes: Option<u64>,
146) -> Result<Arc<SocialGraphStore>> {
147 std::fs::create_dir_all(db_dir)?;
148 if let Some(size) = mapsize_bytes {
149 ensure_social_graph_mapsize(db_dir, size)?;
150 }
151 let graph = HeedSocialGraph::open(db_dir, DEFAULT_ROOT_HEX)
152 .context("open nostr-social-graph heed backend")?;
153
154 Ok(Arc::new(SocialGraphStore {
155 graph: StdMutex::new(graph),
156 distance_cache: StdMutex::new(None),
157 event_store: NostrEventStore::new(store),
158 events_root_path: db_dir.join(EVENTS_ROOT_FILE),
159 }))
160}
161
162pub fn set_social_graph_root(store: &SocialGraphStore, pk_bytes: &[u8; 32]) {
163 if let Err(err) = store.set_root(pk_bytes) {
164 tracing::warn!("Failed to set social graph root: {err}");
165 }
166}
167
168pub fn get_follow_distance(
169 backend: &(impl SocialGraphBackend + ?Sized),
170 pk_bytes: &[u8; 32],
171) -> Option<u32> {
172 backend.follow_distance(pk_bytes).ok().flatten()
173}
174
175pub fn get_follows(
176 backend: &(impl SocialGraphBackend + ?Sized),
177 pk_bytes: &[u8; 32],
178) -> Vec<[u8; 32]> {
179 match backend.followed_targets(pk_bytes) {
180 Ok(set) => set.into_iter().collect(),
181 Err(_) => Vec::new(),
182 }
183}
184
185pub fn is_overmuted(
186 backend: &(impl SocialGraphBackend + ?Sized),
187 _root_pk: &[u8; 32],
188 user_pk: &[u8; 32],
189 threshold: f64,
190) -> bool {
191 backend
192 .is_overmuted_user(user_pk, threshold)
193 .unwrap_or(false)
194}
195
196pub fn ingest_event(backend: &(impl SocialGraphBackend + ?Sized), _sub_id: &str, event_json: &str) {
197 let event = match Event::from_json(event_json) {
198 Ok(event) => event,
199 Err(_) => return,
200 };
201
202 if let Err(err) = backend.ingest_event(&event) {
203 tracing::warn!("Failed to ingest social graph event: {err}");
204 }
205}
206
207pub fn ingest_parsed_event(
208 backend: &(impl SocialGraphBackend + ?Sized),
209 event: &Event,
210) -> Result<()> {
211 backend.ingest_event(event)
212}
213
214pub fn ingest_parsed_events(
215 backend: &(impl SocialGraphBackend + ?Sized),
216 events: &[Event],
217) -> Result<()> {
218 backend.ingest_events(events)
219}
220
221pub fn ingest_graph_parsed_events(
222 backend: &(impl SocialGraphBackend + ?Sized),
223 events: &[Event],
224) -> Result<()> {
225 backend.ingest_graph_events(events)
226}
227
228pub fn query_events(
229 backend: &(impl SocialGraphBackend + ?Sized),
230 filter: &Filter,
231 limit: usize,
232) -> Vec<Event> {
233 backend.query_events(filter, limit).unwrap_or_default()
234}
235
236impl SocialGraphStore {
237 fn invalidate_distance_cache(&self) {
238 *self.distance_cache.lock().unwrap() = None;
239 }
240
241 fn build_distance_cache(state: nostr_social_graph::SocialGraphState) -> Result<DistanceCache> {
242 let unique_ids = state
243 .unique_ids
244 .into_iter()
245 .map(|(pubkey, id)| decode_pubkey(&pubkey).map(|decoded| (id, decoded)))
246 .collect::<Result<HashMap<_, _>>>()?;
247
248 let mut users_by_distance = BTreeMap::new();
249 let mut size_by_distance = BTreeMap::new();
250 for (distance, users) in state.users_by_follow_distance {
251 let decoded = users
252 .into_iter()
253 .filter_map(|id| unique_ids.get(&id).copied())
254 .collect::<Vec<_>>();
255 size_by_distance.insert(distance, decoded.len());
256 users_by_distance.insert(distance, decoded);
257 }
258
259 let total_follows = state
260 .followed_by_user
261 .iter()
262 .map(|(_, targets)| targets.len())
263 .sum::<usize>();
264 let total_users = size_by_distance.values().copied().sum();
265 let max_depth = size_by_distance.keys().copied().max().unwrap_or_default();
266
267 Ok(DistanceCache {
268 stats: SocialGraphStats {
269 total_users,
270 root: Some(state.root),
271 total_follows,
272 max_depth,
273 size_by_distance,
274 enabled: true,
275 },
276 users_by_distance,
277 })
278 }
279
280 fn load_distance_cache(&self) -> Result<DistanceCache> {
281 if let Some(cache) = self.distance_cache.lock().unwrap().clone() {
282 return Ok(cache);
283 }
284
285 let state = {
286 let graph = self.graph.lock().unwrap();
287 graph.export_state().context("export social graph state")?
288 };
289 let cache = Self::build_distance_cache(state)?;
290 *self.distance_cache.lock().unwrap() = Some(cache.clone());
291 Ok(cache)
292 }
293
294 fn set_root(&self, root: &[u8; 32]) -> Result<()> {
295 let root_hex = hex::encode(root);
296 {
297 let mut graph = self.graph.lock().unwrap();
298 if should_replace_placeholder_root(&graph)? {
299 let fresh = SocialGraph::new(&root_hex);
300 graph
301 .replace_state(&fresh.export_state())
302 .context("replace placeholder social graph root")?;
303 } else {
304 graph
305 .set_root(&root_hex)
306 .context("set nostr-social-graph root")?;
307 }
308 }
309 self.invalidate_distance_cache();
310 Ok(())
311 }
312
313 fn stats(&self) -> Result<SocialGraphStats> {
314 Ok(self.load_distance_cache()?.stats)
315 }
316
317 fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
318 let graph = self.graph.lock().unwrap();
319 let distance = graph
320 .get_follow_distance(&hex::encode(pk_bytes))
321 .context("read social graph follow distance")?;
322 Ok((distance != UNKNOWN_FOLLOW_DISTANCE).then_some(distance))
323 }
324
325 fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
326 Ok(self
327 .load_distance_cache()?
328 .users_by_distance
329 .get(&distance)
330 .cloned()
331 .unwrap_or_default())
332 }
333
334 fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
335 let graph = self.graph.lock().unwrap();
336 graph
337 .get_follow_list_created_at(&hex::encode(owner))
338 .context("read social graph follow list timestamp")
339 }
340
341 fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
342 let graph = self.graph.lock().unwrap();
343 decode_pubkey_set(
344 graph
345 .get_followed_by_user(&hex::encode(owner))
346 .context("read followed targets")?,
347 )
348 }
349
350 fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
351 if threshold <= 0.0 {
352 return Ok(false);
353 }
354 let graph = self.graph.lock().unwrap();
355 graph
356 .is_overmuted(&hex::encode(user_pk), threshold)
357 .context("check social graph overmute")
358 }
359
360 fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
361 let state = {
362 let graph = self.graph.lock().unwrap();
363 graph.export_state().context("export social graph state")?
364 };
365 let mut graph = SocialGraph::from_state(state).context("rebuild social graph state")?;
366 let root_hex = hex::encode(root);
367 if graph.get_root() != root_hex {
368 graph
369 .set_root(&root_hex)
370 .context("set snapshot social graph root")?;
371 }
372 let chunks = graph
373 .to_binary_chunks_with_budget(*options)
374 .context("encode social graph snapshot")?;
375 Ok(chunks.into_iter().map(Bytes::from).collect())
376 }
377
378 fn ingest_event(&self, event: &Event) -> Result<()> {
379 let current_root = self.events_root()?;
380 let next_root = self.store_event(current_root.as_ref(), event)?;
381 self.write_events_root(Some(&next_root))?;
382
383 if is_social_graph_event(event.kind) {
384 {
385 let mut graph = self.graph.lock().unwrap();
386 graph
387 .handle_event(&graph_event_from_nostr(event), true, 0.0)
388 .context("ingest social graph event into nostr-social-graph")?;
389 }
390 self.invalidate_distance_cache();
391 }
392
393 Ok(())
394 }
395
396 fn ingest_events(&self, events: &[Event]) -> Result<()> {
397 if events.is_empty() {
398 return Ok(());
399 }
400
401 let mut current_root = self.events_root()?;
402 for event in events {
403 let next_root = self.store_event(current_root.as_ref(), event)?;
404 current_root = Some(next_root);
405 }
406 self.write_events_root(current_root.as_ref())?;
407
408 let graph_events = events
409 .iter()
410 .filter(|event| is_social_graph_event(event.kind))
411 .collect::<Vec<_>>();
412 if graph_events.is_empty() {
413 return Ok(());
414 }
415
416 {
417 let mut graph = self.graph.lock().unwrap();
418 let mut snapshot = SocialGraph::from_state(
419 graph
420 .export_state()
421 .context("export social graph state for batch ingest")?,
422 )
423 .context("rebuild social graph state for batch ingest")?;
424 for event in graph_events {
425 snapshot.handle_event(&graph_event_from_nostr(event), true, 0.0);
426 }
427 graph
428 .replace_state(&snapshot.export_state())
429 .context("replace batched social graph state")?;
430 }
431 self.invalidate_distance_cache();
432
433 Ok(())
434 }
435
436 fn apply_graph_events_only(&self, events: &[Event]) -> Result<()> {
437 let graph_events = events
438 .iter()
439 .filter(|event| is_social_graph_event(event.kind))
440 .collect::<Vec<_>>();
441 if graph_events.is_empty() {
442 return Ok(());
443 }
444
445 {
446 let mut graph = self.graph.lock().unwrap();
447 let mut snapshot = SocialGraph::from_state(
448 graph
449 .export_state()
450 .context("export social graph state for graph-only ingest")?,
451 )
452 .context("rebuild social graph state for graph-only ingest")?;
453 for event in graph_events {
454 snapshot.handle_event(&graph_event_from_nostr(event), true, 0.0);
455 }
456 graph
457 .replace_state(&snapshot.export_state())
458 .context("replace graph-only social graph state")?;
459 }
460 self.invalidate_distance_cache();
461 Ok(())
462 }
463
464 fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
465 if limit == 0 {
466 return Ok(Vec::new());
467 }
468
469 let events_root = self.events_root()?;
470 let Some(root) = events_root.as_ref() else {
471 return Ok(Vec::new());
472 };
473 let mut candidates = Vec::new();
474 let mut seen: HashSet<[u8; 32]> = HashSet::new();
475
476 if let Some(ids) = filter.ids.as_ref() {
477 for id in ids {
478 let id_bytes = id.to_bytes();
479 if !seen.insert(id_bytes) {
480 continue;
481 }
482 if let Some(event) = self.load_event_by_id(root, &id.to_hex())? {
483 if filter.match_event(&event) {
484 candidates.push(event);
485 }
486 }
487 if candidates.len() >= limit {
488 break;
489 }
490 }
491 } else if let Some(authors) = filter.authors.as_ref() {
492 for author in authors {
493 let mut author_matches = 0usize;
494 for event in self.load_events_for_author(root, author, filter)? {
495 let id_bytes = event.id.to_bytes();
496 if !seen.insert(id_bytes) {
497 continue;
498 }
499 if filter.match_event(&event) {
500 candidates.push(event);
501 author_matches += 1;
502 }
503 if author_matches >= limit {
504 break;
505 }
506 }
507 }
508 } else {
509 for event in self.load_recent_events(root)? {
510 let id_bytes = event.id.to_bytes();
511 if !seen.insert(id_bytes) {
512 continue;
513 }
514 if filter.match_event(&event) {
515 candidates.push(event);
516 }
517 if candidates.len() >= limit {
518 break;
519 }
520 }
521 }
522
523 candidates.sort_by(|a, b| {
524 b.created_at
525 .as_u64()
526 .cmp(&a.created_at.as_u64())
527 .then_with(|| a.id.cmp(&b.id))
528 });
529 candidates.truncate(limit);
530 Ok(candidates)
531 }
532
533 fn events_root(&self) -> Result<Option<Cid>> {
534 let Ok(bytes) = std::fs::read(&self.events_root_path) else {
535 return Ok(None);
536 };
537 decode_cid(&bytes)
538 }
539
540 fn write_events_root(&self, root: Option<&Cid>) -> Result<()> {
541 let Some(root) = root else {
542 if self.events_root_path.exists() {
543 std::fs::remove_file(&self.events_root_path)?;
544 }
545 return Ok(());
546 };
547
548 let encoded = encode_cid(root)?;
549 let tmp_path = self.events_root_path.with_extension("tmp");
550 std::fs::write(&tmp_path, encoded)?;
551 std::fs::rename(tmp_path, &self.events_root_path)?;
552 Ok(())
553 }
554
555 fn store_event(&self, root: Option<&Cid>, event: &Event) -> Result<Cid> {
556 let stored = stored_event_from_nostr(event);
557 block_on(self.event_store.add(root, stored)).map_err(map_event_store_error)
558 }
559
560 fn load_event_by_id(&self, root: &Cid, event_id: &str) -> Result<Option<Event>> {
561 let stored = block_on(self.event_store.get_by_id(Some(root), event_id))
562 .map_err(map_event_store_error)?;
563 stored.map(nostr_event_from_stored).transpose()
564 }
565
566 fn load_events_for_author(
567 &self,
568 root: &Cid,
569 author: &nostr::PublicKey,
570 filter: &Filter,
571 ) -> Result<Vec<Event>> {
572 let kind_filter = filter.kinds.as_ref().and_then(|kinds| {
573 if kinds.len() == 1 {
574 kinds.iter().next().map(|kind| kind.as_u16() as u32)
575 } else {
576 None
577 }
578 });
579 let author_hex = author.to_hex();
580 let stored = match kind_filter {
581 Some(kind) => block_on(self.event_store.list_by_author_and_kind(
582 Some(root),
583 &author_hex,
584 kind,
585 ListEventsOptions::default(),
586 ))
587 .map_err(map_event_store_error)?,
588 None => block_on(self.event_store.list_by_author(
589 Some(root),
590 &author_hex,
591 ListEventsOptions::default(),
592 ))
593 .map_err(map_event_store_error)?,
594 };
595 stored
596 .into_iter()
597 .map(nostr_event_from_stored)
598 .collect::<Result<Vec<_>>>()
599 }
600
601 fn load_recent_events(&self, root: &Cid) -> Result<Vec<Event>> {
602 let stored = block_on(
603 self.event_store
604 .list_recent(Some(root), ListEventsOptions::default()),
605 )
606 .map_err(map_event_store_error)?;
607 stored
608 .into_iter()
609 .map(nostr_event_from_stored)
610 .collect::<Result<Vec<_>>>()
611 }
612}
613
614impl SocialGraphBackend for SocialGraphStore {
615 fn stats(&self) -> Result<SocialGraphStats> {
616 SocialGraphStore::stats(self)
617 }
618
619 fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
620 SocialGraphStore::users_by_follow_distance(self, distance)
621 }
622
623 fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
624 SocialGraphStore::follow_distance(self, pk_bytes)
625 }
626
627 fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
628 SocialGraphStore::follow_list_created_at(self, owner)
629 }
630
631 fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
632 SocialGraphStore::followed_targets(self, owner)
633 }
634
635 fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
636 SocialGraphStore::is_overmuted_user(self, user_pk, threshold)
637 }
638
639 fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
640 SocialGraphStore::snapshot_chunks(self, root, options)
641 }
642
643 fn ingest_event(&self, event: &Event) -> Result<()> {
644 SocialGraphStore::ingest_event(self, event)
645 }
646
647 fn ingest_events(&self, events: &[Event]) -> Result<()> {
648 SocialGraphStore::ingest_events(self, events)
649 }
650
651 fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
652 SocialGraphStore::apply_graph_events_only(self, events)
653 }
654
655 fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
656 SocialGraphStore::query_events(self, filter, limit)
657 }
658}
659
660impl NostrSocialGraphBackend for SocialGraphStore {
661 type Error = UpstreamGraphBackendError;
662
663 fn get_root(&self) -> std::result::Result<String, Self::Error> {
664 let graph = self.graph.lock().unwrap();
665 graph
666 .get_root()
667 .context("read social graph root")
668 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
669 }
670
671 fn set_root(&mut self, root: &str) -> std::result::Result<(), Self::Error> {
672 let root_bytes =
673 decode_pubkey(root).map_err(|err| UpstreamGraphBackendError(err.to_string()))?;
674 SocialGraphStore::set_root(self, &root_bytes)
675 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
676 }
677
678 fn handle_event(
679 &mut self,
680 event: &GraphEvent,
681 allow_unknown_authors: bool,
682 overmute_threshold: f64,
683 ) -> std::result::Result<(), Self::Error> {
684 {
685 let mut graph = self.graph.lock().unwrap();
686 graph
687 .handle_event(event, allow_unknown_authors, overmute_threshold)
688 .context("ingest social graph event into heed backend")
689 .map_err(|err| UpstreamGraphBackendError(err.to_string()))?;
690 }
691 self.invalidate_distance_cache();
692 Ok(())
693 }
694
695 fn get_follow_distance(&self, user: &str) -> std::result::Result<u32, Self::Error> {
696 let graph = self.graph.lock().unwrap();
697 graph
698 .get_follow_distance(user)
699 .context("read social graph follow distance")
700 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
701 }
702
703 fn is_following(
704 &self,
705 follower: &str,
706 followed_user: &str,
707 ) -> std::result::Result<bool, Self::Error> {
708 let graph = self.graph.lock().unwrap();
709 graph
710 .is_following(follower, followed_user)
711 .context("read social graph following edge")
712 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
713 }
714
715 fn get_followed_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
716 let graph = self.graph.lock().unwrap();
717 graph
718 .get_followed_by_user(user)
719 .context("read followed-by-user list")
720 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
721 }
722
723 fn get_followers_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
724 let graph = self.graph.lock().unwrap();
725 graph
726 .get_followers_by_user(user)
727 .context("read followers-by-user list")
728 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
729 }
730
731 fn get_muted_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
732 let graph = self.graph.lock().unwrap();
733 graph
734 .get_muted_by_user(user)
735 .context("read muted-by-user list")
736 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
737 }
738
739 fn get_user_muted_by(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
740 let graph = self.graph.lock().unwrap();
741 graph
742 .get_user_muted_by(user)
743 .context("read user-muted-by list")
744 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
745 }
746
747 fn get_follow_list_created_at(
748 &self,
749 user: &str,
750 ) -> std::result::Result<Option<u64>, Self::Error> {
751 let graph = self.graph.lock().unwrap();
752 graph
753 .get_follow_list_created_at(user)
754 .context("read social graph follow list timestamp")
755 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
756 }
757
758 fn get_mute_list_created_at(
759 &self,
760 user: &str,
761 ) -> std::result::Result<Option<u64>, Self::Error> {
762 let graph = self.graph.lock().unwrap();
763 graph
764 .get_mute_list_created_at(user)
765 .context("read social graph mute list timestamp")
766 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
767 }
768
769 fn is_overmuted(&self, user: &str, threshold: f64) -> std::result::Result<bool, Self::Error> {
770 let graph = self.graph.lock().unwrap();
771 graph
772 .is_overmuted(user, threshold)
773 .context("check social graph overmute")
774 .map_err(|err| UpstreamGraphBackendError(err.to_string()))
775 }
776}
777
778impl<T> SocialGraphBackend for Arc<T>
779where
780 T: SocialGraphBackend + ?Sized,
781{
782 fn stats(&self) -> Result<SocialGraphStats> {
783 self.as_ref().stats()
784 }
785
786 fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
787 self.as_ref().users_by_follow_distance(distance)
788 }
789
790 fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
791 self.as_ref().follow_distance(pk_bytes)
792 }
793
794 fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
795 self.as_ref().follow_list_created_at(owner)
796 }
797
798 fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
799 self.as_ref().followed_targets(owner)
800 }
801
802 fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
803 self.as_ref().is_overmuted_user(user_pk, threshold)
804 }
805
806 fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
807 self.as_ref().snapshot_chunks(root, options)
808 }
809
810 fn ingest_event(&self, event: &Event) -> Result<()> {
811 self.as_ref().ingest_event(event)
812 }
813
814 fn ingest_events(&self, events: &[Event]) -> Result<()> {
815 self.as_ref().ingest_events(events)
816 }
817
818 fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
819 self.as_ref().ingest_graph_events(events)
820 }
821
822 fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
823 self.as_ref().query_events(filter, limit)
824 }
825}
826
827fn should_replace_placeholder_root(graph: &HeedSocialGraph) -> Result<bool> {
828 if graph.get_root().context("read current social graph root")? != DEFAULT_ROOT_HEX {
829 return Ok(false);
830 }
831
832 let GraphStats {
833 users,
834 follows,
835 mutes,
836 ..
837 } = graph.size().context("size social graph")?;
838 Ok(users <= 1 && follows == 0 && mutes == 0)
839}
840
841fn decode_pubkey_set(values: Vec<String>) -> Result<UserSet> {
842 let mut set = UserSet::new();
843 for value in values {
844 set.insert(decode_pubkey(&value)?);
845 }
846 Ok(set)
847}
848
849fn decode_pubkey(value: &str) -> Result<[u8; 32]> {
850 let mut bytes = [0u8; 32];
851 hex::decode_to_slice(value, &mut bytes)
852 .with_context(|| format!("decode social graph pubkey {value}"))?;
853 Ok(bytes)
854}
855
856fn is_social_graph_event(kind: Kind) -> bool {
857 kind == Kind::ContactList || kind == Kind::MuteList
858}
859
860fn graph_event_from_nostr(event: &Event) -> GraphEvent {
861 GraphEvent {
862 created_at: event.created_at.as_u64(),
863 content: event.content.clone(),
864 tags: event
865 .tags
866 .iter()
867 .map(|tag| tag.as_slice().to_vec())
868 .collect(),
869 kind: event.kind.as_u16() as u32,
870 pubkey: event.pubkey.to_hex(),
871 id: event.id.to_hex(),
872 sig: event.sig.to_string(),
873 }
874}
875
876fn stored_event_from_nostr(event: &Event) -> StoredNostrEvent {
877 StoredNostrEvent {
878 id: event.id.to_hex(),
879 pubkey: event.pubkey.to_hex(),
880 created_at: event.created_at.as_u64(),
881 kind: event.kind.as_u16() as u32,
882 tags: event
883 .tags
884 .iter()
885 .map(|tag| tag.as_slice().to_vec())
886 .collect(),
887 content: event.content.clone(),
888 sig: event.sig.to_string(),
889 }
890}
891
892fn nostr_event_from_stored(event: StoredNostrEvent) -> Result<Event> {
893 let value = serde_json::json!({
894 "id": event.id,
895 "pubkey": event.pubkey,
896 "created_at": event.created_at,
897 "kind": event.kind,
898 "tags": event.tags,
899 "content": event.content,
900 "sig": event.sig,
901 });
902 Event::from_json(value.to_string()).context("decode stored nostr event")
903}
904
905fn encode_cid(cid: &Cid) -> Result<Vec<u8>> {
906 rmp_serde::to_vec_named(&StoredCid {
907 hash: cid.hash,
908 key: cid.key,
909 })
910 .context("encode social graph events root")
911}
912
913fn decode_cid(bytes: &[u8]) -> Result<Option<Cid>> {
914 let stored: StoredCid =
915 rmp_serde::from_slice(bytes).context("decode social graph events root")?;
916 Ok(Some(Cid {
917 hash: stored.hash,
918 key: stored.key,
919 }))
920}
921
922fn map_event_store_error(err: NostrEventStoreError) -> anyhow::Error {
923 anyhow::anyhow!("nostr event store error: {err}")
924}
925
926fn ensure_social_graph_mapsize(db_dir: &Path, requested_bytes: u64) -> Result<()> {
927 let requested = requested_bytes.max(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES);
928 let page_size = page_size_bytes() as u64;
929 let rounded = requested
930 .checked_add(page_size.saturating_sub(1))
931 .map(|size| size / page_size * page_size)
932 .unwrap_or(requested);
933 let map_size = usize::try_from(rounded).context("social graph mapsize exceeds usize")?;
934
935 let env = unsafe {
936 heed::EnvOpenOptions::new()
937 .map_size(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES as usize)
938 .max_dbs(SOCIALGRAPH_MAX_DBS)
939 .open(db_dir)
940 }
941 .context("open social graph LMDB env for resize")?;
942 if env.info().map_size < map_size {
943 unsafe { env.resize(map_size) }.context("resize social graph LMDB env")?;
944 }
945
946 Ok(())
947}
948
949fn page_size_bytes() -> usize {
950 page_size::get_granularity()
951}
952
953#[cfg(test)]
954mod tests {
955 use super::*;
956 use nostr::{EventBuilder, JsonUtil, Keys, Tag, Timestamp};
957 use tempfile::TempDir;
958
959 #[test]
960 fn test_open_social_graph_store() {
961 let _guard = test_lock();
962 let tmp = TempDir::new().unwrap();
963 let graph_store = open_social_graph_store(tmp.path()).unwrap();
964 assert_eq!(Arc::strong_count(&graph_store), 1);
965 }
966
967 #[test]
968 fn test_set_root_and_get_follow_distance() {
969 let _guard = test_lock();
970 let tmp = TempDir::new().unwrap();
971 let graph_store = open_social_graph_store(tmp.path()).unwrap();
972 let root_pk = [1u8; 32];
973 set_social_graph_root(&graph_store, &root_pk);
974 assert_eq!(get_follow_distance(&graph_store, &root_pk), Some(0));
975 }
976
977 #[test]
978 fn test_ingest_event_updates_follows_and_mutes() {
979 let _guard = test_lock();
980 let tmp = TempDir::new().unwrap();
981 let graph_store = open_social_graph_store(tmp.path()).unwrap();
982
983 let root_keys = Keys::generate();
984 let alice_keys = Keys::generate();
985 let bob_keys = Keys::generate();
986
987 let root_pk = root_keys.public_key().to_bytes();
988 set_social_graph_root(&graph_store, &root_pk);
989
990 let follow = EventBuilder::new(
991 Kind::ContactList,
992 "",
993 vec![Tag::public_key(alice_keys.public_key())],
994 )
995 .custom_created_at(Timestamp::from_secs(10))
996 .to_event(&root_keys)
997 .unwrap();
998 ingest_event(&graph_store, "follow", &follow.as_json());
999
1000 let mute = EventBuilder::new(
1001 Kind::MuteList,
1002 "",
1003 vec![Tag::public_key(bob_keys.public_key())],
1004 )
1005 .custom_created_at(Timestamp::from_secs(11))
1006 .to_event(&root_keys)
1007 .unwrap();
1008 ingest_event(&graph_store, "mute", &mute.as_json());
1009
1010 assert_eq!(
1011 get_follow_distance(&graph_store, &alice_keys.public_key().to_bytes()),
1012 Some(1)
1013 );
1014 assert!(is_overmuted(
1015 &graph_store,
1016 &root_pk,
1017 &bob_keys.public_key().to_bytes(),
1018 1.0
1019 ));
1020 }
1021
1022 #[test]
1023 fn test_query_events_by_author() {
1024 let _guard = test_lock();
1025 let tmp = TempDir::new().unwrap();
1026 let graph_store = open_social_graph_store(tmp.path()).unwrap();
1027 let keys = Keys::generate();
1028
1029 let older = EventBuilder::new(Kind::TextNote, "older", [])
1030 .custom_created_at(Timestamp::from_secs(5))
1031 .to_event(&keys)
1032 .unwrap();
1033 let newer = EventBuilder::new(Kind::TextNote, "newer", [])
1034 .custom_created_at(Timestamp::from_secs(6))
1035 .to_event(&keys)
1036 .unwrap();
1037
1038 ingest_parsed_event(&graph_store, &older).unwrap();
1039 ingest_parsed_event(&graph_store, &newer).unwrap();
1040
1041 let filter = Filter::new().author(keys.public_key()).kind(Kind::TextNote);
1042 let events = query_events(&graph_store, &filter, 10);
1043 assert_eq!(events.len(), 2);
1044 assert_eq!(events[0].id, newer.id);
1045 assert_eq!(events[1].id, older.id);
1046 }
1047
1048 #[test]
1049 fn test_query_events_survives_reopen() {
1050 let _guard = test_lock();
1051 let tmp = TempDir::new().unwrap();
1052 let db_dir = tmp.path().join("socialgraph-store");
1053 let keys = Keys::generate();
1054 let other_keys = Keys::generate();
1055
1056 {
1057 let graph_store = open_social_graph_store_at_path(&db_dir, None).unwrap();
1058 let older = EventBuilder::new(Kind::TextNote, "older", [])
1059 .custom_created_at(Timestamp::from_secs(5))
1060 .to_event(&keys)
1061 .unwrap();
1062 let newer = EventBuilder::new(Kind::TextNote, "newer", [])
1063 .custom_created_at(Timestamp::from_secs(6))
1064 .to_event(&keys)
1065 .unwrap();
1066 let latest = EventBuilder::new(Kind::TextNote, "latest", [])
1067 .custom_created_at(Timestamp::from_secs(7))
1068 .to_event(&other_keys)
1069 .unwrap();
1070
1071 ingest_parsed_event(&graph_store, &older).unwrap();
1072 ingest_parsed_event(&graph_store, &newer).unwrap();
1073 ingest_parsed_event(&graph_store, &latest).unwrap();
1074 }
1075
1076 let reopened = open_social_graph_store_at_path(&db_dir, None).unwrap();
1077
1078 let author_filter = Filter::new().author(keys.public_key()).kind(Kind::TextNote);
1079 let author_events = query_events(&reopened, &author_filter, 10);
1080 assert_eq!(author_events.len(), 2);
1081 assert_eq!(author_events[0].content, "newer");
1082 assert_eq!(author_events[1].content, "older");
1083
1084 let recent_filter = Filter::new().kind(Kind::TextNote);
1085 let recent_events = query_events(&reopened, &recent_filter, 2);
1086 assert_eq!(recent_events.len(), 2);
1087 assert_eq!(recent_events[0].content, "latest");
1088 assert_eq!(recent_events[1].content, "newer");
1089 }
1090
1091 #[test]
1092 fn test_ensure_social_graph_mapsize_rounds_and_applies() {
1093 let _guard = test_lock();
1094 let tmp = TempDir::new().unwrap();
1095 ensure_social_graph_mapsize(tmp.path(), DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES).unwrap();
1096 let requested = 70 * 1024 * 1024;
1097 ensure_social_graph_mapsize(tmp.path(), requested).unwrap();
1098 let env = unsafe {
1099 heed::EnvOpenOptions::new()
1100 .map_size(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES as usize)
1101 .max_dbs(SOCIALGRAPH_MAX_DBS)
1102 .open(tmp.path())
1103 }
1104 .unwrap();
1105 assert!(env.info().map_size >= requested as usize);
1106 assert_eq!(env.info().map_size % page_size_bytes(), 0);
1107 }
1108
1109 #[test]
1110 fn test_ingest_events_batches_graph_updates() {
1111 let _guard = test_lock();
1112 let tmp = TempDir::new().unwrap();
1113 let graph_store = open_social_graph_store(tmp.path()).unwrap();
1114
1115 let root_keys = Keys::generate();
1116 let alice_keys = Keys::generate();
1117 let bob_keys = Keys::generate();
1118
1119 let root_pk = root_keys.public_key().to_bytes();
1120 set_social_graph_root(&graph_store, &root_pk);
1121
1122 let root_follows_alice = EventBuilder::new(
1123 Kind::ContactList,
1124 "",
1125 vec![Tag::public_key(alice_keys.public_key())],
1126 )
1127 .custom_created_at(Timestamp::from_secs(10))
1128 .to_event(&root_keys)
1129 .unwrap();
1130 let alice_follows_bob = EventBuilder::new(
1131 Kind::ContactList,
1132 "",
1133 vec![Tag::public_key(bob_keys.public_key())],
1134 )
1135 .custom_created_at(Timestamp::from_secs(11))
1136 .to_event(&alice_keys)
1137 .unwrap();
1138
1139 ingest_parsed_events(
1140 &graph_store,
1141 &[root_follows_alice.clone(), alice_follows_bob.clone()],
1142 )
1143 .unwrap();
1144
1145 assert_eq!(
1146 get_follow_distance(&graph_store, &alice_keys.public_key().to_bytes()),
1147 Some(1)
1148 );
1149 assert_eq!(
1150 get_follow_distance(&graph_store, &bob_keys.public_key().to_bytes()),
1151 Some(2)
1152 );
1153
1154 let filter = Filter::new().kind(Kind::ContactList);
1155 let stored = query_events(&graph_store, &filter, 10);
1156 let ids = stored.into_iter().map(|event| event.id).collect::<Vec<_>>();
1157 assert!(ids.contains(&root_follows_alice.id));
1158 assert!(ids.contains(&alice_follows_bob.id));
1159 }
1160
1161 #[test]
1162 fn test_ingest_graph_events_updates_graph_without_indexing_events() {
1163 let _guard = test_lock();
1164 let tmp = TempDir::new().unwrap();
1165 let graph_store = open_social_graph_store(tmp.path()).unwrap();
1166
1167 let root_keys = Keys::generate();
1168 let alice_keys = Keys::generate();
1169
1170 let root_pk = root_keys.public_key().to_bytes();
1171 set_social_graph_root(&graph_store, &root_pk);
1172
1173 let root_follows_alice = EventBuilder::new(
1174 Kind::ContactList,
1175 "",
1176 vec![Tag::public_key(alice_keys.public_key())],
1177 )
1178 .custom_created_at(Timestamp::from_secs(10))
1179 .to_event(&root_keys)
1180 .unwrap();
1181
1182 ingest_graph_parsed_events(&graph_store, std::slice::from_ref(&root_follows_alice))
1183 .unwrap();
1184
1185 assert_eq!(
1186 get_follow_distance(&graph_store, &alice_keys.public_key().to_bytes()),
1187 Some(1)
1188 );
1189 let filter = Filter::new().kind(Kind::ContactList);
1190 assert!(query_events(&graph_store, &filter, 10).is_empty());
1191 }
1192}