1use futures::stream::{self, StreamExt};
2use std::collections::HashSet;
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::time::Duration;
6
7use nostr::Timestamp;
8use tokio::sync::watch;
9use tokio::task::JoinHandle;
10
11use super::{
12 read_local_list_file_state, sync_local_list_files_force, sync_local_list_files_if_changed,
13 LocalListFileState, SocialGraphBackend,
14};
15
/// Number of authors placed in a single graph-fetch filter.
const DEFAULT_AUTHOR_BATCH_SIZE: usize = 500;
/// Number of author batches that may be fetched concurrently.
const DEFAULT_CONCURRENT_BATCHES: usize = 4;

/// Timeout passed to each `fetch_events` call made while syncing the graph.
const GRAPH_FETCH_TIMEOUT: Duration = Duration::from_secs(15);
/// How long to wait for `Client::connect` before carrying on regardless.
const RELAY_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);

/// Maximum relay event size (512 KiB) accepted by the crawler's relay client,
/// so that large contact lists are not rejected by the default relay limits.
pub(crate) const SOCIALGRAPH_RELAY_EVENT_MAX_SIZE: u32 = 512 * 1024;

/// Delay before the crawl task starts (shortened under `cfg(test)`).
#[cfg(not(test))]
const CRAWLER_STARTUP_DELAY: Duration = Duration::from_secs(5);
#[cfg(test)]
const CRAWLER_STARTUP_DELAY: Duration = Duration::from_millis(50);

