Skip to main content

cp_tor/
types.rs

1//! CP-013 protocol data structures for live anonymous search.
2
3use serde::{Deserialize, Serialize};
4use serde_big_array::BigArray;
5
/// Port on which the Canon search protocol is always served.
pub const CANON_PORT: u16 = 9_735;

/// Upper bound on a single wire message, in bytes (1 MiB).
pub const MAX_MESSAGE_SIZE: u32 = 1 << 20;

/// Lifetime of a peer registration, in seconds (72 hours).
pub const PEER_EXPIRY_SECS: i64 = 72 * 60 * 60;

/// How often a peer registration should be renewed, in seconds (24 hours).
pub const PEER_RENEWAL_SECS: i64 = 24 * 60 * 60;

/// Number of circuits in the circuit pool.
pub const CIRCUIT_POOL_SIZE: usize = 8;

/// Seconds between keepalive probes (5 minutes).
pub const KEEPALIVE_INTERVAL_SECS: u64 = 5 * 60;

/// Seconds after which a circuit is rotated (30 minutes).
pub const CIRCUIT_ROTATION_SECS: u64 = 30 * 60;

/// Live search timeout, in milliseconds (2 seconds).
pub const SEARCH_TIMEOUT_MS: u64 = 2 * 1_000;

/// Largest number of results a single search may return.
pub const MAX_RESULTS: u8 = 20;

/// Rate limit: requests allowed per minute for each requester key.
pub const RATE_LIMIT_PER_MIN: u32 = 10;

/// The `K` parameter of RRF scoring.
pub const RRF_K: u32 = 60;

/// How many peers each search is sent to.
pub const SEARCH_FANOUT: usize = 3;

