1use anyhow::{Context, Result};
2use axum::Router;
3use nostr::nips::nip19::ToBech32;
4use nostr::Keys;
5use std::collections::HashSet;
6use std::path::PathBuf;
7use std::sync::Arc;
8use tokio::net::TcpListener;
9use tokio::sync::{Mutex, Notify};
10use tokio::task::JoinHandle;
11use tower_http::cors::CorsLayer;
12
13use crate::config::{ensure_keys, ensure_keys_in, parse_npub, pubkey_bytes, Config};
14use crate::eviction::{spawn_background_eviction_task, BACKGROUND_EVICTION_INTERVAL};
15use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
16use crate::server::{AppState, HashtreeServer};
17use crate::socialgraph;
18use crate::storage::HashtreeStore;
19
20#[cfg(feature = "p2p")]
21use crate::webrtc::{ContentStore, PeerClassifier, WebRTCManager, WebRTCState};
22#[cfg(not(feature = "p2p"))]
23use crate::WebRTCState;
24
/// Handles belonging to one running peer router instance.
///
/// `EmbeddedPeerRouterController` keeps one of these while the router is
/// active and takes it out again when the router is stopped or restarted.
#[cfg(feature = "p2p")]
struct PeerRouterRuntime {
    /// Shutdown signal obtained from `WebRTCManager::shutdown_signal`;
    /// `true` is sent on it to ask the router to stop.
    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
    /// Task driving `WebRTCManager::run`.
    join: JoinHandle<()>,
    /// Task returned by `spawn_peer_state_persist_task`; aborted on stop.
    peer_state_persist: JoinHandle<()>,
}
31
32struct BackgroundSyncRuntime {
33 service: Arc<crate::sync::BackgroundSync>,
34 join: Option<JoinHandle<()>>,
35}
36
37impl Drop for BackgroundSyncRuntime {
38 fn drop(&mut self) {
39 self.service.shutdown();
40 if let Some(join) = self.join.take() {
41 join.abort();
42 }
43 }
44}
45
46struct BackgroundMirrorRuntime {
47 service: Arc<crate::nostr_mirror::BackgroundNostrMirror>,
48 join: Option<JoinHandle<()>>,
49}
50
51impl Drop for BackgroundMirrorRuntime {
52 fn drop(&mut self) {
53 self.service.shutdown();
54 if let Some(join) = self.join.take() {
55 join.abort();
56 }
57 }
58}
59
60struct BackgroundServicesRuntime {
61 crawler: Option<socialgraph::crawler::SocialGraphTaskHandles>,
62 mirror: Option<BackgroundMirrorRuntime>,
63 sync: Option<BackgroundSyncRuntime>,
64}
65
66impl Drop for BackgroundServicesRuntime {
67 fn drop(&mut self) {
68 if let Some(handles) = self.crawler.as_ref() {
69 let _ = handles.shutdown_tx.send(true);
70 }
71 if let Some(runtime) = self.mirror.as_ref() {
72 runtime.service.shutdown();
73 }
74 if let Some(runtime) = self.sync.as_ref() {
75 runtime.service.shutdown();
76 }
77 }
78}
79
80impl BackgroundServicesRuntime {
81 fn status(&self) -> EmbeddedBackgroundServicesStatus {
82 EmbeddedBackgroundServicesStatus {
83 crawler_active: self.crawler.is_some(),
84 mirror_active: self.mirror.is_some(),
85 sync_active: self.sync.is_some(),
86 }
87 }
88}
89
90struct EmbeddedServerRuntime {
91 shutdown: Arc<Notify>,
92 join: Option<JoinHandle<()>>,
93}
94
95pub struct EmbeddedServerController {
96 runtime: Mutex<Option<EmbeddedServerRuntime>>,
97}
98
99impl EmbeddedServerController {
100 pub fn new(shutdown: Arc<Notify>, join: JoinHandle<()>) -> Self {
101 Self {
102 runtime: Mutex::new(Some(EmbeddedServerRuntime {
103 shutdown,
104 join: Some(join),
105 })),
106 }
107 }
108
109 pub async fn shutdown(&self) {
110 let mut runtime = self.runtime.lock().await;
111 let Some(mut runtime) = runtime.take() else {
112 return;
113 };
114
115 runtime.shutdown.notify_waiters();
116 if let Some(mut join) = runtime.join.take() {
117 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
118 Ok(Ok(())) => {}
119 Ok(Err(err)) => {
120 tracing::warn!("Embedded server task ended with join error: {}", err)
121 }
122 Err(_) => {
123 tracing::warn!("Timed out waiting for embedded server shutdown");
124 join.abort();
125 }
126 }
127 }
128 }
129}
130
/// Snapshot of which embedded background services are currently running.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmbeddedBackgroundServicesStatus {
    /// `true` while the social graph crawler tasks are registered.
    pub crawler_active: bool,
    /// `true` while the background nostr mirror is registered.
    pub mirror_active: bool,
    /// `true` while the background sync service is registered.
    pub sync_active: bool,
}
137
138pub struct EmbeddedBackgroundServicesController {
139 keys: Keys,
140 data_dir: PathBuf,
141 store: Arc<HashtreeStore>,
142 graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
143 graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
144 spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
145 webrtc_state: Option<Arc<WebRTCState>>,
146 runtime: Mutex<BackgroundServicesRuntime>,
147}
148
149impl EmbeddedBackgroundServicesController {
150 const MIRROR_PUBLISH_RELAY_PRIORITY: &[&str] = &[
151 "wss://relay.primal.net",
152 "wss://nos.lol",
153 "wss://relay.nostr.band",
154 "wss://relay.snort.social",
155 "wss://temp.iris.to",
156 "wss://vault.iris.to",
157 "wss://relay.damus.io",
158 ];
159 const MIRROR_PUBLISH_RELAY_BLOCKLIST: &[&str] =
160 &["wss://graph-relay.iris.to", "wss://upload.iris.to/nostr"];
161
162 fn mirror_publish_relays(active_relays: &[String], _bind_address: &str) -> Vec<String> {
163 let mut seen = HashSet::new();
164 let active_relays = active_relays
165 .iter()
166 .filter(|relay| seen.insert((*relay).clone()))
167 .cloned()
168 .collect::<Vec<_>>();
169 if active_relays.is_empty() {
170 return Vec::new();
171 }
172 let filtered = active_relays
173 .iter()
174 .filter(|relay| !Self::MIRROR_PUBLISH_RELAY_BLOCKLIST.contains(&relay.as_str()))
175 .cloned()
176 .collect::<Vec<_>>();
177 if filtered.is_empty() {
178 return active_relays;
179 }
180
181 let filtered_set = filtered.iter().cloned().collect::<HashSet<_>>();
182 let mut selected = Self::MIRROR_PUBLISH_RELAY_PRIORITY
183 .iter()
184 .filter(|relay| filtered_set.contains(**relay))
185 .map(|relay| (*relay).to_string())
186 .collect::<Vec<_>>();
187 let mut selected_set = selected.iter().cloned().collect::<HashSet<_>>();
188 for relay in filtered {
189 if selected_set.insert(relay.clone()) {
190 selected.push(relay);
191 }
192 }
193
194 selected
195 }
196
197 pub fn new(
198 keys: Keys,
199 data_dir: PathBuf,
200 store: Arc<HashtreeStore>,
201 graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
202 graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
203 spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
204 webrtc_state: Option<Arc<WebRTCState>>,
205 ) -> Self {
206 Self {
207 keys,
208 data_dir,
209 store,
210 graph_store_concrete,
211 graph_store,
212 spambox,
213 webrtc_state,
214 runtime: Mutex::new(BackgroundServicesRuntime {
215 crawler: None,
216 mirror: None,
217 sync: None,
218 }),
219 }
220 }
221
222 pub async fn status(&self) -> EmbeddedBackgroundServicesStatus {
223 self.runtime.lock().await.status()
224 }
225
226 pub async fn shutdown(&self) {
227 let mut runtime = self.runtime.lock().await;
228 Self::shutdown_crawler(&mut runtime.crawler).await;
229 Self::shutdown_mirror(&mut runtime.mirror).await;
230 Self::shutdown_sync(&mut runtime.sync).await;
231 }
232
233 async fn shutdown_crawler(crawler: &mut Option<socialgraph::crawler::SocialGraphTaskHandles>) {
234 let Some(handles) = crawler.take() else {
235 return;
236 };
237
238 let _ = handles.shutdown_tx.send(true);
239
240 let mut crawl_handle = handles.crawl_handle;
241 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut crawl_handle).await {
242 Ok(Ok(())) => {}
243 Ok(Err(err)) => tracing::warn!("Crawler task ended with join error: {}", err),
244 Err(_) => {
245 tracing::warn!("Timed out waiting for crawler task shutdown");
246 crawl_handle.abort();
247 }
248 }
249
250 let mut local_list_handle = handles.local_list_handle;
251 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut local_list_handle).await
252 {
253 Ok(Ok(())) => {}
254 Ok(Err(err)) => tracing::warn!("Local list task ended with join error: {}", err),
255 Err(_) => {
256 tracing::warn!("Timed out waiting for local list task shutdown");
257 local_list_handle.abort();
258 }
259 }
260 }
261
262 async fn shutdown_sync(sync: &mut Option<BackgroundSyncRuntime>) {
263 let Some(mut runtime) = sync.take() else {
264 return;
265 };
266
267 runtime.service.shutdown();
268 if let Some(mut join) = runtime.join.take() {
269 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
270 Ok(Ok(())) => {}
271 Ok(Err(err)) => {
272 tracing::warn!("Background sync task ended with join error: {}", err)
273 }
274 Err(_) => {
275 tracing::warn!("Timed out waiting for background sync shutdown");
276 join.abort();
277 }
278 }
279 }
280 }
281
282 async fn shutdown_mirror(mirror: &mut Option<BackgroundMirrorRuntime>) {
283 let Some(mut runtime) = mirror.take() else {
284 return;
285 };
286
287 runtime.service.shutdown();
288 if let Some(mut join) = runtime.join.take() {
289 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
290 Ok(Ok(())) => {}
291 Ok(Err(err)) => {
292 tracing::warn!("Background mirror task ended with join error: {}", err)
293 }
294 Err(_) => {
295 tracing::warn!("Timed out waiting for background mirror shutdown");
296 join.abort();
297 }
298 }
299 }
300 }
301
302 fn nostr_mirror_config(
303 config: &Config,
304 active_relays: &[String],
305 ) -> crate::nostr_mirror::NostrMirrorConfig {
306 crate::nostr_mirror::NostrMirrorConfig {
307 relays: active_relays.to_vec(),
308 publish_relays: Self::mirror_publish_relays(active_relays, &config.server.bind_address),
309 blossom_write_servers: config.blossom.all_write_servers(),
310 max_follow_distance: config
311 .nostr
312 .mirror_max_follow_distance
313 .unwrap_or(config.nostr.social_graph_crawl_depth),
314 overmute_threshold: config.nostr.overmute_threshold,
315 require_negentropy: config.nostr.negentropy_only,
316 kinds: config.nostr.mirror_kinds.clone(),
317 history_sync_author_chunk_size: config.nostr.history_sync_author_chunk_size.max(1),
318 history_sync_per_author_event_limit: config
319 .nostr
320 .history_sync_per_author_event_limit
321 .max(1),
322 missing_profile_backfill_batch_size: config.nostr.history_sync_author_chunk_size.max(1),
323 history_sync_on_reconnect: config.nostr.history_sync_on_reconnect,
324 full_text_note_history_follow_distance: config
325 .nostr
326 .full_text_note_history_follow_distance,
327 full_text_note_history_max_relay_pages: config
328 .nostr
329 .full_text_note_history_max_relay_pages,
330 ..crate::nostr_mirror::NostrMirrorConfig::default()
331 }
332 }
333
334 pub async fn apply_config(&self, config: &Config) -> Result<EmbeddedBackgroundServicesStatus> {
335 let mut runtime = self.runtime.lock().await;
336
337 Self::shutdown_crawler(&mut runtime.crawler).await;
338 Self::shutdown_mirror(&mut runtime.mirror).await;
339 Self::shutdown_sync(&mut runtime.sync).await;
340
341 if !config.server.mode.background_services_enabled() {
342 return Ok(runtime.status());
343 }
344
345 let active_relays = config.nostr.active_relays();
346
347 if config.nostr.enabled
348 && config.nostr.social_graph_crawl_depth > 0
349 && !active_relays.is_empty()
350 {
351 runtime.crawler = Some(socialgraph::crawler::spawn_social_graph_tasks(
352 self.graph_store.clone(),
353 self.keys.clone(),
354 active_relays.clone(),
355 config.nostr.social_graph_crawl_depth,
356 self.spambox.clone(),
357 self.data_dir.clone(),
358 ));
359
360 let service = Arc::new(
361 crate::nostr_mirror::BackgroundNostrMirror::new(
362 Self::nostr_mirror_config(config, &active_relays),
363 self.store.clone(),
364 self.graph_store_concrete.clone(),
365 Some(
366 nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
367 .context("Failed to parse keys for background nostr mirror")?,
368 ),
369 )
370 .await
371 .context("Failed to create background nostr mirror")?,
372 );
373 let service_for_task = service.clone();
374 let join = tokio::task::spawn_blocking(move || {
375 let runtime = tokio::runtime::Builder::new_current_thread()
376 .enable_all()
377 .build()
378 .expect("build background nostr mirror runtime");
379 runtime.block_on(async {
380 if let Err(err) = service_for_task.run().await {
381 tracing::error!("Background nostr mirror error: {:#}", err);
382 }
383 });
384 });
385 runtime.mirror = Some(BackgroundMirrorRuntime {
386 service,
387 join: Some(join),
388 });
389 }
390
391 let has_pinned_refs = self
392 .store
393 .list_pinned_refs()
394 .map(|refs| !refs.is_empty())
395 .unwrap_or(false);
396 let has_tracked_authors = self
397 .store
398 .list_tracked_authors()
399 .map(|authors| !authors.is_empty())
400 .unwrap_or(false);
401
402 if config.sync.enabled
403 && (config.sync.sync_own
404 || config.sync.sync_followed
405 || has_pinned_refs
406 || has_tracked_authors)
407 && !active_relays.is_empty()
408 {
409 let sync_config = crate::sync::SyncConfig {
410 sync_own: config.sync.sync_own,
411 sync_followed: config.sync.sync_followed,
412 relays: active_relays,
413 max_concurrent: config.sync.max_concurrent,
414 webrtc_timeout_ms: config.sync.webrtc_timeout_ms,
415 blossom_timeout_ms: config.sync.blossom_timeout_ms,
416 };
417
418 let sync_keys = nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
419 .context("Failed to parse keys for sync")?;
420 let service = Arc::new(
421 crate::sync::BackgroundSync::new(
422 sync_config,
423 self.store.clone(),
424 sync_keys,
425 self.webrtc_state.clone(),
426 )
427 .await
428 .context("Failed to create background sync service")?,
429 );
430 let contacts_file = self.data_dir.join("contacts.json");
431 let service_for_task = service.clone();
432 let join = tokio::spawn(async move {
433 if let Err(err) = service_for_task.run(contacts_file).await {
434 tracing::error!("Background sync error: {}", err);
435 }
436 });
437 runtime.sync = Some(BackgroundSyncRuntime {
438 service,
439 join: Some(join),
440 });
441 }
442
443 Ok(runtime.status())
444 }
445}
446
/// Starts, stops and restarts the WebRTC peer router according to a `Config`.
#[cfg(feature = "p2p")]
pub struct EmbeddedPeerRouterController {
    /// Identity keys handed to each `WebRTCManager`.
    keys: Keys,
    /// Directory used to persist and reload mesh peer state.
    data_dir: PathBuf,
    /// WebRTC state shared across router restarts.
    state: Arc<WebRTCState>,
    /// Content store given to the manager when hash-get is enabled.
    store: Arc<dyn ContentStore>,
    /// Peer classifier installed on every manager.
    peer_classifier: PeerClassifier,
    /// Local nostr relay wired into the manager.
    nostr_relay: Arc<NostrRelay>,
    /// Handles of the running router, or `None` while it is stopped.
    runtime: Mutex<Option<PeerRouterRuntime>>,
}
457
458#[cfg(feature = "p2p")]
459impl EmbeddedPeerRouterController {
460 pub fn new(
461 keys: Keys,
462 data_dir: PathBuf,
463 state: Arc<WebRTCState>,
464 store: Arc<dyn ContentStore>,
465 peer_classifier: PeerClassifier,
466 nostr_relay: Arc<NostrRelay>,
467 ) -> Self {
468 Self {
469 keys,
470 data_dir,
471 state,
472 store,
473 peer_classifier,
474 nostr_relay,
475 runtime: Mutex::new(None),
476 }
477 }
478
479 pub fn state(&self) -> Arc<WebRTCState> {
480 self.state.clone()
481 }
482
483 pub async fn apply_config(&self, config: &Config) -> Result<bool> {
484 let mut runtime = self.runtime.lock().await;
485 if let Some(runtime_handle) = runtime.take() {
486 if let Err(err) =
487 crate::p2p_common::persist_peer_state(&self.data_dir, &self.state).await
488 {
489 tracing::warn!("Failed to persist mesh peer state before router restart: {err:#}");
490 }
491 let _ = runtime_handle.shutdown.send(true);
492 runtime_handle.peer_state_persist.abort();
493 let mut join = runtime_handle.join;
494 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
495 Ok(Ok(())) => {}
496 Ok(Err(err)) => {
497 tracing::warn!("Peer router task ended with join error: {}", err);
498 }
499 Err(_) => {
500 tracing::warn!("Timed out waiting for peer router shutdown");
501 join.abort();
502 }
503 }
504 }
505
506 self.state.reset_runtime_state().await;
507 if let Err(err) = crate::p2p_common::load_peer_state(&self.data_dir, &self.state).await {
508 tracing::warn!("Failed to load persisted mesh peer state: {err:#}");
509 }
510
511 if !crate::p2p_common::peer_router_enabled(config) {
512 return Ok(false);
513 }
514
515 let webrtc_config = crate::p2p_common::default_webrtc_config(config);
516 let mut manager = if config.server.mode.hash_get_enabled() {
517 WebRTCManager::new_with_state_and_store_and_classifier(
518 self.keys.clone(),
519 webrtc_config,
520 self.state.clone(),
521 self.store.clone(),
522 self.peer_classifier.clone(),
523 )
524 } else {
525 let mut manager =
526 WebRTCManager::new_with_state(self.keys.clone(), webrtc_config, self.state.clone());
527 manager.set_peer_classifier(self.peer_classifier.clone());
528 manager
529 };
530 manager
531 .set_nostr_relay(self.nostr_relay.clone() as hashtree_network::SharedMeshRelayClient);
532 let shutdown = manager.shutdown_signal();
533 let join = tokio::spawn(async move {
534 if let Err(err) = manager.run().await {
535 tracing::error!("Peer router error: {}", err);
536 }
537 });
538 let peer_state_persist = crate::p2p_common::spawn_peer_state_persist_task(
539 self.data_dir.clone(),
540 self.state.clone(),
541 );
542 *runtime = Some(PeerRouterRuntime {
543 shutdown,
544 join,
545 peer_state_persist,
546 });
547 Ok(true)
548 }
549
550 pub async fn shutdown(&self) {
551 let mut runtime = self.runtime.lock().await;
552 let Some(runtime_handle) = runtime.take() else {
553 return;
554 };
555
556 if let Err(err) = crate::p2p_common::persist_peer_state(&self.data_dir, &self.state).await {
557 tracing::warn!("Failed to persist mesh peer state during router shutdown: {err:#}");
558 }
559 let _ = runtime_handle.shutdown.send(true);
560 runtime_handle.peer_state_persist.abort();
561 let mut join = runtime_handle.join;
562 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
563 Ok(Ok(())) => {}
564 Ok(Err(err)) => tracing::warn!("Peer router task ended with join error: {}", err),
565 Err(_) => {
566 tracing::warn!("Timed out waiting for peer router shutdown");
567 join.abort();
568 }
569 }
570
571 self.state.reset_runtime_state().await;
572 }
573}
574
575pub struct EmbeddedDaemonController {
576 server_controller: Arc<EmbeddedServerController>,
577 #[cfg(feature = "p2p")]
578 peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
579 background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
580}
581
582impl EmbeddedDaemonController {
583 #[cfg(feature = "p2p")]
584 pub fn new(
585 server_controller: Arc<EmbeddedServerController>,
586 peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
587 background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
588 ) -> Self {
589 Self {
590 server_controller,
591 #[cfg(feature = "p2p")]
592 peer_router_controller,
593 background_services_controller,
594 }
595 }
596
597 #[cfg(not(feature = "p2p"))]
598 pub fn new(
599 server_controller: Arc<EmbeddedServerController>,
600 background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
601 ) -> Self {
602 Self {
603 server_controller,
604 background_services_controller,
605 }
606 }
607
608 pub async fn shutdown(&self) {
609 self.server_controller.shutdown().await;
610 if let Some(controller) = self.background_services_controller.as_ref() {
611 controller.shutdown().await;
612 }
613 #[cfg(feature = "p2p")]
614 if let Some(controller) = self.peer_router_controller.as_ref() {
615 controller.shutdown().await;
616 }
617 }
618}
619
620pub struct EmbeddedDaemonOptions {
621 pub config: Config,
622 pub data_dir: PathBuf,
623 pub config_dir: Option<PathBuf>,
624 pub bind_address: String,
625 pub relays: Option<Vec<String>>,
626 pub extra_routes: Option<Router<AppState>>,
627 pub cors: Option<CorsLayer>,
628}
629
630pub struct EmbeddedDaemonInfo {
631 pub addr: String,
632 pub port: u16,
633 pub npub: String,
634 pub store: Arc<HashtreeStore>,
635 pub daemon_controller: Arc<EmbeddedDaemonController>,
636 #[allow(dead_code)]
637 pub webrtc_state: Option<Arc<WebRTCState>>,
638 #[cfg(feature = "p2p")]
639 #[allow(dead_code)]
640 pub peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
641 #[allow(dead_code)]
642 pub background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
643}
644
645pub async fn start_embedded(opts: EmbeddedDaemonOptions) -> Result<EmbeddedDaemonInfo> {
646 let _ = rustls::crypto::ring::default_provider().install_default();
647
648 let mut config = opts.config;
649 config.server.bind_address = opts.bind_address.clone();
650 if let Some(relays) = opts.relays {
651 config.nostr.relays = relays;
652 config.nostr.enabled = !config.nostr.relays.is_empty();
653 }
654
655 let max_size_bytes = config.storage.max_size_gb * 1024 * 1024 * 1024;
656 let nostr_db_max_bytes = config
657 .nostr
658 .db_max_size_gb
659 .saturating_mul(1024 * 1024 * 1024);
660 let spambox_db_max_bytes = config
661 .nostr
662 .spambox_max_size_gb
663 .saturating_mul(1024 * 1024 * 1024);
664
665 let store = Arc::new(HashtreeStore::with_options(
666 &opts.data_dir,
667 config.storage.s3.as_ref(),
668 max_size_bytes,
669 )?);
670
671 let (keys, _was_generated) = if let Some(config_dir) = opts.config_dir.as_ref() {
672 ensure_keys_in(config_dir, Some(&opts.data_dir), Some(&config))?
673 } else {
674 ensure_keys()?
675 };
676 let pk_bytes = pubkey_bytes(&keys);
677 let npub = keys
678 .public_key()
679 .to_bech32()
680 .context("Failed to encode npub")?;
681
682 let mut allowed_pubkeys: HashSet<String> = HashSet::new();
683 allowed_pubkeys.insert(hex::encode(pk_bytes));
684 for npub_str in &config.nostr.allowed_npubs {
685 if let Ok(pk) = parse_npub(npub_str) {
686 allowed_pubkeys.insert(hex::encode(pk));
687 } else {
688 tracing::warn!("Invalid npub in allowed_npubs: {}", npub_str);
689 }
690 }
691
692 let graph_store = socialgraph::open_social_graph_store_with_storage(
693 &opts.data_dir,
694 store.store_arc(),
695 Some(nostr_db_max_bytes),
696 )
697 .context("Failed to initialize social graph store")?;
698 graph_store.set_profile_index_overmute_threshold(config.nostr.overmute_threshold);
699
700 let social_graph_root_bytes = if let Some(ref root_npub) = config.nostr.socialgraph_root {
701 parse_npub(root_npub).unwrap_or(pk_bytes)
702 } else {
703 pk_bytes
704 };
705 socialgraph::set_social_graph_root(&graph_store, &social_graph_root_bytes);
706 socialgraph::sync_local_list_files_force(graph_store.as_ref(), &opts.data_dir, &keys)
707 .context("Failed to sync local social graph lists")?;
708 let social_graph_store: Arc<dyn socialgraph::SocialGraphBackend> = graph_store.clone();
709
710 let social_graph = Arc::new(socialgraph::SocialGraphAccessControl::new(
711 Arc::clone(&social_graph_store),
712 config.nostr.max_write_distance,
713 allowed_pubkeys.clone(),
714 ));
715
716 let nostr_relay_config = NostrRelayConfig {
717 spambox_db_max_bytes,
718 ..Default::default()
719 };
720 let mut public_event_pubkeys = HashSet::new();
721 public_event_pubkeys.insert(hex::encode(pk_bytes));
722 let nostr_relay = Arc::new(
723 NostrRelay::new(
724 Arc::clone(&social_graph_store),
725 opts.data_dir.clone(),
726 public_event_pubkeys,
727 Some(social_graph.clone()),
728 nostr_relay_config,
729 )
730 .context("Failed to initialize Nostr relay")?,
731 );
732
733 let crawler_spambox = if spambox_db_max_bytes == 0 {
734 None
735 } else {
736 let spam_dir = opts.data_dir.join("socialgraph_spambox");
737 match socialgraph::open_social_graph_store_at_path(&spam_dir, Some(spambox_db_max_bytes)) {
738 Ok(store) => Some(store),
739 Err(err) => {
740 tracing::warn!("Failed to open social graph spambox for crawler: {}", err);
741 None
742 }
743 }
744 };
745 let crawler_spambox_backend = crawler_spambox
746 .clone()
747 .map(|store| store as Arc<dyn socialgraph::SocialGraphBackend>);
748
749 #[cfg(feature = "p2p")]
750 let (webrtc_state, peer_router_controller): (
751 Option<Arc<WebRTCState>>,
752 Option<Arc<EmbeddedPeerRouterController>>,
753 ) = {
754 let router_config = crate::p2p_common::default_webrtc_config(&config);
755 let peer_classifier = crate::p2p_common::build_peer_classifier(
756 opts.data_dir.clone(),
757 Arc::clone(&social_graph_store),
758 );
759 let cashu_payment_client =
760 if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
761 match crate::cashu_helper::CashuHelperClient::discover(opts.data_dir.clone()) {
762 Ok(client) => {
763 Some(Arc::new(client) as Arc<dyn crate::cashu_helper::CashuPaymentClient>)
764 }
765 Err(err) => {
766 tracing::warn!(
767 "Cashu settlement helper unavailable; paid retrieval stays disabled: {}",
768 err
769 );
770 None
771 }
772 }
773 } else {
774 None
775 };
776 let cashu_mint_metadata =
777 if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
778 let metadata_path = crate::webrtc::cashu_mint_metadata_path(&opts.data_dir);
779 match crate::webrtc::CashuMintMetadataStore::load(metadata_path) {
780 Ok(store) => Some(store),
781 Err(err) => {
782 tracing::warn!(
783 "Failed to load Cashu mint metadata; falling back to in-memory state: {}",
784 err
785 );
786 Some(crate::webrtc::CashuMintMetadataStore::in_memory())
787 }
788 }
789 } else {
790 None
791 };
792
793 let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
794 router_config.request_selection_strategy,
795 router_config.request_fairness_enabled,
796 router_config.request_dispatch,
797 std::time::Duration::from_millis(router_config.message_timeout_ms),
798 crate::webrtc::CashuRoutingConfig::from(&config.cashu),
799 cashu_payment_client,
800 cashu_mint_metadata,
801 ));
802 let controller = Arc::new(EmbeddedPeerRouterController::new(
803 keys.clone(),
804 opts.data_dir.clone(),
805 state.clone(),
806 Arc::clone(&store) as Arc<dyn ContentStore>,
807 peer_classifier,
808 nostr_relay.clone(),
809 ));
810 controller.apply_config(&config).await?;
811 (Some(state), Some(controller))
812 };
813
814 #[cfg(not(feature = "p2p"))]
815 let webrtc_state: Option<Arc<crate::webrtc::WebRTCState>> = None;
816
817 let background_services_controller = Arc::new(EmbeddedBackgroundServicesController::new(
818 keys.clone(),
819 opts.data_dir.clone(),
820 Arc::clone(&store),
821 graph_store.clone(),
822 Arc::clone(&social_graph_store),
823 crawler_spambox_backend,
824 webrtc_state.clone(),
825 ));
826 background_services_controller.apply_config(&config).await?;
827
828 let upstream_blossom = config.blossom.all_read_servers();
829 let active_nostr_relays = config.nostr.active_relays();
830
831 let mut server = HashtreeServer::new(Arc::clone(&store), opts.bind_address.clone())
832 .with_server_mode(config.server.mode)
833 .with_hash_get_enabled(config.server.mode.hash_get_enabled())
834 .with_allowed_pubkeys(allowed_pubkeys.clone())
835 .with_max_upload_bytes((config.blossom.max_upload_mb as usize) * 1024 * 1024)
836 .with_public_writes(config.server.public_writes)
837 .with_upstream_blossom(upstream_blossom)
838 .with_nostr_relay_urls(active_nostr_relays)
839 .with_social_graph(social_graph)
840 .with_socialgraph_snapshot(
841 Arc::clone(&social_graph_store),
842 social_graph_root_bytes,
843 config.server.socialgraph_snapshot_public,
844 )
845 .with_nostr_relay(nostr_relay.clone());
846
847 if crate::p2p_common::peer_router_enabled(&config) {
848 if let Some(ref state) = webrtc_state {
849 server = server.with_webrtc_peers(state.clone());
850 }
851 }
852
853 if let Some(extra) = opts.extra_routes {
854 server = server.with_extra_routes(extra);
855 }
856 if let Some(cors) = opts.cors {
857 server = server.with_cors(cors);
858 }
859
860 spawn_background_eviction_task(
861 Arc::clone(&store),
862 BACKGROUND_EVICTION_INTERVAL,
863 "embedded daemon",
864 );
865
866 let listener = TcpListener::bind(&opts.bind_address).await?;
867 let local_addr = listener.local_addr()?;
868 let actual_addr = format!("{}:{}", local_addr.ip(), local_addr.port());
869
870 let server_shutdown = Arc::new(Notify::new());
871 let server_shutdown_for_task = Arc::clone(&server_shutdown);
872 let server_join = tokio::spawn(async move {
873 if let Err(e) = server
874 .run_with_listener_until(listener, async move {
875 server_shutdown_for_task.notified().await;
876 })
877 .await
878 {
879 tracing::error!("Embedded daemon server error: {}", e);
880 }
881 });
882 let server_controller = Arc::new(EmbeddedServerController::new(server_shutdown, server_join));
883 #[cfg(feature = "p2p")]
884 let daemon_controller = Arc::new(EmbeddedDaemonController::new(
885 server_controller,
886 peer_router_controller.clone(),
887 Some(background_services_controller.clone()),
888 ));
889 #[cfg(not(feature = "p2p"))]
890 let daemon_controller = Arc::new(EmbeddedDaemonController::new(
891 server_controller,
892 Some(background_services_controller.clone()),
893 ));
894
895 tracing::info!(
896 "Embedded daemon started on {}, identity {}",
897 actual_addr,
898 npub
899 );
900
901 Ok(EmbeddedDaemonInfo {
902 addr: actual_addr,
903 port: local_addr.port(),
904 npub,
905 store,
906 daemon_controller,
907 webrtc_state,
908 #[cfg(feature = "p2p")]
909 peer_router_controller,
910 background_services_controller: Some(background_services_controller),
911 })
912}
913
/// Unit tests for the pure helpers of `EmbeddedBackgroundServicesController`.
#[cfg(test)]
mod tests {
    use super::EmbeddedBackgroundServicesController;
    use crate::config::Config;

