1use crate::backend::{DatabaseBackend, QueryResult};
2use crate::error::{QueryError, Result};
3use crate::schema::SchemaGenerator;
4use serde_json::Value;
5use std::any::TypeId;
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use tokio::sync::RwLock;
9
/// Lifecycle event for a single entity; every variant carries the entity's `u64` id.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EntityChange {
    /// The entity with this id was created.
    Created(u64),
    /// The entity with this id was modified.
    Modified(u64),
    /// The entity with this id was deleted.
    Deleted(u64),
}
20
21#[derive(Debug, Clone)]
23pub struct ComponentChange {
24 pub entity_id: u64,
25 pub component_type: TypeId,
26 pub component_name: String,
27 pub data: Option<HashMap<String, Value>>,
28}
29
30#[derive(Debug, Clone)]
32pub struct SyncBatch {
33 pub created: Vec<u64>,
34 pub modified: HashMap<TypeId, Vec<ComponentChange>>,
35 pub deleted: Vec<u64>,
36}
37
38impl SyncBatch {
39 pub fn new() -> Self {
40 Self {
41 created: Vec::new(),
42 modified: HashMap::new(),
43 deleted: Vec::new(),
44 }
45 }
46
47 pub fn is_empty(&self) -> bool {
48 self.created.is_empty() && self.modified.is_empty() && self.deleted.is_empty()
49 }
50
51 pub fn clear(&mut self) {
52 self.created.clear();
53 self.modified.clear();
54 self.deleted.clear();
55 }
56}
57
58impl Default for SyncBatch {
59 fn default() -> Self {
60 Self::new()
61 }
62}
63
/// Settings that control how `QuerySync` batches and writes changes.
#[derive(Debug, Clone)]
pub struct SyncConfig {
    /// Threshold used by the `track_*` methods: each one flushes once the
    /// pending count for its own kind of change reaches this value.
    pub batch_size: usize,
    /// NOTE(review): not read anywhere in this file; presumably enables a
    /// background sync task elsewhere — confirm.
    pub background_sync: bool,
    /// When `true`, `flush` and `full_sync` wrap their statements in
    /// begin/commit and roll back on failure.
    pub use_transactions: bool,
    /// NOTE(review): not read anywhere in this file; presumably a retry limit
    /// for failed syncs — confirm.
    pub max_retries: usize,
}