/// Polling period for local list-file changes (shortened under `cfg(test)`).
#[cfg(not(test))]
const LOCAL_LIST_POLL_INTERVAL: Duration = Duration::from_secs(5);
#[cfg(test)]
const LOCAL_LIST_POLL_INTERVAL: Duration = Duration::from_millis(100);
29
30pub struct SocialGraphTaskHandles {
31 pub shutdown_tx: watch::Sender<bool>,
32 pub crawl_handle: JoinHandle<()>,
33 pub local_list_handle: JoinHandle<()>,
34}
35
36pub struct SocialGraphCrawler {
37 graph_store: Arc<dyn SocialGraphBackend>,
38 spambox: Option<Arc<dyn SocialGraphBackend>>,
39 keys: nostr::Keys,
40 relays: Vec<String>,
41 max_depth: u32,
42 author_batch_size: usize,
43 concurrent_batches: usize,
44 full_recrawl: bool,
45 known_since: Option<Timestamp>,
46}
47
48impl SocialGraphCrawler {
49 pub fn new(
50 graph_store: Arc<dyn SocialGraphBackend>,
51 keys: nostr::Keys,
52 relays: Vec<String>,
53 max_depth: u32,
54 ) -> Self {
55 Self {
56 graph_store,
57 spambox: None,
58 keys,
59 relays,
60 max_depth,
61 author_batch_size: DEFAULT_AUTHOR_BATCH_SIZE,
62 concurrent_batches: DEFAULT_CONCURRENT_BATCHES,
63 full_recrawl: false,
64 known_since: None,
65 }
66 }
67
68 pub fn with_spambox(mut self, spambox: Arc<dyn SocialGraphBackend>) -> Self {
69 self.spambox = Some(spambox);
70 self
71 }
72
73 pub fn with_author_batch_size(mut self, author_batch_size: usize) -> Self {
74 self.author_batch_size = author_batch_size.max(1);
75 self
76 }
77
78 pub fn with_concurrent_batches(mut self, concurrent_batches: usize) -> Self {
79 self.concurrent_batches = concurrent_batches.max(1);
80 self
81 }
82
83 pub fn with_full_recrawl(mut self, full_recrawl: bool) -> Self {
84 self.full_recrawl = full_recrawl;
85 self
86 }
87
88 pub fn with_known_since(mut self, known_since: Option<u64>) -> Self {
89 self.known_since = known_since.map(Timestamp::from);
90 self
91 }
92
93 fn is_within_social_graph(&self, pk_bytes: &[u8; 32]) -> bool {
94 if pk_bytes == &self.keys.public_key().to_bytes() {
95 return true;
96 }
97
98 super::get_follow_distance(self.graph_store.as_ref(), pk_bytes)
99 .map(|distance| distance <= self.max_depth)
100 .unwrap_or(false)
101 }
102
103 fn ingest_events_into(
104 &self,
105 graph_store: &(impl SocialGraphBackend + ?Sized),
106 events: &[nostr::Event],
107 ) {
108 if let Err(err) = super::ingest_parsed_events(graph_store, events) {
109 tracing::debug!("Failed to ingest crawler event: {}", err);
110 }
111 }
112
113 #[allow(deprecated)]
114 fn collect_missing_root_follows(
115 &self,
116 event: &nostr::Event,
117 fetched_contact_lists: &mut HashSet<[u8; 32]>,
118 ) -> Vec<[u8; 32]> {
119 if self.max_depth < 2 || event.kind != nostr::Kind::ContactList {
120 return Vec::new();
121 }
122
123 let root_pk = self.keys.public_key().to_bytes();
124 if event.pubkey.to_bytes() != root_pk {
125 return Vec::new();
126 }
127
128 let mut missing = Vec::new();
129 for tag in event.tags.iter() {
130 if let Some(nostr::TagStandard::PublicKey { public_key, .. }) = tag.as_standardized() {
131 let pk_bytes = public_key.to_bytes();
132 if fetched_contact_lists.contains(&pk_bytes) {
133 continue;
134 }
135
136 let existing_follows = super::get_follows(self.graph_store.as_ref(), &pk_bytes);
137 if !existing_follows.is_empty() {
138 fetched_contact_lists.insert(pk_bytes);
139 continue;
140 }
141
142 fetched_contact_lists.insert(pk_bytes);
143 missing.push(pk_bytes);
144 }
145 }
146
147 missing
148 }
149
150 fn graph_filter_for_pubkeys(
151 pubkeys: &[[u8; 32]],
152 since: Option<Timestamp>,
153 ) -> Option<nostr::Filter> {
154 let authors = pubkeys
155 .iter()
156 .filter_map(|pk_bytes| nostr::PublicKey::from_slice(pk_bytes).ok())
157 .collect::<Vec<_>>();
158 if authors.is_empty() {
159 return None;
160 }
161
162 let mut filter = nostr::Filter::new()
163 .authors(authors)
164 .kinds(vec![nostr::Kind::ContactList, nostr::Kind::MuteList]);
165 if let Some(since) = since {
166 filter = filter.since(since);
167 }
168
169 Some(filter)
170 }
171
172 async fn fetch_graph_events_for_pubkeys(
173 &self,
174 client: &nostr_sdk::Client,
175 pubkeys: &[[u8; 32]],
176 since: Option<Timestamp>,
177 ) -> Vec<nostr::Event> {
178 let Some(filter) = Self::graph_filter_for_pubkeys(pubkeys, since) else {
179 return Vec::new();
180 };
181
182 match client.fetch_events(filter, GRAPH_FETCH_TIMEOUT).await {
183 Ok(events) => events.to_vec(),
184 Err(err) => {
185 tracing::debug!(
186 "Failed to fetch graph events for {} authors: {}",
187 pubkeys.len(),
188 err
189 );
190 Vec::new()
191 }
192 }
193 }
194
195 async fn fetch_contact_lists_for_pubkeys(
196 &self,
197 client: &nostr_sdk::Client,
198 pubkeys: &[[u8; 32]],
199 since: Option<Timestamp>,
200 shutdown_rx: &watch::Receiver<bool>,
201 ) {
202 let chunk_futures = stream::iter(
203 pubkeys
204 .chunks(self.author_batch_size)
205 .map(|chunk| chunk.to_vec())
206 .collect::<Vec<_>>(),
207 )
208 .map(|chunk| async move {
209 self.fetch_graph_events_for_pubkeys(client, &chunk, since)
210 .await
211 });
212
213 let mut in_flight = chunk_futures.buffer_unordered(self.concurrent_batches);
214 while let Some(events) = in_flight.next().await {
215 if *shutdown_rx.borrow() {
216 break;
217 }
218 if let Err(err) = super::ingest_graph_parsed_events(self.graph_store.as_ref(), &events)
219 {
220 tracing::debug!("Failed to ingest crawler graph batch: {}", err);
221 }
222 }
223 }
224
225 fn authors_to_fetch_at_distance(&self, distance: u32) -> (Vec<[u8; 32]>, Vec<[u8; 32]>) {
226 let Ok(users) = self.graph_store.users_by_follow_distance(distance) else {
227 return (Vec::new(), Vec::new());
228 };
229 if self.full_recrawl {
230 return (users, Vec::new());
231 }
232
233 let mut new_authors = Vec::new();
234 let mut known_authors = Vec::new();
235 let refresh_known_authors = self.known_since.is_some();
236 for pk_bytes in users {
237 match self
238 .graph_store
239 .follow_list_created_at(&pk_bytes)
240 .ok()
241 .flatten()
242 {
243 Some(_) if refresh_known_authors => known_authors.push(pk_bytes),
244 Some(_) => {}
245 None => new_authors.push(pk_bytes),
246 }
247 }
248 (new_authors, known_authors)
249 }
250
251 async fn connect_client(&self) -> Option<nostr_sdk::Client> {
252 use nostr::nips::nip19::ToBech32;
253
254 let secret = self.keys.secret_key().to_bech32().unwrap_or_default();
255 let Ok(sdk_keys) = nostr_sdk::Keys::parse(&secret) else {
256 return None;
257 };
258
259 let mut relay_limits = nostr_sdk::pool::RelayLimits::default();
260 relay_limits.events.max_size = Some(SOCIALGRAPH_RELAY_EVENT_MAX_SIZE);
261 let client = nostr_sdk::Client::builder()
262 .signer(sdk_keys)
263 .opts(nostr_sdk::ClientOptions::new().relay_limits(relay_limits))
264 .build();
265 for relay in &self.relays {
266 if let Err(err) = client.add_relay(relay).await {
267 tracing::warn!("Failed to add relay {}: {}", relay, err);
268 }
269 }
270 let _ = tokio::time::timeout(RELAY_CONNECT_TIMEOUT, client.connect()).await;
271 Some(client)
272 }
273
274 async fn sync_graph_once(
275 &self,
276 client: &nostr_sdk::Client,
277 shutdown_rx: &watch::Receiver<bool>,
278 fetched_contact_lists: &mut HashSet<[u8; 32]>,
279 ) {
280 for distance in 0..=self.max_depth {
281 if *shutdown_rx.borrow() {
282 break;
283 }
284
285 let (new_authors, known_authors) = self.authors_to_fetch_at_distance(distance);
286 if new_authors.is_empty() && known_authors.is_empty() {
287 continue;
288 }
289
290 tracing::debug!(
291 "Social graph sync distance={} new_authors={} known_authors={}",
292 distance,
293 new_authors.len(),
294 known_authors.len()
295 );
296
297 if !new_authors.is_empty() {
298 for pk_bytes in &new_authors {
299 fetched_contact_lists.insert(*pk_bytes);
300 }
301 self.fetch_contact_lists_for_pubkeys(client, &new_authors, None, shutdown_rx)
302 .await;
303 }
304
305 if !known_authors.is_empty() {
306 for pk_bytes in &known_authors {
307 fetched_contact_lists.insert(*pk_bytes);
308 }
309 self.fetch_contact_lists_for_pubkeys(
310 client,
311 &known_authors,
312 self.known_since,
313 shutdown_rx,
314 )
315 .await;
316 }
317 }
318 }
319
320 pub async fn warm_once(&self) {
321 if self.relays.is_empty() {
322 tracing::warn!("Social graph crawler: no relays configured, skipping");
323 return;
324 }
325
326 let Some(client) = self.connect_client().await else {
327 return;
328 };
329 let (_shutdown_tx, shutdown_rx) = watch::channel(false);
330 let mut fetched_contact_lists: HashSet<[u8; 32]> = HashSet::new();
331 self.sync_graph_once(&client, &shutdown_rx, &mut fetched_contact_lists)
332 .await;
333 let _ = client.disconnect().await;
334 }
335
336 pub(crate) fn handle_incoming_event(&self, event: &nostr::Event) {
337 let is_contact_list = event.kind == nostr::Kind::ContactList;
338 let is_mute_list = event.kind == nostr::Kind::MuteList;
339 if !is_contact_list && !is_mute_list {
340 return;
341 }
342
343 let pk_bytes = event.pubkey.to_bytes();
344 if self.is_within_social_graph(&pk_bytes) {
345 self.ingest_events_into(self.graph_store.as_ref(), std::slice::from_ref(event));
346 return;
347 }
348
349 if let Some(spambox) = &self.spambox {
350 self.ingest_events_into(spambox.as_ref(), std::slice::from_ref(event));
351 }
352 }
353
354 #[allow(deprecated)]
355 pub async fn crawl(&self, shutdown_rx: watch::Receiver<bool>) {
356 use nostr_sdk::prelude::RelayPoolNotification;
357
358 if self.relays.is_empty() {
359 tracing::warn!("Social graph crawler: no relays configured, skipping");
360 return;
361 }
362
363 let mut shutdown_rx = shutdown_rx;
364 if *shutdown_rx.borrow() {
365 return;
366 }
367
368 let Some(client) = self.connect_client().await else {
369 return;
370 };
371
372 let mut fetched_contact_lists: HashSet<[u8; 32]> = HashSet::new();
373 self.sync_graph_once(&client, &shutdown_rx, &mut fetched_contact_lists)
374 .await;
375
376 let filter = nostr::Filter::new()
377 .kinds(vec![nostr::Kind::ContactList, nostr::Kind::MuteList])
378 .since(nostr::Timestamp::now());
379
380 let _ = client.subscribe(filter, None).await;
381
382 let mut notifications = client.notifications();
383 loop {
384 tokio::select! {
385 _ = shutdown_rx.changed() => {
386 if *shutdown_rx.borrow() {
387 break;
388 }
389 }
390 notification = notifications.recv() => {
391 match notification {
392 Ok(RelayPoolNotification::Event { event, .. }) => {
393 self.handle_incoming_event(&event);
394 let missing = self.collect_missing_root_follows(&event, &mut fetched_contact_lists);
395 if !missing.is_empty() {
396 self.fetch_contact_lists_for_pubkeys(&client, &missing, None, &shutdown_rx).await;
397 }
398 }
399 Ok(_) => {}
400 Err(err) => {
401 tracing::warn!("Social graph crawler notification error: {}", err);
402 break;
403 }
404 }
405 }
406 }
407 }
408
409 let _ = client.disconnect().await;
410 }
411}
412
413pub fn spawn_social_graph_tasks(
414 graph_store: Arc<dyn SocialGraphBackend>,
415 keys: nostr::Keys,
416 relays: Vec<String>,
417 max_depth: u32,
418 spambox: Option<Arc<dyn SocialGraphBackend>>,
419 data_dir: PathBuf,
420) -> SocialGraphTaskHandles {
421 let (shutdown_tx, crawl_shutdown_rx) = watch::channel(false);
422 let local_list_shutdown_rx = crawl_shutdown_rx.clone();
423 let crawl_data_dir = data_dir.clone();
424 let local_list_data_dir = data_dir;
425
426 let crawl_handle = tokio::spawn({
427 let graph_store = Arc::clone(&graph_store);
428 let keys = keys.clone();
429 let relays = relays.clone();
430 let spambox = spambox.clone();
431 async move {
432 tokio::time::sleep(CRAWLER_STARTUP_DELAY).await;
433 let mut crawler = SocialGraphCrawler::new(graph_store.clone(), keys, relays, max_depth);
434 if let Some(spambox) = spambox {
435 crawler = crawler.with_spambox(spambox);
436 }
437 if let Err(err) =
438 sync_local_list_files_force(graph_store.as_ref(), &crawl_data_dir, &crawler.keys)
439 {
440 tracing::warn!(
441 "Failed to sync local social graph lists at startup: {}",
442 err
443 );
444 }
445 crawler.crawl(crawl_shutdown_rx).await;
446 }
447 });
448
449 let local_list_handle = tokio::spawn({
450 let graph_store = Arc::clone(&graph_store);
451 let keys = keys.clone();
452 let relays = relays.clone();
453 let spambox = spambox.clone();
454 async move {
455 let mut shutdown_rx = local_list_shutdown_rx;
456 let mut state =
457 read_local_list_file_state(&local_list_data_dir).unwrap_or_else(|err| {
458 tracing::warn!("Failed to read local social graph list state: {}", err);
459 LocalListFileState::default()
460 });
461 let mut interval = tokio::time::interval(LOCAL_LIST_POLL_INTERVAL);
462 loop {
463 tokio::select! {
464 _ = shutdown_rx.changed() => {
465 if *shutdown_rx.borrow() {
466 break;
467 }
468 }
469 _ = interval.tick() => {
470 let outcome = match sync_local_list_files_if_changed(
471 graph_store.as_ref(),
472 &local_list_data_dir,
473 &keys,
474 &mut state,
475 ) {
476 Ok(outcome) => outcome,
477 Err(err) => {
478 tracing::warn!("Failed to sync local social graph list files: {}", err);
479 continue;
480 }
481 };
482
483 if outcome.contacts_changed {
484 let mut crawler =
485 SocialGraphCrawler::new(graph_store.clone(), keys.clone(), relays.clone(), max_depth);
486 if let Some(spambox) = spambox.clone() {
487 crawler = crawler.with_spambox(spambox);
488 }
489 crawler.warm_once().await;
490 }
491 }
492 }
493 }
494 }
495 });
496
497 SocialGraphTaskHandles {
498 shutdown_tx,
499 crawl_handle,
500 local_list_handle,
501 }
502}
503
504#[cfg(test)]
505mod tests {
506 use super::*;
507 use std::net::TcpListener;
508 use std::sync::Mutex;
509 use std::time::Instant;
510
511 use futures::{SinkExt, StreamExt};
512 use nostr::{EventBuilder, JsonUtil, Kind, PublicKey, Tag};
513 use tempfile::TempDir;
514 use tokio::net::TcpStream;
515 use tokio::sync::broadcast;
516 use tokio_tungstenite::{accept_async, tungstenite::Message};
517
518 macro_rules! event_builder {
519 ($kind:expr, $content:expr $(,)?) => {
520 EventBuilder::new($kind, $content)
521 };
522 ($kind:expr, $content:expr, $tags:expr $(,)?) => {
523 EventBuilder::new($kind, $content).tags($tags)
524 };
525 }
526
527 #[derive(Debug, Default)]
528 struct RelayState {
529 events: Vec<nostr::Event>,
530 request_author_counts: Vec<usize>,
531 requested_authors: Vec<Vec<String>>,
532 }
533
534 struct TestRelay {
535 port: u16,
536 shutdown: broadcast::Sender<()>,
537 state: Arc<Mutex<RelayState>>,
538 }
539
540 impl TestRelay {
541 fn new(events: Vec<nostr::Event>) -> Self {
542 let state = Arc::new(Mutex::new(RelayState {
543 events,
544 request_author_counts: Vec::new(),
545 requested_authors: Vec::new(),
546 }));
547 let (shutdown, _) = broadcast::channel(1);
548
549 let std_listener = TcpListener::bind("127.0.0.1:0").expect("bind relay listener");
550 let port = std_listener.local_addr().expect("local addr").port();
551 std_listener
552 .set_nonblocking(true)
553 .expect("set listener nonblocking");
554
555 let state_for_thread = Arc::clone(&state);
556 let shutdown_for_thread = shutdown.clone();
557 std::thread::spawn(move || {
558 let runtime = tokio::runtime::Builder::new_multi_thread()
559 .worker_threads(2)
560 .enable_all()
561 .build()
562 .expect("build tokio runtime");
563 runtime.block_on(async move {
564 let listener = tokio::net::TcpListener::from_std(std_listener)
565 .expect("tokio listener from std");
566 let mut shutdown_rx = shutdown_for_thread.subscribe();
567
568 loop {
569 tokio::select! {
570 _ = shutdown_rx.recv() => break,
571 accept = listener.accept() => {
572 if let Ok((stream, _)) = accept {
573 let state = Arc::clone(&state_for_thread);
574 tokio::spawn(async move {
575 handle_connection(stream, state).await;
576 });
577 }
578 }
579 }
580 }
581 });
582 });
583
584 std::thread::sleep(Duration::from_millis(100));
585
586 Self {
587 port,
588 shutdown,
589 state,
590 }
591 }
592
593 fn url(&self) -> String {
594 format!("ws://127.0.0.1:{}", self.port)
595 }
596
597 fn request_author_counts(&self) -> Vec<usize> {
598 self.state
599 .lock()
600 .expect("relay state lock")
601 .request_author_counts
602 .clone()
603 }
604
605 fn requested_authors(&self) -> Vec<Vec<String>> {
606 self.state
607 .lock()
608 .expect("relay state lock")
609 .requested_authors
610 .clone()
611 }
612 }
613
614 impl Drop for TestRelay {
615 fn drop(&mut self) {
616 let _ = self.shutdown.send(());
617 std::thread::sleep(Duration::from_millis(50));
618 }
619 }
620
621 fn matching_events(
622 state: &Arc<Mutex<RelayState>>,
623 filters: &[nostr::Filter],
624 ) -> Vec<nostr::Event> {
625 let guard = state.lock().expect("relay state lock");
626 guard
627 .events
628 .iter()
629 .filter(|event| {
630 filters.is_empty()
631 || filters
632 .iter()
633 .any(|filter| filter.match_event(event, Default::default()))
634 })
635 .cloned()
636 .collect()
637 }
638
639 async fn send_relay_message(
640 write: &mut futures::stream::SplitSink<
641 tokio_tungstenite::WebSocketStream<TcpStream>,
642 Message,
643 >,
644 message: nostr::RelayMessage<'_>,
645 ) {
646 let _ = write.send(Message::Text(message.as_json())).await;
647 }
648
649 async fn handle_connection(stream: TcpStream, state: Arc<Mutex<RelayState>>) {
650 let ws_stream = match accept_async(stream).await {
651 Ok(ws) => ws,
652 Err(_) => return,
653 };
654 let (mut write, mut read) = ws_stream.split();
655
656 while let Some(message) = read.next().await {
657 let text = match message {
658 Ok(Message::Text(text)) => text,
659 Ok(Message::Ping(data)) => {
660 let _ = write.send(Message::Pong(data)).await;
661 continue;
662 }
663 Ok(Message::Close(_)) => break,
664 _ => continue,
665 };
666
667 let parsed = match nostr::ClientMessage::from_json(text.as_bytes()) {
668 Ok(message) => message,
669 Err(_) => continue,
670 };
671
672 match parsed {
673 nostr::ClientMessage::Req {
674 subscription_id,
675 filters,
676 } => {
677 let subscription_id = subscription_id.into_owned();
678 let filters = filters
679 .into_iter()
680 .map(|filter| filter.into_owned())
681 .collect::<Vec<_>>();
682 let author_count = filters
683 .iter()
684 .filter_map(|filter| filter.authors.as_ref())
685 .map(|authors| authors.len())
686 .sum();
687 let mut requested_authors = filters
688 .iter()
689 .filter_map(|filter| filter.authors.as_ref())
690 .flat_map(|authors| authors.iter().map(|author| author.to_hex()))
691 .collect::<Vec<_>>();
692 requested_authors.sort();
693 requested_authors.dedup();
694 {
695 let mut guard = state.lock().expect("relay state lock");
696 guard.request_author_counts.push(author_count);
697 guard.requested_authors.push(requested_authors);
698 }
699
700 for event in matching_events(&state, &filters) {
701 send_relay_message(
702 &mut write,
703 nostr::RelayMessage::event(subscription_id.clone(), event),
704 )
705 .await;
706 }
707 send_relay_message(&mut write, nostr::RelayMessage::eose(subscription_id))
708 .await;
709 }
710 nostr::ClientMessage::Close(subscription_id) => {
711 send_relay_message(
712 &mut write,
713 nostr::RelayMessage::closed(subscription_id.into_owned(), ""),
714 )
715 .await;
716 }
717 _ => {}
718 }
719 }
720 }
721
722 async fn wait_until<F>(timeout: Duration, mut condition: F)
723 where
724 F: FnMut() -> bool,
725 {
726 let start = Instant::now();
727 while start.elapsed() < timeout {
728 if condition() {
729 return;
730 }
731 tokio::time::sleep(Duration::from_millis(50)).await;
732 }
733 panic!("condition not met within {:?}", timeout);
734 }
735
736 fn write_contacts_file(path: &std::path::Path, pubkeys: &[String]) {
737 std::fs::write(
738 path,
739 serde_json::to_string(pubkeys).expect("serialize contacts"),
740 )
741 .expect("write contacts file");
742 }
743
744 #[tokio::test]
745 async fn test_crawler_routes_untrusted_to_spambox() {
746 let _guard = crate::socialgraph::test_lock();
747 let tmp = TempDir::new().unwrap();
748 let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
749 let spambox =
750 crate::socialgraph::open_social_graph_store_at_path(&tmp.path().join("spambox"), None)
751 .unwrap();
752
753 let root_keys = nostr::Keys::generate();
754 let root_pk = root_keys.public_key().to_bytes();
755 crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
756 let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
757 let spambox_backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = spambox.clone();
758
759 let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![], 2)
760 .with_spambox(spambox_backend);
761
762 let unknown_keys = nostr::Keys::generate();
763 let follow_tag = Tag::public_key(PublicKey::from_slice(&root_pk).unwrap());
764 let event = event_builder!(Kind::ContactList, "")
765 .tags(vec![follow_tag])
766 .sign_with_keys(&unknown_keys)
767 .unwrap();
768
769 crawler.handle_incoming_event(&event);
770
771 let unknown_pk = unknown_keys.public_key().to_bytes();
772 assert!(crate::socialgraph::get_follows(&graph_store, &unknown_pk).is_empty());
773 assert_eq!(
774 crate::socialgraph::get_follows(&spambox, &unknown_pk),
775 vec![root_pk]
776 );
777 }
778
779 #[tokio::test]
780 #[allow(clippy::await_holding_lock)]
781 async fn test_crawler_batches_graph_fetches_by_author_chunk() {
782 let _guard = crate::socialgraph::test_lock();
783 let tmp = TempDir::new().unwrap();
784 let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
785
786 let root_keys = nostr::Keys::generate();
787 let root_pk = root_keys.public_key().to_bytes();
788 crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
789 let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
790
791 let alice_keys = nostr::Keys::generate();
792 let bob_keys = nostr::Keys::generate();
793 let carol_keys = nostr::Keys::generate();
794
795 let root_event = event_builder!(
796 Kind::ContactList,
797 "",
798 vec![
799 Tag::public_key(alice_keys.public_key()),
800 Tag::public_key(bob_keys.public_key()),
801 ],
802 )
803 .custom_created_at(nostr::Timestamp::from(10))
804 .sign_with_keys(&root_keys)
805 .unwrap();
806 let alice_event = event_builder!(
807 Kind::ContactList,
808 "",
809 vec![Tag::public_key(carol_keys.public_key())],
810 )
811 .custom_created_at(nostr::Timestamp::from(11))
812 .sign_with_keys(&alice_keys)
813 .unwrap();
814 let bob_event = event_builder!(Kind::ContactList, "")
815 .custom_created_at(nostr::Timestamp::from(12))
816 .sign_with_keys(&bob_keys)
817 .unwrap();
818
819 let relay = TestRelay::new(vec![root_event, alice_event, bob_event]);
820 let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 2)
821 .with_author_batch_size(2);
822
823 let (shutdown_tx, shutdown_rx) = watch::channel(false);
824 let handle = tokio::spawn(async move {
825 crawler.crawl(shutdown_rx).await;
826 });
827
828 let alice_pk = alice_keys.public_key().to_bytes();
829 let bob_pk = bob_keys.public_key().to_bytes();
830 let carol_pk = carol_keys.public_key().to_bytes();
831 wait_until(Duration::from_secs(5), || {
832 let root_follows = crate::socialgraph::get_follows(&graph_store, &root_pk);
833 let alice_follows = crate::socialgraph::get_follows(&graph_store, &alice_pk);
834 root_follows.contains(&alice_pk)
835 && root_follows.contains(&bob_pk)
836 && alice_follows.contains(&carol_pk)
837 })
838 .await;
839
840 let _ = shutdown_tx.send(true);
841 handle.await.unwrap();
842
843 let author_counts = relay.request_author_counts();
844 assert!(
845 author_counts.iter().any(|count| *count >= 2),
846 "expected batched author REQ, got {:?}",
847 author_counts
848 );
849 }
850
851 #[tokio::test]
852 #[allow(clippy::await_holding_lock)]
853 async fn test_crawler_expands_from_existing_graph_without_root_refetch() {
854 let _guard = crate::socialgraph::test_lock();
855 let tmp = TempDir::new().unwrap();
856 let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
857
858 let root_keys = nostr::Keys::generate();
859 let root_pk = root_keys.public_key().to_bytes();
860 crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
861 let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
862
863 let alice_keys = nostr::Keys::generate();
864 let bob_keys = nostr::Keys::generate();
865 let carol_keys = nostr::Keys::generate();
866
867 let root_event = event_builder!(
868 Kind::ContactList,
869 "",
870 vec![
871 Tag::public_key(alice_keys.public_key()),
872 Tag::public_key(bob_keys.public_key()),
873 ],
874 )
875 .custom_created_at(nostr::Timestamp::from(10))
876 .sign_with_keys(&root_keys)
877 .unwrap();
878 crate::socialgraph::ingest_parsed_event(&graph_store, &root_event).unwrap();
879
880 let alice_event = event_builder!(
881 Kind::ContactList,
882 "",
883 vec![Tag::public_key(carol_keys.public_key())],
884 )
885 .custom_created_at(nostr::Timestamp::from(11))
886 .sign_with_keys(&alice_keys)
887 .unwrap();
888 let bob_event = event_builder!(Kind::ContactList, "")
889 .custom_created_at(nostr::Timestamp::from(12))
890 .sign_with_keys(&bob_keys)
891 .unwrap();
892
893 let relay = TestRelay::new(vec![alice_event, bob_event]);
894 let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 2)
895 .with_author_batch_size(2);
896
897 let (shutdown_tx, shutdown_rx) = watch::channel(false);
898 let handle = tokio::spawn(async move {
899 crawler.crawl(shutdown_rx).await;
900 });
901
902 let alice_pk = alice_keys.public_key().to_bytes();
903 let carol_pk = carol_keys.public_key().to_bytes();
904 wait_until(Duration::from_secs(5), || {
905 crate::socialgraph::get_follows(&graph_store, &alice_pk).contains(&carol_pk)
906 })
907 .await;
908
909 let _ = shutdown_tx.send(true);
910 handle.await.unwrap();
911
912 let author_counts = relay.request_author_counts();
913 let requested_authors = relay.requested_authors();
914 assert!(
915 requested_authors
916 .iter()
917 .flatten()
918 .all(|author| author != &root_keys.public_key().to_hex()),
919 "expected incremental crawl to skip root refetch, got {:?}",
920 requested_authors
921 );
922 assert!(
923 author_counts.iter().any(|count| *count >= 2),
924 "expected batched distance-1 REQ, got {:?}",
925 author_counts
926 );
927 }
928
929 #[tokio::test]
930 #[allow(clippy::await_holding_lock)]
931 async fn test_crawler_full_recrawl_refetches_root() {
932 let _guard = crate::socialgraph::test_lock();
933 let tmp = TempDir::new().unwrap();
934 let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
935
936 let root_keys = nostr::Keys::generate();
937 let root_pk = root_keys.public_key().to_bytes();
938 crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
939 let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
940
941 let alice_keys = nostr::Keys::generate();
942 let bob_keys = nostr::Keys::generate();
943
944 let root_event = event_builder!(
945 Kind::ContactList,
946 "",
947 vec![
948 Tag::public_key(alice_keys.public_key()),
949 Tag::public_key(bob_keys.public_key()),
950 ],
951 )
952 .custom_created_at(nostr::Timestamp::from(10))
953 .sign_with_keys(&root_keys)
954 .unwrap();
955 crate::socialgraph::ingest_parsed_event(&graph_store, &root_event).unwrap();
956
957 let relay = TestRelay::new(vec![root_event]);
958 let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 1)
959 .with_full_recrawl(true);
960
961 let (shutdown_tx, shutdown_rx) = watch::channel(false);
962 let handle = tokio::spawn(async move {
963 crawler.crawl(shutdown_rx).await;
964 });
965
966 wait_until(Duration::from_secs(5), || {
967 relay.request_author_counts().contains(&1)
968 })
969 .await;
970
971 let _ = shutdown_tx.send(true);
972 handle.await.unwrap();
973
974 let requested_authors = relay.requested_authors();
975 assert!(
976 requested_authors
977 .iter()
978 .flatten()
979 .any(|author| author == &root_keys.public_key().to_hex()),
980 "expected full recrawl to refetch root, got {:?}",
981 requested_authors
982 );
983 }
984
985 #[tokio::test]
986 #[allow(clippy::await_holding_lock)]
987 async fn test_crawler_revisits_known_authors_with_since_cursor() {
988 let _guard = crate::socialgraph::test_lock();
989 let tmp = TempDir::new().unwrap();
990 let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
991
992 let root_keys = nostr::Keys::generate();
993 let root_pk = root_keys.public_key().to_bytes();
994 crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
995 let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
996
997 let alice_keys = nostr::Keys::generate();
998 let carol_keys = nostr::Keys::generate();
999 let dave_keys = nostr::Keys::generate();
1000
1001 let root_event = event_builder!(
1002 Kind::ContactList,
1003 "",
1004 vec![Tag::public_key(alice_keys.public_key())],
1005 )
1006 .custom_created_at(nostr::Timestamp::from(10))
1007 .sign_with_keys(&root_keys)
1008 .unwrap();
1009 crate::socialgraph::ingest_parsed_event(&graph_store, &root_event).unwrap();
1010
1011 let alice_old_event = event_builder!(
1012 Kind::ContactList,
1013 "",
1014 vec![Tag::public_key(carol_keys.public_key())],
1015 )
1016 .custom_created_at(nostr::Timestamp::from(11))
1017 .sign_with_keys(&alice_keys)
1018 .unwrap();
1019 crate::socialgraph::ingest_parsed_event(&graph_store, &alice_old_event).unwrap();
1020
1021 let alice_new_event = event_builder!(
1022 Kind::ContactList,
1023 "",
1024 vec![
1025 Tag::public_key(carol_keys.public_key()),
1026 Tag::public_key(dave_keys.public_key()),
1027 ],
1028 )
1029 .custom_created_at(nostr::Timestamp::from(20))
1030 .sign_with_keys(&alice_keys)
1031 .unwrap();
1032
1033 let relay = TestRelay::new(vec![alice_new_event]);
1034 let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 2)
1035 .with_author_batch_size(2)
1036 .with_known_since(Some(15));
1037
1038 let (shutdown_tx, shutdown_rx) = watch::channel(false);
1039 let handle = tokio::spawn(async move {
1040 crawler.crawl(shutdown_rx).await;
1041 });
1042
1043 let alice_pk = alice_keys.public_key().to_bytes();
1044 let dave_pk = dave_keys.public_key().to_bytes();
1045 wait_until(Duration::from_secs(5), || {
1046 crate::socialgraph::get_follows(&graph_store, &alice_pk).contains(&dave_pk)
1047 })
1048 .await;
1049
1050 let _ = shutdown_tx.send(true);
1051 handle.await.unwrap();
1052
1053 let requested_authors = relay.requested_authors();
1054 assert!(
1055 crate::socialgraph::get_follows(&graph_store, &alice_pk).contains(&dave_pk),
1056 "expected incremental crawl to refresh known author follow list"
1057 );
1058 assert!(
1059 requested_authors
1060 .iter()
1061 .flatten()
1062 .any(|author| author == &alice_keys.public_key().to_hex()),
1063 "expected crawler to refetch known author, got {:?}",
1064 requested_authors
1065 );
1066 }
1067
1068 #[tokio::test]
1069 #[allow(clippy::await_holding_lock)]
1070 async fn test_crawler_warm_once_completes_initial_sync_without_shutdown() {
1071 let _guard = crate::socialgraph::test_lock();
1072 let tmp = TempDir::new().unwrap();
1073 let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
1074
1075 let root_keys = nostr::Keys::generate();
1076 let root_pk = root_keys.public_key().to_bytes();
1077 crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
1078 let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1079
1080 let alice_keys = nostr::Keys::generate();
1081 let bob_keys = nostr::Keys::generate();
1082
1083 let root_event = event_builder!(
1084 Kind::ContactList,
1085 "",
1086 vec![Tag::public_key(alice_keys.public_key())],
1087 )
1088 .custom_created_at(nostr::Timestamp::from(10))
1089 .sign_with_keys(&root_keys)
1090 .unwrap();
1091 let alice_event = event_builder!(
1092 Kind::ContactList,
1093 "",
1094 vec![Tag::public_key(bob_keys.public_key())],
1095 )
1096 .custom_created_at(nostr::Timestamp::from(11))
1097 .sign_with_keys(&alice_keys)
1098 .unwrap();
1099
1100 let relay = TestRelay::new(vec![root_event, alice_event]);
1101 let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 2)
1102 .with_author_batch_size(2);
1103
1104 let started = std::time::Instant::now();
1105 crawler.warm_once().await;
1106
1107 let alice_pk = alice_keys.public_key().to_bytes();
1108 let bob_pk = bob_keys.public_key().to_bytes();
1109 assert!(
1110 started.elapsed() < Duration::from_secs(5),
1111 "warm_once should complete finite sync promptly"
1112 );
1113 assert!(
1114 crate::socialgraph::get_follows(&graph_store, &alice_pk).contains(&bob_pk),
1115 "expected warm_once to ingest distance-1 follow list"
1116 );
1117 }
1118
1119 #[tokio::test]
1120 async fn test_crawler_warm_once_accepts_large_contact_list_events() {
1121 let _guard = crate::socialgraph::test_lock();
1122 let tmp = TempDir::new().unwrap();
1123 let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
1124
1125 let root_keys = nostr::Keys::generate();
1126 let root_pk = root_keys.public_key().to_bytes();
1127 crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
1128 let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1129
1130 let followed_keys = (0..1_600)
1131 .map(|_| nostr::Keys::generate())
1132 .collect::<Vec<_>>();
1133 let tags = followed_keys
1134 .iter()
1135 .map(|keys| Tag::public_key(keys.public_key()))
1136 .collect::<Vec<_>>();
1137 let root_event = event_builder!(Kind::ContactList, "")
1138 .tags(tags)
1139 .custom_created_at(nostr::Timestamp::from(10))
1140 .sign_with_keys(&root_keys)
1141 .unwrap();
1142 assert!(
1143 root_event.as_json().len() > 70_000,
1144 "test event should exceed nostr-sdk default size limit"
1145 );
1146
1147 let relay = TestRelay::new(vec![root_event]);
1148 let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 1)
1149 .with_author_batch_size(1);
1150
1151 crawler.warm_once().await;
1152
1153 let follows = crate::socialgraph::get_follows(&graph_store, &root_pk);
1154 let first_pk = followed_keys
1155 .first()
1156 .expect("first followed key")
1157 .public_key()
1158 .to_bytes();
1159 let last_pk = followed_keys
1160 .last()
1161 .expect("last followed key")
1162 .public_key()
1163 .to_bytes();
1164 assert!(
1165 follows.contains(&first_pk) && follows.contains(&last_pk),
1166 "expected warm_once to ingest oversized contact list event"
1167 );
1168 }
1169
1170 #[tokio::test]
1171 #[allow(clippy::await_holding_lock)]
1172 async fn test_spawned_social_graph_tasks_sync_local_contacts_on_startup() {
1173 let _guard = crate::socialgraph::test_lock();
1174 let tmp = TempDir::new().unwrap();
1175 let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
1176
1177 let root_keys = nostr::Keys::generate();
1178 let root_pk = root_keys.public_key().to_bytes();
1179 crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
1180
1181 let alice_keys = nostr::Keys::generate();
1182 let bob_keys = nostr::Keys::generate();
1183
1184 write_contacts_file(
1185 &tmp.path().join("contacts.json"),
1186 &[alice_keys.public_key().to_hex()],
1187 );
1188
1189 let alice_event = event_builder!(
1190 Kind::ContactList,
1191 "",
1192 vec![Tag::public_key(bob_keys.public_key())],
1193 )
1194 .custom_created_at(nostr::Timestamp::from(11))
1195 .sign_with_keys(&alice_keys)
1196 .unwrap();
1197
1198 let relay = TestRelay::new(vec![alice_event]);
1199 let tasks = spawn_social_graph_tasks(
1200 graph_store.clone(),
1201 root_keys.clone(),
1202 vec![relay.url()],
1203 2,
1204 None,
1205 tmp.path().to_path_buf(),
1206 );
1207
1208 let alice_pk = alice_keys.public_key().to_bytes();
1209 let bob_pk = bob_keys.public_key().to_bytes();
1210 wait_until(Duration::from_secs(5), || {
1211 let root_follows = crate::socialgraph::get_follows(&graph_store, &root_pk);
1212 let alice_follows = crate::socialgraph::get_follows(&graph_store, &alice_pk);
1213 root_follows.contains(&alice_pk) && alice_follows.contains(&bob_pk)
1214 })
1215 .await;
1216
1217 let _ = tasks.shutdown_tx.send(true);
1218 tasks.crawl_handle.abort();
1219 tasks.local_list_handle.abort();
1220 }
1221
1222 #[tokio::test]
1223 #[allow(clippy::await_holding_lock)]
1224 async fn test_spawned_social_graph_tasks_refresh_when_contacts_change() {
1225 let _guard = crate::socialgraph::test_lock();
1226 let tmp = TempDir::new().unwrap();
1227 let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
1228
1229 let root_keys = nostr::Keys::generate();
1230 let root_pk = root_keys.public_key().to_bytes();
1231 crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
1232
1233 let alice_keys = nostr::Keys::generate();
1234 let bob_keys = nostr::Keys::generate();
1235
1236 let alice_event = event_builder!(
1237 Kind::ContactList,
1238 "",
1239 vec![Tag::public_key(bob_keys.public_key())],
1240 )
1241 .custom_created_at(nostr::Timestamp::from(11))
1242 .sign_with_keys(&alice_keys)
1243 .unwrap();
1244
1245 let relay = TestRelay::new(vec![alice_event]);
1246 let tasks = spawn_social_graph_tasks(
1247 graph_store.clone(),
1248 root_keys.clone(),
1249 vec![relay.url()],
1250 2,
1251 None,
1252 tmp.path().to_path_buf(),
1253 );
1254
1255 let alice_pk = alice_keys.public_key().to_bytes();
1256 let bob_pk = bob_keys.public_key().to_bytes();
1257 assert!(!crate::socialgraph::get_follows(&graph_store, &root_pk).contains(&alice_pk));
1258
1259 write_contacts_file(
1260 &tmp.path().join("contacts.json"),
1261 &[alice_keys.public_key().to_hex()],
1262 );
1263
1264 wait_until(Duration::from_secs(5), || {
1265 let root_follows = crate::socialgraph::get_follows(&graph_store, &root_pk);
1266 let alice_follows = crate::socialgraph::get_follows(&graph_store, &alice_pk);
1267 root_follows.contains(&alice_pk) && alice_follows.contains(&bob_pk)
1268 })
1269 .await;
1270
1271 let _ = tasks.shutdown_tx.send(true);
1272 tasks.crawl_handle.abort();
1273 tasks.local_list_handle.abort();
1274 }
1275}