1use anyhow::{Context, Result};
2use axum::Router;
3use nostr::nips::nip19::ToBech32;
4use nostr::Keys;
5use std::collections::HashSet;
6use std::path::PathBuf;
7use std::sync::Arc;
8use tokio::net::TcpListener;
9use tokio::sync::{Mutex, Notify};
10use tokio::task::JoinHandle;
11use tower_http::cors::CorsLayer;
12
13use crate::config::{ensure_keys, ensure_keys_in, parse_npub, pubkey_bytes, Config};
14use crate::eviction::{spawn_background_eviction_task, BACKGROUND_EVICTION_INTERVAL};
15use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
16use crate::server::{AppState, HashtreeServer};
17use crate::socialgraph;
18use crate::storage::HashtreeStore;
19
20#[cfg(feature = "p2p")]
21use crate::webrtc::{ContentStore, PeerClassifier, WebRTCManager, WebRTCState};
22#[cfg(not(feature = "p2p"))]
23use crate::WebRTCState;
24
25#[cfg(feature = "p2p")]
26struct PeerRouterRuntime {
27 shutdown: Arc<tokio::sync::watch::Sender<bool>>,
28 join: JoinHandle<()>,
29}
30
31struct BackgroundSyncRuntime {
32 service: Arc<crate::sync::BackgroundSync>,
33 join: Option<JoinHandle<()>>,
34}
35
36impl Drop for BackgroundSyncRuntime {
37 fn drop(&mut self) {
38 self.service.shutdown();
39 if let Some(join) = self.join.take() {
40 join.abort();
41 }
42 }
43}
44
45struct BackgroundMirrorRuntime {
46 service: Arc<crate::nostr_mirror::BackgroundNostrMirror>,
47 join: Option<JoinHandle<()>>,
48}
49
50impl Drop for BackgroundMirrorRuntime {
51 fn drop(&mut self) {
52 self.service.shutdown();
53 if let Some(join) = self.join.take() {
54 join.abort();
55 }
56 }
57}
58
59struct BackgroundServicesRuntime {
60 crawler: Option<socialgraph::crawler::SocialGraphTaskHandles>,
61 mirror: Option<BackgroundMirrorRuntime>,
62 sync: Option<BackgroundSyncRuntime>,
63}
64
65impl Drop for BackgroundServicesRuntime {
66 fn drop(&mut self) {
67 if let Some(handles) = self.crawler.as_ref() {
68 let _ = handles.shutdown_tx.send(true);
69 }
70 if let Some(runtime) = self.mirror.as_ref() {
71 runtime.service.shutdown();
72 }
73 if let Some(runtime) = self.sync.as_ref() {
74 runtime.service.shutdown();
75 }
76 }
77}
78
79impl BackgroundServicesRuntime {
80 fn status(&self) -> EmbeddedBackgroundServicesStatus {
81 EmbeddedBackgroundServicesStatus {
82 crawler_active: self.crawler.is_some(),
83 mirror_active: self.mirror.is_some(),
84 sync_active: self.sync.is_some(),
85 }
86 }
87}
88
89struct EmbeddedServerRuntime {
90 shutdown: Arc<Notify>,
91 join: Option<JoinHandle<()>>,
92}
93
94pub struct EmbeddedServerController {
95 runtime: Mutex<Option<EmbeddedServerRuntime>>,
96}
97
98impl EmbeddedServerController {
99 pub fn new(shutdown: Arc<Notify>, join: JoinHandle<()>) -> Self {
100 Self {
101 runtime: Mutex::new(Some(EmbeddedServerRuntime {
102 shutdown,
103 join: Some(join),
104 })),
105 }
106 }
107
108 pub async fn shutdown(&self) {
109 let mut runtime = self.runtime.lock().await;
110 let Some(mut runtime) = runtime.take() else {
111 return;
112 };
113
114 runtime.shutdown.notify_waiters();
115 if let Some(mut join) = runtime.join.take() {
116 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
117 Ok(Ok(())) => {}
118 Ok(Err(err)) => {
119 tracing::warn!("Embedded server task ended with join error: {}", err)
120 }
121 Err(_) => {
122 tracing::warn!("Timed out waiting for embedded server shutdown");
123 join.abort();
124 }
125 }
126 }
127 }
128}
129
130#[derive(Debug, Clone, Copy, PartialEq, Eq)]
131pub struct EmbeddedBackgroundServicesStatus {
132 pub crawler_active: bool,
133 pub mirror_active: bool,
134 pub sync_active: bool,
135}
136
137pub struct EmbeddedBackgroundServicesController {
138 keys: Keys,
139 data_dir: PathBuf,
140 store: Arc<HashtreeStore>,
141 graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
142 graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
143 spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
144 webrtc_state: Option<Arc<WebRTCState>>,
145 runtime: Mutex<BackgroundServicesRuntime>,
146}
147
148impl EmbeddedBackgroundServicesController {
149 const MIRROR_PUBLISH_RELAY_PRIORITY: &[&str] = &[
150 "wss://temp.iris.to",
151 "wss://vault.iris.to",
152 "wss://relay.damus.io",
153 "wss://relay.primal.net",
154 ];
155 const MIRROR_PUBLISH_RELAY_BLOCKLIST: &[&str] =
156 &["wss://graph-relay.iris.to", "wss://upload.iris.to/nostr"];
157
158 fn mirror_publish_relays(active_relays: &[String], _bind_address: &str) -> Vec<String> {
159 let mut seen = HashSet::new();
160 let active_relays = active_relays
161 .iter()
162 .filter(|relay| seen.insert((*relay).clone()))
163 .cloned()
164 .collect::<Vec<_>>();
165 if active_relays.is_empty() {
166 return Vec::new();
167 }
168 let active_relay_set = active_relays.iter().cloned().collect::<HashSet<_>>();
169
170 let preferred = Self::MIRROR_PUBLISH_RELAY_PRIORITY
171 .iter()
172 .filter(|relay| active_relay_set.contains(**relay))
173 .map(|relay| (*relay).to_string())
174 .collect::<Vec<_>>();
175 if !preferred.is_empty() {
176 return preferred;
177 }
178
179 let filtered = active_relays
180 .iter()
181 .filter(|relay| !Self::MIRROR_PUBLISH_RELAY_BLOCKLIST.contains(&relay.as_str()))
182 .cloned()
183 .collect::<Vec<_>>();
184 if !filtered.is_empty() {
185 return filtered;
186 }
187
188 active_relays
189 }
190
191 pub fn new(
192 keys: Keys,
193 data_dir: PathBuf,
194 store: Arc<HashtreeStore>,
195 graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
196 graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
197 spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
198 webrtc_state: Option<Arc<WebRTCState>>,
199 ) -> Self {
200 Self {
201 keys,
202 data_dir,
203 store,
204 graph_store_concrete,
205 graph_store,
206 spambox,
207 webrtc_state,
208 runtime: Mutex::new(BackgroundServicesRuntime {
209 crawler: None,
210 mirror: None,
211 sync: None,
212 }),
213 }
214 }
215
216 pub async fn status(&self) -> EmbeddedBackgroundServicesStatus {
217 self.runtime.lock().await.status()
218 }
219
220 pub async fn shutdown(&self) {
221 let mut runtime = self.runtime.lock().await;
222 Self::shutdown_crawler(&mut runtime.crawler).await;
223 Self::shutdown_mirror(&mut runtime.mirror).await;
224 Self::shutdown_sync(&mut runtime.sync).await;
225 }
226
227 async fn shutdown_crawler(crawler: &mut Option<socialgraph::crawler::SocialGraphTaskHandles>) {
228 let Some(handles) = crawler.take() else {
229 return;
230 };
231
232 let _ = handles.shutdown_tx.send(true);
233
234 let mut crawl_handle = handles.crawl_handle;
235 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut crawl_handle).await {
236 Ok(Ok(())) => {}
237 Ok(Err(err)) => tracing::warn!("Crawler task ended with join error: {}", err),
238 Err(_) => {
239 tracing::warn!("Timed out waiting for crawler task shutdown");
240 crawl_handle.abort();
241 }
242 }
243
244 let mut local_list_handle = handles.local_list_handle;
245 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut local_list_handle).await
246 {
247 Ok(Ok(())) => {}
248 Ok(Err(err)) => tracing::warn!("Local list task ended with join error: {}", err),
249 Err(_) => {
250 tracing::warn!("Timed out waiting for local list task shutdown");
251 local_list_handle.abort();
252 }
253 }
254 }
255
256 async fn shutdown_sync(sync: &mut Option<BackgroundSyncRuntime>) {
257 let Some(mut runtime) = sync.take() else {
258 return;
259 };
260
261 runtime.service.shutdown();
262 if let Some(mut join) = runtime.join.take() {
263 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
264 Ok(Ok(())) => {}
265 Ok(Err(err)) => {
266 tracing::warn!("Background sync task ended with join error: {}", err)
267 }
268 Err(_) => {
269 tracing::warn!("Timed out waiting for background sync shutdown");
270 join.abort();
271 }
272 }
273 }
274 }
275
276 async fn shutdown_mirror(mirror: &mut Option<BackgroundMirrorRuntime>) {
277 let Some(mut runtime) = mirror.take() else {
278 return;
279 };
280
281 runtime.service.shutdown();
282 if let Some(mut join) = runtime.join.take() {
283 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
284 Ok(Ok(())) => {}
285 Ok(Err(err)) => {
286 tracing::warn!("Background mirror task ended with join error: {}", err)
287 }
288 Err(_) => {
289 tracing::warn!("Timed out waiting for background mirror shutdown");
290 join.abort();
291 }
292 }
293 }
294 }
295
296 pub async fn apply_config(&self, config: &Config) -> Result<EmbeddedBackgroundServicesStatus> {
297 let mut runtime = self.runtime.lock().await;
298
299 Self::shutdown_crawler(&mut runtime.crawler).await;
300 Self::shutdown_mirror(&mut runtime.mirror).await;
301 Self::shutdown_sync(&mut runtime.sync).await;
302
303 if !config.server.mode.background_services_enabled() {
304 return Ok(runtime.status());
305 }
306
307 let active_relays = config.nostr.active_relays();
308
309 if config.nostr.enabled
310 && config.nostr.social_graph_crawl_depth > 0
311 && !active_relays.is_empty()
312 {
313 runtime.crawler = Some(socialgraph::crawler::spawn_social_graph_tasks(
314 self.graph_store.clone(),
315 self.keys.clone(),
316 active_relays.clone(),
317 config.nostr.social_graph_crawl_depth,
318 self.spambox.clone(),
319 self.data_dir.clone(),
320 ));
321
322 let service = Arc::new(
323 crate::nostr_mirror::BackgroundNostrMirror::new(
324 crate::nostr_mirror::NostrMirrorConfig {
325 relays: active_relays.clone(),
326 publish_relays: Self::mirror_publish_relays(
327 &active_relays,
328 &config.server.bind_address,
329 ),
330 blossom_write_servers: config.blossom.all_write_servers(),
331 max_follow_distance: config.nostr.social_graph_crawl_depth,
332 overmute_threshold: config.nostr.overmute_threshold,
333 require_negentropy: config.nostr.negentropy_only,
334 kinds: config.nostr.mirror_kinds.clone(),
335 history_sync_author_chunk_size: config
336 .nostr
337 .history_sync_author_chunk_size
338 .max(1),
339 history_sync_per_author_event_limit: config
340 .nostr
341 .history_sync_per_author_event_limit
342 .max(1),
343 missing_profile_backfill_batch_size: config
344 .nostr
345 .history_sync_author_chunk_size
346 .max(1),
347 history_sync_on_reconnect: config.nostr.history_sync_on_reconnect,
348 ..crate::nostr_mirror::NostrMirrorConfig::default()
349 },
350 self.store.clone(),
351 self.graph_store_concrete.clone(),
352 Some(
353 nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
354 .context("Failed to parse keys for background nostr mirror")?,
355 ),
356 )
357 .await
358 .context("Failed to create background nostr mirror")?,
359 );
360 let service_for_task = service.clone();
361 let join = tokio::task::spawn_blocking(move || {
362 let runtime = tokio::runtime::Builder::new_current_thread()
363 .enable_all()
364 .build()
365 .expect("build background nostr mirror runtime");
366 runtime.block_on(async {
367 if let Err(err) = service_for_task.run().await {
368 tracing::error!("Background nostr mirror error: {:#}", err);
369 }
370 });
371 });
372 runtime.mirror = Some(BackgroundMirrorRuntime {
373 service,
374 join: Some(join),
375 });
376 }
377
378 let has_pinned_refs = self
379 .store
380 .list_pinned_refs()
381 .map(|refs| !refs.is_empty())
382 .unwrap_or(false);
383 let has_tracked_authors = self
384 .store
385 .list_tracked_authors()
386 .map(|authors| !authors.is_empty())
387 .unwrap_or(false);
388
389 if config.sync.enabled
390 && (config.sync.sync_own
391 || config.sync.sync_followed
392 || has_pinned_refs
393 || has_tracked_authors)
394 && !active_relays.is_empty()
395 {
396 let sync_config = crate::sync::SyncConfig {
397 sync_own: config.sync.sync_own,
398 sync_followed: config.sync.sync_followed,
399 relays: active_relays,
400 max_concurrent: config.sync.max_concurrent,
401 webrtc_timeout_ms: config.sync.webrtc_timeout_ms,
402 blossom_timeout_ms: config.sync.blossom_timeout_ms,
403 };
404
405 let sync_keys = nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
406 .context("Failed to parse keys for sync")?;
407 let service = Arc::new(
408 crate::sync::BackgroundSync::new(
409 sync_config,
410 self.store.clone(),
411 sync_keys,
412 self.webrtc_state.clone(),
413 )
414 .await
415 .context("Failed to create background sync service")?,
416 );
417 let contacts_file = self.data_dir.join("contacts.json");
418 let service_for_task = service.clone();
419 let join = tokio::spawn(async move {
420 if let Err(err) = service_for_task.run(contacts_file).await {
421 tracing::error!("Background sync error: {}", err);
422 }
423 });
424 runtime.sync = Some(BackgroundSyncRuntime {
425 service,
426 join: Some(join),
427 });
428 }
429
430 Ok(runtime.status())
431 }
432}
433
434#[cfg(feature = "p2p")]
435pub struct EmbeddedPeerRouterController {
436 keys: Keys,
437 state: Arc<WebRTCState>,
438 store: Arc<dyn ContentStore>,
439 peer_classifier: PeerClassifier,
440 nostr_relay: Arc<NostrRelay>,
441 runtime: Mutex<Option<PeerRouterRuntime>>,
442}
443
444#[cfg(feature = "p2p")]
445impl EmbeddedPeerRouterController {
446 pub fn new(
447 keys: Keys,
448 state: Arc<WebRTCState>,
449 store: Arc<dyn ContentStore>,
450 peer_classifier: PeerClassifier,
451 nostr_relay: Arc<NostrRelay>,
452 ) -> Self {
453 Self {
454 keys,
455 state,
456 store,
457 peer_classifier,
458 nostr_relay,
459 runtime: Mutex::new(None),
460 }
461 }
462
463 pub fn state(&self) -> Arc<WebRTCState> {
464 self.state.clone()
465 }
466
467 pub async fn apply_config(&self, config: &Config) -> Result<bool> {
468 let mut runtime = self.runtime.lock().await;
469 if let Some(runtime_handle) = runtime.take() {
470 let _ = runtime_handle.shutdown.send(true);
471 let mut join = runtime_handle.join;
472 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
473 Ok(Ok(())) => {}
474 Ok(Err(err)) => {
475 tracing::warn!("Peer router task ended with join error: {}", err);
476 }
477 Err(_) => {
478 tracing::warn!("Timed out waiting for peer router shutdown");
479 join.abort();
480 }
481 }
482 }
483
484 self.state.reset_runtime_state().await;
485
486 if !crate::p2p_common::peer_router_enabled(config) {
487 return Ok(false);
488 }
489
490 let webrtc_config = crate::p2p_common::default_webrtc_config(config);
491 let mut manager = if config.server.mode.hash_get_enabled() {
492 WebRTCManager::new_with_state_and_store_and_classifier(
493 self.keys.clone(),
494 webrtc_config,
495 self.state.clone(),
496 self.store.clone(),
497 self.peer_classifier.clone(),
498 )
499 } else {
500 let mut manager =
501 WebRTCManager::new_with_state(self.keys.clone(), webrtc_config, self.state.clone());
502 manager.set_peer_classifier(self.peer_classifier.clone());
503 manager
504 };
505 manager
506 .set_nostr_relay(self.nostr_relay.clone() as hashtree_network::SharedMeshRelayClient);
507 let shutdown = manager.shutdown_signal();
508 let join = tokio::spawn(async move {
509 if let Err(err) = manager.run().await {
510 tracing::error!("Peer router error: {}", err);
511 }
512 });
513 *runtime = Some(PeerRouterRuntime { shutdown, join });
514 Ok(true)
515 }
516
517 pub async fn shutdown(&self) {
518 let mut runtime = self.runtime.lock().await;
519 let Some(runtime_handle) = runtime.take() else {
520 return;
521 };
522
523 let _ = runtime_handle.shutdown.send(true);
524 let mut join = runtime_handle.join;
525 match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
526 Ok(Ok(())) => {}
527 Ok(Err(err)) => tracing::warn!("Peer router task ended with join error: {}", err),
528 Err(_) => {
529 tracing::warn!("Timed out waiting for peer router shutdown");
530 join.abort();
531 }
532 }
533
534 self.state.reset_runtime_state().await;
535 }
536}
537
538pub struct EmbeddedDaemonController {
539 server_controller: Arc<EmbeddedServerController>,
540 #[cfg(feature = "p2p")]
541 peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
542 background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
543}
544
545impl EmbeddedDaemonController {
546 #[cfg(feature = "p2p")]
547 pub fn new(
548 server_controller: Arc<EmbeddedServerController>,
549 peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
550 background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
551 ) -> Self {
552 Self {
553 server_controller,
554 #[cfg(feature = "p2p")]
555 peer_router_controller,
556 background_services_controller,
557 }
558 }
559
560 #[cfg(not(feature = "p2p"))]
561 pub fn new(
562 server_controller: Arc<EmbeddedServerController>,
563 background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
564 ) -> Self {
565 Self {
566 server_controller,
567 background_services_controller,
568 }
569 }
570
571 pub async fn shutdown(&self) {
572 self.server_controller.shutdown().await;
573 if let Some(controller) = self.background_services_controller.as_ref() {
574 controller.shutdown().await;
575 }
576 #[cfg(feature = "p2p")]
577 if let Some(controller) = self.peer_router_controller.as_ref() {
578 controller.shutdown().await;
579 }
580 }
581}
582
583pub struct EmbeddedDaemonOptions {
584 pub config: Config,
585 pub data_dir: PathBuf,
586 pub config_dir: Option<PathBuf>,
587 pub bind_address: String,
588 pub relays: Option<Vec<String>>,
589 pub extra_routes: Option<Router<AppState>>,
590 pub cors: Option<CorsLayer>,
591}
592
593pub struct EmbeddedDaemonInfo {
594 pub addr: String,
595 pub port: u16,
596 pub npub: String,
597 pub store: Arc<HashtreeStore>,
598 pub daemon_controller: Arc<EmbeddedDaemonController>,
599 #[allow(dead_code)]
600 pub webrtc_state: Option<Arc<WebRTCState>>,
601 #[cfg(feature = "p2p")]
602 #[allow(dead_code)]
603 pub peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
604 #[allow(dead_code)]
605 pub background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
606}
607
608pub async fn start_embedded(opts: EmbeddedDaemonOptions) -> Result<EmbeddedDaemonInfo> {
609 let _ = rustls::crypto::ring::default_provider().install_default();
610
611 let mut config = opts.config;
612 if let Some(relays) = opts.relays {
613 config.nostr.relays = relays;
614 config.nostr.enabled = !config.nostr.relays.is_empty();
615 }
616
617 let max_size_bytes = config.storage.max_size_gb * 1024 * 1024 * 1024;
618 let nostr_db_max_bytes = config
619 .nostr
620 .db_max_size_gb
621 .saturating_mul(1024 * 1024 * 1024);
622 let spambox_db_max_bytes = config
623 .nostr
624 .spambox_max_size_gb
625 .saturating_mul(1024 * 1024 * 1024);
626
627 let store = Arc::new(HashtreeStore::with_options(
628 &opts.data_dir,
629 config.storage.s3.as_ref(),
630 max_size_bytes,
631 )?);
632
633 let (keys, _was_generated) = if let Some(config_dir) = opts.config_dir.as_ref() {
634 ensure_keys_in(config_dir, Some(&opts.data_dir), Some(&config))?
635 } else {
636 ensure_keys()?
637 };
638 let pk_bytes = pubkey_bytes(&keys);
639 let npub = keys
640 .public_key()
641 .to_bech32()
642 .context("Failed to encode npub")?;
643
644 let mut allowed_pubkeys: HashSet<String> = HashSet::new();
645 allowed_pubkeys.insert(hex::encode(pk_bytes));
646 for npub_str in &config.nostr.allowed_npubs {
647 if let Ok(pk) = parse_npub(npub_str) {
648 allowed_pubkeys.insert(hex::encode(pk));
649 } else {
650 tracing::warn!("Invalid npub in allowed_npubs: {}", npub_str);
651 }
652 }
653
654 let graph_store = socialgraph::open_social_graph_store_with_storage(
655 &opts.data_dir,
656 store.store_arc(),
657 Some(nostr_db_max_bytes),
658 )
659 .context("Failed to initialize social graph store")?;
660 graph_store.set_profile_index_overmute_threshold(config.nostr.overmute_threshold);
661
662 let social_graph_root_bytes = if let Some(ref root_npub) = config.nostr.socialgraph_root {
663 parse_npub(root_npub).unwrap_or(pk_bytes)
664 } else {
665 pk_bytes
666 };
667 socialgraph::set_social_graph_root(&graph_store, &social_graph_root_bytes);
668 socialgraph::sync_local_list_files_force(graph_store.as_ref(), &opts.data_dir, &keys)
669 .context("Failed to sync local social graph lists")?;
670 let social_graph_store: Arc<dyn socialgraph::SocialGraphBackend> = graph_store.clone();
671
672 let social_graph = Arc::new(socialgraph::SocialGraphAccessControl::new(
673 Arc::clone(&social_graph_store),
674 config.nostr.max_write_distance,
675 allowed_pubkeys.clone(),
676 ));
677
678 let nostr_relay_config = NostrRelayConfig {
679 spambox_db_max_bytes,
680 ..Default::default()
681 };
682 let mut public_event_pubkeys = HashSet::new();
683 public_event_pubkeys.insert(hex::encode(pk_bytes));
684 let nostr_relay = Arc::new(
685 NostrRelay::new(
686 Arc::clone(&social_graph_store),
687 opts.data_dir.clone(),
688 public_event_pubkeys,
689 Some(social_graph.clone()),
690 nostr_relay_config,
691 )
692 .context("Failed to initialize Nostr relay")?,
693 );
694
695 let crawler_spambox = if spambox_db_max_bytes == 0 {
696 None
697 } else {
698 let spam_dir = opts.data_dir.join("socialgraph_spambox");
699 match socialgraph::open_social_graph_store_at_path(&spam_dir, Some(spambox_db_max_bytes)) {
700 Ok(store) => Some(store),
701 Err(err) => {
702 tracing::warn!("Failed to open social graph spambox for crawler: {}", err);
703 None
704 }
705 }
706 };
707 let crawler_spambox_backend = crawler_spambox
708 .clone()
709 .map(|store| store as Arc<dyn socialgraph::SocialGraphBackend>);
710
711 #[cfg(feature = "p2p")]
712 let (webrtc_state, peer_router_controller): (
713 Option<Arc<WebRTCState>>,
714 Option<Arc<EmbeddedPeerRouterController>>,
715 ) = {
716 let router_config = crate::p2p_common::default_webrtc_config(&config);
717 let peer_classifier = crate::p2p_common::build_peer_classifier(
718 opts.data_dir.clone(),
719 Arc::clone(&social_graph_store),
720 );
721 let cashu_payment_client =
722 if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
723 match crate::cashu_helper::CashuHelperClient::discover(opts.data_dir.clone()) {
724 Ok(client) => {
725 Some(Arc::new(client) as Arc<dyn crate::cashu_helper::CashuPaymentClient>)
726 }
727 Err(err) => {
728 tracing::warn!(
729 "Cashu settlement helper unavailable; paid retrieval stays disabled: {}",
730 err
731 );
732 None
733 }
734 }
735 } else {
736 None
737 };
738 let cashu_mint_metadata =
739 if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
740 let metadata_path = crate::webrtc::cashu_mint_metadata_path(&opts.data_dir);
741 match crate::webrtc::CashuMintMetadataStore::load(metadata_path) {
742 Ok(store) => Some(store),
743 Err(err) => {
744 tracing::warn!(
745 "Failed to load Cashu mint metadata; falling back to in-memory state: {}",
746 err
747 );
748 Some(crate::webrtc::CashuMintMetadataStore::in_memory())
749 }
750 }
751 } else {
752 None
753 };
754
755 let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
756 router_config.request_selection_strategy,
757 router_config.request_fairness_enabled,
758 router_config.request_dispatch,
759 std::time::Duration::from_millis(router_config.message_timeout_ms),
760 crate::webrtc::CashuRoutingConfig::from(&config.cashu),
761 cashu_payment_client,
762 cashu_mint_metadata,
763 ));
764 let controller = Arc::new(EmbeddedPeerRouterController::new(
765 keys.clone(),
766 state.clone(),
767 Arc::clone(&store) as Arc<dyn ContentStore>,
768 peer_classifier,
769 nostr_relay.clone(),
770 ));
771 controller.apply_config(&config).await?;
772 (Some(state), Some(controller))
773 };
774
775 #[cfg(not(feature = "p2p"))]
776 let webrtc_state: Option<Arc<crate::webrtc::WebRTCState>> = None;
777
778 let background_services_controller = Arc::new(EmbeddedBackgroundServicesController::new(
779 keys.clone(),
780 opts.data_dir.clone(),
781 Arc::clone(&store),
782 graph_store.clone(),
783 Arc::clone(&social_graph_store),
784 crawler_spambox_backend,
785 webrtc_state.clone(),
786 ));
787 background_services_controller.apply_config(&config).await?;
788
789 let upstream_blossom = config.blossom.all_read_servers();
790 let active_nostr_relays = config.nostr.active_relays();
791
792 let mut server = HashtreeServer::new(Arc::clone(&store), opts.bind_address.clone())
793 .with_server_mode(config.server.mode)
794 .with_hash_get_enabled(config.server.mode.hash_get_enabled())
795 .with_allowed_pubkeys(allowed_pubkeys.clone())
796 .with_max_upload_bytes((config.blossom.max_upload_mb as usize) * 1024 * 1024)
797 .with_public_writes(config.server.public_writes)
798 .with_upstream_blossom(upstream_blossom)
799 .with_nostr_relay_urls(active_nostr_relays)
800 .with_social_graph(social_graph)
801 .with_socialgraph_snapshot(
802 Arc::clone(&social_graph_store),
803 social_graph_root_bytes,
804 config.server.socialgraph_snapshot_public,
805 )
806 .with_nostr_relay(nostr_relay.clone());
807
808 if crate::p2p_common::peer_router_enabled(&config) {
809 if let Some(ref state) = webrtc_state {
810 server = server.with_webrtc_peers(state.clone());
811 }
812 }
813
814 if let Some(extra) = opts.extra_routes {
815 server = server.with_extra_routes(extra);
816 }
817 if let Some(cors) = opts.cors {
818 server = server.with_cors(cors);
819 }
820
821 spawn_background_eviction_task(
822 Arc::clone(&store),
823 BACKGROUND_EVICTION_INTERVAL,
824 "embedded daemon",
825 );
826
827 let listener = TcpListener::bind(&opts.bind_address).await?;
828 let local_addr = listener.local_addr()?;
829 let actual_addr = format!("{}:{}", local_addr.ip(), local_addr.port());
830
831 let server_shutdown = Arc::new(Notify::new());
832 let server_shutdown_for_task = Arc::clone(&server_shutdown);
833 let server_join = tokio::spawn(async move {
834 if let Err(e) = server
835 .run_with_listener_until(listener, async move {
836 server_shutdown_for_task.notified().await;
837 })
838 .await
839 {
840 tracing::error!("Embedded daemon server error: {}", e);
841 }
842 });
843 let server_controller = Arc::new(EmbeddedServerController::new(server_shutdown, server_join));
844 #[cfg(feature = "p2p")]
845 let daemon_controller = Arc::new(EmbeddedDaemonController::new(
846 server_controller,
847 peer_router_controller.clone(),
848 Some(background_services_controller.clone()),
849 ));
850 #[cfg(not(feature = "p2p"))]
851 let daemon_controller = Arc::new(EmbeddedDaemonController::new(
852 server_controller,
853 Some(background_services_controller.clone()),
854 ));
855
856 tracing::info!(
857 "Embedded daemon started on {}, identity {}",
858 actual_addr,
859 npub
860 );
861
862 Ok(EmbeddedDaemonInfo {
863 addr: actual_addr,
864 port: local_addr.port(),
865 npub,
866 store,
867 daemon_controller,
868 webrtc_state,
869 #[cfg(feature = "p2p")]
870 peer_router_controller,
871 background_services_controller: Some(background_services_controller),
872 })
873}
874
875#[cfg(test)]
876mod tests {
877 use super::EmbeddedBackgroundServicesController;
878
879 #[test]
880 fn mirror_publish_relays_prefers_known_root_publish_relays() {
881 let relays = EmbeddedBackgroundServicesController::mirror_publish_relays(
882 &[
883 "wss://graph-relay.iris.to".to_string(),
884 "wss://relay.example".to_string(),
885 "wss://relay.damus.io".to_string(),
886 "wss://temp.iris.to".to_string(),
887 "wss://vault.iris.to".to_string(),
888 "wss://upload.iris.to/nostr".to_string(),
889 ],
890 "0.0.0.0:8080",
891 );
892 assert_eq!(
893 relays,
894 vec![
895 "wss://temp.iris.to".to_string(),
896 "wss://vault.iris.to".to_string(),
897 "wss://relay.damus.io".to_string(),
898 ]
899 );
900 }
901
902 #[test]
903 fn mirror_publish_relays_filters_known_bad_publish_targets_when_no_preferred_remain() {
904 let relays = EmbeddedBackgroundServicesController::mirror_publish_relays(
905 &[
906 "wss://graph-relay.iris.to".to_string(),
907 "wss://relay.snort.social".to_string(),
908 "wss://relay.nostr.band".to_string(),
909 "wss://upload.iris.to/nostr".to_string(),
910 ],
911 "0.0.0.0:8080",
912 );
913 assert_eq!(
914 relays,
915 vec![
916 "wss://relay.snort.social".to_string(),
917 "wss://relay.nostr.band".to_string(),
918 ]
919 );
920 }
921}