// qudag_protocol/persistence.rs

1//! State persistence layer for QuDAG protocol
2//!
3//! This module provides a comprehensive persistence layer for storing and retrieving
4//! DAG vertices, peer information, and dark domain records using different storage backends.
5
6use async_trait::async_trait;
7use dashmap::DashMap;
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use thiserror::Error;
13use tokio::fs::{self, File};
14use tokio::io::{AsyncReadExt, AsyncWriteExt};
15use tracing::{debug, error, info, warn};
16
17// Import types from other modules
18use qudag_dag::vertex::{Vertex, VertexId};
19use qudag_network::dark_resolver::DarkDomainRecord;
20use qudag_network::types::PeerId;
21
/// Result type for persistence operations; errors are [`PersistenceError`]
pub type Result<T> = std::result::Result<T, PersistenceError>;

/// Current persisted-state format version (cf. [`PersistedDagState::version`]).
/// Bump when the serialized layout changes so older snapshots can be detected.
pub const CURRENT_STATE_VERSION: u32 = 1;
27
/// Errors that can occur during persistence operations.
///
/// `Io` converts automatically from `std::io::Error` via `#[from]`, so `?`
/// works directly on std/tokio IO calls throughout this module.
#[derive(Debug, Error)]
pub enum PersistenceError {
    /// IO error during file operations (auto-converted via `#[from]`)
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Serialization/deserialization error (carries the serde message text)
    #[error("Serialization error: {0}")]
    Serialization(String),

    /// Data corruption detected
    #[error("Data corruption detected: {0}")]
    Corruption(String),

    /// Directory creation failed
    #[error("Directory creation failed: {0}")]
    DirectoryCreation(String),

    /// File not found (e.g. a missing backup directory)
    #[error("File not found: {0}")]
    FileNotFound(String),

    /// Invalid data format (e.g. a malformed filename or peer ID)
    #[error("Invalid data format: {0}")]
    InvalidFormat(String),

    /// Lock acquisition timeout
    #[error("Lock acquisition timeout")]
    LockTimeout,
}
59
/// Information about a peer for persistence.
///
/// Serialized as JSON by the file backend; see [`PeerInfo::default`] for the
/// values used for a freshly-tracked peer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInfo {
    /// Peer's network address (free-form string, e.g. "127.0.0.1:8080")
    pub address: String,
    /// Last seen timestamp (Unix timestamp)
    pub last_seen: u64,
    /// Reputation score (0-100; a neutral 50 by default)
    pub reputation: u8,
    /// Whether the peer is trusted
    pub trusted: bool,
    /// Connection statistics
    pub connection_count: u64,
    /// Total bytes exchanged
    pub bytes_exchanged: u64,
    /// Additional metadata (free-form key/value pairs)
    pub metadata: HashMap<String, String>,
}
78
/// Persisted DAG state for protocol operations.
///
/// The top-level snapshot written to `state.json` by the file backend's
/// `save_state` and read back by `recover_state`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersistedDagState {
    /// Version of the persisted state format (cf. [`CURRENT_STATE_VERSION`])
    pub version: u32,
    /// Node identifier
    pub node_id: Vec<u8>,
    /// Current protocol state
    pub protocol_state: crate::state::ProtocolState,
    /// Active sessions keyed by session UUID
    pub sessions: HashMap<uuid::Uuid, crate::state::SessionInfo>,
    /// Peer information
    pub peers: Vec<(PeerId, PeerInfo)>,
    /// DAG state information
    pub dag_state: DagState,
    /// State machine metrics
    pub metrics: crate::state::StateMachineMetrics,
    /// Timestamp when state was last saved (Unix timestamp)
    pub last_saved: u64,
}
99
/// DAG-specific state information carried inside [`PersistedDagState`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DagState {
    /// DAG vertices stored as HashMap for efficient lookup by ID
    pub vertices: HashMap<VertexId, Vertex>,
    /// Current tip vertices
    pub tips: std::collections::HashSet<VertexId>,
    /// Voting records for consensus, keyed by the vertex being voted on
    pub voting_records: HashMap<VertexId, VotingRecord>,
    /// Last checkpoint information, if a checkpoint has ever been taken
    pub last_checkpoint: Option<CheckpointInfo>,
}
112
/// Voting record for consensus tracking of a single vertex.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VotingRecord {
    /// Vertex being voted on
    pub vertex_id: VertexId,
    /// Votes received (node_id -> vote; presumably true = accept — confirm
    /// against the consensus module)
    pub votes: HashMap<Vec<u8>, bool>,
    /// Timestamp when voting started (Unix timestamp)
    pub started_at: u64,
    /// Consensus status of this voting round
    pub status: ConsensusStatus,
}
125
/// Consensus status for a vertex's voting round.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConsensusStatus {
    /// Voting in progress
    Pending,
    /// Consensus reached (accepted)
    Accepted,
    /// Consensus reached (rejected)
    Rejected,
    /// Voting timed out before consensus was reached
    TimedOut,
}
138
/// Checkpoint information for state snapshots.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointInfo {
    /// Checkpoint identifier
    pub id: Vec<u8>,
    /// Checkpoint timestamp (Unix timestamp)
    pub timestamp: u64,
    /// Number of vertices at checkpoint
    pub vertex_count: usize,
    /// Merkle root of DAG state at checkpoint
    pub merkle_root: Vec<u8>,
}
151
/// General persisted state wrapper (alias for the DAG-centric snapshot type)
pub type PersistedState = PersistedDagState;

