1use crate::error::{GuardianError, Result};
6use crate::ipfs_core_api::config::ClientConfig;
7use crate::ipfs_log::identity_provider::Keystore;
8use crate::keystore::SledKeystore;
9use chrono::{DateTime, Utc};
10use ed25519_dalek::{Signature, Signer, SigningKey, Verifier, VerifyingKey};
11use libp2p::{PeerId, identity::Keypair as LibP2PKeypair};
12use serde::{Deserialize, Serialize};
13use std::collections::{HashMap, VecDeque};
14use std::sync::Arc;
15use std::time::{Duration, SystemTime};
16use tokio::sync::{Mutex, RwLock};
17use tracing::{debug, info, warn};
18use uuid::Uuid;
19
20const SYNC_PROTOCOL_VERSION: u32 = 1;
22
23const MAX_MESSAGE_AGE: Duration = Duration::from_secs(300); #[allow(dead_code)]
28const MAX_SYNC_RETRIES: u8 = 3;
29
30const MAX_SYNC_QUEUE_SIZE: usize = 1000;
32
33#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
35pub enum KeySyncStatus {
36 Synchronized,
38 Synchronizing,
40 Pending,
42 Failed(String),
44 Conflict(String),
46}
47
48#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
50pub enum SyncOperation {
51 Create,
53 Update,
55 Delete,
57 MetadataSync,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct KeyMetadata {
64 pub key_id: String,
66 pub version: u64,
68 pub last_modified: DateTime<Utc>,
70 pub creator: PeerId,
72 pub signature: Vec<u8>,
74 pub crypto_algorithm: String,
76 pub public_key_hash: Vec<u8>,
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct SyncMessage {
83 pub message_id: Uuid,
85 pub protocol_version: u32,
87 pub timestamp: SystemTime,
89 pub sender: PeerId,
91 pub operation: SyncOperation,
93 pub metadata: KeyMetadata,
95 pub key_data: Option<Vec<u8>>,
97 pub message_signature: Vec<u8>,
99}
100
101#[derive(Debug, Clone)]
103#[allow(dead_code)]
104struct SyncQueueEntry {
105 #[allow(dead_code)]
107 message: SyncMessage,
108 #[allow(dead_code)]
110 retry_count: u8,
111 #[allow(dead_code)]
113 next_retry: SystemTime,
114 #[allow(dead_code)]
116 target_peers: Vec<PeerId>,
117}
118
119#[derive(Debug, Default, Clone, Serialize, Deserialize)]
121pub struct SyncStatistics {
122 pub messages_synced: u64,
124 pub pending_messages: u64,
126 pub conflicts_detected: u64,
128 pub conflicts_resolved: u64,
130 pub success_rate: f64,
132 pub avg_sync_latency_ms: f64,
134 pub active_peers: u32,
136}
137
138pub struct KeySynchronizer {
140 #[allow(dead_code)]
142 config: ClientConfig,
143 local_keystore: Arc<SledKeystore>,
145 node_keypair: LibP2PKeypair,
147 peer_id: PeerId,
149 synchronized_keys: Arc<RwLock<HashMap<String, KeyMetadata>>>,
151 sync_status: Arc<RwLock<HashMap<String, KeySyncStatus>>>,
153 sync_queue: Arc<Mutex<VecDeque<SyncQueueEntry>>>,
155 message_cache: Arc<RwLock<HashMap<Uuid, SystemTime>>>,
157 statistics: Arc<RwLock<SyncStatistics>>,
159 trusted_peers: Arc<RwLock<HashMap<PeerId, VerifyingKey>>>,
161}
162
163impl KeySynchronizer {
164 pub async fn new(config: &ClientConfig) -> Result<Self> {
166 let keystore_path = config
167 .data_store_path
168 .as_ref()
169 .map(|p| p.join("keystore"))
170 .unwrap_or_else(|| std::env::temp_dir().join("guardian_keystore"));
171
172 let local_keystore = Arc::new(SledKeystore::new(Some(keystore_path))?);
173
174 let node_keypair = Self::load_or_generate_keypair(&local_keystore).await?;
176 let peer_id = PeerId::from_public_key(&node_keypair.public());
177
178 info!(
179 "Inicializando sincronizador de chaves para PeerID: {}",
180 peer_id
181 );
182
183 Ok(Self {
184 config: config.clone(),
185 local_keystore,
186 node_keypair,
187 peer_id,
188 synchronized_keys: Arc::new(RwLock::new(HashMap::new())),
189 sync_status: Arc::new(RwLock::new(HashMap::new())),
190 sync_queue: Arc::new(Mutex::new(VecDeque::new())),
191 message_cache: Arc::new(RwLock::new(HashMap::new())),
192 statistics: Arc::new(RwLock::new(SyncStatistics::default())),
193 trusted_peers: Arc::new(RwLock::new(HashMap::new())),
194 })
195 }
196
197 pub fn peer_id(&self) -> PeerId {
199 self.peer_id
200 }
201
202 pub fn keypair(&self) -> &LibP2PKeypair {
204 &self.node_keypair
205 }
206
207 async fn load_or_generate_keypair(keystore: &SledKeystore) -> Result<LibP2PKeypair> {
209 const MAIN_KEYPAIR_KEY: &str = "main_node_keypair";
210
211 if let Some(keypair) = keystore.get_keypair(MAIN_KEYPAIR_KEY).await? {
213 debug!("Carregando keypair principal existente");
214 return Ok(keypair);
215 }
216
217 let keypair = LibP2PKeypair::generate_ed25519();
219 keystore.put_keypair(MAIN_KEYPAIR_KEY, &keypair).await?;
220
221 info!("Novo keypair principal gerado e salvo");
222 Ok(keypair)
223 }
224
225 pub async fn add_trusted_peer(&self, peer_id: PeerId, public_key: VerifyingKey) -> Result<()> {
227 let mut trusted = self.trusted_peers.write().await;
228 trusted.insert(peer_id, public_key);
229 info!("Peer confiável adicionado: {}", peer_id);
230 Ok(())
231 }
232
233 pub async fn remove_trusted_peer(&self, peer_id: &PeerId) -> Result<bool> {
235 let mut trusted = self.trusted_peers.write().await;
236 let removed = trusted.remove(peer_id).is_some();
237 if removed {
238 info!("Peer removido da lista de confiança: {}", peer_id);
239 }
240 Ok(removed)
241 }
242
243 pub async fn sync_key(&self, key_id: &str, operation: SyncOperation) -> Result<()> {
245 debug!(
246 "Iniciando sincronização da chave: {} (operação: {:?})",
247 key_id, operation
248 );
249
250 let metadata = self.get_key_metadata(key_id).await?;
252
253 let message = self.create_sync_message(operation, metadata, None).await?;
255
256 self.enqueue_sync_message(message).await?;
258
259 self.update_sync_status(key_id, KeySyncStatus::Synchronizing)
261 .await;
262
263 Ok(())
264 }
265
266 pub async fn handle_sync_message(&self, message: SyncMessage) -> Result<()> {
268 if self.is_message_too_old(&message)? {
270 warn!(
271 "Mensagem de sincronização rejeitada (muito antiga): {:?}",
272 message.message_id
273 );
274 return Err(GuardianError::Other("Mensagem muito antiga".to_string()));
275 }
276
277 if self.is_message_duplicate(&message).await? {
279 debug!("Mensagem duplicada ignorada: {:?}", message.message_id);
280 return Ok(());
281 }
282
283 self.verify_message_signature(&message).await?;
285
286 match message.operation {
288 SyncOperation::Create => self.handle_key_create(&message).await?,
289 SyncOperation::Update => self.handle_key_update(&message).await?,
290 SyncOperation::Delete => self.handle_key_delete(&message).await?,
291 SyncOperation::MetadataSync => self.handle_metadata_sync(&message).await?,
292 }
293
294 self.cache_processed_message(&message).await;
296
297 self.update_statistics().await;
299
300 Ok(())
301 }
302
303 async fn get_key_metadata(&self, key_id: &str) -> Result<KeyMetadata> {
305 let synchronized_keys = self.synchronized_keys.read().await;
306
307 if let Some(metadata) = synchronized_keys.get(key_id) {
308 return Ok(metadata.clone());
309 }
310
311 let keypair = self
313 .local_keystore
314 .get_keypair(key_id)
315 .await?
316 .ok_or_else(|| GuardianError::Other(format!("Chave não encontrada: {}", key_id)))?;
317
318 let public_key_hash = blake3::hash(&keypair.public().encode_protobuf())
319 .as_bytes()
320 .to_vec();
321
322 let metadata = KeyMetadata {
323 key_id: key_id.to_string(),
324 version: 1,
325 last_modified: Utc::now(),
326 creator: self.peer_id,
327 signature: Vec::new(), crypto_algorithm: "Ed25519".to_string(),
329 public_key_hash,
330 };
331
332 Ok(metadata)
333 }
334
335 async fn create_sync_message(
337 &self,
338 operation: SyncOperation,
339 metadata: KeyMetadata,
340 key_data: Option<Vec<u8>>,
341 ) -> Result<SyncMessage> {
342 let message = SyncMessage {
343 message_id: Uuid::new_v4(),
344 protocol_version: SYNC_PROTOCOL_VERSION,
345 timestamp: SystemTime::now(),
346 sender: self.peer_id,
347 operation,
348 metadata,
349 key_data,
350 message_signature: Vec::new(), };
352
353 let signed_message = self.sign_sync_message(message).await?;
355
356 Ok(signed_message)
357 }
358
359 async fn sign_sync_message(&self, mut message: SyncMessage) -> Result<SyncMessage> {
361 let mut message_copy = message.clone();
363 message_copy.message_signature.clear();
364
365 let message_bytes =
366 bincode::serde::encode_to_vec(&message_copy, bincode::config::standard())
367 .map_err(|e| GuardianError::Other(format!("Erro ao serializar mensagem: {}", e)))?;
368
369 let signature = if let Ok(ed25519_keypair) = self.node_keypair.clone().try_into_ed25519() {
371 let secret_bytes = ed25519_keypair.secret().as_ref().to_vec();
372 let signing_key = SigningKey::try_from(&secret_bytes[..32]).map_err(|e| {
373 GuardianError::Other(format!("Erro ao criar chave de assinatura: {}", e))
374 })?;
375 signing_key.sign(&message_bytes).to_bytes().to_vec()
376 } else {
377 return Err(GuardianError::Other(
378 "Tipo de chave não suportado para assinatura".to_string(),
379 ));
380 };
381
382 message.message_signature = signature;
383 Ok(message)
384 }
385
386 async fn verify_message_signature(&self, message: &SyncMessage) -> Result<()> {
388 let trusted_peers = self.trusted_peers.read().await;
390 let verifying_key = trusted_peers.get(&message.sender).ok_or_else(|| {
391 GuardianError::Other(format!("Peer não confiável: {}", message.sender))
392 })?;
393
394 let mut message_copy = message.clone();
396 message_copy.message_signature.clear();
397
398 let message_bytes =
399 bincode::serde::encode_to_vec(&message_copy, bincode::config::standard())
400 .map_err(|e| GuardianError::Other(format!("Erro ao serializar mensagem: {}", e)))?;
401
402 let signature = Signature::from_slice(&message.message_signature)
404 .map_err(|e| GuardianError::Other(format!("Assinatura inválida: {}", e)))?;
405
406 verifying_key
407 .verify(&message_bytes, &signature)
408 .map_err(|e| {
409 GuardianError::Other(format!("Verificação de assinatura falhou: {}", e))
410 })?;
411
412 Ok(())
413 }
414
415 fn is_message_too_old(&self, message: &SyncMessage) -> Result<bool> {
417 let now = SystemTime::now();
418 let age = now
419 .duration_since(message.timestamp)
420 .map_err(|_| GuardianError::Other("Timestamp inválido".to_string()))?;
421
422 Ok(age > MAX_MESSAGE_AGE)
423 }
424
425 async fn is_message_duplicate(&self, message: &SyncMessage) -> Result<bool> {
427 let cache = self.message_cache.read().await;
428 Ok(cache.contains_key(&message.message_id))
429 }
430
431 async fn enqueue_sync_message(&self, message: SyncMessage) -> Result<()> {
433 let mut queue = self.sync_queue.lock().await;
434
435 if queue.len() >= MAX_SYNC_QUEUE_SIZE {
437 queue.pop_front();
439 warn!("Fila de sincronização cheia, removendo mensagem mais antiga");
440 }
441
442 let entry = SyncQueueEntry {
443 message,
444 retry_count: 0,
445 next_retry: SystemTime::now(),
446 target_peers: Vec::new(), };
448
449 queue.push_back(entry);
450 debug!("Mensagem adicionada à fila de sincronização");
451
452 Ok(())
453 }
454
455 async fn handle_key_create(&self, message: &SyncMessage) -> Result<()> {
457 let key_id = &message.metadata.key_id;
458
459 if self.local_keystore.has(key_id).await? {
461 let local_metadata = self.get_key_metadata(key_id).await?;
463 if local_metadata.version >= message.metadata.version {
464 debug!("Chave já existe com versão igual ou superior: {}", key_id);
465 return Ok(());
466 }
467 }
468
469 if let Some(key_data) = &message.key_data {
471 self.local_keystore.put(key_id, key_data).await?;
472 }
473
474 let mut synchronized_keys = self.synchronized_keys.write().await;
476 synchronized_keys.insert(key_id.clone(), message.metadata.clone());
477
478 self.update_sync_status(key_id, KeySyncStatus::Synchronized)
479 .await;
480
481 info!("Chave criada via sincronização: {}", key_id);
482 Ok(())
483 }
484
485 async fn handle_key_update(&self, message: &SyncMessage) -> Result<()> {
487 let key_id = &message.metadata.key_id;
488
489 if !self.local_keystore.has(key_id).await? {
491 warn!("Tentativa de atualizar chave inexistente: {}", key_id);
492 return Err(GuardianError::Other(format!(
493 "Chave não encontrada: {}",
494 key_id
495 )));
496 }
497
498 let local_metadata = self.get_key_metadata(key_id).await?;
500 if local_metadata.version > message.metadata.version {
501 warn!("Conflito de versão detectado para chave: {}", key_id);
502 self.update_sync_status(
503 key_id,
504 KeySyncStatus::Conflict(format!(
505 "Local: v{}, Remoto: v{}",
506 local_metadata.version, message.metadata.version
507 )),
508 )
509 .await;
510 return Ok(());
511 }
512
513 if let Some(key_data) = &message.key_data {
515 self.local_keystore.put(key_id, key_data).await?;
516 }
517
518 let mut synchronized_keys = self.synchronized_keys.write().await;
520 synchronized_keys.insert(key_id.clone(), message.metadata.clone());
521
522 self.update_sync_status(key_id, KeySyncStatus::Synchronized)
523 .await;
524
525 info!("Chave atualizada via sincronização: {}", key_id);
526 Ok(())
527 }
528
529 async fn handle_key_delete(&self, message: &SyncMessage) -> Result<()> {
531 let key_id = &message.metadata.key_id;
532
533 self.local_keystore.delete(key_id).await?;
535
536 let mut synchronized_keys = self.synchronized_keys.write().await;
538 synchronized_keys.remove(key_id);
539
540 let mut sync_status = self.sync_status.write().await;
541 sync_status.remove(key_id);
542
543 info!("Chave deletada via sincronização: {}", key_id);
544 Ok(())
545 }
546
547 async fn handle_metadata_sync(&self, message: &SyncMessage) -> Result<()> {
549 let key_id = &message.metadata.key_id;
550
551 let mut synchronized_keys = self.synchronized_keys.write().await;
553 synchronized_keys.insert(key_id.clone(), message.metadata.clone());
554
555 debug!("Metadados sincronizados para chave: {}", key_id);
556 Ok(())
557 }
558
559 async fn cache_processed_message(&self, message: &SyncMessage) {
561 let mut cache = self.message_cache.write().await;
562 cache.insert(message.message_id, SystemTime::now());
563
564 let cutoff = SystemTime::now() - MAX_MESSAGE_AGE;
566 cache.retain(|_, timestamp| *timestamp > cutoff);
567 }
568
569 async fn update_sync_status(&self, key_id: &str, status: KeySyncStatus) {
571 let mut sync_status = self.sync_status.write().await;
572 sync_status.insert(key_id.to_string(), status);
573 }
574
575 async fn update_statistics(&self) {
577 let mut stats = self.statistics.write().await;
578 stats.messages_synced += 1;
579
580 let queue = self.sync_queue.lock().await;
581 stats.pending_messages = queue.len() as u64;
582
583 let trusted_peers = self.trusted_peers.read().await;
584 stats.active_peers = trusted_peers.len() as u32;
585
586 let sync_status = self.sync_status.read().await;
588 let total_keys = sync_status.len() as u64;
589 let synchronized_keys = sync_status
590 .values()
591 .filter(|status| matches!(status, KeySyncStatus::Synchronized))
592 .count() as u64;
593
594 stats.success_rate = if total_keys > 0 {
595 (synchronized_keys as f64 / total_keys as f64) * 100.0
596 } else {
597 100.0
598 };
599 }
600
601 pub async fn get_statistics(&self) -> SyncStatistics {
603 self.statistics.read().await.clone()
604 }
605
606 pub async fn get_key_sync_status(&self, key_id: &str) -> Option<KeySyncStatus> {
608 let sync_status = self.sync_status.read().await;
609 sync_status.get(key_id).cloned()
610 }
611
612 pub async fn list_synchronized_keys(&self) -> Vec<String> {
614 let synchronized_keys = self.synchronized_keys.read().await;
615 synchronized_keys.keys().cloned().collect()
616 }
617
618 pub async fn force_full_sync(&self) -> Result<()> {
620 info!("Iniciando sincronização completa forçada");
621
622 let keys = self.local_keystore.list_keys().await?;
623 for key_id in keys {
624 self.sync_key(&key_id, SyncOperation::MetadataSync).await?;
625 }
626
627 info!(
628 "Sincronização completa forçada iniciada para {} chaves",
629 self.synchronized_keys.read().await.len()
630 );
631 Ok(())
632 }
633
634 pub async fn export_sync_config(&self) -> Result<Vec<u8>> {
636 let config = SyncExportConfig {
637 peer_id: self.peer_id,
638 trusted_peers: self.trusted_peers.read().await.clone(),
639 synchronized_keys: self.synchronized_keys.read().await.clone(),
640 statistics: self.statistics.read().await.clone(),
641 };
642
643 bincode::serde::encode_to_vec(&config, bincode::config::standard())
644 .map_err(|e| GuardianError::Other(format!("Erro ao exportar configuração: {}", e)))
645 }
646}
647
648#[derive(Debug, Serialize, Deserialize)]
650struct SyncExportConfig {
651 peer_id: PeerId,
652 trusted_peers: HashMap<PeerId, VerifyingKey>,
653 synchronized_keys: HashMap<String, KeyMetadata>,
654 statistics: SyncStatistics,
655}
656
657#[cfg(test)]
658mod tests {
659 use super::*;
660 use tempdir::TempDir;
661
662 #[tokio::test]
663 async fn test_key_synchronizer_creation() {
664 let temp_dir = TempDir::new("test_sync").unwrap();
665 let config = ClientConfig {
666 data_store_path: Some(temp_dir.path().to_path_buf()),
667 ..Default::default()
668 };
669
670 let synchronizer = KeySynchronizer::new(&config).await.unwrap();
671 assert!(!synchronizer.peer_id().to_string().is_empty());
672 }
673
674 #[tokio::test]
675 async fn test_sync_message_creation_and_verification() {
676 let temp_dir = TempDir::new("test_sync").unwrap();
677 let config = ClientConfig {
678 data_store_path: Some(temp_dir.path().to_path_buf()),
679 ..Default::default()
680 };
681
682 let synchronizer = KeySynchronizer::new(&config).await.unwrap();
683
684 let metadata = KeyMetadata {
686 key_id: "test_key".to_string(),
687 version: 1,
688 last_modified: Utc::now(),
689 creator: synchronizer.peer_id(),
690 signature: Vec::new(),
691 crypto_algorithm: "Ed25519".to_string(),
692 public_key_hash: vec![1, 2, 3, 4],
693 };
694
695 let message = synchronizer
697 .create_sync_message(SyncOperation::Create, metadata, Some(b"test_data".to_vec()))
698 .await
699 .unwrap();
700
701 assert_eq!(message.protocol_version, SYNC_PROTOCOL_VERSION);
703 assert_eq!(message.sender, synchronizer.peer_id());
704 assert_eq!(message.operation, SyncOperation::Create);
705 assert!(!message.message_signature.is_empty());
706 }
707}