// hashtree_cli/nostrdb_integration/crawler.rs
1//! Social graph crawler - BFS crawl of follow lists via Nostr relays.
2
3use nostrdb_social::Ndb;
4use std::collections::HashSet;
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::watch;
8
/// Crawls the social graph by fetching kind 3 (contact list) events from relays
/// and ingesting them into nostrdb.
pub struct SocialGraphCrawler {
    /// Main nostrdb instance receiving trusted events.
    ndb: Arc<Ndb>,
    /// Optional quarantine database for events from pubkeys outside the graph.
    spambox: Option<Arc<Ndb>>,
    /// Keys of the local user; their pubkey is the BFS root.
    keys: nostr::Keys,
    /// Relay URLs the crawler connects to.
    relays: Vec<String>,
    /// Maximum follow distance crawled and considered trusted.
    max_depth: u32,
}
18
impl SocialGraphCrawler {
    /// Creates a crawler rooted at `keys` that reads/writes `ndb` and contacts
    /// `relays` up to `max_depth` hops of follows. No spambox by default.
    pub fn new(ndb: Arc<Ndb>, keys: nostr::Keys, relays: Vec<String>, max_depth: u32) -> Self {
        Self { ndb, spambox: None, keys, relays, max_depth }
    }

    /// Builder-style setter: route untrusted events into `spambox` instead of
    /// dropping them.
    pub fn with_spambox(mut self, spambox: Arc<Ndb>) -> Self {
        self.spambox = Some(spambox);
        self
    }
34
35    fn is_within_social_graph(&self, pk_bytes: &[u8; 32]) -> bool {
36        if pk_bytes == &self.keys.public_key().to_bytes() {
37            return true;
38        }
39        super::get_follow_distance(&self.ndb, pk_bytes)
40            .map(|distance| distance <= self.max_depth)
41            .unwrap_or(false)
42    }
43
44    fn ingest_event_into(&self, ndb: &Ndb, sub_id: &str, event: &nostr::Event) {
45        if let Ok(json) = serde_json::to_string(event) {
46            super::ingest_event(ndb, sub_id, &json);
47        }
48    }
49
50    #[allow(deprecated)] // nostr 0.35 deprecates tags() but we use this version
51    fn collect_missing_root_follows(
52        &self,
53        event: &nostr::Event,
54        fetched_contact_lists: &mut HashSet<[u8; 32]>,
55    ) -> Vec<[u8; 32]> {
56        if self.max_depth < 2 {
57            return Vec::new();
58        }
59        if event.kind != nostr::Kind::ContactList {
60            return Vec::new();
61        }
62
63        let root_pk = self.keys.public_key().to_bytes();
64        if event.pubkey.to_bytes() != root_pk {
65            return Vec::new();
66        }
67
68        let mut missing = Vec::new();
69        for tag in event.tags().iter() {
70            if let Some(nostr::TagStandard::PublicKey { public_key, .. }) = tag.as_standardized() {
71                let pk_bytes = public_key.to_bytes();
72                if fetched_contact_lists.contains(&pk_bytes) {
73                    continue;
74                }
75
76                let existing_follows = super::get_follows(&self.ndb, &pk_bytes);
77                if !existing_follows.is_empty() {
78                    fetched_contact_lists.insert(pk_bytes);
79                    continue;
80                }
81
82                fetched_contact_lists.insert(pk_bytes);
83                missing.push(pk_bytes);
84            }
85        }
86
87        missing
88    }
89
90    async fn fetch_contact_lists_for_pubkeys(
91        &self,
92        client: &nostr_sdk::Client,
93        pubkeys: &[[u8; 32]],
94        shutdown_rx: &watch::Receiver<bool>,
95        sub_id: &str,
96    ) {
97        for pk_bytes in pubkeys {
98            if *shutdown_rx.borrow() {
99                break;
100            }
101
102            let pk = match nostr::PublicKey::from_slice(pk_bytes) {
103                Ok(pk) => pk,
104                Err(_) => continue,
105            };
106
107            let filter = nostr::Filter::new()
108                .author(pk)
109                .kinds(vec![nostr::Kind::ContactList, nostr::Kind::Custom(10000)]);
110
111            let source = nostr_sdk::EventSource::relays(Some(Duration::from_secs(5)));
112            match tokio::time::timeout(
113                Duration::from_secs(10),
114                client.get_events_of(vec![filter], source),
115            )
116            .await
117            {
118                Ok(Ok(events)) => {
119                    for event in &events {
120                        self.ingest_event_into(&self.ndb, sub_id, event);
121                    }
122                }
123                Ok(Err(e)) => {
124                    tracing::debug!("Failed to fetch events for {}...: {}", &pk.to_hex()[..8], e);
125                }
126                Err(_) => {
127                    tracing::debug!("Timeout fetching events for {}...", &pk.to_hex()[..8]);
128                }
129            }
130        }
131    }
132
133    pub(crate) fn handle_incoming_event(&self, event: &nostr::Event) {
134        let is_contact_list = event.kind == nostr::Kind::ContactList;
135        let is_mute_list = event.kind == nostr::Kind::Custom(10000);
136        if !is_contact_list && !is_mute_list {
137            return;
138        }
139
140        let pk_bytes = event.pubkey.to_bytes();
141        if self.is_within_social_graph(&pk_bytes) {
142            self.ingest_event_into(&self.ndb, "live", event);
143            return;
144        }
145
146        if let Some(spambox) = &self.spambox {
147            self.ingest_event_into(spambox, "spambox", event);
148        } else {
149            tracing::debug!(
150                "Social graph crawler: dropping untrusted {} from {}...",
151                if is_contact_list {
152                    "contact list"
153                } else {
154                    "mute list"
155                },
156                &event.pubkey.to_hex()[..8.min(event.pubkey.to_hex().len())]
157            );
158        }
159    }
160
161    /// Run the BFS crawl until shutdown is signaled.
162    /// Fetches contact lists from relays and feeds them into nostrdb.
163    #[allow(deprecated)] // nostr 0.35 deprecates kind()/tags() but we use this version
164    pub async fn crawl(&self, shutdown_rx: watch::Receiver<bool>) {
165        use nostr::nips::nip19::ToBech32;
166        use nostr_sdk::prelude::RelayPoolNotification;
167
168        if self.relays.is_empty() {
169            tracing::warn!("Social graph crawler: no relays configured, skipping");
170            return;
171        }
172
173        let mut shutdown_rx = shutdown_rx;
174        if *shutdown_rx.borrow() {
175            tracing::info!("Social graph crawler: shutdown requested before start");
176            return;
177        }
178
179        tracing::info!(
180            "Starting social graph crawl (max_depth={}, relays={})",
181            self.max_depth,
182            self.relays.len()
183        );
184
185        let sdk_keys =
186            match nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32().unwrap_or_default()) {
187                Ok(k) => k,
188                Err(e) => {
189                    tracing::error!("Failed to parse keys for crawler: {}", e);
190                    return;
191                }
192            };
193
194        let client = nostr_sdk::Client::new(&sdk_keys);
195
196        for relay in &self.relays {
197            if let Err(e) = client.add_relay(relay).await {
198                tracing::warn!("Failed to add relay {}: {}", relay, e);
199            }
200        }
201        client.connect().await;
202
203        // BFS: start from root, fetch contact lists at each depth
204        let root_pk = self.keys.public_key().to_bytes();
205        let mut visited: HashSet<[u8; 32]> = HashSet::new();
206        let mut fetched_contact_lists: HashSet<[u8; 32]> = HashSet::new();
207        let mut current_level = vec![root_pk];
208        visited.insert(root_pk);
209
210        for depth in 0..self.max_depth {
211            if current_level.is_empty() || *shutdown_rx.borrow() {
212                break;
213            }
214
215            tracing::info!(
216                "Crawling depth {} with {} pubkeys",
217                depth,
218                current_level.len()
219            );
220
221            let mut next_level = Vec::new();
222
223            for pk_bytes in &current_level {
224                if *shutdown_rx.borrow() {
225                    break;
226                }
227
228                fetched_contact_lists.insert(*pk_bytes);
229
230                let pk_hex = hex::encode(pk_bytes);
231
232                let pk = match nostr::PublicKey::from_slice(pk_bytes) {
233                    Ok(pk) => pk,
234                    Err(_) => continue,
235                };
236
237                let filter = nostr::Filter::new()
238                    .author(pk)
239                    .kinds(vec![nostr::Kind::ContactList, nostr::Kind::Custom(10000)]);
240
241                let source = nostr_sdk::EventSource::relays(Some(Duration::from_secs(5)));
242
243                match tokio::time::timeout(
244                    Duration::from_secs(10),
245                    client.get_events_of(vec![filter], source),
246                )
247                .await
248                {
249                    Ok(Ok(events)) => {
250                        for event in &events {
251                            // Ingest into nostrdb
252                            self.ingest_event_into(&self.ndb, "crawl", event);
253
254                            // Extract follows for next level
255                            if event.kind() == nostr::Kind::ContactList {
256                                for tag in event.tags().iter() {
257                                    if let Some(nostr::TagStandard::PublicKey {
258                                        public_key, ..
259                                    }) = tag.as_standardized()
260                                    {
261                                        let follow_bytes = public_key.to_bytes();
262                                        if !visited.contains(&follow_bytes) {
263                                            visited.insert(follow_bytes);
264                                            next_level.push(follow_bytes);
265                                        }
266                                    }
267                                }
268                            }
269                        }
270                        tracing::debug!(
271                            "Depth {}: fetched {} events for {}...",
272                            depth,
273                            events.len(),
274                            &pk_hex[..8.min(pk_hex.len())]
275                        );
276                    }
277                    Ok(Err(e)) => {
278                        tracing::debug!("Failed to fetch events for {}...: {}", &pk_hex[..8], e);
279                    }
280                    Err(_) => {
281                        tracing::debug!("Timeout fetching events for {}...", &pk_hex[..8]);
282                    }
283                }
284            }
285
286            current_level = next_level;
287        }
288
289        let filter = nostr::Filter::new()
290            .kinds(vec![nostr::Kind::ContactList, nostr::Kind::Custom(10000)])
291            .since(nostr::Timestamp::now());
292
293        match client.subscribe(vec![filter], None).await {
294            Ok(_) => tracing::info!("Social graph crawler: subscribed to contact and mute lists"),
295            Err(e) => tracing::warn!("Social graph crawler: failed to subscribe: {}", e),
296        }
297
298        let mut notifications = client.notifications();
299        loop {
300            tokio::select! {
301                _ = shutdown_rx.changed() => {
302                    if *shutdown_rx.borrow() {
303                        break;
304                    }
305                }
306                notification = notifications.recv() => {
307                    match notification {
308                        Ok(RelayPoolNotification::Event { event, .. }) => {
309                            self.handle_incoming_event(&event);
310
311                            let missing = self.collect_missing_root_follows(&event, &mut fetched_contact_lists);
312                            if !missing.is_empty() {
313                                tracing::info!(
314                                    "Root follow list updated: fetching {} missing contact lists",
315                                    missing.len()
316                                );
317                                self.fetch_contact_lists_for_pubkeys(&client, &missing, &shutdown_rx, "recrawl").await;
318                            }
319                        }
320                        Ok(_) => {}
321                        Err(e) => {
322                            tracing::warn!("Social graph crawler notification error: {}", e);
323                            break;
324                        }
325                    }
326                }
327            }
328        }
329
330        if let Err(e) = client.disconnect().await {
331            tracing::debug!("Error disconnecting crawler client: {}", e);
332        }
333
334        tracing::info!(
335            "Social graph crawl complete: visited {} pubkeys",
336            visited.len()
337        );
338    }
339}
340
#[cfg(test)]
mod tests {
    use super::*;
    use nostr::{EventBuilder, Kind, PublicKey, Tag};
    use std::collections::HashSet;
    use std::sync::Arc;
    use tempfile::TempDir;