/// Upper bound on the random jitter applied before dispatch, in milliseconds.
pub const DISPATCH_JITTER_MS: u64 = 200;
44
45// ============================================================================
46// Peer Registration (Section 13)
47// ============================================================================
48
/// Capabilities advertised by a peer node.
///
/// Embedded in `PeerRegistration::capabilities` and therefore part of the data
/// hashed by `PeerRegistration::signing_bytes`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerCapabilities {
    /// Maximum concurrent queries this peer will handle.
    pub max_concurrent_queries: u8,
    /// Maximum results per query this peer will return.
    pub max_results: u8,
    /// Number of indexed chunks on this peer.
    ///
    /// Feeds the chunk-count component of `PeerScore::compute`.
    pub chunk_count: u64,
    /// Whether this peer supports Merkle inclusion proofs.
    pub supports_proofs: bool,
}
61
62impl Default for PeerCapabilities {
63    fn default() -> Self {
64        Self {
65            max_concurrent_queries: 4,
66            max_results: 20,
67            chunk_count: 0,
68            supports_proofs: true,
69        }
70    }
71}
72
/// Peer registration uploaded to Arweave for discovery.
///
/// Each node publishes its onion address, capabilities, and topic coverage.
/// Registrations expire after 72 hours and should be renewed every 24 hours
/// (see `PEER_EXPIRY_SECS` / `PEER_RENEWAL_SECS` and `is_expired`).
///
/// NOTE(review): this type only carries the data; nothing in this definition
/// validates the address length, the `node_id` derivation, or the signature.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerRegistration {
    /// v3 onion address (56 characters, no .onion suffix).
    pub onion_address: String,
    /// Node ID: `BLAKE3(public_key)`[0..16].
    pub node_id: [u8; 16],
    /// Ed25519 public key of this node.
    pub public_key: [u8; 32],
    /// Advertised capabilities.
    pub capabilities: PeerCapabilities,
    /// Topic coverage tags.
    pub topics: Vec<String>,
    /// BLAKE3 hash of the embedding model weights used by this node.
    pub embedding_model: [u8; 32],
    /// Registration timestamp (Unix milliseconds).
    pub timestamp: i64,
    /// Ed25519 signature over BLAKE3(CBOR(preceding fields)), i.e. over the
    /// digest returned by `signing_bytes`.
    // serde has no built-in support for 64-element arrays, hence `BigArray`.
    #[serde(with = "BigArray")]
    pub signature: [u8; 64],
}
97
98impl PeerRegistration {
99    /// Compute the signing bytes: BLAKE3 of CBOR of all fields except signature.
100    pub fn signing_bytes(&self) -> [u8; 32] {
101        let signable = PeerRegistrationSignable {
102            onion_address: &self.onion_address,
103            node_id: &self.node_id,
104            public_key: &self.public_key,
105            capabilities: &self.capabilities,
106            topics: &self.topics,
107            embedding_model: &self.embedding_model,
108            timestamp: self.timestamp,
109        };
110        let mut buf = Vec::new();
111        ciborium::into_writer(&signable, &mut buf).expect("CBOR serialization cannot fail");
112        *blake3::hash(&buf).as_bytes()
113    }
114
115    /// Check if this registration has expired relative to the given time.
116    /// Also rejects registrations with timestamps more than 60s in the future.
117    pub fn is_expired(&self, now_ms: i64) -> bool {
118        if self.timestamp > now_ms + 60_000 {
119            return true;
120        }
121        let age_secs = (now_ms - self.timestamp) / 1000;
122        age_secs > PEER_EXPIRY_SECS
123    }
124}
125
/// Signable portion of `PeerRegistration` (excludes signature).
///
/// Borrowed view built by `PeerRegistration::signing_bytes`, which CBOR-encodes
/// it and hashes the result.
///
/// NOTE(review): the encoded bytes presumably depend on the field names and
/// their declaration order, so renaming or reordering fields would invalidate
/// existing signatures. Confirm against the CBOR encoder before changing.
#[derive(Serialize)]
struct PeerRegistrationSignable<'a> {
    onion_address: &'a str,
    node_id: &'a [u8; 16],
    public_key: &'a [u8; 32],
    capabilities: &'a PeerCapabilities,
    topics: &'a Vec<String>,
    embedding_model: &'a [u8; 32],
    timestamp: i64,
}
137
138// ============================================================================
139// Search Protocol (Section 14)
140// ============================================================================
141
/// Search request sent from querier to peer over Tor.
///
/// Uses ephemeral session keys (not the node's registered identity) to
/// prevent peers from linking queries to registered node identities.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SearchRequest {
    /// Random request identifier.
    pub request_id: [u8; 16],
    /// Query embedding vector (i16 quantized, 1536 dimensions).
    ///
    /// The length is not enforced by this type.
    pub query_embedding: Vec<i16>,
    /// Optional query text for lexical search.
    pub query_text: Option<String>,
    /// Maximum results to return (capped at 20).
    ///
    /// The cap is not enforced by this type.
    pub max_results: u8,
    /// Whether to include Merkle inclusion proofs.
    pub include_proofs: bool,
    /// BLAKE3 hash of the embedding model used.
    pub model_hash: [u8; 32],
    /// Request timestamp (Unix milliseconds).
    pub timestamp: i64,
    /// Ed25519 signature over the digest returned by `signing_bytes`:
    /// BLAKE3(CBOR(every field except `signature`)). This includes the
    /// `public_key` field declared after it, not only the preceding fields.
    // serde has no built-in support for 64-element arrays, hence `BigArray`.
    #[serde(with = "BigArray")]
    pub signature: [u8; 64],
    /// Ephemeral session public key (NOT the node identity key).
    pub public_key: [u8; 32],
}
168
169impl SearchRequest {
170    /// Compute the signing bytes: BLAKE3 of CBOR of all fields except signature.
171    pub fn signing_bytes(&self) -> [u8; 32] {
172        let signable = SearchRequestSignable {
173            request_id: &self.request_id,
174            query_embedding: &self.query_embedding,
175            query_text: &self.query_text,
176            max_results: self.max_results,
177            include_proofs: self.include_proofs,
178            model_hash: &self.model_hash,
179            timestamp: self.timestamp,
180            public_key: &self.public_key,
181        };
182        let mut buf = Vec::new();
183        ciborium::into_writer(&signable, &mut buf).expect("CBOR serialization cannot fail");
184        *blake3::hash(&buf).as_bytes()
185    }
186}
187
/// Signable portion of `SearchRequest`: every field except `signature`.
///
/// Borrowed view built by `SearchRequest::signing_bytes`, which CBOR-encodes
/// it and hashes the result.
///
/// NOTE(review): the encoded bytes presumably depend on the field names and
/// their declaration order. Confirm against the CBOR encoder before changing.
#[derive(Serialize)]
struct SearchRequestSignable<'a> {
    request_id: &'a [u8; 16],
    query_embedding: &'a Vec<i16>,
    query_text: &'a Option<String>,
    max_results: u8,
    include_proofs: bool,
    model_hash: &'a [u8; 32],
    timestamp: i64,
    public_key: &'a [u8; 32],
}
199
/// Status code in a search response.
///
/// Carried in `SearchResponse::status` and included in the data hashed by
/// `SearchResponse::signing_bytes`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SearchStatus {
    /// Search completed successfully.
    Ok,
    /// Peer uses a different embedding model.
    ModelMismatch,
    /// Peer is at capacity.
    Overloaded,
    /// Request was malformed or signature verification failed.
    InvalidRequest,
}
212
/// A single search result from a remote peer.
///
/// Carried in `SearchResponse::results`, so every field here is part of the
/// data hashed by `SearchResponse::signing_bytes`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteSearchResult {
    /// Chunk identifier.
    pub chunk_id: [u8; 16],
    /// Chunk text content.
    pub chunk_text: String,
    /// Document path (relative).
    pub document_path: String,
    /// RRF score (CP-012 integer scale).
    pub score: u32,
    /// Optional Merkle inclusion proof against `peer_state_root`.
    ///
    /// Stored as a list of 32-byte hashes. NOTE(review): the ordering
    /// convention is not visible in this file.
    pub merkle_proof: Option<Vec<[u8; 32]>>,
}
227
/// Search response sent from peer back to querier.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SearchResponse {
    /// Echoed request identifier.
    pub request_id: [u8; 16],
    /// Status of the search.
    pub status: SearchStatus,
    /// Search results.
    pub results: Vec<RemoteSearchResult>,
    /// Peer's current Merkle state root.
    pub peer_state_root: [u8; 32],
    /// Time spent searching (milliseconds).
    pub search_latency_ms: u16,
    /// Response timestamp (Unix milliseconds).
    pub timestamp: i64,
    /// Ed25519 signature over BLAKE3(CBOR(preceding fields)), i.e. over the
    /// digest returned by `signing_bytes`.
    // serde has no built-in support for 64-element arrays, hence `BigArray`.
    #[serde(with = "BigArray")]
    pub signature: [u8; 64],
}
247
248impl SearchResponse {
249    /// Compute the signing bytes.
250    pub fn signing_bytes(&self) -> [u8; 32] {
251        let signable = SearchResponseSignable {
252            request_id: &self.request_id,
253            status: &self.status,
254            results: &self.results,
255            peer_state_root: &self.peer_state_root,
256            search_latency_ms: self.search_latency_ms,
257            timestamp: self.timestamp,
258        };
259        let mut buf = Vec::new();
260        ciborium::into_writer(&signable, &mut buf).expect("CBOR serialization cannot fail");
261        *blake3::hash(&buf).as_bytes()
262    }
263}
264
/// Signable portion of `SearchResponse`: every field except `signature`.
///
/// Borrowed view built by `SearchResponse::signing_bytes`, which CBOR-encodes
/// it and hashes the result.
///
/// NOTE(review): the encoded bytes presumably depend on the field names and
/// their declaration order. Confirm against the CBOR encoder before changing.
#[derive(Serialize)]
struct SearchResponseSignable<'a> {
    request_id: &'a [u8; 16],
    status: &'a SearchStatus,
    results: &'a Vec<RemoteSearchResult>,
    peer_state_root: &'a [u8; 32],
    search_latency_ms: u16,
    timestamp: i64,
}
274
275// ============================================================================
276// Result merging types (Section 16)
277// ============================================================================
278
/// Source of a merged search result.
///
/// Stored in `MergedSearchResult::source`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ResultSource {
    /// Result came from local graph only.
    Local,
    /// Result came from a remote peer, identified by `peer_node_id`
    /// (presumably the peer's `PeerRegistration::node_id`; verify at the call site).
    Remote { peer_node_id: [u8; 16] },
    /// Result found in both local and remote; `peer_node_id` identifies the
    /// remote peer (presumably its `PeerRegistration::node_id`; verify at the call site).
    Both { peer_node_id: [u8; 16] },
}
289
/// A merged search result combining local and remote sources.
///
/// Unlike the wire types in this file, this struct does not derive
/// `Serialize`/`Deserialize`.
///
/// NOTE(review): the merge logic is not in this file; the field notes below
/// are inferred from the field names and from `RemoteSearchResult` /
/// `SearchResponse` and should be confirmed against the merging code.
#[derive(Debug, Clone)]
pub struct MergedSearchResult {
    /// Chunk identifier.
    pub chunk_id: [u8; 16],
    /// Chunk text content.
    pub chunk_text: String,
    /// Document path.
    pub document_path: String,
    /// Merged score. Note this is an `f64`, whereas `RemoteSearchResult::score`
    /// is a `u32`; the conversion is not visible here.
    pub score: f64,
    /// Where this result came from.
    pub source: ResultSource,
    /// Optional Merkle inclusion proof (presumably taken from the remote result).
    pub merkle_proof: Option<Vec<[u8; 32]>>,
    /// Presumably the `peer_state_root` of the response that supplied the result.
    pub peer_state_root: Option<[u8; 32]>,
    /// Presumably the `signature` of the response that supplied the result.
    pub peer_signature: Option<[u8; 64]>,
}
302
303// ============================================================================
304// Circuit pool types (Section 15)
305// ============================================================================
306
/// A warm circuit to a specific peer.
///
/// All timestamps are Unix milliseconds; `needs_keepalive` and
/// `needs_rotation` compare them against a caller-supplied `now_ms`.
#[derive(Debug, Clone)]
pub struct WarmCircuit {
    /// The peer this circuit connects to.
    pub peer: PeerRegistration,
    /// When this circuit was established (Unix ms). Read by `needs_rotation`.
    pub created_at: i64,
    /// When this circuit was last used for a query (Unix ms).
    /// Not read by any method in this file.
    pub last_used: i64,
    /// When the last keepalive probe was sent (Unix ms). Read by `needs_keepalive`.
    pub last_keepalive: i64,
}
319
320impl WarmCircuit {
321    /// Check if this circuit needs a keepalive probe.
322    pub fn needs_keepalive(&self, now_ms: i64) -> bool {
323        let elapsed = (now_ms - self.last_keepalive) / 1000;
324        elapsed >= KEEPALIVE_INTERVAL_SECS as i64
325    }
326
327    /// Check if this circuit should be rotated.
328    pub fn needs_rotation(&self, now_ms: i64) -> bool {
329        let elapsed = (now_ms - self.created_at) / 1000;
330        elapsed >= CIRCUIT_ROTATION_SECS as i64
331    }
332}
333
/// Peer selection score for circuit pool construction.
///
/// Built by `PeerScore::compute`, which fills every field.
#[derive(Debug, Clone)]
pub struct PeerScore {
    /// The peer being scored.
    pub peer: PeerRegistration,
    /// Topic overlap between the node's topics and the peer's topics, as a
    /// fraction of the node's topic count; 0.5 when either list is empty.
    pub topic_overlap: f64,
    /// Peer rating as supplied by the caller; 0.5 when the peer is unrated.
    pub rating: f64,
    /// `log2(chunk_count) / 20`, or 0.0 when the peer reports no chunks.
    /// Stored unclamped, so it can exceed 1.0.
    pub chunk_score: f64,
    /// `0.5 * topic_overlap + 0.4 * rating + 0.1 * min(chunk_score, 1.0)`.
    pub composite: f64,
}
343
344impl PeerScore {
345    /// Compute composite score per CP-013 §15:
346    /// 50% topic overlap + 40% rating + 10% chunk count (log2 scale).
347    pub fn compute(
348        peer: PeerRegistration,
349        node_topics: &[String],
350        peer_rating: Option<f64>,
351    ) -> Self {
352        let topic_overlap = if node_topics.is_empty() || peer.topics.is_empty() {
353            0.5 // Neutral score when no topics specified
354        } else {
355            let matching = peer
356                .topics
357                .iter()
358                .filter(|t| node_topics.contains(t))
359                .count();
360            // Per CP-013 §15: denominator is the node's interest list length
361            matching as f64 / node_topics.len() as f64
362        };
363
364        let rating = peer_rating.unwrap_or(0.5); // Unrated peers get 0.5
365
366        let chunk_score = if peer.capabilities.chunk_count > 0 {
367            (peer.capabilities.chunk_count as f64).log2() / 20.0 // Normalize: log2(1M) ≈ 20
368        } else {
369            0.0
370        };
371
372        let composite = 0.5 * topic_overlap + 0.4 * rating + 0.1 * chunk_score.min(1.0);
373
374        Self {
375            peer,
376            topic_overlap,
377            rating,
378            chunk_score,
379            composite,
380        }
381    }
382}
383
#[cfg(test)]
mod tests {
    use super::*;

