Skip to main content

hashtree_cli/socialgraph/
mod.rs

1pub mod access;
2pub mod crawler;
3pub mod local_lists;
4pub mod snapshot;
5
6pub use access::SocialGraphAccessControl;
7pub use crawler::SocialGraphCrawler;
8pub use local_lists::{
9    read_local_list_file_state, sync_local_list_files_force, sync_local_list_files_if_changed,
10    LocalListFileState, LocalListSyncOutcome,
11};
12
13use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
14use std::path::{Path, PathBuf};
15use std::sync::{Arc, Mutex as StdMutex};
16
17use anyhow::{Context, Result};
18use bytes::Bytes;
19use futures::executor::block_on;
20use hashtree_core::Cid;
21use hashtree_nostr::{ListEventsOptions, NostrEventStore, NostrEventStoreError, StoredNostrEvent};
22use nostr::{Event, Filter, JsonUtil, Kind};
23use nostr_social_graph::{
24    BinaryBudget, GraphStats, NostrEvent as GraphEvent, SocialGraph,
25    SocialGraphBackend as NostrSocialGraphBackend,
26};
27use nostr_social_graph_heed::HeedSocialGraph;
28
29use crate::storage::{LocalStore, StorageRouter};
30
31#[cfg(test)]
32use std::sync::{Mutex, MutexGuard, OnceLock};
33
/// Ordered set of 32-byte public keys.
pub type UserSet = BTreeSet<[u8; 32]>;

/// All-zero hex pubkey the heed graph is opened with; treated as a placeholder root.
const DEFAULT_ROOT_HEX: &str = "0000000000000000000000000000000000000000000000000000000000000000";
/// File name, inside the social graph directory, that stores the encoded events root.
const EVENTS_ROOT_FILE: &str = "events-root.msgpack";
/// Distance value that `SocialGraphStore::follow_distance` reports as `None`.
const UNKNOWN_FOLLOW_DISTANCE: u32 = 1000;
/// 64 MiB; used as the lower bound for requested LMDB map sizes.
const DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES: u64 = 64 * 1024 * 1024;
/// `max_dbs` value passed when opening the LMDB environment for a resize.
const SOCIALGRAPH_MAX_DBS: u32 = 16;
41
/// Serializable mirror of a [`Cid`], used by `encode_cid` / `decode_cid` to persist
/// the events root.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct StoredCid {
    /// Copied from / into `Cid::hash`.
    hash: [u8; 32],
    /// Copied from / into `Cid::key`.
    key: Option<[u8; 32]>,
}
47
/// Summary statistics for the social graph, as produced by
/// `SocialGraphStore::build_distance_cache`.
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct SocialGraphStats {
    /// Sum of all per-distance user counts in `size_by_distance`.
    pub total_users: usize,
    /// Root of the exported graph state, when known.
    pub root: Option<String>,
    /// Sum of the lengths of every per-user follow list in the exported state.
    pub total_follows: usize,
    /// Largest distance key present in `size_by_distance` (0 when empty).
    pub max_depth: u32,
    /// Number of users found at each follow distance.
    pub size_by_distance: BTreeMap<u32, usize>,
    /// Set to `true` by the store when stats are built; `false` in the `Default` value.
    pub enabled: bool,
}
57
/// Snapshot derived from one exported graph state; cached by `SocialGraphStore`
/// until the graph is mutated.
#[derive(Debug, Clone)]
struct DistanceCache {
    /// Aggregate statistics computed from the same exported state.
    stats: SocialGraphStats,
    /// Decoded public keys grouped by follow distance.
    users_by_distance: BTreeMap<u32, Vec<[u8; 32]>>,
}
63
/// Error type for the `NostrSocialGraphBackend` implementation on `SocialGraphStore`.
///
/// Wraps a plain message; its `Display` output is that message verbatim.
#[derive(Debug, thiserror::Error)]
#[error("{0}")]
pub struct UpstreamGraphBackendError(String);
67
/// Combines the heed-backed social graph with a nostr event store.
pub struct SocialGraphStore {
    /// Heed-backed social graph; every access goes through this mutex.
    graph: StdMutex<HeedSocialGraph>,
    /// Lazily built distance/stats snapshot; reset to `None` whenever the graph changes.
    distance_cache: StdMutex<Option<DistanceCache>>,
    /// Store that ingested events are added to and queried from.
    event_store: NostrEventStore<StorageRouter>,
    /// Path of the file that persists the current events root.
    events_root_path: PathBuf,
}
74
/// Interface over a social graph plus its associated event storage.
///
/// Implemented in this file by `SocialGraphStore` and, by delegation, by `Arc<T>`
/// for any `T: SocialGraphBackend`.
pub trait SocialGraphBackend: Send + Sync {
    /// Returns summary statistics for the graph.
    fn stats(&self) -> Result<SocialGraphStats>;
    /// Returns the users recorded at the given follow `distance`.
    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>>;
    /// Returns the follow distance of `pk_bytes`, or `None` when it is not known.
    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>>;
    /// Returns the timestamp recorded for `owner`'s follow list, if any.
    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>>;
    /// Returns the set of users followed by `owner`.
    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet>;
    /// Reports whether `user_pk` counts as overmuted for the given `threshold`.
    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool>;
    /// Encodes the graph, viewed from `root`, into binary chunks within `options`.
    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>>;
    /// Ingests a single event.
    fn ingest_event(&self, event: &Event) -> Result<()>;
    /// Ingests a batch of events.
    ///
    /// The default implementation calls [`Self::ingest_event`] for each event in
    /// order and stops at the first error.
    fn ingest_events(&self, events: &[Event]) -> Result<()> {
        for event in events {
            self.ingest_event(event)?;
        }
        Ok(())
    }
    /// Ingests a batch of events for graph purposes.
    ///
    /// The default implementation simply forwards to [`Self::ingest_events`].
    fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
        self.ingest_events(events)
    }
    /// Returns at most `limit` stored events matching `filter`.
    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>>;
}
95
/// Guard type returned by [`test_lock`]; the lock is released when it is dropped.
#[cfg(test)]
pub type TestLockGuard = MutexGuard<'static, ()>;