    /// Polls nostrdb until `owner`'s follow list contains `target`, or a
    /// 500ms deadline elapses. Ingestion is asynchronous, hence the polling.
    async fn wait_for_follow(ndb: &Ndb, owner: &[u8; 32], target: &[u8; 32]) -> bool {
        let deadline = tokio::time::Instant::now() + Duration::from_millis(500);
        loop {
            let follows = super::super::get_follows(ndb, owner);
            if follows.iter().any(|pk| pk == target) {
                return true;
            }
            if tokio::time::Instant::now() >= deadline {
                return false;
            }
            tokio::time::sleep(Duration::from_millis(20)).await;
        }
    }

    /// A contact list signed by a key with no follow path from the root must
    /// land in the spambox, not the main database.
    #[tokio::test]
    async fn test_crawler_routes_untrusted_to_spambox() {
        let _guard = super::super::test_lock();
        let tmp = TempDir::new().unwrap();
        let ndb = super::super::init_ndb(tmp.path()).unwrap();
        let spambox =
            super::super::init_ndb_at_path(&tmp.path().join("nostrdb_spambox"), None).unwrap();

        let root_keys = nostr::Keys::generate();
        let root_pk = root_keys.public_key().to_bytes();
        super::super::set_social_graph_root(&ndb, &root_pk);
        // Give nostrdb a moment to apply the root setting.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let crawler = SocialGraphCrawler::new(Arc::clone(&ndb), root_keys.clone(), vec![], 2)
            .with_spambox(Arc::clone(&spambox));

        let unknown_keys = nostr::Keys::generate();
        let follow_tag = Tag::public_key(PublicKey::from_slice(&root_pk).unwrap());
        let event = EventBuilder::new(Kind::ContactList, "", vec![follow_tag])
            .to_event(&unknown_keys)
            .unwrap();

        crawler.handle_incoming_event(&event);

        let unknown_pk = unknown_keys.public_key().to_bytes();
        // Not in the main db, but present in the spambox.
        assert!(!wait_for_follow(&ndb, &unknown_pk, &root_pk).await);
        assert!(wait_for_follow(&spambox, &unknown_pk, &root_pk).await);
    }

