1use futures::stream::{self, StreamExt};
2use std::collections::HashSet;
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::time::Duration;
6
7use nostr::Timestamp;
8use tokio::sync::watch;
9use tokio::task::JoinHandle;
10
11use super::{
12 read_local_list_file_state, sync_local_list_files_force, sync_local_list_files_if_changed,
13 LocalListFileState, SocialGraphBackend,
14};
15
// How many authors to pack into a single relay REQ filter.
const DEFAULT_AUTHOR_BATCH_SIZE: usize = 500;
// How many author batches may be in flight concurrently.
const DEFAULT_CONCURRENT_BATCHES: usize = 4;
// Upper bound on a single batched graph-event fetch.
const GRAPH_FETCH_TIMEOUT: Duration = Duration::from_secs(15);
// Upper bound on the initial relay connection attempt.
const RELAY_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
// Delay before the crawler first touches relays (shortened under test).
#[cfg(not(test))]
const CRAWLER_STARTUP_DELAY: Duration = Duration::from_secs(5);
#[cfg(test)]
const CRAWLER_STARTUP_DELAY: Duration = Duration::from_millis(50);
// How often local list files are polled for changes (shortened under test).
#[cfg(not(test))]
const LOCAL_LIST_POLL_INTERVAL: Duration = Duration::from_secs(5);
#[cfg(test)]
const LOCAL_LIST_POLL_INTERVAL: Duration = Duration::from_millis(100);
28
/// Handles for the spawned social-graph background tasks.
pub struct SocialGraphTaskHandles {
    // Send `true` here to ask both tasks to stop.
    pub shutdown_tx: watch::Sender<bool>,
    // The long-running relay crawl task.
    pub crawl_handle: JoinHandle<()>,
    // The local list-file polling task.
    pub local_list_handle: JoinHandle<()>,
}
34
/// Crawls nostr relays for contact/mute lists to build a social graph
/// rooted at `keys`, out to `max_depth` follow hops.
pub struct SocialGraphCrawler {
    // Primary store for trusted (in-graph) events.
    graph_store: Arc<dyn SocialGraphBackend>,
    // Optional store for events from authors outside the graph.
    spambox: Option<Arc<dyn SocialGraphBackend>>,
    // Root identity; its public key is follow distance 0.
    keys: nostr::Keys,
    relays: Vec<String>,
    max_depth: u32,
    // Authors per REQ filter (see DEFAULT_AUTHOR_BATCH_SIZE).
    author_batch_size: usize,
    // Concurrent batch fetches (see DEFAULT_CONCURRENT_BATCHES).
    concurrent_batches: usize,
    // When true, refetch every author regardless of stored state.
    full_recrawl: bool,
    // Incremental-refresh cursor for already-known authors.
    known_since: Option<Timestamp>,
}
46
47impl SocialGraphCrawler {
48 pub fn new(
49 graph_store: Arc<dyn SocialGraphBackend>,
50 keys: nostr::Keys,
51 relays: Vec<String>,
52 max_depth: u32,
53 ) -> Self {
54 Self {
55 graph_store,
56 spambox: None,
57 keys,
58 relays,
59 max_depth,
60 author_batch_size: DEFAULT_AUTHOR_BATCH_SIZE,
61 concurrent_batches: DEFAULT_CONCURRENT_BATCHES,
62 full_recrawl: false,
63 known_since: None,
64 }
65 }
66
67 pub fn with_spambox(mut self, spambox: Arc<dyn SocialGraphBackend>) -> Self {
68 self.spambox = Some(spambox);
69 self
70 }
71
72 pub fn with_author_batch_size(mut self, author_batch_size: usize) -> Self {
73 self.author_batch_size = author_batch_size.max(1);
74 self
75 }
76
77 pub fn with_concurrent_batches(mut self, concurrent_batches: usize) -> Self {
78 self.concurrent_batches = concurrent_batches.max(1);
79 self
80 }
81
82 pub fn with_full_recrawl(mut self, full_recrawl: bool) -> Self {
83 self.full_recrawl = full_recrawl;
84 self
85 }
86
87 pub fn with_known_since(mut self, known_since: Option<u64>) -> Self {
88 self.known_since = known_since.map(Timestamp::from);
89 self
90 }
91
92 fn is_within_social_graph(&self, pk_bytes: &[u8; 32]) -> bool {
93 if pk_bytes == &self.keys.public_key().to_bytes() {
94 return true;
95 }
96
97 super::get_follow_distance(self.graph_store.as_ref(), pk_bytes)
98 .map(|distance| distance <= self.max_depth)
99 .unwrap_or(false)
100 }
101
102 fn ingest_events_into(
103 &self,
104 graph_store: &(impl SocialGraphBackend + ?Sized),
105 events: &[nostr::Event],
106 ) {
107 if let Err(err) = super::ingest_parsed_events(graph_store, events) {
108 tracing::debug!("Failed to ingest crawler event: {}", err);
109 }
110 }
111
112 #[allow(deprecated)]
113 fn collect_missing_root_follows(
114 &self,
115 event: &nostr::Event,
116 fetched_contact_lists: &mut HashSet<[u8; 32]>,
117 ) -> Vec<[u8; 32]> {
118 if self.max_depth < 2 || event.kind != nostr::Kind::ContactList {
119 return Vec::new();
120 }
121
122 let root_pk = self.keys.public_key().to_bytes();
123 if event.pubkey.to_bytes() != root_pk {
124 return Vec::new();
125 }
126
127 let mut missing = Vec::new();
128 for tag in event.tags.iter() {
129 if let Some(nostr::TagStandard::PublicKey { public_key, .. }) = tag.as_standardized() {
130 let pk_bytes = public_key.to_bytes();
131 if fetched_contact_lists.contains(&pk_bytes) {
132 continue;
133 }
134
135 let existing_follows = super::get_follows(self.graph_store.as_ref(), &pk_bytes);
136 if !existing_follows.is_empty() {
137 fetched_contact_lists.insert(pk_bytes);
138 continue;
139 }
140
141 fetched_contact_lists.insert(pk_bytes);
142 missing.push(pk_bytes);
143 }
144 }
145
146 missing
147 }
148
149 fn graph_filter_for_pubkeys(
150 pubkeys: &[[u8; 32]],
151 since: Option<Timestamp>,
152 ) -> Option<nostr::Filter> {
153 let authors = pubkeys
154 .iter()
155 .filter_map(|pk_bytes| nostr::PublicKey::from_slice(pk_bytes).ok())
156 .collect::<Vec<_>>();
157 if authors.is_empty() {
158 return None;
159 }
160
161 let mut filter = nostr::Filter::new()
162 .authors(authors)
163 .kinds(vec![nostr::Kind::ContactList, nostr::Kind::MuteList]);
164 if let Some(since) = since {
165 filter = filter.since(since);
166 }
167
168 Some(filter)
169 }
170
171 async fn fetch_graph_events_for_pubkeys(
172 &self,
173 client: &nostr_sdk::Client,
174 pubkeys: &[[u8; 32]],
175 since: Option<Timestamp>,
176 ) -> Vec<nostr::Event> {
177 let Some(filter) = Self::graph_filter_for_pubkeys(pubkeys, since) else {
178 return Vec::new();
179 };
180
181 match client
182 .get_events_of(
183 vec![filter],
184 nostr_sdk::EventSource::relays(Some(GRAPH_FETCH_TIMEOUT)),
185 )
186 .await
187 {
188 Ok(events) => events,
189 Err(err) => {
190 tracing::debug!(
191 "Failed to fetch graph events for {} authors: {}",
192 pubkeys.len(),
193 err
194 );
195 Vec::new()
196 }
197 }
198 }
199
    /// Fetches and ingests contact/mute lists for `pubkeys` in batches.
    ///
    /// Pubkeys are chunked by `author_batch_size`, and up to
    /// `concurrent_batches` chunk fetches run concurrently. Ingestion stops
    /// early once `shutdown_rx` observes `true`.
    async fn fetch_contact_lists_for_pubkeys(
        &self,
        client: &nostr_sdk::Client,
        pubkeys: &[[u8; 32]],
        since: Option<Timestamp>,
        shutdown_rx: &watch::Receiver<bool>,
    ) {
        // Copy each chunk into an owned Vec so the async closure can own its input.
        let chunk_futures = stream::iter(
            pubkeys
                .chunks(self.author_batch_size)
                .map(|chunk| chunk.to_vec())
                .collect::<Vec<_>>(),
        )
        .map(|chunk| async move {
            self.fetch_graph_events_for_pubkeys(client, &chunk, since)
                .await
        });

        // Results arrive in completion order, not submission order.
        let mut in_flight = chunk_futures.buffer_unordered(self.concurrent_batches);
        while let Some(events) = in_flight.next().await {
            if *shutdown_rx.borrow() {
                break;
            }
            if let Err(err) = super::ingest_graph_parsed_events(self.graph_store.as_ref(), &events)
            {
                tracing::debug!("Failed to ingest crawler graph batch: {}", err);
            }
        }
    }
229
230 fn authors_to_fetch_at_distance(&self, distance: u32) -> (Vec<[u8; 32]>, Vec<[u8; 32]>) {
231 let Ok(users) = self.graph_store.users_by_follow_distance(distance) else {
232 return (Vec::new(), Vec::new());
233 };
234 if self.full_recrawl {
235 return (users, Vec::new());
236 }
237
238 let mut new_authors = Vec::new();
239 let mut known_authors = Vec::new();
240 let refresh_known_authors = self.known_since.is_some();
241 for pk_bytes in users {
242 match self
243 .graph_store
244 .follow_list_created_at(&pk_bytes)
245 .ok()
246 .flatten()
247 {
248 Some(_) if refresh_known_authors => known_authors.push(pk_bytes),
249 Some(_) => {}
250 None => new_authors.push(pk_bytes),
251 }
252 }
253 (new_authors, known_authors)
254 }
255
    /// Builds an SDK client from our keys, registers all configured relays,
    /// and connects with a bounded timeout.
    ///
    /// Returns `None` only when the secret key cannot be re-parsed into SDK
    /// keys. Individual relay failures are logged and skipped.
    async fn connect_client(&self) -> Option<nostr_sdk::Client> {
        use nostr::nips::nip19::ToBech32;

        // The `nostr` and `nostr_sdk` key types differ; round-trip through
        // bech32 to convert between them.
        let Ok(sdk_keys) =
            nostr_sdk::Keys::parse(self.keys.secret_key().to_bech32().unwrap_or_default())
        else {
            return None;
        };

        let client = nostr_sdk::Client::new(&sdk_keys);
        for relay in &self.relays {
            if let Err(err) = client.add_relay(relay).await {
                tracing::warn!("Failed to add relay {}: {}", relay, err);
            }
        }
        // Proceed even if some relays are still connecting after the timeout.
        client.connect_with_timeout(RELAY_CONNECT_TIMEOUT).await;
        Some(client)
    }
274
    /// Runs one breadth-first sync pass over distances 0..=max_depth.
    ///
    /// At each distance, never-fetched authors get a full fetch (no cursor)
    /// while known authors are refreshed from `known_since`. Every requested
    /// author is recorded in `fetched_contact_lists` so the live loop does
    /// not refetch them. Aborts early when shutdown is signalled.
    async fn sync_graph_once(
        &self,
        client: &nostr_sdk::Client,
        shutdown_rx: &watch::Receiver<bool>,
        fetched_contact_lists: &mut HashSet<[u8; 32]>,
    ) {
        for distance in 0..=self.max_depth {
            if *shutdown_rx.borrow() {
                break;
            }

            let (new_authors, known_authors) = self.authors_to_fetch_at_distance(distance);
            if new_authors.is_empty() && known_authors.is_empty() {
                continue;
            }

            tracing::debug!(
                "Social graph sync distance={} new_authors={} known_authors={}",
                distance,
                new_authors.len(),
                known_authors.len()
            );

            if !new_authors.is_empty() {
                for pk_bytes in &new_authors {
                    fetched_contact_lists.insert(*pk_bytes);
                }
                // New authors: full fetch with no since cursor.
                self.fetch_contact_lists_for_pubkeys(client, &new_authors, None, shutdown_rx)
                    .await;
            }

            if !known_authors.is_empty() {
                for pk_bytes in &known_authors {
                    fetched_contact_lists.insert(*pk_bytes);
                }
                // Known authors: only fetch events newer than the cursor.
                self.fetch_contact_lists_for_pubkeys(
                    client,
                    &known_authors,
                    self.known_since,
                    shutdown_rx,
                )
                .await;
            }
        }
    }
320
321 pub async fn warm_once(&self) {
322 if self.relays.is_empty() {
323 tracing::warn!("Social graph crawler: no relays configured, skipping");
324 return;
325 }
326
327 let Some(client) = self.connect_client().await else {
328 return;
329 };
330 let (_shutdown_tx, shutdown_rx) = watch::channel(false);
331 let mut fetched_contact_lists: HashSet<[u8; 32]> = HashSet::new();
332 self.sync_graph_once(&client, &shutdown_rx, &mut fetched_contact_lists)
333 .await;
334 let _ = client.disconnect().await;
335 }
336
337 pub(crate) fn handle_incoming_event(&self, event: &nostr::Event) {
338 let is_contact_list = event.kind == nostr::Kind::ContactList;
339 let is_mute_list = event.kind == nostr::Kind::MuteList;
340 if !is_contact_list && !is_mute_list {
341 return;
342 }
343
344 let pk_bytes = event.pubkey.to_bytes();
345 if self.is_within_social_graph(&pk_bytes) {
346 self.ingest_events_into(self.graph_store.as_ref(), std::slice::from_ref(event));
347 return;
348 }
349
350 if let Some(spambox) = &self.spambox {
351 self.ingest_events_into(spambox.as_ref(), std::slice::from_ref(event));
352 }
353 }
354
355 #[allow(deprecated)]
356 pub async fn crawl(&self, shutdown_rx: watch::Receiver<bool>) {
357 use nostr_sdk::prelude::RelayPoolNotification;
358
359 if self.relays.is_empty() {
360 tracing::warn!("Social graph crawler: no relays configured, skipping");
361 return;
362 }
363
364 let mut shutdown_rx = shutdown_rx;
365 if *shutdown_rx.borrow() {
366 return;
367 }
368
369 let Some(client) = self.connect_client().await else {
370 return;
371 };
372
373 let mut fetched_contact_lists: HashSet<[u8; 32]> = HashSet::new();
374 self.sync_graph_once(&client, &shutdown_rx, &mut fetched_contact_lists)
375 .await;
376
377 let filter = nostr::Filter::new()
378 .kinds(vec![nostr::Kind::ContactList, nostr::Kind::MuteList])
379 .since(nostr::Timestamp::now());
380
381 let _ = client.subscribe(vec![filter], None).await;
382
383 let mut notifications = client.notifications();
384 loop {
385 tokio::select! {
386 _ = shutdown_rx.changed() => {
387 if *shutdown_rx.borrow() {
388 break;
389 }
390 }
391 notification = notifications.recv() => {
392 match notification {
393 Ok(RelayPoolNotification::Event { event, .. }) => {
394 self.handle_incoming_event(&event);
395 let missing = self.collect_missing_root_follows(&event, &mut fetched_contact_lists);
396 if !missing.is_empty() {
397 self.fetch_contact_lists_for_pubkeys(&client, &missing, None, &shutdown_rx).await;
398 }
399 }
400 Ok(_) => {}
401 Err(err) => {
402 tracing::warn!("Social graph crawler notification error: {}", err);
403 break;
404 }
405 }
406 }
407 }
408 }
409
410 let _ = client.disconnect().await;
411 }
412}
413
414pub fn spawn_social_graph_tasks(
415 graph_store: Arc<dyn SocialGraphBackend>,
416 keys: nostr::Keys,
417 relays: Vec<String>,
418 max_depth: u32,
419 spambox: Option<Arc<dyn SocialGraphBackend>>,
420 data_dir: PathBuf,
421) -> SocialGraphTaskHandles {
422 let (shutdown_tx, crawl_shutdown_rx) = watch::channel(false);
423 let local_list_shutdown_rx = crawl_shutdown_rx.clone();
424 let crawl_data_dir = data_dir.clone();
425 let local_list_data_dir = data_dir;
426
427 let crawl_handle = tokio::spawn({
428 let graph_store = Arc::clone(&graph_store);
429 let keys = keys.clone();
430 let relays = relays.clone();
431 let spambox = spambox.clone();
432 async move {
433 tokio::time::sleep(CRAWLER_STARTUP_DELAY).await;
434 let mut crawler = SocialGraphCrawler::new(graph_store.clone(), keys, relays, max_depth);
435 if let Some(spambox) = spambox {
436 crawler = crawler.with_spambox(spambox);
437 }
438 if let Err(err) =
439 sync_local_list_files_force(graph_store.as_ref(), &crawl_data_dir, &crawler.keys)
440 {
441 tracing::warn!(
442 "Failed to sync local social graph lists at startup: {}",
443 err
444 );
445 }
446 crawler.crawl(crawl_shutdown_rx).await;
447 }
448 });
449
450 let local_list_handle = tokio::spawn({
451 let graph_store = Arc::clone(&graph_store);
452 let keys = keys.clone();
453 let relays = relays.clone();
454 let spambox = spambox.clone();
455 async move {
456 let mut shutdown_rx = local_list_shutdown_rx;
457 let mut state =
458 read_local_list_file_state(&local_list_data_dir).unwrap_or_else(|err| {
459 tracing::warn!("Failed to read local social graph list state: {}", err);
460 LocalListFileState::default()
461 });
462 let mut interval = tokio::time::interval(LOCAL_LIST_POLL_INTERVAL);
463 loop {
464 tokio::select! {
465 _ = shutdown_rx.changed() => {
466 if *shutdown_rx.borrow() {
467 break;
468 }
469 }
470 _ = interval.tick() => {
471 let outcome = match sync_local_list_files_if_changed(
472 graph_store.as_ref(),
473 &local_list_data_dir,
474 &keys,
475 &mut state,
476 ) {
477 Ok(outcome) => outcome,
478 Err(err) => {
479 tracing::warn!("Failed to sync local social graph list files: {}", err);
480 continue;
481 }
482 };
483
484 if outcome.contacts_changed {
485 let mut crawler =
486 SocialGraphCrawler::new(graph_store.clone(), keys.clone(), relays.clone(), max_depth);
487 if let Some(spambox) = spambox.clone() {
488 crawler = crawler.with_spambox(spambox);
489 }
490 crawler.warm_once().await;
491 }
492 }
493 }
494 }
495 }
496 });
497
498 SocialGraphTaskHandles {
499 shutdown_tx,
500 crawl_handle,
501 local_list_handle,
502 }
503}
504
505#[cfg(test)]
506mod tests {
507 use super::*;
508 use std::net::TcpListener;
509 use std::sync::Mutex;
510 use std::time::Instant;
511
512 use futures::{SinkExt, StreamExt};
513 use nostr::{EventBuilder, JsonUtil, Kind, PublicKey, Tag};
514 use tempfile::TempDir;
515 use tokio::net::TcpStream;
516 use tokio::sync::broadcast;
517 use tokio_tungstenite::{accept_async, tungstenite::Message};
518
    /// Shared state of the in-process test relay.
    #[derive(Debug, Default)]
    struct RelayState {
        // Events the relay serves in response to matching REQs.
        events: Vec<nostr::Event>,
        // Total author count seen in each REQ, in arrival order.
        request_author_counts: Vec<usize>,
        // Sorted, deduplicated author hexes per REQ, in arrival order.
        requested_authors: Vec<Vec<String>>,
    }
525
    /// A minimal in-process nostr relay for crawler tests; records every REQ
    /// it receives and serves a fixed event set.
    struct TestRelay {
        // Ephemeral TCP port the relay listens on.
        port: u16,
        // Broadcast channel used to stop the accept loop on drop.
        shutdown: broadcast::Sender<()>,
        state: Arc<Mutex<RelayState>>,
    }
531
    impl TestRelay {
        /// Starts a websocket relay on an ephemeral local port, serving
        /// `events` and recording the author sets of incoming REQs.
        fn new(events: Vec<nostr::Event>) -> Self {
            let state = Arc::new(Mutex::new(RelayState {
                events,
                request_author_counts: Vec::new(),
                requested_authors: Vec::new(),
            }));
            let (shutdown, _) = broadcast::channel(1);

            // Bind with std first so the port is known before the server thread runs.
            let std_listener = TcpListener::bind("127.0.0.1:0").expect("bind relay listener");
            let port = std_listener.local_addr().expect("local addr").port();
            std_listener
                .set_nonblocking(true)
                .expect("set listener nonblocking");

            let state_for_thread = Arc::clone(&state);
            let shutdown_for_thread = shutdown.clone();
            // Run the relay on its own thread with a dedicated runtime so it
            // is independent of each test's tokio runtime.
            std::thread::spawn(move || {
                let runtime = tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(2)
                    .enable_all()
                    .build()
                    .expect("build tokio runtime");
                runtime.block_on(async move {
                    let listener = tokio::net::TcpListener::from_std(std_listener)
                        .expect("tokio listener from std");
                    let mut shutdown_rx = shutdown_for_thread.subscribe();

                    loop {
                        tokio::select! {
                            _ = shutdown_rx.recv() => break,
                            accept = listener.accept() => {
                                if let Ok((stream, _)) = accept {
                                    let state = Arc::clone(&state_for_thread);
                                    tokio::spawn(async move {
                                        handle_connection(stream, state).await;
                                    });
                                }
                            }
                        }
                    }
                });
            });

            // Give the server thread a moment to start accepting connections.
            std::thread::sleep(Duration::from_millis(100));

            Self {
                port,
                shutdown,
                state,
            }
        }

        /// Websocket URL clients should connect to.
        fn url(&self) -> String {
            format!("ws://127.0.0.1:{}", self.port)
        }

        /// Author counts per received REQ, in arrival order.
        fn request_author_counts(&self) -> Vec<usize> {
            self.state
                .lock()
                .expect("relay state lock")
                .request_author_counts
                .clone()
        }

        /// Sorted, deduplicated author hexes per received REQ.
        fn requested_authors(&self) -> Vec<Vec<String>> {
            self.state
                .lock()
                .expect("relay state lock")
                .requested_authors
                .clone()
        }
    }
605
    impl Drop for TestRelay {
        /// Signals the accept loop to stop and gives it a moment to unwind.
        fn drop(&mut self) {
            let _ = self.shutdown.send(());
            std::thread::sleep(Duration::from_millis(50));
        }
    }
612
613 fn matching_events(
614 state: &Arc<Mutex<RelayState>>,
615 filters: &[nostr::Filter],
616 ) -> Vec<nostr::Event> {
617 let guard = state.lock().expect("relay state lock");
618 guard
619 .events
620 .iter()
621 .filter(|event| {
622 filters.is_empty() || filters.iter().any(|filter| filter.match_event(event))
623 })
624 .cloned()
625 .collect()
626 }
627
    /// Serializes a relay message to JSON and sends it on the websocket,
    /// ignoring send failures (the client may have disconnected).
    async fn send_relay_message(
        write: &mut futures::stream::SplitSink<
            tokio_tungstenite::WebSocketStream<TcpStream>,
            Message,
        >,
        message: nostr::RelayMessage,
    ) {
        let _ = write.send(Message::Text(message.as_json())).await;
    }
637
    /// Serves one websocket connection: answers REQs with matching stored
    /// events followed by EOSE, acknowledges CLOSE, and records the author
    /// sets of every REQ into `state`.
    async fn handle_connection(stream: TcpStream, state: Arc<Mutex<RelayState>>) {
        let ws_stream = match accept_async(stream).await {
            Ok(ws) => ws,
            Err(_) => return,
        };
        let (mut write, mut read) = ws_stream.split();

        while let Some(message) = read.next().await {
            let text = match message {
                Ok(Message::Text(text)) => text,
                Ok(Message::Ping(data)) => {
                    let _ = write.send(Message::Pong(data)).await;
                    continue;
                }
                Ok(Message::Close(_)) => break,
                // Binary/other frames are ignored.
                _ => continue,
            };

            let parsed = match nostr::ClientMessage::from_json(text.as_bytes()) {
                Ok(message) => message,
                Err(_) => continue,
            };

            match parsed {
                nostr::ClientMessage::Req {
                    subscription_id,
                    filters,
                } => {
                    // Record how many authors this REQ covered, plus which ones.
                    let author_count = filters
                        .iter()
                        .filter_map(|filter| filter.authors.as_ref())
                        .map(|authors| authors.len())
                        .sum();
                    let mut requested_authors = filters
                        .iter()
                        .filter_map(|filter| filter.authors.as_ref())
                        .flat_map(|authors| authors.iter().map(|author| author.to_hex()))
                        .collect::<Vec<_>>();
                    requested_authors.sort();
                    requested_authors.dedup();
                    {
                        let mut guard = state.lock().expect("relay state lock");
                        guard.request_author_counts.push(author_count);
                        guard.requested_authors.push(requested_authors);
                    }

                    // Serve matches, then signal end-of-stored-events.
                    for event in matching_events(&state, &filters) {
                        send_relay_message(
                            &mut write,
                            nostr::RelayMessage::event(subscription_id.clone(), event),
                        )
                        .await;
                    }
                    send_relay_message(&mut write, nostr::RelayMessage::eose(subscription_id))
                        .await;
                }
                nostr::ClientMessage::Close(subscription_id) => {
                    send_relay_message(
                        &mut write,
                        nostr::RelayMessage::closed(subscription_id, ""),
                    )
                    .await;
                }
                _ => {}
            }
        }
    }
705
706 async fn wait_until<F>(timeout: Duration, mut condition: F)
707 where
708 F: FnMut() -> bool,
709 {
710 let start = Instant::now();
711 while start.elapsed() < timeout {
712 if condition() {
713 return;
714 }
715 tokio::time::sleep(Duration::from_millis(50)).await;
716 }
717 panic!("condition not met within {:?}", timeout);
718 }
719
720 fn write_contacts_file(path: &std::path::Path, pubkeys: &[String]) {
721 std::fs::write(
722 path,
723 serde_json::to_string(pubkeys).expect("serialize contacts"),
724 )
725 .expect("write contacts file");
726 }
727
    // An event from an author outside the social graph must be routed to the
    // spambox store, never into the main graph store.
    #[tokio::test]
    async fn test_crawler_routes_untrusted_to_spambox() {
        let _guard = crate::socialgraph::test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
        let spambox =
            crate::socialgraph::open_social_graph_store_at_path(&tmp.path().join("spambox"), None)
                .unwrap();

        let root_keys = nostr::Keys::generate();
        let root_pk = root_keys.public_key().to_bytes();
        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
        let spambox_backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = spambox.clone();

        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![], 2)
            .with_spambox(spambox_backend);

        // An unknown author that follows the root but is followed by no one.
        let unknown_keys = nostr::Keys::generate();
        let follow_tag = Tag::public_key(PublicKey::from_slice(&root_pk).unwrap());
        let event = EventBuilder::new(Kind::ContactList, "", vec![follow_tag])
            .to_event(&unknown_keys)
            .unwrap();

        crawler.handle_incoming_event(&event);

        // Main graph untouched; spambox holds the untrusted follow list.
        let unknown_pk = unknown_keys.public_key().to_bytes();
        assert!(crate::socialgraph::get_follows(&graph_store, &unknown_pk).is_empty());
        assert_eq!(
            crate::socialgraph::get_follows(&spambox, &unknown_pk),
            vec![root_pk]
        );
    }