    /// Milliseconds in one hour.
    const HOUR_MS: i64 = 3600 * 1000;

    /// Baseline registration shared by the tests below.
    fn test_registration() -> PeerRegistration {
        PeerRegistration {
            onion_address: "a".repeat(56),
            node_id: [1u8; 16],
            public_key: [2u8; 32],
            capabilities: PeerCapabilities::default(),
            topics: vec!["science".to_string(), "math".to_string()],
            embedding_model: [3u8; 32],
            timestamp: 1000000,
            signature: [0u8; 64],
        }
    }

    /// Baseline search request shared by the tests below.
    fn test_request() -> SearchRequest {
        SearchRequest {
            request_id: [7u8; 16],
            query_embedding: vec![1, 2, 3, -4, -5],
            query_text: Some("what is quantum computing?".to_string()),
            max_results: 10,
            include_proofs: true,
            model_hash: [9u8; 32],
            timestamp: 1234567890,
            signature: [10u8; 64],
            public_key: [11u8; 32],
        }
    }

    /// Baseline search response: one result with a proof, one without.
    fn test_response() -> SearchResponse {
        SearchResponse {
            request_id: [1u8; 16],
            status: SearchStatus::Ok,
            results: vec![
                RemoteSearchResult {
                    chunk_id: [2u8; 16],
                    chunk_text: "result one".to_string(),
                    document_path: "docs/a.md".to_string(),
                    score: 500,
                    merkle_proof: Some(vec![[3u8; 32], [4u8; 32]]),
                },
                RemoteSearchResult {
                    chunk_id: [5u8; 16],
                    chunk_text: "result two".to_string(),
                    document_path: "docs/b.md".to_string(),
                    score: 300,
                    merkle_proof: None,
                },
            ],
            peer_state_root: [6u8; 32],
            search_latency_ms: 250,
            timestamp: 9876543210,
            signature: [7u8; 64],
        }
    }

