1use anyhow::{Context, Result};
2use axum::Router;
3use nostr::nips::nip19::ToBech32;
4use nostr::Keys;
5use std::collections::HashSet;
6use std::path::PathBuf;
7use std::sync::Arc;
8use tokio::net::TcpListener;
9use tokio::sync::{Mutex, Notify};
10use tokio::task::JoinHandle;
11use tower_http::cors::CorsLayer;
12
13use crate::config::{ensure_keys, ensure_keys_in, parse_npub, pubkey_bytes, Config};
14use crate::eviction::{spawn_background_eviction_task, BACKGROUND_EVICTION_INTERVAL};
15use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
16use crate::server::{AppState, HashtreeServer};
17use crate::socialgraph;
18use crate::storage::HashtreeStore;
19
20#[cfg(feature = "p2p")]
21use crate::webrtc::{ContentStore, PeerClassifier, WebRTCManager, WebRTCState};
22#[cfg(not(feature = "p2p"))]
23use crate::WebRTCState;
24
25#[cfg(feature = "p2p")]
26struct PeerRouterRuntime {
27 shutdown: Arc<tokio::sync::watch::Sender<bool>>,
28 join: JoinHandle<()>,
29 peer_state_persist: JoinHandle<()>,
30}
31
32struct BackgroundSyncRuntime {
33 service: Arc<crate::sync::BackgroundSync>,
34 join: Option<JoinHandle<()>>,
35}
36
37impl Drop for BackgroundSyncRuntime {
38 fn drop(&mut self) {
39 self.service.shutdown();
40 if let Some(join) = self.join.take() {
41 join.abort();
42 }
43 }
44}
45
46struct BackgroundMirrorRuntime {
47 service: Arc<crate::nostr_mirror::BackgroundNostrMirror>,
48 join: Option<JoinHandle<()>>,
49}
50
51impl Drop for BackgroundMirrorRuntime {
52 fn drop(&mut self) {
53 self.service.shutdown();
54 if let Some(join) = self.join.take() {
55 join.abort();
56 }
57 }
58}
59
60struct BackgroundServicesRuntime {
61 crawler: Option<socialgraph::crawler::SocialGraphTaskHandles>,
62 mirror: Option<BackgroundMirrorRuntime>,
63 sync: Option<BackgroundSyncRuntime>,
64}
65
66impl Drop for BackgroundServicesRuntime {
67 fn drop(&mut self) {
68 if let Some(handles) = self.crawler.as_ref() {
69 let _ = handles.shutdown_tx.send(true);
70 }
71 if let Some(runtime) = self.mirror.as_ref() {
72 runtime.service.shutdown();
73 }
74 if let Some(runtime) = self.sync.as_ref() {
75 runtime.service.shutdown();
76 }
77 }
78}
79
80impl BackgroundServicesRuntime {
81 fn status(&self) -> EmbeddedBackgroundServicesStatus {
82 EmbeddedBackgroundServicesStatus {
83 crawler_active: self.crawler.is_some(),
84 mirror_active: self.mirror.is_some(),
85 sync_active: self.sync.is_some(),
86 }
87 }
88}
89
90struct EmbeddedServerRuntime {
91 shutdown: Arc<Notify>,
92 join: Option<JoinHandle<()>>,
93}
94
95pub struct EmbeddedServerController {
96 runtime: Mutex<Option<EmbeddedServerRuntime>>,
97}
98
99impl EmbeddedServerController {
100 pub fn new(shutdown: Arc<Notify>, join: JoinHandle<()>) -> Self {
101 Self {
102 runtime: Mutex::new(Some(EmbeddedServerRuntime {
103 shutdown,
104 join: Some(join),
105 })),
106 }
107 }
108
109 pub async fn shutdown(&self) {
110 let mut runtime = self.runtime.lock().await;
111 let Some(mut runtime) = runtime.take() else {
112 return;
113 };
114
115 runtime.shutdown.notify_waiters();
116 if let Some(mut join) = runtime.join.take() {
117 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
118 Ok(Ok(())) => {}
119 Ok(Err(err)) => {
120 tracing::warn!("Embedded server task ended with join error: {}", err)
121 }
122 Err(_) => {
123 tracing::warn!("Timed out waiting for embedded server shutdown");
124 join.abort();
125 }
126 }
127 }
128 }
129}
130
131#[derive(Debug, Clone, Copy, PartialEq, Eq)]
132pub struct EmbeddedBackgroundServicesStatus {
133 pub crawler_active: bool,
134 pub mirror_active: bool,
135 pub sync_active: bool,
136}
137
138pub struct EmbeddedBackgroundServicesController {
139 keys: Keys,
140 data_dir: PathBuf,
141 store: Arc<HashtreeStore>,
142 graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
143 graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
144 spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
145 webrtc_state: Option<Arc<WebRTCState>>,
146 runtime: Mutex<BackgroundServicesRuntime>,
147}
148
149impl EmbeddedBackgroundServicesController {
150 const MIRROR_PUBLISH_RELAY_PRIORITY: &[&str] = &[
151 "wss://temp.iris.to",
152 "wss://vault.iris.to",
153 "wss://relay.damus.io",
154 "wss://relay.primal.net",
155 ];
156 const MIRROR_PUBLISH_RELAY_BLOCKLIST: &[&str] =
157 &["wss://graph-relay.iris.to", "wss://upload.iris.to/nostr"];
158
159 fn mirror_publish_relays(active_relays: &[String], _bind_address: &str) -> Vec<String> {
160 let mut seen = HashSet::new();
161 let active_relays = active_relays
162 .iter()
163 .filter(|relay| seen.insert((*relay).clone()))
164 .cloned()
165 .collect::<Vec<_>>();
166 if active_relays.is_empty() {
167 return Vec::new();
168 }
169 let active_relay_set = active_relays.iter().cloned().collect::<HashSet<_>>();
170
171 let preferred = Self::MIRROR_PUBLISH_RELAY_PRIORITY
172 .iter()
173 .filter(|relay| active_relay_set.contains(**relay))
174 .map(|relay| (*relay).to_string())
175 .collect::<Vec<_>>();
176 if !preferred.is_empty() {
177 return preferred;
178 }
179
180 let filtered = active_relays
181 .iter()
182 .filter(|relay| !Self::MIRROR_PUBLISH_RELAY_BLOCKLIST.contains(&relay.as_str()))
183 .cloned()
184 .collect::<Vec<_>>();
185 if !filtered.is_empty() {
186 return filtered;
187 }
188
189 active_relays
190 }
191
192 pub fn new(
193 keys: Keys,
194 data_dir: PathBuf,
195 store: Arc<HashtreeStore>,
196 graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
197 graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
198 spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
199 webrtc_state: Option<Arc<WebRTCState>>,
200 ) -> Self {
201 Self {
202 keys,
203 data_dir,
204 store,
205 graph_store_concrete,
206 graph_store,
207 spambox,
208 webrtc_state,
209 runtime: Mutex::new(BackgroundServicesRuntime {
210 crawler: None,
211 mirror: None,
212 sync: None,
213 }),
214 }
215 }
216
217 pub async fn status(&self) -> EmbeddedBackgroundServicesStatus {
218 self.runtime.lock().await.status()
219 }
220
221 pub async fn shutdown(&self) {
222 let mut runtime = self.runtime.lock().await;
223 Self::shutdown_crawler(&mut runtime.crawler).await;
224 Self::shutdown_mirror(&mut runtime.mirror).await;
225 Self::shutdown_sync(&mut runtime.sync).await;
226 }
227
228 async fn shutdown_crawler(crawler: &mut Option<socialgraph::crawler::SocialGraphTaskHandles>) {
229 let Some(handles) = crawler.take() else {
230 return;
231 };
232
233 let _ = handles.shutdown_tx.send(true);
234
235 let mut crawl_handle = handles.crawl_handle;
236 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut crawl_handle).await {
237 Ok(Ok(())) => {}
238 Ok(Err(err)) => tracing::warn!("Crawler task ended with join error: {}", err),
239 Err(_) => {
240 tracing::warn!("Timed out waiting for crawler task shutdown");
241 crawl_handle.abort();
242 }
243 }
244
245 let mut local_list_handle = handles.local_list_handle;
246 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut local_list_handle).await
247 {
248 Ok(Ok(())) => {}
249 Ok(Err(err)) => tracing::warn!("Local list task ended with join error: {}", err),
250 Err(_) => {
251 tracing::warn!("Timed out waiting for local list task shutdown");
252 local_list_handle.abort();
253 }
254 }
255 }
256
257 async fn shutdown_sync(sync: &mut Option<BackgroundSyncRuntime>) {
258 let Some(mut runtime) = sync.take() else {
259 return;
260 };
261
262 runtime.service.shutdown();
263 if let Some(mut join) = runtime.join.take() {
264 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
265 Ok(Ok(())) => {}
266 Ok(Err(err)) => {
267 tracing::warn!("Background sync task ended with join error: {}", err)
268 }
269 Err(_) => {
270 tracing::warn!("Timed out waiting for background sync shutdown");
271 join.abort();
272 }
273 }
274 }
275 }
276
277 async fn shutdown_mirror(mirror: &mut Option<BackgroundMirrorRuntime>) {
278 let Some(mut runtime) = mirror.take() else {
279 return;
280 };
281
282 runtime.service.shutdown();
283 if let Some(mut join) = runtime.join.take() {
284 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
285 Ok(Ok(())) => {}
286 Ok(Err(err)) => {
287 tracing::warn!("Background mirror task ended with join error: {}", err)
288 }
289 Err(_) => {
290 tracing::warn!("Timed out waiting for background mirror shutdown");
291 join.abort();
292 }
293 }
294 }
295 }
296
297 fn nostr_mirror_config(
298 config: &Config,
299 active_relays: &[String],
300 ) -> crate::nostr_mirror::NostrMirrorConfig {
301 crate::nostr_mirror::NostrMirrorConfig {
302 relays: active_relays.to_vec(),
303 publish_relays: Self::mirror_publish_relays(active_relays, &config.server.bind_address),
304 blossom_write_servers: config.blossom.all_write_servers(),
305 max_follow_distance: config.nostr.social_graph_crawl_depth,
306 overmute_threshold: config.nostr.overmute_threshold,
307 require_negentropy: config.nostr.negentropy_only,
308 kinds: config.nostr.mirror_kinds.clone(),
309 history_sync_author_chunk_size: config.nostr.history_sync_author_chunk_size.max(1),
310 history_sync_per_author_event_limit: config
311 .nostr
312 .history_sync_per_author_event_limit
313 .max(1),
314 missing_profile_backfill_batch_size: config.nostr.history_sync_author_chunk_size.max(1),
315 history_sync_on_reconnect: config.nostr.history_sync_on_reconnect,
316 full_text_note_history_follow_distance: config
317 .nostr
318 .full_text_note_history_follow_distance,
319 full_text_note_history_max_relay_pages: config
320 .nostr
321 .full_text_note_history_max_relay_pages,
322 ..crate::nostr_mirror::NostrMirrorConfig::default()
323 }
324 }
325
326 pub async fn apply_config(&self, config: &Config) -> Result<EmbeddedBackgroundServicesStatus> {
327 let mut runtime = self.runtime.lock().await;
328
329 Self::shutdown_crawler(&mut runtime.crawler).await;
330 Self::shutdown_mirror(&mut runtime.mirror).await;
331 Self::shutdown_sync(&mut runtime.sync).await;
332
333 if !config.server.mode.background_services_enabled() {
334 return Ok(runtime.status());
335 }
336
337 let active_relays = config.nostr.active_relays();
338
339 if config.nostr.enabled
340 && config.nostr.social_graph_crawl_depth > 0
341 && !active_relays.is_empty()
342 {
343 runtime.crawler = Some(socialgraph::crawler::spawn_social_graph_tasks(
344 self.graph_store.clone(),
345 self.keys.clone(),
346 active_relays.clone(),
347 config.nostr.social_graph_crawl_depth,
348 self.spambox.clone(),
349 self.data_dir.clone(),
350 ));
351
352 let service = Arc::new(
353 crate::nostr_mirror::BackgroundNostrMirror::new(
354 Self::nostr_mirror_config(config, &active_relays),
355 self.store.clone(),
356 self.graph_store_concrete.clone(),
357 Some(
358 nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
359 .context("Failed to parse keys for background nostr mirror")?,
360 ),
361 )
362 .await
363 .context("Failed to create background nostr mirror")?,
364 );
365 let service_for_task = service.clone();
366 let join = tokio::task::spawn_blocking(move || {
367 let runtime = tokio::runtime::Builder::new_current_thread()
368 .enable_all()
369 .build()
370 .expect("build background nostr mirror runtime");
371 runtime.block_on(async {
372 if let Err(err) = service_for_task.run().await {
373 tracing::error!("Background nostr mirror error: {:#}", err);
374 }
375 });
376 });
377 runtime.mirror = Some(BackgroundMirrorRuntime {
378 service,
379 join: Some(join),
380 });
381 }
382
383 let has_pinned_refs = self
384 .store
385 .list_pinned_refs()
386 .map(|refs| !refs.is_empty())
387 .unwrap_or(false);
388 let has_tracked_authors = self
389 .store
390 .list_tracked_authors()
391 .map(|authors| !authors.is_empty())
392 .unwrap_or(false);
393
394 if config.sync.enabled
395 && (config.sync.sync_own
396 || config.sync.sync_followed
397 || has_pinned_refs
398 || has_tracked_authors)
399 && !active_relays.is_empty()
400 {
401 let sync_config = crate::sync::SyncConfig {
402 sync_own: config.sync.sync_own,
403 sync_followed: config.sync.sync_followed,
404 relays: active_relays,
405 max_concurrent: config.sync.max_concurrent,
406 webrtc_timeout_ms: config.sync.webrtc_timeout_ms,
407 blossom_timeout_ms: config.sync.blossom_timeout_ms,
408 };
409
410 let sync_keys = nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
411 .context("Failed to parse keys for sync")?;
412 let service = Arc::new(
413 crate::sync::BackgroundSync::new(
414 sync_config,
415 self.store.clone(),
416 sync_keys,
417 self.webrtc_state.clone(),
418 )
419 .await
420 .context("Failed to create background sync service")?,
421 );
422 let contacts_file = self.data_dir.join("contacts.json");
423 let service_for_task = service.clone();
424 let join = tokio::spawn(async move {
425 if let Err(err) = service_for_task.run(contacts_file).await {
426 tracing::error!("Background sync error: {}", err);
427 }
428 });
429 runtime.sync = Some(BackgroundSyncRuntime {
430 service,
431 join: Some(join),
432 });
433 }
434
435 Ok(runtime.status())
436 }
437}
438
439#[cfg(feature = "p2p")]
440pub struct EmbeddedPeerRouterController {
441 keys: Keys,
442 data_dir: PathBuf,
443 state: Arc<WebRTCState>,
444 store: Arc<dyn ContentStore>,
445 peer_classifier: PeerClassifier,
446 nostr_relay: Arc<NostrRelay>,
447 runtime: Mutex<Option<PeerRouterRuntime>>,
448}
449
450#[cfg(feature = "p2p")]
451impl EmbeddedPeerRouterController {
452 pub fn new(
453 keys: Keys,
454 data_dir: PathBuf,
455 state: Arc<WebRTCState>,
456 store: Arc<dyn ContentStore>,
457 peer_classifier: PeerClassifier,
458 nostr_relay: Arc<NostrRelay>,
459 ) -> Self {
460 Self {
461 keys,
462 data_dir,
463 state,
464 store,
465 peer_classifier,
466 nostr_relay,
467 runtime: Mutex::new(None),
468 }
469 }
470
471 pub fn state(&self) -> Arc<WebRTCState> {
472 self.state.clone()
473 }
474
475 pub async fn apply_config(&self, config: &Config) -> Result<bool> {
476 let mut runtime = self.runtime.lock().await;
477 if let Some(runtime_handle) = runtime.take() {
478 if let Err(err) =
479 crate::p2p_common::persist_peer_state(&self.data_dir, &self.state).await
480 {
481 tracing::warn!("Failed to persist mesh peer state before router restart: {err:#}");
482 }
483 let _ = runtime_handle.shutdown.send(true);
484 runtime_handle.peer_state_persist.abort();
485 let mut join = runtime_handle.join;
486 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
487 Ok(Ok(())) => {}
488 Ok(Err(err)) => {
489 tracing::warn!("Peer router task ended with join error: {}", err);
490 }
491 Err(_) => {
492 tracing::warn!("Timed out waiting for peer router shutdown");
493 join.abort();
494 }
495 }
496 }
497
498 self.state.reset_runtime_state().await;
499 if let Err(err) = crate::p2p_common::load_peer_state(&self.data_dir, &self.state).await {
500 tracing::warn!("Failed to load persisted mesh peer state: {err:#}");
501 }
502
503 if !crate::p2p_common::peer_router_enabled(config) {
504 return Ok(false);
505 }
506
507 let webrtc_config = crate::p2p_common::default_webrtc_config(config);
508 let mut manager = if config.server.mode.hash_get_enabled() {
509 WebRTCManager::new_with_state_and_store_and_classifier(
510 self.keys.clone(),
511 webrtc_config,
512 self.state.clone(),
513 self.store.clone(),
514 self.peer_classifier.clone(),
515 )
516 } else {
517 let mut manager =
518 WebRTCManager::new_with_state(self.keys.clone(), webrtc_config, self.state.clone());
519 manager.set_peer_classifier(self.peer_classifier.clone());
520 manager
521 };
522 manager
523 .set_nostr_relay(self.nostr_relay.clone() as hashtree_network::SharedMeshRelayClient);
524 let shutdown = manager.shutdown_signal();
525 let join = tokio::spawn(async move {
526 if let Err(err) = manager.run().await {
527 tracing::error!("Peer router error: {}", err);
528 }
529 });
530 let peer_state_persist = crate::p2p_common::spawn_peer_state_persist_task(
531 self.data_dir.clone(),
532 self.state.clone(),
533 );
534 *runtime = Some(PeerRouterRuntime {
535 shutdown,
536 join,
537 peer_state_persist,
538 });
539 Ok(true)
540 }
541
542 pub async fn shutdown(&self) {
543 let mut runtime = self.runtime.lock().await;
544 let Some(runtime_handle) = runtime.take() else {
545 return;
546 };
547
548 if let Err(err) = crate::p2p_common::persist_peer_state(&self.data_dir, &self.state).await {
549 tracing::warn!("Failed to persist mesh peer state during router shutdown: {err:#}");
550 }
551 let _ = runtime_handle.shutdown.send(true);
552 runtime_handle.peer_state_persist.abort();
553 let mut join = runtime_handle.join;
554 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
555 Ok(Ok(())) => {}
556 Ok(Err(err)) => tracing::warn!("Peer router task ended with join error: {}", err),
557 Err(_) => {
558 tracing::warn!("Timed out waiting for peer router shutdown");
559 join.abort();
560 }
561 }
562
563 self.state.reset_runtime_state().await;
564 }
565}
566
567pub struct EmbeddedDaemonController {
568 server_controller: Arc<EmbeddedServerController>,
569 #[cfg(feature = "p2p")]
570 peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
571 background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
572}
573
574impl EmbeddedDaemonController {
575 #[cfg(feature = "p2p")]
576 pub fn new(
577 server_controller: Arc<EmbeddedServerController>,
578 peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
579 background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
580 ) -> Self {
581 Self {
582 server_controller,
583 #[cfg(feature = "p2p")]
584 peer_router_controller,
585 background_services_controller,
586 }
587 }
588
589 #[cfg(not(feature = "p2p"))]
590 pub fn new(
591 server_controller: Arc<EmbeddedServerController>,
592 background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
593 ) -> Self {
594 Self {
595 server_controller,
596 background_services_controller,
597 }
598 }
599
600 pub async fn shutdown(&self) {
601 self.server_controller.shutdown().await;
602 if let Some(controller) = self.background_services_controller.as_ref() {
603 controller.shutdown().await;
604 }
605 #[cfg(feature = "p2p")]
606 if let Some(controller) = self.peer_router_controller.as_ref() {
607 controller.shutdown().await;
608 }
609 }
610}
611
612pub struct EmbeddedDaemonOptions {
613 pub config: Config,
614 pub data_dir: PathBuf,
615 pub config_dir: Option<PathBuf>,
616 pub bind_address: String,
617 pub relays: Option<Vec<String>>,
618 pub extra_routes: Option<Router<AppState>>,
619 pub cors: Option<CorsLayer>,
620}
621
622pub struct EmbeddedDaemonInfo {
623 pub addr: String,
624 pub port: u16,
625 pub npub: String,
626 pub store: Arc<HashtreeStore>,
627 pub daemon_controller: Arc<EmbeddedDaemonController>,
628 #[allow(dead_code)]
629 pub webrtc_state: Option<Arc<WebRTCState>>,
630 #[cfg(feature = "p2p")]
631 #[allow(dead_code)]
632 pub peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
633 #[allow(dead_code)]
634 pub background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
635}
636
637pub async fn start_embedded(opts: EmbeddedDaemonOptions) -> Result<EmbeddedDaemonInfo> {
638 let _ = rustls::crypto::ring::default_provider().install_default();
639
640 let mut config = opts.config;
641 config.server.bind_address = opts.bind_address.clone();
642 if let Some(relays) = opts.relays {
643 config.nostr.relays = relays;
644 config.nostr.enabled = !config.nostr.relays.is_empty();
645 }
646
647 let max_size_bytes = config.storage.max_size_gb * 1024 * 1024 * 1024;
648 let nostr_db_max_bytes = config
649 .nostr
650 .db_max_size_gb
651 .saturating_mul(1024 * 1024 * 1024);
652 let spambox_db_max_bytes = config
653 .nostr
654 .spambox_max_size_gb
655 .saturating_mul(1024 * 1024 * 1024);
656
657 let store = Arc::new(HashtreeStore::with_options(
658 &opts.data_dir,
659 config.storage.s3.as_ref(),
660 max_size_bytes,
661 )?);
662
663 let (keys, _was_generated) = if let Some(config_dir) = opts.config_dir.as_ref() {
664 ensure_keys_in(config_dir, Some(&opts.data_dir), Some(&config))?
665 } else {
666 ensure_keys()?
667 };
668 let pk_bytes = pubkey_bytes(&keys);
669 let npub = keys
670 .public_key()
671 .to_bech32()
672 .context("Failed to encode npub")?;
673
674 let mut allowed_pubkeys: HashSet<String> = HashSet::new();
675 allowed_pubkeys.insert(hex::encode(pk_bytes));
676 for npub_str in &config.nostr.allowed_npubs {
677 if let Ok(pk) = parse_npub(npub_str) {
678 allowed_pubkeys.insert(hex::encode(pk));
679 } else {
680 tracing::warn!("Invalid npub in allowed_npubs: {}", npub_str);
681 }
682 }
683
684 let graph_store = socialgraph::open_social_graph_store_with_storage(
685 &opts.data_dir,
686 store.store_arc(),
687 Some(nostr_db_max_bytes),
688 )
689 .context("Failed to initialize social graph store")?;
690 graph_store.set_profile_index_overmute_threshold(config.nostr.overmute_threshold);
691
692 let social_graph_root_bytes = if let Some(ref root_npub) = config.nostr.socialgraph_root {
693 parse_npub(root_npub).unwrap_or(pk_bytes)
694 } else {
695 pk_bytes
696 };
697 socialgraph::set_social_graph_root(&graph_store, &social_graph_root_bytes);
698 socialgraph::sync_local_list_files_force(graph_store.as_ref(), &opts.data_dir, &keys)
699 .context("Failed to sync local social graph lists")?;
700 let social_graph_store: Arc<dyn socialgraph::SocialGraphBackend> = graph_store.clone();
701
702 let social_graph = Arc::new(socialgraph::SocialGraphAccessControl::new(
703 Arc::clone(&social_graph_store),
704 config.nostr.max_write_distance,
705 allowed_pubkeys.clone(),
706 ));
707
708 let nostr_relay_config = NostrRelayConfig {
709 spambox_db_max_bytes,
710 ..Default::default()
711 };
712 let mut public_event_pubkeys = HashSet::new();
713 public_event_pubkeys.insert(hex::encode(pk_bytes));
714 let nostr_relay = Arc::new(
715 NostrRelay::new(
716 Arc::clone(&social_graph_store),
717 opts.data_dir.clone(),
718 public_event_pubkeys,
719 Some(social_graph.clone()),
720 nostr_relay_config,
721 )
722 .context("Failed to initialize Nostr relay")?,
723 );
724
725 let crawler_spambox = if spambox_db_max_bytes == 0 {
726 None
727 } else {
728 let spam_dir = opts.data_dir.join("socialgraph_spambox");
729 match socialgraph::open_social_graph_store_at_path(&spam_dir, Some(spambox_db_max_bytes)) {
730 Ok(store) => Some(store),
731 Err(err) => {
732 tracing::warn!("Failed to open social graph spambox for crawler: {}", err);
733 None
734 }
735 }
736 };
737 let crawler_spambox_backend = crawler_spambox
738 .clone()
739 .map(|store| store as Arc<dyn socialgraph::SocialGraphBackend>);
740
741 #[cfg(feature = "p2p")]
742 let (webrtc_state, peer_router_controller): (
743 Option<Arc<WebRTCState>>,
744 Option<Arc<EmbeddedPeerRouterController>>,
745 ) = {
746 let router_config = crate::p2p_common::default_webrtc_config(&config);
747 let peer_classifier = crate::p2p_common::build_peer_classifier(
748 opts.data_dir.clone(),
749 Arc::clone(&social_graph_store),
750 );
751 let cashu_payment_client =
752 if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
753 match crate::cashu_helper::CashuHelperClient::discover(opts.data_dir.clone()) {
754 Ok(client) => {
755 Some(Arc::new(client) as Arc<dyn crate::cashu_helper::CashuPaymentClient>)
756 }
757 Err(err) => {
758 tracing::warn!(
759 "Cashu settlement helper unavailable; paid retrieval stays disabled: {}",
760 err
761 );
762 None
763 }
764 }
765 } else {
766 None
767 };
768 let cashu_mint_metadata =
769 if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
770 let metadata_path = crate::webrtc::cashu_mint_metadata_path(&opts.data_dir);
771 match crate::webrtc::CashuMintMetadataStore::load(metadata_path) {
772 Ok(store) => Some(store),
773 Err(err) => {
774 tracing::warn!(
775 "Failed to load Cashu mint metadata; falling back to in-memory state: {}",
776 err
777 );
778 Some(crate::webrtc::CashuMintMetadataStore::in_memory())
779 }
780 }
781 } else {
782 None
783 };
784
785 let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
786 router_config.request_selection_strategy,
787 router_config.request_fairness_enabled,
788 router_config.request_dispatch,
789 std::time::Duration::from_millis(router_config.message_timeout_ms),
790 crate::webrtc::CashuRoutingConfig::from(&config.cashu),
791 cashu_payment_client,
792 cashu_mint_metadata,
793 ));
794 let controller = Arc::new(EmbeddedPeerRouterController::new(
795 keys.clone(),
796 opts.data_dir.clone(),
797 state.clone(),
798 Arc::clone(&store) as Arc<dyn ContentStore>,
799 peer_classifier,
800 nostr_relay.clone(),
801 ));
802 controller.apply_config(&config).await?;
803 (Some(state), Some(controller))
804 };
805
806 #[cfg(not(feature = "p2p"))]
807 let webrtc_state: Option<Arc<crate::webrtc::WebRTCState>> = None;
808
809 let background_services_controller = Arc::new(EmbeddedBackgroundServicesController::new(
810 keys.clone(),
811 opts.data_dir.clone(),
812 Arc::clone(&store),
813 graph_store.clone(),
814 Arc::clone(&social_graph_store),
815 crawler_spambox_backend,
816 webrtc_state.clone(),
817 ));
818 background_services_controller.apply_config(&config).await?;
819
820 let upstream_blossom = config.blossom.all_read_servers();
821 let active_nostr_relays = config.nostr.active_relays();
822
823 let mut server = HashtreeServer::new(Arc::clone(&store), opts.bind_address.clone())
824 .with_server_mode(config.server.mode)
825 .with_hash_get_enabled(config.server.mode.hash_get_enabled())
826 .with_allowed_pubkeys(allowed_pubkeys.clone())
827 .with_max_upload_bytes((config.blossom.max_upload_mb as usize) * 1024 * 1024)
828 .with_public_writes(config.server.public_writes)
829 .with_upstream_blossom(upstream_blossom)
830 .with_nostr_relay_urls(active_nostr_relays)
831 .with_social_graph(social_graph)
832 .with_socialgraph_snapshot(
833 Arc::clone(&social_graph_store),
834 social_graph_root_bytes,
835 config.server.socialgraph_snapshot_public,
836 )
837 .with_nostr_relay(nostr_relay.clone());
838
839 if crate::p2p_common::peer_router_enabled(&config) {
840 if let Some(ref state) = webrtc_state {
841 server = server.with_webrtc_peers(state.clone());
842 }
843 }
844
845 if let Some(extra) = opts.extra_routes {
846 server = server.with_extra_routes(extra);
847 }
848 if let Some(cors) = opts.cors {
849 server = server.with_cors(cors);
850 }
851
852 spawn_background_eviction_task(
853 Arc::clone(&store),
854 BACKGROUND_EVICTION_INTERVAL,
855 "embedded daemon",
856 );
857
858 let listener = TcpListener::bind(&opts.bind_address).await?;
859 let local_addr = listener.local_addr()?;
860 let actual_addr = format!("{}:{}", local_addr.ip(), local_addr.port());
861
862 let server_shutdown = Arc::new(Notify::new());
863 let server_shutdown_for_task = Arc::clone(&server_shutdown);
864 let server_join = tokio::spawn(async move {
865 if let Err(e) = server
866 .run_with_listener_until(listener, async move {
867 server_shutdown_for_task.notified().await;
868 })
869 .await
870 {
871 tracing::error!("Embedded daemon server error: {}", e);
872 }
873 });
874 let server_controller = Arc::new(EmbeddedServerController::new(server_shutdown, server_join));
875 #[cfg(feature = "p2p")]
876 let daemon_controller = Arc::new(EmbeddedDaemonController::new(
877 server_controller,
878 peer_router_controller.clone(),
879 Some(background_services_controller.clone()),
880 ));
881 #[cfg(not(feature = "p2p"))]
882 let daemon_controller = Arc::new(EmbeddedDaemonController::new(
883 server_controller,
884 Some(background_services_controller.clone()),
885 ));
886
887 tracing::info!(
888 "Embedded daemon started on {}, identity {}",
889 actual_addr,
890 npub
891 );
892
893 Ok(EmbeddedDaemonInfo {
894 addr: actual_addr,
895 port: local_addr.port(),
896 npub,
897 store,
898 daemon_controller,
899 webrtc_state,
900 #[cfg(feature = "p2p")]
901 peer_router_controller,
902 background_services_controller: Some(background_services_controller),
903 })
904}
905
906#[cfg(test)]
907mod tests {
908 use super::EmbeddedBackgroundServicesController;
909 use crate::config::Config;
910
911 #[test]
912 fn mirror_publish_relays_prefers_known_root_publish_relays() {
913 let relays = EmbeddedBackgroundServicesController::mirror_publish_relays(
914 &[
915 "wss://graph-relay.iris.to".to_string(),
916 "wss://relay.example".to_string(),
917 "wss://relay.damus.io".to_string(),
918 "wss://temp.iris.to".to_string(),
919 "wss://vault.iris.to".to_string(),
920 "wss://upload.iris.to/nostr".to_string(),
921 ],
922 "0.0.0.0:8080",
923 );
924 assert_eq!(
925 relays,
926 vec![
927 "wss://temp.iris.to".to_string(),
928 "wss://vault.iris.to".to_string(),
929 "wss://relay.damus.io".to_string(),
930 ]
931 );
932 }
933
934 #[test]
935 fn mirror_publish_relays_filters_known_bad_publish_targets_when_no_preferred_remain() {
936 let relays = EmbeddedBackgroundServicesController::mirror_publish_relays(
937 &[
938 "wss://graph-relay.iris.to".to_string(),
939 "wss://relay.snort.social".to_string(),
940 "wss://relay.nostr.band".to_string(),
941 "wss://upload.iris.to/nostr".to_string(),
942 ],
943 "0.0.0.0:8080",
944 );
945 assert_eq!(
946 relays,
947 vec![
948 "wss://relay.snort.social".to_string(),
949 "wss://relay.nostr.band".to_string(),
950 ]
951 );
952 }
953
954 #[test]
955 fn nostr_mirror_config_allows_disabling_full_note_paging() {
956 let mut config = Config::default();
957 config.nostr.full_text_note_history_max_relay_pages = 0;
958
959 let mirror_config = EmbeddedBackgroundServicesController::nostr_mirror_config(
960 &config,
961 &["wss://relay.example".to_string()],
962 );
963
964 assert_eq!(mirror_config.full_text_note_history_max_relay_pages, 0);
965
966 config.nostr.full_text_note_history_max_relay_pages = 64;
967 let mirror_config = EmbeddedBackgroundServicesController::nostr_mirror_config(
968 &config,
969 &["wss://relay.example".to_string()],
970 );
971
972 assert_eq!(mirror_config.full_text_note_history_max_relay_pages, 64);
973 }
974}