1use async_trait::async_trait;
7use dashmap::DashMap;
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use thiserror::Error;
13use tokio::fs::{self, File};
14use tokio::io::{AsyncReadExt, AsyncWriteExt};
15use tracing::{debug, error, info, warn};
16
17use qudag_dag::vertex::{Vertex, VertexId};
19use qudag_network::dark_resolver::DarkDomainRecord;
20use qudag_network::types::PeerId;
21
/// Result alias used throughout this module; the error type is always [`PersistenceError`].
pub type Result<T> = std::result::Result<T, PersistenceError>;

/// Version number for the persisted state format.
///
/// NOTE(review): presumably the value stored in `PersistedDagState::version`; nothing in the
/// visible code reads this constant — confirm against callers.
pub const CURRENT_STATE_VERSION: u32 = 1;
27
/// Errors reported by the persistence layer.
#[derive(Debug, Error)]
pub enum PersistenceError {
    /// An underlying I/O operation failed. Built automatically from `std::io::Error`
    /// (via `#[from]`), so `?` on I/O calls yields this variant.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// JSON encoding or decoding failed; carries the serde error rendered as text.
    #[error("Serialization error: {0}")]
    Serialization(String),

    /// Stored data was found to be corrupt (not constructed anywhere in the visible code).
    #[error("Data corruption detected: {0}")]
    Corruption(String),

    /// A required directory could not be created.
    #[error("Directory creation failed: {0}")]
    DirectoryCreation(String),

    /// A required file or directory (e.g. a backup's `data` directory) does not exist.
    #[error("File not found: {0}")]
    FileNotFound(String),

    /// On-disk data had an unexpected shape, e.g. a peer file whose name is not a
    /// 32-byte hex identifier.
    #[error("Invalid data format: {0}")]
    InvalidFormat(String),

    /// Timed out acquiring a lock (not constructed anywhere in the visible code).
    #[error("Lock acquisition timeout")]
    LockTimeout,
}
59
/// Persisted bookkeeping for a single network peer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInfo {
    /// Peer address as text (the tests in this file use `host:port`, e.g. `127.0.0.1:8080`).
    pub address: String,
    /// When the peer was last seen. Unit/epoch is not established here
    /// (presumably Unix seconds — TODO confirm).
    pub last_seen: u64,
    /// Reputation score; `Default` starts a peer at 50.
    /// NOTE(review): valid range presumably 0–100 — confirm.
    pub reputation: u8,
    /// Whether the peer is marked as trusted (`false` by default).
    pub trusted: bool,
    /// Connection counter (`0` by default).
    pub connection_count: u64,
    /// Byte counter for traffic exchanged with the peer (`0` by default).
    pub bytes_exchanged: u64,
    /// Free-form string key/value annotations (empty by default).
    pub metadata: HashMap<String, String>,
}
78
/// Complete node state snapshot, as written by `StateStore::save_state` and returned by
/// `StateStore::recover_state`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersistedDagState {
    /// Format version of this snapshot
    /// (presumably compared against `CURRENT_STATE_VERSION` — TODO confirm).
    pub version: u32,
    /// Identifier of the node that produced the snapshot, as raw bytes.
    pub node_id: Vec<u8>,
    /// Protocol state machine state (type defined in `crate::state`).
    pub protocol_state: crate::state::ProtocolState,
    /// Session records keyed by session UUID.
    pub sessions: HashMap<uuid::Uuid, crate::state::SessionInfo>,
    /// Known peers paired with their persisted info.
    pub peers: Vec<(PeerId, PeerInfo)>,
    /// DAG contents and consensus bookkeeping.
    pub dag_state: DagState,
    /// State machine metrics (type defined in `crate::state`).
    pub metrics: crate::state::StateMachineMetrics,
    /// When the snapshot was saved. Unit/epoch is not established here
    /// (presumably Unix seconds — TODO confirm).
    pub last_saved: u64,
}
99
/// DAG portion of a persisted snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DagState {
    /// Vertices keyed by their identifier.
    pub vertices: HashMap<VertexId, Vertex>,
    /// Identifiers of the current DAG tips.
    pub tips: std::collections::HashSet<VertexId>,
    /// Voting records keyed by the vertex being voted on.
    pub voting_records: HashMap<VertexId, VotingRecord>,
    /// Most recent checkpoint, if one has been recorded.
    pub last_checkpoint: Option<CheckpointInfo>,
}
112
/// Voting bookkeeping for a single vertex.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VotingRecord {
    /// Vertex this record refers to.
    pub vertex_id: VertexId,
    /// Votes as boolean values keyed by a byte string
    /// (presumably the voter's identifier — TODO confirm).
    pub votes: HashMap<Vec<u8>, bool>,
    /// When voting started. Unit/epoch is not established here
    /// (presumably Unix seconds — TODO confirm).
    pub started_at: u64,
    /// Current outcome of the vote.
    pub status: ConsensusStatus,
}
125
/// Outcome states recorded in a [`VotingRecord`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConsensusStatus {
    /// No decision has been reached yet.
    Pending,
    /// The vertex was accepted.
    Accepted,
    /// The vertex was rejected.
    Rejected,
    /// Voting ended without a decision because it timed out.
    TimedOut,
}
138
/// Summary of a DAG checkpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointInfo {
    /// Checkpoint identifier, as raw bytes.
    pub id: Vec<u8>,
    /// When the checkpoint was taken. Unit/epoch is not established here
    /// (presumably Unix seconds — TODO confirm).
    pub timestamp: u64,
    /// Number of vertices associated with the checkpoint.
    pub vertex_count: usize,
    /// Merkle root bytes recorded for the checkpoint (hash algorithm not shown here).
    pub merkle_root: Vec<u8>,
}
151
/// Alternate name for [`PersistedDagState`].
pub type PersistedState = PersistedDagState;

