1use futures::stream::{self, StreamExt};
2use std::collections::HashSet;
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::time::Duration;
6
7use nostr::Timestamp;
8use tokio::sync::watch;
9use tokio::task::JoinHandle;
10
11use super::{
12 read_local_list_file_state, sync_local_list_files_force, sync_local_list_files_if_changed,
13 LocalListFileState, SocialGraphBackend,
14};
15
/// Number of authors packed into a single relay REQ when fetching follow lists.
const DEFAULT_AUTHOR_BATCH_SIZE: usize = 500;
/// Maximum number of author batches fetched from relays concurrently.
const DEFAULT_CONCURRENT_BATCHES: usize = 4;
/// How long a single batched graph fetch may wait for relay responses.
const GRAPH_FETCH_TIMEOUT: Duration = Duration::from_secs(15);
/// How long to wait for initial relay connections before proceeding.
const RELAY_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
/// Per-event size limit override; large contact lists can exceed the
/// nostr-sdk default, so the crawler raises it to 512 KiB.
pub(crate) const SOCIALGRAPH_RELAY_EVENT_MAX_SIZE: u32 = 512 * 1024;
// Delay before the background crawler starts (shortened under test so
// integration tests don't wait the full production delay).
#[cfg(not(test))]
const CRAWLER_STARTUP_DELAY: Duration = Duration::from_secs(5);
#[cfg(test)]
const CRAWLER_STARTUP_DELAY: Duration = Duration::from_millis(50);
// Poll interval for local list files (shortened under test).
#[cfg(not(test))]
const LOCAL_LIST_POLL_INTERVAL: Duration = Duration::from_secs(5);
#[cfg(test)]
const LOCAL_LIST_POLL_INTERVAL: Duration = Duration::from_millis(100);
29
/// Handles to the spawned social graph background tasks plus the shared
/// shutdown signal that stops both of them.
pub struct SocialGraphTaskHandles {
    /// Send `true` on this channel to request shutdown of both tasks.
    pub shutdown_tx: watch::Sender<bool>,
    /// Task running the long-lived relay crawl loop.
    pub crawl_handle: JoinHandle<()>,
    /// Task polling local list files for changes.
    pub local_list_handle: JoinHandle<()>,
}
35
/// Crawls nostr relays for contact/mute lists, expanding the social graph
/// outward from the configured root key up to `max_depth` hops.
pub struct SocialGraphCrawler {
    // Primary store for events from authors within the trusted graph.
    graph_store: Arc<dyn SocialGraphBackend>,
    // Optional store receiving events from authors outside the graph.
    spambox: Option<Arc<dyn SocialGraphBackend>>,
    // Keys whose public key is the graph root.
    keys: nostr::Keys,
    // Relay URLs to crawl; an empty list disables crawling.
    relays: Vec<String>,
    // Maximum follow distance considered part of the trusted graph.
    max_depth: u32,
    // Authors per relay REQ (clamped to >= 1 by the builder).
    author_batch_size: usize,
    // Concurrent batch fetches (clamped to >= 1 by the builder).
    concurrent_batches: usize,
    // When true, refetch every author regardless of stored state.
    full_recrawl: bool,
    // When set, known authors are refetched with this `since` cursor.
    known_since: Option<Timestamp>,
}
47
48impl SocialGraphCrawler {
49 pub fn new(
50 graph_store: Arc<dyn SocialGraphBackend>,
51 keys: nostr::Keys,
52 relays: Vec<String>,
53 max_depth: u32,
54 ) -> Self {
55 Self {
56 graph_store,
57 spambox: None,
58 keys,
59 relays,
60 max_depth,
61 author_batch_size: DEFAULT_AUTHOR_BATCH_SIZE,
62 concurrent_batches: DEFAULT_CONCURRENT_BATCHES,
63 full_recrawl: false,
64 known_since: None,
65 }
66 }
67
68 pub fn with_spambox(mut self, spambox: Arc<dyn SocialGraphBackend>) -> Self {
69 self.spambox = Some(spambox);
70 self
71 }
72
73 pub fn with_author_batch_size(mut self, author_batch_size: usize) -> Self {
74 self.author_batch_size = author_batch_size.max(1);
75 self
76 }
77
78 pub fn with_concurrent_batches(mut self, concurrent_batches: usize) -> Self {
79 self.concurrent_batches = concurrent_batches.max(1);
80 self
81 }
82
83 pub fn with_full_recrawl(mut self, full_recrawl: bool) -> Self {
84 self.full_recrawl = full_recrawl;
85 self
86 }
87
88 pub fn with_known_since(mut self, known_since: Option<u64>) -> Self {
89 self.known_since = known_since.map(Timestamp::from);
90 self
91 }
92
93 fn is_within_social_graph(&self, pk_bytes: &[u8; 32]) -> bool {
94 if pk_bytes == &self.keys.public_key().to_bytes() {
95 return true;
96 }
97
98 super::get_follow_distance(self.graph_store.as_ref(), pk_bytes)
99 .map(|distance| distance <= self.max_depth)
100 .unwrap_or(false)
101 }
102
103 fn ingest_events_into(
104 &self,
105 graph_store: &(impl SocialGraphBackend + ?Sized),
106 events: &[nostr::Event],
107 ) {
108 if let Err(err) = super::ingest_parsed_events(graph_store, events) {
109 tracing::debug!("Failed to ingest crawler event: {}", err);
110 }
111 }
112
113 #[allow(deprecated)]
114 fn collect_missing_root_follows(
115 &self,
116 event: &nostr::Event,
117 fetched_contact_lists: &mut HashSet<[u8; 32]>,
118 ) -> Vec<[u8; 32]> {
119 if self.max_depth < 2 || event.kind != nostr::Kind::ContactList {
120 return Vec::new();
121 }
122
123 let root_pk = self.keys.public_key().to_bytes();
124 if event.pubkey.to_bytes() != root_pk {
125 return Vec::new();
126 }
127
128 let mut missing = Vec::new();
129 for tag in event.tags.iter() {
130 if let Some(nostr::TagStandard::PublicKey { public_key, .. }) = tag.as_standardized() {
131 let pk_bytes = public_key.to_bytes();
132 if fetched_contact_lists.contains(&pk_bytes) {
133 continue;
134 }
135
136 let existing_follows = super::get_follows(self.graph_store.as_ref(), &pk_bytes);
137 if !existing_follows.is_empty() {
138 fetched_contact_lists.insert(pk_bytes);
139 continue;
140 }
141
142 fetched_contact_lists.insert(pk_bytes);
143 missing.push(pk_bytes);
144 }
145 }
146
147 missing
148 }
149
150 fn graph_filter_for_pubkeys(
151 pubkeys: &[[u8; 32]],
152 since: Option<Timestamp>,
153 ) -> Option<nostr::Filter> {
154 let authors = pubkeys
155 .iter()
156 .filter_map(|pk_bytes| nostr::PublicKey::from_slice(pk_bytes).ok())
157 .collect::<Vec<_>>();
158 if authors.is_empty() {
159 return None;
160 }
161
162 let mut filter = nostr::Filter::new()
163 .authors(authors)
164 .kinds(vec![nostr::Kind::ContactList, nostr::Kind::MuteList]);
165 if let Some(since) = since {
166 filter = filter.since(since);
167 }
168
169 Some(filter)
170 }
171
172 async fn fetch_graph_events_for_pubkeys(
173 &self,
174 client: &nostr_sdk::Client,
175 pubkeys: &[[u8; 32]],
176 since: Option<Timestamp>,
177 ) -> Vec<nostr::Event> {
178 let Some(filter) = Self::graph_filter_for_pubkeys(pubkeys, since) else {
179 return Vec::new();
180 };
181
182 match client
183 .get_events_of(
184 vec![filter],
185 nostr_sdk::EventSource::relays(Some(GRAPH_FETCH_TIMEOUT)),
186 )
187 .await
188 {
189 Ok(events) => events,
190 Err(err) => {
191 tracing::debug!(
192 "Failed to fetch graph events for {} authors: {}",
193 pubkeys.len(),
194 err
195 );
196 Vec::new()
197 }
198 }
199 }
200
    /// Fetches contact/mute lists for `pubkeys` in chunks of
    /// `author_batch_size`, running up to `concurrent_batches` chunk
    /// requests concurrently, ingesting each completed batch into the
    /// graph store.
    ///
    /// Stops draining results once `shutdown_rx` reads `true`; any batch
    /// received after that point is dropped unprocessed.
    async fn fetch_contact_lists_for_pubkeys(
        &self,
        client: &nostr_sdk::Client,
        pubkeys: &[[u8; 32]],
        since: Option<Timestamp>,
        shutdown_rx: &watch::Receiver<bool>,
    ) {
        // Chunks are copied into owned Vecs so each async closure owns its
        // slice of authors independently of `pubkeys`' lifetime.
        let chunk_futures = stream::iter(
            pubkeys
                .chunks(self.author_batch_size)
                .map(|chunk| chunk.to_vec())
                .collect::<Vec<_>>(),
        )
        .map(|chunk| async move {
            self.fetch_graph_events_for_pubkeys(client, &chunk, since)
                .await
        });

        // `buffer_unordered` bounds concurrency; batches are ingested in
        // completion order, not submission order.
        let mut in_flight = chunk_futures.buffer_unordered(self.concurrent_batches);
        while let Some(events) = in_flight.next().await {
            if *shutdown_rx.borrow() {
                break;
            }
            if let Err(err) = super::ingest_graph_parsed_events(self.graph_store.as_ref(), &events)
            {
                tracing::debug!("Failed to ingest crawler graph batch: {}", err);
            }
        }
    }
230
231 fn authors_to_fetch_at_distance(&self, distance: u32) -> (Vec<[u8; 32]>, Vec<[u8; 32]>) {
232 let Ok(users) = self.graph_store.users_by_follow_distance(distance) else {
233 return (Vec::new(), Vec::new());
234 };
235 if self.full_recrawl {
236 return (users, Vec::new());
237 }
238
239 let mut new_authors = Vec::new();
240 let mut known_authors = Vec::new();
241 let refresh_known_authors = self.known_since.is_some();
242 for pk_bytes in users {
243 match self
244 .graph_store
245 .follow_list_created_at(&pk_bytes)
246 .ok()
247 .flatten()
248 {
249 Some(_) if refresh_known_authors => known_authors.push(pk_bytes),
250 Some(_) => {}
251 None => new_authors.push(pk_bytes),
252 }
253 }
254 (new_authors, known_authors)
255 }
256
257 async fn connect_client(&self) -> Option<nostr_sdk::Client> {
258 use nostr::nips::nip19::ToBech32;
259
260 let Ok(sdk_keys) =
261 nostr_sdk::Keys::parse(self.keys.secret_key().to_bech32().unwrap_or_default())
262 else {
263 return None;
264 };
265
266 let mut relay_limits = nostr_sdk::pool::RelayLimits::default();
267 relay_limits.events.max_size = Some(SOCIALGRAPH_RELAY_EVENT_MAX_SIZE);
268 let client = nostr_sdk::Client::with_opts(
269 sdk_keys,
270 nostr_sdk::Options::new().relay_limits(relay_limits),
271 );
272 for relay in &self.relays {
273 if let Err(err) = client.add_relay(relay).await {
274 tracing::warn!("Failed to add relay {}: {}", relay, err);
275 }
276 }
277 client.connect_with_timeout(RELAY_CONNECT_TIMEOUT).await;
278 Some(client)
279 }
280
281 async fn sync_graph_once(
282 &self,
283 client: &nostr_sdk::Client,
284 shutdown_rx: &watch::Receiver<bool>,
285 fetched_contact_lists: &mut HashSet<[u8; 32]>,
286 ) {
287 for distance in 0..=self.max_depth {
288 if *shutdown_rx.borrow() {
289 break;
290 }
291
292 let (new_authors, known_authors) = self.authors_to_fetch_at_distance(distance);
293 if new_authors.is_empty() && known_authors.is_empty() {
294 continue;
295 }
296
297 tracing::debug!(
298 "Social graph sync distance={} new_authors={} known_authors={}",
299 distance,
300 new_authors.len(),
301 known_authors.len()
302 );
303
304 if !new_authors.is_empty() {
305 for pk_bytes in &new_authors {
306 fetched_contact_lists.insert(*pk_bytes);
307 }
308 self.fetch_contact_lists_for_pubkeys(client, &new_authors, None, shutdown_rx)
309 .await;
310 }
311
312 if !known_authors.is_empty() {
313 for pk_bytes in &known_authors {
314 fetched_contact_lists.insert(*pk_bytes);
315 }
316 self.fetch_contact_lists_for_pubkeys(
317 client,
318 &known_authors,
319 self.known_since,
320 shutdown_rx,
321 )
322 .await;
323 }
324 }
325 }
326
327 pub async fn warm_once(&self) {
328 if self.relays.is_empty() {
329 tracing::warn!("Social graph crawler: no relays configured, skipping");
330 return;
331 }
332
333 let Some(client) = self.connect_client().await else {
334 return;
335 };
336 let (_shutdown_tx, shutdown_rx) = watch::channel(false);
337 let mut fetched_contact_lists: HashSet<[u8; 32]> = HashSet::new();
338 self.sync_graph_once(&client, &shutdown_rx, &mut fetched_contact_lists)
339 .await;
340 let _ = client.disconnect().await;
341 }
342
343 pub(crate) fn handle_incoming_event(&self, event: &nostr::Event) {
344 let is_contact_list = event.kind == nostr::Kind::ContactList;
345 let is_mute_list = event.kind == nostr::Kind::MuteList;
346 if !is_contact_list && !is_mute_list {
347 return;
348 }
349
350 let pk_bytes = event.pubkey.to_bytes();
351 if self.is_within_social_graph(&pk_bytes) {
352 self.ingest_events_into(self.graph_store.as_ref(), std::slice::from_ref(event));
353 return;
354 }
355
356 if let Some(spambox) = &self.spambox {
357 self.ingest_events_into(spambox.as_ref(), std::slice::from_ref(event));
358 }
359 }
360
    /// Long-running crawl loop: performs one full graph sync, then
    /// subscribes to new contact/mute list events and ingests them until
    /// `shutdown_rx` flips to `true` or the notification stream errors.
    #[allow(deprecated)]
    pub async fn crawl(&self, shutdown_rx: watch::Receiver<bool>) {
        use nostr_sdk::prelude::RelayPoolNotification;

        if self.relays.is_empty() {
            tracing::warn!("Social graph crawler: no relays configured, skipping");
            return;
        }

        let mut shutdown_rx = shutdown_rx;
        // Bail out if shutdown was requested before we even connected.
        if *shutdown_rx.borrow() {
            return;
        }

        let Some(client) = self.connect_client().await else {
            return;
        };

        // Tracks authors whose contact lists were already requested, so
        // live root-follow updates don't trigger duplicate fetches.
        let mut fetched_contact_lists: HashSet<[u8; 32]> = HashSet::new();
        self.sync_graph_once(&client, &shutdown_rx, &mut fetched_contact_lists)
            .await;

        // Live subscription covers only events published from now on.
        let filter = nostr::Filter::new()
            .kinds(vec![nostr::Kind::ContactList, nostr::Kind::MuteList])
            .since(nostr::Timestamp::now());

        let _ = client.subscribe(vec![filter], None).await;

        let mut notifications = client.notifications();
        loop {
            tokio::select! {
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        break;
                    }
                }
                notification = notifications.recv() => {
                    match notification {
                        Ok(RelayPoolNotification::Event { event, .. }) => {
                            self.handle_incoming_event(&event);
                            // If the root's own contact list gained follows
                            // we haven't seen, fetch their lists right away.
                            let missing = self.collect_missing_root_follows(&event, &mut fetched_contact_lists);
                            if !missing.is_empty() {
                                self.fetch_contact_lists_for_pubkeys(&client, &missing, None, &shutdown_rx).await;
                            }
                        }
                        Ok(_) => {}
                        Err(err) => {
                            tracing::warn!("Social graph crawler notification error: {}", err);
                            break;
                        }
                    }
                }
            }
        }

        let _ = client.disconnect().await;
    }
