1use anyhow::{Context, Result};
2use axum::Router;
3use nostr::nips::nip19::ToBech32;
4use nostr::Keys;
5use std::collections::HashSet;
6use std::path::PathBuf;
7use std::sync::Arc;
8use tokio::net::TcpListener;
9use tokio::sync::{Mutex, Notify};
10use tokio::task::JoinHandle;
11use tower_http::cors::CorsLayer;
12
13use crate::config::{ensure_keys, ensure_keys_in, parse_npub, pubkey_bytes, Config};
14use crate::eviction::{spawn_background_eviction_task, BACKGROUND_EVICTION_INTERVAL};
15use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
16use crate::server::{AppState, HashtreeServer};
17use crate::socialgraph;
18use crate::storage::HashtreeStore;
19
20#[cfg(feature = "p2p")]
21use crate::webrtc::{ContentStore, PeerClassifier, WebRTCManager, WebRTCState};
22#[cfg(not(feature = "p2p"))]
23use crate::WebRTCState;
24
/// Handles for a running peer router, kept so it can be stopped later.
#[cfg(feature = "p2p")]
struct PeerRouterRuntime {
    /// Watch-channel sender; `true` is sent on it to request shutdown.
    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
    /// Task that drives the router's `run()` loop.
    join: JoinHandle<()>,
    /// Peer-state persistence task; aborted when the router is stopped.
    peer_state_persist: JoinHandle<()>,
}
31
/// A running background sync service together with the task that drives it.
struct BackgroundSyncRuntime {
    /// Shared service handle, used to request shutdown.
    service: Arc<crate::sync::BackgroundSync>,
    /// Task running the service; `None` once it has been taken for joining.
    join: Option<JoinHandle<()>>,
}
36
37impl Drop for BackgroundSyncRuntime {
38 fn drop(&mut self) {
39 self.service.shutdown();
40 if let Some(join) = self.join.take() {
41 join.abort();
42 }
43 }
44}
45
/// A running background nostr mirror together with the task that drives it.
struct BackgroundMirrorRuntime {
    /// Shared service handle, used to request shutdown.
    service: Arc<crate::nostr_mirror::BackgroundNostrMirror>,
    /// Task running the mirror; `None` once it has been taken for joining.
    join: Option<JoinHandle<()>>,
}
50
51impl Drop for BackgroundMirrorRuntime {
52 fn drop(&mut self) {
53 self.service.shutdown();
54 if let Some(join) = self.join.take() {
55 join.abort();
56 }
57 }
58}
59
/// The set of background services currently running; `None` means "not running".
struct BackgroundServicesRuntime {
    /// Social graph crawler task handles.
    crawler: Option<socialgraph::crawler::SocialGraphTaskHandles>,
    /// Background nostr mirror.
    mirror: Option<BackgroundMirrorRuntime>,
    /// Background sync service.
    sync: Option<BackgroundSyncRuntime>,
}
65
66impl Drop for BackgroundServicesRuntime {
67 fn drop(&mut self) {
68 if let Some(handles) = self.crawler.as_ref() {
69 let _ = handles.shutdown_tx.send(true);
70 }
71 if let Some(runtime) = self.mirror.as_ref() {
72 runtime.service.shutdown();
73 }
74 if let Some(runtime) = self.sync.as_ref() {
75 runtime.service.shutdown();
76 }
77 }
78}
79
80impl BackgroundServicesRuntime {
81 fn status(&self) -> EmbeddedBackgroundServicesStatus {
82 EmbeddedBackgroundServicesStatus {
83 crawler_active: self.crawler.is_some(),
84 mirror_active: self.mirror.is_some(),
85 sync_active: self.sync.is_some(),
86 }
87 }
88}
89
/// Handles needed to stop the embedded HTTP server task.
struct EmbeddedServerRuntime {
    /// Notifier the server task waits on as its shutdown signal.
    shutdown: Arc<Notify>,
    /// The server task; `None` once it has been taken for joining.
    join: Option<JoinHandle<()>>,
}
94
/// Controls the lifetime of the embedded HTTP server task.
pub struct EmbeddedServerController {
    /// `Some` while the server is running; taken (set to `None`) by `shutdown`.
    runtime: Mutex<Option<EmbeddedServerRuntime>>,
}
98
99impl EmbeddedServerController {
100 pub fn new(shutdown: Arc<Notify>, join: JoinHandle<()>) -> Self {
101 Self {
102 runtime: Mutex::new(Some(EmbeddedServerRuntime {
103 shutdown,
104 join: Some(join),
105 })),
106 }
107 }
108
109 pub async fn shutdown(&self) {
110 let mut runtime = self.runtime.lock().await;
111 let Some(mut runtime) = runtime.take() else {
112 return;
113 };
114
115 runtime.shutdown.notify_waiters();
116 if let Some(mut join) = runtime.join.take() {
117 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
118 Ok(Ok(())) => {}
119 Ok(Err(err)) => {
120 tracing::warn!("Embedded server task ended with join error: {}", err)
121 }
122 Err(_) => {
123 tracing::warn!("Timed out waiting for embedded server shutdown");
124 join.abort();
125 }
126 }
127 }
128 }
129}
130
/// Snapshot of which background services currently have a runtime entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmbeddedBackgroundServicesStatus {
    /// Social graph crawler tasks are registered.
    pub crawler_active: bool,
    /// Background nostr mirror is registered.
    pub mirror_active: bool,
    /// Background sync service is registered.
    pub sync_active: bool,
}
137
/// Starts, restarts, and stops the background services (crawler, mirror, sync)
/// according to a [`Config`].
pub struct EmbeddedBackgroundServicesController {
    /// Identity keys handed to the crawler and converted for mirror/sync.
    keys: Keys,
    /// Data directory; passed to the crawler and used for `contacts.json`.
    data_dir: PathBuf,
    /// Content store shared with the mirror and sync services.
    store: Arc<HashtreeStore>,
    /// Concrete social graph store, required by the nostr mirror.
    graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
    /// Social graph backend handed to the crawler.
    graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
    /// Optional spambox backend handed to the crawler.
    spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
    /// Optional WebRTC state handed to the sync service.
    webrtc_state: Option<Arc<WebRTCState>>,
    /// Currently running services, guarded so reconfiguration is serialized.
    runtime: Mutex<BackgroundServicesRuntime>,
}
148
149impl EmbeddedBackgroundServicesController {
    /// Relays that `mirror_publish_relays` lists first, in this order, when
    /// they are among the active relays.
    const MIRROR_PUBLISH_RELAY_PRIORITY: &[&str] = &[
        "wss://nos.lol",
        "wss://temp.iris.to",
        "wss://vault.iris.to",
        "wss://relay.damus.io",
    ];
    /// Relays that `mirror_publish_relays` drops from the publish list, unless
    /// every active relay is on this list.
    const MIRROR_PUBLISH_RELAY_BLOCKLIST: &[&str] =
        &["wss://graph-relay.iris.to", "wss://upload.iris.to/nostr"];
158
159 fn mirror_publish_relays(active_relays: &[String], _bind_address: &str) -> Vec<String> {
160 let mut seen = HashSet::new();
161 let active_relays = active_relays
162 .iter()
163 .filter(|relay| seen.insert((*relay).clone()))
164 .cloned()
165 .collect::<Vec<_>>();
166 if active_relays.is_empty() {
167 return Vec::new();
168 }
169 let filtered = active_relays
170 .iter()
171 .filter(|relay| !Self::MIRROR_PUBLISH_RELAY_BLOCKLIST.contains(&relay.as_str()))
172 .cloned()
173 .collect::<Vec<_>>();
174 if filtered.is_empty() {
175 return active_relays;
176 }
177
178 let mut selected = Vec::new();
179 let mut selected_set = HashSet::new();
180 for relay in Self::MIRROR_PUBLISH_RELAY_PRIORITY {
181 if filtered.iter().any(|active| active == relay) {
182 selected.push((*relay).to_string());
183 selected_set.insert((*relay).to_string());
184 }
185 }
186 for relay in filtered {
187 if selected_set.insert(relay.clone()) {
188 selected.push(relay);
189 }
190 }
191
192 selected
193 }
194
195 pub fn new(
196 keys: Keys,
197 data_dir: PathBuf,
198 store: Arc<HashtreeStore>,
199 graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
200 graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
201 spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
202 webrtc_state: Option<Arc<WebRTCState>>,
203 ) -> Self {
204 Self {
205 keys,
206 data_dir,
207 store,
208 graph_store_concrete,
209 graph_store,
210 spambox,
211 webrtc_state,
212 runtime: Mutex::new(BackgroundServicesRuntime {
213 crawler: None,
214 mirror: None,
215 sync: None,
216 }),
217 }
218 }
219
220 pub async fn status(&self) -> EmbeddedBackgroundServicesStatus {
221 self.runtime.lock().await.status()
222 }
223
224 pub async fn shutdown(&self) {
225 let mut runtime = self.runtime.lock().await;
226 Self::shutdown_crawler(&mut runtime.crawler).await;
227 Self::shutdown_mirror(&mut runtime.mirror).await;
228 Self::shutdown_sync(&mut runtime.sync).await;
229 }
230
231 async fn shutdown_crawler(crawler: &mut Option<socialgraph::crawler::SocialGraphTaskHandles>) {
232 let Some(handles) = crawler.take() else {
233 return;
234 };
235
236 let _ = handles.shutdown_tx.send(true);
237
238 let mut crawl_handle = handles.crawl_handle;
239 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut crawl_handle).await {
240 Ok(Ok(())) => {}
241 Ok(Err(err)) => tracing::warn!("Crawler task ended with join error: {}", err),
242 Err(_) => {
243 tracing::warn!("Timed out waiting for crawler task shutdown");
244 crawl_handle.abort();
245 }
246 }
247
248 let mut local_list_handle = handles.local_list_handle;
249 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut local_list_handle).await
250 {
251 Ok(Ok(())) => {}
252 Ok(Err(err)) => tracing::warn!("Local list task ended with join error: {}", err),
253 Err(_) => {
254 tracing::warn!("Timed out waiting for local list task shutdown");
255 local_list_handle.abort();
256 }
257 }
258 }
259
260 async fn shutdown_sync(sync: &mut Option<BackgroundSyncRuntime>) {
261 let Some(mut runtime) = sync.take() else {
262 return;
263 };
264
265 runtime.service.shutdown();
266 if let Some(mut join) = runtime.join.take() {
267 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
268 Ok(Ok(())) => {}
269 Ok(Err(err)) => {
270 tracing::warn!("Background sync task ended with join error: {}", err)
271 }
272 Err(_) => {
273 tracing::warn!("Timed out waiting for background sync shutdown");
274 join.abort();
275 }
276 }
277 }
278 }
279
280 async fn shutdown_mirror(mirror: &mut Option<BackgroundMirrorRuntime>) {
281 let Some(mut runtime) = mirror.take() else {
282 return;
283 };
284
285 runtime.service.shutdown();
286 if let Some(mut join) = runtime.join.take() {
287 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
288 Ok(Ok(())) => {}
289 Ok(Err(err)) => {
290 tracing::warn!("Background mirror task ended with join error: {}", err)
291 }
292 Err(_) => {
293 tracing::warn!("Timed out waiting for background mirror shutdown");
294 join.abort();
295 }
296 }
297 }
298 }
299
300 fn nostr_mirror_config(
301 config: &Config,
302 active_relays: &[String],
303 ) -> crate::nostr_mirror::NostrMirrorConfig {
304 crate::nostr_mirror::NostrMirrorConfig {
305 relays: active_relays.to_vec(),
306 publish_relays: Self::mirror_publish_relays(active_relays, &config.server.bind_address),
307 blossom_write_servers: config.blossom.all_write_servers(),
308 max_follow_distance: config
309 .nostr
310 .mirror_max_follow_distance
311 .unwrap_or(config.nostr.social_graph_crawl_depth),
312 overmute_threshold: config.nostr.overmute_threshold,
313 require_negentropy: config.nostr.negentropy_only,
314 kinds: config.nostr.mirror_kinds.clone(),
315 history_sync_author_chunk_size: config.nostr.history_sync_author_chunk_size.max(1),
316 history_sync_per_author_event_limit: config
317 .nostr
318 .history_sync_per_author_event_limit
319 .max(1),
320 missing_profile_backfill_batch_size: config.nostr.history_sync_author_chunk_size.max(1),
321 history_sync_on_reconnect: config.nostr.history_sync_on_reconnect,
322 full_text_note_history_follow_distance: config
323 .nostr
324 .full_text_note_history_follow_distance,
325 full_text_note_history_max_relay_pages: config
326 .nostr
327 .full_text_note_history_max_relay_pages,
328 ..crate::nostr_mirror::NostrMirrorConfig::default()
329 }
330 }
331
332 pub async fn apply_config(&self, config: &Config) -> Result<EmbeddedBackgroundServicesStatus> {
333 let mut runtime = self.runtime.lock().await;
334
335 Self::shutdown_crawler(&mut runtime.crawler).await;
336 Self::shutdown_mirror(&mut runtime.mirror).await;
337 Self::shutdown_sync(&mut runtime.sync).await;
338
339 if !config.server.mode.background_services_enabled() {
340 return Ok(runtime.status());
341 }
342
343 let active_relays = config.nostr.active_relays();
344
345 if config.nostr.enabled
346 && config.nostr.social_graph_crawl_depth > 0
347 && !active_relays.is_empty()
348 {
349 runtime.crawler = Some(socialgraph::crawler::spawn_social_graph_tasks(
350 self.graph_store.clone(),
351 self.keys.clone(),
352 active_relays.clone(),
353 config.nostr.social_graph_crawl_depth,
354 self.spambox.clone(),
355 self.data_dir.clone(),
356 ));
357
358 let service = Arc::new(
359 crate::nostr_mirror::BackgroundNostrMirror::new(
360 Self::nostr_mirror_config(config, &active_relays),
361 self.store.clone(),
362 self.graph_store_concrete.clone(),
363 Some(
364 nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
365 .context("Failed to parse keys for background nostr mirror")?,
366 ),
367 )
368 .await
369 .context("Failed to create background nostr mirror")?,
370 );
371 let service_for_task = service.clone();
372 let join = tokio::task::spawn_blocking(move || {
373 let runtime = tokio::runtime::Builder::new_current_thread()
374 .enable_all()
375 .build()
376 .expect("build background nostr mirror runtime");
377 runtime.block_on(async {
378 if let Err(err) = service_for_task.run().await {
379 tracing::error!("Background nostr mirror error: {:#}", err);
380 }
381 });
382 });
383 runtime.mirror = Some(BackgroundMirrorRuntime {
384 service,
385 join: Some(join),
386 });
387 }
388
389 if config.sync.enabled && !active_relays.is_empty() {
390 let has_pinned_refs = self
391 .store
392 .list_pinned_refs()
393 .map(|refs| !refs.is_empty())
394 .unwrap_or(false);
395 let has_tracked_authors = self
396 .store
397 .list_tracked_authors()
398 .map(|authors| !authors.is_empty())
399 .unwrap_or(false);
400 let should_sync = config.sync.sync_own
401 || config.sync.sync_followed
402 || has_pinned_refs
403 || has_tracked_authors;
404 if !should_sync {
405 return Ok(runtime.status());
406 }
407
408 let sync_config = crate::sync::SyncConfig {
409 sync_own: config.sync.sync_own,
410 sync_followed: config.sync.sync_followed,
411 relays: active_relays,
412 max_concurrent: config.sync.max_concurrent,
413 webrtc_timeout_ms: config.sync.webrtc_timeout_ms,
414 blossom_timeout_ms: config.sync.blossom_timeout_ms,
415 };
416
417 let sync_keys = nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
418 .context("Failed to parse keys for sync")?;
419 let service = Arc::new(
420 crate::sync::BackgroundSync::new(
421 sync_config,
422 self.store.clone(),
423 sync_keys,
424 self.webrtc_state.clone(),
425 )
426 .await
427 .context("Failed to create background sync service")?,
428 );
429 let contacts_file = self.data_dir.join("contacts.json");
430 let service_for_task = service.clone();
431 let join = tokio::spawn(async move {
432 if let Err(err) = service_for_task.run(contacts_file).await {
433 tracing::error!("Background sync error: {}", err);
434 }
435 });
436 runtime.sync = Some(BackgroundSyncRuntime {
437 service,
438 join: Some(join),
439 });
440 }
441
442 Ok(runtime.status())
443 }
444}
445
/// Starts, restarts, and stops the WebRTC peer router according to a [`Config`].
#[cfg(feature = "p2p")]
pub struct EmbeddedPeerRouterController {
    /// Identity keys handed to each `WebRTCManager`.
    keys: Keys,
    /// Directory used to persist and reload mesh peer state.
    data_dir: PathBuf,
    /// Shared WebRTC state; reset and reloaded on every reconfiguration.
    state: Arc<WebRTCState>,
    /// Content store given to the manager when hash GET is enabled.
    store: Arc<dyn ContentStore>,
    /// Classifier installed on every manager instance.
    peer_classifier: PeerClassifier,
    /// Relay handed to the manager via `set_nostr_relay`.
    nostr_relay: Arc<NostrRelay>,
    /// `Some` while a router is running.
    runtime: Mutex<Option<PeerRouterRuntime>>,
}
456
#[cfg(feature = "p2p")]
impl EmbeddedPeerRouterController {
    /// Creates a controller with no router running.
    ///
    /// Nothing is started until `apply_config` is called.
    pub fn new(
        keys: Keys,
        data_dir: PathBuf,
        state: Arc<WebRTCState>,
        store: Arc<dyn ContentStore>,
        peer_classifier: PeerClassifier,
        nostr_relay: Arc<NostrRelay>,
    ) -> Self {
        Self {
            keys,
            data_dir,
            state,
            store,
            peer_classifier,
            nostr_relay,
            runtime: Mutex::new(None),
        }
    }

