// hashtree_cli/socialgraph/crawler.rs

1use futures::stream::{self, StreamExt};
2use std::collections::HashSet;
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::time::Duration;
6
7use nostr::Timestamp;
8use tokio::sync::watch;
9use tokio::task::JoinHandle;
10
11use super::{
12    read_local_list_file_state, sync_local_list_files_force, sync_local_list_files_if_changed,
13    LocalListFileState, SocialGraphBackend,
14};
15
/// Maximum number of authors packed into a single relay REQ filter.
const DEFAULT_AUTHOR_BATCH_SIZE: usize = 500;
/// How many author batches are fetched from relays concurrently.
const DEFAULT_CONCURRENT_BATCHES: usize = 4;
/// Per-request timeout when fetching contact/mute-list events from relays.
const GRAPH_FETCH_TIMEOUT: Duration = Duration::from_secs(15);
/// Timeout for the initial relay connection attempt.
const RELAY_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
// Delay before the background crawler task starts working; shortened under
// `cfg(test)` so the test suite does not wait on real-time delays.
#[cfg(not(test))]
const CRAWLER_STARTUP_DELAY: Duration = Duration::from_secs(5);
#[cfg(test)]
const CRAWLER_STARTUP_DELAY: Duration = Duration::from_millis(50);
// Poll interval for detecting changes to the local list files; likewise
// shortened under `cfg(test)`.
#[cfg(not(test))]
const LOCAL_LIST_POLL_INTERVAL: Duration = Duration::from_secs(5);
#[cfg(test)]
const LOCAL_LIST_POLL_INTERVAL: Duration = Duration::from_millis(100);
28
/// Handles returned by [`spawn_social_graph_tasks`]: lets the caller signal
/// shutdown to both background tasks and await their completion.
pub struct SocialGraphTaskHandles {
    /// Send `true` on this channel to request shutdown of both tasks.
    pub shutdown_tx: watch::Sender<bool>,
    /// Task that crawls relays for contact/mute-list events.
    pub crawl_handle: JoinHandle<()>,
    /// Task that polls local list files and re-syncs the graph on change.
    pub local_list_handle: JoinHandle<()>,
}
34
/// Crawls nostr relays for contact (kind 3) and mute (kind 10000) lists,
/// expanding the social graph outward from the root key up to `max_depth`
/// follow hops.
pub struct SocialGraphCrawler {
    // Primary store for trusted (within-graph) events.
    graph_store: Arc<dyn SocialGraphBackend>,
    // Optional secondary store for events from authors outside the graph.
    spambox: Option<Arc<dyn SocialGraphBackend>>,
    // Keys whose public key is the root of the social graph.
    keys: nostr::Keys,
    // Relay URLs to connect to; crawl is skipped when empty.
    relays: Vec<String>,
    // Maximum follow distance (hops from root) considered "within" the graph.
    max_depth: u32,
    // Authors per REQ filter; clamped to >= 1 by the builder.
    author_batch_size: usize,
    // Concurrent in-flight batch fetches; clamped to >= 1 by the builder.
    concurrent_batches: usize,
    // When true, re-fetch every author regardless of stored list timestamps.
    full_recrawl: bool,
    // When set, known authors are re-fetched with a `since` filter at this
    // timestamp; when None, known authors are skipped entirely.
    known_since: Option<Timestamp>,
}
46
impl SocialGraphCrawler {
    /// Creates a crawler rooted at `keys`, writing into `graph_store` and
    /// fetching from `relays`, following links up to `max_depth` hops.
    /// Batch sizes default to the module constants; use the `with_*`
    /// builders to customize.
    pub fn new(
        graph_store: Arc<dyn SocialGraphBackend>,
        keys: nostr::Keys,
        relays: Vec<String>,
        max_depth: u32,
    ) -> Self {
        Self {
            graph_store,
            spambox: None,
            keys,
            relays,
            max_depth,
            author_batch_size: DEFAULT_AUTHOR_BATCH_SIZE,
            concurrent_batches: DEFAULT_CONCURRENT_BATCHES,
            full_recrawl: false,
            known_since: None,
        }
    }

    /// Builder: route events from authors outside the trusted graph into
    /// this secondary store instead of discarding them.
    pub fn with_spambox(mut self, spambox: Arc<dyn SocialGraphBackend>) -> Self {
        self.spambox = Some(spambox);
        self
    }

    /// Builder: set the number of authors per REQ filter (minimum 1).
    pub fn with_author_batch_size(mut self, author_batch_size: usize) -> Self {
        self.author_batch_size = author_batch_size.max(1);
        self
    }

    /// Builder: set the number of concurrently in-flight batches (minimum 1).
    pub fn with_concurrent_batches(mut self, concurrent_batches: usize) -> Self {
        self.concurrent_batches = concurrent_batches.max(1);
        self
    }

    /// Builder: when `true`, re-fetch lists for every known author instead of
    /// only authors with no stored follow-list timestamp.
    pub fn with_full_recrawl(mut self, full_recrawl: bool) -> Self {
        self.full_recrawl = full_recrawl;
        self
    }

    /// Builder: refresh already-known authors using a relay `since` filter at
    /// the given unix timestamp; `None` disables refreshing known authors.
    pub fn with_known_since(mut self, known_since: Option<u64>) -> Self {
        self.known_since = known_since.map(Timestamp::from);
        self
    }

    /// Returns `true` if `pk_bytes` is the root key or its stored follow
    /// distance is within `max_depth`. Unknown keys (no distance) are
    /// treated as outside the graph.
    fn is_within_social_graph(&self, pk_bytes: &[u8; 32]) -> bool {
        if pk_bytes == &self.keys.public_key().to_bytes() {
            return true;
        }

        super::get_follow_distance(self.graph_store.as_ref(), pk_bytes)
            .map(|distance| distance <= self.max_depth)
            .unwrap_or(false)
    }