/// Alternate name for the in-memory store.
pub type MemoryBackend = MemoryStateStore;
/// NOTE: despite the name, this alias resolves to the JSON-file-backed [`FileStateStore`];
/// no SQLite code is involved.
pub type SqliteBackend = FileStateStore;
/// Shared, type-erased handle to any [`StateStore`] implementation.
pub type PersistenceManager = Arc<dyn StateStore + Send + Sync>;
161
/// Marker trait with no methods of its own; it exists as another name for [`StateStore`].
pub trait StatePersistence: StateStore {}
// Blanket impl: every `StateStore` is automatically a `StatePersistence`.
impl<T: StateStore> StatePersistence for T {}
165
/// Implemented by components that can hand out the state store they use.
pub trait StateProvider: Send + Sync {
    /// Returns a shared handle to the underlying state store.
    fn get_state_store(&self) -> Arc<dyn StateStore + Send + Sync>;
}
170
171impl Default for PeerInfo {
172 fn default() -> Self {
173 Self {
174 address: String::new(),
175 last_seen: 0,
176 reputation: 50,
177 trusted: false,
178 connection_count: 0,
179 bytes_exchanged: 0,
180 metadata: HashMap::new(),
181 }
182 }
183}
184
/// Storage backend interface for vertices, peers, dark domain records and whole-state
/// snapshots. All methods are async and report failures as [`PersistenceError`].
#[async_trait]
pub trait StateStore: Send + Sync {
    /// Stores `vertex` under its own id, replacing any previously stored copy.
    async fn save_vertex(&self, vertex: &Vertex) -> Result<()>;

    /// Loads the vertex with the given id; `Ok(None)` when nothing is stored for it.
    async fn load_vertex(&self, id: &VertexId) -> Result<Option<Vertex>>;

    /// Stores `info` for `peer_id`, replacing any previous entry.
    async fn save_peer(&self, peer_id: &PeerId, info: &PeerInfo) -> Result<()>;

    /// Returns every stored peer together with its info.
    async fn load_peers(&self) -> Result<Vec<(PeerId, PeerInfo)>>;

    /// Stores a dark domain record. The implementations in this file key it by
    /// `record.owner_id`, so each owner has at most one stored record.
    async fn save_dark_record(&self, record: &DarkDomainRecord) -> Result<()>;

    /// Returns every stored dark domain record.
    async fn load_dark_records(&self) -> Result<Vec<DarkDomainRecord>>;

    /// Removes the vertex with the given id. The implementations in this file succeed
    /// when no such vertex is stored.
    async fn remove_vertex(&self, id: &VertexId) -> Result<()>;

    /// Removes the entry for `peer_id`. The implementations in this file succeed when
    /// no such peer is stored.
    async fn remove_peer(&self, peer_id: &PeerId) -> Result<()>;

    /// Removes the dark domain record owned by `owner_id`. The implementations in this
    /// file succeed when no such record is stored.
    async fn remove_dark_record(&self, owner_id: &PeerId) -> Result<()>;

    /// Number of stored vertices.
    async fn vertex_count(&self) -> Result<usize>;

    /// Number of stored peers.
    async fn peer_count(&self) -> Result<usize>;

    /// Number of stored dark domain records.
    async fn dark_record_count(&self) -> Result<usize>;

    /// Reports whether the backend is currently usable.
    async fn health_check(&self) -> Result<bool>;

    /// Persists a complete state snapshot (a no-op for the in-memory store in this file).
    async fn save_state(&self, state: &PersistedDagState) -> Result<()>;

    /// Returns the last saved snapshot, or `Ok(None)` when there is none.
    async fn recover_state(&self) -> Result<Option<PersistedDagState>>;

    /// Writes a backup of the store's contents under `backup_path`.
    async fn create_backup(&self, backup_path: &PathBuf) -> Result<()>;