418}
419
/// Spawns the two background social graph tasks and returns their handles
/// along with the shared shutdown sender.
///
/// The crawl task waits `CRAWLER_STARTUP_DELAY`, force-syncs local list
/// files once, then runs the long-lived relay crawl. The local-list task
/// polls list files every `LOCAL_LIST_POLL_INTERVAL` and performs a
/// one-shot `warm_once` sync whenever the contact file changed.
pub fn spawn_social_graph_tasks(
    graph_store: Arc<dyn SocialGraphBackend>,
    keys: nostr::Keys,
    relays: Vec<String>,
    max_depth: u32,
    spambox: Option<Arc<dyn SocialGraphBackend>>,
    data_dir: PathBuf,
) -> SocialGraphTaskHandles {
    // Both tasks observe the same shutdown channel.
    let (shutdown_tx, crawl_shutdown_rx) = watch::channel(false);
    let local_list_shutdown_rx = crawl_shutdown_rx.clone();
    let crawl_data_dir = data_dir.clone();
    let local_list_data_dir = data_dir;

    let crawl_handle = tokio::spawn({
        let graph_store = Arc::clone(&graph_store);
        let keys = keys.clone();
        let relays = relays.clone();
        let spambox = spambox.clone();
        async move {
            tokio::time::sleep(CRAWLER_STARTUP_DELAY).await;
            let mut crawler = SocialGraphCrawler::new(graph_store.clone(), keys, relays, max_depth);
            if let Some(spambox) = spambox {
                crawler = crawler.with_spambox(spambox);
            }
            // Seed the graph from local files before touching relays.
            if let Err(err) =
                sync_local_list_files_force(graph_store.as_ref(), &crawl_data_dir, &crawler.keys)
            {
                tracing::warn!(
                    "Failed to sync local social graph lists at startup: {}",
                    err
                );
            }
            crawler.crawl(crawl_shutdown_rx).await;
        }
    });

    let local_list_handle = tokio::spawn({
        let graph_store = Arc::clone(&graph_store);
        let keys = keys.clone();
        let relays = relays.clone();
        let spambox = spambox.clone();
        async move {
            let mut shutdown_rx = local_list_shutdown_rx;
            // Last-observed file state; falls back to defaults when the
            // state file is unreadable so the poll loop still starts.
            let mut state =
                read_local_list_file_state(&local_list_data_dir).unwrap_or_else(|err| {
                    tracing::warn!("Failed to read local social graph list state: {}", err);
                    LocalListFileState::default()
                });
            let mut interval = tokio::time::interval(LOCAL_LIST_POLL_INTERVAL);
            loop {
                tokio::select! {
                    _ = shutdown_rx.changed() => {
                        if *shutdown_rx.borrow() {
                            break;
                        }
                    }
                    _ = interval.tick() => {
                        let outcome = match sync_local_list_files_if_changed(
                            graph_store.as_ref(),
                            &local_list_data_dir,
                            &keys,
                            &mut state,
                        ) {
                            Ok(outcome) => outcome,
                            Err(err) => {
                                tracing::warn!("Failed to sync local social graph list files: {}", err);
                                continue;
                            }
                        };

                        // A changed contact list may add graph members, so run
                        // a one-shot relay sync to pick up their follow lists.
                        if outcome.contacts_changed {
                            let mut crawler =
                                SocialGraphCrawler::new(graph_store.clone(), keys.clone(), relays.clone(), max_depth);
                            if let Some(spambox) = spambox.clone() {
                                crawler = crawler.with_spambox(spambox);
                            }
                            crawler.warm_once().await;
                        }
                    }
                }
            }
        }
    });

    SocialGraphTaskHandles {
        shutdown_tx,
        crawl_handle,
        local_list_handle,
    }
}
510
511#[cfg(test)]
512mod tests {
513 use super::*;
514 use std::net::TcpListener;
515 use std::sync::Mutex;
516 use std::time::Instant;
517
518 use futures::{SinkExt, StreamExt};
519 use nostr::{EventBuilder, JsonUtil, Kind, PublicKey, Tag};
520 use tempfile::TempDir;
521 use tokio::net::TcpStream;
522 use tokio::sync::broadcast;
523 use tokio_tungstenite::{accept_async, tungstenite::Message};
524
    /// Shared state for the in-process test relay: the canned events it
    /// serves plus a log of what each incoming REQ asked for.
    #[derive(Debug, Default)]
    struct RelayState {
        // Events replayed to any matching REQ.
        events: Vec<nostr::Event>,
        // Total author count per received REQ, in arrival order.
        request_author_counts: Vec<usize>,
        // Sorted, deduped hex-encoded authors per received REQ.
        requested_authors: Vec<Vec<String>>,
    }
531
    /// Minimal in-process WebSocket relay used by the crawler tests.
    struct TestRelay {
        // Loopback port the relay listens on.
        port: u16,
        // Broadcast channel used to stop the accept loop on drop.
        shutdown: broadcast::Sender<()>,
        // Shared request/response state inspected by test assertions.
        state: Arc<Mutex<RelayState>>,
    }
537
538 impl TestRelay {
        /// Starts a relay serving `events` on an OS-assigned loopback port.
        ///
        /// The accept loop runs on a dedicated OS thread with its own tokio
        /// runtime so it is independent of the test's runtime.
        fn new(events: Vec<nostr::Event>) -> Self {
            let state = Arc::new(Mutex::new(RelayState {
                events,
                request_author_counts: Vec::new(),
                requested_authors: Vec::new(),
            }));
            let (shutdown, _) = broadcast::channel(1);

            // Bind synchronously so the port is known before returning.
            let std_listener = TcpListener::bind("127.0.0.1:0").expect("bind relay listener");
            let port = std_listener.local_addr().expect("local addr").port();
            // Required before handing the std socket to tokio.
            std_listener
                .set_nonblocking(true)
                .expect("set listener nonblocking");

            let state_for_thread = Arc::clone(&state);
            let shutdown_for_thread = shutdown.clone();
            std::thread::spawn(move || {
                let runtime = tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(2)
                    .enable_all()
                    .build()
                    .expect("build tokio runtime");
                runtime.block_on(async move {
                    let listener = tokio::net::TcpListener::from_std(std_listener)
                        .expect("tokio listener from std");
                    let mut shutdown_rx = shutdown_for_thread.subscribe();

                    // Accept connections until a shutdown signal arrives;
                    // each connection is served on its own task.
                    loop {
                        tokio::select! {
                            _ = shutdown_rx.recv() => break,
                            accept = listener.accept() => {
                                if let Ok((stream, _)) = accept {
                                    let state = Arc::clone(&state_for_thread);
                                    tokio::spawn(async move {
                                        handle_connection(stream, state).await;
                                    });
                                }
                            }
                        }
                    }
                });
            });

            // Give the accept loop a moment to start before clients connect.
            std::thread::sleep(Duration::from_millis(100));

            Self {
                port,
                shutdown,
                state,
            }
        }
590
591 fn url(&self) -> String {
592 format!("ws://127.0.0.1:{}", self.port)
593 }
594
595 fn request_author_counts(&self) -> Vec<usize> {
596 self.state
597 .lock()
598 .expect("relay state lock")
599 .request_author_counts
600 .clone()
601 }
602
603 fn requested_authors(&self) -> Vec<Vec<String>> {
604 self.state
605 .lock()
606 .expect("relay state lock")
607 .requested_authors
608 .clone()
609 }
610 }
611
    impl Drop for TestRelay {
        fn drop(&mut self) {
            // Stop the accept loop; an error means it already exited.
            let _ = self.shutdown.send(());
            // Brief pause so in-flight connections wind down before the
            // rest of the test tears down.
            std::thread::sleep(Duration::from_millis(50));
        }
    }
618
619 fn matching_events(
620 state: &Arc<Mutex<RelayState>>,
621 filters: &[nostr::Filter],
622 ) -> Vec<nostr::Event> {
623 let guard = state.lock().expect("relay state lock");
624 guard
625 .events
626 .iter()
627 .filter(|event| {
628 filters.is_empty() || filters.iter().any(|filter| filter.match_event(event))
629 })
630 .cloned()
631 .collect()
632 }
633
634 async fn send_relay_message(
635 write: &mut futures::stream::SplitSink<
636 tokio_tungstenite::WebSocketStream<TcpStream>,
637 Message,
638 >,
639 message: nostr::RelayMessage,
640 ) {
641 let _ = write.send(Message::Text(message.as_json())).await;
642 }
643
    /// Serves one WebSocket connection with minimal relay semantics:
    /// answers REQ with matching canned events plus EOSE, acknowledges
    /// CLOSE, and records what each REQ asked for in `state`.
    async fn handle_connection(stream: TcpStream, state: Arc<Mutex<RelayState>>) {
        let ws_stream = match accept_async(stream).await {
            Ok(ws) => ws,
            Err(_) => return,
        };
        let (mut write, mut read) = ws_stream.split();

        while let Some(message) = read.next().await {
            let text = match message {
                Ok(Message::Text(text)) => text,
                Ok(Message::Ping(data)) => {
                    // Keep the connection alive for pinging clients.
                    let _ = write.send(Message::Pong(data)).await;
                    continue;
                }
                Ok(Message::Close(_)) => break,
                _ => continue,
            };

            let parsed = match nostr::ClientMessage::from_json(text.as_bytes()) {
                Ok(message) => message,
                Err(_) => continue,
            };

            match parsed {
                nostr::ClientMessage::Req {
                    subscription_id,
                    filters,
                } => {
                    // Record the total author count of this REQ...
                    let author_count = filters
                        .iter()
                        .filter_map(|filter| filter.authors.as_ref())
                        .map(|authors| authors.len())
                        .sum();
                    // ...and exactly which authors it covered (sorted,
                    // deduped hex) for later assertions.
                    let mut requested_authors = filters
                        .iter()
                        .filter_map(|filter| filter.authors.as_ref())
                        .flat_map(|authors| authors.iter().map(|author| author.to_hex()))
                        .collect::<Vec<_>>();
                    requested_authors.sort();
                    requested_authors.dedup();
                    {
                        let mut guard = state.lock().expect("relay state lock");
                        guard.request_author_counts.push(author_count);
                        guard.requested_authors.push(requested_authors);
                    }

                    for event in matching_events(&state, &filters) {
                        send_relay_message(
                            &mut write,
                            nostr::RelayMessage::event(subscription_id.clone(), event),
                        )
                        .await;
                    }
                    // EOSE signals that the stored-events replay is done.
                    send_relay_message(&mut write, nostr::RelayMessage::eose(subscription_id))
                        .await;
                }
                nostr::ClientMessage::Close(subscription_id) => {
                    send_relay_message(
                        &mut write,
                        nostr::RelayMessage::closed(subscription_id, ""),
                    )
                    .await;
                }
                _ => {}
            }
        }
    }
711
712 async fn wait_until<F>(timeout: Duration, mut condition: F)
713 where
714 F: FnMut() -> bool,
715 {
716 let start = Instant::now();
717 while start.elapsed() < timeout {
718 if condition() {
719 return;
720 }
721 tokio::time::sleep(Duration::from_millis(50)).await;
722 }
723 panic!("condition not met within {:?}", timeout);
724 }
725
726 fn write_contacts_file(path: &std::path::Path, pubkeys: &[String]) {
727 std::fs::write(
728 path,
729 serde_json::to_string(pubkeys).expect("serialize contacts"),
730 )
731 .expect("write contacts file");
732 }
733
    // Events from authors outside the trusted graph must land in the
    // spambox store, not the main graph store.
    #[tokio::test]
    async fn test_crawler_routes_untrusted_to_spambox() {
        let _guard = crate::socialgraph::test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
        let spambox =
            crate::socialgraph::open_social_graph_store_at_path(&tmp.path().join("spambox"), None)
                .unwrap();

        let root_keys = nostr::Keys::generate();
        let root_pk = root_keys.public_key().to_bytes();
        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
        let spambox_backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = spambox.clone();

        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![], 2)
            .with_spambox(spambox_backend);

        // An unknown author who follows root — not within the trusted graph.
        let unknown_keys = nostr::Keys::generate();
        let follow_tag = Tag::public_key(PublicKey::from_slice(&root_pk).unwrap());
        let event = EventBuilder::new(Kind::ContactList, "", vec![follow_tag])
            .to_event(&unknown_keys)
            .unwrap();

        crawler.handle_incoming_event(&event);

        // Main graph untouched; spambox holds the unknown author's follow.
        let unknown_pk = unknown_keys.public_key().to_bytes();
        assert!(crate::socialgraph::get_follows(&graph_store, &unknown_pk).is_empty());
        assert_eq!(
            crate::socialgraph::get_follows(&spambox, &unknown_pk),
            vec![root_pk]
        );
    }
