//! hashtree_cli/daemon.rs — embedded daemon bootstrap: opens the stores,
//! wires up the social graph and Nostr relay, and manages the optional p2p
//! peer router plus the background services (crawler, mirror, sync).
1use anyhow::{Context, Result};
2use axum::Router;
3use nostr::nips::nip19::ToBech32;
4use nostr::Keys;
5use std::collections::HashSet;
6use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
7use std::path::PathBuf;
8use std::sync::Arc;
9use tokio::net::TcpListener;
10use tokio::sync::Mutex;
11use tokio::task::JoinHandle;
12use tower_http::cors::CorsLayer;
13
14use crate::config::{ensure_keys, parse_npub, pubkey_bytes, Config};
15use crate::eviction::{spawn_background_eviction_task, BACKGROUND_EVICTION_INTERVAL};
16use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
17use crate::server::{AppState, HashtreeServer};
18use crate::socialgraph;
19use crate::storage::HashtreeStore;
20
21#[cfg(feature = "p2p")]
22use crate::webrtc::{ContentStore, PeerClassifier, PeerRouter, WebRTCState};
23#[cfg(not(feature = "p2p"))]
24use crate::WebRTCState;
25
/// Handle for a running peer-router task (p2p builds only).
#[cfg(feature = "p2p")]
struct PeerRouterRuntime {
    // Watch-channel sender; broadcasting `true` requests router shutdown.
    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
    // Join handle of the spawned router task.
    join: JoinHandle<()>,
}
31
/// A running background-sync service together with its driver task's handle.
struct BackgroundSyncRuntime {
    // Service object; `shutdown()` is called on it to request a stop.
    service: Arc<crate::sync::BackgroundSync>,
    // Join handle of the task driving `service.run(...)`.
    join: JoinHandle<()>,
}
/// A running background Nostr-mirror service together with its task handle.
struct BackgroundMirrorRuntime {
    // Service object; `shutdown()` is called on it to request a stop.
    service: Arc<crate::nostr_mirror::BackgroundNostrMirror>,
    // Join handle of the (blocking) task driving `service.run()`.
    join: JoinHandle<()>,
}
41
/// Mutable state of the background services: each field is `Some` while the
/// corresponding service is running and `None` once it has been shut down.
struct BackgroundServicesRuntime {
    crawler: Option<socialgraph::crawler::SocialGraphTaskHandles>,
    mirror: Option<BackgroundMirrorRuntime>,
    sync: Option<BackgroundSyncRuntime>,
}
47
48impl BackgroundServicesRuntime {
49    fn status(&self) -> EmbeddedBackgroundServicesStatus {
50        EmbeddedBackgroundServicesStatus {
51            crawler_active: self.crawler.is_some(),
52            mirror_active: self.mirror.is_some(),
53            sync_active: self.sync.is_some(),
54        }
55    }
56}
57
/// Point-in-time snapshot of which embedded background services are running.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmbeddedBackgroundServicesStatus {
    // True while the social-graph crawler tasks are running.
    pub crawler_active: bool,
    // True while the background Nostr mirror is running.
    pub mirror_active: bool,
    // True while the background sync service is running.
    pub sync_active: bool,
}
64
/// Controls the embedded daemon's background services (crawler, mirror,
/// sync): holds everything needed to (re)start them on config changes and
/// the mutex-guarded runtime state of whatever is currently running.
pub struct EmbeddedBackgroundServicesController {
    keys: Keys,
    data_dir: PathBuf,
    store: Arc<HashtreeStore>,
    // Concrete store, required by the mirror constructor.
    graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
    // Trait-object view, used by the crawler.
    graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
    // Optional spambox backend handed to the crawler.
    spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
    webrtc_state: Option<Arc<WebRTCState>>,
    // Guards start/stop so reconfiguration is serialized.
    runtime: Mutex<BackgroundServicesRuntime>,
}
75
impl EmbeddedBackgroundServicesController {
    /// Derive a `ws://.../ws` URL for this daemon's own relay endpoint from
    /// the configured bind address, so the mirror can publish locally.
    /// Returns `None` if the address does not parse or the port is 0 (an
    /// OS-assigned port is not knowable from config alone).
    fn local_publish_relay(bind_address: &str) -> Option<String> {
        let addr: SocketAddr = bind_address.parse().ok()?;
        if addr.port() == 0 {
            return None;
        }

        // Wildcard binds (0.0.0.0 / ::) are not connectable hosts — map them
        // to loopback. IPv6 literals are bracketed for URL syntax.
        let host = match addr.ip() {
            IpAddr::V4(ip) if ip.is_unspecified() => Ipv4Addr::LOCALHOST.to_string(),
            IpAddr::V6(ip) if ip.is_unspecified() => format!("[{}]", Ipv6Addr::LOCALHOST),
            IpAddr::V4(ip) => ip.to_string(),
            IpAddr::V6(ip) => format!("[{ip}]"),
        };
        Some(format!("ws://{host}:{}/ws", addr.port()))
    }