761
    // Authors at the same follow distance should be requested from the relay
    // in a single batched REQ (author_batch_size = 2 here).
    #[tokio::test]
    #[allow(clippy::await_holding_lock)]
    async fn test_crawler_batches_graph_fetches_by_author_chunk() {
        let _guard = crate::socialgraph::test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();

        let root_keys = nostr::Keys::generate();
        let root_pk = root_keys.public_key().to_bytes();
        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();

        let alice_keys = nostr::Keys::generate();
        let bob_keys = nostr::Keys::generate();
        let carol_keys = nostr::Keys::generate();

        // root -> {alice, bob}; alice -> carol; bob -> {} (all served by relay).
        let root_event = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![
                Tag::public_key(alice_keys.public_key()),
                Tag::public_key(bob_keys.public_key()),
            ],
        )
        .custom_created_at(nostr::Timestamp::from(10))
        .to_event(&root_keys)
        .unwrap();
        let alice_event = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![Tag::public_key(carol_keys.public_key())],
        )
        .custom_created_at(nostr::Timestamp::from(11))
        .to_event(&alice_keys)
        .unwrap();
        let bob_event = EventBuilder::new(Kind::ContactList, "", vec![])
            .custom_created_at(nostr::Timestamp::from(12))
            .to_event(&bob_keys)
            .unwrap();

        let relay = TestRelay::new(vec![root_event, alice_event, bob_event]);
        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 2)
            .with_author_batch_size(2);

        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        let handle = tokio::spawn(async move {
            crawler.crawl(shutdown_rx).await;
        });

        // Wait until all three follow relations landed in the store.
        let alice_pk = alice_keys.public_key().to_bytes();
        let bob_pk = bob_keys.public_key().to_bytes();
        let carol_pk = carol_keys.public_key().to_bytes();
        wait_until(Duration::from_secs(5), || {
            let root_follows = crate::socialgraph::get_follows(&graph_store, &root_pk);
            let alice_follows = crate::socialgraph::get_follows(&graph_store, &alice_pk);
            root_follows.contains(&alice_pk)
                && root_follows.contains(&bob_pk)
                && alice_follows.contains(&carol_pk)
        })
        .await;

        let _ = shutdown_tx.send(true);
        handle.await.unwrap();

        // At least one REQ must have carried both distance-1 authors at once.
        let author_counts = relay.request_author_counts();
        assert!(
            author_counts.iter().any(|count| *count >= 2),
            "expected batched author REQ, got {:?}",
            author_counts
        );
    }