    /// Circuit whose timestamps are all zero.
    fn test_circuit() -> WarmCircuit {
        WarmCircuit {
            peer: test_registration(),
            created_at: 0,
            last_used: 0,
            last_keepalive: 0,
        }
    }

    #[test]
    fn test_peer_capabilities_default() {
        let caps = PeerCapabilities::default();
        assert_eq!(caps.max_concurrent_queries, 4);
        assert_eq!(caps.max_results, 20);
        assert_eq!(caps.chunk_count, 0);
        assert!(caps.supports_proofs);
    }

    #[test]
    fn test_peer_registration_signing_bytes_deterministic() {
        let reg = test_registration();
        assert_eq!(reg.signing_bytes(), reg.signing_bytes());
    }

    #[test]
    fn test_peer_registration_signing_bytes_change_on_mutation() {
        let baseline = test_registration().signing_bytes();

        let mut later = test_registration();
        later.timestamp = 2000000;
        assert_ne!(later.signing_bytes(), baseline);

        let mut more_topics = test_registration();
        more_topics.topics.push("physics".to_string());
        assert_ne!(more_topics.signing_bytes(), baseline);
    }

    #[test]
    fn test_peer_registration_signing_bytes_ignore_signature() {
        let mut signed = test_registration();
        signed.signature = [0xAB; 64];
        assert_eq!(signed.signing_bytes(), test_registration().signing_bytes());
    }