    /// Build a controller with all background services stopped; call
    /// [`Self::apply_config`] to start them.
    pub fn new(
        keys: Keys,
        data_dir: PathBuf,
        store: Arc<HashtreeStore>,
        graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
        graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
        spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
        webrtc_state: Option<Arc<WebRTCState>>,
    ) -> Self {
        Self {
            keys,
            data_dir,
            store,
            graph_store_concrete,
            graph_store,
            spambox,
            webrtc_state,
            runtime: Mutex::new(BackgroundServicesRuntime {
                crawler: None,
                mirror: None,
                sync: None,
            }),
        }
    }

    /// Snapshot which background services are currently running.
    pub async fn status(&self) -> EmbeddedBackgroundServicesStatus {
        self.runtime.lock().await.status()
    }

    /// Stop all running background services, waiting a bounded time for each.
    pub async fn shutdown(&self) {
        let mut runtime = self.runtime.lock().await;
        Self::shutdown_crawler(&mut runtime.crawler).await;
        Self::shutdown_mirror(&mut runtime.mirror).await;
        Self::shutdown_sync(&mut runtime.sync).await;
    }

    /// Signal the crawler tasks to stop, then wait up to 3s for each handle,
    /// aborting any task that does not finish in time. No-op if not running.
    async fn shutdown_crawler(crawler: &mut Option<socialgraph::crawler::SocialGraphTaskHandles>) {
        let Some(handles) = crawler.take() else {
            return;
        };

        // Best-effort: the receiver may already be gone if the task exited.
        let _ = handles.shutdown_tx.send(true);

        let mut crawl_handle = handles.crawl_handle;
        match tokio::time::timeout(std::time::Duration::from_secs(3), &mut crawl_handle).await {
            Ok(Ok(())) => {}
            Ok(Err(err)) => tracing::warn!("Crawler task ended with join error: {}", err),
            Err(_) => {
                tracing::warn!("Timed out waiting for crawler task shutdown");
                crawl_handle.abort();
            }
        }

        let mut local_list_handle = handles.local_list_handle;
        match tokio::time::timeout(std::time::Duration::from_secs(3), &mut local_list_handle).await
        {
            Ok(Ok(())) => {}
            Ok(Err(err)) => tracing::warn!("Local list task ended with join error: {}", err),
            Err(_) => {
                tracing::warn!("Timed out waiting for local list task shutdown");
                local_list_handle.abort();
            }
        }
    }

