1//! Tor runtime: manages Arti bootstrap, circuit pool, and onion service.
2//!
3//! Provides the `TorRuntime` struct that wraps arti-client for the Canon
4//! live search protocol (CP-013 §13-17). Manages a pool of warm Tor
5//! circuits to peers, an onion service for incoming queries, and the
6//! full lifecycle from bootstrap to teardown.
7
8use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::Instant;
11
12use arti_client::{DataStream, StreamPrefs, TorClient, TorClientConfig};
13use safelog::DisplayRedacted;
14use tokio::sync::Mutex;
15use tor_cell::relaycell::msg::Connected;
16use tor_hsservice::config::OnionServiceConfigBuilder;
17use tor_hsservice::HsNickname;
18use tor_rtcompat::PreferredRuntime;
19use tracing::{debug, info, warn};
20
21use cp_graph::GraphStore;
22
23use crate::error::{Result, TorError};
24use crate::rate_limit::RateLimiter;
25use crate::server::ServerConfig;
26use crate::types::{
27    PeerCapabilities, PeerRegistration, PeerScore, CANON_PORT, CIRCUIT_POOL_SIZE,
28    CIRCUIT_ROTATION_SECS, KEEPALIVE_INTERVAL_SECS,
29};
30use crate::wire;
31
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reads earlier than the epoch, instead of
/// panicking as the previous `unwrap()` did on a misconfigured clock.
fn now_ms() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_millis() as i64)
        .unwrap_or(0)
}
38
/// Configuration for the Tor runtime.
///
/// `Default` enables the onion service, takes the pool/keepalive/rotation
/// values from the crate constants, and zero-fills all key material —
/// callers must supply real identity keys before publishing a registration.
pub struct TorConfig {
    /// Whether to launch a Tor onion service for incoming queries.
    pub enable_onion_service: bool,
    /// Number of warm circuits to maintain in the pool.
    pub circuit_pool_size: usize,
    /// Keepalive interval in seconds.
    pub keepalive_interval_secs: u64,
    /// Circuit rotation interval in seconds.
    pub circuit_rotation_secs: u64,
    /// Node identity secret key (Ed25519 seed).
    pub identity_secret: [u8; 32],
    /// Node's Ed25519 public key.
    pub identity_public: [u8; 32],
    /// BLAKE3 hash of the embedding model.
    pub model_hash: [u8; 32],
    /// Topics this node covers.
    pub topics: Vec<String>,
}
58
59impl Default for TorConfig {
60    fn default() -> Self {
61        Self {
62            enable_onion_service: true,
63            circuit_pool_size: CIRCUIT_POOL_SIZE,
64            keepalive_interval_secs: KEEPALIVE_INTERVAL_SECS,
65            circuit_rotation_secs: CIRCUIT_ROTATION_SECS,
66            identity_secret: [0u8; 32],
67            identity_public: [0u8; 32],
68            model_hash: [0u8; 32],
69            topics: Vec::new(),
70        }
71    }
72}
73
/// State of the Tor runtime.
///
/// Read by the accept and maintenance loops, which exit when they observe
/// `ShuttingDown`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeState {
    /// Not yet bootstrapped.
    Idle,
    /// Bootstrap in progress.
    /// NOTE(review): `bootstrap` currently goes straight to `Ready`;
    /// this variant is never set in this file.
    Bootstrapping,
    /// Connected and ready for search.
    Ready,
    /// Shutting down.
    ShuttingDown,
}
86
/// A warm circuit entry holding an actual `DataStream` to a peer.
///
/// All timestamps are Unix-epoch milliseconds (see `now_ms`).
#[allow(dead_code)]
struct PooledCircuit {
    /// Registration record of the peer this circuit reaches.
    peer: PeerRegistration,
    /// The open Tor stream, kept warm by periodic keepalive probes.
    stream: DataStream,
    /// When the circuit was established; drives rotation.
    created_at: i64,
    /// Last time the circuit carried traffic.
    /// NOTE(review): never updated anywhere visible in this file.
    last_used: i64,
    /// Last successful keepalive probe; drives the next probe.
    last_keepalive: i64,
}
96
97impl PooledCircuit {
98    fn needs_keepalive(&self, now_ms: i64) -> bool {
99        let elapsed = (now_ms - self.last_keepalive) / 1000;
100        elapsed >= KEEPALIVE_INTERVAL_SECS as i64
101    }
102
103    fn needs_rotation(&self, now_ms: i64) -> bool {
104        let elapsed = (now_ms - self.created_at) / 1000;
105        elapsed >= CIRCUIT_ROTATION_SECS as i64
106    }
107}
108
/// Manages Tor connectivity for the Canon node.
///
/// Handles bootstrap, circuit pool management with warm circuits to
/// peers, and an onion service for incoming search queries. The circuit
/// pool targets `CIRCUIT_POOL_SIZE` warm connections with keepalive probes
/// every `KEEPALIVE_INTERVAL_SECS` and rotation every `CIRCUIT_ROTATION_SECS`.
pub struct TorRuntime {
    /// Bootstrapped arti client used for all outgoing/incoming streams.
    client: TorClient<PreferredRuntime>,
    /// Runtime configuration captured at bootstrap.
    config: TorConfig,
    state: Arc<Mutex<RuntimeState>>,
    /// The node's onion address (set after onion service launches).
    onion_address: Arc<Mutex<Option<String>>>,
    /// Handle to the running onion service — must be kept alive or the service shuts down.
    #[allow(dead_code)]
    onion_service: Mutex<Option<Arc<tor_hsservice::RunningOnionService>>>,
}
125
126impl TorRuntime {
127    /// Bootstrap the Tor client and connect to the network.
128    ///
129    /// This blocks until the Tor client is bootstrapped and connected,
130    /// which typically takes under 1 minute on first launch. Returns
131    /// when the client is ready for circuit establishment.
132    pub async fn bootstrap(config: TorConfig) -> Result<Self> {
133        info!("Starting Tor bootstrap...");
134        let start = Instant::now();
135
136        let tor_config = TorClientConfig::default();
137        let client = TorClient::create_bootstrapped(tor_config)
138            .await
139            .map_err(|e| TorError::Connection(format!("Tor bootstrap failed: {e}")))?;
140
141        let elapsed = start.elapsed();
142        info!("Tor bootstrapped in {:.1}s", elapsed.as_secs_f64());
143
144        Ok(Self {
145            client,
146            config,
147            state: Arc::new(Mutex::new(RuntimeState::Ready)),
148            onion_address: Arc::new(Mutex::new(None)),
149            onion_service: Mutex::new(None),
150        })
151    }
152
153    /// Get the current runtime state.
154    pub async fn state(&self) -> RuntimeState {
155        *self.state.lock().await
156    }
157
158    /// Get the node's onion address (if the onion service is running).
159    pub async fn onion_address(&self) -> Option<String> {
160        self.onion_address.lock().await.clone()
161    }
162
163    /// Open an anonymous Tor stream to a peer's onion address.
164    ///
165    /// Returns a `DataStream` that implements `AsyncRead` + `AsyncWrite`.
166    pub async fn connect_to_peer(&self, onion_address: &str) -> Result<DataStream> {
167        let addr = format!("{onion_address}.onion:{CANON_PORT}");
168        let prefs = StreamPrefs::new();
169        let stream = self
170            .client
171            .connect_with_prefs(&addr, &prefs)
172            .await
173            .map_err(|e| TorError::Connection(format!("Failed to connect to {addr}: {e}")))?;
174
175        debug!(
176            "Connected to peer {}",
177            &onion_address[..8.min(onion_address.len())]
178        );
179        Ok(stream)
180    }
181
182    /// Build a `PeerRegistration` for this node to publish on Arweave.
183    ///
184    /// The `onion_address` is populated from the running onion service.
185    /// Call `start_onion_service` before this to ensure the address is set.
186    pub async fn build_registration(&self) -> PeerRegistration {
187        let node_id = crate::keys::node_id_from_public_key(&self.config.identity_public);
188
189        let onion_address = self.onion_address.lock().await.clone().unwrap_or_default();
190
191        let mut reg = PeerRegistration {
192            onion_address,
193            node_id,
194            public_key: self.config.identity_public,
195            capabilities: PeerCapabilities::default(),
196            topics: self.config.topics.clone(),
197            embedding_model: self.config.model_hash,
198            timestamp: now_ms(),
199            signature: [0u8; 64],
200        };
201
202        let signing_bytes = reg.signing_bytes();
203        let signing_key = ed25519_dalek::SigningKey::from_bytes(&self.config.identity_secret);
204        reg.signature = ed25519_dalek::Signer::sign(&signing_key, &signing_bytes).to_bytes();
205
206        reg
207    }
208
209    /// Launch the Tor onion service for incoming search queries.
210    ///
211    /// Uses the arti `TorClient::launch_onion_service` API to create a real
212    /// Tor hidden service. The service address is derived by arti from an
213    /// auto-generated or stored keypair. Returns the stream of rendezvous
214    /// requests that should be fed to `run_accept_loop`.
215    pub async fn start_onion_service(
216        &self,
217    ) -> Result<futures::stream::BoxStream<'static, tor_hsservice::RendRequest>> {
218        use futures::StreamExt;
219
220        if !self.config.enable_onion_service {
221            info!("Onion service disabled by config");
222            return Err(TorError::Connection("Onion service disabled".into()));
223        }
224
225        info!("Launching onion service on port {}", CANON_PORT);
226
227        let nickname: HsNickname = "canon-search"
228            .parse()
229            .map_err(|e| TorError::Connection(format!("Invalid service nickname: {e}")))?;
230
231        let svc_config = OnionServiceConfigBuilder::default()
232            .nickname(nickname)
233            .build()
234            .map_err(|e| TorError::Connection(format!("Onion service config error: {e}")))?;
235
236        let (onion_service, rend_requests) = self
237            .client
238            .launch_onion_service(svc_config)
239            .map_err(|e| TorError::Connection(format!("Failed to launch onion service: {e}")))?
240            .ok_or_else(|| TorError::Connection("Onion service disabled by Tor config".into()))?;
241
242        // Store the handle so the onion service stays alive
243        *self.onion_service.lock().await = Some(onion_service.clone());
244
245        // Wait briefly for the service to publish its descriptor
246        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
247
248        if let Some(hs_id) = onion_service.onion_address() {
249            let address = hs_id.display_unredacted().to_string();
250            // Strip the ".onion" suffix for internal storage
251            let address = address.trim_end_matches(".onion").to_string();
252            *self.onion_address.lock().await = Some(address.clone());
253            info!("Onion service ready at {}.onion:{}", address, CANON_PORT);
254        } else {
255            warn!("Onion service launched but address not yet available");
256        }
257
258        Ok(rend_requests.boxed())
259    }
260
    /// Run the incoming connection accept loop for the onion service.
    ///
    /// Processes rendezvous requests from `start_onion_service`, accepts
    /// incoming streams, and spawns a handler for each connection. This
    /// runs until the runtime is shut down.
    ///
    /// Concurrency: a task is spawned per incoming stream request, and the
    /// semaphore permit is acquired *inside* the task — so at most
    /// `max_concurrent` handlers run at once, but excess requests queue as
    /// pending tasks rather than being rejected at accept time.
    ///
    /// Shutdown is only observed when the next stream request arrives;
    /// while idle, the loop blocks in `next().await`.
    pub async fn run_accept_loop(
        self: &Arc<Self>,
        rend_requests: futures::stream::BoxStream<'static, tor_hsservice::RendRequest>,
        graph: Arc<Mutex<GraphStore>>,
        server_config: Arc<ServerConfig>,
    ) {
        use futures::StreamExt;

        info!(
            "Starting onion service accept loop (max_concurrent={})",
            server_config.max_concurrent
        );

        // Enforce max_concurrent query limit via a semaphore
        // (clamped to at least 1 so the loop can never deadlock on 0 permits).
        let max_concurrent = server_config.max_concurrent.max(1) as usize;
        let semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrent));

        // Expand rendezvous requests into individual stream requests.
        let stream_requests = tor_hsservice::handle_rend_requests(rend_requests);
        futures::pin_mut!(stream_requests);

        // Shared rate limiter across all connections
        let rate_limiter = Arc::new(Mutex::new(RateLimiter::default()));

        while let Some(stream_request) = stream_requests.next().await {
            let state = *self.state.lock().await;
            if state == RuntimeState::ShuttingDown {
                break;
            }

            // Clone the shared handles moved into the per-connection task.
            let graph = graph.clone();
            let config = server_config.clone();
            let sem = semaphore.clone();
            let rate_limiter = rate_limiter.clone();

            tokio::spawn(async move {
                // Acquire a permit before processing; drop it when done.
                let Ok(_permit) = sem.acquire().await else {
                    return; // Semaphore closed
                };

                // Accept the Tor stream, then run the request/response loop
                // until the peer disconnects or errors.
                match stream_request.accept(Connected::new_empty()).await {
                    Ok(mut data_stream) => {
                        if let Err(e) = crate::server::handle_connection_loop(
                            &mut data_stream,
                            &graph,
                            &rate_limiter,
                            &config,
                        )
                        .await
                        {
                            debug!("Connection handler error: {}", e);
                        }
                    }
                    Err(e) => {
                        debug!("Failed to accept stream: {}", e);
                    }
                }
            });
        }

        info!("Accept loop stopped");
    }