833
    // With the root's contact list already stored locally, an incremental
    // crawl must expand outward from it without refetching the root itself.
    #[tokio::test]
    #[allow(clippy::await_holding_lock)]
    async fn test_crawler_expands_from_existing_graph_without_root_refetch() {
        let _guard = crate::socialgraph::test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();

        let root_keys = nostr::Keys::generate();
        let root_pk = root_keys.public_key().to_bytes();
        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();

        let alice_keys = nostr::Keys::generate();
        let bob_keys = nostr::Keys::generate();
        let carol_keys = nostr::Keys::generate();

        // Pre-ingest root -> {alice, bob} so the root is "known" locally.
        let root_event = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![
                Tag::public_key(alice_keys.public_key()),
                Tag::public_key(bob_keys.public_key()),
            ],
        )
        .custom_created_at(nostr::Timestamp::from(10))
        .to_event(&root_keys)
        .unwrap();
        crate::socialgraph::ingest_parsed_event(&graph_store, &root_event).unwrap();

        // The relay only holds the distance-1 lists, not the root's.
        let alice_event = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![Tag::public_key(carol_keys.public_key())],
        )
        .custom_created_at(nostr::Timestamp::from(11))
        .to_event(&alice_keys)
        .unwrap();
        let bob_event = EventBuilder::new(Kind::ContactList, "", vec![])
            .custom_created_at(nostr::Timestamp::from(12))
            .to_event(&bob_keys)
            .unwrap();

        let relay = TestRelay::new(vec![alice_event, bob_event]);
        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 2)
            .with_author_batch_size(2);

        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        let handle = tokio::spawn(async move {
            crawler.crawl(shutdown_rx).await;
        });

        let alice_pk = alice_keys.public_key().to_bytes();
        let carol_pk = carol_keys.public_key().to_bytes();
        wait_until(Duration::from_secs(5), || {
            crate::socialgraph::get_follows(&graph_store, &alice_pk).contains(&carol_pk)
        })
        .await;

        let _ = shutdown_tx.send(true);
        handle.await.unwrap();

        // The root must never appear in any REQ author list; the distance-1
        // authors must be fetched in one batch.
        let author_counts = relay.request_author_counts();
        let requested_authors = relay.requested_authors();
        assert!(
            requested_authors
                .iter()
                .flatten()
                .all(|author| author != &root_keys.public_key().to_hex()),
            "expected incremental crawl to skip root refetch, got {:?}",
            requested_authors
        );
        assert!(
            author_counts.iter().any(|count| *count >= 2),
            "expected batched distance-1 REQ, got {:?}",
            author_counts
        );
    }
