Skip to main content

hashtree_cli/
daemon.rs

1use std::collections::HashSet;
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, Result};
7use axum::Router;
8use nostr::nips::nip19::ToBech32;
9use tokio::net::TcpListener;
10use tower_http::cors::CorsLayer;
11
12use crate::config::{ensure_keys, parse_npub, pubkey_bytes, Config};
13use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
14use crate::server::{AppState, HashtreeServer};
15use crate::socialgraph;
16use crate::storage::HashtreeStore;
17
18#[cfg(feature = "p2p")]
19use crate::webrtc::{
20    ContentStore, PeerClassifier, PeerPool, WebRTCConfig, WebRTCManager, WebRTCState,
21};
22
23pub struct EmbeddedDaemonOptions {
24    pub config: Config,
25    pub data_dir: PathBuf,
26    pub bind_address: String,
27    pub relays: Option<Vec<String>>,
28    pub extra_routes: Option<Router<AppState>>,
29    pub cors: Option<CorsLayer>,
30}
31
32pub struct EmbeddedDaemonInfo {
33    pub addr: String,
34    pub port: u16,
35    pub store: Arc<HashtreeStore>,
36    #[allow(dead_code)]
37    pub webrtc_state: Option<Arc<WebRTCState>>,
38}
39
40pub async fn start_embedded(opts: EmbeddedDaemonOptions) -> Result<EmbeddedDaemonInfo> {
41    let _ = rustls::crypto::ring::default_provider().install_default();
42
43    let mut config = opts.config;
44    if let Some(relays) = opts.relays {
45        config.nostr.relays = relays;
46    }
47
48    let max_size_bytes = config.storage.max_size_gb * 1024 * 1024 * 1024;
49    let nostr_db_max_bytes = config
50        .nostr
51        .db_max_size_gb
52        .saturating_mul(1024 * 1024 * 1024);
53    let spambox_db_max_bytes = config
54        .nostr
55        .spambox_max_size_gb
56        .saturating_mul(1024 * 1024 * 1024);
57
58    let store = Arc::new(HashtreeStore::with_options(
59        &opts.data_dir,
60        config.storage.s3.as_ref(),
61        max_size_bytes,
62    )?);
63
64    let (keys, _was_generated) = ensure_keys()?;
65    let pk_bytes = pubkey_bytes(&keys);
66    let npub = keys
67        .public_key()
68        .to_bech32()
69        .context("Failed to encode npub")?;
70
71    let mut allowed_pubkeys: HashSet<String> = HashSet::new();
72    allowed_pubkeys.insert(hex::encode(pk_bytes));
73    for npub_str in &config.nostr.allowed_npubs {
74        if let Ok(pk) = parse_npub(npub_str) {
75            allowed_pubkeys.insert(hex::encode(pk));
76        } else {
77            tracing::warn!("Invalid npub in allowed_npubs: {}", npub_str);
78        }
79    }
80
81    let ndb = socialgraph::init_ndb_with_mapsize(&opts.data_dir, Some(nostr_db_max_bytes))
82        .context("Failed to initialize nostrdb")?;
83
84    let social_graph_root_bytes = if let Some(ref root_npub) = config.nostr.socialgraph_root {
85        parse_npub(root_npub).unwrap_or(pk_bytes)
86    } else {
87        pk_bytes
88    };
89    socialgraph::set_social_graph_root(&ndb, &social_graph_root_bytes);
90
91    let social_graph = Arc::new(socialgraph::SocialGraphAccessControl::new(
92        Arc::clone(&ndb),
93        config.nostr.max_write_distance,
94        allowed_pubkeys.clone(),
95    ));
96
97    let nostr_relay_config = NostrRelayConfig {
98        spambox_db_max_bytes,
99        ..Default::default()
100    };
101    let nostr_relay = Arc::new(
102        NostrRelay::new(
103            Arc::clone(&ndb),
104            opts.data_dir.clone(),
105            Some(social_graph.clone()),
106            nostr_relay_config,
107        )
108        .context("Failed to initialize Nostr relay")?,
109    );
110
111    let crawler_spambox = if spambox_db_max_bytes == 0 {
112        None
113    } else {
114        let spam_dir = opts.data_dir.join("nostrdb_spambox");
115        match socialgraph::init_ndb_at_path(&spam_dir, Some(spambox_db_max_bytes)) {
116            Ok(db) => Some(db),
117            Err(err) => {
118                tracing::warn!("Failed to open spambox nostrdb for crawler: {}", err);
119                None
120            }
121        }
122    };
123
124    let crawler_ndb = Arc::clone(&ndb);
125    let crawler_keys = keys.clone();
126    let crawler_relays = config.nostr.relays.clone();
127    let crawler_depth = config.nostr.crawl_depth;
128    let crawler_spambox = crawler_spambox.clone();
129    let (_crawler_shutdown_tx, crawler_shutdown_rx) = tokio::sync::watch::channel(false);
130    tokio::spawn(async move {
131        tokio::time::sleep(Duration::from_secs(5)).await;
132        let mut crawler = socialgraph::SocialGraphCrawler::new(
133            crawler_ndb,
134            crawler_keys,
135            crawler_relays,
136            crawler_depth,
137        );
138        if let Some(spambox) = crawler_spambox {
139            crawler = crawler.with_spambox(spambox);
140        }
141        crawler.crawl(crawler_shutdown_rx).await;
142    });
143
144    #[cfg(feature = "p2p")]
145    let webrtc_state: Option<Arc<WebRTCState>> = {
146        let (webrtc_state, webrtc_handle) = if config.server.enable_webrtc {
147            let webrtc_config = WebRTCConfig {
148                relays: config.nostr.relays.clone(),
149                ..Default::default()
150            };
151
152            let contacts_file = opts.data_dir.join("contacts.json");
153            let classifier_ndb = Arc::clone(&ndb);
154            let peer_classifier: PeerClassifier = Arc::new(move |pubkey_hex: &str| {
155                if contacts_file.exists() {
156                    if let Ok(data) = std::fs::read_to_string(&contacts_file) {
157                        if let Ok(contacts) = serde_json::from_str::<Vec<String>>(&data) {
158                            if contacts.contains(&pubkey_hex.to_string()) {
159                                return PeerPool::Follows;
160                            }
161                        }
162                    }
163                }
164                if let Ok(pk_bytes) = hex::decode(pubkey_hex) {
165                    if pk_bytes.len() == 32 {
166                        let pk: [u8; 32] = pk_bytes.try_into().unwrap();
167                        if let Some(dist) = socialgraph::get_follow_distance(&classifier_ndb, &pk) {
168                            if dist <= 2 {
169                                return PeerPool::Follows;
170                            }
171                        }
172                    }
173                }
174                PeerPool::Other
175            });
176
177            let mut manager = WebRTCManager::new_with_store_and_classifier(
178                keys.clone(),
179                webrtc_config,
180                Arc::clone(&store) as Arc<dyn ContentStore>,
181                peer_classifier,
182            );
183            manager.set_nostr_relay(nostr_relay.clone());
184
185            let webrtc_state = manager.state();
186            let handle = tokio::spawn(async move {
187                if let Err(e) = manager.run().await {
188                    tracing::error!("WebRTC manager error: {}", e);
189                }
190            });
191            (Some(webrtc_state), Some(handle))
192        } else {
193            (None, None)
194        };
195        let _ = webrtc_handle;
196        webrtc_state
197    };
198
199    #[cfg(not(feature = "p2p"))]
200    let webrtc_state: Option<Arc<crate::webrtc::WebRTCState>> = None;
201
202    let mut upstream_blossom = config.blossom.servers.clone();
203    upstream_blossom.extend(config.blossom.read_servers.clone());
204
205    let mut server = HashtreeServer::new(Arc::clone(&store), opts.bind_address.clone())
206        .with_allowed_pubkeys(allowed_pubkeys.clone())
207        .with_max_upload_bytes((config.blossom.max_upload_mb as usize) * 1024 * 1024)
208        .with_public_writes(config.server.public_writes)
209        .with_upstream_blossom(upstream_blossom)
210        .with_social_graph(social_graph)
211        .with_socialgraph_snapshot(
212            Arc::clone(&ndb),
213            social_graph_root_bytes,
214            config.server.socialgraph_snapshot_public,
215        )
216        .with_nostr_relay(nostr_relay.clone());
217
218    if let Some(ref state) = webrtc_state {
219        server = server.with_webrtc_peers(state.clone());
220    }
221
222    if let Some(extra) = opts.extra_routes {
223        server = server.with_extra_routes(extra);
224    }
225    if let Some(cors) = opts.cors {
226        server = server.with_cors(cors);
227    }
228
229    if config.sync.enabled {
230        let mut blossom_read_servers = config.blossom.servers.clone();
231        blossom_read_servers.extend(config.blossom.read_servers.clone());
232        let sync_config = crate::sync::SyncConfig {
233            sync_own: config.sync.sync_own,
234            sync_followed: config.sync.sync_followed,
235            relays: config.nostr.relays.clone(),
236            max_concurrent: config.sync.max_concurrent,
237            webrtc_timeout_ms: config.sync.webrtc_timeout_ms,
238            blossom_timeout_ms: config.sync.blossom_timeout_ms,
239        };
240
241        let sync_keys = nostr_sdk::Keys::parse(&keys.secret_key().to_bech32()?)
242            .context("Failed to parse keys for sync")?;
243
244        let sync_service = crate::sync::BackgroundSync::new(
245            sync_config,
246            Arc::clone(&store),
247            sync_keys,
248            webrtc_state.clone(),
249        )
250        .await
251        .context("Failed to create background sync service")?;
252
253        let contacts_file = opts.data_dir.join("contacts.json");
254        tokio::spawn(async move {
255            if let Err(e) = sync_service.run(contacts_file).await {
256                tracing::error!("Background sync error: {}", e);
257            }
258        });
259    }
260
261    let eviction_store = Arc::clone(&store);
262    tokio::spawn(async move {
263        let mut interval = tokio::time::interval(Duration::from_secs(300));
264        loop {
265            interval.tick().await;
266            match eviction_store.evict_if_needed() {
267                Ok(freed) => {
268                    if freed > 0 {
269                        tracing::info!("Background eviction freed {} bytes", freed);
270                    }
271                }
272                Err(e) => {
273                    tracing::warn!("Background eviction error: {}", e);
274                }
275            }
276        }
277    });
278
279    let listener = TcpListener::bind(&opts.bind_address).await?;
280    let local_addr = listener.local_addr()?;
281    let actual_addr = format!("{}:{}", local_addr.ip(), local_addr.port());
282
283    tokio::spawn(async move {
284        if let Err(e) = server.run_with_listener(listener).await {
285            tracing::error!("Embedded daemon server error: {}", e);
286        }
287    });
288
289    tracing::info!(
290        "Embedded daemon started on {}, identity {}",
291        actual_addr,
292        npub
293    );
294
295    Ok(EmbeddedDaemonInfo {
296        addr: actual_addr,
297        port: local_addr.port(),
298        store,
299        webrtc_state,
300    })
301}