767
    // With a batch size of 2, the crawler should request multiple authors
    // in a single REQ instead of one REQ per author.
    #[tokio::test]
    #[allow(clippy::await_holding_lock)]
    async fn test_crawler_batches_graph_fetches_by_author_chunk() {
        let _guard = crate::socialgraph::test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();

        let root_keys = nostr::Keys::generate();
        let root_pk = root_keys.public_key().to_bytes();
        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();

        // Graph: root -> {alice, bob}, alice -> carol.
        let alice_keys = nostr::Keys::generate();
        let bob_keys = nostr::Keys::generate();
        let carol_keys = nostr::Keys::generate();

        let root_event = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![
                Tag::public_key(alice_keys.public_key()),
                Tag::public_key(bob_keys.public_key()),
            ],
        )
        .custom_created_at(nostr::Timestamp::from(10))
        .to_event(&root_keys)
        .unwrap();
        let alice_event = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![Tag::public_key(carol_keys.public_key())],
        )
        .custom_created_at(nostr::Timestamp::from(11))
        .to_event(&alice_keys)
        .unwrap();
        let bob_event = EventBuilder::new(Kind::ContactList, "", vec![])
            .custom_created_at(nostr::Timestamp::from(12))
            .to_event(&bob_keys)
            .unwrap();

        let relay = TestRelay::new(vec![root_event, alice_event, bob_event]);
        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 2)
            .with_author_batch_size(2);

        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        let handle = tokio::spawn(async move {
            crawler.crawl(shutdown_rx).await;
        });

        // Wait until the full two-hop graph has been ingested.
        let alice_pk = alice_keys.public_key().to_bytes();
        let bob_pk = bob_keys.public_key().to_bytes();
        let carol_pk = carol_keys.public_key().to_bytes();
        wait_until(Duration::from_secs(5), || {
            let root_follows = crate::socialgraph::get_follows(&graph_store, &root_pk);
            let alice_follows = crate::socialgraph::get_follows(&graph_store, &alice_pk);
            root_follows.contains(&alice_pk)
                && root_follows.contains(&bob_pk)
                && alice_follows.contains(&carol_pk)
        })
        .await;

        let _ = shutdown_tx.send(true);
        handle.await.unwrap();

        // At least one REQ should have covered two or more authors.
        let author_counts = relay.request_author_counts();
        assert!(
            author_counts.iter().any(|count| *count >= 2),
            "expected batched author REQ, got {:?}",
            author_counts
        );
    }