/// Backend storage interface alias (in-memory backend)
pub type MemoryBackend = MemoryStateStore;
/// Placeholder alias — a real SQLite backend does not exist yet
pub type SqliteBackend = FileStateStore; // For now, using file backend as SQLite placeholder

/// Persistence manager for coordinating storage operations: a shared,
/// object-safe handle to any [`StateStore`] backend
pub type PersistenceManager = Arc<dyn StateStore + Send + Sync>;
161
/// State persistence trait alias: implemented automatically (blanket impl
/// below) for every [`StateStore`]
pub trait StatePersistence: StateStore {}
impl<T: StateStore> StatePersistence for T {}
165
/// State provider trait for node integration: lets a node hand out a shared
/// handle to its configured [`StateStore`] without exposing the backend type.
pub trait StateProvider: Send + Sync {
    /// Return a shared, thread-safe handle to the underlying state store.
    fn get_state_store(&self) -> Arc<dyn StateStore + Send + Sync>;
}
170
171impl Default for PeerInfo {
172    fn default() -> Self {
173        Self {
174            address: String::new(),
175            last_seen: 0,
176            reputation: 50,
177            trusted: false,
178            connection_count: 0,
179            bytes_exchanged: 0,
180            metadata: HashMap::new(),
181        }
182    }
183}
184
/// Abstract storage trait for different persistence backends.
///
/// Implemented by [`FileStateStore`] (JSON files on disk) and
/// [`MemoryStateStore`] (volatile, for testing). All methods are async and
/// return [`PersistenceError`] on failure.
///
/// NOTE(review): the backup methods take `&PathBuf`; `&Path` would be more
/// idiomatic, but changing the trait signature would break implementors.
#[async_trait]
pub trait StateStore: Send + Sync {
    /// Save a DAG vertex to storage (overwrites any existing entry)
    async fn save_vertex(&self, vertex: &Vertex) -> Result<()>;

    /// Load a DAG vertex from storage by ID; `Ok(None)` when absent
    async fn load_vertex(&self, id: &VertexId) -> Result<Option<Vertex>>;

    /// Save peer information to storage (overwrites any existing entry)
    async fn save_peer(&self, peer_id: &PeerId, info: &PeerInfo) -> Result<()>;

    /// Load all peers from storage
    async fn load_peers(&self) -> Result<Vec<(PeerId, PeerInfo)>>;

    /// Save a dark domain record to storage, keyed by its owner ID
    async fn save_dark_record(&self, record: &DarkDomainRecord) -> Result<()>;

    /// Load all dark domain records from storage
    async fn load_dark_records(&self) -> Result<Vec<DarkDomainRecord>>;

    /// Remove a vertex from storage (absence is not an error)
    async fn remove_vertex(&self, id: &VertexId) -> Result<()>;

    /// Remove a peer from storage (absence is not an error)
    async fn remove_peer(&self, peer_id: &PeerId) -> Result<()>;

    /// Remove a dark domain record from storage by owner ID
    async fn remove_dark_record(&self, owner_id: &PeerId) -> Result<()>;

    /// Get total number of stored vertices
    async fn vertex_count(&self) -> Result<usize>;

    /// Get total number of stored peers
    async fn peer_count(&self) -> Result<usize>;

    /// Get total number of stored dark records
    async fn dark_record_count(&self) -> Result<usize>;

    /// Check if storage is healthy (e.g. the backing directory is writable)
    async fn health_check(&self) -> Result<bool>;

    /// Save complete persisted state snapshot
    async fn save_state(&self, state: &PersistedDagState) -> Result<()>;

    /// Recover complete persisted state; `Ok(None)` when nothing was saved
    async fn recover_state(&self) -> Result<Option<PersistedDagState>>;

    /// Create backup of the entire state under `backup_path`
    async fn create_backup(&self, backup_path: &PathBuf) -> Result<()>;

