Skip to main content

hashtree_cli/
daemon.rs

1use std::collections::HashSet;
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, Result};
7use axum::Router;
8use nostr::nips::nip19::ToBech32;
9use tokio::net::TcpListener;
10use tower_http::cors::CorsLayer;
11
12use crate::config::{ensure_keys, parse_npub, pubkey_bytes, Config};
13use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
14use crate::server::{AppState, HashtreeServer};
15use crate::socialgraph;
16use crate::storage::HashtreeStore;
17
18#[cfg(feature = "p2p")]
19use crate::webrtc::{ContentStore, PeerClassifier, PeerPool, WebRTCConfig, WebRTCManager, WebRTCState};
20
21pub struct EmbeddedDaemonOptions {
22    pub config: Config,
23    pub data_dir: PathBuf,
24    pub bind_address: String,
25    pub relays: Option<Vec<String>>,
26    pub extra_routes: Option<Router<AppState>>,
27    pub cors: Option<CorsLayer>,
28}
29
30pub struct EmbeddedDaemonInfo {
31    pub addr: String,
32    pub port: u16,
33    pub store: Arc<HashtreeStore>,
34    #[allow(dead_code)]
35    pub webrtc_state: Option<Arc<WebRTCState>>,
36}
37
38pub async fn start_embedded(opts: EmbeddedDaemonOptions) -> Result<EmbeddedDaemonInfo> {
39    let _ = rustls::crypto::ring::default_provider().install_default();
40
41    let mut config = opts.config;
42    if let Some(relays) = opts.relays {
43        config.nostr.relays = relays;
44    }
45
46    let max_size_bytes = config.storage.max_size_gb * 1024 * 1024 * 1024;
47    let nostr_db_max_bytes = config
48        .nostr
49        .db_max_size_gb
50        .saturating_mul(1024 * 1024 * 1024);
51    let spambox_db_max_bytes = config
52        .nostr
53        .spambox_max_size_gb
54        .saturating_mul(1024 * 1024 * 1024);
55
56    let store = Arc::new(HashtreeStore::with_options(
57        &opts.data_dir,
58        config.storage.s3.as_ref(),
59        max_size_bytes,
60    )?);
61
62    let (keys, _was_generated) = ensure_keys()?;
63    let pk_bytes = pubkey_bytes(&keys);
64    let npub = keys
65        .public_key()
66        .to_bech32()
67        .context("Failed to encode npub")?;
68
69    let mut allowed_pubkeys: HashSet<String> = HashSet::new();
70    allowed_pubkeys.insert(hex::encode(pk_bytes));
71    for npub_str in &config.nostr.allowed_npubs {
72        if let Ok(pk) = parse_npub(npub_str) {
73            allowed_pubkeys.insert(hex::encode(pk));
74        } else {
75            tracing::warn!("Invalid npub in allowed_npubs: {}", npub_str);
76        }
77    }
78
79    let ndb = socialgraph::init_ndb_with_mapsize(&opts.data_dir, Some(nostr_db_max_bytes))
80        .context("Failed to initialize nostrdb")?;
81
82    let social_graph_root_bytes = if let Some(ref root_npub) = config.nostr.socialgraph_root {
83        parse_npub(root_npub).unwrap_or(pk_bytes)
84    } else {
85        pk_bytes
86    };
87    socialgraph::set_social_graph_root(&ndb, &social_graph_root_bytes);
88
89    let social_graph = Arc::new(socialgraph::SocialGraphAccessControl::new(
90        Arc::clone(&ndb),
91        config.nostr.max_write_distance,
92        allowed_pubkeys.clone(),
93    ));
94
95    let nostr_relay_config = NostrRelayConfig {
96        spambox_db_max_bytes,
97        ..Default::default()
98    };
99    let nostr_relay = Arc::new(
100        NostrRelay::new(
101            Arc::clone(&ndb),
102            opts.data_dir.clone(),
103            Some(social_graph.clone()),
104            nostr_relay_config,
105        )
106        .context("Failed to initialize Nostr relay")?,
107    );
108
109    let crawler_spambox = if spambox_db_max_bytes == 0 {
110        None
111    } else {
112        let spam_dir = opts.data_dir.join("nostrdb_spambox");
113        match socialgraph::init_ndb_at_path(&spam_dir, Some(spambox_db_max_bytes)) {
114            Ok(db) => Some(db),
115            Err(err) => {
116                tracing::warn!("Failed to open spambox nostrdb for crawler: {}", err);
117                None
118            }
119        }
120    };
121
122    let crawler_ndb = Arc::clone(&ndb);
123    let crawler_keys = keys.clone();
124    let crawler_relays = config.nostr.relays.clone();
125    let crawler_depth = config.nostr.crawl_depth;
126    let crawler_spambox = crawler_spambox.clone();
127    let (_crawler_shutdown_tx, crawler_shutdown_rx) = tokio::sync::watch::channel(false);
128    tokio::spawn(async move {
129        tokio::time::sleep(Duration::from_secs(5)).await;
130        let mut crawler = socialgraph::SocialGraphCrawler::new(
131            crawler_ndb,
132            crawler_keys,
133            crawler_relays,
134            crawler_depth,
135        );
136        if let Some(spambox) = crawler_spambox {
137            crawler = crawler.with_spambox(spambox);
138        }
139        crawler.crawl(crawler_shutdown_rx).await;
140    });
141
142    #[cfg(feature = "p2p")]
143    let webrtc_state: Option<Arc<WebRTCState>> = {
144        let (webrtc_state, webrtc_handle) = if config.server.enable_webrtc {
145            let webrtc_config = WebRTCConfig {
146                relays: config.nostr.relays.clone(),
147                ..Default::default()
148            };
149
150            let contacts_file = opts.data_dir.join("contacts.json");
151            let classifier_ndb = Arc::clone(&ndb);
152            let peer_classifier: PeerClassifier = Arc::new(move |pubkey_hex: &str| {
153                if contacts_file.exists() {
154                    if let Ok(data) = std::fs::read_to_string(&contacts_file) {
155                        if let Ok(contacts) = serde_json::from_str::<Vec<String>>(&data) {
156                            if contacts.contains(&pubkey_hex.to_string()) {
157                                return PeerPool::Follows;
158                            }
159                        }
160                    }
161                }
162                if let Ok(pk_bytes) = hex::decode(pubkey_hex) {
163                    if pk_bytes.len() == 32 {
164                        let pk: [u8; 32] = pk_bytes.try_into().unwrap();
165                        if let Some(dist) = socialgraph::get_follow_distance(&classifier_ndb, &pk) {
166                            if dist <= 2 {
167                                return PeerPool::Follows;
168                            }
169                        }
170                    }
171                }
172                PeerPool::Other
173            });
174
175            let mut manager = WebRTCManager::new_with_store_and_classifier(
176                keys.clone(),
177                webrtc_config,
178                Arc::clone(&store) as Arc<dyn ContentStore>,
179                peer_classifier,
180            );
181            manager.set_nostr_relay(nostr_relay.clone());
182
183            let webrtc_state = manager.state();
184            let handle = tokio::spawn(async move {
185                if let Err(e) = manager.run().await {
186                    tracing::error!("WebRTC manager error: {}", e);
187                }
188            });
189            (Some(webrtc_state), Some(handle))
190        } else {
191            (None, None)
192        };
193        let _ = webrtc_handle;
194        webrtc_state
195    };
196
197    #[cfg(not(feature = "p2p"))]
198    let webrtc_state: Option<Arc<crate::webrtc::WebRTCState>> = None;
199
200    let mut upstream_blossom = config.blossom.servers.clone();
201    upstream_blossom.extend(config.blossom.read_servers.clone());
202
203    let mut server = HashtreeServer::new(Arc::clone(&store), opts.bind_address.clone())
204        .with_allowed_pubkeys(allowed_pubkeys.clone())
205        .with_max_upload_bytes((config.blossom.max_upload_mb as usize) * 1024 * 1024)
206        .with_public_writes(config.server.public_writes)
207        .with_upstream_blossom(upstream_blossom)
208        .with_social_graph(social_graph)
209        .with_nostr_relay(nostr_relay.clone());
210
211    if let Some(ref state) = webrtc_state {
212        server = server.with_webrtc_peers(state.clone());
213    }
214
215    if let Some(extra) = opts.extra_routes {
216        server = server.with_extra_routes(extra);
217    }
218    if let Some(cors) = opts.cors {
219        server = server.with_cors(cors);
220    }
221
222    if config.sync.enabled {
223        let mut blossom_read_servers = config.blossom.servers.clone();
224        blossom_read_servers.extend(config.blossom.read_servers.clone());
225        let sync_config = crate::sync::SyncConfig {
226            sync_own: config.sync.sync_own,
227            sync_followed: config.sync.sync_followed,
228            relays: config.nostr.relays.clone(),
229            max_concurrent: config.sync.max_concurrent,
230            webrtc_timeout_ms: config.sync.webrtc_timeout_ms,
231            blossom_timeout_ms: config.sync.blossom_timeout_ms,
232        };
233
234        let sync_keys = nostr_sdk::Keys::parse(&keys.secret_key().to_bech32()?)
235            .context("Failed to parse keys for sync")?;
236
237        let sync_service = crate::sync::BackgroundSync::new(
238            sync_config,
239            Arc::clone(&store),
240            sync_keys,
241            webrtc_state.clone(),
242        )
243        .await
244        .context("Failed to create background sync service")?;
245
246        let contacts_file = opts.data_dir.join("contacts.json");
247        tokio::spawn(async move {
248            if let Err(e) = sync_service.run(contacts_file).await {
249                tracing::error!("Background sync error: {}", e);
250            }
251        });
252    }
253
254    let eviction_store = Arc::clone(&store);
255    tokio::spawn(async move {
256        let mut interval = tokio::time::interval(Duration::from_secs(300));
257        loop {
258            interval.tick().await;
259            match eviction_store.evict_if_needed() {
260                Ok(freed) => {
261                    if freed > 0 {
262                        tracing::info!("Background eviction freed {} bytes", freed);
263                    }
264                }
265                Err(e) => {
266                    tracing::warn!("Background eviction error: {}", e);
267                }
268            }
269        }
270    });
271
272    let listener = TcpListener::bind(&opts.bind_address).await?;
273    let local_addr = listener.local_addr()?;
274    let actual_addr = format!("{}:{}", local_addr.ip(), local_addr.port());
275
276    tokio::spawn(async move {
277        if let Err(e) = server.run_with_listener(listener).await {
278            tracing::error!("Embedded daemon server error: {}", e);
279        }
280    });
281
282    tracing::info!(
283        "Embedded daemon started on {}, identity {}",
284        actual_addr,
285        npub
286    );
287
288    Ok(EmbeddedDaemonInfo {
289        addr: actual_addr,
290        port: local_addr.port(),
291        store,
292        webrtc_state,
293    })
294}