    /// Priority relays come first (in priority order), blocklisted relays are
    /// dropped and unknown relays keep their position at the end.
    #[test]
    fn mirror_publish_relays_orders_known_root_publish_relays_first() {
        let relays = EmbeddedBackgroundServicesController::mirror_publish_relays(
            &[
                "wss://graph-relay.iris.to".to_string(),
                "wss://relay.example".to_string(),
                "wss://relay.primal.net".to_string(),
                "wss://relay.damus.io".to_string(),
                "wss://temp.iris.to".to_string(),
                "wss://vault.iris.to".to_string(),
                "wss://upload.iris.to/nostr".to_string(),
            ],
            "0.0.0.0:8080",
        );
        assert_eq!(
            relays,
            vec![
                "wss://relay.primal.net".to_string(),
                "wss://temp.iris.to".to_string(),
                "wss://vault.iris.to".to_string(),
                "wss://relay.damus.io".to_string(),
                "wss://relay.example".to_string(),
            ]
        );
    }

    /// Blocklisted relays are removed and the remaining priority relays are
    /// reordered according to the priority list.
    #[test]
    fn mirror_publish_relays_filters_known_bad_publish_targets_when_no_preferred_remain() {
        let relays = EmbeddedBackgroundServicesController::mirror_publish_relays(
            &[
                "wss://graph-relay.iris.to".to_string(),
                "wss://relay.snort.social".to_string(),
                "wss://relay.nostr.band".to_string(),
                "wss://upload.iris.to/nostr".to_string(),
            ],
            "0.0.0.0:8080",
        );
        assert_eq!(
            relays,
            vec![
                "wss://relay.nostr.band".to_string(),
                "wss://relay.snort.social".to_string(),
            ]
        );
    }

