1use std::collections::HashSet;
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, Result};
7use axum::Router;
8use nostr::nips::nip19::ToBech32;
9use tokio::net::TcpListener;
10use tower_http::cors::CorsLayer;
11
12use crate::config::{ensure_keys, parse_npub, pubkey_bytes, Config};
13use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
14use crate::server::{AppState, HashtreeServer};
15use crate::socialgraph;
16use crate::storage::HashtreeStore;
17
18#[cfg(feature = "p2p")]
19use crate::webrtc::{
20 ContentStore, PeerClassifier, PeerPool, WebRTCConfig, WebRTCManager, WebRTCState,
21};
22
/// Options for starting an embedded hashtree daemon in-process.
pub struct EmbeddedDaemonOptions {
    /// Base daemon configuration. `relays` below, when set, overrides
    /// `config.nostr.relays` before anything reads it.
    pub config: Config,
    /// Directory for persistent state (content store, nostrdb databases,
    /// `contacts.json`, spambox DB).
    pub data_dir: PathBuf,
    /// Address the HTTP server binds to (e.g. "127.0.0.1:0" for an
    /// ephemeral port; the actual port is reported in `EmbeddedDaemonInfo`).
    pub bind_address: String,
    /// Optional relay list that replaces `config.nostr.relays` when `Some`.
    pub relays: Option<Vec<String>>,
    /// Extra axum routes merged into the server's router.
    pub extra_routes: Option<Router<AppState>>,
    /// Optional CORS layer applied to the server.
    pub cors: Option<CorsLayer>,
}
31
32pub struct EmbeddedDaemonInfo {
33 pub addr: String,
34 pub port: u16,
35 pub store: Arc<HashtreeStore>,
36 #[allow(dead_code)]
37 pub webrtc_state: Option<Arc<WebRTCState>>,
38}
39
40pub async fn start_embedded(opts: EmbeddedDaemonOptions) -> Result<EmbeddedDaemonInfo> {
41 let _ = rustls::crypto::ring::default_provider().install_default();
42
43 let mut config = opts.config;
44 if let Some(relays) = opts.relays {
45 config.nostr.relays = relays;
46 }
47
48 let max_size_bytes = config.storage.max_size_gb * 1024 * 1024 * 1024;
49 let nostr_db_max_bytes = config
50 .nostr
51 .db_max_size_gb
52 .saturating_mul(1024 * 1024 * 1024);
53 let spambox_db_max_bytes = config
54 .nostr
55 .spambox_max_size_gb
56 .saturating_mul(1024 * 1024 * 1024);
57
58 let store = Arc::new(HashtreeStore::with_options(
59 &opts.data_dir,
60 config.storage.s3.as_ref(),
61 max_size_bytes,
62 )?);
63
64 let (keys, _was_generated) = ensure_keys()?;
65 let pk_bytes = pubkey_bytes(&keys);
66 let npub = keys
67 .public_key()
68 .to_bech32()
69 .context("Failed to encode npub")?;
70
71 let mut allowed_pubkeys: HashSet<String> = HashSet::new();
72 allowed_pubkeys.insert(hex::encode(pk_bytes));
73 for npub_str in &config.nostr.allowed_npubs {
74 if let Ok(pk) = parse_npub(npub_str) {
75 allowed_pubkeys.insert(hex::encode(pk));
76 } else {
77 tracing::warn!("Invalid npub in allowed_npubs: {}", npub_str);
78 }
79 }
80
81 let ndb = socialgraph::init_ndb_with_mapsize(&opts.data_dir, Some(nostr_db_max_bytes))
82 .context("Failed to initialize nostrdb")?;
83
84 let social_graph_root_bytes = if let Some(ref root_npub) = config.nostr.socialgraph_root {
85 parse_npub(root_npub).unwrap_or(pk_bytes)
86 } else {
87 pk_bytes
88 };
89 socialgraph::set_social_graph_root(&ndb, &social_graph_root_bytes);
90
91 let social_graph = Arc::new(socialgraph::SocialGraphAccessControl::new(
92 Arc::clone(&ndb),
93 config.nostr.max_write_distance,
94 allowed_pubkeys.clone(),
95 ));
96
97 let nostr_relay_config = NostrRelayConfig {
98 spambox_db_max_bytes,
99 ..Default::default()
100 };
101 let nostr_relay = Arc::new(
102 NostrRelay::new(
103 Arc::clone(&ndb),
104 opts.data_dir.clone(),
105 Some(social_graph.clone()),
106 nostr_relay_config,
107 )
108 .context("Failed to initialize Nostr relay")?,
109 );
110
111 let crawler_spambox = if spambox_db_max_bytes == 0 {
112 None
113 } else {
114 let spam_dir = opts.data_dir.join("nostrdb_spambox");
115 match socialgraph::init_ndb_at_path(&spam_dir, Some(spambox_db_max_bytes)) {
116 Ok(db) => Some(db),
117 Err(err) => {
118 tracing::warn!("Failed to open spambox nostrdb for crawler: {}", err);
119 None
120 }
121 }
122 };
123
124 let crawler_ndb = Arc::clone(&ndb);
125 let crawler_keys = keys.clone();
126 let crawler_relays = config.nostr.relays.clone();
127 let crawler_depth = config.nostr.crawl_depth;
128 let crawler_spambox = crawler_spambox.clone();
129 let (_crawler_shutdown_tx, crawler_shutdown_rx) = tokio::sync::watch::channel(false);
130 tokio::spawn(async move {
131 tokio::time::sleep(Duration::from_secs(5)).await;
132 let mut crawler = socialgraph::SocialGraphCrawler::new(
133 crawler_ndb,
134 crawler_keys,
135 crawler_relays,
136 crawler_depth,
137 );
138 if let Some(spambox) = crawler_spambox {
139 crawler = crawler.with_spambox(spambox);
140 }
141 crawler.crawl(crawler_shutdown_rx).await;
142 });
143
144 #[cfg(feature = "p2p")]
145 let webrtc_state: Option<Arc<WebRTCState>> = {
146 let (webrtc_state, webrtc_handle) = if config.server.enable_webrtc {
147 let webrtc_config = WebRTCConfig {
148 relays: config.nostr.relays.clone(),
149 ..Default::default()
150 };
151
152 let contacts_file = opts.data_dir.join("contacts.json");
153 let classifier_ndb = Arc::clone(&ndb);
154 let peer_classifier: PeerClassifier = Arc::new(move |pubkey_hex: &str| {
155 if contacts_file.exists() {
156 if let Ok(data) = std::fs::read_to_string(&contacts_file) {
157 if let Ok(contacts) = serde_json::from_str::<Vec<String>>(&data) {
158 if contacts.contains(&pubkey_hex.to_string()) {
159 return PeerPool::Follows;
160 }
161 }
162 }
163 }
164 if let Ok(pk_bytes) = hex::decode(pubkey_hex) {
165 if pk_bytes.len() == 32 {
166 let pk: [u8; 32] = pk_bytes.try_into().unwrap();
167 if let Some(dist) = socialgraph::get_follow_distance(&classifier_ndb, &pk) {
168 if dist <= 2 {
169 return PeerPool::Follows;
170 }
171 }
172 }
173 }
174 PeerPool::Other
175 });
176
177 let mut manager = WebRTCManager::new_with_store_and_classifier(
178 keys.clone(),
179 webrtc_config,
180 Arc::clone(&store) as Arc<dyn ContentStore>,
181 peer_classifier,
182 );
183 manager.set_nostr_relay(nostr_relay.clone());
184
185 let webrtc_state = manager.state();
186 let handle = tokio::spawn(async move {
187 if let Err(e) = manager.run().await {
188 tracing::error!("WebRTC manager error: {}", e);
189 }
190 });
191 (Some(webrtc_state), Some(handle))
192 } else {
193 (None, None)
194 };
195 let _ = webrtc_handle;
196 webrtc_state
197 };
198
199 #[cfg(not(feature = "p2p"))]
200 let webrtc_state: Option<Arc<crate::webrtc::WebRTCState>> = None;
201
202 let mut upstream_blossom = config.blossom.servers.clone();
203 upstream_blossom.extend(config.blossom.read_servers.clone());
204
205 let mut server = HashtreeServer::new(Arc::clone(&store), opts.bind_address.clone())
206 .with_allowed_pubkeys(allowed_pubkeys.clone())
207 .with_max_upload_bytes((config.blossom.max_upload_mb as usize) * 1024 * 1024)
208 .with_public_writes(config.server.public_writes)
209 .with_upstream_blossom(upstream_blossom)
210 .with_social_graph(social_graph)
211 .with_socialgraph_snapshot(
212 Arc::clone(&ndb),
213 social_graph_root_bytes,
214 config.server.socialgraph_snapshot_public,
215 )
216 .with_nostr_relay(nostr_relay.clone());
217
218 if let Some(ref state) = webrtc_state {
219 server = server.with_webrtc_peers(state.clone());
220 }
221
222 if let Some(extra) = opts.extra_routes {
223 server = server.with_extra_routes(extra);
224 }
225 if let Some(cors) = opts.cors {
226 server = server.with_cors(cors);
227 }
228
229 if config.sync.enabled {
230 let mut blossom_read_servers = config.blossom.servers.clone();
231 blossom_read_servers.extend(config.blossom.read_servers.clone());
232 let sync_config = crate::sync::SyncConfig {
233 sync_own: config.sync.sync_own,
234 sync_followed: config.sync.sync_followed,
235 relays: config.nostr.relays.clone(),
236 max_concurrent: config.sync.max_concurrent,
237 webrtc_timeout_ms: config.sync.webrtc_timeout_ms,
238 blossom_timeout_ms: config.sync.blossom_timeout_ms,
239 };
240
241 let sync_keys = nostr_sdk::Keys::parse(&keys.secret_key().to_bech32()?)
242 .context("Failed to parse keys for sync")?;
243
244 let sync_service = crate::sync::BackgroundSync::new(
245 sync_config,
246 Arc::clone(&store),
247 sync_keys,
248 webrtc_state.clone(),
249 )
250 .await
251 .context("Failed to create background sync service")?;
252
253 let contacts_file = opts.data_dir.join("contacts.json");
254 tokio::spawn(async move {
255 if let Err(e) = sync_service.run(contacts_file).await {
256 tracing::error!("Background sync error: {}", e);
257 }
258 });
259 }
260
261 let eviction_store = Arc::clone(&store);
262 tokio::spawn(async move {
263 let mut interval = tokio::time::interval(Duration::from_secs(300));
264 loop {
265 interval.tick().await;
266 match eviction_store.evict_if_needed() {
267 Ok(freed) => {
268 if freed > 0 {
269 tracing::info!("Background eviction freed {} bytes", freed);
270 }
271 }
272 Err(e) => {
273 tracing::warn!("Background eviction error: {}", e);
274 }
275 }
276 }
277 });
278
279 let listener = TcpListener::bind(&opts.bind_address).await?;
280 let local_addr = listener.local_addr()?;
281 let actual_addr = format!("{}:{}", local_addr.ip(), local_addr.port());
282
283 tokio::spawn(async move {
284 if let Err(e) = server.run_with_listener(listener).await {
285 tracing::error!("Embedded daemon server error: {}", e);
286 }
287 });
288
289 tracing::info!(
290 "Embedded daemon started on {}, identity {}",
291 actual_addr,
292 npub
293 );
294
295 Ok(EmbeddedDaemonInfo {
296 addr: actual_addr,
297 port: local_addr.port(),
298 store,
299 webrtc_state,
300 })
301}