Skip to main content

hashtree_cli/
daemon.rs

1use anyhow::{Context, Result};
2use axum::Router;
3use nostr::nips::nip19::ToBech32;
4use nostr::Keys;
5use std::collections::HashSet;
6use std::path::PathBuf;
7use std::sync::Arc;
8use tokio::net::TcpListener;
9use tokio::sync::{Mutex, Notify};
10use tokio::task::JoinHandle;
11use tower_http::cors::CorsLayer;
12
13use crate::config::{ensure_keys, ensure_keys_in, parse_npub, pubkey_bytes, Config};
14use crate::eviction::{spawn_background_eviction_task, BACKGROUND_EVICTION_INTERVAL};
15use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
16use crate::server::{AppState, HashtreeServer};
17use crate::socialgraph;
18use crate::storage::HashtreeStore;
19
20#[cfg(feature = "p2p")]
21use crate::webrtc::{ContentStore, PeerClassifier, WebRTCManager, WebRTCState};
22#[cfg(not(feature = "p2p"))]
23use crate::WebRTCState;
24
/// Handle to a running peer-router task.
#[cfg(feature = "p2p")]
struct PeerRouterRuntime {
    /// Watch-channel sender; the controller sends `true` on it to request shutdown.
    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
    /// Join handle of the spawned task that drives the router.
    join: JoinHandle<()>,
}
30
31struct BackgroundSyncRuntime {
32    service: Arc<crate::sync::BackgroundSync>,
33    join: Option<JoinHandle<()>>,
34}
35
36impl Drop for BackgroundSyncRuntime {
37    fn drop(&mut self) {
38        self.service.shutdown();
39        if let Some(join) = self.join.take() {
40            join.abort();
41        }
42    }
43}
44
45struct BackgroundMirrorRuntime {
46    service: Arc<crate::nostr_mirror::BackgroundNostrMirror>,
47    join: Option<JoinHandle<()>>,
48}
49
50impl Drop for BackgroundMirrorRuntime {
51    fn drop(&mut self) {
52        self.service.shutdown();
53        if let Some(join) = self.join.take() {
54            join.abort();
55        }
56    }
57}
58
59struct BackgroundServicesRuntime {
60    crawler: Option<socialgraph::crawler::SocialGraphTaskHandles>,
61    mirror: Option<BackgroundMirrorRuntime>,
62    sync: Option<BackgroundSyncRuntime>,
63}
64
65impl Drop for BackgroundServicesRuntime {
66    fn drop(&mut self) {
67        if let Some(handles) = self.crawler.as_ref() {
68            let _ = handles.shutdown_tx.send(true);
69        }
70        if let Some(runtime) = self.mirror.as_ref() {
71            runtime.service.shutdown();
72        }
73        if let Some(runtime) = self.sync.as_ref() {
74            runtime.service.shutdown();
75        }
76    }
77}
78
79impl BackgroundServicesRuntime {
80    fn status(&self) -> EmbeddedBackgroundServicesStatus {
81        EmbeddedBackgroundServicesStatus {
82            crawler_active: self.crawler.is_some(),
83            mirror_active: self.mirror.is_some(),
84            sync_active: self.sync.is_some(),
85        }
86    }
87}
88
89struct EmbeddedServerRuntime {
90    shutdown: Arc<Notify>,
91    join: Option<JoinHandle<()>>,
92}
93
94pub struct EmbeddedServerController {
95    runtime: Mutex<Option<EmbeddedServerRuntime>>,
96}
97
98impl EmbeddedServerController {
99    pub fn new(shutdown: Arc<Notify>, join: JoinHandle<()>) -> Self {
100        Self {
101            runtime: Mutex::new(Some(EmbeddedServerRuntime {
102                shutdown,
103                join: Some(join),
104            })),
105        }
106    }
107
108    pub async fn shutdown(&self) {
109        let mut runtime = self.runtime.lock().await;
110        let Some(mut runtime) = runtime.take() else {
111            return;
112        };
113
114        runtime.shutdown.notify_waiters();
115        if let Some(mut join) = runtime.join.take() {
116            match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
117                Ok(Ok(())) => {}
118                Ok(Err(err)) => {
119                    tracing::warn!("Embedded server task ended with join error: {}", err)
120                }
121                Err(_) => {
122                    tracing::warn!("Timed out waiting for embedded server shutdown");
123                    join.abort();
124                }
125            }
126        }
127    }
128}
129
/// Snapshot of which background services the controller currently holds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmbeddedBackgroundServicesStatus {
    /// `true` while social-graph crawler task handles are held.
    pub crawler_active: bool,
    /// `true` while a background nostr mirror runtime is held.
    pub mirror_active: bool,
    /// `true` while a background sync runtime is held.
    pub sync_active: bool,
}
136
137pub struct EmbeddedBackgroundServicesController {
138    keys: Keys,
139    data_dir: PathBuf,
140    store: Arc<HashtreeStore>,
141    graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
142    graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
143    spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
144    webrtc_state: Option<Arc<WebRTCState>>,
145    runtime: Mutex<BackgroundServicesRuntime>,
146}
147
148impl EmbeddedBackgroundServicesController {
149    const MIRROR_PUBLISH_RELAY_PRIORITY: &[&str] = &[
150        "wss://temp.iris.to",
151        "wss://vault.iris.to",
152        "wss://relay.damus.io",
153        "wss://relay.primal.net",
154    ];
155    const MIRROR_PUBLISH_RELAY_BLOCKLIST: &[&str] =
156        &["wss://graph-relay.iris.to", "wss://upload.iris.to/nostr"];
157
158    fn mirror_publish_relays(active_relays: &[String], _bind_address: &str) -> Vec<String> {
159        let mut seen = HashSet::new();
160        let active_relays = active_relays
161            .iter()
162            .filter(|relay| seen.insert((*relay).clone()))
163            .cloned()
164            .collect::<Vec<_>>();
165        if active_relays.is_empty() {
166            return Vec::new();
167        }
168        let active_relay_set = active_relays.iter().cloned().collect::<HashSet<_>>();
169
170        let preferred = Self::MIRROR_PUBLISH_RELAY_PRIORITY
171            .iter()
172            .filter(|relay| active_relay_set.contains(**relay))
173            .map(|relay| (*relay).to_string())
174            .collect::<Vec<_>>();
175        if !preferred.is_empty() {
176            return preferred;
177        }
178
179        let filtered = active_relays
180            .iter()
181            .filter(|relay| !Self::MIRROR_PUBLISH_RELAY_BLOCKLIST.contains(&relay.as_str()))
182            .cloned()
183            .collect::<Vec<_>>();
184        if !filtered.is_empty() {
185            return filtered;
186        }
187
188        active_relays
189    }
190
191    pub fn new(
192        keys: Keys,
193        data_dir: PathBuf,
194        store: Arc<HashtreeStore>,
195        graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
196        graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
197        spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
198        webrtc_state: Option<Arc<WebRTCState>>,
199    ) -> Self {
200        Self {
201            keys,
202            data_dir,
203            store,
204            graph_store_concrete,
205            graph_store,
206            spambox,
207            webrtc_state,
208            runtime: Mutex::new(BackgroundServicesRuntime {
209                crawler: None,
210                mirror: None,
211                sync: None,
212            }),
213        }
214    }
215
216    pub async fn status(&self) -> EmbeddedBackgroundServicesStatus {
217        self.runtime.lock().await.status()
218    }
219
220    pub async fn shutdown(&self) {
221        let mut runtime = self.runtime.lock().await;
222        Self::shutdown_crawler(&mut runtime.crawler).await;
223        Self::shutdown_mirror(&mut runtime.mirror).await;
224        Self::shutdown_sync(&mut runtime.sync).await;
225    }
226
227    async fn shutdown_crawler(crawler: &mut Option<socialgraph::crawler::SocialGraphTaskHandles>) {
228        let Some(handles) = crawler.take() else {
229            return;
230        };
231
232        let _ = handles.shutdown_tx.send(true);
233
234        let mut crawl_handle = handles.crawl_handle;
235        match tokio::time::timeout(std::time::Duration::from_secs(3), &mut crawl_handle).await {
236            Ok(Ok(())) => {}
237            Ok(Err(err)) => tracing::warn!("Crawler task ended with join error: {}", err),
238            Err(_) => {
239                tracing::warn!("Timed out waiting for crawler task shutdown");
240                crawl_handle.abort();
241            }
242        }
243
244        let mut local_list_handle = handles.local_list_handle;
245        match tokio::time::timeout(std::time::Duration::from_secs(3), &mut local_list_handle).await
246        {
247            Ok(Ok(())) => {}
248            Ok(Err(err)) => tracing::warn!("Local list task ended with join error: {}", err),
249            Err(_) => {
250                tracing::warn!("Timed out waiting for local list task shutdown");
251                local_list_handle.abort();
252            }
253        }
254    }
255
256    async fn shutdown_sync(sync: &mut Option<BackgroundSyncRuntime>) {
257        let Some(mut runtime) = sync.take() else {
258            return;
259        };
260
261        runtime.service.shutdown();
262        if let Some(mut join) = runtime.join.take() {
263            match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
264                Ok(Ok(())) => {}
265                Ok(Err(err)) => {
266                    tracing::warn!("Background sync task ended with join error: {}", err)
267                }
268                Err(_) => {
269                    tracing::warn!("Timed out waiting for background sync shutdown");
270                    join.abort();
271                }
272            }
273        }
274    }
275
276    async fn shutdown_mirror(mirror: &mut Option<BackgroundMirrorRuntime>) {
277        let Some(mut runtime) = mirror.take() else {
278            return;
279        };
280
281        runtime.service.shutdown();
282        if let Some(mut join) = runtime.join.take() {
283            match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
284                Ok(Ok(())) => {}
285                Ok(Err(err)) => {
286                    tracing::warn!("Background mirror task ended with join error: {}", err)
287                }
288                Err(_) => {
289                    tracing::warn!("Timed out waiting for background mirror shutdown");
290                    join.abort();
291                }
292            }
293        }
294    }
295
296    pub async fn apply_config(&self, config: &Config) -> Result<EmbeddedBackgroundServicesStatus> {
297        let mut runtime = self.runtime.lock().await;
298
299        Self::shutdown_crawler(&mut runtime.crawler).await;
300        Self::shutdown_mirror(&mut runtime.mirror).await;
301        Self::shutdown_sync(&mut runtime.sync).await;
302
303        if !config.server.mode.background_services_enabled() {
304            return Ok(runtime.status());
305        }
306
307        let active_relays = config.nostr.active_relays();
308
309        if config.nostr.enabled
310            && config.nostr.social_graph_crawl_depth > 0
311            && !active_relays.is_empty()
312        {
313            runtime.crawler = Some(socialgraph::crawler::spawn_social_graph_tasks(
314                self.graph_store.clone(),
315                self.keys.clone(),
316                active_relays.clone(),
317                config.nostr.social_graph_crawl_depth,
318                self.spambox.clone(),
319                self.data_dir.clone(),
320            ));
321
322            let service = Arc::new(
323                crate::nostr_mirror::BackgroundNostrMirror::new(
324                    crate::nostr_mirror::NostrMirrorConfig {
325                        relays: active_relays.clone(),
326                        publish_relays: Self::mirror_publish_relays(
327                            &active_relays,
328                            &config.server.bind_address,
329                        ),
330                        blossom_write_servers: config.blossom.all_write_servers(),
331                        max_follow_distance: config.nostr.social_graph_crawl_depth,
332                        overmute_threshold: config.nostr.overmute_threshold,
333                        require_negentropy: config.nostr.negentropy_only,
334                        kinds: config.nostr.mirror_kinds.clone(),
335                        history_sync_author_chunk_size: config
336                            .nostr
337                            .history_sync_author_chunk_size
338                            .max(1),
339                        history_sync_per_author_event_limit: config
340                            .nostr
341                            .history_sync_per_author_event_limit
342                            .max(1),
343                        missing_profile_backfill_batch_size: config
344                            .nostr
345                            .history_sync_author_chunk_size
346                            .max(1),
347                        history_sync_on_reconnect: config.nostr.history_sync_on_reconnect,
348                        ..crate::nostr_mirror::NostrMirrorConfig::default()
349                    },
350                    self.store.clone(),
351                    self.graph_store_concrete.clone(),
352                    Some(
353                        nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
354                            .context("Failed to parse keys for background nostr mirror")?,
355                    ),
356                )
357                .await
358                .context("Failed to create background nostr mirror")?,
359            );
360            let service_for_task = service.clone();
361            let join = tokio::task::spawn_blocking(move || {
362                let runtime = tokio::runtime::Builder::new_current_thread()
363                    .enable_all()
364                    .build()
365                    .expect("build background nostr mirror runtime");
366                runtime.block_on(async {
367                    if let Err(err) = service_for_task.run().await {
368                        tracing::error!("Background nostr mirror error: {:#}", err);
369                    }
370                });
371            });
372            runtime.mirror = Some(BackgroundMirrorRuntime {
373                service,
374                join: Some(join),
375            });
376        }
377
378        let has_pinned_refs = self
379            .store
380            .list_pinned_refs()
381            .map(|refs| !refs.is_empty())
382            .unwrap_or(false);
383        let has_tracked_authors = self
384            .store
385            .list_tracked_authors()
386            .map(|authors| !authors.is_empty())
387            .unwrap_or(false);
388
389        if config.sync.enabled
390            && (config.sync.sync_own
391                || config.sync.sync_followed
392                || has_pinned_refs
393                || has_tracked_authors)
394            && !active_relays.is_empty()
395        {
396            let sync_config = crate::sync::SyncConfig {
397                sync_own: config.sync.sync_own,
398                sync_followed: config.sync.sync_followed,
399                relays: active_relays,
400                max_concurrent: config.sync.max_concurrent,
401                webrtc_timeout_ms: config.sync.webrtc_timeout_ms,
402                blossom_timeout_ms: config.sync.blossom_timeout_ms,
403            };
404
405            let sync_keys = nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
406                .context("Failed to parse keys for sync")?;
407            let service = Arc::new(
408                crate::sync::BackgroundSync::new(
409                    sync_config,
410                    self.store.clone(),
411                    sync_keys,
412                    self.webrtc_state.clone(),
413                )
414                .await
415                .context("Failed to create background sync service")?,
416            );
417            let contacts_file = self.data_dir.join("contacts.json");
418            let service_for_task = service.clone();
419            let join = tokio::spawn(async move {
420                if let Err(err) = service_for_task.run(contacts_file).await {
421                    tracing::error!("Background sync error: {}", err);
422                }
423            });
424            runtime.sync = Some(BackgroundSyncRuntime {
425                service,
426                join: Some(join),
427            });
428        }
429
430        Ok(runtime.status())
431    }
432}
433
/// Starts, restarts and stops the WebRTC peer router according to a [`Config`].
#[cfg(feature = "p2p")]
pub struct EmbeddedPeerRouterController {
    /// Identity keys handed to each new `WebRTCManager`.
    keys: Keys,
    /// WebRTC state shared with every manager this controller creates.
    state: Arc<WebRTCState>,
    /// Content store given to the manager when hash-get serving is enabled.
    store: Arc<dyn ContentStore>,
    /// Peer classifier installed on every manager.
    peer_classifier: PeerClassifier,
    /// Local nostr relay registered with the manager.
    nostr_relay: Arc<NostrRelay>,
    /// Currently running router task, if any.
    runtime: Mutex<Option<PeerRouterRuntime>>,
}
443
#[cfg(feature = "p2p")]
impl EmbeddedPeerRouterController {
    /// Creates a controller with no router running; call `apply_config` to
    /// start one.
    pub fn new(
        keys: Keys,
        state: Arc<WebRTCState>,
        store: Arc<dyn ContentStore>,
        peer_classifier: PeerClassifier,
        nostr_relay: Arc<NostrRelay>,
    ) -> Self {
        let runtime = Mutex::new(None);
        Self {
            keys,
            state,
            store,
            peer_classifier,
            nostr_relay,
            runtime,
        }
    }

