Skip to main content

streaming_crypto/core_api/recovery/
persist.rs

1// ## 📦 `src/recovery/persist.rs`
2
3// - **Rotation policy**: archive current entries and start fresh.  
4// - **Compaction strategy**: remove redundant entries (e.g. consecutive scheduler markers).  
5// - **Persistence hooks**: optional save/load to disk.  
6// - **Unit tests**: validate append, rotate, replay, compaction.  
7
8//! Unified log manager for append, rotation, replay, compaction.
9use std::fs::{self, File, OpenOptions};
10use std::io::{self, Write, BufRead, BufReader, BufWriter};
11use std::sync::mpsc::{channel, Sender};
12use std::thread;
13use base64::{engine::general_purpose::STANDARD, Engine};
14use tracing::{debug, error};
15
16#[derive(Debug, Clone)]
17pub enum UnifiedEntry {
18    Scheduler(String), // e.g. compaction marker
19    Encrypt(Vec<u8>),  // encrypted frame
20    Decrypt(Vec<u8>),  // decrypted frame
21}
22enum LogCommand {
23    Append(UnifiedEntry),
24    Rotate,
25}
26#[derive(Debug)]
27pub struct LogManager {
28    pub entries: Vec<UnifiedEntry>,
29    rotation_limit: usize,
30    writer: BufWriter<File>,
31}
32
33impl LogManager {
34    /// Create a new log manager with a rotation limit.
35    pub fn new(path: &str, rotation_limit: usize) -> io::Result<Self> {
36        let file = OpenOptions::new().create(true).append(true).open(path)?;
37        Ok(Self { 
38            entries: Vec::new(), 
39            rotation_limit,
40            writer: BufWriter::new(file)
41        })
42    }
43
44    /// Append a new entry.
45    pub fn append(&mut self, entry: UnifiedEntry) -> io::Result<()> {
46        let line = match &entry {
47            UnifiedEntry::Scheduler(msg) => format!("SCHEDULER: {}\n", msg),
48            UnifiedEntry::Encrypt(data) => format!("ENCRYPT: {}\n", STANDARD.encode(data)),
49            UnifiedEntry::Decrypt(data) => format!("DECRYPT: {}\n", STANDARD.encode(data)),
50        };
51        // Write to the already-open BufWriter
52        self.writer.write_all(line.as_bytes())?;
53        // 🔥 CRASH-CONSISTENCY:
54        // It forces the OS to write the bytes to disk immediately.
55        self.writer.flush()?; 
56        
57        self.entries.push(entry);
58        if self.entries.len() >= self.rotation_limit {
59            self.entries.clear(); // Rotation logic: clear mem, file handle remains
60        }
61        Ok(())
62    }
63
64    /// Load entries from file (basic replay).
65    pub fn stream_log(path: &str) -> io::Result<impl Iterator<Item = io::Result<String>>> {
66        let file = File::open(path)?;
67        Ok(BufReader::new(file).lines())
68    }
69    
70    /// Rotate: archive current entries and clear.
71    pub fn rotate(&mut self) {
72        // For simplicity, write to a file named "unified.log"
73        if let Err(e) = self.persist_to_file("unified.log") {
74            error!("Log rotation failed: {}", e);
75        }
76        self.entries.clear();
77    }
78
79    /// Replay all entries.
80    pub fn replay(&self) -> &[UnifiedEntry] {
81        &self.entries
82    }
83
84    /// Persist entries to file.
85    pub fn persist_to_file(&self, path: &str) -> io::Result<()> {
86        let mut file = OpenOptions::new().create(true).append(true).open(path)?;
87        for entry in &self.entries {
88            let line = match entry {
89                UnifiedEntry::Scheduler(msg) => format!("SCHEDULER: {}\n", msg),
90                UnifiedEntry::Encrypt(data) => format!("ENCRYPT: {} bytes\n", data.len()),
91                UnifiedEntry::Decrypt(data) => format!("DECRYPT: {} bytes\n", data.len()),
92            };
93            file.write_all(line.as_bytes())?;
94        }
95        Ok(())
96    }
97
98}
99
100#[derive(Debug, Clone)]
101pub struct AsyncLogManager {
102    tx: Sender<LogCommand>,
103}
104
105impl AsyncLogManager {
106    /// Initialize the background logger thread.
107    pub fn new(path: &str, rotation_limit: usize) -> io::Result<Self> {
108        let (tx, rx) = channel::<LogCommand>();
109        let path_owned = path.to_string();
110        
111        // Clone sender so the background thread can trigger its own rotation
112        let tx_internal = tx.clone(); 
113
114        thread::spawn(move || {
115            let file = OpenOptions::new().create(true).append(true).open(&path_owned)
116                .expect("Failed to open log file in background thread");
117            let mut writer = BufWriter::new(file);
118            let mut count = 0;
119
120            while let Ok(cmd) = rx.recv() {
121                match cmd {
122                    LogCommand::Append(entry) => {
123                        // FIX: Use the defined helper function below
124                        let line = format_entry(&entry);
125                        
126                        if let Err(e) = writer.write_all(line.as_bytes()) {
127                            error!("Log Write Error: {}", e);
128                            continue;
129                        }
130                        let _ = writer.flush();
131
132                        count += 1;
133                        if count >= rotation_limit {
134                            // FIX: Successfully use tx_internal to trigger rotation
135                            let _ = tx_internal.send(LogCommand::Rotate);
136                            count = 0;
137                        }
138                    }
139                    LogCommand::Rotate => {
140                        let _ = writer.flush();
141                        drop(writer); // Close file handle
142
143                        let timestamp = chrono::Utc::now().format("%Y%m%dT%H%M%S");
144                        let archived_path = format!("{}.{}", path_owned, timestamp);
145                        
146                        if fs::rename(&path_owned, &archived_path).is_ok() {
147                            let archive_to_compress = archived_path.clone();
148                            // Background Zstd compression (New for 2.3.0)
149                            thread::spawn(move || {
150                                compress_log_file(&archive_to_compress);
151                            });
152                        }
153
154                        let new_file = OpenOptions::new().create(true).append(true).open(&path_owned).unwrap();
155                        writer = BufWriter::new(new_file);
156                    }
157                }
158            }
159        });
160
161        Ok(Self { tx })
162    }
163
164    pub fn console(&self, message: String) {
165        debug!("{}", message);
166    }
167    /// Non-blocking append. Sends entry to background thread.
168    pub fn append(&self, entry: UnifiedEntry) {
169        if let Err(e) = self.tx.send(LogCommand::Append(entry)) {
170            error!("Failed to send log entry to background thread: {}", e);
171        }
172    }
173
174    /// Stream log entries for bootstrap (remains synchronous)
175    pub fn stream_log(path: &str) -> io::Result<impl Iterator<Item = io::Result<String>>> {
176        let file = File::open(path)?;
177        Ok(BufReader::new(file).lines())
178    }
179
180}
181
182/// FIX: Helper function to handle string formatting for log entries
183fn format_entry(entry: &UnifiedEntry) -> String {
184    match entry {
185        UnifiedEntry::Scheduler(msg) => format!("SCHEDULER: {}\n", msg),
186        UnifiedEntry::Encrypt(data) => format!("ENCRYPT: {}\n", STANDARD.encode(data)),
187        UnifiedEntry::Decrypt(data) => format!("DECRYPT: {}\n", STANDARD.encode(data)),
188    }
189}
190
191/// Helper for async Zstd compression
192fn compress_log_file(src_path: &str) {
193    let dest_path = format!("{}.zst", src_path);
194    if let (Ok(src), Ok(dest)) = (File::open(src_path), File::create(&dest_path)) {
195        // Zstd Level 3 is the 2026 standard for log archival
196        if zstd::stream::copy_encode(src, dest, 3).is_ok() {
197            let _ = fs::remove_file(src_path);
198        }
199    }
200}
201
202/// Compaction logic: remove redundant scheduler markers.
203pub fn compact_unified_log(entries: &mut Vec<UnifiedEntry>) {
204    let mut compacted = Vec::new();
205    let mut last_scheduler: Option<String> = None;
206
207    for entry in entries.drain(..) {
208        match &entry {
209            UnifiedEntry::Scheduler(msg) => {
210                if Some(msg.clone()) != last_scheduler {
211                    compacted.push(entry.clone());
212                    last_scheduler = Some(msg.clone());
213                }
214            }
215            _ => compacted.push(entry),
216        }
217    }
218
219    *entries = compacted;
220}
221
222// ## ✅ What’s Added
223// - **Rotation**: auto‑rotates when `rotation_limit` reached, persists to file.  
224// - **Persistence**: `persist_to_file` and `load_from_file`.  
225// - **Compaction**: removes redundant scheduler entries.  
226// - **Unit tests**: validate append/replay and compaction.  
227