/// Process-wide mutex that tests take to avoid running concurrently.
#[cfg(test)]
static NDB_TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();

/// Acquires the global test lock, blocking until it is available.
///
/// A test that panics while holding the guard poisons the mutex. The guarded
/// value is `()`, so there is no state that could be left inconsistent; the
/// poison flag is therefore ignored instead of cascading the failure into every
/// test that runs afterwards.
#[cfg(test)]
pub fn test_lock() -> TestLockGuard {
    NDB_TEST_LOCK
        .get_or_init(|| Mutex::new(()))
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
}
106
107pub fn open_social_graph_store(data_dir: &Path) -> Result<Arc<SocialGraphStore>> {
108    open_social_graph_store_with_mapsize(data_dir, None)
109}
110
111pub fn open_social_graph_store_with_mapsize(
112    data_dir: &Path,
113    mapsize_bytes: Option<u64>,
114) -> Result<Arc<SocialGraphStore>> {
115    let db_dir = data_dir.join("socialgraph");
116    open_social_graph_store_at_path(&db_dir, mapsize_bytes)
117}
118
119pub fn open_social_graph_store_with_storage(
120    data_dir: &Path,
121    store: Arc<StorageRouter>,
122    mapsize_bytes: Option<u64>,
123) -> Result<Arc<SocialGraphStore>> {
124    let db_dir = data_dir.join("socialgraph");
125    open_social_graph_store_at_path_with_storage(&db_dir, store, mapsize_bytes)
126}
127
128pub fn open_social_graph_store_at_path(
129    db_dir: &Path,
130    mapsize_bytes: Option<u64>,
131) -> Result<Arc<SocialGraphStore>> {
132    let config = hashtree_config::Config::load_or_default();
133    let backend = &config.storage.backend;
134    let local_store = Arc::new(
135        LocalStore::new(db_dir.join("blobs"), backend)
136            .map_err(|err| anyhow::anyhow!("Failed to create social graph blob store: {err}"))?,
137    );
138    let store = Arc::new(StorageRouter::new(local_store));
139    open_social_graph_store_at_path_with_storage(db_dir, store, mapsize_bytes)
140}
141
142pub fn open_social_graph_store_at_path_with_storage(
143    db_dir: &Path,
144    store: Arc<StorageRouter>,
145    mapsize_bytes: Option<u64>,
146) -> Result<Arc<SocialGraphStore>> {
147    std::fs::create_dir_all(db_dir)?;
148    if let Some(size) = mapsize_bytes {
149        ensure_social_graph_mapsize(db_dir, size)?;
150    }
151    let graph = HeedSocialGraph::open(db_dir, DEFAULT_ROOT_HEX)
152        .context("open nostr-social-graph heed backend")?;
153
154    Ok(Arc::new(SocialGraphStore {
155        graph: StdMutex::new(graph),
156        distance_cache: StdMutex::new(None),
157        event_store: NostrEventStore::new(store),
158        events_root_path: db_dir.join(EVENTS_ROOT_FILE),
159    }))
160}
161
162pub fn set_social_graph_root(store: &SocialGraphStore, pk_bytes: &[u8; 32]) {
163    if let Err(err) = store.set_root(pk_bytes) {
164        tracing::warn!("Failed to set social graph root: {err}");
165    }
166}
167
168pub fn get_follow_distance(
169    backend: &(impl SocialGraphBackend + ?Sized),
170    pk_bytes: &[u8; 32],
171) -> Option<u32> {
172    backend.follow_distance(pk_bytes).ok().flatten()
173}
174
175pub fn get_follows(
176    backend: &(impl SocialGraphBackend + ?Sized),
177    pk_bytes: &[u8; 32],
178) -> Vec<[u8; 32]> {
179    match backend.followed_targets(pk_bytes) {
180        Ok(set) => set.into_iter().collect(),
181        Err(_) => Vec::new(),
182    }
183}
184
185pub fn is_overmuted(
186    backend: &(impl SocialGraphBackend + ?Sized),
187    _root_pk: &[u8; 32],
188    user_pk: &[u8; 32],
189    threshold: f64,
190) -> bool {
191    backend
192        .is_overmuted_user(user_pk, threshold)
193        .unwrap_or(false)
194}
195
196pub fn ingest_event(backend: &(impl SocialGraphBackend + ?Sized), _sub_id: &str, event_json: &str) {
197    let event = match Event::from_json(event_json) {
198        Ok(event) => event,
199        Err(_) => return,
200    };
201
202    if let Err(err) = backend.ingest_event(&event) {
203        tracing::warn!("Failed to ingest social graph event: {err}");
204    }
205}
206
207pub fn ingest_parsed_event(
208    backend: &(impl SocialGraphBackend + ?Sized),
209    event: &Event,
210) -> Result<()> {
211    backend.ingest_event(event)
212}
213
214pub fn ingest_parsed_events(
215    backend: &(impl SocialGraphBackend + ?Sized),
216    events: &[Event],
217) -> Result<()> {
218    backend.ingest_events(events)
219}
220
221pub fn ingest_graph_parsed_events(
222    backend: &(impl SocialGraphBackend + ?Sized),
223    events: &[Event],
224) -> Result<()> {
225    backend.ingest_graph_events(events)
226}
227
228pub fn query_events(
229    backend: &(impl SocialGraphBackend + ?Sized),
230    filter: &Filter,
231    limit: usize,
232) -> Vec<Event> {
233    backend.query_events(filter, limit).unwrap_or_default()
234}
235
236impl SocialGraphStore {
237    fn invalidate_distance_cache(&self) {
238        *self.distance_cache.lock().unwrap() = None;
239    }
240
241    fn build_distance_cache(state: nostr_social_graph::SocialGraphState) -> Result<DistanceCache> {
242        let unique_ids = state
243            .unique_ids
244            .into_iter()
245            .map(|(pubkey, id)| decode_pubkey(&pubkey).map(|decoded| (id, decoded)))
246            .collect::<Result<HashMap<_, _>>>()?;
247
248        let mut users_by_distance = BTreeMap::new();
249        let mut size_by_distance = BTreeMap::new();
250        for (distance, users) in state.users_by_follow_distance {
251            let decoded = users
252                .into_iter()
253                .filter_map(|id| unique_ids.get(&id).copied())
254                .collect::<Vec<_>>();
255            size_by_distance.insert(distance, decoded.len());
256            users_by_distance.insert(distance, decoded);
257        }
258
259        let total_follows = state
260            .followed_by_user
261            .iter()
262            .map(|(_, targets)| targets.len())
263            .sum::<usize>();
264        let total_users = size_by_distance.values().copied().sum();
265        let max_depth = size_by_distance.keys().copied().max().unwrap_or_default();
266
267        Ok(DistanceCache {
268            stats: SocialGraphStats {
269                total_users,
270                root: Some(state.root),
271                total_follows,
272                max_depth,
273                size_by_distance,
274                enabled: true,
275            },
276            users_by_distance,
277        })
278    }
279
280    fn load_distance_cache(&self) -> Result<DistanceCache> {
281        if let Some(cache) = self.distance_cache.lock().unwrap().clone() {
282            return Ok(cache);
283        }
284
285        let state = {
286            let graph = self.graph.lock().unwrap();
287            graph.export_state().context("export social graph state")?
288        };
289        let cache = Self::build_distance_cache(state)?;
290        *self.distance_cache.lock().unwrap() = Some(cache.clone());
291        Ok(cache)
292    }
293
294    fn set_root(&self, root: &[u8; 32]) -> Result<()> {
295        let root_hex = hex::encode(root);
296        {
297            let mut graph = self.graph.lock().unwrap();
298            if should_replace_placeholder_root(&graph)? {
299                let fresh = SocialGraph::new(&root_hex);
300                graph
301                    .replace_state(&fresh.export_state())
302                    .context("replace placeholder social graph root")?;
303            } else {
304                graph
305                    .set_root(&root_hex)
306                    .context("set nostr-social-graph root")?;
307            }
308        }
309        self.invalidate_distance_cache();
310        Ok(())
311    }
312
313    fn stats(&self) -> Result<SocialGraphStats> {
314        Ok(self.load_distance_cache()?.stats)
315    }
316
317    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
318        let graph = self.graph.lock().unwrap();
319        let distance = graph
320            .get_follow_distance(&hex::encode(pk_bytes))
321            .context("read social graph follow distance")?;
322        Ok((distance != UNKNOWN_FOLLOW_DISTANCE).then_some(distance))
323    }
324
325    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
326        Ok(self
327            .load_distance_cache()?
328            .users_by_distance
329            .get(&distance)
330            .cloned()
331            .unwrap_or_default())
332    }
333
334    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
335        let graph = self.graph.lock().unwrap();
336        graph
337            .get_follow_list_created_at(&hex::encode(owner))
338            .context("read social graph follow list timestamp")
339    }
340
341    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
342        let graph = self.graph.lock().unwrap();
343        decode_pubkey_set(
344            graph
345                .get_followed_by_user(&hex::encode(owner))
346                .context("read followed targets")?,
347        )
348    }
349
350    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
351        if threshold <= 0.0 {
352            return Ok(false);
353        }
354        let graph = self.graph.lock().unwrap();
355        graph
356            .is_overmuted(&hex::encode(user_pk), threshold)
357            .context("check social graph overmute")
358    }
359
360    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
361        let state = {
362            let graph = self.graph.lock().unwrap();
363            graph.export_state().context("export social graph state")?
364        };
365        let mut graph = SocialGraph::from_state(state).context("rebuild social graph state")?;
366        let root_hex = hex::encode(root);
367        if graph.get_root() != root_hex {
368            graph
369                .set_root(&root_hex)
370                .context("set snapshot social graph root")?;
371        }
372        let chunks = graph
373            .to_binary_chunks_with_budget(*options)
374            .context("encode social graph snapshot")?;
375        Ok(chunks.into_iter().map(Bytes::from).collect())
376    }
377
378    fn ingest_event(&self, event: &Event) -> Result<()> {
379        let current_root = self.events_root()?;
380        let next_root = self.store_event(current_root.as_ref(), event)?;
381        self.write_events_root(Some(&next_root))?;
382
383        if is_social_graph_event(event.kind) {
384            {
385                let mut graph = self.graph.lock().unwrap();
386                graph
387                    .handle_event(&graph_event_from_nostr(event), true, 0.0)
388                    .context("ingest social graph event into nostr-social-graph")?;
389            }
390            self.invalidate_distance_cache();
391        }
392
393        Ok(())
394    }
395
396    fn ingest_events(&self, events: &[Event]) -> Result<()> {
397        if events.is_empty() {
398            return Ok(());
399        }
400
401        let mut current_root = self.events_root()?;
402        for event in events {
403            let next_root = self.store_event(current_root.as_ref(), event)?;
404            current_root = Some(next_root);
405        }
406        self.write_events_root(current_root.as_ref())?;
407
408        let graph_events = events
409            .iter()
410            .filter(|event| is_social_graph_event(event.kind))
411            .collect::<Vec<_>>();
412        if graph_events.is_empty() {
413            return Ok(());
414        }
415
416        {
417            let mut graph = self.graph.lock().unwrap();
418            let mut snapshot = SocialGraph::from_state(
419                graph
420                    .export_state()
421                    .context("export social graph state for batch ingest")?,
422            )
423            .context("rebuild social graph state for batch ingest")?;
424            for event in graph_events {
425                snapshot.handle_event(&graph_event_from_nostr(event), true, 0.0);
426            }
427            graph
428                .replace_state(&snapshot.export_state())
429                .context("replace batched social graph state")?;
430        }
431        self.invalidate_distance_cache();
432
433        Ok(())
434    }
435
436    fn apply_graph_events_only(&self, events: &[Event]) -> Result<()> {
437        let graph_events = events
438            .iter()
439            .filter(|event| is_social_graph_event(event.kind))
440            .collect::<Vec<_>>();
441        if graph_events.is_empty() {
442            return Ok(());
443        }
444
445        {
446            let mut graph = self.graph.lock().unwrap();
447            let mut snapshot = SocialGraph::from_state(
448                graph
449                    .export_state()
450                    .context("export social graph state for graph-only ingest")?,
451            )
452            .context("rebuild social graph state for graph-only ingest")?;
453            for event in graph_events {
454                snapshot.handle_event(&graph_event_from_nostr(event), true, 0.0);
455            }
456            graph
457                .replace_state(&snapshot.export_state())
458                .context("replace graph-only social graph state")?;
459        }
460        self.invalidate_distance_cache();
461        Ok(())
462    }
463
    /// Returns at most `limit` stored events that match `filter`, sorted by
    /// `created_at` descending with ties broken by ascending event id.
    ///
    /// The lookup strategy depends on the filter: explicit ids are fetched one by
    /// one, otherwise events are listed per author, otherwise the recent-events
    /// listing is scanned. Every candidate is still checked with
    /// `filter.match_event`. Returns an empty list when `limit` is 0 or no events
    /// root has been persisted yet.
    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
        if limit == 0 {
            return Ok(Vec::new());
        }

        let events_root = self.events_root()?;
        let Some(root) = events_root.as_ref() else {
            return Ok(Vec::new());
        };
        let mut candidates = Vec::new();
        // Event ids already examined, so the same event is never considered twice.
        let mut seen: HashSet<[u8; 32]> = HashSet::new();

        if let Some(ids) = filter.ids.as_ref() {
            // NOTE(review): collection stops at `limit` matches before sorting, so with more
            // matching ids than `limit` the result depends on id iteration order rather
            // than recency — confirm this is intended.
            for id in ids {
                let id_bytes = id.to_bytes();
                if !seen.insert(id_bytes) {
                    continue;
                }
                if let Some(event) = self.load_event_by_id(root, &id.to_hex())? {
                    if filter.match_event(&event) {
                        candidates.push(event);
                    }
                }
                if candidates.len() >= limit {
                    break;
                }
            }
        } else if let Some(authors) = filter.authors.as_ref() {
            for author in authors {
                // The cap is per author; the global cut to `limit` happens after sorting.
                // NOTE(review): presumably each author listing is newest-first — verify.
                let mut author_matches = 0usize;
                for event in self.load_events_for_author(root, author, filter)? {
                    let id_bytes = event.id.to_bytes();
                    if !seen.insert(id_bytes) {
                        continue;
                    }
                    if filter.match_event(&event) {
                        candidates.push(event);
                        author_matches += 1;
                    }
                    if author_matches >= limit {
                        break;
                    }
                }
            }
        } else {
            // NOTE(review): presumably the recent listing is newest-first — verify.
            for event in self.load_recent_events(root)? {
                let id_bytes = event.id.to_bytes();
                if !seen.insert(id_bytes) {
                    continue;
                }
                if filter.match_event(&event) {
                    candidates.push(event);
                }
                if candidates.len() >= limit {
                    break;
                }
            }
        }

        // Newest first; equal timestamps are ordered by ascending id for determinism.
        candidates.sort_by(|a, b| {
            b.created_at
                .as_u64()
                .cmp(&a.created_at.as_u64())
                .then_with(|| a.id.cmp(&b.id))
        });
        candidates.truncate(limit);
        Ok(candidates)
    }
