1use std::collections::HashSet;
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, Result};
7use axum::Router;
8use nostr::nips::nip19::ToBech32;
9use tokio::net::TcpListener;
10use tower_http::cors::CorsLayer;
11
12use crate::config::{ensure_keys, parse_npub, pubkey_bytes, Config};
13use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
14use crate::server::{AppState, HashtreeServer};
15use crate::socialgraph;
16use crate::storage::HashtreeStore;
17
18#[cfg(feature = "p2p")]
19use crate::webrtc::{ContentStore, PeerClassifier, PeerPool, WebRTCConfig, WebRTCManager, WebRTCState};
20
21pub struct EmbeddedDaemonOptions {
22 pub config: Config,
23 pub data_dir: PathBuf,
24 pub bind_address: String,
25 pub relays: Option<Vec<String>>,
26 pub extra_routes: Option<Router<AppState>>,
27 pub cors: Option<CorsLayer>,
28}
29
30pub struct EmbeddedDaemonInfo {
31 pub addr: String,
32 pub port: u16,
33 pub store: Arc<HashtreeStore>,
34 #[allow(dead_code)]
35 pub webrtc_state: Option<Arc<WebRTCState>>,
36}
37
38pub async fn start_embedded(opts: EmbeddedDaemonOptions) -> Result<EmbeddedDaemonInfo> {
39 let _ = rustls::crypto::ring::default_provider().install_default();
40
41 let mut config = opts.config;
42 if let Some(relays) = opts.relays {
43 config.nostr.relays = relays;
44 }
45
46 let max_size_bytes = config.storage.max_size_gb * 1024 * 1024 * 1024;
47 let nostr_db_max_bytes = config
48 .nostr
49 .db_max_size_gb
50 .saturating_mul(1024 * 1024 * 1024);
51 let spambox_db_max_bytes = config
52 .nostr
53 .spambox_max_size_gb
54 .saturating_mul(1024 * 1024 * 1024);
55
56 let store = Arc::new(HashtreeStore::with_options(
57 &opts.data_dir,
58 config.storage.s3.as_ref(),
59 max_size_bytes,
60 )?);
61
62 let (keys, _was_generated) = ensure_keys()?;
63 let pk_bytes = pubkey_bytes(&keys);
64 let npub = keys
65 .public_key()
66 .to_bech32()
67 .context("Failed to encode npub")?;
68
69 let mut allowed_pubkeys: HashSet<String> = HashSet::new();
70 allowed_pubkeys.insert(hex::encode(pk_bytes));
71 for npub_str in &config.nostr.allowed_npubs {
72 if let Ok(pk) = parse_npub(npub_str) {
73 allowed_pubkeys.insert(hex::encode(pk));
74 } else {
75 tracing::warn!("Invalid npub in allowed_npubs: {}", npub_str);
76 }
77 }
78
79 let ndb = socialgraph::init_ndb_with_mapsize(&opts.data_dir, Some(nostr_db_max_bytes))
80 .context("Failed to initialize nostrdb")?;
81
82 let social_graph_root_bytes = if let Some(ref root_npub) = config.nostr.socialgraph_root {
83 parse_npub(root_npub).unwrap_or(pk_bytes)
84 } else {
85 pk_bytes
86 };
87 socialgraph::set_social_graph_root(&ndb, &social_graph_root_bytes);
88
89 let social_graph = Arc::new(socialgraph::SocialGraphAccessControl::new(
90 Arc::clone(&ndb),
91 config.nostr.max_write_distance,
92 allowed_pubkeys.clone(),
93 ));
94
95 let nostr_relay_config = NostrRelayConfig {
96 spambox_db_max_bytes,
97 ..Default::default()
98 };
99 let nostr_relay = Arc::new(
100 NostrRelay::new(
101 Arc::clone(&ndb),
102 opts.data_dir.clone(),
103 Some(social_graph.clone()),
104 nostr_relay_config,
105 )
106 .context("Failed to initialize Nostr relay")?,
107 );
108
109 let crawler_spambox = if spambox_db_max_bytes == 0 {
110 None
111 } else {
112 let spam_dir = opts.data_dir.join("nostrdb_spambox");
113 match socialgraph::init_ndb_at_path(&spam_dir, Some(spambox_db_max_bytes)) {
114 Ok(db) => Some(db),
115 Err(err) => {
116 tracing::warn!("Failed to open spambox nostrdb for crawler: {}", err);
117 None
118 }
119 }
120 };
121
122 let crawler_ndb = Arc::clone(&ndb);
123 let crawler_keys = keys.clone();
124 let crawler_relays = config.nostr.relays.clone();
125 let crawler_depth = config.nostr.crawl_depth;
126 let crawler_spambox = crawler_spambox.clone();
127 let (_crawler_shutdown_tx, crawler_shutdown_rx) = tokio::sync::watch::channel(false);
128 tokio::spawn(async move {
129 tokio::time::sleep(Duration::from_secs(5)).await;
130 let mut crawler = socialgraph::SocialGraphCrawler::new(
131 crawler_ndb,
132 crawler_keys,
133 crawler_relays,
134 crawler_depth,
135 );
136 if let Some(spambox) = crawler_spambox {
137 crawler = crawler.with_spambox(spambox);
138 }
139 crawler.crawl(crawler_shutdown_rx).await;
140 });
141
142 #[cfg(feature = "p2p")]
143 let webrtc_state: Option<Arc<WebRTCState>> = {
144 let (webrtc_state, webrtc_handle) = if config.server.enable_webrtc {
145 let webrtc_config = WebRTCConfig {
146 relays: config.nostr.relays.clone(),
147 ..Default::default()
148 };
149
150 let contacts_file = opts.data_dir.join("contacts.json");
151 let classifier_ndb = Arc::clone(&ndb);
152 let peer_classifier: PeerClassifier = Arc::new(move |pubkey_hex: &str| {
153 if contacts_file.exists() {
154 if let Ok(data) = std::fs::read_to_string(&contacts_file) {
155 if let Ok(contacts) = serde_json::from_str::<Vec<String>>(&data) {
156 if contacts.contains(&pubkey_hex.to_string()) {
157 return PeerPool::Follows;
158 }
159 }
160 }
161 }
162 if let Ok(pk_bytes) = hex::decode(pubkey_hex) {
163 if pk_bytes.len() == 32 {
164 let pk: [u8; 32] = pk_bytes.try_into().unwrap();
165 if let Some(dist) = socialgraph::get_follow_distance(&classifier_ndb, &pk) {
166 if dist <= 2 {
167 return PeerPool::Follows;
168 }
169 }
170 }
171 }
172 PeerPool::Other
173 });
174
175 let mut manager = WebRTCManager::new_with_store_and_classifier(
176 keys.clone(),
177 webrtc_config,
178 Arc::clone(&store) as Arc<dyn ContentStore>,
179 peer_classifier,
180 );
181 manager.set_nostr_relay(nostr_relay.clone());
182
183 let webrtc_state = manager.state();
184 let handle = tokio::spawn(async move {
185 if let Err(e) = manager.run().await {
186 tracing::error!("WebRTC manager error: {}", e);
187 }
188 });
189 (Some(webrtc_state), Some(handle))
190 } else {
191 (None, None)
192 };
193 let _ = webrtc_handle;
194 webrtc_state
195 };
196
197 #[cfg(not(feature = "p2p"))]
198 let webrtc_state: Option<Arc<crate::webrtc::WebRTCState>> = None;
199
200 let mut upstream_blossom = config.blossom.servers.clone();
201 upstream_blossom.extend(config.blossom.read_servers.clone());
202
203 let mut server = HashtreeServer::new(Arc::clone(&store), opts.bind_address.clone())
204 .with_allowed_pubkeys(allowed_pubkeys.clone())
205 .with_max_upload_bytes((config.blossom.max_upload_mb as usize) * 1024 * 1024)
206 .with_public_writes(config.server.public_writes)
207 .with_upstream_blossom(upstream_blossom)
208 .with_social_graph(social_graph)
209 .with_nostr_relay(nostr_relay.clone());
210
211 if let Some(ref state) = webrtc_state {
212 server = server.with_webrtc_peers(state.clone());
213 }
214
215 if let Some(extra) = opts.extra_routes {
216 server = server.with_extra_routes(extra);
217 }
218 if let Some(cors) = opts.cors {
219 server = server.with_cors(cors);
220 }
221
222 if config.sync.enabled {
223 let mut blossom_read_servers = config.blossom.servers.clone();
224 blossom_read_servers.extend(config.blossom.read_servers.clone());
225 let sync_config = crate::sync::SyncConfig {
226 sync_own: config.sync.sync_own,
227 sync_followed: config.sync.sync_followed,
228 relays: config.nostr.relays.clone(),
229 max_concurrent: config.sync.max_concurrent,
230 webrtc_timeout_ms: config.sync.webrtc_timeout_ms,
231 blossom_timeout_ms: config.sync.blossom_timeout_ms,
232 };
233
234 let sync_keys = nostr_sdk::Keys::parse(&keys.secret_key().to_bech32()?)
235 .context("Failed to parse keys for sync")?;
236
237 let sync_service = crate::sync::BackgroundSync::new(
238 sync_config,
239 Arc::clone(&store),
240 sync_keys,
241 webrtc_state.clone(),
242 )
243 .await
244 .context("Failed to create background sync service")?;
245
246 let contacts_file = opts.data_dir.join("contacts.json");
247 tokio::spawn(async move {
248 if let Err(e) = sync_service.run(contacts_file).await {
249 tracing::error!("Background sync error: {}", e);
250 }
251 });
252 }
253
254 let eviction_store = Arc::clone(&store);
255 tokio::spawn(async move {
256 let mut interval = tokio::time::interval(Duration::from_secs(300));
257 loop {
258 interval.tick().await;
259 match eviction_store.evict_if_needed() {
260 Ok(freed) => {
261 if freed > 0 {
262 tracing::info!("Background eviction freed {} bytes", freed);
263 }
264 }
265 Err(e) => {
266 tracing::warn!("Background eviction error: {}", e);
267 }
268 }
269 }
270 });
271
272 let listener = TcpListener::bind(&opts.bind_address).await?;
273 let local_addr = listener.local_addr()?;
274 let actual_addr = format!("{}:{}", local_addr.ip(), local_addr.port());
275
276 tokio::spawn(async move {
277 if let Err(e) = server.run_with_listener(listener).await {
278 tracing::error!("Embedded daemon server error: {}", e);
279 }
280 });
281
282 tracing::info!(
283 "Embedded daemon started on {}, identity {}",
284 actual_addr,
285 npub
286 );
287
288 Ok(EmbeddedDaemonInfo {
289 addr: actual_addr,
290 port: local_addr.port(),
291 store,
292 webrtc_state,
293 })
294}