    /// Returns a shared handle to the WebRTC state this controller manages.
    pub fn state(&self) -> Arc<WebRTCState> {
        self.state.clone()
    }

    /// Restarts the peer router so it matches `config`.
    ///
    /// Any running router is stopped first. The runtime state is then reset
    /// and the persisted peer state reloaded, whether or not a router is
    /// started afterwards. Returns `Ok(true)` when a router was started and
    /// `Ok(false)` when the configuration disables it.
    pub async fn apply_config(&self, config: &Config) -> Result<bool> {
        let mut runtime = self.runtime.lock().await;
        if let Some(runtime_handle) = runtime.take() {
            self.stop_runtime(runtime_handle, "before router restart")
                .await;
        }

        self.state.reset_runtime_state().await;
        if let Err(err) = crate::p2p_common::load_peer_state(&self.data_dir, &self.state).await {
            tracing::warn!("Failed to load persisted mesh peer state: {err:#}");
        }

        if !crate::p2p_common::peer_router_enabled(config) {
            return Ok(false);
        }

        let webrtc_config = crate::p2p_common::default_webrtc_config(config);
        // The manager only gets the content store when hash GET is enabled.
        let mut manager = if config.server.mode.hash_get_enabled() {
            WebRTCManager::new_with_state_and_store_and_classifier(
                self.keys.clone(),
                webrtc_config,
                self.state.clone(),
                self.store.clone(),
                self.peer_classifier.clone(),
            )
        } else {
            let mut manager =
                WebRTCManager::new_with_state(self.keys.clone(), webrtc_config, self.state.clone());
            manager.set_peer_classifier(self.peer_classifier.clone());
            manager
        };
        manager
            .set_nostr_relay(self.nostr_relay.clone() as hashtree_network::SharedMeshRelayClient);
        let shutdown = manager.shutdown_signal();
        let join = tokio::spawn(async move {
            if let Err(err) = manager.run().await {
                tracing::error!("Peer router error: {}", err);
            }
        });
        let peer_state_persist = crate::p2p_common::spawn_peer_state_persist_task(
            self.data_dir.clone(),
            self.state.clone(),
        );
        *runtime = Some(PeerRouterRuntime {
            shutdown,
            join,
            peer_state_persist,
        });
        Ok(true)
    }