839
    // When root's contact list is already stored, an incremental crawl
    // should expand from distance 1 without re-requesting root's list.
    #[tokio::test]
    #[allow(clippy::await_holding_lock)]
    async fn test_crawler_expands_from_existing_graph_without_root_refetch() {
        let _guard = crate::socialgraph::test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();

        let root_keys = nostr::Keys::generate();
        let root_pk = root_keys.public_key().to_bytes();
        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();

        let alice_keys = nostr::Keys::generate();
        let bob_keys = nostr::Keys::generate();
        let carol_keys = nostr::Keys::generate();

        // Pre-ingest root's contact list locally; the relay only serves
        // the distance-1 lists.
        let root_event = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![
                Tag::public_key(alice_keys.public_key()),
                Tag::public_key(bob_keys.public_key()),
            ],
        )
        .custom_created_at(nostr::Timestamp::from(10))
        .to_event(&root_keys)
        .unwrap();
        crate::socialgraph::ingest_parsed_event(&graph_store, &root_event).unwrap();

        let alice_event = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![Tag::public_key(carol_keys.public_key())],
        )
        .custom_created_at(nostr::Timestamp::from(11))
        .to_event(&alice_keys)
        .unwrap();
        let bob_event = EventBuilder::new(Kind::ContactList, "", vec![])
            .custom_created_at(nostr::Timestamp::from(12))
            .to_event(&bob_keys)
            .unwrap();

        let relay = TestRelay::new(vec![alice_event, bob_event]);
        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 2)
            .with_author_batch_size(2);

        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        let handle = tokio::spawn(async move {
            crawler.crawl(shutdown_rx).await;
        });

        let alice_pk = alice_keys.public_key().to_bytes();
        let carol_pk = carol_keys.public_key().to_bytes();
        wait_until(Duration::from_secs(5), || {
            crate::socialgraph::get_follows(&graph_store, &alice_pk).contains(&carol_pk)
        })
        .await;

        let _ = shutdown_tx.send(true);
        handle.await.unwrap();

        // Root must not appear in any REQ, and distance-1 fetches should
        // still be batched.
        let author_counts = relay.request_author_counts();
        let requested_authors = relay.requested_authors();
        assert!(
            requested_authors
                .iter()
                .flatten()
                .all(|author| author != &root_keys.public_key().to_hex()),
            "expected incremental crawl to skip root refetch, got {:?}",
            requested_authors
        );
        assert!(
            author_counts.iter().any(|count| *count >= 2),
            "expected batched distance-1 REQ, got {:?}",
            author_counts
        );
    }