532
533    fn events_root(&self) -> Result<Option<Cid>> {
534        let Ok(bytes) = std::fs::read(&self.events_root_path) else {
535            return Ok(None);
536        };
537        decode_cid(&bytes)
538    }
539
540    fn write_events_root(&self, root: Option<&Cid>) -> Result<()> {
541        let Some(root) = root else {
542            if self.events_root_path.exists() {
543                std::fs::remove_file(&self.events_root_path)?;
544            }
545            return Ok(());
546        };
547
548        let encoded = encode_cid(root)?;
549        let tmp_path = self.events_root_path.with_extension("tmp");
550        std::fs::write(&tmp_path, encoded)?;
551        std::fs::rename(tmp_path, &self.events_root_path)?;
552        Ok(())
553    }
554
555    fn store_event(&self, root: Option<&Cid>, event: &Event) -> Result<Cid> {
556        let stored = stored_event_from_nostr(event);
557        block_on(self.event_store.add(root, stored)).map_err(map_event_store_error)
558    }
559
560    fn load_event_by_id(&self, root: &Cid, event_id: &str) -> Result<Option<Event>> {
561        let stored = block_on(self.event_store.get_by_id(Some(root), event_id))
562            .map_err(map_event_store_error)?;
563        stored.map(nostr_event_from_stored).transpose()
564    }
565
566    fn load_events_for_author(
567        &self,
568        root: &Cid,
569        author: &nostr::PublicKey,
570        filter: &Filter,
571    ) -> Result<Vec<Event>> {
572        let kind_filter = filter.kinds.as_ref().and_then(|kinds| {
573            if kinds.len() == 1 {
574                kinds.iter().next().map(|kind| kind.as_u16() as u32)
575            } else {
576                None
577            }
578        });
579        let author_hex = author.to_hex();
580        let stored = match kind_filter {
581            Some(kind) => block_on(self.event_store.list_by_author_and_kind(
582                Some(root),
583                &author_hex,
584                kind,
585                ListEventsOptions::default(),
586            ))
587            .map_err(map_event_store_error)?,
588            None => block_on(self.event_store.list_by_author(
589                Some(root),
590                &author_hex,
591                ListEventsOptions::default(),
592            ))
593            .map_err(map_event_store_error)?,
594        };
595        stored
596            .into_iter()
597            .map(nostr_event_from_stored)
598            .collect::<Result<Vec<_>>>()
599    }
600
601    fn load_recent_events(&self, root: &Cid) -> Result<Vec<Event>> {
602        let stored = block_on(
603            self.event_store
604                .list_recent(Some(root), ListEventsOptions::default()),
605        )
606        .map_err(map_event_store_error)?;
607        stored
608            .into_iter()
609            .map(nostr_event_from_stored)
610            .collect::<Result<Vec<_>>>()
611    }
612}
613
614impl SocialGraphBackend for SocialGraphStore {
615    fn stats(&self) -> Result<SocialGraphStats> {
616        SocialGraphStore::stats(self)
617    }
618
619    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
620        SocialGraphStore::users_by_follow_distance(self, distance)
621    }
622
623    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
624        SocialGraphStore::follow_distance(self, pk_bytes)
625    }
626
627    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
628        SocialGraphStore::follow_list_created_at(self, owner)
629    }
630
631    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
632        SocialGraphStore::followed_targets(self, owner)
633    }
634
635    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
636        SocialGraphStore::is_overmuted_user(self, user_pk, threshold)
637    }
638
639    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
640        SocialGraphStore::snapshot_chunks(self, root, options)
641    }
642
643    fn ingest_event(&self, event: &Event) -> Result<()> {
644        SocialGraphStore::ingest_event(self, event)
645    }
646
647    fn ingest_events(&self, events: &[Event]) -> Result<()> {
648        SocialGraphStore::ingest_events(self, events)
649    }
650
651    fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
652        SocialGraphStore::apply_graph_events_only(self, events)
653    }
654
655    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
656        SocialGraphStore::query_events(self, filter, limit)
657    }
658}
659
660impl NostrSocialGraphBackend for SocialGraphStore {
661    type Error = UpstreamGraphBackendError;
662
663    fn get_root(&self) -> std::result::Result<String, Self::Error> {
664        let graph = self.graph.lock().unwrap();
665        graph
666            .get_root()
667            .context("read social graph root")
668            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
669    }
670
671    fn set_root(&mut self, root: &str) -> std::result::Result<(), Self::Error> {
672        let root_bytes =
673            decode_pubkey(root).map_err(|err| UpstreamGraphBackendError(err.to_string()))?;
674        SocialGraphStore::set_root(self, &root_bytes)
675            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
676    }
677
678    fn handle_event(
679        &mut self,
680        event: &GraphEvent,
681        allow_unknown_authors: bool,
682        overmute_threshold: f64,
683    ) -> std::result::Result<(), Self::Error> {
684        {
685            let mut graph = self.graph.lock().unwrap();
686            graph
687                .handle_event(event, allow_unknown_authors, overmute_threshold)
688                .context("ingest social graph event into heed backend")
689                .map_err(|err| UpstreamGraphBackendError(err.to_string()))?;
690        }
691        self.invalidate_distance_cache();
692        Ok(())
693    }
694
695    fn get_follow_distance(&self, user: &str) -> std::result::Result<u32, Self::Error> {
696        let graph = self.graph.lock().unwrap();
697        graph
698            .get_follow_distance(user)
699            .context("read social graph follow distance")
700            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
701    }
702
703    fn is_following(
704        &self,
705        follower: &str,
706        followed_user: &str,
707    ) -> std::result::Result<bool, Self::Error> {
708        let graph = self.graph.lock().unwrap();
709        graph
710            .is_following(follower, followed_user)
711            .context("read social graph following edge")
712            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
713    }
714
715    fn get_followed_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
716        let graph = self.graph.lock().unwrap();
717        graph
718            .get_followed_by_user(user)
719            .context("read followed-by-user list")
720            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
721    }
722
723    fn get_followers_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
724        let graph = self.graph.lock().unwrap();
725        graph
726            .get_followers_by_user(user)
727            .context("read followers-by-user list")
728            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
729    }
730
731    fn get_muted_by_user(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
732        let graph = self.graph.lock().unwrap();
733        graph
734            .get_muted_by_user(user)
735            .context("read muted-by-user list")
736            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
737    }
738
739    fn get_user_muted_by(&self, user: &str) -> std::result::Result<Vec<String>, Self::Error> {
740        let graph = self.graph.lock().unwrap();
741        graph
742            .get_user_muted_by(user)
743            .context("read user-muted-by list")
744            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
745    }
746
747    fn get_follow_list_created_at(
748        &self,
749        user: &str,
750    ) -> std::result::Result<Option<u64>, Self::Error> {
751        let graph = self.graph.lock().unwrap();
752        graph
753            .get_follow_list_created_at(user)
754            .context("read social graph follow list timestamp")
755            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
756    }
757
758    fn get_mute_list_created_at(
759        &self,
760        user: &str,
761    ) -> std::result::Result<Option<u64>, Self::Error> {
762        let graph = self.graph.lock().unwrap();
763        graph
764            .get_mute_list_created_at(user)
765            .context("read social graph mute list timestamp")
766            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
767    }
768
769    fn is_overmuted(&self, user: &str, threshold: f64) -> std::result::Result<bool, Self::Error> {
770        let graph = self.graph.lock().unwrap();
771        graph
772            .is_overmuted(user, threshold)
773            .context("check social graph overmute")
774            .map_err(|err| UpstreamGraphBackendError(err.to_string()))
775    }
776}
777
778impl<T> SocialGraphBackend for Arc<T>
779where
780    T: SocialGraphBackend + ?Sized,
781{
782    fn stats(&self) -> Result<SocialGraphStats> {
783        self.as_ref().stats()
784    }
785
786    fn users_by_follow_distance(&self, distance: u32) -> Result<Vec<[u8; 32]>> {
787        self.as_ref().users_by_follow_distance(distance)
788    }
789
790    fn follow_distance(&self, pk_bytes: &[u8; 32]) -> Result<Option<u32>> {
791        self.as_ref().follow_distance(pk_bytes)
792    }
793
794    fn follow_list_created_at(&self, owner: &[u8; 32]) -> Result<Option<u64>> {
795        self.as_ref().follow_list_created_at(owner)
796    }
797
798    fn followed_targets(&self, owner: &[u8; 32]) -> Result<UserSet> {
799        self.as_ref().followed_targets(owner)
800    }
801
802    fn is_overmuted_user(&self, user_pk: &[u8; 32], threshold: f64) -> Result<bool> {
803        self.as_ref().is_overmuted_user(user_pk, threshold)
804    }
805
806    fn snapshot_chunks(&self, root: &[u8; 32], options: &BinaryBudget) -> Result<Vec<Bytes>> {
807        self.as_ref().snapshot_chunks(root, options)
808    }
809
810    fn ingest_event(&self, event: &Event) -> Result<()> {
811        self.as_ref().ingest_event(event)
812    }
813
814    fn ingest_events(&self, events: &[Event]) -> Result<()> {
815        self.as_ref().ingest_events(events)
816    }
817
818    fn ingest_graph_events(&self, events: &[Event]) -> Result<()> {
819        self.as_ref().ingest_graph_events(events)
820    }
821
822    fn query_events(&self, filter: &Filter, limit: usize) -> Result<Vec<Event>> {
823        self.as_ref().query_events(filter, limit)
824    }
825}
826
827fn should_replace_placeholder_root(graph: &HeedSocialGraph) -> Result<bool> {
828    if graph.get_root().context("read current social graph root")? != DEFAULT_ROOT_HEX {
829        return Ok(false);
830    }
831
832    let GraphStats {
833        users,
834        follows,
835        mutes,
836        ..
837    } = graph.size().context("size social graph")?;
838    Ok(users <= 1 && follows == 0 && mutes == 0)
839}
840
841fn decode_pubkey_set(values: Vec<String>) -> Result<UserSet> {
842    let mut set = UserSet::new();
843    for value in values {
844        set.insert(decode_pubkey(&value)?);
845    }
846    Ok(set)
847}
848
849fn decode_pubkey(value: &str) -> Result<[u8; 32]> {
850    let mut bytes = [0u8; 32];
851    hex::decode_to_slice(value, &mut bytes)
852        .with_context(|| format!("decode social graph pubkey {value}"))?;
853    Ok(bytes)
854}
855
856fn is_social_graph_event(kind: Kind) -> bool {
857    kind == Kind::ContactList || kind == Kind::MuteList
858}
859
860fn graph_event_from_nostr(event: &Event) -> GraphEvent {
861    GraphEvent {
862        created_at: event.created_at.as_u64(),
863        content: event.content.clone(),
864        tags: event
865            .tags
866            .iter()
867            .map(|tag| tag.as_slice().to_vec())
868            .collect(),
869        kind: event.kind.as_u16() as u32,
870        pubkey: event.pubkey.to_hex(),
871        id: event.id.to_hex(),
872        sig: event.sig.to_string(),
873    }
874}
875
876fn stored_event_from_nostr(event: &Event) -> StoredNostrEvent {
877    StoredNostrEvent {
878        id: event.id.to_hex(),
879        pubkey: event.pubkey.to_hex(),
880        created_at: event.created_at.as_u64(),
881        kind: event.kind.as_u16() as u32,
882        tags: event
883            .tags
884            .iter()
885            .map(|tag| tag.as_slice().to_vec())
886            .collect(),
887        content: event.content.clone(),
888        sig: event.sig.to_string(),
889    }
890}
891
892fn nostr_event_from_stored(event: StoredNostrEvent) -> Result<Event> {
893    let value = serde_json::json!({
894        "id": event.id,
895        "pubkey": event.pubkey,
896        "created_at": event.created_at,
897        "kind": event.kind,
898        "tags": event.tags,
899        "content": event.content,
900        "sig": event.sig,
901    });
902    Event::from_json(value.to_string()).context("decode stored nostr event")
903}
904
905fn encode_cid(cid: &Cid) -> Result<Vec<u8>> {
906    rmp_serde::to_vec_named(&StoredCid {
907        hash: cid.hash,
908        key: cid.key,
909    })
910    .context("encode social graph events root")
911}
912
913fn decode_cid(bytes: &[u8]) -> Result<Option<Cid>> {
914    let stored: StoredCid =
915        rmp_serde::from_slice(bytes).context("decode social graph events root")?;
916    Ok(Some(Cid {
917        hash: stored.hash,
918        key: stored.key,
919    }))
920}
921
922fn map_event_store_error(err: NostrEventStoreError) -> anyhow::Error {
923    anyhow::anyhow!("nostr event store error: {err}")
924}
925
926fn ensure_social_graph_mapsize(db_dir: &Path, requested_bytes: u64) -> Result<()> {
927    let requested = requested_bytes.max(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES);
928    let page_size = page_size_bytes() as u64;
929    let rounded = requested
930        .checked_add(page_size.saturating_sub(1))
931        .map(|size| size / page_size * page_size)
932        .unwrap_or(requested);
933    let map_size = usize::try_from(rounded).context("social graph mapsize exceeds usize")?;
934
935    let env = unsafe {
936        heed::EnvOpenOptions::new()
937            .map_size(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES as usize)
938            .max_dbs(SOCIALGRAPH_MAX_DBS)
939            .open(db_dir)
940    }
941    .context("open social graph LMDB env for resize")?;
942    if env.info().map_size < map_size {
943        unsafe { env.resize(map_size) }.context("resize social graph LMDB env")?;
944    }
945
946    Ok(())
947}
948
949fn page_size_bytes() -> usize {
950    page_size::get_granularity()
951}
952
#[cfg(test)]
mod tests {
    use super::*;
    use nostr::{EventBuilder, JsonUtil, Keys, Tag, Timestamp};
    use tempfile::TempDir;