    /// Returns a new handle to the shared WebRTC state.
    pub fn state(&self) -> Arc<WebRTCState> {
        Arc::clone(&self.state)
    }

    /// Signals a running router task to stop and waits up to three seconds
    /// for it, aborting the task when the wait times out.
    async fn stop_router(handle: PeerRouterRuntime) {
        let PeerRouterRuntime { shutdown, mut join } = handle;

        // A send error is deliberately ignored.
        let _ = shutdown.send(true);

        let outcome = tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await;
        match outcome {
            Err(_) => {
                tracing::warn!("Timed out waiting for peer router shutdown");
                join.abort();
            }
            Ok(Err(err)) => {
                tracing::warn!("Peer router task ended with join error: {}", err);
            }
            Ok(Ok(())) => {}
        }
    }

    /// Restarts the peer router so it matches `config`.
    ///
    /// Any running router is stopped and the shared runtime state is reset.
    /// Returns `Ok(true)` when a new router task was spawned and `Ok(false)`
    /// when the configuration disables the peer router.
    pub async fn apply_config(&self, config: &Config) -> Result<bool> {
        let mut slot = self.runtime.lock().await;
        if let Some(previous) = slot.take() {
            Self::stop_router(previous).await;
        }

        self.state.reset_runtime_state().await;

        if !crate::p2p_common::peer_router_enabled(config) {
            return Ok(false);
        }

        let webrtc_config = crate::p2p_common::default_webrtc_config(config);
        let serves_content = config.server.mode.hash_get_enabled();
        let mut manager = if serves_content {
            // The manager gets the content store only when hash-get is enabled.
            WebRTCManager::new_with_state_and_store_and_classifier(
                self.keys.clone(),
                webrtc_config,
                self.state.clone(),
                self.store.clone(),
                self.peer_classifier.clone(),
            )
        } else {
            let mut storeless =
                WebRTCManager::new_with_state(self.keys.clone(), webrtc_config, self.state.clone());
            storeless.set_peer_classifier(self.peer_classifier.clone());
            storeless
        };
        manager
            .set_nostr_relay(self.nostr_relay.clone() as hashtree_network::SharedMeshRelayClient);

        // Grab the shutdown signal before the manager moves into its task.
        let shutdown = manager.shutdown_signal();
        let join = tokio::spawn(async move {
            if let Err(err) = manager.run().await {
                tracing::error!("Peer router error: {}", err);
            }
        });
        *slot = Some(PeerRouterRuntime { shutdown, join });
        Ok(true)
    }

