1use anyhow::{Context, Result};
2use axum::Router;
3use nostr::nips::nip19::ToBech32;
4use nostr::Keys;
5use std::collections::HashSet;
6use std::path::PathBuf;
7use std::sync::Arc;
8use tokio::net::TcpListener;
9use tokio::sync::{Mutex, Notify};
10use tokio::task::JoinHandle;
11use tower_http::cors::CorsLayer;
12
13use crate::config::{ensure_keys, ensure_keys_in, parse_npub, pubkey_bytes, Config};
14use crate::eviction::{spawn_background_eviction_task, BACKGROUND_EVICTION_INTERVAL};
15use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
16use crate::server::{AppState, HashtreeServer};
17use crate::socialgraph;
18use crate::storage::HashtreeStore;
19
20#[cfg(feature = "p2p")]
21use crate::webrtc::{ContentStore, PeerClassifier, WebRTCManager, WebRTCState};
22#[cfg(not(feature = "p2p"))]
23use crate::WebRTCState;
24
#[cfg(feature = "p2p")]
/// Handles belonging to one running peer router instance.
struct PeerRouterRuntime {
    /// Watch channel sender; `true` is sent to ask the router task to stop.
    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
    /// Join handle of the task that drives the router's `run()` future.
    join: JoinHandle<()>,
    /// Join handle of the peer-state persistence task; aborted when the router stops.
    peer_state_persist: JoinHandle<()>,
}
31
32struct BackgroundSyncRuntime {
33 service: Arc<crate::sync::BackgroundSync>,
34 join: Option<JoinHandle<()>>,
35}
36
37impl Drop for BackgroundSyncRuntime {
38 fn drop(&mut self) {
39 self.service.shutdown();
40 if let Some(join) = self.join.take() {
41 join.abort();
42 }
43 }
44}
45
46struct BackgroundMirrorRuntime {
47 service: Arc<crate::nostr_mirror::BackgroundNostrMirror>,
48 join: Option<JoinHandle<()>>,
49}
50
51impl Drop for BackgroundMirrorRuntime {
52 fn drop(&mut self) {
53 self.service.shutdown();
54 if let Some(join) = self.join.take() {
55 join.abort();
56 }
57 }
58}
59
60struct BackgroundServicesRuntime {
61 crawler: Option<socialgraph::crawler::SocialGraphTaskHandles>,
62 mirror: Option<BackgroundMirrorRuntime>,
63 sync: Option<BackgroundSyncRuntime>,
64}
65
66impl Drop for BackgroundServicesRuntime {
67 fn drop(&mut self) {
68 if let Some(handles) = self.crawler.as_ref() {
69 let _ = handles.shutdown_tx.send(true);
70 }
71 if let Some(runtime) = self.mirror.as_ref() {
72 runtime.service.shutdown();
73 }
74 if let Some(runtime) = self.sync.as_ref() {
75 runtime.service.shutdown();
76 }
77 }
78}
79
80impl BackgroundServicesRuntime {
81 fn status(&self) -> EmbeddedBackgroundServicesStatus {
82 EmbeddedBackgroundServicesStatus {
83 crawler_active: self.crawler.is_some(),
84 mirror_active: self.mirror.is_some(),
85 sync_active: self.sync.is_some(),
86 }
87 }
88}
89
90struct EmbeddedServerRuntime {
91 shutdown: Arc<Notify>,
92 join: Option<JoinHandle<()>>,
93}
94
95pub struct EmbeddedServerController {
96 runtime: Mutex<Option<EmbeddedServerRuntime>>,
97}
98
99impl EmbeddedServerController {
100 pub fn new(shutdown: Arc<Notify>, join: JoinHandle<()>) -> Self {
101 Self {
102 runtime: Mutex::new(Some(EmbeddedServerRuntime {
103 shutdown,
104 join: Some(join),
105 })),
106 }
107 }
108
109 pub async fn shutdown(&self) {
110 let mut runtime = self.runtime.lock().await;
111 let Some(mut runtime) = runtime.take() else {
112 return;
113 };
114
115 runtime.shutdown.notify_waiters();
116 if let Some(mut join) = runtime.join.take() {
117 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
118 Ok(Ok(())) => {}
119 Ok(Err(err)) => {
120 tracing::warn!("Embedded server task ended with join error: {}", err)
121 }
122 Err(_) => {
123 tracing::warn!("Timed out waiting for embedded server shutdown");
124 join.abort();
125 }
126 }
127 }
128 }
129}
130
/// Snapshot of which background services are currently tracked as running.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmbeddedBackgroundServicesStatus {
    /// `true` while social graph crawler tasks are tracked.
    pub crawler_active: bool,
    /// `true` while the background nostr mirror is tracked.
    pub mirror_active: bool,
    /// `true` while the background sync service is tracked.
    pub sync_active: bool,
}
137
138pub struct EmbeddedBackgroundServicesController {
139 keys: Keys,
140 data_dir: PathBuf,
141 store: Arc<HashtreeStore>,
142 graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
143 graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
144 spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
145 webrtc_state: Option<Arc<WebRTCState>>,
146 runtime: Mutex<BackgroundServicesRuntime>,
147}
148
149impl EmbeddedBackgroundServicesController {
150 const MIRROR_PUBLISH_RELAY_PRIORITY: &[&str] = &[
151 "wss://nos.lol",
152 "wss://temp.iris.to",
153 "wss://vault.iris.to",
154 "wss://relay.damus.io",
155 ];
156 const MIRROR_PUBLISH_RELAY_BLOCKLIST: &[&str] =
157 &["wss://graph-relay.iris.to", "wss://upload.iris.to/nostr"];
158
159 fn mirror_publish_relays(active_relays: &[String], _bind_address: &str) -> Vec<String> {
160 let mut seen = HashSet::new();
161 let active_relays = active_relays
162 .iter()
163 .filter(|relay| seen.insert((*relay).clone()))
164 .cloned()
165 .collect::<Vec<_>>();
166 if active_relays.is_empty() {
167 return Vec::new();
168 }
169 let filtered = active_relays
170 .iter()
171 .filter(|relay| !Self::MIRROR_PUBLISH_RELAY_BLOCKLIST.contains(&relay.as_str()))
172 .cloned()
173 .collect::<Vec<_>>();
174 if filtered.is_empty() {
175 return active_relays;
176 }
177
178 let mut selected = Vec::new();
179 let mut selected_set = HashSet::new();
180 for relay in Self::MIRROR_PUBLISH_RELAY_PRIORITY {
181 if filtered.iter().any(|active| active == relay) {
182 selected.push((*relay).to_string());
183 selected_set.insert((*relay).to_string());
184 }
185 }
186 for relay in filtered {
187 if selected_set.insert(relay.clone()) {
188 selected.push(relay);
189 }
190 }
191
192 selected
193 }
194
195 pub fn new(
196 keys: Keys,
197 data_dir: PathBuf,
198 store: Arc<HashtreeStore>,
199 graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
200 graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
201 spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
202 webrtc_state: Option<Arc<WebRTCState>>,
203 ) -> Self {
204 Self {
205 keys,
206 data_dir,
207 store,
208 graph_store_concrete,
209 graph_store,
210 spambox,
211 webrtc_state,
212 runtime: Mutex::new(BackgroundServicesRuntime {
213 crawler: None,
214 mirror: None,
215 sync: None,
216 }),
217 }
218 }
219
220 pub async fn status(&self) -> EmbeddedBackgroundServicesStatus {
221 self.runtime.lock().await.status()
222 }
223
224 pub async fn shutdown(&self) {
225 let mut runtime = self.runtime.lock().await;
226 Self::shutdown_crawler(&mut runtime.crawler).await;
227 Self::shutdown_mirror(&mut runtime.mirror).await;
228 Self::shutdown_sync(&mut runtime.sync).await;
229 }
230
231 async fn shutdown_crawler(crawler: &mut Option<socialgraph::crawler::SocialGraphTaskHandles>) {
232 let Some(handles) = crawler.take() else {
233 return;
234 };
235
236 let _ = handles.shutdown_tx.send(true);
237
238 let mut crawl_handle = handles.crawl_handle;
239 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut crawl_handle).await {
240 Ok(Ok(())) => {}
241 Ok(Err(err)) => tracing::warn!("Crawler task ended with join error: {}", err),
242 Err(_) => {
243 tracing::warn!("Timed out waiting for crawler task shutdown");
244 crawl_handle.abort();
245 }
246 }
247
248 let mut local_list_handle = handles.local_list_handle;
249 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut local_list_handle).await
250 {
251 Ok(Ok(())) => {}
252 Ok(Err(err)) => tracing::warn!("Local list task ended with join error: {}", err),
253 Err(_) => {
254 tracing::warn!("Timed out waiting for local list task shutdown");
255 local_list_handle.abort();
256 }
257 }
258 }
259
260 async fn shutdown_sync(sync: &mut Option<BackgroundSyncRuntime>) {
261 let Some(mut runtime) = sync.take() else {
262 return;
263 };
264
265 runtime.service.shutdown();
266 if let Some(mut join) = runtime.join.take() {
267 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
268 Ok(Ok(())) => {}
269 Ok(Err(err)) => {
270 tracing::warn!("Background sync task ended with join error: {}", err)
271 }
272 Err(_) => {
273 tracing::warn!("Timed out waiting for background sync shutdown");
274 join.abort();
275 }
276 }
277 }
278 }
279
280 async fn shutdown_mirror(mirror: &mut Option<BackgroundMirrorRuntime>) {
281 let Some(mut runtime) = mirror.take() else {
282 return;
283 };
284
285 runtime.service.shutdown();
286 if let Some(mut join) = runtime.join.take() {
287 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
288 Ok(Ok(())) => {}
289 Ok(Err(err)) => {
290 tracing::warn!("Background mirror task ended with join error: {}", err)
291 }
292 Err(_) => {
293 tracing::warn!("Timed out waiting for background mirror shutdown");
294 join.abort();
295 }
296 }
297 }
298 }
299
300 fn nostr_mirror_config(
301 config: &Config,
302 active_relays: &[String],
303 ) -> crate::nostr_mirror::NostrMirrorConfig {
304 crate::nostr_mirror::NostrMirrorConfig {
305 relays: active_relays.to_vec(),
306 publish_relays: Self::mirror_publish_relays(active_relays, &config.server.bind_address),
307 blossom_write_servers: config.blossom.all_write_servers(),
308 max_follow_distance: config
309 .nostr
310 .mirror_max_follow_distance
311 .unwrap_or(config.nostr.social_graph_crawl_depth),
312 overmute_threshold: config.nostr.overmute_threshold,
313 require_negentropy: config.nostr.negentropy_only,
314 kinds: config.nostr.mirror_kinds.clone(),
315 history_sync_author_chunk_size: config.nostr.history_sync_author_chunk_size.max(1),
316 history_sync_per_author_event_limit: config
317 .nostr
318 .history_sync_per_author_event_limit
319 .max(1),
320 missing_profile_backfill_batch_size: config.nostr.history_sync_author_chunk_size.max(1),
321 history_sync_on_reconnect: config.nostr.history_sync_on_reconnect,
322 full_text_note_history_follow_distance: config
323 .nostr
324 .full_text_note_history_follow_distance,
325 full_text_note_history_max_relay_pages: config
326 .nostr
327 .full_text_note_history_max_relay_pages,
328 ..crate::nostr_mirror::NostrMirrorConfig::default()
329 }
330 }
331
332 pub async fn apply_config(&self, config: &Config) -> Result<EmbeddedBackgroundServicesStatus> {
333 let mut runtime = self.runtime.lock().await;
334
335 Self::shutdown_crawler(&mut runtime.crawler).await;
336 Self::shutdown_mirror(&mut runtime.mirror).await;
337 Self::shutdown_sync(&mut runtime.sync).await;
338
339 if !config.server.mode.background_services_enabled() {
340 return Ok(runtime.status());
341 }
342
343 let active_relays = config.nostr.active_relays();
344
345 if config.nostr.enabled
346 && config.nostr.social_graph_crawl_depth > 0
347 && !active_relays.is_empty()
348 {
349 runtime.crawler = Some(socialgraph::crawler::spawn_social_graph_tasks(
350 self.graph_store.clone(),
351 self.keys.clone(),
352 active_relays.clone(),
353 config.nostr.social_graph_crawl_depth,
354 self.spambox.clone(),
355 self.data_dir.clone(),
356 ));
357
358 let service = Arc::new(
359 crate::nostr_mirror::BackgroundNostrMirror::new(
360 Self::nostr_mirror_config(config, &active_relays),
361 self.store.clone(),
362 self.graph_store_concrete.clone(),
363 Some(
364 nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
365 .context("Failed to parse keys for background nostr mirror")?,
366 ),
367 )
368 .await
369 .context("Failed to create background nostr mirror")?,
370 );
371 let service_for_task = service.clone();
372 let join = tokio::task::spawn_blocking(move || {
373 let runtime = tokio::runtime::Builder::new_current_thread()
374 .enable_all()
375 .build()
376 .expect("build background nostr mirror runtime");
377 runtime.block_on(async {
378 if let Err(err) = service_for_task.run().await {
379 tracing::error!("Background nostr mirror error: {:#}", err);
380 }
381 });
382 });
383 runtime.mirror = Some(BackgroundMirrorRuntime {
384 service,
385 join: Some(join),
386 });
387 }
388
389 if config.sync.enabled && !active_relays.is_empty() {
390 let has_pinned_refs = self
391 .store
392 .list_pinned_refs()
393 .map(|refs| !refs.is_empty())
394 .unwrap_or(false);
395 let has_tracked_authors = self
396 .store
397 .list_tracked_authors()
398 .map(|authors| !authors.is_empty())
399 .unwrap_or(false);
400 let should_sync = config.sync.sync_own
401 || config.sync.sync_followed
402 || has_pinned_refs
403 || has_tracked_authors;
404 if !should_sync {
405 return Ok(runtime.status());
406 }
407
408 let sync_config = crate::sync::SyncConfig {
409 sync_own: config.sync.sync_own,
410 sync_followed: config.sync.sync_followed,
411 relays: active_relays,
412 max_concurrent: config.sync.max_concurrent,
413 webrtc_timeout_ms: config.sync.webrtc_timeout_ms,
414 blossom_timeout_ms: config.sync.blossom_timeout_ms,
415 };
416
417 let sync_keys = nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
418 .context("Failed to parse keys for sync")?;
419 let service = Arc::new(
420 crate::sync::BackgroundSync::new(
421 sync_config,
422 self.store.clone(),
423 sync_keys,
424 self.webrtc_state.clone(),
425 )
426 .await
427 .context("Failed to create background sync service")?,
428 );
429 let contacts_file = self.data_dir.join("contacts.json");
430 let service_for_task = service.clone();
431 let join = tokio::spawn(async move {
432 if let Err(err) = service_for_task.run(contacts_file).await {
433 tracing::error!("Background sync error: {}", err);
434 }
435 });
436 runtime.sync = Some(BackgroundSyncRuntime {
437 service,
438 join: Some(join),
439 });
440 }
441
442 Ok(runtime.status())
443 }
444}
445
#[cfg(feature = "p2p")]
/// Starts, restarts and stops the WebRTC peer router according to a [`Config`].
pub struct EmbeddedPeerRouterController {
    /// Identity keys handed to the router.
    keys: Keys,
    /// Data directory used to persist and reload peer state.
    data_dir: PathBuf,
    /// Router state shared with the rest of the daemon; reused across restarts.
    state: Arc<WebRTCState>,
    /// Content store given to the router when hash-get is enabled.
    store: Arc<dyn ContentStore>,
    /// Classifier installed on every router instance.
    peer_classifier: PeerClassifier,
    /// Relay installed on every router instance via `set_nostr_relay`.
    nostr_relay: Arc<NostrRelay>,
    /// Handles of the running router, `None` while stopped.
    runtime: Mutex<Option<PeerRouterRuntime>>,
}
456
#[cfg(feature = "p2p")]
impl EmbeddedPeerRouterController {
    /// Creates a controller with no router running.
    ///
    /// Nothing is started until [`Self::apply_config`] is called.
    pub fn new(
        keys: Keys,
        data_dir: PathBuf,
        state: Arc<WebRTCState>,
        store: Arc<dyn ContentStore>,
        peer_classifier: PeerClassifier,
        nostr_relay: Arc<NostrRelay>,
    ) -> Self {
        Self {
            keys,
            data_dir,
            state,
            store,
            peer_classifier,
            nostr_relay,
            runtime: Mutex::new(None),
        }
    }