    #[test]
    fn test_peer_registration_expiry() {
        let reg = PeerRegistration {
            timestamp: 1000,
            ..test_registration()
        };

        // 71 hours later: not expired
        assert!(!reg.is_expired(1000 + 71 * HOUR_MS));

        // Exactly 72 hours later: still valid (expiry is strictly greater-than)
        assert!(!reg.is_expired(1000 + 72 * HOUR_MS));

        // 72 hours and one second later: expired
        assert!(reg.is_expired(1000 + 72 * HOUR_MS + 1000));

        // 73 hours later: expired
        assert!(reg.is_expired(1000 + 73 * HOUR_MS));
    }

    #[test]
    fn test_peer_registration_future_timestamp_rejected() {
        let reg = PeerRegistration {
            timestamp: 1_000_000,
            ..test_registration()
        };

        // Exactly 60s ahead of the clock: tolerated
        assert!(!reg.is_expired(1_000_000 - 60_000));

        // More than 60s ahead of the clock: rejected
        assert!(reg.is_expired(1_000_000 - 60_001));
    }

    #[test]
    fn test_search_request_signing_bytes_deterministic() {
        let req = SearchRequest {
            request_id: [1u8; 16],
            query_embedding: vec![100, -200, 300],
            query_text: Some("test query".to_string()),
            max_results: 10,
            include_proofs: true,
            model_hash: [5u8; 32],
            timestamp: 1000000,
            signature: [0u8; 64],
            public_key: [6u8; 32],
        };

        assert_eq!(req.signing_bytes(), req.signing_bytes());
    }