    /// Stops the running router, if any, and then resets the shared runtime
    /// state. Does nothing, including no reset, when no router is running.
    pub async fn shutdown(&self) {
        let mut slot = self.runtime.lock().await;
        let Some(running) = slot.take() else {
            return;
        };

        Self::stop_router(running).await;
        self.state.reset_runtime_state().await;
    }
}
537
538pub struct EmbeddedDaemonController {
539    server_controller: Arc<EmbeddedServerController>,
540    #[cfg(feature = "p2p")]
541    peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
542    background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
543}
544
545impl EmbeddedDaemonController {
546    #[cfg(feature = "p2p")]
547    pub fn new(
548        server_controller: Arc<EmbeddedServerController>,
549        peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
550        background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
551    ) -> Self {
552        Self {
553            server_controller,
554            #[cfg(feature = "p2p")]
555            peer_router_controller,
556            background_services_controller,
557        }
558    }
559
560    #[cfg(not(feature = "p2p"))]
561    pub fn new(
562        server_controller: Arc<EmbeddedServerController>,
563        background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
564    ) -> Self {
565        Self {
566            server_controller,
567            background_services_controller,
568        }
569    }
570
571    pub async fn shutdown(&self) {
572        self.server_controller.shutdown().await;
573        if let Some(controller) = self.background_services_controller.as_ref() {
574            controller.shutdown().await;
575        }
576        #[cfg(feature = "p2p")]
577        if let Some(controller) = self.peer_router_controller.as_ref() {
578            controller.shutdown().await;
579        }
580    }
581}
582
583pub struct EmbeddedDaemonOptions {
584    pub config: Config,
585    pub data_dir: PathBuf,
586    pub config_dir: Option<PathBuf>,
587    pub bind_address: String,
588    pub relays: Option<Vec<String>>,
589    pub extra_routes: Option<Router<AppState>>,
590    pub cors: Option<CorsLayer>,
591}
592
593pub struct EmbeddedDaemonInfo {
594    pub addr: String,
595    pub port: u16,
596    pub npub: String,
597    pub store: Arc<HashtreeStore>,
598    pub daemon_controller: Arc<EmbeddedDaemonController>,
599    #[allow(dead_code)]
600    pub webrtc_state: Option<Arc<WebRTCState>>,
601    #[cfg(feature = "p2p")]
602    #[allow(dead_code)]
603    pub peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
604    #[allow(dead_code)]
605    pub background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
606}
607
608pub async fn start_embedded(opts: EmbeddedDaemonOptions) -> Result<EmbeddedDaemonInfo> {
609    let _ = rustls::crypto::ring::default_provider().install_default();
610
611    let mut config = opts.config;
612    if let Some(relays) = opts.relays {
613        config.nostr.relays = relays;
614        config.nostr.enabled = !config.nostr.relays.is_empty();
615    }
616
617    let max_size_bytes = config.storage.max_size_gb * 1024 * 1024 * 1024;
618    let nostr_db_max_bytes = config
619        .nostr
620        .db_max_size_gb
621        .saturating_mul(1024 * 1024 * 1024);
622    let spambox_db_max_bytes = config
623        .nostr
624        .spambox_max_size_gb
625        .saturating_mul(1024 * 1024 * 1024);
626
627    let store = Arc::new(HashtreeStore::with_options(
628        &opts.data_dir,
629        config.storage.s3.as_ref(),
630        max_size_bytes,
631    )?);
632
633    let (keys, _was_generated) = if let Some(config_dir) = opts.config_dir.as_ref() {
634        ensure_keys_in(config_dir, Some(&opts.data_dir), Some(&config))?
635    } else {
636        ensure_keys()?
637    };
638    let pk_bytes = pubkey_bytes(&keys);
639    let npub = keys
640        .public_key()
641        .to_bech32()
642        .context("Failed to encode npub")?;
643
644    let mut allowed_pubkeys: HashSet<String> = HashSet::new();
645    allowed_pubkeys.insert(hex::encode(pk_bytes));
646    for npub_str in &config.nostr.allowed_npubs {
647        if let Ok(pk) = parse_npub(npub_str) {
648            allowed_pubkeys.insert(hex::encode(pk));
649        } else {
650            tracing::warn!("Invalid npub in allowed_npubs: {}", npub_str);
651        }
652    }
653
654    let graph_store = socialgraph::open_social_graph_store_with_storage(
655        &opts.data_dir,
656        store.store_arc(),
657        Some(nostr_db_max_bytes),
658    )
659    .context("Failed to initialize social graph store")?;
660    graph_store.set_profile_index_overmute_threshold(config.nostr.overmute_threshold);
661
662    let social_graph_root_bytes = if let Some(ref root_npub) = config.nostr.socialgraph_root {
663        parse_npub(root_npub).unwrap_or(pk_bytes)
664    } else {
665        pk_bytes
666    };
667    socialgraph::set_social_graph_root(&graph_store, &social_graph_root_bytes);
668    socialgraph::sync_local_list_files_force(graph_store.as_ref(), &opts.data_dir, &keys)
669        .context("Failed to sync local social graph lists")?;
670    let social_graph_store: Arc<dyn socialgraph::SocialGraphBackend> = graph_store.clone();
671
672    let social_graph = Arc::new(socialgraph::SocialGraphAccessControl::new(
673        Arc::clone(&social_graph_store),
674        config.nostr.max_write_distance,
675        allowed_pubkeys.clone(),
676    ));
677
678    let nostr_relay_config = NostrRelayConfig {
679        spambox_db_max_bytes,
680        ..Default::default()
681    };
682    let mut public_event_pubkeys = HashSet::new();
683    public_event_pubkeys.insert(hex::encode(pk_bytes));
684    let nostr_relay = Arc::new(
685        NostrRelay::new(
686            Arc::clone(&social_graph_store),
687            opts.data_dir.clone(),
688            public_event_pubkeys,
689            Some(social_graph.clone()),
690            nostr_relay_config,
691        )
692        .context("Failed to initialize Nostr relay")?,
693    );
694
695    let crawler_spambox = if spambox_db_max_bytes == 0 {
696        None
697    } else {
698        let spam_dir = opts.data_dir.join("socialgraph_spambox");
699        match socialgraph::open_social_graph_store_at_path(&spam_dir, Some(spambox_db_max_bytes)) {
700            Ok(store) => Some(store),
701            Err(err) => {
702                tracing::warn!("Failed to open social graph spambox for crawler: {}", err);
703                None
704            }
705        }
706    };
707    let crawler_spambox_backend = crawler_spambox
708        .clone()
709        .map(|store| store as Arc<dyn socialgraph::SocialGraphBackend>);
710
711    #[cfg(feature = "p2p")]
712    let (webrtc_state, peer_router_controller): (
713        Option<Arc<WebRTCState>>,
714        Option<Arc<EmbeddedPeerRouterController>>,
715    ) = {
716        let router_config = crate::p2p_common::default_webrtc_config(&config);
717        let peer_classifier = crate::p2p_common::build_peer_classifier(
718            opts.data_dir.clone(),
719            Arc::clone(&social_graph_store),
720        );
721        let cashu_payment_client =
722            if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
723                match crate::cashu_helper::CashuHelperClient::discover(opts.data_dir.clone()) {
724                    Ok(client) => {
725                        Some(Arc::new(client) as Arc<dyn crate::cashu_helper::CashuPaymentClient>)
726                    }
727                    Err(err) => {
728                        tracing::warn!(
729                        "Cashu settlement helper unavailable; paid retrieval stays disabled: {}",
730                        err
731                    );
732                        None
733                    }
734                }
735            } else {
736                None
737            };
738        let cashu_mint_metadata =
739            if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
740                let metadata_path = crate::webrtc::cashu_mint_metadata_path(&opts.data_dir);
741                match crate::webrtc::CashuMintMetadataStore::load(metadata_path) {
742                    Ok(store) => Some(store),
743                    Err(err) => {
744                        tracing::warn!(
745                        "Failed to load Cashu mint metadata; falling back to in-memory state: {}",
746                        err
747                    );
748                        Some(crate::webrtc::CashuMintMetadataStore::in_memory())
749                    }
750                }
751            } else {
752                None
753            };
754
755        let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
756            router_config.request_selection_strategy,
757            router_config.request_fairness_enabled,
758            router_config.request_dispatch,
759            std::time::Duration::from_millis(router_config.message_timeout_ms),
760            crate::webrtc::CashuRoutingConfig::from(&config.cashu),
761            cashu_payment_client,
762            cashu_mint_metadata,
763        ));
764        let controller = Arc::new(EmbeddedPeerRouterController::new(
765            keys.clone(),
766            state.clone(),
767            Arc::clone(&store) as Arc<dyn ContentStore>,
768            peer_classifier,
769            nostr_relay.clone(),
770        ));
771        controller.apply_config(&config).await?;
772        (Some(state), Some(controller))
773    };
774
775    #[cfg(not(feature = "p2p"))]
776    let webrtc_state: Option<Arc<crate::webrtc::WebRTCState>> = None;
777
778    let background_services_controller = Arc::new(EmbeddedBackgroundServicesController::new(
779        keys.clone(),
780        opts.data_dir.clone(),
781        Arc::clone(&store),
782        graph_store.clone(),
783        Arc::clone(&social_graph_store),
784        crawler_spambox_backend,
785        webrtc_state.clone(),
786    ));
787    background_services_controller.apply_config(&config).await?;
788
789    let upstream_blossom = config.blossom.all_read_servers();
790    let active_nostr_relays = config.nostr.active_relays();
791
792    let mut server = HashtreeServer::new(Arc::clone(&store), opts.bind_address.clone())
793        .with_server_mode(config.server.mode)
794        .with_hash_get_enabled(config.server.mode.hash_get_enabled())
795        .with_allowed_pubkeys(allowed_pubkeys.clone())
796        .with_max_upload_bytes((config.blossom.max_upload_mb as usize) * 1024 * 1024)
797        .with_public_writes(config.server.public_writes)
798        .with_upstream_blossom(upstream_blossom)
799        .with_nostr_relay_urls(active_nostr_relays)
800        .with_social_graph(social_graph)
801        .with_socialgraph_snapshot(
802            Arc::clone(&social_graph_store),
803            social_graph_root_bytes,
804            config.server.socialgraph_snapshot_public,
805        )
806        .with_nostr_relay(nostr_relay.clone());
807
808    if crate::p2p_common::peer_router_enabled(&config) {
809        if let Some(ref state) = webrtc_state {
810            server = server.with_webrtc_peers(state.clone());
811        }
812    }
813
814    if let Some(extra) = opts.extra_routes {
815        server = server.with_extra_routes(extra);
816    }
817    if let Some(cors) = opts.cors {
818        server = server.with_cors(cors);
819    }
820
821    spawn_background_eviction_task(
822        Arc::clone(&store),
823        BACKGROUND_EVICTION_INTERVAL,
824        "embedded daemon",
825    );
826
827    let listener = TcpListener::bind(&opts.bind_address).await?;
828    let local_addr = listener.local_addr()?;
829    let actual_addr = format!("{}:{}", local_addr.ip(), local_addr.port());
830
831    let server_shutdown = Arc::new(Notify::new());
832    let server_shutdown_for_task = Arc::clone(&server_shutdown);
833    let server_join = tokio::spawn(async move {
834        if let Err(e) = server
835            .run_with_listener_until(listener, async move {
836                server_shutdown_for_task.notified().await;
837            })
838            .await
839        {
840            tracing::error!("Embedded daemon server error: {}", e);
841        }
842    });
843    let server_controller = Arc::new(EmbeddedServerController::new(server_shutdown, server_join));
844    #[cfg(feature = "p2p")]
845    let daemon_controller = Arc::new(EmbeddedDaemonController::new(
846        server_controller,
847        peer_router_controller.clone(),
848        Some(background_services_controller.clone()),
849    ));
850    #[cfg(not(feature = "p2p"))]
851    let daemon_controller = Arc::new(EmbeddedDaemonController::new(
852        server_controller,
853        Some(background_services_controller.clone()),
854    ));
855
856    tracing::info!(
857        "Embedded daemon started on {}, identity {}",
858        actual_addr,
859        npub
860    );
861
862    Ok(EmbeddedDaemonInfo {
863        addr: actual_addr,
864        port: local_addr.port(),
865        npub,
866        store,
867        daemon_controller,
868        webrtc_state,
869        #[cfg(feature = "p2p")]
870        peer_router_controller,
871        background_services_controller: Some(background_services_controller),
872    })
873}
874
#[cfg(test)]
mod tests {
    use super::EmbeddedBackgroundServicesController;

