Skip to main content

exo_federation/
lib.rs

//! # exo-federation: Distributed Cognitive Mesh
//!
//! This crate implements federated substrate networking with:
//! - Post-quantum cryptographic handshakes
//! - Privacy-preserving onion routing
//! - CRDT-based eventual consistency
//! - Byzantine fault-tolerant consensus
//!
//! ## Architecture
//!
//! ```text
//! ┌─────────────────────────────────────────┐
//! │      FederatedMesh (Coordinator)        │
//! ├─────────────────────────────────────────┤
//! │ • Local substrate instance              │
//! │ • Consensus coordination                │
//! │ • Federation gateway                    │
//! │ • Cryptographic identity                │
//! └─────────────────────────────────────────┘
//!          │           │           │
//!    ┌─────┘           │           └─────┐
//!    ▼                 ▼                 ▼
//! Handshake         Onion            CRDT
//! Protocol          Router      Reconciliation
//! ```
26
27use dashmap::DashMap;
28use serde::{Deserialize, Serialize};
29use std::sync::Arc;
30use tokio::sync::RwLock;
31
32pub mod consensus;
33pub mod crdt;
34pub mod crypto;
35pub mod handshake;
36pub mod onion;
37pub mod transfer_crdt;
38
39pub use consensus::{byzantine_commit, CommitProof};
40pub use crdt::{reconcile_crdt, GSet, LWWRegister};
41pub use crypto::{EncryptedChannel, PostQuantumKeypair};
42pub use handshake::{join_federation, Capability, FederationToken};
43pub use onion::{onion_query, OnionHeader};
44
45/// Errors that can occur in federation operations
46#[derive(Debug, thiserror::Error)]
47pub enum FederationError {
48    #[error("Cryptographic operation failed: {0}")]
49    CryptoError(String),
50
51    #[error("Network error: {0}")]
52    NetworkError(String),
53
54    #[error("Consensus failed: {0}")]
55    ConsensusError(String),
56
57    #[error("Invalid federation token")]
58    InvalidToken,
59
60    #[error("Insufficient peers for consensus: needed {needed}, got {actual}")]
61    InsufficientPeers { needed: usize, actual: usize },
62
63    #[error("CRDT reconciliation failed: {0}")]
64    ReconciliationError(String),
65
66    #[error("Peer not found: {0}")]
67    PeerNotFound(String),
68}
69
70pub type Result<T> = std::result::Result<T, FederationError>;
71
72/// Unique identifier for a peer in the federation
73#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
74pub struct PeerId(pub String);
75
76impl PeerId {
77    pub fn new(id: String) -> Self {
78        Self(id)
79    }
80
81    pub fn generate() -> Self {
82        use sha2::{Digest, Sha256};
83        let mut hasher = Sha256::new();
84        hasher.update(rand::random::<[u8; 32]>());
85        let hash = hasher.finalize();
86        Self(hex::encode(&hash[..16]))
87    }
88}
89
90/// Network address for a peer
91#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct PeerAddress {
93    pub host: String,
94    pub port: u16,
95    pub public_key: Vec<u8>,
96}
97
98impl PeerAddress {
99    pub fn new(host: String, port: u16, public_key: Vec<u8>) -> Self {
100        Self {
101            host,
102            port,
103            public_key,
104        }
105    }
106}
107
108/// Scope for federated queries
109#[derive(Debug, Clone, Serialize, Deserialize)]
110pub enum FederationScope {
111    /// Query only local instance
112    Local,
113    /// Query direct peers only
114    Direct,
115    /// Query entire federation (multi-hop)
116    Global { max_hops: usize },
117}
118
119/// Result from a federated query
120#[derive(Debug, Clone, Serialize, Deserialize)]
121pub struct FederatedResult {
122    pub source: PeerId,
123    pub data: Vec<u8>,
124    pub score: f32,
125    pub timestamp: u64,
126}
127
128/// State update for consensus
129#[derive(Debug, Clone, Serialize, Deserialize)]
130pub struct StateUpdate {
131    pub update_id: String,
132    pub data: Vec<u8>,
133    pub timestamp: u64,
134}
135
/// Substrate instance placeholder (will reference exo-core types).
///
/// The type currently carries no data. It derives `Debug` so that it, and any
/// type embedding it, can be printed in diagnostics.
#[derive(Debug)]
pub struct SubstrateInstance {
    // Placeholder - will integrate with actual substrate
}
140
141/// Federated cognitive mesh coordinator
142pub struct FederatedMesh {
143    /// Unique identifier for this node
144    pub local_id: PeerId,
145
146    /// Local substrate instance
147    pub local: Arc<RwLock<SubstrateInstance>>,
148
149    /// Post-quantum cryptographic keypair
150    pub pq_keys: PostQuantumKeypair,
151
152    /// Connected peers
153    pub peers: Arc<DashMap<PeerId, PeerAddress>>,
154
155    /// Active federation tokens
156    pub tokens: Arc<DashMap<PeerId, FederationToken>>,
157
158    /// Encrypted channels to peers
159    pub channels: Arc<DashMap<PeerId, EncryptedChannel>>,
160}
161
162impl FederatedMesh {
163    /// Create a new federated mesh node
164    pub fn new(local: SubstrateInstance) -> Result<Self> {
165        let local_id = PeerId::generate();
166        let pq_keys = PostQuantumKeypair::generate();
167
168        Ok(Self {
169            local_id,
170            local: Arc::new(RwLock::new(local)),
171            pq_keys,
172            peers: Arc::new(DashMap::new()),
173            tokens: Arc::new(DashMap::new()),
174            channels: Arc::new(DashMap::new()),
175        })
176    }
177
178    /// Join a federation by connecting to a peer
179    pub async fn join_federation(&mut self, peer: &PeerAddress) -> Result<FederationToken> {
180        let token = join_federation(&self.pq_keys, peer).await?;
181
182        // Store the peer and token
183        let peer_id = PeerId::new(token.peer_id.clone());
184        self.peers.insert(peer_id.clone(), peer.clone());
185        self.tokens.insert(peer_id, token.clone());
186
187        Ok(token)
188    }
189
190    /// Execute a federated query across the mesh
191    pub async fn federated_query(
192        &self,
193        query: Vec<u8>,
194        scope: FederationScope,
195    ) -> Result<Vec<FederatedResult>> {
196        match scope {
197            FederationScope::Local => {
198                // Query only local instance
199                Ok(vec![FederatedResult {
200                    source: self.local_id.clone(),
201                    data: query, // Placeholder
202                    score: 1.0,
203                    timestamp: current_timestamp(),
204                }])
205            }
206            FederationScope::Direct => {
207                // Query direct peers
208                let mut results = Vec::new();
209
210                for entry in self.peers.iter() {
211                    let peer_id = entry.key().clone();
212                    // Placeholder: would actually send query to peer
213                    results.push(FederatedResult {
214                        source: peer_id,
215                        data: query.clone(),
216                        score: 0.8,
217                        timestamp: current_timestamp(),
218                    });
219                }
220
221                Ok(results)
222            }
223            FederationScope::Global { max_hops } => {
224                // Use onion routing for privacy
225                let _relay_nodes: Vec<_> = self
226                    .peers
227                    .iter()
228                    .take(max_hops)
229                    .map(|e| e.key().clone())
230                    .collect();
231
232                // Placeholder: would use onion_query
233                Ok(vec![])
234            }
235        }
236    }
237
238    /// Commit a state update with Byzantine consensus
239    pub async fn byzantine_commit(&self, update: StateUpdate) -> Result<CommitProof> {
240        let peer_count = self.peers.len() + 1; // +1 for local
241        byzantine_commit(update, peer_count).await
242    }
243
244    /// Get the count of peers in the federation
245    pub fn peer_count(&self) -> usize {
246        self.peers.len()
247    }
248}
249
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// This never panics: a system clock set before the epoch yields `0`, and a
/// millisecond count too large for `u64` saturates at `u64::MAX`.
fn current_timestamp() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // `duration_since` fails when the clock reads earlier than the epoch;
        // that is an environment problem and must not abort a query.
        .map_or(0, |elapsed| {
            // Clamp before narrowing so the `as` cast below is lossless.
            elapsed.as_millis().min(u128::from(u64::MAX)) as u64
        })
}
258
// `hex` is used by `PeerId::generate`; this is a private import, not a re-export.
260use hex;
261
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed mesh knows no peers.
    #[tokio::test]
    async fn test_federated_mesh_creation() {
        let mesh = FederatedMesh::new(SubstrateInstance {}).unwrap();

        assert_eq!(mesh.peer_count(), 0);
    }

    /// A local-scope query produces exactly one result.
    #[tokio::test]
    async fn test_local_query() {
        let mesh = FederatedMesh::new(SubstrateInstance {}).unwrap();
        let payload = vec![1, 2, 3];

        let answers = mesh
            .federated_query(payload, FederationScope::Local)
            .await
            .unwrap();

        assert_eq!(answers.len(), 1);
    }
}