    #[test]
    fn test_search_request_signing_bytes_cover_public_key_not_signature() {
        let baseline = test_request().signing_bytes();

        // The session key follows the signature in the struct but is signed.
        let mut other_key = test_request();
        other_key.public_key = [12u8; 32];
        assert_ne!(other_key.signing_bytes(), baseline);

        // The signature itself is never part of the signed digest.
        let mut other_sig = test_request();
        other_sig.signature = [0u8; 64];
        assert_eq!(other_sig.signing_bytes(), baseline);
    }

    #[test]
    fn test_search_response_signing_bytes() {
        let resp = SearchResponse {
            request_id: [1u8; 16],
            status: SearchStatus::Ok,
            results: vec![RemoteSearchResult {
                chunk_id: [2u8; 16],
                chunk_text: "test chunk".to_string(),
                document_path: "doc.md".to_string(),
                score: 100,
                merkle_proof: None,
            }],
            peer_state_root: [3u8; 32],
            search_latency_ms: 150,
            timestamp: 1000000,
            signature: [0u8; 64],
        };
        let baseline = resp.signing_bytes();

        // Deterministic
        assert_eq!(resp.signing_bytes(), baseline);

        // Sensitive to signed fields, including nested results
        let mut slower = resp.clone();
        slower.search_latency_ms = 151;
        assert_ne!(slower.signing_bytes(), baseline);

        let mut rescored = resp.clone();
        rescored.results[0].score = 101;
        assert_ne!(rescored.signing_bytes(), baseline);

        // Insensitive to the signature itself
        let mut resigned = resp.clone();
        resigned.signature = [9u8; 64];
        assert_eq!(resigned.signing_bytes(), baseline);
    }

    #[test]
    fn test_warm_circuit_keepalive() {
        let circuit = test_circuit();
        let interval_ms = KEEPALIVE_INTERVAL_SECS as i64 * 1000;

        // Before interval: no keepalive needed
        assert!(!circuit.needs_keepalive(interval_ms - 10_000));
        assert!(!circuit.needs_keepalive(interval_ms - 1));

        // At the interval: keepalive needed (comparison is inclusive)
        assert!(circuit.needs_keepalive(interval_ms));

        // After interval: keepalive needed
        assert!(circuit.needs_keepalive(interval_ms + 10_000));
    }

    #[test]
    fn test_warm_circuit_rotation() {
        let circuit = test_circuit();
        let window_ms = CIRCUIT_ROTATION_SECS as i64 * 1000;

        // Before rotation window: no rotation
        assert!(!circuit.needs_rotation(window_ms - 10_000));
        assert!(!circuit.needs_rotation(window_ms - 1));

        // At the window: rotate (comparison is inclusive)
        assert!(circuit.needs_rotation(window_ms));

        // After rotation window: rotate
        assert!(circuit.needs_rotation(window_ms + 10_000));
    }

    #[test]
    fn test_peer_score_computation() {
        let reg = PeerRegistration {
            capabilities: PeerCapabilities {
                chunk_count: 100_000,
                ..PeerCapabilities::default()
            },
            ..test_registration()
        };

        let node_topics = vec![
            "science".to_string(),
            "math".to_string(),
            "physics".to_string(),
        ];
        let score = PeerScore::compute(reg, &node_topics, Some(0.8));

        // topic overlap: 2/3 = 0.667
        assert!((score.topic_overlap - 2.0 / 3.0).abs() < 1e-9);
        assert!((score.rating - 0.8).abs() < 1e-9);

        let expected_chunk = 100_000f64.log2() / 20.0;
        assert!((score.chunk_score - expected_chunk).abs() < 1e-9);

        let expected_composite = 0.5 * (2.0 / 3.0) + 0.4 * 0.8 + 0.1 * expected_chunk;
        assert!((score.composite - expected_composite).abs() < 1e-9);
    }