    /// Builds a signed text note by `author` with the given content and
    /// creation time in seconds.
    fn text_note(author: &Keys, content: &str, created_at_secs: u64) -> Event {
        EventBuilder::new(Kind::TextNote, content, [])
            .custom_created_at(Timestamp::from_secs(created_at_secs))
            .to_event(author)
            .unwrap()
    }

    /// Builds a signed, empty-content event of `kind` by `author` whose only
    /// tag is the public key of `target`.
    fn single_pubkey_list(
        kind: Kind,
        author: &Keys,
        target: &Keys,
        created_at_secs: u64,
    ) -> Event {
        EventBuilder::new(kind, "", vec![Tag::public_key(target.public_key())])
            .custom_created_at(Timestamp::from_secs(created_at_secs))
            .to_event(author)
            .unwrap()
    }

    /// A freshly opened store is held by exactly one `Arc` reference.
    #[test]
    fn test_open_social_graph_store() {
        let _serial = test_lock();
        let data_dir = TempDir::new().unwrap();
        let store = open_social_graph_store(data_dir.path()).unwrap();
        assert_eq!(Arc::strong_count(&store), 1);
    }

    /// The configured root is at follow distance zero from itself.
    #[test]
    fn test_set_root_and_get_follow_distance() {
        let _serial = test_lock();
        let data_dir = TempDir::new().unwrap();
        let store = open_social_graph_store(data_dir.path()).unwrap();
        let root = [1u8; 32];
        set_social_graph_root(&store, &root);
        assert_eq!(get_follow_distance(&store, &root), Some(0));
    }

