1use anyhow::{Context, Result};
2use axum::Router;
3use nostr::nips::nip19::ToBech32;
4use nostr::Keys;
5use std::collections::HashSet;
6use std::path::PathBuf;
7use std::sync::Arc;
8use tokio::net::TcpListener;
9use tokio::sync::{Mutex, Notify};
10use tokio::task::JoinHandle;
11use tower_http::cors::CorsLayer;
12
13use crate::config::{ensure_keys, ensure_keys_in, parse_npub, pubkey_bytes, Config};
14use crate::eviction::{spawn_background_eviction_task, BACKGROUND_EVICTION_INTERVAL};
15use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
16use crate::server::{AppState, HashtreeServer};
17use crate::socialgraph;
18use crate::storage::HashtreeStore;
19
20#[cfg(feature = "p2p")]
21use crate::webrtc::{ContentStore, PeerClassifier, WebRTCManager, WebRTCState};
22#[cfg(not(feature = "p2p"))]
23use crate::WebRTCState;
24
25#[cfg(feature = "p2p")]
26struct PeerRouterRuntime {
27 shutdown: Arc<tokio::sync::watch::Sender<bool>>,
28 join: JoinHandle<()>,
29 peer_state_persist: JoinHandle<()>,
30}
31
32struct BackgroundSyncRuntime {
33 service: Arc<crate::sync::BackgroundSync>,
34 join: Option<JoinHandle<()>>,
35}
36
37impl Drop for BackgroundSyncRuntime {
38 fn drop(&mut self) {
39 self.service.shutdown();
40 if let Some(join) = self.join.take() {
41 join.abort();
42 }
43 }
44}
45
46struct BackgroundMirrorRuntime {
47 service: Arc<crate::nostr_mirror::BackgroundNostrMirror>,
48 join: Option<JoinHandle<()>>,
49}
50
51impl Drop for BackgroundMirrorRuntime {
52 fn drop(&mut self) {
53 self.service.shutdown();
54 if let Some(join) = self.join.take() {
55 join.abort();
56 }
57 }
58}
59
60struct BackgroundServicesRuntime {
61 crawler: Option<socialgraph::crawler::SocialGraphTaskHandles>,
62 mirror: Option<BackgroundMirrorRuntime>,
63 sync: Option<BackgroundSyncRuntime>,
64}
65
66impl Drop for BackgroundServicesRuntime {
67 fn drop(&mut self) {
68 if let Some(handles) = self.crawler.as_ref() {
69 let _ = handles.shutdown_tx.send(true);
70 }
71 if let Some(runtime) = self.mirror.as_ref() {
72 runtime.service.shutdown();
73 }
74 if let Some(runtime) = self.sync.as_ref() {
75 runtime.service.shutdown();
76 }
77 }
78}
79
80impl BackgroundServicesRuntime {
81 fn status(&self) -> EmbeddedBackgroundServicesStatus {
82 EmbeddedBackgroundServicesStatus {
83 crawler_active: self.crawler.is_some(),
84 mirror_active: self.mirror.is_some(),
85 sync_active: self.sync.is_some(),
86 }
87 }
88}
89
90struct EmbeddedServerRuntime {
91 shutdown: Arc<Notify>,
92 join: Option<JoinHandle<()>>,
93}
94
95pub struct EmbeddedServerController {
96 runtime: Mutex<Option<EmbeddedServerRuntime>>,
97}
98
99impl EmbeddedServerController {
100 pub fn new(shutdown: Arc<Notify>, join: JoinHandle<()>) -> Self {
101 Self {
102 runtime: Mutex::new(Some(EmbeddedServerRuntime {
103 shutdown,
104 join: Some(join),
105 })),
106 }
107 }
108
109 pub async fn shutdown(&self) {
110 let mut runtime = self.runtime.lock().await;
111 let Some(mut runtime) = runtime.take() else {
112 return;
113 };
114
115 runtime.shutdown.notify_waiters();
116 if let Some(mut join) = runtime.join.take() {
117 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
118 Ok(Ok(())) => {}
119 Ok(Err(err)) => {
120 tracing::warn!("Embedded server task ended with join error: {}", err)
121 }
122 Err(_) => {
123 tracing::warn!("Timed out waiting for embedded server shutdown");
124 join.abort();
125 }
126 }
127 }
128 }
129}
130
131#[derive(Debug, Clone, Copy, PartialEq, Eq)]
132pub struct EmbeddedBackgroundServicesStatus {
133 pub crawler_active: bool,
134 pub mirror_active: bool,
135 pub sync_active: bool,
136}
137
138pub struct EmbeddedBackgroundServicesController {
139 keys: Keys,
140 data_dir: PathBuf,
141 store: Arc<HashtreeStore>,
142 graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
143 graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
144 spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
145 webrtc_state: Option<Arc<WebRTCState>>,
146 runtime: Mutex<BackgroundServicesRuntime>,
147}
148
149impl EmbeddedBackgroundServicesController {
150 const MIRROR_PUBLISH_RELAY_PRIORITY: &[&str] = &[
151 "wss://nos.lol",
152 "wss://temp.iris.to",
153 "wss://vault.iris.to",
154 "wss://relay.damus.io",
155 ];
156 const MIRROR_PUBLISH_RELAY_BLOCKLIST: &[&str] =
157 &["wss://graph-relay.iris.to", "wss://upload.iris.to/nostr"];
158
159 fn mirror_publish_relays(active_relays: &[String], _bind_address: &str) -> Vec<String> {
160 let mut seen = HashSet::new();
161 let active_relays = active_relays
162 .iter()
163 .filter(|relay| seen.insert((*relay).clone()))
164 .cloned()
165 .collect::<Vec<_>>();
166 if active_relays.is_empty() {
167 return Vec::new();
168 }
169 let filtered = active_relays
170 .iter()
171 .filter(|relay| !Self::MIRROR_PUBLISH_RELAY_BLOCKLIST.contains(&relay.as_str()))
172 .cloned()
173 .collect::<Vec<_>>();
174 if filtered.is_empty() {
175 return active_relays;
176 }
177
178 let mut selected = Vec::new();
179 let mut selected_set = HashSet::new();
180 for relay in Self::MIRROR_PUBLISH_RELAY_PRIORITY {
181 if filtered.iter().any(|active| active == relay) {
182 selected.push((*relay).to_string());
183 selected_set.insert((*relay).to_string());
184 }
185 }
186 for relay in filtered {
187 if selected_set.insert(relay.clone()) {
188 selected.push(relay);
189 }
190 }
191
192 selected
193 }
194
195 pub fn new(
196 keys: Keys,
197 data_dir: PathBuf,
198 store: Arc<HashtreeStore>,
199 graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
200 graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
201 spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
202 webrtc_state: Option<Arc<WebRTCState>>,
203 ) -> Self {
204 Self {
205 keys,
206 data_dir,
207 store,
208 graph_store_concrete,
209 graph_store,
210 spambox,
211 webrtc_state,
212 runtime: Mutex::new(BackgroundServicesRuntime {
213 crawler: None,
214 mirror: None,
215 sync: None,
216 }),
217 }
218 }
219
220 pub async fn status(&self) -> EmbeddedBackgroundServicesStatus {
221 self.runtime.lock().await.status()
222 }
223
224 pub async fn shutdown(&self) {
225 let mut runtime = self.runtime.lock().await;
226 Self::shutdown_crawler(&mut runtime.crawler).await;
227 Self::shutdown_mirror(&mut runtime.mirror).await;
228 Self::shutdown_sync(&mut runtime.sync).await;
229 }
230
231 async fn shutdown_crawler(crawler: &mut Option<socialgraph::crawler::SocialGraphTaskHandles>) {
232 let Some(handles) = crawler.take() else {
233 return;
234 };
235
236 let _ = handles.shutdown_tx.send(true);
237
238 let mut crawl_handle = handles.crawl_handle;
239 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut crawl_handle).await {
240 Ok(Ok(())) => {}
241 Ok(Err(err)) => tracing::warn!("Crawler task ended with join error: {}", err),
242 Err(_) => {
243 tracing::warn!("Timed out waiting for crawler task shutdown");
244 crawl_handle.abort();
245 }
246 }
247
248 let mut local_list_handle = handles.local_list_handle;
249 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut local_list_handle).await
250 {
251 Ok(Ok(())) => {}
252 Ok(Err(err)) => tracing::warn!("Local list task ended with join error: {}", err),
253 Err(_) => {
254 tracing::warn!("Timed out waiting for local list task shutdown");
255 local_list_handle.abort();
256 }
257 }
258 }
259
260 async fn shutdown_sync(sync: &mut Option<BackgroundSyncRuntime>) {
261 let Some(mut runtime) = sync.take() else {
262 return;
263 };
264
265 runtime.service.shutdown();
266 if let Some(mut join) = runtime.join.take() {
267 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
268 Ok(Ok(())) => {}
269 Ok(Err(err)) => {
270 tracing::warn!("Background sync task ended with join error: {}", err)
271 }
272 Err(_) => {
273 tracing::warn!("Timed out waiting for background sync shutdown");
274 join.abort();
275 }
276 }
277 }
278 }
279
280 async fn shutdown_mirror(mirror: &mut Option<BackgroundMirrorRuntime>) {
281 let Some(mut runtime) = mirror.take() else {
282 return;
283 };
284
285 runtime.service.shutdown();
286 if let Some(mut join) = runtime.join.take() {
287 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
288 Ok(Ok(())) => {}
289 Ok(Err(err)) => {
290 tracing::warn!("Background mirror task ended with join error: {}", err)
291 }
292 Err(_) => {
293 tracing::warn!("Timed out waiting for background mirror shutdown");
294 join.abort();
295 }
296 }
297 }
298 }
299
300 fn nostr_mirror_config(
301 config: &Config,
302 active_relays: &[String],
303 ) -> crate::nostr_mirror::NostrMirrorConfig {
304 crate::nostr_mirror::NostrMirrorConfig {
305 relays: active_relays.to_vec(),
306 publish_relays: Self::mirror_publish_relays(active_relays, &config.server.bind_address),
307 blossom_write_servers: config.blossom.all_write_servers(),
308 max_follow_distance: config
309 .nostr
310 .mirror_max_follow_distance
311 .unwrap_or(config.nostr.social_graph_crawl_depth),
312 overmute_threshold: config.nostr.overmute_threshold,
313 require_negentropy: config.nostr.negentropy_only,
314 kinds: config.nostr.mirror_kinds.clone(),
315 history_sync_author_chunk_size: config.nostr.history_sync_author_chunk_size.max(1),
316 history_sync_per_author_event_limit: config
317 .nostr
318 .history_sync_per_author_event_limit
319 .max(1),
320 missing_profile_backfill_batch_size: config.nostr.history_sync_author_chunk_size.max(1),
321 history_sync_on_reconnect: config.nostr.history_sync_on_reconnect,
322 full_text_note_history_follow_distance: config
323 .nostr
324 .full_text_note_history_follow_distance,
325 full_text_note_history_max_relay_pages: config
326 .nostr
327 .full_text_note_history_max_relay_pages,
328 ..crate::nostr_mirror::NostrMirrorConfig::default()
329 }
330 }
331
332 pub async fn apply_config(&self, config: &Config) -> Result<EmbeddedBackgroundServicesStatus> {
333 let mut runtime = self.runtime.lock().await;
334
335 Self::shutdown_crawler(&mut runtime.crawler).await;
336 Self::shutdown_mirror(&mut runtime.mirror).await;
337 Self::shutdown_sync(&mut runtime.sync).await;
338
339 if !config.server.mode.background_services_enabled() {
340 return Ok(runtime.status());
341 }
342
343 let active_relays = config.nostr.active_relays();
344
345 if config.nostr.enabled
346 && config.nostr.social_graph_crawl_depth > 0
347 && !active_relays.is_empty()
348 {
349 runtime.crawler = Some(socialgraph::crawler::spawn_social_graph_tasks(
350 self.graph_store.clone(),
351 self.keys.clone(),
352 active_relays.clone(),
353 config.nostr.social_graph_crawl_depth,
354 self.spambox.clone(),
355 self.data_dir.clone(),
356 ));
357
358 let service = Arc::new(
359 crate::nostr_mirror::BackgroundNostrMirror::new(
360 Self::nostr_mirror_config(config, &active_relays),
361 self.store.clone(),
362 self.graph_store_concrete.clone(),
363 Some(
364 nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
365 .context("Failed to parse keys for background nostr mirror")?,
366 ),
367 )
368 .await
369 .context("Failed to create background nostr mirror")?,
370 );
371 let service_for_task = service.clone();
372 let join = tokio::task::spawn_blocking(move || {
373 let runtime = tokio::runtime::Builder::new_current_thread()
374 .enable_all()
375 .build()
376 .expect("build background nostr mirror runtime");
377 runtime.block_on(async {
378 if let Err(err) = service_for_task.run().await {
379 tracing::error!("Background nostr mirror error: {:#}", err);
380 }
381 });
382 });
383 runtime.mirror = Some(BackgroundMirrorRuntime {
384 service,
385 join: Some(join),
386 });
387 }
388
389 let has_pinned_refs = self
390 .store
391 .list_pinned_refs()
392 .map(|refs| !refs.is_empty())
393 .unwrap_or(false);
394 let has_tracked_authors = self
395 .store
396 .list_tracked_authors()
397 .map(|authors| !authors.is_empty())
398 .unwrap_or(false);
399
400 if config.sync.enabled
401 && (config.sync.sync_own
402 || config.sync.sync_followed
403 || has_pinned_refs
404 || has_tracked_authors)
405 && !active_relays.is_empty()
406 {
407 let sync_config = crate::sync::SyncConfig {
408 sync_own: config.sync.sync_own,
409 sync_followed: config.sync.sync_followed,
410 relays: active_relays,
411 max_concurrent: config.sync.max_concurrent,
412 webrtc_timeout_ms: config.sync.webrtc_timeout_ms,
413 blossom_timeout_ms: config.sync.blossom_timeout_ms,
414 };
415
416 let sync_keys = nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
417 .context("Failed to parse keys for sync")?;
418 let service = Arc::new(
419 crate::sync::BackgroundSync::new(
420 sync_config,
421 self.store.clone(),
422 sync_keys,
423 self.webrtc_state.clone(),
424 )
425 .await
426 .context("Failed to create background sync service")?,
427 );
428 let contacts_file = self.data_dir.join("contacts.json");
429 let service_for_task = service.clone();
430 let join = tokio::spawn(async move {
431 if let Err(err) = service_for_task.run(contacts_file).await {
432 tracing::error!("Background sync error: {}", err);
433 }
434 });
435 runtime.sync = Some(BackgroundSyncRuntime {
436 service,
437 join: Some(join),
438 });
439 }
440
441 Ok(runtime.status())
442 }
443}
444
445#[cfg(feature = "p2p")]
446pub struct EmbeddedPeerRouterController {
447 keys: Keys,
448 data_dir: PathBuf,
449 state: Arc<WebRTCState>,
450 store: Arc<dyn ContentStore>,
451 peer_classifier: PeerClassifier,
452 nostr_relay: Arc<NostrRelay>,
453 runtime: Mutex<Option<PeerRouterRuntime>>,
454}
455
456#[cfg(feature = "p2p")]
457impl EmbeddedPeerRouterController {
458 pub fn new(
459 keys: Keys,
460 data_dir: PathBuf,
461 state: Arc<WebRTCState>,
462 store: Arc<dyn ContentStore>,
463 peer_classifier: PeerClassifier,
464 nostr_relay: Arc<NostrRelay>,
465 ) -> Self {
466 Self {
467 keys,
468 data_dir,
469 state,
470 store,
471 peer_classifier,
472 nostr_relay,
473 runtime: Mutex::new(None),
474 }
475 }
476
477 pub fn state(&self) -> Arc<WebRTCState> {
478 self.state.clone()
479 }
480
481 pub async fn apply_config(&self, config: &Config) -> Result<bool> {
482 let mut runtime = self.runtime.lock().await;
483 if let Some(runtime_handle) = runtime.take() {
484 if let Err(err) =
485 crate::p2p_common::persist_peer_state(&self.data_dir, &self.state).await
486 {
487 tracing::warn!("Failed to persist mesh peer state before router restart: {err:#}");
488 }
489 let _ = runtime_handle.shutdown.send(true);
490 runtime_handle.peer_state_persist.abort();
491 let mut join = runtime_handle.join;
492 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
493 Ok(Ok(())) => {}
494 Ok(Err(err)) => {
495 tracing::warn!("Peer router task ended with join error: {}", err);
496 }
497 Err(_) => {
498 tracing::warn!("Timed out waiting for peer router shutdown");
499 join.abort();
500 }
501 }
502 }
503
504 self.state.reset_runtime_state().await;
505 if let Err(err) = crate::p2p_common::load_peer_state(&self.data_dir, &self.state).await {
506 tracing::warn!("Failed to load persisted mesh peer state: {err:#}");
507 }
508
509 if !crate::p2p_common::peer_router_enabled(config) {
510 return Ok(false);
511 }
512
513 let webrtc_config = crate::p2p_common::default_webrtc_config(config);
514 let mut manager = if config.server.mode.hash_get_enabled() {
515 WebRTCManager::new_with_state_and_store_and_classifier(
516 self.keys.clone(),
517 webrtc_config,
518 self.state.clone(),
519 self.store.clone(),
520 self.peer_classifier.clone(),
521 )
522 } else {
523 let mut manager =
524 WebRTCManager::new_with_state(self.keys.clone(), webrtc_config, self.state.clone());
525 manager.set_peer_classifier(self.peer_classifier.clone());
526 manager
527 };
528 manager
529 .set_nostr_relay(self.nostr_relay.clone() as hashtree_network::SharedMeshRelayClient);
530 let shutdown = manager.shutdown_signal();
531 let join = tokio::spawn(async move {
532 if let Err(err) = manager.run().await {
533 tracing::error!("Peer router error: {}", err);
534 }
535 });
536 let peer_state_persist = crate::p2p_common::spawn_peer_state_persist_task(
537 self.data_dir.clone(),
538 self.state.clone(),
539 );
540 *runtime = Some(PeerRouterRuntime {
541 shutdown,
542 join,
543 peer_state_persist,
544 });
545 Ok(true)
546 }
547
548 pub async fn shutdown(&self) {
549 let mut runtime = self.runtime.lock().await;
550 let Some(runtime_handle) = runtime.take() else {
551 return;
552 };
553
554 if let Err(err) = crate::p2p_common::persist_peer_state(&self.data_dir, &self.state).await {
555 tracing::warn!("Failed to persist mesh peer state during router shutdown: {err:#}");
556 }
557 let _ = runtime_handle.shutdown.send(true);
558 runtime_handle.peer_state_persist.abort();
559 let mut join = runtime_handle.join;
560 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
561 Ok(Ok(())) => {}
562 Ok(Err(err)) => tracing::warn!("Peer router task ended with join error: {}", err),
563 Err(_) => {
564 tracing::warn!("Timed out waiting for peer router shutdown");
565 join.abort();
566 }
567 }
568
569 self.state.reset_runtime_state().await;
570 }
571}
572
573pub struct EmbeddedDaemonController {
574 server_controller: Arc<EmbeddedServerController>,
575 #[cfg(feature = "p2p")]
576 peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
577 background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
578}
579
580impl EmbeddedDaemonController {
581 #[cfg(feature = "p2p")]
582 pub fn new(
583 server_controller: Arc<EmbeddedServerController>,
584 peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
585 background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
586 ) -> Self {
587 Self {
588 server_controller,
589 #[cfg(feature = "p2p")]
590 peer_router_controller,
591 background_services_controller,
592 }
593 }
594
595 #[cfg(not(feature = "p2p"))]
596 pub fn new(
597 server_controller: Arc<EmbeddedServerController>,
598 background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
599 ) -> Self {
600 Self {
601 server_controller,
602 background_services_controller,
603 }
604 }
605
606 pub async fn shutdown(&self) {
607 self.server_controller.shutdown().await;
608 if let Some(controller) = self.background_services_controller.as_ref() {
609 controller.shutdown().await;
610 }
611 #[cfg(feature = "p2p")]
612 if let Some(controller) = self.peer_router_controller.as_ref() {
613 controller.shutdown().await;
614 }
615 }
616}
617
618pub struct EmbeddedDaemonOptions {
619 pub config: Config,
620 pub data_dir: PathBuf,
621 pub config_dir: Option<PathBuf>,
622 pub bind_address: String,
623 pub relays: Option<Vec<String>>,
624 pub extra_routes: Option<Router<AppState>>,
625 pub cors: Option<CorsLayer>,
626}
627
628pub struct EmbeddedDaemonInfo {
629 pub addr: String,
630 pub port: u16,
631 pub npub: String,
632 pub store: Arc<HashtreeStore>,
633 pub daemon_controller: Arc<EmbeddedDaemonController>,
634 #[allow(dead_code)]
635 pub webrtc_state: Option<Arc<WebRTCState>>,
636 #[cfg(feature = "p2p")]
637 #[allow(dead_code)]
638 pub peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
639 #[allow(dead_code)]
640 pub background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
641}
642
643pub async fn start_embedded(opts: EmbeddedDaemonOptions) -> Result<EmbeddedDaemonInfo> {
644 let _ = rustls::crypto::ring::default_provider().install_default();
645
646 let mut config = opts.config;
647 config.server.bind_address = opts.bind_address.clone();
648 if let Some(relays) = opts.relays {
649 config.nostr.relays = relays;
650 config.nostr.enabled = !config.nostr.relays.is_empty();
651 }
652
653 let max_size_bytes = config.storage.max_size_gb * 1024 * 1024 * 1024;
654 let nostr_db_max_bytes = config
655 .nostr
656 .db_max_size_gb
657 .saturating_mul(1024 * 1024 * 1024);
658 let spambox_db_max_bytes = config
659 .nostr
660 .spambox_max_size_gb
661 .saturating_mul(1024 * 1024 * 1024);
662
663 let store = Arc::new(HashtreeStore::with_options(
664 &opts.data_dir,
665 config.storage.s3.as_ref(),
666 max_size_bytes,
667 )?);
668
669 let (keys, _was_generated) = if let Some(config_dir) = opts.config_dir.as_ref() {
670 ensure_keys_in(config_dir, Some(&opts.data_dir), Some(&config))?
671 } else {
672 ensure_keys()?
673 };
674 let pk_bytes = pubkey_bytes(&keys);
675 let npub = keys
676 .public_key()
677 .to_bech32()
678 .context("Failed to encode npub")?;
679
680 let mut allowed_pubkeys: HashSet<String> = HashSet::new();
681 allowed_pubkeys.insert(hex::encode(pk_bytes));
682 for npub_str in &config.nostr.allowed_npubs {
683 if let Ok(pk) = parse_npub(npub_str) {
684 allowed_pubkeys.insert(hex::encode(pk));
685 } else {
686 tracing::warn!("Invalid npub in allowed_npubs: {}", npub_str);
687 }
688 }
689
690 let graph_store = socialgraph::open_social_graph_store_with_storage(
691 &opts.data_dir,
692 store.store_arc(),
693 Some(nostr_db_max_bytes),
694 )
695 .context("Failed to initialize social graph store")?;
696 graph_store.set_profile_index_overmute_threshold(config.nostr.overmute_threshold);
697
698 let social_graph_root_bytes = if let Some(ref root_npub) = config.nostr.socialgraph_root {
699 parse_npub(root_npub).unwrap_or(pk_bytes)
700 } else {
701 pk_bytes
702 };
703 socialgraph::set_social_graph_root(&graph_store, &social_graph_root_bytes);
704 socialgraph::sync_local_list_files_force(graph_store.as_ref(), &opts.data_dir, &keys)
705 .context("Failed to sync local social graph lists")?;
706 let social_graph_store: Arc<dyn socialgraph::SocialGraphBackend> = graph_store.clone();
707
708 let social_graph = Arc::new(socialgraph::SocialGraphAccessControl::new(
709 Arc::clone(&social_graph_store),
710 config.nostr.max_write_distance,
711 allowed_pubkeys.clone(),
712 ));
713
714 let nostr_relay_config = NostrRelayConfig {
715 spambox_db_max_bytes,
716 ..Default::default()
717 };
718 let mut public_event_pubkeys = HashSet::new();
719 public_event_pubkeys.insert(hex::encode(pk_bytes));
720 let nostr_relay = Arc::new(
721 NostrRelay::new(
722 Arc::clone(&social_graph_store),
723 opts.data_dir.clone(),
724 public_event_pubkeys,
725 Some(social_graph.clone()),
726 nostr_relay_config,
727 )
728 .context("Failed to initialize Nostr relay")?,
729 );
730
731 let crawler_spambox = if spambox_db_max_bytes == 0 {
732 None
733 } else {
734 let spam_dir = opts.data_dir.join("socialgraph_spambox");
735 match socialgraph::open_social_graph_store_at_path(&spam_dir, Some(spambox_db_max_bytes)) {
736 Ok(store) => Some(store),
737 Err(err) => {
738 tracing::warn!("Failed to open social graph spambox for crawler: {}", err);
739 None
740 }
741 }
742 };
743 let crawler_spambox_backend = crawler_spambox
744 .clone()
745 .map(|store| store as Arc<dyn socialgraph::SocialGraphBackend>);
746
747 #[cfg(feature = "p2p")]
748 let (webrtc_state, peer_router_controller): (
749 Option<Arc<WebRTCState>>,
750 Option<Arc<EmbeddedPeerRouterController>>,
751 ) = {
752 let router_config = crate::p2p_common::default_webrtc_config(&config);
753 let peer_classifier = crate::p2p_common::build_peer_classifier(
754 opts.data_dir.clone(),
755 Arc::clone(&social_graph_store),
756 );
757 let cashu_payment_client =
758 if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
759 match crate::cashu_helper::CashuHelperClient::discover(opts.data_dir.clone()) {
760 Ok(client) => {
761 Some(Arc::new(client) as Arc<dyn crate::cashu_helper::CashuPaymentClient>)
762 }
763 Err(err) => {
764 tracing::warn!(
765 "Cashu settlement helper unavailable; paid retrieval stays disabled: {}",
766 err
767 );
768 None
769 }
770 }
771 } else {
772 None
773 };
774 let cashu_mint_metadata =
775 if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
776 let metadata_path = crate::webrtc::cashu_mint_metadata_path(&opts.data_dir);
777 match crate::webrtc::CashuMintMetadataStore::load(metadata_path) {
778 Ok(store) => Some(store),
779 Err(err) => {
780 tracing::warn!(
781 "Failed to load Cashu mint metadata; falling back to in-memory state: {}",
782 err
783 );
784 Some(crate::webrtc::CashuMintMetadataStore::in_memory())
785 }
786 }
787 } else {
788 None
789 };
790
791 let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
792 router_config.request_selection_strategy,
793 router_config.request_fairness_enabled,
794 router_config.request_dispatch,
795 std::time::Duration::from_millis(router_config.message_timeout_ms),
796 crate::webrtc::CashuRoutingConfig::from(&config.cashu),
797 cashu_payment_client,
798 cashu_mint_metadata,
799 ));
800 let controller = Arc::new(EmbeddedPeerRouterController::new(
801 keys.clone(),
802 opts.data_dir.clone(),
803 state.clone(),
804 Arc::clone(&store) as Arc<dyn ContentStore>,
805 peer_classifier,
806 nostr_relay.clone(),
807 ));
808 controller.apply_config(&config).await?;
809 (Some(state), Some(controller))
810 };
811
812 #[cfg(not(feature = "p2p"))]
813 let webrtc_state: Option<Arc<crate::webrtc::WebRTCState>> = None;
814
815 let background_services_controller = Arc::new(EmbeddedBackgroundServicesController::new(
816 keys.clone(),
817 opts.data_dir.clone(),
818 Arc::clone(&store),
819 graph_store.clone(),
820 Arc::clone(&social_graph_store),
821 crawler_spambox_backend,
822 webrtc_state.clone(),
823 ));
824
825 let upstream_blossom = config.blossom.all_read_servers();
826 let active_nostr_relays = config.nostr.active_relays();
827
828 let mut server = HashtreeServer::new(Arc::clone(&store), opts.bind_address.clone())
829 .with_server_mode(config.server.mode)
830 .with_hash_get_enabled(config.server.mode.hash_get_enabled())
831 .with_allowed_pubkeys(allowed_pubkeys.clone())
832 .with_max_upload_bytes((config.blossom.max_upload_mb as usize) * 1024 * 1024)
833 .with_public_writes(config.server.public_writes)
834 .with_upstream_blossom(upstream_blossom)
835 .with_nostr_relay_urls(active_nostr_relays)
836 .with_social_graph(social_graph)
837 .with_socialgraph_snapshot(
838 Arc::clone(&social_graph_store),
839 social_graph_root_bytes,
840 config.server.socialgraph_snapshot_public,
841 )
842 .with_nostr_relay(nostr_relay.clone());
843
844 if crate::p2p_common::peer_router_enabled(&config) {
845 if let Some(ref state) = webrtc_state {
846 server = server.with_webrtc_peers(state.clone());
847 }
848 }
849
850 if let Some(extra) = opts.extra_routes {
851 server = server.with_extra_routes(extra);
852 }
853 if let Some(cors) = opts.cors {
854 server = server.with_cors(cors);
855 }
856
857 spawn_background_eviction_task(
858 Arc::clone(&store),
859 BACKGROUND_EVICTION_INTERVAL,
860 "embedded daemon",
861 );
862
863 let listener = TcpListener::bind(&opts.bind_address).await?;
864 let local_addr = listener.local_addr()?;
865 let actual_addr = format!("{}:{}", local_addr.ip(), local_addr.port());
866
867 let server_shutdown = Arc::new(Notify::new());
868 let server_shutdown_for_task = Arc::clone(&server_shutdown);
869 let server_join = tokio::spawn(async move {
870 if let Err(e) = server
871 .run_with_listener_until(listener, async move {
872 server_shutdown_for_task.notified().await;
873 })
874 .await
875 {
876 tracing::error!("Embedded daemon server error: {}", e);
877 }
878 });
879 let server_controller = Arc::new(EmbeddedServerController::new(server_shutdown, server_join));
880 background_services_controller.apply_config(&config).await?;
881 #[cfg(feature = "p2p")]
882 let daemon_controller = Arc::new(EmbeddedDaemonController::new(
883 server_controller,
884 peer_router_controller.clone(),
885 Some(background_services_controller.clone()),
886 ));
887 #[cfg(not(feature = "p2p"))]
888 let daemon_controller = Arc::new(EmbeddedDaemonController::new(
889 server_controller,
890 Some(background_services_controller.clone()),
891 ));
892
893 tracing::info!(
894 "Embedded daemon started on {}, identity {}",
895 actual_addr,
896 npub
897 );
898
899 Ok(EmbeddedDaemonInfo {
900 addr: actual_addr,
901 port: local_addr.port(),
902 npub,
903 store,
904 daemon_controller,
905 webrtc_state,
906 #[cfg(feature = "p2p")]
907 peer_router_controller,
908 background_services_controller: Some(background_services_controller),
909 })
910}
911
912#[cfg(test)]
913mod tests {
914 use super::EmbeddedBackgroundServicesController;
915 use crate::config::Config;
916
917 #[test]
918 fn mirror_publish_relays_orders_known_root_publish_relays_first() {
919 let relays = EmbeddedBackgroundServicesController::mirror_publish_relays(
920 &[
921 "wss://graph-relay.iris.to".to_string(),
922 "wss://relay.example".to_string(),
923 "wss://relay.primal.net".to_string(),
924 "wss://relay.damus.io".to_string(),
925 "wss://temp.iris.to".to_string(),
926 "wss://vault.iris.to".to_string(),
927 "wss://upload.iris.to/nostr".to_string(),
928 ],
929 "0.0.0.0:8080",
930 );
931 assert_eq!(
932 relays,
933 vec![
934 "wss://temp.iris.to".to_string(),
935 "wss://vault.iris.to".to_string(),
936 "wss://relay.damus.io".to_string(),
937 "wss://relay.example".to_string(),
938 "wss://relay.primal.net".to_string(),
939 ]
940 );
941 }
942
943 #[test]
944 fn mirror_publish_relays_do_not_add_non_active_publish_targets() {
945 let relays = EmbeddedBackgroundServicesController::mirror_publish_relays(
946 &[
947 "wss://graph-relay.iris.to".to_string(),
948 "wss://relay.example".to_string(),
949 ],
950 "0.0.0.0:8080",
951 );
952 assert_eq!(relays, vec!["wss://relay.example".to_string()]);
953 }
954
955 #[test]
956 fn mirror_publish_relays_falls_back_to_active_relays_when_all_are_blocklisted() {
957 let relays = EmbeddedBackgroundServicesController::mirror_publish_relays(
958 &[
959 "wss://graph-relay.iris.to".to_string(),
960 "wss://upload.iris.to/nostr".to_string(),
961 ],
962 "0.0.0.0:8080",
963 );
964 assert_eq!(
965 relays,
966 vec![
967 "wss://graph-relay.iris.to".to_string(),
968 "wss://upload.iris.to/nostr".to_string(),
969 ]
970 );
971 }
972
973 #[test]
974 fn nostr_mirror_config_allows_disabling_full_note_paging() {
975 let mut config = Config::default();
976 config.nostr.full_text_note_history_max_relay_pages = 0;
977
978 let mirror_config = EmbeddedBackgroundServicesController::nostr_mirror_config(
979 &config,
980 &["wss://relay.example".to_string()],
981 );
982
983 assert_eq!(mirror_config.full_text_note_history_max_relay_pages, 0);
984
985 config.nostr.full_text_note_history_max_relay_pages = 64;
986 let mirror_config = EmbeddedBackgroundServicesController::nostr_mirror_config(
987 &config,
988 &["wss://relay.example".to_string()],
989 );
990
991 assert_eq!(mirror_config.full_text_note_history_max_relay_pages, 64);
992 }
993
994 #[test]
995 fn nostr_mirror_config_can_limit_mirror_distance_independently() {
996 let mut config = Config::default();
997 config.nostr.social_graph_crawl_depth = 6;
998 config.nostr.mirror_max_follow_distance = Some(2);
999
1000 let mirror_config = EmbeddedBackgroundServicesController::nostr_mirror_config(
1001 &config,
1002 &["wss://relay.example".to_string()],
1003 );
1004
1005 assert_eq!(mirror_config.max_follow_distance, 2);
1006
1007 config.nostr.mirror_max_follow_distance = None;
1008 let mirror_config = EmbeddedBackgroundServicesController::nostr_mirror_config(
1009 &config,
1010 &["wss://relay.example".to_string()],
1011 );
1012
1013 assert_eq!(mirror_config.max_follow_distance, 6);
1014 }
1015}