1use crate::error::{CommyError, Result};
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::RwLock;
11
12#[derive(Debug, Clone)]
14pub struct VariableMetadata {
15 pub name: String,
17
18 pub offset: u64,
20
21 pub size: u64,
23
24 pub type_id: u32,
26
27 pub persistent: bool,
29}
30
31impl VariableMetadata {
32 pub fn new(name: String, offset: u64, size: u64, type_id: u32) -> Self {
34 Self {
35 name,
36 offset,
37 size,
38 type_id,
39 persistent: false,
40 }
41 }
42
43 pub fn with_persistent(mut self, persistent: bool) -> Self {
45 self.persistent = persistent;
46 self
47 }
48}
49
50#[derive(Debug)]
59pub struct VirtualVariableFile {
60 service_id: String,
62
63 service_name: String,
65
66 tenant_id: String,
68
69 variables: Arc<RwLock<HashMap<String, VariableMetadata>>>,
71
72 current_bytes: Arc<RwLock<Vec<u8>>>,
74
75 shadow_bytes: Arc<RwLock<Vec<u8>>>,
77
78 changed_variables: Arc<RwLock<Vec<String>>>,
80}
81
82impl VirtualVariableFile {
83 pub fn new(service_id: String, service_name: String, tenant_id: String) -> Self {
85 Self {
86 service_id,
87 service_name,
88 tenant_id,
89 variables: Arc::new(RwLock::new(HashMap::new())),
90 current_bytes: Arc::new(RwLock::new(Vec::new())),
91 shadow_bytes: Arc::new(RwLock::new(Vec::new())),
92 changed_variables: Arc::new(RwLock::new(Vec::new())),
93 }
94 }
95
96 pub fn service_id(&self) -> &str {
98 &self.service_id
99 }
100
101 pub fn service_name(&self) -> &str {
103 &self.service_name
104 }
105
106 pub fn tenant_id(&self) -> &str {
108 &self.tenant_id
109 }
110
111 pub async fn register_variable(&self, metadata: VariableMetadata) -> Result<()> {
113 let mut vars = self.variables.write().await;
114
115 let end = metadata.offset as usize + metadata.size as usize;
117 let mut current = self.current_bytes.write().await;
118 if current.len() < end {
119 current.resize(end, 0);
120 }
121
122 let mut shadow = self.shadow_bytes.write().await;
124 if shadow.len() < end {
125 shadow.resize(end, 0);
126 }
127
128 vars.insert(metadata.name.clone(), metadata);
129 Ok(())
130 }
131
132 pub async fn get_variable_metadata(&self, name: &str) -> Result<VariableMetadata> {
134 let vars = self.variables.read().await;
135 vars.get(name)
136 .cloned()
137 .ok_or_else(|| CommyError::VariableNotFound(name.to_string()))
138 }
139
140 pub async fn list_variables(&self) -> Result<Vec<VariableMetadata>> {
142 let vars = self.variables.read().await;
143 Ok(vars.values().cloned().collect())
144 }
145
146 pub async fn read_variable_slice(&self, name: &str) -> Result<Vec<u8>> {
148 let metadata = self.get_variable_metadata(name).await?;
149 let current = self.current_bytes.read().await;
150
151 let start = metadata.offset as usize;
152 let end = start + metadata.size as usize;
153
154 if end > current.len() {
155 return Err(CommyError::InvalidOffset(format!(
156 "Variable {} extends beyond file bounds",
157 name
158 )));
159 }
160
161 Ok(current[start..end].to_vec())
162 }
163
164 pub async fn write_variable(&self, name: &str, data: &[u8]) -> Result<()> {
166 let metadata = self.get_variable_metadata(name).await?;
167
168 if data.len() as u64 != metadata.size {
169 return Err(CommyError::InvalidMessage(format!(
170 "Data size {} does not match variable size {}",
171 data.len(),
172 metadata.size
173 )));
174 }
175
176 let mut current = self.current_bytes.write().await;
177 let start = metadata.offset as usize;
178 let end = start + data.len();
179
180 if end > current.len() {
181 return Err(CommyError::InvalidOffset(format!(
182 "Variable {} offset out of bounds",
183 name
184 )));
185 }
186
187 current[start..end].copy_from_slice(data);
188
189 let mut changed = self.changed_variables.write().await;
191 if !changed.contains(&name.to_string()) {
192 changed.push(name.to_string());
193 }
194
195 Ok(())
196 }
197
198 pub async fn bytes(&self) -> Vec<u8> {
200 self.current_bytes.read().await.clone()
201 }
202
203 pub async fn update_bytes(&self, data: Vec<u8>) -> Result<()> {
205 let mut current = self.current_bytes.write().await;
206 *current = data;
207 Ok(())
208 }
209
210 pub async fn shadow_bytes(&self) -> Vec<u8> {
212 self.shadow_bytes.read().await.clone()
213 }
214
215 pub async fn update_shadow_bytes(&self, data: Vec<u8>) -> Result<()> {
217 let mut shadow = self.shadow_bytes.write().await;
218 *shadow = data;
219 Ok(())
220 }
221
222 pub async fn get_changed_variables(&self) -> Vec<String> {
224 self.changed_variables.read().await.clone()
225 }
226
227 pub async fn clear_changes(&self) {
229 self.changed_variables.write().await.clear();
230 }
231
232 pub async fn mark_variables_changed(&self, names: Vec<String>) {
234 let mut changed = self.changed_variables.write().await;
235 for name in names {
236 if !changed.contains(&name) {
237 changed.push(name);
238 }
239 }
240 }
241
242 pub async fn compare_ranges(current: &[u8], shadow: &[u8]) -> Result<Vec<(u64, u64)>> {
246 if current.len() != shadow.len() {
247 return Err(CommyError::SimdError(
248 "Cannot compare buffers of different sizes".to_string(),
249 ));
250 }
251
252 let mut differences = Vec::new();
253 let mut i = 0;
254
255 #[cfg(target_arch = "x86_64")]
257 {
258 use std::arch::x86_64::*;
259
260 if is_x86_feature_detected!("avx512f") {
261 while i + 64 <= current.len() {
262 unsafe {
263 let a = _mm512_loadu_si512(current[i..].as_ptr() as *const _);
264 let b = _mm512_loadu_si512(shadow[i..].as_ptr() as *const _);
265 let cmp = _mm512_cmpeq_epi8_mask(a, b);
266
267 if cmp != 0xFFFFFFFFFFFFFFFF {
269 differences.push((i as u64, (i + 64) as u64));
270 }
271 }
272 i += 64;
273 }
274 }
275 }
276
277 #[cfg(target_arch = "x86_64")]
279 {
280 use std::arch::x86_64::*;
281
282 if i == 0 && is_x86_feature_detected!("avx2") {
283 while i + 32 <= current.len() {
284 unsafe {
285 let a = _mm256_loadu_si256(current[i..].as_ptr() as *const _);
286 let b = _mm256_loadu_si256(shadow[i..].as_ptr() as *const _);
287 let cmp = _mm256_cmpeq_epi8(a, b);
288
289 if _mm256_movemask_epi8(cmp) != -1 {
291 differences.push((i as u64, (i + 32) as u64));
292 }
293 }
294 i += 32;
295 }
296 }
297 }
298
299 while i + 8 <= current.len() {
301 let current_u64 = u64::from_ne_bytes([
302 current[i],
303 current[i + 1],
304 current[i + 2],
305 current[i + 3],
306 current[i + 4],
307 current[i + 5],
308 current[i + 6],
309 current[i + 7],
310 ]);
311 let shadow_u64 = u64::from_ne_bytes([
312 shadow[i],
313 shadow[i + 1],
314 shadow[i + 2],
315 shadow[i + 3],
316 shadow[i + 4],
317 shadow[i + 5],
318 shadow[i + 6],
319 shadow[i + 7],
320 ]);
321
322 if current_u64 != shadow_u64 {
323 differences.push((i as u64, (i + 8) as u64));
324 }
325 i += 8;
326 }
327
328 while i < current.len() {
330 if current[i] != shadow[i] {
331 differences.push((i as u64, (i + 1) as u64));
332 }
333 i += 1;
334 }
335
336 Ok(differences)
337 }
338
339 pub async fn find_changed_variables_from_diff(
341 &self,
342 diff_ranges: &[(u64, u64)],
343 ) -> Result<Vec<String>> {
344 let vars = self.variables.read().await;
345 let mut changed = Vec::new();
346
347 for (diff_start, diff_end) in diff_ranges {
348 for (name, metadata) in vars.iter() {
349 let var_start = metadata.offset;
350 let var_end = metadata.offset + metadata.size;
351
352 if *diff_start < var_end && *diff_end > var_start {
354 if !changed.contains(&name.clone()) {
355 changed.push(name.clone());
356 }
357 }
358 }
359 }
360
361 Ok(changed)
362 }
363
364 pub async fn sync_shadow(&self) -> Result<()> {
366 let current = self.current_bytes.read().await;
367 let mut shadow = self.shadow_bytes.write().await;
368 *shadow = current.clone();
369 self.changed_variables.write().await.clear();
370 Ok(())
371 }
372}
373
374#[cfg(test)]
375mod tests {
376 use super::*;
377
378 #[tokio::test]
379 async fn test_register_and_read_variable() {
380 let vf = VirtualVariableFile::new(
381 "svc_1".to_string(),
382 "config".to_string(),
383 "tenant_1".to_string(),
384 );
385
386 let metadata = VariableMetadata::new("my_var".to_string(), 0, 8, 1);
387 vf.register_variable(metadata).await.unwrap();
388
389 vf.write_variable("my_var", &[1, 2, 3, 4, 5, 6, 7, 8])
391 .await
392 .unwrap();
393
394 let data = vf.read_variable_slice("my_var").await.unwrap();
396 assert_eq!(data, vec![1, 2, 3, 4, 5, 6, 7, 8]);
397 }
398
399 #[tokio::test]
400 async fn test_change_tracking() {
401 let vf = VirtualVariableFile::new(
402 "svc_1".to_string(),
403 "config".to_string(),
404 "tenant_1".to_string(),
405 );
406
407 let metadata = VariableMetadata::new("var1".to_string(), 0, 4, 1);
408 vf.register_variable(metadata).await.unwrap();
409
410 vf.write_variable("var1", &[1, 2, 3, 4]).await.unwrap();
411
412 let changed = vf.get_changed_variables().await;
413 assert_eq!(changed.len(), 1);
414 assert_eq!(changed[0], "var1");
415 }
416
417 #[tokio::test]
418 async fn test_simd_compare() {
419 let current = vec![1, 2, 3, 4, 5, 6, 7, 8];
420 let mut shadow = current.clone();
421 shadow[3] = 99; let diffs = VirtualVariableFile::compare_ranges(¤t, &shadow)
424 .await
425 .unwrap();
426
427 assert!(!diffs.is_empty());
428 }
429}