hashtree_cli/nostrdb_integration/crawler.rs

use nostrdb_social::Ndb;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;

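/// Crawls the Nostr social graph outward from the configured root key,
/// pulling contact (follow) lists and mute lists from the configured relays
/// into nostrdb, out to `max_depth` hops. Lists from pubkeys outside the
/// trusted graph can optionally be diverted into a separate "spambox" database.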
pub struct SocialGraphCrawler {
    ndb: Arc<Ndb>,
    spambox: Option<Arc<Ndb>>,
    keys: nostr::Keys,
    relays: Vec<String>,
    max_depth: u32,
}

impl SocialGraphCrawler {
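    /// Creates a crawler rooted at `keys` that walks the follow graph over
    /// `relays`, up to `max_depth` hops from the root. No spambox is attached
    /// by default, so untrusted lists are simply dropped.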
    pub fn new(ndb: Arc<Ndb>, keys: nostr::Keys, relays: Vec<String>, max_depth: u32) -> Self {
        Self {
            ndb,
            spambox: None,
            keys,
            relays,
            max_depth,
        }
    }

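    /// Attaches a secondary database that receives contact and mute lists from
    /// pubkeys outside the trusted social graph instead of dropping them.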
    pub fn with_spambox(mut self, spambox: Arc<Ndb>) -> Self {
        self.spambox = Some(spambox);
        self
    }

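    /// Returns true if `pk_bytes` is the root key itself or lies within
    /// `max_depth` follow hops of it according to the stored graph.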
    fn is_within_social_graph(&self, pk_bytes: &[u8; 32]) -> bool {
        if pk_bytes == &self.keys.public_key().to_bytes() {
            return true;
        }
        super::get_follow_distance(&self.ndb, pk_bytes)
            .map(|distance| distance <= self.max_depth)
            .unwrap_or(false)
    }

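    /// Serializes `event` to JSON and hands it to the parent module's ingest
    /// helper for the given database and subscription id.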
    fn ingest_event_into(&self, ndb: &Ndb, sub_id: &str, event: &nostr::Event) {
        if let Ok(json) = serde_json::to_string(event) {
            super::ingest_event(ndb, sub_id, &json);
        }
    }

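    /// When the root key publishes a new contact list, returns the followed
    /// pubkeys whose own contact lists are neither already stored nor already
    /// fetched in this session, marking every examined pubkey as fetched.
    /// Only relevant when `max_depth >= 2`; other events yield an empty list.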
    #[allow(deprecated)]
    fn collect_missing_root_follows(
        &self,
        event: &nostr::Event,
        fetched_contact_lists: &mut HashSet<[u8; 32]>,
    ) -> Vec<[u8; 32]> {
        if self.max_depth < 2 {
            return Vec::new();
        }
        if event.kind != nostr::Kind::ContactList {
            return Vec::new();
        }

        let root_pk = self.keys.public_key().to_bytes();
        if event.pubkey.to_bytes() != root_pk {
            return Vec::new();
        }

        let mut missing = Vec::new();
        for tag in event.tags().iter() {
            if let Some(nostr::TagStandard::PublicKey { public_key, .. }) = tag.as_standardized() {
                let pk_bytes = public_key.to_bytes();
                if fetched_contact_lists.contains(&pk_bytes) {
                    continue;
                }

                let existing_follows = super::get_follows(&self.ndb, &pk_bytes);
                if !existing_follows.is_empty() {
                    fetched_contact_lists.insert(pk_bytes);
                    continue;
                }

                fetched_contact_lists.insert(pk_bytes);
                missing.push(pk_bytes);
            }
        }

        missing
    }

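    /// Fetches contact (kind 3) and mute (kind 10000) lists for each pubkey
    /// from the connected relays and ingests them under `sub_id`, stopping
    /// early if shutdown is signalled. Each request is bounded by a timeout.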
    async fn fetch_contact_lists_for_pubkeys(
        &self,
        client: &nostr_sdk::Client,
        pubkeys: &[[u8; 32]],
        shutdown_rx: &watch::Receiver<bool>,
        sub_id: &str,
    ) {
        for pk_bytes in pubkeys {
            if *shutdown_rx.borrow() {
                break;
            }

            let pk = match nostr::PublicKey::from_slice(pk_bytes) {
                Ok(pk) => pk,
                Err(_) => continue,
            };

            let filter = nostr::Filter::new()
                .author(pk)
                .kinds(vec![nostr::Kind::ContactList, nostr::Kind::Custom(10000)]);

            let source = nostr_sdk::EventSource::relays(Some(Duration::from_secs(5)));
            match tokio::time::timeout(
                Duration::from_secs(10),
                client.get_events_of(vec![filter], source),
            )
            .await
            {
                Ok(Ok(events)) => {
                    for event in &events {
                        self.ingest_event_into(&self.ndb, sub_id, event);
                    }
                }
                Ok(Err(e)) => {
                    tracing::debug!("Failed to fetch events for {}...: {}", &pk.to_hex()[..8], e);
                }
                Err(_) => {
                    tracing::debug!("Timeout fetching events for {}...", &pk.to_hex()[..8]);
                }
            }
        }
    }

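    /// Routes a live contact or mute list event: events from pubkeys inside
    /// the social graph go to the main database, everything else goes to the
    /// spambox if one is configured and is otherwise dropped. Other event
    /// kinds are ignored.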
    pub(crate) fn handle_incoming_event(&self, event: &nostr::Event) {
        let is_contact_list = event.kind == nostr::Kind::ContactList;
        let is_mute_list = event.kind == nostr::Kind::Custom(10000);
        if !is_contact_list && !is_mute_list {
            return;
        }

        let pk_bytes = event.pubkey.to_bytes();
        if self.is_within_social_graph(&pk_bytes) {
            self.ingest_event_into(&self.ndb, "live", event);
            return;
        }

        if let Some(spambox) = &self.spambox {
            self.ingest_event_into(spambox, "spambox", event);
        } else {
            tracing::debug!(
                "Social graph crawler: dropping untrusted {} from {}...",
                if is_contact_list {
                    "contact list"
                } else {
                    "mute list"
                },
                &event.pubkey.to_hex()[..8.min(event.pubkey.to_hex().len())]
            );
        }
    }

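    /// Runs the crawler: connects to the configured relays, performs a
    /// breadth-first crawl of the follow graph up to `max_depth`, then stays
    /// subscribed for new contact and mute lists until `shutdown_rx` flips to
    /// `true`.
    ///
    /// A rough usage sketch; the `init_ndb` helper and the exact wiring are
    /// assumptions about the surrounding module, not part of this file:
    ///
    /// ```ignore
    /// let ndb = init_ndb(std::path::Path::new("./nostrdb"))?;
    /// let keys = nostr::Keys::generate();
    /// let relays = vec!["wss://relay.damus.io".to_string()];
    /// let crawler = SocialGraphCrawler::new(ndb, keys, relays, 2);
    /// let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
    /// tokio::spawn(async move { crawler.crawl(shutdown_rx).await });
    /// // ... later: shutdown_tx.send(true)?;
    /// ```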
    #[allow(deprecated)]
    pub async fn crawl(&self, shutdown_rx: watch::Receiver<bool>) {
        use nostr::nips::nip19::ToBech32;
        use nostr_sdk::prelude::RelayPoolNotification;

        if self.relays.is_empty() {
            tracing::warn!("Social graph crawler: no relays configured, skipping");
            return;
        }

        let mut shutdown_rx = shutdown_rx;
        if *shutdown_rx.borrow() {
            tracing::info!("Social graph crawler: shutdown requested before start");
            return;
        }

        tracing::info!(
            "Starting social graph crawl (max_depth={}, relays={})",
            self.max_depth,
            self.relays.len()
        );

        let sdk_keys =
            match nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32().unwrap_or_default()) {
                Ok(k) => k,
                Err(e) => {
                    tracing::error!("Failed to parse keys for crawler: {}", e);
                    return;
                }
            };

        let client = nostr_sdk::Client::new(&sdk_keys);

        for relay in &self.relays {
            if let Err(e) = client.add_relay(relay).await {
                tracing::warn!("Failed to add relay {}: {}", relay, e);
            }
        }
        client.connect().await;

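        // Phase 1: breadth-first crawl of the follow graph out to `max_depth`,
        // fetching contact lists (kind 3) and mute lists (kind 10000).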
        let root_pk = self.keys.public_key().to_bytes();
        let mut visited: HashSet<[u8; 32]> = HashSet::new();
        let mut fetched_contact_lists: HashSet<[u8; 32]> = HashSet::new();
        let mut current_level = vec![root_pk];
        visited.insert(root_pk);

        for depth in 0..self.max_depth {
            if current_level.is_empty() || *shutdown_rx.borrow() {
                break;
            }

            tracing::info!(
                "Crawling depth {} with {} pubkeys",
                depth,
                current_level.len()
            );

            let mut next_level = Vec::new();

            for pk_bytes in &current_level {
                if *shutdown_rx.borrow() {
                    break;
                }

                fetched_contact_lists.insert(*pk_bytes);

                let pk_hex = hex::encode(pk_bytes);

                let pk = match nostr::PublicKey::from_slice(pk_bytes) {
                    Ok(pk) => pk,
                    Err(_) => continue,
                };

                let filter = nostr::Filter::new()
                    .author(pk)
                    .kinds(vec![nostr::Kind::ContactList, nostr::Kind::Custom(10000)]);

                let source = nostr_sdk::EventSource::relays(Some(Duration::from_secs(5)));

                match tokio::time::timeout(
                    Duration::from_secs(10),
                    client.get_events_of(vec![filter], source),
                )
                .await
                {
                    Ok(Ok(events)) => {
                        for event in &events {
                            self.ingest_event_into(&self.ndb, "crawl", event);

                            if event.kind() == nostr::Kind::ContactList {
                                for tag in event.tags().iter() {
                                    if let Some(nostr::TagStandard::PublicKey {
                                        public_key, ..
                                    }) = tag.as_standardized()
                                    {
                                        let follow_bytes = public_key.to_bytes();
                                        if !visited.contains(&follow_bytes) {
                                            visited.insert(follow_bytes);
                                            next_level.push(follow_bytes);
                                        }
                                    }
                                }
                            }
                        }
                        tracing::debug!(
                            "Depth {}: fetched {} events for {}...",
                            depth,
                            events.len(),
                            &pk_hex[..8.min(pk_hex.len())]
                        );
                    }
                    Ok(Err(e)) => {
                        tracing::debug!("Failed to fetch events for {}...: {}", &pk_hex[..8], e);
                    }
                    Err(_) => {
                        tracing::debug!("Timeout fetching events for {}...", &pk_hex[..8]);
                    }
                }
            }

            current_level = next_level;
        }

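        // Phase 2: stay subscribed to new contact and mute lists so the graph
        // keeps updating live until shutdown is requested.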
        let filter = nostr::Filter::new()
            .kinds(vec![nostr::Kind::ContactList, nostr::Kind::Custom(10000)])
            .since(nostr::Timestamp::now());

        match client.subscribe(vec![filter], None).await {
            Ok(_) => tracing::info!("Social graph crawler: subscribed to contact and mute lists"),
            Err(e) => tracing::warn!("Social graph crawler: failed to subscribe: {}", e),
        }

        let mut notifications = client.notifications();
        loop {
            tokio::select! {
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        break;
                    }
                }
                notification = notifications.recv() => {
                    match notification {
                        Ok(RelayPoolNotification::Event { event, .. }) => {
                            self.handle_incoming_event(&event);

                            let missing = self.collect_missing_root_follows(&event, &mut fetched_contact_lists);
                            if !missing.is_empty() {
                                tracing::info!(
                                    "Root follow list updated: fetching {} missing contact lists",
                                    missing.len()
                                );
                                self.fetch_contact_lists_for_pubkeys(&client, &missing, &shutdown_rx, "recrawl").await;
                            }
                        }
                        Ok(_) => {}
                        Err(e) => {
                            tracing::warn!("Social graph crawler notification error: {}", e);
                            break;
                        }
                    }
                }
            }
        }

        if let Err(e) = client.disconnect().await {
            tracing::debug!("Error disconnecting crawler client: {}", e);
        }

        tracing::info!(
            "Social graph crawl complete: visited {} pubkeys",
            visited.len()
        );
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use nostr::{EventBuilder, Kind, PublicKey, Tag};
    use std::collections::HashSet;
    use std::sync::Arc;
    use tempfile::TempDir;

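    /// Polls the database until `owner` is recorded as following `target`,
    /// returning false if that does not happen within roughly 500 ms.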
    async fn wait_for_follow(ndb: &Ndb, owner: &[u8; 32], target: &[u8; 32]) -> bool {
        let deadline = tokio::time::Instant::now() + Duration::from_millis(500);
        loop {
            let follows = super::super::get_follows(ndb, owner);
            if follows.iter().any(|pk| pk == target) {
                return true;
            }
            if tokio::time::Instant::now() >= deadline {
                return false;
            }
            tokio::time::sleep(Duration::from_millis(20)).await;
        }
    }

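    // A contact list from an unknown pubkey must land in the spambox, not the main db.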
    #[tokio::test]
    async fn test_crawler_routes_untrusted_to_spambox() {
        let _guard = super::super::test_lock();
        let tmp = TempDir::new().unwrap();
        let ndb = super::super::init_ndb(tmp.path()).unwrap();
        let spambox =
            super::super::init_ndb_at_path(&tmp.path().join("nostrdb_spambox"), None).unwrap();

        let root_keys = nostr::Keys::generate();
        let root_pk = root_keys.public_key().to_bytes();
        super::super::set_social_graph_root(&ndb, &root_pk);
        tokio::time::sleep(Duration::from_millis(100)).await;

        let crawler = SocialGraphCrawler::new(Arc::clone(&ndb), root_keys.clone(), vec![], 2)
            .with_spambox(Arc::clone(&spambox));

        let unknown_keys = nostr::Keys::generate();
        let follow_tag = Tag::public_key(PublicKey::from_slice(&root_pk).unwrap());
        let event = EventBuilder::new(Kind::ContactList, "", vec![follow_tag])
            .to_event(&unknown_keys)
            .unwrap();

        crawler.handle_incoming_event(&event);

        let unknown_pk = unknown_keys.public_key().to_bytes();
        assert!(!wait_for_follow(&ndb, &unknown_pk, &root_pk).await);
        assert!(wait_for_follow(&spambox, &unknown_pk, &root_pk).await);
    }

    #[tokio::test]
    async fn test_crawler_routes_trusted_to_main_db() {
        let _guard = super::super::test_lock();
        let tmp = TempDir::new().unwrap();
        let ndb = super::super::init_ndb(tmp.path()).unwrap();
        let spambox =
            super::super::init_ndb_at_path(&tmp.path().join("nostrdb_spambox"), None).unwrap();

        let root_keys = nostr::Keys::generate();
        let root_pk = root_keys.public_key().to_bytes();
        super::super::set_social_graph_root(&ndb, &root_pk);
        tokio::time::sleep(Duration::from_millis(100)).await;

        let crawler = SocialGraphCrawler::new(Arc::clone(&ndb), root_keys.clone(), vec![], 2)
            .with_spambox(Arc::clone(&spambox));

        let target_keys = nostr::Keys::generate();
        let target_pk = target_keys.public_key().to_bytes();
        let follow_tag = Tag::public_key(PublicKey::from_slice(&target_pk).unwrap());
        let event = EventBuilder::new(Kind::ContactList, "", vec![follow_tag])
            .to_event(&root_keys)
            .unwrap();

        crawler.handle_incoming_event(&event);

        assert!(wait_for_follow(&ndb, &root_pk, &target_pk).await);
        assert!(!wait_for_follow(&spambox, &root_pk, &target_pk).await);
    }

    #[tokio::test]
    async fn test_crawler_no_relays() {
        let tmp = TempDir::new().unwrap();
        let ndb = {
            let _guard = super::super::test_lock();
            super::super::init_ndb(tmp.path()).unwrap()
        };
        let keys = nostr::Keys::generate();
        let crawler = SocialGraphCrawler::new(ndb, keys, vec![], 2);
        let (_tx, rx) = watch::channel(false);
        crawler.crawl(rx).await;
    }

    #[tokio::test]
    async fn test_crawler_shutdown_signal() {
        let tmp = TempDir::new().unwrap();
        let ndb = {
            let _guard = super::super::test_lock();
            super::super::init_ndb(tmp.path()).unwrap()
        };
        let keys = nostr::Keys::generate();
        let crawler = SocialGraphCrawler::new(ndb, keys, vec!["wss://localhost:1".to_string()], 2);
        let (_tx, rx) = watch::channel(true);
        crawler.crawl(rx).await;
    }

    #[tokio::test]
    async fn test_collect_missing_root_follows_skips_known_and_fetched() {
        let _guard = super::super::test_lock();
        let tmp = TempDir::new().unwrap();
        let ndb = super::super::init_ndb(tmp.path()).unwrap();

        let root_keys = nostr::Keys::generate();
        let root_pk = root_keys.public_key().to_bytes();
        super::super::set_social_graph_root(&ndb, &root_pk);
        tokio::time::sleep(Duration::from_millis(100)).await;

        let crawler = SocialGraphCrawler::new(Arc::clone(&ndb), root_keys.clone(), vec![], 2);

        let known_keys = nostr::Keys::generate();
        let known_pk = known_keys.public_key().to_bytes();
        let known_follow_tag = Tag::public_key(PublicKey::from_slice(&root_pk).unwrap());
        let known_event = EventBuilder::new(Kind::ContactList, "", vec![known_follow_tag])
            .to_event(&known_keys)
            .unwrap();
        crawler.ingest_event_into(&ndb, "test", &known_event);
        assert!(wait_for_follow(&ndb, &known_pk, &root_pk).await);

        let missing_keys = nostr::Keys::generate();
        let missing_pk = missing_keys.public_key().to_bytes();

        let fetched_keys = nostr::Keys::generate();
        let fetched_pk = fetched_keys.public_key().to_bytes();

        let tags = vec![
            Tag::public_key(PublicKey::from_slice(&known_pk).unwrap()),
            Tag::public_key(PublicKey::from_slice(&missing_pk).unwrap()),
            Tag::public_key(PublicKey::from_slice(&fetched_pk).unwrap()),
        ];
        let root_event = EventBuilder::new(Kind::ContactList, "", tags)
            .to_event(&root_keys)
            .unwrap();

        let mut fetched = HashSet::new();
        fetched.insert(fetched_pk);

        let missing = crawler.collect_missing_root_follows(&root_event, &mut fetched);

        assert_eq!(missing.len(), 1);
        assert_eq!(missing[0], missing_pk);
        assert!(fetched.contains(&known_pk));
        assert!(fetched.contains(&missing_pk));
        assert!(fetched.contains(&fetched_pk));
    }

    #[tokio::test]
    async fn test_collect_missing_root_follows_ignores_non_root() {
        let _guard = super::super::test_lock();
        let tmp = TempDir::new().unwrap();
        let ndb = super::super::init_ndb(tmp.path()).unwrap();

        let root_keys = nostr::Keys::generate();
        let root_pk = root_keys.public_key().to_bytes();
        super::super::set_social_graph_root(&ndb, &root_pk);
        tokio::time::sleep(Duration::from_millis(100)).await;

        let crawler = SocialGraphCrawler::new(Arc::clone(&ndb), root_keys.clone(), vec![], 2);

        let other_keys = nostr::Keys::generate();
        let other_pk = other_keys.public_key().to_bytes();
        let tag = Tag::public_key(PublicKey::from_slice(&other_pk).unwrap());
        let event = EventBuilder::new(Kind::ContactList, "", vec![tag])
            .to_event(&other_keys)
            .unwrap();

        let mut fetched = HashSet::new();
        let missing = crawler.collect_missing_root_follows(&event, &mut fetched);

        assert!(missing.is_empty());
        assert!(fetched.is_empty());
    }
}