    /// A contact list signed by the root itself must be ingested into the
    /// main database and stay out of the spambox.
    #[tokio::test]
    async fn test_crawler_routes_trusted_to_main_db() {
        let _guard = super::super::test_lock();
        let tmp = TempDir::new().unwrap();
        let ndb = super::super::init_ndb(tmp.path()).unwrap();
        let spambox =
            super::super::init_ndb_at_path(&tmp.path().join("nostrdb_spambox"), None).unwrap();

        let root_keys = nostr::Keys::generate();
        let root_pk = root_keys.public_key().to_bytes();
        super::super::set_social_graph_root(&ndb, &root_pk);
        // Give nostrdb a moment to apply the root setting.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let crawler = SocialGraphCrawler::new(Arc::clone(&ndb), root_keys.clone(), vec![], 2)
            .with_spambox(Arc::clone(&spambox));

        let target_keys = nostr::Keys::generate();
        let target_pk = target_keys.public_key().to_bytes();
        let follow_tag = Tag::public_key(PublicKey::from_slice(&target_pk).unwrap());
        let event = EventBuilder::new(Kind::ContactList, "", vec![follow_tag])
            .to_event(&root_keys)
            .unwrap();

        crawler.handle_incoming_event(&event);

        // In the main db, absent from the spambox.
        assert!(wait_for_follow(&ndb, &root_pk, &target_pk).await);
        assert!(!wait_for_follow(&spambox, &root_pk, &target_pk).await);
    }

