Skip to main content

hashtree_cli/
daemon.rs

1use anyhow::{Context, Result};
2use axum::Router;
3use nostr::nips::nip19::ToBech32;
4use nostr::Keys;
5use std::collections::HashSet;
6use std::path::PathBuf;
7use std::sync::Arc;
8use tokio::net::TcpListener;
9use tokio::sync::{Mutex, Notify};
10use tokio::task::JoinHandle;
11use tower_http::cors::CorsLayer;
12
13use crate::config::{ensure_keys, ensure_keys_in, parse_npub, pubkey_bytes, Config};
14use crate::eviction::{spawn_background_eviction_task, BACKGROUND_EVICTION_INTERVAL};
15use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
16use crate::server::{AppState, HashtreeServer};
17use crate::socialgraph;
18use crate::storage::HashtreeStore;
19
20#[cfg(feature = "p2p")]
21use crate::webrtc::{ContentStore, PeerClassifier, WebRTCManager, WebRTCState};
22#[cfg(not(feature = "p2p"))]
23use crate::WebRTCState;
24
/// Handles kept for a running peer router so it can be stopped later.
#[cfg(feature = "p2p")]
struct PeerRouterRuntime {
    /// Watch-channel sender; `true` is sent on it to request router shutdown.
    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
    /// Task that drives the WebRTC manager's `run` loop.
    join: JoinHandle<()>,
    /// Task spawned by `spawn_peer_state_persist_task`; aborted when the router stops.
    peer_state_persist: JoinHandle<()>,
}
31
32struct BackgroundSyncRuntime {
33    service: Arc<crate::sync::BackgroundSync>,
34    join: Option<JoinHandle<()>>,
35}
36
37impl Drop for BackgroundSyncRuntime {
38    fn drop(&mut self) {
39        self.service.shutdown();
40        if let Some(join) = self.join.take() {
41            join.abort();
42        }
43    }
44}
45
46struct BackgroundMirrorRuntime {
47    service: Arc<crate::nostr_mirror::BackgroundNostrMirror>,
48    join: Option<JoinHandle<()>>,
49}
50
51impl Drop for BackgroundMirrorRuntime {
52    fn drop(&mut self) {
53        self.service.shutdown();
54        if let Some(join) = self.join.take() {
55            join.abort();
56        }
57    }
58}
59
60struct BackgroundServicesRuntime {
61    crawler: Option<socialgraph::crawler::SocialGraphTaskHandles>,
62    mirror: Option<BackgroundMirrorRuntime>,
63    sync: Option<BackgroundSyncRuntime>,
64}
65
66impl Drop for BackgroundServicesRuntime {
67    fn drop(&mut self) {
68        if let Some(handles) = self.crawler.as_ref() {
69            let _ = handles.shutdown_tx.send(true);
70        }
71        if let Some(runtime) = self.mirror.as_ref() {
72            runtime.service.shutdown();
73        }
74        if let Some(runtime) = self.sync.as_ref() {
75            runtime.service.shutdown();
76        }
77    }
78}
79
80impl BackgroundServicesRuntime {
81    fn status(&self) -> EmbeddedBackgroundServicesStatus {
82        EmbeddedBackgroundServicesStatus {
83            crawler_active: self.crawler.is_some(),
84            mirror_active: self.mirror.is_some(),
85            sync_active: self.sync.is_some(),
86        }
87    }
88}
89
90struct EmbeddedServerRuntime {
91    shutdown: Arc<Notify>,
92    join: Option<JoinHandle<()>>,
93}
94
95pub struct EmbeddedServerController {
96    runtime: Mutex<Option<EmbeddedServerRuntime>>,
97}
98
99impl EmbeddedServerController {
100    pub fn new(shutdown: Arc<Notify>, join: JoinHandle<()>) -> Self {
101        Self {
102            runtime: Mutex::new(Some(EmbeddedServerRuntime {
103                shutdown,
104                join: Some(join),
105            })),
106        }
107    }
108
109    pub async fn shutdown(&self) {
110        let mut runtime = self.runtime.lock().await;
111        let Some(mut runtime) = runtime.take() else {
112            return;
113        };
114
115        runtime.shutdown.notify_waiters();
116        if let Some(mut join) = runtime.join.take() {
117            match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
118                Ok(Ok(())) => {}
119                Ok(Err(err)) => {
120                    tracing::warn!("Embedded server task ended with join error: {}", err)
121                }
122                Err(_) => {
123                    tracing::warn!("Timed out waiting for embedded server shutdown");
124                    join.abort();
125                }
126            }
127        }
128    }
129}
130
/// Snapshot of which embedded background services are currently registered as running.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmbeddedBackgroundServicesStatus {
    /// `true` while the social-graph crawler tasks are registered.
    pub crawler_active: bool,
    /// `true` while the background Nostr mirror is registered.
    pub mirror_active: bool,
    /// `true` while the background sync service is registered.
    pub sync_active: bool,
}
137
138pub struct EmbeddedBackgroundServicesController {
139    keys: Keys,
140    data_dir: PathBuf,
141    store: Arc<HashtreeStore>,
142    graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
143    graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
144    spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
145    webrtc_state: Option<Arc<WebRTCState>>,
146    runtime: Mutex<BackgroundServicesRuntime>,
147}
148
149impl EmbeddedBackgroundServicesController {
150    const MIRROR_PUBLISH_RELAY_PRIORITY: &[&str] = &[
151        "wss://temp.iris.to",
152        "wss://vault.iris.to",
153        "wss://relay.damus.io",
154        "wss://relay.primal.net",
155    ];
156    const MIRROR_PUBLISH_RELAY_BLOCKLIST: &[&str] =
157        &["wss://graph-relay.iris.to", "wss://upload.iris.to/nostr"];
158
159    fn mirror_publish_relays(active_relays: &[String], _bind_address: &str) -> Vec<String> {
160        let mut seen = HashSet::new();
161        let active_relays = active_relays
162            .iter()
163            .filter(|relay| seen.insert((*relay).clone()))
164            .cloned()
165            .collect::<Vec<_>>();
166        if active_relays.is_empty() {
167            return Vec::new();
168        }
169        let active_relay_set = active_relays.iter().cloned().collect::<HashSet<_>>();
170
171        let preferred = Self::MIRROR_PUBLISH_RELAY_PRIORITY
172            .iter()
173            .filter(|relay| active_relay_set.contains(**relay))
174            .map(|relay| (*relay).to_string())
175            .collect::<Vec<_>>();
176        if !preferred.is_empty() {
177            return preferred;
178        }
179
180        let filtered = active_relays
181            .iter()
182            .filter(|relay| !Self::MIRROR_PUBLISH_RELAY_BLOCKLIST.contains(&relay.as_str()))
183            .cloned()
184            .collect::<Vec<_>>();
185        if !filtered.is_empty() {
186            return filtered;
187        }
188
189        active_relays
190    }
191
192    pub fn new(
193        keys: Keys,
194        data_dir: PathBuf,
195        store: Arc<HashtreeStore>,
196        graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
197        graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
198        spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
199        webrtc_state: Option<Arc<WebRTCState>>,
200    ) -> Self {
201        Self {
202            keys,
203            data_dir,
204            store,
205            graph_store_concrete,
206            graph_store,
207            spambox,
208            webrtc_state,
209            runtime: Mutex::new(BackgroundServicesRuntime {
210                crawler: None,
211                mirror: None,
212                sync: None,
213            }),
214        }
215    }
216
217    pub async fn status(&self) -> EmbeddedBackgroundServicesStatus {
218        self.runtime.lock().await.status()
219    }
220
221    pub async fn shutdown(&self) {
222        let mut runtime = self.runtime.lock().await;
223        Self::shutdown_crawler(&mut runtime.crawler).await;
224        Self::shutdown_mirror(&mut runtime.mirror).await;
225        Self::shutdown_sync(&mut runtime.sync).await;
226    }
227
228    async fn shutdown_crawler(crawler: &mut Option<socialgraph::crawler::SocialGraphTaskHandles>) {
229        let Some(handles) = crawler.take() else {
230            return;
231        };
232
233        let _ = handles.shutdown_tx.send(true);
234
235        let mut crawl_handle = handles.crawl_handle;
236        match tokio::time::timeout(std::time::Duration::from_secs(3), &mut crawl_handle).await {
237            Ok(Ok(())) => {}
238            Ok(Err(err)) => tracing::warn!("Crawler task ended with join error: {}", err),
239            Err(_) => {
240                tracing::warn!("Timed out waiting for crawler task shutdown");
241                crawl_handle.abort();
242            }
243        }
244
245        let mut local_list_handle = handles.local_list_handle;
246        match tokio::time::timeout(std::time::Duration::from_secs(3), &mut local_list_handle).await
247        {
248            Ok(Ok(())) => {}
249            Ok(Err(err)) => tracing::warn!("Local list task ended with join error: {}", err),
250            Err(_) => {
251                tracing::warn!("Timed out waiting for local list task shutdown");
252                local_list_handle.abort();
253            }
254        }
255    }
256
257    async fn shutdown_sync(sync: &mut Option<BackgroundSyncRuntime>) {
258        let Some(mut runtime) = sync.take() else {
259            return;
260        };
261
262        runtime.service.shutdown();
263        if let Some(mut join) = runtime.join.take() {
264            match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
265                Ok(Ok(())) => {}
266                Ok(Err(err)) => {
267                    tracing::warn!("Background sync task ended with join error: {}", err)
268                }
269                Err(_) => {
270                    tracing::warn!("Timed out waiting for background sync shutdown");
271                    join.abort();
272                }
273            }
274        }
275    }
276
277    async fn shutdown_mirror(mirror: &mut Option<BackgroundMirrorRuntime>) {
278        let Some(mut runtime) = mirror.take() else {
279            return;
280        };
281
282        runtime.service.shutdown();
283        if let Some(mut join) = runtime.join.take() {
284            match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
285                Ok(Ok(())) => {}
286                Ok(Err(err)) => {
287                    tracing::warn!("Background mirror task ended with join error: {}", err)
288                }
289                Err(_) => {
290                    tracing::warn!("Timed out waiting for background mirror shutdown");
291                    join.abort();
292                }
293            }
294        }
295    }
296
297    fn nostr_mirror_config(
298        config: &Config,
299        active_relays: &[String],
300    ) -> crate::nostr_mirror::NostrMirrorConfig {
301        crate::nostr_mirror::NostrMirrorConfig {
302            relays: active_relays.to_vec(),
303            publish_relays: Self::mirror_publish_relays(active_relays, &config.server.bind_address),
304            blossom_write_servers: config.blossom.all_write_servers(),
305            max_follow_distance: config.nostr.social_graph_crawl_depth,
306            overmute_threshold: config.nostr.overmute_threshold,
307            require_negentropy: config.nostr.negentropy_only,
308            kinds: config.nostr.mirror_kinds.clone(),
309            history_sync_author_chunk_size: config.nostr.history_sync_author_chunk_size.max(1),
310            history_sync_per_author_event_limit: config
311                .nostr
312                .history_sync_per_author_event_limit
313                .max(1),
314            missing_profile_backfill_batch_size: config.nostr.history_sync_author_chunk_size.max(1),
315            history_sync_on_reconnect: config.nostr.history_sync_on_reconnect,
316            full_text_note_history_follow_distance: config
317                .nostr
318                .full_text_note_history_follow_distance,
319            full_text_note_history_max_relay_pages: config
320                .nostr
321                .full_text_note_history_max_relay_pages,
322            ..crate::nostr_mirror::NostrMirrorConfig::default()
323        }
324    }
325
326    pub async fn apply_config(&self, config: &Config) -> Result<EmbeddedBackgroundServicesStatus> {
327        let mut runtime = self.runtime.lock().await;
328
329        Self::shutdown_crawler(&mut runtime.crawler).await;
330        Self::shutdown_mirror(&mut runtime.mirror).await;
331        Self::shutdown_sync(&mut runtime.sync).await;
332
333        if !config.server.mode.background_services_enabled() {
334            return Ok(runtime.status());
335        }
336
337        let active_relays = config.nostr.active_relays();
338
339        if config.nostr.enabled
340            && config.nostr.social_graph_crawl_depth > 0
341            && !active_relays.is_empty()
342        {
343            runtime.crawler = Some(socialgraph::crawler::spawn_social_graph_tasks(
344                self.graph_store.clone(),
345                self.keys.clone(),
346                active_relays.clone(),
347                config.nostr.social_graph_crawl_depth,
348                self.spambox.clone(),
349                self.data_dir.clone(),
350            ));
351
352            let service = Arc::new(
353                crate::nostr_mirror::BackgroundNostrMirror::new(
354                    Self::nostr_mirror_config(config, &active_relays),
355                    self.store.clone(),
356                    self.graph_store_concrete.clone(),
357                    Some(
358                        nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
359                            .context("Failed to parse keys for background nostr mirror")?,
360                    ),
361                )
362                .await
363                .context("Failed to create background nostr mirror")?,
364            );
365            let service_for_task = service.clone();
366            let join = tokio::task::spawn_blocking(move || {
367                let runtime = tokio::runtime::Builder::new_current_thread()
368                    .enable_all()
369                    .build()
370                    .expect("build background nostr mirror runtime");
371                runtime.block_on(async {
372                    if let Err(err) = service_for_task.run().await {
373                        tracing::error!("Background nostr mirror error: {:#}", err);
374                    }
375                });
376            });
377            runtime.mirror = Some(BackgroundMirrorRuntime {
378                service,
379                join: Some(join),
380            });
381        }
382
383        let has_pinned_refs = self
384            .store
385            .list_pinned_refs()
386            .map(|refs| !refs.is_empty())
387            .unwrap_or(false);
388        let has_tracked_authors = self
389            .store
390            .list_tracked_authors()
391            .map(|authors| !authors.is_empty())
392            .unwrap_or(false);
393
394        if config.sync.enabled
395            && (config.sync.sync_own
396                || config.sync.sync_followed
397                || has_pinned_refs
398                || has_tracked_authors)
399            && !active_relays.is_empty()
400        {
401            let sync_config = crate::sync::SyncConfig {
402                sync_own: config.sync.sync_own,
403                sync_followed: config.sync.sync_followed,
404                relays: active_relays,
405                max_concurrent: config.sync.max_concurrent,
406                webrtc_timeout_ms: config.sync.webrtc_timeout_ms,
407                blossom_timeout_ms: config.sync.blossom_timeout_ms,
408            };
409
410            let sync_keys = nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
411                .context("Failed to parse keys for sync")?;
412            let service = Arc::new(
413                crate::sync::BackgroundSync::new(
414                    sync_config,
415                    self.store.clone(),
416                    sync_keys,
417                    self.webrtc_state.clone(),
418                )
419                .await
420                .context("Failed to create background sync service")?,
421            );
422            let contacts_file = self.data_dir.join("contacts.json");
423            let service_for_task = service.clone();
424            let join = tokio::spawn(async move {
425                if let Err(err) = service_for_task.run(contacts_file).await {
426                    tracing::error!("Background sync error: {}", err);
427                }
428            });
429            runtime.sync = Some(BackgroundSyncRuntime {
430                service,
431                join: Some(join),
432            });
433        }
434
435        Ok(runtime.status())
436    }
437}
438
/// Starts, restarts and stops the WebRTC peer router.
#[cfg(feature = "p2p")]
pub struct EmbeddedPeerRouterController {
    /// Identity keys handed to each new `WebRTCManager`.
    keys: Keys,
    /// Directory used to persist and reload mesh peer state.
    data_dir: PathBuf,
    /// WebRTC state that is reused across router restarts.
    state: Arc<WebRTCState>,
    /// Content store handed to the manager when hash GET is enabled.
    store: Arc<dyn ContentStore>,
    /// Peer classifier installed on every manager.
    peer_classifier: PeerClassifier,
    /// Nostr relay installed on every manager as its mesh relay client.
    nostr_relay: Arc<NostrRelay>,
    /// Handles of the running router; `None` while no router is running.
    runtime: Mutex<Option<PeerRouterRuntime>>,
}
449
450#[cfg(feature = "p2p")]
451impl EmbeddedPeerRouterController {
452    pub fn new(
453        keys: Keys,
454        data_dir: PathBuf,
455        state: Arc<WebRTCState>,
456        store: Arc<dyn ContentStore>,
457        peer_classifier: PeerClassifier,
458        nostr_relay: Arc<NostrRelay>,
459    ) -> Self {
460        Self {
461            keys,
462            data_dir,
463            state,
464            store,
465            peer_classifier,
466            nostr_relay,
467            runtime: Mutex::new(None),
468        }
469    }
470
471    pub fn state(&self) -> Arc<WebRTCState> {
472        self.state.clone()
473    }
474
475    pub async fn apply_config(&self, config: &Config) -> Result<bool> {
476        let mut runtime = self.runtime.lock().await;
477        if let Some(runtime_handle) = runtime.take() {
478            if let Err(err) =
479                crate::p2p_common::persist_peer_state(&self.data_dir, &self.state).await
480            {
481                tracing::warn!("Failed to persist mesh peer state before router restart: {err:#}");
482            }
483            let _ = runtime_handle.shutdown.send(true);
484            runtime_handle.peer_state_persist.abort();
485            let mut join = runtime_handle.join;
486            match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
487                Ok(Ok(())) => {}
488                Ok(Err(err)) => {
489                    tracing::warn!("Peer router task ended with join error: {}", err);
490                }
491                Err(_) => {
492                    tracing::warn!("Timed out waiting for peer router shutdown");
493                    join.abort();
494                }
495            }
496        }
497
498        self.state.reset_runtime_state().await;
499        if let Err(err) = crate::p2p_common::load_peer_state(&self.data_dir, &self.state).await {
500            tracing::warn!("Failed to load persisted mesh peer state: {err:#}");
501        }
502
503        if !crate::p2p_common::peer_router_enabled(config) {
504            return Ok(false);
505        }
506
507        let webrtc_config = crate::p2p_common::default_webrtc_config(config);
508        let mut manager = if config.server.mode.hash_get_enabled() {
509            WebRTCManager::new_with_state_and_store_and_classifier(
510                self.keys.clone(),
511                webrtc_config,
512                self.state.clone(),
513                self.store.clone(),
514                self.peer_classifier.clone(),
515            )
516        } else {
517            let mut manager =
518                WebRTCManager::new_with_state(self.keys.clone(), webrtc_config, self.state.clone());
519            manager.set_peer_classifier(self.peer_classifier.clone());
520            manager
521        };
522        manager
523            .set_nostr_relay(self.nostr_relay.clone() as hashtree_network::SharedMeshRelayClient);
524        let shutdown = manager.shutdown_signal();
525        let join = tokio::spawn(async move {
526            if let Err(err) = manager.run().await {
527                tracing::error!("Peer router error: {}", err);
528            }
529        });
530        let peer_state_persist = crate::p2p_common::spawn_peer_state_persist_task(
531            self.data_dir.clone(),
532            self.state.clone(),
533        );
534        *runtime = Some(PeerRouterRuntime {
535            shutdown,
536            join,
537            peer_state_persist,
538        });
539        Ok(true)
540    }
541
542    pub async fn shutdown(&self) {
543        let mut runtime = self.runtime.lock().await;
544        let Some(runtime_handle) = runtime.take() else {
545            return;
546        };
547
548        if let Err(err) = crate::p2p_common::persist_peer_state(&self.data_dir, &self.state).await {
549            tracing::warn!("Failed to persist mesh peer state during router shutdown: {err:#}");
550        }
551        let _ = runtime_handle.shutdown.send(true);
552        runtime_handle.peer_state_persist.abort();
553        let mut join = runtime_handle.join;
554        match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
555            Ok(Ok(())) => {}
556            Ok(Err(err)) => tracing::warn!("Peer router task ended with join error: {}", err),
557            Err(_) => {
558                tracing::warn!("Timed out waiting for peer router shutdown");
559                join.abort();
560            }
561        }
562
563        self.state.reset_runtime_state().await;
564    }
565}
566
567pub struct EmbeddedDaemonController {
568    server_controller: Arc<EmbeddedServerController>,
569    #[cfg(feature = "p2p")]
570    peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
571    background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
572}
573
574impl EmbeddedDaemonController {
575    #[cfg(feature = "p2p")]
576    pub fn new(
577        server_controller: Arc<EmbeddedServerController>,
578        peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
579        background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
580    ) -> Self {
581        Self {
582            server_controller,
583            #[cfg(feature = "p2p")]
584            peer_router_controller,
585            background_services_controller,
586        }
587    }
588
589    #[cfg(not(feature = "p2p"))]
590    pub fn new(
591        server_controller: Arc<EmbeddedServerController>,
592        background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
593    ) -> Self {
594        Self {
595            server_controller,
596            background_services_controller,
597        }
598    }
599
600    pub async fn shutdown(&self) {
601        self.server_controller.shutdown().await;
602        if let Some(controller) = self.background_services_controller.as_ref() {
603            controller.shutdown().await;
604        }
605        #[cfg(feature = "p2p")]
606        if let Some(controller) = self.peer_router_controller.as_ref() {
607            controller.shutdown().await;
608        }
609    }
610}
611
612pub struct EmbeddedDaemonOptions {
613    pub config: Config,
614    pub data_dir: PathBuf,
615    pub config_dir: Option<PathBuf>,
616    pub bind_address: String,
617    pub relays: Option<Vec<String>>,
618    pub extra_routes: Option<Router<AppState>>,
619    pub cors: Option<CorsLayer>,
620}
621
622pub struct EmbeddedDaemonInfo {
623    pub addr: String,
624    pub port: u16,
625    pub npub: String,
626    pub store: Arc<HashtreeStore>,
627    pub daemon_controller: Arc<EmbeddedDaemonController>,
628    #[allow(dead_code)]
629    pub webrtc_state: Option<Arc<WebRTCState>>,
630    #[cfg(feature = "p2p")]
631    #[allow(dead_code)]
632    pub peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
633    #[allow(dead_code)]
634    pub background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
635}
636
637pub async fn start_embedded(opts: EmbeddedDaemonOptions) -> Result<EmbeddedDaemonInfo> {
638    let _ = rustls::crypto::ring::default_provider().install_default();
639
640    let mut config = opts.config;
641    config.server.bind_address = opts.bind_address.clone();
642    if let Some(relays) = opts.relays {
643        config.nostr.relays = relays;
644        config.nostr.enabled = !config.nostr.relays.is_empty();
645    }
646
647    let max_size_bytes = config.storage.max_size_gb * 1024 * 1024 * 1024;
648    let nostr_db_max_bytes = config
649        .nostr
650        .db_max_size_gb
651        .saturating_mul(1024 * 1024 * 1024);
652    let spambox_db_max_bytes = config
653        .nostr
654        .spambox_max_size_gb
655        .saturating_mul(1024 * 1024 * 1024);
656
657    let store = Arc::new(HashtreeStore::with_options(
658        &opts.data_dir,
659        config.storage.s3.as_ref(),
660        max_size_bytes,
661    )?);
662
663    let (keys, _was_generated) = if let Some(config_dir) = opts.config_dir.as_ref() {
664        ensure_keys_in(config_dir, Some(&opts.data_dir), Some(&config))?
665    } else {
666        ensure_keys()?
667    };
668    let pk_bytes = pubkey_bytes(&keys);
669    let npub = keys
670        .public_key()
671        .to_bech32()
672        .context("Failed to encode npub")?;
673
674    let mut allowed_pubkeys: HashSet<String> = HashSet::new();
675    allowed_pubkeys.insert(hex::encode(pk_bytes));
676    for npub_str in &config.nostr.allowed_npubs {
677        if let Ok(pk) = parse_npub(npub_str) {
678            allowed_pubkeys.insert(hex::encode(pk));
679        } else {
680            tracing::warn!("Invalid npub in allowed_npubs: {}", npub_str);
681        }
682    }
683
684    let graph_store = socialgraph::open_social_graph_store_with_storage(
685        &opts.data_dir,
686        store.store_arc(),
687        Some(nostr_db_max_bytes),
688    )
689    .context("Failed to initialize social graph store")?;
690    graph_store.set_profile_index_overmute_threshold(config.nostr.overmute_threshold);
691
692    let social_graph_root_bytes = if let Some(ref root_npub) = config.nostr.socialgraph_root {
693        parse_npub(root_npub).unwrap_or(pk_bytes)
694    } else {
695        pk_bytes
696    };
697    socialgraph::set_social_graph_root(&graph_store, &social_graph_root_bytes);
698    socialgraph::sync_local_list_files_force(graph_store.as_ref(), &opts.data_dir, &keys)
699        .context("Failed to sync local social graph lists")?;
700    let social_graph_store: Arc<dyn socialgraph::SocialGraphBackend> = graph_store.clone();
701
702    let social_graph = Arc::new(socialgraph::SocialGraphAccessControl::new(
703        Arc::clone(&social_graph_store),
704        config.nostr.max_write_distance,
705        allowed_pubkeys.clone(),
706    ));
707
708    let nostr_relay_config = NostrRelayConfig {
709        spambox_db_max_bytes,
710        ..Default::default()
711    };
712    let mut public_event_pubkeys = HashSet::new();
713    public_event_pubkeys.insert(hex::encode(pk_bytes));
714    let nostr_relay = Arc::new(
715        NostrRelay::new(
716            Arc::clone(&social_graph_store),
717            opts.data_dir.clone(),
718            public_event_pubkeys,
719            Some(social_graph.clone()),
720            nostr_relay_config,
721        )
722        .context("Failed to initialize Nostr relay")?,
723    );
724
725    let crawler_spambox = if spambox_db_max_bytes == 0 {
726        None
727    } else {
728        let spam_dir = opts.data_dir.join("socialgraph_spambox");
729        match socialgraph::open_social_graph_store_at_path(&spam_dir, Some(spambox_db_max_bytes)) {
730            Ok(store) => Some(store),
731            Err(err) => {
732                tracing::warn!("Failed to open social graph spambox for crawler: {}", err);
733                None
734            }
735        }
736    };
737    let crawler_spambox_backend = crawler_spambox
738        .clone()
739        .map(|store| store as Arc<dyn socialgraph::SocialGraphBackend>);
740
741    #[cfg(feature = "p2p")]
742    let (webrtc_state, peer_router_controller): (
743        Option<Arc<WebRTCState>>,
744        Option<Arc<EmbeddedPeerRouterController>>,
745    ) = {
746        let router_config = crate::p2p_common::default_webrtc_config(&config);
747        let peer_classifier = crate::p2p_common::build_peer_classifier(
748            opts.data_dir.clone(),
749            Arc::clone(&social_graph_store),
750        );
751        let cashu_payment_client =
752            if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
753                match crate::cashu_helper::CashuHelperClient::discover(opts.data_dir.clone()) {
754                    Ok(client) => {
755                        Some(Arc::new(client) as Arc<dyn crate::cashu_helper::CashuPaymentClient>)
756                    }
757                    Err(err) => {
758                        tracing::warn!(
759                        "Cashu settlement helper unavailable; paid retrieval stays disabled: {}",
760                        err
761                    );
762                        None
763                    }
764                }
765            } else {
766                None
767            };
768        let cashu_mint_metadata =
769            if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
770                let metadata_path = crate::webrtc::cashu_mint_metadata_path(&opts.data_dir);
771                match crate::webrtc::CashuMintMetadataStore::load(metadata_path) {
772                    Ok(store) => Some(store),
773                    Err(err) => {
774                        tracing::warn!(
775                        "Failed to load Cashu mint metadata; falling back to in-memory state: {}",
776                        err
777                    );
778                        Some(crate::webrtc::CashuMintMetadataStore::in_memory())
779                    }
780                }
781            } else {
782                None
783            };
784
785        let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
786            router_config.request_selection_strategy,
787            router_config.request_fairness_enabled,
788            router_config.request_dispatch,
789            std::time::Duration::from_millis(router_config.message_timeout_ms),
790            crate::webrtc::CashuRoutingConfig::from(&config.cashu),
791            cashu_payment_client,
792            cashu_mint_metadata,
793        ));
794        let controller = Arc::new(EmbeddedPeerRouterController::new(
795            keys.clone(),
796            opts.data_dir.clone(),
797            state.clone(),
798            Arc::clone(&store) as Arc<dyn ContentStore>,
799            peer_classifier,
800            nostr_relay.clone(),
801        ));
802        controller.apply_config(&config).await?;
803        (Some(state), Some(controller))
804    };
805
806    #[cfg(not(feature = "p2p"))]
807    let webrtc_state: Option<Arc<crate::webrtc::WebRTCState>> = None;
808
809    let background_services_controller = Arc::new(EmbeddedBackgroundServicesController::new(
810        keys.clone(),
811        opts.data_dir.clone(),
812        Arc::clone(&store),
813        graph_store.clone(),
814        Arc::clone(&social_graph_store),
815        crawler_spambox_backend,
816        webrtc_state.clone(),
817    ));
818    background_services_controller.apply_config(&config).await?;
819
820    let upstream_blossom = config.blossom.all_read_servers();
821    let active_nostr_relays = config.nostr.active_relays();
822
823    let mut server = HashtreeServer::new(Arc::clone(&store), opts.bind_address.clone())
824        .with_server_mode(config.server.mode)
825        .with_hash_get_enabled(config.server.mode.hash_get_enabled())
826        .with_allowed_pubkeys(allowed_pubkeys.clone())
827        .with_max_upload_bytes((config.blossom.max_upload_mb as usize) * 1024 * 1024)
828        .with_public_writes(config.server.public_writes)
829        .with_upstream_blossom(upstream_blossom)
830        .with_nostr_relay_urls(active_nostr_relays)
831        .with_social_graph(social_graph)
832        .with_socialgraph_snapshot(
833            Arc::clone(&social_graph_store),
834            social_graph_root_bytes,
835            config.server.socialgraph_snapshot_public,
836        )
837        .with_nostr_relay(nostr_relay.clone());
838
839    if crate::p2p_common::peer_router_enabled(&config) {
840        if let Some(ref state) = webrtc_state {
841            server = server.with_webrtc_peers(state.clone());
842        }
843    }
844
845    if let Some(extra) = opts.extra_routes {
846        server = server.with_extra_routes(extra);
847    }
848    if let Some(cors) = opts.cors {
849        server = server.with_cors(cors);
850    }
851
852    spawn_background_eviction_task(
853        Arc::clone(&store),
854        BACKGROUND_EVICTION_INTERVAL,
855        "embedded daemon",
856    );
857
858    let listener = TcpListener::bind(&opts.bind_address).await?;
859    let local_addr = listener.local_addr()?;
860    let actual_addr = format!("{}:{}", local_addr.ip(), local_addr.port());
861
862    let server_shutdown = Arc::new(Notify::new());
863    let server_shutdown_for_task = Arc::clone(&server_shutdown);
864    let server_join = tokio::spawn(async move {
865        if let Err(e) = server
866            .run_with_listener_until(listener, async move {
867                server_shutdown_for_task.notified().await;
868            })
869            .await
870        {
871            tracing::error!("Embedded daemon server error: {}", e);
872        }
873    });
874    let server_controller = Arc::new(EmbeddedServerController::new(server_shutdown, server_join));
875    #[cfg(feature = "p2p")]
876    let daemon_controller = Arc::new(EmbeddedDaemonController::new(
877        server_controller,
878        peer_router_controller.clone(),
879        Some(background_services_controller.clone()),
880    ));
881    #[cfg(not(feature = "p2p"))]
882    let daemon_controller = Arc::new(EmbeddedDaemonController::new(
883        server_controller,
884        Some(background_services_controller.clone()),
885    ));
886
887    tracing::info!(
888        "Embedded daemon started on {}, identity {}",
889        actual_addr,
890        npub
891    );
892
893    Ok(EmbeddedDaemonInfo {
894        addr: actual_addr,
895        port: local_addr.port(),
896        npub,
897        store,
898        daemon_controller,
899        webrtc_state,
900        #[cfg(feature = "p2p")]
901        peer_router_controller,
902        background_services_controller: Some(background_services_controller),
903    })
904}
905
#[cfg(test)]
mod tests {
    use super::EmbeddedBackgroundServicesController as Controller;
    use crate::config::Config;