    /// Ingests `events` into the given backend, logging (at debug) and
    /// swallowing any ingest error — crawling is best-effort.
    fn ingest_events_into(
        &self,
        graph_store: &(impl SocialGraphBackend + ?Sized),
        events: &[nostr::Event],
    ) {
        if let Err(err) = super::ingest_parsed_events(graph_store, events) {
            tracing::debug!("Failed to ingest crawler event: {}", err);
        }
    }

    // NOTE(review): `allow(deprecated)` — presumably for a deprecated nostr
    // API used below (e.g. `Kind::ContactList`); confirm against the pinned
    // nostr crate version.
    #[allow(deprecated)]
    /// When a new contact list from the *root* key arrives, returns the
    /// followed pubkeys whose own contact lists we have never fetched, so the
    /// caller can fetch them. Returns empty unless `max_depth >= 2` (depth-1
    /// crawls never need follow-of-follow lists). Marks every examined pubkey
    /// in `fetched_contact_lists` to avoid re-checking it later.
    fn collect_missing_root_follows(
        &self,
        event: &nostr::Event,
        fetched_contact_lists: &mut HashSet<[u8; 32]>,
    ) -> Vec<[u8; 32]> {
        if self.max_depth < 2 || event.kind != nostr::Kind::ContactList {
            return Vec::new();
        }

        let root_pk = self.keys.public_key().to_bytes();
        if event.pubkey.to_bytes() != root_pk {
            return Vec::new();
        }

        let mut missing = Vec::new();
        for tag in event.tags.iter() {
            if let Some(nostr::TagStandard::PublicKey { public_key, .. }) = tag.as_standardized() {
                let pk_bytes = public_key.to_bytes();
                if fetched_contact_lists.contains(&pk_bytes) {
                    continue;
                }

                // A non-empty stored follow list means this author's contact
                // list is already in the graph store — no fetch needed.
                let existing_follows = super::get_follows(self.graph_store.as_ref(), &pk_bytes);
                if !existing_follows.is_empty() {
                    fetched_contact_lists.insert(pk_bytes);
                    continue;
                }

                fetched_contact_lists.insert(pk_bytes);
                missing.push(pk_bytes);
            }
        }

        missing
    }

    /// Builds a REQ filter for contact + mute lists authored by `pubkeys`,
    /// optionally restricted to events newer than `since`. Returns `None`
    /// when no pubkey parses into a valid `nostr::PublicKey`.
    fn graph_filter_for_pubkeys(
        pubkeys: &[[u8; 32]],
        since: Option<Timestamp>,
    ) -> Option<nostr::Filter> {
        let authors = pubkeys
            .iter()
            .filter_map(|pk_bytes| nostr::PublicKey::from_slice(pk_bytes).ok())
            .collect::<Vec<_>>();
        if authors.is_empty() {
            return None;
        }

        let mut filter = nostr::Filter::new()
            .authors(authors)
            .kinds(vec![nostr::Kind::ContactList, nostr::Kind::MuteList]);
        if let Some(since) = since {
            filter = filter.since(since);
        }

        Some(filter)
    }

    /// Fetches contact/mute-list events for `pubkeys` from the connected
    /// relays, with `GRAPH_FETCH_TIMEOUT`. Errors are logged at debug and
    /// yield an empty vec (best-effort crawl).
    async fn fetch_graph_events_for_pubkeys(
        &self,
        client: &nostr_sdk::Client,
        pubkeys: &[[u8; 32]],
        since: Option<Timestamp>,
    ) -> Vec<nostr::Event> {
        let Some(filter) = Self::graph_filter_for_pubkeys(pubkeys, since) else {
            return Vec::new();
        };

        match client
            .get_events_of(
                vec![filter],
                nostr_sdk::EventSource::relays(Some(GRAPH_FETCH_TIMEOUT)),
            )
            .await
        {
            Ok(events) => events,
            Err(err) => {
                tracing::debug!(
                    "Failed to fetch graph events for {} authors: {}",
                    pubkeys.len(),
                    err
                );
                Vec::new()
            }
        }
    }

    /// Splits `pubkeys` into `author_batch_size` chunks, fetches up to
    /// `concurrent_batches` chunks at a time, and ingests each completed
    /// batch into the graph store. Stops (without ingesting the batch in
    /// hand) once the shutdown flag is observed set.
    async fn fetch_contact_lists_for_pubkeys(
        &self,
        client: &nostr_sdk::Client,
        pubkeys: &[[u8; 32]],
        since: Option<Timestamp>,
        shutdown_rx: &watch::Receiver<bool>,
    ) {
        // Chunks are copied to owned Vecs so each future is independent of
        // the borrow on `pubkeys`.
        let chunk_futures = stream::iter(
            pubkeys
                .chunks(self.author_batch_size)
                .map(|chunk| chunk.to_vec())
                .collect::<Vec<_>>(),
        )
        .map(|chunk| async move {
            self.fetch_graph_events_for_pubkeys(client, &chunk, since)
                .await
        });

        let mut in_flight = chunk_futures.buffer_unordered(self.concurrent_batches);
        while let Some(events) = in_flight.next().await {
            // Shutdown is only checked between completed batches, so one
            // in-flight fetch may still run to completion after the signal.
            if *shutdown_rx.borrow() {
                break;
            }
            if let Err(err) = super::ingest_graph_parsed_events(self.graph_store.as_ref(), &events)
            {
                tracing::debug!("Failed to ingest crawler graph batch: {}", err);
            }
        }
    }

    /// Partitions the authors stored at follow `distance` into
    /// (new_authors, known_authors): "new" have no stored follow-list
    /// timestamp, "known" do. Known authors are returned only when
    /// `known_since` is set (otherwise they are skipped); `full_recrawl`
    /// returns everyone as new. Store errors yield two empty vecs.
    fn authors_to_fetch_at_distance(&self, distance: u32) -> (Vec<[u8; 32]>, Vec<[u8; 32]>) {
        let Ok(users) = self.graph_store.users_by_follow_distance(distance) else {
            return (Vec::new(), Vec::new());
        };
        if self.full_recrawl {
            return (users, Vec::new());
        }

        let mut new_authors = Vec::new();
        let mut known_authors = Vec::new();
        let refresh_known_authors = self.known_since.is_some();
        for pk_bytes in users {
            match self
                .graph_store
                .follow_list_created_at(&pk_bytes)
                .ok()
                .flatten()
            {
                Some(_) if refresh_known_authors => known_authors.push(pk_bytes),
                Some(_) => {}
                None => new_authors.push(pk_bytes),
            }
        }
        (new_authors, known_authors)
    }

