1use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::Instant;
11
12use arti_client::{DataStream, StreamPrefs, TorClient, TorClientConfig};
13use safelog::DisplayRedacted;
14use tokio::sync::Mutex;
15use tor_cell::relaycell::msg::Connected;
16use tor_hsservice::config::OnionServiceConfigBuilder;
17use tor_hsservice::HsNickname;
18use tor_rtcompat::PreferredRuntime;
19use tracing::{debug, info, warn};
20
21use cp_graph::GraphStore;
22
23use crate::error::{Result, TorError};
24use crate::rate_limit::RateLimiter;
25use crate::server::ServerConfig;
26use crate::types::{
27 PeerCapabilities, PeerRegistration, PeerScore, CANON_PORT, CIRCUIT_POOL_SIZE,
28 CIRCUIT_ROTATION_SECS, KEEPALIVE_INTERVAL_SECS,
29};
30use crate::wire;
31
/// Returns the current wall-clock time in milliseconds relative to the Unix epoch.
///
/// Never panics: if the system clock is set before 1970 the result is the
/// (negative) distance to the epoch, and values that do not fit in an `i64`
/// saturate at `i64::MAX` / `i64::MIN` instead of being truncated.
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX),
        // The clock is behind the epoch; `err.duration()` is how far behind.
        Err(err) => i64::try_from(err.duration().as_millis()).map_or(i64::MIN, |ms| -ms),
    }
}
38
/// Settings consumed by [`TorRuntime`].
pub struct TorConfig {
    /// When `false`, `TorRuntime::start_onion_service` returns an error
    /// instead of launching a service.
    pub enable_onion_service: bool,
    /// Target number of warm peer circuits kept by
    /// `TorRuntime::run_circuit_maintenance`.
    pub circuit_pool_size: usize,
    /// Seconds the maintenance loop sleeps between passes.
    pub keepalive_interval_secs: u64,
    /// Circuit rotation period in seconds.
    // NOTE(review): in this file the value is only logged; the rotation check
    // itself compares against `CIRCUIT_ROTATION_SECS` — confirm intent.
    pub circuit_rotation_secs: u64,
    /// Raw bytes passed to `ed25519_dalek::SigningKey::from_bytes` when
    /// signing peer registrations.
    pub identity_secret: [u8; 32],
    /// Public key published in registrations; the node id is derived from it.
    pub identity_public: [u8; 32],
    /// Value copied into the registration's `embedding_model` field.
    pub model_hash: [u8; 32],
    /// Topics copied into registrations and passed to `PeerScore::compute`.
    pub topics: Vec<String>,
}
58
59impl Default for TorConfig {
60 fn default() -> Self {
61 Self {
62 enable_onion_service: true,
63 circuit_pool_size: CIRCUIT_POOL_SIZE,
64 keepalive_interval_secs: KEEPALIVE_INTERVAL_SECS,
65 circuit_rotation_secs: CIRCUIT_ROTATION_SECS,
66 identity_secret: [0u8; 32],
67 identity_public: [0u8; 32],
68 model_hash: [0u8; 32],
69 topics: Vec::new(),
70 }
71 }
72}
73
/// Lifecycle state of a [`TorRuntime`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeState {
    /// Not assigned anywhere in this file; presumably "not started" — confirm.
    Idle,
    /// Not assigned anywhere in this file; presumably "bootstrap in progress" — confirm.
    Bootstrapping,
    /// Set by `TorRuntime::bootstrap` once the Tor client has been created.
    Ready,
    /// Set by `TorRuntime::shutdown`; the accept and maintenance loops exit
    /// when they observe it.
    ShuttingDown,
}
86
/// One entry of the warm-circuit pool kept by `run_circuit_maintenance`.
// `dead_code` is allowed because some fields (`peer`, `last_used`) are written
// but never read in this file.
#[allow(dead_code)]
struct PooledCircuit {
    /// Registration of the peer this stream is connected to.
    peer: PeerRegistration,
    /// Open Tor stream to the peer; keepalives are written to it.
    stream: DataStream,
    /// Timestamp (ms, from `now_ms`) recorded when the entry was inserted.
    created_at: i64,
    /// Timestamp (ms) set at insertion; not updated elsewhere in this file.
    last_used: i64,
    /// Timestamp (ms) of the last successful keepalive write.
    last_keepalive: i64,
}
96
97impl PooledCircuit {
98 fn needs_keepalive(&self, now_ms: i64) -> bool {
99 let elapsed = (now_ms - self.last_keepalive) / 1000;
100 elapsed >= KEEPALIVE_INTERVAL_SECS as i64
101 }
102
103 fn needs_rotation(&self, now_ms: i64) -> bool {
104 let elapsed = (now_ms - self.created_at) / 1000;
105 elapsed >= CIRCUIT_ROTATION_SECS as i64
106 }
107}
108
/// Owns the bootstrapped Tor client together with the onion-service and
/// lifecycle state built on top of it.
pub struct TorRuntime {
    /// Bootstrapped arti client used for outbound connections and for
    /// launching the onion service.
    client: TorClient<PreferredRuntime>,
    /// Configuration supplied to `bootstrap`.
    config: TorConfig,
    /// Current lifecycle state; polled by the long-running loops.
    state: Arc<Mutex<RuntimeState>>,
    /// Address of the running onion service without the `.onion` suffix,
    /// or `None` until `start_onion_service` has obtained one.
    onion_address: Arc<Mutex<Option<String>>>,
    /// Handle stored by `start_onion_service`; never read in this file.
    // NOTE(review): presumably retained so the service is not dropped — confirm.
    #[allow(dead_code)]
    onion_service: Mutex<Option<Arc<tor_hsservice::RunningOnionService>>>,
}
125
126impl TorRuntime {
127 pub async fn bootstrap(config: TorConfig) -> Result<Self> {
133 info!("Starting Tor bootstrap...");
134 let start = Instant::now();
135
136 let tor_config = TorClientConfig::default();
137 let client = TorClient::create_bootstrapped(tor_config)
138 .await
139 .map_err(|e| TorError::Connection(format!("Tor bootstrap failed: {e}")))?;
140
141 let elapsed = start.elapsed();
142 info!("Tor bootstrapped in {:.1}s", elapsed.as_secs_f64());
143
144 Ok(Self {
145 client,
146 config,
147 state: Arc::new(Mutex::new(RuntimeState::Ready)),
148 onion_address: Arc::new(Mutex::new(None)),
149 onion_service: Mutex::new(None),
150 })
151 }
152
153 pub async fn state(&self) -> RuntimeState {
155 *self.state.lock().await
156 }
157
158 pub async fn onion_address(&self) -> Option<String> {
160 self.onion_address.lock().await.clone()
161 }
162
163 pub async fn connect_to_peer(&self, onion_address: &str) -> Result<DataStream> {
167 let addr = format!("{onion_address}.onion:{CANON_PORT}");
168 let prefs = StreamPrefs::new();
169 let stream = self
170 .client
171 .connect_with_prefs(&addr, &prefs)
172 .await
173 .map_err(|e| TorError::Connection(format!("Failed to connect to {addr}: {e}")))?;
174
175 debug!(
176 "Connected to peer {}",
177 &onion_address[..8.min(onion_address.len())]
178 );
179 Ok(stream)
180 }
181
182 pub async fn build_registration(&self) -> PeerRegistration {
187 let node_id = crate::keys::node_id_from_public_key(&self.config.identity_public);
188
189 let onion_address = self.onion_address.lock().await.clone().unwrap_or_default();
190
191 let mut reg = PeerRegistration {
192 onion_address,
193 node_id,
194 public_key: self.config.identity_public,
195 capabilities: PeerCapabilities::default(),
196 topics: self.config.topics.clone(),
197 embedding_model: self.config.model_hash,
198 timestamp: now_ms(),
199 signature: [0u8; 64],
200 };
201
202 let signing_bytes = reg.signing_bytes();
203 let signing_key = ed25519_dalek::SigningKey::from_bytes(&self.config.identity_secret);
204 reg.signature = ed25519_dalek::Signer::sign(&signing_key, &signing_bytes).to_bytes();
205
206 reg
207 }
208
209 pub async fn start_onion_service(
216 &self,
217 ) -> Result<futures::stream::BoxStream<'static, tor_hsservice::RendRequest>> {
218 use futures::StreamExt;
219
220 if !self.config.enable_onion_service {
221 info!("Onion service disabled by config");
222 return Err(TorError::Connection("Onion service disabled".into()));
223 }
224
225 info!("Launching onion service on port {}", CANON_PORT);
226
227 let nickname: HsNickname = "canon-search"
228 .parse()
229 .map_err(|e| TorError::Connection(format!("Invalid service nickname: {e}")))?;
230
231 let svc_config = OnionServiceConfigBuilder::default()
232 .nickname(nickname)
233 .build()
234 .map_err(|e| TorError::Connection(format!("Onion service config error: {e}")))?;
235
236 let (onion_service, rend_requests) = self
237 .client
238 .launch_onion_service(svc_config)
239 .map_err(|e| TorError::Connection(format!("Failed to launch onion service: {e}")))?
240 .ok_or_else(|| TorError::Connection("Onion service disabled by Tor config".into()))?;
241
242 *self.onion_service.lock().await = Some(onion_service.clone());
244
245 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
247
248 if let Some(hs_id) = onion_service.onion_address() {
249 let address = hs_id.display_unredacted().to_string();
250 let address = address.trim_end_matches(".onion").to_string();
252 *self.onion_address.lock().await = Some(address.clone());
253 info!("Onion service ready at {}.onion:{}", address, CANON_PORT);
254 } else {
255 warn!("Onion service launched but address not yet available");
256 }
257
258 Ok(rend_requests.boxed())
259 }
260
261 pub async fn run_accept_loop(
267 self: &Arc<Self>,
268 rend_requests: futures::stream::BoxStream<'static, tor_hsservice::RendRequest>,
269 graph: Arc<Mutex<GraphStore>>,
270 server_config: Arc<ServerConfig>,
271 ) {
272 use futures::StreamExt;
273
274 info!(
275 "Starting onion service accept loop (max_concurrent={})",
276 server_config.max_concurrent
277 );
278
279 let max_concurrent = server_config.max_concurrent.max(1) as usize;
281 let semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrent));
282
283 let stream_requests = tor_hsservice::handle_rend_requests(rend_requests);
284 futures::pin_mut!(stream_requests);
285
286 let rate_limiter = Arc::new(Mutex::new(RateLimiter::default()));
288
289 while let Some(stream_request) = stream_requests.next().await {
290 let state = *self.state.lock().await;
291 if state == RuntimeState::ShuttingDown {
292 break;
293 }
294
295 let graph = graph.clone();
296 let config = server_config.clone();
297 let sem = semaphore.clone();
298 let rate_limiter = rate_limiter.clone();
299
300 tokio::spawn(async move {
301 let Ok(_permit) = sem.acquire().await else {
303 return; };
305
306 match stream_request.accept(Connected::new_empty()).await {
307 Ok(mut data_stream) => {
308 if let Err(e) = crate::server::handle_connection_loop(
309 &mut data_stream,
310 &graph,
311 &rate_limiter,
312 &config,
313 )
314 .await
315 {
316 debug!("Connection handler error: {}", e);
317 }
318 }
319 Err(e) => {
320 debug!("Failed to accept stream: {}", e);
321 }
322 }
323 });
324 }
325
326 info!("Accept loop stopped");
327 }
328
329 pub async fn run_circuit_maintenance(
336 self: &Arc<Self>,
337 peers: Arc<Mutex<Vec<PeerRegistration>>>,
338 ) {
339 info!(
340 "Starting circuit pool maintenance (size={}, keepalive={}s, rotation={}s)",
341 self.config.circuit_pool_size,
342 self.config.keepalive_interval_secs,
343 self.config.circuit_rotation_secs
344 );
345
346 let mut pool: HashMap<[u8; 16], PooledCircuit> = HashMap::new();
347
348 loop {
349 let state = *self.state.lock().await;
350 if state == RuntimeState::ShuttingDown {
351 break;
352 }
353
354 let ts = now_ms();
355
356 pool.retain(|node_id, circuit| {
358 if circuit.needs_rotation(ts) {
359 debug!("Rotating circuit for peer {}", hex::encode(&node_id[..4]));
360 false
361 } else {
362 true
363 }
364 });
365
366 let needing_keepalive: Vec<[u8; 16]> = pool
368 .iter()
369 .filter(|(_, c)| c.needs_keepalive(ts))
370 .map(|(id, _)| *id)
371 .collect();
372
373 for node_id in needing_keepalive {
374 let Some(circuit) = pool.get_mut(&node_id) else {
375 continue;
376 };
377
378 if let Ok(()) = wire::write_keepalive(&mut circuit.stream).await {
379 circuit.last_keepalive = ts;
380 debug!("Keepalive OK for peer {}", hex::encode(&node_id[..4]));
381 } else {
382 warn!(
383 "Keepalive failed for peer {}, removing",
384 hex::encode(&node_id[..4])
385 );
386 pool.remove(&node_id);
387 }
388 }
389
390 let slots = self.config.circuit_pool_size.saturating_sub(pool.len());
392 if slots > 0 {
393 let peer_list = peers.lock().await;
394 let mut scored: Vec<PeerScore> = peer_list
395 .iter()
396 .filter(|p| !p.is_expired(ts) && !pool.contains_key(&p.node_id))
397 .filter(|p| !p.onion_address.is_empty())
398 .map(|p| PeerScore::compute(p.clone(), &self.config.topics, None))
399 .collect();
400 drop(peer_list);
401
402 scored.sort_by(|a, b| {
403 b.composite
404 .partial_cmp(&a.composite)
405 .unwrap_or(std::cmp::Ordering::Equal)
406 });
407
408 for peer_score in scored.into_iter().take(slots) {
409 let peer = peer_score.peer;
410 match self.connect_to_peer(&peer.onion_address).await {
411 Ok(mut stream) => {
412 if wire::write_keepalive(&mut stream).await.is_ok() {
413 debug!(
414 "Warm circuit established to peer {}",
415 hex::encode(&peer.node_id[..4])
416 );
417 pool.insert(
418 peer.node_id,
419 PooledCircuit {
420 peer,
421 stream,
422 created_at: ts,
423 last_used: ts,
424 last_keepalive: ts,
425 },
426 );
427 }
428 }
429 Err(e) => {
430 debug!(
431 "Failed to connect to peer {}: {}",
432 hex::encode(&peer.node_id[..4]),
433 e
434 );
435 }
436 }
437 }
438 }
439
440 debug!(
441 "Circuit pool: {}/{} warm",
442 pool.len(),
443 self.config.circuit_pool_size
444 );
445
446 tokio::time::sleep(tokio::time::Duration::from_secs(
447 self.config.keepalive_interval_secs,
448 ))
449 .await;
450 }
451
452 info!("Circuit maintenance loop stopped");
453 }
454
455 pub async fn shutdown(&self) {
457 info!("Shutting down Tor runtime");
458 *self.state.lock().await = RuntimeState::ShuttingDown;
459 }
460}