911
    // In full_recrawl mode the crawler must refetch even authors (here: the
    // root) whose follow lists are already stored locally.
    #[tokio::test]
    #[allow(clippy::await_holding_lock)]
    async fn test_crawler_full_recrawl_refetches_root() {
        let _guard = crate::socialgraph::test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();

        let root_keys = nostr::Keys::generate();
        let root_pk = root_keys.public_key().to_bytes();
        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();

        let alice_keys = nostr::Keys::generate();
        let bob_keys = nostr::Keys::generate();

        // Pre-ingest the root's contact list, then also serve it from the relay.
        let root_event = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![
                Tag::public_key(alice_keys.public_key()),
                Tag::public_key(bob_keys.public_key()),
            ],
        )
        .custom_created_at(nostr::Timestamp::from(10))
        .to_event(&root_keys)
        .unwrap();
        crate::socialgraph::ingest_parsed_event(&graph_store, &root_event).unwrap();

        let relay = TestRelay::new(vec![root_event]);
        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 1)
            .with_full_recrawl(true);

        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        let handle = tokio::spawn(async move {
            crawler.crawl(shutdown_rx).await;
        });

        // Wait for the distance-0 (root-only) REQ to arrive at the relay.
        wait_until(Duration::from_secs(5), || {
            relay.request_author_counts().contains(&1)
        })
        .await;

        let _ = shutdown_tx.send(true);
        handle.await.unwrap();

        let requested_authors = relay.requested_authors();
        assert!(
            requested_authors
                .iter()
                .flatten()
                .any(|author| author == &root_keys.public_key().to_hex()),
            "expected full recrawl to refetch root, got {:?}",
            requested_authors
        );
    }
