//! hashtree_cli/daemon.rs

1use anyhow::{Context, Result};
2use axum::Router;
3use nostr::nips::nip19::ToBech32;
4use std::collections::HashSet;
5use std::path::PathBuf;
6use std::sync::Arc;
7use tokio::net::TcpListener;
8use tower_http::cors::CorsLayer;
9
10use crate::config::{ensure_keys, parse_npub, pubkey_bytes, Config};
11use crate::eviction::{spawn_background_eviction_task, BACKGROUND_EVICTION_INTERVAL};
12use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
13use crate::server::{AppState, HashtreeServer};
14use crate::socialgraph;
15use crate::storage::HashtreeStore;
16
17#[cfg(feature = "p2p")]
18use crate::webrtc::{ContentStore, WebRTCManager, WebRTCState};
19#[cfg(not(feature = "p2p"))]
20use crate::WebRTCState;
21
22pub struct EmbeddedDaemonOptions {
23    pub config: Config,
24    pub data_dir: PathBuf,
25    pub bind_address: String,
26    pub relays: Option<Vec<String>>,
27    pub extra_routes: Option<Router<AppState>>,
28    pub cors: Option<CorsLayer>,
29}
30
31pub struct EmbeddedDaemonInfo {
32    pub addr: String,
33    pub port: u16,
34    pub npub: String,
35    pub store: Arc<HashtreeStore>,
36    #[allow(dead_code)]
37    pub webrtc_state: Option<Arc<WebRTCState>>,
38}
39
40pub async fn start_embedded(opts: EmbeddedDaemonOptions) -> Result<EmbeddedDaemonInfo> {
41    let _ = rustls::crypto::ring::default_provider().install_default();
42
43    let mut config = opts.config;
44    if let Some(relays) = opts.relays {
45        config.nostr.relays = relays;
46    }
47
48    let max_size_bytes = config.storage.max_size_gb * 1024 * 1024 * 1024;
49    let nostr_db_max_bytes = config
50        .nostr
51        .db_max_size_gb
52        .saturating_mul(1024 * 1024 * 1024);
53    let spambox_db_max_bytes = config
54        .nostr
55        .spambox_max_size_gb
56        .saturating_mul(1024 * 1024 * 1024);
57
58    let store = Arc::new(HashtreeStore::with_options(
59        &opts.data_dir,
60        config.storage.s3.as_ref(),
61        max_size_bytes,
62    )?);
63
64    let (keys, _was_generated) = ensure_keys()?;
65    let pk_bytes = pubkey_bytes(&keys);
66    let npub = keys
67        .public_key()
68        .to_bech32()
69        .context("Failed to encode npub")?;
70
71    let mut allowed_pubkeys: HashSet<String> = HashSet::new();
72    allowed_pubkeys.insert(hex::encode(pk_bytes));
73    for npub_str in &config.nostr.allowed_npubs {
74        if let Ok(pk) = parse_npub(npub_str) {
75            allowed_pubkeys.insert(hex::encode(pk));
76        } else {
77            tracing::warn!("Invalid npub in allowed_npubs: {}", npub_str);
78        }
79    }
80
81    let graph_store = socialgraph::open_social_graph_store_with_storage(
82        &opts.data_dir,
83        store.store_arc(),
84        Some(nostr_db_max_bytes),
85    )
86    .context("Failed to initialize social graph store")?;
87
88    let social_graph_root_bytes = if let Some(ref root_npub) = config.nostr.socialgraph_root {
89        parse_npub(root_npub).unwrap_or(pk_bytes)
90    } else {
91        pk_bytes
92    };
93    socialgraph::set_social_graph_root(&graph_store, &social_graph_root_bytes);
94    let social_graph_store: Arc<dyn socialgraph::SocialGraphBackend> = graph_store.clone();
95
96    let social_graph = Arc::new(socialgraph::SocialGraphAccessControl::new(
97        Arc::clone(&social_graph_store),
98        config.nostr.max_write_distance,
99        allowed_pubkeys.clone(),
100    ));
101
102    let nostr_relay_config = NostrRelayConfig {
103        spambox_db_max_bytes,
104        ..Default::default()
105    };
106    let nostr_relay = Arc::new(
107        NostrRelay::new(
108            Arc::clone(&social_graph_store),
109            opts.data_dir.clone(),
110            Some(social_graph.clone()),
111            nostr_relay_config,
112        )
113        .context("Failed to initialize Nostr relay")?,
114    );
115
116    let crawler_spambox = if spambox_db_max_bytes == 0 {
117        None
118    } else {
119        let spam_dir = opts.data_dir.join("socialgraph_spambox");
120        match socialgraph::open_social_graph_store_at_path(&spam_dir, Some(spambox_db_max_bytes)) {
121            Ok(store) => Some(store),
122            Err(err) => {
123                tracing::warn!("Failed to open social graph spambox for crawler: {}", err);
124                None
125            }
126        }
127    };
128    let crawler_spambox_backend = crawler_spambox
129        .clone()
130        .map(|store| store as Arc<dyn socialgraph::SocialGraphBackend>);
131    let _crawler_tasks = socialgraph::crawler::spawn_social_graph_tasks(
132        graph_store.clone(),
133        keys.clone(),
134        config.nostr.relays.clone(),
135        config.nostr.crawl_depth,
136        crawler_spambox_backend,
137        opts.data_dir.clone(),
138    );
139
140    #[cfg(feature = "p2p")]
141    let webrtc_state: Option<Arc<WebRTCState>> = {
142        let (webrtc_state, webrtc_handle) = if config.server.enable_webrtc {
143            let webrtc_config = crate::p2p_common::default_webrtc_config(&config.nostr.relays);
144            let peer_classifier = crate::p2p_common::build_peer_classifier(
145                opts.data_dir.clone(),
146                Arc::clone(&social_graph_store),
147            );
148            let cashu_payment_client = if config.cashu.default_mint.is_some()
149                || !config.cashu.accepted_mints.is_empty()
150            {
151                match crate::cashu_helper::CashuHelperClient::discover(opts.data_dir.clone()) {
152                    Ok(client) => {
153                        Some(Arc::new(client) as Arc<dyn crate::cashu_helper::CashuPaymentClient>)
154                    }
155                    Err(err) => {
156                        tracing::warn!(
157                            "Cashu settlement helper unavailable; paid retrieval stays disabled: {}",
158                            err
159                        );
160                        None
161                    }
162                }
163            } else {
164                None
165            };
166            let cashu_mint_metadata = if config.cashu.default_mint.is_some()
167                || !config.cashu.accepted_mints.is_empty()
168            {
169                let metadata_path = crate::webrtc::cashu_mint_metadata_path(&opts.data_dir);
170                match crate::webrtc::CashuMintMetadataStore::load(metadata_path) {
171                    Ok(store) => Some(store),
172                    Err(err) => {
173                        tracing::warn!(
174                            "Failed to load Cashu mint metadata; falling back to in-memory state: {}",
175                            err
176                        );
177                        Some(crate::webrtc::CashuMintMetadataStore::in_memory())
178                    }
179                }
180            } else {
181                None
182            };
183
184            let mut manager = WebRTCManager::new_with_store_and_classifier_and_cashu(
185                keys.clone(),
186                webrtc_config,
187                Arc::clone(&store) as Arc<dyn ContentStore>,
188                peer_classifier,
189                crate::webrtc::CashuRoutingConfig::from(&config.cashu),
190                cashu_payment_client,
191                cashu_mint_metadata,
192            );
193            manager.set_nostr_relay(nostr_relay.clone());
194
195            let webrtc_state = manager.state();
196            let handle = tokio::spawn(async move {
197                if let Err(e) = manager.run().await {
198                    tracing::error!("WebRTC manager error: {}", e);
199                }
200            });
201            (Some(webrtc_state), Some(handle))
202        } else {
203            (None, None)
204        };
205        let _ = webrtc_handle;
206        webrtc_state
207    };
208
209    #[cfg(not(feature = "p2p"))]
210    let webrtc_state: Option<Arc<crate::webrtc::WebRTCState>> = None;
211
212    let mut upstream_blossom = config.blossom.servers.clone();
213    upstream_blossom.extend(config.blossom.read_servers.clone());
214
215    let mut server = HashtreeServer::new(Arc::clone(&store), opts.bind_address.clone())
216        .with_allowed_pubkeys(allowed_pubkeys.clone())
217        .with_max_upload_bytes((config.blossom.max_upload_mb as usize) * 1024 * 1024)
218        .with_public_writes(config.server.public_writes)
219        .with_upstream_blossom(upstream_blossom)
220        .with_social_graph(social_graph)
221        .with_socialgraph_snapshot(
222            Arc::clone(&social_graph_store),
223            social_graph_root_bytes,
224            config.server.socialgraph_snapshot_public,
225        )
226        .with_nostr_relay(nostr_relay.clone());
227
228    if let Some(ref state) = webrtc_state {
229        server = server.with_webrtc_peers(state.clone());
230    }
231
232    if let Some(extra) = opts.extra_routes {
233        server = server.with_extra_routes(extra);
234    }
235    if let Some(cors) = opts.cors {
236        server = server.with_cors(cors);
237    }
238
239    if config.sync.enabled {
240        let mut blossom_read_servers = config.blossom.servers.clone();
241        blossom_read_servers.extend(config.blossom.read_servers.clone());
242        let sync_config = crate::sync::SyncConfig {
243            sync_own: config.sync.sync_own,
244            sync_followed: config.sync.sync_followed,
245            relays: config.nostr.relays.clone(),
246            max_concurrent: config.sync.max_concurrent,
247            webrtc_timeout_ms: config.sync.webrtc_timeout_ms,
248            blossom_timeout_ms: config.sync.blossom_timeout_ms,
249        };
250
251        let sync_keys = nostr_sdk::Keys::parse(&keys.secret_key().to_bech32()?)
252            .context("Failed to parse keys for sync")?;
253
254        let sync_service = crate::sync::BackgroundSync::new(
255            sync_config,
256            Arc::clone(&store),
257            sync_keys,
258            webrtc_state.clone(),
259        )
260        .await
261        .context("Failed to create background sync service")?;
262
263        let contacts_file = opts.data_dir.join("contacts.json");
264        tokio::spawn(async move {
265            if let Err(e) = sync_service.run(contacts_file).await {
266                tracing::error!("Background sync error: {}", e);
267            }
268        });
269    }
270
271    spawn_background_eviction_task(
272        Arc::clone(&store),
273        BACKGROUND_EVICTION_INTERVAL,
274        "embedded daemon",
275    );
276
277    let listener = TcpListener::bind(&opts.bind_address).await?;
278    let local_addr = listener.local_addr()?;
279    let actual_addr = format!("{}:{}", local_addr.ip(), local_addr.port());
280
281    tokio::spawn(async move {
282        if let Err(e) = server.run_with_listener(listener).await {
283            tracing::error!("Embedded daemon server error: {}", e);
284        }
285    });
286
287    tracing::info!(
288        "Embedded daemon started on {}, identity {}",
289        actual_addr,
290        npub
291    );
292
293    Ok(EmbeddedDaemonInfo {
294        addr: actual_addr,
295        port: local_addr.port(),
296        npub,
297        store,
298        webrtc_state,
299    })
300}