    /// Bind address passed to every call; the function under test ignores it.
    const BIND_ADDRESS: &str = "0.0.0.0:8080";

    /// Runs `mirror_publish_relays` on string-literal relay URLs.
    fn publish_relays(active: &[&str]) -> Vec<String> {
        let active: Vec<String> = active.iter().map(|relay| (*relay).to_string()).collect();
        EmbeddedBackgroundServicesController::mirror_publish_relays(&active, BIND_ADDRESS)
    }

    #[test]
    fn mirror_publish_relays_prefers_known_root_publish_relays() {
        let relays = publish_relays(&[
            "wss://graph-relay.iris.to",
            "wss://relay.example",
            "wss://relay.damus.io",
            "wss://temp.iris.to",
            "wss://vault.iris.to",
            "wss://upload.iris.to/nostr",
        ]);
        // Preferred relays come back in priority order, not input order.
        assert_eq!(
            relays,
            vec![
                "wss://temp.iris.to",
                "wss://vault.iris.to",
                "wss://relay.damus.io",
            ]
        );
    }

    #[test]
    fn mirror_publish_relays_filters_known_bad_publish_targets_when_no_preferred_remain() {
        let relays = publish_relays(&[
            "wss://graph-relay.iris.to",
            "wss://relay.snort.social",
            "wss://relay.nostr.band",
            "wss://upload.iris.to/nostr",
        ]);
        assert_eq!(
            relays,
            vec!["wss://relay.snort.social", "wss://relay.nostr.band"]
        );
    }

    #[test]
    fn mirror_publish_relays_returns_nothing_without_active_relays() {
        assert!(publish_relays(&[]).is_empty());
    }

    #[test]
    fn mirror_publish_relays_drops_duplicates_keeping_first_occurrence() {
        let relays = publish_relays(&[
            "wss://relay.example",
            "wss://other.example",
            "wss://relay.example",
        ]);
        assert_eq!(relays, vec!["wss://relay.example", "wss://other.example"]);
    }

    #[test]
    fn mirror_publish_relays_falls_back_to_blocklisted_relays_when_nothing_else_is_active() {
        let relays = publish_relays(&["wss://upload.iris.to/nostr", "wss://graph-relay.iris.to"]);
        assert_eq!(
            relays,
            vec!["wss://upload.iris.to/nostr", "wss://graph-relay.iris.to"]
        );
    }
}