    /// Returns a shared handle to the router state.
    pub fn state(&self) -> Arc<WebRTCState> {
        self.state.clone()
    }

    /// Stops one running router instance.
    ///
    /// Peer state is persisted first, then the router is signalled, the persistence
    /// task is aborted, and the router task gets three seconds to finish before it is
    /// aborted. `phase` is only used to word the persistence warning.
    async fn stop_runtime(&self, runtime_handle: PeerRouterRuntime, phase: &str) {
        if let Err(err) = crate::p2p_common::persist_peer_state(&self.data_dir, &self.state).await
        {
            tracing::warn!("Failed to persist mesh peer state {phase}: {err:#}");
        }
        // A send error means the receiver is already gone.
        let _ = runtime_handle.shutdown.send(true);
        runtime_handle.peer_state_persist.abort();
        let mut join = runtime_handle.join;
        match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
            Ok(Ok(())) => {}
            Ok(Err(err)) => {
                tracing::warn!("Peer router task ended with join error: {}", err);
            }
            Err(_) => {
                tracing::warn!("Timed out waiting for peer router shutdown");
                join.abort();
            }
        }
    }

    /// Restarts the peer router so that it matches `config`.
    ///
    /// A running router is stopped, the runtime state is reset and the persisted peer
    /// state is reloaded. Returns `Ok(true)` when a new router was started and
    /// `Ok(false)` when the configuration disables the peer router.
    pub async fn apply_config(&self, config: &Config) -> Result<bool> {
        let mut runtime = self.runtime.lock().await;
        if let Some(runtime_handle) = runtime.take() {
            self.stop_runtime(runtime_handle, "before router restart")
                .await;
        }

        self.state.reset_runtime_state().await;
        if let Err(err) = crate::p2p_common::load_peer_state(&self.data_dir, &self.state).await {
            tracing::warn!("Failed to load persisted mesh peer state: {err:#}");
        }

        if !crate::p2p_common::peer_router_enabled(config) {
            return Ok(false);
        }

        let webrtc_config = crate::p2p_common::default_webrtc_config(config);
        // The content store is only attached when hash-get is enabled for this mode.
        let mut manager = if config.server.mode.hash_get_enabled() {
            WebRTCManager::new_with_state_and_store_and_classifier(
                self.keys.clone(),
                webrtc_config,
                self.state.clone(),
                self.store.clone(),
                self.peer_classifier.clone(),
            )
        } else {
            let mut manager =
                WebRTCManager::new_with_state(self.keys.clone(), webrtc_config, self.state.clone());
            manager.set_peer_classifier(self.peer_classifier.clone());
            manager
        };
        manager
            .set_nostr_relay(self.nostr_relay.clone() as hashtree_network::SharedMeshRelayClient);
        let shutdown = manager.shutdown_signal();
        let join = tokio::spawn(async move {
            if let Err(err) = manager.run().await {
                tracing::error!("Peer router error: {}", err);
            }
        });
        let peer_state_persist = crate::p2p_common::spawn_peer_state_persist_task(
            self.data_dir.clone(),
            self.state.clone(),
        );
        *runtime = Some(PeerRouterRuntime {
            shutdown,
            join,
            peer_state_persist,
        });
        Ok(true)
    }

    /// Stops the running router, if any, and resets the runtime state.
    ///
    /// Does nothing when no router is running.
    pub async fn shutdown(&self) {
        let mut runtime = self.runtime.lock().await;
        let Some(runtime_handle) = runtime.take() else {
            return;
        };

        self.stop_runtime(runtime_handle, "during router shutdown")
            .await;

        self.state.reset_runtime_state().await;
    }
}
573
574pub struct EmbeddedDaemonController {
575 server_controller: Arc<EmbeddedServerController>,
576 fips_handle: Option<Arc<crate::fips_transport::DaemonFipsHandle>>,
577 #[cfg(feature = "p2p")]
578 peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
579 background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
580}
581
582impl EmbeddedDaemonController {
583 #[cfg(feature = "p2p")]
584 pub fn new(
585 server_controller: Arc<EmbeddedServerController>,
586 fips_handle: Option<Arc<crate::fips_transport::DaemonFipsHandle>>,
587 peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
588 background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
589 ) -> Self {
590 Self {
591 server_controller,
592 fips_handle,
593 #[cfg(feature = "p2p")]
594 peer_router_controller,
595 background_services_controller,
596 }
597 }
598
599 #[cfg(not(feature = "p2p"))]
600 pub fn new(
601 server_controller: Arc<EmbeddedServerController>,
602 fips_handle: Option<Arc<crate::fips_transport::DaemonFipsHandle>>,
603 background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
604 ) -> Self {
605 Self {
606 server_controller,
607 fips_handle,
608 background_services_controller,
609 }
610 }
611
612 pub async fn shutdown(&self) {
613 self.server_controller.shutdown().await;
614 if let Some(handle) = self.fips_handle.as_ref() {
615 handle.shutdown();
616 }
617 if let Some(controller) = self.background_services_controller.as_ref() {
618 controller.shutdown().await;
619 }
620 #[cfg(feature = "p2p")]
621 if let Some(controller) = self.peer_router_controller.as_ref() {
622 controller.shutdown().await;
623 }
624 }
625}
626
627pub struct EmbeddedDaemonOptions {
628 pub config: Config,
629 pub data_dir: PathBuf,
630 pub config_dir: Option<PathBuf>,
631 pub bind_address: String,
632 pub relays: Option<Vec<String>>,
633 pub extra_routes: Option<Router<AppState>>,
634 pub cors: Option<CorsLayer>,
635}
636
637pub struct EmbeddedDaemonInfo {
638 pub addr: String,
639 pub port: u16,
640 pub npub: String,
641 pub store: Arc<HashtreeStore>,
642 pub daemon_controller: Arc<EmbeddedDaemonController>,
643 #[allow(dead_code)]
644 pub webrtc_state: Option<Arc<WebRTCState>>,
645 #[cfg(feature = "p2p")]
646 #[allow(dead_code)]
647 pub peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
648 #[allow(dead_code)]
649 pub background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
650}
651
652pub async fn start_embedded(opts: EmbeddedDaemonOptions) -> Result<EmbeddedDaemonInfo> {
653 let _ = rustls::crypto::ring::default_provider().install_default();
654
655 let mut config = opts.config;
656 config.server.bind_address = opts.bind_address.clone();
657 if let Some(relays) = opts.relays {
658 config.nostr.relays = relays;
659 config.nostr.enabled = !config.nostr.relays.is_empty();
660 }
661
662 let max_size_bytes = config.storage.max_size_gb * 1024 * 1024 * 1024;
663 let nostr_db_max_bytes = config
664 .nostr
665 .db_max_size_gb
666 .saturating_mul(1024 * 1024 * 1024);
667 let spambox_db_max_bytes = config
668 .nostr
669 .spambox_max_size_gb
670 .saturating_mul(1024 * 1024 * 1024);
671
672 let store = Arc::new(HashtreeStore::with_options(
673 &opts.data_dir,
674 config.storage.s3.as_ref(),
675 max_size_bytes,
676 )?);
677
678 let (keys, _was_generated) = if let Some(config_dir) = opts.config_dir.as_ref() {
679 ensure_keys_in(config_dir, Some(&opts.data_dir), Some(&config))?
680 } else {
681 ensure_keys()?
682 };
683 let pk_bytes = pubkey_bytes(&keys);
684 let npub = keys
685 .public_key()
686 .to_bech32()
687 .context("Failed to encode npub")?;
688
689 let mut allowed_pubkeys: HashSet<String> = HashSet::new();
690 allowed_pubkeys.insert(hex::encode(pk_bytes));
691 for npub_str in &config.nostr.allowed_npubs {
692 if let Ok(pk) = parse_npub(npub_str) {
693 allowed_pubkeys.insert(hex::encode(pk));
694 } else {
695 tracing::warn!("Invalid npub in allowed_npubs: {}", npub_str);
696 }
697 }
698
699 let graph_store = socialgraph::open_social_graph_store_with_storage(
700 &opts.data_dir,
701 store.store_arc(),
702 Some(nostr_db_max_bytes),
703 )
704 .context("Failed to initialize social graph store")?;
705 graph_store.set_profile_index_overmute_threshold(config.nostr.overmute_threshold);
706
707 let social_graph_root_bytes = if let Some(ref root_npub) = config.nostr.socialgraph_root {
708 parse_npub(root_npub).unwrap_or(pk_bytes)
709 } else {
710 pk_bytes
711 };
712 socialgraph::set_social_graph_root(&graph_store, &social_graph_root_bytes);
713 socialgraph::sync_local_list_files_force(graph_store.as_ref(), &opts.data_dir, &keys)
714 .context("Failed to sync local social graph lists")?;
715 let social_graph_store: Arc<dyn socialgraph::SocialGraphBackend> = graph_store.clone();
716
717 let social_graph = Arc::new(socialgraph::SocialGraphAccessControl::new(
718 Arc::clone(&social_graph_store),
719 config.nostr.max_write_distance,
720 allowed_pubkeys.clone(),
721 ));
722
723 let nostr_relay_config = NostrRelayConfig {
724 spambox_db_max_bytes,
725 ..Default::default()
726 };
727 let nostr_relay = if config.nostr.enabled {
728 let mut public_event_pubkeys = HashSet::new();
729 public_event_pubkeys.insert(hex::encode(pk_bytes));
730 Some(Arc::new(
731 NostrRelay::new(
732 Arc::clone(&social_graph_store),
733 opts.data_dir.clone(),
734 public_event_pubkeys,
735 Some(social_graph.clone()),
736 nostr_relay_config,
737 )
738 .context("Failed to initialize Nostr relay")?,
739 ))
740 } else {
741 None
742 };
743
744 let crawler_spambox = if config.nostr.enabled && spambox_db_max_bytes != 0 {
745 let spam_dir = opts.data_dir.join("socialgraph_spambox");
746 match socialgraph::open_social_graph_store_at_path(&spam_dir, Some(spambox_db_max_bytes)) {
747 Ok(store) => Some(store),
748 Err(err) => {
749 tracing::warn!("Failed to open social graph spambox for crawler: {}", err);
750 None
751 }
752 }
753 } else {
754 None
755 };
756 let crawler_spambox_backend = crawler_spambox
757 .clone()
758 .map(|store| store as Arc<dyn socialgraph::SocialGraphBackend>);
759
760 #[cfg(feature = "p2p")]
761 let (webrtc_state, peer_router_controller): (
762 Option<Arc<WebRTCState>>,
763 Option<Arc<EmbeddedPeerRouterController>>,
764 ) = if let Some(nostr_relay) = nostr_relay.clone() {
765 let router_config = crate::p2p_common::default_webrtc_config(&config);
766 let peer_classifier = crate::p2p_common::build_peer_classifier(
767 opts.data_dir.clone(),
768 Arc::clone(&social_graph_store),
769 );
770 let cashu_payment_client =
771 if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
772 match crate::cashu_helper::CashuHelperClient::discover(opts.data_dir.clone()) {
773 Ok(client) => {
774 Some(Arc::new(client) as Arc<dyn crate::cashu_helper::CashuPaymentClient>)
775 }
776 Err(err) => {
777 tracing::warn!(
778 "Cashu settlement helper unavailable; paid retrieval stays disabled: {}",
779 err
780 );
781 None
782 }
783 }
784 } else {
785 None
786 };
787 let cashu_mint_metadata =
788 if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
789 let metadata_path = crate::webrtc::cashu_mint_metadata_path(&opts.data_dir);
790 match crate::webrtc::CashuMintMetadataStore::load(metadata_path) {
791 Ok(store) => Some(store),
792 Err(err) => {
793 tracing::warn!(
794 "Failed to load Cashu mint metadata; falling back to in-memory state: {}",
795 err
796 );
797 Some(crate::webrtc::CashuMintMetadataStore::in_memory())
798 }
799 }
800 } else {
801 None
802 };
803
804 let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
805 router_config.request_selection_strategy,
806 router_config.request_fairness_enabled,
807 router_config.request_dispatch,
808 std::time::Duration::from_millis(router_config.message_timeout_ms),
809 crate::webrtc::CashuRoutingConfig::from(&config.cashu),
810 cashu_payment_client,
811 cashu_mint_metadata,
812 ));
813 let controller = Arc::new(EmbeddedPeerRouterController::new(
814 keys.clone(),
815 opts.data_dir.clone(),
816 state.clone(),
817 Arc::clone(&store) as Arc<dyn ContentStore>,
818 peer_classifier,
819 nostr_relay.clone(),
820 ));
821 controller.apply_config(&config).await?;
822 (Some(state), Some(controller))
823 } else {
824 (None, None)
825 };
826
827 #[cfg(not(feature = "p2p"))]
828 let webrtc_state: Option<Arc<crate::webrtc::WebRTCState>> = None;
829
830 let background_services_controller = Arc::new(EmbeddedBackgroundServicesController::new(
831 keys.clone(),
832 opts.data_dir.clone(),
833 Arc::clone(&store),
834 graph_store.clone(),
835 Arc::clone(&social_graph_store),
836 crawler_spambox_backend,
837 webrtc_state.clone(),
838 ));
839
840 let upstream_blossom = config.blossom.all_read_servers();
841 let active_nostr_relays = config.nostr.active_relays();
842 let fips_handle =
843 crate::fips_transport::start_daemon_fips_transport(&config, &keys, Arc::clone(&store))
844 .await?
845 .map(Arc::new);
846
847 let mut server = HashtreeServer::new(Arc::clone(&store), opts.bind_address.clone())
848 .with_server_mode(config.server.mode)
849 .with_hash_get_enabled(config.server.mode.hash_get_enabled())
850 .with_fetch_from_fips_peers(config.server.fetch_from_fips_peers)
851 .with_allowed_pubkeys(allowed_pubkeys.clone())
852 .with_max_upload_bytes((config.blossom.max_upload_mb as usize) * 1024 * 1024)
853 .with_public_writes(config.server.public_writes)
854 .with_public_plaintext_reads(config.server.public_plaintext_reads)
855 .with_require_random_untrusted_ingest(config.blossom.require_random_untrusted_ingest)
856 .with_optimistic_blossom_uploads(config.blossom.optimistic_uploads)
857 .with_upstream_blossom(upstream_blossom)
858 .with_nostr_relay_urls(active_nostr_relays)
859 .with_social_graph(social_graph)
860 .with_socialgraph_snapshot(
861 Arc::clone(&social_graph_store),
862 social_graph_root_bytes,
863 config.server.socialgraph_snapshot_public,
864 );
865 if let Some(nostr_relay) = nostr_relay {
866 server = server.with_nostr_relay(nostr_relay);
867 }
868
869 if crate::p2p_common::peer_router_enabled(&config) {
870 if let Some(ref state) = webrtc_state {
871 server = server.with_webrtc_peers(state.clone());
872 }
873 }
874 if let Some(ref fips_handle) = fips_handle {
875 server = server.with_fips_transport(fips_handle.transport.clone());
876 }
877
878 if let Some(extra) = opts.extra_routes {
879 server = server.with_extra_routes(extra);
880 }
881 if let Some(cors) = opts.cors {
882 server = server.with_cors(cors);
883 }
884
885 spawn_background_eviction_task(
886 Arc::clone(&store),
887 BACKGROUND_EVICTION_INTERVAL,
888 "embedded daemon",
889 );
890
891 let listener = TcpListener::bind(&opts.bind_address).await?;
892 let local_addr = listener.local_addr()?;
893 let actual_addr = format!("{}:{}", local_addr.ip(), local_addr.port());
894
895 let server_shutdown = Arc::new(Notify::new());
896 let server_shutdown_for_task = Arc::clone(&server_shutdown);
897 let server_join = tokio::spawn(async move {
898 if let Err(e) = server
899 .run_with_listener_until(listener, async move {
900 server_shutdown_for_task.notified().await;
901 })
902 .await
903 {
904 tracing::error!("Embedded daemon server error: {}", e);
905 }
906 });
907 let server_controller = Arc::new(EmbeddedServerController::new(server_shutdown, server_join));
908 background_services_controller.apply_config(&config).await?;
909 #[cfg(feature = "p2p")]
910 let daemon_controller = Arc::new(EmbeddedDaemonController::new(
911 server_controller,
912 fips_handle.clone(),
913 peer_router_controller.clone(),
914 Some(background_services_controller.clone()),
915 ));
916 #[cfg(not(feature = "p2p"))]
917 let daemon_controller = Arc::new(EmbeddedDaemonController::new(
918 server_controller,
919 fips_handle.clone(),
920 Some(background_services_controller.clone()),
921 ));
922
923 tracing::info!(
924 "Embedded daemon started on {}, identity {}",
925 actual_addr,
926 npub
927 );
928
929 Ok(EmbeddedDaemonInfo {
930 addr: actual_addr,
931 port: local_addr.port(),
932 npub,
933 store,
934 daemon_controller,
935 webrtc_state,
936 #[cfg(feature = "p2p")]
937 peer_router_controller,
938 background_services_controller: Some(background_services_controller),
939 })
940}
941
#[cfg(test)]
mod tests {
    use super::EmbeddedBackgroundServicesController;
    use crate::config::Config;

