//! Social graph crawler - BFS crawl of follow lists via Nostr relays.

use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

use nostrdb_social::Ndb;
use tokio::sync::watch;

/// Crawls the social graph by fetching kind 3 (contact list) events from relays
/// and ingesting them into nostrdb.
pub struct SocialGraphCrawler {
    /// Main nostrdb instance; receives events from trusted pubkeys.
    ndb: Arc<Ndb>,
    /// Optional secondary nostrdb for events from pubkeys outside the social
    /// graph; when `None`, untrusted events are dropped.
    spambox: Option<Arc<Ndb>>,
    /// Keys whose public key is the root of the BFS crawl.
    keys: nostr::Keys,
    /// Relay URLs to crawl; an empty list makes `crawl` return immediately.
    relays: Vec<String>,
    /// Maximum follow-hop distance crawled and trusted.
    max_depth: u32,
}

19impl SocialGraphCrawler {
20    pub fn new(
21        ndb: Arc<Ndb>,
22        keys: nostr::Keys,
23        relays: Vec<String>,
24        max_depth: u32,
25    ) -> Self {
26        Self {
27            ndb,
28            spambox: None,
29            keys,
30            relays,
31            max_depth,
32        }
33    }
34
35    pub fn with_spambox(mut self, spambox: Arc<Ndb>) -> Self {
36        self.spambox = Some(spambox);
37        self
38    }
39
40    fn is_within_social_graph(&self, pk_bytes: &[u8; 32]) -> bool {
41        if pk_bytes == &self.keys.public_key().to_bytes() {
42            return true;
43        }
44        super::get_follow_distance(&self.ndb, pk_bytes)
45            .map(|distance| distance <= self.max_depth)
46            .unwrap_or(false)
47    }
48
49    fn ingest_event_into(&self, ndb: &Ndb, sub_id: &str, event: &nostr::Event) {
50        if let Ok(json) = serde_json::to_string(event) {
51            super::ingest_event(ndb, sub_id, &json);
52        }
53    }
54
55    pub(crate) fn handle_incoming_event(&self, event: &nostr::Event) {
56        let is_contact_list = event.kind == nostr::Kind::ContactList;
57        let is_mute_list = event.kind == nostr::Kind::Custom(10000);
58        if !is_contact_list && !is_mute_list {
59            return;
60        }
61
62        let pk_bytes = event.pubkey.to_bytes();
63        if self.is_within_social_graph(&pk_bytes) {
64            self.ingest_event_into(&self.ndb, "live", event);
65            return;
66        }
67
68        if let Some(spambox) = &self.spambox {
69            self.ingest_event_into(spambox, "spambox", event);
70        } else {
71            tracing::debug!(
72                "Social graph crawler: dropping untrusted {} from {}...",
73                if is_contact_list { "contact list" } else { "mute list" },
74                &event.pubkey.to_hex()[..8.min(event.pubkey.to_hex().len())]
75            );
76        }
77    }
78
79    /// Run the BFS crawl until shutdown is signaled.
80    /// Fetches contact lists from relays and feeds them into nostrdb.
81    #[allow(deprecated)] // nostr 0.35 deprecates kind()/tags() but we use this version
82    pub async fn crawl(&self, shutdown_rx: watch::Receiver<bool>) {
83        use nostr::nips::nip19::ToBech32;
84        use nostr_sdk::prelude::RelayPoolNotification;
85
86        if self.relays.is_empty() {
87            tracing::warn!("Social graph crawler: no relays configured, skipping");
88            return;
89        }
90
91        let mut shutdown_rx = shutdown_rx;
92        if *shutdown_rx.borrow() {
93            tracing::info!("Social graph crawler: shutdown requested before start");
94            return;
95        }
96
97        tracing::info!(
98            "Starting social graph crawl (max_depth={}, relays={})",
99            self.max_depth,
100            self.relays.len()
101        );
102
103        let sdk_keys = match nostr_sdk::Keys::parse(
104            &self.keys.secret_key().to_bech32().unwrap_or_default(),
105        ) {
106            Ok(k) => k,
107            Err(e) => {
108                tracing::error!("Failed to parse keys for crawler: {}", e);
109                return;
110            }
111        };
112
113        let client = nostr_sdk::Client::new(&sdk_keys);
114
115        for relay in &self.relays {
116            if let Err(e) = client.add_relay(relay).await {
117                tracing::warn!("Failed to add relay {}: {}", relay, e);
118            }
119        }
120        client.connect().await;
121
122        // BFS: start from root, fetch contact lists at each depth
123        let root_pk = self.keys.public_key().to_bytes();
124        let mut visited: HashSet<[u8; 32]> = HashSet::new();
125        let mut current_level = vec![root_pk];
126        visited.insert(root_pk);
127
128        for depth in 0..self.max_depth {
129            if current_level.is_empty() || *shutdown_rx.borrow() {
130                break;
131            }
132
133            tracing::info!(
134                "Crawling depth {} with {} pubkeys",
135                depth,
136                current_level.len()
137            );
138
139            let mut next_level = Vec::new();
140
141            for pk_bytes in &current_level {
142                if *shutdown_rx.borrow() {
143                    break;
144                }
145
146                let pk_hex = hex::encode(pk_bytes);
147
148                let pk = match nostr::PublicKey::from_slice(pk_bytes) {
149                    Ok(pk) => pk,
150                    Err(_) => continue,
151                };
152
153                let filter = nostr::Filter::new()
154                    .author(pk)
155                    .kinds(vec![nostr::Kind::ContactList, nostr::Kind::Custom(10000)]);
156
157                let source = nostr_sdk::EventSource::relays(Some(Duration::from_secs(5)));
158
159                match tokio::time::timeout(
160                    Duration::from_secs(10),
161                    client.get_events_of(vec![filter], source),
162                )
163                .await
164                {
165                    Ok(Ok(events)) => {
166                        for event in &events {
167                            // Ingest into nostrdb
168                            self.ingest_event_into(&self.ndb, "crawl", event);
169
170                            // Extract follows for next level
171                            if event.kind() == nostr::Kind::ContactList {
172                                for tag in event.tags().iter() {
173                                    if let Some(nostr::TagStandard::PublicKey {
174                                        public_key,
175                                        ..
176                                    }) = tag.as_standardized()
177                                    {
178                                        let follow_bytes = public_key.to_bytes();
179                                        if !visited.contains(&follow_bytes) {
180                                            visited.insert(follow_bytes);
181                                            next_level.push(follow_bytes);
182                                        }
183                                    }
184                                }
185                            }
186                        }
187                        tracing::debug!(
188                            "Depth {}: fetched {} events for {}...",
189                            depth,
190                            events.len(),
191                            &pk_hex[..8.min(pk_hex.len())]
192                        );
193                    }
194                    Ok(Err(e)) => {
195                        tracing::debug!("Failed to fetch events for {}...: {}", &pk_hex[..8], e);
196                    }
197                    Err(_) => {
198                        tracing::debug!("Timeout fetching events for {}...", &pk_hex[..8]);
199                    }
200                }
201            }
202
203            current_level = next_level;
204        }
205
206        let filter = nostr::Filter::new()
207            .kinds(vec![nostr::Kind::ContactList, nostr::Kind::Custom(10000)])
208            .since(nostr::Timestamp::now());
209
210        match client.subscribe(vec![filter], None).await {
211            Ok(_) => tracing::info!("Social graph crawler: subscribed to contact and mute lists"),
212            Err(e) => tracing::warn!("Social graph crawler: failed to subscribe: {}", e),
213        }
214
215        let mut notifications = client.notifications();
216        loop {
217            tokio::select! {
218                _ = shutdown_rx.changed() => {
219                    if *shutdown_rx.borrow() {
220                        break;
221                    }
222                }
223                notification = notifications.recv() => {
224                    match notification {
225                        Ok(RelayPoolNotification::Event { event, .. }) => {
226                            self.handle_incoming_event(&event);
227                        }
228                        Ok(_) => {}
229                        Err(e) => {
230                            tracing::warn!("Social graph crawler notification error: {}", e);
231                            break;
232                        }
233                    }
234                }
235            }
236        }
237
238        if let Err(e) = client.disconnect().await {
239            tracing::debug!("Error disconnecting crawler client: {}", e);
240        }
241
242        tracing::info!(
243            "Social graph crawl complete: visited {} pubkeys",
244            visited.len()
245        );
246    }
247}
248
#[cfg(test)]
mod tests {
    use super::*;
    use nostr::{EventBuilder, Kind, PublicKey, Tag};
    use std::sync::Arc;
    use tempfile::TempDir;

