1use anyhow::{Context, Result};
2use axum::Router;
3use nostr::nips::nip19::ToBech32;
4use std::collections::HashSet;
5use std::path::PathBuf;
6use std::sync::Arc;
7use tokio::net::TcpListener;
8use tower_http::cors::CorsLayer;
9
10use crate::config::{ensure_keys, parse_npub, pubkey_bytes, Config};
11use crate::eviction::{spawn_background_eviction_task, BACKGROUND_EVICTION_INTERVAL};
12use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
13use crate::server::{AppState, HashtreeServer};
14use crate::socialgraph;
15use crate::storage::HashtreeStore;
16
17#[cfg(feature = "p2p")]
18use crate::webrtc::{ContentStore, WebRTCManager, WebRTCState};
19#[cfg(not(feature = "p2p"))]
20use crate::WebRTCState;
21
22pub struct EmbeddedDaemonOptions {
23 pub config: Config,
24 pub data_dir: PathBuf,
25 pub bind_address: String,
26 pub relays: Option<Vec<String>>,
27 pub extra_routes: Option<Router<AppState>>,
28 pub cors: Option<CorsLayer>,
29}
30
31pub struct EmbeddedDaemonInfo {
32 pub addr: String,
33 pub port: u16,
34 pub npub: String,
35 pub store: Arc<HashtreeStore>,
36 #[allow(dead_code)]
37 pub webrtc_state: Option<Arc<WebRTCState>>,
38}
39
40pub async fn start_embedded(opts: EmbeddedDaemonOptions) -> Result<EmbeddedDaemonInfo> {
41 let _ = rustls::crypto::ring::default_provider().install_default();
42
43 let mut config = opts.config;
44 if let Some(relays) = opts.relays {
45 config.nostr.relays = relays;
46 }
47
48 let max_size_bytes = config.storage.max_size_gb * 1024 * 1024 * 1024;
49 let nostr_db_max_bytes = config
50 .nostr
51 .db_max_size_gb
52 .saturating_mul(1024 * 1024 * 1024);
53 let spambox_db_max_bytes = config
54 .nostr
55 .spambox_max_size_gb
56 .saturating_mul(1024 * 1024 * 1024);
57
58 let store = Arc::new(HashtreeStore::with_options(
59 &opts.data_dir,
60 config.storage.s3.as_ref(),
61 max_size_bytes,
62 )?);
63
64 let (keys, _was_generated) = ensure_keys()?;
65 let pk_bytes = pubkey_bytes(&keys);
66 let npub = keys
67 .public_key()
68 .to_bech32()
69 .context("Failed to encode npub")?;
70
71 let mut allowed_pubkeys: HashSet<String> = HashSet::new();
72 allowed_pubkeys.insert(hex::encode(pk_bytes));
73 for npub_str in &config.nostr.allowed_npubs {
74 if let Ok(pk) = parse_npub(npub_str) {
75 allowed_pubkeys.insert(hex::encode(pk));
76 } else {
77 tracing::warn!("Invalid npub in allowed_npubs: {}", npub_str);
78 }
79 }
80
81 let graph_store = socialgraph::open_social_graph_store_with_storage(
82 &opts.data_dir,
83 store.store_arc(),
84 Some(nostr_db_max_bytes),
85 )
86 .context("Failed to initialize social graph store")?;
87
88 let social_graph_root_bytes = if let Some(ref root_npub) = config.nostr.socialgraph_root {
89 parse_npub(root_npub).unwrap_or(pk_bytes)
90 } else {
91 pk_bytes
92 };
93 socialgraph::set_social_graph_root(&graph_store, &social_graph_root_bytes);
94 let social_graph_store: Arc<dyn socialgraph::SocialGraphBackend> = graph_store.clone();
95
96 let social_graph = Arc::new(socialgraph::SocialGraphAccessControl::new(
97 Arc::clone(&social_graph_store),
98 config.nostr.max_write_distance,
99 allowed_pubkeys.clone(),
100 ));
101
102 let nostr_relay_config = NostrRelayConfig {
103 spambox_db_max_bytes,
104 ..Default::default()
105 };
106 let nostr_relay = Arc::new(
107 NostrRelay::new(
108 Arc::clone(&social_graph_store),
109 opts.data_dir.clone(),
110 Some(social_graph.clone()),
111 nostr_relay_config,
112 )
113 .context("Failed to initialize Nostr relay")?,
114 );
115
116 let crawler_spambox = if spambox_db_max_bytes == 0 {
117 None
118 } else {
119 let spam_dir = opts.data_dir.join("socialgraph_spambox");
120 match socialgraph::open_social_graph_store_at_path(&spam_dir, Some(spambox_db_max_bytes)) {
121 Ok(store) => Some(store),
122 Err(err) => {
123 tracing::warn!("Failed to open social graph spambox for crawler: {}", err);
124 None
125 }
126 }
127 };
128 let crawler_spambox_backend = crawler_spambox
129 .clone()
130 .map(|store| store as Arc<dyn socialgraph::SocialGraphBackend>);
131 let _crawler_tasks = socialgraph::crawler::spawn_social_graph_tasks(
132 graph_store.clone(),
133 keys.clone(),
134 config.nostr.relays.clone(),
135 config.nostr.crawl_depth,
136 crawler_spambox_backend,
137 opts.data_dir.clone(),
138 );
139
140 #[cfg(feature = "p2p")]
141 let webrtc_state: Option<Arc<WebRTCState>> = {
142 let (webrtc_state, webrtc_handle) = if config.server.enable_webrtc {
143 let webrtc_config = crate::p2p_common::default_webrtc_config(&config.nostr.relays);
144 let peer_classifier = crate::p2p_common::build_peer_classifier(
145 opts.data_dir.clone(),
146 Arc::clone(&social_graph_store),
147 );
148 let cashu_payment_client = if config.cashu.default_mint.is_some()
149 || !config.cashu.accepted_mints.is_empty()
150 {
151 match crate::cashu_helper::CashuHelperClient::discover(opts.data_dir.clone()) {
152 Ok(client) => {
153 Some(Arc::new(client) as Arc<dyn crate::cashu_helper::CashuPaymentClient>)
154 }
155 Err(err) => {
156 tracing::warn!(
157 "Cashu settlement helper unavailable; paid retrieval stays disabled: {}",
158 err
159 );
160 None
161 }
162 }
163 } else {
164 None
165 };
166 let cashu_mint_metadata = if config.cashu.default_mint.is_some()
167 || !config.cashu.accepted_mints.is_empty()
168 {
169 let metadata_path = crate::webrtc::cashu_mint_metadata_path(&opts.data_dir);
170 match crate::webrtc::CashuMintMetadataStore::load(metadata_path) {
171 Ok(store) => Some(store),
172 Err(err) => {
173 tracing::warn!(
174 "Failed to load Cashu mint metadata; falling back to in-memory state: {}",
175 err
176 );
177 Some(crate::webrtc::CashuMintMetadataStore::in_memory())
178 }
179 }
180 } else {
181 None
182 };
183
184 let mut manager = WebRTCManager::new_with_store_and_classifier_and_cashu(
185 keys.clone(),
186 webrtc_config,
187 Arc::clone(&store) as Arc<dyn ContentStore>,
188 peer_classifier,
189 crate::webrtc::CashuRoutingConfig::from(&config.cashu),
190 cashu_payment_client,
191 cashu_mint_metadata,
192 );
193 manager.set_nostr_relay(nostr_relay.clone());
194
195 let webrtc_state = manager.state();
196 let handle = tokio::spawn(async move {
197 if let Err(e) = manager.run().await {
198 tracing::error!("WebRTC manager error: {}", e);
199 }
200 });
201 (Some(webrtc_state), Some(handle))
202 } else {
203 (None, None)
204 };
205 let _ = webrtc_handle;
206 webrtc_state
207 };
208
209 #[cfg(not(feature = "p2p"))]
210 let webrtc_state: Option<Arc<crate::webrtc::WebRTCState>> = None;
211
212 let mut upstream_blossom = config.blossom.servers.clone();
213 upstream_blossom.extend(config.blossom.read_servers.clone());
214
215 let mut server = HashtreeServer::new(Arc::clone(&store), opts.bind_address.clone())
216 .with_allowed_pubkeys(allowed_pubkeys.clone())
217 .with_max_upload_bytes((config.blossom.max_upload_mb as usize) * 1024 * 1024)
218 .with_public_writes(config.server.public_writes)
219 .with_upstream_blossom(upstream_blossom)
220 .with_social_graph(social_graph)
221 .with_socialgraph_snapshot(
222 Arc::clone(&social_graph_store),
223 social_graph_root_bytes,
224 config.server.socialgraph_snapshot_public,
225 )
226 .with_nostr_relay(nostr_relay.clone());
227
228 if let Some(ref state) = webrtc_state {
229 server = server.with_webrtc_peers(state.clone());
230 }
231
232 if let Some(extra) = opts.extra_routes {
233 server = server.with_extra_routes(extra);
234 }
235 if let Some(cors) = opts.cors {
236 server = server.with_cors(cors);
237 }
238
239 if config.sync.enabled {
240 let mut blossom_read_servers = config.blossom.servers.clone();
241 blossom_read_servers.extend(config.blossom.read_servers.clone());
242 let sync_config = crate::sync::SyncConfig {
243 sync_own: config.sync.sync_own,
244 sync_followed: config.sync.sync_followed,
245 relays: config.nostr.relays.clone(),
246 max_concurrent: config.sync.max_concurrent,
247 webrtc_timeout_ms: config.sync.webrtc_timeout_ms,
248 blossom_timeout_ms: config.sync.blossom_timeout_ms,
249 };
250
251 let sync_keys = nostr_sdk::Keys::parse(&keys.secret_key().to_bech32()?)
252 .context("Failed to parse keys for sync")?;
253
254 let sync_service = crate::sync::BackgroundSync::new(
255 sync_config,
256 Arc::clone(&store),
257 sync_keys,
258 webrtc_state.clone(),
259 )
260 .await
261 .context("Failed to create background sync service")?;
262
263 let contacts_file = opts.data_dir.join("contacts.json");
264 tokio::spawn(async move {
265 if let Err(e) = sync_service.run(contacts_file).await {
266 tracing::error!("Background sync error: {}", e);
267 }
268 });
269 }
270
271 spawn_background_eviction_task(
272 Arc::clone(&store),
273 BACKGROUND_EVICTION_INTERVAL,
274 "embedded daemon",
275 );
276
277 let listener = TcpListener::bind(&opts.bind_address).await?;
278 let local_addr = listener.local_addr()?;
279 let actual_addr = format!("{}:{}", local_addr.ip(), local_addr.port());
280
281 tokio::spawn(async move {
282 if let Err(e) = server.run_with_listener(listener).await {
283 tracing::error!("Embedded daemon server error: {}", e);
284 }
285 });
286
287 tracing::info!(
288 "Embedded daemon started on {}, identity {}",
289 actual_addr,
290 npub
291 );
292
293 Ok(EmbeddedDaemonInfo {
294 addr: actual_addr,
295 port: local_addr.port(),
296 npub,
297 store,
298 webrtc_state,
299 })
300}