    /// Ingesting a contact list and a mute list from the root updates both
    /// the follow distance and the mute state.
    #[test]
    fn test_ingest_event_updates_follows_and_mutes() {
        let _serial = test_lock();
        let data_dir = TempDir::new().unwrap();
        let store = open_social_graph_store(data_dir.path()).unwrap();

        let root = Keys::generate();
        let alice = Keys::generate();
        let bob = Keys::generate();

        let root_pk = root.public_key().to_bytes();
        set_social_graph_root(&store, &root_pk);

        let follow_event = single_pubkey_list(Kind::ContactList, &root, &alice, 10);
        ingest_event(&store, "follow", &follow_event.as_json());

        let mute_event = single_pubkey_list(Kind::MuteList, &root, &bob, 11);
        ingest_event(&store, "mute", &mute_event.as_json());

        let alice_pk = alice.public_key().to_bytes();
        assert_eq!(get_follow_distance(&store, &alice_pk), Some(1));

        let bob_pk = bob.public_key().to_bytes();
        assert!(is_overmuted(&store, &root_pk, &bob_pk, 1.0));
    }

    /// Author-filtered queries return that author's notes, newest first.
    #[test]
    fn test_query_events_by_author() {
        let _serial = test_lock();
        let data_dir = TempDir::new().unwrap();
        let store = open_social_graph_store(data_dir.path()).unwrap();
        let author = Keys::generate();

        let older = text_note(&author, "older", 5);
        let newer = text_note(&author, "newer", 6);

        ingest_parsed_event(&store, &older).unwrap();
        ingest_parsed_event(&store, &newer).unwrap();

        let by_author = Filter::new()
            .author(author.public_key())
            .kind(Kind::TextNote);
        let found = query_events(&store, &by_author, 10);
        assert_eq!(found.len(), 2);
        assert_eq!(found[0].id, newer.id);
        assert_eq!(found[1].id, older.id);
    }

