1use anyhow::{Context, Result};
2use axum::Router;
3use nostr::nips::nip19::ToBech32;
4use nostr::Keys;
5use std::collections::HashSet;
6use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
7use std::path::PathBuf;
8use std::sync::Arc;
9use tokio::net::TcpListener;
10use tokio::sync::Mutex;
11use tokio::task::JoinHandle;
12use tower_http::cors::CorsLayer;
13
14use crate::config::{ensure_keys, parse_npub, pubkey_bytes, Config};
15use crate::eviction::{spawn_background_eviction_task, BACKGROUND_EVICTION_INTERVAL};
16use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
17use crate::server::{AppState, HashtreeServer};
18use crate::socialgraph;
19use crate::storage::HashtreeStore;
20
21#[cfg(feature = "p2p")]
22use crate::webrtc::{ContentStore, PeerClassifier, WebRTCManager, WebRTCState};
23#[cfg(not(feature = "p2p"))]
24use crate::WebRTCState;
25
/// Handles for the currently running peer router task.
#[cfg(feature = "p2p")]
struct PeerRouterRuntime {
    /// Shutdown signal obtained from `WebRTCManager::shutdown_signal`; `true` is
    /// sent on it to ask the router to stop.
    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
    /// Join handle of the spawned task that drives `WebRTCManager::run`.
    join: JoinHandle<()>,
}
31
/// A running background sync service together with the task that drives it.
struct BackgroundSyncRuntime {
    /// Shared service handle; `shutdown()` is called on it to request a stop.
    service: Arc<crate::sync::BackgroundSync>,
    /// Join handle of the task running the service; `None` once the handle has
    /// been taken (for joining or aborting).
    join: Option<JoinHandle<()>>,
}
36
37impl Drop for BackgroundSyncRuntime {
38 fn drop(&mut self) {
39 self.service.shutdown();
40 if let Some(join) = self.join.take() {
41 join.abort();
42 }
43 }
44}
45
/// A running background nostr mirror together with the task that drives it.
struct BackgroundMirrorRuntime {
    /// Shared mirror handle; `shutdown()` is called on it to request a stop.
    service: Arc<crate::nostr_mirror::BackgroundNostrMirror>,
    /// Join handle of the task running the mirror; `None` once the handle has
    /// been taken (for joining or aborting).
    join: Option<JoinHandle<()>>,
}
50
51impl Drop for BackgroundMirrorRuntime {
52 fn drop(&mut self) {
53 self.service.shutdown();
54 if let Some(join) = self.join.take() {
55 join.abort();
56 }
57 }
58}
59
/// The set of optional background services currently running; a `None` slot
/// means that service is not active.
struct BackgroundServicesRuntime {
    /// Handles returned by `socialgraph::crawler::spawn_social_graph_tasks`.
    crawler: Option<socialgraph::crawler::SocialGraphTaskHandles>,
    /// Background nostr mirror service and its task.
    mirror: Option<BackgroundMirrorRuntime>,
    /// Background sync service and its task.
    sync: Option<BackgroundSyncRuntime>,
}
65
66impl Drop for BackgroundServicesRuntime {
67 fn drop(&mut self) {
68 if let Some(handles) = self.crawler.as_ref() {
69 let _ = handles.shutdown_tx.send(true);
70 }
71 if let Some(runtime) = self.mirror.as_ref() {
72 runtime.service.shutdown();
73 }
74 if let Some(runtime) = self.sync.as_ref() {
75 runtime.service.shutdown();
76 }
77 }
78}
79
80impl BackgroundServicesRuntime {
81 fn status(&self) -> EmbeddedBackgroundServicesStatus {
82 EmbeddedBackgroundServicesStatus {
83 crawler_active: self.crawler.is_some(),
84 mirror_active: self.mirror.is_some(),
85 sync_active: self.sync.is_some(),
86 }
87 }
88}
89
/// Which embedded background services are currently registered as running.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmbeddedBackgroundServicesStatus {
    /// `true` when social graph crawler handles are held.
    pub crawler_active: bool,
    /// `true` when a background nostr mirror runtime is held.
    pub mirror_active: bool,
    /// `true` when a background sync runtime is held.
    pub sync_active: bool,
}
96
/// Owns the inputs needed to (re)start the embedded background services
/// (social graph crawler, nostr mirror, sync) and the handles of whichever of
/// them are currently running.
pub struct EmbeddedBackgroundServicesController {
    /// Identity keys; handed to the crawler and converted to `nostr_sdk::Keys`
    /// for the mirror and sync services.
    keys: Keys,
    /// Data directory; passed to the crawler and used to locate `contacts.json`
    /// for sync.
    data_dir: PathBuf,
    /// Content store shared with the mirror and sync services.
    store: Arc<HashtreeStore>,
    /// Concrete social graph store, passed to the nostr mirror.
    graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
    /// Social graph backend (trait object), passed to the crawler.
    graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
    /// Optional spambox backend, passed to the crawler.
    spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
    /// Optional WebRTC state, passed to the sync service.
    webrtc_state: Option<Arc<WebRTCState>>,
    /// Currently running services; the async mutex is held for the whole of
    /// `shutdown` and `apply_config`.
    runtime: Mutex<BackgroundServicesRuntime>,
}
107
108impl EmbeddedBackgroundServicesController {
109 fn local_publish_relay(bind_address: &str) -> Option<String> {
110 let addr: SocketAddr = bind_address.parse().ok()?;
111 if addr.port() == 0 {
112 return None;
113 }
114
115 let host = match addr.ip() {
116 IpAddr::V4(ip) if ip.is_unspecified() => Ipv4Addr::LOCALHOST.to_string(),
117 IpAddr::V6(ip) if ip.is_unspecified() => format!("[{}]", Ipv6Addr::LOCALHOST),
118 IpAddr::V4(ip) => ip.to_string(),
119 IpAddr::V6(ip) => format!("[{ip}]"),
120 };
121 Some(format!("ws://{host}:{}/ws", addr.port()))
122 }
123
124 pub fn new(
125 keys: Keys,
126 data_dir: PathBuf,
127 store: Arc<HashtreeStore>,
128 graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
129 graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
130 spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
131 webrtc_state: Option<Arc<WebRTCState>>,
132 ) -> Self {
133 Self {
134 keys,
135 data_dir,
136 store,
137 graph_store_concrete,
138 graph_store,
139 spambox,
140 webrtc_state,
141 runtime: Mutex::new(BackgroundServicesRuntime {
142 crawler: None,
143 mirror: None,
144 sync: None,
145 }),
146 }
147 }
148
149 pub async fn status(&self) -> EmbeddedBackgroundServicesStatus {
150 self.runtime.lock().await.status()
151 }
152
153 pub async fn shutdown(&self) {
154 let mut runtime = self.runtime.lock().await;
155 Self::shutdown_crawler(&mut runtime.crawler).await;
156 Self::shutdown_mirror(&mut runtime.mirror).await;
157 Self::shutdown_sync(&mut runtime.sync).await;
158 }
159
160 async fn shutdown_crawler(crawler: &mut Option<socialgraph::crawler::SocialGraphTaskHandles>) {
161 let Some(handles) = crawler.take() else {
162 return;
163 };
164
165 let _ = handles.shutdown_tx.send(true);
166
167 let mut crawl_handle = handles.crawl_handle;
168 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut crawl_handle).await {
169 Ok(Ok(())) => {}
170 Ok(Err(err)) => tracing::warn!("Crawler task ended with join error: {}", err),
171 Err(_) => {
172 tracing::warn!("Timed out waiting for crawler task shutdown");
173 crawl_handle.abort();
174 }
175 }
176
177 let mut local_list_handle = handles.local_list_handle;
178 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut local_list_handle).await
179 {
180 Ok(Ok(())) => {}
181 Ok(Err(err)) => tracing::warn!("Local list task ended with join error: {}", err),
182 Err(_) => {
183 tracing::warn!("Timed out waiting for local list task shutdown");
184 local_list_handle.abort();
185 }
186 }
187 }
188
189 async fn shutdown_sync(sync: &mut Option<BackgroundSyncRuntime>) {
190 let Some(mut runtime) = sync.take() else {
191 return;
192 };
193
194 runtime.service.shutdown();
195 if let Some(mut join) = runtime.join.take() {
196 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
197 Ok(Ok(())) => {}
198 Ok(Err(err)) => {
199 tracing::warn!("Background sync task ended with join error: {}", err)
200 }
201 Err(_) => {
202 tracing::warn!("Timed out waiting for background sync shutdown");
203 join.abort();
204 }
205 }
206 }
207 }
208
209 async fn shutdown_mirror(mirror: &mut Option<BackgroundMirrorRuntime>) {
210 let Some(mut runtime) = mirror.take() else {
211 return;
212 };
213
214 runtime.service.shutdown();
215 if let Some(mut join) = runtime.join.take() {
216 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
217 Ok(Ok(())) => {}
218 Ok(Err(err)) => {
219 tracing::warn!("Background mirror task ended with join error: {}", err)
220 }
221 Err(_) => {
222 tracing::warn!("Timed out waiting for background mirror shutdown");
223 join.abort();
224 }
225 }
226 }
227 }
228
229 pub async fn apply_config(&self, config: &Config) -> Result<EmbeddedBackgroundServicesStatus> {
230 let mut runtime = self.runtime.lock().await;
231
232 Self::shutdown_crawler(&mut runtime.crawler).await;
233 Self::shutdown_mirror(&mut runtime.mirror).await;
234 Self::shutdown_sync(&mut runtime.sync).await;
235
236 let active_relays = config.nostr.active_relays();
237
238 if config.nostr.enabled
239 && config.nostr.social_graph_crawl_depth > 0
240 && !active_relays.is_empty()
241 {
242 runtime.crawler = Some(socialgraph::crawler::spawn_social_graph_tasks(
243 self.graph_store.clone(),
244 self.keys.clone(),
245 active_relays.clone(),
246 config.nostr.social_graph_crawl_depth,
247 self.spambox.clone(),
248 self.data_dir.clone(),
249 ));
250
251 let service = Arc::new(
252 crate::nostr_mirror::BackgroundNostrMirror::new(
253 crate::nostr_mirror::NostrMirrorConfig {
254 relays: active_relays.clone(),
255 publish_relays: Self::local_publish_relay(&config.server.bind_address)
256 .into_iter()
257 .collect(),
258 max_follow_distance: config.nostr.social_graph_crawl_depth,
259 overmute_threshold: config.nostr.overmute_threshold,
260 require_negentropy: config.nostr.negentropy_only,
261 kinds: config.nostr.mirror_kinds.clone(),
262 history_sync_author_chunk_size: config
263 .nostr
264 .history_sync_author_chunk_size
265 .max(1),
266 missing_profile_backfill_batch_size: config
267 .nostr
268 .history_sync_author_chunk_size
269 .max(1),
270 history_sync_on_reconnect: config.nostr.history_sync_on_reconnect,
271 ..crate::nostr_mirror::NostrMirrorConfig::default()
272 },
273 self.store.clone(),
274 self.graph_store_concrete.clone(),
275 Some(
276 nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
277 .context("Failed to parse keys for background nostr mirror")?,
278 ),
279 )
280 .await
281 .context("Failed to create background nostr mirror")?,
282 );
283 let service_for_task = service.clone();
284 let join = tokio::task::spawn_blocking(move || {
285 let runtime = tokio::runtime::Builder::new_current_thread()
286 .enable_all()
287 .build()
288 .expect("build background nostr mirror runtime");
289 runtime.block_on(async {
290 if let Err(err) = service_for_task.run().await {
291 tracing::error!("Background nostr mirror error: {:#}", err);
292 }
293 });
294 });
295 runtime.mirror = Some(BackgroundMirrorRuntime {
296 service,
297 join: Some(join),
298 });
299 }
300
301 if config.sync.enabled
302 && (config.sync.sync_own || config.sync.sync_followed)
303 && !active_relays.is_empty()
304 {
305 let sync_config = crate::sync::SyncConfig {
306 sync_own: config.sync.sync_own,
307 sync_followed: config.sync.sync_followed,
308 relays: active_relays,
309 max_concurrent: config.sync.max_concurrent,
310 webrtc_timeout_ms: config.sync.webrtc_timeout_ms,
311 blossom_timeout_ms: config.sync.blossom_timeout_ms,
312 };
313
314 let sync_keys = nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
315 .context("Failed to parse keys for sync")?;
316 let service = Arc::new(
317 crate::sync::BackgroundSync::new(
318 sync_config,
319 self.store.clone(),
320 sync_keys,
321 self.webrtc_state.clone(),
322 )
323 .await
324 .context("Failed to create background sync service")?,
325 );
326 let contacts_file = self.data_dir.join("contacts.json");
327 let service_for_task = service.clone();
328 let join = tokio::spawn(async move {
329 if let Err(err) = service_for_task.run(contacts_file).await {
330 tracing::error!("Background sync error: {}", err);
331 }
332 });
333 runtime.sync = Some(BackgroundSyncRuntime {
334 service,
335 join: Some(join),
336 });
337 }
338
339 Ok(runtime.status())
340 }
341}
342
/// Owns the inputs needed to (re)start the embedded WebRTC peer router and the
/// handles of the router task that is currently running, if any.
#[cfg(feature = "p2p")]
pub struct EmbeddedPeerRouterController {
    /// Identity keys handed to each new `WebRTCManager`.
    keys: Keys,
    /// Shared WebRTC state; reused across restarts and reset before each one.
    state: Arc<WebRTCState>,
    /// Content store handed to each new `WebRTCManager`.
    store: Arc<dyn ContentStore>,
    /// Peer classifier handed to each new `WebRTCManager`.
    peer_classifier: PeerClassifier,
    /// Local nostr relay attached to each new `WebRTCManager`.
    nostr_relay: Arc<NostrRelay>,
    /// Handles of the running router task; `None` when no router is running.
    runtime: Mutex<Option<PeerRouterRuntime>>,
}
352
#[cfg(feature = "p2p")]
impl EmbeddedPeerRouterController {
    /// Creates a controller with no peer router running.
    pub fn new(
        keys: Keys,
        state: Arc<WebRTCState>,
        store: Arc<dyn ContentStore>,
        peer_classifier: PeerClassifier,
        nostr_relay: Arc<NostrRelay>,
    ) -> Self {
        let runtime = Mutex::new(None);
        Self {
            keys,
            state,
            store,
            peer_classifier,
            nostr_relay,
            runtime,
        }
    }