967
    // With a known_since cursor, authors whose (older) lists are stored must
    // be refetched and newer updates merged into the graph.
    #[tokio::test]
    #[allow(clippy::await_holding_lock)]
    async fn test_crawler_revisits_known_authors_with_since_cursor() {
        let _guard = crate::socialgraph::test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();

        let root_keys = nostr::Keys::generate();
        let root_pk = root_keys.public_key().to_bytes();
        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();

        let alice_keys = nostr::Keys::generate();
        let carol_keys = nostr::Keys::generate();
        let dave_keys = nostr::Keys::generate();

        // Local state: root -> alice (ts 10), alice -> carol (ts 11).
        let root_event = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![Tag::public_key(alice_keys.public_key())],
        )
        .custom_created_at(nostr::Timestamp::from(10))
        .to_event(&root_keys)
        .unwrap();
        crate::socialgraph::ingest_parsed_event(&graph_store, &root_event).unwrap();

        let alice_old_event = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![Tag::public_key(carol_keys.public_key())],
        )
        .custom_created_at(nostr::Timestamp::from(11))
        .to_event(&alice_keys)
        .unwrap();
        crate::socialgraph::ingest_parsed_event(&graph_store, &alice_old_event).unwrap();

        // The relay holds alice's newer list (ts 20) that adds dave.
        let alice_new_event = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![
                Tag::public_key(carol_keys.public_key()),
                Tag::public_key(dave_keys.public_key()),
            ],
        )
        .custom_created_at(nostr::Timestamp::from(20))
        .to_event(&alice_keys)
        .unwrap();

        let relay = TestRelay::new(vec![alice_new_event]);
        // Cursor 15 sits between the stored (11) and new (20) lists.
        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 2)
            .with_author_batch_size(2)
            .with_known_since(Some(15));

        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        let handle = tokio::spawn(async move {
            crawler.crawl(shutdown_rx).await;
        });

        let alice_pk = alice_keys.public_key().to_bytes();
        let dave_pk = dave_keys.public_key().to_bytes();
        wait_until(Duration::from_secs(5), || {
            crate::socialgraph::get_follows(&graph_store, &alice_pk).contains(&dave_pk)
        })
        .await;

        let _ = shutdown_tx.send(true);
        handle.await.unwrap();

        let requested_authors = relay.requested_authors();
        assert!(
            crate::socialgraph::get_follows(&graph_store, &alice_pk).contains(&dave_pk),
            "expected incremental crawl to refresh known author follow list"
        );
        assert!(
            requested_authors
                .iter()
                .flatten()
                .any(|author| author == &alice_keys.public_key().to_hex()),
            "expected crawler to refetch known author, got {:?}",
            requested_authors
        );
    }