    /// No active relays means no publish relays.
    #[test]
    fn mirror_publish_relays_returns_empty_for_empty_input() {
        let relays =
            EmbeddedBackgroundServicesController::mirror_publish_relays(&[], "0.0.0.0:8080");
        assert!(relays.is_empty());
    }

    /// Repeated relays appear once in the result.
    #[test]
    fn mirror_publish_relays_deduplicates_input() {
        let relays = EmbeddedBackgroundServicesController::mirror_publish_relays(
            &[
                "wss://relay.example".to_string(),
                "wss://nos.lol".to_string(),
                "wss://relay.example".to_string(),
                "wss://nos.lol".to_string(),
            ],
            "0.0.0.0:8080",
        );
        assert_eq!(
            relays,
            vec![
                "wss://nos.lol".to_string(),
                "wss://relay.example".to_string(),
            ]
        );
    }

    /// When every active relay is blocklisted, the de-duplicated input is
    /// used as-is rather than returning nothing.
    #[test]
    fn mirror_publish_relays_falls_back_when_everything_is_blocklisted() {
        let relays = EmbeddedBackgroundServicesController::mirror_publish_relays(
            &[
                "wss://upload.iris.to/nostr".to_string(),
                "wss://graph-relay.iris.to".to_string(),
                "wss://upload.iris.to/nostr".to_string(),
            ],
            "0.0.0.0:8080",
        );
        assert_eq!(
            relays,
            vec![
                "wss://upload.iris.to/nostr".to_string(),
                "wss://graph-relay.iris.to".to_string(),
            ]
        );
    }

