Skip to main content

teaql_runtime/
lib.rs

1mod checker;
2mod context;
3mod entity_runtime;
4mod error;
5mod event;
6mod graph;
7mod id;
8mod language;
9mod memory;
10mod registry;
11mod repository;
12
13pub use context::{SqlLogEntry, SqlLogOperation, SqlLogOptions, UserContext};
14pub use entity_runtime::{ChangeSetStack, EntityChangeSet, EntityKey, EntityRoot, RootContext};
15pub use error::{ContextError, RepositoryError, RuntimeError};
16pub use event::{EntityEvent, EntityEventKind, EntityEventSink, InMemoryEntityEventSink};
17pub use graph::{
18    GraphMutationBatch, GraphMutationKind, GraphMutationPlan, GraphMutationPlanItem, GraphNode,
19    GraphOperation, sorted_update_fields,
20};
21pub(crate) use id::local_id_generator;
22pub use id::{InternalIdGenerator, SnowflakeIdGenerator};
23pub use language::{
24    BuiltinTranslator, Language, MessageTranslator, translate_check_result, translate_location,
25};
26pub use memory::{MemoryRepository, MemoryRepositoryError};
27pub use registry::{
28    InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry,
29    MetadataStore, RepositoryBehavior, RepositoryBehaviorRegistry, RepositoryRegistry,
30    RuntimeModule,
31};
32pub use repository::{
33    AggregationCacheBackend, ContextRepository, GraphTransactionBoundary, InMemoryAggregationCache,
34    QueryExecutor, RelationLoadPlan, Repository, ResolvedRepository,
35};
36
37#[cfg(feature = "sqlx")]
38pub mod sqlx_support;
39
40#[cfg(test)]
41mod tests {
42    use std::collections::{BTreeMap, VecDeque};
43    use std::sync::{Arc, Mutex};
44
45    use super::{
46        AggregationCacheBackend, CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResults,
47        Checker, EntityEvent, EntityEventKind, EntityEventSink, GraphMutationKind, GraphNode,
48        GraphTransactionBoundary, InMemoryAggregationCache, InMemoryCheckerRegistry,
49        InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry,
50        InternalIdGenerator, Language, MemoryRepository, MetadataStore, ObjectLocation,
51        QueryExecutor, Repository, RepositoryBehavior, RepositoryError, RuntimeError,
52        RuntimeModule, SqlLogOperation, SqlLogOptions, UserContext, translate_check_result,
53    };
54    use teaql_core::{
55        Aggregate, AggregateFunction, BinaryOp, DataType, Decimal, DeleteCommand, Entity,
56        EntityDescriptor, EntityError, Expr, InsertCommand, OrderBy, PropertyDescriptor, Record,
57        RecoverCommand, RelationAggregate, SelectQuery, TeaqlEntity, UpdateCommand, Value,
58    };
59    use teaql_dialect_pg::PostgresDialect;
60    use teaql_macros::TeaqlEntity as DeriveTeaqlEntity;
61    use teaql_sql::CompiledQuery;
62
    /// Quoted column list expected at the start of a default `SELECT` on `Order`
    /// (`id`, `version`, `name`), interpolated into expected-SQL strings in the tests.
    const ORDER_DEFAULT_PROJECTION: &str = "\"id\", \"version\", \"name\"";
64
65    fn entity() -> EntityDescriptor {
66        EntityDescriptor::new("Order")
67            .table_name("orders")
68            .property(
69                PropertyDescriptor::new("id", DataType::U64)
70                    .column_name("id")
71                    .id()
72                    .not_null(),
73            )
74            .property(
75                PropertyDescriptor::new("version", DataType::I64)
76                    .column_name("version")
77                    .version()
78                    .not_null(),
79            )
80            .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
81            .relation(
82                teaql_core::RelationDescriptor::new("lines", "OrderLine")
83                    .local_key("id")
84                    .foreign_key("order_id")
85                    .many(),
86            )
87    }
88
89    fn line_entity() -> EntityDescriptor {
90        EntityDescriptor::new("OrderLine")
91            .table_name("orderline")
92            .property(
93                PropertyDescriptor::new("id", DataType::U64)
94                    .column_name("id")
95                    .id()
96                    .not_null(),
97            )
98            .property(
99                PropertyDescriptor::new("version", DataType::I64)
100                    .column_name("version")
101                    .version(),
102            )
103            .property(
104                PropertyDescriptor::new("order_id", DataType::U64)
105                    .column_name("order_id")
106                    .not_null(),
107            )
108            .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
109            .property(
110                PropertyDescriptor::new("product_id", DataType::U64)
111                    .column_name("product_id")
112                    .not_null(),
113            )
114            .relation(
115                teaql_core::RelationDescriptor::new("product", "Product")
116                    .local_key("product_id")
117                    .foreign_key("id"),
118            )
119    }
120
121    fn product_entity() -> EntityDescriptor {
122        EntityDescriptor::new("Product")
123            .table_name("product")
124            .property(
125                PropertyDescriptor::new("id", DataType::U64)
126                    .column_name("id")
127                    .id()
128                    .not_null(),
129            )
130            .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
131    }
132
    /// Executor stub whose answers are fixed when it is constructed.
    #[derive(Debug, Default)]
    struct StubExecutor {
        /// Value returned by `execute` for every statement.
        affected: u64,
        /// Rows returned (cloned) by every `fetch_all` call.
        rows: Vec<Record>,
    }
138
    /// Executor stub that serves queued result sets and records the SQL it is asked to fetch.
    #[derive(Debug, Default)]
    struct QueueExecutor {
        /// Value returned by `execute` for every statement.
        affected: u64,
        /// Result sets handed out front-first, one per `fetch_all`; an empty queue yields no rows.
        rows: Mutex<VecDeque<Vec<Record>>>,
        /// SQL text of each `fetch_all` call, in call order.
        queries: Mutex<Vec<String>>,
    }
145
    /// Unit type that carries the `RepositoryBehavior` hooks exercised by these tests.
    struct OrderBehavior;
147
    /// Derived row type for entity `CatalogProduct` on table `catalog_product`,
    /// used by the module-registration tests.
    // NOTE(review): `dead_code` is presumably allowed because the fields are never
    // read directly by any test — confirm before removing the attribute.
    #[allow(dead_code)]
    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
    #[teaql(entity = "CatalogProduct", table = "catalog_product")]
    struct CatalogProductRow {
        #[teaql(id)]
        id: u64,
        name: String,
    }
156
    /// Derived row type for entity `OrderAggregate` on table `orders`, with an id and
    /// a `#[teaql(dynamic)]` map of named values.
    // NOTE(review): the dynamic map presumably receives columns that have no declared
    // field (e.g. aggregate results) — confirm against the derive macro.
    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
    #[teaql(entity = "OrderAggregate", table = "orders")]
    struct OrderAggregateDynamic {
        #[teaql(id)]
        id: u64,
        #[teaql(dynamic)]
        dynamic: BTreeMap<String, Value>,
    }
165
    /// Derived row type for entity `Product` on table `product` (non-optional id).
    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
    #[teaql(entity = "Product", table = "product")]
    struct ProductEntityRow {
        #[teaql(id)]
        id: u64,
        name: String,
    }
173
    /// Derived row type for entity `OrderLine` on table `orderline`, including an
    /// optional nested `Product` reached through `product_id` -> `id`.
    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
    #[teaql(entity = "OrderLine", table = "orderline")]
    struct OrderLineEntityRow {
        #[teaql(id)]
        id: u64,
        #[teaql(column = "order_id")]
        order_id: u64,
        name: String,
        #[teaql(column = "product_id")]
        product_id: u64,
        #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
        product: Option<ProductEntityRow>,
    }
187
    /// Derived row type for entity `Order` on table `orders`, including its
    /// to-many `lines` relation (`id` -> `order_id`) as a `SmartList`.
    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
    #[teaql(entity = "Order", table = "orders")]
    struct OrderAggregateRow {
        #[teaql(id)]
        id: u64,
        #[teaql(version)]
        version: i64,
        name: String,
        #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
        lines: teaql_core::SmartList<OrderLineEntityRow>,
    }
199
    /// Derived row type for entity `Order` on table `orders` with no relations;
    /// this is the type passed to `crate::module!` in the SQL-log test.
    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
    #[teaql(entity = "Order", table = "orders")]
    struct Order {
        #[teaql(id)]
        id: u64,
        #[teaql(version)]
        version: i64,
        name: String,
    }
209
    /// `Product` shape for the typed-graph tests; the id is optional so an unsaved
    /// instance can be built with `id: None`.
    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
    #[teaql(entity = "Product", table = "product")]
    struct TypedGraphProduct {
        #[teaql(id)]
        id: Option<u64>,
        name: String,
    }
217
    /// `OrderLine` shape for the typed-graph tests; id and both key columns are
    /// optional so an unsaved line can be built with them set to `None`.
    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
    #[teaql(entity = "OrderLine", table = "orderline")]
    struct TypedGraphLine {
        #[teaql(id)]
        id: Option<u64>,
        #[teaql(column = "order_id")]
        order_id: Option<u64>,
        name: String,
        #[teaql(column = "product_id")]
        product_id: Option<u64>,
        #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
        product: Option<TypedGraphProduct>,
    }
231
    /// `Order` root for the typed-graph tests: optional id, a version, a name and a
    /// `SmartList` of `TypedGraphLine` children related by `id` -> `order_id`.
    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
    #[teaql(entity = "Order", table = "orders")]
    struct TypedGraphOrder {
        #[teaql(id)]
        id: Option<u64>,
        #[teaql(version)]
        version: i64,
        name: String,
        #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
        lines: teaql_core::SmartList<TypedGraphLine>,
    }
243
    /// Plain `Order` value type whose `TeaqlEntity` and `Entity` impls are written by
    /// hand below instead of being derived.
    #[derive(Debug, PartialEq, Eq)]
    struct OrderEntity {
        id: u64,
        version: i64,
        name: String,
    }
250
    impl teaql_core::TeaqlEntity for OrderEntity {
        /// Returns the shared `Order` descriptor built by `entity()`.
        fn entity_descriptor() -> EntityDescriptor {
            entity()
        }
    }
256
257    impl Entity for OrderEntity {
258        fn from_record(record: Record) -> Result<Self, EntityError> {
259            let id = match record.get("id") {
260                Some(Value::U64(v)) => *v,
261                Some(Value::I64(v)) if *v >= 0 => *v as u64,
262                other => {
263                    return Err(EntityError::new(
264                        "Order",
265                        format!("invalid id field: {other:?}"),
266                    ));
267                }
268            };
269            let version = match record.get("version") {
270                Some(Value::I64(v)) => *v,
271                other => {
272                    return Err(EntityError::new(
273                        "Order",
274                        format!("invalid version field: {other:?}"),
275                    ));
276                }
277            };
278            let name = match record.get("name") {
279                Some(Value::Text(v)) => v.clone(),
280                other => {
281                    return Err(EntityError::new(
282                        "Order",
283                        format!("invalid name field: {other:?}"),
284                    ));
285                }
286            };
287            Ok(Self { id, version, name })
288        }
289
290        fn into_record(self) -> Record {
291            Record::from([
292                (String::from("id"), Value::U64(self.id)),
293                (String::from("version"), Value::I64(self.version)),
294                (String::from("name"), Value::Text(self.name)),
295            ])
296        }
297    }
298
299    #[derive(Debug)]
300    struct StubError;
301
302    impl std::fmt::Display for StubError {
303        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
304            write!(f, "stub error")
305        }
306    }
307
308    impl std::error::Error for StubError {}
309
310    impl QueryExecutor for StubExecutor {
311        type Error = StubError;
312
313        fn fetch_all(&self, _query: &CompiledQuery) -> Result<Vec<Record>, Self::Error> {
314            Ok(self.rows.clone())
315        }
316
317        fn execute(&self, _query: &CompiledQuery) -> Result<u64, Self::Error> {
318            Ok(self.affected)
319        }
320
321        fn begin_transaction(&self) -> Result<GraphTransactionBoundary, Self::Error> {
322            Ok(GraphTransactionBoundary::Started)
323        }
324    }
325
326    impl QueryExecutor for QueueExecutor {
327        type Error = StubError;
328
329        fn fetch_all(&self, query: &CompiledQuery) -> Result<Vec<Record>, Self::Error> {
330            self.queries.lock().unwrap().push(query.sql.clone());
331            Ok(self.rows.lock().unwrap().pop_front().unwrap_or_default())
332        }
333
334        fn execute(&self, _query: &CompiledQuery) -> Result<u64, Self::Error> {
335            Ok(self.affected)
336        }
337    }
338
339    #[test]
340    fn user_context_records_configured_sql_logs() {
341        let mut ctx = UserContext::new()
342            .with_module(crate::module!(Order))
343            .with_sql_log_options(SqlLogOptions::select_only());
344        ctx.insert_resource(PostgresDialect);
345        ctx.insert_resource(StubExecutor {
346            affected: 1,
347            rows: Vec::new(),
348        });
349
350        {
351            let repo = ctx
352                .resolve_repository::<PostgresDialect, StubExecutor>("Order")
353                .unwrap();
354            repo.fetch_all(&SelectQuery::new("Order").filter(Expr::eq("name", "Bob's Shop")))
355                .unwrap();
356            repo.insert(&InsertCommand::new("Order").value("name", "created"))
357                .unwrap();
358        }
359
360        let logs = ctx.sql_logs();
361        assert_eq!(logs.len(), 1);
362        assert_eq!(logs[0].operation, SqlLogOperation::Select);
363        assert_eq!(
364            logs[0].debug_sql,
365            format!(
366                "SELECT {ORDER_DEFAULT_PROJECTION} FROM \"orders\" WHERE (\"name\" = 'Bob''s Shop')"
367            )
368        );
369
370        ctx.set_sql_log_options(SqlLogOptions::mutation_only());
371        ctx.clear_sql_logs();
372        ctx.resolve_repository::<PostgresDialect, StubExecutor>("Order")
373            .unwrap()
374            .update(
375                &UpdateCommand::new("Order", 1_u64)
376                    .value("name", "updated")
377                    .expected_version(1),
378            )
379            .unwrap();
380
381        let logs = ctx.sql_logs();
382        assert_eq!(logs.len(), 1);
383        assert_eq!(logs[0].operation, SqlLogOperation::Update);
384        assert!(logs[0].debug_sql.contains("UPDATE \"orders\" SET"));
385        assert!(logs[0].debug_sql.contains("'updated'"));
386    }
387
388    impl RepositoryBehavior for OrderBehavior {
389        fn before_select(
390            &self,
391            _ctx: &UserContext,
392            query: &mut teaql_core::SelectQuery,
393        ) -> Result<(), RuntimeError> {
394            query.filter = Some(Expr::eq("version", 1_i64));
395            Ok(())
396        }
397
398        fn before_insert(
399            &self,
400            _ctx: &UserContext,
401            command: &mut InsertCommand,
402        ) -> Result<(), RuntimeError> {
403            command
404                .values
405                .entry("version".to_owned())
406                .or_insert(Value::I64(1));
407            Ok(())
408        }
409
410        fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
411            vec!["lines".to_owned()]
412        }
413    }
414
    /// Unit type whose `before_insert` hook fills insert values from `UserContext`
    /// resources and locals.
    struct ContextAwareOrderBehavior;
    /// Unit type carrying the `Checker` implementation for entity `Order`.
    struct OrderChecker;
    /// Event sink that appends every received event to a shared list; clones share
    /// the same list through the `Arc`.
    #[derive(Clone)]
    struct RecordingEventSink {
        /// Events received so far, in arrival order.
        events: Arc<Mutex<Vec<EntityEvent>>>,
    }
421
422    impl RepositoryBehavior for ContextAwareOrderBehavior {
423        fn before_insert(
424            &self,
425            ctx: &UserContext,
426            command: &mut InsertCommand,
427        ) -> Result<(), RuntimeError> {
428            let tenant = ctx
429                .get_named_resource::<String>("tenant")
430                .cloned()
431                .ok_or_else(|| RuntimeError::Behavior("missing tenant resource".to_owned()))?;
432            let version = *ctx
433                .get_named_resource::<i64>("initial_version")
434                .ok_or_else(|| {
435                    RuntimeError::Behavior("missing initial_version resource".to_owned())
436                })?;
437            let trace_id = match ctx.local("trace_id") {
438                Some(Value::Text(value)) => value.clone(),
439                other => {
440                    return Err(RuntimeError::Behavior(format!(
441                        "missing trace_id local, got {other:?}"
442                    )));
443                }
444            };
445
446            command
447                .values
448                .entry("name".to_owned())
449                .or_insert(Value::Text(format!("{tenant}:{trace_id}")));
450            command
451                .values
452                .entry("version".to_owned())
453                .or_insert(Value::I64(version));
454            Ok(())
455        }
456    }
457
458    impl Checker for OrderChecker {
459        fn entity(&self) -> &str {
460            "Order"
461        }
462
463        fn check_and_fix(
464            &self,
465            _ctx: &UserContext,
466            record: &mut Record,
467            location: &ObjectLocation,
468            results: &mut CheckResults,
469        ) {
470            let status = CheckObjectStatus::from_record(record);
471            if status.is_create() {
472                self.required(record, "name", location, results);
473                record.entry("version".to_owned()).or_insert(Value::I64(1));
474            }
475            if status.is_update()
476                && record.get("name") == Some(&Value::Text("graph-update".to_owned()))
477            {
478                record.insert(
479                    "name".to_owned(),
480                    Value::Text("graph-update-checked".to_owned()),
481                );
482            }
483            self.min_string_length(record, "name", 3, location, results);
484        }
485    }
486
487    impl EntityEventSink for RecordingEventSink {
488        fn on_event(&self, _ctx: &UserContext, event: &EntityEvent) -> Result<(), RuntimeError> {
489            self.events.lock().unwrap().push(event.clone());
490            Ok(())
491        }
492    }
493
    /// Id generator that hands out the wrapped value on every call.
    struct FixedIdGenerator(u64);
495
496    impl InternalIdGenerator for FixedIdGenerator {
497        fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
498            Ok(self.0)
499        }
500    }
501
    /// Id generator that hands out consecutive ids starting from a chosen value.
    struct SequentialIdGenerator {
        /// The id the next `generate_id` call will return.
        next: Mutex<u64>,
    }