    /// Replaces the store's contents with a backup previously written to `backup_path`.
    async fn restore_backup(&self, backup_path: &PathBuf) -> Result<()>;
}
239
/// [`StateStore`] that keeps each item as a pretty-printed JSON file under a data directory
/// (`vertices/`, `peers/`, `domains/`, plus `state.json` for whole-state snapshots).
pub struct FileStateStore {
    /// Root directory holding all persisted files.
    data_dir: PathBuf,
    /// When `true`, files are first written to a sibling `.tmp` path and then renamed
    /// over the target; when `false`, the target is written directly.
    atomic_writes: bool,
}
247
248impl FileStateStore {
249 pub async fn new(data_dir: PathBuf) -> Result<Self> {
251 fs::create_dir_all(&data_dir).await.map_err(|e| {
253 PersistenceError::DirectoryCreation(format!("Failed to create data dir: {}", e))
254 })?;
255
256 let vertices_dir = data_dir.join("vertices");
258 let peers_dir = data_dir.join("peers");
259 let domains_dir = data_dir.join("domains");
260
261 fs::create_dir_all(&vertices_dir).await.map_err(|e| {
262 PersistenceError::DirectoryCreation(format!("Failed to create vertices dir: {}", e))
263 })?;
264
265 fs::create_dir_all(&peers_dir).await.map_err(|e| {
266 PersistenceError::DirectoryCreation(format!("Failed to create peers dir: {}", e))
267 })?;
268
269 fs::create_dir_all(&domains_dir).await.map_err(|e| {
270 PersistenceError::DirectoryCreation(format!("Failed to create domains dir: {}", e))
271 })?;
272
273 info!("Initialized file state store at {:?}", data_dir);
274
275 Ok(Self {
276 data_dir,
277 atomic_writes: true,
278 })
279 }
280
281 pub fn set_atomic_writes(&mut self, enabled: bool) {
283 self.atomic_writes = enabled;
284 }
285
286 fn vertex_path(&self, id: &VertexId) -> PathBuf {
288 let id_hex = hex::encode(id.as_bytes());
289 self.data_dir
290 .join("vertices")
291 .join(format!("{}.json", id_hex))
292 }
293
294 fn peer_path(&self, peer_id: &PeerId) -> PathBuf {
296 let id_hex = hex::encode(peer_id.as_bytes());
297 self.data_dir.join("peers").join(format!("{}.json", id_hex))
298 }
299
300 fn domain_path(&self, record: &DarkDomainRecord) -> PathBuf {
302 let id_hex = hex::encode(record.owner_id.as_bytes());
304 self.data_dir
305 .join("domains")
306 .join(format!("{}.json", id_hex))
307 }
308
309 async fn write_file_atomic<T: Serialize>(&self, path: &Path, data: &T) -> Result<()> {
311 let json = serde_json::to_string_pretty(data)
312 .map_err(|e| PersistenceError::Serialization(e.to_string()))?;
313
314 if self.atomic_writes {
315 let temp_path = path.with_extension("tmp");
317 let mut file = File::create(&temp_path).await?;
318 file.write_all(json.as_bytes()).await?;
319 file.sync_all().await?;
320
321 fs::rename(&temp_path, path).await?;
323 } else {
324 let mut file = File::create(path).await?;
326 file.write_all(json.as_bytes()).await?;
327 file.sync_all().await?;
328 }
329
330 Ok(())
331 }
332
333 async fn read_file<T: for<'de> Deserialize<'de>>(&self, path: &Path) -> Result<Option<T>> {
335 if !path.exists() {
336 return Ok(None);
337 }
338
339 let mut file = File::open(path).await?;
340 let mut contents = String::new();
341 file.read_to_string(&mut contents).await?;
342
343 let data = serde_json::from_str(&contents)
344 .map_err(|e| PersistenceError::Serialization(e.to_string()))?;
345
346 Ok(Some(data))
347 }
348}
349
350#[async_trait]
351impl StateStore for FileStateStore {
352 async fn save_vertex(&self, vertex: &Vertex) -> Result<()> {
353 let path = self.vertex_path(&vertex.id);
354 self.write_file_atomic(&path, vertex).await?;
355 debug!("Saved vertex {:?} to file", vertex.id);
356 Ok(())
357 }
358
359 async fn load_vertex(&self, id: &VertexId) -> Result<Option<Vertex>> {
360 let path = self.vertex_path(id);
361 let vertex = self.read_file(&path).await?;
362 if vertex.is_some() {
363 debug!("Loaded vertex {:?} from file", id);
364 }
365 Ok(vertex)
366 }
367
368 async fn save_peer(&self, peer_id: &PeerId, info: &PeerInfo) -> Result<()> {
369 let path = self.peer_path(peer_id);
370 self.write_file_atomic(&path, info).await?;
371 debug!("Saved peer {:?} to file", peer_id);
372 Ok(())
373 }
374
375 async fn load_peers(&self) -> Result<Vec<(PeerId, PeerInfo)>> {
376 let peers_dir = self.data_dir.join("peers");
377 let mut peers = Vec::new();
378
379 let mut entries = fs::read_dir(&peers_dir).await?;
380 while let Some(entry) = entries.next_entry().await? {
381 let path = entry.path();
382 if path.extension().and_then(|s| s.to_str()) == Some("json") {
383 let filename = path.file_stem().and_then(|s| s.to_str()).ok_or_else(|| {
384 PersistenceError::InvalidFormat("Invalid filename".to_string())
385 })?;
386
387 let peer_id_bytes = hex::decode(filename).map_err(|e| {
388 PersistenceError::InvalidFormat(format!("Invalid peer ID: {}", e))
389 })?;
390
391 if peer_id_bytes.len() != 32 {
392 return Err(PersistenceError::InvalidFormat(
393 "Invalid peer ID length".to_string(),
394 ));
395 }
396
397 let mut id_array = [0u8; 32];
398 id_array.copy_from_slice(&peer_id_bytes);
399 let peer_id = PeerId::from_bytes(id_array);
400
401 if let Some(info) = self.read_file::<PeerInfo>(&path).await? {
402 peers.push((peer_id, info));
403 }
404 }
405 }
406
407 debug!("Loaded {} peers from files", peers.len());
408 Ok(peers)
409 }
410
411 async fn save_dark_record(&self, record: &DarkDomainRecord) -> Result<()> {
412 let path = self.domain_path(record);
413 self.write_file_atomic(&path, record).await?;
414 debug!(
415 "Saved dark domain record for owner {:?} to file",
416 record.owner_id
417 );
418 Ok(())
419 }
420
421 async fn load_dark_records(&self) -> Result<Vec<DarkDomainRecord>> {
422 let domains_dir = self.data_dir.join("domains");
423 let mut records = Vec::new();
424
425 let mut entries = fs::read_dir(&domains_dir).await?;
426 while let Some(entry) = entries.next_entry().await? {
427 let path = entry.path();
428 if path.extension().and_then(|s| s.to_str()) == Some("json") {
429 if let Some(record) = self.read_file::<DarkDomainRecord>(&path).await? {
430 records.push(record);
431 }
432 }
433 }
434
435 debug!("Loaded {} dark domain records from files", records.len());
436 Ok(records)
437 }
438
439 async fn remove_vertex(&self, id: &VertexId) -> Result<()> {
440 let path = self.vertex_path(id);
441 if path.exists() {
442 fs::remove_file(&path).await?;
443 debug!("Removed vertex {:?} from file", id);
444 }
445 Ok(())
446 }
447
448 async fn remove_peer(&self, peer_id: &PeerId) -> Result<()> {
449 let path = self.peer_path(peer_id);
450 if path.exists() {
451 fs::remove_file(&path).await?;
452 debug!("Removed peer {:?} from file", peer_id);
453 }
454 Ok(())
455 }
456
457 async fn remove_dark_record(&self, owner_id: &PeerId) -> Result<()> {
458 let id_hex = hex::encode(owner_id.as_bytes());
459 let path = self
460 .data_dir
461 .join("domains")
462 .join(format!("{}.json", id_hex));
463 if path.exists() {
464 fs::remove_file(&path).await?;
465 debug!(
466 "Removed dark domain record for owner {:?} from file",
467 owner_id
468 );
469 }
470 Ok(())
471 }
472
473 async fn vertex_count(&self) -> Result<usize> {
474 let vertices_dir = self.data_dir.join("vertices");
475 let mut count = 0;
476 let mut entries = fs::read_dir(&vertices_dir).await?;
477 while let Some(entry) = entries.next_entry().await? {
478 if entry.path().extension().and_then(|s| s.to_str()) == Some("json") {
479 count += 1;
480 }
481 }
482 Ok(count)
483 }
484
485 async fn peer_count(&self) -> Result<usize> {
486 let peers_dir = self.data_dir.join("peers");
487 let mut count = 0;
488 let mut entries = fs::read_dir(&peers_dir).await?;
489 while let Some(entry) = entries.next_entry().await? {
490 if entry.path().extension().and_then(|s| s.to_str()) == Some("json") {
491 count += 1;
492 }
493 }
494 Ok(count)
495 }
496
497 async fn dark_record_count(&self) -> Result<usize> {
498 let domains_dir = self.data_dir.join("domains");
499 let mut count = 0;
500 let mut entries = fs::read_dir(&domains_dir).await?;
501 while let Some(entry) = entries.next_entry().await? {
502 if entry.path().extension().and_then(|s| s.to_str()) == Some("json") {
503 count += 1;
504 }
505 }
506 Ok(count)
507 }
508
509 async fn health_check(&self) -> Result<bool> {
510 let test_file = self.data_dir.join(".health_check");
512 match File::create(&test_file).await {
513 Ok(_) => {
514 let _ = fs::remove_file(&test_file).await;
515 Ok(true)
516 }
517 Err(_) => Ok(false),
518 }
519 }
520
521 async fn save_state(&self, state: &PersistedDagState) -> Result<()> {
522 let state_file = self.data_dir.join("state.json");
523 self.write_file_atomic(&state_file, state).await?;
524 debug!("Saved complete state to file");
525 Ok(())
526 }
527
528 async fn recover_state(&self) -> Result<Option<PersistedDagState>> {
529 let state_file = self.data_dir.join("state.json");
530 let state = self.read_file(&state_file).await?;
531 if state.is_some() {
532 debug!("Recovered complete state from file");
533 }
534 Ok(state)
535 }
536
537 async fn create_backup(&self, backup_path: &PathBuf) -> Result<()> {
538 fs::create_dir_all(backup_path).await.map_err(|e| {
540 PersistenceError::DirectoryCreation(format!("Failed to create backup dir: {}", e))
541 })?;
542
543 let backup_data_dir = backup_path.join("data");
545 fs::create_dir_all(&backup_data_dir).await.map_err(|e| {
546 PersistenceError::DirectoryCreation(format!("Failed to create backup data dir: {}", e))
547 })?;
548
549 copy_dir_all(self.data_dir.clone(), backup_data_dir.clone()).await?;
551
552 debug!("Created backup at {:?}", backup_path);
553 Ok(())
554 }
555
556 async fn restore_backup(&self, backup_path: &PathBuf) -> Result<()> {
557 let backup_data_dir = backup_path.join("data");
558
559 if !backup_data_dir.exists() {
560 return Err(PersistenceError::FileNotFound(format!(
561 "Backup data directory not found: {:?}",
562 backup_data_dir
563 )));
564 }
565
566 if self.data_dir.exists() {
568 fs::remove_dir_all(&self.data_dir).await?;
569 }
570
571 copy_dir_all(backup_data_dir.clone(), self.data_dir.clone()).await?;
573
574 debug!("Restored backup from {:?}", backup_path);
575 Ok(())
576 }
577}
578
/// [`StateStore`] that keeps everything in concurrent in-process maps; nothing is written
/// to disk.
pub struct MemoryStateStore {
    /// Vertices keyed by their id.
    vertices: DashMap<VertexId, Vertex>,
    /// Peer info keyed by peer id.
    peers: DashMap<PeerId, PeerInfo>,
    /// Dark domain records keyed by the hex encoding of the owner's id bytes.
    dark_records: DashMap<String, DarkDomainRecord>,
}
588
589impl Default for MemoryStateStore {
590 fn default() -> Self {
591 Self::new()
592 }
593}
594
595impl MemoryStateStore {
596 pub fn new() -> Self {
598 Self {
599 vertices: DashMap::new(),
600 peers: DashMap::new(),
601 dark_records: DashMap::new(),
602 }
603 }
604
605 pub fn clear(&self) {
607 self.vertices.clear();
608 self.peers.clear();
609 self.dark_records.clear();
610 }
611}
612
613#[async_trait]
614impl StateStore for MemoryStateStore {
615 async fn save_vertex(&self, vertex: &Vertex) -> Result<()> {
616 self.vertices.insert(vertex.id.clone(), vertex.clone());
617 debug!("Saved vertex {:?} to memory", vertex.id);
618 Ok(())
619 }
620
621 async fn load_vertex(&self, id: &VertexId) -> Result<Option<Vertex>> {
622 let vertex = self.vertices.get(id).map(|entry| entry.clone());
623 if vertex.is_some() {
624 debug!("Loaded vertex {:?} from memory", id);
625 }
626 Ok(vertex)
627 }
628
629 async fn save_peer(&self, peer_id: &PeerId, info: &PeerInfo) -> Result<()> {
630 self.peers.insert(*peer_id, info.clone());
631 debug!("Saved peer {:?} to memory", peer_id);
632 Ok(())
633 }
634
635 async fn load_peers(&self) -> Result<Vec<(PeerId, PeerInfo)>> {
636 let peers: Vec<(PeerId, PeerInfo)> = self
637 .peers
638 .iter()
639 .map(|entry| (*entry.key(), entry.value().clone()))
640 .collect();
641 debug!("Loaded {} peers from memory", peers.len());
642 Ok(peers)
643 }
644
645 async fn save_dark_record(&self, record: &DarkDomainRecord) -> Result<()> {
646 let key = hex::encode(record.owner_id.as_bytes());
647 self.dark_records.insert(key, record.clone());
648 debug!(
649 "Saved dark domain record for owner {:?} to memory",
650 record.owner_id
651 );
652 Ok(())
653 }
654
655 async fn load_dark_records(&self) -> Result<Vec<DarkDomainRecord>> {
656 let records: Vec<DarkDomainRecord> = self
657 .dark_records
658 .iter()
659 .map(|entry| entry.value().clone())
660 .collect();
661 debug!("Loaded {} dark domain records from memory", records.len());
662 Ok(records)
663 }
664
665 async fn remove_vertex(&self, id: &VertexId) -> Result<()> {
666 self.vertices.remove(id);
667 debug!("Removed vertex {:?} from memory", id);
668 Ok(())
669 }
670
671 async fn remove_peer(&self, peer_id: &PeerId) -> Result<()> {
672 self.peers.remove(peer_id);
673 debug!("Removed peer {:?} from memory", peer_id);
674 Ok(())
675 }
676
677 async fn remove_dark_record(&self, owner_id: &PeerId) -> Result<()> {
678 let key = hex::encode(owner_id.as_bytes());
679 self.dark_records.remove(&key);
680 debug!(
681 "Removed dark domain record for owner {:?} from memory",
682 owner_id
683 );
684 Ok(())
685 }
686
687 async fn vertex_count(&self) -> Result<usize> {
688 Ok(self.vertices.len())
689 }
690
691 async fn peer_count(&self) -> Result<usize> {
692 Ok(self.peers.len())
693 }
694
695 async fn dark_record_count(&self) -> Result<usize> {
696 Ok(self.dark_records.len())
697 }
698
699 async fn health_check(&self) -> Result<bool> {
700 Ok(true)
702 }
703
704 async fn save_state(&self, _state: &PersistedDagState) -> Result<()> {
705 debug!("State save called on memory store (no-op)");
707 Ok(())
708 }
709
710 async fn recover_state(&self) -> Result<Option<PersistedDagState>> {
711 debug!("State recovery called on memory store (returning None)");
713 Ok(None)
714 }
715
716 async fn create_backup(&self, _backup_path: &PathBuf) -> Result<()> {
717 warn!("Backup creation called on memory store (no-op)");
719 Ok(())
720 }
721
722 async fn restore_backup(&self, _backup_path: &PathBuf) -> Result<()> {
723 warn!("Backup restoration called on memory store (no-op)");
725 Ok(())
726 }
727}
728
/// Thin wrapper that routes node events (consensus results, peer updates, dark domain
/// registrations) to a [`StateStore`], with a switch to turn persistence off.
pub struct PersistentNodeRunner<S: StateStore> {
    /// Backend that receives all persisted data.
    store: Arc<S>,
    /// Period of the background task in seconds; `0` disables the task. Defaults to 300.
    auto_save_interval: u64,
    /// When `false`, the save/persist/store methods return immediately without touching
    /// the store. Defaults to `true`.
    persistence_enabled: bool,
}
738
739impl<S: StateStore + 'static> PersistentNodeRunner<S> {
740 pub fn new(store: Arc<S>) -> Self {
742 Self {
743 store,
744 auto_save_interval: 300, persistence_enabled: true,
746 }
747 }
748
749 pub fn set_auto_save_interval(&mut self, seconds: u64) {
751 self.auto_save_interval = seconds;
752 }
753
754 pub fn set_persistence_enabled(&mut self, enabled: bool) {
756 self.persistence_enabled = enabled;
757 }
758
759 pub async fn save_vertex_after_consensus(&self, vertex: &Vertex) -> Result<()> {
761 if !self.persistence_enabled {
762 return Ok(());
763 }
764
765 self.store.save_vertex(vertex).await?;
766 info!("Persisted vertex {:?} after consensus", vertex.id);
767 Ok(())
768 }
769
770 pub async fn persist_peer_info(&self, peer_id: &PeerId, info: &PeerInfo) -> Result<()> {
772 if !self.persistence_enabled {
773 return Ok(());
774 }
775
776 self.store.save_peer(peer_id, info).await?;
777 debug!("Persisted peer info for {:?}", peer_id);
778 Ok(())
779 }
780
781 pub async fn store_dark_domain_registration(&self, record: &DarkDomainRecord) -> Result<()> {
783 if !self.persistence_enabled {
784 return Ok(());
785 }
786
787 self.store.save_dark_record(record).await?;
788 info!(
789 "Stored dark domain registration for owner {:?}",
790 record.owner_id
791 );
792 Ok(())
793 }
794
795 pub async fn load_state_on_startup(&self) -> Result<StartupState> {
797 if !self.persistence_enabled {
798 return Ok(StartupState::default());
799 }
800
801 info!("Loading persisted state on startup...");
802
803 let vertices = vec![]; let peers = self.store.load_peers().await?;
805 let dark_records = self.store.load_dark_records().await?;
806
807 let state = StartupState {
808 vertices,
809 peers,
810 dark_records,
811 };
812
813 info!(
814 "Loaded startup state: {} vertices, {} peers, {} dark records",
815 state.vertices.len(),
816 state.peers.len(),
817 state.dark_records.len()
818 );
819
820 Ok(state)
821 }
822
823 pub async fn start_auto_save_task(&self) -> Result<()> {
825 if !self.persistence_enabled || self.auto_save_interval == 0 {
826 return Ok(());
827 }
828
829 let store = self.store.clone();
830 let interval = self.auto_save_interval;
831
832 tokio::spawn(async move {
833 let mut interval_timer =
834 tokio::time::interval(tokio::time::Duration::from_secs(interval));
835
836 loop {
837 interval_timer.tick().await;
838
839 match store.health_check().await {
840 Ok(true) => {
841 debug!("Auto-save health check passed");
842 }
843 Ok(false) => {
844 warn!("Auto-save health check failed");
845 }
846 Err(e) => {
847 error!("Auto-save health check error: {}", e);
848 }
849 }
850 }
851 });
852
853 info!("Started auto-save task with {} second interval", interval);
854 Ok(())
855 }
856
857 pub async fn get_storage_stats(&self) -> Result<StorageStats> {
859 let vertex_count = self.store.vertex_count().await?;
860 let peer_count = self.store.peer_count().await?;
861 let dark_record_count = self.store.dark_record_count().await?;
862 let healthy = self.store.health_check().await?;
863
864 Ok(StorageStats {
865 vertex_count,
866 peer_count,
867 dark_record_count,
868 healthy,
869 })
870 }
871}
872
/// Data handed back by `PersistentNodeRunner::load_state_on_startup`.
#[derive(Debug, Default)]
pub struct StartupState {
    /// Restored vertices (the loader in this file always leaves this empty).
    pub vertices: Vec<Vertex>,
    /// Restored peers with their persisted info.
    pub peers: Vec<(PeerId, PeerInfo)>,
    /// Restored dark domain records.
    pub dark_records: Vec<DarkDomainRecord>,
}
883
/// Snapshot of store contents returned by `PersistentNodeRunner::get_storage_stats`.
#[derive(Debug)]
pub struct StorageStats {
    /// Number of stored vertices.
    pub vertex_count: usize,
    /// Number of stored peers.
    pub peer_count: usize,
    /// Number of stored dark domain records.
    pub dark_record_count: usize,
    /// Result of the store's health check at the time the stats were taken.
    pub healthy: bool,
}
896
897fn copy_dir_all(
899 src: PathBuf,
900 dst: PathBuf,
901) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'static>> {
902 Box::pin(async move {
903 fs::create_dir_all(&dst).await.map_err(|e| {
904 PersistenceError::DirectoryCreation(format!(
905 "Failed to create destination directory: {}",
906 e
907 ))
908 })?;
909
910 let mut entries = fs::read_dir(&src).await?;
911 while let Some(entry) = entries.next_entry().await? {
912 let path = entry.path();
913 let dest_path = dst.join(entry.file_name());
914
915 if path.is_dir() {
916 copy_dir_all(path, dest_path).await?;
917 } else {
918 fs::copy(&path, &dest_path).await?;
919 }
920 }
921 Ok(())
922 })
923}
924
/// Unit tests for the in-memory and file-backed stores and for `PersistentNodeRunner`.
#[cfg(test)]
mod tests {
    use super::*;
    use qudag_dag::vertex::VertexId;
    // NOTE(review): the parent module imports `PeerId` from `qudag_network::types`, while
    // this explicit import (which takes precedence over the glob above) names
    // `qudag_network::peer::PeerId`. Confirm both paths refer to the same type.
    use qudag_network::peer::PeerId;
    use std::collections::HashSet;
    use tempfile::tempdir;