328
329    /// Run the circuit pool maintenance loop.
330    ///
331    /// Manages warm circuits to peers: establishes `DataStream` connections
332    /// to fill the pool, sends keepalive probes on held streams, and
333    /// rotates circuits that have exceeded the rotation window. Runs
334    /// until the runtime is shut down.
335    pub async fn run_circuit_maintenance(
336        self: &Arc<Self>,
337        peers: Arc<Mutex<Vec<PeerRegistration>>>,
338    ) {
339        info!(
340            "Starting circuit pool maintenance (size={}, keepalive={}s, rotation={}s)",
341            self.config.circuit_pool_size,
342            self.config.keepalive_interval_secs,
343            self.config.circuit_rotation_secs
344        );
345
346        let mut pool: HashMap<[u8; 16], PooledCircuit> = HashMap::new();
347
348        loop {
349            let state = *self.state.lock().await;
350            if state == RuntimeState::ShuttingDown {
351                break;
352            }
353
354            let ts = now_ms();
355
356            // 1. Rotate circuits that have exceeded the rotation window
357            pool.retain(|node_id, circuit| {
358                if circuit.needs_rotation(ts) {
359                    debug!("Rotating circuit for peer {}", hex::encode(&node_id[..4]));
360                    false
361                } else {
362                    true
363                }
364            });
365
366            // 2. Send keepalive probes on held streams
367            let needing_keepalive: Vec<[u8; 16]> = pool
368                .iter()
369                .filter(|(_, c)| c.needs_keepalive(ts))
370                .map(|(id, _)| *id)
371                .collect();
372
373            for node_id in needing_keepalive {
374                let Some(circuit) = pool.get_mut(&node_id) else {
375                    continue;
376                };
377
378                if let Ok(()) = wire::write_keepalive(&mut circuit.stream).await {
379                    circuit.last_keepalive = ts;
380                    debug!("Keepalive OK for peer {}", hex::encode(&node_id[..4]));
381                } else {
382                    warn!(
383                        "Keepalive failed for peer {}, removing",
384                        hex::encode(&node_id[..4])
385                    );
386                    pool.remove(&node_id);
387                }
388            }
389
390            // 3. Score and select new peers to fill empty pool slots
391            let slots = self.config.circuit_pool_size.saturating_sub(pool.len());
392            if slots > 0 {
393                let peer_list = peers.lock().await;
394                let mut scored: Vec<PeerScore> = peer_list
395                    .iter()
396                    .filter(|p| !p.is_expired(ts) && !pool.contains_key(&p.node_id))
397                    .filter(|p| !p.onion_address.is_empty())
398                    .map(|p| PeerScore::compute(p.clone(), &self.config.topics, None))
399                    .collect();
400                drop(peer_list);
401
402                scored.sort_by(|a, b| {
403                    b.composite
404                        .partial_cmp(&a.composite)
405                        .unwrap_or(std::cmp::Ordering::Equal)
406                });
407
408                for peer_score in scored.into_iter().take(slots) {
409                    let peer = peer_score.peer;
410                    match self.connect_to_peer(&peer.onion_address).await {
411                        Ok(mut stream) => {
412                            if wire::write_keepalive(&mut stream).await.is_ok() {
413                                debug!(
414                                    "Warm circuit established to peer {}",
415                                    hex::encode(&peer.node_id[..4])
416                                );
417                                pool.insert(
418                                    peer.node_id,
419                                    PooledCircuit {
420                                        peer,
421                                        stream,
422                                        created_at: ts,
423                                        last_used: ts,
424                                        last_keepalive: ts,
425                                    },
426                                );
427                            }
428                        }
429                        Err(e) => {
430                            debug!(
431                                "Failed to connect to peer {}: {}",
432                                hex::encode(&peer.node_id[..4]),
433                                e
434                            );
435                        }
436                    }
437                }
438            }
439
440            debug!(
441                "Circuit pool: {}/{} warm",
442                pool.len(),
443                self.config.circuit_pool_size
444            );
445
446            tokio::time::sleep(tokio::time::Duration::from_secs(
447                self.config.keepalive_interval_secs,
448            ))
449            .await;
450        }
451
452        info!("Circuit maintenance loop stopped");
453    }
454
455    /// Initiate a graceful shutdown.
456    pub async fn shutdown(&self) {
457        info!("Shutting down Tor runtime");
458        *self.state.lock().await = RuntimeState::ShuttingDown;
459    }
460}