    /// Ask the sync service to stop and wait up to 3s, aborting on timeout.
    /// No-op if the service is not running.
    async fn shutdown_sync(sync: &mut Option<BackgroundSyncRuntime>) {
        let Some(runtime) = sync.take() else {
            return;
        };

        runtime.service.shutdown();
        let mut join = runtime.join;
        match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
            Ok(Ok(())) => {}
            Ok(Err(err)) => tracing::warn!("Background sync task ended with join error: {}", err),
            Err(_) => {
                tracing::warn!("Timed out waiting for background sync shutdown");
                join.abort();
            }
        }
    }

    /// Ask the mirror service to stop and wait up to 3s, aborting on timeout.
    /// NOTE(review): the mirror runs inside `spawn_blocking` (see
    /// `apply_config`), and `JoinHandle::abort` cannot interrupt a blocking
    /// closure that has already started — confirm `service.shutdown()`
    /// reliably unblocks the inner runtime.
    async fn shutdown_mirror(mirror: &mut Option<BackgroundMirrorRuntime>) {
        let Some(runtime) = mirror.take() else {
            return;
        };

        runtime.service.shutdown();
        let mut join = runtime.join;
        match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
            Ok(Ok(())) => {}
            Ok(Err(err)) => tracing::warn!("Background mirror task ended with join error: {}", err),
            Err(_) => {
                tracing::warn!("Timed out waiting for background mirror shutdown");
                join.abort();
            }
        }
    }

    /// Re-apply `config`: stop whatever is running, then start the crawler +
    /// mirror (when nostr is enabled with a crawl depth and relays) and/or
    /// the sync service (when sync is enabled with relays). Returns the
    /// resulting activity snapshot.
    ///
    /// # Errors
    /// Fails if key conversion or construction of the mirror/sync service
    /// fails; previously running services are already stopped at that point.
    pub async fn apply_config(&self, config: &Config) -> Result<EmbeddedBackgroundServicesStatus> {
        // Hold the lock for the whole restart so concurrent calls serialize.
        let mut runtime = self.runtime.lock().await;

        Self::shutdown_crawler(&mut runtime.crawler).await;
        Self::shutdown_mirror(&mut runtime.mirror).await;
        Self::shutdown_sync(&mut runtime.sync).await;

        let active_relays = config.nostr.active_relays();

        if config.nostr.enabled
            && config.nostr.social_graph_crawl_depth > 0
            && !active_relays.is_empty()
        {
            runtime.crawler = Some(socialgraph::crawler::spawn_social_graph_tasks(
                self.graph_store.clone(),
                self.keys.clone(),
                active_relays.clone(),
                config.nostr.social_graph_crawl_depth,
                self.spambox.clone(),
                self.data_dir.clone(),
            ));

            let service = Arc::new(
                crate::nostr_mirror::BackgroundNostrMirror::new(
                    crate::nostr_mirror::NostrMirrorConfig {
                        relays: active_relays.clone(),
                        // Publish back to our own relay when the bind address
                        // yields a usable URL (empty otherwise).
                        publish_relays: Self::local_publish_relay(&config.server.bind_address)
                            .into_iter()
                            .collect(),
                        max_follow_distance: config.nostr.social_graph_crawl_depth,
                        require_negentropy: config.nostr.negentropy_only,
                        kinds: config.nostr.mirror_kinds.clone(),
                        // Clamp to at least 1 so chunking never divides by 0.
                        history_sync_author_chunk_size: config
                            .nostr
                            .history_sync_author_chunk_size
                            .max(1),
                        history_sync_on_reconnect: config.nostr.history_sync_on_reconnect,
                        ..crate::nostr_mirror::NostrMirrorConfig::default()
                    },
                    self.store.clone(),
                    self.graph_store_concrete.clone(),
                    // Bridge `nostr::Keys` -> `nostr_sdk::Keys` via bech32.
                    Some(
                        nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
                            .context("Failed to parse keys for background nostr mirror")?,
                    ),
                )
                .await
                .context("Failed to create background nostr mirror")?,
            );
            let service_for_task = service.clone();
            // Run the mirror on a dedicated blocking thread with its own
            // single-threaded runtime (the inner `runtime` shadows the outer
            // lock guard only inside this closure).
            let join = tokio::task::spawn_blocking(move || {
                let runtime = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .expect("build background nostr mirror runtime");
                runtime.block_on(async {
                    if let Err(err) = service_for_task.run().await {
                        tracing::error!("Background nostr mirror error: {:#}", err);
                    }
                });
            });
            runtime.mirror = Some(BackgroundMirrorRuntime { service, join });
        }

        if config.sync.enabled
            && (config.sync.sync_own || config.sync.sync_followed)
            && !active_relays.is_empty()
        {
            let sync_config = crate::sync::SyncConfig {
                sync_own: config.sync.sync_own,
                sync_followed: config.sync.sync_followed,
                relays: active_relays,
                max_concurrent: config.sync.max_concurrent,
                webrtc_timeout_ms: config.sync.webrtc_timeout_ms,
                blossom_timeout_ms: config.sync.blossom_timeout_ms,
            };

            let sync_keys = nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
                .context("Failed to parse keys for sync")?;
            let service = Arc::new(
                crate::sync::BackgroundSync::new(
                    sync_config,
                    self.store.clone(),
                    sync_keys,
                    self.webrtc_state.clone(),
                )
                .await
                .context("Failed to create background sync service")?,
            );
            let contacts_file = self.data_dir.join("contacts.json");
            let service_for_task = service.clone();
            let join = tokio::spawn(async move {
                if let Err(err) = service_for_task.run(contacts_file).await {
                    tracing::error!("Background sync error: {}", err);
                }
            });
            runtime.sync = Some(BackgroundSyncRuntime { service, join });
        }

        Ok(runtime.status())
    }
}
293
/// Owns the p2p peer router: holds everything required to rebuild a
/// `PeerRouter` on config changes, plus the handle of the currently
/// running instance (if any).
#[cfg(feature = "p2p")]
pub struct EmbeddedPeerRouterController {
    keys: Keys,
    state: Arc<WebRTCState>,
    store: Arc<dyn ContentStore>,
    peer_classifier: PeerClassifier,
    nostr_relay: Arc<NostrRelay>,
    // `Some` while a router task is running; guarded so restarts serialize.
    runtime: Mutex<Option<PeerRouterRuntime>>,
}
303
#[cfg(feature = "p2p")]
impl EmbeddedPeerRouterController {
    /// Build a controller with no router running; call
    /// [`Self::apply_config`] to start one.
    pub fn new(
        keys: Keys,
        state: Arc<WebRTCState>,
        store: Arc<dyn ContentStore>,
        peer_classifier: PeerClassifier,
        nostr_relay: Arc<NostrRelay>,
    ) -> Self {
        Self {
            keys,
            state,
            store,
            peer_classifier,
            nostr_relay,
            runtime: Mutex::new(None),
        }
    }