917
    // With full recrawl enabled, root's contact list must be re-requested
    // even though it is already stored locally.
    #[tokio::test]
    #[allow(clippy::await_holding_lock)]
    async fn test_crawler_full_recrawl_refetches_root() {
        let _guard = crate::socialgraph::test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();

        let root_keys = nostr::Keys::generate();
        let root_pk = root_keys.public_key().to_bytes();
        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();

        let alice_keys = nostr::Keys::generate();
        let bob_keys = nostr::Keys::generate();

        // Root's list is pre-ingested; incremental mode would skip it.
        let root_event = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![
                Tag::public_key(alice_keys.public_key()),
                Tag::public_key(bob_keys.public_key()),
            ],
        )
        .custom_created_at(nostr::Timestamp::from(10))
        .to_event(&root_keys)
        .unwrap();
        crate::socialgraph::ingest_parsed_event(&graph_store, &root_event).unwrap();

        let relay = TestRelay::new(vec![root_event]);
        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 1)
            .with_full_recrawl(true);

        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        let handle = tokio::spawn(async move {
            crawler.crawl(shutdown_rx).await;
        });

        // Wait for the single-author (root) REQ to arrive at the relay.
        wait_until(Duration::from_secs(5), || {
            relay.request_author_counts().contains(&1)
        })
        .await;

        let _ = shutdown_tx.send(true);
        handle.await.unwrap();

        let requested_authors = relay.requested_authors();
        assert!(
            requested_authors
                .iter()
                .flatten()
                .any(|author| author == &root_keys.public_key().to_hex()),
            "expected full recrawl to refetch root, got {:?}",
            requested_authors
        );
    }