    /// Polls `get_follows` every 20ms until `owner` is seen following
    /// `target`, giving up after a 500ms deadline.
    async fn wait_for_follow(ndb: &Ndb, owner: &[u8; 32], target: &[u8; 32]) -> bool {
        let deadline = tokio::time::Instant::now() + Duration::from_millis(500);
        loop {
            let found = super::super::get_follows(ndb, owner)
                .iter()
                .any(|pk| pk == target);
            if found {
                return true;
            }
            if tokio::time::Instant::now() >= deadline {
                return false;
            }
            tokio::time::sleep(Duration::from_millis(20)).await;
        }
    }

    #[tokio::test]
    async fn test_crawler_routes_untrusted_to_spambox() {
        let _guard = super::super::test_lock();
        let dir = TempDir::new().unwrap();
        let ndb = super::super::init_ndb(dir.path()).unwrap();
        let spambox =
            super::super::init_ndb_at_path(&dir.path().join("nostrdb_spambox"), None).unwrap();

        let root_keys = nostr::Keys::generate();
        let root_pk = root_keys.public_key().to_bytes();
        super::super::set_social_graph_root(&ndb, &root_pk);
        tokio::time::sleep(Duration::from_millis(100)).await;

        let crawler = SocialGraphCrawler::new(Arc::clone(&ndb), root_keys.clone(), vec![], 2)
            .with_spambox(Arc::clone(&spambox));

        // A contact list signed by a key with no follow path from the root.
        let stranger = nostr::Keys::generate();
        let tags = vec![Tag::public_key(PublicKey::from_slice(&root_pk).unwrap())];
        let event = EventBuilder::new(Kind::ContactList, "", tags)
            .to_event(&stranger)
            .unwrap();

        crawler.handle_incoming_event(&event);

        // The untrusted follow must land in the spambox, not the main db.
        let stranger_pk = stranger.public_key().to_bytes();
        assert!(!wait_for_follow(&ndb, &stranger_pk, &root_pk).await);
        assert!(wait_for_follow(&spambox, &stranger_pk, &root_pk).await);
    }

