// streaming_crypto/core_api/recovery/persist.rs
use std::fs::{self, File, OpenOptions};
use std::io::{self, Write, BufRead, BufReader, BufWriter};
use std::sync::mpsc::{channel, Sender};
use std::thread;
use base64::{engine::general_purpose::STANDARD, Engine};
use tracing::{debug, error};

/// One record in the unified log, covering all producing subsystems.
#[derive(Debug, Clone)]
pub enum UnifiedEntry {
    /// Free-form text message emitted by the scheduler.
    Scheduler(String),
    /// Binary payload from an encrypt operation (base64-encoded when logged).
    Encrypt(Vec<u8>),
    /// Binary payload from a decrypt operation (base64-encoded when logged).
    Decrypt(Vec<u8>),
}
22enum LogCommand {
23 Append(UnifiedEntry),
24 Rotate,
25}
/// Synchronous log manager: writes entries to disk immediately and keeps an
/// in-memory copy for replay until the rotation limit is reached.
#[derive(Debug)]
pub struct LogManager {
    // In-memory copy of recent entries, served by `replay`; cleared when
    // it reaches `rotation_limit` or when `rotate` runs.
    pub entries: Vec<UnifiedEntry>,
    // Number of buffered entries at which the in-memory buffer is cleared.
    rotation_limit: usize,
    // Buffered handle to the on-disk log file opened in `new` (append mode).
    writer: BufWriter<File>,
}
32
33impl LogManager {
34 pub fn new(path: &str, rotation_limit: usize) -> io::Result<Self> {
36 let file = OpenOptions::new().create(true).append(true).open(path)?;
37 Ok(Self {
38 entries: Vec::new(),
39 rotation_limit,
40 writer: BufWriter::new(file)
41 })
42 }
43
44 pub fn append(&mut self, entry: UnifiedEntry) -> io::Result<()> {
46 let line = match &entry {
47 UnifiedEntry::Scheduler(msg) => format!("SCHEDULER: {}\n", msg),
48 UnifiedEntry::Encrypt(data) => format!("ENCRYPT: {}\n", STANDARD.encode(data)),
49 UnifiedEntry::Decrypt(data) => format!("DECRYPT: {}\n", STANDARD.encode(data)),
50 };
51 self.writer.write_all(line.as_bytes())?;
53 self.writer.flush()?;
56
57 self.entries.push(entry);
58 if self.entries.len() >= self.rotation_limit {
59 self.entries.clear(); }
61 Ok(())
62 }
63
64 pub fn stream_log(path: &str) -> io::Result<impl Iterator<Item = io::Result<String>>> {
66 let file = File::open(path)?;
67 Ok(BufReader::new(file).lines())
68 }
69
70 pub fn rotate(&mut self) {
72 if let Err(e) = self.persist_to_file("unified.log") {
74 error!("Log rotation failed: {}", e);
75 }
76 self.entries.clear();
77 }
78
79 pub fn replay(&self) -> &[UnifiedEntry] {
81 &self.entries
82 }
83
84 pub fn persist_to_file(&self, path: &str) -> io::Result<()> {
86 let mut file = OpenOptions::new().create(true).append(true).open(path)?;
87 for entry in &self.entries {
88 let line = match entry {
89 UnifiedEntry::Scheduler(msg) => format!("SCHEDULER: {}\n", msg),
90 UnifiedEntry::Encrypt(data) => format!("ENCRYPT: {} bytes\n", data.len()),
91 UnifiedEntry::Decrypt(data) => format!("DECRYPT: {} bytes\n", data.len()),
92 };
93 file.write_all(line.as_bytes())?;
94 }
95 Ok(())
96 }
97
98}
99
/// Handle to an asynchronous log writer running on a background thread.
/// Cloning the handle is cheap: clones share the same worker via the channel.
#[derive(Debug, Clone)]
pub struct AsyncLogManager {
    // Channel into the background writer thread spawned by `new`.
    tx: Sender<LogCommand>,
}
104
105impl AsyncLogManager {
106 pub fn new(path: &str, rotation_limit: usize) -> io::Result<Self> {
108 let (tx, rx) = channel::<LogCommand>();
109 let path_owned = path.to_string();
110
111 let tx_internal = tx.clone();
113
114 thread::spawn(move || {
115 let file = OpenOptions::new().create(true).append(true).open(&path_owned)
116 .expect("Failed to open log file in background thread");
117 let mut writer = BufWriter::new(file);
118 let mut count = 0;
119
120 while let Ok(cmd) = rx.recv() {
121 match cmd {
122 LogCommand::Append(entry) => {
123 let line = format_entry(&entry);
125
126 if let Err(e) = writer.write_all(line.as_bytes()) {
127 error!("Log Write Error: {}", e);
128 continue;
129 }
130 let _ = writer.flush();
131
132 count += 1;
133 if count >= rotation_limit {
134 let _ = tx_internal.send(LogCommand::Rotate);
136 count = 0;
137 }
138 }
139 LogCommand::Rotate => {
140 let _ = writer.flush();
141 drop(writer); let timestamp = chrono::Utc::now().format("%Y%m%dT%H%M%S");
144 let archived_path = format!("{}.{}", path_owned, timestamp);
145
146 if fs::rename(&path_owned, &archived_path).is_ok() {
147 let archive_to_compress = archived_path.clone();
148 thread::spawn(move || {
150 compress_log_file(&archive_to_compress);
151 });
152 }
153
154 let new_file = OpenOptions::new().create(true).append(true).open(&path_owned).unwrap();
155 writer = BufWriter::new(new_file);
156 }
157 }
158 }
159 });
160
161 Ok(Self { tx })
162 }
163
164 pub fn console(&self, message: String) {
165 debug!("{}", message);
166 }
167 pub fn append(&self, entry: UnifiedEntry) {
169 if let Err(e) = self.tx.send(LogCommand::Append(entry)) {
170 error!("Failed to send log entry to background thread: {}", e);
171 }
172 }
173
174 pub fn stream_log(path: &str) -> io::Result<impl Iterator<Item = io::Result<String>>> {
176 let file = File::open(path)?;
177 Ok(BufReader::new(file).lines())
178 }
179
180}
181
182fn format_entry(entry: &UnifiedEntry) -> String {
184 match entry {
185 UnifiedEntry::Scheduler(msg) => format!("SCHEDULER: {}\n", msg),
186 UnifiedEntry::Encrypt(data) => format!("ENCRYPT: {}\n", STANDARD.encode(data)),
187 UnifiedEntry::Decrypt(data) => format!("DECRYPT: {}\n", STANDARD.encode(data)),
188 }
189}
190
191fn compress_log_file(src_path: &str) {
193 let dest_path = format!("{}.zst", src_path);
194 if let (Ok(src), Ok(dest)) = (File::open(src_path), File::create(&dest_path)) {
195 if zstd::stream::copy_encode(src, dest, 3).is_ok() {
197 let _ = fs::remove_file(src_path);
198 }
199 }
200}
201
202pub fn compact_unified_log(entries: &mut Vec<UnifiedEntry>) {
204 let mut compacted = Vec::new();
205 let mut last_scheduler: Option<String> = None;
206
207 for entry in entries.drain(..) {
208 match &entry {
209 UnifiedEntry::Scheduler(msg) => {
210 if Some(msg.clone()) != last_scheduler {
211 compacted.push(entry.clone());
212 last_scheduler = Some(msg.clone());
213 }
214 }
215 _ => compacted.push(entry),
216 }
217 }
218
219 *entries = compacted;
220}
221
222