    /// Builds a vertex with a fresh id, a four-byte payload and an empty `HashSet` as the
    /// third constructor argument.
    fn create_test_vertex() -> Vertex {
        Vertex::new(VertexId::new(), vec![1, 2, 3, 4], HashSet::new())
    }

    /// Builds a fully populated `PeerInfo` with fixed, non-default values.
    fn create_test_peer_info() -> PeerInfo {
        PeerInfo {
            address: "127.0.0.1:8080".to_string(),
            last_seen: 1234567890,
            reputation: 75,
            trusted: true,
            connection_count: 5,
            bytes_exchanged: 1024,
            metadata: HashMap::new(),
        }
    }

    /// Builds a dark domain record with placeholder key/signature bytes and a fresh owner id.
    fn create_test_dark_record() -> DarkDomainRecord {
        use qudag_network::types::NetworkAddress;
        use std::collections::HashMap;

        DarkDomainRecord {
            signing_public_key: vec![1, 2, 3, 4],
            encryption_public_key: vec![5, 6, 7, 8],
            addresses: vec![NetworkAddress::new([127, 0, 0, 1], 8080)],
            alias: Some("test.dark".to_string()),
            ttl: 3600,
            registered_at: 1234567890,
            expires_at: 1234567890 + 3600,
            owner_id: PeerId::new(),
            signature: vec![9, 10, 11, 12],
            metadata: HashMap::new(),
        }
    }