    /// Events written through one store handle are still queryable after the
    /// store has been dropped and opened again at the same path.
    #[test]
    fn test_query_events_survives_reopen() {
        let _serial = test_lock();
        let data_dir = TempDir::new().unwrap();
        let store_path = data_dir.path().join("socialgraph-store");
        let author = Keys::generate();
        let other_author = Keys::generate();

        {
            // Scoped so the first handle is dropped before reopening.
            let store = open_social_graph_store_at_path(&store_path, None).unwrap();
            let older = text_note(&author, "older", 5);
            let newer = text_note(&author, "newer", 6);
            let latest = text_note(&other_author, "latest", 7);

            ingest_parsed_event(&store, &older).unwrap();
            ingest_parsed_event(&store, &newer).unwrap();
            ingest_parsed_event(&store, &latest).unwrap();
        }

        let reopened = open_social_graph_store_at_path(&store_path, None).unwrap();

        let by_author = Filter::new()
            .author(author.public_key())
            .kind(Kind::TextNote);
        let from_author = query_events(&reopened, &by_author, 10);
        assert_eq!(from_author.len(), 2);
        assert_eq!(from_author[0].content, "newer");
        assert_eq!(from_author[1].content, "older");

        let any_text_note = Filter::new().kind(Kind::TextNote);
        let most_recent = query_events(&reopened, &any_text_note, 2);
        assert_eq!(most_recent.len(), 2);
        assert_eq!(most_recent[0].content, "latest");
        assert_eq!(most_recent[1].content, "newer");
    }