    /// Restore from a backup previously created by `create_backup`
    async fn restore_backup(&self, backup_path: &PathBuf) -> Result<()>;
}
239
/// File-based storage implementation using JSON files.
///
/// Layout under `data_dir`: `vertices/`, `peers/` and `domains/`
/// subdirectories with one `<hex-id>.json` file per record, plus a
/// `state.json` snapshot for the complete persisted state.
pub struct FileStateStore {
    /// Base directory for data storage
    data_dir: PathBuf,
    /// Whether to use atomic (write-temp-then-rename) writes; on by default
    atomic_writes: bool,
}
247
248impl FileStateStore {
249    /// Create a new file-based state store
250    pub async fn new(data_dir: PathBuf) -> Result<Self> {
251        // Create the base directory structure
252        fs::create_dir_all(&data_dir).await.map_err(|e| {
253            PersistenceError::DirectoryCreation(format!("Failed to create data dir: {}", e))
254        })?;
255
256        // Create subdirectories
257        let vertices_dir = data_dir.join("vertices");
258        let peers_dir = data_dir.join("peers");
259        let domains_dir = data_dir.join("domains");
260
261        fs::create_dir_all(&vertices_dir).await.map_err(|e| {
262            PersistenceError::DirectoryCreation(format!("Failed to create vertices dir: {}", e))
263        })?;
264
265        fs::create_dir_all(&peers_dir).await.map_err(|e| {
266            PersistenceError::DirectoryCreation(format!("Failed to create peers dir: {}", e))
267        })?;
268
269        fs::create_dir_all(&domains_dir).await.map_err(|e| {
270            PersistenceError::DirectoryCreation(format!("Failed to create domains dir: {}", e))
271        })?;
272
273        info!("Initialized file state store at {:?}", data_dir);
274
275        Ok(Self {
276            data_dir,
277            atomic_writes: true,
278        })
279    }
280
281    /// Enable or disable atomic writes
282    pub fn set_atomic_writes(&mut self, enabled: bool) {
283        self.atomic_writes = enabled;
284    }
285
286    /// Get path for a vertex file
287    fn vertex_path(&self, id: &VertexId) -> PathBuf {
288        let id_hex = hex::encode(id.as_bytes());
289        self.data_dir
290            .join("vertices")
291            .join(format!("{}.json", id_hex))
292    }
293
294    /// Get path for a peer file
295    fn peer_path(&self, peer_id: &PeerId) -> PathBuf {
296        let id_hex = hex::encode(peer_id.as_bytes());
297        self.data_dir.join("peers").join(format!("{}.json", id_hex))
298    }
299
300    /// Get path for a dark domain file
301    fn domain_path(&self, record: &DarkDomainRecord) -> PathBuf {
302        // Use owner_id as the filename since domain is in a related field
303        let id_hex = hex::encode(record.owner_id.as_bytes());
304        self.data_dir
305            .join("domains")
306            .join(format!("{}.json", id_hex))
307    }
308
309    /// Write data to file atomically
310    async fn write_file_atomic<T: Serialize>(&self, path: &Path, data: &T) -> Result<()> {
311        let json = serde_json::to_string_pretty(data)
312            .map_err(|e| PersistenceError::Serialization(e.to_string()))?;
313
314        if self.atomic_writes {
315            // Write to temporary file first
316            let temp_path = path.with_extension("tmp");
317            let mut file = File::create(&temp_path).await?;
318            file.write_all(json.as_bytes()).await?;
319            file.sync_all().await?;
320
321            // Atomically rename to final location
322            fs::rename(&temp_path, path).await?;
323        } else {
324            // Direct write
325            let mut file = File::create(path).await?;
326            file.write_all(json.as_bytes()).await?;
327            file.sync_all().await?;
328        }
329
330        Ok(())
331    }
332
333    /// Read data from file
334    async fn read_file<T: for<'de> Deserialize<'de>>(&self, path: &Path) -> Result<Option<T>> {
335        if !path.exists() {
336            return Ok(None);
337        }
338
339        let mut file = File::open(path).await?;
340        let mut contents = String::new();
341        file.read_to_string(&mut contents).await?;
342
343        let data = serde_json::from_str(&contents)
344            .map_err(|e| PersistenceError::Serialization(e.to_string()))?;
345
346        Ok(Some(data))
347    }
348}
349
350#[async_trait]
351impl StateStore for FileStateStore {
352    async fn save_vertex(&self, vertex: &Vertex) -> Result<()> {
353        let path = self.vertex_path(&vertex.id);
354        self.write_file_atomic(&path, vertex).await?;
355        debug!("Saved vertex {:?} to file", vertex.id);
356        Ok(())
357    }
358
359    async fn load_vertex(&self, id: &VertexId) -> Result<Option<Vertex>> {
360        let path = self.vertex_path(id);
361        let vertex = self.read_file(&path).await?;
362        if vertex.is_some() {
363            debug!("Loaded vertex {:?} from file", id);
364        }
365        Ok(vertex)
366    }
367
368    async fn save_peer(&self, peer_id: &PeerId, info: &PeerInfo) -> Result<()> {
369        let path = self.peer_path(peer_id);
370        self.write_file_atomic(&path, info).await?;
371        debug!("Saved peer {:?} to file", peer_id);
372        Ok(())
373    }
374
375    async fn load_peers(&self) -> Result<Vec<(PeerId, PeerInfo)>> {
376        let peers_dir = self.data_dir.join("peers");
377        let mut peers = Vec::new();
378
379        let mut entries = fs::read_dir(&peers_dir).await?;
380        while let Some(entry) = entries.next_entry().await? {
381            let path = entry.path();
382            if path.extension().and_then(|s| s.to_str()) == Some("json") {
383                let filename = path.file_stem().and_then(|s| s.to_str()).ok_or_else(|| {
384                    PersistenceError::InvalidFormat("Invalid filename".to_string())
385                })?;
386
387                let peer_id_bytes = hex::decode(filename).map_err(|e| {
388                    PersistenceError::InvalidFormat(format!("Invalid peer ID: {}", e))
389                })?;
390
391                if peer_id_bytes.len() != 32 {
392                    return Err(PersistenceError::InvalidFormat(
393                        "Invalid peer ID length".to_string(),
394                    ));
395                }
396
397                let mut id_array = [0u8; 32];
398                id_array.copy_from_slice(&peer_id_bytes);
399                let peer_id = PeerId::from_bytes(id_array);
400
401                if let Some(info) = self.read_file::<PeerInfo>(&path).await? {
402                    peers.push((peer_id, info));
403                }
404            }
405        }
406
407        debug!("Loaded {} peers from files", peers.len());
408        Ok(peers)
409    }
410
411    async fn save_dark_record(&self, record: &DarkDomainRecord) -> Result<()> {
412        let path = self.domain_path(record);
413        self.write_file_atomic(&path, record).await?;
414        debug!(
415            "Saved dark domain record for owner {:?} to file",
416            record.owner_id
417        );
418        Ok(())
419    }
420
421    async fn load_dark_records(&self) -> Result<Vec<DarkDomainRecord>> {
422        let domains_dir = self.data_dir.join("domains");
423        let mut records = Vec::new();
424
425        let mut entries = fs::read_dir(&domains_dir).await?;
426        while let Some(entry) = entries.next_entry().await? {
427            let path = entry.path();
428            if path.extension().and_then(|s| s.to_str()) == Some("json") {
429                if let Some(record) = self.read_file::<DarkDomainRecord>(&path).await? {
430                    records.push(record);
431                }
432            }
433        }
434
435        debug!("Loaded {} dark domain records from files", records.len());
436        Ok(records)
437    }
438
439    async fn remove_vertex(&self, id: &VertexId) -> Result<()> {
440        let path = self.vertex_path(id);
441        if path.exists() {
442            fs::remove_file(&path).await?;
443            debug!("Removed vertex {:?} from file", id);
444        }
445        Ok(())
446    }
447
448    async fn remove_peer(&self, peer_id: &PeerId) -> Result<()> {
449        let path = self.peer_path(peer_id);
450        if path.exists() {
451            fs::remove_file(&path).await?;
452            debug!("Removed peer {:?} from file", peer_id);
453        }
454        Ok(())
455    }
456
457    async fn remove_dark_record(&self, owner_id: &PeerId) -> Result<()> {
458        let id_hex = hex::encode(owner_id.as_bytes());
459        let path = self
460            .data_dir
461            .join("domains")
462            .join(format!("{}.json", id_hex));
463        if path.exists() {
464            fs::remove_file(&path).await?;
465            debug!(
466                "Removed dark domain record for owner {:?} from file",
467                owner_id
468            );
469        }
470        Ok(())
471    }
472
473    async fn vertex_count(&self) -> Result<usize> {
474        let vertices_dir = self.data_dir.join("vertices");
475        let mut count = 0;
476        let mut entries = fs::read_dir(&vertices_dir).await?;
477        while let Some(entry) = entries.next_entry().await? {
478            if entry.path().extension().and_then(|s| s.to_str()) == Some("json") {
479                count += 1;
480            }
481        }
482        Ok(count)
483    }
484
485    async fn peer_count(&self) -> Result<usize> {
486        let peers_dir = self.data_dir.join("peers");
487        let mut count = 0;
488        let mut entries = fs::read_dir(&peers_dir).await?;
489        while let Some(entry) = entries.next_entry().await? {
490            if entry.path().extension().and_then(|s| s.to_str()) == Some("json") {
491                count += 1;
492            }
493        }
494        Ok(count)
495    }
496
497    async fn dark_record_count(&self) -> Result<usize> {
498        let domains_dir = self.data_dir.join("domains");
499        let mut count = 0;
500        let mut entries = fs::read_dir(&domains_dir).await?;
501        while let Some(entry) = entries.next_entry().await? {
502            if entry.path().extension().and_then(|s| s.to_str()) == Some("json") {
503                count += 1;
504            }
505        }
506        Ok(count)
507    }
508
509    async fn health_check(&self) -> Result<bool> {
510        // Check if directories exist and are writable
511        let test_file = self.data_dir.join(".health_check");
512        match File::create(&test_file).await {
513            Ok(_) => {
514                let _ = fs::remove_file(&test_file).await;
515                Ok(true)
516            }
517            Err(_) => Ok(false),
518        }
519    }
520
521    async fn save_state(&self, state: &PersistedDagState) -> Result<()> {
522        let state_file = self.data_dir.join("state.json");
523        self.write_file_atomic(&state_file, state).await?;
524        debug!("Saved complete state to file");
525        Ok(())
526    }
527
528    async fn recover_state(&self) -> Result<Option<PersistedDagState>> {
529        let state_file = self.data_dir.join("state.json");
530        let state = self.read_file(&state_file).await?;
531        if state.is_some() {
532            debug!("Recovered complete state from file");
533        }
534        Ok(state)
535    }
536
537    async fn create_backup(&self, backup_path: &PathBuf) -> Result<()> {
538        // Create backup directory if it doesn't exist
539        fs::create_dir_all(backup_path).await.map_err(|e| {
540            PersistenceError::DirectoryCreation(format!("Failed to create backup dir: {}", e))
541        })?;
542
543        // Copy all data files to backup directory
544        let backup_data_dir = backup_path.join("data");
545        fs::create_dir_all(&backup_data_dir).await.map_err(|e| {
546            PersistenceError::DirectoryCreation(format!("Failed to create backup data dir: {}", e))
547        })?;
548
549        // Copy all files recursively
550        copy_dir_all(self.data_dir.clone(), backup_data_dir.clone()).await?;
551
552        debug!("Created backup at {:?}", backup_path);
553        Ok(())
554    }
555
556    async fn restore_backup(&self, backup_path: &PathBuf) -> Result<()> {
557        let backup_data_dir = backup_path.join("data");
558
559        if !backup_data_dir.exists() {
560            return Err(PersistenceError::FileNotFound(format!(
561                "Backup data directory not found: {:?}",
562                backup_data_dir
563            )));
564        }
565
566        // Clear current data directory
567        if self.data_dir.exists() {
568            fs::remove_dir_all(&self.data_dir).await?;
569        }
570
571        // Restore from backup
572        copy_dir_all(backup_data_dir.clone(), self.data_dir.clone()).await?;
573
574        debug!("Restored backup from {:?}", backup_path);
575        Ok(())
576    }
577}
578
/// In-memory storage implementation for testing.
///
/// Backed by concurrent `DashMap`s; all data is lost on drop, and the
/// snapshot/backup trait methods are deliberate no-ops.
pub struct MemoryStateStore {
    /// Stored vertices, keyed by vertex ID
    vertices: DashMap<VertexId, Vertex>,
    /// Stored peers, keyed by peer ID
    peers: DashMap<PeerId, PeerInfo>,
    /// Stored dark domain records, keyed by hex-encoded owner ID
    dark_records: DashMap<String, DarkDomainRecord>,
}
588
589impl Default for MemoryStateStore {
590    fn default() -> Self {
591        Self::new()
592    }
593}
594
595impl MemoryStateStore {
596    /// Create a new memory-based state store
597    pub fn new() -> Self {
598        Self {
599            vertices: DashMap::new(),
600            peers: DashMap::new(),
601            dark_records: DashMap::new(),
602        }
603    }
604
605    /// Clear all stored data
606    pub fn clear(&self) {
607        self.vertices.clear();
608        self.peers.clear();
609        self.dark_records.clear();
610    }
611}
612
613#[async_trait]
614impl StateStore for MemoryStateStore {
615    async fn save_vertex(&self, vertex: &Vertex) -> Result<()> {
616        self.vertices.insert(vertex.id.clone(), vertex.clone());
617        debug!("Saved vertex {:?} to memory", vertex.id);
618        Ok(())
619    }
620
621    async fn load_vertex(&self, id: &VertexId) -> Result<Option<Vertex>> {
622        let vertex = self.vertices.get(id).map(|entry| entry.clone());
623        if vertex.is_some() {
624            debug!("Loaded vertex {:?} from memory", id);
625        }
626        Ok(vertex)
627    }
628
629    async fn save_peer(&self, peer_id: &PeerId, info: &PeerInfo) -> Result<()> {
630        self.peers.insert(*peer_id, info.clone());
631        debug!("Saved peer {:?} to memory", peer_id);
632        Ok(())
633    }
634
635    async fn load_peers(&self) -> Result<Vec<(PeerId, PeerInfo)>> {
636        let peers: Vec<(PeerId, PeerInfo)> = self
637            .peers
638            .iter()
639            .map(|entry| (*entry.key(), entry.value().clone()))
640            .collect();
641        debug!("Loaded {} peers from memory", peers.len());
642        Ok(peers)
643    }
644
645    async fn save_dark_record(&self, record: &DarkDomainRecord) -> Result<()> {
646        let key = hex::encode(record.owner_id.as_bytes());
647        self.dark_records.insert(key, record.clone());
648        debug!(
649            "Saved dark domain record for owner {:?} to memory",
650            record.owner_id
651        );
652        Ok(())
653    }
654
655    async fn load_dark_records(&self) -> Result<Vec<DarkDomainRecord>> {
656        let records: Vec<DarkDomainRecord> = self
657            .dark_records
658            .iter()
659            .map(|entry| entry.value().clone())
660            .collect();
661        debug!("Loaded {} dark domain records from memory", records.len());
662        Ok(records)
663    }
664
665    async fn remove_vertex(&self, id: &VertexId) -> Result<()> {
666        self.vertices.remove(id);
667        debug!("Removed vertex {:?} from memory", id);
668        Ok(())
669    }
670
671    async fn remove_peer(&self, peer_id: &PeerId) -> Result<()> {
672        self.peers.remove(peer_id);
673        debug!("Removed peer {:?} from memory", peer_id);
674        Ok(())
675    }
676
677    async fn remove_dark_record(&self, owner_id: &PeerId) -> Result<()> {
678        let key = hex::encode(owner_id.as_bytes());
679        self.dark_records.remove(&key);
680        debug!(
681            "Removed dark domain record for owner {:?} from memory",
682            owner_id
683        );
684        Ok(())
685    }
686
687    async fn vertex_count(&self) -> Result<usize> {
688        Ok(self.vertices.len())
689    }
690
691    async fn peer_count(&self) -> Result<usize> {
692        Ok(self.peers.len())
693    }
694
695    async fn dark_record_count(&self) -> Result<usize> {
696        Ok(self.dark_records.len())
697    }
698
699    async fn health_check(&self) -> Result<bool> {
700        // Memory store is always healthy if it exists
701        Ok(true)
702    }
703
704    async fn save_state(&self, _state: &PersistedDagState) -> Result<()> {
705        // Memory store doesn't persist state to disk
706        debug!("State save called on memory store (no-op)");
707        Ok(())
708    }
709
710    async fn recover_state(&self) -> Result<Option<PersistedDagState>> {
711        // Memory store doesn't persist state to disk
712        debug!("State recovery called on memory store (returning None)");
713        Ok(None)
714    }
715
716    async fn create_backup(&self, _backup_path: &PathBuf) -> Result<()> {
717        // Memory store doesn't support backups
718        warn!("Backup creation called on memory store (no-op)");
719        Ok(())
720    }
721
722    async fn restore_backup(&self, _backup_path: &PathBuf) -> Result<()> {
723        // Memory store doesn't support backups
724        warn!("Backup restoration called on memory store (no-op)");
725        Ok(())
726    }
727}
728
/// Enhanced NodeRunner with persistence integration.
///
/// Wraps a shared storage backend plus persistence policy (on/off flag and
/// auto-save interval; defaults set in [`PersistentNodeRunner::new`]).
pub struct PersistentNodeRunner<S: StateStore> {
    /// Storage backend (shared so the auto-save task can hold a handle)
    store: Arc<S>,
    /// Auto-save interval in seconds (0 disables the auto-save task)
    auto_save_interval: u64,
    /// Whether persistence is enabled; when false all persistence calls
    /// become silent no-ops
    persistence_enabled: bool,
}
738
impl<S: StateStore + 'static> PersistentNodeRunner<S> {
    /// Create a new persistent node runner.
    ///
    /// Defaults: persistence enabled, 300-second (5 minute) auto-save
    /// interval.
    pub fn new(store: Arc<S>) -> Self {
        Self {
            store,
            auto_save_interval: 300, // 5 minutes default
            persistence_enabled: true,
        }
    }

    /// Set auto-save interval in seconds (0 disables the auto-save task).
    pub fn set_auto_save_interval(&mut self, seconds: u64) {
        self.auto_save_interval = seconds;
    }

    /// Enable or disable persistence; when disabled, the persistence methods
    /// below return `Ok(())` without touching the store.
    pub fn set_persistence_enabled(&mut self, enabled: bool) {
        self.persistence_enabled = enabled;
    }

    /// Save a DAG vertex after consensus. No-op when persistence is disabled.
    pub async fn save_vertex_after_consensus(&self, vertex: &Vertex) -> Result<()> {
        if !self.persistence_enabled {
            return Ok(());
        }

        self.store.save_vertex(vertex).await?;
        info!("Persisted vertex {:?} after consensus", vertex.id);
        Ok(())
    }

    /// Persist peer information. No-op when persistence is disabled.
    pub async fn persist_peer_info(&self, peer_id: &PeerId, info: &PeerInfo) -> Result<()> {
        if !self.persistence_enabled {
            return Ok(());
        }

        self.store.save_peer(peer_id, info).await?;
        debug!("Persisted peer info for {:?}", peer_id);
        Ok(())
    }

    /// Store a dark domain registration. No-op when persistence is disabled.
    pub async fn store_dark_domain_registration(&self, record: &DarkDomainRecord) -> Result<()> {
        if !self.persistence_enabled {
            return Ok(());
        }

        self.store.save_dark_record(record).await?;
        info!(
            "Stored dark domain registration for owner {:?}",
            record.owner_id
        );
        Ok(())
    }

    /// Load state on startup; returns an empty [`StartupState`] when
    /// persistence is disabled.
    pub async fn load_state_on_startup(&self) -> Result<StartupState> {
        if !self.persistence_enabled {
            return Ok(StartupState::default());
        }

        info!("Loading persisted state on startup...");

        // NOTE(review): vertices are never actually loaded here, so the
        // vertex count logged below is always 0 — confirm whether bulk
        // vertex loading should be wired in.
        let vertices = vec![]; // Would load all vertices in a real implementation
        let peers = self.store.load_peers().await?;
        let dark_records = self.store.load_dark_records().await?;

        let state = StartupState {
            vertices,
            peers,
            dark_records,
        };

        info!(
            "Loaded startup state: {} vertices, {} peers, {} dark records",
            state.vertices.len(),
            state.peers.len(),
            state.dark_records.len()
        );

        Ok(state)
    }

    /// Start the auto-save background task.
    ///
    /// NOTE(review): despite the name, the spawned task currently only runs
    /// periodic health checks and never calls `save_state` — confirm whether
    /// actual saving is wired elsewhere. The task also loops forever and its
    /// JoinHandle is dropped, so there is no way to shut it down.
    pub async fn start_auto_save_task(&self) -> Result<()> {
        // Skip entirely when persistence is off or the interval is zero.
        if !self.persistence_enabled || self.auto_save_interval == 0 {
            return Ok(());
        }

        let store = self.store.clone();
        let interval = self.auto_save_interval;

        tokio::spawn(async move {
            let mut interval_timer =
                tokio::time::interval(tokio::time::Duration::from_secs(interval));

            loop {
                // Per tokio docs the first tick completes immediately;
                // subsequent ticks fire every `interval` seconds.
                interval_timer.tick().await;

                match store.health_check().await {
                    Ok(true) => {
                        debug!("Auto-save health check passed");
                    }
                    Ok(false) => {
                        warn!("Auto-save health check failed");
                    }
                    Err(e) => {
                        error!("Auto-save health check error: {}", e);
                    }
                }
            }
        });

        info!("Started auto-save task with {} second interval", interval);
        Ok(())
    }

    /// Get storage statistics (record counts plus a health probe).
    pub async fn get_storage_stats(&self) -> Result<StorageStats> {
        let vertex_count = self.store.vertex_count().await?;
        let peer_count = self.store.peer_count().await?;
        let dark_record_count = self.store.dark_record_count().await?;
        let healthy = self.store.health_check().await?;

        Ok(StorageStats {
            vertex_count,
            peer_count,
            dark_record_count,
            healthy,
        })
    }
}
872
/// State loaded on node startup by
/// [`PersistentNodeRunner::load_state_on_startup`].
#[derive(Debug, Default)]
pub struct StartupState {
    /// Loaded vertices (currently always empty — bulk vertex loading is
    /// stubbed out in `load_state_on_startup`)
    pub vertices: Vec<Vertex>,
    /// Loaded peers
    pub peers: Vec<(PeerId, PeerInfo)>,
    /// Loaded dark domain records
    pub dark_records: Vec<DarkDomainRecord>,
}
883
/// Storage statistics reported by
/// [`PersistentNodeRunner::get_storage_stats`].
#[derive(Debug)]
pub struct StorageStats {
    /// Number of stored vertices
    pub vertex_count: usize,
    /// Number of stored peers
    pub peer_count: usize,
    /// Number of stored dark records
    pub dark_record_count: usize,
    /// Whether storage passed its health check
    pub healthy: bool,
}
896
897/// Helper function to copy directory contents recursively
898fn copy_dir_all(
899    src: PathBuf,
900    dst: PathBuf,
901) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'static>> {
902    Box::pin(async move {
903        fs::create_dir_all(&dst).await.map_err(|e| {
904            PersistenceError::DirectoryCreation(format!(
905                "Failed to create destination directory: {}",
906                e
907            ))
908        })?;
909
910        let mut entries = fs::read_dir(&src).await?;
911        while let Some(entry) = entries.next_entry().await? {
912            let path = entry.path();
913            let dest_path = dst.join(entry.file_name());
914
915            if path.is_dir() {
916                copy_dir_all(path, dest_path).await?;
917            } else {
918                fs::copy(&path, &dest_path).await?;
919            }
920        }
921        Ok(())
922    })
923}
924
#[cfg(test)]
mod tests {
    use super::*;
    use qudag_dag::vertex::VertexId;
    use qudag_network::peer::PeerId;
    use std::collections::HashSet;
    use tempfile::tempdir;