    /// Memory store: save, load, count and remove a vertex.
    #[tokio::test]
    async fn test_memory_store_vertices() {
        let store = MemoryStateStore::new();
        let vertex = create_test_vertex();

        // Save
        store.save_vertex(&vertex).await.unwrap();

        // Load back and compare ids
        let loaded = store.load_vertex(&vertex.id).await.unwrap();
        assert!(loaded.is_some());
        assert_eq!(loaded.unwrap().id, vertex.id);

        // Count
        assert_eq!(store.vertex_count().await.unwrap(), 1);

        // Remove and confirm the store is empty again
        store.remove_vertex(&vertex.id).await.unwrap();
        assert_eq!(store.vertex_count().await.unwrap(), 0);
    }

    /// Memory store: save, load, count and remove a peer.
    #[tokio::test]
    async fn test_memory_store_peers() {
        let store = MemoryStateStore::new();
        let peer_id = PeerId::random();
        let peer_info = create_test_peer_info();

        // Save
        store.save_peer(&peer_id, &peer_info).await.unwrap();

        // Load back and compare ids
        let peers = store.load_peers().await.unwrap();
        assert_eq!(peers.len(), 1);
        assert_eq!(peers[0].0, peer_id);

        // Count
        assert_eq!(store.peer_count().await.unwrap(), 1);

        // Remove and confirm the store is empty again
        store.remove_peer(&peer_id).await.unwrap();
        assert_eq!(store.peer_count().await.unwrap(), 0);
    }