973
    // With a `known_since` cursor, an author whose (older) contact list is
    // already stored should be refetched and the newer list ingested.
    #[tokio::test]
    #[allow(clippy::await_holding_lock)]
    async fn test_crawler_revisits_known_authors_with_since_cursor() {
        let _guard = crate::socialgraph::test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();

        let root_keys = nostr::Keys::generate();
        let root_pk = root_keys.public_key().to_bytes();
        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();

        let alice_keys = nostr::Keys::generate();
        let carol_keys = nostr::Keys::generate();
        let dave_keys = nostr::Keys::generate();

        let root_event = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![Tag::public_key(alice_keys.public_key())],
        )
        .custom_created_at(nostr::Timestamp::from(10))
        .to_event(&root_keys)
        .unwrap();
        crate::socialgraph::ingest_parsed_event(&graph_store, &root_event).unwrap();

        // Alice's stale list (t=11) is stored locally...
        let alice_old_event = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![Tag::public_key(carol_keys.public_key())],
        )
        .custom_created_at(nostr::Timestamp::from(11))
        .to_event(&alice_keys)
        .unwrap();
        crate::socialgraph::ingest_parsed_event(&graph_store, &alice_old_event).unwrap();

        // ...while the relay serves a newer one (t=20) adding dave.
        let alice_new_event = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![
                Tag::public_key(carol_keys.public_key()),
                Tag::public_key(dave_keys.public_key()),
            ],
        )
        .custom_created_at(nostr::Timestamp::from(20))
        .to_event(&alice_keys)
        .unwrap();

        let relay = TestRelay::new(vec![alice_new_event]);
        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 2)
            .with_author_batch_size(2)
            .with_known_since(Some(15));

        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        let handle = tokio::spawn(async move {
            crawler.crawl(shutdown_rx).await;
        });

        let alice_pk = alice_keys.public_key().to_bytes();
        let dave_pk = dave_keys.public_key().to_bytes();
        wait_until(Duration::from_secs(5), || {
            crate::socialgraph::get_follows(&graph_store, &alice_pk).contains(&dave_pk)
        })
        .await;

        let _ = shutdown_tx.send(true);
        handle.await.unwrap();

        let requested_authors = relay.requested_authors();
        assert!(
            crate::socialgraph::get_follows(&graph_store, &alice_pk).contains(&dave_pk),
            "expected incremental crawl to refresh known author follow list"
        );
        assert!(
            requested_authors
                .iter()
                .flatten()
                .any(|author| author == &alice_keys.public_key().to_hex()),
            "expected crawler to refetch known author, got {:?}",
            requested_authors
        );
    }