    /// The relay page limit is passed through unchanged, including zero.
    #[test]
    fn nostr_mirror_config_allows_disabling_full_note_paging() {
        let mut config = Config::default();
        config.nostr.full_text_note_history_max_relay_pages = 0;

        let mirror_config = EmbeddedBackgroundServicesController::nostr_mirror_config(
            &config,
            &["wss://relay.example".to_string()],
        );

        assert_eq!(mirror_config.full_text_note_history_max_relay_pages, 0);

        config.nostr.full_text_note_history_max_relay_pages = 64;
        let mirror_config = EmbeddedBackgroundServicesController::nostr_mirror_config(
            &config,
            &["wss://relay.example".to_string()],
        );

        assert_eq!(mirror_config.full_text_note_history_max_relay_pages, 64);
    }

    /// A dedicated mirror distance wins; without it the crawl depth is used.
    #[test]
    fn nostr_mirror_config_can_limit_mirror_distance_independently() {
        let mut config = Config::default();
        config.nostr.social_graph_crawl_depth = 6;
        config.nostr.mirror_max_follow_distance = Some(2);

        let mirror_config = EmbeddedBackgroundServicesController::nostr_mirror_config(
            &config,
            &["wss://relay.example".to_string()],
        );

        assert_eq!(mirror_config.max_follow_distance, 2);

        config.nostr.mirror_max_follow_distance = None;
        let mirror_config = EmbeddedBackgroundServicesController::nostr_mirror_config(
            &config,
            &["wss://relay.example".to_string()],
        );

        assert_eq!(mirror_config.max_follow_distance, 6);
    }
}