Skip to main content

commy_sdk_rust/
virtual_file.rs

1//! Virtual variable file abstraction
2//!
3//! Provides a unified interface for accessing variables from either:
4//! - Direct memory-mapped files (local clients)
5//! - In-memory buffers synchronized via WSS (remote clients)
6
7use crate::error::{CommyError, Result};
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::RwLock;
11
/// Describes one named variable stored inside a variable file.
#[derive(Debug, Clone)]
pub struct VariableMetadata {
    /// Name the variable is registered under.
    pub name: String,

    /// Position of the variable's first byte, counted from the start of the file.
    pub offset: u64,

    /// Length of the variable's storage, in bytes.
    pub size: u64,

    /// Numeric identifier of the variable's type.
    pub type_id: u32,

    /// `true` when the variable persists across disconnections.
    pub persistent: bool,
}
30
31impl VariableMetadata {
32    /// Create new variable metadata
33    pub fn new(name: String, offset: u64, size: u64, type_id: u32) -> Self {
34        Self {
35            name,
36            offset,
37            size,
38            type_id,
39            persistent: false,
40        }
41    }
42
43    /// Set persistence flag
44    pub fn with_persistent(mut self, persistent: bool) -> Self {
45        self.persistent = persistent;
46        self
47    }
48}
49
50/// Virtual representation of a service's variable file
51///
52/// This abstraction allows both local memory-mapped files and remote WSS-synced files
53/// to be accessed through a unified interface. The file maintains:
54/// - Current variable data in memory
55/// - Metadata about variable locations and sizes
56/// - A shadow copy for change detection
57/// - Per-variable change tracking
58#[derive(Debug)]
59pub struct VirtualVariableFile {
60    /// Service ID
61    service_id: String,
62
63    /// Service name
64    service_name: String,
65
66    /// Tenant ID
67    tenant_id: String,
68
69    /// Variable metadata by name
70    variables: Arc<RwLock<HashMap<String, VariableMetadata>>>,
71
72    /// Current file bytes
73    current_bytes: Arc<RwLock<Vec<u8>>>,
74
75    /// Shadow copy (last known state)
76    shadow_bytes: Arc<RwLock<Vec<u8>>>,
77
78    /// Track which variables have changed
79    changed_variables: Arc<RwLock<Vec<String>>>,
80}
81
82impl VirtualVariableFile {
83    /// Create a new virtual variable file
84    pub fn new(service_id: String, service_name: String, tenant_id: String) -> Self {
85        Self {
86            service_id,
87            service_name,
88            tenant_id,
89            variables: Arc::new(RwLock::new(HashMap::new())),
90            current_bytes: Arc::new(RwLock::new(Vec::new())),
91            shadow_bytes: Arc::new(RwLock::new(Vec::new())),
92            changed_variables: Arc::new(RwLock::new(Vec::new())),
93        }
94    }
95
96    /// Get service ID
97    pub fn service_id(&self) -> &str {
98        &self.service_id
99    }
100
101    /// Get service name
102    pub fn service_name(&self) -> &str {
103        &self.service_name
104    }
105
106    /// Get tenant ID
107    pub fn tenant_id(&self) -> &str {
108        &self.tenant_id
109    }
110
111    /// Register a variable
112    pub async fn register_variable(&self, metadata: VariableMetadata) -> Result<()> {
113        let mut vars = self.variables.write().await;
114
115        // Ensure the byte buffer is large enough
116        let end = metadata.offset as usize + metadata.size as usize;
117        let mut current = self.current_bytes.write().await;
118        if current.len() < end {
119            current.resize(end, 0);
120        }
121
122        // Also resize shadow
123        let mut shadow = self.shadow_bytes.write().await;
124        if shadow.len() < end {
125            shadow.resize(end, 0);
126        }
127
128        vars.insert(metadata.name.clone(), metadata);
129        Ok(())
130    }
131
132    /// Get variable metadata by name
133    pub async fn get_variable_metadata(&self, name: &str) -> Result<VariableMetadata> {
134        let vars = self.variables.read().await;
135        vars.get(name)
136            .cloned()
137            .ok_or_else(|| CommyError::VariableNotFound(name.to_string()))
138    }
139
140    /// List all variables
141    pub async fn list_variables(&self) -> Result<Vec<VariableMetadata>> {
142        let vars = self.variables.read().await;
143        Ok(vars.values().cloned().collect())
144    }
145
146    /// Read a variable as zero-copy slice
147    pub async fn read_variable_slice(&self, name: &str) -> Result<Vec<u8>> {
148        let metadata = self.get_variable_metadata(name).await?;
149        let current = self.current_bytes.read().await;
150
151        let start = metadata.offset as usize;
152        let end = start + metadata.size as usize;
153
154        if end > current.len() {
155            return Err(CommyError::InvalidOffset(format!(
156                "Variable {} extends beyond file bounds",
157                name
158            )));
159        }
160
161        Ok(current[start..end].to_vec())
162    }
163
164    /// Write a variable
165    pub async fn write_variable(&self, name: &str, data: &[u8]) -> Result<()> {
166        let metadata = self.get_variable_metadata(name).await?;
167
168        if data.len() as u64 != metadata.size {
169            return Err(CommyError::InvalidMessage(format!(
170                "Data size {} does not match variable size {}",
171                data.len(),
172                metadata.size
173            )));
174        }
175
176        let mut current = self.current_bytes.write().await;
177        let start = metadata.offset as usize;
178        let end = start + data.len();
179
180        if end > current.len() {
181            return Err(CommyError::InvalidOffset(format!(
182                "Variable {} offset out of bounds",
183                name
184            )));
185        }
186
187        current[start..end].copy_from_slice(data);
188
189        // Mark as changed
190        let mut changed = self.changed_variables.write().await;
191        if !changed.contains(&name.to_string()) {
192            changed.push(name.to_string());
193        }
194
195        Ok(())
196    }
197
198    /// Get raw bytes (zero-copy reference to internal buffer)
199    pub async fn bytes(&self) -> Vec<u8> {
200        self.current_bytes.read().await.clone()
201    }
202
203    /// Update entire file content
204    pub async fn update_bytes(&self, data: Vec<u8>) -> Result<()> {
205        let mut current = self.current_bytes.write().await;
206        *current = data;
207        Ok(())
208    }
209
210    /// Get shadow copy
211    pub async fn shadow_bytes(&self) -> Vec<u8> {
212        self.shadow_bytes.read().await.clone()
213    }
214
215    /// Update shadow copy
216    pub async fn update_shadow_bytes(&self, data: Vec<u8>) -> Result<()> {
217        let mut shadow = self.shadow_bytes.write().await;
218        *shadow = data;
219        Ok(())
220    }
221
222    /// Get list of changed variables since last sync
223    pub async fn get_changed_variables(&self) -> Vec<String> {
224        self.changed_variables.read().await.clone()
225    }
226
227    /// Clear change tracking
228    pub async fn clear_changes(&self) {
229        self.changed_variables.write().await.clear();
230    }
231
232    /// Mark specific variables as changed
233    pub async fn mark_variables_changed(&self, names: Vec<String>) {
234        let mut changed = self.changed_variables.write().await;
235        for name in names {
236            if !changed.contains(&name) {
237                changed.push(name);
238            }
239        }
240    }
241
242    /// Compare two byte ranges using wide SIMD operations
243    ///
244    /// Returns byte offsets where differences were found
245    pub async fn compare_ranges(current: &[u8], shadow: &[u8]) -> Result<Vec<(u64, u64)>> {
246        if current.len() != shadow.len() {
247            return Err(CommyError::SimdError(
248                "Cannot compare buffers of different sizes".to_string(),
249            ));
250        }
251
252        let mut differences = Vec::new();
253        let mut i = 0;
254
255        // Try to use AVX-512 if available (64-byte chunks)
256        #[cfg(target_arch = "x86_64")]
257        {
258            use std::arch::x86_64::*;
259
260            if is_x86_feature_detected!("avx512f") {
261                while i + 64 <= current.len() {
262                    unsafe {
263                        let a = _mm512_loadu_si512(current[i..].as_ptr() as *const _);
264                        let b = _mm512_loadu_si512(shadow[i..].as_ptr() as *const _);
265                        let cmp = _mm512_cmpeq_epi8_mask(a, b);
266
267                        // If not all equal (mask != 0xFFFFFFFFFFFFFFFF)
268                        if cmp != 0xFFFFFFFFFFFFFFFF {
269                            differences.push((i as u64, (i + 64) as u64));
270                        }
271                    }
272                    i += 64;
273                }
274            }
275        }
276
277        // Fall back to AVX2 if available (32-byte chunks)
278        #[cfg(target_arch = "x86_64")]
279        {
280            use std::arch::x86_64::*;
281
282            if i == 0 && is_x86_feature_detected!("avx2") {
283                while i + 32 <= current.len() {
284                    unsafe {
285                        let a = _mm256_loadu_si256(current[i..].as_ptr() as *const _);
286                        let b = _mm256_loadu_si256(shadow[i..].as_ptr() as *const _);
287                        let cmp = _mm256_cmpeq_epi8(a, b);
288
289                        // Check if any byte differs
290                        if _mm256_movemask_epi8(cmp) != -1 {
291                            differences.push((i as u64, (i + 32) as u64));
292                        }
293                    }
294                    i += 32;
295                }
296            }
297        }
298
299        // Fall back to 8-byte (u64) comparisons
300        while i + 8 <= current.len() {
301            let current_u64 = u64::from_ne_bytes([
302                current[i],
303                current[i + 1],
304                current[i + 2],
305                current[i + 3],
306                current[i + 4],
307                current[i + 5],
308                current[i + 6],
309                current[i + 7],
310            ]);
311            let shadow_u64 = u64::from_ne_bytes([
312                shadow[i],
313                shadow[i + 1],
314                shadow[i + 2],
315                shadow[i + 3],
316                shadow[i + 4],
317                shadow[i + 5],
318                shadow[i + 6],
319                shadow[i + 7],
320            ]);
321
322            if current_u64 != shadow_u64 {
323                differences.push((i as u64, (i + 8) as u64));
324            }
325            i += 8;
326        }
327
328        // Handle remaining bytes
329        while i < current.len() {
330            if current[i] != shadow[i] {
331                differences.push((i as u64, (i + 1) as u64));
332            }
333            i += 1;
334        }
335
336        Ok(differences)
337    }
338
339    /// Find which variables changed based on byte differences
340    pub async fn find_changed_variables_from_diff(
341        &self,
342        diff_ranges: &[(u64, u64)],
343    ) -> Result<Vec<String>> {
344        let vars = self.variables.read().await;
345        let mut changed = Vec::new();
346
347        for (diff_start, diff_end) in diff_ranges {
348            for (name, metadata) in vars.iter() {
349                let var_start = metadata.offset;
350                let var_end = metadata.offset + metadata.size;
351
352                // Check if this variable overlaps with the difference
353                if *diff_start < var_end && *diff_end > var_start {
354                    if !changed.contains(&name.clone()) {
355                        changed.push(name.clone());
356                    }
357                }
358            }
359        }
360
361        Ok(changed)
362    }
363
364    /// Sync shadow with current (after sending updates to server)
365    pub async fn sync_shadow(&self) -> Result<()> {
366        let current = self.current_bytes.read().await;
367        let mut shadow = self.shadow_bytes.write().await;
368        *shadow = current.clone();
369        self.changed_variables.write().await.clear();
370        Ok(())
371    }
372}
373
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the empty file used as a fixture by the tests below.
    fn sample_file() -> VirtualVariableFile {
        VirtualVariableFile::new(
            "svc_1".to_string(),
            "config".to_string(),
            "tenant_1".to_string(),
        )
    }

    /// A registered variable returns exactly the bytes written to it.
    #[tokio::test]
    async fn test_register_and_read_variable() {
        let vf = sample_file();
        let payload = [1, 2, 3, 4, 5, 6, 7, 8];

        vf.register_variable(VariableMetadata::new("my_var".to_string(), 0, 8, 1))
            .await
            .unwrap();
        vf.write_variable("my_var", &payload).await.unwrap();

        let data = vf.read_variable_slice("my_var").await.unwrap();
        assert_eq!(data, payload.to_vec());
    }

    /// Writing a variable records its name in the change list.
    #[tokio::test]
    async fn test_change_tracking() {
        let vf = sample_file();

        vf.register_variable(VariableMetadata::new("var1".to_string(), 0, 4, 1))
            .await
            .unwrap();
        vf.write_variable("var1", &[1, 2, 3, 4]).await.unwrap();

        let changed = vf.get_changed_variables().await;
        assert_eq!(changed.len(), 1);
        assert_eq!(changed[0], "var1");
    }

    /// Buffers that differ in one byte yield at least one difference range.
    #[tokio::test]
    async fn test_simd_compare() {
        let current = vec![1, 2, 3, 4, 5, 6, 7, 8];
        let mut shadow = current.clone();
        shadow[3] = 99; // Change one byte

        let diffs = VirtualVariableFile::compare_ranges(&current, &shadow)
            .await
            .unwrap();

        assert!(!diffs.is_empty());
    }
}
429}