    /// With no relays configured, `crawl` must return immediately.
    #[tokio::test]
    async fn test_crawler_no_relays() {
        let tmp = TempDir::new().unwrap();
        let ndb = {
            let _guard = super::super::test_lock();
            super::super::init_ndb(tmp.path()).unwrap()
        };
        let keys = nostr::Keys::generate();
        let crawler = SocialGraphCrawler::new(ndb, keys, vec![], 2);
        let (_tx, rx) = watch::channel(false);
        // Should return immediately with no relays
        crawler.crawl(rx).await;
    }

    /// A shutdown signal raised before `crawl` starts must make it return
    /// without contacting any relay.
    #[tokio::test]
    async fn test_crawler_shutdown_signal() {
        let tmp = TempDir::new().unwrap();
        let ndb = {
            let _guard = super::super::test_lock();
            super::super::init_ndb(tmp.path()).unwrap()
        };
        let keys = nostr::Keys::generate();
        let crawler = SocialGraphCrawler::new(ndb, keys, vec!["wss://localhost:1".to_string()], 2);
        let (_tx, rx) = watch::channel(true); // Already shutdown
        crawler.crawl(rx).await;
    }

    /// Only follows that are neither already in the db nor already fetched
    /// this session should be reported as missing; all examined pubkeys end
    /// up recorded in the fetched set.
    #[tokio::test]
    async fn test_collect_missing_root_follows_skips_known_and_fetched() {
        let _guard = super::super::test_lock();
        let tmp = TempDir::new().unwrap();
        let ndb = super::super::init_ndb(tmp.path()).unwrap();

        let root_keys = nostr::Keys::generate();
        let root_pk = root_keys.public_key().to_bytes();
        super::super::set_social_graph_root(&ndb, &root_pk);
        // Give nostrdb a moment to apply the root setting.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let crawler = SocialGraphCrawler::new(Arc::clone(&ndb), root_keys.clone(), vec![], 2);

        // "known": a pubkey whose follow list already exists in the db.
        let known_keys = nostr::Keys::generate();
        let known_pk = known_keys.public_key().to_bytes();
        let known_follow_tag = Tag::public_key(PublicKey::from_slice(&root_pk).unwrap());
        let known_event = EventBuilder::new(Kind::ContactList, "", vec![known_follow_tag])
            .to_event(&known_keys)
            .unwrap();
        crawler.ingest_event_into(&ndb, "test", &known_event);
        assert!(wait_for_follow(&ndb, &known_pk, &root_pk).await);

        // "missing": never fetched, nothing in the db.
        let missing_keys = nostr::Keys::generate();
        let missing_pk = missing_keys.public_key().to_bytes();

        // "fetched": already requested earlier in this session.
        let fetched_keys = nostr::Keys::generate();
        let fetched_pk = fetched_keys.public_key().to_bytes();

        let tags = vec![
            Tag::public_key(PublicKey::from_slice(&known_pk).unwrap()),
            Tag::public_key(PublicKey::from_slice(&missing_pk).unwrap()),
            Tag::public_key(PublicKey::from_slice(&fetched_pk).unwrap()),
        ];
        let root_event = EventBuilder::new(Kind::ContactList, "", tags)
            .to_event(&root_keys)
            .unwrap();

        let mut fetched = HashSet::new();
        fetched.insert(fetched_pk);

        let missing = crawler.collect_missing_root_follows(&root_event, &mut fetched);

        // Only the genuinely missing pubkey is returned...
        assert_eq!(missing.len(), 1);
        assert_eq!(missing[0], missing_pk);
        // ...and every examined pubkey is now marked as handled.
        assert!(fetched.contains(&known_pk));
        assert!(fetched.contains(&missing_pk));
        assert!(fetched.contains(&fetched_pk));
    }

    /// Contact lists from pubkeys other than the root must be ignored
    /// entirely — no missing follows, no fetched-set updates.
    #[tokio::test]
    async fn test_collect_missing_root_follows_ignores_non_root() {
        let _guard = super::super::test_lock();
        let tmp = TempDir::new().unwrap();
        let ndb = super::super::init_ndb(tmp.path()).unwrap();

        let root_keys = nostr::Keys::generate();
        let root_pk = root_keys.public_key().to_bytes();
        super::super::set_social_graph_root(&ndb, &root_pk);
        // Give nostrdb a moment to apply the root setting.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let crawler = SocialGraphCrawler::new(Arc::clone(&ndb), root_keys.clone(), vec![], 2);

        let other_keys = nostr::Keys::generate();
        let other_pk = other_keys.public_key().to_bytes();
        let tag = Tag::public_key(PublicKey::from_slice(&other_pk).unwrap());
        let event = EventBuilder::new(Kind::ContactList, "", vec![tag])
            .to_event(&other_keys)
            .unwrap();

        let mut fetched = HashSet::new();
        let missing = crawler.collect_missing_root_follows(&event, &mut fetched);

        assert!(missing.is_empty());
        assert!(fetched.is_empty());
    }
}