1use anyhow::{Context, Result};
2use axum::Router;
3use nostr::nips::nip19::ToBech32;
4use nostr::Keys;
5use std::collections::HashSet;
6use std::path::PathBuf;
7use std::sync::Arc;
8use tokio::net::TcpListener;
9use tokio::sync::{Mutex, Notify};
10use tokio::task::JoinHandle;
11use tower_http::cors::CorsLayer;
12
13use crate::config::{ensure_keys, ensure_keys_in, parse_npub, pubkey_bytes, Config};
14use crate::eviction::{spawn_background_eviction_task, BACKGROUND_EVICTION_INTERVAL};
15use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
16use crate::server::{AppState, HashtreeServer};
17use crate::socialgraph;
18use crate::storage::HashtreeStore;
19
20#[cfg(feature = "p2p")]
21use crate::webrtc::{ContentStore, PeerClassifier, WebRTCManager, WebRTCState};
22#[cfg(not(feature = "p2p"))]
23use crate::WebRTCState;
24
25#[cfg(feature = "p2p")]
26struct PeerRouterRuntime {
27 shutdown: Arc<tokio::sync::watch::Sender<bool>>,
28 join: JoinHandle<()>,
29 peer_state_persist: JoinHandle<()>,
30}
31
32struct BackgroundSyncRuntime {
33 service: Arc<crate::sync::BackgroundSync>,
34 join: Option<JoinHandle<()>>,
35}
36
37impl Drop for BackgroundSyncRuntime {
38 fn drop(&mut self) {
39 self.service.shutdown();
40 if let Some(join) = self.join.take() {
41 join.abort();
42 }
43 }
44}
45
46struct BackgroundMirrorRuntime {
47 service: Arc<crate::nostr_mirror::BackgroundNostrMirror>,
48 join: Option<JoinHandle<()>>,
49}
50
51impl Drop for BackgroundMirrorRuntime {
52 fn drop(&mut self) {
53 self.service.shutdown();
54 if let Some(join) = self.join.take() {
55 join.abort();
56 }
57 }
58}
59
60struct BackgroundServicesRuntime {
61 crawler: Option<socialgraph::crawler::SocialGraphTaskHandles>,
62 mirror: Option<BackgroundMirrorRuntime>,
63 sync: Option<BackgroundSyncRuntime>,
64}
65
66impl Drop for BackgroundServicesRuntime {
67 fn drop(&mut self) {
68 if let Some(handles) = self.crawler.as_ref() {
69 let _ = handles.shutdown_tx.send(true);
70 }
71 if let Some(runtime) = self.mirror.as_ref() {
72 runtime.service.shutdown();
73 }
74 if let Some(runtime) = self.sync.as_ref() {
75 runtime.service.shutdown();
76 }
77 }
78}
79
80impl BackgroundServicesRuntime {
81 fn status(&self) -> EmbeddedBackgroundServicesStatus {
82 EmbeddedBackgroundServicesStatus {
83 crawler_active: self.crawler.is_some(),
84 mirror_active: self.mirror.is_some(),
85 sync_active: self.sync.is_some(),
86 }
87 }
88}
89
90struct EmbeddedServerRuntime {
91 shutdown: Arc<Notify>,
92 join: Option<JoinHandle<()>>,
93}
94
95pub struct EmbeddedServerController {
96 runtime: Mutex<Option<EmbeddedServerRuntime>>,
97}
98
99impl EmbeddedServerController {
100 pub fn new(shutdown: Arc<Notify>, join: JoinHandle<()>) -> Self {
101 Self {
102 runtime: Mutex::new(Some(EmbeddedServerRuntime {
103 shutdown,
104 join: Some(join),
105 })),
106 }
107 }
108
109 pub async fn shutdown(&self) {
110 let mut runtime = self.runtime.lock().await;
111 let Some(mut runtime) = runtime.take() else {
112 return;
113 };
114
115 runtime.shutdown.notify_waiters();
116 if let Some(mut join) = runtime.join.take() {
117 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
118 Ok(Ok(())) => {}
119 Ok(Err(err)) => {
120 tracing::warn!("Embedded server task ended with join error: {}", err)
121 }
122 Err(_) => {
123 tracing::warn!("Timed out waiting for embedded server shutdown");
124 join.abort();
125 }
126 }
127 }
128 }
129}
130
131#[derive(Debug, Clone, Copy, PartialEq, Eq)]
132pub struct EmbeddedBackgroundServicesStatus {
133 pub crawler_active: bool,
134 pub mirror_active: bool,
135 pub sync_active: bool,
136}
137
138pub struct EmbeddedBackgroundServicesController {
139 keys: Keys,
140 data_dir: PathBuf,
141 store: Arc<HashtreeStore>,
142 graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
143 graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
144 spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
145 webrtc_state: Option<Arc<WebRTCState>>,
146 runtime: Mutex<BackgroundServicesRuntime>,
147}
148
149impl EmbeddedBackgroundServicesController {
150 const MIRROR_PUBLISH_RELAY_PRIORITY: &[&str] = &[
151 "wss://nos.lol",
152 "wss://temp.iris.to",
153 "wss://vault.iris.to",
154 "wss://relay.damus.io",
155 ];
156 const MIRROR_PUBLISH_RELAY_BLOCKLIST: &[&str] =
157 &["wss://graph-relay.iris.to", "wss://upload.iris.to/nostr"];
158
159 fn mirror_publish_relays(active_relays: &[String], _bind_address: &str) -> Vec<String> {
160 let mut seen = HashSet::new();
161 let active_relays = active_relays
162 .iter()
163 .filter(|relay| seen.insert((*relay).clone()))
164 .cloned()
165 .collect::<Vec<_>>();
166 if active_relays.is_empty() {
167 return Vec::new();
168 }
169 let filtered = active_relays
170 .iter()
171 .filter(|relay| !Self::MIRROR_PUBLISH_RELAY_BLOCKLIST.contains(&relay.as_str()))
172 .cloned()
173 .collect::<Vec<_>>();
174 if filtered.is_empty() {
175 return active_relays;
176 }
177
178 let mut selected = Vec::new();
179 let mut selected_set = HashSet::new();
180 for relay in Self::MIRROR_PUBLISH_RELAY_PRIORITY {
181 if filtered.iter().any(|active| active == relay) {
182 selected.push((*relay).to_string());
183 selected_set.insert((*relay).to_string());
184 }
185 }
186 for relay in filtered {
187 if selected_set.insert(relay.clone()) {
188 selected.push(relay);
189 }
190 }
191
192 selected
193 }
194
195 pub fn new(
196 keys: Keys,
197 data_dir: PathBuf,
198 store: Arc<HashtreeStore>,
199 graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
200 graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
201 spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
202 webrtc_state: Option<Arc<WebRTCState>>,
203 ) -> Self {
204 Self {
205 keys,
206 data_dir,
207 store,
208 graph_store_concrete,
209 graph_store,
210 spambox,
211 webrtc_state,
212 runtime: Mutex::new(BackgroundServicesRuntime {
213 crawler: None,
214 mirror: None,
215 sync: None,
216 }),
217 }
218 }
219
220 pub async fn status(&self) -> EmbeddedBackgroundServicesStatus {
221 self.runtime.lock().await.status()
222 }
223
224 pub async fn shutdown(&self) {
225 let mut runtime = self.runtime.lock().await;
226 Self::shutdown_crawler(&mut runtime.crawler).await;
227 Self::shutdown_mirror(&mut runtime.mirror).await;
228 Self::shutdown_sync(&mut runtime.sync).await;
229 }
230
231 async fn shutdown_crawler(crawler: &mut Option<socialgraph::crawler::SocialGraphTaskHandles>) {
232 let Some(handles) = crawler.take() else {
233 return;
234 };
235
236 let _ = handles.shutdown_tx.send(true);
237
238 let mut crawl_handle = handles.crawl_handle;
239 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut crawl_handle).await {
240 Ok(Ok(())) => {}
241 Ok(Err(err)) => tracing::warn!("Crawler task ended with join error: {}", err),
242 Err(_) => {
243 tracing::warn!("Timed out waiting for crawler task shutdown");
244 crawl_handle.abort();
245 }
246 }
247
248 let mut local_list_handle = handles.local_list_handle;
249 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut local_list_handle).await
250 {
251 Ok(Ok(())) => {}
252 Ok(Err(err)) => tracing::warn!("Local list task ended with join error: {}", err),
253 Err(_) => {
254 tracing::warn!("Timed out waiting for local list task shutdown");
255 local_list_handle.abort();
256 }
257 }
258 }
259
260 async fn shutdown_sync(sync: &mut Option<BackgroundSyncRuntime>) {
261 let Some(mut runtime) = sync.take() else {
262 return;
263 };
264
265 runtime.service.shutdown();
266 if let Some(mut join) = runtime.join.take() {
267 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
268 Ok(Ok(())) => {}
269 Ok(Err(err)) => {
270 tracing::warn!("Background sync task ended with join error: {}", err)
271 }
272 Err(_) => {
273 tracing::warn!("Timed out waiting for background sync shutdown");
274 join.abort();
275 }
276 }
277 }
278 }
279
280 async fn shutdown_mirror(mirror: &mut Option<BackgroundMirrorRuntime>) {
281 let Some(mut runtime) = mirror.take() else {
282 return;
283 };
284
285 runtime.service.shutdown();
286 if let Some(mut join) = runtime.join.take() {
287 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
288 Ok(Ok(())) => {}
289 Ok(Err(err)) => {
290 tracing::warn!("Background mirror task ended with join error: {}", err)
291 }
292 Err(_) => {
293 tracing::warn!("Timed out waiting for background mirror shutdown");
294 join.abort();
295 }
296 }
297 }
298 }
299
300 fn nostr_mirror_config(
301 config: &Config,
302 active_relays: &[String],
303 ) -> crate::nostr_mirror::NostrMirrorConfig {
304 crate::nostr_mirror::NostrMirrorConfig {
305 relays: active_relays.to_vec(),
306 publish_relays: Self::mirror_publish_relays(active_relays, &config.server.bind_address),
307 blossom_write_servers: config.blossom.all_write_servers(),
308 max_follow_distance: config
309 .nostr
310 .mirror_max_follow_distance
311 .unwrap_or(config.nostr.social_graph_crawl_depth),
312 overmute_threshold: config.nostr.overmute_threshold,
313 require_negentropy: config.nostr.negentropy_only,
314 kinds: config.nostr.mirror_kinds.clone(),
315 history_sync_author_chunk_size: config.nostr.history_sync_author_chunk_size.max(1),
316 history_sync_per_author_event_limit: config
317 .nostr
318 .history_sync_per_author_event_limit
319 .max(1),
320 missing_profile_backfill_batch_size: config.nostr.history_sync_author_chunk_size.max(1),
321 history_sync_on_reconnect: config.nostr.history_sync_on_reconnect,
322 full_text_note_history_follow_distance: config
323 .nostr
324 .full_text_note_history_follow_distance,
325 full_text_note_history_max_relay_pages: config
326 .nostr
327 .full_text_note_history_max_relay_pages,
328 ..crate::nostr_mirror::NostrMirrorConfig::default()
329 }
330 }
331
332 pub async fn apply_config(&self, config: &Config) -> Result<EmbeddedBackgroundServicesStatus> {
333 let mut runtime = self.runtime.lock().await;
334
335 Self::shutdown_crawler(&mut runtime.crawler).await;
336 Self::shutdown_mirror(&mut runtime.mirror).await;
337 Self::shutdown_sync(&mut runtime.sync).await;
338
339 if !config.server.mode.background_services_enabled() {
340 return Ok(runtime.status());
341 }
342
343 let active_relays = config.nostr.active_relays();
344
345 if config.nostr.enabled
346 && config.nostr.social_graph_crawl_depth > 0
347 && !active_relays.is_empty()
348 {
349 runtime.crawler = Some(socialgraph::crawler::spawn_social_graph_tasks(
350 self.graph_store.clone(),
351 self.keys.clone(),
352 active_relays.clone(),
353 config.nostr.social_graph_crawl_depth,
354 self.spambox.clone(),
355 self.data_dir.clone(),
356 ));
357
358 let service = Arc::new(
359 crate::nostr_mirror::BackgroundNostrMirror::new(
360 Self::nostr_mirror_config(config, &active_relays),
361 self.store.clone(),
362 self.graph_store_concrete.clone(),
363 Some(
364 nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
365 .context("Failed to parse keys for background nostr mirror")?,
366 ),
367 )
368 .await
369 .context("Failed to create background nostr mirror")?,
370 );
371 let service_for_task = service.clone();
372 let join = tokio::task::spawn_blocking(move || {
373 let runtime = tokio::runtime::Builder::new_current_thread()
374 .enable_all()
375 .build()
376 .expect("build background nostr mirror runtime");
377 runtime.block_on(async {
378 if let Err(err) = service_for_task.run().await {
379 tracing::error!("Background nostr mirror error: {:#}", err);
380 }
381 });
382 });
383 runtime.mirror = Some(BackgroundMirrorRuntime {
384 service,
385 join: Some(join),
386 });
387 }
388
389 if config.sync.enabled && !active_relays.is_empty() {
390 let has_pinned_refs = self
391 .store
392 .list_pinned_refs()
393 .map(|refs| !refs.is_empty())
394 .unwrap_or(false);
395 let has_tracked_authors = self
396 .store
397 .list_tracked_authors()
398 .map(|authors| !authors.is_empty())
399 .unwrap_or(false);
400 let should_sync = config.sync.sync_own
401 || config.sync.sync_followed
402 || has_pinned_refs
403 || has_tracked_authors;
404 if !should_sync {
405 return Ok(runtime.status());
406 }
407
408 let sync_config = crate::sync::SyncConfig {
409 sync_own: config.sync.sync_own,
410 sync_followed: config.sync.sync_followed,
411 relays: active_relays,
412 max_concurrent: config.sync.max_concurrent,
413 webrtc_timeout_ms: config.sync.webrtc_timeout_ms,
414 blossom_timeout_ms: config.sync.blossom_timeout_ms,
415 };
416
417 let sync_keys = nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
418 .context("Failed to parse keys for sync")?;
419 let service = Arc::new(
420 crate::sync::BackgroundSync::new(
421 sync_config,
422 self.store.clone(),
423 sync_keys,
424 self.webrtc_state.clone(),
425 )
426 .await
427 .context("Failed to create background sync service")?,
428 );
429 let contacts_file = self.data_dir.join("contacts.json");
430 let service_for_task = service.clone();
431 let join = tokio::spawn(async move {
432 if let Err(err) = service_for_task.run(contacts_file).await {
433 tracing::error!("Background sync error: {}", err);
434 }
435 });
436 runtime.sync = Some(BackgroundSyncRuntime {
437 service,
438 join: Some(join),
439 });
440 }
441
442 Ok(runtime.status())
443 }
444}
445
446#[cfg(feature = "p2p")]
447pub struct EmbeddedPeerRouterController {
448 keys: Keys,
449 data_dir: PathBuf,
450 state: Arc<WebRTCState>,
451 store: Arc<dyn ContentStore>,
452 peer_classifier: PeerClassifier,
453 nostr_relay: Arc<NostrRelay>,
454 runtime: Mutex<Option<PeerRouterRuntime>>,
455}
456
457#[cfg(feature = "p2p")]
458impl EmbeddedPeerRouterController {
459 pub fn new(
460 keys: Keys,
461 data_dir: PathBuf,
462 state: Arc<WebRTCState>,
463 store: Arc<dyn ContentStore>,
464 peer_classifier: PeerClassifier,
465 nostr_relay: Arc<NostrRelay>,
466 ) -> Self {
467 Self {
468 keys,
469 data_dir,
470 state,
471 store,
472 peer_classifier,
473 nostr_relay,
474 runtime: Mutex::new(None),
475 }
476 }
477
478 pub fn state(&self) -> Arc<WebRTCState> {
479 self.state.clone()
480 }
481
482 pub async fn apply_config(&self, config: &Config) -> Result<bool> {
483 let mut runtime = self.runtime.lock().await;
484 if let Some(runtime_handle) = runtime.take() {
485 if let Err(err) =
486 crate::p2p_common::persist_peer_state(&self.data_dir, &self.state).await
487 {
488 tracing::warn!("Failed to persist mesh peer state before router restart: {err:#}");
489 }
490 let _ = runtime_handle.shutdown.send(true);
491 runtime_handle.peer_state_persist.abort();
492 let mut join = runtime_handle.join;
493 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
494 Ok(Ok(())) => {}
495 Ok(Err(err)) => {
496 tracing::warn!("Peer router task ended with join error: {}", err);
497 }
498 Err(_) => {
499 tracing::warn!("Timed out waiting for peer router shutdown");
500 join.abort();
501 }
502 }
503 }
504
505 self.state.reset_runtime_state().await;
506 if let Err(err) = crate::p2p_common::load_peer_state(&self.data_dir, &self.state).await {
507 tracing::warn!("Failed to load persisted mesh peer state: {err:#}");
508 }
509
510 if !crate::p2p_common::peer_router_enabled(config) {
511 return Ok(false);
512 }
513
514 let webrtc_config = crate::p2p_common::default_webrtc_config(config);
515 let mut manager = if config.server.mode.hash_get_enabled() {
516 WebRTCManager::new_with_state_and_store_and_classifier(
517 self.keys.clone(),
518 webrtc_config,
519 self.state.clone(),
520 self.store.clone(),
521 self.peer_classifier.clone(),
522 )
523 } else {
524 let mut manager =
525 WebRTCManager::new_with_state(self.keys.clone(), webrtc_config, self.state.clone());
526 manager.set_peer_classifier(self.peer_classifier.clone());
527 manager
528 };
529 manager
530 .set_nostr_relay(self.nostr_relay.clone() as hashtree_network::SharedMeshRelayClient);
531 let shutdown = manager.shutdown_signal();
532 let join = tokio::spawn(async move {
533 if let Err(err) = manager.run().await {
534 tracing::error!("Peer router error: {}", err);
535 }
536 });
537 let peer_state_persist = crate::p2p_common::spawn_peer_state_persist_task(
538 self.data_dir.clone(),
539 self.state.clone(),
540 );
541 *runtime = Some(PeerRouterRuntime {
542 shutdown,
543 join,
544 peer_state_persist,
545 });
546 Ok(true)
547 }
548
549 pub async fn shutdown(&self) {
550 let mut runtime = self.runtime.lock().await;
551 let Some(runtime_handle) = runtime.take() else {
552 return;
553 };
554
555 if let Err(err) = crate::p2p_common::persist_peer_state(&self.data_dir, &self.state).await {
556 tracing::warn!("Failed to persist mesh peer state during router shutdown: {err:#}");
557 }
558 let _ = runtime_handle.shutdown.send(true);
559 runtime_handle.peer_state_persist.abort();
560 let mut join = runtime_handle.join;
561 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
562 Ok(Ok(())) => {}
563 Ok(Err(err)) => tracing::warn!("Peer router task ended with join error: {}", err),
564 Err(_) => {
565 tracing::warn!("Timed out waiting for peer router shutdown");
566 join.abort();
567 }
568 }
569
570 self.state.reset_runtime_state().await;
571 }
572}
573
574pub struct EmbeddedDaemonController {
575 server_controller: Arc<EmbeddedServerController>,
576 #[cfg(feature = "p2p")]
577 peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
578 background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
579}
580
581impl EmbeddedDaemonController {
582 #[cfg(feature = "p2p")]
583 pub fn new(
584 server_controller: Arc<EmbeddedServerController>,
585 peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
586 background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
587 ) -> Self {
588 Self {
589 server_controller,
590 #[cfg(feature = "p2p")]
591 peer_router_controller,
592 background_services_controller,
593 }
594 }
595
596 #[cfg(not(feature = "p2p"))]
597 pub fn new(
598 server_controller: Arc<EmbeddedServerController>,
599 background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
600 ) -> Self {
601 Self {
602 server_controller,
603 background_services_controller,
604 }
605 }
606
607 pub async fn shutdown(&self) {
608 self.server_controller.shutdown().await;
609 if let Some(controller) = self.background_services_controller.as_ref() {
610 controller.shutdown().await;
611 }
612 #[cfg(feature = "p2p")]
613 if let Some(controller) = self.peer_router_controller.as_ref() {
614 controller.shutdown().await;
615 }
616 }
617}
618
619pub struct EmbeddedDaemonOptions {
620 pub config: Config,
621 pub data_dir: PathBuf,
622 pub config_dir: Option<PathBuf>,
623 pub bind_address: String,
624 pub relays: Option<Vec<String>>,
625 pub extra_routes: Option<Router<AppState>>,
626 pub cors: Option<CorsLayer>,
627}
628
629pub struct EmbeddedDaemonInfo {
630 pub addr: String,
631 pub port: u16,
632 pub npub: String,
633 pub store: Arc<HashtreeStore>,
634 pub daemon_controller: Arc<EmbeddedDaemonController>,
635 #[allow(dead_code)]
636 pub webrtc_state: Option<Arc<WebRTCState>>,
637 #[cfg(feature = "p2p")]
638 #[allow(dead_code)]
639 pub peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
640 #[allow(dead_code)]
641 pub background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
642}
643
644pub async fn start_embedded(opts: EmbeddedDaemonOptions) -> Result<EmbeddedDaemonInfo> {
645 let _ = rustls::crypto::ring::default_provider().install_default();
646
647 let mut config = opts.config;
648 config.server.bind_address = opts.bind_address.clone();
649 if let Some(relays) = opts.relays {
650 config.nostr.relays = relays;
651 config.nostr.enabled = !config.nostr.relays.is_empty();
652 }
653
654 let max_size_bytes = config.storage.max_size_gb * 1024 * 1024 * 1024;
655 let nostr_db_max_bytes = config
656 .nostr
657 .db_max_size_gb
658 .saturating_mul(1024 * 1024 * 1024);
659 let spambox_db_max_bytes = config
660 .nostr
661 .spambox_max_size_gb
662 .saturating_mul(1024 * 1024 * 1024);
663
664 let store = Arc::new(HashtreeStore::with_options(
665 &opts.data_dir,
666 config.storage.s3.as_ref(),
667 max_size_bytes,
668 )?);
669
670 let (keys, _was_generated) = if let Some(config_dir) = opts.config_dir.as_ref() {
671 ensure_keys_in(config_dir, Some(&opts.data_dir), Some(&config))?
672 } else {
673 ensure_keys()?
674 };
675 let pk_bytes = pubkey_bytes(&keys);
676 let npub = keys
677 .public_key()
678 .to_bech32()
679 .context("Failed to encode npub")?;
680
681 let mut allowed_pubkeys: HashSet<String> = HashSet::new();
682 allowed_pubkeys.insert(hex::encode(pk_bytes));
683 for npub_str in &config.nostr.allowed_npubs {
684 if let Ok(pk) = parse_npub(npub_str) {
685 allowed_pubkeys.insert(hex::encode(pk));
686 } else {
687 tracing::warn!("Invalid npub in allowed_npubs: {}", npub_str);
688 }
689 }
690
691 let graph_store = socialgraph::open_social_graph_store_with_storage(
692 &opts.data_dir,
693 store.store_arc(),
694 Some(nostr_db_max_bytes),
695 )
696 .context("Failed to initialize social graph store")?;
697 graph_store.set_profile_index_overmute_threshold(config.nostr.overmute_threshold);
698
699 let social_graph_root_bytes = if let Some(ref root_npub) = config.nostr.socialgraph_root {
700 parse_npub(root_npub).unwrap_or(pk_bytes)
701 } else {
702 pk_bytes
703 };
704 socialgraph::set_social_graph_root(&graph_store, &social_graph_root_bytes);
705 socialgraph::sync_local_list_files_force(graph_store.as_ref(), &opts.data_dir, &keys)
706 .context("Failed to sync local social graph lists")?;
707 let social_graph_store: Arc<dyn socialgraph::SocialGraphBackend> = graph_store.clone();
708
709 let social_graph = Arc::new(socialgraph::SocialGraphAccessControl::new(
710 Arc::clone(&social_graph_store),
711 config.nostr.max_write_distance,
712 allowed_pubkeys.clone(),
713 ));
714
715 let nostr_relay_config = NostrRelayConfig {
716 spambox_db_max_bytes,
717 ..Default::default()
718 };
719 let nostr_relay = if config.nostr.enabled {
720 let mut public_event_pubkeys = HashSet::new();
721 public_event_pubkeys.insert(hex::encode(pk_bytes));
722 Some(Arc::new(
723 NostrRelay::new(
724 Arc::clone(&social_graph_store),
725 opts.data_dir.clone(),
726 public_event_pubkeys,
727 Some(social_graph.clone()),
728 nostr_relay_config,
729 )
730 .context("Failed to initialize Nostr relay")?,
731 ))
732 } else {
733 None
734 };
735
736 let crawler_spambox = if config.nostr.enabled && spambox_db_max_bytes != 0 {
737 let spam_dir = opts.data_dir.join("socialgraph_spambox");
738 match socialgraph::open_social_graph_store_at_path(&spam_dir, Some(spambox_db_max_bytes)) {
739 Ok(store) => Some(store),
740 Err(err) => {
741 tracing::warn!("Failed to open social graph spambox for crawler: {}", err);
742 None
743 }
744 }
745 } else {
746 None
747 };
748 let crawler_spambox_backend = crawler_spambox
749 .clone()
750 .map(|store| store as Arc<dyn socialgraph::SocialGraphBackend>);
751
752 #[cfg(feature = "p2p")]
753 let (webrtc_state, peer_router_controller): (
754 Option<Arc<WebRTCState>>,
755 Option<Arc<EmbeddedPeerRouterController>>,
756 ) = if let Some(nostr_relay) = nostr_relay.clone() {
757 let router_config = crate::p2p_common::default_webrtc_config(&config);
758 let peer_classifier = crate::p2p_common::build_peer_classifier(
759 opts.data_dir.clone(),
760 Arc::clone(&social_graph_store),
761 );
762 let cashu_payment_client =
763 if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
764 match crate::cashu_helper::CashuHelperClient::discover(opts.data_dir.clone()) {
765 Ok(client) => {
766 Some(Arc::new(client) as Arc<dyn crate::cashu_helper::CashuPaymentClient>)
767 }
768 Err(err) => {
769 tracing::warn!(
770 "Cashu settlement helper unavailable; paid retrieval stays disabled: {}",
771 err
772 );
773 None
774 }
775 }
776 } else {
777 None
778 };
779 let cashu_mint_metadata =
780 if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
781 let metadata_path = crate::webrtc::cashu_mint_metadata_path(&opts.data_dir);
782 match crate::webrtc::CashuMintMetadataStore::load(metadata_path) {
783 Ok(store) => Some(store),
784 Err(err) => {
785 tracing::warn!(
786 "Failed to load Cashu mint metadata; falling back to in-memory state: {}",
787 err
788 );
789 Some(crate::webrtc::CashuMintMetadataStore::in_memory())
790 }
791 }
792 } else {
793 None
794 };
795
796 let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
797 router_config.request_selection_strategy,
798 router_config.request_fairness_enabled,
799 router_config.request_dispatch,
800 std::time::Duration::from_millis(router_config.message_timeout_ms),
801 crate::webrtc::CashuRoutingConfig::from(&config.cashu),
802 cashu_payment_client,
803 cashu_mint_metadata,
804 ));
805 let controller = Arc::new(EmbeddedPeerRouterController::new(
806 keys.clone(),
807 opts.data_dir.clone(),
808 state.clone(),
809 Arc::clone(&store) as Arc<dyn ContentStore>,
810 peer_classifier,
811 nostr_relay.clone(),
812 ));
813 controller.apply_config(&config).await?;
814 (Some(state), Some(controller))
815 } else {
816 (None, None)
817 };
818
819 #[cfg(not(feature = "p2p"))]
820 let webrtc_state: Option<Arc<crate::webrtc::WebRTCState>> = None;
821
822 let background_services_controller = Arc::new(EmbeddedBackgroundServicesController::new(
823 keys.clone(),
824 opts.data_dir.clone(),
825 Arc::clone(&store),
826 graph_store.clone(),
827 Arc::clone(&social_graph_store),
828 crawler_spambox_backend,
829 webrtc_state.clone(),
830 ));
831
832 let upstream_blossom = config.blossom.all_read_servers();
833 let active_nostr_relays = config.nostr.active_relays();
834
835 let mut server = HashtreeServer::new(Arc::clone(&store), opts.bind_address.clone())
836 .with_server_mode(config.server.mode)
837 .with_hash_get_enabled(config.server.mode.hash_get_enabled())
838 .with_allowed_pubkeys(allowed_pubkeys.clone())
839 .with_max_upload_bytes((config.blossom.max_upload_mb as usize) * 1024 * 1024)
840 .with_public_writes(config.server.public_writes)
841 .with_require_random_untrusted_ingest(config.blossom.require_random_untrusted_ingest)
842 .with_optimistic_blossom_uploads(config.blossom.optimistic_uploads)
843 .with_upstream_blossom(upstream_blossom)
844 .with_nostr_relay_urls(active_nostr_relays)
845 .with_social_graph(social_graph)
846 .with_socialgraph_snapshot(
847 Arc::clone(&social_graph_store),
848 social_graph_root_bytes,
849 config.server.socialgraph_snapshot_public,
850 );
851 if let Some(nostr_relay) = nostr_relay {
852 server = server.with_nostr_relay(nostr_relay);
853 }
854
855 if crate::p2p_common::peer_router_enabled(&config) {
856 if let Some(ref state) = webrtc_state {
857 server = server.with_webrtc_peers(state.clone());
858 }
859 }
860
861 if let Some(extra) = opts.extra_routes {
862 server = server.with_extra_routes(extra);
863 }
864 if let Some(cors) = opts.cors {
865 server = server.with_cors(cors);
866 }
867
868 spawn_background_eviction_task(
869 Arc::clone(&store),
870 BACKGROUND_EVICTION_INTERVAL,
871 "embedded daemon",
872 );
873
874 let listener = TcpListener::bind(&opts.bind_address).await?;
875 let local_addr = listener.local_addr()?;
876 let actual_addr = format!("{}:{}", local_addr.ip(), local_addr.port());
877
878 let server_shutdown = Arc::new(Notify::new());
879 let server_shutdown_for_task = Arc::clone(&server_shutdown);
880 let server_join = tokio::spawn(async move {
881 if let Err(e) = server
882 .run_with_listener_until(listener, async move {
883 server_shutdown_for_task.notified().await;
884 })
885 .await
886 {
887 tracing::error!("Embedded daemon server error: {}", e);
888 }
889 });
890 let server_controller = Arc::new(EmbeddedServerController::new(server_shutdown, server_join));
891 background_services_controller.apply_config(&config).await?;
892 #[cfg(feature = "p2p")]
893 let daemon_controller = Arc::new(EmbeddedDaemonController::new(
894 server_controller,
895 peer_router_controller.clone(),
896 Some(background_services_controller.clone()),
897 ));
898 #[cfg(not(feature = "p2p"))]
899 let daemon_controller = Arc::new(EmbeddedDaemonController::new(
900 server_controller,
901 Some(background_services_controller.clone()),
902 ));
903
904 tracing::info!(
905 "Embedded daemon started on {}, identity {}",
906 actual_addr,
907 npub
908 );
909
910 Ok(EmbeddedDaemonInfo {
911 addr: actual_addr,
912 port: local_addr.port(),
913 npub,
914 store,
915 daemon_controller,
916 webrtc_state,
917 #[cfg(feature = "p2p")]
918 peer_router_controller,
919 background_services_controller: Some(background_services_controller),
920 })
921}
922
923#[cfg(test)]
924mod tests {
925 use super::EmbeddedBackgroundServicesController;
926 use crate::config::Config;
927
928 #[test]
929 fn mirror_publish_relays_orders_known_root_publish_relays_first() {
930 let relays = EmbeddedBackgroundServicesController::mirror_publish_relays(
931 &[
932 "wss://graph-relay.iris.to".to_string(),
933 "wss://relay.example".to_string(),
934 "wss://relay.primal.net".to_string(),
935 "wss://relay.damus.io".to_string(),
936 "wss://temp.iris.to".to_string(),
937 "wss://vault.iris.to".to_string(),
938 "wss://upload.iris.to/nostr".to_string(),
939 ],
940 "0.0.0.0:8080",
941 );
942 assert_eq!(
943 relays,
944 vec![
945 "wss://temp.iris.to".to_string(),
946 "wss://vault.iris.to".to_string(),
947 "wss://relay.damus.io".to_string(),
948 "wss://relay.example".to_string(),
949 "wss://relay.primal.net".to_string(),
950 ]
951 );
952 }
953
954 #[test]
955 fn mirror_publish_relays_do_not_add_non_active_publish_targets() {
956 let relays = EmbeddedBackgroundServicesController::mirror_publish_relays(
957 &[
958 "wss://graph-relay.iris.to".to_string(),
959 "wss://relay.example".to_string(),
960 ],
961 "0.0.0.0:8080",
962 );
963 assert_eq!(relays, vec!["wss://relay.example".to_string()]);
964 }
965
966 #[test]
967 fn mirror_publish_relays_falls_back_to_active_relays_when_all_are_blocklisted() {
968 let relays = EmbeddedBackgroundServicesController::mirror_publish_relays(
969 &[
970 "wss://graph-relay.iris.to".to_string(),
971 "wss://upload.iris.to/nostr".to_string(),
972 ],
973 "0.0.0.0:8080",
974 );
975 assert_eq!(
976 relays,
977 vec![
978 "wss://graph-relay.iris.to".to_string(),
979 "wss://upload.iris.to/nostr".to_string(),
980 ]
981 );
982 }
983
984 #[test]
985 fn nostr_mirror_config_allows_disabling_full_note_paging() {
986 let mut config = Config::default();
987 config.nostr.full_text_note_history_max_relay_pages = 0;
988
989 let mirror_config = EmbeddedBackgroundServicesController::nostr_mirror_config(
990 &config,
991 &["wss://relay.example".to_string()],
992 );
993
994 assert_eq!(mirror_config.full_text_note_history_max_relay_pages, 0);
995
996 config.nostr.full_text_note_history_max_relay_pages = 64;
997 let mirror_config = EmbeddedBackgroundServicesController::nostr_mirror_config(
998 &config,
999 &["wss://relay.example".to_string()],
1000 );
1001
1002 assert_eq!(mirror_config.full_text_note_history_max_relay_pages, 64);
1003 }
1004
1005 #[test]
1006 fn nostr_mirror_config_can_limit_mirror_distance_independently() {
1007 let mut config = Config::default();
1008 config.nostr.social_graph_crawl_depth = 6;
1009 config.nostr.mirror_max_follow_distance = Some(2);
1010
1011 let mirror_config = EmbeddedBackgroundServicesController::nostr_mirror_config(
1012 &config,
1013 &["wss://relay.example".to_string()],
1014 );
1015
1016 assert_eq!(mirror_config.max_follow_distance, 2);
1017
1018 config.nostr.mirror_max_follow_distance = None;
1019 let mirror_config = EmbeddedBackgroundServicesController::nostr_mirror_config(
1020 &config,
1021 &["wss://relay.example".to_string()],
1022 );
1023
1024 assert_eq!(mirror_config.max_follow_distance, 6);
1025 }
1026}