Skip to main content

hashtree_cli/
daemon.rs

1use anyhow::{Context, Result};
2use axum::Router;
3use nostr::nips::nip19::ToBech32;
4use nostr::Keys;
5use std::collections::HashSet;
6use std::path::PathBuf;
7use std::sync::Arc;
8use tokio::net::TcpListener;
9use tokio::sync::{Mutex, Notify};
10use tokio::task::JoinHandle;
11use tower_http::cors::CorsLayer;
12
13use crate::config::{ensure_keys, ensure_keys_in, parse_npub, pubkey_bytes, Config};
14use crate::eviction::{spawn_background_eviction_task, BACKGROUND_EVICTION_INTERVAL};
15use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
16use crate::server::{AppState, HashtreeServer};
17use crate::socialgraph;
18use crate::storage::HashtreeStore;
19
20#[cfg(feature = "p2p")]
21use crate::webrtc::{ContentStore, PeerClassifier, WebRTCManager, WebRTCState};
22#[cfg(not(feature = "p2p"))]
23use crate::WebRTCState;
24
25#[cfg(feature = "p2p")]
26struct PeerRouterRuntime {
27    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
28    join: JoinHandle<()>,
29    peer_state_persist: JoinHandle<()>,
30}
31
32struct BackgroundSyncRuntime {
33    service: Arc<crate::sync::BackgroundSync>,
34    join: Option<JoinHandle<()>>,
35}
36
37impl Drop for BackgroundSyncRuntime {
38    fn drop(&mut self) {
39        self.service.shutdown();
40        if let Some(join) = self.join.take() {
41            join.abort();
42        }
43    }
44}
45
46struct BackgroundMirrorRuntime {
47    service: Arc<crate::nostr_mirror::BackgroundNostrMirror>,
48    join: Option<JoinHandle<()>>,
49}
50
51impl Drop for BackgroundMirrorRuntime {
52    fn drop(&mut self) {
53        self.service.shutdown();
54        if let Some(join) = self.join.take() {
55            join.abort();
56        }
57    }
58}
59
60struct BackgroundServicesRuntime {
61    crawler: Option<socialgraph::crawler::SocialGraphTaskHandles>,
62    mirror: Option<BackgroundMirrorRuntime>,
63    sync: Option<BackgroundSyncRuntime>,
64}
65
66impl Drop for BackgroundServicesRuntime {
67    fn drop(&mut self) {
68        if let Some(handles) = self.crawler.as_ref() {
69            let _ = handles.shutdown_tx.send(true);
70        }
71        if let Some(runtime) = self.mirror.as_ref() {
72            runtime.service.shutdown();
73        }
74        if let Some(runtime) = self.sync.as_ref() {
75            runtime.service.shutdown();
76        }
77    }
78}
79
80impl BackgroundServicesRuntime {
81    fn status(&self) -> EmbeddedBackgroundServicesStatus {
82        EmbeddedBackgroundServicesStatus {
83            crawler_active: self.crawler.is_some(),
84            mirror_active: self.mirror.is_some(),
85            sync_active: self.sync.is_some(),
86        }
87    }
88}
89
90struct EmbeddedServerRuntime {
91    shutdown: Arc<Notify>,
92    join: Option<JoinHandle<()>>,
93}
94
95pub struct EmbeddedServerController {
96    runtime: Mutex<Option<EmbeddedServerRuntime>>,
97}
98
99impl EmbeddedServerController {
100    pub fn new(shutdown: Arc<Notify>, join: JoinHandle<()>) -> Self {
101        Self {
102            runtime: Mutex::new(Some(EmbeddedServerRuntime {
103                shutdown,
104                join: Some(join),
105            })),
106        }
107    }
108
109    pub async fn shutdown(&self) {
110        let mut runtime = self.runtime.lock().await;
111        let Some(mut runtime) = runtime.take() else {
112            return;
113        };
114
115        runtime.shutdown.notify_waiters();
116        if let Some(mut join) = runtime.join.take() {
117            match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
118                Ok(Ok(())) => {}
119                Ok(Err(err)) => {
120                    tracing::warn!("Embedded server task ended with join error: {}", err)
121                }
122                Err(_) => {
123                    tracing::warn!("Timed out waiting for embedded server shutdown");
124                    join.abort();
125                }
126            }
127        }
128    }
129}
130
131#[derive(Debug, Clone, Copy, PartialEq, Eq)]
132pub struct EmbeddedBackgroundServicesStatus {
133    pub crawler_active: bool,
134    pub mirror_active: bool,
135    pub sync_active: bool,
136}
137
138pub struct EmbeddedBackgroundServicesController {
139    keys: Keys,
140    data_dir: PathBuf,
141    store: Arc<HashtreeStore>,
142    graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
143    graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
144    spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
145    webrtc_state: Option<Arc<WebRTCState>>,
146    runtime: Mutex<BackgroundServicesRuntime>,
147}
148
149impl EmbeddedBackgroundServicesController {
150    const MIRROR_PUBLISH_RELAY_PRIORITY: &[&str] = &[
151        "wss://relay.primal.net",
152        "wss://nos.lol",
153        "wss://relay.nostr.band",
154        "wss://relay.snort.social",
155        "wss://temp.iris.to",
156        "wss://vault.iris.to",
157        "wss://relay.damus.io",
158    ];
159    const MIRROR_PUBLISH_RELAY_BLOCKLIST: &[&str] =
160        &["wss://graph-relay.iris.to", "wss://upload.iris.to/nostr"];
161
162    fn mirror_publish_relays(active_relays: &[String], _bind_address: &str) -> Vec<String> {
163        let mut seen = HashSet::new();
164        let active_relays = active_relays
165            .iter()
166            .filter(|relay| seen.insert((*relay).clone()))
167            .cloned()
168            .collect::<Vec<_>>();
169        if active_relays.is_empty() {
170            return Vec::new();
171        }
172        let filtered = active_relays
173            .iter()
174            .filter(|relay| !Self::MIRROR_PUBLISH_RELAY_BLOCKLIST.contains(&relay.as_str()))
175            .cloned()
176            .collect::<Vec<_>>();
177        if filtered.is_empty() {
178            return active_relays;
179        }
180
181        let filtered_set = filtered.iter().cloned().collect::<HashSet<_>>();
182        let mut selected = Self::MIRROR_PUBLISH_RELAY_PRIORITY
183            .iter()
184            .filter(|relay| filtered_set.contains(**relay))
185            .map(|relay| (*relay).to_string())
186            .collect::<Vec<_>>();
187        let mut selected_set = selected.iter().cloned().collect::<HashSet<_>>();
188        for relay in filtered {
189            if selected_set.insert(relay.clone()) {
190                selected.push(relay);
191            }
192        }
193
194        selected
195    }
196
197    pub fn new(
198        keys: Keys,
199        data_dir: PathBuf,
200        store: Arc<HashtreeStore>,
201        graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
202        graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
203        spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
204        webrtc_state: Option<Arc<WebRTCState>>,
205    ) -> Self {
206        Self {
207            keys,
208            data_dir,
209            store,
210            graph_store_concrete,
211            graph_store,
212            spambox,
213            webrtc_state,
214            runtime: Mutex::new(BackgroundServicesRuntime {
215                crawler: None,
216                mirror: None,
217                sync: None,
218            }),
219        }
220    }
221
222    pub async fn status(&self) -> EmbeddedBackgroundServicesStatus {
223        self.runtime.lock().await.status()
224    }
225
226    pub async fn shutdown(&self) {
227        let mut runtime = self.runtime.lock().await;
228        Self::shutdown_crawler(&mut runtime.crawler).await;
229        Self::shutdown_mirror(&mut runtime.mirror).await;
230        Self::shutdown_sync(&mut runtime.sync).await;
231    }
232
233    async fn shutdown_crawler(crawler: &mut Option<socialgraph::crawler::SocialGraphTaskHandles>) {
234        let Some(handles) = crawler.take() else {
235            return;
236        };
237
238        let _ = handles.shutdown_tx.send(true);
239
240        let mut crawl_handle = handles.crawl_handle;
241        match tokio::time::timeout(std::time::Duration::from_secs(3), &mut crawl_handle).await {
242            Ok(Ok(())) => {}
243            Ok(Err(err)) => tracing::warn!("Crawler task ended with join error: {}", err),
244            Err(_) => {
245                tracing::warn!("Timed out waiting for crawler task shutdown");
246                crawl_handle.abort();
247            }
248        }
249
250        let mut local_list_handle = handles.local_list_handle;
251        match tokio::time::timeout(std::time::Duration::from_secs(3), &mut local_list_handle).await
252        {
253            Ok(Ok(())) => {}
254            Ok(Err(err)) => tracing::warn!("Local list task ended with join error: {}", err),
255            Err(_) => {
256                tracing::warn!("Timed out waiting for local list task shutdown");
257                local_list_handle.abort();
258            }
259        }
260    }
261
262    async fn shutdown_sync(sync: &mut Option<BackgroundSyncRuntime>) {
263        let Some(mut runtime) = sync.take() else {
264            return;
265        };
266
267        runtime.service.shutdown();
268        if let Some(mut join) = runtime.join.take() {
269            match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
270                Ok(Ok(())) => {}
271                Ok(Err(err)) => {
272                    tracing::warn!("Background sync task ended with join error: {}", err)
273                }
274                Err(_) => {
275                    tracing::warn!("Timed out waiting for background sync shutdown");
276                    join.abort();
277                }
278            }
279        }
280    }
281
282    async fn shutdown_mirror(mirror: &mut Option<BackgroundMirrorRuntime>) {
283        let Some(mut runtime) = mirror.take() else {
284            return;
285        };
286
287        runtime.service.shutdown();
288        if let Some(mut join) = runtime.join.take() {
289            match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
290                Ok(Ok(())) => {}
291                Ok(Err(err)) => {
292                    tracing::warn!("Background mirror task ended with join error: {}", err)
293                }
294                Err(_) => {
295                    tracing::warn!("Timed out waiting for background mirror shutdown");
296                    join.abort();
297                }
298            }
299        }
300    }
301
302    fn nostr_mirror_config(
303        config: &Config,
304        active_relays: &[String],
305    ) -> crate::nostr_mirror::NostrMirrorConfig {
306        crate::nostr_mirror::NostrMirrorConfig {
307            relays: active_relays.to_vec(),
308            publish_relays: Self::mirror_publish_relays(active_relays, &config.server.bind_address),
309            blossom_write_servers: config.blossom.all_write_servers(),
310            max_follow_distance: config
311                .nostr
312                .mirror_max_follow_distance
313                .unwrap_or(config.nostr.social_graph_crawl_depth),
314            overmute_threshold: config.nostr.overmute_threshold,
315            require_negentropy: config.nostr.negentropy_only,
316            kinds: config.nostr.mirror_kinds.clone(),
317            history_sync_author_chunk_size: config.nostr.history_sync_author_chunk_size.max(1),
318            history_sync_per_author_event_limit: config
319                .nostr
320                .history_sync_per_author_event_limit
321                .max(1),
322            missing_profile_backfill_batch_size: config.nostr.history_sync_author_chunk_size.max(1),
323            history_sync_on_reconnect: config.nostr.history_sync_on_reconnect,
324            full_text_note_history_follow_distance: config
325                .nostr
326                .full_text_note_history_follow_distance,
327            full_text_note_history_max_relay_pages: config
328                .nostr
329                .full_text_note_history_max_relay_pages,
330            ..crate::nostr_mirror::NostrMirrorConfig::default()
331        }
332    }
333
334    pub async fn apply_config(&self, config: &Config) -> Result<EmbeddedBackgroundServicesStatus> {
335        let mut runtime = self.runtime.lock().await;
336
337        Self::shutdown_crawler(&mut runtime.crawler).await;
338        Self::shutdown_mirror(&mut runtime.mirror).await;
339        Self::shutdown_sync(&mut runtime.sync).await;
340
341        if !config.server.mode.background_services_enabled() {
342            return Ok(runtime.status());
343        }
344
345        let active_relays = config.nostr.active_relays();
346
347        if config.nostr.enabled
348            && config.nostr.social_graph_crawl_depth > 0
349            && !active_relays.is_empty()
350        {
351            runtime.crawler = Some(socialgraph::crawler::spawn_social_graph_tasks(
352                self.graph_store.clone(),
353                self.keys.clone(),
354                active_relays.clone(),
355                config.nostr.social_graph_crawl_depth,
356                self.spambox.clone(),
357                self.data_dir.clone(),
358            ));
359
360            let service = Arc::new(
361                crate::nostr_mirror::BackgroundNostrMirror::new(
362                    Self::nostr_mirror_config(config, &active_relays),
363                    self.store.clone(),
364                    self.graph_store_concrete.clone(),
365                    Some(
366                        nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
367                            .context("Failed to parse keys for background nostr mirror")?,
368                    ),
369                )
370                .await
371                .context("Failed to create background nostr mirror")?,
372            );
373            let service_for_task = service.clone();
374            let join = tokio::task::spawn_blocking(move || {
375                let runtime = tokio::runtime::Builder::new_current_thread()
376                    .enable_all()
377                    .build()
378                    .expect("build background nostr mirror runtime");
379                runtime.block_on(async {
380                    if let Err(err) = service_for_task.run().await {
381                        tracing::error!("Background nostr mirror error: {:#}", err);
382                    }
383                });
384            });
385            runtime.mirror = Some(BackgroundMirrorRuntime {
386                service,
387                join: Some(join),
388            });
389        }
390
391        let has_pinned_refs = self
392            .store
393            .list_pinned_refs()
394            .map(|refs| !refs.is_empty())
395            .unwrap_or(false);
396        let has_tracked_authors = self
397            .store
398            .list_tracked_authors()
399            .map(|authors| !authors.is_empty())
400            .unwrap_or(false);
401
402        if config.sync.enabled
403            && (config.sync.sync_own
404                || config.sync.sync_followed
405                || has_pinned_refs
406                || has_tracked_authors)
407            && !active_relays.is_empty()
408        {
409            let sync_config = crate::sync::SyncConfig {
410                sync_own: config.sync.sync_own,
411                sync_followed: config.sync.sync_followed,
412                relays: active_relays,
413                max_concurrent: config.sync.max_concurrent,
414                webrtc_timeout_ms: config.sync.webrtc_timeout_ms,
415                blossom_timeout_ms: config.sync.blossom_timeout_ms,
416            };
417
418            let sync_keys = nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
419                .context("Failed to parse keys for sync")?;
420            let service = Arc::new(
421                crate::sync::BackgroundSync::new(
422                    sync_config,
423                    self.store.clone(),
424                    sync_keys,
425                    self.webrtc_state.clone(),
426                )
427                .await
428                .context("Failed to create background sync service")?,
429            );
430            let contacts_file = self.data_dir.join("contacts.json");
431            let service_for_task = service.clone();
432            let join = tokio::spawn(async move {
433                if let Err(err) = service_for_task.run(contacts_file).await {
434                    tracing::error!("Background sync error: {}", err);
435                }
436            });
437            runtime.sync = Some(BackgroundSyncRuntime {
438                service,
439                join: Some(join),
440            });
441        }
442
443        Ok(runtime.status())
444    }
445}
446
447#[cfg(feature = "p2p")]
448pub struct EmbeddedPeerRouterController {
449    keys: Keys,
450    data_dir: PathBuf,
451    state: Arc<WebRTCState>,
452    store: Arc<dyn ContentStore>,
453    peer_classifier: PeerClassifier,
454    nostr_relay: Arc<NostrRelay>,
455    runtime: Mutex<Option<PeerRouterRuntime>>,
456}
457
458#[cfg(feature = "p2p")]
459impl EmbeddedPeerRouterController {
460    pub fn new(
461        keys: Keys,
462        data_dir: PathBuf,
463        state: Arc<WebRTCState>,
464        store: Arc<dyn ContentStore>,
465        peer_classifier: PeerClassifier,
466        nostr_relay: Arc<NostrRelay>,
467    ) -> Self {
468        Self {
469            keys,
470            data_dir,
471            state,
472            store,
473            peer_classifier,
474            nostr_relay,
475            runtime: Mutex::new(None),
476        }
477    }
478
479    pub fn state(&self) -> Arc<WebRTCState> {
480        self.state.clone()
481    }
482
483    pub async fn apply_config(&self, config: &Config) -> Result<bool> {
484        let mut runtime = self.runtime.lock().await;
485        if let Some(runtime_handle) = runtime.take() {
486            if let Err(err) =
487                crate::p2p_common::persist_peer_state(&self.data_dir, &self.state).await
488            {
489                tracing::warn!("Failed to persist mesh peer state before router restart: {err:#}");
490            }
491            let _ = runtime_handle.shutdown.send(true);
492            runtime_handle.peer_state_persist.abort();
493            let mut join = runtime_handle.join;
494            match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
495                Ok(Ok(())) => {}
496                Ok(Err(err)) => {
497                    tracing::warn!("Peer router task ended with join error: {}", err);
498                }
499                Err(_) => {
500                    tracing::warn!("Timed out waiting for peer router shutdown");
501                    join.abort();
502                }
503            }
504        }
505
506        self.state.reset_runtime_state().await;
507        if let Err(err) = crate::p2p_common::load_peer_state(&self.data_dir, &self.state).await {
508            tracing::warn!("Failed to load persisted mesh peer state: {err:#}");
509        }
510
511        if !crate::p2p_common::peer_router_enabled(config) {
512            return Ok(false);
513        }
514
515        let webrtc_config = crate::p2p_common::default_webrtc_config(config);
516        let mut manager = if config.server.mode.hash_get_enabled() {
517            WebRTCManager::new_with_state_and_store_and_classifier(
518                self.keys.clone(),
519                webrtc_config,
520                self.state.clone(),
521                self.store.clone(),
522                self.peer_classifier.clone(),
523            )
524        } else {
525            let mut manager =
526                WebRTCManager::new_with_state(self.keys.clone(), webrtc_config, self.state.clone());
527            manager.set_peer_classifier(self.peer_classifier.clone());
528            manager
529        };
530        manager
531            .set_nostr_relay(self.nostr_relay.clone() as hashtree_network::SharedMeshRelayClient);
532        let shutdown = manager.shutdown_signal();
533        let join = tokio::spawn(async move {
534            if let Err(err) = manager.run().await {
535                tracing::error!("Peer router error: {}", err);
536            }
537        });
538        let peer_state_persist = crate::p2p_common::spawn_peer_state_persist_task(
539            self.data_dir.clone(),
540            self.state.clone(),
541        );
542        *runtime = Some(PeerRouterRuntime {
543            shutdown,
544            join,
545            peer_state_persist,
546        });
547        Ok(true)
548    }
549
550    pub async fn shutdown(&self) {
551        let mut runtime = self.runtime.lock().await;
552        let Some(runtime_handle) = runtime.take() else {
553            return;
554        };
555
556        if let Err(err) = crate::p2p_common::persist_peer_state(&self.data_dir, &self.state).await {
557            tracing::warn!("Failed to persist mesh peer state during router shutdown: {err:#}");
558        }
559        let _ = runtime_handle.shutdown.send(true);
560        runtime_handle.peer_state_persist.abort();
561        let mut join = runtime_handle.join;
562        match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
563            Ok(Ok(())) => {}
564            Ok(Err(err)) => tracing::warn!("Peer router task ended with join error: {}", err),
565            Err(_) => {
566                tracing::warn!("Timed out waiting for peer router shutdown");
567                join.abort();
568            }
569        }
570
571        self.state.reset_runtime_state().await;
572    }
573}
574
575pub struct EmbeddedDaemonController {
576    server_controller: Arc<EmbeddedServerController>,
577    #[cfg(feature = "p2p")]
578    peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
579    background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
580}
581
582impl EmbeddedDaemonController {
583    #[cfg(feature = "p2p")]
584    pub fn new(
585        server_controller: Arc<EmbeddedServerController>,
586        peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
587        background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
588    ) -> Self {
589        Self {
590            server_controller,
591            #[cfg(feature = "p2p")]
592            peer_router_controller,
593            background_services_controller,
594        }
595    }
596
597    #[cfg(not(feature = "p2p"))]
598    pub fn new(
599        server_controller: Arc<EmbeddedServerController>,
600        background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
601    ) -> Self {
602        Self {
603            server_controller,
604            background_services_controller,
605        }
606    }
607
608    pub async fn shutdown(&self) {
609        self.server_controller.shutdown().await;
610        if let Some(controller) = self.background_services_controller.as_ref() {
611            controller.shutdown().await;
612        }
613        #[cfg(feature = "p2p")]
614        if let Some(controller) = self.peer_router_controller.as_ref() {
615            controller.shutdown().await;
616        }
617    }
618}
619
620pub struct EmbeddedDaemonOptions {
621    pub config: Config,
622    pub data_dir: PathBuf,
623    pub config_dir: Option<PathBuf>,
624    pub bind_address: String,
625    pub relays: Option<Vec<String>>,
626    pub extra_routes: Option<Router<AppState>>,
627    pub cors: Option<CorsLayer>,
628}
629
630pub struct EmbeddedDaemonInfo {
631    pub addr: String,
632    pub port: u16,
633    pub npub: String,
634    pub store: Arc<HashtreeStore>,
635    pub daemon_controller: Arc<EmbeddedDaemonController>,
636    #[allow(dead_code)]
637    pub webrtc_state: Option<Arc<WebRTCState>>,
638    #[cfg(feature = "p2p")]
639    #[allow(dead_code)]
640    pub peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
641    #[allow(dead_code)]
642    pub background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
643}
644
645pub async fn start_embedded(opts: EmbeddedDaemonOptions) -> Result<EmbeddedDaemonInfo> {
646    let _ = rustls::crypto::ring::default_provider().install_default();
647
648    let mut config = opts.config;
649    config.server.bind_address = opts.bind_address.clone();
650    if let Some(relays) = opts.relays {
651        config.nostr.relays = relays;
652        config.nostr.enabled = !config.nostr.relays.is_empty();
653    }
654
655    let max_size_bytes = config.storage.max_size_gb * 1024 * 1024 * 1024;
656    let nostr_db_max_bytes = config
657        .nostr
658        .db_max_size_gb
659        .saturating_mul(1024 * 1024 * 1024);
660    let spambox_db_max_bytes = config
661        .nostr
662        .spambox_max_size_gb
663        .saturating_mul(1024 * 1024 * 1024);
664
665    let store = Arc::new(HashtreeStore::with_options(
666        &opts.data_dir,
667        config.storage.s3.as_ref(),
668        max_size_bytes,
669    )?);
670
671    let (keys, _was_generated) = if let Some(config_dir) = opts.config_dir.as_ref() {
672        ensure_keys_in(config_dir, Some(&opts.data_dir), Some(&config))?
673    } else {
674        ensure_keys()?
675    };
676    let pk_bytes = pubkey_bytes(&keys);
677    let npub = keys
678        .public_key()
679        .to_bech32()
680        .context("Failed to encode npub")?;
681
682    let mut allowed_pubkeys: HashSet<String> = HashSet::new();
683    allowed_pubkeys.insert(hex::encode(pk_bytes));
684    for npub_str in &config.nostr.allowed_npubs {
685        if let Ok(pk) = parse_npub(npub_str) {
686            allowed_pubkeys.insert(hex::encode(pk));
687        } else {
688            tracing::warn!("Invalid npub in allowed_npubs: {}", npub_str);
689        }
690    }
691
692    let graph_store = socialgraph::open_social_graph_store_with_storage(
693        &opts.data_dir,
694        store.store_arc(),
695        Some(nostr_db_max_bytes),
696    )
697    .context("Failed to initialize social graph store")?;
698    graph_store.set_profile_index_overmute_threshold(config.nostr.overmute_threshold);
699
700    let social_graph_root_bytes = if let Some(ref root_npub) = config.nostr.socialgraph_root {
701        parse_npub(root_npub).unwrap_or(pk_bytes)
702    } else {
703        pk_bytes
704    };
705    socialgraph::set_social_graph_root(&graph_store, &social_graph_root_bytes);
706    socialgraph::sync_local_list_files_force(graph_store.as_ref(), &opts.data_dir, &keys)
707        .context("Failed to sync local social graph lists")?;
708    let social_graph_store: Arc<dyn socialgraph::SocialGraphBackend> = graph_store.clone();
709
710    let social_graph = Arc::new(socialgraph::SocialGraphAccessControl::new(
711        Arc::clone(&social_graph_store),
712        config.nostr.max_write_distance,
713        allowed_pubkeys.clone(),
714    ));
715
716    let nostr_relay_config = NostrRelayConfig {
717        spambox_db_max_bytes,
718        ..Default::default()
719    };
720    let mut public_event_pubkeys = HashSet::new();
721    public_event_pubkeys.insert(hex::encode(pk_bytes));
722    let nostr_relay = Arc::new(
723        NostrRelay::new(
724            Arc::clone(&social_graph_store),
725            opts.data_dir.clone(),
726            public_event_pubkeys,
727            Some(social_graph.clone()),
728            nostr_relay_config,
729        )
730        .context("Failed to initialize Nostr relay")?,
731    );
732
733    let crawler_spambox = if spambox_db_max_bytes == 0 {
734        None
735    } else {
736        let spam_dir = opts.data_dir.join("socialgraph_spambox");
737        match socialgraph::open_social_graph_store_at_path(&spam_dir, Some(spambox_db_max_bytes)) {
738            Ok(store) => Some(store),
739            Err(err) => {
740                tracing::warn!("Failed to open social graph spambox for crawler: {}", err);
741                None
742            }
743        }
744    };
745    let crawler_spambox_backend = crawler_spambox
746        .clone()
747        .map(|store| store as Arc<dyn socialgraph::SocialGraphBackend>);
748
749    #[cfg(feature = "p2p")]
750    let (webrtc_state, peer_router_controller): (
751        Option<Arc<WebRTCState>>,
752        Option<Arc<EmbeddedPeerRouterController>>,
753    ) = {
754        let router_config = crate::p2p_common::default_webrtc_config(&config);
755        let peer_classifier = crate::p2p_common::build_peer_classifier(
756            opts.data_dir.clone(),
757            Arc::clone(&social_graph_store),
758        );
759        let cashu_payment_client =
760            if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
761                match crate::cashu_helper::CashuHelperClient::discover(opts.data_dir.clone()) {
762                    Ok(client) => {
763                        Some(Arc::new(client) as Arc<dyn crate::cashu_helper::CashuPaymentClient>)
764                    }
765                    Err(err) => {
766                        tracing::warn!(
767                        "Cashu settlement helper unavailable; paid retrieval stays disabled: {}",
768                        err
769                    );
770                        None
771                    }
772                }
773            } else {
774                None
775            };
776        let cashu_mint_metadata =
777            if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
778                let metadata_path = crate::webrtc::cashu_mint_metadata_path(&opts.data_dir);
779                match crate::webrtc::CashuMintMetadataStore::load(metadata_path) {
780                    Ok(store) => Some(store),
781                    Err(err) => {
782                        tracing::warn!(
783                        "Failed to load Cashu mint metadata; falling back to in-memory state: {}",
784                        err
785                    );
786                        Some(crate::webrtc::CashuMintMetadataStore::in_memory())
787                    }
788                }
789            } else {
790                None
791            };
792
793        let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
794            router_config.request_selection_strategy,
795            router_config.request_fairness_enabled,
796            router_config.request_dispatch,
797            std::time::Duration::from_millis(router_config.message_timeout_ms),
798            crate::webrtc::CashuRoutingConfig::from(&config.cashu),
799            cashu_payment_client,
800            cashu_mint_metadata,
801        ));
802        let controller = Arc::new(EmbeddedPeerRouterController::new(
803            keys.clone(),
804            opts.data_dir.clone(),
805            state.clone(),
806            Arc::clone(&store) as Arc<dyn ContentStore>,
807            peer_classifier,
808            nostr_relay.clone(),
809        ));
810        controller.apply_config(&config).await?;
811        (Some(state), Some(controller))
812    };
813
814    #[cfg(not(feature = "p2p"))]
815    let webrtc_state: Option<Arc<crate::webrtc::WebRTCState>> = None;
816
817    let background_services_controller = Arc::new(EmbeddedBackgroundServicesController::new(
818        keys.clone(),
819        opts.data_dir.clone(),
820        Arc::clone(&store),
821        graph_store.clone(),
822        Arc::clone(&social_graph_store),
823        crawler_spambox_backend,
824        webrtc_state.clone(),
825    ));
826    background_services_controller.apply_config(&config).await?;
827
828    let upstream_blossom = config.blossom.all_read_servers();
829    let active_nostr_relays = config.nostr.active_relays();
830
831    let mut server = HashtreeServer::new(Arc::clone(&store), opts.bind_address.clone())
832        .with_server_mode(config.server.mode)
833        .with_hash_get_enabled(config.server.mode.hash_get_enabled())
834        .with_allowed_pubkeys(allowed_pubkeys.clone())
835        .with_max_upload_bytes((config.blossom.max_upload_mb as usize) * 1024 * 1024)
836        .with_public_writes(config.server.public_writes)
837        .with_upstream_blossom(upstream_blossom)
838        .with_nostr_relay_urls(active_nostr_relays)
839        .with_social_graph(social_graph)
840        .with_socialgraph_snapshot(
841            Arc::clone(&social_graph_store),
842            social_graph_root_bytes,
843            config.server.socialgraph_snapshot_public,
844        )
845        .with_nostr_relay(nostr_relay.clone());
846
847    if crate::p2p_common::peer_router_enabled(&config) {
848        if let Some(ref state) = webrtc_state {
849            server = server.with_webrtc_peers(state.clone());
850        }
851    }
852
853    if let Some(extra) = opts.extra_routes {
854        server = server.with_extra_routes(extra);
855    }
856    if let Some(cors) = opts.cors {
857        server = server.with_cors(cors);
858    }
859
860    spawn_background_eviction_task(
861        Arc::clone(&store),
862        BACKGROUND_EVICTION_INTERVAL,
863        "embedded daemon",
864    );
865
866    let listener = TcpListener::bind(&opts.bind_address).await?;
867    let local_addr = listener.local_addr()?;
868    let actual_addr = format!("{}:{}", local_addr.ip(), local_addr.port());
869
870    let server_shutdown = Arc::new(Notify::new());
871    let server_shutdown_for_task = Arc::clone(&server_shutdown);
872    let server_join = tokio::spawn(async move {
873        if let Err(e) = server
874            .run_with_listener_until(listener, async move {
875                server_shutdown_for_task.notified().await;
876            })
877            .await
878        {
879            tracing::error!("Embedded daemon server error: {}", e);
880        }
881    });
882    let server_controller = Arc::new(EmbeddedServerController::new(server_shutdown, server_join));
883    #[cfg(feature = "p2p")]
884    let daemon_controller = Arc::new(EmbeddedDaemonController::new(
885        server_controller,
886        peer_router_controller.clone(),
887        Some(background_services_controller.clone()),
888    ));
889    #[cfg(not(feature = "p2p"))]
890    let daemon_controller = Arc::new(EmbeddedDaemonController::new(
891        server_controller,
892        Some(background_services_controller.clone()),
893    ));
894
895    tracing::info!(
896        "Embedded daemon started on {}, identity {}",
897        actual_addr,
898        npub
899    );
900
901    Ok(EmbeddedDaemonInfo {
902        addr: actual_addr,
903        port: local_addr.port(),
904        npub,
905        store,
906        daemon_controller,
907        webrtc_state,
908        #[cfg(feature = "p2p")]
909        peer_router_controller,
910        background_services_controller: Some(background_services_controller),
911    })
912}
913
914#[cfg(test)]
915mod tests {
916    use super::EmbeddedBackgroundServicesController;
917    use crate::config::Config;
918
919    #[test]
920    fn mirror_publish_relays_orders_known_root_publish_relays_first() {
921        let relays = EmbeddedBackgroundServicesController::mirror_publish_relays(
922            &[
923                "wss://graph-relay.iris.to".to_string(),
924                "wss://relay.example".to_string(),
925                "wss://relay.primal.net".to_string(),
926                "wss://relay.damus.io".to_string(),
927                "wss://temp.iris.to".to_string(),
928                "wss://vault.iris.to".to_string(),
929                "wss://upload.iris.to/nostr".to_string(),
930            ],
931            "0.0.0.0:8080",
932        );
933        assert_eq!(
934            relays,
935            vec![
936                "wss://relay.primal.net".to_string(),
937                "wss://temp.iris.to".to_string(),
938                "wss://vault.iris.to".to_string(),
939                "wss://relay.damus.io".to_string(),
940                "wss://relay.example".to_string(),
941            ]
942        );
943    }
944
945    #[test]
946    fn mirror_publish_relays_filters_known_bad_publish_targets_when_no_preferred_remain() {
947        let relays = EmbeddedBackgroundServicesController::mirror_publish_relays(
948            &[
949                "wss://graph-relay.iris.to".to_string(),
950                "wss://relay.snort.social".to_string(),
951                "wss://relay.nostr.band".to_string(),
952                "wss://upload.iris.to/nostr".to_string(),
953            ],
954            "0.0.0.0:8080",
955        );
956        assert_eq!(
957            relays,
958            vec![
959                "wss://relay.nostr.band".to_string(),
960                "wss://relay.snort.social".to_string(),
961            ]
962        );
963    }
964
965    #[test]
966    fn nostr_mirror_config_allows_disabling_full_note_paging() {
967        let mut config = Config::default();
968        config.nostr.full_text_note_history_max_relay_pages = 0;
969
970        let mirror_config = EmbeddedBackgroundServicesController::nostr_mirror_config(
971            &config,
972            &["wss://relay.example".to_string()],
973        );
974
975        assert_eq!(mirror_config.full_text_note_history_max_relay_pages, 0);
976
977        config.nostr.full_text_note_history_max_relay_pages = 64;
978        let mirror_config = EmbeddedBackgroundServicesController::nostr_mirror_config(
979            &config,
980            &["wss://relay.example".to_string()],
981        );
982
983        assert_eq!(mirror_config.full_text_note_history_max_relay_pages, 64);
984    }
985
986    #[test]
987    fn nostr_mirror_config_can_limit_mirror_distance_independently() {
988        let mut config = Config::default();
989        config.nostr.social_graph_crawl_depth = 6;
990        config.nostr.mirror_max_follow_distance = Some(2);
991
992        let mirror_config = EmbeddedBackgroundServicesController::nostr_mirror_config(
993            &config,
994            &["wss://relay.example".to_string()],
995        );
996
997        assert_eq!(mirror_config.max_follow_distance, 2);
998
999        config.nostr.mirror_max_follow_distance = None;
1000        let mirror_config = EmbeddedBackgroundServicesController::nostr_mirror_config(
1001            &config,
1002            &["wss://relay.example".to_string()],
1003        );
1004
1005        assert_eq!(mirror_config.max_follow_distance, 6);
1006    }
1007}