Skip to main content

hashtree_cli/
daemon.rs

1use anyhow::{Context, Result};
2use axum::Router;
3use nostr::nips::nip19::ToBech32;
4use nostr::Keys;
5use std::collections::HashSet;
6use std::path::PathBuf;
7use std::sync::Arc;
8use tokio::net::TcpListener;
9use tokio::sync::{Mutex, Notify};
10use tokio::task::JoinHandle;
11use tower_http::cors::CorsLayer;
12
13use crate::config::{ensure_keys, ensure_keys_in, parse_npub, pubkey_bytes, Config};
14use crate::eviction::{spawn_background_eviction_task, BACKGROUND_EVICTION_INTERVAL};
15use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
16use crate::server::{AppState, HashtreeServer};
17use crate::socialgraph;
18use crate::storage::HashtreeStore;
19
20#[cfg(feature = "p2p")]
21use crate::webrtc::{ContentStore, PeerClassifier, WebRTCManager, WebRTCState};
22#[cfg(not(feature = "p2p"))]
23use crate::WebRTCState;
24
25#[cfg(feature = "p2p")]
26struct PeerRouterRuntime {
27    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
28    join: JoinHandle<()>,
29}
30
31struct BackgroundSyncRuntime {
32    service: Arc<crate::sync::BackgroundSync>,
33    join: Option<JoinHandle<()>>,
34}
35
36impl Drop for BackgroundSyncRuntime {
37    fn drop(&mut self) {
38        self.service.shutdown();
39        if let Some(join) = self.join.take() {
40            join.abort();
41        }
42    }
43}
44
45struct BackgroundMirrorRuntime {
46    service: Arc<crate::nostr_mirror::BackgroundNostrMirror>,
47    join: Option<JoinHandle<()>>,
48}
49
50impl Drop for BackgroundMirrorRuntime {
51    fn drop(&mut self) {
52        self.service.shutdown();
53        if let Some(join) = self.join.take() {
54            join.abort();
55        }
56    }
57}
58
59struct BackgroundServicesRuntime {
60    crawler: Option<socialgraph::crawler::SocialGraphTaskHandles>,
61    mirror: Option<BackgroundMirrorRuntime>,
62    sync: Option<BackgroundSyncRuntime>,
63}
64
65impl Drop for BackgroundServicesRuntime {
66    fn drop(&mut self) {
67        if let Some(handles) = self.crawler.as_ref() {
68            let _ = handles.shutdown_tx.send(true);
69        }
70        if let Some(runtime) = self.mirror.as_ref() {
71            runtime.service.shutdown();
72        }
73        if let Some(runtime) = self.sync.as_ref() {
74            runtime.service.shutdown();
75        }
76    }
77}
78
79impl BackgroundServicesRuntime {
80    fn status(&self) -> EmbeddedBackgroundServicesStatus {
81        EmbeddedBackgroundServicesStatus {
82            crawler_active: self.crawler.is_some(),
83            mirror_active: self.mirror.is_some(),
84            sync_active: self.sync.is_some(),
85        }
86    }
87}
88
89struct EmbeddedServerRuntime {
90    shutdown: Arc<Notify>,
91    join: Option<JoinHandle<()>>,
92}
93
94pub struct EmbeddedServerController {
95    runtime: Mutex<Option<EmbeddedServerRuntime>>,
96}
97
98impl EmbeddedServerController {
99    pub fn new(shutdown: Arc<Notify>, join: JoinHandle<()>) -> Self {
100        Self {
101            runtime: Mutex::new(Some(EmbeddedServerRuntime {
102                shutdown,
103                join: Some(join),
104            })),
105        }
106    }
107
108    pub async fn shutdown(&self) {
109        let mut runtime = self.runtime.lock().await;
110        let Some(mut runtime) = runtime.take() else {
111            return;
112        };
113
114        runtime.shutdown.notify_waiters();
115        if let Some(mut join) = runtime.join.take() {
116            match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
117                Ok(Ok(())) => {}
118                Ok(Err(err)) => {
119                    tracing::warn!("Embedded server task ended with join error: {}", err)
120                }
121                Err(_) => {
122                    tracing::warn!("Timed out waiting for embedded server shutdown");
123                    join.abort();
124                }
125            }
126        }
127    }
128}
129
130#[derive(Debug, Clone, Copy, PartialEq, Eq)]
131pub struct EmbeddedBackgroundServicesStatus {
132    pub crawler_active: bool,
133    pub mirror_active: bool,
134    pub sync_active: bool,
135}
136
137pub struct EmbeddedBackgroundServicesController {
138    keys: Keys,
139    data_dir: PathBuf,
140    store: Arc<HashtreeStore>,
141    graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
142    graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
143    spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
144    webrtc_state: Option<Arc<WebRTCState>>,
145    runtime: Mutex<BackgroundServicesRuntime>,
146}
147
148impl EmbeddedBackgroundServicesController {
149    fn mirror_publish_relays(active_relays: &[String], _bind_address: &str) -> Vec<String> {
150        active_relays.to_vec()
151    }
152
153    pub fn new(
154        keys: Keys,
155        data_dir: PathBuf,
156        store: Arc<HashtreeStore>,
157        graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
158        graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
159        spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
160        webrtc_state: Option<Arc<WebRTCState>>,
161    ) -> Self {
162        Self {
163            keys,
164            data_dir,
165            store,
166            graph_store_concrete,
167            graph_store,
168            spambox,
169            webrtc_state,
170            runtime: Mutex::new(BackgroundServicesRuntime {
171                crawler: None,
172                mirror: None,
173                sync: None,
174            }),
175        }
176    }
177
178    pub async fn status(&self) -> EmbeddedBackgroundServicesStatus {
179        self.runtime.lock().await.status()
180    }
181
182    pub async fn shutdown(&self) {
183        let mut runtime = self.runtime.lock().await;
184        Self::shutdown_crawler(&mut runtime.crawler).await;
185        Self::shutdown_mirror(&mut runtime.mirror).await;
186        Self::shutdown_sync(&mut runtime.sync).await;
187    }
188
189    async fn shutdown_crawler(crawler: &mut Option<socialgraph::crawler::SocialGraphTaskHandles>) {
190        let Some(handles) = crawler.take() else {
191            return;
192        };
193
194        let _ = handles.shutdown_tx.send(true);
195
196        let mut crawl_handle = handles.crawl_handle;
197        match tokio::time::timeout(std::time::Duration::from_secs(3), &mut crawl_handle).await {
198            Ok(Ok(())) => {}
199            Ok(Err(err)) => tracing::warn!("Crawler task ended with join error: {}", err),
200            Err(_) => {
201                tracing::warn!("Timed out waiting for crawler task shutdown");
202                crawl_handle.abort();
203            }
204        }
205
206        let mut local_list_handle = handles.local_list_handle;
207        match tokio::time::timeout(std::time::Duration::from_secs(3), &mut local_list_handle).await
208        {
209            Ok(Ok(())) => {}
210            Ok(Err(err)) => tracing::warn!("Local list task ended with join error: {}", err),
211            Err(_) => {
212                tracing::warn!("Timed out waiting for local list task shutdown");
213                local_list_handle.abort();
214            }
215        }
216    }
217
218    async fn shutdown_sync(sync: &mut Option<BackgroundSyncRuntime>) {
219        let Some(mut runtime) = sync.take() else {
220            return;
221        };
222
223        runtime.service.shutdown();
224        if let Some(mut join) = runtime.join.take() {
225            match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
226                Ok(Ok(())) => {}
227                Ok(Err(err)) => {
228                    tracing::warn!("Background sync task ended with join error: {}", err)
229                }
230                Err(_) => {
231                    tracing::warn!("Timed out waiting for background sync shutdown");
232                    join.abort();
233                }
234            }
235        }
236    }
237
238    async fn shutdown_mirror(mirror: &mut Option<BackgroundMirrorRuntime>) {
239        let Some(mut runtime) = mirror.take() else {
240            return;
241        };
242
243        runtime.service.shutdown();
244        if let Some(mut join) = runtime.join.take() {
245            match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
246                Ok(Ok(())) => {}
247                Ok(Err(err)) => {
248                    tracing::warn!("Background mirror task ended with join error: {}", err)
249                }
250                Err(_) => {
251                    tracing::warn!("Timed out waiting for background mirror shutdown");
252                    join.abort();
253                }
254            }
255        }
256    }
257
258    pub async fn apply_config(&self, config: &Config) -> Result<EmbeddedBackgroundServicesStatus> {
259        let mut runtime = self.runtime.lock().await;
260
261        Self::shutdown_crawler(&mut runtime.crawler).await;
262        Self::shutdown_mirror(&mut runtime.mirror).await;
263        Self::shutdown_sync(&mut runtime.sync).await;
264
265        if !config.server.mode.background_services_enabled() {
266            return Ok(runtime.status());
267        }
268
269        let active_relays = config.nostr.active_relays();
270
271        if config.nostr.enabled
272            && config.nostr.social_graph_crawl_depth > 0
273            && !active_relays.is_empty()
274        {
275            runtime.crawler = Some(socialgraph::crawler::spawn_social_graph_tasks(
276                self.graph_store.clone(),
277                self.keys.clone(),
278                active_relays.clone(),
279                config.nostr.social_graph_crawl_depth,
280                self.spambox.clone(),
281                self.data_dir.clone(),
282            ));
283
284            let service = Arc::new(
285                crate::nostr_mirror::BackgroundNostrMirror::new(
286                    crate::nostr_mirror::NostrMirrorConfig {
287                        relays: active_relays.clone(),
288                        publish_relays: Self::mirror_publish_relays(
289                            &active_relays,
290                            &config.server.bind_address,
291                        ),
292                        max_follow_distance: config.nostr.social_graph_crawl_depth,
293                        overmute_threshold: config.nostr.overmute_threshold,
294                        require_negentropy: config.nostr.negentropy_only,
295                        kinds: config.nostr.mirror_kinds.clone(),
296                        history_sync_author_chunk_size: config
297                            .nostr
298                            .history_sync_author_chunk_size
299                            .max(1),
300                        history_sync_per_author_event_limit: config
301                            .nostr
302                            .history_sync_per_author_event_limit
303                            .max(1),
304                        missing_profile_backfill_batch_size: config
305                            .nostr
306                            .history_sync_author_chunk_size
307                            .max(1),
308                        history_sync_on_reconnect: config.nostr.history_sync_on_reconnect,
309                        ..crate::nostr_mirror::NostrMirrorConfig::default()
310                    },
311                    self.store.clone(),
312                    self.graph_store_concrete.clone(),
313                    Some(
314                        nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
315                            .context("Failed to parse keys for background nostr mirror")?,
316                    ),
317                )
318                .await
319                .context("Failed to create background nostr mirror")?,
320            );
321            let service_for_task = service.clone();
322            let join = tokio::task::spawn_blocking(move || {
323                let runtime = tokio::runtime::Builder::new_current_thread()
324                    .enable_all()
325                    .build()
326                    .expect("build background nostr mirror runtime");
327                runtime.block_on(async {
328                    if let Err(err) = service_for_task.run().await {
329                        tracing::error!("Background nostr mirror error: {:#}", err);
330                    }
331                });
332            });
333            runtime.mirror = Some(BackgroundMirrorRuntime {
334                service,
335                join: Some(join),
336            });
337        }
338
339        let has_pinned_refs = self
340            .store
341            .list_pinned_refs()
342            .map(|refs| !refs.is_empty())
343            .unwrap_or(false);
344
345        if config.sync.enabled
346            && (config.sync.sync_own || config.sync.sync_followed || has_pinned_refs)
347            && !active_relays.is_empty()
348        {
349            let sync_config = crate::sync::SyncConfig {
350                sync_own: config.sync.sync_own,
351                sync_followed: config.sync.sync_followed,
352                relays: active_relays,
353                max_concurrent: config.sync.max_concurrent,
354                webrtc_timeout_ms: config.sync.webrtc_timeout_ms,
355                blossom_timeout_ms: config.sync.blossom_timeout_ms,
356            };
357
358            let sync_keys = nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
359                .context("Failed to parse keys for sync")?;
360            let service = Arc::new(
361                crate::sync::BackgroundSync::new(
362                    sync_config,
363                    self.store.clone(),
364                    sync_keys,
365                    self.webrtc_state.clone(),
366                )
367                .await
368                .context("Failed to create background sync service")?,
369            );
370            let contacts_file = self.data_dir.join("contacts.json");
371            let service_for_task = service.clone();
372            let join = tokio::spawn(async move {
373                if let Err(err) = service_for_task.run(contacts_file).await {
374                    tracing::error!("Background sync error: {}", err);
375                }
376            });
377            runtime.sync = Some(BackgroundSyncRuntime {
378                service,
379                join: Some(join),
380            });
381        }
382
383        Ok(runtime.status())
384    }
385}
386
387#[cfg(feature = "p2p")]
388pub struct EmbeddedPeerRouterController {
389    keys: Keys,
390    state: Arc<WebRTCState>,
391    store: Arc<dyn ContentStore>,
392    peer_classifier: PeerClassifier,
393    nostr_relay: Arc<NostrRelay>,
394    runtime: Mutex<Option<PeerRouterRuntime>>,
395}
396
397#[cfg(feature = "p2p")]
398impl EmbeddedPeerRouterController {
399    pub fn new(
400        keys: Keys,
401        state: Arc<WebRTCState>,
402        store: Arc<dyn ContentStore>,
403        peer_classifier: PeerClassifier,
404        nostr_relay: Arc<NostrRelay>,
405    ) -> Self {
406        Self {
407            keys,
408            state,
409            store,
410            peer_classifier,
411            nostr_relay,
412            runtime: Mutex::new(None),
413        }
414    }
415
416    pub fn state(&self) -> Arc<WebRTCState> {
417        self.state.clone()
418    }
419
420    pub async fn apply_config(&self, config: &Config) -> Result<bool> {
421        let mut runtime = self.runtime.lock().await;
422        if let Some(runtime_handle) = runtime.take() {
423            let _ = runtime_handle.shutdown.send(true);
424            let mut join = runtime_handle.join;
425            match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
426                Ok(Ok(())) => {}
427                Ok(Err(err)) => {
428                    tracing::warn!("Peer router task ended with join error: {}", err);
429                }
430                Err(_) => {
431                    tracing::warn!("Timed out waiting for peer router shutdown");
432                    join.abort();
433                }
434            }
435        }
436
437        self.state.reset_runtime_state().await;
438
439        if !crate::p2p_common::peer_router_enabled(config) {
440            return Ok(false);
441        }
442
443        let webrtc_config = crate::p2p_common::default_webrtc_config(config);
444        let mut manager = if config.server.mode.hash_get_enabled() {
445            WebRTCManager::new_with_state_and_store_and_classifier(
446                self.keys.clone(),
447                webrtc_config,
448                self.state.clone(),
449                self.store.clone(),
450                self.peer_classifier.clone(),
451            )
452        } else {
453            let mut manager =
454                WebRTCManager::new_with_state(self.keys.clone(), webrtc_config, self.state.clone());
455            manager.set_peer_classifier(self.peer_classifier.clone());
456            manager
457        };
458        manager
459            .set_nostr_relay(self.nostr_relay.clone() as hashtree_network::SharedMeshRelayClient);
460        let shutdown = manager.shutdown_signal();
461        let join = tokio::spawn(async move {
462            if let Err(err) = manager.run().await {
463                tracing::error!("Peer router error: {}", err);
464            }
465        });
466        *runtime = Some(PeerRouterRuntime { shutdown, join });
467        Ok(true)
468    }
469
470    pub async fn shutdown(&self) {
471        let mut runtime = self.runtime.lock().await;
472        let Some(runtime_handle) = runtime.take() else {
473            return;
474        };
475
476        let _ = runtime_handle.shutdown.send(true);
477        let mut join = runtime_handle.join;
478        match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
479            Ok(Ok(())) => {}
480            Ok(Err(err)) => tracing::warn!("Peer router task ended with join error: {}", err),
481            Err(_) => {
482                tracing::warn!("Timed out waiting for peer router shutdown");
483                join.abort();
484            }
485        }
486
487        self.state.reset_runtime_state().await;
488    }
489}
490
491pub struct EmbeddedDaemonController {
492    server_controller: Arc<EmbeddedServerController>,
493    #[cfg(feature = "p2p")]
494    peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
495    background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
496}
497
498impl EmbeddedDaemonController {
499    #[cfg(feature = "p2p")]
500    pub fn new(
501        server_controller: Arc<EmbeddedServerController>,
502        peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
503        background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
504    ) -> Self {
505        Self {
506            server_controller,
507            #[cfg(feature = "p2p")]
508            peer_router_controller,
509            background_services_controller,
510        }
511    }
512
513    #[cfg(not(feature = "p2p"))]
514    pub fn new(
515        server_controller: Arc<EmbeddedServerController>,
516        background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
517    ) -> Self {
518        Self {
519            server_controller,
520            background_services_controller,
521        }
522    }
523
524    pub async fn shutdown(&self) {
525        self.server_controller.shutdown().await;
526        if let Some(controller) = self.background_services_controller.as_ref() {
527            controller.shutdown().await;
528        }
529        #[cfg(feature = "p2p")]
530        if let Some(controller) = self.peer_router_controller.as_ref() {
531            controller.shutdown().await;
532        }
533    }
534}
535
536pub struct EmbeddedDaemonOptions {
537    pub config: Config,
538    pub data_dir: PathBuf,
539    pub config_dir: Option<PathBuf>,
540    pub bind_address: String,
541    pub relays: Option<Vec<String>>,
542    pub extra_routes: Option<Router<AppState>>,
543    pub cors: Option<CorsLayer>,
544}
545
546pub struct EmbeddedDaemonInfo {
547    pub addr: String,
548    pub port: u16,
549    pub npub: String,
550    pub store: Arc<HashtreeStore>,
551    pub daemon_controller: Arc<EmbeddedDaemonController>,
552    #[allow(dead_code)]
553    pub webrtc_state: Option<Arc<WebRTCState>>,
554    #[cfg(feature = "p2p")]
555    #[allow(dead_code)]
556    pub peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
557    #[allow(dead_code)]
558    pub background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
559}
560
561pub async fn start_embedded(opts: EmbeddedDaemonOptions) -> Result<EmbeddedDaemonInfo> {
562    let _ = rustls::crypto::ring::default_provider().install_default();
563
564    let mut config = opts.config;
565    if let Some(relays) = opts.relays {
566        config.nostr.relays = relays;
567        config.nostr.enabled = !config.nostr.relays.is_empty();
568    }
569
570    let max_size_bytes = config.storage.max_size_gb * 1024 * 1024 * 1024;
571    let nostr_db_max_bytes = config
572        .nostr
573        .db_max_size_gb
574        .saturating_mul(1024 * 1024 * 1024);
575    let spambox_db_max_bytes = config
576        .nostr
577        .spambox_max_size_gb
578        .saturating_mul(1024 * 1024 * 1024);
579
580    let store = Arc::new(HashtreeStore::with_options(
581        &opts.data_dir,
582        config.storage.s3.as_ref(),
583        max_size_bytes,
584    )?);
585
586    let (keys, _was_generated) = if let Some(config_dir) = opts.config_dir.as_ref() {
587        ensure_keys_in(config_dir, Some(&opts.data_dir), Some(&config))?
588    } else {
589        ensure_keys()?
590    };
591    let pk_bytes = pubkey_bytes(&keys);
592    let npub = keys
593        .public_key()
594        .to_bech32()
595        .context("Failed to encode npub")?;
596
597    let mut allowed_pubkeys: HashSet<String> = HashSet::new();
598    allowed_pubkeys.insert(hex::encode(pk_bytes));
599    for npub_str in &config.nostr.allowed_npubs {
600        if let Ok(pk) = parse_npub(npub_str) {
601            allowed_pubkeys.insert(hex::encode(pk));
602        } else {
603            tracing::warn!("Invalid npub in allowed_npubs: {}", npub_str);
604        }
605    }
606
607    let graph_store = socialgraph::open_social_graph_store_with_storage(
608        &opts.data_dir,
609        store.store_arc(),
610        Some(nostr_db_max_bytes),
611    )
612    .context("Failed to initialize social graph store")?;
613    graph_store.set_profile_index_overmute_threshold(config.nostr.overmute_threshold);
614
615    let social_graph_root_bytes = if let Some(ref root_npub) = config.nostr.socialgraph_root {
616        parse_npub(root_npub).unwrap_or(pk_bytes)
617    } else {
618        pk_bytes
619    };
620    socialgraph::set_social_graph_root(&graph_store, &social_graph_root_bytes);
621    socialgraph::sync_local_list_files_force(graph_store.as_ref(), &opts.data_dir, &keys)
622        .context("Failed to sync local social graph lists")?;
623    let social_graph_store: Arc<dyn socialgraph::SocialGraphBackend> = graph_store.clone();
624
625    let social_graph = Arc::new(socialgraph::SocialGraphAccessControl::new(
626        Arc::clone(&social_graph_store),
627        config.nostr.max_write_distance,
628        allowed_pubkeys.clone(),
629    ));
630
631    let nostr_relay_config = NostrRelayConfig {
632        spambox_db_max_bytes,
633        ..Default::default()
634    };
635    let mut public_event_pubkeys = HashSet::new();
636    public_event_pubkeys.insert(hex::encode(pk_bytes));
637    let nostr_relay = Arc::new(
638        NostrRelay::new(
639            Arc::clone(&social_graph_store),
640            opts.data_dir.clone(),
641            public_event_pubkeys,
642            Some(social_graph.clone()),
643            nostr_relay_config,
644        )
645        .context("Failed to initialize Nostr relay")?,
646    );
647
648    let crawler_spambox = if spambox_db_max_bytes == 0 {
649        None
650    } else {
651        let spam_dir = opts.data_dir.join("socialgraph_spambox");
652        match socialgraph::open_social_graph_store_at_path(&spam_dir, Some(spambox_db_max_bytes)) {
653            Ok(store) => Some(store),
654            Err(err) => {
655                tracing::warn!("Failed to open social graph spambox for crawler: {}", err);
656                None
657            }
658        }
659    };
660    let crawler_spambox_backend = crawler_spambox
661        .clone()
662        .map(|store| store as Arc<dyn socialgraph::SocialGraphBackend>);
663
664    #[cfg(feature = "p2p")]
665    let (webrtc_state, peer_router_controller): (
666        Option<Arc<WebRTCState>>,
667        Option<Arc<EmbeddedPeerRouterController>>,
668    ) = {
669        let router_config = crate::p2p_common::default_webrtc_config(&config);
670        let peer_classifier = crate::p2p_common::build_peer_classifier(
671            opts.data_dir.clone(),
672            Arc::clone(&social_graph_store),
673        );
674        let cashu_payment_client =
675            if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
676                match crate::cashu_helper::CashuHelperClient::discover(opts.data_dir.clone()) {
677                    Ok(client) => {
678                        Some(Arc::new(client) as Arc<dyn crate::cashu_helper::CashuPaymentClient>)
679                    }
680                    Err(err) => {
681                        tracing::warn!(
682                        "Cashu settlement helper unavailable; paid retrieval stays disabled: {}",
683                        err
684                    );
685                        None
686                    }
687                }
688            } else {
689                None
690            };
691        let cashu_mint_metadata =
692            if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
693                let metadata_path = crate::webrtc::cashu_mint_metadata_path(&opts.data_dir);
694                match crate::webrtc::CashuMintMetadataStore::load(metadata_path) {
695                    Ok(store) => Some(store),
696                    Err(err) => {
697                        tracing::warn!(
698                        "Failed to load Cashu mint metadata; falling back to in-memory state: {}",
699                        err
700                    );
701                        Some(crate::webrtc::CashuMintMetadataStore::in_memory())
702                    }
703                }
704            } else {
705                None
706            };
707
708        let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
709            router_config.request_selection_strategy,
710            router_config.request_fairness_enabled,
711            router_config.request_dispatch,
712            std::time::Duration::from_millis(router_config.message_timeout_ms),
713            crate::webrtc::CashuRoutingConfig::from(&config.cashu),
714            cashu_payment_client,
715            cashu_mint_metadata,
716        ));
717        let controller = Arc::new(EmbeddedPeerRouterController::new(
718            keys.clone(),
719            state.clone(),
720            Arc::clone(&store) as Arc<dyn ContentStore>,
721            peer_classifier,
722            nostr_relay.clone(),
723        ));
724        controller.apply_config(&config).await?;
725        (Some(state), Some(controller))
726    };
727
728    #[cfg(not(feature = "p2p"))]
729    let webrtc_state: Option<Arc<crate::webrtc::WebRTCState>> = None;
730
731    let background_services_controller = Arc::new(EmbeddedBackgroundServicesController::new(
732        keys.clone(),
733        opts.data_dir.clone(),
734        Arc::clone(&store),
735        graph_store.clone(),
736        Arc::clone(&social_graph_store),
737        crawler_spambox_backend,
738        webrtc_state.clone(),
739    ));
740    background_services_controller.apply_config(&config).await?;
741
742    let upstream_blossom = config.blossom.all_read_servers();
743    let active_nostr_relays = config.nostr.active_relays();
744
745    let mut server = HashtreeServer::new(Arc::clone(&store), opts.bind_address.clone())
746        .with_server_mode(config.server.mode)
747        .with_hash_get_enabled(config.server.mode.hash_get_enabled())
748        .with_allowed_pubkeys(allowed_pubkeys.clone())
749        .with_max_upload_bytes((config.blossom.max_upload_mb as usize) * 1024 * 1024)
750        .with_public_writes(config.server.public_writes)
751        .with_upstream_blossom(upstream_blossom)
752        .with_nostr_relay_urls(active_nostr_relays)
753        .with_social_graph(social_graph)
754        .with_socialgraph_snapshot(
755            Arc::clone(&social_graph_store),
756            social_graph_root_bytes,
757            config.server.socialgraph_snapshot_public,
758        )
759        .with_nostr_relay(nostr_relay.clone());
760
761    if let Some(ref state) = webrtc_state {
762        server = server.with_webrtc_peers(state.clone());
763    }
764
765    if let Some(extra) = opts.extra_routes {
766        server = server.with_extra_routes(extra);
767    }
768    if let Some(cors) = opts.cors {
769        server = server.with_cors(cors);
770    }
771
772    spawn_background_eviction_task(
773        Arc::clone(&store),
774        BACKGROUND_EVICTION_INTERVAL,
775        "embedded daemon",
776    );
777
778    let listener = TcpListener::bind(&opts.bind_address).await?;
779    let local_addr = listener.local_addr()?;
780    let actual_addr = format!("{}:{}", local_addr.ip(), local_addr.port());
781
782    let server_shutdown = Arc::new(Notify::new());
783    let server_shutdown_for_task = Arc::clone(&server_shutdown);
784    let server_join = tokio::spawn(async move {
785        if let Err(e) = server
786            .run_with_listener_until(listener, async move {
787                server_shutdown_for_task.notified().await;
788            })
789            .await
790        {
791            tracing::error!("Embedded daemon server error: {}", e);
792        }
793    });
794    let server_controller = Arc::new(EmbeddedServerController::new(server_shutdown, server_join));
795    #[cfg(feature = "p2p")]
796    let daemon_controller = Arc::new(EmbeddedDaemonController::new(
797        server_controller,
798        peer_router_controller.clone(),
799        Some(background_services_controller.clone()),
800    ));
801    #[cfg(not(feature = "p2p"))]
802    let daemon_controller = Arc::new(EmbeddedDaemonController::new(
803        server_controller,
804        Some(background_services_controller.clone()),
805    ));
806
807    tracing::info!(
808        "Embedded daemon started on {}, identity {}",
809        actual_addr,
810        npub
811    );
812
813    Ok(EmbeddedDaemonInfo {
814        addr: actual_addr,
815        port: local_addr.port(),
816        npub,
817        store,
818        daemon_controller,
819        webrtc_state,
820        #[cfg(feature = "p2p")]
821        peer_router_controller,
822        background_services_controller: Some(background_services_controller),
823    })
824}
825
826#[cfg(test)]
827mod tests {
828    use super::EmbeddedBackgroundServicesController;
829
830    #[test]
831    fn mirror_publish_relays_match_upstream_relays_only() {
832        let relays = EmbeddedBackgroundServicesController::mirror_publish_relays(
833            &[
834                "wss://relay.example".to_string(),
835                "wss://relay.two".to_string(),
836            ],
837            "0.0.0.0:8080",
838        );
839        assert_eq!(
840            relays,
841            vec![
842                "wss://relay.example".to_string(),
843                "wss://relay.two".to_string(),
844            ]
845        );
846    }
847}