    /// Turns string literals into the owned relay list the functions under test expect.
    fn owned(urls: &[&str]) -> Vec<String> {
        urls.iter().map(|url| url.to_string()).collect()
    }

    /// Priority relays that are active come first; blocklisted relays are dropped.
    #[test]
    fn mirror_publish_relays_orders_known_root_publish_relays_first() {
        let active = owned(&[
            "wss://graph-relay.iris.to",
            "wss://relay.example",
            "wss://relay.primal.net",
            "wss://relay.damus.io",
            "wss://temp.iris.to",
            "wss://vault.iris.to",
            "wss://upload.iris.to/nostr",
        ]);

        let relays =
            EmbeddedBackgroundServicesController::mirror_publish_relays(&active, "0.0.0.0:8080");

        assert_eq!(
            relays,
            owned(&[
                "wss://temp.iris.to",
                "wss://vault.iris.to",
                "wss://relay.damus.io",
                "wss://relay.example",
                "wss://relay.primal.net",
            ])
        );
    }

    /// Priority relays that are not active must not be added.
    #[test]
    fn mirror_publish_relays_do_not_add_non_active_publish_targets() {
        let active = owned(&["wss://graph-relay.iris.to", "wss://relay.example"]);

        let relays =
            EmbeddedBackgroundServicesController::mirror_publish_relays(&active, "0.0.0.0:8080");

        assert_eq!(relays, owned(&["wss://relay.example"]));
    }