    /// Memory store: save, load, count and remove a dark domain record.
    #[tokio::test]
    async fn test_memory_store_dark_records() {
        let store = MemoryStateStore::new();
        let record = create_test_dark_record();

        // Save
        store.save_dark_record(&record).await.unwrap();

        // Load back and compare owners
        let records = store.load_dark_records().await.unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].owner_id, record.owner_id);

        // Count
        assert_eq!(store.dark_record_count().await.unwrap(), 1);

        // Remove by owner and confirm the store is empty again
        store.remove_dark_record(&record.owner_id).await.unwrap();
        assert_eq!(store.dark_record_count().await.unwrap(), 0);
    }

    /// File store: a saved vertex can be loaded back and is counted.
    #[tokio::test]
    async fn test_file_store_vertices() {
        let temp_dir = tempdir().unwrap();
        let store = FileStateStore::new(temp_dir.path().to_path_buf())
            .await
            .unwrap();
        let vertex = create_test_vertex();

        // Save
        store.save_vertex(&vertex).await.unwrap();

        // Load back and compare ids
        let loaded = store.load_vertex(&vertex.id).await.unwrap();
        assert!(loaded.is_some());
        assert_eq!(loaded.unwrap().id, vertex.id);

        // Count
        assert_eq!(store.vertex_count().await.unwrap(), 1);
    }

    /// File store: a saved peer is returned by `load_peers` with the id recovered from
    /// its file name.
    #[tokio::test]
    async fn test_file_store_peers() {
        let temp_dir = tempdir().unwrap();
        let store = FileStateStore::new(temp_dir.path().to_path_buf())
            .await
            .unwrap();
        let peer_id = PeerId::random();
        let peer_info = create_test_peer_info();

        // Save
        store.save_peer(&peer_id, &peer_info).await.unwrap();

        // Load back and compare ids
        let peers = store.load_peers().await.unwrap();
        assert_eq!(peers.len(), 1);
        assert_eq!(peers[0].0, peer_id);
    }

    /// Runner over a memory store: items saved through the runner show up in the startup
    /// state and in the storage stats.
    #[tokio::test]
    async fn test_persistent_node_runner() {
        let store = Arc::new(MemoryStateStore::new());
        let mut runner = PersistentNodeRunner::new(store.clone());
        runner.set_auto_save_interval(1);

        let vertex = create_test_vertex();
        runner.save_vertex_after_consensus(&vertex).await.unwrap();

        let peer_id = PeerId::random();
        let peer_info = create_test_peer_info();
        runner
            .persist_peer_info(&peer_id, &peer_info)
            .await
            .unwrap();

        let dark_record = create_test_dark_record();
        runner
            .store_dark_domain_registration(&dark_record)
            .await
            .unwrap();

        // Startup state carries peers and dark records (vertices are not asserted here).
        let state = runner.load_state_on_startup().await.unwrap();
        assert_eq!(state.peers.len(), 1);
        assert_eq!(state.dark_records.len(), 1);

        let stats = runner.get_storage_stats().await.unwrap();
        assert_eq!(stats.vertex_count, 1);
        assert_eq!(stats.peer_count, 1);
        assert_eq!(stats.dark_record_count, 1);
        assert!(stats.healthy);
    }

    /// Both store kinds report healthy right after construction.
    #[tokio::test]
    async fn test_health_check() {
        let store = MemoryStateStore::new();
        assert!(store.health_check().await.unwrap());

        let temp_dir = tempdir().unwrap();
        let file_store = FileStateStore::new(temp_dir.path().to_path_buf())
            .await
            .unwrap();
        assert!(file_store.health_check().await.unwrap());
    }
}