    /// After requesting 70 MiB the environment reports a map size that is at
    /// least that large and a multiple of `page_size_bytes()`.
    #[test]
    fn test_ensure_social_graph_mapsize_rounds_and_applies() {
        let _serial = test_lock();
        let data_dir = TempDir::new().unwrap();
        ensure_social_graph_mapsize(data_dir.path(), DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES)
            .unwrap();
        let requested: u64 = 70 * 1024 * 1024;
        ensure_social_graph_mapsize(data_dir.path(), requested).unwrap();

        // SAFETY: NOTE(review): the directory is a private temp dir created by
        // this test; confirm that satisfies heed's requirements for `open`.
        let env = unsafe {
            heed::EnvOpenOptions::new()
                .map_size(DEFAULT_SOCIALGRAPH_MAP_SIZE_BYTES as usize)
                .max_dbs(SOCIALGRAPH_MAX_DBS)
                .open(data_dir.path())
        }
        .unwrap();
        assert!(env.info().map_size >= requested as usize);
        assert_eq!(env.info().map_size % page_size_bytes(), 0);
    }

    /// A batch ingest applies every contact list to the graph and also makes
    /// the events themselves queryable.
    #[test]
    fn test_ingest_events_batches_graph_updates() {
        let _serial = test_lock();
        let data_dir = TempDir::new().unwrap();
        let store = open_social_graph_store(data_dir.path()).unwrap();

        let root = Keys::generate();
        let alice = Keys::generate();
        let bob = Keys::generate();

        let root_pk = root.public_key().to_bytes();
        set_social_graph_root(&store, &root_pk);

        let root_follows_alice = single_pubkey_list(Kind::ContactList, &root, &alice, 10);
        let alice_follows_bob = single_pubkey_list(Kind::ContactList, &alice, &bob, 11);

        let batch = [root_follows_alice.clone(), alice_follows_bob.clone()];
        ingest_parsed_events(&store, &batch).unwrap();

        let alice_pk = alice.public_key().to_bytes();
        assert_eq!(get_follow_distance(&store, &alice_pk), Some(1));
        let bob_pk = bob.public_key().to_bytes();
        assert_eq!(get_follow_distance(&store, &bob_pk), Some(2));

        let contact_lists = Filter::new().kind(Kind::ContactList);
        let ids: Vec<_> = query_events(&store, &contact_lists, 10)
            .into_iter()
            .map(|event| event.id)
            .collect();
        assert!(ids.contains(&root_follows_alice.id));
        assert!(ids.contains(&alice_follows_bob.id));
    }

    /// Graph-only ingestion updates follow distances but leaves the event
    /// index empty.
    #[test]
    fn test_ingest_graph_events_updates_graph_without_indexing_events() {
        let _serial = test_lock();
        let data_dir = TempDir::new().unwrap();
        let store = open_social_graph_store(data_dir.path()).unwrap();

        let root = Keys::generate();
        let alice = Keys::generate();

        let root_pk = root.public_key().to_bytes();
        set_social_graph_root(&store, &root_pk);

        let root_follows_alice = single_pubkey_list(Kind::ContactList, &root, &alice, 10);

        ingest_graph_parsed_events(&store, std::slice::from_ref(&root_follows_alice))
            .unwrap();

        let alice_pk = alice.public_key().to_bytes();
        assert_eq!(get_follow_distance(&store, &alice_pk), Some(1));

        let contact_lists = Filter::new().kind(Kind::ContactList);
        assert!(query_events(&store, &contact_lists, 10).is_empty());
    }
}