    /// Builds a minimal vertex: fresh id, small payload, no parents.
    fn create_test_vertex() -> Vertex {
        Vertex::new(VertexId::new(), vec![1, 2, 3, 4], HashSet::new())
    }

    /// Builds a trusted peer entry with fixed bookkeeping values.
    fn create_test_peer_info() -> PeerInfo {
        PeerInfo {
            address: "127.0.0.1:8080".to_string(),
            last_seen: 1234567890,
            reputation: 75,
            trusted: true,
            connection_count: 5,
            bytes_exchanged: 1024,
            metadata: HashMap::new(),
        }
    }

    /// Builds a dark domain record with dummy key material and a 1h TTL.
    fn create_test_dark_record() -> DarkDomainRecord {
        use qudag_network::types::NetworkAddress;
        use std::collections::HashMap;

        DarkDomainRecord {
            signing_public_key: vec![1, 2, 3, 4],
            encryption_public_key: vec![5, 6, 7, 8],
            addresses: vec![NetworkAddress::new([127, 0, 0, 1], 8080)],
            alias: Some("test.dark".to_string()),
            ttl: 3600,
            registered_at: 1234567890,
            expires_at: 1234567890 + 3600,
            owner_id: PeerId::new(),
            signature: vec![9, 10, 11, 12],
            metadata: HashMap::new(),
        }
    }