    #[test]
    fn test_peer_score_unrated() {
        let reg = test_registration();
        let score = PeerScore::compute(reg, &[], None);

        assert!((score.rating - 0.5).abs() < 1e-6);
        assert!((score.topic_overlap - 0.5).abs() < 1e-6);
        assert_eq!(score.chunk_score, 0.0);
    }

    #[test]
    fn test_peer_score_chunk_component_is_capped_in_composite() {
        let reg = PeerRegistration {
            capabilities: PeerCapabilities {
                chunk_count: u64::MAX,
                ..PeerCapabilities::default()
            },
            ..test_registration()
        };
        let score = PeerScore::compute(reg, &[], None);

        // The raw chunk score is stored unclamped ...
        assert!(score.chunk_score > 1.0);
        // ... but contributes at most 0.1 to the composite.
        assert!((score.composite - (0.5 * 0.5 + 0.4 * 0.5 + 0.1)).abs() < 1e-9);
    }

    #[test]
    fn test_search_status_serialization() {
        let statuses = [
            SearchStatus::Ok,
            SearchStatus::ModelMismatch,
            SearchStatus::Overloaded,
            SearchStatus::InvalidRequest,
        ];

        for status in statuses {
            let mut buf = Vec::new();
            ciborium::into_writer(&status, &mut buf).unwrap();
            let decoded: SearchStatus = ciborium::from_reader(buf.as_slice()).unwrap();
            assert_eq!(decoded, status);
        }
    }

    #[test]
    fn test_cbor_roundtrip_search_request() {
        let req = test_request();

        let mut buf = Vec::new();
        ciborium::into_writer(&req, &mut buf).unwrap();
        let decoded: SearchRequest = ciborium::from_reader(buf.as_slice()).unwrap();

        assert_eq!(decoded.request_id, req.request_id);
        assert_eq!(decoded.query_embedding, req.query_embedding);
        assert_eq!(decoded.query_text, req.query_text);
        assert_eq!(decoded.max_results, req.max_results);
        assert_eq!(decoded.include_proofs, req.include_proofs);
        assert_eq!(decoded.model_hash, req.model_hash);
        assert_eq!(decoded.timestamp, req.timestamp);
        assert_eq!(decoded.signature, req.signature);
        assert_eq!(decoded.public_key, req.public_key);
    }

    #[test]
    fn test_cbor_roundtrip_search_response() {
        let resp = test_response();

        let mut buf = Vec::new();
        ciborium::into_writer(&resp, &mut buf).unwrap();
        let decoded: SearchResponse = ciborium::from_reader(buf.as_slice()).unwrap();

        assert_eq!(decoded.request_id, resp.request_id);
        assert_eq!(decoded.status, SearchStatus::Ok);
        assert_eq!(decoded.results.len(), 2);
        assert_eq!(decoded.results[0].chunk_text, "result one");
        assert_eq!(decoded.results[1].score, 300);
        assert_eq!(decoded.results[0].merkle_proof, resp.results[0].merkle_proof);
        assert!(decoded.results[1].merkle_proof.is_none());
        assert_eq!(decoded.peer_state_root, resp.peer_state_root);
        assert_eq!(decoded.search_latency_ms, resp.search_latency_ms);
        assert_eq!(decoded.timestamp, resp.timestamp);
        assert_eq!(decoded.signature, resp.signature);
    }

    #[test]
    fn test_cbor_roundtrip_peer_registration() {
        let reg = test_registration();

        let mut buf = Vec::new();
        ciborium::into_writer(&reg, &mut buf).unwrap();
        let decoded: PeerRegistration = ciborium::from_reader(buf.as_slice()).unwrap();

        assert_eq!(decoded.onion_address, reg.onion_address);
        assert_eq!(decoded.node_id, reg.node_id);
        assert_eq!(decoded.public_key, reg.public_key);
        assert_eq!(decoded.capabilities.max_results, reg.capabilities.max_results);
        assert_eq!(decoded.capabilities.chunk_count, reg.capabilities.chunk_count);
        assert_eq!(decoded.topics, reg.topics);
        assert_eq!(decoded.embedding_model, reg.embedding_model);
        assert_eq!(decoded.timestamp, reg.timestamp);
        assert_eq!(decoded.signature, reg.signature);
        // Re-encoding the decoded value must yield the same signed digest.
        assert_eq!(decoded.signing_bytes(), reg.signing_bytes());
    }
}