1056
    // `warm_once` must perform a finite sync (no live subscription) and
    // return promptly with the distance-1 follow lists ingested.
    #[tokio::test]
    #[allow(clippy::await_holding_lock)]
    async fn test_crawler_warm_once_completes_initial_sync_without_shutdown() {
        let _guard = crate::socialgraph::test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();

        let root_keys = nostr::Keys::generate();
        let root_pk = root_keys.public_key().to_bytes();
        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();

        // Graph served by the relay: root -> alice -> bob.
        let alice_keys = nostr::Keys::generate();
        let bob_keys = nostr::Keys::generate();

        let root_event = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![Tag::public_key(alice_keys.public_key())],
        )
        .custom_created_at(nostr::Timestamp::from(10))
        .to_event(&root_keys)
        .unwrap();
        let alice_event = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![Tag::public_key(bob_keys.public_key())],
        )
        .custom_created_at(nostr::Timestamp::from(11))
        .to_event(&alice_keys)
        .unwrap();

        let relay = TestRelay::new(vec![root_event, alice_event]);
        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 2)
            .with_author_batch_size(2);

        let started = std::time::Instant::now();
        crawler.warm_once().await;

        let alice_pk = alice_keys.public_key().to_bytes();
        let bob_pk = bob_keys.public_key().to_bytes();
        assert!(
            started.elapsed() < Duration::from_secs(5),
            "warm_once should complete finite sync promptly"
        );
        assert!(
            crate::socialgraph::get_follows(&graph_store, &alice_pk).contains(&bob_pk),
            "expected warm_once to ingest distance-1 follow list"
        );
    }