    /// Stops the running router, if any, and resets the runtime state.
    ///
    /// Does nothing (not even the reset) when no router is running.
    pub async fn shutdown(&self) {
        let mut runtime = self.runtime.lock().await;
        let Some(runtime_handle) = runtime.take() else {
            return;
        };

        self.stop_runtime(runtime_handle, "during router shutdown")
            .await;

        self.state.reset_runtime_state().await;
    }

    /// Shared stop sequence for `apply_config` and `shutdown`.
    ///
    /// Persists peer state, sends the shutdown signal, aborts the persist
    /// task, and waits up to three seconds for the router task before aborting
    /// it. `phase` completes the warning logged when persisting fails.
    async fn stop_runtime(&self, runtime_handle: PeerRouterRuntime, phase: &str) {
        if let Err(err) = crate::p2p_common::persist_peer_state(&self.data_dir, &self.state).await {
            tracing::warn!("Failed to persist mesh peer state {phase}: {err:#}");
        }
        // A send error only means no receiver is left to notify.
        let _ = runtime_handle.shutdown.send(true);
        runtime_handle.peer_state_persist.abort();
        let mut join = runtime_handle.join;
        match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
            Ok(Ok(())) => {}
            Ok(Err(err)) => tracing::warn!("Peer router task ended with join error: {}", err),
            Err(_) => {
                tracing::warn!("Timed out waiting for peer router shutdown");
                join.abort();
            }
        }
    }
}
573
/// Aggregates the per-subsystem controllers so the whole embedded daemon can
/// be shut down with one call.
pub struct EmbeddedDaemonController {
    /// HTTP server controller.
    server_controller: Arc<EmbeddedServerController>,
    /// FIPS transport handle, if that transport was started.
    fips_handle: Option<Arc<crate::fips_transport::DaemonFipsHandle>>,
    /// Peer router controller, if one was created.
    #[cfg(feature = "p2p")]
    peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
    /// Background services controller, if one was created.
    background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
}
581
582impl EmbeddedDaemonController {
583 #[cfg(feature = "p2p")]
584 pub fn new(
585 server_controller: Arc<EmbeddedServerController>,
586 fips_handle: Option<Arc<crate::fips_transport::DaemonFipsHandle>>,
587 peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
588 background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
589 ) -> Self {
590 Self {
591 server_controller,
592 fips_handle,
593 #[cfg(feature = "p2p")]
594 peer_router_controller,
595 background_services_controller,
596 }
597 }
598
599 #[cfg(not(feature = "p2p"))]
600 pub fn new(
601 server_controller: Arc<EmbeddedServerController>,
602 fips_handle: Option<Arc<crate::fips_transport::DaemonFipsHandle>>,
603 background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
604 ) -> Self {
605 Self {
606 server_controller,
607 fips_handle,
608 background_services_controller,
609 }
610 }
611
612 pub async fn shutdown(&self) {
613 self.server_controller.shutdown().await;
614 if let Some(handle) = self.fips_handle.as_ref() {
615 handle.shutdown();
616 }
617 if let Some(controller) = self.background_services_controller.as_ref() {
618 controller.shutdown().await;
619 }
620 #[cfg(feature = "p2p")]
621 if let Some(controller) = self.peer_router_controller.as_ref() {
622 controller.shutdown().await;
623 }
624 }
625}
626
/// Inputs for [`start_embedded`].
pub struct EmbeddedDaemonOptions {
    /// Base configuration; `bind_address` and `relays` below override parts of it.
    pub config: Config,
    /// Directory for the content store, social graph, and related state.
    pub data_dir: PathBuf,
    /// When set, keys are loaded via `ensure_keys_in` with this directory;
    /// otherwise `ensure_keys` is used.
    pub config_dir: Option<PathBuf>,
    /// Address the HTTP listener binds to; also written to `config.server.bind_address`.
    pub bind_address: String,
    /// When set, replaces `config.nostr.relays` and enables nostr only if the
    /// list is non-empty.
    pub relays: Option<Vec<String>>,
    /// Additional routes passed to the server via `with_extra_routes`.
    pub extra_routes: Option<Router<AppState>>,
    /// CORS layer passed to the server via `with_cors`.
    pub cors: Option<CorsLayer>,
}
636
/// Result of [`start_embedded`]: where the daemon listens plus handles to
/// control it.
pub struct EmbeddedDaemonInfo {
    /// Bound listener address formatted as `ip:port`.
    pub addr: String,
    /// Bound listener port.
    pub port: u16,
    /// Bech32 encoding of the daemon identity's public key.
    pub npub: String,
    /// The content store the daemon serves from.
    pub store: Arc<HashtreeStore>,
    /// Controller that shuts down all daemon subsystems.
    pub daemon_controller: Arc<EmbeddedDaemonController>,
    /// WebRTC state; `None` without the `p2p` feature or when no relay was created.
    #[allow(dead_code)]
    pub webrtc_state: Option<Arc<WebRTCState>>,
    /// Peer router controller; `None` when no relay was created.
    #[cfg(feature = "p2p")]
    #[allow(dead_code)]
    pub peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
    /// Background services controller (always `Some` as built by `start_embedded`).
    #[allow(dead_code)]
    pub background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
}
651
652pub async fn start_embedded(opts: EmbeddedDaemonOptions) -> Result<EmbeddedDaemonInfo> {
653 let _ = rustls::crypto::ring::default_provider().install_default();
654
655 let mut config = opts.config;
656 config.server.bind_address = opts.bind_address.clone();
657 if let Some(relays) = opts.relays {
658 config.nostr.relays = relays;
659 config.nostr.enabled = !config.nostr.relays.is_empty();
660 }
661
662 let max_size_bytes = config.storage.max_size_gb * 1024 * 1024 * 1024;
663 let nostr_db_max_bytes = config
664 .nostr
665 .db_max_size_gb
666 .saturating_mul(1024 * 1024 * 1024);
667 let spambox_db_max_bytes = config
668 .nostr
669 .spambox_max_size_gb
670 .saturating_mul(1024 * 1024 * 1024);
671
672 let store = Arc::new(HashtreeStore::with_options(
673 &opts.data_dir,
674 config.storage.s3.as_ref(),
675 max_size_bytes,
676 )?);
677
678 let (keys, _was_generated) = if let Some(config_dir) = opts.config_dir.as_ref() {
679 ensure_keys_in(config_dir, Some(&opts.data_dir), Some(&config))?
680 } else {
681 ensure_keys()?
682 };
683 let pk_bytes = pubkey_bytes(&keys);
684 let npub = keys
685 .public_key()
686 .to_bech32()
687 .context("Failed to encode npub")?;
688
689 let mut allowed_pubkeys: HashSet<String> = HashSet::new();
690 allowed_pubkeys.insert(hex::encode(pk_bytes));
691 for npub_str in &config.nostr.allowed_npubs {
692 if let Ok(pk) = parse_npub(npub_str) {
693 allowed_pubkeys.insert(hex::encode(pk));
694 } else {
695 tracing::warn!("Invalid npub in allowed_npubs: {}", npub_str);
696 }
697 }
698
699 let graph_store = socialgraph::open_social_graph_store_with_storage(
700 &opts.data_dir,
701 store.store_arc(),
702 Some(nostr_db_max_bytes),
703 )
704 .context("Failed to initialize social graph store")?;
705 graph_store.set_profile_index_overmute_threshold(config.nostr.overmute_threshold);
706
707 let social_graph_root_bytes = if let Some(ref root_npub) = config.nostr.socialgraph_root {
708 parse_npub(root_npub).unwrap_or(pk_bytes)
709 } else {
710 pk_bytes
711 };
712 socialgraph::set_social_graph_root(&graph_store, &social_graph_root_bytes);
713 socialgraph::sync_local_list_files_force(graph_store.as_ref(), &opts.data_dir, &keys)
714 .context("Failed to sync local social graph lists")?;
715 let fips_peer_ids = crate::fips_transport::fips_peer_ids_from_pubkeys(
716 socialgraph::get_follows(graph_store.as_ref(), &pk_bytes),
717 );
718 let social_graph_store: Arc<dyn socialgraph::SocialGraphBackend> = graph_store.clone();
719
720 let social_graph = Arc::new(socialgraph::SocialGraphAccessControl::new(
721 Arc::clone(&social_graph_store),
722 config.nostr.max_write_distance,
723 allowed_pubkeys.clone(),
724 ));
725
726 let nostr_relay_config = NostrRelayConfig {
727 spambox_db_max_bytes,
728 ..Default::default()
729 };
730 let nostr_relay = if config.nostr.enabled {
731 let mut public_event_pubkeys = HashSet::new();
732 public_event_pubkeys.insert(hex::encode(pk_bytes));
733 Some(Arc::new(
734 NostrRelay::new(
735 Arc::clone(&social_graph_store),
736 opts.data_dir.clone(),
737 public_event_pubkeys,
738 Some(social_graph.clone()),
739 nostr_relay_config,
740 )
741 .context("Failed to initialize Nostr relay")?,
742 ))
743 } else {
744 None
745 };
746
747 let crawler_spambox = if config.nostr.enabled && spambox_db_max_bytes != 0 {
748 let spam_dir = opts.data_dir.join("socialgraph_spambox");
749 match socialgraph::open_social_graph_store_at_path(&spam_dir, Some(spambox_db_max_bytes)) {
750 Ok(store) => Some(store),
751 Err(err) => {
752 tracing::warn!("Failed to open social graph spambox for crawler: {}", err);
753 None
754 }
755 }
756 } else {
757 None
758 };
759 let crawler_spambox_backend = crawler_spambox
760 .clone()
761 .map(|store| store as Arc<dyn socialgraph::SocialGraphBackend>);
762
763 #[cfg(feature = "p2p")]
764 let (webrtc_state, peer_router_controller): (
765 Option<Arc<WebRTCState>>,
766 Option<Arc<EmbeddedPeerRouterController>>,
767 ) = if let Some(nostr_relay) = nostr_relay.clone() {
768 let router_config = crate::p2p_common::default_webrtc_config(&config);
769 let peer_classifier = crate::p2p_common::build_peer_classifier(
770 opts.data_dir.clone(),
771 Arc::clone(&social_graph_store),
772 );
773 let cashu_payment_client =
774 if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
775 match crate::cashu_helper::CashuHelperClient::discover(opts.data_dir.clone()) {
776 Ok(client) => {
777 Some(Arc::new(client) as Arc<dyn crate::cashu_helper::CashuPaymentClient>)
778 }
779 Err(err) => {
780 tracing::warn!(
781 "Cashu settlement helper unavailable; paid retrieval stays disabled: {}",
782 err
783 );
784 None
785 }
786 }
787 } else {
788 None
789 };
790 let cashu_mint_metadata =
791 if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
792 let metadata_path = crate::webrtc::cashu_mint_metadata_path(&opts.data_dir);
793 match crate::webrtc::CashuMintMetadataStore::load(metadata_path) {
794 Ok(store) => Some(store),
795 Err(err) => {
796 tracing::warn!(
797 "Failed to load Cashu mint metadata; falling back to in-memory state: {}",
798 err
799 );
800 Some(crate::webrtc::CashuMintMetadataStore::in_memory())
801 }
802 }
803 } else {
804 None
805 };
806
807 let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
808 router_config.request_selection_strategy,
809 router_config.request_fairness_enabled,
810 router_config.request_dispatch,
811 std::time::Duration::from_millis(router_config.message_timeout_ms),
812 crate::webrtc::CashuRoutingConfig::from(&config.cashu),
813 cashu_payment_client,
814 cashu_mint_metadata,
815 ));
816 let controller = Arc::new(EmbeddedPeerRouterController::new(
817 keys.clone(),
818 opts.data_dir.clone(),
819 state.clone(),
820 Arc::clone(&store) as Arc<dyn ContentStore>,
821 peer_classifier,
822 nostr_relay.clone(),
823 ));
824 controller.apply_config(&config).await?;
825 (Some(state), Some(controller))
826 } else {
827 (None, None)
828 };
829
830 #[cfg(not(feature = "p2p"))]
831 let webrtc_state: Option<Arc<crate::webrtc::WebRTCState>> = None;
832
833 let background_services_controller = Arc::new(EmbeddedBackgroundServicesController::new(
834 keys.clone(),
835 opts.data_dir.clone(),
836 Arc::clone(&store),
837 graph_store.clone(),
838 Arc::clone(&social_graph_store),
839 crawler_spambox_backend,
840 webrtc_state.clone(),
841 ));
842
843 let upstream_blossom = config.blossom.all_read_servers();
844 let active_nostr_relays = config.nostr.active_relays();
845 let fips_handle = crate::fips_transport::start_daemon_fips_transport(
846 &config,
847 &keys,
848 Arc::clone(&store),
849 fips_peer_ids,
850 )
851 .await?
852 .map(Arc::new);
853
854 let mut server = HashtreeServer::new(Arc::clone(&store), opts.bind_address.clone())
855 .with_server_mode(config.server.mode)
856 .with_hash_get_enabled(config.server.mode.hash_get_enabled())
857 .with_fetch_from_fips_peers(config.server.fetch_from_fips_peers)
858 .with_allowed_pubkeys(allowed_pubkeys.clone())
859 .with_max_upload_bytes((config.blossom.max_upload_mb as usize) * 1024 * 1024)
860 .with_public_writes(config.server.public_writes)
861 .with_public_plaintext_reads(config.server.public_plaintext_reads)
862 .with_require_random_untrusted_ingest(config.blossom.require_random_untrusted_ingest)
863 .with_optimistic_blossom_uploads(config.blossom.optimistic_uploads)
864 .with_upstream_blossom(upstream_blossom)
865 .with_nostr_relay_urls(active_nostr_relays)
866 .with_social_graph(social_graph)
867 .with_socialgraph_snapshot(
868 Arc::clone(&social_graph_store),
869 social_graph_root_bytes,
870 config.server.socialgraph_snapshot_public,
871 );
872 if let Some(nostr_relay) = nostr_relay {
873 server = server.with_nostr_relay(nostr_relay);
874 }
875
876 if crate::p2p_common::peer_router_enabled(&config) {
877 if let Some(ref state) = webrtc_state {
878 server = server.with_webrtc_peers(state.clone());
879 }
880 }
881 if let Some(ref fips_handle) = fips_handle {
882 server = server.with_fips_transport(fips_handle.transport.clone());
883 }
884
885 if let Some(extra) = opts.extra_routes {
886 server = server.with_extra_routes(extra);
887 }
888 if let Some(cors) = opts.cors {
889 server = server.with_cors(cors);
890 }
891
892 spawn_background_eviction_task(
893 Arc::clone(&store),
894 BACKGROUND_EVICTION_INTERVAL,
895 "embedded daemon",
896 );
897
898 let listener = TcpListener::bind(&opts.bind_address).await?;
899 let local_addr = listener.local_addr()?;
900 let actual_addr = format!("{}:{}", local_addr.ip(), local_addr.port());
901
902 let server_shutdown = Arc::new(Notify::new());
903 let server_shutdown_for_task = Arc::clone(&server_shutdown);
904 let server_join = tokio::spawn(async move {
905 if let Err(e) = server
906 .run_with_listener_until(listener, async move {
907 server_shutdown_for_task.notified().await;
908 })
909 .await
910 {
911 tracing::error!("Embedded daemon server error: {}", e);
912 }
913 });
914 let server_controller = Arc::new(EmbeddedServerController::new(server_shutdown, server_join));
915 background_services_controller.apply_config(&config).await?;
916 #[cfg(feature = "p2p")]
917 let daemon_controller = Arc::new(EmbeddedDaemonController::new(
918 server_controller,
919 fips_handle.clone(),
920 peer_router_controller.clone(),
921 Some(background_services_controller.clone()),
922 ));
923 #[cfg(not(feature = "p2p"))]
924 let daemon_controller = Arc::new(EmbeddedDaemonController::new(
925 server_controller,
926 fips_handle.clone(),
927 Some(background_services_controller.clone()),
928 ));
929
930 tracing::info!(
931 "Embedded daemon started on {}, identity {}",
932 actual_addr,
933 npub
934 );
935
936 Ok(EmbeddedDaemonInfo {
937 addr: actual_addr,
938 port: local_addr.port(),
939 npub,
940 store,
941 daemon_controller,
942 webrtc_state,
943 #[cfg(feature = "p2p")]
944 peer_router_controller,
945 background_services_controller: Some(background_services_controller),
946 })
947}
948
#[cfg(test)]
mod tests {
    use super::EmbeddedBackgroundServicesController;
    use crate::config::Config;

