hashtree_cli/nostrdb_integration/crawler.rs

use nostrdb_social::Ndb;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;

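/// Crawls the social graph of the configured root key over Nostr relays,
/// ingesting contact lists (kind 3) and mute lists (kind 10000) into nostrdb.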
pub struct SocialGraphCrawler {
    ndb: Arc<Ndb>,
    spambox: Option<Arc<Ndb>>,
    keys: nostr::Keys,
    relays: Vec<String>,
    max_depth: u32,
}

impl SocialGraphCrawler {
    pub fn new(
        ndb: Arc<Ndb>,
        keys: nostr::Keys,
        relays: Vec<String>,
        max_depth: u32,
    ) -> Self {
        Self {
            ndb,
            spambox: None,
            keys,
            relays,
            max_depth,
        }
    }

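    /// Route events from pubkeys outside the social graph into a separate
    /// "spambox" database instead of dropping them.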
    pub fn with_spambox(mut self, spambox: Arc<Ndb>) -> Self {
        self.spambox = Some(spambox);
        self
    }

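    /// A pubkey is trusted if it is the root key itself or lies within
    /// `max_depth` hops in the follow graph.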
    fn is_within_social_graph(&self, pk_bytes: &[u8; 32]) -> bool {
        if pk_bytes == &self.keys.public_key().to_bytes() {
            return true;
        }
        super::get_follow_distance(&self.ndb, pk_bytes)
            .map(|distance| distance <= self.max_depth)
            .unwrap_or(false)
    }

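    /// Serialize the event to JSON and hand it to the given nostrdb instance
    /// under the given subscription id.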
    fn ingest_event_into(&self, ndb: &Ndb, sub_id: &str, event: &nostr::Event) {
        if let Ok(json) = serde_json::to_string(event) {
            super::ingest_event(ndb, sub_id, &json);
        }
    }

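    /// Route a live event: trusted authors go to the main database, untrusted
    /// ones to the spambox (if configured) or are dropped.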
    pub(crate) fn handle_incoming_event(&self, event: &nostr::Event) {
        let is_contact_list = event.kind == nostr::Kind::ContactList;
        let is_mute_list = event.kind == nostr::Kind::Custom(10000);
        if !is_contact_list && !is_mute_list {
            return;
        }

        let pk_bytes = event.pubkey.to_bytes();
        if self.is_within_social_graph(&pk_bytes) {
            self.ingest_event_into(&self.ndb, "live", event);
            return;
        }

        if let Some(spambox) = &self.spambox {
            self.ingest_event_into(spambox, "spambox", event);
        } else {
            tracing::debug!(
                "Social graph crawler: dropping untrusted {} from {}...",
                if is_contact_list { "contact list" } else { "mute list" },
                &event.pubkey.to_hex()[..8.min(event.pubkey.to_hex().len())]
            );
        }
    }

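    /// Run the crawler: a breadth-first walk of the follow graph up to
    /// `max_depth`, followed by a live subscription that runs until shutdown
    /// is signalled.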
    #[allow(deprecated)]
    pub async fn crawl(&self, shutdown_rx: watch::Receiver<bool>) {
        use nostr::nips::nip19::ToBech32;
        use nostr_sdk::prelude::RelayPoolNotification;

        if self.relays.is_empty() {
            tracing::warn!("Social graph crawler: no relays configured, skipping");
            return;
        }

        let mut shutdown_rx = shutdown_rx;
        if *shutdown_rx.borrow() {
            tracing::info!("Social graph crawler: shutdown requested before start");
            return;
        }

        tracing::info!(
            "Starting social graph crawl (max_depth={}, relays={})",
            self.max_depth,
            self.relays.len()
        );

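        // Bridge into nostr-sdk by round-tripping the secret key through its
        // bech32 (nsec) encoding.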
        let sdk_keys = match nostr_sdk::Keys::parse(
            &self.keys.secret_key().to_bech32().unwrap_or_default(),
        ) {
            Ok(k) => k,
            Err(e) => {
                tracing::error!("Failed to parse keys for crawler: {}", e);
                return;
            }
        };

        let client = nostr_sdk::Client::new(&sdk_keys);

        for relay in &self.relays {
            if let Err(e) = client.add_relay(relay).await {
                tracing::warn!("Failed to add relay {}: {}", relay, e);
            }
        }
        client.connect().await;

        let root_pk = self.keys.public_key().to_bytes();
        let mut visited: HashSet<[u8; 32]> = HashSet::new();
        let mut current_level = vec![root_pk];
        visited.insert(root_pk);

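        // Breadth-first walk: each level holds the pubkeys discovered at the
        // previous depth, starting from the root key.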
        for depth in 0..self.max_depth {
            if current_level.is_empty() || *shutdown_rx.borrow() {
                break;
            }

            tracing::info!(
                "Crawling depth {} with {} pubkeys",
                depth,
                current_level.len()
            );

            let mut next_level = Vec::new();

            for pk_bytes in &current_level {
                if *shutdown_rx.borrow() {
                    break;
                }

                let pk_hex = hex::encode(pk_bytes);

                let pk = match nostr::PublicKey::from_slice(pk_bytes) {
                    Ok(pk) => pk,
                    Err(_) => continue,
                };

                let filter = nostr::Filter::new()
                    .author(pk)
                    .kinds(vec![nostr::Kind::ContactList, nostr::Kind::Custom(10000)]);

                let source = nostr_sdk::EventSource::relays(Some(Duration::from_secs(5)));

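                // Outer timeout so one stalled relay request cannot wedge the
                // whole crawl.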
                match tokio::time::timeout(
                    Duration::from_secs(10),
                    client.get_events_of(vec![filter], source),
                )
                .await
                {
                    Ok(Ok(events)) => {
                        for event in &events {
                            self.ingest_event_into(&self.ndb, "crawl", event);

                            if event.kind() == nostr::Kind::ContactList {
                                for tag in event.tags().iter() {
                                    if let Some(nostr::TagStandard::PublicKey {
                                        public_key, ..
                                    }) = tag.as_standardized()
                                    {
                                        let follow_bytes = public_key.to_bytes();
                                        if !visited.contains(&follow_bytes) {
                                            visited.insert(follow_bytes);
                                            next_level.push(follow_bytes);
                                        }
                                    }
                                }
                            }
                        }
                        tracing::debug!(
                            "Depth {}: fetched {} events for {}...",
                            depth,
                            events.len(),
                            &pk_hex[..8.min(pk_hex.len())]
                        );
                    }
                    Ok(Err(e)) => {
                        tracing::debug!("Failed to fetch events for {}...: {}", &pk_hex[..8], e);
                    }
                    Err(_) => {
                        tracing::debug!("Timeout fetching events for {}...", &pk_hex[..8]);
                    }
                }
            }

            current_level = next_level;
        }

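        // After the walk, stay subscribed to new contact and mute lists so the
        // graph keeps updating live.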
        let filter = nostr::Filter::new()
            .kinds(vec![nostr::Kind::ContactList, nostr::Kind::Custom(10000)])
            .since(nostr::Timestamp::now());

        match client.subscribe(vec![filter], None).await {
            Ok(_) => tracing::info!("Social graph crawler: subscribed to contact and mute lists"),
            Err(e) => tracing::warn!("Social graph crawler: failed to subscribe: {}", e),
        }

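        // Process notifications until the shutdown signal flips to true or the
        // notification channel errors out.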
        let mut notifications = client.notifications();
        loop {
            tokio::select! {
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        break;
                    }
                }
                notification = notifications.recv() => {
                    match notification {
                        Ok(RelayPoolNotification::Event { event, .. }) => {
                            self.handle_incoming_event(&event);
                        }
                        Ok(_) => {}
                        Err(e) => {
                            tracing::warn!("Social graph crawler notification error: {}", e);
                            break;
                        }
                    }
                }
            }
        }

        if let Err(e) = client.disconnect().await {
            tracing::debug!("Error disconnecting crawler client: {}", e);
        }

        tracing::info!(
            "Social graph crawl complete: visited {} pubkeys",
            visited.len()
        );
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use nostr::{EventBuilder, Kind, Tag, PublicKey};
    use std::sync::Arc;
    use tempfile::TempDir;

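    /// Poll the follow index until `owner` follows `target` (true) or a 500 ms
    /// deadline passes (false); ingestion into nostrdb is asynchronous.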
    async fn wait_for_follow(ndb: &Ndb, owner: &[u8; 32], target: &[u8; 32]) -> bool {
        let deadline = tokio::time::Instant::now() + Duration::from_millis(500);
        loop {
            let follows = super::super::get_follows(ndb, owner);
            if follows.iter().any(|pk| pk == target) {
                return true;
            }
            if tokio::time::Instant::now() >= deadline {
                return false;
            }
            tokio::time::sleep(Duration::from_millis(20)).await;
        }
    }

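    // A contact list from an unknown key must land in the spambox, never in
    // the main database.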
    #[tokio::test]
    async fn test_crawler_routes_untrusted_to_spambox() {
        let _guard = super::super::test_lock();
        let tmp = TempDir::new().unwrap();
        let ndb = super::super::init_ndb(tmp.path()).unwrap();
        let spambox =
            super::super::init_ndb_at_path(&tmp.path().join("nostrdb_spambox"), None).unwrap();

        let root_keys = nostr::Keys::generate();
        let root_pk = root_keys.public_key().to_bytes();
        super::super::set_social_graph_root(&ndb, &root_pk);
        tokio::time::sleep(Duration::from_millis(100)).await;

        let crawler = SocialGraphCrawler::new(
            Arc::clone(&ndb),
            root_keys.clone(),
            vec![],
            2,
        )
        .with_spambox(Arc::clone(&spambox));

        let unknown_keys = nostr::Keys::generate();
        let follow_tag = Tag::public_key(PublicKey::from_slice(&root_pk).unwrap());
        let event = EventBuilder::new(Kind::ContactList, "", vec![follow_tag])
            .to_event(&unknown_keys)
            .unwrap();

        crawler.handle_incoming_event(&event);

        let unknown_pk = unknown_keys.public_key().to_bytes();
        assert!(!wait_for_follow(&ndb, &unknown_pk, &root_pk).await);
        assert!(wait_for_follow(&spambox, &unknown_pk, &root_pk).await);
    }

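    // A contact list signed by the root key itself is trusted and must go to
    // the main database, not the spambox.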
    #[tokio::test]
    async fn test_crawler_routes_trusted_to_main_db() {
        let _guard = super::super::test_lock();
        let tmp = TempDir::new().unwrap();
        let ndb = super::super::init_ndb(tmp.path()).unwrap();
        let spambox =
            super::super::init_ndb_at_path(&tmp.path().join("nostrdb_spambox"), None).unwrap();

        let root_keys = nostr::Keys::generate();
        let root_pk = root_keys.public_key().to_bytes();
        super::super::set_social_graph_root(&ndb, &root_pk);
        tokio::time::sleep(Duration::from_millis(100)).await;

        let crawler = SocialGraphCrawler::new(
            Arc::clone(&ndb),
            root_keys.clone(),
            vec![],
            2,
        )
        .with_spambox(Arc::clone(&spambox));

        let target_keys = nostr::Keys::generate();
        let target_pk = target_keys.public_key().to_bytes();
        let follow_tag = Tag::public_key(PublicKey::from_slice(&target_pk).unwrap());
        let event = EventBuilder::new(Kind::ContactList, "", vec![follow_tag])
            .to_event(&root_keys)
            .unwrap();

        crawler.handle_incoming_event(&event);

        assert!(wait_for_follow(&ndb, &root_pk, &target_pk).await);
        assert!(!wait_for_follow(&spambox, &root_pk, &target_pk).await);
    }

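    // With no relays configured, crawl() must return immediately.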
    #[tokio::test]
    async fn test_crawler_no_relays() {
        let tmp = TempDir::new().unwrap();
        let ndb = {
            let _guard = super::super::test_lock();
            super::super::init_ndb(tmp.path()).unwrap()
        };
        let keys = nostr::Keys::generate();
        let crawler = SocialGraphCrawler::new(ndb, keys, vec![], 2);
        let (_tx, rx) = watch::channel(false);
        crawler.crawl(rx).await;
    }

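    // With the shutdown flag already set, crawl() must bail out before doing
    // any network work.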
    #[tokio::test]
    async fn test_crawler_shutdown_signal() {
        let tmp = TempDir::new().unwrap();
        let ndb = {
            let _guard = super::super::test_lock();
            super::super::init_ndb(tmp.path()).unwrap()
        };
        let keys = nostr::Keys::generate();
        let crawler =
            SocialGraphCrawler::new(ndb, keys, vec!["wss://localhost:1".to_string()], 2);
        let (_tx, rx) = watch::channel(true);
        crawler.crawl(rx).await;
    }
}