    /// Shared WebRTC state used by the router.
    pub fn state(&self) -> Arc<WebRTCState> {
        self.state.clone()
    }

    /// Restart the peer router according to `config`: stop any running
    /// instance (3s grace, then abort), reset runtime state, and — if the
    /// router is enabled in `config` — spawn a fresh one. Returns whether a
    /// router is now running.
    pub async fn apply_config(&self, config: &Config) -> Result<bool> {
        let mut runtime = self.runtime.lock().await;
        if let Some(runtime_handle) = runtime.take() {
            // Best-effort stop signal; receiver may already be dropped.
            let _ = runtime_handle.shutdown.send(true);
            let mut join = runtime_handle.join;
            match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
                Ok(Ok(())) => {}
                Ok(Err(err)) => {
                    tracing::warn!("Peer router task ended with join error: {}", err);
                }
                Err(_) => {
                    tracing::warn!("Timed out waiting for peer router shutdown");
                    join.abort();
                }
            }
        }

        // Clear leftover per-run state even when the router stays disabled.
        self.state.reset_runtime_state().await;

        if !crate::p2p_common::peer_router_enabled(config) {
            return Ok(false);
        }

        let webrtc_config = crate::p2p_common::default_webrtc_config(config);
        let mut manager = PeerRouter::new_with_state_and_store_and_classifier(
            self.keys.clone(),
            webrtc_config,
            self.state.clone(),
            self.store.clone(),
            self.peer_classifier.clone(),
        );
        manager.set_nostr_relay(self.nostr_relay.clone());
        // Grab the shutdown sender before `manager` moves into the task.
        let shutdown = manager.shutdown_signal();
        let join = tokio::spawn(async move {
            if let Err(err) = manager.run().await {
                tracing::error!("Peer router error: {}", err);
            }
        });
        *runtime = Some(PeerRouterRuntime { shutdown, join });
        Ok(true)
    }
}
369
/// Options for [`start_embedded`].
pub struct EmbeddedDaemonOptions {
    pub config: Config,
    // Directory holding all on-disk state (stores, social graph, relay db).
    pub data_dir: PathBuf,
    // Address to bind the HTTP server to; a 0 port picks an OS port.
    pub bind_address: String,
    // When `Some`, overrides `config.nostr.relays` (and toggles
    // `config.nostr.enabled` based on whether the list is empty).
    pub relays: Option<Vec<String>>,
    // Extra axum routes merged into the server.
    pub extra_routes: Option<Router<AppState>>,
    // Custom CORS layer; server default applies when `None`.
    pub cors: Option<CorsLayer>,
}
378
/// Handles and identity info returned by [`start_embedded`].
pub struct EmbeddedDaemonInfo {
    // Actual bound address, "ip:port" (port resolved even when 0 was asked).
    pub addr: String,
    pub port: u16,
    // Daemon identity as a bech32 npub string.
    pub npub: String,
    pub store: Arc<HashtreeStore>,
    #[allow(dead_code)]
    pub webrtc_state: Option<Arc<WebRTCState>>,
    // Present only in p2p builds; lets callers reconfigure the router.
    #[cfg(feature = "p2p")]
    #[allow(dead_code)]
    pub peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
    #[allow(dead_code)]
    pub background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
}
392
393pub async fn start_embedded(opts: EmbeddedDaemonOptions) -> Result<EmbeddedDaemonInfo> {
394    let _ = rustls::crypto::ring::default_provider().install_default();
395
396    let mut config = opts.config;
397    if let Some(relays) = opts.relays {
398        config.nostr.relays = relays;
399        config.nostr.enabled = !config.nostr.relays.is_empty();
400    }
401
402    let max_size_bytes = config.storage.max_size_gb * 1024 * 1024 * 1024;
403    let nostr_db_max_bytes = config
404        .nostr
405        .db_max_size_gb
406        .saturating_mul(1024 * 1024 * 1024);
407    let spambox_db_max_bytes = config
408        .nostr
409        .spambox_max_size_gb
410        .saturating_mul(1024 * 1024 * 1024);
411
412    let store = Arc::new(HashtreeStore::with_options(
413        &opts.data_dir,
414        config.storage.s3.as_ref(),
415        max_size_bytes,
416    )?);
417
418    let (keys, _was_generated) = ensure_keys()?;
419    let pk_bytes = pubkey_bytes(&keys);
420    let npub = keys
421        .public_key()
422        .to_bech32()
423        .context("Failed to encode npub")?;
424
425    let mut allowed_pubkeys: HashSet<String> = HashSet::new();
426    allowed_pubkeys.insert(hex::encode(pk_bytes));
427    for npub_str in &config.nostr.allowed_npubs {
428        if let Ok(pk) = parse_npub(npub_str) {
429            allowed_pubkeys.insert(hex::encode(pk));
430        } else {
431            tracing::warn!("Invalid npub in allowed_npubs: {}", npub_str);
432        }
433    }
434
435    let graph_store = socialgraph::open_social_graph_store_with_storage(
436        &opts.data_dir,
437        store.store_arc(),
438        Some(nostr_db_max_bytes),
439    )
440    .context("Failed to initialize social graph store")?;
441
442    let social_graph_root_bytes = if let Some(ref root_npub) = config.nostr.socialgraph_root {
443        parse_npub(root_npub).unwrap_or(pk_bytes)
444    } else {
445        pk_bytes
446    };
447    socialgraph::set_social_graph_root(&graph_store, &social_graph_root_bytes);
448    let social_graph_store: Arc<dyn socialgraph::SocialGraphBackend> = graph_store.clone();
449
450    let social_graph = Arc::new(socialgraph::SocialGraphAccessControl::new(
451        Arc::clone(&social_graph_store),
452        config.nostr.max_write_distance,
453        allowed_pubkeys.clone(),
454    ));
455
456    let nostr_relay_config = NostrRelayConfig {
457        spambox_db_max_bytes,
458        ..Default::default()
459    };
460    let mut public_event_pubkeys = HashSet::new();
461    public_event_pubkeys.insert(hex::encode(pk_bytes));
462    let nostr_relay = Arc::new(
463        NostrRelay::new(
464            Arc::clone(&social_graph_store),
465            opts.data_dir.clone(),
466            public_event_pubkeys,
467            Some(social_graph.clone()),
468            nostr_relay_config,
469        )
470        .context("Failed to initialize Nostr relay")?,
471    );
472
473    let crawler_spambox = if spambox_db_max_bytes == 0 {
474        None
475    } else {
476        let spam_dir = opts.data_dir.join("socialgraph_spambox");
477        match socialgraph::open_social_graph_store_at_path(&spam_dir, Some(spambox_db_max_bytes)) {
478            Ok(store) => Some(store),
479            Err(err) => {
480                tracing::warn!("Failed to open social graph spambox for crawler: {}", err);
481                None
482            }
483        }
484    };
485    let crawler_spambox_backend = crawler_spambox
486        .clone()
487        .map(|store| store as Arc<dyn socialgraph::SocialGraphBackend>);
488
489    #[cfg(feature = "p2p")]
490    let (webrtc_state, peer_router_controller): (
491        Option<Arc<WebRTCState>>,
492        Option<Arc<EmbeddedPeerRouterController>>,
493    ) = {
494        let router_config = crate::p2p_common::default_webrtc_config(&config);
495        let peer_classifier = crate::p2p_common::build_peer_classifier(
496            opts.data_dir.clone(),
497            Arc::clone(&social_graph_store),
498        );
499        let cashu_payment_client =
500            if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
501                match crate::cashu_helper::CashuHelperClient::discover(opts.data_dir.clone()) {
502                    Ok(client) => {
503                        Some(Arc::new(client) as Arc<dyn crate::cashu_helper::CashuPaymentClient>)
504                    }
505                    Err(err) => {
506                        tracing::warn!(
507                        "Cashu settlement helper unavailable; paid retrieval stays disabled: {}",
508                        err
509                    );
510                        None
511                    }
512                }
513            } else {
514                None
515            };
516        let cashu_mint_metadata =
517            if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
518                let metadata_path = crate::webrtc::cashu_mint_metadata_path(&opts.data_dir);
519                match crate::webrtc::CashuMintMetadataStore::load(metadata_path) {
520                    Ok(store) => Some(store),
521                    Err(err) => {
522                        tracing::warn!(
523                        "Failed to load Cashu mint metadata; falling back to in-memory state: {}",
524                        err
525                    );
526                        Some(crate::webrtc::CashuMintMetadataStore::in_memory())
527                    }
528                }
529            } else {
530                None
531            };
532
533        let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
534            router_config.request_selection_strategy,
535            router_config.request_fairness_enabled,
536            router_config.request_dispatch,
537            std::time::Duration::from_millis(router_config.message_timeout_ms),
538            crate::webrtc::CashuRoutingConfig::from(&config.cashu),
539            cashu_payment_client,
540            cashu_mint_metadata,
541        ));
542        let controller = Arc::new(EmbeddedPeerRouterController::new(
543            keys.clone(),
544            state.clone(),
545            Arc::clone(&store) as Arc<dyn ContentStore>,
546            peer_classifier,
547            nostr_relay.clone(),
548        ));
549        controller.apply_config(&config).await?;
550        (Some(state), Some(controller))
551    };
552
553    #[cfg(not(feature = "p2p"))]
554    let webrtc_state: Option<Arc<crate::webrtc::WebRTCState>> = None;
555    #[cfg(not(feature = "p2p"))]
556    let peer_router_controller = None;
557
558    let background_services_controller = Arc::new(EmbeddedBackgroundServicesController::new(
559        keys.clone(),
560        opts.data_dir.clone(),
561        Arc::clone(&store),
562        graph_store.clone(),
563        Arc::clone(&social_graph_store),
564        crawler_spambox_backend,
565        webrtc_state.clone(),
566    ));
567    background_services_controller.apply_config(&config).await?;
568
569    let upstream_blossom = config.blossom.all_read_servers();
570    let active_nostr_relays = config.nostr.active_relays();
571
572    let mut server = HashtreeServer::new(Arc::clone(&store), opts.bind_address.clone())
573        .with_allowed_pubkeys(allowed_pubkeys.clone())
574        .with_max_upload_bytes((config.blossom.max_upload_mb as usize) * 1024 * 1024)
575        .with_public_writes(config.server.public_writes)
576        .with_upstream_blossom(upstream_blossom)
577        .with_nostr_relay_urls(active_nostr_relays)
578        .with_social_graph(social_graph)
579        .with_socialgraph_snapshot(
580            Arc::clone(&social_graph_store),
581            social_graph_root_bytes,
582            config.server.socialgraph_snapshot_public,
583        )
584        .with_nostr_relay(nostr_relay.clone());
585
586    if let Some(ref state) = webrtc_state {
587        server = server.with_webrtc_peers(state.clone());
588    }
589
590    if let Some(extra) = opts.extra_routes {
591        server = server.with_extra_routes(extra);
592    }
593    if let Some(cors) = opts.cors {
594        server = server.with_cors(cors);
595    }
596
597    spawn_background_eviction_task(
598        Arc::clone(&store),
599        BACKGROUND_EVICTION_INTERVAL,
600        "embedded daemon",
601    );
602
603    let listener = TcpListener::bind(&opts.bind_address).await?;
604    let local_addr = listener.local_addr()?;
605    let actual_addr = format!("{}:{}", local_addr.ip(), local_addr.port());
606
607    tokio::spawn(async move {
608        if let Err(e) = server.run_with_listener(listener).await {
609            tracing::error!("Embedded daemon server error: {}", e);
610        }
611    });
612
613    tracing::info!(
614        "Embedded daemon started on {}, identity {}",
615        actual_addr,
616        npub
617    );
618
619    Ok(EmbeddedDaemonInfo {
620        addr: actual_addr,
621        port: local_addr.port(),
622        npub,
623        store,
624        webrtc_state,
625        #[cfg(feature = "p2p")]
626        peer_router_controller,
627        background_services_controller: Some(background_services_controller),
628    })
629}