    /// Bind address passed to `mirror_publish_relays` in every test.
    const BIND_ADDRESS: &str = "0.0.0.0:8080";

    /// Turns string literals into the owned relay list the functions expect.
    fn owned(urls: &[&str]) -> Vec<String> {
        urls.iter().map(|url| url.to_string()).collect()
    }

    #[test]
    fn mirror_publish_relays_orders_known_root_publish_relays_first() {
        let active = owned(&[
            "wss://graph-relay.iris.to",
            "wss://relay.example",
            "wss://relay.primal.net",
            "wss://relay.damus.io",
            "wss://temp.iris.to",
            "wss://vault.iris.to",
            "wss://upload.iris.to/nostr",
        ]);

        let relays =
            EmbeddedBackgroundServicesController::mirror_publish_relays(&active, BIND_ADDRESS);

        let expected = owned(&[
            "wss://temp.iris.to",
            "wss://vault.iris.to",
            "wss://relay.damus.io",
            "wss://relay.example",
            "wss://relay.primal.net",
        ]);
        assert_eq!(relays, expected);
    }

    #[test]
    fn mirror_publish_relays_do_not_add_non_active_publish_targets() {
        let active = owned(&["wss://graph-relay.iris.to", "wss://relay.example"]);

        let relays =
            EmbeddedBackgroundServicesController::mirror_publish_relays(&active, BIND_ADDRESS);

        assert_eq!(relays, owned(&["wss://relay.example"]));
    }