    /// Builds an SDK client from our keys, adds every configured relay
    /// (logging failures), and connects with `RELAY_CONNECT_TIMEOUT`.
    /// Returns `None` only if the key conversion fails.
    async fn connect_client(&self) -> Option<nostr_sdk::Client> {
        use nostr::nips::nip19::ToBech32;

        // The `nostr` and `nostr_sdk` crates use distinct key types here;
        // convert via the bech32 secret-key encoding.
        let Ok(sdk_keys) =
            nostr_sdk::Keys::parse(self.keys.secret_key().to_bech32().unwrap_or_default())
        else {
            return None;
        };

        let client = nostr_sdk::Client::new(&sdk_keys);
        for relay in &self.relays {
            if let Err(err) = client.add_relay(relay).await {
                tracing::warn!("Failed to add relay {}: {}", relay, err);
            }
        }
        client.connect_with_timeout(RELAY_CONNECT_TIMEOUT).await;
        Some(client)
    }

    /// Runs one full breadth-first sync pass: for each distance 0..=max_depth,
    /// fetches contact lists for new authors (no `since` filter) and — when
    /// `known_since` is set — refreshes known authors (with `since`). Every
    /// author fetched is recorded in `fetched_contact_lists`. Aborts early if
    /// the shutdown flag is set.
    async fn sync_graph_once(
        &self,
        client: &nostr_sdk::Client,
        shutdown_rx: &watch::Receiver<bool>,
        fetched_contact_lists: &mut HashSet<[u8; 32]>,
    ) {
        for distance in 0..=self.max_depth {
            if *shutdown_rx.borrow() {
                break;
            }

            let (new_authors, known_authors) = self.authors_to_fetch_at_distance(distance);
            if new_authors.is_empty() && known_authors.is_empty() {
                continue;
            }

            tracing::debug!(
                "Social graph sync distance={} new_authors={} known_authors={}",
                distance,
                new_authors.len(),
                known_authors.len()
            );

            if !new_authors.is_empty() {
                for pk_bytes in &new_authors {
                    fetched_contact_lists.insert(*pk_bytes);
                }
                self.fetch_contact_lists_for_pubkeys(client, &new_authors, None, shutdown_rx)
                    .await;
            }

            if !known_authors.is_empty() {
                for pk_bytes in &known_authors {
                    fetched_contact_lists.insert(*pk_bytes);
                }
                self.fetch_contact_lists_for_pubkeys(
                    client,
                    &known_authors,
                    self.known_since,
                    shutdown_rx,
                )
                .await;
            }
        }
    }

    /// Performs a single connect → sync → disconnect cycle without
    /// subscribing for live events. Used by the local-list poller to warm the
    /// graph after the root's contacts change.
    pub async fn warm_once(&self) {
        if self.relays.is_empty() {
            tracing::warn!("Social graph crawler: no relays configured, skipping");
            return;
        }

        let Some(client) = self.connect_client().await else {
            return;
        };
        // A dummy shutdown channel: the sender lives for the whole call, so
        // the receiver never reports shutdown.
        let (_shutdown_tx, shutdown_rx) = watch::channel(false);
        let mut fetched_contact_lists: HashSet<[u8; 32]> = HashSet::new();
        self.sync_graph_once(&client, &shutdown_rx, &mut fetched_contact_lists)
            .await;
        let _ = client.disconnect().await;
    }

    /// Routes a live contact/mute-list event: trusted authors (within the
    /// graph) go to the main store, everyone else to the spambox (if one is
    /// configured). Other event kinds are ignored.
    pub(crate) fn handle_incoming_event(&self, event: &nostr::Event) {
        let is_contact_list = event.kind == nostr::Kind::ContactList;
        let is_mute_list = event.kind == nostr::Kind::MuteList;
        if !is_contact_list && !is_mute_list {
            return;
        }

        let pk_bytes = event.pubkey.to_bytes();
        if self.is_within_social_graph(&pk_bytes) {
            self.ingest_events_into(self.graph_store.as_ref(), std::slice::from_ref(event));
            return;
        }

        if let Some(spambox) = &self.spambox {
            self.ingest_events_into(spambox.as_ref(), std::slice::from_ref(event));
        }
    }