    /// Returns a new handle to the shared WebRTC state.
    pub fn state(&self) -> Arc<WebRTCState> {
        Arc::clone(&self.state)
    }

    /// Stops the running peer router (if any), resets the shared runtime state
    /// and starts a fresh router when `config` enables one.
    ///
    /// The previous task gets three seconds to finish after being signalled and
    /// is aborted on timeout. Returns `Ok(true)` when a router was started and
    /// `Ok(false)` when the peer router is disabled by `config`.
    pub async fn apply_config(&self, config: &Config) -> Result<bool> {
        let mut slot = self.runtime.lock().await;

        if let Some(PeerRouterRuntime {
            shutdown,
            join: mut task,
        }) = slot.take()
        {
            // A send error is ignored on purpose: the task is joined below anyway.
            let _ = shutdown.send(true);

            let grace = std::time::Duration::from_secs(3);
            match tokio::time::timeout(grace, &mut task).await {
                Err(_) => {
                    tracing::warn!("Timed out waiting for peer router shutdown");
                    task.abort();
                }
                Ok(Err(err)) => {
                    tracing::warn!("Peer router task ended with join error: {}", err);
                }
                Ok(Ok(())) => {}
            }
        }

        self.state.reset_runtime_state().await;

        let enabled = crate::p2p_common::peer_router_enabled(config);
        if !enabled {
            return Ok(false);
        }

        let webrtc_config = crate::p2p_common::default_webrtc_config(config);
        let mut manager = WebRTCManager::new_with_state_and_store_and_classifier(
            self.keys.clone(),
            webrtc_config,
            self.state.clone(),
            self.store.clone(),
            self.peer_classifier.clone(),
        );
        manager.set_nostr_relay(self.nostr_relay.clone());

        // Grab the shutdown signal before the manager is moved into its task.
        let shutdown = manager.shutdown_signal();
        let join = tokio::spawn(async move {
            if let Err(err) = manager.run().await {
                tracing::error!("Peer router error: {}", err);
            }
        });

        *slot = Some(PeerRouterRuntime { shutdown, join });
        Ok(true)
    }
}
418
/// Inputs for [`start_embedded`].
pub struct EmbeddedDaemonOptions {
    /// Base configuration for the daemon.
    pub config: Config,
    /// Directory handed to the content store, social graph store, relay and
    /// other components that persist data.
    pub data_dir: PathBuf,
    /// Address the HTTP listener is bound to.
    pub bind_address: String,
    /// When `Some`, replaces `config.nostr.relays` and sets
    /// `config.nostr.enabled` to whether the list is non-empty.
    pub relays: Option<Vec<String>>,
    /// Additional routes installed on the server when present.
    pub extra_routes: Option<Router<AppState>>,
    /// CORS layer installed on the server when present.
    pub cors: Option<CorsLayer>,
}
427
/// Handles and addresses describing a daemon started by [`start_embedded`].
pub struct EmbeddedDaemonInfo {
    /// Address the listener is actually bound to, formatted from its local
    /// socket address.
    pub addr: String,
    /// Port the listener is actually bound to.
    pub port: u16,
    /// Bech32 (`npub`) encoding of the daemon's public key.
    pub npub: String,
    /// Content store used by the daemon.
    pub store: Arc<HashtreeStore>,
    /// Shared WebRTC state; `Some` with the `p2p` feature, `None` without it.
    #[allow(dead_code)]
    pub webrtc_state: Option<Arc<WebRTCState>>,
    /// Controller for restarting the peer router with a new configuration.
    #[cfg(feature = "p2p")]
    #[allow(dead_code)]
    pub peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
    /// Controller for restarting or stopping the background services.
    #[allow(dead_code)]
    pub background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
}
441
442pub async fn start_embedded(opts: EmbeddedDaemonOptions) -> Result<EmbeddedDaemonInfo> {
443 let _ = rustls::crypto::ring::default_provider().install_default();
444
445 let mut config = opts.config;
446 if let Some(relays) = opts.relays {
447 config.nostr.relays = relays;
448 config.nostr.enabled = !config.nostr.relays.is_empty();
449 }
450
451 let max_size_bytes = config.storage.max_size_gb * 1024 * 1024 * 1024;
452 let nostr_db_max_bytes = config
453 .nostr
454 .db_max_size_gb
455 .saturating_mul(1024 * 1024 * 1024);
456 let spambox_db_max_bytes = config
457 .nostr
458 .spambox_max_size_gb
459 .saturating_mul(1024 * 1024 * 1024);
460
461 let store = Arc::new(HashtreeStore::with_options(
462 &opts.data_dir,
463 config.storage.s3.as_ref(),
464 max_size_bytes,
465 )?);
466
467 let (keys, _was_generated) = ensure_keys()?;
468 let pk_bytes = pubkey_bytes(&keys);
469 let npub = keys
470 .public_key()
471 .to_bech32()
472 .context("Failed to encode npub")?;
473
474 let mut allowed_pubkeys: HashSet<String> = HashSet::new();
475 allowed_pubkeys.insert(hex::encode(pk_bytes));
476 for npub_str in &config.nostr.allowed_npubs {
477 if let Ok(pk) = parse_npub(npub_str) {
478 allowed_pubkeys.insert(hex::encode(pk));
479 } else {
480 tracing::warn!("Invalid npub in allowed_npubs: {}", npub_str);
481 }
482 }
483
484 let graph_store = socialgraph::open_social_graph_store_with_storage(
485 &opts.data_dir,
486 store.store_arc(),
487 Some(nostr_db_max_bytes),
488 )
489 .context("Failed to initialize social graph store")?;
490 graph_store.set_profile_index_overmute_threshold(config.nostr.overmute_threshold);
491
492 let social_graph_root_bytes = if let Some(ref root_npub) = config.nostr.socialgraph_root {
493 parse_npub(root_npub).unwrap_or(pk_bytes)
494 } else {
495 pk_bytes
496 };
497 socialgraph::set_social_graph_root(&graph_store, &social_graph_root_bytes);
498 let social_graph_store: Arc<dyn socialgraph::SocialGraphBackend> = graph_store.clone();
499
500 let social_graph = Arc::new(socialgraph::SocialGraphAccessControl::new(
501 Arc::clone(&social_graph_store),
502 config.nostr.max_write_distance,
503 allowed_pubkeys.clone(),
504 ));
505
506 let nostr_relay_config = NostrRelayConfig {
507 spambox_db_max_bytes,
508 ..Default::default()
509 };
510 let mut public_event_pubkeys = HashSet::new();
511 public_event_pubkeys.insert(hex::encode(pk_bytes));
512 let nostr_relay = Arc::new(
513 NostrRelay::new(
514 Arc::clone(&social_graph_store),
515 opts.data_dir.clone(),
516 public_event_pubkeys,
517 Some(social_graph.clone()),
518 nostr_relay_config,
519 )
520 .context("Failed to initialize Nostr relay")?,
521 );
522
523 let crawler_spambox = if spambox_db_max_bytes == 0 {
524 None
525 } else {
526 let spam_dir = opts.data_dir.join("socialgraph_spambox");
527 match socialgraph::open_social_graph_store_at_path(&spam_dir, Some(spambox_db_max_bytes)) {
528 Ok(store) => Some(store),
529 Err(err) => {
530 tracing::warn!("Failed to open social graph spambox for crawler: {}", err);
531 None
532 }
533 }
534 };
535 let crawler_spambox_backend = crawler_spambox
536 .clone()
537 .map(|store| store as Arc<dyn socialgraph::SocialGraphBackend>);
538
539 #[cfg(feature = "p2p")]
540 let (webrtc_state, peer_router_controller): (
541 Option<Arc<WebRTCState>>,
542 Option<Arc<EmbeddedPeerRouterController>>,
543 ) = {
544 let router_config = crate::p2p_common::default_webrtc_config(&config);
545 let peer_classifier = crate::p2p_common::build_peer_classifier(
546 opts.data_dir.clone(),
547 Arc::clone(&social_graph_store),
548 );
549 let cashu_payment_client =
550 if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
551 match crate::cashu_helper::CashuHelperClient::discover(opts.data_dir.clone()) {
552 Ok(client) => {
553 Some(Arc::new(client) as Arc<dyn crate::cashu_helper::CashuPaymentClient>)
554 }
555 Err(err) => {
556 tracing::warn!(
557 "Cashu settlement helper unavailable; paid retrieval stays disabled: {}",
558 err
559 );
560 None
561 }
562 }
563 } else {
564 None
565 };
566 let cashu_mint_metadata =
567 if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
568 let metadata_path = crate::webrtc::cashu_mint_metadata_path(&opts.data_dir);
569 match crate::webrtc::CashuMintMetadataStore::load(metadata_path) {
570 Ok(store) => Some(store),
571 Err(err) => {
572 tracing::warn!(
573 "Failed to load Cashu mint metadata; falling back to in-memory state: {}",
574 err
575 );
576 Some(crate::webrtc::CashuMintMetadataStore::in_memory())
577 }
578 }
579 } else {
580 None
581 };
582
583 let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
584 router_config.request_selection_strategy,
585 router_config.request_fairness_enabled,
586 router_config.request_dispatch,
587 std::time::Duration::from_millis(router_config.message_timeout_ms),
588 crate::webrtc::CashuRoutingConfig::from(&config.cashu),
589 cashu_payment_client,
590 cashu_mint_metadata,
591 ));
592 let controller = Arc::new(EmbeddedPeerRouterController::new(
593 keys.clone(),
594 state.clone(),
595 Arc::clone(&store) as Arc<dyn ContentStore>,
596 peer_classifier,
597 nostr_relay.clone(),
598 ));
599 controller.apply_config(&config).await?;
600 (Some(state), Some(controller))
601 };
602
603 #[cfg(not(feature = "p2p"))]
604 let webrtc_state: Option<Arc<crate::webrtc::WebRTCState>> = None;
605 #[cfg(not(feature = "p2p"))]
606 let peer_router_controller = None;
607
608 let background_services_controller = Arc::new(EmbeddedBackgroundServicesController::new(
609 keys.clone(),
610 opts.data_dir.clone(),
611 Arc::clone(&store),
612 graph_store.clone(),
613 Arc::clone(&social_graph_store),
614 crawler_spambox_backend,
615 webrtc_state.clone(),
616 ));
617 background_services_controller.apply_config(&config).await?;
618
619 let upstream_blossom = config.blossom.all_read_servers();
620 let active_nostr_relays = config.nostr.active_relays();
621
622 let mut server = HashtreeServer::new(Arc::clone(&store), opts.bind_address.clone())
623 .with_allowed_pubkeys(allowed_pubkeys.clone())
624 .with_max_upload_bytes((config.blossom.max_upload_mb as usize) * 1024 * 1024)
625 .with_public_writes(config.server.public_writes)
626 .with_upstream_blossom(upstream_blossom)
627 .with_nostr_relay_urls(active_nostr_relays)
628 .with_social_graph(social_graph)
629 .with_socialgraph_snapshot(
630 Arc::clone(&social_graph_store),
631 social_graph_root_bytes,
632 config.server.socialgraph_snapshot_public,
633 )
634 .with_nostr_relay(nostr_relay.clone());
635
636 if let Some(ref state) = webrtc_state {
637 server = server.with_webrtc_peers(state.clone());
638 }
639
640 if let Some(extra) = opts.extra_routes {
641 server = server.with_extra_routes(extra);
642 }
643 if let Some(cors) = opts.cors {
644 server = server.with_cors(cors);
645 }
646
647 spawn_background_eviction_task(
648 Arc::clone(&store),
649 BACKGROUND_EVICTION_INTERVAL,
650 "embedded daemon",
651 );
652
653 let listener = TcpListener::bind(&opts.bind_address).await?;
654 let local_addr = listener.local_addr()?;
655 let actual_addr = format!("{}:{}", local_addr.ip(), local_addr.port());
656
657 tokio::spawn(async move {
658 if let Err(e) = server.run_with_listener(listener).await {
659 tracing::error!("Embedded daemon server error: {}", e);
660 }
661 });
662
663 tracing::info!(
664 "Embedded daemon started on {}, identity {}",
665 actual_addr,
666 npub
667 );
668
669 Ok(EmbeddedDaemonInfo {
670 addr: actual_addr,
671 port: local_addr.port(),
672 npub,
673 store,
674 webrtc_state,
675 #[cfg(feature = "p2p")]
676 peer_router_controller,
677 background_services_controller: Some(background_services_controller),
678 })
679}