impl Default for SyncConfig {
    /// Defaults: batches of 1000, background sync on, transactions off, 3 retries.
    fn default() -> Self {
        let batch_size = 1000;
        let background_sync = true;
        let use_transactions = false;
        let max_retries = 3;

        Self {
            batch_size,
            background_sync,
            use_transactions,
            max_retries,
        }
    }
}
87
88pub struct QuerySync<B: DatabaseBackend> {
90 backend: Arc<RwLock<B>>,
91 schema_generator: Arc<RwLock<SchemaGenerator>>,
92 config: SyncConfig,
93 pending_batch: Arc<RwLock<SyncBatch>>,
94 synced_entities: Arc<RwLock<HashSet<u64>>>,
95}
96
97impl<B: DatabaseBackend> QuerySync<B> {
98 pub fn new(backend: B, schema_generator: SchemaGenerator) -> Self {
100 Self {
101 backend: Arc::new(RwLock::new(backend)),
102 schema_generator: Arc::new(RwLock::new(schema_generator)),
103 config: SyncConfig::default(),
104 pending_batch: Arc::new(RwLock::new(SyncBatch::new())),
105 synced_entities: Arc::new(RwLock::new(HashSet::new())),
106 }
107 }
108
109 pub fn with_config(backend: B, schema_generator: SchemaGenerator, config: SyncConfig) -> Self {
111 Self {
112 backend: Arc::new(RwLock::new(backend)),
113 schema_generator: Arc::new(RwLock::new(schema_generator)),
114 config,
115 pending_batch: Arc::new(RwLock::new(SyncBatch::new())),
116 synced_entities: Arc::new(RwLock::new(HashSet::new())),
117 }
118 }
119
120 pub async fn initialize_schema(&self) -> Result<()> {
122 let schema_gen = self.schema_generator.read().await;
123 let ddl = schema_gen.generate_ddl();
124
125 let mut backend = self.backend.write().await;
126
127 for statement in ddl.split(";") {
129 let statement = statement.trim();
130 if !statement.is_empty() {
131 backend.execute(statement).await?;
132 }
133 }
134
135 Ok(())
136 }
137
138 pub async fn track_entity_created(&self, entity_id: u64) -> Result<()> {
140 let mut batch = self.pending_batch.write().await;
141 batch.created.push(entity_id);
142
143 if batch.created.len() >= self.config.batch_size {
144 drop(batch);
145 self.flush().await?;
146 }
147
148 Ok(())
149 }
150
151 pub async fn track_component_change(
153 &self,
154 entity_id: u64,
155 component_type: TypeId,
156 component_name: String,
157 data: HashMap<String, Value>,
158 ) -> Result<()> {
159 let mut batch = self.pending_batch.write().await;
160
161 let changes = batch.modified.entry(component_type).or_insert_with(Vec::new);
162 changes.push(ComponentChange {
163 entity_id,
164 component_type,
165 component_name,
166 data: Some(data),
167 });
168
169 let total_changes: usize = batch.modified.values().map(|v| v.len()).sum();
170 if total_changes >= self.config.batch_size {
171 drop(batch);
172 self.flush().await?;
173 }
174
175 Ok(())
176 }
177
178 pub async fn track_entity_deleted(&self, entity_id: u64) -> Result<()> {
180 let mut batch = self.pending_batch.write().await;
181 batch.deleted.push(entity_id);
182
183 if batch.deleted.len() >= self.config.batch_size {
184 drop(batch);
185 self.flush().await?;
186 }
187
188 Ok(())
189 }
190
191 pub async fn flush(&self) -> Result<()> {
193 let batch_to_apply = {
195 let mut batch = self.pending_batch.write().await;
196
197 if batch.is_empty() {
198 return Ok(());
199 }
200
201 let batch_clone = batch.clone();
202 batch.clear();
203 batch_clone
204 };
205
206 let mut backend = self.backend.write().await;
207
208 if self.config.use_transactions {
209 backend.begin_transaction().await?;
210 }
211
212 let result = self.apply_batch(&mut backend, &batch_to_apply).await;
213
214 match result {
215 Ok(_) => {
216 if self.config.use_transactions {
217 backend.commit().await?;
218 }
219 Ok(())
220 }
221 Err(e) => {
222 if self.config.use_transactions {
223 backend.rollback().await?;
224 }
225 Err(e)
226 }
227 }
228 }
229
230 async fn apply_batch(&self, backend: &mut B, batch: &SyncBatch) -> Result<()> {
232 let schema_gen = self.schema_generator.read().await;
233
234 for entity_id in &batch.deleted {
236 for component_name in schema_gen.list_components() {
237 let delete_sql = format!("DELETE FROM {} WHERE entity_id = {}", component_name, entity_id);
238 backend.execute(&delete_sql).await?;
239 }
240
241 let mut synced = self.synced_entities.write().await;
242 synced.remove(entity_id);
243 }
244
245 for (component_type, changes) in &batch.modified {
247 if changes.is_empty() {
248 continue;
249 }
250
251 let _schema = schema_gen
252 .get_schema(component_type)
253 .ok_or_else(|| QueryError::ComponentNotRegistered(format!("{:?}", component_type)))?;
254
255 for change in changes {
256 let data = change
257 .data
258 .as_ref()
259 .ok_or_else(|| QueryError::Sync("Missing component data".to_string()))?;
260
261 let mut columns = vec!["entity_id".to_string()];
263 let mut values = vec![change.entity_id.to_string()];
264
265 for (key, value) in data {
266 columns.push(key.clone());
267 values.push(self.format_value(value));
268 }
269
270 let synced = self.synced_entities.read().await;
272 let exists = synced.contains(&change.entity_id);
273 drop(synced);
274
275 if exists {
276 let mut set_clauses = Vec::new();
278 for (key, value) in data {
279 set_clauses.push(format!("{} = {}", key, self.format_value(value)));
280 }
281
282 let update_sql = format!(
283 "UPDATE {} SET {} WHERE entity_id = {}",
284 change.component_name,
285 set_clauses.join(", "),
286 change.entity_id
287 );
288
289 backend.execute(&update_sql).await?;
290 } else {
291 let insert_sql = format!(
293 "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT (entity_id) DO UPDATE SET {}",
294 change.component_name,
295 columns.join(", "),
296 values.join(", "),
297 data.iter()
298 .map(|(k, v)| format!("{} = {}", k, self.format_value(v)))
299 .collect::<Vec<_>>()
300 .join(", ")
301 );
302
303 backend.execute(&insert_sql).await?;
304
305 let mut synced = self.synced_entities.write().await;
306 synced.insert(change.entity_id);
307 }
308 }
309 }
310
311 for entity_id in &batch.created {
313 let mut synced = self.synced_entities.write().await;
314 synced.insert(*entity_id);
315 }
316
317 Ok(())
318 }
319
320 fn format_value(&self, value: &Value) -> String {
322 match value {
323 Value::Null => "NULL".to_string(),
324 Value::Bool(b) => b.to_string().to_uppercase(),
325 Value::Number(n) => n.to_string(),
326 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
327 Value::Array(_) | Value::Object(_) => {
328 format!("'{}'", serde_json::to_string(value).unwrap_or_default().replace('\'', "''"))
329 }
330 }
331 }
332
333 pub async fn query(&self, sql: &str) -> Result<QueryResult> {
335 let mut backend = self.backend.write().await;
336 backend.query(sql).await
337 }
338
339 pub async fn execute(&self, sql: &str) -> Result<u64> {
341 let mut backend = self.backend.write().await;
342 backend.execute(sql).await
343 }
344
345 pub fn config(&self) -> &SyncConfig {
347 &self.config
348 }
349
350 pub async fn is_connected(&self) -> bool {
352 let backend = self.backend.read().await;
353 backend.is_connected()
354 }
355
356 pub async fn synced_entity_count(&self) -> usize {
358 let synced = self.synced_entities.read().await;
359 synced.len()
360 }
361
362 pub async fn pending_change_count(&self) -> usize {
364 let batch = self.pending_batch.read().await;
365 batch.created.len()
366 + batch.modified.values().map(|v| v.len()).sum::<usize>()
367 + batch.deleted.len()
368 }
369
370 pub async fn clear_all(&self) -> Result<()> {
372 let schema_gen = self.schema_generator.read().await;
373 let mut backend = self.backend.write().await;
374
375 for component_name in schema_gen.list_components() {
376 let truncate_sql = format!("DELETE FROM {}", component_name);
377 backend.execute(&truncate_sql).await?;
378 }
379
380 let mut synced = self.synced_entities.write().await;
381 synced.clear();
382
383 Ok(())
384 }
385
386 pub async fn full_sync(
388 &self,
389 entities: Vec<(u64, TypeId, String, HashMap<String, Value>)>,
390 ) -> Result<()> {
391 let mut backend = self.backend.write().await;
392
393 if self.config.use_transactions {
394 backend.begin_transaction().await?;
395 }
396
397 let result = async {
398 for (entity_id, _component_type, component_name, data) in entities {
399 let mut columns = vec!["entity_id".to_string()];
400 let mut values = vec![entity_id.to_string()];
401
402 for (key, value) in &data {
403 columns.push(key.clone());
404 values.push(self.format_value(value));
405 }
406
407 let insert_sql = format!(
408 "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT (entity_id) DO UPDATE SET {}",
409 component_name,
410 columns.join(", "),
411 values.join(", "),
412 data.iter()
413 .map(|(k, v)| format!("{} = {}", k, self.format_value(v)))
414 .collect::<Vec<_>>()
415 .join(", ")
416 );
417
418 backend.execute(&insert_sql).await?;
419
420 let mut synced = self.synced_entities.write().await;
421 synced.insert(entity_id);
422 }
423
424 Ok::<(), QueryError>(())
425 }
426 .await;
427
428 match result {
429 Ok(_) => {
430 if self.config.use_transactions {
431 backend.commit().await?;
432 }
433 Ok(())
434 }
435 Err(e) => {
436 if self.config.use_transactions {
437 backend.rollback().await?;
438 }
439 Err(e)
440 }
441 }
442 }
443}
444
445#[cfg(test)]
446mod tests {
447 use super::*;
448 use crate::schema::SqlType;
449 use async_trait::async_trait;
450
/// In-memory stand-in for a database: records statements instead of running them.
#[derive(Default)]
struct MockBackend {
    /// Every SQL string passed to `execute`, in call order.
    executed: Vec<String>,
    /// Set by `begin_transaction`, cleared by `commit` and `rollback`.
    in_transaction: bool,
}

impl MockBackend {
    /// Creates a mock with no recorded statements and no open transaction.
    fn new() -> Self {
        Self::default()
    }
}
464
465 #[async_trait]
466 impl DatabaseBackend for MockBackend {
467 async fn connect(_url: &str) -> Result<Self> {
468 Ok(Self::new())
469 }
470
471 async fn execute(&mut self, sql: &str) -> Result<u64> {
472 self.executed.push(sql.to_string());
473 Ok(1)
474 }
475
476 async fn query(&mut self, _sql: &str) -> Result<QueryResult> {
477 Ok(vec![])
478 }
479
480 async fn begin_transaction(&mut self) -> Result<()> {
481 self.in_transaction = true;
482 Ok(())
483 }
484
485 async fn commit(&mut self) -> Result<()> {
486 self.in_transaction = false;
487 Ok(())
488 }
489
490 async fn rollback(&mut self) -> Result<()> {
491 self.in_transaction = false;
492 Ok(())
493 }
494
495 fn is_connected(&self) -> bool {
496 true
497 }
498
499 async fn close(self) -> Result<()> {
500 Ok(())
501 }
502 }
503
/// Marker type registered with the schema generator as the "Player" component in these tests.
#[derive(Debug)]
struct TestComponent;
506
507 #[tokio::test]
508 async fn test_track_component_change() {
509 let backend = MockBackend::new();
510 let mut schema_gen = SchemaGenerator::new();
511 schema_gen
512 .register::<TestComponent>("Player", vec![("name", SqlType::Text, false)])
513 .unwrap();
514
515 let sync = QuerySync::with_config(
516 backend,
517 schema_gen,
518 SyncConfig {
519 batch_size: 10,
520 use_transactions: false,
521 ..Default::default()
522 },
523 );
524
525 let mut data = HashMap::new();
526 data.insert("name".to_string(), Value::String("Alice".to_string()));
527
528 sync.track_component_change(1, TypeId::of::<TestComponent>(), "Player".to_string(), data)
529 .await
530 .unwrap();
531
532 assert_eq!(sync.pending_change_count().await, 1);
533 }
534
535 #[tokio::test]
536 async fn test_flush() {
537 let backend = MockBackend::new();
538 let mut schema_gen = SchemaGenerator::new();
539 schema_gen
540 .register::<TestComponent>("Player", vec![("name", SqlType::Text, false)])
541 .unwrap();
542
543 let sync = QuerySync::with_config(
544 backend,
545 schema_gen,
546 SyncConfig {
547 use_transactions: false,
548 ..Default::default()
549 },
550 );
551
552 let mut data = HashMap::new();
553 data.insert("name".to_string(), Value::String("Alice".to_string()));
554
555 sync.track_component_change(1, TypeId::of::<TestComponent>(), "Player".to_string(), data)
556 .await
557 .unwrap();
558
559 sync.flush().await.unwrap();
560
561 assert_eq!(sync.pending_change_count().await, 0);
562 assert_eq!(sync.synced_entity_count().await, 1);
563 }
564
565 #[tokio::test]
566 async fn test_entity_deletion() {
567 let backend = MockBackend::new();
568 let mut schema_gen = SchemaGenerator::new();
569 schema_gen
570 .register::<TestComponent>("Player", vec![("name", SqlType::Text, false)])
571 .unwrap();
572
573 let sync = QuerySync::new(backend, schema_gen);
574
575 sync.track_entity_deleted(1).await.unwrap();
576 sync.flush().await.unwrap();
577
578 assert_eq!(sync.synced_entity_count().await, 0);
579 }
580
581 #[tokio::test]
582 async fn test_format_value() {
583 let backend = MockBackend::new();
584 let schema_gen = SchemaGenerator::new();
585 let sync = QuerySync::new(backend, schema_gen);
586
587 assert_eq!(sync.format_value(&Value::Null), "NULL");
588 assert_eq!(sync.format_value(&Value::Bool(true)), "TRUE");
589 assert_eq!(sync.format_value(&Value::Bool(false)), "FALSE");
590 assert_eq!(sync.format_value(&Value::Number(42.into())), "42");
591 assert_eq!(sync.format_value(&Value::String("test".to_string())), "'test'");
592 assert_eq!(
593 sync.format_value(&Value::String("test's".to_string())),
594 "'test''s'"
595 );
596 }
597}