    // NOTE(review): `allow(deprecated)` — presumably for the deprecated
    // `get_events_of`/subscription API path; confirm against the pinned
    // nostr-sdk version.
    #[allow(deprecated)]
    /// Main crawl loop: one initial breadth-first sync, then subscribes to
    /// future contact/mute-list events and processes notifications until the
    /// shutdown flag is set or the notification stream errors.
    pub async fn crawl(&self, shutdown_rx: watch::Receiver<bool>) {
        use nostr_sdk::prelude::RelayPoolNotification;

        if self.relays.is_empty() {
            tracing::warn!("Social graph crawler: no relays configured, skipping");
            return;
        }

        let mut shutdown_rx = shutdown_rx;
        if *shutdown_rx.borrow() {
            return;
        }

        let Some(client) = self.connect_client().await else {
            return;
        };

        let mut fetched_contact_lists: HashSet<[u8; 32]> = HashSet::new();
        self.sync_graph_once(&client, &shutdown_rx, &mut fetched_contact_lists)
            .await;

        // Only events from now on: history was covered by sync_graph_once.
        let filter = nostr::Filter::new()
            .kinds(vec![nostr::Kind::ContactList, nostr::Kind::MuteList])
            .since(nostr::Timestamp::now());

        let _ = client.subscribe(vec![filter], None).await;

        let mut notifications = client.notifications();
        loop {
            tokio::select! {
                // NOTE(review): `changed()`'s Result is ignored. If the
                // shutdown sender is dropped without sending `true`,
                // `changed()` returns Err immediately and this arm could be
                // selected in a tight loop — confirm the sender outlives the
                // task (spawn_social_graph_tasks keeps it in the returned
                // handles).
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        break;
                    }
                }
                notification = notifications.recv() => {
                    match notification {
                        Ok(RelayPoolNotification::Event { event, .. }) => {
                            self.handle_incoming_event(&event);
                            // If the root published a new contact list,
                            // immediately fetch lists for any newly-followed
                            // authors we have never seen.
                            let missing = self.collect_missing_root_follows(&event, &mut fetched_contact_lists);
                            if !missing.is_empty() {
                                self.fetch_contact_lists_for_pubkeys(&client, &missing, None, &shutdown_rx).await;
                            }
                        }
                        Ok(_) => {}
                        Err(err) => {
                            // Lagged/closed notification channel: give up
                            // rather than risk processing a gap silently.
                            tracing::warn!("Social graph crawler notification error: {}", err);
                            break;
                        }
                    }
                }
            }
        }

        let _ = client.disconnect().await;
    }
}
413
/// Spawns the two background social-graph tasks and returns their handles:
///
/// 1. A crawl task that (after `CRAWLER_STARTUP_DELAY`) force-syncs the local
///    list files into the store, then runs [`SocialGraphCrawler::crawl`].
/// 2. A local-list poller that every `LOCAL_LIST_POLL_INTERVAL` syncs changed
///    local list files and, when the contacts file changed, runs a one-shot
///    [`SocialGraphCrawler::warm_once`] crawl.
///
/// Both tasks stop when `true` is sent on the returned `shutdown_tx`.
pub fn spawn_social_graph_tasks(
    graph_store: Arc<dyn SocialGraphBackend>,
    keys: nostr::Keys,
    relays: Vec<String>,
    max_depth: u32,
    spambox: Option<Arc<dyn SocialGraphBackend>>,
    data_dir: PathBuf,
) -> SocialGraphTaskHandles {
    // One watch channel feeds both tasks; the sender is handed back to the
    // caller so it stays alive for the tasks' lifetime.
    let (shutdown_tx, crawl_shutdown_rx) = watch::channel(false);
    let local_list_shutdown_rx = crawl_shutdown_rx.clone();
    let crawl_data_dir = data_dir.clone();
    let local_list_data_dir = data_dir;

    let crawl_handle = tokio::spawn({
        let graph_store = Arc::clone(&graph_store);
        let keys = keys.clone();
        let relays = relays.clone();
        let spambox = spambox.clone();
        async move {
            tokio::time::sleep(CRAWLER_STARTUP_DELAY).await;
            let mut crawler = SocialGraphCrawler::new(graph_store.clone(), keys, relays, max_depth);
            if let Some(spambox) = spambox {
                crawler = crawler.with_spambox(spambox);
            }
            // Force-sync local lists once before crawling so the store's
            // follow distances reflect the on-disk lists.
            if let Err(err) =
                sync_local_list_files_force(graph_store.as_ref(), &crawl_data_dir, &crawler.keys)
            {
                tracing::warn!(
                    "Failed to sync local social graph lists at startup: {}",
                    err
                );
            }
            crawler.crawl(crawl_shutdown_rx).await;
        }
    });

    let local_list_handle = tokio::spawn({
        let graph_store = Arc::clone(&graph_store);
        let keys = keys.clone();
        let relays = relays.clone();
        let spambox = spambox.clone();
        async move {
            let mut shutdown_rx = local_list_shutdown_rx;
            // Seed the change-detection state from disk; on failure, start
            // from the default (treats everything as changed on first tick).
            let mut state =
                read_local_list_file_state(&local_list_data_dir).unwrap_or_else(|err| {
                    tracing::warn!("Failed to read local social graph list state: {}", err);
                    LocalListFileState::default()
                });
            let mut interval = tokio::time::interval(LOCAL_LIST_POLL_INTERVAL);
            loop {
                tokio::select! {
                    _ = shutdown_rx.changed() => {
                        if *shutdown_rx.borrow() {
                            break;
                        }
                    }
                    _ = interval.tick() => {
                        let outcome = match sync_local_list_files_if_changed(
                            graph_store.as_ref(),
                            &local_list_data_dir,
                            &keys,
                            &mut state,
                        ) {
                            Ok(outcome) => outcome,
                            Err(err) => {
                                tracing::warn!("Failed to sync local social graph list files: {}", err);
                                continue;
                            }
                        };

                        // A changed contacts file may add new follows whose
                        // lists we have never fetched: run a one-shot crawl.
                        if outcome.contacts_changed {
                            let mut crawler =
                                SocialGraphCrawler::new(graph_store.clone(), keys.clone(), relays.clone(), max_depth);
                            if let Some(spambox) = spambox.clone() {
                                crawler = crawler.with_spambox(spambox);
                            }
                            crawler.warm_once().await;
                        }
                    }
                }
            }
        }
    });

    SocialGraphTaskHandles {
        shutdown_tx,
        crawl_handle,
        local_list_handle,
    }
}
504
505#[cfg(test)]
506mod tests {
507    use super::*;
508    use std::net::TcpListener;
509    use std::sync::Mutex;
510    use std::time::Instant;
511
512    use futures::{SinkExt, StreamExt};
513    use nostr::{EventBuilder, JsonUtil, Kind, PublicKey, Tag};
514    use tempfile::TempDir;
515    use tokio::net::TcpStream;
516    use tokio::sync::broadcast;
517    use tokio_tungstenite::{accept_async, tungstenite::Message};
518
519    #[derive(Debug, Default)]
520    struct RelayState {
521        events: Vec<nostr::Event>,
522        request_author_counts: Vec<usize>,
523        requested_authors: Vec<Vec<String>>,
524    }
525
526    struct TestRelay {
527        port: u16,
528        shutdown: broadcast::Sender<()>,
529        state: Arc<Mutex<RelayState>>,
530    }
531
532    impl TestRelay {
533        fn new(events: Vec<nostr::Event>) -> Self {
534            let state = Arc::new(Mutex::new(RelayState {
535                events,
536                request_author_counts: Vec::new(),
537                requested_authors: Vec::new(),
538            }));
539            let (shutdown, _) = broadcast::channel(1);
540
541            let std_listener = TcpListener::bind("127.0.0.1:0").expect("bind relay listener");
542            let port = std_listener.local_addr().expect("local addr").port();
543            std_listener
544                .set_nonblocking(true)
545                .expect("set listener nonblocking");
546
547            let state_for_thread = Arc::clone(&state);
548            let shutdown_for_thread = shutdown.clone();
549            std::thread::spawn(move || {
550                let runtime = tokio::runtime::Builder::new_multi_thread()
551                    .worker_threads(2)
552                    .enable_all()
553                    .build()
554                    .expect("build tokio runtime");
555                runtime.block_on(async move {
556                    let listener = tokio::net::TcpListener::from_std(std_listener)
557                        .expect("tokio listener from std");
558                    let mut shutdown_rx = shutdown_for_thread.subscribe();
559
560                    loop {
561                        tokio::select! {
562                            _ = shutdown_rx.recv() => break,
563                            accept = listener.accept() => {
564                                if let Ok((stream, _)) = accept {
565                                    let state = Arc::clone(&state_for_thread);
566                                    tokio::spawn(async move {
567                                        handle_connection(stream, state).await;
568                                    });
569                                }
570                            }
571                        }
572                    }
573                });
574            });
575
576            std::thread::sleep(Duration::from_millis(100));
577
578            Self {
579                port,
580                shutdown,
581                state,
582            }
583        }
584
585        fn url(&self) -> String {
586            format!("ws://127.0.0.1:{}", self.port)
587        }
588
589        fn request_author_counts(&self) -> Vec<usize> {
590            self.state
591                .lock()
592                .expect("relay state lock")
593                .request_author_counts
594                .clone()
595        }
596
597        fn requested_authors(&self) -> Vec<Vec<String>> {
598            self.state
599                .lock()
600                .expect("relay state lock")
601                .requested_authors
602                .clone()
603        }
604    }
605
606    impl Drop for TestRelay {
607        fn drop(&mut self) {
608            let _ = self.shutdown.send(());
609            std::thread::sleep(Duration::from_millis(50));
610        }
611    }
612
613    fn matching_events(
614        state: &Arc<Mutex<RelayState>>,
615        filters: &[nostr::Filter],
616    ) -> Vec<nostr::Event> {
617        let guard = state.lock().expect("relay state lock");
618        guard
619            .events
620            .iter()
621            .filter(|event| {
622                filters.is_empty() || filters.iter().any(|filter| filter.match_event(event))
623            })
624            .cloned()
625            .collect()
626    }
627
628    async fn send_relay_message(
629        write: &mut futures::stream::SplitSink<
630            tokio_tungstenite::WebSocketStream<TcpStream>,
631            Message,
632        >,
633        message: nostr::RelayMessage,
634    ) {
635        let _ = write.send(Message::Text(message.as_json())).await;
636    }
637
638    async fn handle_connection(stream: TcpStream, state: Arc<Mutex<RelayState>>) {
639        let ws_stream = match accept_async(stream).await {
640            Ok(ws) => ws,
641            Err(_) => return,
642        };
643        let (mut write, mut read) = ws_stream.split();
644
645        while let Some(message) = read.next().await {
646            let text = match message {
647                Ok(Message::Text(text)) => text,
648                Ok(Message::Ping(data)) => {
649                    let _ = write.send(Message::Pong(data)).await;
650                    continue;
651                }
652                Ok(Message::Close(_)) => break,
653                _ => continue,
654            };
655
656            let parsed = match nostr::ClientMessage::from_json(text.as_bytes()) {
657                Ok(message) => message,
658                Err(_) => continue,
659            };
660
661            match parsed {
662                nostr::ClientMessage::Req {
663                    subscription_id,
664                    filters,
665                } => {
666                    let author_count = filters
667                        .iter()
668                        .filter_map(|filter| filter.authors.as_ref())
669                        .map(|authors| authors.len())
670                        .sum();
671                    let mut requested_authors = filters
672                        .iter()
673                        .filter_map(|filter| filter.authors.as_ref())
674                        .flat_map(|authors| authors.iter().map(|author| author.to_hex()))
675                        .collect::<Vec<_>>();
676                    requested_authors.sort();
677                    requested_authors.dedup();
678                    {
679                        let mut guard = state.lock().expect("relay state lock");
680                        guard.request_author_counts.push(author_count);
681                        guard.requested_authors.push(requested_authors);
682                    }
683
684                    for event in matching_events(&state, &filters) {
685                        send_relay_message(
686                            &mut write,
687                            nostr::RelayMessage::event(subscription_id.clone(), event),
688                        )
689                        .await;
690                    }
691                    send_relay_message(&mut write, nostr::RelayMessage::eose(subscription_id))
692                        .await;
693                }
694                nostr::ClientMessage::Close(subscription_id) => {
695                    send_relay_message(
696                        &mut write,
697                        nostr::RelayMessage::closed(subscription_id, ""),
698                    )
699                    .await;
700                }
701                _ => {}
702            }
703        }
704    }
705
706    async fn wait_until<F>(timeout: Duration, mut condition: F)
707    where
708        F: FnMut() -> bool,
709    {
710        let start = Instant::now();
711        while start.elapsed() < timeout {
712            if condition() {
713                return;
714            }
715            tokio::time::sleep(Duration::from_millis(50)).await;
716        }
717        panic!("condition not met within {:?}", timeout);
718    }
719
720    fn write_contacts_file(path: &std::path::Path, pubkeys: &[String]) {
721        std::fs::write(
722            path,
723            serde_json::to_string(pubkeys).expect("serialize contacts"),
724        )
725        .expect("write contacts file");
726    }
727
728    #[tokio::test]
729    async fn test_crawler_routes_untrusted_to_spambox() {
730        let _guard = crate::socialgraph::test_lock();
731        let tmp = TempDir::new().unwrap();
732        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
733        let spambox =
734            crate::socialgraph::open_social_graph_store_at_path(&tmp.path().join("spambox"), None)
735                .unwrap();
736
737        let root_keys = nostr::Keys::generate();
738        let root_pk = root_keys.public_key().to_bytes();
739        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
740        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
741        let spambox_backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = spambox.clone();
742
743        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![], 2)
744            .with_spambox(spambox_backend);
745
746        let unknown_keys = nostr::Keys::generate();
747        let follow_tag = Tag::public_key(PublicKey::from_slice(&root_pk).unwrap());
748        let event = EventBuilder::new(Kind::ContactList, "", vec![follow_tag])
749            .to_event(&unknown_keys)
750            .unwrap();
751
752        crawler.handle_incoming_event(&event);
753
754        let unknown_pk = unknown_keys.public_key().to_bytes();
755        assert!(crate::socialgraph::get_follows(&graph_store, &unknown_pk).is_empty());
756        assert_eq!(
757            crate::socialgraph::get_follows(&spambox, &unknown_pk),
758            vec![root_pk]
759        );
760    }
761
762    #[tokio::test]
763    #[allow(clippy::await_holding_lock)]
764    async fn test_crawler_batches_graph_fetches_by_author_chunk() {
765        let _guard = crate::socialgraph::test_lock();
766        let tmp = TempDir::new().unwrap();
767        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
768
769        let root_keys = nostr::Keys::generate();
770        let root_pk = root_keys.public_key().to_bytes();
771        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
772        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
773
774        let alice_keys = nostr::Keys::generate();
775        let bob_keys = nostr::Keys::generate();
776        let carol_keys = nostr::Keys::generate();
777
778        let root_event = EventBuilder::new(
779            Kind::ContactList,
780            "",
781            vec![
782                Tag::public_key(alice_keys.public_key()),
783                Tag::public_key(bob_keys.public_key()),
784            ],
785        )
786        .custom_created_at(nostr::Timestamp::from(10))
787        .to_event(&root_keys)
788        .unwrap();
789        let alice_event = EventBuilder::new(
790            Kind::ContactList,
791            "",
792            vec![Tag::public_key(carol_keys.public_key())],
793        )
794        .custom_created_at(nostr::Timestamp::from(11))
795        .to_event(&alice_keys)
796        .unwrap();
797        let bob_event = EventBuilder::new(Kind::ContactList, "", vec![])
798            .custom_created_at(nostr::Timestamp::from(12))
799            .to_event(&bob_keys)
800            .unwrap();
801
802        let relay = TestRelay::new(vec![root_event, alice_event, bob_event]);
803        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 2)
804            .with_author_batch_size(2);
805
806        let (shutdown_tx, shutdown_rx) = watch::channel(false);
807        let handle = tokio::spawn(async move {
808            crawler.crawl(shutdown_rx).await;
809        });
810
811        let alice_pk = alice_keys.public_key().to_bytes();
812        let bob_pk = bob_keys.public_key().to_bytes();
813        let carol_pk = carol_keys.public_key().to_bytes();
814        wait_until(Duration::from_secs(5), || {
815            let root_follows = crate::socialgraph::get_follows(&graph_store, &root_pk);
816            let alice_follows = crate::socialgraph::get_follows(&graph_store, &alice_pk);
817            root_follows.contains(&alice_pk)
818                && root_follows.contains(&bob_pk)
819                && alice_follows.contains(&carol_pk)
820        })
821        .await;
822
823        let _ = shutdown_tx.send(true);
824        handle.await.unwrap();
825
826        let author_counts = relay.request_author_counts();
827        assert!(
828            author_counts.iter().any(|count| *count >= 2),
829            "expected batched author REQ, got {:?}",
830            author_counts
831        );
832    }
833
834    #[tokio::test]
835    #[allow(clippy::await_holding_lock)]
836    async fn test_crawler_expands_from_existing_graph_without_root_refetch() {
837        let _guard = crate::socialgraph::test_lock();
838        let tmp = TempDir::new().unwrap();
839        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
840
841        let root_keys = nostr::Keys::generate();
842        let root_pk = root_keys.public_key().to_bytes();
843        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
844        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
845
846        let alice_keys = nostr::Keys::generate();
847        let bob_keys = nostr::Keys::generate();
848        let carol_keys = nostr::Keys::generate();
849
850        let root_event = EventBuilder::new(
851            Kind::ContactList,
852            "",
853            vec![
854                Tag::public_key(alice_keys.public_key()),
855                Tag::public_key(bob_keys.public_key()),
856            ],
857        )
858        .custom_created_at(nostr::Timestamp::from(10))
859        .to_event(&root_keys)
860        .unwrap();
861        crate::socialgraph::ingest_parsed_event(&graph_store, &root_event).unwrap();
862
863        let alice_event = EventBuilder::new(
864            Kind::ContactList,
865            "",
866            vec![Tag::public_key(carol_keys.public_key())],
867        )
868        .custom_created_at(nostr::Timestamp::from(11))
869        .to_event(&alice_keys)
870        .unwrap();
871        let bob_event = EventBuilder::new(Kind::ContactList, "", vec![])
872            .custom_created_at(nostr::Timestamp::from(12))
873            .to_event(&bob_keys)
874            .unwrap();
875
876        let relay = TestRelay::new(vec![alice_event, bob_event]);
877        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 2)
878            .with_author_batch_size(2);
879
880        let (shutdown_tx, shutdown_rx) = watch::channel(false);
881        let handle = tokio::spawn(async move {
882            crawler.crawl(shutdown_rx).await;
883        });
884
885        let alice_pk = alice_keys.public_key().to_bytes();
886        let carol_pk = carol_keys.public_key().to_bytes();
887        wait_until(Duration::from_secs(5), || {
888            crate::socialgraph::get_follows(&graph_store, &alice_pk).contains(&carol_pk)
889        })
890        .await;
891
892        let _ = shutdown_tx.send(true);
893        handle.await.unwrap();
894
895        let author_counts = relay.request_author_counts();
896        let requested_authors = relay.requested_authors();
897        assert!(
898            requested_authors
899                .iter()
900                .flatten()
901                .all(|author| author != &root_keys.public_key().to_hex()),
902            "expected incremental crawl to skip root refetch, got {:?}",
903            requested_authors
904        );
905        assert!(
906            author_counts.iter().any(|count| *count >= 2),
907            "expected batched distance-1 REQ, got {:?}",
908            author_counts
909        );
910    }
911
912    #[tokio::test]
913    #[allow(clippy::await_holding_lock)]
914    async fn test_crawler_full_recrawl_refetches_root() {
915        let _guard = crate::socialgraph::test_lock();
916        let tmp = TempDir::new().unwrap();
917        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
918
919        let root_keys = nostr::Keys::generate();
920        let root_pk = root_keys.public_key().to_bytes();
921        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
922        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
923
924        let alice_keys = nostr::Keys::generate();
925        let bob_keys = nostr::Keys::generate();
926
927        let root_event = EventBuilder::new(
928            Kind::ContactList,
929            "",
930            vec![
931                Tag::public_key(alice_keys.public_key()),
932                Tag::public_key(bob_keys.public_key()),
933            ],
934        )
935        .custom_created_at(nostr::Timestamp::from(10))
936        .to_event(&root_keys)
937        .unwrap();
938        crate::socialgraph::ingest_parsed_event(&graph_store, &root_event).unwrap();
939
940        let relay = TestRelay::new(vec![root_event]);
941        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 1)
942            .with_full_recrawl(true);
943
944        let (shutdown_tx, shutdown_rx) = watch::channel(false);
945        let handle = tokio::spawn(async move {
946            crawler.crawl(shutdown_rx).await;
947        });
948
949        wait_until(Duration::from_secs(5), || {
950            relay.request_author_counts().contains(&1)
951        })
952        .await;
953
954        let _ = shutdown_tx.send(true);
955        handle.await.unwrap();
956
957        let requested_authors = relay.requested_authors();
958        assert!(
959            requested_authors
960                .iter()
961                .flatten()
962                .any(|author| author == &root_keys.public_key().to_hex()),
963            "expected full recrawl to refetch root, got {:?}",
964            requested_authors
965        );
966    }
967
968    #[tokio::test]
969    #[allow(clippy::await_holding_lock)]
970    async fn test_crawler_revisits_known_authors_with_since_cursor() {
971        let _guard = crate::socialgraph::test_lock();
972        let tmp = TempDir::new().unwrap();
973        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
974
975        let root_keys = nostr::Keys::generate();
976        let root_pk = root_keys.public_key().to_bytes();
977        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
978        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
979
980        let alice_keys = nostr::Keys::generate();
981        let carol_keys = nostr::Keys::generate();
982        let dave_keys = nostr::Keys::generate();
983
984        let root_event = EventBuilder::new(
985            Kind::ContactList,
986            "",
987            vec![Tag::public_key(alice_keys.public_key())],
988        )
989        .custom_created_at(nostr::Timestamp::from(10))
990        .to_event(&root_keys)
991        .unwrap();
992        crate::socialgraph::ingest_parsed_event(&graph_store, &root_event).unwrap();
993
994        let alice_old_event = EventBuilder::new(
995            Kind::ContactList,
996            "",
997            vec![Tag::public_key(carol_keys.public_key())],
998        )
999        .custom_created_at(nostr::Timestamp::from(11))
1000        .to_event(&alice_keys)
1001        .unwrap();
1002        crate::socialgraph::ingest_parsed_event(&graph_store, &alice_old_event).unwrap();
1003
1004        let alice_new_event = EventBuilder::new(
1005            Kind::ContactList,
1006            "",
1007            vec![
1008                Tag::public_key(carol_keys.public_key()),
1009                Tag::public_key(dave_keys.public_key()),
1010            ],
1011        )
1012        .custom_created_at(nostr::Timestamp::from(20))
1013        .to_event(&alice_keys)
1014        .unwrap();
1015
1016        let relay = TestRelay::new(vec![alice_new_event]);
1017        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 2)
1018            .with_author_batch_size(2)
1019            .with_known_since(Some(15));
1020
1021        let (shutdown_tx, shutdown_rx) = watch::channel(false);
1022        let handle = tokio::spawn(async move {
1023            crawler.crawl(shutdown_rx).await;
1024        });
1025
1026        let alice_pk = alice_keys.public_key().to_bytes();
1027        let dave_pk = dave_keys.public_key().to_bytes();
1028        wait_until(Duration::from_secs(5), || {
1029            crate::socialgraph::get_follows(&graph_store, &alice_pk).contains(&dave_pk)
1030        })
1031        .await;
1032
1033        let _ = shutdown_tx.send(true);
1034        handle.await.unwrap();
1035
1036        let requested_authors = relay.requested_authors();
1037        assert!(
1038            crate::socialgraph::get_follows(&graph_store, &alice_pk).contains(&dave_pk),
1039            "expected incremental crawl to refresh known author follow list"
1040        );
1041        assert!(
1042            requested_authors
1043                .iter()
1044                .flatten()
1045                .any(|author| author == &alice_keys.public_key().to_hex()),
1046            "expected crawler to refetch known author, got {:?}",
1047            requested_authors
1048        );
1049    }
1050
1051    #[tokio::test]
1052    #[allow(clippy::await_holding_lock)]
1053    async fn test_crawler_warm_once_completes_initial_sync_without_shutdown() {
1054        let _guard = crate::socialgraph::test_lock();
1055        let tmp = TempDir::new().unwrap();
1056        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
1057
1058        let root_keys = nostr::Keys::generate();
1059        let root_pk = root_keys.public_key().to_bytes();
1060        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
1061        let backend: Arc<dyn crate::socialgraph::SocialGraphBackend> = graph_store.clone();
1062
1063        let alice_keys = nostr::Keys::generate();
1064        let bob_keys = nostr::Keys::generate();
1065
1066        let root_event = EventBuilder::new(
1067            Kind::ContactList,
1068            "",
1069            vec![Tag::public_key(alice_keys.public_key())],
1070        )
1071        .custom_created_at(nostr::Timestamp::from(10))
1072        .to_event(&root_keys)
1073        .unwrap();
1074        let alice_event = EventBuilder::new(
1075            Kind::ContactList,
1076            "",
1077            vec![Tag::public_key(bob_keys.public_key())],
1078        )
1079        .custom_created_at(nostr::Timestamp::from(11))
1080        .to_event(&alice_keys)
1081        .unwrap();
1082
1083        let relay = TestRelay::new(vec![root_event, alice_event]);
1084        let crawler = SocialGraphCrawler::new(backend, root_keys.clone(), vec![relay.url()], 2)
1085            .with_author_batch_size(2);
1086
1087        let started = std::time::Instant::now();
1088        crawler.warm_once().await;
1089
1090        let alice_pk = alice_keys.public_key().to_bytes();
1091        let bob_pk = bob_keys.public_key().to_bytes();
1092        assert!(
1093            started.elapsed() < Duration::from_secs(5),
1094            "warm_once should complete finite sync promptly"
1095        );
1096        assert!(
1097            crate::socialgraph::get_follows(&graph_store, &alice_pk).contains(&bob_pk),
1098            "expected warm_once to ingest distance-1 follow list"
1099        );
1100    }
1101
1102    #[tokio::test]
1103    #[allow(clippy::await_holding_lock)]
1104    async fn test_spawned_social_graph_tasks_sync_local_contacts_on_startup() {
1105        let _guard = crate::socialgraph::test_lock();
1106        let tmp = TempDir::new().unwrap();
1107        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
1108
1109        let root_keys = nostr::Keys::generate();
1110        let root_pk = root_keys.public_key().to_bytes();
1111        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
1112
1113        let alice_keys = nostr::Keys::generate();
1114        let bob_keys = nostr::Keys::generate();
1115
1116        write_contacts_file(
1117            &tmp.path().join("contacts.json"),
1118            &[alice_keys.public_key().to_hex()],
1119        );
1120
1121        let alice_event = EventBuilder::new(
1122            Kind::ContactList,
1123            "",
1124            vec![Tag::public_key(bob_keys.public_key())],
1125        )
1126        .custom_created_at(nostr::Timestamp::from(11))
1127        .to_event(&alice_keys)
1128        .unwrap();
1129
1130        let relay = TestRelay::new(vec![alice_event]);
1131        let tasks = spawn_social_graph_tasks(
1132            graph_store.clone(),
1133            root_keys.clone(),
1134            vec![relay.url()],
1135            2,
1136            None,
1137            tmp.path().to_path_buf(),
1138        );
1139
1140        let alice_pk = alice_keys.public_key().to_bytes();
1141        let bob_pk = bob_keys.public_key().to_bytes();
1142        wait_until(Duration::from_secs(5), || {
1143            let root_follows = crate::socialgraph::get_follows(&graph_store, &root_pk);
1144            let alice_follows = crate::socialgraph::get_follows(&graph_store, &alice_pk);
1145            root_follows.contains(&alice_pk) && alice_follows.contains(&bob_pk)
1146        })
1147        .await;
1148
1149        let _ = tasks.shutdown_tx.send(true);
1150        tasks.crawl_handle.abort();
1151        tasks.local_list_handle.abort();
1152    }
1153
1154    #[tokio::test]
1155    #[allow(clippy::await_holding_lock)]
1156    async fn test_spawned_social_graph_tasks_refresh_when_contacts_change() {
1157        let _guard = crate::socialgraph::test_lock();
1158        let tmp = TempDir::new().unwrap();
1159        let graph_store = crate::socialgraph::open_social_graph_store(tmp.path()).unwrap();
1160
1161        let root_keys = nostr::Keys::generate();
1162        let root_pk = root_keys.public_key().to_bytes();
1163        crate::socialgraph::set_social_graph_root(&graph_store, &root_pk);
1164
1165        let alice_keys = nostr::Keys::generate();
1166        let bob_keys = nostr::Keys::generate();
1167
1168        let alice_event = EventBuilder::new(
1169            Kind::ContactList,
1170            "",
1171            vec![Tag::public_key(bob_keys.public_key())],
1172        )
1173        .custom_created_at(nostr::Timestamp::from(11))
1174        .to_event(&alice_keys)
1175        .unwrap();
1176
1177        let relay = TestRelay::new(vec![alice_event]);
1178        let tasks = spawn_social_graph_tasks(
1179            graph_store.clone(),
1180            root_keys.clone(),
1181            vec![relay.url()],
1182            2,
1183            None,
1184            tmp.path().to_path_buf(),
1185        );
1186
1187        let alice_pk = alice_keys.public_key().to_bytes();
1188        let bob_pk = bob_keys.public_key().to_bytes();
1189        assert!(!crate::socialgraph::get_follows(&graph_store, &root_pk).contains(&alice_pk));
1190
1191        write_contacts_file(
1192            &tmp.path().join("contacts.json"),
1193            &[alice_keys.public_key().to_hex()],
1194        );
1195
1196        wait_until(Duration::from_secs(5), || {
1197            let root_follows = crate::socialgraph::get_follows(&graph_store, &root_pk);
1198            let alice_follows = crate::socialgraph::get_follows(&graph_store, &alice_pk);
1199            root_follows.contains(&alice_pk) && alice_follows.contains(&bob_pk)
1200        })
1201        .await;
1202
1203        let _ = tasks.shutdown_tx.send(true);
1204        tasks.crawl_handle.abort();
1205        tasks.local_list_handle.abort();
1206    }
1207}