    #[tokio::test]
    async fn test_crawler_routes_trusted_to_main_db() {
        let _guard = super::super::test_lock();
        let dir = TempDir::new().unwrap();
        let ndb = super::super::init_ndb(dir.path()).unwrap();
        let spambox =
            super::super::init_ndb_at_path(&dir.path().join("nostrdb_spambox"), None).unwrap();

        let root_keys = nostr::Keys::generate();
        let root_pk = root_keys.public_key().to_bytes();
        super::super::set_social_graph_root(&ndb, &root_pk);
        tokio::time::sleep(Duration::from_millis(100)).await;

        let crawler = SocialGraphCrawler::new(Arc::clone(&ndb), root_keys.clone(), vec![], 2)
            .with_spambox(Arc::clone(&spambox));

        // A contact list signed by the root itself is always trusted.
        let followed = nostr::Keys::generate();
        let followed_pk = followed.public_key().to_bytes();
        let tags = vec![Tag::public_key(PublicKey::from_slice(&followed_pk).unwrap())];
        let event = EventBuilder::new(Kind::ContactList, "", tags)
            .to_event(&root_keys)
            .unwrap();

        crawler.handle_incoming_event(&event);

        assert!(wait_for_follow(&ndb, &root_pk, &followed_pk).await);
        assert!(!wait_for_follow(&spambox, &root_pk, &followed_pk).await);
    }

    #[tokio::test]
    async fn test_crawler_no_relays() {
        let dir = TempDir::new().unwrap();
        // Lock only around db init; crawl itself needs no serialization here.
        let ndb = {
            let _guard = super::super::test_lock();
            super::super::init_ndb(dir.path()).unwrap()
        };
        let crawler = SocialGraphCrawler::new(ndb, nostr::Keys::generate(), vec![], 2);
        let (_tx, rx) = watch::channel(false);
        // Should return immediately with no relays
        crawler.crawl(rx).await;
    }

    #[tokio::test]
    async fn test_crawler_shutdown_signal() {
        let dir = TempDir::new().unwrap();
        // Lock only around db init; crawl itself needs no serialization here.
        let ndb = {
            let _guard = super::super::test_lock();
            super::super::init_ndb(dir.path()).unwrap()
        };
        let crawler = SocialGraphCrawler::new(
            ndb,
            nostr::Keys::generate(),
            vec!["wss://localhost:1".to_string()],
            2,
        );
        let (_tx, rx) = watch::channel(true); // Already shutdown
        crawler.crawl(rx).await;
    }
}
361}