    /// Address string passed as the second argument of `mirror_publish_relays`
    /// in the tests below.
    /// NOTE(review): presumably the daemon's bind address; confirm against the
    /// function's signature.
    const ADDRESS_ARG: &str = "0.0.0.0:8080";

    /// Feeds a mixed candidate list and expects only `temp.iris.to`,
    /// `vault.iris.to` and `relay.damus.io` back. The expected order differs
    /// from the input order (damus is listed before the iris relays in the
    /// input but comes last in the result).
    #[test]
    fn mirror_publish_relays_prefers_known_root_publish_relays() {
        let candidates = [
            "wss://graph-relay.iris.to",
            "wss://relay.example",
            "wss://relay.damus.io",
            "wss://temp.iris.to",
            "wss://vault.iris.to",
            "wss://upload.iris.to/nostr",
        ]
        .map(String::from);
        let expected = Vec::from(
            [
                "wss://temp.iris.to",
                "wss://vault.iris.to",
                "wss://relay.damus.io",
            ]
            .map(String::from),
        );

        let relays = Controller::mirror_publish_relays(&candidates, ADDRESS_ARG);

        assert_eq!(relays, expected);
    }

    /// With none of the relays expected by the previous test present, the
    /// result is expected to keep `relay.snort.social` and `relay.nostr.band`
    /// in input order while dropping `graph-relay.iris.to` and
    /// `upload.iris.to/nostr`.
    #[test]
    fn mirror_publish_relays_filters_known_bad_publish_targets_when_no_preferred_remain() {
        let candidates = [
            "wss://graph-relay.iris.to",
            "wss://relay.snort.social",
            "wss://relay.nostr.band",
            "wss://upload.iris.to/nostr",
        ]
        .map(String::from);
        let expected = Vec::from(
            ["wss://relay.snort.social", "wss://relay.nostr.band"].map(String::from),
        );

        let relays = Controller::mirror_publish_relays(&candidates, ADDRESS_ARG);

        assert_eq!(relays, expected);
    }

    /// Checks that `full_text_note_history_max_relay_pages` set on the config
    /// shows up unchanged on the mirror config, both for `0` and for `64`.
    #[test]
    fn nostr_mirror_config_allows_disabling_full_note_paging() {
        let relay_urls = ["wss://relay.example".to_string()];
        let mut config = Config::default();

        // First pass: a page limit of zero.
        config.nostr.full_text_note_history_max_relay_pages = 0;
        let paging_disabled = Controller::nostr_mirror_config(&config, &relay_urls);
        assert_eq!(paging_disabled.full_text_note_history_max_relay_pages, 0);

        // Second pass: the same config object with a non-zero page limit.
        config.nostr.full_text_note_history_max_relay_pages = 64;
        let paging_enabled = Controller::nostr_mirror_config(&config, &relay_urls);
        assert_eq!(paging_enabled.full_text_note_history_max_relay_pages, 64);
    }
}