505
506    impl SequentialIdGenerator {
507        fn new(next: u64) -> Self {
508            Self {
509                next: Mutex::new(next),
510            }
511        }
512    }
513
514    impl InternalIdGenerator for SequentialIdGenerator {
515        fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
516            let mut next = self
517                .next
518                .lock()
519                .map_err(|err| RuntimeError::IdGeneration(err.to_string()))?;
520            let id = *next;
521            *next += 1;
522            Ok(id)
523        }
524    }
525
526    #[test]
527    fn metadata_store_registers_entities() {
528        let store = InMemoryMetadataStore::new().with_entity(entity());
529        assert!(store.entity("Order").is_some());
530    }
531
532    #[test]
533    fn runtime_module_registers_descriptor_into_context() {
534        let ctx = UserContext::new().with_module(RuntimeModule::new().descriptor(entity()));
535        assert!(ctx.entity("Order").is_some());
536        assert!(ctx.has_repository("Order"));
537    }
538
539    #[test]
540    fn runtime_module_registers_derived_entity_and_behavior() {
541        let ctx = UserContext::new().with_module(
542            RuntimeModule::new().entity_with_behavior::<CatalogProductRow, _>(OrderBehavior),
543        );
544        assert!(ctx.entity("CatalogProduct").is_some());
545        assert!(ctx.has_repository("CatalogProduct"));
546        assert!(ctx.repository_behavior("CatalogProduct").is_some());
547    }
548
549    #[test]
550    fn module_macro_registers_multiple_entities() {
551        let ctx = UserContext::new().with_module(crate::module!(CatalogProductRow));
552        assert!(ctx.entity("CatalogProduct").is_some());
553        assert!(ctx.has_repository("CatalogProduct"));
554    }
555
556    #[test]
557    fn module_macro_registers_entity_behavior_pairs() {
558        let ctx =
559            UserContext::new().with_module(crate::module!(CatalogProductRow => OrderBehavior));
560        assert!(ctx.entity("CatalogProduct").is_some());
561        assert!(ctx.repository_behavior("CatalogProduct").is_some());
562    }
563
564    #[test]
565    fn repository_returns_optimistic_lock_conflict() {
566        let store = InMemoryMetadataStore::new().with_entity(entity());
567        let executor = StubExecutor {
568            affected: 0,
569            rows: Vec::new(),
570        };
571        let repo = Repository::new(&PostgresDialect, &store, &executor);
572
573        let err = repo
574            .update(
575                &UpdateCommand::new("Order", 1_u64)
576                    .expected_version(3)
577                    .value("name", "next"),
578            )
579            .unwrap_err();
580
581        match err {
582            RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. }) => {}
583            other => panic!("unexpected error: {other}"),
584        }
585    }
586
587    #[test]
588    fn user_context_indexes_resources_and_locals() {
589        let mut ctx =
590            UserContext::new().with_metadata(InMemoryMetadataStore::new().with_entity(entity()));
591        ctx.insert_resource::<u64>(42);
592        ctx.insert_named_resource("tenant", String::from("acme"));
593        ctx.put_local("trace_id", "req-1");
594
595        assert!(ctx.entity("Order").is_some());
596        assert_eq!(ctx.get_resource::<u64>(), Some(&42));
597        assert_eq!(
598            ctx.get_named_resource::<String>("tenant"),
599            Some(&String::from("acme"))
600        );
601        assert_eq!(
602            ctx.local("trace_id"),
603            Some(&Value::Text("req-1".to_owned()))
604        );
605    }
606
607    #[test]
608    fn user_context_builds_context_repository() {
609        let mut ctx =
610            UserContext::new().with_metadata(InMemoryMetadataStore::new().with_entity(entity()));
611        ctx.insert_resource(PostgresDialect);
612        ctx.insert_resource(StubExecutor {
613            affected: 1,
614            rows: Vec::new(),
615        });
616
617        let repo = ctx.repository::<PostgresDialect, StubExecutor>().unwrap();
618        let affected = repo
619            .update(
620                &UpdateCommand::new("Order", 1_u64)
621                    .expected_version(3)
622                    .value("name", "next"),
623            )
624            .unwrap();
625
626        assert_eq!(affected, 1);
627    }
628
629    #[test]
630    fn user_context_resolves_repository_by_entity_type() {
631        let mut ctx = UserContext::new()
632            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
633            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
634        ctx.insert_resource(PostgresDialect);
635        ctx.insert_resource(StubExecutor {
636            affected: 1,
637            rows: Vec::new(),
638        });
639
640        let repo = ctx
641            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
642            .unwrap();
643        assert_eq!(repo.entity(), "Order");
644        assert_eq!(repo.select().entity, "Order");
645
646        let affected = repo
647            .insert(
648                &repo
649                    .insert_command()
650                    .value("id", 1_u64)
651                    .value("version", 1_i64)
652                    .value("name", "n"),
653            )
654            .unwrap();
655        assert_eq!(affected, 1);
656    }
657
658    #[test]
659    fn resolved_repository_applies_behavior_hooks() {
660        let mut ctx = UserContext::new()
661            .with_metadata(
662                InMemoryMetadataStore::new()
663                    .with_entity(entity())
664                    .with_entity(line_entity())
665                    .with_entity(product_entity()),
666            )
667            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
668            .with_repository_behavior_registry(
669                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
670            );
671        ctx.insert_resource(PostgresDialect);
672        ctx.insert_resource(StubExecutor {
673            affected: 1,
674            rows: Vec::new(),
675        });
676
677        let repo = ctx
678            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
679            .unwrap();
680
681        let compiled = repo.compile(&repo.select()).unwrap();
682        assert!(compiled.sql.contains("WHERE (\"version\" = $1)"));
683
684        let insert = repo.insert_command().value("id", 1_u64).value("name", "n");
685        let affected = repo.insert(&insert).unwrap();
686        assert_eq!(affected, 1);
687        assert_eq!(repo.relation_loads(), vec!["lines".to_owned()]);
688    }
689
690    #[test]
691    fn resolved_repository_prepares_insert_command_with_generated_id() {
692        let mut ctx = UserContext::new()
693            .with_metadata(
694                InMemoryMetadataStore::new()
695                    .with_entity(entity())
696                    .with_entity(line_entity())
697                    .with_entity(product_entity()),
698            )
699            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
700            .with_repository_behavior_registry(
701                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
702            )
703            .with_internal_id_generator(FixedIdGenerator(42));
704        ctx.insert_resource(PostgresDialect);
705        ctx.insert_resource(StubExecutor {
706            affected: 1,
707            rows: Vec::new(),
708        });
709
710        let repo = ctx
711            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
712            .unwrap();
713
714        let prepared = repo
715            .prepare_insert_command(&repo.insert_command().value("name", "n"))
716            .unwrap();
717
718        assert_eq!(prepared.values.get("id"), Some(&Value::U64(42)));
719        assert_eq!(prepared.values.get("version"), Some(&Value::I64(1)));
720        assert_eq!(
721            prepared.values.get("name"),
722            Some(&Value::Text("n".to_owned()))
723        );
724    }
725
726    #[test]
727    fn resolved_repository_saves_create_graph_and_maintains_relation_keys() {
728        let mut ctx = UserContext::new()
729            .with_metadata(
730                InMemoryMetadataStore::new()
731                    .with_entity(entity())
732                    .with_entity(line_entity())
733                    .with_entity(product_entity()),
734            )
735            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
736            .with_repository_behavior_registry(
737                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
738            )
739            .with_internal_id_generator(SequentialIdGenerator::new(500));
740        ctx.insert_resource(PostgresDialect);
741        ctx.insert_resource(StubExecutor {
742            affected: 1,
743            rows: Vec::new(),
744        });
745
746        let repo = ctx
747            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
748            .unwrap();
749        let graph = GraphNode::new("Order").value("name", "root").relation(
750            "lines",
751            GraphNode::new("OrderLine")
752                .value("name", "line-1")
753                .relation("product", GraphNode::new("Product").value("name", "sku-1")),
754        );
755
756        let saved = repo.save_graph(graph).unwrap();
757
758        assert_eq!(saved.values.get("id"), Some(&Value::U64(500)));
759        assert_eq!(saved.values.get("version"), Some(&Value::I64(1)));
760        let lines = saved.relations.get("lines").unwrap();
761        assert_eq!(lines.len(), 1);
762        assert_eq!(lines[0].values.get("id"), Some(&Value::U64(501)));
763        assert_eq!(lines[0].values.get("order_id"), Some(&Value::U64(500)));
764        assert_eq!(lines[0].values.get("product_id"), Some(&Value::U64(502)));
765        let product = lines[0].relations.get("product").unwrap();
766        assert_eq!(product[0].values.get("id"), Some(&Value::U64(502)));
767    }
768
769    #[test]
770    fn resolved_repository_extracts_and_saves_typed_entity_graph() {
771        let mut ctx = UserContext::new()
772            .with_metadata(
773                InMemoryMetadataStore::new()
774                    .with_entity(entity())
775                    .with_entity(line_entity())
776                    .with_entity(product_entity()),
777            )
778            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
779            .with_internal_id_generator(SequentialIdGenerator::new(700));
780        ctx.insert_resource(PostgresDialect);
781        ctx.insert_resource(StubExecutor {
782            affected: 1,
783            rows: Vec::new(),
784        });
785
786        let repo = ctx
787            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
788            .unwrap();
789        let order = TypedGraphOrder {
790            id: None,
791            version: 1,
792            name: "typed-root".to_owned(),
793            lines: teaql_core::SmartList::from(vec![TypedGraphLine {
794                id: None,
795                order_id: None,
796                name: "typed-line".to_owned(),
797                product_id: None,
798                product: Some(TypedGraphProduct {
799                    id: None,
800                    name: "typed-product".to_owned(),
801                }),
802            }]),
803        };
804
805        let extracted = repo.graph_node_from_entity(order).unwrap();
806        assert_eq!(extracted.entity, "Order");
807        assert_eq!(
808            extracted.values.get("name"),
809            Some(&Value::Text("typed-root".to_owned()))
810        );
811        assert_eq!(extracted.values.get("id"), Some(&Value::Null));
812        assert_eq!(extracted.relations["lines"].len(), 1);
813        assert_eq!(
814            extracted.relations["lines"][0].values.get("name"),
815            Some(&Value::Text("typed-line".to_owned()))
816        );
817        assert_eq!(
818            extracted.relations["lines"][0].relations["product"].len(),
819            1
820        );
821
822        let saved = repo.save_graph(extracted).unwrap();
823        assert_eq!(saved.values.get("id"), Some(&Value::U64(700)));
824        let lines = saved.relations.get("lines").unwrap();
825        assert_eq!(lines[0].values.get("id"), Some(&Value::U64(701)));
826        assert_eq!(lines[0].values.get("order_id"), Some(&Value::U64(700)));
827        assert_eq!(lines[0].values.get("product_id"), Some(&Value::U64(702)));
828        assert_eq!(
829            lines[0].relations["product"][0].values.get("id"),
830            Some(&Value::U64(702))
831        );
832    }
833
834    #[test]
835    fn resolved_repository_saves_typed_entity_graph_directly() {
836        let mut ctx = UserContext::new()
837            .with_metadata(
838                InMemoryMetadataStore::new()
839                    .with_entity(entity())
840                    .with_entity(line_entity())
841                    .with_entity(product_entity()),
842            )
843            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
844            .with_internal_id_generator(SequentialIdGenerator::new(800));
845        ctx.insert_resource(PostgresDialect);
846        ctx.insert_resource(StubExecutor {
847            affected: 1,
848            rows: Vec::new(),
849        });
850
851        let repo = ctx
852            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
853            .unwrap();
854        let saved = repo
855            .save_entity_graph(TypedGraphOrder {
856                id: None,
857                version: 1,
858                name: "typed-direct".to_owned(),
859                lines: teaql_core::SmartList::from(vec![TypedGraphLine {
860                    id: None,
861                    order_id: None,
862                    name: "typed-line".to_owned(),
863                    product_id: None,
864                    product: Some(TypedGraphProduct {
865                        id: None,
866                        name: "typed-product".to_owned(),
867                    }),
868                }]),
869            })
870            .unwrap();
871
872        assert_eq!(saved.values.get("id"), Some(&Value::U64(800)));
873        assert_eq!(
874            saved.relations["lines"][0].values.get("order_id"),
875            Some(&Value::U64(800))
876        );
877        assert_eq!(
878            saved.relations["lines"][0].values.get("product_id"),
879            Some(&Value::U64(802))
880        );
881    }
882
883    #[test]
884    fn custom_user_context_can_drive_insert_preparation() {
885        let mut ctx = UserContext::new()
886            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
887            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
888            .with_repository_behavior_registry(
889                InMemoryRepositoryBehaviorRegistry::new()
890                    .with_behavior("Order", ContextAwareOrderBehavior),
891            )
892            .with_internal_id_generator(FixedIdGenerator(99));
893        ctx.insert_named_resource("tenant", String::from("acme"));
894        ctx.insert_named_resource("initial_version", 7_i64);
895        ctx.put_local("trace_id", "req-9");
896        ctx.insert_resource(PostgresDialect);
897        ctx.insert_resource(StubExecutor {
898            affected: 1,
899            rows: Vec::new(),
900        });
901
902        let repo = ctx
903            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
904            .unwrap();
905        let prepared = repo.prepare_insert_command(&repo.insert_command()).unwrap();
906
907        assert_eq!(prepared.values.get("id"), Some(&Value::U64(99)));
908        assert_eq!(prepared.values.get("version"), Some(&Value::I64(7)));
909        assert_eq!(
910            prepared.values.get("name"),
911            Some(&Value::Text("acme:req-9".to_owned()))
912        );
913    }
914
915    #[test]
916    fn checker_registry_validates_and_fixes_insert_commands() {
917        let mut ctx = UserContext::new()
918            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
919            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
920            .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker))
921            .with_internal_id_generator(FixedIdGenerator(77));
922        ctx.insert_resource(PostgresDialect);
923        ctx.insert_resource(StubExecutor {
924            affected: 1,
925            rows: Vec::new(),
926        });
927
928        let repo = ctx
929            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
930            .unwrap();
931        let prepared = repo
932            .prepare_insert_command(&repo.insert_command().value("name", "valid"))
933            .unwrap();
934
935        assert_eq!(prepared.values.get("id"), Some(&Value::U64(77)));
936        assert_eq!(prepared.values.get("version"), Some(&Value::I64(1)));
937        assert!(!prepared.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
938
939        let error = repo
940            .prepare_insert_command(&repo.insert_command().value("name", "no"))
941            .unwrap_err();
942        match error {
943            RuntimeError::Check(results) => {
944                assert_eq!(results.len(), 1);
945                assert_eq!(results[0].location.to_string(), "name");
946            }
947            other => panic!("unexpected checker error: {other:?}"),
948        }
949    }
950
951    #[test]
952    fn checker_registry_validates_update_commands_without_required_insert_checks() {
953        let mut ctx = UserContext::new()
954            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
955            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
956            .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker));
957        ctx.insert_resource(PostgresDialect);
958        ctx.insert_resource(StubExecutor {
959            affected: 1,
960            rows: Vec::new(),
961        });
962
963        let repo = ctx
964            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
965            .unwrap();
966        repo.update(&repo.update_command(1_u64).value("version", 1_i64))
967            .unwrap();
968
969        let error = repo
970            .update(&repo.update_command(1_u64).value("name", "no"))
971            .unwrap_err();
972        match error {
973            RepositoryError::Runtime(RuntimeError::Check(results)) => {
974                assert_eq!(results.len(), 1);
975                assert_eq!(results[0].location.to_string(), "name");
976            }
977            other => panic!("unexpected checker error: {other:?}"),
978        }
979    }
980
981    #[test]
982    fn built_in_language_translators_cover_fifteen_languages() {
983        assert_eq!(Language::ALL.len(), 15);
984        let result = super::CheckResult::required(ObjectLocation::hash_root("name"));
985        let messages = Language::ALL
986            .iter()
987            .map(|language| translate_check_result(*language, &result))
988            .collect::<Vec<_>>();
989
990        assert!(messages.iter().all(|message| !message.is_empty()));
991        assert!(messages.iter().any(|message| message.contains("required")));
992        assert!(messages.iter().any(|message| message.contains("å¿…å¡«")));
993        assert!(
994            messages
995                .iter()
996                .any(|message| message.contains("obligatoire"))
997        );
998        assert_eq!(Language::from_code("zh-CN"), Some(Language::Chinese));
999        assert_eq!(
1000            Language::from_code("zh-TW"),
1001            Some(Language::TraditionalChinese)
1002        );
1003    }
1004
1005    #[test]
1006    fn user_context_language_switch_translates_checker_errors() {
1007        let mut ctx = UserContext::new()
1008            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1009            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1010            .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker))
1011            .with_internal_id_generator(FixedIdGenerator(77))
1012            .with_language(Language::Chinese);
1013        ctx.insert_resource(PostgresDialect);
1014        ctx.insert_resource(StubExecutor {
1015            affected: 1,
1016            rows: Vec::new(),
1017        });
1018
1019        let repo = ctx
1020            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1021            .unwrap();
1022        let error = repo
1023            .prepare_insert_command(&repo.insert_command())
1024            .unwrap_err();
1025        match error {
1026            RuntimeError::Check(results) => {
1027                assert_eq!(results.len(), 1);
1028                assert!(
1029                    results[0]
1030                        .message
1031                        .as_ref()
1032                        .is_some_and(|message| message.contains("å¿…å¡«"))
1033                );
1034            }
1035            other => panic!("unexpected checker error: {other:?}"),
1036        }
1037
1038        let mut ctx = UserContext::new().with_language(Language::English);
1039        ctx.set_language_code("es").unwrap();
1040        assert_eq!(ctx.language(), Language::Spanish);
1041    }
1042
1043    #[test]
1044    fn checker_registry_merges_graph_update_fixes_by_object_status() {
1045        let mut ctx = UserContext::new()
1046            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1047            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1048            .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker));
1049        ctx.insert_resource(PostgresDialect);
1050        ctx.insert_resource(StubExecutor {
1051            affected: 1,
1052            rows: vec![Record::from([
1053                ("id".to_owned(), Value::U64(1)),
1054                ("version".to_owned(), Value::I64(1)),
1055                ("name".to_owned(), Value::Text("old".to_owned())),
1056            ])],
1057        });
1058
1059        let repo = ctx
1060            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1061            .unwrap();
1062        let saved = repo
1063            .save_graph(
1064                GraphNode::new("Order")
1065                    .value("id", 1_u64)
1066                    .value("version", 1_i64)
1067                    .value("name", "graph-update"),
1068            )
1069            .unwrap();
1070
1071        assert_eq!(
1072            saved.values.get("name"),
1073            Some(&Value::Text("graph-update-checked".to_owned()))
1074        );
1075        assert_eq!(saved.values.get("version"), Some(&Value::I64(2)));
1076        assert!(!saved.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
1077    }
1078
1079    #[test]
1080    fn user_context_event_sink_receives_repository_mutation_events() {
1081        let events = Arc::new(Mutex::new(Vec::new()));
1082        let mut ctx = UserContext::new()
1083            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1084            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1085            .with_internal_id_generator(FixedIdGenerator(88))
1086            .with_event_sink(RecordingEventSink {
1087                events: events.clone(),
1088            });
1089        ctx.insert_resource(PostgresDialect);
1090        ctx.insert_resource(StubExecutor {
1091            affected: 1,
1092            rows: Vec::new(),
1093        });
1094
1095        let repo = ctx
1096            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1097            .unwrap();
1098        repo.insert(&repo.insert_command().value("name", "created"))
1099            .unwrap();
1100        repo.update(
1101            &repo
1102                .update_command(88_u64)
1103                .expected_version(1)
1104                .value("name", "updated"),
1105        )
1106        .unwrap();
1107        repo.delete(&repo.delete_command(88_u64).expected_version(2))
1108            .unwrap();
1109        repo.recover(&repo.recover_command(88_u64, -3)).unwrap();
1110
1111        let events = events.lock().unwrap();
1112        assert_eq!(events.len(), 4);
1113        assert_eq!(events[0].kind, EntityEventKind::Created);
1114        assert_eq!(events[0].entity, "Order");
1115        assert_eq!(events[0].values.get("id"), Some(&Value::U64(88)));
1116        assert_eq!(events[1].kind, EntityEventKind::Updated);
1117        assert_eq!(events[1].values.get("id"), Some(&Value::U64(88)));
1118        assert_eq!(events[1].values.get("version"), Some(&Value::I64(2)));
1119        assert_eq!(events[1].updated_fields, vec!["name".to_owned()]);
1120        assert_eq!(events[2].kind, EntityEventKind::Deleted);
1121        assert_eq!(events[3].kind, EntityEventKind::Recovered);
1122    }
1123
1124    #[test]
1125    fn user_context_event_sink_receives_mixed_graph_mutation_events() {
1126        let events = Arc::new(Mutex::new(Vec::new()));
1127        let mut ctx = UserContext::new()
1128            .with_metadata(
1129                InMemoryMetadataStore::new()
1130                    .with_entity(entity())
1131                    .with_entity(line_entity())
1132                    .with_entity(product_entity()),
1133            )
1134            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1135            .with_event_sink(RecordingEventSink {
1136                events: events.clone(),
1137            });
1138        ctx.insert_resource(PostgresDialect);
1139        ctx.insert_resource(StubExecutor {
1140            affected: 1,
1141            rows: vec![Record::from([
1142                ("id".to_owned(), Value::U64(1)),
1143                ("version".to_owned(), Value::I64(1)),
1144                ("name".to_owned(), Value::Text("old".to_owned())),
1145            ])],
1146        });
1147
1148        let repo = ctx
1149            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1150            .unwrap();
1151        repo.save_graph(
1152            GraphNode::new("Order")
1153                .value("id", 1_u64)
1154                .value("version", 1_i64)
1155                .value("name", "updated")
1156                .relation(
1157                    "lines",
1158                    GraphNode::new("OrderLine")
1159                        .value("name", "line")
1160                        .value("product_id", 3_u64),
1161                ),
1162        )
1163        .unwrap();
1164
1165        let events = events.lock().unwrap();
1166        assert_eq!(events.len(), 3);
1167        assert_eq!(events[0].kind, EntityEventKind::Updated);
1168        assert_eq!(events[0].entity, "Order");
1169        assert_eq!(events[1].kind, EntityEventKind::Updated);
1170        assert_eq!(events[1].entity, "OrderLine");
1171        assert_eq!(events[1].values.get("order_id"), Some(&Value::U64(1)));
1172        assert_eq!(events[2].kind, EntityEventKind::Deleted);
1173        assert_eq!(events[2].entity, "OrderLine");
1174    }
1175
1176    #[test]
1177    fn save_graph_builds_plan_grouped_by_entity_and_operation() {
1178        let mut ctx = UserContext::new()
1179            .with_metadata(
1180                InMemoryMetadataStore::new()
1181                    .with_entity(entity())
1182                    .with_entity(line_entity())
1183                    .with_entity(product_entity()),
1184            )
1185            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1186            .with_internal_id_generator(SequentialIdGenerator::new(500));
1187        ctx.insert_resource(PostgresDialect);
1188        ctx.insert_resource(StubExecutor {
1189            affected: 1,
1190            rows: vec![Record::from([
1191                ("id".to_owned(), Value::U64(1)),
1192                ("version".to_owned(), Value::I64(1)),
1193                ("name".to_owned(), Value::Text("old".to_owned())),
1194            ])],
1195        });
1196
1197        let plan = ctx
1198            .plan_for_save_graph::<PostgresDialect, StubExecutor>(
1199                GraphNode::new("Order")
1200                    .value("id", 1_u64)
1201                    .value("version", 1_i64)
1202                    .value("name", "updated")
1203                    .relation(
1204                        "lines",
1205                        GraphNode::new("OrderLine")
1206                            .value("name", "new-line-a")
1207                            .value("product_id", 2_u64),
1208                    )
1209                    .relation(
1210                        "lines",
1211                        GraphNode::new("OrderLine")
1212                            .value("name", "new-line-b")
1213                            .value("product_id", 2_u64),
1214                    )
1215                    .relation(
1216                        "lines",
1217                        GraphNode::new("OrderLine")
1218                            .value("id", 5_u64)
1219                            .value("version", 1_i64)
1220                            .value("name", "same-update-a"),
1221                    )
1222                    .relation(
1223                        "lines",
1224                        GraphNode::new("OrderLine")
1225                            .value("id", 6_u64)
1226                            .value("version", 1_i64)
1227                            .value("name", "same-update-b"),
1228                    )
1229                    .relation(
1230                        "lines",
1231                        GraphNode::new("OrderLine").value("id", 3_u64).remove(),
1232                    )
1233                    .relation(
1234                        "lines",
1235                        GraphNode::new("OrderLine").value("id", 4_u64).reference(),
1236                    ),
1237            )
1238            .unwrap();
1239        let counts = plan.grouped_counts();
1240
1241        assert_eq!(
1242            counts.get(&("Order".to_owned(), GraphMutationKind::Update)),
1243            Some(&1)
1244        );
1245        assert_eq!(
1246            counts.get(&("OrderLine".to_owned(), GraphMutationKind::Create)),
1247            Some(&2)
1248        );
1249        assert_eq!(
1250            counts.get(&("OrderLine".to_owned(), GraphMutationKind::Update)),
1251            Some(&2)
1252        );
1253        assert_eq!(
1254            counts.get(&("OrderLine".to_owned(), GraphMutationKind::Delete)),
1255            Some(&1)
1256        );
1257        assert_eq!(
1258            counts.get(&("OrderLine".to_owned(), GraphMutationKind::Reference)),
1259            Some(&1)
1260        );
1261        let create_batch = plan
1262            .batches
1263            .iter()
1264            .find(|batch| batch.entity == "OrderLine" && batch.kind == GraphMutationKind::Create)
1265            .unwrap();
1266        assert_eq!(create_batch.items.len(), 2);
1267        assert_eq!(
1268            create_batch.items[0].values.get("id"),
1269            Some(&Value::U64(500))
1270        );
1271        assert_eq!(
1272            create_batch.items[1].values.get("id"),
1273            Some(&Value::U64(501))
1274        );
1275        let update_batch = plan
1276            .batches
1277            .iter()
1278            .find(|batch| {
1279                batch.entity == "OrderLine"
1280                    && batch.kind == GraphMutationKind::Update
1281                    && batch.update_fields == vec!["name".to_owned()]
1282            })
1283            .unwrap();
1284        assert_eq!(update_batch.items.len(), 2);
1285    }
1286
1287    #[test]
1288    fn resolved_repository_builds_relation_plans() {
1289        let mut ctx = UserContext::new()
1290            .with_metadata(
1291                InMemoryMetadataStore::new()
1292                    .with_entity(entity())
1293                    .with_entity(line_entity())
1294                    .with_entity(product_entity()),
1295            )
1296            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1297            .with_repository_behavior_registry(
1298                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1299            );
1300        ctx.insert_resource(PostgresDialect);
1301        ctx.insert_resource(StubExecutor {
1302            affected: 1,
1303            rows: Vec::new(),
1304        });
1305
1306        let repo = ctx
1307            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1308            .unwrap();
1309        let plans = repo.relation_plans().unwrap();
1310
1311        assert_eq!(plans.len(), 1);
1312        assert_eq!(plans[0].relation_name, "lines");
1313        assert_eq!(plans[0].target_entity, "OrderLine");
1314        assert_eq!(plans[0].local_key, "id");
1315        assert_eq!(plans[0].foreign_key, "order_id");
1316        assert!(plans[0].many);
1317    }
1318
1319    #[test]
1320    fn resolved_repository_builds_relation_query_from_parent_rows() {
1321        let mut ctx = UserContext::new()
1322            .with_metadata(
1323                InMemoryMetadataStore::new()
1324                    .with_entity(entity())
1325                    .with_entity(line_entity())
1326                    .with_entity(product_entity()),
1327            )
1328            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1329            .with_repository_behavior_registry(
1330                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1331            );
1332        ctx.insert_resource(PostgresDialect);
1333        ctx.insert_resource(StubExecutor {
1334            affected: 1,
1335            rows: Vec::new(),
1336        });
1337
1338        let repo = ctx
1339            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1340            .unwrap();
1341        let parent_rows = vec![
1342            Record::from([(String::from("id"), Value::U64(11))]),
1343            Record::from([(String::from("id"), Value::U64(12))]),
1344        ];
1345
1346        let query = repo.relation_query("lines", &parent_rows).unwrap();
1347        let compiled = repo.compile(&query).unwrap();
1348        assert!(compiled.sql.contains("FROM \"orderline\""));
1349        assert!(compiled.sql.contains("\"order_id\" IN ($1, $2)"));
1350        assert_eq!(compiled.params, vec![Value::U64(11), Value::U64(12)]);
1351    }
1352
1353    #[test]
1354    fn resolved_repository_enhances_parent_rows_with_relations() {
1355        let mut ctx = UserContext::new()
1356            .with_metadata(
1357                InMemoryMetadataStore::new()
1358                    .with_entity(entity())
1359                    .with_entity(line_entity())
1360                    .with_entity(product_entity()),
1361            )
1362            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1363            .with_repository_behavior_registry(
1364                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1365            );
1366        ctx.insert_resource(PostgresDialect);
1367        ctx.insert_resource(StubExecutor {
1368            affected: 1,
1369            rows: vec![
1370                Record::from([
1371                    (String::from("id"), Value::U64(101)),
1372                    (String::from("order_id"), Value::U64(11)),
1373                    (String::from("name"), Value::Text(String::from("l1"))),
1374                ]),
1375                Record::from([
1376                    (String::from("id"), Value::U64(102)),
1377                    (String::from("order_id"), Value::U64(11)),
1378                    (String::from("name"), Value::Text(String::from("l2"))),
1379                ]),
1380                Record::from([
1381                    (String::from("id"), Value::U64(201)),
1382                    (String::from("order_id"), Value::U64(12)),
1383                    (String::from("name"), Value::Text(String::from("l3"))),
1384                ]),
1385            ],
1386        });
1387
1388        let repo = ctx
1389            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1390            .unwrap();
1391        let mut parents = vec![
1392            Record::from([(String::from("id"), Value::U64(11))]),
1393            Record::from([(String::from("id"), Value::U64(12))]),
1394        ];
1395
1396        repo.enhance_relations(&mut parents).unwrap();
1397
1398        match parents[0].get("lines") {
1399            Some(Value::List(lines)) => assert_eq!(lines.len(), 2),
1400            other => panic!("unexpected lines payload: {other:?}"),
1401        }
1402        match parents[1].get("lines") {
1403            Some(Value::List(lines)) => assert_eq!(lines.len(), 1),
1404            other => panic!("unexpected lines payload: {other:?}"),
1405        }
1406    }
1407
1408    #[test]
1409    fn resolved_repository_fetches_smart_list_of_entities() {
1410        let mut ctx = UserContext::new()
1411            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1412            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1413        ctx.insert_resource(PostgresDialect);
1414        ctx.insert_resource(StubExecutor {
1415            affected: 1,
1416            rows: vec![Record::from([
1417                (String::from("id"), Value::U64(7)),
1418                (String::from("version"), Value::I64(2)),
1419                (String::from("name"), Value::Text(String::from("typed"))),
1420            ])],
1421        });
1422
1423        let repo = ctx
1424            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1425            .unwrap();
1426        let rows = repo.fetch_entities::<OrderEntity>(&repo.select()).unwrap();
1427
1428        assert_eq!(rows.len(), 1);
1429        assert_eq!(
1430            rows.first(),
1431            Some(&OrderEntity {
1432                id: 7,
1433                version: 2,
1434                name: String::from("typed"),
1435            })
1436        );
1437    }
1438
1439    #[test]
1440    fn resolved_repository_fetches_smart_list_of_derived_entities() {
1441        let mut ctx = UserContext::new()
1442            .with_metadata(
1443                InMemoryMetadataStore::new().with_entity(CatalogProductRow::entity_descriptor()),
1444            )
1445            .with_repository_registry(
1446                InMemoryRepositoryRegistry::new().with_entity("CatalogProduct"),
1447            );
1448        ctx.insert_resource(PostgresDialect);
1449        ctx.insert_resource(StubExecutor {
1450            affected: 1,
1451            rows: vec![Record::from([
1452                (String::from("id"), Value::U64(9)),
1453                (String::from("name"), Value::Text(String::from("derived"))),
1454            ])],
1455        });
1456
1457        let repo = ctx
1458            .resolve_repository::<PostgresDialect, StubExecutor>("CatalogProduct")
1459            .unwrap();
1460        let rows = repo
1461            .fetch_entities::<CatalogProductRow>(&repo.select())
1462            .unwrap();
1463
1464        assert_eq!(rows.len(), 1);
1465        assert_eq!(
1466            rows.first(),
1467            Some(&CatalogProductRow {
1468                id: 9,
1469                name: String::from("derived"),
1470            })
1471        );
1472    }
1473
1474    #[test]
1475    fn resolved_repository_collects_dynamic_properties_for_aggregate_output() {
1476        let mut ctx = UserContext::new()
1477            .with_metadata(
1478                InMemoryMetadataStore::new()
1479                    .with_entity(OrderAggregateDynamic::entity_descriptor()),
1480            )
1481            .with_repository_registry(
1482                InMemoryRepositoryRegistry::new().with_entity("OrderAggregate"),
1483            );
1484        ctx.insert_resource(PostgresDialect);
1485        ctx.insert_resource(StubExecutor {
1486            affected: 1,
1487            rows: vec![Record::from([
1488                (String::from("id"), Value::U64(1)),
1489                (String::from("lineCount"), Value::I64(3)),
1490                (String::from("amount"), Value::F64(18.5)),
1491            ])],
1492        });
1493
1494        let repo = ctx
1495            .resolve_repository::<PostgresDialect, StubExecutor>("OrderAggregate")
1496            .unwrap();
1497        let rows = repo
1498            .fetch_entities::<OrderAggregateDynamic>(&repo.select())
1499            .unwrap();
1500
1501        assert_eq!(rows.len(), 1);
1502        assert_eq!(rows.data[0].id, 1);
1503        assert_eq!(rows.data[0].dynamic.get("lineCount"), Some(&Value::I64(3)));
1504        assert_eq!(rows.data[0].dynamic.get("amount"), Some(&Value::F64(18.5)));
1505        assert_eq!(
1506            rows.into_vec().into_iter().next().unwrap().into_json(),
1507            serde_json::json!({
1508                "id": 1,
1509                "lineCount": 3,
1510                "amount": 18.5
1511            })
1512        );
1513    }
1514
1515    #[test]
1516    fn resolved_repository_executes_relation_aggregates_into_dynamic_properties() {
1517        let executor = QueueExecutor {
1518            affected: 1,
1519            rows: Mutex::new(VecDeque::from([
1520                vec![
1521                    Record::from([
1522                        (String::from("id"), Value::U64(1)),
1523                        (String::from("version"), Value::I64(1)),
1524                        (String::from("name"), Value::Text(String::from("first"))),
1525                    ]),
1526                    Record::from([
1527                        (String::from("id"), Value::U64(2)),
1528                        (String::from("version"), Value::I64(1)),
1529                        (String::from("name"), Value::Text(String::from("second"))),
1530                    ]),
1531                ],
1532                vec![Record::from([
1533                    (String::from("order_id"), Value::U64(1)),
1534                    (String::from("lineCount"), Value::I64(3)),
1535                ])],
1536            ])),
1537            queries: Mutex::new(Vec::new()),
1538        };
1539        let mut ctx = UserContext::new()
1540            .with_metadata(
1541                InMemoryMetadataStore::new()
1542                    .with_entity(entity())
1543                    .with_entity(line_entity()),
1544            )
1545            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1546        ctx.insert_resource(PostgresDialect);
1547        ctx.insert_resource(executor);
1548
1549        let repo = ctx
1550            .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1551            .unwrap();
1552        let rows = repo
1553            .fetch_all_with_relation_aggregates(
1554                &repo
1555                    .select()
1556                    .project("id")
1557                    .project("version")
1558                    .project("name"),
1559                &[RelationAggregate::new(
1560                    "lines",
1561                    "lineCount",
1562                    SelectQuery::new("OrderLine"),
1563                    true,
1564                )],
1565            )
1566            .unwrap();
1567
1568        assert_eq!(rows[0].get("lineCount"), Some(&Value::I64(3)));
1569        assert_eq!(rows[1].get("lineCount"), Some(&Value::U64(0)));
1570
1571        let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1572        let queries = executor.queries.lock().unwrap();
1573        assert_eq!(queries.len(), 2);
1574        assert_eq!(
1575            queries[1],
1576            "SELECT \"order_id\", COUNT(*) AS \"lineCount\" FROM \"orderline\" WHERE (\"order_id\" IN ($1, $2)) GROUP BY \"order_id\""
1577        );
1578    }
1579
1580    #[test]
1581    fn resolved_repository_uses_aggregation_cache_when_resource_is_registered() {
1582        let executor = QueueExecutor {
1583            affected: 1,
1584            rows: Mutex::new(VecDeque::from([vec![Record::from([(
1585                String::from("count"),
1586                Value::I64(2),
1587            )])]])),
1588            queries: Mutex::new(Vec::new()),
1589        };
1590        let mut ctx = UserContext::new()
1591            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1592            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1593        ctx.insert_resource(PostgresDialect);
1594        ctx.insert_resource(executor);
1595        ctx.insert_resource(InMemoryAggregationCache::default());
1596
1597        let repo = ctx
1598            .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1599            .unwrap();
1600        let query = repo
1601            .select()
1602            .count("count")
1603            .enable_aggregation_cache_for(60_000);
1604
1605        let first = repo.fetch_all(&query).unwrap();
1606        let second = repo.fetch_all(&query).unwrap();
1607
1608        assert_eq!(first, second);
1609        let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1610        assert_eq!(executor.queries.lock().unwrap().len(), 1);
1611    }
1612
1613    #[test]
1614    fn aggregation_cache_is_namespaced_and_invalidated_after_write() {
1615        let executor = QueueExecutor {
1616            affected: 1,
1617            rows: Mutex::new(VecDeque::from([
1618                vec![Record::from([(String::from("count"), Value::I64(2))])],
1619                vec![Record::from([(String::from("count"), Value::I64(3))])],
1620            ])),
1621            queries: Mutex::new(Vec::new()),
1622        };
1623        let mut ctx = UserContext::new()
1624            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1625            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1626        ctx.insert_resource(PostgresDialect);
1627        ctx.insert_resource(executor);
1628        ctx.insert_resource(
1629            Arc::new(InMemoryAggregationCache::with_namespace("tenant-a"))
1630                as Arc<dyn AggregationCacheBackend>,
1631        );
1632
1633        let repo = ctx
1634            .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1635            .unwrap();
1636        let query = repo
1637            .select()
1638            .count("count")
1639            .enable_aggregation_cache_for(60_000);
1640
1641        let first = repo.fetch_all(&query).unwrap();
1642        let cached = repo.fetch_all(&query).unwrap();
1643        repo.insert(
1644            &InsertCommand::new("Order")
1645                .value("id", 9_u64)
1646                .value("version", 1_i64)
1647                .value("name", "new"),
1648        )
1649        .unwrap();
1650        let refreshed = repo.fetch_all(&query).unwrap();
1651
1652        assert_eq!(first, cached);
1653        assert_ne!(cached, refreshed);
1654        let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1655        assert_eq!(executor.queries.lock().unwrap().len(), 2);
1656    }
1657
1658    #[test]
1659    fn aggregation_cache_propagates_to_relation_aggregates() {
1660        let parent_rows = vec![
1661            Record::from([
1662                (String::from("id"), Value::U64(1)),
1663                (String::from("version"), Value::I64(1)),
1664                (String::from("name"), Value::Text(String::from("first"))),
1665            ]),
1666            Record::from([
1667                (String::from("id"), Value::U64(2)),
1668                (String::from("version"), Value::I64(1)),
1669                (String::from("name"), Value::Text(String::from("second"))),
1670            ]),
1671        ];
1672        let aggregate_rows = vec![Record::from([
1673            (String::from("order_id"), Value::U64(1)),
1674            (String::from("lineCount"), Value::I64(3)),
1675        ])];
1676        let executor = QueueExecutor {
1677            affected: 1,
1678            rows: Mutex::new(VecDeque::from([parent_rows, aggregate_rows])),
1679            queries: Mutex::new(Vec::new()),
1680        };
1681        let mut ctx = UserContext::new()
1682            .with_metadata(
1683                InMemoryMetadataStore::new()
1684                    .with_entity(entity())
1685                    .with_entity(line_entity()),
1686            )
1687            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1688        ctx.insert_resource(PostgresDialect);
1689        ctx.insert_resource(executor);
1690        ctx.insert_resource(InMemoryAggregationCache::default());
1691
1692        let repo = ctx
1693            .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1694            .unwrap();
1695        let query = repo
1696            .select()
1697            .project("id")
1698            .project("version")
1699            .project("name")
1700            .enable_aggregation_cache_for(60_000)
1701            .propagate_aggregation_cache(60_000);
1702        let aggregate =
1703            RelationAggregate::new("lines", "lineCount", SelectQuery::new("OrderLine"), true);
1704
1705        let first = repo
1706            .fetch_all_with_relation_aggregates(&query, &[aggregate.clone()])
1707            .unwrap();
1708        let second = repo
1709            .fetch_all_with_relation_aggregates(&query, &[aggregate])
1710            .unwrap();
1711
1712        assert_eq!(first, second);
1713        let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1714        assert_eq!(executor.queries.lock().unwrap().len(), 2);
1715    }
1716
1717    #[test]
1718    fn memory_repository_fetches_smart_list_entities_with_query_features() {
1719        let metadata = InMemoryMetadataStore::new().with_entity(entity());
1720        let repository = MemoryRepository::new(metadata).with_rows(
1721            "Order",
1722            vec![
1723                Record::from([
1724                    (String::from("id"), Value::U64(1)),
1725                    (String::from("version"), Value::I64(1)),
1726                    (String::from("name"), Value::Text(String::from("alpha"))),
1727                ]),
1728                Record::from([
1729                    (String::from("id"), Value::U64(2)),
1730                    (String::from("version"), Value::I64(1)),
1731                    (String::from("name"), Value::Text(String::from("beta"))),
1732                ]),
1733                Record::from([
1734                    (String::from("id"), Value::U64(3)),
1735                    (String::from("version"), Value::I64(1)),
1736                    (String::from("name"), Value::Text(String::from("gamma"))),
1737                ]),
1738            ],
1739        );
1740
1741        let query = teaql_core::SelectQuery::new("Order")
1742            .filter(Expr::Binary {
1743                left: Box::new(Expr::column("id")),
1744                op: teaql_core::BinaryOp::Gte,
1745                right: Box::new(Expr::value(2_u64)),
1746            })
1747            .order_by(OrderBy::desc("id"))
1748            .limit(1);
1749
1750        let orders = repository.fetch_entities::<Order>(&query).unwrap();
1751
1752        assert_eq!(orders.ids(), vec![Value::U64(3)]);
1753        assert_eq!(orders.versions(), vec![1]);
1754        assert_eq!(orders.first().unwrap().name, "gamma");
1755    }
1756
1757    #[test]
1758    fn memory_repository_runs_aggregates() {
1759        let metadata = InMemoryMetadataStore::new().with_entity(entity());
1760        let repository = MemoryRepository::new(metadata).with_rows(
1761            "Order",
1762            vec![
1763                Record::from([
1764                    (String::from("id"), Value::U64(1)),
1765                    (String::from("version"), Value::I64(1)),
1766                    (String::from("name"), Value::Text(String::from("alpha"))),
1767                ]),
1768                Record::from([
1769                    (String::from("id"), Value::U64(2)),
1770                    (String::from("version"), Value::I64(2)),
1771                    (String::from("name"), Value::Text(String::from("beta"))),
1772                ]),
1773            ],
1774        );
1775
1776        let query = teaql_core::SelectQuery {
1777            entity: String::from("Order"),
1778            projection: Vec::new(),
1779            expr_projection: Vec::new(),
1780            filter: None,
1781            having: None,
1782            order_by: Vec::new(),
1783            slice: None,
1784            aggregates: vec![
1785                Aggregate {
1786                    function: AggregateFunction::Count,
1787                    field: String::from("id"),
1788                    alias: String::from("count"),
1789                },
1790                Aggregate {
1791                    function: AggregateFunction::Sum,
1792                    field: String::from("version"),
1793                    alias: String::from("versionSum"),
1794                },
1795            ],
1796            group_by: Vec::new(),
1797            relations: Vec::new(),
1798            aggregation_cache: None,
1799            comment: None,
1800            raw_sql: None,
1801            raw_sql_search_criteria: Vec::new(),
1802            json_expr: None,
1803            dynamic_properties: Vec::new(),
1804            raw_projections: Vec::new(),
1805            object_group_bys: Vec::new(),
1806            child_enhancements: Vec::new(),
1807        };
1808
1809        let rows = repository.fetch_all(&query).unwrap();
1810
1811        assert_eq!(rows.len(), 1);
1812        assert_eq!(rows[0].get("count"), Some(&Value::U64(2)));
1813        assert_eq!(rows[0].get("versionSum"), Some(&Value::U64(3)));
1814    }
1815
1816    #[test]
1817    fn memory_repository_runs_grouped_aggregates_and_extended_filters() {
1818        let metadata = InMemoryMetadataStore::new().with_entity(entity());
1819        let repository = MemoryRepository::new(metadata).with_rows(
1820            "Order",
1821            vec![
1822                Record::from([
1823                    (String::from("id"), Value::U64(1)),
1824                    (String::from("version"), Value::I64(1)),
1825                    (String::from("name"), Value::Text(String::from("alpha"))),
1826                ]),
1827                Record::from([
1828                    (String::from("id"), Value::U64(2)),
1829                    (String::from("version"), Value::I64(2)),
1830                    (String::from("name"), Value::Text(String::from("alpha"))),
1831                ]),
1832                Record::from([
1833                    (String::from("id"), Value::U64(3)),
1834                    (String::from("version"), Value::I64(3)),
1835                    (String::from("name"), Value::Text(String::from("tmp-beta"))),
1836                ]),
1837            ],
1838        );
1839
1840        let rows = repository
1841            .fetch_all(
1842                &teaql_core::SelectQuery::new("Order")
1843                    .filter(
1844                        Expr::between("version", 1_i64, 3_i64)
1845                            .and_expr(Expr::not_like("name", "tmp%"))
1846                            .and_expr(Expr::not_in_list("name", vec![Value::from("deleted")])),
1847                    )
1848                    .group_by("name")
1849                    .count("total")
1850                    .sum("version", "versionSum"),
1851            )
1852            .unwrap();
1853
1854        assert_eq!(rows.len(), 1);
1855        assert_eq!(
1856            rows[0].get("name"),
1857            Some(&Value::Text(String::from("alpha")))
1858        );
1859        assert_eq!(rows[0].get("total"), Some(&Value::U64(2)));
1860        assert_eq!(rows[0].get("versionSum"), Some(&Value::U64(3)));
1861    }
1862
1863    #[test]
1864    fn memory_repository_runs_extended_aggregates_and_having() {
1865        let metadata = InMemoryMetadataStore::new().with_entity(entity());
1866        let repository = MemoryRepository::new(metadata).with_rows(
1867            "Order",
1868            vec![
1869                Record::from([
1870                    (String::from("id"), Value::U64(1)),
1871                    (String::from("version"), Value::I64(1)),
1872                    (String::from("name"), Value::Text(String::from("alpha"))),
1873                ]),
1874                Record::from([
1875                    (String::from("id"), Value::U64(2)),
1876                    (String::from("version"), Value::I64(3)),
1877                    (String::from("name"), Value::Text(String::from("alpha"))),
1878                ]),
1879                Record::from([
1880                    (String::from("id"), Value::U64(3)),
1881                    (String::from("version"), Value::I64(7)),
1882                    (String::from("name"), Value::Text(String::from("beta"))),
1883                ]),
1884            ],
1885        );
1886
1887        let rows = repository
1888            .fetch_all(
1889                &teaql_core::SelectQuery::new("Order")
1890                    .group_by("name")
1891                    .count("total")
1892                    .stddev("version", "stddevVersion")
1893                    .var_pop("version", "varPopVersion")
1894                    .bit_or("version", "bitOrVersion")
1895                    .having(Expr::gt("total", 1_i64)),
1896            )
1897            .unwrap();
1898
1899        assert_eq!(rows.len(), 1);
1900        assert_eq!(
1901            rows[0].get("name"),
1902            Some(&Value::Text(String::from("alpha")))
1903        );
1904        assert_eq!(rows[0].get("total"), Some(&Value::U64(2)));
1905        assert_eq!(
1906            rows[0].get("stddevVersion").map(Value::to_json_value),
1907            Some(serde_json::Value::String(
1908                "1.4142135623730951454746218583".to_owned()
1909            ))
1910        );
1911        assert_eq!(
1912            rows[0].get("varPopVersion"),
1913            Some(&Value::Decimal(Decimal::ONE))
1914        );
1915        assert_eq!(rows[0].get("bitOrVersion"), Some(&Value::I64(3)));
1916    }
1917
1918    #[test]
1919    fn memory_repository_runs_sound_like_filter() {
1920        let metadata = InMemoryMetadataStore::new().with_entity(entity());
1921        let repository = MemoryRepository::new(metadata).with_rows(
1922            "Order",
1923            vec![
1924                Record::from([
1925                    (String::from("id"), Value::U64(1)),
1926                    (String::from("version"), Value::I64(1)),
1927                    (String::from("name"), Value::Text(String::from("Robert"))),
1928                ]),
1929                Record::from([
1930                    (String::from("id"), Value::U64(2)),
1931                    (String::from("version"), Value::I64(1)),
1932                    (String::from("name"), Value::Text(String::from("Rupert"))),
1933                ]),
1934                Record::from([
1935                    (String::from("id"), Value::U64(3)),
1936                    (String::from("version"), Value::I64(1)),
1937                    (String::from("name"), Value::Text(String::from("Ashcraft"))),
1938                ]),
1939            ],
1940        );
1941
1942        let rows = repository
1943            .fetch_all(
1944                &teaql_core::SelectQuery::new("Order")
1945                    .filter(Expr::sound_like("name", "Robert"))
1946                    .order_asc("id"),
1947            )
1948            .unwrap();
1949
1950        assert_eq!(rows.len(), 2);
1951        assert_eq!(rows[0].get("name"), Some(&Value::Text("Robert".to_owned())));
1952        assert_eq!(rows[1].get("name"), Some(&Value::Text("Rupert".to_owned())));
1953    }
1954
1955    #[test]
1956    fn memory_repository_runs_java_style_string_match_filters() {
1957        let metadata = InMemoryMetadataStore::new().with_entity(entity());
1958        let repository = MemoryRepository::new(metadata).with_rows(
1959            "Order",
1960            vec![
1961                Record::from([
1962                    (String::from("id"), Value::U64(1)),
1963                    (String::from("version"), Value::I64(1)),
1964                    (String::from("name"), Value::Text(String::from("tea-order"))),
1965                ]),
1966                Record::from([
1967                    (String::from("id"), Value::U64(2)),
1968                    (String::from("version"), Value::I64(1)),
1969                    (
1970                        String::from("name"),
1971                        Value::Text(String::from("coffee-order")),
1972                    ),
1973                ]),
1974                Record::from([
1975                    (String::from("id"), Value::U64(3)),
1976                    (String::from("version"), Value::I64(1)),
1977                    (
1978                        String::from("name"),
1979                        Value::Text(String::from("tea-archived")),
1980                    ),
1981                ]),
1982            ],
1983        );
1984
1985        let rows = repository
1986            .fetch_all(
1987                &teaql_core::SelectQuery::new("Order")
1988                    .filter(
1989                        Expr::contain("name", "tea")
1990                            .and_expr(Expr::begin_with("name", "tea"))
1991                            .and_expr(Expr::end_with("name", "order"))
1992                            .and_expr(Expr::not_contain("name", "coffee"))
1993                            .and_expr(Expr::not_begin_with("name", "archived"))
1994                            .and_expr(Expr::not_end_with("name", "draft")),
1995                    )
1996                    .order_asc("id"),
1997            )
1998            .unwrap();
1999
2000        assert_eq!(rows.len(), 1);
2001        assert_eq!(
2002            rows[0].get("name"),
2003            Some(&Value::Text("tea-order".to_owned()))
2004        );
2005    }
2006
2007    #[test]
2008    fn memory_repository_runs_property_to_property_filters() {
2009        let metadata = InMemoryMetadataStore::new().with_entity(entity());
2010        let repository = MemoryRepository::new(metadata).with_rows(
2011            "Order",
2012            vec![
2013                Record::from([
2014                    (String::from("id"), Value::U64(1)),
2015                    (String::from("version"), Value::I64(2)),
2016                    (String::from("name"), Value::Text(String::from("keep"))),
2017                ]),
2018                Record::from([
2019                    (String::from("id"), Value::U64(2)),
2020                    (String::from("version"), Value::I64(1)),
2021                    (String::from("name"), Value::Text(String::from("skip"))),
2022                ]),
2023            ],
2024        );
2025
2026        let rows = repository
2027            .fetch_all(
2028                &teaql_core::SelectQuery::new("Order")
2029                    .filter(Expr::compare_columns("version", BinaryOp::Gte, "id"))
2030                    .order_asc("id"),
2031            )
2032            .unwrap();
2033
2034        assert_eq!(rows.len(), 1);
2035        assert_eq!(rows[0].get("name"), Some(&Value::Text("keep".to_owned())));
2036    }
2037
2038    #[test]
2039    fn memory_repository_supports_mutations_and_optimistic_locking() {
2040        let metadata = InMemoryMetadataStore::new().with_entity(entity());
2041        let repository = MemoryRepository::new(metadata);
2042
2043        repository
2044            .insert(
2045                &InsertCommand::new("Order")
2046                    .value("id", 10_u64)
2047                    .value("version", 1_i64)
2048                    .value("name", "draft"),
2049            )
2050            .unwrap();
2051        repository
2052            .update(
2053                &UpdateCommand::new("Order", 10_u64)
2054                    .expected_version(1)
2055                    .value("name", "submitted"),
2056            )
2057            .unwrap();
2058
2059        let row = repository
2060            .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2061            .unwrap()
2062            .pop()
2063            .unwrap();
2064        assert_eq!(
2065            row.get("name"),
2066            Some(&Value::Text(String::from("submitted")))
2067        );
2068        assert_eq!(row.get("version"), Some(&Value::I64(2)));
2069
2070        let conflict = repository
2071            .update(
2072                &UpdateCommand::new("Order", 10_u64)
2073                    .expected_version(1)
2074                    .value("name", "stale"),
2075            )
2076            .unwrap_err();
2077        assert!(matches!(
2078            conflict,
2079            RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. })
2080        ));
2081
2082        repository
2083            .delete(&DeleteCommand::new("Order", 10_u64).expected_version(2))
2084            .unwrap();
2085        let row = repository
2086            .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2087            .unwrap()
2088            .pop()
2089            .unwrap();
2090        assert_eq!(row.get("version"), Some(&Value::I64(-3)));
2091
2092        repository
2093            .recover(&RecoverCommand::new("Order", 10_u64, -3))
2094            .unwrap();
2095        let row = repository
2096            .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2097            .unwrap()
2098            .pop()
2099            .unwrap();
2100        assert_eq!(row.get("version"), Some(&Value::I64(4)));
2101    }
2102}
2103
2104#[cfg(all(test, feature = "sqlx"))]
2105mod sqlx_integration_tests {
2106    use super::sqlx_support::{
2107        MutationExecutorError, PgIdSpaceGenerator, PgMutationExecutor, PgTransactionExecutor,
2108        SqliteIdSpaceGenerator, SqliteMutationExecutor,
2109    };
2110    use super::{
2111        GraphMutationKind, GraphNode, GraphTransactionBoundary, InMemoryMetadataStore,
2112        InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry, QueryExecutor,
2113        RepositoryBehavior, UserContext,
2114    };
2115    use chrono::{NaiveDate, TimeZone, Utc};
2116    use teaql_core::{
2117        DataType, Decimal, DeleteCommand, EntityDescriptor, Expr, InsertCommand,
2118        PropertyDescriptor, Record, RecoverCommand, SelectQuery, UpdateCommand, Value,
2119    };
2120    use teaql_dialect_pg::PostgresDialect;
2121    use teaql_dialect_sqlite::SqliteDialect;
2122    use teaql_macros::TeaqlEntity as DeriveTeaqlEntity;
2123    use teaql_sql::SqlDialect;
2124    use tokio::runtime::Handle;
2125    use tokio::task::block_in_place;
2126
2127    const ORDER_DEFAULT_PROJECTION: &str = "\"id\", \"version\", \"name\"";
2128
2129    fn entity() -> EntityDescriptor {
2130        EntityDescriptor::new("Order")
2131            .table_name("orders")
2132            .property(
2133                PropertyDescriptor::new("id", DataType::U64)
2134                    .column_name("id")
2135                    .id()
2136                    .not_null(),
2137            )
2138            .property(
2139                PropertyDescriptor::new("version", DataType::I64)
2140                    .column_name("version")
2141                    .version()
2142                    .not_null(),
2143            )
2144            .property(
2145                PropertyDescriptor::new("name", DataType::Text)
2146                    .column_name("name")
2147                    .not_null(),
2148            )
2149            .relation(
2150                teaql_core::RelationDescriptor::new("lines", "OrderLine")
2151                    .local_key("id")
2152                    .foreign_key("order_id")
2153                    .many(),
2154            )
2155    }
2156
2157    fn sqlite_entity_keep_missing() -> EntityDescriptor {
2158        EntityDescriptor::new("Order")
2159            .table_name("orders")
2160            .property(
2161                PropertyDescriptor::new("id", DataType::U64)
2162                    .column_name("id")
2163                    .id()
2164                    .not_null(),
2165            )
2166            .property(
2167                PropertyDescriptor::new("version", DataType::I64)
2168                    .column_name("version")
2169                    .version()
2170                    .not_null(),
2171            )
2172            .property(
2173                PropertyDescriptor::new("name", DataType::Text)
2174                    .column_name("name")
2175                    .not_null(),
2176            )
2177            .relation(
2178                teaql_core::RelationDescriptor::new("lines", "OrderLine")
2179                    .local_key("id")
2180                    .foreign_key("order_id")
2181                    .many()
2182                    .keep_missing(),
2183            )
2184    }
2185
2186    fn line_entity() -> EntityDescriptor {
2187        EntityDescriptor::new("OrderLine")
2188            .table_name("orderline")
2189            .property(
2190                PropertyDescriptor::new("id", DataType::U64)
2191                    .column_name("id")
2192                    .id()
2193                    .not_null(),
2194            )
2195            .property(
2196                PropertyDescriptor::new("version", DataType::I64)
2197                    .column_name("version")
2198                    .version(),
2199            )
2200            .property(
2201                PropertyDescriptor::new("order_id", DataType::U64)
2202                    .column_name("order_id")
2203                    .not_null(),
2204            )
2205            .property(
2206                PropertyDescriptor::new("name", DataType::Text)
2207                    .column_name("name")
2208                    .not_null(),
2209            )
2210            .property(
2211                PropertyDescriptor::new("product_id", DataType::U64)
2212                    .column_name("product_id")
2213                    .not_null(),
2214            )
2215            .relation(
2216                teaql_core::RelationDescriptor::new("product", "Product")
2217                    .local_key("product_id")
2218                    .foreign_key("id"),
2219            )
2220    }
2221
2222    fn product_entity() -> EntityDescriptor {
2223        EntityDescriptor::new("Product")
2224            .table_name("product")
2225            .property(
2226                PropertyDescriptor::new("id", DataType::U64)
2227                    .column_name("id")
2228                    .id()
2229                    .not_null(),
2230            )
2231            .property(
2232                PropertyDescriptor::new("name", DataType::Text)
2233                    .column_name("name")
2234                    .not_null(),
2235            )
2236    }
2237
2238    fn typed_entity() -> EntityDescriptor {
2239        EntityDescriptor::new("TypedValue")
2240            .table_name("typed_value")
2241            .property(
2242                PropertyDescriptor::new("id", DataType::U64)
2243                    .column_name("id")
2244                    .id()
2245                    .not_null(),
2246            )
2247            .property(
2248                PropertyDescriptor::new("payload", DataType::Json)
2249                    .column_name("payload")
2250                    .not_null(),
2251            )
2252            .property(
2253                PropertyDescriptor::new("amount", DataType::Decimal)
2254                    .column_name("amount")
2255                    .not_null(),
2256            )
2257            .property(
2258                PropertyDescriptor::new("birthday", DataType::Date)
2259                    .column_name("birthday")
2260                    .not_null(),
2261            )
2262            .property(
2263                PropertyDescriptor::new("happened_at", DataType::Timestamp)
2264                    .column_name("happened_at")
2265                    .not_null(),
2266            )
2267    }
2268
    /// Unit type whose `RepositoryBehavior` impl in this module returns `["lines"]` from
    /// `relation_loads`.
    struct OrderBehavior;
    /// Unit type whose `RepositoryBehavior` impl in this module returns `["lines.product"]`
    /// from `relation_loads`.
    struct NestedOrderBehavior;
2271
    // Typed row for the "Product" entity (table "product"), mapped through the
    // `TeaqlEntity` derive (imported here as `DeriveTeaqlEntity`).
    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
    #[teaql(entity = "Product", table = "product")]
    struct ProductEntityRow {
        // Flagged as the entity id.
        #[teaql(id)]
        id: u64,
        name: String,
    }
2279
    // Typed row for the "OrderLine" entity (table "orderline"), mapped through the
    // `TeaqlEntity` derive. Unlike `line_entity()`, this struct declares no `version` field.
    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
    #[teaql(entity = "OrderLine", table = "orderline")]
    struct OrderLineEntityRow {
        // Flagged as the entity id.
        #[teaql(id)]
        id: u64,
        // Explicitly mapped to the "order_id" column.
        #[teaql(column = "order_id")]
        order_id: u64,
        name: String,
        // Explicitly mapped to the "product_id" column.
        #[teaql(column = "product_id")]
        product_id: u64,
        // Optional related `Product`, joined on `product_id` -> `id`.
        #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
        product: Option<ProductEntityRow>,
    }
2293
    // Typed row for the "Order" entity (table "orders") that also carries its related
    // order lines, mapped through the `TeaqlEntity` derive.
    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
    #[teaql(entity = "Order", table = "orders")]
    struct OrderAggregateRow {
        // Flagged as the entity id.
        #[teaql(id)]
        id: u64,
        // Flagged as the version property.
        #[teaql(version)]
        version: i64,
        name: String,
        // To-many relation to `OrderLine`, joined on `id` -> `order_id`.
        #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
        lines: teaql_core::SmartList<OrderLineEntityRow>,
    }
2305
    // Flat typed row for the "Order" entity (table "orders") without any relation fields,
    // mapped through the `TeaqlEntity` derive.
    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
    #[teaql(entity = "Order", table = "orders")]
    struct Order {
        // Flagged as the entity id.
        #[teaql(id)]
        id: u64,
        // Flagged as the version property.
        #[teaql(version)]
        version: i64,
        name: String,
    }
2315
2316    impl RepositoryBehavior for OrderBehavior {
2317        fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
2318            vec!["lines".to_owned()]
2319        }
2320    }
2321
2322    impl RepositoryBehavior for NestedOrderBehavior {
2323        fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
2324            vec!["lines.product".to_owned()]
2325        }
2326    }
2327
2328    #[derive(Clone)]
2329    struct SqliteSyncExecutor {
2330        inner: SqliteMutationExecutor,
2331    }
2332
2333    impl SqliteSyncExecutor {
2334        fn new(inner: SqliteMutationExecutor) -> Self {
2335            Self { inner }
2336        }
2337    }
2338
2339    impl QueryExecutor for SqliteSyncExecutor {
2340        type Error = MutationExecutorError;
2341
2342        fn fetch_all(&self, query: &teaql_sql::CompiledQuery) -> Result<Vec<Record>, Self::Error> {
2343            let handle = Handle::current();
2344            block_in_place(|| handle.block_on(self.inner.fetch_all(query)))
2345        }
2346
2347        fn execute(&self, query: &teaql_sql::CompiledQuery) -> Result<u64, Self::Error> {
2348            let handle = Handle::current();
2349            block_in_place(|| handle.block_on(self.inner.execute(query)))
2350        }
2351
2352        fn begin_transaction(&self) -> Result<GraphTransactionBoundary, Self::Error> {
2353            let handle = Handle::current();
2354            block_in_place(|| handle.block_on(self.inner.begin_transaction()))?;
2355            Ok(GraphTransactionBoundary::Started)
2356        }
2357
2358        fn commit_transaction(&self) -> Result<(), Self::Error> {
2359            let handle = Handle::current();
2360            block_in_place(|| handle.block_on(self.inner.commit_transaction()))
2361        }
2362
2363        fn rollback_transaction(&self) -> Result<(), Self::Error> {
2364            let handle = Handle::current();
2365            block_in_place(|| handle.block_on(self.inner.rollback_transaction()))
2366        }
2367    }
2368
2369    #[derive(Clone)]
2370    struct PgSyncExecutor {
2371        inner: PgMutationExecutor,
2372    }
2373
2374    impl PgSyncExecutor {
2375        fn new(inner: PgMutationExecutor) -> Self {
2376            Self { inner }
2377        }
2378    }
2379
2380    impl QueryExecutor for PgSyncExecutor {
2381        type Error = MutationExecutorError;
2382
2383        fn fetch_all(&self, query: &teaql_sql::CompiledQuery) -> Result<Vec<Record>, Self::Error> {
2384            let handle = Handle::current();
2385            block_in_place(|| handle.block_on(self.inner.fetch_all(query)))
2386        }
2387
2388        fn execute(&self, query: &teaql_sql::CompiledQuery) -> Result<u64, Self::Error> {
2389            let handle = Handle::current();
2390            block_in_place(|| handle.block_on(self.inner.execute(query)))
2391        }
2392    }
2393
2394    #[derive(Clone)]
2395    struct PgTxSyncExecutor {
2396        inner: PgTransactionExecutor,
2397    }
2398
2399    impl PgTxSyncExecutor {
2400        fn new(inner: PgTransactionExecutor) -> Self {
2401            Self { inner }
2402        }
2403    }
2404
2405    impl QueryExecutor for PgTxSyncExecutor {
2406        type Error = MutationExecutorError;
2407
2408        fn fetch_all(&self, query: &teaql_sql::CompiledQuery) -> Result<Vec<Record>, Self::Error> {
2409            let handle = Handle::current();
2410            block_in_place(|| handle.block_on(self.inner.fetch_all(query)))
2411        }
2412
2413        fn execute(&self, query: &teaql_sql::CompiledQuery) -> Result<u64, Self::Error> {
2414            let handle = Handle::current();
2415            block_in_place(|| handle.block_on(self.inner.execute(query)))
2416        }
2417
2418        fn begin_transaction(&self) -> Result<GraphTransactionBoundary, Self::Error> {
2419            Ok(GraphTransactionBoundary::AlreadyActive)
2420        }
2421
2422        fn rollback_transaction(&self) -> Result<(), Self::Error> {
2423            let handle = Handle::current();
2424            block_in_place(|| handle.block_on(self.inner.rollback()))
2425        }
2426    }
2427
2428    #[tokio::test]
2429    async fn sqlite_executor_runs_crud_flow() {
2430        use sqlx::sqlite::SqlitePoolOptions;
2431
2432        let pool = SqlitePoolOptions::new()
2433            .max_connections(1)
2434            .connect("sqlite::memory:")
2435            .await
2436            .unwrap();
2437
2438        let executor = SqliteMutationExecutor::new(pool.clone());
2439        let dialect = SqliteDialect;
2440        let entity = entity();
2441        executor.ensure_schema(&dialect, &[&entity]).await.unwrap();
2442
2443        let insert = dialect
2444            .compile_insert(
2445                &entity,
2446                &InsertCommand::new("Order")
2447                    .value("id", 1_u64)
2448                    .value("version", 1_i64)
2449                    .value("name", "first"),
2450            )
2451            .unwrap();
2452        assert_eq!(executor.execute(&insert).await.unwrap(), 1);
2453
2454        let select = dialect
2455            .compile_select(
2456                &entity,
2457                &SelectQuery::new("Order")
2458                    .project("id")
2459                    .project("version")
2460                    .project("name")
2461                    .filter(Expr::eq("id", 1_u64)),
2462            )
2463            .unwrap();
2464        let rows = executor.fetch_all(&select).await.unwrap();
2465        assert_eq!(rows.len(), 1);
2466        assert_eq!(rows[0].get("id"), Some(&Value::I64(1)));
2467        assert_eq!(rows[0].get("version"), Some(&Value::I64(1)));
2468        assert_eq!(rows[0].get("name"), Some(&Value::Text("first".to_owned())));
2469
2470        let update = dialect
2471            .compile_update(
2472                &entity,
2473                &UpdateCommand::new("Order", 1_u64)
2474                    .expected_version(1)
2475                    .value("name", "second"),
2476            )
2477            .unwrap();
2478        assert_eq!(executor.execute(&update).await.unwrap(), 1);
2479
2480        let after_update = executor.fetch_all(&select).await.unwrap();
2481        assert_eq!(after_update[0].get("version"), Some(&Value::I64(2)));
2482        assert_eq!(
2483            after_update[0].get("name"),
2484            Some(&Value::Text("second".to_owned()))
2485        );
2486
2487        let delete = dialect
2488            .compile_delete(
2489                &entity,
2490                &DeleteCommand::new("Order", 1_u64).expected_version(2),
2491            )
2492            .unwrap();
2493        assert_eq!(executor.execute(&delete).await.unwrap(), 1);
2494
2495        let after_delete = executor.fetch_all(&select).await.unwrap();
2496        assert_eq!(after_delete[0].get("version"), Some(&Value::I64(-3)));
2497
2498        let recover = dialect
2499            .compile_recover(&entity, &RecoverCommand::new("Order", 1_u64, -3))
2500            .unwrap();
2501        assert_eq!(executor.execute(&recover).await.unwrap(), 1);
2502
2503        let after_recover = executor.fetch_all(&select).await.unwrap();
2504        assert_eq!(after_recover[0].get("version"), Some(&Value::I64(4)));
2505    }
2506
2507    #[tokio::test(flavor = "multi_thread")]
2508    async fn sqlite_executor_enhances_relations() {
2509        use sqlx::sqlite::SqlitePoolOptions;
2510
2511        let pool = SqlitePoolOptions::new()
2512            .max_connections(1)
2513            .connect("sqlite::memory:")
2514            .await
2515            .unwrap();
2516
2517        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2518        let order = entity();
2519        let line = line_entity();
2520        mutation_executor
2521            .ensure_schema(&SqliteDialect, &[&order, &line])
2522            .await
2523            .unwrap();
2524
2525        sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1'), (2, 1, 'o2')")
2526            .execute(&pool)
2527            .await
2528            .unwrap();
2529        sqlx::query(
2530            "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2531                (101, 1, 1001, 'l1'),
2532                (102, 1, 1002, 'l2'),
2533                (201, 2, 1003, 'l3')",
2534        )
2535        .execute(&pool)
2536        .await
2537        .unwrap();
2538
2539        let executor = SqliteSyncExecutor::new(mutation_executor);
2540        let mut ctx = UserContext::new()
2541            .with_metadata(
2542                InMemoryMetadataStore::new()
2543                    .with_entity(order)
2544                    .with_entity(line)
2545                    .with_entity(product_entity()),
2546            )
2547            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
2548            .with_repository_behavior_registry(
2549                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
2550            );
2551        ctx.insert_resource(SqliteDialect);
2552        ctx.insert_resource(executor);
2553
2554        let repo = ctx
2555            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2556            .unwrap();
2557        let mut parents = repo
2558            .fetch_all(
2559                &repo
2560                    .select()
2561                    .project("id")
2562                    .project("version")
2563                    .project("name")
2564                    .order_by(teaql_core::OrderBy::asc("id")),
2565            )
2566            .unwrap();
2567
2568        repo.enhance_relations(&mut parents).unwrap();
2569
2570        assert_eq!(parents.len(), 2);
2571        match parents[0].get("lines") {
2572            Some(Value::List(lines)) => assert_eq!(lines.len(), 2),
2573            other => panic!("unexpected first lines payload: {other:?}"),
2574        }
2575        match parents[1].get("lines") {
2576            Some(Value::List(lines)) => assert_eq!(lines.len(), 1),
2577            other => panic!("unexpected second lines payload: {other:?}"),
2578        }
2579    }
2580
2581    #[tokio::test(flavor = "multi_thread")]
2582    async fn sqlite_executor_enhances_nested_relations() {
2583        use sqlx::sqlite::SqlitePoolOptions;
2584
2585        let pool = SqlitePoolOptions::new()
2586            .max_connections(1)
2587            .connect("sqlite::memory:")
2588            .await
2589            .unwrap();
2590
2591        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2592        let order = entity();
2593        let line = line_entity();
2594        let product = product_entity();
2595        mutation_executor
2596            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
2597            .await
2598            .unwrap();
2599
2600        sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1')")
2601            .execute(&pool)
2602            .await
2603            .unwrap();
2604        sqlx::query(
2605            "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2606                (101, 1, 1001, 'l1'),
2607                (102, 1, 1002, 'l2')",
2608        )
2609        .execute(&pool)
2610        .await
2611        .unwrap();
2612        sqlx::query(
2613            "INSERT INTO product (id, name) VALUES
2614                (1001, 'p1'),
2615                (1002, 'p2')",
2616        )
2617        .execute(&pool)
2618        .await
2619        .unwrap();
2620
2621        let executor = SqliteSyncExecutor::new(mutation_executor);
2622        let mut ctx = UserContext::new()
2623            .with_metadata(
2624                InMemoryMetadataStore::new()
2625                    .with_entity(order)
2626                    .with_entity(line)
2627                    .with_entity(product),
2628            )
2629            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
2630            .with_repository_behavior_registry(
2631                InMemoryRepositoryBehaviorRegistry::new()
2632                    .with_behavior("Order", NestedOrderBehavior),
2633            );
2634        ctx.insert_resource(SqliteDialect);
2635        ctx.insert_resource(executor);
2636
2637        let repo = ctx
2638            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2639            .unwrap();
2640        let mut parents = repo
2641            .fetch_all(
2642                &repo
2643                    .select()
2644                    .project("id")
2645                    .project("version")
2646                    .project("name"),
2647            )
2648            .unwrap();
2649
2650        repo.enhance_relations(&mut parents).unwrap();
2651
2652        match parents[0].get("lines") {
2653            Some(Value::List(lines)) => {
2654                assert_eq!(lines.len(), 2);
2655                for line in lines {
2656                    match line {
2657                        Value::Object(line_record) => match line_record.get("product") {
2658                            Some(Value::Object(product)) => {
2659                                assert!(product.get("name").is_some());
2660                            }
2661                            other => panic!("unexpected product payload: {other:?}"),
2662                        },
2663                        other => panic!("unexpected line payload: {other:?}"),
2664                    }
2665                }
2666            }
2667            other => panic!("unexpected nested lines payload: {other:?}"),
2668        }
2669    }
2670
2671    #[tokio::test(flavor = "multi_thread")]
2672    async fn sqlite_executor_enhances_query_relation_with_child_query() {
2673        use sqlx::sqlite::SqlitePoolOptions;
2674
2675        let pool = SqlitePoolOptions::new()
2676            .max_connections(1)
2677            .connect("sqlite::memory:")
2678            .await
2679            .unwrap();
2680
2681        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2682        let order = entity();
2683        let line = line_entity();
2684        let product = product_entity();
2685        mutation_executor
2686            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
2687            .await
2688            .unwrap();
2689
2690        sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1')")
2691            .execute(&pool)
2692            .await
2693            .unwrap();
2694        sqlx::query(
2695            "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2696                (101, 1, 1001, 'keep'),
2697                (102, 1, 1002, 'drop')",
2698        )
2699        .execute(&pool)
2700        .await
2701        .unwrap();
2702        sqlx::query(
2703            "INSERT INTO product (id, name) VALUES
2704                (1001, 'p1'),
2705                (1002, 'p2')",
2706        )
2707        .execute(&pool)
2708        .await
2709        .unwrap();
2710
2711        let executor = SqliteSyncExecutor::new(mutation_executor);
2712        let mut ctx = UserContext::new()
2713            .with_metadata(
2714                InMemoryMetadataStore::new()
2715                    .with_entity(order)
2716                    .with_entity(line)
2717                    .with_entity(product),
2718            )
2719            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
2720        ctx.insert_resource(SqliteDialect);
2721        ctx.insert_resource(executor);
2722
2723        let repo = ctx
2724            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2725            .unwrap();
2726        let query = repo.select().relation_query(
2727            "lines",
2728            SelectQuery::new("OrderLine")
2729                .project("name")
2730                .filter(Expr::eq("name", "keep"))
2731                .order_desc("id")
2732                .page(0, 10)
2733                .relation_query("product", SelectQuery::new("Product").project("name")),
2734        );
2735        let mut parents = repo
2736            .fetch_all(
2737                &repo
2738                    .select()
2739                    .project("id")
2740                    .project("version")
2741                    .project("name"),
2742            )
2743            .unwrap();
2744
2745        repo.enhance_query_relations(&mut parents, &query).unwrap();
2746
2747        match parents[0].get("lines") {
2748            Some(Value::List(lines)) => {
2749                assert_eq!(lines.len(), 1);
2750                let Value::Object(line) = &lines[0] else {
2751                    panic!("unexpected line payload: {:?}", lines[0]);
2752                };
2753                assert_eq!(line.get("name"), Some(&Value::Text("keep".to_owned())));
2754                assert_eq!(line.get("order_id"), Some(&Value::I64(1)));
2755                assert_eq!(line.get("product_id"), Some(&Value::I64(1001)));
2756                match line.get("product") {
2757                    Some(Value::Object(product)) => {
2758                        assert_eq!(product.get("name"), Some(&Value::Text("p1".to_owned())));
2759                        assert_eq!(product.get("id"), Some(&Value::I64(1001)));
2760                    }
2761                    other => panic!("unexpected product payload: {other:?}"),
2762                }
2763            }
2764            other => panic!("unexpected query relation payload: {other:?}"),
2765        }
2766    }
2767
2768    #[tokio::test]
2769    async fn sqlite_executor_ensure_schema_adds_missing_columns() {
2770        use sqlx::Row;
2771        use sqlx::sqlite::SqlitePoolOptions;
2772
2773        let pool = SqlitePoolOptions::new()
2774            .max_connections(1)
2775            .connect("sqlite::memory:")
2776            .await
2777            .unwrap();
2778
2779        sqlx::query(
2780            "CREATE TABLE orders (
2781                id INTEGER PRIMARY KEY
2782            )",
2783        )
2784        .execute(&pool)
2785        .await
2786        .unwrap();
2787
2788        let executor = SqliteMutationExecutor::new(pool.clone());
2789        let dialect = SqliteDialect;
2790        let entity = entity();
2791
2792        executor.ensure_schema(&dialect, &[&entity]).await.unwrap();
2793
2794        let columns = sqlx::query("PRAGMA table_info(\"orders\")")
2795            .fetch_all(&pool)
2796            .await
2797            .unwrap()
2798            .into_iter()
2799            .map(|row| row.try_get::<String, _>("name").unwrap())
2800            .collect::<Vec<_>>();
2801
2802        assert!(columns.contains(&"id".to_owned()));
2803        assert!(columns.contains(&"version".to_owned()));
2804        assert!(columns.contains(&"name".to_owned()));
2805    }
2806
2807    #[tokio::test]
2808    async fn user_context_can_ensure_sqlite_schema_from_runtime_module() {
2809        use sqlx::Row;
2810        use sqlx::sqlite::SqlitePoolOptions;
2811
2812        let pool = SqlitePoolOptions::new()
2813            .max_connections(1)
2814            .connect("sqlite::memory:")
2815            .await
2816            .unwrap();
2817
2818        let module = super::RuntimeModule::new()
2819            .descriptor(entity())
2820            .descriptor(line_entity())
2821            .descriptor(product_entity())
2822            .initial_graph(
2823                GraphNode::new("Order")
2824                    .value("id", 1_u64)
2825                    .value("version", 1_i64)
2826                    .value("name", "seed-order"),
2827            );
2828        let mut ctx = super::UserContext::new().with_module(module);
2829        ctx.insert_resource(SqliteDialect);
2830        ctx.insert_resource(SqliteMutationExecutor::new(pool.clone()));
2831
2832        ctx.ensure_sqlite_schema().await.unwrap();
2833        sqlx::query("UPDATE orders SET name = 'stale-seed-order' WHERE id = 1")
2834            .execute(&pool)
2835            .await
2836            .unwrap();
2837        ctx.ensure_sqlite_schema().await.unwrap();
2838
2839        let tables = sqlx::query(
2840            "SELECT name FROM sqlite_master WHERE type = 'table' AND name IN ('orders', 'orderline', 'product') ORDER BY name",
2841        )
2842        .fetch_all(&pool)
2843        .await
2844        .unwrap()
2845        .into_iter()
2846        .map(|row| row.try_get::<String, _>("name").unwrap())
2847        .collect::<Vec<_>>();
2848
2849        assert_eq!(
2850            tables,
2851            vec![
2852                "orderline".to_owned(),
2853                "orders".to_owned(),
2854                "product".to_owned()
2855            ]
2856        );
2857
2858        let seed_count: i64 =
2859            sqlx::query_scalar("SELECT COUNT(1) FROM orders WHERE id = 1 AND name = 'seed-order'")
2860                .fetch_one(&pool)
2861                .await
2862                .unwrap();
2863        assert_eq!(seed_count, 1);
2864    }
2865
2866    #[tokio::test]
2867    async fn sqlite_executor_roundtrips_json_decimal_date_and_timestamp() {
2868        use sqlx::sqlite::SqlitePoolOptions;
2869
2870        let pool = SqlitePoolOptions::new()
2871            .max_connections(1)
2872            .connect("sqlite::memory:")
2873            .await
2874            .unwrap();
2875
2876        let executor = SqliteMutationExecutor::new(pool.clone());
2877        let dialect = SqliteDialect;
2878        let entity = typed_entity();
2879        executor.ensure_schema(&dialect, &[&entity]).await.unwrap();
2880
2881        let birthday = NaiveDate::from_ymd_opt(2024, 2, 3).unwrap();
2882        let happened_at = Utc.with_ymd_and_hms(2024, 2, 3, 4, 5, 6).unwrap();
2883        let payload = serde_json::json!({"name": "teaql", "count": 2});
2884        let amount = Decimal::new(12345, 2);
2885
2886        let insert = dialect
2887            .compile_insert(
2888                &entity,
2889                &InsertCommand::new("TypedValue")
2890                    .value("id", 1_u64)
2891                    .value("payload", payload.clone())
2892                    .value("amount", amount)
2893                    .value("birthday", birthday)
2894                    .value("happened_at", happened_at),
2895            )
2896            .unwrap();
2897        assert_eq!(executor.execute(&insert).await.unwrap(), 1);
2898
2899        let select = dialect
2900            .compile_select(
2901                &entity,
2902                &SelectQuery::new("TypedValue")
2903                    .project("id")
2904                    .project("payload")
2905                    .project("amount")
2906                    .project("birthday")
2907                    .project("happened_at")
2908                    .filter(Expr::eq("id", 1_u64)),
2909            )
2910            .unwrap();
2911        let rows = executor.fetch_all(&select).await.unwrap();
2912        assert_eq!(rows.len(), 1);
2913        assert_eq!(rows[0].get("payload"), Some(&Value::Json(payload)));
2914        assert_eq!(rows[0].get("amount"), Some(&Value::Decimal(amount)));
2915        assert_eq!(rows[0].get("birthday"), Some(&Value::Date(birthday)));
2916        assert_eq!(
2917            rows[0].get("happened_at"),
2918            Some(&Value::Timestamp(happened_at))
2919        );
2920    }
2921
2922    #[tokio::test(flavor = "multi_thread")]
2923    async fn sqlite_fetches_enhanced_typed_entities() {
2924        use sqlx::sqlite::SqlitePoolOptions;
2925
2926        let pool = SqlitePoolOptions::new()
2927            .max_connections(1)
2928            .connect("sqlite::memory:")
2929            .await
2930            .unwrap();
2931
2932        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2933        let order = entity();
2934        let line = line_entity();
2935        let product = product_entity();
2936        mutation_executor
2937            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
2938            .await
2939            .unwrap();
2940
2941        sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1')")
2942            .execute(&pool)
2943            .await
2944            .unwrap();
2945        sqlx::query(
2946            "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2947                (101, 1, 1001, 'l1'),
2948                (102, 1, 1002, 'l2')",
2949        )
2950        .execute(&pool)
2951        .await
2952        .unwrap();
2953        sqlx::query(
2954            "INSERT INTO product (id, name) VALUES
2955                (1001, 'p1'),
2956                (1002, 'p2')",
2957        )
2958        .execute(&pool)
2959        .await
2960        .unwrap();
2961
2962        let executor = SqliteSyncExecutor::new(mutation_executor);
2963        let mut ctx = UserContext::new()
2964            .with_metadata(
2965                InMemoryMetadataStore::new()
2966                    .with_entity(order)
2967                    .with_entity(line)
2968                    .with_entity(product),
2969            )
2970            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
2971            .with_repository_behavior_registry(
2972                InMemoryRepositoryBehaviorRegistry::new()
2973                    .with_behavior("Order", NestedOrderBehavior),
2974            );
2975        ctx.insert_resource(SqliteDialect);
2976        ctx.insert_resource(executor);
2977
2978        let repo = ctx
2979            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2980            .unwrap();
2981        let rows = repo
2982            .fetch_enhanced_entities::<OrderAggregateRow>(
2983                &repo
2984                    .select()
2985                    .project("id")
2986                    .project("version")
2987                    .project("name"),
2988            )
2989            .unwrap();
2990
2991        assert_eq!(rows.len(), 1);
2992        let order = rows.first().unwrap();
2993        assert_eq!(order.id, 1);
2994        assert_eq!(order.lines.len(), 2);
2995        assert_eq!(order.lines.data[0].product.as_ref().unwrap().name, "p1");
2996        assert_eq!(order.lines.data[1].product.as_ref().unwrap().name, "p2");
2997    }
2998
2999    #[tokio::test(flavor = "multi_thread")]
3000    async fn sqlite_fetches_typed_smart_list_entities() {
3001        use sqlx::sqlite::SqlitePoolOptions;
3002
3003        let pool = SqlitePoolOptions::new()
3004            .max_connections(1)
3005            .connect("sqlite::memory:")
3006            .await
3007            .unwrap();
3008
3009        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3010        let order = entity();
3011        mutation_executor
3012            .ensure_schema(&SqliteDialect, &[&order])
3013            .await
3014            .unwrap();
3015
3016        sqlx::query(
3017            "INSERT INTO orders (id, version, name) VALUES
3018                (1, 1, 'o1'),
3019                (2, 3, 'o2')",
3020        )
3021        .execute(&pool)
3022        .await
3023        .unwrap();
3024
3025        let executor = SqliteSyncExecutor::new(mutation_executor);
3026        let mut ctx = UserContext::new()
3027            .with_metadata(InMemoryMetadataStore::new().with_entity(order))
3028            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3029        ctx.insert_resource(SqliteDialect);
3030        ctx.insert_resource(executor);
3031
3032        let repo = ctx
3033            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3034            .unwrap();
3035        let rows = repo
3036            .fetch_entities::<OrderAggregateRow>(
3037                &repo
3038                    .select()
3039                    .project("id")
3040                    .project("version")
3041                    .project("name"),
3042            )
3043            .unwrap();
3044
3045        assert_eq!(rows.len(), 2);
3046        assert_eq!(rows.ids(), vec![Value::U64(1), Value::U64(2)]);
3047        assert_eq!(rows.versions(), vec![1, 3]);
3048        assert!(rows.data[0].lines.is_empty());
3049        assert!(rows.data[1].lines.is_empty());
3050    }
3051
3052    #[tokio::test(flavor = "multi_thread")]
3053    async fn sqlite_fetches_smart_list_of_order_entities() {
3054        use sqlx::sqlite::SqlitePoolOptions;
3055
3056        let pool = SqlitePoolOptions::new()
3057            .max_connections(1)
3058            .connect("sqlite::memory:")
3059            .await
3060            .unwrap();
3061
3062        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3063        let order = entity();
3064        mutation_executor
3065            .ensure_schema(&SqliteDialect, &[&order])
3066            .await
3067            .unwrap();
3068
3069        sqlx::query(
3070            "INSERT INTO orders (id, version, name) VALUES
3071                (1, 1, 'o1'),
3072                (2, 3, 'o2')",
3073        )
3074        .execute(&pool)
3075        .await
3076        .unwrap();
3077
3078        let executor = SqliteSyncExecutor::new(mutation_executor);
3079        let mut ctx = UserContext::new()
3080            .with_metadata(InMemoryMetadataStore::new().with_entity(order))
3081            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3082        ctx.insert_resource(SqliteDialect);
3083        ctx.insert_resource(executor);
3084
3085        let repo = ctx
3086            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3087            .unwrap();
3088        let rows = repo
3089            .fetch_entities::<Order>(
3090                &repo
3091                    .select()
3092                    .project("id")
3093                    .project("version")
3094                    .project("name"),
3095            )
3096            .unwrap();
3097
3098        assert_eq!(
3099            rows.data,
3100            vec![
3101                Order {
3102                    id: 1,
3103                    version: 1,
3104                    name: "o1".to_owned(),
3105                },
3106                Order {
3107                    id: 2,
3108                    version: 3,
3109                    name: "o2".to_owned(),
3110                }
3111            ]
3112        );
3113        assert_eq!(rows.ids(), vec![Value::U64(1), Value::U64(2)]);
3114        assert_eq!(rows.versions(), vec![1, 3]);
3115    }
3116
3117    #[tokio::test(flavor = "multi_thread")]
3118    async fn sqlite_insert_generates_missing_id() {
3119        use sqlx::sqlite::SqlitePoolOptions;
3120
3121        let pool = SqlitePoolOptions::new()
3122            .max_connections(1)
3123            .connect("sqlite::memory:")
3124            .await
3125            .unwrap();
3126
3127        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3128        let order = entity();
3129        mutation_executor
3130            .ensure_schema(&SqliteDialect, &[&order])
3131            .await
3132            .unwrap();
3133
3134        let executor = SqliteSyncExecutor::new(mutation_executor);
3135        let mut ctx = UserContext::new()
3136            .with_metadata(InMemoryMetadataStore::new().with_entity(order))
3137            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3138        ctx.insert_resource(SqliteDialect);
3139        ctx.insert_resource(executor);
3140
3141        let repo = ctx
3142            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3143            .unwrap();
3144        let affected = repo
3145            .insert(
3146                &repo
3147                    .insert_command()
3148                    .value("version", 1_i64)
3149                    .value("name", "generated"),
3150            )
3151            .unwrap();
3152        assert_eq!(affected, 1);
3153
3154        let rows = repo
3155            .fetch_entities::<Order>(
3156                &repo
3157                    .select()
3158                    .project("id")
3159                    .project("version")
3160                    .project("name")
3161                    .filter(Expr::eq("name", "generated")),
3162            )
3163            .unwrap();
3164        assert_eq!(rows.len(), 1);
3165        let order = rows.first().unwrap();
3166        assert!(order.id > 0);
3167        assert_eq!(order.version, 1);
3168        assert_eq!(order.name, "generated");
3169    }
3170
3171    #[tokio::test(flavor = "multi_thread")]
3172    async fn sqlite_save_graph_inserts_nested_rows() {
3173        use sqlx::{Row, sqlite::SqlitePoolOptions};
3174
3175        let pool = SqlitePoolOptions::new()
3176            .max_connections(1)
3177            .connect("sqlite::memory:?cache=shared")
3178            .await
3179            .unwrap();
3180
3181        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3182        let order = entity();
3183        let line = line_entity();
3184        let product = product_entity();
3185        mutation_executor
3186            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3187            .await
3188            .unwrap();
3189
3190        let executor = SqliteSyncExecutor::new(mutation_executor);
3191        let mut ctx = UserContext::new()
3192            .with_metadata(
3193                InMemoryMetadataStore::new()
3194                    .with_entity(order)
3195                    .with_entity(line)
3196                    .with_entity(product),
3197            )
3198            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3199        ctx.insert_resource(SqliteDialect);
3200        ctx.insert_resource(executor);
3201
3202        let repo = ctx
3203            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3204            .unwrap();
3205        let graph = GraphNode::new("Order")
3206            .value("id", 1_u64)
3207            .value("version", 1_i64)
3208            .value("name", "root")
3209            .relation(
3210                "lines",
3211                GraphNode::new("OrderLine")
3212                    .value("id", 10_u64)
3213                    .value("name", "line-1")
3214                    .relation(
3215                        "product",
3216                        GraphNode::new("Product")
3217                            .value("id", 100_u64)
3218                            .value("name", "sku-1"),
3219                    ),
3220            );
3221
3222        let saved = repo.save_graph(graph).unwrap();
3223        assert_eq!(
3224            saved.relations["lines"][0].values.get("order_id"),
3225            Some(&Value::U64(1))
3226        );
3227        assert_eq!(
3228            saved.relations["lines"][0].values.get("product_id"),
3229            Some(&Value::U64(100))
3230        );
3231
3232        let order_count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders")
3233            .fetch_one(&pool)
3234            .await
3235            .unwrap()
3236            .try_get("count")
3237            .unwrap();
3238        let line = sqlx::query("SELECT order_id, product_id FROM orderline WHERE id = 10")
3239            .fetch_one(&pool)
3240            .await
3241            .unwrap();
3242        let product_count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM product")
3243            .fetch_one(&pool)
3244            .await
3245            .unwrap()
3246            .try_get("count")
3247            .unwrap();
3248
3249        assert_eq!(order_count, 1);
3250        assert_eq!(line.try_get::<i64, _>("order_id").unwrap(), 1);
3251        assert_eq!(line.try_get::<i64, _>("product_id").unwrap(), 100);
3252        assert_eq!(product_count, 1);
3253    }
3254
3255    #[tokio::test(flavor = "multi_thread")]
3256    async fn sqlite_save_typed_entity_graph_create_inserts_nested_rows() {
3257        use sqlx::{Row, sqlite::SqlitePoolOptions};
3258
3259        let pool = SqlitePoolOptions::new()
3260            .max_connections(1)
3261            .connect("sqlite::memory:")
3262            .await
3263            .unwrap();
3264
3265        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3266        let order = entity();
3267        let line = line_entity();
3268        let product = product_entity();
3269        mutation_executor
3270            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3271            .await
3272            .unwrap();
3273
3274        let executor = SqliteSyncExecutor::new(mutation_executor);
3275        let mut ctx = UserContext::new()
3276            .with_metadata(
3277                InMemoryMetadataStore::new()
3278                    .with_entity(order)
3279                    .with_entity(line)
3280                    .with_entity(product),
3281            )
3282            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3283        ctx.insert_resource(SqliteDialect);
3284        ctx.insert_resource(executor);
3285
3286        let repo = ctx
3287            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3288            .unwrap();
3289        let saved = repo
3290            .save_entity_graph(OrderAggregateRow {
3291                id: 2,
3292                version: 1,
3293                name: "typed-root".to_owned(),
3294                lines: teaql_core::SmartList::from(vec![OrderLineEntityRow {
3295                    id: 20,
3296                    order_id: 0,
3297                    name: "typed-line".to_owned(),
3298                    product_id: 200,
3299                    product: Some(ProductEntityRow {
3300                        id: 200,
3301                        name: "typed-sku".to_owned(),
3302                    }),
3303                }]),
3304            })
3305            .unwrap();
3306
3307        assert_eq!(saved.values.get("id"), Some(&Value::U64(2)));
3308        assert_eq!(
3309            saved.relations["lines"][0].values.get("order_id"),
3310            Some(&Value::U64(2))
3311        );
3312        assert_eq!(
3313            saved.relations["lines"][0].values.get("product_id"),
3314            Some(&Value::U64(200))
3315        );
3316
3317        let order_count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders")
3318            .fetch_one(&pool)
3319            .await
3320            .unwrap()
3321            .try_get("count")
3322            .unwrap();
3323        let line = sqlx::query("SELECT order_id, product_id FROM orderline WHERE id = 20")
3324            .fetch_one(&pool)
3325            .await
3326            .unwrap();
3327        let product_name: String = sqlx::query_scalar("SELECT name FROM product WHERE id = 200")
3328            .fetch_one(&pool)
3329            .await
3330            .unwrap();
3331
3332        assert_eq!(order_count, 1);
3333        assert_eq!(line.try_get::<i64, _>("order_id").unwrap(), 2);
3334        assert_eq!(line.try_get::<i64, _>("product_id").unwrap(), 200);
3335        assert_eq!(product_name, "typed-sku");
3336    }
3337
3338    #[tokio::test(flavor = "multi_thread")]
3339    async fn sqlite_plan_for_save_graph_assigns_ids_and_batches_before_execution() {
3340        use sqlx::sqlite::SqlitePoolOptions;
3341        use std::time::{SystemTime, UNIX_EPOCH};
3342
3343        let db_path = std::env::temp_dir().join(format!(
3344            "teaql-plan-{}-{}.db",
3345            std::process::id(),
3346            SystemTime::now()
3347                .duration_since(UNIX_EPOCH)
3348                .unwrap()
3349                .as_nanos()
3350        ));
3351        let db_url = format!("sqlite://{}?mode=rwc", db_path.display());
3352        let pool = SqlitePoolOptions::new()
3353            .max_connections(1)
3354            .connect(&db_url)
3355            .await
3356            .unwrap();
3357
3358        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3359        let order = entity();
3360        let line = line_entity();
3361        let product = product_entity();
3362        mutation_executor
3363            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3364            .await
3365            .unwrap();
3366
3367        sqlx::query("INSERT INTO orders (id, version, name) VALUES (100, 1, 'existing')")
3368            .execute(&pool)
3369            .await
3370            .unwrap();
3371        sqlx::query(
3372            "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
3373                (200, 1, 100, 301, 'line-a'),
3374                (201, 1, 100, 302, 'line-b')",
3375        )
3376        .execute(&pool)
3377        .await
3378        .unwrap();
3379        let seeded_lines: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM orderline")
3380            .fetch_one(&pool)
3381            .await
3382            .unwrap();
3383        assert_eq!(seeded_lines, 2);
3384
3385        let executor = SqliteSyncExecutor::new(mutation_executor);
3386        let mut ctx = UserContext::new()
3387            .with_metadata(
3388                InMemoryMetadataStore::new()
3389                    .with_entity(order)
3390                    .with_entity(line)
3391                    .with_entity(product),
3392            )
3393            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
3394            .with_internal_id_generator(SqliteIdSpaceGenerator::new(pool.clone()));
3395        ctx.insert_resource(SqliteDialect);
3396        ctx.insert_resource(executor);
3397
3398        let graph = GraphNode::new("Order")
3399            .value("id", 100_u64)
3400            .value("version", 1_i64)
3401            .value("name", "existing-updated")
3402            .relation(
3403                "lines",
3404                GraphNode::new("OrderLine")
3405                    .value("name", "new-line-a")
3406                    .value("product_id", 401_u64),
3407            )
3408            .relation(
3409                "lines",
3410                GraphNode::new("OrderLine")
3411                    .value("name", "new-line-b")
3412                    .value("product_id", 402_u64),
3413            )
3414            .relation(
3415                "lines",
3416                GraphNode::new("OrderLine")
3417                    .value("id", 200_u64)
3418                    .value("version", 1_i64)
3419                    .value("name", "line-a-updated"),
3420            )
3421            .relation(
3422                "lines",
3423                GraphNode::new("OrderLine")
3424                    .value("id", 201_u64)
3425                    .value("version", 1_i64)
3426                    .value("name", "line-b-updated"),
3427            );
3428
3429        let plan = ctx
3430            .plan_for_save_graph::<SqliteDialect, SqliteSyncExecutor>(graph)
3431            .unwrap();
3432        let counts = plan.grouped_counts();
3433        assert_eq!(
3434            counts.get(&("Order".to_owned(), GraphMutationKind::Update)),
3435            Some(&1)
3436        );
3437        assert_eq!(
3438            counts.get(&("OrderLine".to_owned(), GraphMutationKind::Create)),
3439            Some(&2)
3440        );
3441        assert_eq!(
3442            counts.get(&("OrderLine".to_owned(), GraphMutationKind::Update)),
3443            Some(&2)
3444        );
3445
3446        let create_batch = plan
3447            .batches
3448            .iter()
3449            .find(|batch| batch.entity == "OrderLine" && batch.kind == GraphMutationKind::Create)
3450            .unwrap();
3451        assert_eq!(create_batch.items.len(), 2);
3452        assert_eq!(create_batch.items[0].values.get("id"), Some(&Value::U64(1)));
3453        assert_eq!(create_batch.items[1].values.get("id"), Some(&Value::U64(2)));
3454
3455        let update_batch = plan
3456            .batches
3457            .iter()
3458            .find(|batch| {
3459                batch.entity == "OrderLine"
3460                    && batch.kind == GraphMutationKind::Update
3461                    && batch.update_fields == vec!["name".to_owned()]
3462            })
3463            .unwrap();
3464        assert_eq!(update_batch.items.len(), 2);
3465
3466        let repo = ctx
3467            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3468            .unwrap();
3469        let saved = repo.execute_graph_plan(plan).unwrap();
3470        let lines = saved.relations.get("lines").unwrap();
3471        assert_eq!(lines.len(), 4);
3472
3473        let generated_count: i64 = sqlx::query_scalar(
3474            "SELECT COUNT(*) FROM orderline WHERE id IN (1, 2) AND order_id = 100",
3475        )
3476        .fetch_one(&pool)
3477        .await
3478        .unwrap();
3479        assert_eq!(generated_count, 2);
3480        let updated_name: String = sqlx::query_scalar("SELECT name FROM orderline WHERE id = 200")
3481            .fetch_one(&pool)
3482            .await
3483            .unwrap();
3484        assert_eq!(updated_name, "line-a-updated");
3485        pool.close().await;
3486        let _ = std::fs::remove_file(db_path);
3487    }
3488
3489    #[tokio::test(flavor = "multi_thread")]
3490    async fn sqlite_save_graph_updates_nested_rows_and_deletes_missing_children() {
3491        use sqlx::{Row, sqlite::SqlitePoolOptions};
3492
3493        let pool = SqlitePoolOptions::new()
3494            .max_connections(1)
3495            .connect("sqlite::memory:")
3496            .await
3497            .unwrap();
3498
3499        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3500        let order = entity();
3501        let line = line_entity();
3502        let product = product_entity();
3503        mutation_executor
3504            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3505            .await
3506            .unwrap();
3507
3508        sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'old-root')")
3509            .execute(&pool)
3510            .await
3511            .unwrap();
3512        sqlx::query(
3513            "INSERT INTO product (id, name) VALUES
3514                (100, 'old-sku'),
3515                (101, 'removed-sku')",
3516        )
3517        .execute(&pool)
3518        .await
3519        .unwrap();
3520        sqlx::query(
3521            "INSERT INTO orderline (id, order_id, product_id, name) VALUES
3522                (10, 1, 100, 'old-line'),
3523                (11, 1, 101, 'removed-line')",
3524        )
3525        .execute(&pool)
3526        .await
3527        .unwrap();
3528        sqlx::query("UPDATE orderline SET version = 1")
3529            .execute(&pool)
3530            .await
3531            .unwrap();
3532
3533        let executor = SqliteSyncExecutor::new(mutation_executor);
3534        let mut ctx = UserContext::new()
3535            .with_metadata(
3536                InMemoryMetadataStore::new()
3537                    .with_entity(order)
3538                    .with_entity(line)
3539                    .with_entity(product),
3540            )
3541            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3542        ctx.insert_resource(SqliteDialect);
3543        ctx.insert_resource(executor);
3544
3545        let repo = ctx
3546            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3547            .unwrap();
3548        let graph = GraphNode::new("Order")
3549            .value("id", 1_u64)
3550            .value("version", 1_i64)
3551            .value("name", "new-root")
3552            .relation(
3553                "lines",
3554                GraphNode::new("OrderLine")
3555                    .value("id", 10_u64)
3556                    .value("version", 1_i64)
3557                    .value("name", "new-line")
3558                    .relation(
3559                        "product",
3560                        GraphNode::new("Product")
3561                            .value("id", 100_u64)
3562                            .value("name", "new-sku"),
3563                    ),
3564            )
3565            .relation(
3566                "lines",
3567                GraphNode::new("OrderLine")
3568                    .value("id", 12_u64)
3569                    .value("version", 1_i64)
3570                    .value("name", "added-line")
3571                    .relation(
3572                        "product",
3573                        GraphNode::new("Product")
3574                            .value("id", 102_u64)
3575                            .value("name", "added-sku"),
3576                    ),
3577            );
3578
3579        let saved = repo.save_graph(graph).unwrap();
3580        assert_eq!(saved.values.get("version"), Some(&Value::I64(2)));
3581
3582        let order_row = sqlx::query("SELECT version, name FROM orders WHERE id = 1")
3583            .fetch_one(&pool)
3584            .await
3585            .unwrap();
3586        assert_eq!(order_row.try_get::<i64, _>("version").unwrap(), 2);
3587        assert_eq!(order_row.try_get::<String, _>("name").unwrap(), "new-root");
3588
3589        let updated_line =
3590            sqlx::query("SELECT version, product_id, name FROM orderline WHERE id = 10")
3591                .fetch_one(&pool)
3592                .await
3593                .unwrap();
3594        assert_eq!(updated_line.try_get::<i64, _>("version").unwrap(), 2);
3595        assert_eq!(updated_line.try_get::<i64, _>("product_id").unwrap(), 100);
3596        assert_eq!(
3597            updated_line.try_get::<String, _>("name").unwrap(),
3598            "new-line"
3599        );
3600
3601        let added_line =
3602            sqlx::query("SELECT order_id, product_id, name FROM orderline WHERE id = 12")
3603                .fetch_one(&pool)
3604                .await
3605                .unwrap();
3606        assert_eq!(added_line.try_get::<i64, _>("order_id").unwrap(), 1);
3607        assert_eq!(added_line.try_get::<i64, _>("product_id").unwrap(), 102);
3608        assert_eq!(
3609            added_line.try_get::<String, _>("name").unwrap(),
3610            "added-line"
3611        );
3612
3613        let deleted_line = sqlx::query("SELECT version FROM orderline WHERE id = 11")
3614            .fetch_one(&pool)
3615            .await
3616            .unwrap();
3617        assert_eq!(deleted_line.try_get::<i64, _>("version").unwrap(), -2);
3618
3619        let updated_product = sqlx::query("SELECT name FROM product WHERE id = 100")
3620            .fetch_one(&pool)
3621            .await
3622            .unwrap();
3623        assert_eq!(
3624            updated_product.try_get::<String, _>("name").unwrap(),
3625            "new-sku"
3626        );
3627    }
3628
3629    #[tokio::test(flavor = "multi_thread")]
3630    async fn sqlite_save_graph_supports_reference_remove_and_keep_missing() {
3631        use sqlx::{Row, sqlite::SqlitePoolOptions};
3632
3633        let pool = SqlitePoolOptions::new()
3634            .max_connections(1)
3635            .connect("sqlite::memory:")
3636            .await
3637            .unwrap();
3638
3639        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3640        let order = sqlite_entity_keep_missing();
3641        let line = line_entity();
3642        let product = product_entity();
3643        mutation_executor
3644            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3645            .await
3646            .unwrap();
3647
3648        sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'root')")
3649            .execute(&pool)
3650            .await
3651            .unwrap();
3652        sqlx::query("INSERT INTO product (id, name) VALUES (100, 'reference-only')")
3653            .execute(&pool)
3654            .await
3655            .unwrap();
3656        sqlx::query(
3657            "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
3658                (10, 1, 1, 100, 'remove-me'),
3659                (11, 1, 1, 100, 'keep-me')",
3660        )
3661        .execute(&pool)
3662        .await
3663        .unwrap();
3664
3665        let executor = SqliteSyncExecutor::new(mutation_executor);
3666        let mut ctx = UserContext::new()
3667            .with_metadata(
3668                InMemoryMetadataStore::new()
3669                    .with_entity(order)
3670                    .with_entity(line)
3671                    .with_entity(product),
3672            )
3673            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3674        ctx.insert_resource(SqliteDialect);
3675        ctx.insert_resource(executor);
3676
3677        let repo = ctx
3678            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3679            .unwrap();
3680        let graph = GraphNode::new("Order")
3681            .value("id", 1_u64)
3682            .value("version", 1_i64)
3683            .value("name", "root-updated")
3684            .relation(
3685                "lines",
3686                GraphNode::new("OrderLine")
3687                    .value("id", 10_u64)
3688                    .value("version", 1_i64)
3689                    .remove(),
3690            )
3691            .relation(
3692                "lines",
3693                GraphNode::new("OrderLine")
3694                    .value("id", 12_u64)
3695                    .value("version", 1_i64)
3696                    .value("name", "new-reference-line")
3697                    .relation(
3698                        "product",
3699                        GraphNode::new("Product").value("id", 100_u64).reference(),
3700                    ),
3701            );
3702
3703        let saved = repo.save_graph(graph).unwrap();
3704        assert_eq!(
3705            saved.relations["lines"][1].relations["product"][0]
3706                .values
3707                .get("name"),
3708            Some(&Value::Text("reference-only".to_owned()))
3709        );
3710
3711        let removed = sqlx::query("SELECT version FROM orderline WHERE id = 10")
3712            .fetch_one(&pool)
3713            .await
3714            .unwrap();
3715        assert_eq!(removed.try_get::<i64, _>("version").unwrap(), -2);
3716
3717        let kept = sqlx::query("SELECT version, name FROM orderline WHERE id = 11")
3718            .fetch_one(&pool)
3719            .await
3720            .unwrap();
3721        assert_eq!(kept.try_get::<i64, _>("version").unwrap(), 1);
3722        assert_eq!(kept.try_get::<String, _>("name").unwrap(), "keep-me");
3723
3724        let added = sqlx::query("SELECT product_id FROM orderline WHERE id = 12")
3725            .fetch_one(&pool)
3726            .await
3727            .unwrap();
3728        assert_eq!(added.try_get::<i64, _>("product_id").unwrap(), 100);
3729
3730        let product = sqlx::query("SELECT name FROM product WHERE id = 100")
3731            .fetch_one(&pool)
3732            .await
3733            .unwrap();
3734        assert_eq!(
3735            product.try_get::<String, _>("name").unwrap(),
3736            "reference-only"
3737        );
3738    }
3739
3740    #[tokio::test(flavor = "multi_thread")]
3741    async fn sqlite_save_graph_rejects_invalid_reference_and_state_transitions() {
3742        use sqlx::sqlite::SqlitePoolOptions;
3743
3744        let pool = SqlitePoolOptions::new()
3745            .max_connections(1)
3746            .connect("sqlite::memory:")
3747            .await
3748            .unwrap();
3749
3750        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3751        let order = entity();
3752        let line = line_entity();
3753        let product = product_entity();
3754        mutation_executor
3755            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3756            .await
3757            .unwrap();
3758
3759        sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'root')")
3760            .execute(&pool)
3761            .await
3762            .unwrap();
3763        sqlx::query("INSERT INTO product (id, name) VALUES (100, 'valid-reference')")
3764            .execute(&pool)
3765            .await
3766            .unwrap();
3767        sqlx::query(
3768            "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
3769                (10, 1, 1, 100, 'line-10')",
3770        )
3771        .execute(&pool)
3772        .await
3773        .unwrap();
3774
3775        let executor = SqliteSyncExecutor::new(mutation_executor);
3776        let mut ctx = UserContext::new()
3777            .with_metadata(
3778                InMemoryMetadataStore::new()
3779                    .with_entity(order)
3780                    .with_entity(line)
3781                    .with_entity(product),
3782            )
3783            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3784        ctx.insert_resource(SqliteDialect);
3785        ctx.insert_resource(executor);
3786
3787        let repo = ctx
3788            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3789            .unwrap();
3790
3791        let missing_reference = repo.save_graph(
3792            GraphNode::new("Order").value("id", 1_u64).relation(
3793                "lines",
3794                GraphNode::new("OrderLine")
3795                    .value("id", 12_u64)
3796                    .value("version", 1_i64)
3797                    .value("name", "bad-reference")
3798                    .relation(
3799                        "product",
3800                        GraphNode::new("Product").value("id", 999_u64).reference(),
3801                    ),
3802            ),
3803        );
3804        assert!(format!("{missing_reference:?}").contains("does not exist"));
3805
3806        let mutable_reference = repo.save_graph(
3807            GraphNode::new("Order").value("id", 1_u64).relation(
3808                "lines",
3809                GraphNode::new("OrderLine")
3810                    .value("id", 12_u64)
3811                    .value("version", 1_i64)
3812                    .value("name", "mutable-reference")
3813                    .relation(
3814                        "product",
3815                        GraphNode::new("Product")
3816                            .value("id", 100_u64)
3817                            .value("name", "should-not-mutate")
3818                            .reference(),
3819                    ),
3820            ),
3821        );
3822        assert!(format!("{mutable_reference:?}").contains("cannot carry mutable field"));
3823
3824        let duplicate_child = repo.save_graph(
3825            GraphNode::new("Order")
3826                .value("id", 1_u64)
3827                .relation(
3828                    "lines",
3829                    GraphNode::new("OrderLine")
3830                        .value("id", 10_u64)
3831                        .value("version", 1_i64)
3832                        .reference(),
3833                )
3834                .relation(
3835                    "lines",
3836                    GraphNode::new("OrderLine")
3837                        .value("id", 10_u64)
3838                        .value("version", 1_i64)
3839                        .reference(),
3840                ),
3841        );
3842        assert!(format!("{duplicate_child:?}").contains("duplicate child id"));
3843
3844        let reference_version_conflict = repo.save_graph(
3845            GraphNode::new("Order").value("id", 1_u64).relation(
3846                "lines",
3847                GraphNode::new("OrderLine")
3848                    .value("id", 10_u64)
3849                    .value("version", 2_i64)
3850                    .reference(),
3851            ),
3852        );
3853        assert!(format!("{reference_version_conflict:?}").contains("OptimisticLockConflict"));
3854
3855        let remove_with_child = repo.save_graph(
3856            GraphNode::new("Order").value("id", 1_u64).relation(
3857                "lines",
3858                GraphNode::new("OrderLine")
3859                    .value("id", 10_u64)
3860                    .value("version", 1_i64)
3861                    .relation(
3862                        "product",
3863                        GraphNode::new("Product").value("id", 100_u64).reference(),
3864                    )
3865                    .remove(),
3866            ),
3867        );
3868        assert!(format!("{remove_with_child:?}").contains("cannot contain child relations"));
3869
3870        let remove = repo.save_graph(GraphNode::new("Order").value("id", 999_u64).remove());
3871        assert!(format!("{remove:?}").contains("does not exist"));
3872    }
3873
3874    #[tokio::test(flavor = "multi_thread")]
3875    async fn sqlite_graph_write_rolls_back_all_batches_on_failure() {
3876        use sqlx::{Row, sqlite::SqlitePoolOptions};
3877
3878        let pool = SqlitePoolOptions::new()
3879            .max_connections(1)
3880            .connect("sqlite::memory:")
3881            .await
3882            .unwrap();
3883
3884        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3885        let order = entity();
3886        let line = line_entity();
3887        let product = product_entity();
3888        mutation_executor
3889            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3890            .await
3891            .unwrap();
3892
3893        let executor = SqliteSyncExecutor::new(mutation_executor.clone());
3894        let mut ctx = UserContext::new()
3895            .with_metadata(
3896                InMemoryMetadataStore::new()
3897                    .with_entity(order)
3898                    .with_entity(line)
3899                    .with_entity(product),
3900            )
3901            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3902        ctx.insert_resource(SqliteDialect);
3903        ctx.insert_resource(executor);
3904
3905        let repo = ctx
3906            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3907            .unwrap();
3908        let error = repo.save_graph(
3909            GraphNode::new("Order")
3910                .value("id", 1_u64)
3911                .value("version", 1_i64)
3912                .value("name", "rollback-root")
3913                .relation(
3914                    "lines",
3915                    GraphNode::new("OrderLine")
3916                        .value("id", 10_u64)
3917                        .value("version", 1_i64)
3918                        .value("name", "rollback-line")
3919                        .value("product_id", 999_u64)
3920                        .relation(
3921                            "product",
3922                            GraphNode::new("Product").value("id", 999_u64).reference(),
3923                        ),
3924                ),
3925        );
3926        assert!(format!("{error:?}").contains("does not exist"));
3927
3928        let count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders")
3929            .fetch_one(&pool)
3930            .await
3931            .unwrap()
3932            .try_get("count")
3933            .unwrap();
3934        assert_eq!(count, 0);
3935    }
3936
3937    #[tokio::test]
3938    async fn postgres_executor_ensure_schema_roundtrip_when_database_is_available() {
3939        use sqlx::{PgPool, Row};
3940
3941        let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
3942            return;
3943        };
3944
3945        let pool = PgPool::connect(&database_url).await.unwrap();
3946        sqlx::query("DROP TABLE IF EXISTS orderline")
3947            .execute(&pool)
3948            .await
3949            .unwrap();
3950        sqlx::query("DROP TABLE IF EXISTS orders")
3951            .execute(&pool)
3952            .await
3953            .unwrap();
3954
3955        let executor = PgMutationExecutor::new(pool.clone());
3956        let dialect = PostgresDialect;
3957        let order = entity();
3958        let line = line_entity();
3959
3960        executor
3961            .ensure_schema(&dialect, &[&order, &line])
3962            .await
3963            .unwrap();
3964
3965        let tables = sqlx::query(
3966            "SELECT table_name
3967             FROM information_schema.tables
3968             WHERE table_schema = current_schema()
3969               AND table_name IN ('orders', 'orderline')
3970             ORDER BY table_name",
3971        )
3972        .fetch_all(&pool)
3973        .await
3974        .unwrap()
3975        .into_iter()
3976        .map(|row| row.try_get::<String, _>("table_name").unwrap())
3977        .collect::<Vec<_>>();
3978        assert_eq!(tables, vec!["orderline".to_owned(), "orders".to_owned()]);
3979
3980        let soundex: String = sqlx::query_scalar("SELECT soundex('Robert')")
3981            .fetch_one(&pool)
3982            .await
3983            .unwrap();
3984        assert_eq!(soundex, "R163");
3985
3986        sqlx::query(
3987            "INSERT INTO orders (id, version, name) VALUES
3988                (1, 1, 'draft'),
3989                (2, 1, 'submitted'),
3990                (3, 1, 'archived')",
3991        )
3992        .execute(&pool)
3993        .await
3994        .unwrap();
3995
3996        let array_bound_query = dialect
3997            .compile_select(
3998                &order,
3999                &SelectQuery::new("Order")
4000                    .filter(
4001                        Expr::in_large(
4002                            "id",
4003                            vec![Value::from(1_u64), Value::from(2_u64), Value::from(3_u64)],
4004                        )
4005                        .and_expr(Expr::not_in_large("name", vec![Value::from("archived")])),
4006                    )
4007                    .order_asc("id"),
4008            )
4009            .unwrap();
4010        assert_eq!(
4011            array_bound_query.sql,
4012            format!(
4013                "SELECT {ORDER_DEFAULT_PROJECTION} FROM \"orders\" WHERE ((\"id\" = ANY($1)) AND (\"name\" <> ALL($2))) ORDER BY \"id\" ASC"
4014            )
4015        );
4016        let rows = executor.fetch_all(&array_bound_query).await.unwrap();
4017        assert_eq!(rows.len(), 2);
4018        assert_eq!(rows[0].get("id"), Some(&Value::I64(1)));
4019        assert_eq!(rows[1].get("id"), Some(&Value::I64(2)));
4020
4021        sqlx::query(
4022            "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
4023                (11, 1, 1, 101, 'line-1'),
4024                (12, 1, 2, 102, 'line-2'),
4025                (13, 1, 3, 103, 'archived-line')",
4026        )
4027        .execute(&pool)
4028        .await
4029        .unwrap();
4030
4031        let subquery = dialect
4032            .compile_select(
4033                &order,
4034                &SelectQuery::new("Order")
4035                    .filter(Expr::in_subquery(
4036                        "id",
4037                        line.clone(),
4038                        SelectQuery::new("OrderLine").filter(Expr::contain("name", "line-")),
4039                        "order_id",
4040                    ))
4041                    .order_asc("id"),
4042            )
4043            .unwrap();
4044        assert_eq!(
4045            subquery.sql,
4046            format!(
4047                "SELECT {ORDER_DEFAULT_PROJECTION} FROM \"orders\" WHERE (\"id\" IN (SELECT \"order_id\" FROM \"orderline\" WHERE (\"name\" LIKE $1))) ORDER BY \"id\" ASC"
4048            )
4049        );
4050        let rows = executor.fetch_all(&subquery).await.unwrap();
4051        assert_eq!(rows.len(), 2);
4052        assert_eq!(rows[0].get("id"), Some(&Value::I64(1)));
4053        assert_eq!(rows[1].get("id"), Some(&Value::I64(2)));
4054
4055        let projected = dialect
4056            .compile_select(
4057                &order,
4058                &SelectQuery::new("Order")
4059                    .project("id")
4060                    .project_expr("nameSound", Expr::soundex(Expr::column("name")))
4061                    .order_gbk_asc("name")
4062                    .limit(1),
4063            )
4064            .unwrap();
4065        assert_eq!(
4066            projected.sql,
4067            "SELECT \"id\", SOUNDEX(\"name\") AS \"nameSound\" FROM \"orders\" ORDER BY convert_to(\"name\", 'GBK') ASC LIMIT 1"
4068        );
4069        let rows = executor.fetch_all(&projected).await.unwrap();
4070        assert_eq!(rows.len(), 1);
4071        assert!(rows[0].contains_key("nameSound"));
4072
4073        let aggregate = dialect
4074            .compile_select(
4075                &order,
4076                &SelectQuery::new("Order")
4077                    .count("total")
4078                    .stddev("version", "stddevVersion")
4079                    .var_pop("version", "varPopVersion")
4080                    .bit_or("version", "bitOrVersion")
4081                    .having(Expr::binary(
4082                        Expr::count_all(),
4083                        teaql_core::BinaryOp::Gt,
4084                        Expr::value(2_i64),
4085                    )),
4086            )
4087            .unwrap();
4088        assert_eq!(
4089            aggregate.sql,
4090            "SELECT COUNT(*) AS \"total\", STDDEV(\"version\") AS \"stddevVersion\", VAR_POP(\"version\") AS \"varPopVersion\", BIT_OR(\"version\") AS \"bitOrVersion\" FROM \"orders\" HAVING (COUNT(*) > $1)"
4091        );
4092        let rows = executor.fetch_all(&aggregate).await.unwrap();
4093        assert_eq!(rows.len(), 1);
4094        assert_eq!(rows[0].get("total"), Some(&Value::I64(3)));
4095        assert!(matches!(
4096            rows[0].get("stddevVersion"),
4097            Some(Value::Decimal(_))
4098        ));
4099        assert!(matches!(
4100            rows[0].get("varPopVersion"),
4101            Some(Value::Decimal(_))
4102        ));
4103        assert_eq!(rows[0].get("bitOrVersion"), Some(&Value::I64(1)));
4104
4105        sqlx::query("DROP TABLE orderline")
4106            .execute(&pool)
4107            .await
4108            .unwrap();
4109        sqlx::query("CREATE TABLE orderline (id BIGINT PRIMARY KEY)")
4110            .execute(&pool)
4111            .await
4112            .unwrap();
4113
4114        executor.ensure_schema(&dialect, &[&line]).await.unwrap();
4115
4116        let columns = sqlx::query(
4117            "SELECT column_name
4118             FROM information_schema.columns
4119             WHERE table_schema = current_schema()
4120               AND table_name = 'orderline'
4121             ORDER BY column_name",
4122        )
4123        .fetch_all(&pool)
4124        .await
4125        .unwrap()
4126        .into_iter()
4127        .map(|row| row.try_get::<String, _>("column_name").unwrap())
4128        .collect::<Vec<_>>();
4129        assert!(columns.contains(&"id".to_owned()));
4130        assert!(columns.contains(&"order_id".to_owned()));
4131        assert!(columns.contains(&"product_id".to_owned()));
4132        assert!(columns.contains(&"name".to_owned()));
4133
4134        sqlx::query("DROP TABLE IF EXISTS orderline")
4135            .execute(&pool)
4136            .await
4137            .unwrap();
4138        sqlx::query("DROP TABLE IF EXISTS orders")
4139            .execute(&pool)
4140            .await
4141            .unwrap();
4142    }
4143
4144    #[tokio::test(flavor = "multi_thread")]
4145    async fn postgres_id_space_generator_prepares_repository_insert_when_database_is_available() {
4146        use sqlx::{PgPool, Row};
4147
4148        let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4149            return;
4150        };
4151
4152        let pool = PgPool::connect(&database_url).await.unwrap();
4153        sqlx::query("DROP TABLE IF EXISTS orders_idgen")
4154            .execute(&pool)
4155            .await
4156            .unwrap();
4157        sqlx::query("DROP TABLE IF EXISTS teaql_id_space")
4158            .execute(&pool)
4159            .await
4160            .unwrap();
4161
4162        let order = entity().table_name("orders_idgen");
4163        let executor = PgMutationExecutor::new(pool.clone());
4164        executor
4165            .ensure_schema(&PostgresDialect, &[&order])
4166            .await
4167            .unwrap();
4168
4169        let mut ctx = UserContext::new()
4170            .with_metadata(InMemoryMetadataStore::new().with_entity(order))
4171            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
4172            .with_internal_id_generator(PgIdSpaceGenerator::new(pool.clone()));
4173        ctx.insert_resource(PostgresDialect);
4174        ctx.insert_resource(PgSyncExecutor::new(executor));
4175
4176        let repo = ctx
4177            .resolve_repository::<PostgresDialect, PgSyncExecutor>("Order")
4178            .unwrap();
4179        repo.insert(
4180            &repo
4181                .insert_command()
4182                .value("version", 1_i64)
4183                .value("name", "generated-1"),
4184        )
4185        .unwrap();
4186        repo.insert(
4187            &repo
4188                .insert_command()
4189                .value("version", 1_i64)
4190                .value("name", "generated-2"),
4191        )
4192        .unwrap();
4193
4194        let ids = sqlx::query("SELECT id FROM orders_idgen ORDER BY id")
4195            .fetch_all(&pool)
4196            .await
4197            .unwrap()
4198            .into_iter()
4199            .map(|row| row.try_get::<i64, _>("id").unwrap())
4200            .collect::<Vec<_>>();
4201        assert_eq!(ids, vec![1, 2]);
4202
4203        let current: i64 = sqlx::query_scalar(
4204            "SELECT current_level FROM teaql_id_space WHERE type_name = 'Order'",
4205        )
4206        .fetch_one(&pool)
4207        .await
4208        .unwrap();
4209        assert_eq!(current, 2);
4210
4211        sqlx::query("DROP TABLE IF EXISTS orders_idgen")
4212            .execute(&pool)
4213            .await
4214            .unwrap();
4215        sqlx::query("DROP TABLE IF EXISTS teaql_id_space")
4216            .execute(&pool)
4217            .await
4218            .unwrap();
4219    }
4220
4221    #[tokio::test]
4222    async fn postgres_executor_runs_extended_aggregates_when_database_is_available() {
4223        use sqlx::PgPool;
4224
4225        let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4226            return;
4227        };
4228
4229        let pool = PgPool::connect(&database_url).await.unwrap();
4230        sqlx::query("DROP TABLE IF EXISTS orders_agg")
4231            .execute(&pool)
4232            .await
4233            .unwrap();
4234
4235        let order = entity().table_name("orders_agg");
4236        let executor = PgMutationExecutor::new(pool.clone());
4237        let dialect = PostgresDialect;
4238        executor.ensure_schema(&dialect, &[&order]).await.unwrap();
4239
4240        sqlx::query(
4241            "INSERT INTO orders_agg (id, version, name) VALUES
4242                (1, 1, 'A'),
4243                (2, 2, 'A'),
4244                (3, 3, 'B'),
4245                (4, 4, 'B')",
4246        )
4247        .execute(&pool)
4248        .await
4249        .unwrap();
4250
4251        let aggregate = dialect
4252            .compile_select(
4253                &order,
4254                &SelectQuery::new("Order")
4255                    .count("total")
4256                    .count_field("version", "versionCount")
4257                    .sum("version", "sumVersion")
4258                    .avg("version", "avgVersion")
4259                    .min("version", "minVersion")
4260                    .max("version", "maxVersion")
4261                    .stddev("version", "stddevVersion")
4262                    .stddev_pop("version", "stddevPopVersion")
4263                    .var_samp("version", "varSampVersion")
4264                    .var_pop("version", "varPopVersion")
4265                    .bit_and("version", "bitAndVersion")
4266                    .bit_or("version", "bitOrVersion")
4267                    .bit_xor("version", "bitXorVersion")
4268                    .having(Expr::binary(
4269                        Expr::count_all(),
4270                        teaql_core::BinaryOp::Gt,
4271                        Expr::value(3_i64),
4272                    )),
4273            )
4274            .unwrap();
4275        let rows = executor.fetch_all(&aggregate).await.unwrap();
4276        assert_eq!(rows.len(), 1);
4277        let row = &rows[0];
4278        assert_eq!(row.get("total"), Some(&Value::I64(4)));
4279        assert_eq!(row.get("versionCount"), Some(&Value::I64(4)));
4280        assert_eq!(
4281            row.get("sumVersion"),
4282            Some(&Value::Decimal(Decimal::new(10, 0)))
4283        );
4284        assert_eq!(
4285            row.get("avgVersion"),
4286            Some(&Value::Decimal(Decimal::new(25, 1)))
4287        );
4288        assert_eq!(row.get("minVersion"), Some(&Value::I64(1)));
4289        assert_eq!(row.get("maxVersion"), Some(&Value::I64(4)));
4290        assert!(matches!(row.get("stddevVersion"), Some(Value::Decimal(_))));
4291        assert!(matches!(
4292            row.get("stddevPopVersion"),
4293            Some(Value::Decimal(_))
4294        ));
4295        assert!(matches!(row.get("varSampVersion"), Some(Value::Decimal(_))));
4296        assert!(matches!(row.get("varPopVersion"), Some(Value::Decimal(_))));
4297        assert_eq!(row.get("bitAndVersion"), Some(&Value::I64(0)));
4298        assert_eq!(row.get("bitOrVersion"), Some(&Value::I64(7)));
4299        assert_eq!(row.get("bitXorVersion"), Some(&Value::I64(4)));
4300
4301        let grouped = dialect
4302            .compile_select(
4303                &order,
4304                &SelectQuery::new("Order")
4305                    .group_by("name")
4306                    .count("total")
4307                    .sum("version", "sumVersion")
4308                    .having(Expr::binary(
4309                        Expr::count_all(),
4310                        teaql_core::BinaryOp::Gt,
4311                        Expr::value(1_i64),
4312                    ))
4313                    .order_asc("name"),
4314            )
4315            .unwrap();
4316        let rows = executor.fetch_all(&grouped).await.unwrap();
4317        assert_eq!(rows.len(), 2);
4318        assert_eq!(rows[0].get("name"), Some(&Value::Text("A".to_owned())));
4319        assert_eq!(rows[0].get("total"), Some(&Value::I64(2)));
4320        assert_eq!(
4321            rows[0].get("sumVersion"),
4322            Some(&Value::Decimal(Decimal::new(3, 0)))
4323        );
4324        assert_eq!(rows[1].get("name"), Some(&Value::Text("B".to_owned())));
4325        assert_eq!(rows[1].get("total"), Some(&Value::I64(2)));
4326        assert_eq!(
4327            rows[1].get("sumVersion"),
4328            Some(&Value::Decimal(Decimal::new(7, 0)))
4329        );
4330
4331        sqlx::query("DROP TABLE IF EXISTS orders_agg")
4332            .execute(&pool)
4333            .await
4334            .unwrap();
4335    }
4336
4337    #[tokio::test]
4338    async fn user_context_can_ensure_postgres_schema_when_database_is_available() {
4339        use sqlx::{PgPool, Row};
4340
4341        let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4342            return;
4343        };
4344
4345        let pool = PgPool::connect(&database_url).await.unwrap();
4346        sqlx::query("DROP TABLE IF EXISTS product_ctx")
4347            .execute(&pool)
4348            .await
4349            .unwrap();
4350        sqlx::query("DROP TABLE IF EXISTS orderline_ctx")
4351            .execute(&pool)
4352            .await
4353            .unwrap();
4354        sqlx::query("DROP TABLE IF EXISTS orders_ctx")
4355            .execute(&pool)
4356            .await
4357            .unwrap();
4358
4359        let order = entity().table_name("orders_ctx");
4360        let line = line_entity().table_name("orderline_ctx");
4361        let product = product_entity().table_name("product_ctx");
4362        let module = super::RuntimeModule::new()
4363            .descriptor(order)
4364            .descriptor(line)
4365            .descriptor(product);
4366        let mut ctx = super::UserContext::new().with_module(module);
4367        ctx.insert_resource(PostgresDialect);
4368        ctx.insert_resource(PgMutationExecutor::new(pool.clone()));
4369
4370        ctx.ensure_postgres_schema().await.unwrap();
4371
4372        let tables = sqlx::query(
4373            "SELECT table_name
4374             FROM information_schema.tables
4375             WHERE table_schema = current_schema()
4376               AND table_name IN ('orders_ctx', 'orderline_ctx', 'product_ctx')
4377             ORDER BY table_name",
4378        )
4379        .fetch_all(&pool)
4380        .await
4381        .unwrap()
4382        .into_iter()
4383        .map(|row| row.try_get::<String, _>("table_name").unwrap())
4384        .collect::<Vec<_>>();
4385
4386        assert_eq!(
4387            tables,
4388            vec![
4389                "orderline_ctx".to_owned(),
4390                "orders_ctx".to_owned(),
4391                "product_ctx".to_owned()
4392            ]
4393        );
4394
4395        sqlx::query("DROP TABLE IF EXISTS product_ctx")
4396            .execute(&pool)
4397            .await
4398            .unwrap();
4399        sqlx::query("DROP TABLE IF EXISTS orderline_ctx")
4400            .execute(&pool)
4401            .await
4402            .unwrap();
4403        sqlx::query("DROP TABLE IF EXISTS orders_ctx")
4404            .execute(&pool)
4405            .await
4406            .unwrap();
4407    }
4408
4409    #[tokio::test(flavor = "multi_thread")]
4410    async fn postgres_graph_write_transaction_rolls_back_when_database_is_available() {
4411        use sqlx::{PgPool, Row};
4412
4413        let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4414            return;
4415        };
4416
4417        let pool = PgPool::connect(&database_url).await.unwrap();
4418        sqlx::query("DROP TABLE IF EXISTS orderline_tx")
4419            .execute(&pool)
4420            .await
4421            .unwrap();
4422        sqlx::query("DROP TABLE IF EXISTS product_tx")
4423            .execute(&pool)
4424            .await
4425            .unwrap();
4426        sqlx::query("DROP TABLE IF EXISTS orders_tx")
4427            .execute(&pool)
4428            .await
4429            .unwrap();
4430
4431        let order = entity().table_name("orders_tx");
4432        let line = line_entity().table_name("orderline_tx");
4433        let product = product_entity().table_name("product_tx");
4434        let schema_executor = PgMutationExecutor::new(pool.clone());
4435        schema_executor
4436            .ensure_schema(&PostgresDialect, &[&order, &line, &product])
4437            .await
4438            .unwrap();
4439
4440        let tx_executor = PgTransactionExecutor::begin(&pool).await.unwrap();
4441        let sync_executor = PgTxSyncExecutor::new(tx_executor.clone());
4442        let mut ctx = UserContext::new()
4443            .with_metadata(
4444                InMemoryMetadataStore::new()
4445                    .with_entity(order)
4446                    .with_entity(line)
4447                    .with_entity(product),
4448            )
4449            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
4450        ctx.insert_resource(PostgresDialect);
4451        ctx.insert_resource(sync_executor);
4452
4453        let repo = ctx
4454            .resolve_repository::<PostgresDialect, PgTxSyncExecutor>("Order")
4455            .unwrap();
4456        repo.save_graph(
4457            GraphNode::new("Order")
4458                .value("id", 1_u64)
4459                .value("version", 1_i64)
4460                .value("name", "pg-rollback-root")
4461                .relation(
4462                    "lines",
4463                    GraphNode::new("OrderLine")
4464                        .value("id", 10_u64)
4465                        .value("version", 1_i64)
4466                        .value("name", "pg-rollback-line")
4467                        .relation(
4468                            "product",
4469                            GraphNode::new("Product")
4470                                .value("id", 100_u64)
4471                                .value("name", "pg-rollback-sku"),
4472                        ),
4473                ),
4474        )
4475        .unwrap();
4476
4477        tx_executor.rollback().await.unwrap();
4478
4479        let count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders_tx")
4480            .fetch_one(&pool)
4481            .await
4482            .unwrap()
4483            .try_get("count")
4484            .unwrap();
4485        assert_eq!(count, 0);
4486
4487        sqlx::query("DROP TABLE IF EXISTS orderline_tx")
4488            .execute(&pool)
4489            .await
4490            .unwrap();
4491        sqlx::query("DROP TABLE IF EXISTS product_tx")
4492            .execute(&pool)
4493            .await
4494            .unwrap();
4495        sqlx::query("DROP TABLE IF EXISTS orders_tx")
4496            .execute(&pool)
4497            .await
4498            .unwrap();
4499    }
4500}
4501pub use checker::{
4502    CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResult, CheckResults, CheckRule, Checker,
4503    CheckerRegistry, InMemoryCheckerRegistry, LocationSegment, ObjectLocation, clear_record_status,
4504    mark_record_status,
4505};