    #[test]
    fn mirror_publish_relays_falls_back_to_active_relays_when_all_are_blocklisted() {
        let active = owned(&["wss://graph-relay.iris.to", "wss://upload.iris.to/nostr"]);

        let relays =
            EmbeddedBackgroundServicesController::mirror_publish_relays(&active, BIND_ADDRESS);

        // With nothing left after filtering, the active list comes back as-is.
        assert_eq!(relays, active);
    }

    #[test]
    fn nostr_mirror_config_allows_disabling_full_note_paging() {
        let active = owned(&["wss://relay.example"]);
        let mut config = Config::default();

        config.nostr.full_text_note_history_max_relay_pages = 0;
        let disabled = EmbeddedBackgroundServicesController::nostr_mirror_config(&config, &active);
        assert_eq!(disabled.full_text_note_history_max_relay_pages, 0);

        config.nostr.full_text_note_history_max_relay_pages = 64;
        let enabled = EmbeddedBackgroundServicesController::nostr_mirror_config(&config, &active);
        assert_eq!(enabled.full_text_note_history_max_relay_pages, 64);
    }

    #[test]
    fn nostr_mirror_config_can_limit_mirror_distance_independently() {
        let active = owned(&["wss://relay.example"]);
        let mut config = Config::default();
        config.nostr.social_graph_crawl_depth = 6;

        config.nostr.mirror_max_follow_distance = Some(2);
        let limited = EmbeddedBackgroundServicesController::nostr_mirror_config(&config, &active);
        assert_eq!(limited.max_follow_distance, 2);

        // Without an explicit limit the crawl depth is used.
        config.nostr.mirror_max_follow_distance = None;
        let inherited = EmbeddedBackgroundServicesController::nostr_mirror_config(&config, &active);
        assert_eq!(inherited.max_follow_distance, 6);
    }
}