Skip to main content

hashtree_cli/
daemon.rs

1use anyhow::{Context, Result};
2use axum::Router;
3use nostr::nips::nip19::ToBech32;
4use nostr::Keys;
5use std::collections::HashSet;
6use std::path::PathBuf;
7use std::sync::Arc;
8use tokio::net::TcpListener;
9use tokio::sync::{Mutex, Notify};
10use tokio::task::JoinHandle;
11use tower_http::cors::CorsLayer;
12
13use crate::config::{ensure_keys, ensure_keys_in, parse_npub, pubkey_bytes, Config};
14use crate::eviction::{spawn_background_eviction_task, BACKGROUND_EVICTION_INTERVAL};
15use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
16use crate::server::{AppState, HashtreeServer};
17use crate::socialgraph;
18use crate::storage::HashtreeStore;
19
20#[cfg(feature = "p2p")]
21use crate::webrtc::{ContentStore, PeerClassifier, WebRTCManager, WebRTCState};
22#[cfg(not(feature = "p2p"))]
23use crate::WebRTCState;
24
25#[cfg(feature = "p2p")]
26struct PeerRouterRuntime {
27    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
28    join: JoinHandle<()>,
29    peer_state_persist: JoinHandle<()>,
30}
31
32struct BackgroundSyncRuntime {
33    service: Arc<crate::sync::BackgroundSync>,
34    join: Option<JoinHandle<()>>,
35}
36
37impl Drop for BackgroundSyncRuntime {
38    fn drop(&mut self) {
39        self.service.shutdown();
40        if let Some(join) = self.join.take() {
41            join.abort();
42        }
43    }
44}
45
46struct BackgroundMirrorRuntime {
47    service: Arc<crate::nostr_mirror::BackgroundNostrMirror>,
48    join: Option<JoinHandle<()>>,
49}
50
51impl Drop for BackgroundMirrorRuntime {
52    fn drop(&mut self) {
53        self.service.shutdown();
54        if let Some(join) = self.join.take() {
55            join.abort();
56        }
57    }
58}
59
60struct BackgroundServicesRuntime {
61    crawler: Option<socialgraph::crawler::SocialGraphTaskHandles>,
62    mirror: Option<BackgroundMirrorRuntime>,
63    sync: Option<BackgroundSyncRuntime>,
64}
65
66impl Drop for BackgroundServicesRuntime {
67    fn drop(&mut self) {
68        if let Some(handles) = self.crawler.as_ref() {
69            let _ = handles.shutdown_tx.send(true);
70        }
71        if let Some(runtime) = self.mirror.as_ref() {
72            runtime.service.shutdown();
73        }
74        if let Some(runtime) = self.sync.as_ref() {
75            runtime.service.shutdown();
76        }
77    }
78}
79
80impl BackgroundServicesRuntime {
81    fn status(&self) -> EmbeddedBackgroundServicesStatus {
82        EmbeddedBackgroundServicesStatus {
83            crawler_active: self.crawler.is_some(),
84            mirror_active: self.mirror.is_some(),
85            sync_active: self.sync.is_some(),
86        }
87    }
88}
89
90struct EmbeddedServerRuntime {
91    shutdown: Arc<Notify>,
92    join: Option<JoinHandle<()>>,
93}
94
95pub struct EmbeddedServerController {
96    runtime: Mutex<Option<EmbeddedServerRuntime>>,
97}
98
99impl EmbeddedServerController {
100    pub fn new(shutdown: Arc<Notify>, join: JoinHandle<()>) -> Self {
101        Self {
102            runtime: Mutex::new(Some(EmbeddedServerRuntime {
103                shutdown,
104                join: Some(join),
105            })),
106        }
107    }
108
109    pub async fn shutdown(&self) {
110        let mut runtime = self.runtime.lock().await;
111        let Some(mut runtime) = runtime.take() else {
112            return;
113        };
114
115        runtime.shutdown.notify_waiters();
116        if let Some(mut join) = runtime.join.take() {
117            match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
118                Ok(Ok(())) => {}
119                Ok(Err(err)) => {
120                    tracing::warn!("Embedded server task ended with join error: {}", err)
121                }
122                Err(_) => {
123                    tracing::warn!("Timed out waiting for embedded server shutdown");
124                    join.abort();
125                }
126            }
127        }
128    }
129}
130
131#[derive(Debug, Clone, Copy, PartialEq, Eq)]
132pub struct EmbeddedBackgroundServicesStatus {
133    pub crawler_active: bool,
134    pub mirror_active: bool,
135    pub sync_active: bool,
136}
137
138pub struct EmbeddedBackgroundServicesController {
139    keys: Keys,
140    data_dir: PathBuf,
141    store: Arc<HashtreeStore>,
142    graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
143    graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
144    spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
145    webrtc_state: Option<Arc<WebRTCState>>,
146    runtime: Mutex<BackgroundServicesRuntime>,
147}
148
149impl EmbeddedBackgroundServicesController {
150    const MIRROR_PUBLISH_RELAY_PRIORITY: &[&str] = &[
151        "wss://nos.lol",
152        "wss://temp.iris.to",
153        "wss://vault.iris.to",
154        "wss://relay.damus.io",
155    ];
156    const MIRROR_PUBLISH_RELAY_BLOCKLIST: &[&str] =
157        &["wss://graph-relay.iris.to", "wss://upload.iris.to/nostr"];
158
159    fn mirror_publish_relays(active_relays: &[String], _bind_address: &str) -> Vec<String> {
160        let mut seen = HashSet::new();
161        let active_relays = active_relays
162            .iter()
163            .filter(|relay| seen.insert((*relay).clone()))
164            .cloned()
165            .collect::<Vec<_>>();
166        if active_relays.is_empty() {
167            return Vec::new();
168        }
169        let filtered = active_relays
170            .iter()
171            .filter(|relay| !Self::MIRROR_PUBLISH_RELAY_BLOCKLIST.contains(&relay.as_str()))
172            .cloned()
173            .collect::<Vec<_>>();
174        if filtered.is_empty() {
175            return active_relays;
176        }
177
178        let mut selected = Vec::new();
179        let mut selected_set = HashSet::new();
180        for relay in Self::MIRROR_PUBLISH_RELAY_PRIORITY {
181            if filtered.iter().any(|active| active == relay) {
182                selected.push((*relay).to_string());
183                selected_set.insert((*relay).to_string());
184            }
185        }
186        for relay in filtered {
187            if selected_set.insert(relay.clone()) {
188                selected.push(relay);
189            }
190        }
191
192        selected
193    }
194
195    pub fn new(
196        keys: Keys,
197        data_dir: PathBuf,
198        store: Arc<HashtreeStore>,
199        graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
200        graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
201        spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
202        webrtc_state: Option<Arc<WebRTCState>>,
203    ) -> Self {
204        Self {
205            keys,
206            data_dir,
207            store,
208            graph_store_concrete,
209            graph_store,
210            spambox,
211            webrtc_state,
212            runtime: Mutex::new(BackgroundServicesRuntime {
213                crawler: None,
214                mirror: None,
215                sync: None,
216            }),
217        }
218    }
219
220    pub async fn status(&self) -> EmbeddedBackgroundServicesStatus {
221        self.runtime.lock().await.status()
222    }
223
224    pub async fn shutdown(&self) {
225        let mut runtime = self.runtime.lock().await;
226        Self::shutdown_crawler(&mut runtime.crawler).await;
227        Self::shutdown_mirror(&mut runtime.mirror).await;
228        Self::shutdown_sync(&mut runtime.sync).await;
229    }
230
231    async fn shutdown_crawler(crawler: &mut Option<socialgraph::crawler::SocialGraphTaskHandles>) {
232        let Some(handles) = crawler.take() else {
233            return;
234        };
235
236        let _ = handles.shutdown_tx.send(true);
237
238        let mut crawl_handle = handles.crawl_handle;
239        match tokio::time::timeout(std::time::Duration::from_secs(3), &mut crawl_handle).await {
240            Ok(Ok(())) => {}
241            Ok(Err(err)) => tracing::warn!("Crawler task ended with join error: {}", err),
242            Err(_) => {
243                tracing::warn!("Timed out waiting for crawler task shutdown");
244                crawl_handle.abort();
245            }
246        }
247
248        let mut local_list_handle = handles.local_list_handle;
249        match tokio::time::timeout(std::time::Duration::from_secs(3), &mut local_list_handle).await
250        {
251            Ok(Ok(())) => {}
252            Ok(Err(err)) => tracing::warn!("Local list task ended with join error: {}", err),
253            Err(_) => {
254                tracing::warn!("Timed out waiting for local list task shutdown");
255                local_list_handle.abort();
256            }
257        }
258    }
259
260    async fn shutdown_sync(sync: &mut Option<BackgroundSyncRuntime>) {
261        let Some(mut runtime) = sync.take() else {
262            return;
263        };
264
265        runtime.service.shutdown();
266        if let Some(mut join) = runtime.join.take() {
267            match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
268                Ok(Ok(())) => {}
269                Ok(Err(err)) => {
270                    tracing::warn!("Background sync task ended with join error: {}", err)
271                }
272                Err(_) => {
273                    tracing::warn!("Timed out waiting for background sync shutdown");
274                    join.abort();
275                }
276            }
277        }
278    }
279
280    async fn shutdown_mirror(mirror: &mut Option<BackgroundMirrorRuntime>) {
281        let Some(mut runtime) = mirror.take() else {
282            return;
283        };
284
285        runtime.service.shutdown();
286        if let Some(mut join) = runtime.join.take() {
287            match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
288                Ok(Ok(())) => {}
289                Ok(Err(err)) => {
290                    tracing::warn!("Background mirror task ended with join error: {}", err)
291                }
292                Err(_) => {
293                    tracing::warn!("Timed out waiting for background mirror shutdown");
294                    join.abort();
295                }
296            }
297        }
298    }
299
300    fn nostr_mirror_config(
301        config: &Config,
302        active_relays: &[String],
303    ) -> crate::nostr_mirror::NostrMirrorConfig {
304        crate::nostr_mirror::NostrMirrorConfig {
305            relays: active_relays.to_vec(),
306            publish_relays: Self::mirror_publish_relays(active_relays, &config.server.bind_address),
307            blossom_write_servers: config.blossom.all_write_servers(),
308            max_follow_distance: config
309                .nostr
310                .mirror_max_follow_distance
311                .unwrap_or(config.nostr.social_graph_crawl_depth),
312            overmute_threshold: config.nostr.overmute_threshold,
313            require_negentropy: config.nostr.negentropy_only,
314            kinds: config.nostr.mirror_kinds.clone(),
315            history_sync_author_chunk_size: config.nostr.history_sync_author_chunk_size.max(1),
316            history_sync_per_author_event_limit: config
317                .nostr
318                .history_sync_per_author_event_limit
319                .max(1),
320            missing_profile_backfill_batch_size: config.nostr.history_sync_author_chunk_size.max(1),
321            history_sync_on_reconnect: config.nostr.history_sync_on_reconnect,
322            full_text_note_history_follow_distance: config
323                .nostr
324                .full_text_note_history_follow_distance,
325            full_text_note_history_max_relay_pages: config
326                .nostr
327                .full_text_note_history_max_relay_pages,
328            ..crate::nostr_mirror::NostrMirrorConfig::default()
329        }
330    }
331
332    pub async fn apply_config(&self, config: &Config) -> Result<EmbeddedBackgroundServicesStatus> {
333        let mut runtime = self.runtime.lock().await;
334
335        Self::shutdown_crawler(&mut runtime.crawler).await;
336        Self::shutdown_mirror(&mut runtime.mirror).await;
337        Self::shutdown_sync(&mut runtime.sync).await;
338
339        if !config.server.mode.background_services_enabled() {
340            return Ok(runtime.status());
341        }
342
343        let active_relays = config.nostr.active_relays();
344
345        if config.nostr.enabled
346            && config.nostr.social_graph_crawl_depth > 0
347            && !active_relays.is_empty()
348        {
349            runtime.crawler = Some(socialgraph::crawler::spawn_social_graph_tasks(
350                self.graph_store.clone(),
351                self.keys.clone(),
352                active_relays.clone(),
353                config.nostr.social_graph_crawl_depth,
354                self.spambox.clone(),
355                self.data_dir.clone(),
356            ));
357
358            let service = Arc::new(
359                crate::nostr_mirror::BackgroundNostrMirror::new(
360                    Self::nostr_mirror_config(config, &active_relays),
361                    self.store.clone(),
362                    self.graph_store_concrete.clone(),
363                    Some(
364                        nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
365                            .context("Failed to parse keys for background nostr mirror")?,
366                    ),
367                )
368                .await
369                .context("Failed to create background nostr mirror")?,
370            );
371            let service_for_task = service.clone();
372            let join = tokio::task::spawn_blocking(move || {
373                let runtime = tokio::runtime::Builder::new_current_thread()
374                    .enable_all()
375                    .build()
376                    .expect("build background nostr mirror runtime");
377                runtime.block_on(async {
378                    if let Err(err) = service_for_task.run().await {
379                        tracing::error!("Background nostr mirror error: {:#}", err);
380                    }
381                });
382            });
383            runtime.mirror = Some(BackgroundMirrorRuntime {
384                service,
385                join: Some(join),
386            });
387        }
388
389        if config.sync.enabled && !active_relays.is_empty() {
390            let has_pinned_refs = self
391                .store
392                .list_pinned_refs()
393                .map(|refs| !refs.is_empty())
394                .unwrap_or(false);
395            let has_tracked_authors = self
396                .store
397                .list_tracked_authors()
398                .map(|authors| !authors.is_empty())
399                .unwrap_or(false);
400            let should_sync = config.sync.sync_own
401                || config.sync.sync_followed
402                || has_pinned_refs
403                || has_tracked_authors;
404            if !should_sync {
405                return Ok(runtime.status());
406            }
407
408            let sync_config = crate::sync::SyncConfig {
409                sync_own: config.sync.sync_own,
410                sync_followed: config.sync.sync_followed,
411                relays: active_relays,
412                max_concurrent: config.sync.max_concurrent,
413                webrtc_timeout_ms: config.sync.webrtc_timeout_ms,
414                blossom_timeout_ms: config.sync.blossom_timeout_ms,
415            };
416
417            let sync_keys = nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
418                .context("Failed to parse keys for sync")?;
419            let service = Arc::new(
420                crate::sync::BackgroundSync::new(
421                    sync_config,
422                    self.store.clone(),
423                    sync_keys,
424                    self.webrtc_state.clone(),
425                )
426                .await
427                .context("Failed to create background sync service")?,
428            );
429            let contacts_file = self.data_dir.join("contacts.json");
430            let service_for_task = service.clone();
431            let join = tokio::spawn(async move {
432                if let Err(err) = service_for_task.run(contacts_file).await {
433                    tracing::error!("Background sync error: {}", err);
434                }
435            });
436            runtime.sync = Some(BackgroundSyncRuntime {
437                service,
438                join: Some(join),
439            });
440        }
441
442        Ok(runtime.status())
443    }
444}
445
446#[cfg(feature = "p2p")]
447pub struct EmbeddedPeerRouterController {
448    keys: Keys,
449    data_dir: PathBuf,
450    state: Arc<WebRTCState>,
451    store: Arc<dyn ContentStore>,
452    peer_classifier: PeerClassifier,
453    nostr_relay: Arc<NostrRelay>,
454    runtime: Mutex<Option<PeerRouterRuntime>>,
455}
456
457#[cfg(feature = "p2p")]
458impl EmbeddedPeerRouterController {
459    pub fn new(
460        keys: Keys,
461        data_dir: PathBuf,
462        state: Arc<WebRTCState>,
463        store: Arc<dyn ContentStore>,
464        peer_classifier: PeerClassifier,
465        nostr_relay: Arc<NostrRelay>,
466    ) -> Self {
467        Self {
468            keys,
469            data_dir,
470            state,
471            store,
472            peer_classifier,
473            nostr_relay,
474            runtime: Mutex::new(None),
475        }
476    }
477
478    pub fn state(&self) -> Arc<WebRTCState> {
479        self.state.clone()
480    }
481
482    pub async fn apply_config(&self, config: &Config) -> Result<bool> {
483        let mut runtime = self.runtime.lock().await;
484        if let Some(runtime_handle) = runtime.take() {
485            if let Err(err) =
486                crate::p2p_common::persist_peer_state(&self.data_dir, &self.state).await
487            {
488                tracing::warn!("Failed to persist mesh peer state before router restart: {err:#}");
489            }
490            let _ = runtime_handle.shutdown.send(true);
491            runtime_handle.peer_state_persist.abort();
492            let mut join = runtime_handle.join;
493            match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
494                Ok(Ok(())) => {}
495                Ok(Err(err)) => {
496                    tracing::warn!("Peer router task ended with join error: {}", err);
497                }
498                Err(_) => {
499                    tracing::warn!("Timed out waiting for peer router shutdown");
500                    join.abort();
501                }
502            }
503        }
504
505        self.state.reset_runtime_state().await;
506        if let Err(err) = crate::p2p_common::load_peer_state(&self.data_dir, &self.state).await {
507            tracing::warn!("Failed to load persisted mesh peer state: {err:#}");
508        }
509
510        if !crate::p2p_common::peer_router_enabled(config) {
511            return Ok(false);
512        }
513
514        let webrtc_config = crate::p2p_common::default_webrtc_config(config);
515        let mut manager = if config.server.mode.hash_get_enabled() {
516            WebRTCManager::new_with_state_and_store_and_classifier(
517                self.keys.clone(),
518                webrtc_config,
519                self.state.clone(),
520                self.store.clone(),
521                self.peer_classifier.clone(),
522            )
523        } else {
524            let mut manager =
525                WebRTCManager::new_with_state(self.keys.clone(), webrtc_config, self.state.clone());
526            manager.set_peer_classifier(self.peer_classifier.clone());
527            manager
528        };
529        manager
530            .set_nostr_relay(self.nostr_relay.clone() as hashtree_network::SharedMeshRelayClient);
531        let shutdown = manager.shutdown_signal();
532        let join = tokio::spawn(async move {
533            if let Err(err) = manager.run().await {
534                tracing::error!("Peer router error: {}", err);
535            }
536        });
537        let peer_state_persist = crate::p2p_common::spawn_peer_state_persist_task(
538            self.data_dir.clone(),
539            self.state.clone(),
540        );
541        *runtime = Some(PeerRouterRuntime {
542            shutdown,
543            join,
544            peer_state_persist,
545        });
546        Ok(true)
547    }
548
549    pub async fn shutdown(&self) {
550        let mut runtime = self.runtime.lock().await;
551        let Some(runtime_handle) = runtime.take() else {
552            return;
553        };
554
555        if let Err(err) = crate::p2p_common::persist_peer_state(&self.data_dir, &self.state).await {
556            tracing::warn!("Failed to persist mesh peer state during router shutdown: {err:#}");
557        }
558        let _ = runtime_handle.shutdown.send(true);
559        runtime_handle.peer_state_persist.abort();
560        let mut join = runtime_handle.join;
561        match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
562            Ok(Ok(())) => {}
563            Ok(Err(err)) => tracing::warn!("Peer router task ended with join error: {}", err),
564            Err(_) => {
565                tracing::warn!("Timed out waiting for peer router shutdown");
566                join.abort();
567            }
568        }
569
570        self.state.reset_runtime_state().await;
571    }
572}
573
574pub struct EmbeddedDaemonController {
575    server_controller: Arc<EmbeddedServerController>,
576    fips_handle: Option<Arc<crate::fips_transport::DaemonFipsHandle>>,
577    #[cfg(feature = "p2p")]
578    peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
579    background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
580}
581
582impl EmbeddedDaemonController {
583    #[cfg(feature = "p2p")]
584    pub fn new(
585        server_controller: Arc<EmbeddedServerController>,
586        fips_handle: Option<Arc<crate::fips_transport::DaemonFipsHandle>>,
587        peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
588        background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
589    ) -> Self {
590        Self {
591            server_controller,
592            fips_handle,
593            #[cfg(feature = "p2p")]
594            peer_router_controller,
595            background_services_controller,
596        }
597    }
598
599    #[cfg(not(feature = "p2p"))]
600    pub fn new(
601        server_controller: Arc<EmbeddedServerController>,
602        fips_handle: Option<Arc<crate::fips_transport::DaemonFipsHandle>>,
603        background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
604    ) -> Self {
605        Self {
606            server_controller,
607            fips_handle,
608            background_services_controller,
609        }
610    }
611
612    pub async fn shutdown(&self) {
613        self.server_controller.shutdown().await;
614        if let Some(handle) = self.fips_handle.as_ref() {
615            handle.shutdown();
616        }
617        if let Some(controller) = self.background_services_controller.as_ref() {
618            controller.shutdown().await;
619        }
620        #[cfg(feature = "p2p")]
621        if let Some(controller) = self.peer_router_controller.as_ref() {
622            controller.shutdown().await;
623        }
624    }
625}
626
627pub struct EmbeddedDaemonOptions {
628    pub config: Config,
629    pub data_dir: PathBuf,
630    pub config_dir: Option<PathBuf>,
631    pub bind_address: String,
632    pub relays: Option<Vec<String>>,
633    pub extra_routes: Option<Router<AppState>>,
634    pub cors: Option<CorsLayer>,
635}
636
637pub struct EmbeddedDaemonInfo {
638    pub addr: String,
639    pub port: u16,
640    pub npub: String,
641    pub store: Arc<HashtreeStore>,
642    pub daemon_controller: Arc<EmbeddedDaemonController>,
643    #[allow(dead_code)]
644    pub webrtc_state: Option<Arc<WebRTCState>>,
645    #[cfg(feature = "p2p")]
646    #[allow(dead_code)]
647    pub peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
648    #[allow(dead_code)]
649    pub background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
650}
651
652pub async fn start_embedded(opts: EmbeddedDaemonOptions) -> Result<EmbeddedDaemonInfo> {
653    let _ = rustls::crypto::ring::default_provider().install_default();
654
655    let mut config = opts.config;
656    config.server.bind_address = opts.bind_address.clone();
657    if let Some(relays) = opts.relays {
658        config.nostr.relays = relays;
659        config.nostr.enabled = !config.nostr.relays.is_empty();
660    }
661
662    let max_size_bytes = config.storage.max_size_gb * 1024 * 1024 * 1024;
663    let nostr_db_max_bytes = config
664        .nostr
665        .db_max_size_gb
666        .saturating_mul(1024 * 1024 * 1024);
667    let spambox_db_max_bytes = config
668        .nostr
669        .spambox_max_size_gb
670        .saturating_mul(1024 * 1024 * 1024);
671
672    let store = Arc::new(HashtreeStore::with_options(
673        &opts.data_dir,
674        config.storage.s3.as_ref(),
675        max_size_bytes,
676    )?);
677
678    let (keys, _was_generated) = if let Some(config_dir) = opts.config_dir.as_ref() {
679        ensure_keys_in(config_dir, Some(&opts.data_dir), Some(&config))?
680    } else {
681        ensure_keys()?
682    };
683    let pk_bytes = pubkey_bytes(&keys);
684    let npub = keys
685        .public_key()
686        .to_bech32()
687        .context("Failed to encode npub")?;
688
689    let mut allowed_pubkeys: HashSet<String> = HashSet::new();
690    allowed_pubkeys.insert(hex::encode(pk_bytes));
691    for npub_str in &config.nostr.allowed_npubs {
692        if let Ok(pk) = parse_npub(npub_str) {
693            allowed_pubkeys.insert(hex::encode(pk));
694        } else {
695            tracing::warn!("Invalid npub in allowed_npubs: {}", npub_str);
696        }
697    }
698
699    let graph_store = socialgraph::open_social_graph_store_with_storage(
700        &opts.data_dir,
701        store.store_arc(),
702        Some(nostr_db_max_bytes),
703    )
704    .context("Failed to initialize social graph store")?;
705    graph_store.set_profile_index_overmute_threshold(config.nostr.overmute_threshold);
706
707    let social_graph_root_bytes = if let Some(ref root_npub) = config.nostr.socialgraph_root {
708        parse_npub(root_npub).unwrap_or(pk_bytes)
709    } else {
710        pk_bytes
711    };
712    socialgraph::set_social_graph_root(&graph_store, &social_graph_root_bytes);
713    socialgraph::sync_local_list_files_force(graph_store.as_ref(), &opts.data_dir, &keys)
714        .context("Failed to sync local social graph lists")?;
715    let fips_peer_ids = crate::fips_transport::fips_peer_ids_from_pubkeys(
716        socialgraph::get_follows(graph_store.as_ref(), &pk_bytes),
717    );
718    let social_graph_store: Arc<dyn socialgraph::SocialGraphBackend> = graph_store.clone();
719
720    let social_graph = Arc::new(socialgraph::SocialGraphAccessControl::new(
721        Arc::clone(&social_graph_store),
722        config.nostr.max_write_distance,
723        allowed_pubkeys.clone(),
724    ));
725
726    let nostr_relay_config = NostrRelayConfig {
727        spambox_db_max_bytes,
728        ..Default::default()
729    };
730    let nostr_relay = if config.nostr.enabled {
731        let mut public_event_pubkeys = HashSet::new();
732        public_event_pubkeys.insert(hex::encode(pk_bytes));
733        Some(Arc::new(
734            NostrRelay::new(
735                Arc::clone(&social_graph_store),
736                opts.data_dir.clone(),
737                public_event_pubkeys,
738                Some(social_graph.clone()),
739                nostr_relay_config,
740            )
741            .context("Failed to initialize Nostr relay")?,
742        ))
743    } else {
744        None
745    };
746
747    let crawler_spambox = if config.nostr.enabled && spambox_db_max_bytes != 0 {
748        let spam_dir = opts.data_dir.join("socialgraph_spambox");
749        match socialgraph::open_social_graph_store_at_path(&spam_dir, Some(spambox_db_max_bytes)) {
750            Ok(store) => Some(store),
751            Err(err) => {
752                tracing::warn!("Failed to open social graph spambox for crawler: {}", err);
753                None
754            }
755        }
756    } else {
757        None
758    };
759    let crawler_spambox_backend = crawler_spambox
760        .clone()
761        .map(|store| store as Arc<dyn socialgraph::SocialGraphBackend>);
762
763    #[cfg(feature = "p2p")]
764    let (webrtc_state, peer_router_controller): (
765        Option<Arc<WebRTCState>>,
766        Option<Arc<EmbeddedPeerRouterController>>,
767    ) = if let Some(nostr_relay) = nostr_relay.clone() {
768        let router_config = crate::p2p_common::default_webrtc_config(&config);
769        let peer_classifier = crate::p2p_common::build_peer_classifier(
770            opts.data_dir.clone(),
771            Arc::clone(&social_graph_store),
772        );
773        let cashu_payment_client =
774            if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
775                match crate::cashu_helper::CashuHelperClient::discover(opts.data_dir.clone()) {
776                    Ok(client) => {
777                        Some(Arc::new(client) as Arc<dyn crate::cashu_helper::CashuPaymentClient>)
778                    }
779                    Err(err) => {
780                        tracing::warn!(
781                        "Cashu settlement helper unavailable; paid retrieval stays disabled: {}",
782                        err
783                    );
784                        None
785                    }
786                }
787            } else {
788                None
789            };
790        let cashu_mint_metadata =
791            if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
792                let metadata_path = crate::webrtc::cashu_mint_metadata_path(&opts.data_dir);
793                match crate::webrtc::CashuMintMetadataStore::load(metadata_path) {
794                    Ok(store) => Some(store),
795                    Err(err) => {
796                        tracing::warn!(
797                        "Failed to load Cashu mint metadata; falling back to in-memory state: {}",
798                        err
799                    );
800                        Some(crate::webrtc::CashuMintMetadataStore::in_memory())
801                    }
802                }
803            } else {
804                None
805            };
806
807        let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
808            router_config.request_selection_strategy,
809            router_config.request_fairness_enabled,
810            router_config.request_dispatch,
811            std::time::Duration::from_millis(router_config.message_timeout_ms),
812            crate::webrtc::CashuRoutingConfig::from(&config.cashu),
813            cashu_payment_client,
814            cashu_mint_metadata,
815        ));
816        let controller = Arc::new(EmbeddedPeerRouterController::new(
817            keys.clone(),
818            opts.data_dir.clone(),
819            state.clone(),
820            Arc::clone(&store) as Arc<dyn ContentStore>,
821            peer_classifier,
822            nostr_relay.clone(),
823        ));
824        controller.apply_config(&config).await?;
825        (Some(state), Some(controller))
826    } else {
827        (None, None)
828    };
829
830    #[cfg(not(feature = "p2p"))]
831    let webrtc_state: Option<Arc<crate::webrtc::WebRTCState>> = None;
832
833    let background_services_controller = Arc::new(EmbeddedBackgroundServicesController::new(
834        keys.clone(),
835        opts.data_dir.clone(),
836        Arc::clone(&store),
837        graph_store.clone(),
838        Arc::clone(&social_graph_store),
839        crawler_spambox_backend,
840        webrtc_state.clone(),
841    ));
842
843    let upstream_blossom = config.blossom.all_read_servers();
844    let active_nostr_relays = config.nostr.active_relays();
845    let fips_handle = crate::fips_transport::start_daemon_fips_transport(
846        &config,
847        &keys,
848        Arc::clone(&store),
849        fips_peer_ids,
850    )
851    .await?
852    .map(Arc::new);
853
854    let mut server = HashtreeServer::new(Arc::clone(&store), opts.bind_address.clone())
855        .with_server_mode(config.server.mode)
856        .with_hash_get_enabled(config.server.mode.hash_get_enabled())
857        .with_fetch_from_fips_peers(config.server.fetch_from_fips_peers)
858        .with_allowed_pubkeys(allowed_pubkeys.clone())
859        .with_max_upload_bytes((config.blossom.max_upload_mb as usize) * 1024 * 1024)
860        .with_public_writes(config.server.public_writes)
861        .with_public_plaintext_reads(config.server.public_plaintext_reads)
862        .with_require_random_untrusted_ingest(config.blossom.require_random_untrusted_ingest)
863        .with_optimistic_blossom_uploads(config.blossom.optimistic_uploads)
864        .with_upstream_blossom(upstream_blossom)
865        .with_nostr_relay_urls(active_nostr_relays)
866        .with_social_graph(social_graph)
867        .with_socialgraph_snapshot(
868            Arc::clone(&social_graph_store),
869            social_graph_root_bytes,
870            config.server.socialgraph_snapshot_public,
871        );
872    if let Some(nostr_relay) = nostr_relay {
873        server = server.with_nostr_relay(nostr_relay);
874    }
875
876    if crate::p2p_common::peer_router_enabled(&config) {
877        if let Some(ref state) = webrtc_state {
878            server = server.with_webrtc_peers(state.clone());
879        }
880    }
881    if let Some(ref fips_handle) = fips_handle {
882        server = server.with_fips_transport(fips_handle.transport.clone());
883    }
884
885    if let Some(extra) = opts.extra_routes {
886        server = server.with_extra_routes(extra);
887    }
888    if let Some(cors) = opts.cors {
889        server = server.with_cors(cors);
890    }
891
892    spawn_background_eviction_task(
893        Arc::clone(&store),
894        BACKGROUND_EVICTION_INTERVAL,
895        "embedded daemon",
896    );
897
898    let listener = TcpListener::bind(&opts.bind_address).await?;
899    let local_addr = listener.local_addr()?;
900    let actual_addr = format!("{}:{}", local_addr.ip(), local_addr.port());
901
902    let server_shutdown = Arc::new(Notify::new());
903    let server_shutdown_for_task = Arc::clone(&server_shutdown);
904    let server_join = tokio::spawn(async move {
905        if let Err(e) = server
906            .run_with_listener_until(listener, async move {
907                server_shutdown_for_task.notified().await;
908            })
909            .await
910        {
911            tracing::error!("Embedded daemon server error: {}", e);
912        }
913    });
914    let server_controller = Arc::new(EmbeddedServerController::new(server_shutdown, server_join));
915    background_services_controller.apply_config(&config).await?;
916    #[cfg(feature = "p2p")]
917    let daemon_controller = Arc::new(EmbeddedDaemonController::new(
918        server_controller,
919        fips_handle.clone(),
920        peer_router_controller.clone(),
921        Some(background_services_controller.clone()),
922    ));
923    #[cfg(not(feature = "p2p"))]
924    let daemon_controller = Arc::new(EmbeddedDaemonController::new(
925        server_controller,
926        fips_handle.clone(),
927        Some(background_services_controller.clone()),
928    ));
929
930    tracing::info!(
931        "Embedded daemon started on {}, identity {}",
932        actual_addr,
933        npub
934    );
935
936    Ok(EmbeddedDaemonInfo {
937        addr: actual_addr,
938        port: local_addr.port(),
939        npub,
940        store,
941        daemon_controller,
942        webrtc_state,
943        #[cfg(feature = "p2p")]
944        peer_router_controller,
945        background_services_controller: Some(background_services_controller),
946    })
947}
948
949#[cfg(test)]
950mod tests {
951    use super::EmbeddedBackgroundServicesController;
952    use crate::config::Config;
953
954    #[test]
955    fn mirror_publish_relays_orders_known_root_publish_relays_first() {
956        let relays = EmbeddedBackgroundServicesController::mirror_publish_relays(
957            &[
958                "wss://graph-relay.iris.to".to_string(),
959                "wss://relay.example".to_string(),
960                "wss://relay.primal.net".to_string(),
961                "wss://relay.damus.io".to_string(),
962                "wss://temp.iris.to".to_string(),
963                "wss://vault.iris.to".to_string(),
964                "wss://upload.iris.to/nostr".to_string(),
965            ],
966            "0.0.0.0:8080",
967        );
968        assert_eq!(
969            relays,
970            vec![
971                "wss://temp.iris.to".to_string(),
972                "wss://vault.iris.to".to_string(),
973                "wss://relay.damus.io".to_string(),
974                "wss://relay.example".to_string(),
975                "wss://relay.primal.net".to_string(),
976            ]
977        );
978    }
979
980    #[test]
981    fn mirror_publish_relays_do_not_add_non_active_publish_targets() {
982        let relays = EmbeddedBackgroundServicesController::mirror_publish_relays(
983            &[
984                "wss://graph-relay.iris.to".to_string(),
985                "wss://relay.example".to_string(),
986            ],
987            "0.0.0.0:8080",
988        );
989        assert_eq!(relays, vec!["wss://relay.example".to_string()]);
990    }
991
992    #[test]
993    fn mirror_publish_relays_falls_back_to_active_relays_when_all_are_blocklisted() {
994        let relays = EmbeddedBackgroundServicesController::mirror_publish_relays(
995            &[
996                "wss://graph-relay.iris.to".to_string(),
997                "wss://upload.iris.to/nostr".to_string(),
998            ],
999            "0.0.0.0:8080",
1000        );
1001        assert_eq!(
1002            relays,
1003            vec![
1004                "wss://graph-relay.iris.to".to_string(),
1005                "wss://upload.iris.to/nostr".to_string(),
1006            ]
1007        );
1008    }
1009
1010    #[test]
1011    fn nostr_mirror_config_allows_disabling_full_note_paging() {
1012        let mut config = Config::default();
1013        config.nostr.full_text_note_history_max_relay_pages = 0;
1014
1015        let mirror_config = EmbeddedBackgroundServicesController::nostr_mirror_config(
1016            &config,
1017            &["wss://relay.example".to_string()],
1018        );
1019
1020        assert_eq!(mirror_config.full_text_note_history_max_relay_pages, 0);
1021
1022        config.nostr.full_text_note_history_max_relay_pages = 64;
1023        let mirror_config = EmbeddedBackgroundServicesController::nostr_mirror_config(
1024            &config,
1025            &["wss://relay.example".to_string()],
1026        );
1027
1028        assert_eq!(mirror_config.full_text_note_history_max_relay_pages, 64);
1029    }
1030
1031    #[test]
1032    fn nostr_mirror_config_can_limit_mirror_distance_independently() {
1033        let mut config = Config::default();
1034        config.nostr.social_graph_crawl_depth = 6;
1035        config.nostr.mirror_max_follow_distance = Some(2);
1036
1037        let mirror_config = EmbeddedBackgroundServicesController::nostr_mirror_config(
1038            &config,
1039            &["wss://relay.example".to_string()],
1040        );
1041
1042        assert_eq!(mirror_config.max_follow_distance, 2);
1043
1044        config.nostr.mirror_max_follow_distance = None;
1045        let mirror_config = EmbeddedBackgroundServicesController::nostr_mirror_config(
1046            &config,
1047            &["wss://relay.example".to_string()],
1048        );
1049
1050        assert_eq!(mirror_config.max_follow_distance, 6);
1051    }
1052}