    /// When only blocklisted relays are active they are used as-is.
    #[test]
    fn mirror_publish_relays_falls_back_to_active_relays_when_all_are_blocklisted() {
        let active = owned(&["wss://graph-relay.iris.to", "wss://upload.iris.to/nostr"]);

        let relays =
            EmbeddedBackgroundServicesController::mirror_publish_relays(&active, "0.0.0.0:8080");

        assert_eq!(
            relays,
            owned(&["wss://graph-relay.iris.to", "wss://upload.iris.to/nostr"])
        );
    }

    /// The configured page limit is passed through unchanged, including zero.
    #[test]
    fn nostr_mirror_config_allows_disabling_full_note_paging() {
        let active = owned(&["wss://relay.example"]);
        let max_pages = |config: &Config| {
            EmbeddedBackgroundServicesController::nostr_mirror_config(config, &active)
                .full_text_note_history_max_relay_pages
        };

        let mut config = Config::default();
        config.nostr.full_text_note_history_max_relay_pages = 0;
        assert_eq!(max_pages(&config), 0);

        config.nostr.full_text_note_history_max_relay_pages = 64;
        assert_eq!(max_pages(&config), 64);
    }

    /// A mirror-specific distance wins; without one the crawl depth is used.
    #[test]
    fn nostr_mirror_config_can_limit_mirror_distance_independently() {
        let active = owned(&["wss://relay.example"]);
        let follow_distance = |config: &Config| {
            EmbeddedBackgroundServicesController::nostr_mirror_config(config, &active)
                .max_follow_distance
        };

        let mut config = Config::default();
        config.nostr.social_graph_crawl_depth = 6;
        config.nostr.mirror_max_follow_distance = Some(2);
        assert_eq!(follow_distance(&config), 2);

        config.nostr.mirror_max_follow_distance = None;
        assert_eq!(follow_distance(&config), 6);
    }
}