1use anyhow::{Context, Result};
2use axum::Router;
3use nostr::nips::nip19::ToBech32;
4use nostr::Keys;
5use std::collections::HashSet;
6use std::path::PathBuf;
7use std::sync::Arc;
8use tokio::net::TcpListener;
9use tokio::sync::{Mutex, Notify};
10use tokio::task::JoinHandle;
11use tower_http::cors::CorsLayer;
12
13use crate::config::{ensure_keys, ensure_keys_in, parse_npub, pubkey_bytes, Config};
14use crate::eviction::{spawn_background_eviction_task, BACKGROUND_EVICTION_INTERVAL};
15use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
16use crate::server::{AppState, HashtreeServer};
17use crate::socialgraph;
18use crate::storage::HashtreeStore;
19
20#[cfg(feature = "p2p")]
21use crate::webrtc::{ContentStore, PeerClassifier, WebRTCManager, WebRTCState};
22#[cfg(not(feature = "p2p"))]
23use crate::WebRTCState;
24
25#[cfg(feature = "p2p")]
26struct PeerRouterRuntime {
27 shutdown: Arc<tokio::sync::watch::Sender<bool>>,
28 join: JoinHandle<()>,
29}
30
31struct BackgroundSyncRuntime {
32 service: Arc<crate::sync::BackgroundSync>,
33 join: Option<JoinHandle<()>>,
34}
35
36impl Drop for BackgroundSyncRuntime {
37 fn drop(&mut self) {
38 self.service.shutdown();
39 if let Some(join) = self.join.take() {
40 join.abort();
41 }
42 }
43}
44
45struct BackgroundMirrorRuntime {
46 service: Arc<crate::nostr_mirror::BackgroundNostrMirror>,
47 join: Option<JoinHandle<()>>,
48}
49
50impl Drop for BackgroundMirrorRuntime {
51 fn drop(&mut self) {
52 self.service.shutdown();
53 if let Some(join) = self.join.take() {
54 join.abort();
55 }
56 }
57}
58
59struct BackgroundServicesRuntime {
60 crawler: Option<socialgraph::crawler::SocialGraphTaskHandles>,
61 mirror: Option<BackgroundMirrorRuntime>,
62 sync: Option<BackgroundSyncRuntime>,
63}
64
65impl Drop for BackgroundServicesRuntime {
66 fn drop(&mut self) {
67 if let Some(handles) = self.crawler.as_ref() {
68 let _ = handles.shutdown_tx.send(true);
69 }
70 if let Some(runtime) = self.mirror.as_ref() {
71 runtime.service.shutdown();
72 }
73 if let Some(runtime) = self.sync.as_ref() {
74 runtime.service.shutdown();
75 }
76 }
77}
78
79impl BackgroundServicesRuntime {
80 fn status(&self) -> EmbeddedBackgroundServicesStatus {
81 EmbeddedBackgroundServicesStatus {
82 crawler_active: self.crawler.is_some(),
83 mirror_active: self.mirror.is_some(),
84 sync_active: self.sync.is_some(),
85 }
86 }
87}
88
89struct EmbeddedServerRuntime {
90 shutdown: Arc<Notify>,
91 join: Option<JoinHandle<()>>,
92}
93
94pub struct EmbeddedServerController {
95 runtime: Mutex<Option<EmbeddedServerRuntime>>,
96}
97
98impl EmbeddedServerController {
99 pub fn new(shutdown: Arc<Notify>, join: JoinHandle<()>) -> Self {
100 Self {
101 runtime: Mutex::new(Some(EmbeddedServerRuntime {
102 shutdown,
103 join: Some(join),
104 })),
105 }
106 }
107
108 pub async fn shutdown(&self) {
109 let mut runtime = self.runtime.lock().await;
110 let Some(mut runtime) = runtime.take() else {
111 return;
112 };
113
114 runtime.shutdown.notify_waiters();
115 if let Some(mut join) = runtime.join.take() {
116 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
117 Ok(Ok(())) => {}
118 Ok(Err(err)) => {
119 tracing::warn!("Embedded server task ended with join error: {}", err)
120 }
121 Err(_) => {
122 tracing::warn!("Timed out waiting for embedded server shutdown");
123 join.abort();
124 }
125 }
126 }
127 }
128}
129
130#[derive(Debug, Clone, Copy, PartialEq, Eq)]
131pub struct EmbeddedBackgroundServicesStatus {
132 pub crawler_active: bool,
133 pub mirror_active: bool,
134 pub sync_active: bool,
135}
136
137pub struct EmbeddedBackgroundServicesController {
138 keys: Keys,
139 data_dir: PathBuf,
140 store: Arc<HashtreeStore>,
141 graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
142 graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
143 spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
144 webrtc_state: Option<Arc<WebRTCState>>,
145 runtime: Mutex<BackgroundServicesRuntime>,
146}
147
148impl EmbeddedBackgroundServicesController {
149 fn mirror_publish_relays(active_relays: &[String], _bind_address: &str) -> Vec<String> {
150 active_relays.to_vec()
151 }
152
153 pub fn new(
154 keys: Keys,
155 data_dir: PathBuf,
156 store: Arc<HashtreeStore>,
157 graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
158 graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
159 spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
160 webrtc_state: Option<Arc<WebRTCState>>,
161 ) -> Self {
162 Self {
163 keys,
164 data_dir,
165 store,
166 graph_store_concrete,
167 graph_store,
168 spambox,
169 webrtc_state,
170 runtime: Mutex::new(BackgroundServicesRuntime {
171 crawler: None,
172 mirror: None,
173 sync: None,
174 }),
175 }
176 }
177
178 pub async fn status(&self) -> EmbeddedBackgroundServicesStatus {
179 self.runtime.lock().await.status()
180 }
181
182 pub async fn shutdown(&self) {
183 let mut runtime = self.runtime.lock().await;
184 Self::shutdown_crawler(&mut runtime.crawler).await;
185 Self::shutdown_mirror(&mut runtime.mirror).await;
186 Self::shutdown_sync(&mut runtime.sync).await;
187 }
188
189 async fn shutdown_crawler(crawler: &mut Option<socialgraph::crawler::SocialGraphTaskHandles>) {
190 let Some(handles) = crawler.take() else {
191 return;
192 };
193
194 let _ = handles.shutdown_tx.send(true);
195
196 let mut crawl_handle = handles.crawl_handle;
197 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut crawl_handle).await {
198 Ok(Ok(())) => {}
199 Ok(Err(err)) => tracing::warn!("Crawler task ended with join error: {}", err),
200 Err(_) => {
201 tracing::warn!("Timed out waiting for crawler task shutdown");
202 crawl_handle.abort();
203 }
204 }
205
206 let mut local_list_handle = handles.local_list_handle;
207 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut local_list_handle).await
208 {
209 Ok(Ok(())) => {}
210 Ok(Err(err)) => tracing::warn!("Local list task ended with join error: {}", err),
211 Err(_) => {
212 tracing::warn!("Timed out waiting for local list task shutdown");
213 local_list_handle.abort();
214 }
215 }
216 }
217
218 async fn shutdown_sync(sync: &mut Option<BackgroundSyncRuntime>) {
219 let Some(mut runtime) = sync.take() else {
220 return;
221 };
222
223 runtime.service.shutdown();
224 if let Some(mut join) = runtime.join.take() {
225 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
226 Ok(Ok(())) => {}
227 Ok(Err(err)) => {
228 tracing::warn!("Background sync task ended with join error: {}", err)
229 }
230 Err(_) => {
231 tracing::warn!("Timed out waiting for background sync shutdown");
232 join.abort();
233 }
234 }
235 }
236 }
237
238 async fn shutdown_mirror(mirror: &mut Option<BackgroundMirrorRuntime>) {
239 let Some(mut runtime) = mirror.take() else {
240 return;
241 };
242
243 runtime.service.shutdown();
244 if let Some(mut join) = runtime.join.take() {
245 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
246 Ok(Ok(())) => {}
247 Ok(Err(err)) => {
248 tracing::warn!("Background mirror task ended with join error: {}", err)
249 }
250 Err(_) => {
251 tracing::warn!("Timed out waiting for background mirror shutdown");
252 join.abort();
253 }
254 }
255 }
256 }
257
258 pub async fn apply_config(&self, config: &Config) -> Result<EmbeddedBackgroundServicesStatus> {
259 let mut runtime = self.runtime.lock().await;
260
261 Self::shutdown_crawler(&mut runtime.crawler).await;
262 Self::shutdown_mirror(&mut runtime.mirror).await;
263 Self::shutdown_sync(&mut runtime.sync).await;
264
265 if !config.server.mode.background_services_enabled() {
266 return Ok(runtime.status());
267 }
268
269 let active_relays = config.nostr.active_relays();
270
271 if config.nostr.enabled
272 && config.nostr.social_graph_crawl_depth > 0
273 && !active_relays.is_empty()
274 {
275 runtime.crawler = Some(socialgraph::crawler::spawn_social_graph_tasks(
276 self.graph_store.clone(),
277 self.keys.clone(),
278 active_relays.clone(),
279 config.nostr.social_graph_crawl_depth,
280 self.spambox.clone(),
281 self.data_dir.clone(),
282 ));
283
284 let service = Arc::new(
285 crate::nostr_mirror::BackgroundNostrMirror::new(
286 crate::nostr_mirror::NostrMirrorConfig {
287 relays: active_relays.clone(),
288 publish_relays: Self::mirror_publish_relays(
289 &active_relays,
290 &config.server.bind_address,
291 ),
292 max_follow_distance: config.nostr.social_graph_crawl_depth,
293 overmute_threshold: config.nostr.overmute_threshold,
294 require_negentropy: config.nostr.negentropy_only,
295 kinds: config.nostr.mirror_kinds.clone(),
296 history_sync_author_chunk_size: config
297 .nostr
298 .history_sync_author_chunk_size
299 .max(1),
300 history_sync_per_author_event_limit: config
301 .nostr
302 .history_sync_per_author_event_limit
303 .max(1),
304 missing_profile_backfill_batch_size: config
305 .nostr
306 .history_sync_author_chunk_size
307 .max(1),
308 history_sync_on_reconnect: config.nostr.history_sync_on_reconnect,
309 ..crate::nostr_mirror::NostrMirrorConfig::default()
310 },
311 self.store.clone(),
312 self.graph_store_concrete.clone(),
313 Some(
314 nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
315 .context("Failed to parse keys for background nostr mirror")?,
316 ),
317 )
318 .await
319 .context("Failed to create background nostr mirror")?,
320 );
321 let service_for_task = service.clone();
322 let join = tokio::task::spawn_blocking(move || {
323 let runtime = tokio::runtime::Builder::new_current_thread()
324 .enable_all()
325 .build()
326 .expect("build background nostr mirror runtime");
327 runtime.block_on(async {
328 if let Err(err) = service_for_task.run().await {
329 tracing::error!("Background nostr mirror error: {:#}", err);
330 }
331 });
332 });
333 runtime.mirror = Some(BackgroundMirrorRuntime {
334 service,
335 join: Some(join),
336 });
337 }
338
339 let has_pinned_refs = self
340 .store
341 .list_pinned_refs()
342 .map(|refs| !refs.is_empty())
343 .unwrap_or(false);
344
345 if config.sync.enabled
346 && (config.sync.sync_own || config.sync.sync_followed || has_pinned_refs)
347 && !active_relays.is_empty()
348 {
349 let sync_config = crate::sync::SyncConfig {
350 sync_own: config.sync.sync_own,
351 sync_followed: config.sync.sync_followed,
352 relays: active_relays,
353 max_concurrent: config.sync.max_concurrent,
354 webrtc_timeout_ms: config.sync.webrtc_timeout_ms,
355 blossom_timeout_ms: config.sync.blossom_timeout_ms,
356 };
357
358 let sync_keys = nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
359 .context("Failed to parse keys for sync")?;
360 let service = Arc::new(
361 crate::sync::BackgroundSync::new(
362 sync_config,
363 self.store.clone(),
364 sync_keys,
365 self.webrtc_state.clone(),
366 )
367 .await
368 .context("Failed to create background sync service")?,
369 );
370 let contacts_file = self.data_dir.join("contacts.json");
371 let service_for_task = service.clone();
372 let join = tokio::spawn(async move {
373 if let Err(err) = service_for_task.run(contacts_file).await {
374 tracing::error!("Background sync error: {}", err);
375 }
376 });
377 runtime.sync = Some(BackgroundSyncRuntime {
378 service,
379 join: Some(join),
380 });
381 }
382
383 Ok(runtime.status())
384 }
385}
386
387#[cfg(feature = "p2p")]
388pub struct EmbeddedPeerRouterController {
389 keys: Keys,
390 state: Arc<WebRTCState>,
391 store: Arc<dyn ContentStore>,
392 peer_classifier: PeerClassifier,
393 nostr_relay: Arc<NostrRelay>,
394 runtime: Mutex<Option<PeerRouterRuntime>>,
395}
396
397#[cfg(feature = "p2p")]
398impl EmbeddedPeerRouterController {
399 pub fn new(
400 keys: Keys,
401 state: Arc<WebRTCState>,
402 store: Arc<dyn ContentStore>,
403 peer_classifier: PeerClassifier,
404 nostr_relay: Arc<NostrRelay>,
405 ) -> Self {
406 Self {
407 keys,
408 state,
409 store,
410 peer_classifier,
411 nostr_relay,
412 runtime: Mutex::new(None),
413 }
414 }
415
416 pub fn state(&self) -> Arc<WebRTCState> {
417 self.state.clone()
418 }
419
420 pub async fn apply_config(&self, config: &Config) -> Result<bool> {
421 let mut runtime = self.runtime.lock().await;
422 if let Some(runtime_handle) = runtime.take() {
423 let _ = runtime_handle.shutdown.send(true);
424 let mut join = runtime_handle.join;
425 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
426 Ok(Ok(())) => {}
427 Ok(Err(err)) => {
428 tracing::warn!("Peer router task ended with join error: {}", err);
429 }
430 Err(_) => {
431 tracing::warn!("Timed out waiting for peer router shutdown");
432 join.abort();
433 }
434 }
435 }
436
437 self.state.reset_runtime_state().await;
438
439 if !crate::p2p_common::peer_router_enabled(config) {
440 return Ok(false);
441 }
442
443 let webrtc_config = crate::p2p_common::default_webrtc_config(config);
444 let mut manager = if config.server.mode.hash_get_enabled() {
445 WebRTCManager::new_with_state_and_store_and_classifier(
446 self.keys.clone(),
447 webrtc_config,
448 self.state.clone(),
449 self.store.clone(),
450 self.peer_classifier.clone(),
451 )
452 } else {
453 let mut manager =
454 WebRTCManager::new_with_state(self.keys.clone(), webrtc_config, self.state.clone());
455 manager.set_peer_classifier(self.peer_classifier.clone());
456 manager
457 };
458 manager
459 .set_nostr_relay(self.nostr_relay.clone() as hashtree_network::SharedMeshRelayClient);
460 let shutdown = manager.shutdown_signal();
461 let join = tokio::spawn(async move {
462 if let Err(err) = manager.run().await {
463 tracing::error!("Peer router error: {}", err);
464 }
465 });
466 *runtime = Some(PeerRouterRuntime { shutdown, join });
467 Ok(true)
468 }
469
470 pub async fn shutdown(&self) {
471 let mut runtime = self.runtime.lock().await;
472 let Some(runtime_handle) = runtime.take() else {
473 return;
474 };
475
476 let _ = runtime_handle.shutdown.send(true);
477 let mut join = runtime_handle.join;
478 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
479 Ok(Ok(())) => {}
480 Ok(Err(err)) => tracing::warn!("Peer router task ended with join error: {}", err),
481 Err(_) => {
482 tracing::warn!("Timed out waiting for peer router shutdown");
483 join.abort();
484 }
485 }
486
487 self.state.reset_runtime_state().await;
488 }
489}
490
491pub struct EmbeddedDaemonController {
492 server_controller: Arc<EmbeddedServerController>,
493 #[cfg(feature = "p2p")]
494 peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
495 background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
496}
497
498impl EmbeddedDaemonController {
499 #[cfg(feature = "p2p")]
500 pub fn new(
501 server_controller: Arc<EmbeddedServerController>,
502 peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
503 background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
504 ) -> Self {
505 Self {
506 server_controller,
507 #[cfg(feature = "p2p")]
508 peer_router_controller,
509 background_services_controller,
510 }
511 }
512
513 #[cfg(not(feature = "p2p"))]
514 pub fn new(
515 server_controller: Arc<EmbeddedServerController>,
516 background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
517 ) -> Self {
518 Self {
519 server_controller,
520 background_services_controller,
521 }
522 }
523
524 pub async fn shutdown(&self) {
525 self.server_controller.shutdown().await;
526 if let Some(controller) = self.background_services_controller.as_ref() {
527 controller.shutdown().await;
528 }
529 #[cfg(feature = "p2p")]
530 if let Some(controller) = self.peer_router_controller.as_ref() {
531 controller.shutdown().await;
532 }
533 }
534}
535
536pub struct EmbeddedDaemonOptions {
537 pub config: Config,
538 pub data_dir: PathBuf,
539 pub config_dir: Option<PathBuf>,
540 pub bind_address: String,
541 pub relays: Option<Vec<String>>,
542 pub extra_routes: Option<Router<AppState>>,
543 pub cors: Option<CorsLayer>,
544}
545
546pub struct EmbeddedDaemonInfo {
547 pub addr: String,
548 pub port: u16,
549 pub npub: String,
550 pub store: Arc<HashtreeStore>,
551 pub daemon_controller: Arc<EmbeddedDaemonController>,
552 #[allow(dead_code)]
553 pub webrtc_state: Option<Arc<WebRTCState>>,
554 #[cfg(feature = "p2p")]
555 #[allow(dead_code)]
556 pub peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
557 #[allow(dead_code)]
558 pub background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
559}
560
561pub async fn start_embedded(opts: EmbeddedDaemonOptions) -> Result<EmbeddedDaemonInfo> {
562 let _ = rustls::crypto::ring::default_provider().install_default();
563
564 let mut config = opts.config;
565 if let Some(relays) = opts.relays {
566 config.nostr.relays = relays;
567 config.nostr.enabled = !config.nostr.relays.is_empty();
568 }
569
570 let max_size_bytes = config.storage.max_size_gb * 1024 * 1024 * 1024;
571 let nostr_db_max_bytes = config
572 .nostr
573 .db_max_size_gb
574 .saturating_mul(1024 * 1024 * 1024);
575 let spambox_db_max_bytes = config
576 .nostr
577 .spambox_max_size_gb
578 .saturating_mul(1024 * 1024 * 1024);
579
580 let store = Arc::new(HashtreeStore::with_options(
581 &opts.data_dir,
582 config.storage.s3.as_ref(),
583 max_size_bytes,
584 )?);
585
586 let (keys, _was_generated) = if let Some(config_dir) = opts.config_dir.as_ref() {
587 ensure_keys_in(config_dir, Some(&opts.data_dir), Some(&config))?
588 } else {
589 ensure_keys()?
590 };
591 let pk_bytes = pubkey_bytes(&keys);
592 let npub = keys
593 .public_key()
594 .to_bech32()
595 .context("Failed to encode npub")?;
596
597 let mut allowed_pubkeys: HashSet<String> = HashSet::new();
598 allowed_pubkeys.insert(hex::encode(pk_bytes));
599 for npub_str in &config.nostr.allowed_npubs {
600 if let Ok(pk) = parse_npub(npub_str) {
601 allowed_pubkeys.insert(hex::encode(pk));
602 } else {
603 tracing::warn!("Invalid npub in allowed_npubs: {}", npub_str);
604 }
605 }
606
607 let graph_store = socialgraph::open_social_graph_store_with_storage(
608 &opts.data_dir,
609 store.store_arc(),
610 Some(nostr_db_max_bytes),
611 )
612 .context("Failed to initialize social graph store")?;
613 graph_store.set_profile_index_overmute_threshold(config.nostr.overmute_threshold);
614
615 let social_graph_root_bytes = if let Some(ref root_npub) = config.nostr.socialgraph_root {
616 parse_npub(root_npub).unwrap_or(pk_bytes)
617 } else {
618 pk_bytes
619 };
620 socialgraph::set_social_graph_root(&graph_store, &social_graph_root_bytes);
621 socialgraph::sync_local_list_files_force(graph_store.as_ref(), &opts.data_dir, &keys)
622 .context("Failed to sync local social graph lists")?;
623 let social_graph_store: Arc<dyn socialgraph::SocialGraphBackend> = graph_store.clone();
624
625 let social_graph = Arc::new(socialgraph::SocialGraphAccessControl::new(
626 Arc::clone(&social_graph_store),
627 config.nostr.max_write_distance,
628 allowed_pubkeys.clone(),
629 ));
630
631 let nostr_relay_config = NostrRelayConfig {
632 spambox_db_max_bytes,
633 ..Default::default()
634 };
635 let mut public_event_pubkeys = HashSet::new();
636 public_event_pubkeys.insert(hex::encode(pk_bytes));
637 let nostr_relay = Arc::new(
638 NostrRelay::new(
639 Arc::clone(&social_graph_store),
640 opts.data_dir.clone(),
641 public_event_pubkeys,
642 Some(social_graph.clone()),
643 nostr_relay_config,
644 )
645 .context("Failed to initialize Nostr relay")?,
646 );
647
648 let crawler_spambox = if spambox_db_max_bytes == 0 {
649 None
650 } else {
651 let spam_dir = opts.data_dir.join("socialgraph_spambox");
652 match socialgraph::open_social_graph_store_at_path(&spam_dir, Some(spambox_db_max_bytes)) {
653 Ok(store) => Some(store),
654 Err(err) => {
655 tracing::warn!("Failed to open social graph spambox for crawler: {}", err);
656 None
657 }
658 }
659 };
660 let crawler_spambox_backend = crawler_spambox
661 .clone()
662 .map(|store| store as Arc<dyn socialgraph::SocialGraphBackend>);
663
664 #[cfg(feature = "p2p")]
665 let (webrtc_state, peer_router_controller): (
666 Option<Arc<WebRTCState>>,
667 Option<Arc<EmbeddedPeerRouterController>>,
668 ) = {
669 let router_config = crate::p2p_common::default_webrtc_config(&config);
670 let peer_classifier = crate::p2p_common::build_peer_classifier(
671 opts.data_dir.clone(),
672 Arc::clone(&social_graph_store),
673 );
674 let cashu_payment_client =
675 if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
676 match crate::cashu_helper::CashuHelperClient::discover(opts.data_dir.clone()) {
677 Ok(client) => {
678 Some(Arc::new(client) as Arc<dyn crate::cashu_helper::CashuPaymentClient>)
679 }
680 Err(err) => {
681 tracing::warn!(
682 "Cashu settlement helper unavailable; paid retrieval stays disabled: {}",
683 err
684 );
685 None
686 }
687 }
688 } else {
689 None
690 };
691 let cashu_mint_metadata =
692 if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
693 let metadata_path = crate::webrtc::cashu_mint_metadata_path(&opts.data_dir);
694 match crate::webrtc::CashuMintMetadataStore::load(metadata_path) {
695 Ok(store) => Some(store),
696 Err(err) => {
697 tracing::warn!(
698 "Failed to load Cashu mint metadata; falling back to in-memory state: {}",
699 err
700 );
701 Some(crate::webrtc::CashuMintMetadataStore::in_memory())
702 }
703 }
704 } else {
705 None
706 };
707
708 let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
709 router_config.request_selection_strategy,
710 router_config.request_fairness_enabled,
711 router_config.request_dispatch,
712 std::time::Duration::from_millis(router_config.message_timeout_ms),
713 crate::webrtc::CashuRoutingConfig::from(&config.cashu),
714 cashu_payment_client,
715 cashu_mint_metadata,
716 ));
717 let controller = Arc::new(EmbeddedPeerRouterController::new(
718 keys.clone(),
719 state.clone(),
720 Arc::clone(&store) as Arc<dyn ContentStore>,
721 peer_classifier,
722 nostr_relay.clone(),
723 ));
724 controller.apply_config(&config).await?;
725 (Some(state), Some(controller))
726 };
727
728 #[cfg(not(feature = "p2p"))]
729 let webrtc_state: Option<Arc<crate::webrtc::WebRTCState>> = None;
730
731 let background_services_controller = Arc::new(EmbeddedBackgroundServicesController::new(
732 keys.clone(),
733 opts.data_dir.clone(),
734 Arc::clone(&store),
735 graph_store.clone(),
736 Arc::clone(&social_graph_store),
737 crawler_spambox_backend,
738 webrtc_state.clone(),
739 ));
740 background_services_controller.apply_config(&config).await?;
741
742 let upstream_blossom = config.blossom.all_read_servers();
743 let active_nostr_relays = config.nostr.active_relays();
744
745 let mut server = HashtreeServer::new(Arc::clone(&store), opts.bind_address.clone())
746 .with_server_mode(config.server.mode)
747 .with_hash_get_enabled(config.server.mode.hash_get_enabled())
748 .with_allowed_pubkeys(allowed_pubkeys.clone())
749 .with_max_upload_bytes((config.blossom.max_upload_mb as usize) * 1024 * 1024)
750 .with_public_writes(config.server.public_writes)
751 .with_upstream_blossom(upstream_blossom)
752 .with_nostr_relay_urls(active_nostr_relays)
753 .with_social_graph(social_graph)
754 .with_socialgraph_snapshot(
755 Arc::clone(&social_graph_store),
756 social_graph_root_bytes,
757 config.server.socialgraph_snapshot_public,
758 )
759 .with_nostr_relay(nostr_relay.clone());
760
761 if let Some(ref state) = webrtc_state {
762 server = server.with_webrtc_peers(state.clone());
763 }
764
765 if let Some(extra) = opts.extra_routes {
766 server = server.with_extra_routes(extra);
767 }
768 if let Some(cors) = opts.cors {
769 server = server.with_cors(cors);
770 }
771
772 spawn_background_eviction_task(
773 Arc::clone(&store),
774 BACKGROUND_EVICTION_INTERVAL,
775 "embedded daemon",
776 );
777
778 let listener = TcpListener::bind(&opts.bind_address).await?;
779 let local_addr = listener.local_addr()?;
780 let actual_addr = format!("{}:{}", local_addr.ip(), local_addr.port());
781
782 let server_shutdown = Arc::new(Notify::new());
783 let server_shutdown_for_task = Arc::clone(&server_shutdown);
784 let server_join = tokio::spawn(async move {
785 if let Err(e) = server
786 .run_with_listener_until(listener, async move {
787 server_shutdown_for_task.notified().await;
788 })
789 .await
790 {
791 tracing::error!("Embedded daemon server error: {}", e);
792 }
793 });
794 let server_controller = Arc::new(EmbeddedServerController::new(server_shutdown, server_join));
795 #[cfg(feature = "p2p")]
796 let daemon_controller = Arc::new(EmbeddedDaemonController::new(
797 server_controller,
798 peer_router_controller.clone(),
799 Some(background_services_controller.clone()),
800 ));
801 #[cfg(not(feature = "p2p"))]
802 let daemon_controller = Arc::new(EmbeddedDaemonController::new(
803 server_controller,
804 Some(background_services_controller.clone()),
805 ));
806
807 tracing::info!(
808 "Embedded daemon started on {}, identity {}",
809 actual_addr,
810 npub
811 );
812
813 Ok(EmbeddedDaemonInfo {
814 addr: actual_addr,
815 port: local_addr.port(),
816 npub,
817 store,
818 daemon_controller,
819 webrtc_state,
820 #[cfg(feature = "p2p")]
821 peer_router_controller,
822 background_services_controller: Some(background_services_controller),
823 })
824}
825
826#[cfg(test)]
827mod tests {
828 use super::EmbeddedBackgroundServicesController;
829
830 #[test]
831 fn mirror_publish_relays_match_upstream_relays_only() {
832 let relays = EmbeddedBackgroundServicesController::mirror_publish_relays(
833 &[
834 "wss://relay.example".to_string(),
835 "wss://relay.two".to_string(),
836 ],
837 "0.0.0.0:8080",
838 );
839 assert_eq!(
840 relays,
841 vec![
842 "wss://relay.example".to_string(),
843 "wss://relay.two".to_string(),
844 ]
845 );
846 }
847}