Skip to main content

teaql_runtime/
lib.rs

1mod checker;
2mod context;
3mod error;
4mod event;
5mod graph;
6mod id;
7mod language;
8mod memory;
9mod registry;
10mod repository;
11
12pub use context::{SqlLogEntry, SqlLogOperation, SqlLogOptions, UserContext};
13pub use error::{ContextError, RepositoryError, RuntimeError};
14pub use event::{EntityEvent, EntityEventKind, EntityEventSink, InMemoryEntityEventSink};
15pub use graph::{
16    GraphMutationBatch, GraphMutationKind, GraphMutationPlan, GraphMutationPlanItem, GraphNode,
17    GraphOperation, sorted_update_fields,
18};
19pub(crate) use id::local_id_generator;
20pub use id::{InternalIdGenerator, SnowflakeIdGenerator};
21pub use language::{
22    BuiltinTranslator, Language, MessageTranslator, translate_check_result, translate_location,
23};
24pub use memory::{MemoryRepository, MemoryRepositoryError};
25pub use registry::{
26    InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry,
27    MetadataStore, RepositoryBehavior, RepositoryBehaviorRegistry, RepositoryRegistry,
28    RuntimeModule,
29};
30pub use repository::{
31    AggregationCacheBackend, ContextRepository, GraphTransactionBoundary, InMemoryAggregationCache,
32    QueryExecutor, RelationLoadPlan, Repository, ResolvedRepository,
33};
34
35#[cfg(feature = "sqlx")]
36pub mod sqlx_support;
37
38#[cfg(test)]
39mod tests {
40    use std::collections::{BTreeMap, VecDeque};
41    use std::sync::{Arc, Mutex};
42
43    use super::{
44        AggregationCacheBackend, CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResults,
45        Checker, EntityEvent, EntityEventKind, EntityEventSink, GraphMutationKind, GraphNode,
46        GraphTransactionBoundary, InMemoryAggregationCache, InMemoryCheckerRegistry,
47        InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry,
48        InternalIdGenerator, Language, MemoryRepository, MetadataStore, ObjectLocation,
49        QueryExecutor, Repository, RepositoryBehavior, RepositoryError, RuntimeError,
50        RuntimeModule, SqlLogOperation, SqlLogOptions, UserContext, translate_check_result,
51    };
52    use teaql_core::{
53        Aggregate, AggregateFunction, BinaryOp, DataType, Decimal, DeleteCommand, Entity,
54        EntityDescriptor, EntityError, Expr, InsertCommand, OrderBy, PropertyDescriptor, Record,
55        RecoverCommand, RelationAggregate, SelectQuery, TeaqlEntity, UpdateCommand, Value,
56    };
57    use teaql_dialect_pg::PostgresDialect;
58    use teaql_macros::TeaqlEntity as DeriveTeaqlEntity;
59    use teaql_sql::CompiledQuery;
60
61    fn entity() -> EntityDescriptor {
62        EntityDescriptor::new("Order")
63            .table_name("orders")
64            .property(
65                PropertyDescriptor::new("id", DataType::U64)
66                    .column_name("id")
67                    .id()
68                    .not_null(),
69            )
70            .property(
71                PropertyDescriptor::new("version", DataType::I64)
72                    .column_name("version")
73                    .version()
74                    .not_null(),
75            )
76            .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
77            .relation(
78                teaql_core::RelationDescriptor::new("lines", "OrderLine")
79                    .local_key("id")
80                    .foreign_key("order_id")
81                    .many(),
82            )
83    }
84
85    fn line_entity() -> EntityDescriptor {
86        EntityDescriptor::new("OrderLine")
87            .table_name("orderline")
88            .property(
89                PropertyDescriptor::new("id", DataType::U64)
90                    .column_name("id")
91                    .id()
92                    .not_null(),
93            )
94            .property(
95                PropertyDescriptor::new("version", DataType::I64)
96                    .column_name("version")
97                    .version(),
98            )
99            .property(
100                PropertyDescriptor::new("order_id", DataType::U64)
101                    .column_name("order_id")
102                    .not_null(),
103            )
104            .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
105            .property(
106                PropertyDescriptor::new("product_id", DataType::U64)
107                    .column_name("product_id")
108                    .not_null(),
109            )
110            .relation(
111                teaql_core::RelationDescriptor::new("product", "Product")
112                    .local_key("product_id")
113                    .foreign_key("id"),
114            )
115    }
116
117    fn product_entity() -> EntityDescriptor {
118        EntityDescriptor::new("Product")
119            .table_name("product")
120            .property(
121                PropertyDescriptor::new("id", DataType::U64)
122                    .column_name("id")
123                    .id()
124                    .not_null(),
125            )
126            .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
127    }
128
129    #[derive(Debug, Default)]
130    struct StubExecutor {
131        affected: u64,
132        rows: Vec<Record>,
133    }
134
135    #[derive(Debug, Default)]
136    struct QueueExecutor {
137        affected: u64,
138        rows: Mutex<VecDeque<Vec<Record>>>,
139        queries: Mutex<Vec<String>>,
140    }
141
142    struct OrderBehavior;
143
144    #[allow(dead_code)]
145    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
146    #[teaql(entity = "CatalogProduct", table = "catalog_product")]
147    struct CatalogProductRow {
148        #[teaql(id)]
149        id: u64,
150        name: String,
151    }
152
153    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
154    #[teaql(entity = "OrderAggregate", table = "orders")]
155    struct OrderAggregateDynamic {
156        #[teaql(id)]
157        id: u64,
158        #[teaql(dynamic)]
159        dynamic: BTreeMap<String, Value>,
160    }
161
162    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
163    #[teaql(entity = "Product", table = "product")]
164    struct ProductEntityRow {
165        #[teaql(id)]
166        id: u64,
167        name: String,
168    }
169
170    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
171    #[teaql(entity = "OrderLine", table = "orderline")]
172    struct OrderLineEntityRow {
173        #[teaql(id)]
174        id: u64,
175        #[teaql(column = "order_id")]
176        order_id: u64,
177        name: String,
178        #[teaql(column = "product_id")]
179        product_id: u64,
180        #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
181        product: Option<ProductEntityRow>,
182    }
183
184    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
185    #[teaql(entity = "Order", table = "orders")]
186    struct OrderAggregateRow {
187        #[teaql(id)]
188        id: u64,
189        #[teaql(version)]
190        version: i64,
191        name: String,
192        #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
193        lines: teaql_core::SmartList<OrderLineEntityRow>,
194    }
195
196    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
197    #[teaql(entity = "Order", table = "orders")]
198    struct Order {
199        #[teaql(id)]
200        id: u64,
201        #[teaql(version)]
202        version: i64,
203        name: String,
204    }
205
206    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
207    #[teaql(entity = "Product", table = "product")]
208    struct TypedGraphProduct {
209        #[teaql(id)]
210        id: Option<u64>,
211        name: String,
212    }
213
214    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
215    #[teaql(entity = "OrderLine", table = "orderline")]
216    struct TypedGraphLine {
217        #[teaql(id)]
218        id: Option<u64>,
219        #[teaql(column = "order_id")]
220        order_id: Option<u64>,
221        name: String,
222        #[teaql(column = "product_id")]
223        product_id: Option<u64>,
224        #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
225        product: Option<TypedGraphProduct>,
226    }
227
228    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
229    #[teaql(entity = "Order", table = "orders")]
230    struct TypedGraphOrder {
231        #[teaql(id)]
232        id: Option<u64>,
233        #[teaql(version)]
234        version: i64,
235        name: String,
236        #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
237        lines: teaql_core::SmartList<TypedGraphLine>,
238    }
239
240    #[derive(Debug, PartialEq, Eq)]
241    struct OrderEntity {
242        id: u64,
243        version: i64,
244        name: String,
245    }
246
247    impl teaql_core::TeaqlEntity for OrderEntity {
248        fn entity_descriptor() -> EntityDescriptor {
249            entity()
250        }
251    }
252
253    impl Entity for OrderEntity {
254        fn from_record(record: Record) -> Result<Self, EntityError> {
255            let id = match record.get("id") {
256                Some(Value::U64(v)) => *v,
257                Some(Value::I64(v)) if *v >= 0 => *v as u64,
258                other => {
259                    return Err(EntityError::new(
260                        "Order",
261                        format!("invalid id field: {other:?}"),
262                    ));
263                }
264            };
265            let version = match record.get("version") {
266                Some(Value::I64(v)) => *v,
267                other => {
268                    return Err(EntityError::new(
269                        "Order",
270                        format!("invalid version field: {other:?}"),
271                    ));
272                }
273            };
274            let name = match record.get("name") {
275                Some(Value::Text(v)) => v.clone(),
276                other => {
277                    return Err(EntityError::new(
278                        "Order",
279                        format!("invalid name field: {other:?}"),
280                    ));
281                }
282            };
283            Ok(Self { id, version, name })
284        }
285
286        fn into_record(self) -> Record {
287            Record::from([
288                (String::from("id"), Value::U64(self.id)),
289                (String::from("version"), Value::I64(self.version)),
290                (String::from("name"), Value::Text(self.name)),
291            ])
292        }
293    }
294
295    #[derive(Debug)]
296    struct StubError;
297
298    impl std::fmt::Display for StubError {
299        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
300            write!(f, "stub error")
301        }
302    }
303
304    impl std::error::Error for StubError {}
305
306    impl QueryExecutor for StubExecutor {
307        type Error = StubError;
308
309        fn fetch_all(&self, _query: &CompiledQuery) -> Result<Vec<Record>, Self::Error> {
310            Ok(self.rows.clone())
311        }
312
313        fn execute(&self, _query: &CompiledQuery) -> Result<u64, Self::Error> {
314            Ok(self.affected)
315        }
316
317        fn begin_transaction(&self) -> Result<GraphTransactionBoundary, Self::Error> {
318            Ok(GraphTransactionBoundary::Started)
319        }
320    }
321
322    impl QueryExecutor for QueueExecutor {
323        type Error = StubError;
324
325        fn fetch_all(&self, query: &CompiledQuery) -> Result<Vec<Record>, Self::Error> {
326            self.queries.lock().unwrap().push(query.sql.clone());
327            Ok(self.rows.lock().unwrap().pop_front().unwrap_or_default())
328        }
329
330        fn execute(&self, _query: &CompiledQuery) -> Result<u64, Self::Error> {
331            Ok(self.affected)
332        }
333    }
334
335    #[test]
336    fn user_context_records_configured_sql_logs() {
337        let mut ctx = UserContext::new()
338            .with_module(crate::module!(Order))
339            .with_sql_log_options(SqlLogOptions::select_only());
340        ctx.insert_resource(PostgresDialect);
341        ctx.insert_resource(StubExecutor {
342            affected: 1,
343            rows: Vec::new(),
344        });
345
346        {
347            let repo = ctx
348                .resolve_repository::<PostgresDialect, StubExecutor>("Order")
349                .unwrap();
350            repo.fetch_all(&SelectQuery::new("Order").filter(Expr::eq("name", "Bob's Shop")))
351                .unwrap();
352            repo.insert(&InsertCommand::new("Order").value("name", "created"))
353                .unwrap();
354        }
355
356        let logs = ctx.sql_logs();
357        assert_eq!(logs.len(), 1);
358        assert_eq!(logs[0].operation, SqlLogOperation::Select);
359        assert_eq!(
360            logs[0].debug_sql,
361            "SELECT * FROM \"orders\" WHERE (\"name\" = 'Bob''s Shop')"
362        );
363
364        ctx.set_sql_log_options(SqlLogOptions::mutation_only());
365        ctx.clear_sql_logs();
366        ctx.resolve_repository::<PostgresDialect, StubExecutor>("Order")
367            .unwrap()
368            .update(
369                &UpdateCommand::new("Order", 1_u64)
370                    .value("name", "updated")
371                    .expected_version(1),
372            )
373            .unwrap();
374
375        let logs = ctx.sql_logs();
376        assert_eq!(logs.len(), 1);
377        assert_eq!(logs[0].operation, SqlLogOperation::Update);
378        assert!(logs[0].debug_sql.contains("UPDATE \"orders\" SET"));
379        assert!(logs[0].debug_sql.contains("'updated'"));
380    }
381
382    impl RepositoryBehavior for OrderBehavior {
383        fn before_select(
384            &self,
385            _ctx: &UserContext,
386            query: &mut teaql_core::SelectQuery,
387        ) -> Result<(), RuntimeError> {
388            query.filter = Some(Expr::eq("version", 1_i64));
389            Ok(())
390        }
391
392        fn before_insert(
393            &self,
394            _ctx: &UserContext,
395            command: &mut InsertCommand,
396        ) -> Result<(), RuntimeError> {
397            command
398                .values
399                .entry("version".to_owned())
400                .or_insert(Value::I64(1));
401            Ok(())
402        }
403
404        fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
405            vec!["lines".to_owned()]
406        }
407    }
408
409    struct ContextAwareOrderBehavior;
410    struct OrderChecker;
411    #[derive(Clone)]
412    struct RecordingEventSink {
413        events: Arc<Mutex<Vec<EntityEvent>>>,
414    }
415
416    impl RepositoryBehavior for ContextAwareOrderBehavior {
417        fn before_insert(
418            &self,
419            ctx: &UserContext,
420            command: &mut InsertCommand,
421        ) -> Result<(), RuntimeError> {
422            let tenant = ctx
423                .get_named_resource::<String>("tenant")
424                .cloned()
425                .ok_or_else(|| RuntimeError::Behavior("missing tenant resource".to_owned()))?;
426            let version = *ctx
427                .get_named_resource::<i64>("initial_version")
428                .ok_or_else(|| {
429                    RuntimeError::Behavior("missing initial_version resource".to_owned())
430                })?;
431            let trace_id = match ctx.local("trace_id") {
432                Some(Value::Text(value)) => value.clone(),
433                other => {
434                    return Err(RuntimeError::Behavior(format!(
435                        "missing trace_id local, got {other:?}"
436                    )));
437                }
438            };
439
440            command
441                .values
442                .entry("name".to_owned())
443                .or_insert(Value::Text(format!("{tenant}:{trace_id}")));
444            command
445                .values
446                .entry("version".to_owned())
447                .or_insert(Value::I64(version));
448            Ok(())
449        }
450    }
451
452    impl Checker for OrderChecker {
453        fn entity(&self) -> &str {
454            "Order"
455        }
456
457        fn check_and_fix(
458            &self,
459            _ctx: &UserContext,
460            record: &mut Record,
461            location: &ObjectLocation,
462            results: &mut CheckResults,
463        ) {
464            let status = CheckObjectStatus::from_record(record);
465            if status.is_create() {
466                self.required(record, "name", location, results);
467                record.entry("version".to_owned()).or_insert(Value::I64(1));
468            }
469            if status.is_update()
470                && record.get("name") == Some(&Value::Text("graph-update".to_owned()))
471            {
472                record.insert(
473                    "name".to_owned(),
474                    Value::Text("graph-update-checked".to_owned()),
475                );
476            }
477            self.min_string_length(record, "name", 3, location, results);
478        }
479    }
480
481    impl EntityEventSink for RecordingEventSink {
482        fn on_event(&self, _ctx: &UserContext, event: &EntityEvent) -> Result<(), RuntimeError> {
483            self.events.lock().unwrap().push(event.clone());
484            Ok(())
485        }
486    }
487
488    struct FixedIdGenerator(u64);
489
490    impl InternalIdGenerator for FixedIdGenerator {
491        fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
492            Ok(self.0)
493        }
494    }
495
496    struct SequentialIdGenerator {
497        next: Mutex<u64>,
498    }
499
500    impl SequentialIdGenerator {
501        fn new(next: u64) -> Self {
502            Self {
503                next: Mutex::new(next),
504            }
505        }
506    }
507
508    impl InternalIdGenerator for SequentialIdGenerator {
509        fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
510            let mut next = self
511                .next
512                .lock()
513                .map_err(|err| RuntimeError::IdGeneration(err.to_string()))?;
514            let id = *next;
515            *next += 1;
516            Ok(id)
517        }
518    }
519
520    #[test]
521    fn metadata_store_registers_entities() {
522        let store = InMemoryMetadataStore::new().with_entity(entity());
523        assert!(store.entity("Order").is_some());
524    }
525
526    #[test]
527    fn runtime_module_registers_descriptor_into_context() {
528        let ctx = UserContext::new().with_module(RuntimeModule::new().descriptor(entity()));
529        assert!(ctx.entity("Order").is_some());
530        assert!(ctx.has_repository("Order"));
531    }
532
533    #[test]
534    fn runtime_module_registers_derived_entity_and_behavior() {
535        let ctx = UserContext::new().with_module(
536            RuntimeModule::new().entity_with_behavior::<CatalogProductRow, _>(OrderBehavior),
537        );
538        assert!(ctx.entity("CatalogProduct").is_some());
539        assert!(ctx.has_repository("CatalogProduct"));
540        assert!(ctx.repository_behavior("CatalogProduct").is_some());
541    }
542
543    #[test]
544    fn module_macro_registers_multiple_entities() {
545        let ctx = UserContext::new().with_module(crate::module!(CatalogProductRow));
546        assert!(ctx.entity("CatalogProduct").is_some());
547        assert!(ctx.has_repository("CatalogProduct"));
548    }
549
550    #[test]
551    fn module_macro_registers_entity_behavior_pairs() {
552        let ctx =
553            UserContext::new().with_module(crate::module!(CatalogProductRow => OrderBehavior));
554        assert!(ctx.entity("CatalogProduct").is_some());
555        assert!(ctx.repository_behavior("CatalogProduct").is_some());
556    }
557
558    #[test]
559    fn repository_returns_optimistic_lock_conflict() {
560        let store = InMemoryMetadataStore::new().with_entity(entity());
561        let executor = StubExecutor {
562            affected: 0,
563            rows: Vec::new(),
564        };
565        let repo = Repository::new(&PostgresDialect, &store, &executor);
566
567        let err = repo
568            .update(
569                &UpdateCommand::new("Order", 1_u64)
570                    .expected_version(3)
571                    .value("name", "next"),
572            )
573            .unwrap_err();
574
575        match err {
576            RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. }) => {}
577            other => panic!("unexpected error: {other}"),
578        }
579    }
580
581    #[test]
582    fn user_context_indexes_resources_and_locals() {
583        let mut ctx =
584            UserContext::new().with_metadata(InMemoryMetadataStore::new().with_entity(entity()));
585        ctx.insert_resource::<u64>(42);
586        ctx.insert_named_resource("tenant", String::from("acme"));
587        ctx.put_local("trace_id", "req-1");
588
589        assert!(ctx.entity("Order").is_some());
590        assert_eq!(ctx.get_resource::<u64>(), Some(&42));
591        assert_eq!(
592            ctx.get_named_resource::<String>("tenant"),
593            Some(&String::from("acme"))
594        );
595        assert_eq!(
596            ctx.local("trace_id"),
597            Some(&Value::Text("req-1".to_owned()))
598        );
599    }
600
601    #[test]
602    fn user_context_builds_context_repository() {
603        let mut ctx =
604            UserContext::new().with_metadata(InMemoryMetadataStore::new().with_entity(entity()));
605        ctx.insert_resource(PostgresDialect);
606        ctx.insert_resource(StubExecutor {
607            affected: 1,
608            rows: Vec::new(),
609        });
610
611        let repo = ctx.repository::<PostgresDialect, StubExecutor>().unwrap();
612        let affected = repo
613            .update(
614                &UpdateCommand::new("Order", 1_u64)
615                    .expected_version(3)
616                    .value("name", "next"),
617            )
618            .unwrap();
619
620        assert_eq!(affected, 1);
621    }
622
623    #[test]
624    fn user_context_resolves_repository_by_entity_type() {
625        let mut ctx = UserContext::new()
626            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
627            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
628        ctx.insert_resource(PostgresDialect);
629        ctx.insert_resource(StubExecutor {
630            affected: 1,
631            rows: Vec::new(),
632        });
633
634        let repo = ctx
635            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
636            .unwrap();
637        assert_eq!(repo.entity(), "Order");
638        assert_eq!(repo.select().entity, "Order");
639
640        let affected = repo
641            .insert(
642                &repo
643                    .insert_command()
644                    .value("id", 1_u64)
645                    .value("version", 1_i64)
646                    .value("name", "n"),
647            )
648            .unwrap();
649        assert_eq!(affected, 1);
650    }
651
652    #[test]
653    fn resolved_repository_applies_behavior_hooks() {
654        let mut ctx = UserContext::new()
655            .with_metadata(
656                InMemoryMetadataStore::new()
657                    .with_entity(entity())
658                    .with_entity(line_entity())
659                    .with_entity(product_entity()),
660            )
661            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
662            .with_repository_behavior_registry(
663                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
664            );
665        ctx.insert_resource(PostgresDialect);
666        ctx.insert_resource(StubExecutor {
667            affected: 1,
668            rows: Vec::new(),
669        });
670
671        let repo = ctx
672            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
673            .unwrap();
674
675        let compiled = repo.compile(&repo.select()).unwrap();
676        assert!(compiled.sql.contains("WHERE (\"version\" = $1)"));
677
678        let insert = repo.insert_command().value("id", 1_u64).value("name", "n");
679        let affected = repo.insert(&insert).unwrap();
680        assert_eq!(affected, 1);
681        assert_eq!(repo.relation_loads(), vec!["lines".to_owned()]);
682    }
683
684    #[test]
685    fn resolved_repository_prepares_insert_command_with_generated_id() {
686        let mut ctx = UserContext::new()
687            .with_metadata(
688                InMemoryMetadataStore::new()
689                    .with_entity(entity())
690                    .with_entity(line_entity())
691                    .with_entity(product_entity()),
692            )
693            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
694            .with_repository_behavior_registry(
695                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
696            )
697            .with_internal_id_generator(FixedIdGenerator(42));
698        ctx.insert_resource(PostgresDialect);
699        ctx.insert_resource(StubExecutor {
700            affected: 1,
701            rows: Vec::new(),
702        });
703
704        let repo = ctx
705            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
706            .unwrap();
707
708        let prepared = repo
709            .prepare_insert_command(&repo.insert_command().value("name", "n"))
710            .unwrap();
711
712        assert_eq!(prepared.values.get("id"), Some(&Value::U64(42)));
713        assert_eq!(prepared.values.get("version"), Some(&Value::I64(1)));
714        assert_eq!(
715            prepared.values.get("name"),
716            Some(&Value::Text("n".to_owned()))
717        );
718    }
719
720    #[test]
721    fn resolved_repository_saves_create_graph_and_maintains_relation_keys() {
722        let mut ctx = UserContext::new()
723            .with_metadata(
724                InMemoryMetadataStore::new()
725                    .with_entity(entity())
726                    .with_entity(line_entity())
727                    .with_entity(product_entity()),
728            )
729            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
730            .with_repository_behavior_registry(
731                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
732            )
733            .with_internal_id_generator(SequentialIdGenerator::new(500));
734        ctx.insert_resource(PostgresDialect);
735        ctx.insert_resource(StubExecutor {
736            affected: 1,
737            rows: Vec::new(),
738        });
739
740        let repo = ctx
741            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
742            .unwrap();
743        let graph = GraphNode::new("Order").value("name", "root").relation(
744            "lines",
745            GraphNode::new("OrderLine")
746                .value("name", "line-1")
747                .relation("product", GraphNode::new("Product").value("name", "sku-1")),
748        );
749
750        let saved = repo.save_graph(graph).unwrap();
751
752        assert_eq!(saved.values.get("id"), Some(&Value::U64(500)));
753        assert_eq!(saved.values.get("version"), Some(&Value::I64(1)));
754        let lines = saved.relations.get("lines").unwrap();
755        assert_eq!(lines.len(), 1);
756        assert_eq!(lines[0].values.get("id"), Some(&Value::U64(501)));
757        assert_eq!(lines[0].values.get("order_id"), Some(&Value::U64(500)));
758        assert_eq!(lines[0].values.get("product_id"), Some(&Value::U64(502)));
759        let product = lines[0].relations.get("product").unwrap();
760        assert_eq!(product[0].values.get("id"), Some(&Value::U64(502)));
761    }
762
763    #[test]
764    fn resolved_repository_extracts_and_saves_typed_entity_graph() {
765        let mut ctx = UserContext::new()
766            .with_metadata(
767                InMemoryMetadataStore::new()
768                    .with_entity(entity())
769                    .with_entity(line_entity())
770                    .with_entity(product_entity()),
771            )
772            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
773            .with_internal_id_generator(SequentialIdGenerator::new(700));
774        ctx.insert_resource(PostgresDialect);
775        ctx.insert_resource(StubExecutor {
776            affected: 1,
777            rows: Vec::new(),
778        });
779
780        let repo = ctx
781            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
782            .unwrap();
783        let order = TypedGraphOrder {
784            id: None,
785            version: 1,
786            name: "typed-root".to_owned(),
787            lines: teaql_core::SmartList::from(vec![TypedGraphLine {
788                id: None,
789                order_id: None,
790                name: "typed-line".to_owned(),
791                product_id: None,
792                product: Some(TypedGraphProduct {
793                    id: None,
794                    name: "typed-product".to_owned(),
795                }),
796            }]),
797        };
798
799        let extracted = repo.graph_node_from_entity(order).unwrap();
800        assert_eq!(extracted.entity, "Order");
801        assert_eq!(
802            extracted.values.get("name"),
803            Some(&Value::Text("typed-root".to_owned()))
804        );
805        assert_eq!(extracted.values.get("id"), Some(&Value::Null));
806        assert_eq!(extracted.relations["lines"].len(), 1);
807        assert_eq!(
808            extracted.relations["lines"][0].values.get("name"),
809            Some(&Value::Text("typed-line".to_owned()))
810        );
811        assert_eq!(
812            extracted.relations["lines"][0].relations["product"].len(),
813            1
814        );
815
816        let saved = repo.save_graph(extracted).unwrap();
817        assert_eq!(saved.values.get("id"), Some(&Value::U64(700)));
818        let lines = saved.relations.get("lines").unwrap();
819        assert_eq!(lines[0].values.get("id"), Some(&Value::U64(701)));
820        assert_eq!(lines[0].values.get("order_id"), Some(&Value::U64(700)));
821        assert_eq!(lines[0].values.get("product_id"), Some(&Value::U64(702)));
822        assert_eq!(
823            lines[0].relations["product"][0].values.get("id"),
824            Some(&Value::U64(702))
825        );
826    }
827
828    #[test]
829    fn resolved_repository_saves_typed_entity_graph_directly() {
830        let mut ctx = UserContext::new()
831            .with_metadata(
832                InMemoryMetadataStore::new()
833                    .with_entity(entity())
834                    .with_entity(line_entity())
835                    .with_entity(product_entity()),
836            )
837            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
838            .with_internal_id_generator(SequentialIdGenerator::new(800));
839        ctx.insert_resource(PostgresDialect);
840        ctx.insert_resource(StubExecutor {
841            affected: 1,
842            rows: Vec::new(),
843        });
844
845        let repo = ctx
846            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
847            .unwrap();
848        let saved = repo
849            .save_entity_graph(TypedGraphOrder {
850                id: None,
851                version: 1,
852                name: "typed-direct".to_owned(),
853                lines: teaql_core::SmartList::from(vec![TypedGraphLine {
854                    id: None,
855                    order_id: None,
856                    name: "typed-line".to_owned(),
857                    product_id: None,
858                    product: Some(TypedGraphProduct {
859                        id: None,
860                        name: "typed-product".to_owned(),
861                    }),
862                }]),
863            })
864            .unwrap();
865
866        assert_eq!(saved.values.get("id"), Some(&Value::U64(800)));
867        assert_eq!(
868            saved.relations["lines"][0].values.get("order_id"),
869            Some(&Value::U64(800))
870        );
871        assert_eq!(
872            saved.relations["lines"][0].values.get("product_id"),
873            Some(&Value::U64(802))
874        );
875    }
876
877    #[test]
878    fn custom_user_context_can_drive_insert_preparation() {
879        let mut ctx = UserContext::new()
880            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
881            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
882            .with_repository_behavior_registry(
883                InMemoryRepositoryBehaviorRegistry::new()
884                    .with_behavior("Order", ContextAwareOrderBehavior),
885            )
886            .with_internal_id_generator(FixedIdGenerator(99));
887        ctx.insert_named_resource("tenant", String::from("acme"));
888        ctx.insert_named_resource("initial_version", 7_i64);
889        ctx.put_local("trace_id", "req-9");
890        ctx.insert_resource(PostgresDialect);
891        ctx.insert_resource(StubExecutor {
892            affected: 1,
893            rows: Vec::new(),
894        });
895
896        let repo = ctx
897            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
898            .unwrap();
899        let prepared = repo.prepare_insert_command(&repo.insert_command()).unwrap();
900
901        assert_eq!(prepared.values.get("id"), Some(&Value::U64(99)));
902        assert_eq!(prepared.values.get("version"), Some(&Value::I64(7)));
903        assert_eq!(
904            prepared.values.get("name"),
905            Some(&Value::Text("acme:req-9".to_owned()))
906        );
907    }
908
909    #[test]
910    fn checker_registry_validates_and_fixes_insert_commands() {
911        let mut ctx = UserContext::new()
912            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
913            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
914            .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker))
915            .with_internal_id_generator(FixedIdGenerator(77));
916        ctx.insert_resource(PostgresDialect);
917        ctx.insert_resource(StubExecutor {
918            affected: 1,
919            rows: Vec::new(),
920        });
921
922        let repo = ctx
923            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
924            .unwrap();
925        let prepared = repo
926            .prepare_insert_command(&repo.insert_command().value("name", "valid"))
927            .unwrap();
928
929        assert_eq!(prepared.values.get("id"), Some(&Value::U64(77)));
930        assert_eq!(prepared.values.get("version"), Some(&Value::I64(1)));
931        assert!(!prepared.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
932
933        let error = repo
934            .prepare_insert_command(&repo.insert_command().value("name", "no"))
935            .unwrap_err();
936        match error {
937            RuntimeError::Check(results) => {
938                assert_eq!(results.len(), 1);
939                assert_eq!(results[0].location.to_string(), "name");
940            }
941            other => panic!("unexpected checker error: {other:?}"),
942        }
943    }
944
945    #[test]
946    fn checker_registry_validates_update_commands_without_required_insert_checks() {
947        let mut ctx = UserContext::new()
948            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
949            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
950            .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker));
951        ctx.insert_resource(PostgresDialect);
952        ctx.insert_resource(StubExecutor {
953            affected: 1,
954            rows: Vec::new(),
955        });
956
957        let repo = ctx
958            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
959            .unwrap();
960        repo.update(&repo.update_command(1_u64).value("version", 1_i64))
961            .unwrap();
962
963        let error = repo
964            .update(&repo.update_command(1_u64).value("name", "no"))
965            .unwrap_err();
966        match error {
967            RepositoryError::Runtime(RuntimeError::Check(results)) => {
968                assert_eq!(results.len(), 1);
969                assert_eq!(results[0].location.to_string(), "name");
970            }
971            other => panic!("unexpected checker error: {other:?}"),
972        }
973    }
974
975    #[test]
976    fn built_in_language_translators_cover_fifteen_languages() {
977        assert_eq!(Language::ALL.len(), 15);
978        let result = super::CheckResult::required(ObjectLocation::hash_root("name"));
979        let messages = Language::ALL
980            .iter()
981            .map(|language| translate_check_result(*language, &result))
982            .collect::<Vec<_>>();
983
984        assert!(messages.iter().all(|message| !message.is_empty()));
985        assert!(messages.iter().any(|message| message.contains("required")));
986        assert!(messages.iter().any(|message| message.contains("å¿…å¡«")));
987        assert!(
988            messages
989                .iter()
990                .any(|message| message.contains("obligatoire"))
991        );
992        assert_eq!(Language::from_code("zh-CN"), Some(Language::Chinese));
993        assert_eq!(
994            Language::from_code("zh-TW"),
995            Some(Language::TraditionalChinese)
996        );
997    }
998
999    #[test]
1000    fn user_context_language_switch_translates_checker_errors() {
1001        let mut ctx = UserContext::new()
1002            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1003            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1004            .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker))
1005            .with_internal_id_generator(FixedIdGenerator(77))
1006            .with_language(Language::Chinese);
1007        ctx.insert_resource(PostgresDialect);
1008        ctx.insert_resource(StubExecutor {
1009            affected: 1,
1010            rows: Vec::new(),
1011        });
1012
1013        let repo = ctx
1014            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1015            .unwrap();
1016        let error = repo
1017            .prepare_insert_command(&repo.insert_command())
1018            .unwrap_err();
1019        match error {
1020            RuntimeError::Check(results) => {
1021                assert_eq!(results.len(), 1);
1022                assert!(
1023                    results[0]
1024                        .message
1025                        .as_ref()
1026                        .is_some_and(|message| message.contains("å¿…å¡«"))
1027                );
1028            }
1029            other => panic!("unexpected checker error: {other:?}"),
1030        }
1031
1032        let mut ctx = UserContext::new().with_language(Language::English);
1033        ctx.set_language_code("es").unwrap();
1034        assert_eq!(ctx.language(), Language::Spanish);
1035    }
1036
1037    #[test]
1038    fn checker_registry_merges_graph_update_fixes_by_object_status() {
1039        let mut ctx = UserContext::new()
1040            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1041            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1042            .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker));
1043        ctx.insert_resource(PostgresDialect);
1044        ctx.insert_resource(StubExecutor {
1045            affected: 1,
1046            rows: vec![Record::from([
1047                ("id".to_owned(), Value::U64(1)),
1048                ("version".to_owned(), Value::I64(1)),
1049                ("name".to_owned(), Value::Text("old".to_owned())),
1050            ])],
1051        });
1052
1053        let repo = ctx
1054            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1055            .unwrap();
1056        let saved = repo
1057            .save_graph(
1058                GraphNode::new("Order")
1059                    .value("id", 1_u64)
1060                    .value("version", 1_i64)
1061                    .value("name", "graph-update"),
1062            )
1063            .unwrap();
1064
1065        assert_eq!(
1066            saved.values.get("name"),
1067            Some(&Value::Text("graph-update-checked".to_owned()))
1068        );
1069        assert_eq!(saved.values.get("version"), Some(&Value::I64(2)));
1070        assert!(!saved.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
1071    }
1072
1073    #[test]
1074    fn user_context_event_sink_receives_repository_mutation_events() {
1075        let events = Arc::new(Mutex::new(Vec::new()));
1076        let mut ctx = UserContext::new()
1077            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1078            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1079            .with_internal_id_generator(FixedIdGenerator(88))
1080            .with_event_sink(RecordingEventSink {
1081                events: events.clone(),
1082            });
1083        ctx.insert_resource(PostgresDialect);
1084        ctx.insert_resource(StubExecutor {
1085            affected: 1,
1086            rows: Vec::new(),
1087        });
1088
1089        let repo = ctx
1090            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1091            .unwrap();
1092        repo.insert(&repo.insert_command().value("name", "created"))
1093            .unwrap();
1094        repo.update(
1095            &repo
1096                .update_command(88_u64)
1097                .expected_version(1)
1098                .value("name", "updated"),
1099        )
1100        .unwrap();
1101        repo.delete(&repo.delete_command(88_u64).expected_version(2))
1102            .unwrap();
1103        repo.recover(&repo.recover_command(88_u64, -3)).unwrap();
1104
1105        let events = events.lock().unwrap();
1106        assert_eq!(events.len(), 4);
1107        assert_eq!(events[0].kind, EntityEventKind::Created);
1108        assert_eq!(events[0].entity, "Order");
1109        assert_eq!(events[0].values.get("id"), Some(&Value::U64(88)));
1110        assert_eq!(events[1].kind, EntityEventKind::Updated);
1111        assert_eq!(events[1].values.get("id"), Some(&Value::U64(88)));
1112        assert_eq!(events[1].values.get("version"), Some(&Value::I64(2)));
1113        assert_eq!(events[1].updated_fields, vec!["name".to_owned()]);
1114        assert_eq!(events[2].kind, EntityEventKind::Deleted);
1115        assert_eq!(events[3].kind, EntityEventKind::Recovered);
1116    }
1117
1118    #[test]
1119    fn user_context_event_sink_receives_mixed_graph_mutation_events() {
1120        let events = Arc::new(Mutex::new(Vec::new()));
1121        let mut ctx = UserContext::new()
1122            .with_metadata(
1123                InMemoryMetadataStore::new()
1124                    .with_entity(entity())
1125                    .with_entity(line_entity())
1126                    .with_entity(product_entity()),
1127            )
1128            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1129            .with_event_sink(RecordingEventSink {
1130                events: events.clone(),
1131            });
1132        ctx.insert_resource(PostgresDialect);
1133        ctx.insert_resource(StubExecutor {
1134            affected: 1,
1135            rows: vec![Record::from([
1136                ("id".to_owned(), Value::U64(1)),
1137                ("version".to_owned(), Value::I64(1)),
1138                ("name".to_owned(), Value::Text("old".to_owned())),
1139            ])],
1140        });
1141
1142        let repo = ctx
1143            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1144            .unwrap();
1145        repo.save_graph(
1146            GraphNode::new("Order")
1147                .value("id", 1_u64)
1148                .value("version", 1_i64)
1149                .value("name", "updated")
1150                .relation(
1151                    "lines",
1152                    GraphNode::new("OrderLine")
1153                        .value("name", "line")
1154                        .value("product_id", 3_u64),
1155                ),
1156        )
1157        .unwrap();
1158
1159        let events = events.lock().unwrap();
1160        assert_eq!(events.len(), 3);
1161        assert_eq!(events[0].kind, EntityEventKind::Updated);
1162        assert_eq!(events[0].entity, "Order");
1163        assert_eq!(events[1].kind, EntityEventKind::Updated);
1164        assert_eq!(events[1].entity, "OrderLine");
1165        assert_eq!(events[1].values.get("order_id"), Some(&Value::U64(1)));
1166        assert_eq!(events[2].kind, EntityEventKind::Deleted);
1167        assert_eq!(events[2].entity, "OrderLine");
1168    }
1169
1170    #[test]
1171    fn save_graph_builds_plan_grouped_by_entity_and_operation() {
1172        let mut ctx = UserContext::new()
1173            .with_metadata(
1174                InMemoryMetadataStore::new()
1175                    .with_entity(entity())
1176                    .with_entity(line_entity())
1177                    .with_entity(product_entity()),
1178            )
1179            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1180            .with_internal_id_generator(SequentialIdGenerator::new(500));
1181        ctx.insert_resource(PostgresDialect);
1182        ctx.insert_resource(StubExecutor {
1183            affected: 1,
1184            rows: vec![Record::from([
1185                ("id".to_owned(), Value::U64(1)),
1186                ("version".to_owned(), Value::I64(1)),
1187                ("name".to_owned(), Value::Text("old".to_owned())),
1188            ])],
1189        });
1190
1191        let plan = ctx
1192            .plan_for_save_graph::<PostgresDialect, StubExecutor>(
1193                GraphNode::new("Order")
1194                    .value("id", 1_u64)
1195                    .value("version", 1_i64)
1196                    .value("name", "updated")
1197                    .relation(
1198                        "lines",
1199                        GraphNode::new("OrderLine")
1200                            .value("name", "new-line-a")
1201                            .value("product_id", 2_u64),
1202                    )
1203                    .relation(
1204                        "lines",
1205                        GraphNode::new("OrderLine")
1206                            .value("name", "new-line-b")
1207                            .value("product_id", 2_u64),
1208                    )
1209                    .relation(
1210                        "lines",
1211                        GraphNode::new("OrderLine")
1212                            .value("id", 5_u64)
1213                            .value("version", 1_i64)
1214                            .value("name", "same-update-a"),
1215                    )
1216                    .relation(
1217                        "lines",
1218                        GraphNode::new("OrderLine")
1219                            .value("id", 6_u64)
1220                            .value("version", 1_i64)
1221                            .value("name", "same-update-b"),
1222                    )
1223                    .relation(
1224                        "lines",
1225                        GraphNode::new("OrderLine").value("id", 3_u64).remove(),
1226                    )
1227                    .relation(
1228                        "lines",
1229                        GraphNode::new("OrderLine").value("id", 4_u64).reference(),
1230                    ),
1231            )
1232            .unwrap();
1233        let counts = plan.grouped_counts();
1234
1235        assert_eq!(
1236            counts.get(&("Order".to_owned(), GraphMutationKind::Update)),
1237            Some(&1)
1238        );
1239        assert_eq!(
1240            counts.get(&("OrderLine".to_owned(), GraphMutationKind::Create)),
1241            Some(&2)
1242        );
1243        assert_eq!(
1244            counts.get(&("OrderLine".to_owned(), GraphMutationKind::Update)),
1245            Some(&2)
1246        );
1247        assert_eq!(
1248            counts.get(&("OrderLine".to_owned(), GraphMutationKind::Delete)),
1249            Some(&1)
1250        );
1251        assert_eq!(
1252            counts.get(&("OrderLine".to_owned(), GraphMutationKind::Reference)),
1253            Some(&1)
1254        );
1255        let create_batch = plan
1256            .batches
1257            .iter()
1258            .find(|batch| batch.entity == "OrderLine" && batch.kind == GraphMutationKind::Create)
1259            .unwrap();
1260        assert_eq!(create_batch.items.len(), 2);
1261        assert_eq!(
1262            create_batch.items[0].values.get("id"),
1263            Some(&Value::U64(500))
1264        );
1265        assert_eq!(
1266            create_batch.items[1].values.get("id"),
1267            Some(&Value::U64(501))
1268        );
1269        let update_batch = plan
1270            .batches
1271            .iter()
1272            .find(|batch| {
1273                batch.entity == "OrderLine"
1274                    && batch.kind == GraphMutationKind::Update
1275                    && batch.update_fields == vec!["name".to_owned()]
1276            })
1277            .unwrap();
1278        assert_eq!(update_batch.items.len(), 2);
1279    }
1280
1281    #[test]
1282    fn resolved_repository_builds_relation_plans() {
1283        let mut ctx = UserContext::new()
1284            .with_metadata(
1285                InMemoryMetadataStore::new()
1286                    .with_entity(entity())
1287                    .with_entity(line_entity())
1288                    .with_entity(product_entity()),
1289            )
1290            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1291            .with_repository_behavior_registry(
1292                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1293            );
1294        ctx.insert_resource(PostgresDialect);
1295        ctx.insert_resource(StubExecutor {
1296            affected: 1,
1297            rows: Vec::new(),
1298        });
1299
1300        let repo = ctx
1301            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1302            .unwrap();
1303        let plans = repo.relation_plans().unwrap();
1304
1305        assert_eq!(plans.len(), 1);
1306        assert_eq!(plans[0].relation_name, "lines");
1307        assert_eq!(plans[0].target_entity, "OrderLine");
1308        assert_eq!(plans[0].local_key, "id");
1309        assert_eq!(plans[0].foreign_key, "order_id");
1310        assert!(plans[0].many);
1311    }
1312
1313    #[test]
1314    fn resolved_repository_builds_relation_query_from_parent_rows() {
1315        let mut ctx = UserContext::new()
1316            .with_metadata(
1317                InMemoryMetadataStore::new()
1318                    .with_entity(entity())
1319                    .with_entity(line_entity())
1320                    .with_entity(product_entity()),
1321            )
1322            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1323            .with_repository_behavior_registry(
1324                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1325            );
1326        ctx.insert_resource(PostgresDialect);
1327        ctx.insert_resource(StubExecutor {
1328            affected: 1,
1329            rows: Vec::new(),
1330        });
1331
1332        let repo = ctx
1333            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1334            .unwrap();
1335        let parent_rows = vec![
1336            Record::from([(String::from("id"), Value::U64(11))]),
1337            Record::from([(String::from("id"), Value::U64(12))]),
1338        ];
1339
1340        let query = repo.relation_query("lines", &parent_rows).unwrap();
1341        let compiled = repo.compile(&query).unwrap();
1342        assert!(compiled.sql.contains("FROM \"orderline\""));
1343        assert!(compiled.sql.contains("\"order_id\" IN ($1, $2)"));
1344        assert_eq!(compiled.params, vec![Value::U64(11), Value::U64(12)]);
1345    }
1346
1347    #[test]
1348    fn resolved_repository_enhances_parent_rows_with_relations() {
1349        let mut ctx = UserContext::new()
1350            .with_metadata(
1351                InMemoryMetadataStore::new()
1352                    .with_entity(entity())
1353                    .with_entity(line_entity())
1354                    .with_entity(product_entity()),
1355            )
1356            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1357            .with_repository_behavior_registry(
1358                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1359            );
1360        ctx.insert_resource(PostgresDialect);
1361        ctx.insert_resource(StubExecutor {
1362            affected: 1,
1363            rows: vec![
1364                Record::from([
1365                    (String::from("id"), Value::U64(101)),
1366                    (String::from("order_id"), Value::U64(11)),
1367                    (String::from("name"), Value::Text(String::from("l1"))),
1368                ]),
1369                Record::from([
1370                    (String::from("id"), Value::U64(102)),
1371                    (String::from("order_id"), Value::U64(11)),
1372                    (String::from("name"), Value::Text(String::from("l2"))),
1373                ]),
1374                Record::from([
1375                    (String::from("id"), Value::U64(201)),
1376                    (String::from("order_id"), Value::U64(12)),
1377                    (String::from("name"), Value::Text(String::from("l3"))),
1378                ]),
1379            ],
1380        });
1381
1382        let repo = ctx
1383            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1384            .unwrap();
1385        let mut parents = vec![
1386            Record::from([(String::from("id"), Value::U64(11))]),
1387            Record::from([(String::from("id"), Value::U64(12))]),
1388        ];
1389
1390        repo.enhance_relations(&mut parents).unwrap();
1391
1392        match parents[0].get("lines") {
1393            Some(Value::List(lines)) => assert_eq!(lines.len(), 2),
1394            other => panic!("unexpected lines payload: {other:?}"),
1395        }
1396        match parents[1].get("lines") {
1397            Some(Value::List(lines)) => assert_eq!(lines.len(), 1),
1398            other => panic!("unexpected lines payload: {other:?}"),
1399        }
1400    }
1401
1402    #[test]
1403    fn resolved_repository_fetches_smart_list_of_entities() {
1404        let mut ctx = UserContext::new()
1405            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1406            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1407        ctx.insert_resource(PostgresDialect);
1408        ctx.insert_resource(StubExecutor {
1409            affected: 1,
1410            rows: vec![Record::from([
1411                (String::from("id"), Value::U64(7)),
1412                (String::from("version"), Value::I64(2)),
1413                (String::from("name"), Value::Text(String::from("typed"))),
1414            ])],
1415        });
1416
1417        let repo = ctx
1418            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1419            .unwrap();
1420        let rows = repo.fetch_entities::<OrderEntity>(&repo.select()).unwrap();
1421
1422        assert_eq!(rows.len(), 1);
1423        assert_eq!(
1424            rows.first(),
1425            Some(&OrderEntity {
1426                id: 7,
1427                version: 2,
1428                name: String::from("typed"),
1429            })
1430        );
1431    }
1432
1433    #[test]
1434    fn resolved_repository_fetches_smart_list_of_derived_entities() {
1435        let mut ctx = UserContext::new()
1436            .with_metadata(
1437                InMemoryMetadataStore::new().with_entity(CatalogProductRow::entity_descriptor()),
1438            )
1439            .with_repository_registry(
1440                InMemoryRepositoryRegistry::new().with_entity("CatalogProduct"),
1441            );
1442        ctx.insert_resource(PostgresDialect);
1443        ctx.insert_resource(StubExecutor {
1444            affected: 1,
1445            rows: vec![Record::from([
1446                (String::from("id"), Value::U64(9)),
1447                (String::from("name"), Value::Text(String::from("derived"))),
1448            ])],
1449        });
1450
1451        let repo = ctx
1452            .resolve_repository::<PostgresDialect, StubExecutor>("CatalogProduct")
1453            .unwrap();
1454        let rows = repo
1455            .fetch_entities::<CatalogProductRow>(&repo.select())
1456            .unwrap();
1457
1458        assert_eq!(rows.len(), 1);
1459        assert_eq!(
1460            rows.first(),
1461            Some(&CatalogProductRow {
1462                id: 9,
1463                name: String::from("derived"),
1464            })
1465        );
1466    }
1467
1468    #[test]
1469    fn resolved_repository_collects_dynamic_properties_for_aggregate_output() {
1470        let mut ctx = UserContext::new()
1471            .with_metadata(
1472                InMemoryMetadataStore::new()
1473                    .with_entity(OrderAggregateDynamic::entity_descriptor()),
1474            )
1475            .with_repository_registry(
1476                InMemoryRepositoryRegistry::new().with_entity("OrderAggregate"),
1477            );
1478        ctx.insert_resource(PostgresDialect);
1479        ctx.insert_resource(StubExecutor {
1480            affected: 1,
1481            rows: vec![Record::from([
1482                (String::from("id"), Value::U64(1)),
1483                (String::from("lineCount"), Value::I64(3)),
1484                (String::from("amount"), Value::F64(18.5)),
1485            ])],
1486        });
1487
1488        let repo = ctx
1489            .resolve_repository::<PostgresDialect, StubExecutor>("OrderAggregate")
1490            .unwrap();
1491        let rows = repo
1492            .fetch_entities::<OrderAggregateDynamic>(&repo.select())
1493            .unwrap();
1494
1495        assert_eq!(rows.len(), 1);
1496        assert_eq!(rows.data[0].id, 1);
1497        assert_eq!(rows.data[0].dynamic.get("lineCount"), Some(&Value::I64(3)));
1498        assert_eq!(rows.data[0].dynamic.get("amount"), Some(&Value::F64(18.5)));
1499        assert_eq!(
1500            rows.into_vec().into_iter().next().unwrap().into_json(),
1501            serde_json::json!({
1502                "id": 1,
1503                "lineCount": 3,
1504                "amount": 18.5
1505            })
1506        );
1507    }
1508
1509    #[test]
1510    fn resolved_repository_executes_relation_aggregates_into_dynamic_properties() {
1511        let executor = QueueExecutor {
1512            affected: 1,
1513            rows: Mutex::new(VecDeque::from([
1514                vec![
1515                    Record::from([
1516                        (String::from("id"), Value::U64(1)),
1517                        (String::from("version"), Value::I64(1)),
1518                        (String::from("name"), Value::Text(String::from("first"))),
1519                    ]),
1520                    Record::from([
1521                        (String::from("id"), Value::U64(2)),
1522                        (String::from("version"), Value::I64(1)),
1523                        (String::from("name"), Value::Text(String::from("second"))),
1524                    ]),
1525                ],
1526                vec![Record::from([
1527                    (String::from("order_id"), Value::U64(1)),
1528                    (String::from("lineCount"), Value::I64(3)),
1529                ])],
1530            ])),
1531            queries: Mutex::new(Vec::new()),
1532        };
1533        let mut ctx = UserContext::new()
1534            .with_metadata(
1535                InMemoryMetadataStore::new()
1536                    .with_entity(entity())
1537                    .with_entity(line_entity()),
1538            )
1539            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1540        ctx.insert_resource(PostgresDialect);
1541        ctx.insert_resource(executor);
1542
1543        let repo = ctx
1544            .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1545            .unwrap();
1546        let rows = repo
1547            .fetch_all_with_relation_aggregates(
1548                &repo
1549                    .select()
1550                    .project("id")
1551                    .project("version")
1552                    .project("name"),
1553                &[RelationAggregate::new(
1554                    "lines",
1555                    "lineCount",
1556                    SelectQuery::new("OrderLine"),
1557                    true,
1558                )],
1559            )
1560            .unwrap();
1561
1562        assert_eq!(rows[0].get("lineCount"), Some(&Value::I64(3)));
1563        assert_eq!(rows[1].get("lineCount"), Some(&Value::U64(0)));
1564
1565        let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1566        let queries = executor.queries.lock().unwrap();
1567        assert_eq!(queries.len(), 2);
1568        assert_eq!(
1569            queries[1],
1570            "SELECT \"order_id\", COUNT(*) AS \"lineCount\" FROM \"orderline\" WHERE (\"order_id\" IN ($1, $2)) GROUP BY \"order_id\""
1571        );
1572    }
1573
1574    #[test]
1575    fn resolved_repository_uses_aggregation_cache_when_resource_is_registered() {
1576        let executor = QueueExecutor {
1577            affected: 1,
1578            rows: Mutex::new(VecDeque::from([vec![Record::from([(
1579                String::from("count"),
1580                Value::I64(2),
1581            )])]])),
1582            queries: Mutex::new(Vec::new()),
1583        };
1584        let mut ctx = UserContext::new()
1585            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1586            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1587        ctx.insert_resource(PostgresDialect);
1588        ctx.insert_resource(executor);
1589        ctx.insert_resource(InMemoryAggregationCache::default());
1590
1591        let repo = ctx
1592            .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1593            .unwrap();
1594        let query = repo
1595            .select()
1596            .count("count")
1597            .enable_aggregation_cache_for(60_000);
1598
1599        let first = repo.fetch_all(&query).unwrap();
1600        let second = repo.fetch_all(&query).unwrap();
1601
1602        assert_eq!(first, second);
1603        let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1604        assert_eq!(executor.queries.lock().unwrap().len(), 1);
1605    }
1606
1607    #[test]
1608    fn aggregation_cache_is_namespaced_and_invalidated_after_write() {
1609        let executor = QueueExecutor {
1610            affected: 1,
1611            rows: Mutex::new(VecDeque::from([
1612                vec![Record::from([(String::from("count"), Value::I64(2))])],
1613                vec![Record::from([(String::from("count"), Value::I64(3))])],
1614            ])),
1615            queries: Mutex::new(Vec::new()),
1616        };
1617        let mut ctx = UserContext::new()
1618            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1619            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1620        ctx.insert_resource(PostgresDialect);
1621        ctx.insert_resource(executor);
1622        ctx.insert_resource(
1623            Arc::new(InMemoryAggregationCache::with_namespace("tenant-a"))
1624                as Arc<dyn AggregationCacheBackend>,
1625        );
1626
1627        let repo = ctx
1628            .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1629            .unwrap();
1630        let query = repo
1631            .select()
1632            .count("count")
1633            .enable_aggregation_cache_for(60_000);
1634
1635        let first = repo.fetch_all(&query).unwrap();
1636        let cached = repo.fetch_all(&query).unwrap();
1637        repo.insert(
1638            &InsertCommand::new("Order")
1639                .value("id", 9_u64)
1640                .value("version", 1_i64)
1641                .value("name", "new"),
1642        )
1643        .unwrap();
1644        let refreshed = repo.fetch_all(&query).unwrap();
1645
1646        assert_eq!(first, cached);
1647        assert_ne!(cached, refreshed);
1648        let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1649        assert_eq!(executor.queries.lock().unwrap().len(), 2);
1650    }
1651
1652    #[test]
1653    fn aggregation_cache_propagates_to_relation_aggregates() {
1654        let parent_rows = vec![
1655            Record::from([
1656                (String::from("id"), Value::U64(1)),
1657                (String::from("version"), Value::I64(1)),
1658                (String::from("name"), Value::Text(String::from("first"))),
1659            ]),
1660            Record::from([
1661                (String::from("id"), Value::U64(2)),
1662                (String::from("version"), Value::I64(1)),
1663                (String::from("name"), Value::Text(String::from("second"))),
1664            ]),
1665        ];
1666        let aggregate_rows = vec![Record::from([
1667            (String::from("order_id"), Value::U64(1)),
1668            (String::from("lineCount"), Value::I64(3)),
1669        ])];
1670        let executor = QueueExecutor {
1671            affected: 1,
1672            rows: Mutex::new(VecDeque::from([parent_rows, aggregate_rows])),
1673            queries: Mutex::new(Vec::new()),
1674        };
1675        let mut ctx = UserContext::new()
1676            .with_metadata(
1677                InMemoryMetadataStore::new()
1678                    .with_entity(entity())
1679                    .with_entity(line_entity()),
1680            )
1681            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1682        ctx.insert_resource(PostgresDialect);
1683        ctx.insert_resource(executor);
1684        ctx.insert_resource(InMemoryAggregationCache::default());
1685
1686        let repo = ctx
1687            .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1688            .unwrap();
1689        let query = repo
1690            .select()
1691            .project("id")
1692            .project("version")
1693            .project("name")
1694            .enable_aggregation_cache_for(60_000)
1695            .propagate_aggregation_cache(60_000);
1696        let aggregate =
1697            RelationAggregate::new("lines", "lineCount", SelectQuery::new("OrderLine"), true);
1698
1699        let first = repo
1700            .fetch_all_with_relation_aggregates(&query, &[aggregate.clone()])
1701            .unwrap();
1702        let second = repo
1703            .fetch_all_with_relation_aggregates(&query, &[aggregate])
1704            .unwrap();
1705
1706        assert_eq!(first, second);
1707        let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1708        assert_eq!(executor.queries.lock().unwrap().len(), 2);
1709    }
1710
1711    #[test]
1712    fn memory_repository_fetches_smart_list_entities_with_query_features() {
1713        let metadata = InMemoryMetadataStore::new().with_entity(entity());
1714        let repository = MemoryRepository::new(metadata).with_rows(
1715            "Order",
1716            vec![
1717                Record::from([
1718                    (String::from("id"), Value::U64(1)),
1719                    (String::from("version"), Value::I64(1)),
1720                    (String::from("name"), Value::Text(String::from("alpha"))),
1721                ]),
1722                Record::from([
1723                    (String::from("id"), Value::U64(2)),
1724                    (String::from("version"), Value::I64(1)),
1725                    (String::from("name"), Value::Text(String::from("beta"))),
1726                ]),
1727                Record::from([
1728                    (String::from("id"), Value::U64(3)),
1729                    (String::from("version"), Value::I64(1)),
1730                    (String::from("name"), Value::Text(String::from("gamma"))),
1731                ]),
1732            ],
1733        );
1734
1735        let query = teaql_core::SelectQuery::new("Order")
1736            .filter(Expr::Binary {
1737                left: Box::new(Expr::column("id")),
1738                op: teaql_core::BinaryOp::Gte,
1739                right: Box::new(Expr::value(2_u64)),
1740            })
1741            .order_by(OrderBy::desc("id"))
1742            .limit(1);
1743
1744        let orders = repository.fetch_entities::<Order>(&query).unwrap();
1745
1746        assert_eq!(orders.ids(), vec![Value::U64(3)]);
1747        assert_eq!(orders.versions(), vec![1]);
1748        assert_eq!(orders.first().unwrap().name, "gamma");
1749    }
1750
1751    #[test]
1752    fn memory_repository_runs_aggregates() {
1753        let metadata = InMemoryMetadataStore::new().with_entity(entity());
1754        let repository = MemoryRepository::new(metadata).with_rows(
1755            "Order",
1756            vec![
1757                Record::from([
1758                    (String::from("id"), Value::U64(1)),
1759                    (String::from("version"), Value::I64(1)),
1760                    (String::from("name"), Value::Text(String::from("alpha"))),
1761                ]),
1762                Record::from([
1763                    (String::from("id"), Value::U64(2)),
1764                    (String::from("version"), Value::I64(2)),
1765                    (String::from("name"), Value::Text(String::from("beta"))),
1766                ]),
1767            ],
1768        );
1769
1770        let query = teaql_core::SelectQuery {
1771            entity: String::from("Order"),
1772            projection: Vec::new(),
1773            expr_projection: Vec::new(),
1774            filter: None,
1775            having: None,
1776            order_by: Vec::new(),
1777            slice: None,
1778            aggregates: vec![
1779                Aggregate {
1780                    function: AggregateFunction::Count,
1781                    field: String::from("id"),
1782                    alias: String::from("count"),
1783                },
1784                Aggregate {
1785                    function: AggregateFunction::Sum,
1786                    field: String::from("version"),
1787                    alias: String::from("versionSum"),
1788                },
1789            ],
1790            group_by: Vec::new(),
1791            relations: Vec::new(),
1792            aggregation_cache: None,
1793            comment: None,
1794            raw_sql: None,
1795            raw_sql_search_criteria: Vec::new(),
1796            json_expr: None,
1797            dynamic_properties: Vec::new(),
1798            raw_projections: Vec::new(),
1799            object_group_bys: Vec::new(),
1800            child_enhancements: Vec::new(),
1801        };
1802
1803        let rows = repository.fetch_all(&query).unwrap();
1804
1805        assert_eq!(rows.len(), 1);
1806        assert_eq!(rows[0].get("count"), Some(&Value::U64(2)));
1807        assert_eq!(rows[0].get("versionSum"), Some(&Value::U64(3)));
1808    }
1809
1810    #[test]
1811    fn memory_repository_runs_grouped_aggregates_and_extended_filters() {
1812        let metadata = InMemoryMetadataStore::new().with_entity(entity());
1813        let repository = MemoryRepository::new(metadata).with_rows(
1814            "Order",
1815            vec![
1816                Record::from([
1817                    (String::from("id"), Value::U64(1)),
1818                    (String::from("version"), Value::I64(1)),
1819                    (String::from("name"), Value::Text(String::from("alpha"))),
1820                ]),
1821                Record::from([
1822                    (String::from("id"), Value::U64(2)),
1823                    (String::from("version"), Value::I64(2)),
1824                    (String::from("name"), Value::Text(String::from("alpha"))),
1825                ]),
1826                Record::from([
1827                    (String::from("id"), Value::U64(3)),
1828                    (String::from("version"), Value::I64(3)),
1829                    (String::from("name"), Value::Text(String::from("tmp-beta"))),
1830                ]),
1831            ],
1832        );
1833
1834        let rows = repository
1835            .fetch_all(
1836                &teaql_core::SelectQuery::new("Order")
1837                    .filter(
1838                        Expr::between("version", 1_i64, 3_i64)
1839                            .and_expr(Expr::not_like("name", "tmp%"))
1840                            .and_expr(Expr::not_in_list("name", vec![Value::from("deleted")])),
1841                    )
1842                    .group_by("name")
1843                    .count("total")
1844                    .sum("version", "versionSum"),
1845            )
1846            .unwrap();
1847
1848        assert_eq!(rows.len(), 1);
1849        assert_eq!(
1850            rows[0].get("name"),
1851            Some(&Value::Text(String::from("alpha")))
1852        );
1853        assert_eq!(rows[0].get("total"), Some(&Value::U64(2)));
1854        assert_eq!(rows[0].get("versionSum"), Some(&Value::U64(3)));
1855    }
1856
1857    #[test]
1858    fn memory_repository_runs_extended_aggregates_and_having() {
1859        let metadata = InMemoryMetadataStore::new().with_entity(entity());
1860        let repository = MemoryRepository::new(metadata).with_rows(
1861            "Order",
1862            vec![
1863                Record::from([
1864                    (String::from("id"), Value::U64(1)),
1865                    (String::from("version"), Value::I64(1)),
1866                    (String::from("name"), Value::Text(String::from("alpha"))),
1867                ]),
1868                Record::from([
1869                    (String::from("id"), Value::U64(2)),
1870                    (String::from("version"), Value::I64(3)),
1871                    (String::from("name"), Value::Text(String::from("alpha"))),
1872                ]),
1873                Record::from([
1874                    (String::from("id"), Value::U64(3)),
1875                    (String::from("version"), Value::I64(7)),
1876                    (String::from("name"), Value::Text(String::from("beta"))),
1877                ]),
1878            ],
1879        );
1880
1881        let rows = repository
1882            .fetch_all(
1883                &teaql_core::SelectQuery::new("Order")
1884                    .group_by("name")
1885                    .count("total")
1886                    .stddev("version", "stddevVersion")
1887                    .var_pop("version", "varPopVersion")
1888                    .bit_or("version", "bitOrVersion")
1889                    .having(Expr::gt("total", 1_i64)),
1890            )
1891            .unwrap();
1892
1893        assert_eq!(rows.len(), 1);
1894        assert_eq!(
1895            rows[0].get("name"),
1896            Some(&Value::Text(String::from("alpha")))
1897        );
1898        assert_eq!(rows[0].get("total"), Some(&Value::U64(2)));
1899        assert_eq!(
1900            rows[0].get("stddevVersion").map(Value::to_json_value),
1901            Some(serde_json::Value::String(
1902                "1.4142135623730951454746218583".to_owned()
1903            ))
1904        );
1905        assert_eq!(
1906            rows[0].get("varPopVersion"),
1907            Some(&Value::Decimal(Decimal::ONE))
1908        );
1909        assert_eq!(rows[0].get("bitOrVersion"), Some(&Value::I64(3)));
1910    }
1911
1912    #[test]
1913    fn memory_repository_runs_sound_like_filter() {
1914        let metadata = InMemoryMetadataStore::new().with_entity(entity());
1915        let repository = MemoryRepository::new(metadata).with_rows(
1916            "Order",
1917            vec![
1918                Record::from([
1919                    (String::from("id"), Value::U64(1)),
1920                    (String::from("version"), Value::I64(1)),
1921                    (String::from("name"), Value::Text(String::from("Robert"))),
1922                ]),
1923                Record::from([
1924                    (String::from("id"), Value::U64(2)),
1925                    (String::from("version"), Value::I64(1)),
1926                    (String::from("name"), Value::Text(String::from("Rupert"))),
1927                ]),
1928                Record::from([
1929                    (String::from("id"), Value::U64(3)),
1930                    (String::from("version"), Value::I64(1)),
1931                    (String::from("name"), Value::Text(String::from("Ashcraft"))),
1932                ]),
1933            ],
1934        );
1935
1936        let rows = repository
1937            .fetch_all(
1938                &teaql_core::SelectQuery::new("Order")
1939                    .filter(Expr::sound_like("name", "Robert"))
1940                    .order_asc("id"),
1941            )
1942            .unwrap();
1943
1944        assert_eq!(rows.len(), 2);
1945        assert_eq!(rows[0].get("name"), Some(&Value::Text("Robert".to_owned())));
1946        assert_eq!(rows[1].get("name"), Some(&Value::Text("Rupert".to_owned())));
1947    }
1948
1949    #[test]
1950    fn memory_repository_runs_java_style_string_match_filters() {
1951        let metadata = InMemoryMetadataStore::new().with_entity(entity());
1952        let repository = MemoryRepository::new(metadata).with_rows(
1953            "Order",
1954            vec![
1955                Record::from([
1956                    (String::from("id"), Value::U64(1)),
1957                    (String::from("version"), Value::I64(1)),
1958                    (String::from("name"), Value::Text(String::from("tea-order"))),
1959                ]),
1960                Record::from([
1961                    (String::from("id"), Value::U64(2)),
1962                    (String::from("version"), Value::I64(1)),
1963                    (
1964                        String::from("name"),
1965                        Value::Text(String::from("coffee-order")),
1966                    ),
1967                ]),
1968                Record::from([
1969                    (String::from("id"), Value::U64(3)),
1970                    (String::from("version"), Value::I64(1)),
1971                    (
1972                        String::from("name"),
1973                        Value::Text(String::from("tea-archived")),
1974                    ),
1975                ]),
1976            ],
1977        );
1978
1979        let rows = repository
1980            .fetch_all(
1981                &teaql_core::SelectQuery::new("Order")
1982                    .filter(
1983                        Expr::contain("name", "tea")
1984                            .and_expr(Expr::begin_with("name", "tea"))
1985                            .and_expr(Expr::end_with("name", "order"))
1986                            .and_expr(Expr::not_contain("name", "coffee"))
1987                            .and_expr(Expr::not_begin_with("name", "archived"))
1988                            .and_expr(Expr::not_end_with("name", "draft")),
1989                    )
1990                    .order_asc("id"),
1991            )
1992            .unwrap();
1993
1994        assert_eq!(rows.len(), 1);
1995        assert_eq!(
1996            rows[0].get("name"),
1997            Some(&Value::Text("tea-order".to_owned()))
1998        );
1999    }
2000
2001    #[test]
2002    fn memory_repository_runs_property_to_property_filters() {
2003        let metadata = InMemoryMetadataStore::new().with_entity(entity());
2004        let repository = MemoryRepository::new(metadata).with_rows(
2005            "Order",
2006            vec![
2007                Record::from([
2008                    (String::from("id"), Value::U64(1)),
2009                    (String::from("version"), Value::I64(2)),
2010                    (String::from("name"), Value::Text(String::from("keep"))),
2011                ]),
2012                Record::from([
2013                    (String::from("id"), Value::U64(2)),
2014                    (String::from("version"), Value::I64(1)),
2015                    (String::from("name"), Value::Text(String::from("skip"))),
2016                ]),
2017            ],
2018        );
2019
2020        let rows = repository
2021            .fetch_all(
2022                &teaql_core::SelectQuery::new("Order")
2023                    .filter(Expr::compare_columns("version", BinaryOp::Gte, "id"))
2024                    .order_asc("id"),
2025            )
2026            .unwrap();
2027
2028        assert_eq!(rows.len(), 1);
2029        assert_eq!(rows[0].get("name"), Some(&Value::Text("keep".to_owned())));
2030    }
2031
2032    #[test]
2033    fn memory_repository_supports_mutations_and_optimistic_locking() {
2034        let metadata = InMemoryMetadataStore::new().with_entity(entity());
2035        let repository = MemoryRepository::new(metadata);
2036
2037        repository
2038            .insert(
2039                &InsertCommand::new("Order")
2040                    .value("id", 10_u64)
2041                    .value("version", 1_i64)
2042                    .value("name", "draft"),
2043            )
2044            .unwrap();
2045        repository
2046            .update(
2047                &UpdateCommand::new("Order", 10_u64)
2048                    .expected_version(1)
2049                    .value("name", "submitted"),
2050            )
2051            .unwrap();
2052
2053        let row = repository
2054            .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2055            .unwrap()
2056            .pop()
2057            .unwrap();
2058        assert_eq!(
2059            row.get("name"),
2060            Some(&Value::Text(String::from("submitted")))
2061        );
2062        assert_eq!(row.get("version"), Some(&Value::I64(2)));
2063
2064        let conflict = repository
2065            .update(
2066                &UpdateCommand::new("Order", 10_u64)
2067                    .expected_version(1)
2068                    .value("name", "stale"),
2069            )
2070            .unwrap_err();
2071        assert!(matches!(
2072            conflict,
2073            RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. })
2074        ));
2075
2076        repository
2077            .delete(&DeleteCommand::new("Order", 10_u64).expected_version(2))
2078            .unwrap();
2079        let row = repository
2080            .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2081            .unwrap()
2082            .pop()
2083            .unwrap();
2084        assert_eq!(row.get("version"), Some(&Value::I64(-3)));
2085
2086        repository
2087            .recover(&RecoverCommand::new("Order", 10_u64, -3))
2088            .unwrap();
2089        let row = repository
2090            .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2091            .unwrap()
2092            .pop()
2093            .unwrap();
2094        assert_eq!(row.get("version"), Some(&Value::I64(4)));
2095    }
2096}
2097
2098#[cfg(all(test, feature = "sqlx"))]
2099mod sqlx_integration_tests {
2100    use super::sqlx_support::{
2101        MutationExecutorError, PgIdSpaceGenerator, PgMutationExecutor, PgTransactionExecutor,
2102        SqliteIdSpaceGenerator, SqliteMutationExecutor,
2103    };
2104    use super::{
2105        GraphMutationKind, GraphNode, GraphTransactionBoundary, InMemoryMetadataStore,
2106        InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry, QueryExecutor,
2107        RepositoryBehavior, UserContext,
2108    };
2109    use chrono::{NaiveDate, TimeZone, Utc};
2110    use teaql_core::{
2111        DataType, Decimal, DeleteCommand, EntityDescriptor, Expr, InsertCommand,
2112        PropertyDescriptor, Record, RecoverCommand, SelectQuery, UpdateCommand, Value,
2113    };
2114    use teaql_dialect_pg::PostgresDialect;
2115    use teaql_dialect_sqlite::SqliteDialect;
2116    use teaql_macros::TeaqlEntity as DeriveTeaqlEntity;
2117    use teaql_sql::SqlDialect;
2118    use tokio::runtime::Handle;
2119    use tokio::task::block_in_place;
2120
2121    fn entity() -> EntityDescriptor {
2122        EntityDescriptor::new("Order")
2123            .table_name("orders")
2124            .property(
2125                PropertyDescriptor::new("id", DataType::U64)
2126                    .column_name("id")
2127                    .id()
2128                    .not_null(),
2129            )
2130            .property(
2131                PropertyDescriptor::new("version", DataType::I64)
2132                    .column_name("version")
2133                    .version()
2134                    .not_null(),
2135            )
2136            .property(
2137                PropertyDescriptor::new("name", DataType::Text)
2138                    .column_name("name")
2139                    .not_null(),
2140            )
2141            .relation(
2142                teaql_core::RelationDescriptor::new("lines", "OrderLine")
2143                    .local_key("id")
2144                    .foreign_key("order_id")
2145                    .many(),
2146            )
2147    }
2148
2149    fn sqlite_entity_keep_missing() -> EntityDescriptor {
2150        EntityDescriptor::new("Order")
2151            .table_name("orders")
2152            .property(
2153                PropertyDescriptor::new("id", DataType::U64)
2154                    .column_name("id")
2155                    .id()
2156                    .not_null(),
2157            )
2158            .property(
2159                PropertyDescriptor::new("version", DataType::I64)
2160                    .column_name("version")
2161                    .version()
2162                    .not_null(),
2163            )
2164            .property(
2165                PropertyDescriptor::new("name", DataType::Text)
2166                    .column_name("name")
2167                    .not_null(),
2168            )
2169            .relation(
2170                teaql_core::RelationDescriptor::new("lines", "OrderLine")
2171                    .local_key("id")
2172                    .foreign_key("order_id")
2173                    .many()
2174                    .keep_missing(),
2175            )
2176    }
2177
2178    fn line_entity() -> EntityDescriptor {
2179        EntityDescriptor::new("OrderLine")
2180            .table_name("orderline")
2181            .property(
2182                PropertyDescriptor::new("id", DataType::U64)
2183                    .column_name("id")
2184                    .id()
2185                    .not_null(),
2186            )
2187            .property(
2188                PropertyDescriptor::new("version", DataType::I64)
2189                    .column_name("version")
2190                    .version(),
2191            )
2192            .property(
2193                PropertyDescriptor::new("order_id", DataType::U64)
2194                    .column_name("order_id")
2195                    .not_null(),
2196            )
2197            .property(
2198                PropertyDescriptor::new("name", DataType::Text)
2199                    .column_name("name")
2200                    .not_null(),
2201            )
2202            .property(
2203                PropertyDescriptor::new("product_id", DataType::U64)
2204                    .column_name("product_id")
2205                    .not_null(),
2206            )
2207            .relation(
2208                teaql_core::RelationDescriptor::new("product", "Product")
2209                    .local_key("product_id")
2210                    .foreign_key("id"),
2211            )
2212    }
2213
2214    fn product_entity() -> EntityDescriptor {
2215        EntityDescriptor::new("Product")
2216            .table_name("product")
2217            .property(
2218                PropertyDescriptor::new("id", DataType::U64)
2219                    .column_name("id")
2220                    .id()
2221                    .not_null(),
2222            )
2223            .property(
2224                PropertyDescriptor::new("name", DataType::Text)
2225                    .column_name("name")
2226                    .not_null(),
2227            )
2228    }
2229
2230    fn typed_entity() -> EntityDescriptor {
2231        EntityDescriptor::new("TypedValue")
2232            .table_name("typed_value")
2233            .property(
2234                PropertyDescriptor::new("id", DataType::U64)
2235                    .column_name("id")
2236                    .id()
2237                    .not_null(),
2238            )
2239            .property(
2240                PropertyDescriptor::new("payload", DataType::Json)
2241                    .column_name("payload")
2242                    .not_null(),
2243            )
2244            .property(
2245                PropertyDescriptor::new("amount", DataType::Decimal)
2246                    .column_name("amount")
2247                    .not_null(),
2248            )
2249            .property(
2250                PropertyDescriptor::new("birthday", DataType::Date)
2251                    .column_name("birthday")
2252                    .not_null(),
2253            )
2254            .property(
2255                PropertyDescriptor::new("happened_at", DataType::Timestamp)
2256                    .column_name("happened_at")
2257                    .not_null(),
2258            )
2259    }
2260
2261    struct OrderBehavior;
2262    struct NestedOrderBehavior;
2263
2264    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
2265    #[teaql(entity = "Product", table = "product")]
2266    struct ProductEntityRow {
2267        #[teaql(id)]
2268        id: u64,
2269        name: String,
2270    }
2271
2272    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
2273    #[teaql(entity = "OrderLine", table = "orderline")]
2274    struct OrderLineEntityRow {
2275        #[teaql(id)]
2276        id: u64,
2277        #[teaql(column = "order_id")]
2278        order_id: u64,
2279        name: String,
2280        #[teaql(column = "product_id")]
2281        product_id: u64,
2282        #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
2283        product: Option<ProductEntityRow>,
2284    }
2285
2286    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
2287    #[teaql(entity = "Order", table = "orders")]
2288    struct OrderAggregateRow {
2289        #[teaql(id)]
2290        id: u64,
2291        #[teaql(version)]
2292        version: i64,
2293        name: String,
2294        #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
2295        lines: teaql_core::SmartList<OrderLineEntityRow>,
2296    }
2297
2298    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
2299    #[teaql(entity = "Order", table = "orders")]
2300    struct Order {
2301        #[teaql(id)]
2302        id: u64,
2303        #[teaql(version)]
2304        version: i64,
2305        name: String,
2306    }
2307
2308    impl RepositoryBehavior for OrderBehavior {
2309        fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
2310            vec!["lines".to_owned()]
2311        }
2312    }
2313
2314    impl RepositoryBehavior for NestedOrderBehavior {
2315        fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
2316            vec!["lines.product".to_owned()]
2317        }
2318    }
2319
2320    #[derive(Clone)]
2321    struct SqliteSyncExecutor {
2322        inner: SqliteMutationExecutor,
2323    }
2324
2325    impl SqliteSyncExecutor {
2326        fn new(inner: SqliteMutationExecutor) -> Self {
2327            Self { inner }
2328        }
2329    }
2330
2331    impl QueryExecutor for SqliteSyncExecutor {
2332        type Error = MutationExecutorError;
2333
2334        fn fetch_all(&self, query: &teaql_sql::CompiledQuery) -> Result<Vec<Record>, Self::Error> {
2335            let handle = Handle::current();
2336            block_in_place(|| handle.block_on(self.inner.fetch_all(query)))
2337        }
2338
2339        fn execute(&self, query: &teaql_sql::CompiledQuery) -> Result<u64, Self::Error> {
2340            let handle = Handle::current();
2341            block_in_place(|| handle.block_on(self.inner.execute(query)))
2342        }
2343
2344        fn begin_transaction(&self) -> Result<GraphTransactionBoundary, Self::Error> {
2345            let handle = Handle::current();
2346            block_in_place(|| handle.block_on(self.inner.begin_transaction()))?;
2347            Ok(GraphTransactionBoundary::Started)
2348        }
2349
2350        fn commit_transaction(&self) -> Result<(), Self::Error> {
2351            let handle = Handle::current();
2352            block_in_place(|| handle.block_on(self.inner.commit_transaction()))
2353        }
2354
2355        fn rollback_transaction(&self) -> Result<(), Self::Error> {
2356            let handle = Handle::current();
2357            block_in_place(|| handle.block_on(self.inner.rollback_transaction()))
2358        }
2359    }
2360
2361    #[derive(Clone)]
2362    struct PgSyncExecutor {
2363        inner: PgMutationExecutor,
2364    }
2365
2366    impl PgSyncExecutor {
2367        fn new(inner: PgMutationExecutor) -> Self {
2368            Self { inner }
2369        }
2370    }
2371
2372    impl QueryExecutor for PgSyncExecutor {
2373        type Error = MutationExecutorError;
2374
2375        fn fetch_all(&self, query: &teaql_sql::CompiledQuery) -> Result<Vec<Record>, Self::Error> {
2376            let handle = Handle::current();
2377            block_in_place(|| handle.block_on(self.inner.fetch_all(query)))
2378        }
2379
2380        fn execute(&self, query: &teaql_sql::CompiledQuery) -> Result<u64, Self::Error> {
2381            let handle = Handle::current();
2382            block_in_place(|| handle.block_on(self.inner.execute(query)))
2383        }
2384    }
2385
2386    #[derive(Clone)]
2387    struct PgTxSyncExecutor {
2388        inner: PgTransactionExecutor,
2389    }
2390
2391    impl PgTxSyncExecutor {
2392        fn new(inner: PgTransactionExecutor) -> Self {
2393            Self { inner }
2394        }
2395    }
2396
2397    impl QueryExecutor for PgTxSyncExecutor {
2398        type Error = MutationExecutorError;
2399
2400        fn fetch_all(&self, query: &teaql_sql::CompiledQuery) -> Result<Vec<Record>, Self::Error> {
2401            let handle = Handle::current();
2402            block_in_place(|| handle.block_on(self.inner.fetch_all(query)))
2403        }
2404
2405        fn execute(&self, query: &teaql_sql::CompiledQuery) -> Result<u64, Self::Error> {
2406            let handle = Handle::current();
2407            block_in_place(|| handle.block_on(self.inner.execute(query)))
2408        }
2409
2410        fn begin_transaction(&self) -> Result<GraphTransactionBoundary, Self::Error> {
2411            Ok(GraphTransactionBoundary::AlreadyActive)
2412        }
2413
2414        fn rollback_transaction(&self) -> Result<(), Self::Error> {
2415            let handle = Handle::current();
2416            block_in_place(|| handle.block_on(self.inner.rollback()))
2417        }
2418    }
2419
2420    #[tokio::test]
2421    async fn sqlite_executor_runs_crud_flow() {
2422        use sqlx::sqlite::SqlitePoolOptions;
2423
2424        let pool = SqlitePoolOptions::new()
2425            .max_connections(1)
2426            .connect("sqlite::memory:")
2427            .await
2428            .unwrap();
2429
2430        let executor = SqliteMutationExecutor::new(pool.clone());
2431        let dialect = SqliteDialect;
2432        let entity = entity();
2433        executor.ensure_schema(&dialect, &[&entity]).await.unwrap();
2434
2435        let insert = dialect
2436            .compile_insert(
2437                &entity,
2438                &InsertCommand::new("Order")
2439                    .value("id", 1_u64)
2440                    .value("version", 1_i64)
2441                    .value("name", "first"),
2442            )
2443            .unwrap();
2444        assert_eq!(executor.execute(&insert).await.unwrap(), 1);
2445
2446        let select = dialect
2447            .compile_select(
2448                &entity,
2449                &SelectQuery::new("Order")
2450                    .project("id")
2451                    .project("version")
2452                    .project("name")
2453                    .filter(Expr::eq("id", 1_u64)),
2454            )
2455            .unwrap();
2456        let rows = executor.fetch_all(&select).await.unwrap();
2457        assert_eq!(rows.len(), 1);
2458        assert_eq!(rows[0].get("id"), Some(&Value::I64(1)));
2459        assert_eq!(rows[0].get("version"), Some(&Value::I64(1)));
2460        assert_eq!(rows[0].get("name"), Some(&Value::Text("first".to_owned())));
2461
2462        let update = dialect
2463            .compile_update(
2464                &entity,
2465                &UpdateCommand::new("Order", 1_u64)
2466                    .expected_version(1)
2467                    .value("name", "second"),
2468            )
2469            .unwrap();
2470        assert_eq!(executor.execute(&update).await.unwrap(), 1);
2471
2472        let after_update = executor.fetch_all(&select).await.unwrap();
2473        assert_eq!(after_update[0].get("version"), Some(&Value::I64(2)));
2474        assert_eq!(
2475            after_update[0].get("name"),
2476            Some(&Value::Text("second".to_owned()))
2477        );
2478
2479        let delete = dialect
2480            .compile_delete(
2481                &entity,
2482                &DeleteCommand::new("Order", 1_u64).expected_version(2),
2483            )
2484            .unwrap();
2485        assert_eq!(executor.execute(&delete).await.unwrap(), 1);
2486
2487        let after_delete = executor.fetch_all(&select).await.unwrap();
2488        assert_eq!(after_delete[0].get("version"), Some(&Value::I64(-3)));
2489
2490        let recover = dialect
2491            .compile_recover(&entity, &RecoverCommand::new("Order", 1_u64, -3))
2492            .unwrap();
2493        assert_eq!(executor.execute(&recover).await.unwrap(), 1);
2494
2495        let after_recover = executor.fetch_all(&select).await.unwrap();
2496        assert_eq!(after_recover[0].get("version"), Some(&Value::I64(4)));
2497    }
2498
2499    #[tokio::test(flavor = "multi_thread")]
2500    async fn sqlite_executor_enhances_relations() {
2501        use sqlx::sqlite::SqlitePoolOptions;
2502
2503        let pool = SqlitePoolOptions::new()
2504            .max_connections(1)
2505            .connect("sqlite::memory:")
2506            .await
2507            .unwrap();
2508
2509        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2510        let order = entity();
2511        let line = line_entity();
2512        mutation_executor
2513            .ensure_schema(&SqliteDialect, &[&order, &line])
2514            .await
2515            .unwrap();
2516
2517        sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1'), (2, 1, 'o2')")
2518            .execute(&pool)
2519            .await
2520            .unwrap();
2521        sqlx::query(
2522            "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2523                (101, 1, 1001, 'l1'),
2524                (102, 1, 1002, 'l2'),
2525                (201, 2, 1003, 'l3')",
2526        )
2527        .execute(&pool)
2528        .await
2529        .unwrap();
2530
2531        let executor = SqliteSyncExecutor::new(mutation_executor);
2532        let mut ctx = UserContext::new()
2533            .with_metadata(
2534                InMemoryMetadataStore::new()
2535                    .with_entity(order)
2536                    .with_entity(line)
2537                    .with_entity(product_entity()),
2538            )
2539            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
2540            .with_repository_behavior_registry(
2541                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
2542            );
2543        ctx.insert_resource(SqliteDialect);
2544        ctx.insert_resource(executor);
2545
2546        let repo = ctx
2547            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2548            .unwrap();
2549        let mut parents = repo
2550            .fetch_all(
2551                &repo
2552                    .select()
2553                    .project("id")
2554                    .project("version")
2555                    .project("name")
2556                    .order_by(teaql_core::OrderBy::asc("id")),
2557            )
2558            .unwrap();
2559
2560        repo.enhance_relations(&mut parents).unwrap();
2561
2562        assert_eq!(parents.len(), 2);
2563        match parents[0].get("lines") {
2564            Some(Value::List(lines)) => assert_eq!(lines.len(), 2),
2565            other => panic!("unexpected first lines payload: {other:?}"),
2566        }
2567        match parents[1].get("lines") {
2568            Some(Value::List(lines)) => assert_eq!(lines.len(), 1),
2569            other => panic!("unexpected second lines payload: {other:?}"),
2570        }
2571    }
2572
2573    #[tokio::test(flavor = "multi_thread")]
2574    async fn sqlite_executor_enhances_nested_relations() {
2575        use sqlx::sqlite::SqlitePoolOptions;
2576
2577        let pool = SqlitePoolOptions::new()
2578            .max_connections(1)
2579            .connect("sqlite::memory:")
2580            .await
2581            .unwrap();
2582
2583        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2584        let order = entity();
2585        let line = line_entity();
2586        let product = product_entity();
2587        mutation_executor
2588            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
2589            .await
2590            .unwrap();
2591
2592        sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1')")
2593            .execute(&pool)
2594            .await
2595            .unwrap();
2596        sqlx::query(
2597            "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2598                (101, 1, 1001, 'l1'),
2599                (102, 1, 1002, 'l2')",
2600        )
2601        .execute(&pool)
2602        .await
2603        .unwrap();
2604        sqlx::query(
2605            "INSERT INTO product (id, name) VALUES
2606                (1001, 'p1'),
2607                (1002, 'p2')",
2608        )
2609        .execute(&pool)
2610        .await
2611        .unwrap();
2612
2613        let executor = SqliteSyncExecutor::new(mutation_executor);
2614        let mut ctx = UserContext::new()
2615            .with_metadata(
2616                InMemoryMetadataStore::new()
2617                    .with_entity(order)
2618                    .with_entity(line)
2619                    .with_entity(product),
2620            )
2621            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
2622            .with_repository_behavior_registry(
2623                InMemoryRepositoryBehaviorRegistry::new()
2624                    .with_behavior("Order", NestedOrderBehavior),
2625            );
2626        ctx.insert_resource(SqliteDialect);
2627        ctx.insert_resource(executor);
2628
2629        let repo = ctx
2630            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2631            .unwrap();
2632        let mut parents = repo
2633            .fetch_all(
2634                &repo
2635                    .select()
2636                    .project("id")
2637                    .project("version")
2638                    .project("name"),
2639            )
2640            .unwrap();
2641
2642        repo.enhance_relations(&mut parents).unwrap();
2643
2644        match parents[0].get("lines") {
2645            Some(Value::List(lines)) => {
2646                assert_eq!(lines.len(), 2);
2647                for line in lines {
2648                    match line {
2649                        Value::Object(line_record) => match line_record.get("product") {
2650                            Some(Value::Object(product)) => {
2651                                assert!(product.get("name").is_some());
2652                            }
2653                            other => panic!("unexpected product payload: {other:?}"),
2654                        },
2655                        other => panic!("unexpected line payload: {other:?}"),
2656                    }
2657                }
2658            }
2659            other => panic!("unexpected nested lines payload: {other:?}"),
2660        }
2661    }
2662
2663    #[tokio::test(flavor = "multi_thread")]
2664    async fn sqlite_executor_enhances_query_relation_with_child_query() {
2665        use sqlx::sqlite::SqlitePoolOptions;
2666
2667        let pool = SqlitePoolOptions::new()
2668            .max_connections(1)
2669            .connect("sqlite::memory:")
2670            .await
2671            .unwrap();
2672
2673        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2674        let order = entity();
2675        let line = line_entity();
2676        let product = product_entity();
2677        mutation_executor
2678            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
2679            .await
2680            .unwrap();
2681
2682        sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1')")
2683            .execute(&pool)
2684            .await
2685            .unwrap();
2686        sqlx::query(
2687            "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2688                (101, 1, 1001, 'keep'),
2689                (102, 1, 1002, 'drop')",
2690        )
2691        .execute(&pool)
2692        .await
2693        .unwrap();
2694        sqlx::query(
2695            "INSERT INTO product (id, name) VALUES
2696                (1001, 'p1'),
2697                (1002, 'p2')",
2698        )
2699        .execute(&pool)
2700        .await
2701        .unwrap();
2702
2703        let executor = SqliteSyncExecutor::new(mutation_executor);
2704        let mut ctx = UserContext::new()
2705            .with_metadata(
2706                InMemoryMetadataStore::new()
2707                    .with_entity(order)
2708                    .with_entity(line)
2709                    .with_entity(product),
2710            )
2711            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
2712        ctx.insert_resource(SqliteDialect);
2713        ctx.insert_resource(executor);
2714
2715        let repo = ctx
2716            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2717            .unwrap();
2718        let query = repo.select().relation_query(
2719            "lines",
2720            SelectQuery::new("OrderLine")
2721                .project("name")
2722                .filter(Expr::eq("name", "keep"))
2723                .order_desc("id")
2724                .page(0, 10)
2725                .relation_query("product", SelectQuery::new("Product").project("name")),
2726        );
2727        let mut parents = repo
2728            .fetch_all(
2729                &repo
2730                    .select()
2731                    .project("id")
2732                    .project("version")
2733                    .project("name"),
2734            )
2735            .unwrap();
2736
2737        repo.enhance_query_relations(&mut parents, &query).unwrap();
2738
2739        match parents[0].get("lines") {
2740            Some(Value::List(lines)) => {
2741                assert_eq!(lines.len(), 1);
2742                let Value::Object(line) = &lines[0] else {
2743                    panic!("unexpected line payload: {:?}", lines[0]);
2744                };
2745                assert_eq!(line.get("name"), Some(&Value::Text("keep".to_owned())));
2746                assert_eq!(line.get("order_id"), Some(&Value::I64(1)));
2747                assert_eq!(line.get("product_id"), Some(&Value::I64(1001)));
2748                match line.get("product") {
2749                    Some(Value::Object(product)) => {
2750                        assert_eq!(product.get("name"), Some(&Value::Text("p1".to_owned())));
2751                        assert_eq!(product.get("id"), Some(&Value::I64(1001)));
2752                    }
2753                    other => panic!("unexpected product payload: {other:?}"),
2754                }
2755            }
2756            other => panic!("unexpected query relation payload: {other:?}"),
2757        }
2758    }
2759
2760    #[tokio::test]
2761    async fn sqlite_executor_ensure_schema_adds_missing_columns() {
2762        use sqlx::Row;
2763        use sqlx::sqlite::SqlitePoolOptions;
2764
2765        let pool = SqlitePoolOptions::new()
2766            .max_connections(1)
2767            .connect("sqlite::memory:")
2768            .await
2769            .unwrap();
2770
2771        sqlx::query(
2772            "CREATE TABLE orders (
2773                id INTEGER PRIMARY KEY
2774            )",
2775        )
2776        .execute(&pool)
2777        .await
2778        .unwrap();
2779
2780        let executor = SqliteMutationExecutor::new(pool.clone());
2781        let dialect = SqliteDialect;
2782        let entity = entity();
2783
2784        executor.ensure_schema(&dialect, &[&entity]).await.unwrap();
2785
2786        let columns = sqlx::query("PRAGMA table_info(\"orders\")")
2787            .fetch_all(&pool)
2788            .await
2789            .unwrap()
2790            .into_iter()
2791            .map(|row| row.try_get::<String, _>("name").unwrap())
2792            .collect::<Vec<_>>();
2793
2794        assert!(columns.contains(&"id".to_owned()));
2795        assert!(columns.contains(&"version".to_owned()));
2796        assert!(columns.contains(&"name".to_owned()));
2797    }
2798
2799    #[tokio::test]
2800    async fn user_context_can_ensure_sqlite_schema_from_runtime_module() {
2801        use sqlx::Row;
2802        use sqlx::sqlite::SqlitePoolOptions;
2803
2804        let pool = SqlitePoolOptions::new()
2805            .max_connections(1)
2806            .connect("sqlite::memory:")
2807            .await
2808            .unwrap();
2809
2810        let module = super::RuntimeModule::new()
2811            .descriptor(entity())
2812            .descriptor(line_entity())
2813            .descriptor(product_entity());
2814        let mut ctx = super::UserContext::new().with_module(module);
2815        ctx.insert_resource(SqliteDialect);
2816        ctx.insert_resource(SqliteMutationExecutor::new(pool.clone()));
2817
2818        ctx.ensure_sqlite_schema().await.unwrap();
2819
2820        let tables = sqlx::query(
2821            "SELECT name FROM sqlite_master WHERE type = 'table' AND name IN ('orders', 'orderline', 'product') ORDER BY name",
2822        )
2823        .fetch_all(&pool)
2824        .await
2825        .unwrap()
2826        .into_iter()
2827        .map(|row| row.try_get::<String, _>("name").unwrap())
2828        .collect::<Vec<_>>();
2829
2830        assert_eq!(
2831            tables,
2832            vec![
2833                "orderline".to_owned(),
2834                "orders".to_owned(),
2835                "product".to_owned()
2836            ]
2837        );
2838    }
2839
2840    #[tokio::test]
2841    async fn sqlite_executor_roundtrips_json_decimal_date_and_timestamp() {
2842        use sqlx::sqlite::SqlitePoolOptions;
2843
2844        let pool = SqlitePoolOptions::new()
2845            .max_connections(1)
2846            .connect("sqlite::memory:")
2847            .await
2848            .unwrap();
2849
2850        let executor = SqliteMutationExecutor::new(pool.clone());
2851        let dialect = SqliteDialect;
2852        let entity = typed_entity();
2853        executor.ensure_schema(&dialect, &[&entity]).await.unwrap();
2854
2855        let birthday = NaiveDate::from_ymd_opt(2024, 2, 3).unwrap();
2856        let happened_at = Utc.with_ymd_and_hms(2024, 2, 3, 4, 5, 6).unwrap();
2857        let payload = serde_json::json!({"name": "teaql", "count": 2});
2858        let amount = Decimal::new(12345, 2);
2859
2860        let insert = dialect
2861            .compile_insert(
2862                &entity,
2863                &InsertCommand::new("TypedValue")
2864                    .value("id", 1_u64)
2865                    .value("payload", payload.clone())
2866                    .value("amount", amount)
2867                    .value("birthday", birthday)
2868                    .value("happened_at", happened_at),
2869            )
2870            .unwrap();
2871        assert_eq!(executor.execute(&insert).await.unwrap(), 1);
2872
2873        let select = dialect
2874            .compile_select(
2875                &entity,
2876                &SelectQuery::new("TypedValue")
2877                    .project("id")
2878                    .project("payload")
2879                    .project("amount")
2880                    .project("birthday")
2881                    .project("happened_at")
2882                    .filter(Expr::eq("id", 1_u64)),
2883            )
2884            .unwrap();
2885        let rows = executor.fetch_all(&select).await.unwrap();
2886        assert_eq!(rows.len(), 1);
2887        assert_eq!(rows[0].get("payload"), Some(&Value::Json(payload)));
2888        assert_eq!(rows[0].get("amount"), Some(&Value::Decimal(amount)));
2889        assert_eq!(rows[0].get("birthday"), Some(&Value::Date(birthday)));
2890        assert_eq!(
2891            rows[0].get("happened_at"),
2892            Some(&Value::Timestamp(happened_at))
2893        );
2894    }
2895
2896    #[tokio::test(flavor = "multi_thread")]
2897    async fn sqlite_fetches_enhanced_typed_entities() {
2898        use sqlx::sqlite::SqlitePoolOptions;
2899
2900        let pool = SqlitePoolOptions::new()
2901            .max_connections(1)
2902            .connect("sqlite::memory:")
2903            .await
2904            .unwrap();
2905
2906        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2907        let order = entity();
2908        let line = line_entity();
2909        let product = product_entity();
2910        mutation_executor
2911            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
2912            .await
2913            .unwrap();
2914
2915        sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1')")
2916            .execute(&pool)
2917            .await
2918            .unwrap();
2919        sqlx::query(
2920            "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2921                (101, 1, 1001, 'l1'),
2922                (102, 1, 1002, 'l2')",
2923        )
2924        .execute(&pool)
2925        .await
2926        .unwrap();
2927        sqlx::query(
2928            "INSERT INTO product (id, name) VALUES
2929                (1001, 'p1'),
2930                (1002, 'p2')",
2931        )
2932        .execute(&pool)
2933        .await
2934        .unwrap();
2935
2936        let executor = SqliteSyncExecutor::new(mutation_executor);
2937        let mut ctx = UserContext::new()
2938            .with_metadata(
2939                InMemoryMetadataStore::new()
2940                    .with_entity(order)
2941                    .with_entity(line)
2942                    .with_entity(product),
2943            )
2944            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
2945            .with_repository_behavior_registry(
2946                InMemoryRepositoryBehaviorRegistry::new()
2947                    .with_behavior("Order", NestedOrderBehavior),
2948            );
2949        ctx.insert_resource(SqliteDialect);
2950        ctx.insert_resource(executor);
2951
2952        let repo = ctx
2953            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2954            .unwrap();
2955        let rows = repo
2956            .fetch_enhanced_entities::<OrderAggregateRow>(
2957                &repo
2958                    .select()
2959                    .project("id")
2960                    .project("version")
2961                    .project("name"),
2962            )
2963            .unwrap();
2964
2965        assert_eq!(rows.len(), 1);
2966        let order = rows.first().unwrap();
2967        assert_eq!(order.id, 1);
2968        assert_eq!(order.lines.len(), 2);
2969        assert_eq!(order.lines.data[0].product.as_ref().unwrap().name, "p1");
2970        assert_eq!(order.lines.data[1].product.as_ref().unwrap().name, "p2");
2971    }
2972
2973    #[tokio::test(flavor = "multi_thread")]
2974    async fn sqlite_fetches_typed_smart_list_entities() {
2975        use sqlx::sqlite::SqlitePoolOptions;
2976
2977        let pool = SqlitePoolOptions::new()
2978            .max_connections(1)
2979            .connect("sqlite::memory:")
2980            .await
2981            .unwrap();
2982
2983        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2984        let order = entity();
2985        mutation_executor
2986            .ensure_schema(&SqliteDialect, &[&order])
2987            .await
2988            .unwrap();
2989
2990        sqlx::query(
2991            "INSERT INTO orders (id, version, name) VALUES
2992                (1, 1, 'o1'),
2993                (2, 3, 'o2')",
2994        )
2995        .execute(&pool)
2996        .await
2997        .unwrap();
2998
2999        let executor = SqliteSyncExecutor::new(mutation_executor);
3000        let mut ctx = UserContext::new()
3001            .with_metadata(InMemoryMetadataStore::new().with_entity(order))
3002            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3003        ctx.insert_resource(SqliteDialect);
3004        ctx.insert_resource(executor);
3005
3006        let repo = ctx
3007            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3008            .unwrap();
3009        let rows = repo
3010            .fetch_entities::<OrderAggregateRow>(
3011                &repo
3012                    .select()
3013                    .project("id")
3014                    .project("version")
3015                    .project("name"),
3016            )
3017            .unwrap();
3018
3019        assert_eq!(rows.len(), 2);
3020        assert_eq!(rows.ids(), vec![Value::U64(1), Value::U64(2)]);
3021        assert_eq!(rows.versions(), vec![1, 3]);
3022        assert!(rows.data[0].lines.is_empty());
3023        assert!(rows.data[1].lines.is_empty());
3024    }
3025
3026    #[tokio::test(flavor = "multi_thread")]
3027    async fn sqlite_fetches_smart_list_of_order_entities() {
3028        use sqlx::sqlite::SqlitePoolOptions;
3029
3030        let pool = SqlitePoolOptions::new()
3031            .max_connections(1)
3032            .connect("sqlite::memory:")
3033            .await
3034            .unwrap();
3035
3036        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3037        let order = entity();
3038        mutation_executor
3039            .ensure_schema(&SqliteDialect, &[&order])
3040            .await
3041            .unwrap();
3042
3043        sqlx::query(
3044            "INSERT INTO orders (id, version, name) VALUES
3045                (1, 1, 'o1'),
3046                (2, 3, 'o2')",
3047        )
3048        .execute(&pool)
3049        .await
3050        .unwrap();
3051
3052        let executor = SqliteSyncExecutor::new(mutation_executor);
3053        let mut ctx = UserContext::new()
3054            .with_metadata(InMemoryMetadataStore::new().with_entity(order))
3055            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3056        ctx.insert_resource(SqliteDialect);
3057        ctx.insert_resource(executor);
3058
3059        let repo = ctx
3060            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3061            .unwrap();
3062        let rows = repo
3063            .fetch_entities::<Order>(
3064                &repo
3065                    .select()
3066                    .project("id")
3067                    .project("version")
3068                    .project("name"),
3069            )
3070            .unwrap();
3071
3072        assert_eq!(
3073            rows.data,
3074            vec![
3075                Order {
3076                    id: 1,
3077                    version: 1,
3078                    name: "o1".to_owned(),
3079                },
3080                Order {
3081                    id: 2,
3082                    version: 3,
3083                    name: "o2".to_owned(),
3084                }
3085            ]
3086        );
3087        assert_eq!(rows.ids(), vec![Value::U64(1), Value::U64(2)]);
3088        assert_eq!(rows.versions(), vec![1, 3]);
3089    }
3090
3091    #[tokio::test(flavor = "multi_thread")]
3092    async fn sqlite_insert_generates_missing_id() {
3093        use sqlx::sqlite::SqlitePoolOptions;
3094
3095        let pool = SqlitePoolOptions::new()
3096            .max_connections(1)
3097            .connect("sqlite::memory:")
3098            .await
3099            .unwrap();
3100
3101        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3102        let order = entity();
3103        mutation_executor
3104            .ensure_schema(&SqliteDialect, &[&order])
3105            .await
3106            .unwrap();
3107
3108        let executor = SqliteSyncExecutor::new(mutation_executor);
3109        let mut ctx = UserContext::new()
3110            .with_metadata(InMemoryMetadataStore::new().with_entity(order))
3111            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3112        ctx.insert_resource(SqliteDialect);
3113        ctx.insert_resource(executor);
3114
3115        let repo = ctx
3116            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3117            .unwrap();
3118        let affected = repo
3119            .insert(
3120                &repo
3121                    .insert_command()
3122                    .value("version", 1_i64)
3123                    .value("name", "generated"),
3124            )
3125            .unwrap();
3126        assert_eq!(affected, 1);
3127
3128        let rows = repo
3129            .fetch_entities::<Order>(
3130                &repo
3131                    .select()
3132                    .project("id")
3133                    .project("version")
3134                    .project("name")
3135                    .filter(Expr::eq("name", "generated")),
3136            )
3137            .unwrap();
3138        assert_eq!(rows.len(), 1);
3139        let order = rows.first().unwrap();
3140        assert!(order.id > 0);
3141        assert_eq!(order.version, 1);
3142        assert_eq!(order.name, "generated");
3143    }
3144
3145    #[tokio::test(flavor = "multi_thread")]
3146    async fn sqlite_save_graph_inserts_nested_rows() {
3147        use sqlx::{Row, sqlite::SqlitePoolOptions};
3148
3149        let pool = SqlitePoolOptions::new()
3150            .max_connections(1)
3151            .connect("sqlite::memory:?cache=shared")
3152            .await
3153            .unwrap();
3154
3155        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3156        let order = entity();
3157        let line = line_entity();
3158        let product = product_entity();
3159        mutation_executor
3160            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3161            .await
3162            .unwrap();
3163
3164        let executor = SqliteSyncExecutor::new(mutation_executor);
3165        let mut ctx = UserContext::new()
3166            .with_metadata(
3167                InMemoryMetadataStore::new()
3168                    .with_entity(order)
3169                    .with_entity(line)
3170                    .with_entity(product),
3171            )
3172            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3173        ctx.insert_resource(SqliteDialect);
3174        ctx.insert_resource(executor);
3175
3176        let repo = ctx
3177            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3178            .unwrap();
3179        let graph = GraphNode::new("Order")
3180            .value("id", 1_u64)
3181            .value("version", 1_i64)
3182            .value("name", "root")
3183            .relation(
3184                "lines",
3185                GraphNode::new("OrderLine")
3186                    .value("id", 10_u64)
3187                    .value("name", "line-1")
3188                    .relation(
3189                        "product",
3190                        GraphNode::new("Product")
3191                            .value("id", 100_u64)
3192                            .value("name", "sku-1"),
3193                    ),
3194            );
3195
3196        let saved = repo.save_graph(graph).unwrap();
3197        assert_eq!(
3198            saved.relations["lines"][0].values.get("order_id"),
3199            Some(&Value::U64(1))
3200        );
3201        assert_eq!(
3202            saved.relations["lines"][0].values.get("product_id"),
3203            Some(&Value::U64(100))
3204        );
3205
3206        let order_count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders")
3207            .fetch_one(&pool)
3208            .await
3209            .unwrap()
3210            .try_get("count")
3211            .unwrap();
3212        let line = sqlx::query("SELECT order_id, product_id FROM orderline WHERE id = 10")
3213            .fetch_one(&pool)
3214            .await
3215            .unwrap();
3216        let product_count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM product")
3217            .fetch_one(&pool)
3218            .await
3219            .unwrap()
3220            .try_get("count")
3221            .unwrap();
3222
3223        assert_eq!(order_count, 1);
3224        assert_eq!(line.try_get::<i64, _>("order_id").unwrap(), 1);
3225        assert_eq!(line.try_get::<i64, _>("product_id").unwrap(), 100);
3226        assert_eq!(product_count, 1);
3227    }
3228
3229    #[tokio::test(flavor = "multi_thread")]
3230    async fn sqlite_save_typed_entity_graph_create_inserts_nested_rows() {
3231        use sqlx::{Row, sqlite::SqlitePoolOptions};
3232
3233        let pool = SqlitePoolOptions::new()
3234            .max_connections(1)
3235            .connect("sqlite::memory:")
3236            .await
3237            .unwrap();
3238
3239        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3240        let order = entity();
3241        let line = line_entity();
3242        let product = product_entity();
3243        mutation_executor
3244            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3245            .await
3246            .unwrap();
3247
3248        let executor = SqliteSyncExecutor::new(mutation_executor);
3249        let mut ctx = UserContext::new()
3250            .with_metadata(
3251                InMemoryMetadataStore::new()
3252                    .with_entity(order)
3253                    .with_entity(line)
3254                    .with_entity(product),
3255            )
3256            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3257        ctx.insert_resource(SqliteDialect);
3258        ctx.insert_resource(executor);
3259
3260        let repo = ctx
3261            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3262            .unwrap();
3263        let saved = repo
3264            .save_entity_graph(OrderAggregateRow {
3265                id: 2,
3266                version: 1,
3267                name: "typed-root".to_owned(),
3268                lines: teaql_core::SmartList::from(vec![OrderLineEntityRow {
3269                    id: 20,
3270                    order_id: 0,
3271                    name: "typed-line".to_owned(),
3272                    product_id: 200,
3273                    product: Some(ProductEntityRow {
3274                        id: 200,
3275                        name: "typed-sku".to_owned(),
3276                    }),
3277                }]),
3278            })
3279            .unwrap();
3280
3281        assert_eq!(saved.values.get("id"), Some(&Value::U64(2)));
3282        assert_eq!(
3283            saved.relations["lines"][0].values.get("order_id"),
3284            Some(&Value::U64(2))
3285        );
3286        assert_eq!(
3287            saved.relations["lines"][0].values.get("product_id"),
3288            Some(&Value::U64(200))
3289        );
3290
3291        let order_count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders")
3292            .fetch_one(&pool)
3293            .await
3294            .unwrap()
3295            .try_get("count")
3296            .unwrap();
3297        let line = sqlx::query("SELECT order_id, product_id FROM orderline WHERE id = 20")
3298            .fetch_one(&pool)
3299            .await
3300            .unwrap();
3301        let product_name: String = sqlx::query_scalar("SELECT name FROM product WHERE id = 200")
3302            .fetch_one(&pool)
3303            .await
3304            .unwrap();
3305
3306        assert_eq!(order_count, 1);
3307        assert_eq!(line.try_get::<i64, _>("order_id").unwrap(), 2);
3308        assert_eq!(line.try_get::<i64, _>("product_id").unwrap(), 200);
3309        assert_eq!(product_name, "typed-sku");
3310    }
3311
3312    #[tokio::test(flavor = "multi_thread")]
3313    async fn sqlite_plan_for_save_graph_assigns_ids_and_batches_before_execution() {
3314        use sqlx::sqlite::SqlitePoolOptions;
3315        use std::time::{SystemTime, UNIX_EPOCH};
3316
3317        let db_path = std::env::temp_dir().join(format!(
3318            "teaql-plan-{}-{}.db",
3319            std::process::id(),
3320            SystemTime::now()
3321                .duration_since(UNIX_EPOCH)
3322                .unwrap()
3323                .as_nanos()
3324        ));
3325        let db_url = format!("sqlite://{}?mode=rwc", db_path.display());
3326        let pool = SqlitePoolOptions::new()
3327            .max_connections(1)
3328            .connect(&db_url)
3329            .await
3330            .unwrap();
3331
3332        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3333        let order = entity();
3334        let line = line_entity();
3335        let product = product_entity();
3336        mutation_executor
3337            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3338            .await
3339            .unwrap();
3340
3341        sqlx::query("INSERT INTO orders (id, version, name) VALUES (100, 1, 'existing')")
3342            .execute(&pool)
3343            .await
3344            .unwrap();
3345        sqlx::query(
3346            "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
3347                (200, 1, 100, 301, 'line-a'),
3348                (201, 1, 100, 302, 'line-b')",
3349        )
3350        .execute(&pool)
3351        .await
3352        .unwrap();
3353        let seeded_lines: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM orderline")
3354            .fetch_one(&pool)
3355            .await
3356            .unwrap();
3357        assert_eq!(seeded_lines, 2);
3358
3359        let executor = SqliteSyncExecutor::new(mutation_executor);
3360        let mut ctx = UserContext::new()
3361            .with_metadata(
3362                InMemoryMetadataStore::new()
3363                    .with_entity(order)
3364                    .with_entity(line)
3365                    .with_entity(product),
3366            )
3367            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
3368            .with_internal_id_generator(SqliteIdSpaceGenerator::new(pool.clone()));
3369        ctx.insert_resource(SqliteDialect);
3370        ctx.insert_resource(executor);
3371
3372        let graph = GraphNode::new("Order")
3373            .value("id", 100_u64)
3374            .value("version", 1_i64)
3375            .value("name", "existing-updated")
3376            .relation(
3377                "lines",
3378                GraphNode::new("OrderLine")
3379                    .value("name", "new-line-a")
3380                    .value("product_id", 401_u64),
3381            )
3382            .relation(
3383                "lines",
3384                GraphNode::new("OrderLine")
3385                    .value("name", "new-line-b")
3386                    .value("product_id", 402_u64),
3387            )
3388            .relation(
3389                "lines",
3390                GraphNode::new("OrderLine")
3391                    .value("id", 200_u64)
3392                    .value("version", 1_i64)
3393                    .value("name", "line-a-updated"),
3394            )
3395            .relation(
3396                "lines",
3397                GraphNode::new("OrderLine")
3398                    .value("id", 201_u64)
3399                    .value("version", 1_i64)
3400                    .value("name", "line-b-updated"),
3401            );
3402
3403        let plan = ctx
3404            .plan_for_save_graph::<SqliteDialect, SqliteSyncExecutor>(graph)
3405            .unwrap();
3406        let counts = plan.grouped_counts();
3407        assert_eq!(
3408            counts.get(&("Order".to_owned(), GraphMutationKind::Update)),
3409            Some(&1)
3410        );
3411        assert_eq!(
3412            counts.get(&("OrderLine".to_owned(), GraphMutationKind::Create)),
3413            Some(&2)
3414        );
3415        assert_eq!(
3416            counts.get(&("OrderLine".to_owned(), GraphMutationKind::Update)),
3417            Some(&2)
3418        );
3419
3420        let create_batch = plan
3421            .batches
3422            .iter()
3423            .find(|batch| batch.entity == "OrderLine" && batch.kind == GraphMutationKind::Create)
3424            .unwrap();
3425        assert_eq!(create_batch.items.len(), 2);
3426        assert_eq!(create_batch.items[0].values.get("id"), Some(&Value::U64(1)));
3427        assert_eq!(create_batch.items[1].values.get("id"), Some(&Value::U64(2)));
3428
3429        let update_batch = plan
3430            .batches
3431            .iter()
3432            .find(|batch| {
3433                batch.entity == "OrderLine"
3434                    && batch.kind == GraphMutationKind::Update
3435                    && batch.update_fields == vec!["name".to_owned()]
3436            })
3437            .unwrap();
3438        assert_eq!(update_batch.items.len(), 2);
3439
3440        let repo = ctx
3441            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3442            .unwrap();
3443        let saved = repo.execute_graph_plan(plan).unwrap();
3444        let lines = saved.relations.get("lines").unwrap();
3445        assert_eq!(lines.len(), 4);
3446
3447        let generated_count: i64 = sqlx::query_scalar(
3448            "SELECT COUNT(*) FROM orderline WHERE id IN (1, 2) AND order_id = 100",
3449        )
3450        .fetch_one(&pool)
3451        .await
3452        .unwrap();
3453        assert_eq!(generated_count, 2);
3454        let updated_name: String = sqlx::query_scalar("SELECT name FROM orderline WHERE id = 200")
3455            .fetch_one(&pool)
3456            .await
3457            .unwrap();
3458        assert_eq!(updated_name, "line-a-updated");
3459        pool.close().await;
3460        let _ = std::fs::remove_file(db_path);
3461    }
3462
3463    #[tokio::test(flavor = "multi_thread")]
3464    async fn sqlite_save_graph_updates_nested_rows_and_deletes_missing_children() {
3465        use sqlx::{Row, sqlite::SqlitePoolOptions};
3466
3467        let pool = SqlitePoolOptions::new()
3468            .max_connections(1)
3469            .connect("sqlite::memory:")
3470            .await
3471            .unwrap();
3472
3473        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3474        let order = entity();
3475        let line = line_entity();
3476        let product = product_entity();
3477        mutation_executor
3478            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3479            .await
3480            .unwrap();
3481
3482        sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'old-root')")
3483            .execute(&pool)
3484            .await
3485            .unwrap();
3486        sqlx::query(
3487            "INSERT INTO product (id, name) VALUES
3488                (100, 'old-sku'),
3489                (101, 'removed-sku')",
3490        )
3491        .execute(&pool)
3492        .await
3493        .unwrap();
3494        sqlx::query(
3495            "INSERT INTO orderline (id, order_id, product_id, name) VALUES
3496                (10, 1, 100, 'old-line'),
3497                (11, 1, 101, 'removed-line')",
3498        )
3499        .execute(&pool)
3500        .await
3501        .unwrap();
3502        sqlx::query("UPDATE orderline SET version = 1")
3503            .execute(&pool)
3504            .await
3505            .unwrap();
3506
3507        let executor = SqliteSyncExecutor::new(mutation_executor);
3508        let mut ctx = UserContext::new()
3509            .with_metadata(
3510                InMemoryMetadataStore::new()
3511                    .with_entity(order)
3512                    .with_entity(line)
3513                    .with_entity(product),
3514            )
3515            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3516        ctx.insert_resource(SqliteDialect);
3517        ctx.insert_resource(executor);
3518
3519        let repo = ctx
3520            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3521            .unwrap();
3522        let graph = GraphNode::new("Order")
3523            .value("id", 1_u64)
3524            .value("version", 1_i64)
3525            .value("name", "new-root")
3526            .relation(
3527                "lines",
3528                GraphNode::new("OrderLine")
3529                    .value("id", 10_u64)
3530                    .value("version", 1_i64)
3531                    .value("name", "new-line")
3532                    .relation(
3533                        "product",
3534                        GraphNode::new("Product")
3535                            .value("id", 100_u64)
3536                            .value("name", "new-sku"),
3537                    ),
3538            )
3539            .relation(
3540                "lines",
3541                GraphNode::new("OrderLine")
3542                    .value("id", 12_u64)
3543                    .value("version", 1_i64)
3544                    .value("name", "added-line")
3545                    .relation(
3546                        "product",
3547                        GraphNode::new("Product")
3548                            .value("id", 102_u64)
3549                            .value("name", "added-sku"),
3550                    ),
3551            );
3552
3553        let saved = repo.save_graph(graph).unwrap();
3554        assert_eq!(saved.values.get("version"), Some(&Value::I64(2)));
3555
3556        let order_row = sqlx::query("SELECT version, name FROM orders WHERE id = 1")
3557            .fetch_one(&pool)
3558            .await
3559            .unwrap();
3560        assert_eq!(order_row.try_get::<i64, _>("version").unwrap(), 2);
3561        assert_eq!(order_row.try_get::<String, _>("name").unwrap(), "new-root");
3562
3563        let updated_line =
3564            sqlx::query("SELECT version, product_id, name FROM orderline WHERE id = 10")
3565                .fetch_one(&pool)
3566                .await
3567                .unwrap();
3568        assert_eq!(updated_line.try_get::<i64, _>("version").unwrap(), 2);
3569        assert_eq!(updated_line.try_get::<i64, _>("product_id").unwrap(), 100);
3570        assert_eq!(
3571            updated_line.try_get::<String, _>("name").unwrap(),
3572            "new-line"
3573        );
3574
3575        let added_line =
3576            sqlx::query("SELECT order_id, product_id, name FROM orderline WHERE id = 12")
3577                .fetch_one(&pool)
3578                .await
3579                .unwrap();
3580        assert_eq!(added_line.try_get::<i64, _>("order_id").unwrap(), 1);
3581        assert_eq!(added_line.try_get::<i64, _>("product_id").unwrap(), 102);
3582        assert_eq!(
3583            added_line.try_get::<String, _>("name").unwrap(),
3584            "added-line"
3585        );
3586
3587        let deleted_line = sqlx::query("SELECT version FROM orderline WHERE id = 11")
3588            .fetch_one(&pool)
3589            .await
3590            .unwrap();
3591        assert_eq!(deleted_line.try_get::<i64, _>("version").unwrap(), -2);
3592
3593        let updated_product = sqlx::query("SELECT name FROM product WHERE id = 100")
3594            .fetch_one(&pool)
3595            .await
3596            .unwrap();
3597        assert_eq!(
3598            updated_product.try_get::<String, _>("name").unwrap(),
3599            "new-sku"
3600        );
3601    }
3602
3603    #[tokio::test(flavor = "multi_thread")]
3604    async fn sqlite_save_graph_supports_reference_remove_and_keep_missing() {
3605        use sqlx::{Row, sqlite::SqlitePoolOptions};
3606
3607        let pool = SqlitePoolOptions::new()
3608            .max_connections(1)
3609            .connect("sqlite::memory:")
3610            .await
3611            .unwrap();
3612
3613        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3614        let order = sqlite_entity_keep_missing();
3615        let line = line_entity();
3616        let product = product_entity();
3617        mutation_executor
3618            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3619            .await
3620            .unwrap();
3621
3622        sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'root')")
3623            .execute(&pool)
3624            .await
3625            .unwrap();
3626        sqlx::query("INSERT INTO product (id, name) VALUES (100, 'reference-only')")
3627            .execute(&pool)
3628            .await
3629            .unwrap();
3630        sqlx::query(
3631            "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
3632                (10, 1, 1, 100, 'remove-me'),
3633                (11, 1, 1, 100, 'keep-me')",
3634        )
3635        .execute(&pool)
3636        .await
3637        .unwrap();
3638
3639        let executor = SqliteSyncExecutor::new(mutation_executor);
3640        let mut ctx = UserContext::new()
3641            .with_metadata(
3642                InMemoryMetadataStore::new()
3643                    .with_entity(order)
3644                    .with_entity(line)
3645                    .with_entity(product),
3646            )
3647            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3648        ctx.insert_resource(SqliteDialect);
3649        ctx.insert_resource(executor);
3650
3651        let repo = ctx
3652            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3653            .unwrap();
3654        let graph = GraphNode::new("Order")
3655            .value("id", 1_u64)
3656            .value("version", 1_i64)
3657            .value("name", "root-updated")
3658            .relation(
3659                "lines",
3660                GraphNode::new("OrderLine")
3661                    .value("id", 10_u64)
3662                    .value("version", 1_i64)
3663                    .remove(),
3664            )
3665            .relation(
3666                "lines",
3667                GraphNode::new("OrderLine")
3668                    .value("id", 12_u64)
3669                    .value("version", 1_i64)
3670                    .value("name", "new-reference-line")
3671                    .relation(
3672                        "product",
3673                        GraphNode::new("Product").value("id", 100_u64).reference(),
3674                    ),
3675            );
3676
3677        let saved = repo.save_graph(graph).unwrap();
3678        assert_eq!(
3679            saved.relations["lines"][1].relations["product"][0]
3680                .values
3681                .get("name"),
3682            Some(&Value::Text("reference-only".to_owned()))
3683        );
3684
3685        let removed = sqlx::query("SELECT version FROM orderline WHERE id = 10")
3686            .fetch_one(&pool)
3687            .await
3688            .unwrap();
3689        assert_eq!(removed.try_get::<i64, _>("version").unwrap(), -2);
3690
3691        let kept = sqlx::query("SELECT version, name FROM orderline WHERE id = 11")
3692            .fetch_one(&pool)
3693            .await
3694            .unwrap();
3695        assert_eq!(kept.try_get::<i64, _>("version").unwrap(), 1);
3696        assert_eq!(kept.try_get::<String, _>("name").unwrap(), "keep-me");
3697
3698        let added = sqlx::query("SELECT product_id FROM orderline WHERE id = 12")
3699            .fetch_one(&pool)
3700            .await
3701            .unwrap();
3702        assert_eq!(added.try_get::<i64, _>("product_id").unwrap(), 100);
3703
3704        let product = sqlx::query("SELECT name FROM product WHERE id = 100")
3705            .fetch_one(&pool)
3706            .await
3707            .unwrap();
3708        assert_eq!(
3709            product.try_get::<String, _>("name").unwrap(),
3710            "reference-only"
3711        );
3712    }
3713
3714    #[tokio::test(flavor = "multi_thread")]
3715    async fn sqlite_save_graph_rejects_invalid_reference_and_state_transitions() {
3716        use sqlx::sqlite::SqlitePoolOptions;
3717
3718        let pool = SqlitePoolOptions::new()
3719            .max_connections(1)
3720            .connect("sqlite::memory:")
3721            .await
3722            .unwrap();
3723
3724        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3725        let order = entity();
3726        let line = line_entity();
3727        let product = product_entity();
3728        mutation_executor
3729            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3730            .await
3731            .unwrap();
3732
3733        sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'root')")
3734            .execute(&pool)
3735            .await
3736            .unwrap();
3737        sqlx::query("INSERT INTO product (id, name) VALUES (100, 'valid-reference')")
3738            .execute(&pool)
3739            .await
3740            .unwrap();
3741        sqlx::query(
3742            "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
3743                (10, 1, 1, 100, 'line-10')",
3744        )
3745        .execute(&pool)
3746        .await
3747        .unwrap();
3748
3749        let executor = SqliteSyncExecutor::new(mutation_executor);
3750        let mut ctx = UserContext::new()
3751            .with_metadata(
3752                InMemoryMetadataStore::new()
3753                    .with_entity(order)
3754                    .with_entity(line)
3755                    .with_entity(product),
3756            )
3757            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3758        ctx.insert_resource(SqliteDialect);
3759        ctx.insert_resource(executor);
3760
3761        let repo = ctx
3762            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3763            .unwrap();
3764
3765        let missing_reference = repo.save_graph(
3766            GraphNode::new("Order").value("id", 1_u64).relation(
3767                "lines",
3768                GraphNode::new("OrderLine")
3769                    .value("id", 12_u64)
3770                    .value("version", 1_i64)
3771                    .value("name", "bad-reference")
3772                    .relation(
3773                        "product",
3774                        GraphNode::new("Product").value("id", 999_u64).reference(),
3775                    ),
3776            ),
3777        );
3778        assert!(format!("{missing_reference:?}").contains("does not exist"));
3779
3780        let mutable_reference = repo.save_graph(
3781            GraphNode::new("Order").value("id", 1_u64).relation(
3782                "lines",
3783                GraphNode::new("OrderLine")
3784                    .value("id", 12_u64)
3785                    .value("version", 1_i64)
3786                    .value("name", "mutable-reference")
3787                    .relation(
3788                        "product",
3789                        GraphNode::new("Product")
3790                            .value("id", 100_u64)
3791                            .value("name", "should-not-mutate")
3792                            .reference(),
3793                    ),
3794            ),
3795        );
3796        assert!(format!("{mutable_reference:?}").contains("cannot carry mutable field"));
3797
3798        let duplicate_child = repo.save_graph(
3799            GraphNode::new("Order")
3800                .value("id", 1_u64)
3801                .relation(
3802                    "lines",
3803                    GraphNode::new("OrderLine")
3804                        .value("id", 10_u64)
3805                        .value("version", 1_i64)
3806                        .reference(),
3807                )
3808                .relation(
3809                    "lines",
3810                    GraphNode::new("OrderLine")
3811                        .value("id", 10_u64)
3812                        .value("version", 1_i64)
3813                        .reference(),
3814                ),
3815        );
3816        assert!(format!("{duplicate_child:?}").contains("duplicate child id"));
3817
3818        let reference_version_conflict = repo.save_graph(
3819            GraphNode::new("Order").value("id", 1_u64).relation(
3820                "lines",
3821                GraphNode::new("OrderLine")
3822                    .value("id", 10_u64)
3823                    .value("version", 2_i64)
3824                    .reference(),
3825            ),
3826        );
3827        assert!(format!("{reference_version_conflict:?}").contains("OptimisticLockConflict"));
3828
3829        let remove_with_child = repo.save_graph(
3830            GraphNode::new("Order").value("id", 1_u64).relation(
3831                "lines",
3832                GraphNode::new("OrderLine")
3833                    .value("id", 10_u64)
3834                    .value("version", 1_i64)
3835                    .relation(
3836                        "product",
3837                        GraphNode::new("Product").value("id", 100_u64).reference(),
3838                    )
3839                    .remove(),
3840            ),
3841        );
3842        assert!(format!("{remove_with_child:?}").contains("cannot contain child relations"));
3843
3844        let remove = repo.save_graph(GraphNode::new("Order").value("id", 999_u64).remove());
3845        assert!(format!("{remove:?}").contains("does not exist"));
3846    }
3847
3848    #[tokio::test(flavor = "multi_thread")]
3849    async fn sqlite_graph_write_rolls_back_all_batches_on_failure() {
3850        use sqlx::{Row, sqlite::SqlitePoolOptions};
3851
3852        let pool = SqlitePoolOptions::new()
3853            .max_connections(1)
3854            .connect("sqlite::memory:")
3855            .await
3856            .unwrap();
3857
3858        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3859        let order = entity();
3860        let line = line_entity();
3861        let product = product_entity();
3862        mutation_executor
3863            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3864            .await
3865            .unwrap();
3866
3867        let executor = SqliteSyncExecutor::new(mutation_executor.clone());
3868        let mut ctx = UserContext::new()
3869            .with_metadata(
3870                InMemoryMetadataStore::new()
3871                    .with_entity(order)
3872                    .with_entity(line)
3873                    .with_entity(product),
3874            )
3875            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3876        ctx.insert_resource(SqliteDialect);
3877        ctx.insert_resource(executor);
3878
3879        let repo = ctx
3880            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3881            .unwrap();
3882        let error = repo.save_graph(
3883            GraphNode::new("Order")
3884                .value("id", 1_u64)
3885                .value("version", 1_i64)
3886                .value("name", "rollback-root")
3887                .relation(
3888                    "lines",
3889                    GraphNode::new("OrderLine")
3890                        .value("id", 10_u64)
3891                        .value("version", 1_i64)
3892                        .value("name", "rollback-line")
3893                        .value("product_id", 999_u64)
3894                        .relation(
3895                            "product",
3896                            GraphNode::new("Product").value("id", 999_u64).reference(),
3897                        ),
3898                ),
3899        );
3900        assert!(format!("{error:?}").contains("does not exist"));
3901
3902        let count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders")
3903            .fetch_one(&pool)
3904            .await
3905            .unwrap()
3906            .try_get("count")
3907            .unwrap();
3908        assert_eq!(count, 0);
3909    }
3910
3911    #[tokio::test]
3912    async fn postgres_executor_ensure_schema_roundtrip_when_database_is_available() {
3913        use sqlx::{PgPool, Row};
3914
3915        let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
3916            return;
3917        };
3918
3919        let pool = PgPool::connect(&database_url).await.unwrap();
3920        sqlx::query("DROP TABLE IF EXISTS orderline")
3921            .execute(&pool)
3922            .await
3923            .unwrap();
3924        sqlx::query("DROP TABLE IF EXISTS orders")
3925            .execute(&pool)
3926            .await
3927            .unwrap();
3928
3929        let executor = PgMutationExecutor::new(pool.clone());
3930        let dialect = PostgresDialect;
3931        let order = entity();
3932        let line = line_entity();
3933
3934        executor
3935            .ensure_schema(&dialect, &[&order, &line])
3936            .await
3937            .unwrap();
3938
3939        let tables = sqlx::query(
3940            "SELECT table_name
3941             FROM information_schema.tables
3942             WHERE table_schema = current_schema()
3943               AND table_name IN ('orders', 'orderline')
3944             ORDER BY table_name",
3945        )
3946        .fetch_all(&pool)
3947        .await
3948        .unwrap()
3949        .into_iter()
3950        .map(|row| row.try_get::<String, _>("table_name").unwrap())
3951        .collect::<Vec<_>>();
3952        assert_eq!(tables, vec!["orderline".to_owned(), "orders".to_owned()]);
3953
3954        let soundex: String = sqlx::query_scalar("SELECT soundex('Robert')")
3955            .fetch_one(&pool)
3956            .await
3957            .unwrap();
3958        assert_eq!(soundex, "R163");
3959
3960        sqlx::query(
3961            "INSERT INTO orders (id, version, name) VALUES
3962                (1, 1, 'draft'),
3963                (2, 1, 'submitted'),
3964                (3, 1, 'archived')",
3965        )
3966        .execute(&pool)
3967        .await
3968        .unwrap();
3969
3970        let array_bound_query = dialect
3971            .compile_select(
3972                &order,
3973                &SelectQuery::new("Order")
3974                    .filter(
3975                        Expr::in_large(
3976                            "id",
3977                            vec![Value::from(1_u64), Value::from(2_u64), Value::from(3_u64)],
3978                        )
3979                        .and_expr(Expr::not_in_large("name", vec![Value::from("archived")])),
3980                    )
3981                    .order_asc("id"),
3982            )
3983            .unwrap();
3984        assert_eq!(
3985            array_bound_query.sql,
3986            "SELECT * FROM \"orders\" WHERE ((\"id\" = ANY($1)) AND (\"name\" <> ALL($2))) ORDER BY \"id\" ASC"
3987        );
3988        let rows = executor.fetch_all(&array_bound_query).await.unwrap();
3989        assert_eq!(rows.len(), 2);
3990        assert_eq!(rows[0].get("id"), Some(&Value::I64(1)));
3991        assert_eq!(rows[1].get("id"), Some(&Value::I64(2)));
3992
3993        sqlx::query(
3994            "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
3995                (11, 1, 1, 101, 'line-1'),
3996                (12, 1, 2, 102, 'line-2'),
3997                (13, 1, 3, 103, 'archived-line')",
3998        )
3999        .execute(&pool)
4000        .await
4001        .unwrap();
4002
4003        let subquery = dialect
4004            .compile_select(
4005                &order,
4006                &SelectQuery::new("Order")
4007                    .filter(Expr::in_subquery(
4008                        "id",
4009                        line.clone(),
4010                        SelectQuery::new("OrderLine").filter(Expr::contain("name", "line-")),
4011                        "order_id",
4012                    ))
4013                    .order_asc("id"),
4014            )
4015            .unwrap();
4016        assert_eq!(
4017            subquery.sql,
4018            "SELECT * FROM \"orders\" WHERE (\"id\" IN (SELECT \"order_id\" FROM \"orderline\" WHERE (\"name\" LIKE $1))) ORDER BY \"id\" ASC"
4019        );
4020        let rows = executor.fetch_all(&subquery).await.unwrap();
4021        assert_eq!(rows.len(), 2);
4022        assert_eq!(rows[0].get("id"), Some(&Value::I64(1)));
4023        assert_eq!(rows[1].get("id"), Some(&Value::I64(2)));
4024
4025        let projected = dialect
4026            .compile_select(
4027                &order,
4028                &SelectQuery::new("Order")
4029                    .project("id")
4030                    .project_expr("nameSound", Expr::soundex(Expr::column("name")))
4031                    .order_gbk_asc("name")
4032                    .limit(1),
4033            )
4034            .unwrap();
4035        assert_eq!(
4036            projected.sql,
4037            "SELECT \"id\", SOUNDEX(\"name\") AS \"nameSound\" FROM \"orders\" ORDER BY convert_to(\"name\", 'GBK') ASC LIMIT 1"
4038        );
4039        let rows = executor.fetch_all(&projected).await.unwrap();
4040        assert_eq!(rows.len(), 1);
4041        assert!(rows[0].contains_key("nameSound"));
4042
4043        let aggregate = dialect
4044            .compile_select(
4045                &order,
4046                &SelectQuery::new("Order")
4047                    .count("total")
4048                    .stddev("version", "stddevVersion")
4049                    .var_pop("version", "varPopVersion")
4050                    .bit_or("version", "bitOrVersion")
4051                    .having(Expr::binary(
4052                        Expr::count_all(),
4053                        teaql_core::BinaryOp::Gt,
4054                        Expr::value(2_i64),
4055                    )),
4056            )
4057            .unwrap();
4058        assert_eq!(
4059            aggregate.sql,
4060            "SELECT COUNT(*) AS \"total\", STDDEV(\"version\") AS \"stddevVersion\", VAR_POP(\"version\") AS \"varPopVersion\", BIT_OR(\"version\") AS \"bitOrVersion\" FROM \"orders\" HAVING (COUNT(*) > $1)"
4061        );
4062        let rows = executor.fetch_all(&aggregate).await.unwrap();
4063        assert_eq!(rows.len(), 1);
4064        assert_eq!(rows[0].get("total"), Some(&Value::I64(3)));
4065        assert!(matches!(
4066            rows[0].get("stddevVersion"),
4067            Some(Value::Decimal(_))
4068        ));
4069        assert!(matches!(
4070            rows[0].get("varPopVersion"),
4071            Some(Value::Decimal(_))
4072        ));
4073        assert_eq!(rows[0].get("bitOrVersion"), Some(&Value::I64(1)));
4074
4075        sqlx::query("DROP TABLE orderline")
4076            .execute(&pool)
4077            .await
4078            .unwrap();
4079        sqlx::query("CREATE TABLE orderline (id BIGINT PRIMARY KEY)")
4080            .execute(&pool)
4081            .await
4082            .unwrap();
4083
4084        executor.ensure_schema(&dialect, &[&line]).await.unwrap();
4085
4086        let columns = sqlx::query(
4087            "SELECT column_name
4088             FROM information_schema.columns
4089             WHERE table_schema = current_schema()
4090               AND table_name = 'orderline'
4091             ORDER BY column_name",
4092        )
4093        .fetch_all(&pool)
4094        .await
4095        .unwrap()
4096        .into_iter()
4097        .map(|row| row.try_get::<String, _>("column_name").unwrap())
4098        .collect::<Vec<_>>();
4099        assert!(columns.contains(&"id".to_owned()));
4100        assert!(columns.contains(&"order_id".to_owned()));
4101        assert!(columns.contains(&"product_id".to_owned()));
4102        assert!(columns.contains(&"name".to_owned()));
4103
4104        sqlx::query("DROP TABLE IF EXISTS orderline")
4105            .execute(&pool)
4106            .await
4107            .unwrap();
4108        sqlx::query("DROP TABLE IF EXISTS orders")
4109            .execute(&pool)
4110            .await
4111            .unwrap();
4112    }
4113
4114    #[tokio::test(flavor = "multi_thread")]
4115    async fn postgres_id_space_generator_prepares_repository_insert_when_database_is_available() {
4116        use sqlx::{PgPool, Row};
4117
4118        let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4119            return;
4120        };
4121
4122        let pool = PgPool::connect(&database_url).await.unwrap();
4123        sqlx::query("DROP TABLE IF EXISTS orders_idgen")
4124            .execute(&pool)
4125            .await
4126            .unwrap();
4127        sqlx::query("DROP TABLE IF EXISTS teaql_id_space")
4128            .execute(&pool)
4129            .await
4130            .unwrap();
4131
4132        let order = entity().table_name("orders_idgen");
4133        let executor = PgMutationExecutor::new(pool.clone());
4134        executor
4135            .ensure_schema(&PostgresDialect, &[&order])
4136            .await
4137            .unwrap();
4138
4139        let mut ctx = UserContext::new()
4140            .with_metadata(InMemoryMetadataStore::new().with_entity(order))
4141            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
4142            .with_internal_id_generator(PgIdSpaceGenerator::new(pool.clone()));
4143        ctx.insert_resource(PostgresDialect);
4144        ctx.insert_resource(PgSyncExecutor::new(executor));
4145
4146        let repo = ctx
4147            .resolve_repository::<PostgresDialect, PgSyncExecutor>("Order")
4148            .unwrap();
4149        repo.insert(
4150            &repo
4151                .insert_command()
4152                .value("version", 1_i64)
4153                .value("name", "generated-1"),
4154        )
4155        .unwrap();
4156        repo.insert(
4157            &repo
4158                .insert_command()
4159                .value("version", 1_i64)
4160                .value("name", "generated-2"),
4161        )
4162        .unwrap();
4163
4164        let ids = sqlx::query("SELECT id FROM orders_idgen ORDER BY id")
4165            .fetch_all(&pool)
4166            .await
4167            .unwrap()
4168            .into_iter()
4169            .map(|row| row.try_get::<i64, _>("id").unwrap())
4170            .collect::<Vec<_>>();
4171        assert_eq!(ids, vec![1, 2]);
4172
4173        let current: i64 = sqlx::query_scalar(
4174            "SELECT current_level FROM teaql_id_space WHERE type_name = 'Order'",
4175        )
4176        .fetch_one(&pool)
4177        .await
4178        .unwrap();
4179        assert_eq!(current, 2);
4180
4181        sqlx::query("DROP TABLE IF EXISTS orders_idgen")
4182            .execute(&pool)
4183            .await
4184            .unwrap();
4185        sqlx::query("DROP TABLE IF EXISTS teaql_id_space")
4186            .execute(&pool)
4187            .await
4188            .unwrap();
4189    }
4190
4191    #[tokio::test]
4192    async fn postgres_executor_runs_extended_aggregates_when_database_is_available() {
4193        use sqlx::PgPool;
4194
4195        let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4196            return;
4197        };
4198
4199        let pool = PgPool::connect(&database_url).await.unwrap();
4200        sqlx::query("DROP TABLE IF EXISTS orders_agg")
4201            .execute(&pool)
4202            .await
4203            .unwrap();
4204
4205        let order = entity().table_name("orders_agg");
4206        let executor = PgMutationExecutor::new(pool.clone());
4207        let dialect = PostgresDialect;
4208        executor.ensure_schema(&dialect, &[&order]).await.unwrap();
4209
4210        sqlx::query(
4211            "INSERT INTO orders_agg (id, version, name) VALUES
4212                (1, 1, 'A'),
4213                (2, 2, 'A'),
4214                (3, 3, 'B'),
4215                (4, 4, 'B')",
4216        )
4217        .execute(&pool)
4218        .await
4219        .unwrap();
4220
4221        let aggregate = dialect
4222            .compile_select(
4223                &order,
4224                &SelectQuery::new("Order")
4225                    .count("total")
4226                    .count_field("version", "versionCount")
4227                    .sum("version", "sumVersion")
4228                    .avg("version", "avgVersion")
4229                    .min("version", "minVersion")
4230                    .max("version", "maxVersion")
4231                    .stddev("version", "stddevVersion")
4232                    .stddev_pop("version", "stddevPopVersion")
4233                    .var_samp("version", "varSampVersion")
4234                    .var_pop("version", "varPopVersion")
4235                    .bit_and("version", "bitAndVersion")
4236                    .bit_or("version", "bitOrVersion")
4237                    .bit_xor("version", "bitXorVersion")
4238                    .having(Expr::binary(
4239                        Expr::count_all(),
4240                        teaql_core::BinaryOp::Gt,
4241                        Expr::value(3_i64),
4242                    )),
4243            )
4244            .unwrap();
4245        let rows = executor.fetch_all(&aggregate).await.unwrap();
4246        assert_eq!(rows.len(), 1);
4247        let row = &rows[0];
4248        assert_eq!(row.get("total"), Some(&Value::I64(4)));
4249        assert_eq!(row.get("versionCount"), Some(&Value::I64(4)));
4250        assert_eq!(
4251            row.get("sumVersion"),
4252            Some(&Value::Decimal(Decimal::new(10, 0)))
4253        );
4254        assert_eq!(
4255            row.get("avgVersion"),
4256            Some(&Value::Decimal(Decimal::new(25, 1)))
4257        );
4258        assert_eq!(row.get("minVersion"), Some(&Value::I64(1)));
4259        assert_eq!(row.get("maxVersion"), Some(&Value::I64(4)));
4260        assert!(matches!(row.get("stddevVersion"), Some(Value::Decimal(_))));
4261        assert!(matches!(
4262            row.get("stddevPopVersion"),
4263            Some(Value::Decimal(_))
4264        ));
4265        assert!(matches!(row.get("varSampVersion"), Some(Value::Decimal(_))));
4266        assert!(matches!(row.get("varPopVersion"), Some(Value::Decimal(_))));
4267        assert_eq!(row.get("bitAndVersion"), Some(&Value::I64(0)));
4268        assert_eq!(row.get("bitOrVersion"), Some(&Value::I64(7)));
4269        assert_eq!(row.get("bitXorVersion"), Some(&Value::I64(4)));
4270
4271        let grouped = dialect
4272            .compile_select(
4273                &order,
4274                &SelectQuery::new("Order")
4275                    .group_by("name")
4276                    .count("total")
4277                    .sum("version", "sumVersion")
4278                    .having(Expr::binary(
4279                        Expr::count_all(),
4280                        teaql_core::BinaryOp::Gt,
4281                        Expr::value(1_i64),
4282                    ))
4283                    .order_asc("name"),
4284            )
4285            .unwrap();
4286        let rows = executor.fetch_all(&grouped).await.unwrap();
4287        assert_eq!(rows.len(), 2);
4288        assert_eq!(rows[0].get("name"), Some(&Value::Text("A".to_owned())));
4289        assert_eq!(rows[0].get("total"), Some(&Value::I64(2)));
4290        assert_eq!(
4291            rows[0].get("sumVersion"),
4292            Some(&Value::Decimal(Decimal::new(3, 0)))
4293        );
4294        assert_eq!(rows[1].get("name"), Some(&Value::Text("B".to_owned())));
4295        assert_eq!(rows[1].get("total"), Some(&Value::I64(2)));
4296        assert_eq!(
4297            rows[1].get("sumVersion"),
4298            Some(&Value::Decimal(Decimal::new(7, 0)))
4299        );
4300
4301        sqlx::query("DROP TABLE IF EXISTS orders_agg")
4302            .execute(&pool)
4303            .await
4304            .unwrap();
4305    }
4306
4307    #[tokio::test]
4308    async fn user_context_can_ensure_postgres_schema_when_database_is_available() {
4309        use sqlx::{PgPool, Row};
4310
4311        let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4312            return;
4313        };
4314
4315        let pool = PgPool::connect(&database_url).await.unwrap();
4316        sqlx::query("DROP TABLE IF EXISTS product_ctx")
4317            .execute(&pool)
4318            .await
4319            .unwrap();
4320        sqlx::query("DROP TABLE IF EXISTS orderline_ctx")
4321            .execute(&pool)
4322            .await
4323            .unwrap();
4324        sqlx::query("DROP TABLE IF EXISTS orders_ctx")
4325            .execute(&pool)
4326            .await
4327            .unwrap();
4328
4329        let order = entity().table_name("orders_ctx");
4330        let line = line_entity().table_name("orderline_ctx");
4331        let product = product_entity().table_name("product_ctx");
4332        let module = super::RuntimeModule::new()
4333            .descriptor(order)
4334            .descriptor(line)
4335            .descriptor(product);
4336        let mut ctx = super::UserContext::new().with_module(module);
4337        ctx.insert_resource(PostgresDialect);
4338        ctx.insert_resource(PgMutationExecutor::new(pool.clone()));
4339
4340        ctx.ensure_postgres_schema().await.unwrap();
4341
4342        let tables = sqlx::query(
4343            "SELECT table_name
4344             FROM information_schema.tables
4345             WHERE table_schema = current_schema()
4346               AND table_name IN ('orders_ctx', 'orderline_ctx', 'product_ctx')
4347             ORDER BY table_name",
4348        )
4349        .fetch_all(&pool)
4350        .await
4351        .unwrap()
4352        .into_iter()
4353        .map(|row| row.try_get::<String, _>("table_name").unwrap())
4354        .collect::<Vec<_>>();
4355
4356        assert_eq!(
4357            tables,
4358            vec![
4359                "orderline_ctx".to_owned(),
4360                "orders_ctx".to_owned(),
4361                "product_ctx".to_owned()
4362            ]
4363        );
4364
4365        sqlx::query("DROP TABLE IF EXISTS product_ctx")
4366            .execute(&pool)
4367            .await
4368            .unwrap();
4369        sqlx::query("DROP TABLE IF EXISTS orderline_ctx")
4370            .execute(&pool)
4371            .await
4372            .unwrap();
4373        sqlx::query("DROP TABLE IF EXISTS orders_ctx")
4374            .execute(&pool)
4375            .await
4376            .unwrap();
4377    }
4378
4379    #[tokio::test(flavor = "multi_thread")]
4380    async fn postgres_graph_write_transaction_rolls_back_when_database_is_available() {
4381        use sqlx::{PgPool, Row};
4382
4383        let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4384            return;
4385        };
4386
4387        let pool = PgPool::connect(&database_url).await.unwrap();
4388        sqlx::query("DROP TABLE IF EXISTS orderline_tx")
4389            .execute(&pool)
4390            .await
4391            .unwrap();
4392        sqlx::query("DROP TABLE IF EXISTS product_tx")
4393            .execute(&pool)
4394            .await
4395            .unwrap();
4396        sqlx::query("DROP TABLE IF EXISTS orders_tx")
4397            .execute(&pool)
4398            .await
4399            .unwrap();
4400
4401        let order = entity().table_name("orders_tx");
4402        let line = line_entity().table_name("orderline_tx");
4403        let product = product_entity().table_name("product_tx");
4404        let schema_executor = PgMutationExecutor::new(pool.clone());
4405        schema_executor
4406            .ensure_schema(&PostgresDialect, &[&order, &line, &product])
4407            .await
4408            .unwrap();
4409
4410        let tx_executor = PgTransactionExecutor::begin(&pool).await.unwrap();
4411        let sync_executor = PgTxSyncExecutor::new(tx_executor.clone());
4412        let mut ctx = UserContext::new()
4413            .with_metadata(
4414                InMemoryMetadataStore::new()
4415                    .with_entity(order)
4416                    .with_entity(line)
4417                    .with_entity(product),
4418            )
4419            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
4420        ctx.insert_resource(PostgresDialect);
4421        ctx.insert_resource(sync_executor);
4422
4423        let repo = ctx
4424            .resolve_repository::<PostgresDialect, PgTxSyncExecutor>("Order")
4425            .unwrap();
4426        repo.save_graph(
4427            GraphNode::new("Order")
4428                .value("id", 1_u64)
4429                .value("version", 1_i64)
4430                .value("name", "pg-rollback-root")
4431                .relation(
4432                    "lines",
4433                    GraphNode::new("OrderLine")
4434                        .value("id", 10_u64)
4435                        .value("version", 1_i64)
4436                        .value("name", "pg-rollback-line")
4437                        .relation(
4438                            "product",
4439                            GraphNode::new("Product")
4440                                .value("id", 100_u64)
4441                                .value("name", "pg-rollback-sku"),
4442                        ),
4443                ),
4444        )
4445        .unwrap();
4446
4447        tx_executor.rollback().await.unwrap();
4448
4449        let count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders_tx")
4450            .fetch_one(&pool)
4451            .await
4452            .unwrap()
4453            .try_get("count")
4454            .unwrap();
4455        assert_eq!(count, 0);
4456
4457        sqlx::query("DROP TABLE IF EXISTS orderline_tx")
4458            .execute(&pool)
4459            .await
4460            .unwrap();
4461        sqlx::query("DROP TABLE IF EXISTS product_tx")
4462            .execute(&pool)
4463            .await
4464            .unwrap();
4465        sqlx::query("DROP TABLE IF EXISTS orders_tx")
4466            .execute(&pool)
4467            .await
4468            .unwrap();
4469    }
4470}
4471pub use checker::{
4472    CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResult, CheckResults, CheckRule, Checker,
4473    CheckerRegistry, InMemoryCheckerRegistry, LocationSegment, ObjectLocation, clear_record_status,
4474    mark_record_status,
4475};