tx2_query/
sync.rs

1use crate::backend::{DatabaseBackend, QueryResult};
2use crate::error::{QueryError, Result};
3use crate::schema::SchemaGenerator;
4use serde_json::Value;
5use std::any::TypeId;
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use tokio::sync::RwLock;
9
10/// Entity change tracking
11#[derive(Debug, Clone, PartialEq, Eq)]
12pub enum EntityChange {
13    /// Entity was created
14    Created(u64),
15    /// Entity was modified
16    Modified(u64),
17    /// Entity was deleted
18    Deleted(u64),
19}
20
21/// Component change tracking
22#[derive(Debug, Clone)]
23pub struct ComponentChange {
24    pub entity_id: u64,
25    pub component_type: TypeId,
26    pub component_name: String,
27    pub data: Option<HashMap<String, Value>>,
28}
29
30/// Batch of changes to sync
31#[derive(Debug, Clone)]
32pub struct SyncBatch {
33    pub created: Vec<u64>,
34    pub modified: HashMap<TypeId, Vec<ComponentChange>>,
35    pub deleted: Vec<u64>,
36}
37
38impl SyncBatch {
39    pub fn new() -> Self {
40        Self {
41            created: Vec::new(),
42            modified: HashMap::new(),
43            deleted: Vec::new(),
44        }
45    }
46
47    pub fn is_empty(&self) -> bool {
48        self.created.is_empty() && self.modified.is_empty() && self.deleted.is_empty()
49    }
50
51    pub fn clear(&mut self) {
52        self.created.clear();
53        self.modified.clear();
54        self.deleted.clear();
55    }
56}
57
58impl Default for SyncBatch {
59    fn default() -> Self {
60        Self::new()
61    }
62}
63
64/// Configuration for query sync
65#[derive(Debug, Clone)]
66pub struct SyncConfig {
67    /// Batch size for bulk operations
68    pub batch_size: usize,
69    /// Whether to sync in background
70    pub background_sync: bool,
71    /// Whether to use transactions for batches
72    pub use_transactions: bool,
73    /// Maximum number of retries on failure
74    pub max_retries: usize,
75}
76
77impl Default for SyncConfig {
78    fn default() -> Self {
79        Self {
80            batch_size: 1000,
81            background_sync: true,
82            use_transactions: false,  // Disabled by default due to connection pooling limitations
83            max_retries: 3,
84        }
85    }
86}
87
88/// QuerySync manages synchronization between ECS world and SQL database
89pub struct QuerySync<B: DatabaseBackend> {
90    backend: Arc<RwLock<B>>,
91    schema_generator: Arc<RwLock<SchemaGenerator>>,
92    config: SyncConfig,
93    pending_batch: Arc<RwLock<SyncBatch>>,
94    synced_entities: Arc<RwLock<HashSet<u64>>>,
95}
96
97impl<B: DatabaseBackend> QuerySync<B> {
98    /// Create a new QuerySync instance
99    pub fn new(backend: B, schema_generator: SchemaGenerator) -> Self {
100        Self {
101            backend: Arc::new(RwLock::new(backend)),
102            schema_generator: Arc::new(RwLock::new(schema_generator)),
103            config: SyncConfig::default(),
104            pending_batch: Arc::new(RwLock::new(SyncBatch::new())),
105            synced_entities: Arc::new(RwLock::new(HashSet::new())),
106        }
107    }
108
109    /// Create a new QuerySync instance with custom config
110    pub fn with_config(backend: B, schema_generator: SchemaGenerator, config: SyncConfig) -> Self {
111        Self {
112            backend: Arc::new(RwLock::new(backend)),
113            schema_generator: Arc::new(RwLock::new(schema_generator)),
114            config,
115            pending_batch: Arc::new(RwLock::new(SyncBatch::new())),
116            synced_entities: Arc::new(RwLock::new(HashSet::new())),
117        }
118    }
119
120    /// Initialize database schema
121    pub async fn initialize_schema(&self) -> Result<()> {
122        let schema_gen = self.schema_generator.read().await;
123        let ddl = schema_gen.generate_ddl();
124
125        let mut backend = self.backend.write().await;
126
127        // Execute each DDL statement
128        for statement in ddl.split(";") {
129            let statement = statement.trim();
130            if !statement.is_empty() {
131                backend.execute(statement).await?;
132            }
133        }
134
135        Ok(())
136    }
137
138    /// Track entity creation
139    pub async fn track_entity_created(&self, entity_id: u64) -> Result<()> {
140        let mut batch = self.pending_batch.write().await;
141        batch.created.push(entity_id);
142
143        if batch.created.len() >= self.config.batch_size {
144            drop(batch);
145            self.flush().await?;
146        }
147
148        Ok(())
149    }
150
151    /// Track component change
152    pub async fn track_component_change(
153        &self,
154        entity_id: u64,
155        component_type: TypeId,
156        component_name: String,
157        data: HashMap<String, Value>,
158    ) -> Result<()> {
159        let mut batch = self.pending_batch.write().await;
160
161        let changes = batch.modified.entry(component_type).or_insert_with(Vec::new);
162        changes.push(ComponentChange {
163            entity_id,
164            component_type,
165            component_name,
166            data: Some(data),
167        });
168
169        let total_changes: usize = batch.modified.values().map(|v| v.len()).sum();
170        if total_changes >= self.config.batch_size {
171            drop(batch);
172            self.flush().await?;
173        }
174
175        Ok(())
176    }
177
178    /// Track entity deletion
179    pub async fn track_entity_deleted(&self, entity_id: u64) -> Result<()> {
180        let mut batch = self.pending_batch.write().await;
181        batch.deleted.push(entity_id);
182
183        if batch.deleted.len() >= self.config.batch_size {
184            drop(batch);
185            self.flush().await?;
186        }
187
188        Ok(())
189    }
190
191    /// Flush pending changes to database
192    pub async fn flush(&self) -> Result<()> {
193        // Clone the batch and clear it immediately to avoid holding the lock
194        let batch_to_apply = {
195            let mut batch = self.pending_batch.write().await;
196
197            if batch.is_empty() {
198                return Ok(());
199            }
200
201            let batch_clone = batch.clone();
202            batch.clear();
203            batch_clone
204        };
205
206        let mut backend = self.backend.write().await;
207
208        if self.config.use_transactions {
209            backend.begin_transaction().await?;
210        }
211
212        let result = self.apply_batch(&mut backend, &batch_to_apply).await;
213
214        match result {
215            Ok(_) => {
216                if self.config.use_transactions {
217                    backend.commit().await?;
218                }
219                Ok(())
220            }
221            Err(e) => {
222                if self.config.use_transactions {
223                    backend.rollback().await?;
224                }
225                Err(e)
226            }
227        }
228    }
229
230    /// Apply a batch of changes to the database
231    async fn apply_batch(&self, backend: &mut B, batch: &SyncBatch) -> Result<()> {
232        let schema_gen = self.schema_generator.read().await;
233
234        // Handle deletions first
235        for entity_id in &batch.deleted {
236            for component_name in schema_gen.list_components() {
237                let delete_sql = format!("DELETE FROM {} WHERE entity_id = {}", component_name, entity_id);
238                backend.execute(&delete_sql).await?;
239            }
240
241            let mut synced = self.synced_entities.write().await;
242            synced.remove(entity_id);
243        }
244
245        // Handle component modifications (INSERT or UPDATE)
246        for (component_type, changes) in &batch.modified {
247            if changes.is_empty() {
248                continue;
249            }
250
251            let _schema = schema_gen
252                .get_schema(component_type)
253                .ok_or_else(|| QueryError::ComponentNotRegistered(format!("{:?}", component_type)))?;
254
255            for change in changes {
256                let data = change
257                    .data
258                    .as_ref()
259                    .ok_or_else(|| QueryError::Sync("Missing component data".to_string()))?;
260
261                // Build column names and values
262                let mut columns = vec!["entity_id".to_string()];
263                let mut values = vec![change.entity_id.to_string()];
264
265                for (key, value) in data {
266                    columns.push(key.clone());
267                    values.push(self.format_value(value));
268                }
269
270                // Check if entity already exists in this table
271                let synced = self.synced_entities.read().await;
272                let exists = synced.contains(&change.entity_id);
273                drop(synced);
274
275                if exists {
276                    // UPDATE
277                    let mut set_clauses = Vec::new();
278                    for (key, value) in data {
279                        set_clauses.push(format!("{} = {}", key, self.format_value(value)));
280                    }
281
282                    let update_sql = format!(
283                        "UPDATE {} SET {} WHERE entity_id = {}",
284                        change.component_name,
285                        set_clauses.join(", "),
286                        change.entity_id
287                    );
288
289                    backend.execute(&update_sql).await?;
290                } else {
291                    // INSERT
292                    let insert_sql = format!(
293                        "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT (entity_id) DO UPDATE SET {}",
294                        change.component_name,
295                        columns.join(", "),
296                        values.join(", "),
297                        data.iter()
298                            .map(|(k, v)| format!("{} = {}", k, self.format_value(v)))
299                            .collect::<Vec<_>>()
300                            .join(", ")
301                    );
302
303                    backend.execute(&insert_sql).await?;
304
305                    let mut synced = self.synced_entities.write().await;
306                    synced.insert(change.entity_id);
307                }
308            }
309        }
310
311        // Handle creations (mark entities as synced)
312        for entity_id in &batch.created {
313            let mut synced = self.synced_entities.write().await;
314            synced.insert(*entity_id);
315        }
316
317        Ok(())
318    }
319
320    /// Format a JSON value for SQL insertion
321    fn format_value(&self, value: &Value) -> String {
322        match value {
323            Value::Null => "NULL".to_string(),
324            Value::Bool(b) => b.to_string().to_uppercase(),
325            Value::Number(n) => n.to_string(),
326            Value::String(s) => format!("'{}'", s.replace('\'', "''")),
327            Value::Array(_) | Value::Object(_) => {
328                format!("'{}'", serde_json::to_string(value).unwrap_or_default().replace('\'', "''"))
329            }
330        }
331    }
332
333    /// Query the database
334    pub async fn query(&self, sql: &str) -> Result<QueryResult> {
335        let mut backend = self.backend.write().await;
336        backend.query(sql).await
337    }
338
339    /// Execute a SQL statement
340    pub async fn execute(&self, sql: &str) -> Result<u64> {
341        let mut backend = self.backend.write().await;
342        backend.execute(sql).await
343    }
344
345    /// Get the current sync configuration
346    pub fn config(&self) -> &SyncConfig {
347        &self.config
348    }
349
350    /// Check if backend is connected
351    pub async fn is_connected(&self) -> bool {
352        let backend = self.backend.read().await;
353        backend.is_connected()
354    }
355
356    /// Get the number of synced entities
357    pub async fn synced_entity_count(&self) -> usize {
358        let synced = self.synced_entities.read().await;
359        synced.len()
360    }
361
362    /// Get the number of pending changes
363    pub async fn pending_change_count(&self) -> usize {
364        let batch = self.pending_batch.read().await;
365        batch.created.len()
366            + batch.modified.values().map(|v| v.len()).sum::<usize>()
367            + batch.deleted.len()
368    }
369
370    /// Clear all synced data from database
371    pub async fn clear_all(&self) -> Result<()> {
372        let schema_gen = self.schema_generator.read().await;
373        let mut backend = self.backend.write().await;
374
375        for component_name in schema_gen.list_components() {
376            let truncate_sql = format!("DELETE FROM {}", component_name);
377            backend.execute(&truncate_sql).await?;
378        }
379
380        let mut synced = self.synced_entities.write().await;
381        synced.clear();
382
383        Ok(())
384    }
385
386    /// Perform a full sync of all components for given entities
387    pub async fn full_sync(
388        &self,
389        entities: Vec<(u64, TypeId, String, HashMap<String, Value>)>,
390    ) -> Result<()> {
391        let mut backend = self.backend.write().await;
392
393        if self.config.use_transactions {
394            backend.begin_transaction().await?;
395        }
396
397        let result = async {
398            for (entity_id, _component_type, component_name, data) in entities {
399                let mut columns = vec!["entity_id".to_string()];
400                let mut values = vec![entity_id.to_string()];
401
402                for (key, value) in &data {
403                    columns.push(key.clone());
404                    values.push(self.format_value(value));
405                }
406
407                let insert_sql = format!(
408                    "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT (entity_id) DO UPDATE SET {}",
409                    component_name,
410                    columns.join(", "),
411                    values.join(", "),
412                    data.iter()
413                        .map(|(k, v)| format!("{} = {}", k, self.format_value(v)))
414                        .collect::<Vec<_>>()
415                        .join(", ")
416                );
417
418                backend.execute(&insert_sql).await?;
419
420                let mut synced = self.synced_entities.write().await;
421                synced.insert(entity_id);
422            }
423
424            Ok::<(), QueryError>(())
425        }
426        .await;
427
428        match result {
429            Ok(_) => {
430                if self.config.use_transactions {
431                    backend.commit().await?;
432                }
433                Ok(())
434            }
435            Err(e) => {
436                if self.config.use_transactions {
437                    backend.rollback().await?;
438                }
439                Err(e)
440            }
441        }
442    }
443}
444
445#[cfg(test)]
446mod tests {
447    use super::*;
448    use crate::schema::SqlType;
449    use async_trait::async_trait;
450
451    struct MockBackend {
452        executed: Vec<String>,
453        in_transaction: bool,
454    }
455
456    impl MockBackend {
457        fn new() -> Self {
458            Self {
459                executed: Vec::new(),
460                in_transaction: false,
461            }
462        }
463    }
464
465    #[async_trait]
466    impl DatabaseBackend for MockBackend {
467        async fn connect(_url: &str) -> Result<Self> {
468            Ok(Self::new())
469        }
470
471        async fn execute(&mut self, sql: &str) -> Result<u64> {
472            self.executed.push(sql.to_string());
473            Ok(1)
474        }
475
476        async fn query(&mut self, _sql: &str) -> Result<QueryResult> {
477            Ok(vec![])
478        }
479
480        async fn begin_transaction(&mut self) -> Result<()> {
481            self.in_transaction = true;
482            Ok(())
483        }
484
485        async fn commit(&mut self) -> Result<()> {
486            self.in_transaction = false;
487            Ok(())
488        }
489
490        async fn rollback(&mut self) -> Result<()> {
491            self.in_transaction = false;
492            Ok(())
493        }
494
495        fn is_connected(&self) -> bool {
496            true
497        }
498
499        async fn close(self) -> Result<()> {
500            Ok(())
501        }
502    }
503
504    #[derive(Debug)]
505    struct TestComponent;
506
507    #[tokio::test]
508    async fn test_track_component_change() {
509        let backend = MockBackend::new();
510        let mut schema_gen = SchemaGenerator::new();
511        schema_gen
512            .register::<TestComponent>("Player", vec![("name", SqlType::Text, false)])
513            .unwrap();
514
515        let sync = QuerySync::with_config(
516            backend,
517            schema_gen,
518            SyncConfig {
519                batch_size: 10,
520                use_transactions: false,
521                ..Default::default()
522            },
523        );
524
525        let mut data = HashMap::new();
526        data.insert("name".to_string(), Value::String("Alice".to_string()));
527
528        sync.track_component_change(1, TypeId::of::<TestComponent>(), "Player".to_string(), data)
529            .await
530            .unwrap();
531
532        assert_eq!(sync.pending_change_count().await, 1);
533    }
534
535    #[tokio::test]
536    async fn test_flush() {
537        let backend = MockBackend::new();
538        let mut schema_gen = SchemaGenerator::new();
539        schema_gen
540            .register::<TestComponent>("Player", vec![("name", SqlType::Text, false)])
541            .unwrap();
542
543        let sync = QuerySync::with_config(
544            backend,
545            schema_gen,
546            SyncConfig {
547                use_transactions: false,
548                ..Default::default()
549            },
550        );
551
552        let mut data = HashMap::new();
553        data.insert("name".to_string(), Value::String("Alice".to_string()));
554
555        sync.track_component_change(1, TypeId::of::<TestComponent>(), "Player".to_string(), data)
556            .await
557            .unwrap();
558
559        sync.flush().await.unwrap();
560
561        assert_eq!(sync.pending_change_count().await, 0);
562        assert_eq!(sync.synced_entity_count().await, 1);
563    }
564
565    #[tokio::test]
566    async fn test_entity_deletion() {
567        let backend = MockBackend::new();
568        let mut schema_gen = SchemaGenerator::new();
569        schema_gen
570            .register::<TestComponent>("Player", vec![("name", SqlType::Text, false)])
571            .unwrap();
572
573        let sync = QuerySync::new(backend, schema_gen);
574
575        sync.track_entity_deleted(1).await.unwrap();
576        sync.flush().await.unwrap();
577
578        assert_eq!(sync.synced_entity_count().await, 0);
579    }
580
581    #[tokio::test]
582    async fn test_format_value() {
583        let backend = MockBackend::new();
584        let schema_gen = SchemaGenerator::new();
585        let sync = QuerySync::new(backend, schema_gen);
586
587        assert_eq!(sync.format_value(&Value::Null), "NULL");
588        assert_eq!(sync.format_value(&Value::Bool(true)), "TRUE");
589        assert_eq!(sync.format_value(&Value::Bool(false)), "FALSE");
590        assert_eq!(sync.format_value(&Value::Number(42.into())), "42");
591        assert_eq!(sync.format_value(&Value::String("test".to_string())), "'test'");
592        assert_eq!(
593            sync.format_value(&Value::String("test's".to_string())),
594            "'test''s'"
595        );
596    }
597}