    #[tokio::test]
    async fn test_memory_store_vertices() {
        let store = MemoryStateStore::new();
        let vertex = create_test_vertex();

        // Round-trip: save, then read back by id.
        store.save_vertex(&vertex).await.unwrap();
        let fetched = store
            .load_vertex(&vertex.id)
            .await
            .unwrap()
            .expect("vertex must exist after save");
        assert_eq!(fetched.id, vertex.id);
        assert_eq!(store.vertex_count().await.unwrap(), 1);

        // Removal brings the count back to zero.
        store.remove_vertex(&vertex.id).await.unwrap();
        assert_eq!(store.vertex_count().await.unwrap(), 0);
    }

    #[tokio::test]
    async fn test_memory_store_peers() {
        let store = MemoryStateStore::new();
        let id = PeerId::random();
        let info = create_test_peer_info();

        store.save_peer(&id, &info).await.unwrap();

        // Exactly one entry should come back, keyed by the saved id.
        let loaded = store.load_peers().await.unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].0, id);
        assert_eq!(store.peer_count().await.unwrap(), 1);

        // Removal brings the count back to zero.
        store.remove_peer(&id).await.unwrap();
        assert_eq!(store.peer_count().await.unwrap(), 0);
    }

    #[tokio::test]
    async fn test_memory_store_dark_records() {
        let store = MemoryStateStore::new();
        let record = create_test_dark_record();

        store.save_dark_record(&record).await.unwrap();

        // Exactly one record should come back, attributed to the same owner.
        let loaded = store.load_dark_records().await.unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].owner_id, record.owner_id);
        assert_eq!(store.dark_record_count().await.unwrap(), 1);

        // Removal brings the count back to zero.
        store.remove_dark_record(&record.owner_id).await.unwrap();
        assert_eq!(store.dark_record_count().await.unwrap(), 0);
    }

    #[tokio::test]
    async fn test_file_store_vertices() {
        let dir = tempdir().unwrap();
        let store = FileStateStore::new(dir.path().to_path_buf())
            .await
            .unwrap();
        let vertex = create_test_vertex();

        // Round-trip through the file-backed store.
        store.save_vertex(&vertex).await.unwrap();
        let fetched = store
            .load_vertex(&vertex.id)
            .await
            .unwrap()
            .expect("vertex must exist after save");
        assert_eq!(fetched.id, vertex.id);
        assert_eq!(store.vertex_count().await.unwrap(), 1);
    }

    #[tokio::test]
    async fn test_file_store_peers() {
        let dir = tempdir().unwrap();
        let store = FileStateStore::new(dir.path().to_path_buf())
            .await
            .unwrap();
        let id = PeerId::random();
        let info = create_test_peer_info();

        store.save_peer(&id, &info).await.unwrap();

        // Exactly one entry should come back, keyed by the saved id.
        let loaded = store.load_peers().await.unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].0, id);
    }

    #[tokio::test]
    async fn test_persistent_node_runner() {
        let store = Arc::new(MemoryStateStore::new());
        let mut runner = PersistentNodeRunner::new(store.clone());
        runner.set_auto_save_interval(1);

        // Persist one item of each kind through the runner facade.
        runner
            .save_vertex_after_consensus(&create_test_vertex())
            .await
            .unwrap();
        runner
            .persist_peer_info(&PeerId::random(), &create_test_peer_info())
            .await
            .unwrap();
        runner
            .store_dark_domain_registration(&create_test_dark_record())
            .await
            .unwrap();

        // Startup load should surface the persisted peers and records.
        let state = runner.load_state_on_startup().await.unwrap();
        assert_eq!(state.peers.len(), 1);
        assert_eq!(state.dark_records.len(), 1);

        // Stats should report one entry of each kind on a healthy store.
        let stats = runner.get_storage_stats().await.unwrap();
        assert_eq!(stats.vertex_count, 1);
        assert_eq!(stats.peer_count, 1);
        assert_eq!(stats.dark_record_count, 1);
        assert!(stats.healthy);
    }

    #[tokio::test]
    async fn test_health_check() {
        // Both backends should report healthy immediately after creation.
        assert!(MemoryStateStore::new().health_check().await.unwrap());

        let dir = tempdir().unwrap();
        let file_store = FileStateStore::new(dir.path().to_path_buf())
            .await
            .unwrap();
        assert!(file_store.health_check().await.unwrap());
    }
}