1050
    // warm_once must perform one finite sync pass (no live subscription) and
    // return promptly without needing a shutdown signal.
    #[tokio::test]
    #[allow(clippy::await_holding_lock)]
    async fn test_crawler_warm_once_completes_initial_sync_without_shutdown() {
        let _guard = crate::socialgraph::test_lock();
        let tmp = TempDir::new().unwrap();
        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();

        let root_keys = nostr::Keys::generate();
        let root_pk = root_keys.public_key().to_bytes();
        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();

        let alice_keys = nostr::Keys::generate();
        let bob_keys = nostr::Keys::generate();

        // Relay serves root -> alice and alice -> bob.
        let root_event = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![Tag::public_key(alice_keys.public_key())],
        )
        .custom_created_at(nostr::Timestamp::from(10))
        .to_event(&root_keys)
        .unwrap();
        let alice_event = EventBuilder::new(
            Kind::ContactList,
            "",
            vec![Tag::public_key(bob_keys.public_key())],
        )
        .custom_created_at(nostr::Timestamp::from(11))
        .to_event(&alice_keys)
        .unwrap();

        let relay = TestRelay::new(vec![root_event, alice_event]);
        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 2)
            .with_author_batch_size(2);

        let started = std::time::Instant::now();
        crawler.warm_once().await;

        let alice_pk = alice_keys.public_key().to_bytes();
        let bob_pk = bob_keys.public_key().to_bytes();
        assert!(
            started.elapsed() < Duration::from_secs(5),
            "warm_once should complete finite sync promptly"
        );
        assert!(
            crate::socialgraph::get_follows(&graph_store, &alice_pk).contains(&bob_pk),
            "expected warm_once to ingest distance-1 follow list"
        );
    }
