1use anyhow::{Context, Result};
2use axum::Router;
3use nostr::nips::nip19::ToBech32;
4use nostr::Keys;
5use std::collections::HashSet;
6use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
7use std::path::PathBuf;
8use std::sync::Arc;
9use tokio::net::TcpListener;
10use tokio::sync::Mutex;
11use tokio::task::JoinHandle;
12use tower_http::cors::CorsLayer;
13
14use crate::config::{ensure_keys, parse_npub, pubkey_bytes, Config};
15use crate::eviction::{spawn_background_eviction_task, BACKGROUND_EVICTION_INTERVAL};
16use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
17use crate::server::{AppState, HashtreeServer};
18use crate::socialgraph;
19use crate::storage::HashtreeStore;
20
21#[cfg(feature = "p2p")]
22use crate::webrtc::{ContentStore, PeerClassifier, PeerRouter, WebRTCState};
23#[cfg(not(feature = "p2p"))]
24use crate::WebRTCState;
25
26#[cfg(feature = "p2p")]
27struct PeerRouterRuntime {
28 shutdown: Arc<tokio::sync::watch::Sender<bool>>,
29 join: JoinHandle<()>,
30}
31
32struct BackgroundSyncRuntime {
33 service: Arc<crate::sync::BackgroundSync>,
34 join: JoinHandle<()>,
35}
36
37struct BackgroundMirrorRuntime {
38 service: Arc<crate::nostr_mirror::BackgroundNostrMirror>,
39 join: JoinHandle<()>,
40}
41
42struct BackgroundServicesRuntime {
43 crawler: Option<socialgraph::crawler::SocialGraphTaskHandles>,
44 mirror: Option<BackgroundMirrorRuntime>,
45 sync: Option<BackgroundSyncRuntime>,
46}
47
48impl BackgroundServicesRuntime {
49 fn status(&self) -> EmbeddedBackgroundServicesStatus {
50 EmbeddedBackgroundServicesStatus {
51 crawler_active: self.crawler.is_some(),
52 mirror_active: self.mirror.is_some(),
53 sync_active: self.sync.is_some(),
54 }
55 }
56}
57
58#[derive(Debug, Clone, Copy, PartialEq, Eq)]
59pub struct EmbeddedBackgroundServicesStatus {
60 pub crawler_active: bool,
61 pub mirror_active: bool,
62 pub sync_active: bool,
63}
64
65pub struct EmbeddedBackgroundServicesController {
66 keys: Keys,
67 data_dir: PathBuf,
68 store: Arc<HashtreeStore>,
69 graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
70 graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
71 spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
72 webrtc_state: Option<Arc<WebRTCState>>,
73 runtime: Mutex<BackgroundServicesRuntime>,
74}
75
76impl EmbeddedBackgroundServicesController {
77 fn local_publish_relay(bind_address: &str) -> Option<String> {
78 let addr: SocketAddr = bind_address.parse().ok()?;
79 if addr.port() == 0 {
80 return None;
81 }
82
83 let host = match addr.ip() {
84 IpAddr::V4(ip) if ip.is_unspecified() => Ipv4Addr::LOCALHOST.to_string(),
85 IpAddr::V6(ip) if ip.is_unspecified() => format!("[{}]", Ipv6Addr::LOCALHOST),
86 IpAddr::V4(ip) => ip.to_string(),
87 IpAddr::V6(ip) => format!("[{ip}]"),
88 };
89 Some(format!("ws://{host}:{}/ws", addr.port()))
90 }
91
92 pub fn new(
93 keys: Keys,
94 data_dir: PathBuf,
95 store: Arc<HashtreeStore>,
96 graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
97 graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
98 spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
99 webrtc_state: Option<Arc<WebRTCState>>,
100 ) -> Self {
101 Self {
102 keys,
103 data_dir,
104 store,
105 graph_store_concrete,
106 graph_store,
107 spambox,
108 webrtc_state,
109 runtime: Mutex::new(BackgroundServicesRuntime {
110 crawler: None,
111 mirror: None,
112 sync: None,
113 }),
114 }
115 }
116
117 pub async fn status(&self) -> EmbeddedBackgroundServicesStatus {
118 self.runtime.lock().await.status()
119 }
120
121 pub async fn shutdown(&self) {
122 let mut runtime = self.runtime.lock().await;
123 Self::shutdown_crawler(&mut runtime.crawler).await;
124 Self::shutdown_mirror(&mut runtime.mirror).await;
125 Self::shutdown_sync(&mut runtime.sync).await;
126 }
127
128 async fn shutdown_crawler(crawler: &mut Option<socialgraph::crawler::SocialGraphTaskHandles>) {
129 let Some(handles) = crawler.take() else {
130 return;
131 };
132
133 let _ = handles.shutdown_tx.send(true);
134
135 let mut crawl_handle = handles.crawl_handle;
136 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut crawl_handle).await {
137 Ok(Ok(())) => {}
138 Ok(Err(err)) => tracing::warn!("Crawler task ended with join error: {}", err),
139 Err(_) => {
140 tracing::warn!("Timed out waiting for crawler task shutdown");
141 crawl_handle.abort();
142 }
143 }
144
145 let mut local_list_handle = handles.local_list_handle;
146 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut local_list_handle).await
147 {
148 Ok(Ok(())) => {}
149 Ok(Err(err)) => tracing::warn!("Local list task ended with join error: {}", err),
150 Err(_) => {
151 tracing::warn!("Timed out waiting for local list task shutdown");
152 local_list_handle.abort();
153 }
154 }
155 }
156
157 async fn shutdown_sync(sync: &mut Option<BackgroundSyncRuntime>) {
158 let Some(runtime) = sync.take() else {
159 return;
160 };
161
162 runtime.service.shutdown();
163 let mut join = runtime.join;
164 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
165 Ok(Ok(())) => {}
166 Ok(Err(err)) => tracing::warn!("Background sync task ended with join error: {}", err),
167 Err(_) => {
168 tracing::warn!("Timed out waiting for background sync shutdown");
169 join.abort();
170 }
171 }
172 }
173
174 async fn shutdown_mirror(mirror: &mut Option<BackgroundMirrorRuntime>) {
175 let Some(runtime) = mirror.take() else {
176 return;
177 };
178
179 runtime.service.shutdown();
180 let mut join = runtime.join;
181 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
182 Ok(Ok(())) => {}
183 Ok(Err(err)) => tracing::warn!("Background mirror task ended with join error: {}", err),
184 Err(_) => {
185 tracing::warn!("Timed out waiting for background mirror shutdown");
186 join.abort();
187 }
188 }
189 }
190
191 pub async fn apply_config(&self, config: &Config) -> Result<EmbeddedBackgroundServicesStatus> {
192 let mut runtime = self.runtime.lock().await;
193
194 Self::shutdown_crawler(&mut runtime.crawler).await;
195 Self::shutdown_mirror(&mut runtime.mirror).await;
196 Self::shutdown_sync(&mut runtime.sync).await;
197
198 let active_relays = config.nostr.active_relays();
199
200 if config.nostr.enabled
201 && config.nostr.social_graph_crawl_depth > 0
202 && !active_relays.is_empty()
203 {
204 runtime.crawler = Some(socialgraph::crawler::spawn_social_graph_tasks(
205 self.graph_store.clone(),
206 self.keys.clone(),
207 active_relays.clone(),
208 config.nostr.social_graph_crawl_depth,
209 self.spambox.clone(),
210 self.data_dir.clone(),
211 ));
212
213 let service = Arc::new(
214 crate::nostr_mirror::BackgroundNostrMirror::new(
215 crate::nostr_mirror::NostrMirrorConfig {
216 relays: active_relays.clone(),
217 publish_relays: Self::local_publish_relay(&config.server.bind_address)
218 .into_iter()
219 .collect(),
220 max_follow_distance: config.nostr.social_graph_crawl_depth,
221 require_negentropy: config.nostr.negentropy_only,
222 kinds: config.nostr.mirror_kinds.clone(),
223 history_sync_author_chunk_size: config
224 .nostr
225 .history_sync_author_chunk_size
226 .max(1),
227 history_sync_on_reconnect: config.nostr.history_sync_on_reconnect,
228 ..crate::nostr_mirror::NostrMirrorConfig::default()
229 },
230 self.store.clone(),
231 self.graph_store_concrete.clone(),
232 Some(
233 nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
234 .context("Failed to parse keys for background nostr mirror")?,
235 ),
236 )
237 .await
238 .context("Failed to create background nostr mirror")?,
239 );
240 let service_for_task = service.clone();
241 let join = tokio::task::spawn_blocking(move || {
242 let runtime = tokio::runtime::Builder::new_current_thread()
243 .enable_all()
244 .build()
245 .expect("build background nostr mirror runtime");
246 runtime.block_on(async {
247 if let Err(err) = service_for_task.run().await {
248 tracing::error!("Background nostr mirror error: {:#}", err);
249 }
250 });
251 });
252 runtime.mirror = Some(BackgroundMirrorRuntime { service, join });
253 }
254
255 if config.sync.enabled
256 && (config.sync.sync_own || config.sync.sync_followed)
257 && !active_relays.is_empty()
258 {
259 let sync_config = crate::sync::SyncConfig {
260 sync_own: config.sync.sync_own,
261 sync_followed: config.sync.sync_followed,
262 relays: active_relays,
263 max_concurrent: config.sync.max_concurrent,
264 webrtc_timeout_ms: config.sync.webrtc_timeout_ms,
265 blossom_timeout_ms: config.sync.blossom_timeout_ms,
266 };
267
268 let sync_keys = nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
269 .context("Failed to parse keys for sync")?;
270 let service = Arc::new(
271 crate::sync::BackgroundSync::new(
272 sync_config,
273 self.store.clone(),
274 sync_keys,
275 self.webrtc_state.clone(),
276 )
277 .await
278 .context("Failed to create background sync service")?,
279 );
280 let contacts_file = self.data_dir.join("contacts.json");
281 let service_for_task = service.clone();
282 let join = tokio::spawn(async move {
283 if let Err(err) = service_for_task.run(contacts_file).await {
284 tracing::error!("Background sync error: {}", err);
285 }
286 });
287 runtime.sync = Some(BackgroundSyncRuntime { service, join });
288 }
289
290 Ok(runtime.status())
291 }
292}
293
294#[cfg(feature = "p2p")]
295pub struct EmbeddedPeerRouterController {
296 keys: Keys,
297 state: Arc<WebRTCState>,
298 store: Arc<dyn ContentStore>,
299 peer_classifier: PeerClassifier,
300 nostr_relay: Arc<NostrRelay>,
301 runtime: Mutex<Option<PeerRouterRuntime>>,
302}
303
304#[cfg(feature = "p2p")]
305impl EmbeddedPeerRouterController {
306 pub fn new(
307 keys: Keys,
308 state: Arc<WebRTCState>,
309 store: Arc<dyn ContentStore>,
310 peer_classifier: PeerClassifier,
311 nostr_relay: Arc<NostrRelay>,
312 ) -> Self {
313 Self {
314 keys,
315 state,
316 store,
317 peer_classifier,
318 nostr_relay,
319 runtime: Mutex::new(None),
320 }
321 }
322
323 pub fn state(&self) -> Arc<WebRTCState> {
324 self.state.clone()
325 }
326
327 pub async fn apply_config(&self, config: &Config) -> Result<bool> {
328 let mut runtime = self.runtime.lock().await;
329 if let Some(runtime_handle) = runtime.take() {
330 let _ = runtime_handle.shutdown.send(true);
331 let mut join = runtime_handle.join;
332 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
333 Ok(Ok(())) => {}
334 Ok(Err(err)) => {
335 tracing::warn!("Peer router task ended with join error: {}", err);
336 }
337 Err(_) => {
338 tracing::warn!("Timed out waiting for peer router shutdown");
339 join.abort();
340 }
341 }
342 }
343
344 self.state.reset_runtime_state().await;
345
346 if !crate::p2p_common::peer_router_enabled(config) {
347 return Ok(false);
348 }
349
350 let webrtc_config = crate::p2p_common::default_webrtc_config(config);
351 let mut manager = PeerRouter::new_with_state_and_store_and_classifier(
352 self.keys.clone(),
353 webrtc_config,
354 self.state.clone(),
355 self.store.clone(),
356 self.peer_classifier.clone(),
357 );
358 manager.set_nostr_relay(self.nostr_relay.clone());
359 let shutdown = manager.shutdown_signal();
360 let join = tokio::spawn(async move {
361 if let Err(err) = manager.run().await {
362 tracing::error!("Peer router error: {}", err);
363 }
364 });
365 *runtime = Some(PeerRouterRuntime { shutdown, join });
366 Ok(true)
367 }
368}
369
370pub struct EmbeddedDaemonOptions {
371 pub config: Config,
372 pub data_dir: PathBuf,
373 pub bind_address: String,
374 pub relays: Option<Vec<String>>,
375 pub extra_routes: Option<Router<AppState>>,
376 pub cors: Option<CorsLayer>,
377}
378
379pub struct EmbeddedDaemonInfo {
380 pub addr: String,
381 pub port: u16,
382 pub npub: String,
383 pub store: Arc<HashtreeStore>,
384 #[allow(dead_code)]
385 pub webrtc_state: Option<Arc<WebRTCState>>,
386 #[cfg(feature = "p2p")]
387 #[allow(dead_code)]
388 pub peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
389 #[allow(dead_code)]
390 pub background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
391}
392
393pub async fn start_embedded(opts: EmbeddedDaemonOptions) -> Result<EmbeddedDaemonInfo> {
394 let _ = rustls::crypto::ring::default_provider().install_default();
395
396 let mut config = opts.config;
397 if let Some(relays) = opts.relays {
398 config.nostr.relays = relays;
399 config.nostr.enabled = !config.nostr.relays.is_empty();
400 }
401
402 let max_size_bytes = config.storage.max_size_gb * 1024 * 1024 * 1024;
403 let nostr_db_max_bytes = config
404 .nostr
405 .db_max_size_gb
406 .saturating_mul(1024 * 1024 * 1024);
407 let spambox_db_max_bytes = config
408 .nostr
409 .spambox_max_size_gb
410 .saturating_mul(1024 * 1024 * 1024);
411
412 let store = Arc::new(HashtreeStore::with_options(
413 &opts.data_dir,
414 config.storage.s3.as_ref(),
415 max_size_bytes,
416 )?);
417
418 let (keys, _was_generated) = ensure_keys()?;
419 let pk_bytes = pubkey_bytes(&keys);
420 let npub = keys
421 .public_key()
422 .to_bech32()
423 .context("Failed to encode npub")?;
424
425 let mut allowed_pubkeys: HashSet<String> = HashSet::new();
426 allowed_pubkeys.insert(hex::encode(pk_bytes));
427 for npub_str in &config.nostr.allowed_npubs {
428 if let Ok(pk) = parse_npub(npub_str) {
429 allowed_pubkeys.insert(hex::encode(pk));
430 } else {
431 tracing::warn!("Invalid npub in allowed_npubs: {}", npub_str);
432 }
433 }
434
435 let graph_store = socialgraph::open_social_graph_store_with_storage(
436 &opts.data_dir,
437 store.store_arc(),
438 Some(nostr_db_max_bytes),
439 )
440 .context("Failed to initialize social graph store")?;
441
442 let social_graph_root_bytes = if let Some(ref root_npub) = config.nostr.socialgraph_root {
443 parse_npub(root_npub).unwrap_or(pk_bytes)
444 } else {
445 pk_bytes
446 };
447 socialgraph::set_social_graph_root(&graph_store, &social_graph_root_bytes);
448 let social_graph_store: Arc<dyn socialgraph::SocialGraphBackend> = graph_store.clone();
449
450 let social_graph = Arc::new(socialgraph::SocialGraphAccessControl::new(
451 Arc::clone(&social_graph_store),
452 config.nostr.max_write_distance,
453 allowed_pubkeys.clone(),
454 ));
455
456 let nostr_relay_config = NostrRelayConfig {
457 spambox_db_max_bytes,
458 ..Default::default()
459 };
460 let mut public_event_pubkeys = HashSet::new();
461 public_event_pubkeys.insert(hex::encode(pk_bytes));
462 let nostr_relay = Arc::new(
463 NostrRelay::new(
464 Arc::clone(&social_graph_store),
465 opts.data_dir.clone(),
466 public_event_pubkeys,
467 Some(social_graph.clone()),
468 nostr_relay_config,
469 )
470 .context("Failed to initialize Nostr relay")?,
471 );
472
473 let crawler_spambox = if spambox_db_max_bytes == 0 {
474 None
475 } else {
476 let spam_dir = opts.data_dir.join("socialgraph_spambox");
477 match socialgraph::open_social_graph_store_at_path(&spam_dir, Some(spambox_db_max_bytes)) {
478 Ok(store) => Some(store),
479 Err(err) => {
480 tracing::warn!("Failed to open social graph spambox for crawler: {}", err);
481 None
482 }
483 }
484 };
485 let crawler_spambox_backend = crawler_spambox
486 .clone()
487 .map(|store| store as Arc<dyn socialgraph::SocialGraphBackend>);
488
489 #[cfg(feature = "p2p")]
490 let (webrtc_state, peer_router_controller): (
491 Option<Arc<WebRTCState>>,
492 Option<Arc<EmbeddedPeerRouterController>>,
493 ) = {
494 let router_config = crate::p2p_common::default_webrtc_config(&config);
495 let peer_classifier = crate::p2p_common::build_peer_classifier(
496 opts.data_dir.clone(),
497 Arc::clone(&social_graph_store),
498 );
499 let cashu_payment_client =
500 if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
501 match crate::cashu_helper::CashuHelperClient::discover(opts.data_dir.clone()) {
502 Ok(client) => {
503 Some(Arc::new(client) as Arc<dyn crate::cashu_helper::CashuPaymentClient>)
504 }
505 Err(err) => {
506 tracing::warn!(
507 "Cashu settlement helper unavailable; paid retrieval stays disabled: {}",
508 err
509 );
510 None
511 }
512 }
513 } else {
514 None
515 };
516 let cashu_mint_metadata =
517 if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
518 let metadata_path = crate::webrtc::cashu_mint_metadata_path(&opts.data_dir);
519 match crate::webrtc::CashuMintMetadataStore::load(metadata_path) {
520 Ok(store) => Some(store),
521 Err(err) => {
522 tracing::warn!(
523 "Failed to load Cashu mint metadata; falling back to in-memory state: {}",
524 err
525 );
526 Some(crate::webrtc::CashuMintMetadataStore::in_memory())
527 }
528 }
529 } else {
530 None
531 };
532
533 let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
534 router_config.request_selection_strategy,
535 router_config.request_fairness_enabled,
536 router_config.request_dispatch,
537 std::time::Duration::from_millis(router_config.message_timeout_ms),
538 crate::webrtc::CashuRoutingConfig::from(&config.cashu),
539 cashu_payment_client,
540 cashu_mint_metadata,
541 ));
542 let controller = Arc::new(EmbeddedPeerRouterController::new(
543 keys.clone(),
544 state.clone(),
545 Arc::clone(&store) as Arc<dyn ContentStore>,
546 peer_classifier,
547 nostr_relay.clone(),
548 ));
549 controller.apply_config(&config).await?;
550 (Some(state), Some(controller))
551 };
552
553 #[cfg(not(feature = "p2p"))]
554 let webrtc_state: Option<Arc<crate::webrtc::WebRTCState>> = None;
555 #[cfg(not(feature = "p2p"))]
556 let peer_router_controller = None;
557
558 let background_services_controller = Arc::new(EmbeddedBackgroundServicesController::new(
559 keys.clone(),
560 opts.data_dir.clone(),
561 Arc::clone(&store),
562 graph_store.clone(),
563 Arc::clone(&social_graph_store),
564 crawler_spambox_backend,
565 webrtc_state.clone(),
566 ));
567 background_services_controller.apply_config(&config).await?;
568
569 let upstream_blossom = config.blossom.all_read_servers();
570 let active_nostr_relays = config.nostr.active_relays();
571
572 let mut server = HashtreeServer::new(Arc::clone(&store), opts.bind_address.clone())
573 .with_allowed_pubkeys(allowed_pubkeys.clone())
574 .with_max_upload_bytes((config.blossom.max_upload_mb as usize) * 1024 * 1024)
575 .with_public_writes(config.server.public_writes)
576 .with_upstream_blossom(upstream_blossom)
577 .with_nostr_relay_urls(active_nostr_relays)
578 .with_social_graph(social_graph)
579 .with_socialgraph_snapshot(
580 Arc::clone(&social_graph_store),
581 social_graph_root_bytes,
582 config.server.socialgraph_snapshot_public,
583 )
584 .with_nostr_relay(nostr_relay.clone());
585
586 if let Some(ref state) = webrtc_state {
587 server = server.with_webrtc_peers(state.clone());
588 }
589
590 if let Some(extra) = opts.extra_routes {
591 server = server.with_extra_routes(extra);
592 }
593 if let Some(cors) = opts.cors {
594 server = server.with_cors(cors);
595 }
596
597 spawn_background_eviction_task(
598 Arc::clone(&store),
599 BACKGROUND_EVICTION_INTERVAL,
600 "embedded daemon",
601 );
602
603 let listener = TcpListener::bind(&opts.bind_address).await?;
604 let local_addr = listener.local_addr()?;
605 let actual_addr = format!("{}:{}", local_addr.ip(), local_addr.port());
606
607 tokio::spawn(async move {
608 if let Err(e) = server.run_with_listener(listener).await {
609 tracing::error!("Embedded daemon server error: {}", e);
610 }
611 });
612
613 tracing::info!(
614 "Embedded daemon started on {}, identity {}",
615 actual_addr,
616 npub
617 );
618
619 Ok(EmbeddedDaemonInfo {
620 addr: actual_addr,
621 port: local_addr.port(),
622 npub,
623 store,
624 webrtc_state,
625 #[cfg(feature = "p2p")]
626 peer_router_controller,
627 background_services_controller: Some(background_services_controller),
628 })
629}