1107
1108 #[tokio::test]
1109 async fn test_crawler_warm_once_accepts_large_contact_list_events() {
1110 let _guard = crate::socialgraph::test_lock();
1111 let tmp = TempDir::new().unwrap();
1112 let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
1113
1114 let root_keys = nostr::Keys::generate();
1115 let root_pk = root_keys.public_key().to_bytes();
1116 crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
1117 let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1118
1119 let followed_keys = (0..1_600)
1120 .map(|_| nostr::Keys::generate())
1121 .collect::<Vec<_>>();
1122 let tags = followed_keys
1123 .iter()
1124 .map(|keys| Tag::public_key(keys.public_key()))
1125 .collect::<Vec<_>>();
1126 let root_event = EventBuilder::new(Kind::ContactList, "", tags)
1127 .custom_created_at(nostr::Timestamp::from(10))
1128 .to_event(&root_keys)
1129 .unwrap();
1130 assert!(
1131 root_event.as_json().len() > 70_000,
1132 "test event should exceed nostr-sdk default size limit"
1133 );
1134
1135 let relay = TestRelay::new(vec![root_event]);
1136 let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 1)
1137 .with_author_batch_size(1);
1138
1139 crawler.warm_once().await;
1140
1141 let follows = crate::socialgraph::get_follows(&graph_store, &root_pk);
1142 let first_pk = followed_keys
1143 .first()
1144 .expect("first followed key")
1145 .public_key()
1146 .to_bytes();
1147 let last_pk = followed_keys
1148 .last()
1149 .expect("last followed key")
1150 .public_key()
1151 .to_bytes();
1152 assert!(
1153 follows.contains(&first_pk) && follows.contains(&last_pk),
1154 "expected warm_once to ingest oversized contact list event"
1155 );
1156 }
1157
1158 #[tokio::test]
1159 #[allow(clippy::await_holding_lock)]
1160 async fn test_spawned_social_graph_tasks_sync_local_contacts_on_startup() {
1161 let _guard = crate::socialgraph::test_lock();
1162 let tmp = TempDir::new().unwrap();
1163 let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
1164
1165 let root_keys = nostr::Keys::generate();
1166 let root_pk = root_keys.public_key().to_bytes();
1167 crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
1168
1169 let alice_keys = nostr::Keys::generate();
1170 let bob_keys = nostr::Keys::generate();
1171
1172 write_contacts_file(
1173 &tmp.path().join("contacts.json"),
1174 &[alice_keys.public_key().to_hex()],
1175 );
1176
1177 let alice_event = EventBuilder::new(
1178 Kind::ContactList,
1179 "",
1180 vec![Tag::public_key(bob_keys.public_key())],
1181 )
1182 .custom_created_at(nostr::Timestamp::from(11))
1183 .to_event(&alice_keys)
1184 .unwrap();
1185
1186 let relay = TestRelay::new(vec![alice_event]);
1187 let tasks = spawn_social_graph_tasks(
1188 graph_store.clone(),
1189 root_keys.clone(),
1190 vec![relay.url()],
1191 2,
1192 None,
1193 tmp.path().to_path_buf(),
1194 );
1195
1196 let alice_pk = alice_keys.public_key().to_bytes();
1197 let bob_pk = bob_keys.public_key().to_bytes();
1198 wait_until(Duration::from_secs(5), || {
1199 let root_follows = crate::socialgraph::get_follows(&graph_store, &root_pk);
1200 let alice_follows = crate::socialgraph::get_follows(&graph_store, &alice_pk);
1201 root_follows.contains(&alice_pk) && alice_follows.contains(&bob_pk)
1202 })
1203 .await;
1204
1205 let _ = tasks.shutdown_tx.send(true);
1206 tasks.crawl_handle.abort();
1207 tasks.local_list_handle.abort();
1208 }
1209
    #[tokio::test]
    #[allow(clippy::await_holding_lock)]
    async fn test_spawned_social_graph_tasks_refresh_when_contacts_change() {
        // Serializes access to shared social-graph test state; the guard is
        // deliberately held across the awaits below (hence the clippy allow).
        let _guard = crate::socialgraph::test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();

        let root_keys = nostr::Keys::generate();
        let root_pk = root_keys.public_key().to_bytes();
        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);

        let alice_keys = nostr::Keys::generate();
        let bob_keys = nostr::Keys::generate();

        // Alice's contact list (alice -> bob) is available on the relay, but
        // nothing links root -> alice yet.
        let alice_event = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![Tag::public_key(bob_keys.public_key())],
        )
        .custom_created_at(nostr::Timestamp::from(11))
        .to_event(&alice_keys)
        .unwrap();

        let relay = TestRelay::new(vec![alice_event]);
        // Spawn the background tasks BEFORE any contacts file exists, so the
        // follow observed below can only come from a later refresh — not
        // from the startup sync.
        let tasks = spawn_social_graph_tasks(
            graph_store.clone(),
            root_keys.clone(),
            vec![relay.url()],
            2,
            None,
            tmp.path().to_path_buf(),
        );

        let alice_pk = alice_keys.public_key().to_bytes();
        let bob_pk = bob_keys.public_key().to_bytes();
        // Sanity check: root does not follow alice before the file is written.
        assert!(!crate::socialgraph::get_follows(&graph_store, &root_pk).contains(&alice_pk));

        // Now create the contacts file. Presumably the local-list poll loop
        // (LOCAL_LIST_POLL_INTERVAL, 100ms under cfg(test)) detects the
        // change and merges alice into the root's follow set — confirm
        // against the task implementation if this test flakes.
        write_contacts_file(
            &tmp.path().join("contacts.json"),
            &[alice_keys.public_key().to_hex()],
        );

        // Expect root -> alice (from the new contacts file) and alice -> bob
        // (fetched by the crawler from the relay) within the timeout.
        wait_until(Duration::from_secs(5), || {
            let root_follows = crate::socialgraph::get_follows(&graph_store, &root_pk);
            let alice_follows = crate::socialgraph::get_follows(&graph_store, &alice_pk);
            root_follows.contains(&alice_pk) && alice_follows.contains(&bob_pk)
        })
        .await;

        // Signal shutdown, then abort; the handles are not awaited here.
        let _ = tasks.shutdown_tx.send(true);
        tasks.crawl_handle.abort();
        tasks.local_list_handle.abort();
    }
1263}