1101
1102 #[tokio::test]
1103 #[allow(clippy::await_holding_lock)]
1104 async fn test_spawned_social_graph_tasks_sync_local_contacts_on_startup() {
1105 let _guard = crate::socialgraph::test_lock();
1106 let tmp = TempDir::new().unwrap();
1107 let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
1108
1109 let root_keys = nostr::Keys::generate();
1110 let root_pk = root_keys.public_key().to_bytes();
1111 crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
1112
1113 let alice_keys = nostr::Keys::generate();
1114 let bob_keys = nostr::Keys::generate();
1115
1116 write_contacts_file(
1117 &tmp.path().join("contacts.json"),
1118 &[alice_keys.public_key().to_hex()],
1119 );
1120
1121 let alice_event = EventBuilder::new(
1122 Kind::ContactList,
1123 "",
1124 vec![Tag::public_key(bob_keys.public_key())],
1125 )
1126 .custom_created_at(nostr::Timestamp::from(11))
1127 .to_event(&alice_keys)
1128 .unwrap();
1129
1130 let relay = TestRelay::new(vec![alice_event]);
1131 let tasks = spawn_social_graph_tasks(
1132 graph_store.clone(),
1133 root_keys.clone(),
1134 vec![relay.url()],
1135 2,
1136 None,
1137 tmp.path().to_path_buf(),
1138 );
1139
1140 let alice_pk = alice_keys.public_key().to_bytes();
1141 let bob_pk = bob_keys.public_key().to_bytes();
1142 wait_until(Duration::from_secs(5), || {
1143 let root_follows = crate::socialgraph::get_follows(&graph_store, &root_pk);
1144 let alice_follows = crate::socialgraph::get_follows(&graph_store, &alice_pk);
1145 root_follows.contains(&alice_pk) && alice_follows.contains(&bob_pk)
1146 })
1147 .await;
1148
1149 let _ = tasks.shutdown_tx.send(true);
1150 tasks.crawl_handle.abort();
1151 tasks.local_list_handle.abort();
1152 }
1153
1154 #[tokio::test]
1155 #[allow(clippy::await_holding_lock)]
1156 async fn test_spawned_social_graph_tasks_refresh_when_contacts_change() {
1157 let _guard = crate::socialgraph::test_lock();
1158 let tmp = TempDir::new().unwrap();
1159 let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
1160
1161 let root_keys = nostr::Keys::generate();
1162 let root_pk = root_keys.public_key().to_bytes();
1163 crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
1164
1165 let alice_keys = nostr::Keys::generate();
1166 let bob_keys = nostr::Keys::generate();
1167
1168 let alice_event = EventBuilder::new(
1169 Kind::ContactList,
1170 "",
1171 vec![Tag::public_key(bob_keys.public_key())],
1172 )
1173 .custom_created_at(nostr::Timestamp::from(11))
1174 .to_event(&alice_keys)
1175 .unwrap();
1176
1177 let relay = TestRelay::new(vec![alice_event]);
1178 let tasks = spawn_social_graph_tasks(
1179 graph_store.clone(),
1180 root_keys.clone(),
1181 vec![relay.url()],
1182 2,
1183 None,
1184 tmp.path().to_path_buf(),
1185 );
1186
1187 let alice_pk = alice_keys.public_key().to_bytes();
1188 let bob_pk = bob_keys.public_key().to_bytes();
1189 assert!(!crate::socialgraph::get_follows(&graph_store, &root_pk).contains(&alice_pk));
1190
1191 write_contacts_file(
1192 &tmp.path().join("contacts.json"),
1193 &[alice_keys.public_key().to_hex()],
1194 );
1195
1196 wait_until(Duration::from_secs(5), || {
1197 let root_follows = crate::socialgraph::get_follows(&graph_store, &root_pk);
1198 let alice_follows = crate::socialgraph::get_follows(&graph_store, &alice_pk);
1199 root_follows.contains(&alice_pk) && alice_follows.contains(&bob_pk)
1200 })
1201 .await;
1202
1203 let _ = tasks.shutdown_tx.send(true);
1204 tasks.crawl_handle.abort();
1205 tasks.local_list_handle.abort();
1206 }
1207}