Skip to main content

teaql_runtime/
lib.rs

1mod checker;
2mod context;
3mod entity_runtime;
4mod error;
5mod event;
6mod graph;
7mod id;
8mod language;
9mod memory;
10mod registry;
11mod repository;
12
13pub use context::{SqlLogEntry, SqlLogOperation, SqlLogOptions, UserContext};
14pub use entity_runtime::{ChangeSetStack, EntityChangeSet, EntityKey, EntityRoot, RootContext};
15pub use error::{ContextError, RepositoryError, RuntimeError};
16pub use event::{EntityEvent, EntityEventKind, EntityEventSink, InMemoryEntityEventSink};
17pub use graph::{
18    GraphMutationBatch, GraphMutationKind, GraphMutationPlan, GraphMutationPlanItem, GraphNode,
19    GraphOperation, sorted_update_fields,
20};
21pub(crate) use id::local_id_generator;
22pub use id::{InternalIdGenerator, SnowflakeIdGenerator};
23pub use language::{
24    BuiltinTranslator, Language, MessageTranslator, translate_check_result, translate_location,
25};
26pub use memory::{MemoryRepository, MemoryRepositoryError};
27pub use registry::{
28    InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry,
29    MetadataStore, RepositoryBehavior, RepositoryBehaviorRegistry, RepositoryRegistry,
30    RuntimeModule,
31};
32pub use repository::{
33    AggregationCacheBackend, ContextRepository, GraphTransactionBoundary, InMemoryAggregationCache,
34    QueryExecutor, RelationLoadPlan, Repository, ResolvedRepository,
35};
36
37#[cfg(feature = "sqlx")]
38pub mod sqlx_support;
39
40#[cfg(test)]
41mod tests {
42    use std::collections::{BTreeMap, VecDeque};
43    use std::sync::{Arc, Mutex};
44
45    use super::{
46        AggregationCacheBackend, CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResults,
47        Checker, EntityEvent, EntityEventKind, EntityEventSink, GraphMutationKind, GraphNode,
48        GraphTransactionBoundary, InMemoryAggregationCache, InMemoryCheckerRegistry,
49        InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry,
50        InternalIdGenerator, Language, MemoryRepository, MetadataStore, ObjectLocation,
51        QueryExecutor, Repository, RepositoryBehavior, RepositoryError, RuntimeError,
52        RuntimeModule, SqlLogOperation, SqlLogOptions, UserContext, translate_check_result,
53    };
54    use teaql_core::{
55        Aggregate, AggregateFunction, BinaryOp, DataType, Decimal, DeleteCommand, Entity,
56        EntityDescriptor, EntityError, Expr, InsertCommand, OrderBy, PropertyDescriptor, Record,
57        RecoverCommand, RelationAggregate, SelectQuery, TeaqlEntity, UpdateCommand, Value,
58    };
59    use teaql_dialect_pg::PostgresDialect;
60    use teaql_macros::TeaqlEntity as DeriveTeaqlEntity;
61    use teaql_sql::CompiledQuery;
62
63    const ORDER_DEFAULT_PROJECTION: &str = "\"id\", \"version\", \"name\"";
64
65    fn entity() -> EntityDescriptor {
66        EntityDescriptor::new("Order")
67            .table_name("orders")
68            .property(
69                PropertyDescriptor::new("id", DataType::U64)
70                    .column_name("id")
71                    .id()
72                    .not_null(),
73            )
74            .property(
75                PropertyDescriptor::new("version", DataType::I64)
76                    .column_name("version")
77                    .version()
78                    .not_null(),
79            )
80            .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
81            .relation(
82                teaql_core::RelationDescriptor::new("lines", "OrderLine")
83                    .local_key("id")
84                    .foreign_key("order_id")
85                    .many(),
86            )
87    }
88
89    fn line_entity() -> EntityDescriptor {
90        EntityDescriptor::new("OrderLine")
91            .table_name("orderline")
92            .property(
93                PropertyDescriptor::new("id", DataType::U64)
94                    .column_name("id")
95                    .id()
96                    .not_null(),
97            )
98            .property(
99                PropertyDescriptor::new("version", DataType::I64)
100                    .column_name("version")
101                    .version(),
102            )
103            .property(
104                PropertyDescriptor::new("order_id", DataType::U64)
105                    .column_name("order_id")
106                    .not_null(),
107            )
108            .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
109            .property(
110                PropertyDescriptor::new("product_id", DataType::U64)
111                    .column_name("product_id")
112                    .not_null(),
113            )
114            .relation(
115                teaql_core::RelationDescriptor::new("product", "Product")
116                    .local_key("product_id")
117                    .foreign_key("id"),
118            )
119    }
120
121    fn product_entity() -> EntityDescriptor {
122        EntityDescriptor::new("Product")
123            .table_name("product")
124            .property(
125                PropertyDescriptor::new("id", DataType::U64)
126                    .column_name("id")
127                    .id()
128                    .not_null(),
129            )
130            .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
131    }
132
133    #[derive(Debug, Default)]
134    struct StubExecutor {
135        affected: u64,
136        rows: Vec<Record>,
137    }
138
139    #[derive(Debug, Default)]
140    struct QueueExecutor {
141        affected: u64,
142        rows: Mutex<VecDeque<Vec<Record>>>,
143        queries: Mutex<Vec<String>>,
144    }
145
146    struct OrderBehavior;
147
148    #[allow(dead_code)]
149    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
150    #[teaql(entity = "CatalogProduct", table = "catalog_product")]
151    struct CatalogProductRow {
152        #[teaql(id)]
153        id: u64,
154        name: String,
155    }
156
157    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
158    #[teaql(entity = "OrderAggregate", table = "orders")]
159    struct OrderAggregateDynamic {
160        #[teaql(id)]
161        id: u64,
162        #[teaql(dynamic)]
163        dynamic: BTreeMap<String, Value>,
164    }
165
166    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
167    #[teaql(entity = "Product", table = "product")]
168    struct ProductEntityRow {
169        #[teaql(id)]
170        id: u64,
171        name: String,
172    }
173
174    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
175    #[teaql(entity = "OrderLine", table = "orderline")]
176    struct OrderLineEntityRow {
177        #[teaql(id)]
178        id: u64,
179        #[teaql(column = "order_id")]
180        order_id: u64,
181        name: String,
182        #[teaql(column = "product_id")]
183        product_id: u64,
184        #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
185        product: Option<ProductEntityRow>,
186    }
187
188    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
189    #[teaql(entity = "Order", table = "orders")]
190    struct OrderAggregateRow {
191        #[teaql(id)]
192        id: u64,
193        #[teaql(version)]
194        version: i64,
195        name: String,
196        #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
197        lines: teaql_core::SmartList<OrderLineEntityRow>,
198    }
199
200    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
201    #[teaql(entity = "Order", table = "orders")]
202    struct Order {
203        #[teaql(id)]
204        id: u64,
205        #[teaql(version)]
206        version: i64,
207        name: String,
208    }
209
210    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
211    #[teaql(entity = "Product", table = "product")]
212    struct TypedGraphProduct {
213        #[teaql(id)]
214        id: Option<u64>,
215        name: String,
216    }
217
218    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
219    #[teaql(entity = "OrderLine", table = "orderline")]
220    struct TypedGraphLine {
221        #[teaql(id)]
222        id: Option<u64>,
223        #[teaql(column = "order_id")]
224        order_id: Option<u64>,
225        name: String,
226        #[teaql(column = "product_id")]
227        product_id: Option<u64>,
228        #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
229        product: Option<TypedGraphProduct>,
230    }
231
232    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
233    #[teaql(entity = "Order", table = "orders")]
234    struct TypedGraphOrder {
235        #[teaql(id)]
236        id: Option<u64>,
237        #[teaql(version)]
238        version: i64,
239        name: String,
240        #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
241        lines: teaql_core::SmartList<TypedGraphLine>,
242    }
243
244    #[derive(Debug, PartialEq, Eq)]
245    struct OrderEntity {
246        id: u64,
247        version: i64,
248        name: String,
249    }
250
251    impl teaql_core::TeaqlEntity for OrderEntity {
252        fn entity_descriptor() -> EntityDescriptor {
253            entity()
254        }
255    }
256
257    impl Entity for OrderEntity {
258        fn from_record(record: Record) -> Result<Self, EntityError> {
259            let id = match record.get("id") {
260                Some(Value::U64(v)) => *v,
261                Some(Value::I64(v)) if *v >= 0 => *v as u64,
262                other => {
263                    return Err(EntityError::new(
264                        "Order",
265                        format!("invalid id field: {other:?}"),
266                    ));
267                }
268            };
269            let version = match record.get("version") {
270                Some(Value::I64(v)) => *v,
271                other => {
272                    return Err(EntityError::new(
273                        "Order",
274                        format!("invalid version field: {other:?}"),
275                    ));
276                }
277            };
278            let name = match record.get("name") {
279                Some(Value::Text(v)) => v.clone(),
280                other => {
281                    return Err(EntityError::new(
282                        "Order",
283                        format!("invalid name field: {other:?}"),
284                    ));
285                }
286            };
287            Ok(Self { id, version, name })
288        }
289
290        fn into_record(self) -> Record {
291            Record::from([
292                (String::from("id"), Value::U64(self.id)),
293                (String::from("version"), Value::I64(self.version)),
294                (String::from("name"), Value::Text(self.name)),
295            ])
296        }
297    }
298
299    #[derive(Debug)]
300    struct StubError;
301
302    impl std::fmt::Display for StubError {
303        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
304            write!(f, "stub error")
305        }
306    }
307
308    impl std::error::Error for StubError {}
309
310    impl QueryExecutor for StubExecutor {
311        type Error = StubError;
312
313        fn fetch_all(&self, _query: &CompiledQuery) -> Result<Vec<Record>, Self::Error> {
314            Ok(self.rows.clone())
315        }
316
317        fn execute(&self, _query: &CompiledQuery) -> Result<u64, Self::Error> {
318            Ok(self.affected)
319        }
320
321        fn begin_transaction(&self) -> Result<GraphTransactionBoundary, Self::Error> {
322            Ok(GraphTransactionBoundary::Started)
323        }
324    }
325
326    impl QueryExecutor for QueueExecutor {
327        type Error = StubError;
328
329        fn fetch_all(&self, query: &CompiledQuery) -> Result<Vec<Record>, Self::Error> {
330            self.queries.lock().unwrap().push(query.sql.clone());
331            Ok(self.rows.lock().unwrap().pop_front().unwrap_or_default())
332        }
333
334        fn execute(&self, _query: &CompiledQuery) -> Result<u64, Self::Error> {
335            Ok(self.affected)
336        }
337    }
338
339    #[test]
340    fn user_context_records_configured_sql_logs() {
341        let mut ctx = UserContext::new()
342            .with_module(crate::module!(Order))
343            .with_sql_log_options(SqlLogOptions::select_only());
344        ctx.insert_resource(PostgresDialect);
345        ctx.insert_resource(StubExecutor {
346            affected: 1,
347            rows: Vec::new(),
348        });
349
350        {
351            let repo = ctx
352                .resolve_repository::<PostgresDialect, StubExecutor>("Order")
353                .unwrap();
354            repo.fetch_all(&SelectQuery::new("Order").filter(Expr::eq("name", "Bob's Shop")))
355                .unwrap();
356            repo.insert(&InsertCommand::new("Order").value("name", "created"))
357                .unwrap();
358        }
359
360        let logs = ctx.sql_logs();
361        assert_eq!(logs.len(), 1);
362        assert_eq!(logs[0].operation, SqlLogOperation::Select);
363        assert_eq!(
364            logs[0].debug_sql,
365            format!(
366                "SELECT {ORDER_DEFAULT_PROJECTION} FROM \"orders\" WHERE (\"name\" = 'Bob''s Shop')"
367            )
368        );
369
370        ctx.set_sql_log_options(SqlLogOptions::mutation_only());
371        ctx.clear_sql_logs();
372        ctx.resolve_repository::<PostgresDialect, StubExecutor>("Order")
373            .unwrap()
374            .update(
375                &UpdateCommand::new("Order", 1_u64)
376                    .value("name", "updated")
377                    .expected_version(1),
378            )
379            .unwrap();
380
381        let logs = ctx.sql_logs();
382        assert_eq!(logs.len(), 1);
383        assert_eq!(logs[0].operation, SqlLogOperation::Update);
384        assert!(logs[0].debug_sql.contains("UPDATE \"orders\" SET"));
385        assert!(logs[0].debug_sql.contains("'updated'"));
386    }
387
388    impl RepositoryBehavior for OrderBehavior {
389        fn before_select(
390            &self,
391            _ctx: &UserContext,
392            query: &mut teaql_core::SelectQuery,
393        ) -> Result<(), RuntimeError> {
394            query.filter = Some(Expr::eq("version", 1_i64));
395            Ok(())
396        }
397
398        fn before_insert(
399            &self,
400            _ctx: &UserContext,
401            command: &mut InsertCommand,
402        ) -> Result<(), RuntimeError> {
403            command
404                .values
405                .entry("version".to_owned())
406                .or_insert(Value::I64(1));
407            Ok(())
408        }
409
410        fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
411            vec!["lines".to_owned()]
412        }
413    }
414
415    struct ContextAwareOrderBehavior;
416    struct OrderChecker;
417    #[derive(Clone)]
418    struct RecordingEventSink {
419        events: Arc<Mutex<Vec<EntityEvent>>>,
420    }
421
422    impl RepositoryBehavior for ContextAwareOrderBehavior {
423        fn before_insert(
424            &self,
425            ctx: &UserContext,
426            command: &mut InsertCommand,
427        ) -> Result<(), RuntimeError> {
428            let tenant = ctx
429                .get_named_resource::<String>("tenant")
430                .cloned()
431                .ok_or_else(|| RuntimeError::Behavior("missing tenant resource".to_owned()))?;
432            let version = *ctx
433                .get_named_resource::<i64>("initial_version")
434                .ok_or_else(|| {
435                    RuntimeError::Behavior("missing initial_version resource".to_owned())
436                })?;
437            let trace_id = match ctx.local("trace_id") {
438                Some(Value::Text(value)) => value.clone(),
439                other => {
440                    return Err(RuntimeError::Behavior(format!(
441                        "missing trace_id local, got {other:?}"
442                    )));
443                }
444            };
445
446            command
447                .values
448                .entry("name".to_owned())
449                .or_insert(Value::Text(format!("{tenant}:{trace_id}")));
450            command
451                .values
452                .entry("version".to_owned())
453                .or_insert(Value::I64(version));
454            Ok(())
455        }
456    }
457
458    impl Checker for OrderChecker {
459        fn entity(&self) -> &str {
460            "Order"
461        }
462
463        fn check_and_fix(
464            &self,
465            _ctx: &UserContext,
466            record: &mut Record,
467            location: &ObjectLocation,
468            results: &mut CheckResults,
469        ) {
470            let status = CheckObjectStatus::from_record(record);
471            if status.is_create() {
472                self.required(record, "name", location, results);
473                record.entry("version".to_owned()).or_insert(Value::I64(1));
474            }
475            if status.is_update()
476                && record.get("name") == Some(&Value::Text("graph-update".to_owned()))
477            {
478                record.insert(
479                    "name".to_owned(),
480                    Value::Text("graph-update-checked".to_owned()),
481                );
482            }
483            self.min_string_length(record, "name", 3, location, results);
484        }
485    }
486
487    impl EntityEventSink for RecordingEventSink {
488        fn on_event(&self, _ctx: &UserContext, event: &EntityEvent) -> Result<(), RuntimeError> {
489            self.events.lock().unwrap().push(event.clone());
490            Ok(())
491        }
492    }
493
494    struct FixedIdGenerator(u64);
495
496    impl InternalIdGenerator for FixedIdGenerator {
497        fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
498            Ok(self.0)
499        }
500    }
501
502    struct SequentialIdGenerator {
503        next: Mutex<u64>,
504    }
505
506    impl SequentialIdGenerator {
507        fn new(next: u64) -> Self {
508            Self {
509                next: Mutex::new(next),
510            }
511        }
512    }
513
514    impl InternalIdGenerator for SequentialIdGenerator {
515        fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
516            let mut next = self
517                .next
518                .lock()
519                .map_err(|err| RuntimeError::IdGeneration(err.to_string()))?;
520            let id = *next;
521            *next += 1;
522            Ok(id)
523        }
524    }
525
526    #[test]
527    fn metadata_store_registers_entities() {
528        let store = InMemoryMetadataStore::new().with_entity(entity());
529        assert!(store.entity("Order").is_some());
530    }
531
532    #[test]
533    fn runtime_module_registers_descriptor_into_context() {
534        let ctx = UserContext::new().with_module(RuntimeModule::new().descriptor(entity()));
535        assert!(ctx.entity("Order").is_some());
536        assert!(ctx.has_repository("Order"));
537    }
538
539    #[test]
540    fn runtime_module_registers_derived_entity_and_behavior() {
541        let ctx = UserContext::new().with_module(
542            RuntimeModule::new().entity_with_behavior::<CatalogProductRow, _>(OrderBehavior),
543        );
544        assert!(ctx.entity("CatalogProduct").is_some());
545        assert!(ctx.has_repository("CatalogProduct"));
546        assert!(ctx.repository_behavior("CatalogProduct").is_some());
547    }
548
549    #[test]
550    fn module_macro_registers_multiple_entities() {
551        let ctx = UserContext::new().with_module(crate::module!(CatalogProductRow));
552        assert!(ctx.entity("CatalogProduct").is_some());
553        assert!(ctx.has_repository("CatalogProduct"));
554    }
555
556    #[test]
557    fn module_macro_registers_entity_behavior_pairs() {
558        let ctx =
559            UserContext::new().with_module(crate::module!(CatalogProductRow => OrderBehavior));
560        assert!(ctx.entity("CatalogProduct").is_some());
561        assert!(ctx.repository_behavior("CatalogProduct").is_some());
562    }
563
564    #[test]
565    fn repository_returns_optimistic_lock_conflict() {
566        let store = InMemoryMetadataStore::new().with_entity(entity());
567        let executor = StubExecutor {
568            affected: 0,
569            rows: Vec::new(),
570        };
571        let repo = Repository::new(&PostgresDialect, &store, &executor);
572
573        let err = repo
574            .update(
575                &UpdateCommand::new("Order", 1_u64)
576                    .expected_version(3)
577                    .value("name", "next"),
578            )
579            .unwrap_err();
580
581        match err {
582            RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. }) => {}
583            other => panic!("unexpected error: {other}"),
584        }
585    }
586
587    #[test]
588    fn user_context_indexes_resources_and_locals() {
589        let mut ctx =
590            UserContext::new().with_metadata(InMemoryMetadataStore::new().with_entity(entity()));
591        ctx.insert_resource::<u64>(42);
592        ctx.insert_named_resource("tenant", String::from("acme"));
593        ctx.put_local("trace_id", "req-1");
594
595        assert!(ctx.entity("Order").is_some());
596        assert_eq!(ctx.get_resource::<u64>(), Some(&42));
597        assert_eq!(
598            ctx.get_named_resource::<String>("tenant"),
599            Some(&String::from("acme"))
600        );
601        assert_eq!(
602            ctx.local("trace_id"),
603            Some(&Value::Text("req-1".to_owned()))
604        );
605    }
606
607    #[test]
608    fn user_context_builds_context_repository() {
609        let mut ctx =
610            UserContext::new().with_metadata(InMemoryMetadataStore::new().with_entity(entity()));
611        ctx.insert_resource(PostgresDialect);
612        ctx.insert_resource(StubExecutor {
613            affected: 1,
614            rows: Vec::new(),
615        });
616
617        let repo = ctx.repository::<PostgresDialect, StubExecutor>().unwrap();
618        let affected = repo
619            .update(
620                &UpdateCommand::new("Order", 1_u64)
621                    .expected_version(3)
622                    .value("name", "next"),
623            )
624            .unwrap();
625
626        assert_eq!(affected, 1);
627    }
628
629    #[test]
630    fn user_context_resolves_repository_by_entity_type() {
631        let mut ctx = UserContext::new()
632            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
633            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
634        ctx.insert_resource(PostgresDialect);
635        ctx.insert_resource(StubExecutor {
636            affected: 1,
637            rows: Vec::new(),
638        });
639
640        let repo = ctx
641            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
642            .unwrap();
643        assert_eq!(repo.entity(), "Order");
644        assert_eq!(repo.select().entity, "Order");
645
646        let affected = repo
647            .insert(
648                &repo
649                    .insert_command()
650                    .value("id", 1_u64)
651                    .value("version", 1_i64)
652                    .value("name", "n"),
653            )
654            .unwrap();
655        assert_eq!(affected, 1);
656    }
657
658    #[test]
659    fn resolved_repository_applies_behavior_hooks() {
660        let mut ctx = UserContext::new()
661            .with_metadata(
662                InMemoryMetadataStore::new()
663                    .with_entity(entity())
664                    .with_entity(line_entity())
665                    .with_entity(product_entity()),
666            )
667            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
668            .with_repository_behavior_registry(
669                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
670            );
671        ctx.insert_resource(PostgresDialect);
672        ctx.insert_resource(StubExecutor {
673            affected: 1,
674            rows: Vec::new(),
675        });
676
677        let repo = ctx
678            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
679            .unwrap();
680
681        let compiled = repo.compile(&repo.select()).unwrap();
682        assert!(compiled.sql.contains("WHERE (\"version\" = $1)"));
683
684        let insert = repo.insert_command().value("id", 1_u64).value("name", "n");
685        let affected = repo.insert(&insert).unwrap();
686        assert_eq!(affected, 1);
687        assert_eq!(repo.relation_loads(), vec!["lines".to_owned()]);
688    }
689
690    #[test]
691    fn resolved_repository_prepares_insert_command_with_generated_id() {
692        let mut ctx = UserContext::new()
693            .with_metadata(
694                InMemoryMetadataStore::new()
695                    .with_entity(entity())
696                    .with_entity(line_entity())
697                    .with_entity(product_entity()),
698            )
699            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
700            .with_repository_behavior_registry(
701                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
702            )
703            .with_internal_id_generator(FixedIdGenerator(42));
704        ctx.insert_resource(PostgresDialect);
705        ctx.insert_resource(StubExecutor {
706            affected: 1,
707            rows: Vec::new(),
708        });
709
710        let repo = ctx
711            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
712            .unwrap();
713
714        let prepared = repo
715            .prepare_insert_command(&repo.insert_command().value("name", "n"))
716            .unwrap();
717
718        assert_eq!(prepared.values.get("id"), Some(&Value::U64(42)));
719        assert_eq!(prepared.values.get("version"), Some(&Value::I64(1)));
720        assert_eq!(
721            prepared.values.get("name"),
722            Some(&Value::Text("n".to_owned()))
723        );
724    }
725
726    #[test]
727    fn resolved_repository_saves_create_graph_and_maintains_relation_keys() {
728        let mut ctx = UserContext::new()
729            .with_metadata(
730                InMemoryMetadataStore::new()
731                    .with_entity(entity())
732                    .with_entity(line_entity())
733                    .with_entity(product_entity()),
734            )
735            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
736            .with_repository_behavior_registry(
737                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
738            )
739            .with_internal_id_generator(SequentialIdGenerator::new(500));
740        ctx.insert_resource(PostgresDialect);
741        ctx.insert_resource(StubExecutor {
742            affected: 1,
743            rows: Vec::new(),
744        });
745
746        let repo = ctx
747            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
748            .unwrap();
749        let graph = GraphNode::new("Order").value("name", "root").relation(
750            "lines",
751            GraphNode::new("OrderLine")
752                .value("name", "line-1")
753                .relation("product", GraphNode::new("Product").value("name", "sku-1")),
754        );
755
756        let saved = repo.save_graph(graph).unwrap();
757
758        assert_eq!(saved.values.get("id"), Some(&Value::U64(500)));
759        assert_eq!(saved.values.get("version"), Some(&Value::I64(1)));
760        let lines = saved.relations.get("lines").unwrap();
761        assert_eq!(lines.len(), 1);
762        assert_eq!(lines[0].values.get("id"), Some(&Value::U64(501)));
763        assert_eq!(lines[0].values.get("order_id"), Some(&Value::U64(500)));
764        assert_eq!(lines[0].values.get("product_id"), Some(&Value::U64(502)));
765        let product = lines[0].relations.get("product").unwrap();
766        assert_eq!(product[0].values.get("id"), Some(&Value::U64(502)));
767    }
768
769    #[test]
770    fn resolved_repository_extracts_and_saves_typed_entity_graph() {
771        let mut ctx = UserContext::new()
772            .with_metadata(
773                InMemoryMetadataStore::new()
774                    .with_entity(entity())
775                    .with_entity(line_entity())
776                    .with_entity(product_entity()),
777            )
778            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
779            .with_internal_id_generator(SequentialIdGenerator::new(700));
780        ctx.insert_resource(PostgresDialect);
781        ctx.insert_resource(StubExecutor {
782            affected: 1,
783            rows: Vec::new(),
784        });
785
786        let repo = ctx
787            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
788            .unwrap();
789        let order = TypedGraphOrder {
790            id: None,
791            version: 1,
792            name: "typed-root".to_owned(),
793            lines: teaql_core::SmartList::from(vec![TypedGraphLine {
794                id: None,
795                order_id: None,
796                name: "typed-line".to_owned(),
797                product_id: None,
798                product: Some(TypedGraphProduct {
799                    id: None,
800                    name: "typed-product".to_owned(),
801                }),
802            }]),
803        };
804
805        let extracted = repo.graph_node_from_entity(order).unwrap();
806        assert_eq!(extracted.entity, "Order");
807        assert_eq!(
808            extracted.values.get("name"),
809            Some(&Value::Text("typed-root".to_owned()))
810        );
811        assert_eq!(extracted.values.get("id"), Some(&Value::Null));
812        assert_eq!(extracted.relations["lines"].len(), 1);
813        assert_eq!(
814            extracted.relations["lines"][0].values.get("name"),
815            Some(&Value::Text("typed-line".to_owned()))
816        );
817        assert_eq!(
818            extracted.relations["lines"][0].relations["product"].len(),
819            1
820        );
821
822        let saved = repo.save_graph(extracted).unwrap();
823        assert_eq!(saved.values.get("id"), Some(&Value::U64(700)));
824        let lines = saved.relations.get("lines").unwrap();
825        assert_eq!(lines[0].values.get("id"), Some(&Value::U64(701)));
826        assert_eq!(lines[0].values.get("order_id"), Some(&Value::U64(700)));
827        assert_eq!(lines[0].values.get("product_id"), Some(&Value::U64(702)));
828        assert_eq!(
829            lines[0].relations["product"][0].values.get("id"),
830            Some(&Value::U64(702))
831        );
832    }
833
834    #[test]
835    fn resolved_repository_saves_typed_entity_graph_directly() {
836        let mut ctx = UserContext::new()
837            .with_metadata(
838                InMemoryMetadataStore::new()
839                    .with_entity(entity())
840                    .with_entity(line_entity())
841                    .with_entity(product_entity()),
842            )
843            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
844            .with_internal_id_generator(SequentialIdGenerator::new(800));
845        ctx.insert_resource(PostgresDialect);
846        ctx.insert_resource(StubExecutor {
847            affected: 1,
848            rows: Vec::new(),
849        });
850
851        let repo = ctx
852            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
853            .unwrap();
854        let saved = repo
855            .save_entity_graph(TypedGraphOrder {
856                id: None,
857                version: 1,
858                name: "typed-direct".to_owned(),
859                lines: teaql_core::SmartList::from(vec![TypedGraphLine {
860                    id: None,
861                    order_id: None,
862                    name: "typed-line".to_owned(),
863                    product_id: None,
864                    product: Some(TypedGraphProduct {
865                        id: None,
866                        name: "typed-product".to_owned(),
867                    }),
868                }]),
869            })
870            .unwrap();
871
872        assert_eq!(saved.values.get("id"), Some(&Value::U64(800)));
873        assert_eq!(
874            saved.relations["lines"][0].values.get("order_id"),
875            Some(&Value::U64(800))
876        );
877        assert_eq!(
878            saved.relations["lines"][0].values.get("product_id"),
879            Some(&Value::U64(802))
880        );
881    }
882
883    #[test]
884    fn custom_user_context_can_drive_insert_preparation() {
885        let mut ctx = UserContext::new()
886            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
887            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
888            .with_repository_behavior_registry(
889                InMemoryRepositoryBehaviorRegistry::new()
890                    .with_behavior("Order", ContextAwareOrderBehavior),
891            )
892            .with_internal_id_generator(FixedIdGenerator(99));
893        ctx.insert_named_resource("tenant", String::from("acme"));
894        ctx.insert_named_resource("initial_version", 7_i64);
895        ctx.put_local("trace_id", "req-9");
896        ctx.insert_resource(PostgresDialect);
897        ctx.insert_resource(StubExecutor {
898            affected: 1,
899            rows: Vec::new(),
900        });
901
902        let repo = ctx
903            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
904            .unwrap();
905        let prepared = repo.prepare_insert_command(&repo.insert_command()).unwrap();
906
907        assert_eq!(prepared.values.get("id"), Some(&Value::U64(99)));
908        assert_eq!(prepared.values.get("version"), Some(&Value::I64(7)));
909        assert_eq!(
910            prepared.values.get("name"),
911            Some(&Value::Text("acme:req-9".to_owned()))
912        );
913    }
914
915    #[test]
916    fn checker_registry_validates_and_fixes_insert_commands() {
917        let mut ctx = UserContext::new()
918            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
919            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
920            .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker))
921            .with_internal_id_generator(FixedIdGenerator(77));
922        ctx.insert_resource(PostgresDialect);
923        ctx.insert_resource(StubExecutor {
924            affected: 1,
925            rows: Vec::new(),
926        });
927
928        let repo = ctx
929            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
930            .unwrap();
931        let prepared = repo
932            .prepare_insert_command(&repo.insert_command().value("name", "valid"))
933            .unwrap();
934
935        assert_eq!(prepared.values.get("id"), Some(&Value::U64(77)));
936        assert_eq!(prepared.values.get("version"), Some(&Value::I64(1)));
937        assert!(!prepared.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
938
939        let error = repo
940            .prepare_insert_command(&repo.insert_command().value("name", "no"))
941            .unwrap_err();
942        match error {
943            RuntimeError::Check(results) => {
944                assert_eq!(results.len(), 1);
945                assert_eq!(results[0].location.to_string(), "name");
946            }
947            other => panic!("unexpected checker error: {other:?}"),
948        }
949    }
950
951    #[test]
952    fn checker_registry_validates_update_commands_without_required_insert_checks() {
953        let mut ctx = UserContext::new()
954            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
955            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
956            .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker));
957        ctx.insert_resource(PostgresDialect);
958        ctx.insert_resource(StubExecutor {
959            affected: 1,
960            rows: Vec::new(),
961        });
962
963        let repo = ctx
964            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
965            .unwrap();
966        repo.update(&repo.update_command(1_u64).value("version", 1_i64))
967            .unwrap();
968
969        let error = repo
970            .update(&repo.update_command(1_u64).value("name", "no"))
971            .unwrap_err();
972        match error {
973            RepositoryError::Runtime(RuntimeError::Check(results)) => {
974                assert_eq!(results.len(), 1);
975                assert_eq!(results[0].location.to_string(), "name");
976            }
977            other => panic!("unexpected checker error: {other:?}"),
978        }
979    }
980
981    #[test]
982    fn built_in_language_translators_cover_fifteen_languages() {
983        assert_eq!(Language::ALL.len(), 15);
984        let result = super::CheckResult::required(ObjectLocation::hash_root("name"));
985        let messages = Language::ALL
986            .iter()
987            .map(|language| translate_check_result(*language, &result))
988            .collect::<Vec<_>>();
989
990        assert!(messages.iter().all(|message| !message.is_empty()));
991        assert!(messages.iter().any(|message| message.contains("required")));
992        assert!(messages.iter().any(|message| message.contains("å¿…å¡«")));
993        assert!(
994            messages
995                .iter()
996                .any(|message| message.contains("obligatoire"))
997        );
998        assert_eq!(Language::from_code("zh-CN"), Some(Language::Chinese));
999        assert_eq!(
1000            Language::from_code("zh-TW"),
1001            Some(Language::TraditionalChinese)
1002        );
1003    }
1004
1005    #[test]
1006    fn user_context_language_switch_translates_checker_errors() {
1007        let mut ctx = UserContext::new()
1008            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1009            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1010            .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker))
1011            .with_internal_id_generator(FixedIdGenerator(77))
1012            .with_language(Language::Chinese);
1013        ctx.insert_resource(PostgresDialect);
1014        ctx.insert_resource(StubExecutor {
1015            affected: 1,
1016            rows: Vec::new(),
1017        });
1018
1019        let repo = ctx
1020            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1021            .unwrap();
1022        let error = repo
1023            .prepare_insert_command(&repo.insert_command())
1024            .unwrap_err();
1025        match error {
1026            RuntimeError::Check(results) => {
1027                assert_eq!(results.len(), 1);
1028                assert!(
1029                    results[0]
1030                        .message
1031                        .as_ref()
1032                        .is_some_and(|message| message.contains("å¿…å¡«"))
1033                );
1034            }
1035            other => panic!("unexpected checker error: {other:?}"),
1036        }
1037
1038        let mut ctx = UserContext::new().with_language(Language::English);
1039        ctx.set_language_code("es").unwrap();
1040        assert_eq!(ctx.language(), Language::Spanish);
1041    }
1042
1043    #[test]
1044    fn checker_registry_merges_graph_update_fixes_by_object_status() {
1045        let mut ctx = UserContext::new()
1046            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1047            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1048            .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker));
1049        ctx.insert_resource(PostgresDialect);
1050        ctx.insert_resource(StubExecutor {
1051            affected: 1,
1052            rows: vec![Record::from([
1053                ("id".to_owned(), Value::U64(1)),
1054                ("version".to_owned(), Value::I64(1)),
1055                ("name".to_owned(), Value::Text("old".to_owned())),
1056            ])],
1057        });
1058
1059        let repo = ctx
1060            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1061            .unwrap();
1062        let saved = repo
1063            .save_graph(
1064                GraphNode::new("Order")
1065                    .value("id", 1_u64)
1066                    .value("version", 1_i64)
1067                    .value("name", "graph-update"),
1068            )
1069            .unwrap();
1070
1071        assert_eq!(
1072            saved.values.get("name"),
1073            Some(&Value::Text("graph-update-checked".to_owned()))
1074        );
1075        assert_eq!(saved.values.get("version"), Some(&Value::I64(2)));
1076        assert!(!saved.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
1077    }
1078
1079    #[test]
1080    fn user_context_event_sink_receives_repository_mutation_events() {
1081        let events = Arc::new(Mutex::new(Vec::new()));
1082        let mut ctx = UserContext::new()
1083            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1084            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1085            .with_internal_id_generator(FixedIdGenerator(88))
1086            .with_event_sink(RecordingEventSink {
1087                events: events.clone(),
1088            });
1089        ctx.insert_resource(PostgresDialect);
1090        ctx.insert_resource(StubExecutor {
1091            affected: 1,
1092            rows: Vec::new(),
1093        });
1094
1095        let repo = ctx
1096            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1097            .unwrap();
1098        repo.insert(&repo.insert_command().value("name", "created"))
1099            .unwrap();
1100        repo.update(
1101            &repo
1102                .update_command(88_u64)
1103                .expected_version(1)
1104                .value("name", "updated"),
1105        )
1106        .unwrap();
1107        repo.delete(&repo.delete_command(88_u64).expected_version(2))
1108            .unwrap();
1109        repo.recover(&repo.recover_command(88_u64, -3)).unwrap();
1110
1111        let events = events.lock().unwrap();
1112        assert_eq!(events.len(), 4);
1113        assert_eq!(events[0].kind, EntityEventKind::Created);
1114        assert_eq!(events[0].entity, "Order");
1115        assert_eq!(events[0].values.get("id"), Some(&Value::U64(88)));
1116        assert_eq!(events[1].kind, EntityEventKind::Updated);
1117        assert_eq!(events[1].values.get("id"), Some(&Value::U64(88)));
1118        assert_eq!(events[1].values.get("version"), Some(&Value::I64(2)));
1119        assert_eq!(events[1].updated_fields, vec!["name".to_owned()]);
1120        assert_eq!(events[2].kind, EntityEventKind::Deleted);
1121        assert_eq!(events[3].kind, EntityEventKind::Recovered);
1122    }
1123
1124    #[test]
1125    fn user_context_event_sink_receives_mixed_graph_mutation_events() {
1126        let events = Arc::new(Mutex::new(Vec::new()));
1127        let mut ctx = UserContext::new()
1128            .with_metadata(
1129                InMemoryMetadataStore::new()
1130                    .with_entity(entity())
1131                    .with_entity(line_entity())
1132                    .with_entity(product_entity()),
1133            )
1134            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1135            .with_event_sink(RecordingEventSink {
1136                events: events.clone(),
1137            });
1138        ctx.insert_resource(PostgresDialect);
1139        ctx.insert_resource(StubExecutor {
1140            affected: 1,
1141            rows: vec![Record::from([
1142                ("id".to_owned(), Value::U64(1)),
1143                ("version".to_owned(), Value::I64(1)),
1144                ("name".to_owned(), Value::Text("old".to_owned())),
1145            ])],
1146        });
1147
1148        let repo = ctx
1149            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1150            .unwrap();
1151        repo.save_graph(
1152            GraphNode::new("Order")
1153                .value("id", 1_u64)
1154                .value("version", 1_i64)
1155                .value("name", "updated")
1156                .relation(
1157                    "lines",
1158                    GraphNode::new("OrderLine")
1159                        .value("name", "line")
1160                        .value("product_id", 3_u64),
1161                ),
1162        )
1163        .unwrap();
1164
1165        let events = events.lock().unwrap();
1166        assert_eq!(events.len(), 3);
1167        assert_eq!(events[0].kind, EntityEventKind::Updated);
1168        assert_eq!(events[0].entity, "Order");
1169        assert_eq!(events[1].kind, EntityEventKind::Updated);
1170        assert_eq!(events[1].entity, "OrderLine");
1171        assert_eq!(events[1].values.get("order_id"), Some(&Value::U64(1)));
1172        assert_eq!(events[2].kind, EntityEventKind::Deleted);
1173        assert_eq!(events[2].entity, "OrderLine");
1174    }
1175
1176    #[test]
1177    fn save_graph_builds_plan_grouped_by_entity_and_operation() {
1178        let mut ctx = UserContext::new()
1179            .with_metadata(
1180                InMemoryMetadataStore::new()
1181                    .with_entity(entity())
1182                    .with_entity(line_entity())
1183                    .with_entity(product_entity()),
1184            )
1185            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1186            .with_internal_id_generator(SequentialIdGenerator::new(500));
1187        ctx.insert_resource(PostgresDialect);
1188        ctx.insert_resource(StubExecutor {
1189            affected: 1,
1190            rows: vec![Record::from([
1191                ("id".to_owned(), Value::U64(1)),
1192                ("version".to_owned(), Value::I64(1)),
1193                ("name".to_owned(), Value::Text("old".to_owned())),
1194            ])],
1195        });
1196
1197        let plan = ctx
1198            .plan_for_save_graph::<PostgresDialect, StubExecutor>(
1199                GraphNode::new("Order")
1200                    .value("id", 1_u64)
1201                    .value("version", 1_i64)
1202                    .value("name", "updated")
1203                    .relation(
1204                        "lines",
1205                        GraphNode::new("OrderLine")
1206                            .value("name", "new-line-a")
1207                            .value("product_id", 2_u64),
1208                    )
1209                    .relation(
1210                        "lines",
1211                        GraphNode::new("OrderLine")
1212                            .value("name", "new-line-b")
1213                            .value("product_id", 2_u64),
1214                    )
1215                    .relation(
1216                        "lines",
1217                        GraphNode::new("OrderLine")
1218                            .value("id", 5_u64)
1219                            .value("version", 1_i64)
1220                            .value("name", "same-update-a"),
1221                    )
1222                    .relation(
1223                        "lines",
1224                        GraphNode::new("OrderLine")
1225                            .value("id", 6_u64)
1226                            .value("version", 1_i64)
1227                            .value("name", "same-update-b"),
1228                    )
1229                    .relation(
1230                        "lines",
1231                        GraphNode::new("OrderLine").value("id", 3_u64).remove(),
1232                    )
1233                    .relation(
1234                        "lines",
1235                        GraphNode::new("OrderLine").value("id", 4_u64).reference(),
1236                    ),
1237            )
1238            .unwrap();
1239        let counts = plan.grouped_counts();
1240
1241        assert_eq!(
1242            counts.get(&("Order".to_owned(), GraphMutationKind::Update)),
1243            Some(&1)
1244        );
1245        assert_eq!(
1246            counts.get(&("OrderLine".to_owned(), GraphMutationKind::Create)),
1247            Some(&2)
1248        );
1249        assert_eq!(
1250            counts.get(&("OrderLine".to_owned(), GraphMutationKind::Update)),
1251            Some(&2)
1252        );
1253        assert_eq!(
1254            counts.get(&("OrderLine".to_owned(), GraphMutationKind::Delete)),
1255            Some(&1)
1256        );
1257        assert_eq!(
1258            counts.get(&("OrderLine".to_owned(), GraphMutationKind::Reference)),
1259            Some(&1)
1260        );
1261        let create_batch = plan
1262            .batches
1263            .iter()
1264            .find(|batch| batch.entity == "OrderLine" && batch.kind == GraphMutationKind::Create)
1265            .unwrap();
1266        assert_eq!(create_batch.items.len(), 2);
1267        assert_eq!(
1268            create_batch.items[0].values.get("id"),
1269            Some(&Value::U64(500))
1270        );
1271        assert_eq!(
1272            create_batch.items[1].values.get("id"),
1273            Some(&Value::U64(501))
1274        );
1275        let update_batch = plan
1276            .batches
1277            .iter()
1278            .find(|batch| {
1279                batch.entity == "OrderLine"
1280                    && batch.kind == GraphMutationKind::Update
1281                    && batch.update_fields == vec!["name".to_owned()]
1282            })
1283            .unwrap();
1284        assert_eq!(update_batch.items.len(), 2);
1285    }
1286
1287    #[test]
1288    fn resolved_repository_builds_relation_plans() {
1289        let mut ctx = UserContext::new()
1290            .with_metadata(
1291                InMemoryMetadataStore::new()
1292                    .with_entity(entity())
1293                    .with_entity(line_entity())
1294                    .with_entity(product_entity()),
1295            )
1296            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1297            .with_repository_behavior_registry(
1298                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1299            );
1300        ctx.insert_resource(PostgresDialect);
1301        ctx.insert_resource(StubExecutor {
1302            affected: 1,
1303            rows: Vec::new(),
1304        });
1305
1306        let repo = ctx
1307            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1308            .unwrap();
1309        let plans = repo.relation_plans().unwrap();
1310
1311        assert_eq!(plans.len(), 1);
1312        assert_eq!(plans[0].relation_name, "lines");
1313        assert_eq!(plans[0].target_entity, "OrderLine");
1314        assert_eq!(plans[0].local_key, "id");
1315        assert_eq!(plans[0].foreign_key, "order_id");
1316        assert!(plans[0].many);
1317    }
1318
1319    #[test]
1320    fn resolved_repository_builds_relation_query_from_parent_rows() {
1321        let mut ctx = UserContext::new()
1322            .with_metadata(
1323                InMemoryMetadataStore::new()
1324                    .with_entity(entity())
1325                    .with_entity(line_entity())
1326                    .with_entity(product_entity()),
1327            )
1328            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1329            .with_repository_behavior_registry(
1330                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1331            );
1332        ctx.insert_resource(PostgresDialect);
1333        ctx.insert_resource(StubExecutor {
1334            affected: 1,
1335            rows: Vec::new(),
1336        });
1337
1338        let repo = ctx
1339            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1340            .unwrap();
1341        let parent_rows = vec![
1342            Record::from([(String::from("id"), Value::U64(11))]),
1343            Record::from([(String::from("id"), Value::U64(12))]),
1344        ];
1345
1346        let query = repo.relation_query("lines", &parent_rows).unwrap();
1347        let compiled = repo.compile(&query).unwrap();
1348        assert!(compiled.sql.contains("FROM \"orderline\""));
1349        assert!(compiled.sql.contains("\"order_id\" IN ($1, $2)"));
1350        assert_eq!(compiled.params, vec![Value::U64(11), Value::U64(12)]);
1351    }
1352
1353    #[test]
1354    fn resolved_repository_enhances_parent_rows_with_relations() {
1355        let mut ctx = UserContext::new()
1356            .with_metadata(
1357                InMemoryMetadataStore::new()
1358                    .with_entity(entity())
1359                    .with_entity(line_entity())
1360                    .with_entity(product_entity()),
1361            )
1362            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1363            .with_repository_behavior_registry(
1364                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1365            );
1366        ctx.insert_resource(PostgresDialect);
1367        ctx.insert_resource(StubExecutor {
1368            affected: 1,
1369            rows: vec![
1370                Record::from([
1371                    (String::from("id"), Value::U64(101)),
1372                    (String::from("order_id"), Value::U64(11)),
1373                    (String::from("name"), Value::Text(String::from("l1"))),
1374                ]),
1375                Record::from([
1376                    (String::from("id"), Value::U64(102)),
1377                    (String::from("order_id"), Value::U64(11)),
1378                    (String::from("name"), Value::Text(String::from("l2"))),
1379                ]),
1380                Record::from([
1381                    (String::from("id"), Value::U64(201)),
1382                    (String::from("order_id"), Value::U64(12)),
1383                    (String::from("name"), Value::Text(String::from("l3"))),
1384                ]),
1385            ],
1386        });
1387
1388        let repo = ctx
1389            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1390            .unwrap();
1391        let mut parents = vec![
1392            Record::from([(String::from("id"), Value::U64(11))]),
1393            Record::from([(String::from("id"), Value::U64(12))]),
1394        ];
1395
1396        repo.enhance_relations(&mut parents).unwrap();
1397
1398        match parents[0].get("lines") {
1399            Some(Value::List(lines)) => assert_eq!(lines.len(), 2),
1400            other => panic!("unexpected lines payload: {other:?}"),
1401        }
1402        match parents[1].get("lines") {
1403            Some(Value::List(lines)) => assert_eq!(lines.len(), 1),
1404            other => panic!("unexpected lines payload: {other:?}"),
1405        }
1406    }
1407
1408    #[test]
1409    fn resolved_repository_fetches_smart_list_of_entities() {
1410        let mut ctx = UserContext::new()
1411            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1412            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1413        ctx.insert_resource(PostgresDialect);
1414        ctx.insert_resource(StubExecutor {
1415            affected: 1,
1416            rows: vec![Record::from([
1417                (String::from("id"), Value::U64(7)),
1418                (String::from("version"), Value::I64(2)),
1419                (String::from("name"), Value::Text(String::from("typed"))),
1420            ])],
1421        });
1422
1423        let repo = ctx
1424            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1425            .unwrap();
1426        let rows = repo.fetch_entities::<OrderEntity>(&repo.select()).unwrap();
1427
1428        assert_eq!(rows.len(), 1);
1429        assert_eq!(
1430            rows.first(),
1431            Some(&OrderEntity {
1432                id: 7,
1433                version: 2,
1434                name: String::from("typed"),
1435            })
1436        );
1437    }
1438
1439    #[test]
1440    fn resolved_repository_fetches_smart_list_of_derived_entities() {
1441        let mut ctx = UserContext::new()
1442            .with_metadata(
1443                InMemoryMetadataStore::new().with_entity(CatalogProductRow::entity_descriptor()),
1444            )
1445            .with_repository_registry(
1446                InMemoryRepositoryRegistry::new().with_entity("CatalogProduct"),
1447            );
1448        ctx.insert_resource(PostgresDialect);
1449        ctx.insert_resource(StubExecutor {
1450            affected: 1,
1451            rows: vec![Record::from([
1452                (String::from("id"), Value::U64(9)),
1453                (String::from("name"), Value::Text(String::from("derived"))),
1454            ])],
1455        });
1456
1457        let repo = ctx
1458            .resolve_repository::<PostgresDialect, StubExecutor>("CatalogProduct")
1459            .unwrap();
1460        let rows = repo
1461            .fetch_entities::<CatalogProductRow>(&repo.select())
1462            .unwrap();
1463
1464        assert_eq!(rows.len(), 1);
1465        assert_eq!(
1466            rows.first(),
1467            Some(&CatalogProductRow {
1468                id: 9,
1469                name: String::from("derived"),
1470            })
1471        );
1472    }
1473
1474    #[test]
1475    fn resolved_repository_collects_dynamic_properties_for_aggregate_output() {
1476        let mut ctx = UserContext::new()
1477            .with_metadata(
1478                InMemoryMetadataStore::new()
1479                    .with_entity(OrderAggregateDynamic::entity_descriptor()),
1480            )
1481            .with_repository_registry(
1482                InMemoryRepositoryRegistry::new().with_entity("OrderAggregate"),
1483            );
1484        ctx.insert_resource(PostgresDialect);
1485        ctx.insert_resource(StubExecutor {
1486            affected: 1,
1487            rows: vec![Record::from([
1488                (String::from("id"), Value::U64(1)),
1489                (String::from("lineCount"), Value::I64(3)),
1490                (String::from("amount"), Value::F64(18.5)),
1491            ])],
1492        });
1493
1494        let repo = ctx
1495            .resolve_repository::<PostgresDialect, StubExecutor>("OrderAggregate")
1496            .unwrap();
1497        let rows = repo
1498            .fetch_entities::<OrderAggregateDynamic>(&repo.select())
1499            .unwrap();
1500
1501        assert_eq!(rows.len(), 1);
1502        assert_eq!(rows.data[0].id, 1);
1503        assert_eq!(rows.data[0].dynamic.get("lineCount"), Some(&Value::I64(3)));
1504        assert_eq!(rows.data[0].dynamic.get("amount"), Some(&Value::F64(18.5)));
1505        assert_eq!(
1506            rows.into_vec().into_iter().next().unwrap().into_json(),
1507            serde_json::json!({
1508                "id": 1,
1509                "lineCount": 3,
1510                "amount": 18.5
1511            })
1512        );
1513    }
1514
1515    #[test]
1516    fn resolved_repository_executes_relation_aggregates_into_dynamic_properties() {
1517        let executor = QueueExecutor {
1518            affected: 1,
1519            rows: Mutex::new(VecDeque::from([
1520                vec![
1521                    Record::from([
1522                        (String::from("id"), Value::U64(1)),
1523                        (String::from("version"), Value::I64(1)),
1524                        (String::from("name"), Value::Text(String::from("first"))),
1525                    ]),
1526                    Record::from([
1527                        (String::from("id"), Value::U64(2)),
1528                        (String::from("version"), Value::I64(1)),
1529                        (String::from("name"), Value::Text(String::from("second"))),
1530                    ]),
1531                ],
1532                vec![Record::from([
1533                    (String::from("order_id"), Value::U64(1)),
1534                    (String::from("lineCount"), Value::I64(3)),
1535                ])],
1536            ])),
1537            queries: Mutex::new(Vec::new()),
1538        };
1539        let mut ctx = UserContext::new()
1540            .with_metadata(
1541                InMemoryMetadataStore::new()
1542                    .with_entity(entity())
1543                    .with_entity(line_entity()),
1544            )
1545            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1546        ctx.insert_resource(PostgresDialect);
1547        ctx.insert_resource(executor);
1548
1549        let repo = ctx
1550            .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1551            .unwrap();
1552        let rows = repo
1553            .fetch_all_with_relation_aggregates(
1554                &repo
1555                    .select()
1556                    .project("id")
1557                    .project("version")
1558                    .project("name"),
1559                &[RelationAggregate::new(
1560                    "lines",
1561                    "lineCount",
1562                    SelectQuery::new("OrderLine"),
1563                    true,
1564                )],
1565            )
1566            .unwrap();
1567
1568        assert_eq!(rows[0].get("lineCount"), Some(&Value::I64(3)));
1569        assert_eq!(rows[1].get("lineCount"), Some(&Value::U64(0)));
1570
1571        let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1572        let queries = executor.queries.lock().unwrap();
1573        assert_eq!(queries.len(), 2);
1574        assert_eq!(
1575            queries[1],
1576            "SELECT \"order_id\", COUNT(*) AS \"lineCount\" FROM \"orderline\" WHERE (\"order_id\" IN ($1, $2)) GROUP BY \"order_id\""
1577        );
1578    }
1579
1580    #[test]
1581    fn resolved_repository_maps_relation_aggregate_storage_key_to_property_key() {
1582        let mut line = line_entity();
1583        line.properties
1584            .iter_mut()
1585            .find(|property| property.name == "order_id")
1586            .unwrap()
1587            .column_name = "order_ref".to_owned();
1588        let executor = QueueExecutor {
1589            affected: 1,
1590            rows: Mutex::new(VecDeque::from([
1591                vec![Record::from([
1592                    (String::from("id"), Value::U64(1)),
1593                    (String::from("version"), Value::I64(1)),
1594                    (String::from("name"), Value::Text(String::from("first"))),
1595                ])],
1596                vec![Record::from([
1597                    (String::from("order_ref"), Value::I64(1)),
1598                    (String::from("lineCount"), Value::I64(3)),
1599                ])],
1600            ])),
1601            queries: Mutex::new(Vec::new()),
1602        };
1603        let mut ctx = UserContext::new()
1604            .with_metadata(
1605                InMemoryMetadataStore::new()
1606                    .with_entity(entity())
1607                    .with_entity(line),
1608            )
1609            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1610        ctx.insert_resource(PostgresDialect);
1611        ctx.insert_resource(executor);
1612
1613        let repo = ctx
1614            .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1615            .unwrap();
1616        let rows = repo
1617            .fetch_all_with_relation_aggregates(
1618                &repo
1619                    .select()
1620                    .project("id")
1621                    .project("version")
1622                    .project("name"),
1623                &[RelationAggregate::new(
1624                    "lines",
1625                    "lineCount",
1626                    SelectQuery::new("OrderLine"),
1627                    true,
1628                )],
1629            )
1630            .unwrap();
1631
1632        assert_eq!(rows[0].get("lineCount"), Some(&Value::I64(3)));
1633        let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1634        assert_eq!(
1635            executor.queries.lock().unwrap()[1],
1636            "SELECT \"order_ref\", COUNT(*) AS \"lineCount\" FROM \"orderline\" WHERE (\"order_ref\" IN ($1)) GROUP BY \"order_ref\""
1637        );
1638    }
1639
1640    #[test]
1641    fn resolved_repository_uses_aggregation_cache_when_resource_is_registered() {
1642        let executor = QueueExecutor {
1643            affected: 1,
1644            rows: Mutex::new(VecDeque::from([vec![Record::from([(
1645                String::from("count"),
1646                Value::I64(2),
1647            )])]])),
1648            queries: Mutex::new(Vec::new()),
1649        };
1650        let mut ctx = UserContext::new()
1651            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1652            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1653        ctx.insert_resource(PostgresDialect);
1654        ctx.insert_resource(executor);
1655        ctx.insert_resource(InMemoryAggregationCache::default());
1656
1657        let repo = ctx
1658            .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1659            .unwrap();
1660        let query = repo
1661            .select()
1662            .count("count")
1663            .enable_aggregation_cache_for(60_000);
1664
1665        let first = repo.fetch_all(&query).unwrap();
1666        let second = repo.fetch_all(&query).unwrap();
1667
1668        assert_eq!(first, second);
1669        let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1670        assert_eq!(executor.queries.lock().unwrap().len(), 1);
1671    }
1672
1673    #[test]
1674    fn aggregation_cache_is_namespaced_and_invalidated_after_write() {
1675        let executor = QueueExecutor {
1676            affected: 1,
1677            rows: Mutex::new(VecDeque::from([
1678                vec![Record::from([(String::from("count"), Value::I64(2))])],
1679                vec![Record::from([(String::from("count"), Value::I64(3))])],
1680            ])),
1681            queries: Mutex::new(Vec::new()),
1682        };
1683        let mut ctx = UserContext::new()
1684            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1685            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1686        ctx.insert_resource(PostgresDialect);
1687        ctx.insert_resource(executor);
1688        ctx.insert_resource(
1689            Arc::new(InMemoryAggregationCache::with_namespace("tenant-a"))
1690                as Arc<dyn AggregationCacheBackend>,
1691        );
1692
1693        let repo = ctx
1694            .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1695            .unwrap();
1696        let query = repo
1697            .select()
1698            .count("count")
1699            .enable_aggregation_cache_for(60_000);
1700
1701        let first = repo.fetch_all(&query).unwrap();
1702        let cached = repo.fetch_all(&query).unwrap();
1703        repo.insert(
1704            &InsertCommand::new("Order")
1705                .value("id", 9_u64)
1706                .value("version", 1_i64)
1707                .value("name", "new"),
1708        )
1709        .unwrap();
1710        let refreshed = repo.fetch_all(&query).unwrap();
1711
1712        assert_eq!(first, cached);
1713        assert_ne!(cached, refreshed);
1714        let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1715        assert_eq!(executor.queries.lock().unwrap().len(), 2);
1716    }
1717
1718    #[test]
1719    fn aggregation_cache_propagates_to_relation_aggregates() {
1720        let parent_rows = vec![
1721            Record::from([
1722                (String::from("id"), Value::U64(1)),
1723                (String::from("version"), Value::I64(1)),
1724                (String::from("name"), Value::Text(String::from("first"))),
1725            ]),
1726            Record::from([
1727                (String::from("id"), Value::U64(2)),
1728                (String::from("version"), Value::I64(1)),
1729                (String::from("name"), Value::Text(String::from("second"))),
1730            ]),
1731        ];
1732        let aggregate_rows = vec![Record::from([
1733            (String::from("order_id"), Value::U64(1)),
1734            (String::from("lineCount"), Value::I64(3)),
1735        ])];
1736        let executor = QueueExecutor {
1737            affected: 1,
1738            rows: Mutex::new(VecDeque::from([parent_rows, aggregate_rows])),
1739            queries: Mutex::new(Vec::new()),
1740        };
1741        let mut ctx = UserContext::new()
1742            .with_metadata(
1743                InMemoryMetadataStore::new()
1744                    .with_entity(entity())
1745                    .with_entity(line_entity()),
1746            )
1747            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1748        ctx.insert_resource(PostgresDialect);
1749        ctx.insert_resource(executor);
1750        ctx.insert_resource(InMemoryAggregationCache::default());
1751
1752        let repo = ctx
1753            .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1754            .unwrap();
1755        let query = repo
1756            .select()
1757            .project("id")
1758            .project("version")
1759            .project("name")
1760            .enable_aggregation_cache_for(60_000)
1761            .propagate_aggregation_cache(60_000);
1762        let aggregate =
1763            RelationAggregate::new("lines", "lineCount", SelectQuery::new("OrderLine"), true);
1764
1765        let first = repo
1766            .fetch_all_with_relation_aggregates(&query, &[aggregate.clone()])
1767            .unwrap();
1768        let second = repo
1769            .fetch_all_with_relation_aggregates(&query, &[aggregate])
1770            .unwrap();
1771
1772        assert_eq!(first, second);
1773        let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1774        assert_eq!(executor.queries.lock().unwrap().len(), 2);
1775    }
1776
1777    #[test]
1778    fn memory_repository_fetches_smart_list_entities_with_query_features() {
1779        let metadata = InMemoryMetadataStore::new().with_entity(entity());
1780        let repository = MemoryRepository::new(metadata).with_rows(
1781            "Order",
1782            vec![
1783                Record::from([
1784                    (String::from("id"), Value::U64(1)),
1785                    (String::from("version"), Value::I64(1)),
1786                    (String::from("name"), Value::Text(String::from("alpha"))),
1787                ]),
1788                Record::from([
1789                    (String::from("id"), Value::U64(2)),
1790                    (String::from("version"), Value::I64(1)),
1791                    (String::from("name"), Value::Text(String::from("beta"))),
1792                ]),
1793                Record::from([
1794                    (String::from("id"), Value::U64(3)),
1795                    (String::from("version"), Value::I64(1)),
1796                    (String::from("name"), Value::Text(String::from("gamma"))),
1797                ]),
1798            ],
1799        );
1800
1801        let query = teaql_core::SelectQuery::new("Order")
1802            .filter(Expr::Binary {
1803                left: Box::new(Expr::column("id")),
1804                op: teaql_core::BinaryOp::Gte,
1805                right: Box::new(Expr::value(2_u64)),
1806            })
1807            .order_by(OrderBy::desc("id"))
1808            .limit(1);
1809
1810        let orders = repository.fetch_entities::<Order>(&query).unwrap();
1811
1812        assert_eq!(orders.ids(), vec![Value::U64(3)]);
1813        assert_eq!(orders.versions(), vec![1]);
1814        assert_eq!(orders.first().unwrap().name, "gamma");
1815    }
1816
1817    #[test]
1818    fn memory_repository_runs_aggregates() {
1819        let metadata = InMemoryMetadataStore::new().with_entity(entity());
1820        let repository = MemoryRepository::new(metadata).with_rows(
1821            "Order",
1822            vec![
1823                Record::from([
1824                    (String::from("id"), Value::U64(1)),
1825                    (String::from("version"), Value::I64(1)),
1826                    (String::from("name"), Value::Text(String::from("alpha"))),
1827                ]),
1828                Record::from([
1829                    (String::from("id"), Value::U64(2)),
1830                    (String::from("version"), Value::I64(2)),
1831                    (String::from("name"), Value::Text(String::from("beta"))),
1832                ]),
1833            ],
1834        );
1835
1836        let query = teaql_core::SelectQuery {
1837            entity: String::from("Order"),
1838            projection: Vec::new(),
1839            expr_projection: Vec::new(),
1840            filter: None,
1841            having: None,
1842            order_by: Vec::new(),
1843            slice: None,
1844            aggregates: vec![
1845                Aggregate {
1846                    function: AggregateFunction::Count,
1847                    field: String::from("id"),
1848                    alias: String::from("count"),
1849                },
1850                Aggregate {
1851                    function: AggregateFunction::Sum,
1852                    field: String::from("version"),
1853                    alias: String::from("versionSum"),
1854                },
1855            ],
1856            group_by: Vec::new(),
1857            relations: Vec::new(),
1858            aggregation_cache: None,
1859            comment: None,
1860            raw_sql: None,
1861            raw_sql_search_criteria: Vec::new(),
1862            json_expr: None,
1863            dynamic_properties: Vec::new(),
1864            raw_projections: Vec::new(),
1865            object_group_bys: Vec::new(),
1866            child_enhancements: Vec::new(),
1867        };
1868
1869        let rows = repository.fetch_all(&query).unwrap();
1870
1871        assert_eq!(rows.len(), 1);
1872        assert_eq!(rows[0].get("count"), Some(&Value::U64(2)));
1873        assert_eq!(rows[0].get("versionSum"), Some(&Value::U64(3)));
1874    }
1875
1876    #[test]
1877    fn memory_repository_runs_grouped_aggregates_and_extended_filters() {
1878        let metadata = InMemoryMetadataStore::new().with_entity(entity());
1879        let repository = MemoryRepository::new(metadata).with_rows(
1880            "Order",
1881            vec![
1882                Record::from([
1883                    (String::from("id"), Value::U64(1)),
1884                    (String::from("version"), Value::I64(1)),
1885                    (String::from("name"), Value::Text(String::from("alpha"))),
1886                ]),
1887                Record::from([
1888                    (String::from("id"), Value::U64(2)),
1889                    (String::from("version"), Value::I64(2)),
1890                    (String::from("name"), Value::Text(String::from("alpha"))),
1891                ]),
1892                Record::from([
1893                    (String::from("id"), Value::U64(3)),
1894                    (String::from("version"), Value::I64(3)),
1895                    (String::from("name"), Value::Text(String::from("tmp-beta"))),
1896                ]),
1897            ],
1898        );
1899
1900        let rows = repository
1901            .fetch_all(
1902                &teaql_core::SelectQuery::new("Order")
1903                    .filter(
1904                        Expr::between("version", 1_i64, 3_i64)
1905                            .and_expr(Expr::not_like("name", "tmp%"))
1906                            .and_expr(Expr::not_in_list("name", vec![Value::from("deleted")])),
1907                    )
1908                    .group_by("name")
1909                    .count("total")
1910                    .sum("version", "versionSum"),
1911            )
1912            .unwrap();
1913
1914        assert_eq!(rows.len(), 1);
1915        assert_eq!(
1916            rows[0].get("name"),
1917            Some(&Value::Text(String::from("alpha")))
1918        );
1919        assert_eq!(rows[0].get("total"), Some(&Value::U64(2)));
1920        assert_eq!(rows[0].get("versionSum"), Some(&Value::U64(3)));
1921    }
1922
1923    #[test]
1924    fn memory_repository_runs_extended_aggregates_and_having() {
1925        let metadata = InMemoryMetadataStore::new().with_entity(entity());
1926        let repository = MemoryRepository::new(metadata).with_rows(
1927            "Order",
1928            vec![
1929                Record::from([
1930                    (String::from("id"), Value::U64(1)),
1931                    (String::from("version"), Value::I64(1)),
1932                    (String::from("name"), Value::Text(String::from("alpha"))),
1933                ]),
1934                Record::from([
1935                    (String::from("id"), Value::U64(2)),
1936                    (String::from("version"), Value::I64(3)),
1937                    (String::from("name"), Value::Text(String::from("alpha"))),
1938                ]),
1939                Record::from([
1940                    (String::from("id"), Value::U64(3)),
1941                    (String::from("version"), Value::I64(7)),
1942                    (String::from("name"), Value::Text(String::from("beta"))),
1943                ]),
1944            ],
1945        );
1946
1947        let rows = repository
1948            .fetch_all(
1949                &teaql_core::SelectQuery::new("Order")
1950                    .group_by("name")
1951                    .count("total")
1952                    .stddev("version", "stddevVersion")
1953                    .var_pop("version", "varPopVersion")
1954                    .bit_or("version", "bitOrVersion")
1955                    .having(Expr::gt("total", 1_i64)),
1956            )
1957            .unwrap();
1958
1959        assert_eq!(rows.len(), 1);
1960        assert_eq!(
1961            rows[0].get("name"),
1962            Some(&Value::Text(String::from("alpha")))
1963        );
1964        assert_eq!(rows[0].get("total"), Some(&Value::U64(2)));
1965        assert_eq!(
1966            rows[0].get("stddevVersion").map(Value::to_json_value),
1967            Some(serde_json::Value::String(
1968                "1.4142135623730951454746218583".to_owned()
1969            ))
1970        );
1971        assert_eq!(
1972            rows[0].get("varPopVersion"),
1973            Some(&Value::Decimal(Decimal::ONE))
1974        );
1975        assert_eq!(rows[0].get("bitOrVersion"), Some(&Value::I64(3)));
1976    }
1977
1978    #[test]
1979    fn memory_repository_runs_sound_like_filter() {
1980        let metadata = InMemoryMetadataStore::new().with_entity(entity());
1981        let repository = MemoryRepository::new(metadata).with_rows(
1982            "Order",
1983            vec![
1984                Record::from([
1985                    (String::from("id"), Value::U64(1)),
1986                    (String::from("version"), Value::I64(1)),
1987                    (String::from("name"), Value::Text(String::from("Robert"))),
1988                ]),
1989                Record::from([
1990                    (String::from("id"), Value::U64(2)),
1991                    (String::from("version"), Value::I64(1)),
1992                    (String::from("name"), Value::Text(String::from("Rupert"))),
1993                ]),
1994                Record::from([
1995                    (String::from("id"), Value::U64(3)),
1996                    (String::from("version"), Value::I64(1)),
1997                    (String::from("name"), Value::Text(String::from("Ashcraft"))),
1998                ]),
1999            ],
2000        );
2001
2002        let rows = repository
2003            .fetch_all(
2004                &teaql_core::SelectQuery::new("Order")
2005                    .filter(Expr::sound_like("name", "Robert"))
2006                    .order_asc("id"),
2007            )
2008            .unwrap();
2009
2010        assert_eq!(rows.len(), 2);
2011        assert_eq!(rows[0].get("name"), Some(&Value::Text("Robert".to_owned())));
2012        assert_eq!(rows[1].get("name"), Some(&Value::Text("Rupert".to_owned())));
2013    }
2014
2015    #[test]
2016    fn memory_repository_runs_java_style_string_match_filters() {
2017        let metadata = InMemoryMetadataStore::new().with_entity(entity());
2018        let repository = MemoryRepository::new(metadata).with_rows(
2019            "Order",
2020            vec![
2021                Record::from([
2022                    (String::from("id"), Value::U64(1)),
2023                    (String::from("version"), Value::I64(1)),
2024                    (String::from("name"), Value::Text(String::from("tea-order"))),
2025                ]),
2026                Record::from([
2027                    (String::from("id"), Value::U64(2)),
2028                    (String::from("version"), Value::I64(1)),
2029                    (
2030                        String::from("name"),
2031                        Value::Text(String::from("coffee-order")),
2032                    ),
2033                ]),
2034                Record::from([
2035                    (String::from("id"), Value::U64(3)),
2036                    (String::from("version"), Value::I64(1)),
2037                    (
2038                        String::from("name"),
2039                        Value::Text(String::from("tea-archived")),
2040                    ),
2041                ]),
2042            ],
2043        );
2044
2045        let rows = repository
2046            .fetch_all(
2047                &teaql_core::SelectQuery::new("Order")
2048                    .filter(
2049                        Expr::contain("name", "tea")
2050                            .and_expr(Expr::begin_with("name", "tea"))
2051                            .and_expr(Expr::end_with("name", "order"))
2052                            .and_expr(Expr::not_contain("name", "coffee"))
2053                            .and_expr(Expr::not_begin_with("name", "archived"))
2054                            .and_expr(Expr::not_end_with("name", "draft")),
2055                    )
2056                    .order_asc("id"),
2057            )
2058            .unwrap();
2059
2060        assert_eq!(rows.len(), 1);
2061        assert_eq!(
2062            rows[0].get("name"),
2063            Some(&Value::Text("tea-order".to_owned()))
2064        );
2065    }
2066
2067    #[test]
2068    fn memory_repository_runs_property_to_property_filters() {
2069        let metadata = InMemoryMetadataStore::new().with_entity(entity());
2070        let repository = MemoryRepository::new(metadata).with_rows(
2071            "Order",
2072            vec![
2073                Record::from([
2074                    (String::from("id"), Value::U64(1)),
2075                    (String::from("version"), Value::I64(2)),
2076                    (String::from("name"), Value::Text(String::from("keep"))),
2077                ]),
2078                Record::from([
2079                    (String::from("id"), Value::U64(2)),
2080                    (String::from("version"), Value::I64(1)),
2081                    (String::from("name"), Value::Text(String::from("skip"))),
2082                ]),
2083            ],
2084        );
2085
2086        let rows = repository
2087            .fetch_all(
2088                &teaql_core::SelectQuery::new("Order")
2089                    .filter(Expr::compare_columns("version", BinaryOp::Gte, "id"))
2090                    .order_asc("id"),
2091            )
2092            .unwrap();
2093
2094        assert_eq!(rows.len(), 1);
2095        assert_eq!(rows[0].get("name"), Some(&Value::Text("keep".to_owned())));
2096    }
2097
2098    #[test]
2099    fn memory_repository_supports_mutations_and_optimistic_locking() {
2100        let metadata = InMemoryMetadataStore::new().with_entity(entity());
2101        let repository = MemoryRepository::new(metadata);
2102
2103        repository
2104            .insert(
2105                &InsertCommand::new("Order")
2106                    .value("id", 10_u64)
2107                    .value("version", 1_i64)
2108                    .value("name", "draft"),
2109            )
2110            .unwrap();
2111        repository
2112            .update(
2113                &UpdateCommand::new("Order", 10_u64)
2114                    .expected_version(1)
2115                    .value("name", "submitted"),
2116            )
2117            .unwrap();
2118
2119        let row = repository
2120            .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2121            .unwrap()
2122            .pop()
2123            .unwrap();
2124        assert_eq!(
2125            row.get("name"),
2126            Some(&Value::Text(String::from("submitted")))
2127        );
2128        assert_eq!(row.get("version"), Some(&Value::I64(2)));
2129
2130        let conflict = repository
2131            .update(
2132                &UpdateCommand::new("Order", 10_u64)
2133                    .expected_version(1)
2134                    .value("name", "stale"),
2135            )
2136            .unwrap_err();
2137        assert!(matches!(
2138            conflict,
2139            RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. })
2140        ));
2141
2142        repository
2143            .delete(&DeleteCommand::new("Order", 10_u64).expected_version(2))
2144            .unwrap();
2145        let row = repository
2146            .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2147            .unwrap()
2148            .pop()
2149            .unwrap();
2150        assert_eq!(row.get("version"), Some(&Value::I64(-3)));
2151
2152        repository
2153            .recover(&RecoverCommand::new("Order", 10_u64, -3))
2154            .unwrap();
2155        let row = repository
2156            .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2157            .unwrap()
2158            .pop()
2159            .unwrap();
2160        assert_eq!(row.get("version"), Some(&Value::I64(4)));
2161    }
2162}
2163
2164#[cfg(all(test, feature = "sqlx"))]
2165mod sqlx_integration_tests {
2166    use super::sqlx_support::{
2167        MutationExecutorError, PgIdSpaceGenerator, PgMutationExecutor, PgTransactionExecutor,
2168        SqliteIdSpaceGenerator, SqliteMutationExecutor,
2169    };
2170    use super::{
2171        GraphMutationKind, GraphNode, GraphTransactionBoundary, InMemoryMetadataStore,
2172        InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry, QueryExecutor,
2173        RepositoryBehavior, UserContext,
2174    };
2175    use chrono::{NaiveDate, TimeZone, Utc};
2176    use teaql_core::{
2177        DataType, Decimal, DeleteCommand, EntityDescriptor, Expr, InsertCommand,
2178        PropertyDescriptor, Record, RecoverCommand, SelectQuery, UpdateCommand, Value,
2179    };
2180    use teaql_dialect_pg::PostgresDialect;
2181    use teaql_dialect_sqlite::SqliteDialect;
2182    use teaql_macros::TeaqlEntity as DeriveTeaqlEntity;
2183    use teaql_sql::SqlDialect;
2184    use tokio::runtime::Handle;
2185    use tokio::task::block_in_place;
2186
2187    const ORDER_DEFAULT_PROJECTION: &str = "\"id\", \"version\", \"name\"";
2188
2189    fn entity() -> EntityDescriptor {
2190        EntityDescriptor::new("Order")
2191            .table_name("orders")
2192            .property(
2193                PropertyDescriptor::new("id", DataType::U64)
2194                    .column_name("id")
2195                    .id()
2196                    .not_null(),
2197            )
2198            .property(
2199                PropertyDescriptor::new("version", DataType::I64)
2200                    .column_name("version")
2201                    .version()
2202                    .not_null(),
2203            )
2204            .property(
2205                PropertyDescriptor::new("name", DataType::Text)
2206                    .column_name("name")
2207                    .not_null(),
2208            )
2209            .relation(
2210                teaql_core::RelationDescriptor::new("lines", "OrderLine")
2211                    .local_key("id")
2212                    .foreign_key("order_id")
2213                    .many(),
2214            )
2215    }
2216
2217    fn sqlite_entity_keep_missing() -> EntityDescriptor {
2218        EntityDescriptor::new("Order")
2219            .table_name("orders")
2220            .property(
2221                PropertyDescriptor::new("id", DataType::U64)
2222                    .column_name("id")
2223                    .id()
2224                    .not_null(),
2225            )
2226            .property(
2227                PropertyDescriptor::new("version", DataType::I64)
2228                    .column_name("version")
2229                    .version()
2230                    .not_null(),
2231            )
2232            .property(
2233                PropertyDescriptor::new("name", DataType::Text)
2234                    .column_name("name")
2235                    .not_null(),
2236            )
2237            .relation(
2238                teaql_core::RelationDescriptor::new("lines", "OrderLine")
2239                    .local_key("id")
2240                    .foreign_key("order_id")
2241                    .many()
2242                    .keep_missing(),
2243            )
2244    }
2245
2246    fn line_entity() -> EntityDescriptor {
2247        EntityDescriptor::new("OrderLine")
2248            .table_name("orderline")
2249            .property(
2250                PropertyDescriptor::new("id", DataType::U64)
2251                    .column_name("id")
2252                    .id()
2253                    .not_null(),
2254            )
2255            .property(
2256                PropertyDescriptor::new("version", DataType::I64)
2257                    .column_name("version")
2258                    .version(),
2259            )
2260            .property(
2261                PropertyDescriptor::new("order_id", DataType::U64)
2262                    .column_name("order_id")
2263                    .not_null(),
2264            )
2265            .property(
2266                PropertyDescriptor::new("name", DataType::Text)
2267                    .column_name("name")
2268                    .not_null(),
2269            )
2270            .property(
2271                PropertyDescriptor::new("product_id", DataType::U64)
2272                    .column_name("product_id")
2273                    .not_null(),
2274            )
2275            .relation(
2276                teaql_core::RelationDescriptor::new("product", "Product")
2277                    .local_key("product_id")
2278                    .foreign_key("id"),
2279            )
2280    }
2281
2282    fn product_entity() -> EntityDescriptor {
2283        EntityDescriptor::new("Product")
2284            .table_name("product")
2285            .property(
2286                PropertyDescriptor::new("id", DataType::U64)
2287                    .column_name("id")
2288                    .id()
2289                    .not_null(),
2290            )
2291            .property(
2292                PropertyDescriptor::new("name", DataType::Text)
2293                    .column_name("name")
2294                    .not_null(),
2295            )
2296    }
2297
2298    fn typed_entity() -> EntityDescriptor {
2299        EntityDescriptor::new("TypedValue")
2300            .table_name("typed_value")
2301            .property(
2302                PropertyDescriptor::new("id", DataType::U64)
2303                    .column_name("id")
2304                    .id()
2305                    .not_null(),
2306            )
2307            .property(
2308                PropertyDescriptor::new("payload", DataType::Json)
2309                    .column_name("payload")
2310                    .not_null(),
2311            )
2312            .property(
2313                PropertyDescriptor::new("amount", DataType::Decimal)
2314                    .column_name("amount")
2315                    .not_null(),
2316            )
2317            .property(
2318                PropertyDescriptor::new("birthday", DataType::Date)
2319                    .column_name("birthday")
2320                    .not_null(),
2321            )
2322            .property(
2323                PropertyDescriptor::new("happened_at", DataType::Timestamp)
2324                    .column_name("happened_at")
2325                    .not_null(),
2326            )
2327    }
2328
2329    struct OrderBehavior;
2330    struct NestedOrderBehavior;
2331
2332    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
2333    #[teaql(entity = "Product", table = "product")]
2334    struct ProductEntityRow {
2335        #[teaql(id)]
2336        id: u64,
2337        name: String,
2338    }
2339
2340    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
2341    #[teaql(entity = "OrderLine", table = "orderline")]
2342    struct OrderLineEntityRow {
2343        #[teaql(id)]
2344        id: u64,
2345        #[teaql(column = "order_id")]
2346        order_id: u64,
2347        name: String,
2348        #[teaql(column = "product_id")]
2349        product_id: u64,
2350        #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
2351        product: Option<ProductEntityRow>,
2352    }
2353
2354    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
2355    #[teaql(entity = "Order", table = "orders")]
2356    struct OrderAggregateRow {
2357        #[teaql(id)]
2358        id: u64,
2359        #[teaql(version)]
2360        version: i64,
2361        name: String,
2362        #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
2363        lines: teaql_core::SmartList<OrderLineEntityRow>,
2364    }
2365
2366    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
2367    #[teaql(entity = "Order", table = "orders")]
2368    struct Order {
2369        #[teaql(id)]
2370        id: u64,
2371        #[teaql(version)]
2372        version: i64,
2373        name: String,
2374    }
2375
2376    impl RepositoryBehavior for OrderBehavior {
2377        fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
2378            vec!["lines".to_owned()]
2379        }
2380    }
2381
2382    impl RepositoryBehavior for NestedOrderBehavior {
2383        fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
2384            vec!["lines.product".to_owned()]
2385        }
2386    }
2387
2388    #[derive(Clone)]
2389    struct SqliteSyncExecutor {
2390        inner: SqliteMutationExecutor,
2391    }
2392
2393    impl SqliteSyncExecutor {
2394        fn new(inner: SqliteMutationExecutor) -> Self {
2395            Self { inner }
2396        }
2397    }
2398
2399    impl QueryExecutor for SqliteSyncExecutor {
2400        type Error = MutationExecutorError;
2401
2402        fn fetch_all(&self, query: &teaql_sql::CompiledQuery) -> Result<Vec<Record>, Self::Error> {
2403            let handle = Handle::current();
2404            block_in_place(|| handle.block_on(self.inner.fetch_all(query)))
2405        }
2406
2407        fn execute(&self, query: &teaql_sql::CompiledQuery) -> Result<u64, Self::Error> {
2408            let handle = Handle::current();
2409            block_in_place(|| handle.block_on(self.inner.execute(query)))
2410        }
2411
2412        fn begin_transaction(&self) -> Result<GraphTransactionBoundary, Self::Error> {
2413            let handle = Handle::current();
2414            block_in_place(|| handle.block_on(self.inner.begin_transaction()))?;
2415            Ok(GraphTransactionBoundary::Started)
2416        }
2417
2418        fn commit_transaction(&self) -> Result<(), Self::Error> {
2419            let handle = Handle::current();
2420            block_in_place(|| handle.block_on(self.inner.commit_transaction()))
2421        }
2422
2423        fn rollback_transaction(&self) -> Result<(), Self::Error> {
2424            let handle = Handle::current();
2425            block_in_place(|| handle.block_on(self.inner.rollback_transaction()))
2426        }
2427    }
2428
2429    #[derive(Clone)]
2430    struct PgSyncExecutor {
2431        inner: PgMutationExecutor,
2432    }
2433
2434    impl PgSyncExecutor {
2435        fn new(inner: PgMutationExecutor) -> Self {
2436            Self { inner }
2437        }
2438    }
2439
2440    impl QueryExecutor for PgSyncExecutor {
2441        type Error = MutationExecutorError;
2442
2443        fn fetch_all(&self, query: &teaql_sql::CompiledQuery) -> Result<Vec<Record>, Self::Error> {
2444            let handle = Handle::current();
2445            block_in_place(|| handle.block_on(self.inner.fetch_all(query)))
2446        }
2447
2448        fn execute(&self, query: &teaql_sql::CompiledQuery) -> Result<u64, Self::Error> {
2449            let handle = Handle::current();
2450            block_in_place(|| handle.block_on(self.inner.execute(query)))
2451        }
2452    }
2453
2454    #[derive(Clone)]
2455    struct PgTxSyncExecutor {
2456        inner: PgTransactionExecutor,
2457    }
2458
2459    impl PgTxSyncExecutor {
2460        fn new(inner: PgTransactionExecutor) -> Self {
2461            Self { inner }
2462        }
2463    }
2464
2465    impl QueryExecutor for PgTxSyncExecutor {
2466        type Error = MutationExecutorError;
2467
2468        fn fetch_all(&self, query: &teaql_sql::CompiledQuery) -> Result<Vec<Record>, Self::Error> {
2469            let handle = Handle::current();
2470            block_in_place(|| handle.block_on(self.inner.fetch_all(query)))
2471        }
2472
2473        fn execute(&self, query: &teaql_sql::CompiledQuery) -> Result<u64, Self::Error> {
2474            let handle = Handle::current();
2475            block_in_place(|| handle.block_on(self.inner.execute(query)))
2476        }
2477
2478        fn begin_transaction(&self) -> Result<GraphTransactionBoundary, Self::Error> {
2479            Ok(GraphTransactionBoundary::AlreadyActive)
2480        }
2481
2482        fn rollback_transaction(&self) -> Result<(), Self::Error> {
2483            let handle = Handle::current();
2484            block_in_place(|| handle.block_on(self.inner.rollback()))
2485        }
2486    }
2487
2488    #[tokio::test]
2489    async fn sqlite_executor_runs_crud_flow() {
2490        use sqlx::sqlite::SqlitePoolOptions;
2491
2492        let pool = SqlitePoolOptions::new()
2493            .max_connections(1)
2494            .connect("sqlite::memory:")
2495            .await
2496            .unwrap();
2497
2498        let executor = SqliteMutationExecutor::new(pool.clone());
2499        let dialect = SqliteDialect;
2500        let entity = entity();
2501        executor.ensure_schema(&dialect, &[&entity]).await.unwrap();
2502
2503        let insert = dialect
2504            .compile_insert(
2505                &entity,
2506                &InsertCommand::new("Order")
2507                    .value("id", 1_u64)
2508                    .value("version", 1_i64)
2509                    .value("name", "first"),
2510            )
2511            .unwrap();
2512        assert_eq!(executor.execute(&insert).await.unwrap(), 1);
2513
2514        let select = dialect
2515            .compile_select(
2516                &entity,
2517                &SelectQuery::new("Order")
2518                    .project("id")
2519                    .project("version")
2520                    .project("name")
2521                    .filter(Expr::eq("id", 1_u64)),
2522            )
2523            .unwrap();
2524        let rows = executor.fetch_all(&select).await.unwrap();
2525        assert_eq!(rows.len(), 1);
2526        assert_eq!(rows[0].get("id"), Some(&Value::I64(1)));
2527        assert_eq!(rows[0].get("version"), Some(&Value::I64(1)));
2528        assert_eq!(rows[0].get("name"), Some(&Value::Text("first".to_owned())));
2529
2530        let update = dialect
2531            .compile_update(
2532                &entity,
2533                &UpdateCommand::new("Order", 1_u64)
2534                    .expected_version(1)
2535                    .value("name", "second"),
2536            )
2537            .unwrap();
2538        assert_eq!(executor.execute(&update).await.unwrap(), 1);
2539
2540        let after_update = executor.fetch_all(&select).await.unwrap();
2541        assert_eq!(after_update[0].get("version"), Some(&Value::I64(2)));
2542        assert_eq!(
2543            after_update[0].get("name"),
2544            Some(&Value::Text("second".to_owned()))
2545        );
2546
2547        let delete = dialect
2548            .compile_delete(
2549                &entity,
2550                &DeleteCommand::new("Order", 1_u64).expected_version(2),
2551            )
2552            .unwrap();
2553        assert_eq!(executor.execute(&delete).await.unwrap(), 1);
2554
2555        let after_delete = executor.fetch_all(&select).await.unwrap();
2556        assert_eq!(after_delete[0].get("version"), Some(&Value::I64(-3)));
2557
2558        let recover = dialect
2559            .compile_recover(&entity, &RecoverCommand::new("Order", 1_u64, -3))
2560            .unwrap();
2561        assert_eq!(executor.execute(&recover).await.unwrap(), 1);
2562
2563        let after_recover = executor.fetch_all(&select).await.unwrap();
2564        assert_eq!(after_recover[0].get("version"), Some(&Value::I64(4)));
2565    }
2566
2567    #[tokio::test(flavor = "multi_thread")]
2568    async fn sqlite_executor_enhances_relations() {
2569        use sqlx::sqlite::SqlitePoolOptions;
2570
2571        let pool = SqlitePoolOptions::new()
2572            .max_connections(1)
2573            .connect("sqlite::memory:")
2574            .await
2575            .unwrap();
2576
2577        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2578        let order = entity();
2579        let line = line_entity();
2580        mutation_executor
2581            .ensure_schema(&SqliteDialect, &[&order, &line])
2582            .await
2583            .unwrap();
2584
2585        sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1'), (2, 1, 'o2')")
2586            .execute(&pool)
2587            .await
2588            .unwrap();
2589        sqlx::query(
2590            "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2591                (101, 1, 1001, 'l1'),
2592                (102, 1, 1002, 'l2'),
2593                (201, 2, 1003, 'l3')",
2594        )
2595        .execute(&pool)
2596        .await
2597        .unwrap();
2598
2599        let executor = SqliteSyncExecutor::new(mutation_executor);
2600        let mut ctx = UserContext::new()
2601            .with_metadata(
2602                InMemoryMetadataStore::new()
2603                    .with_entity(order)
2604                    .with_entity(line)
2605                    .with_entity(product_entity()),
2606            )
2607            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
2608            .with_repository_behavior_registry(
2609                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
2610            );
2611        ctx.insert_resource(SqliteDialect);
2612        ctx.insert_resource(executor);
2613
2614        let repo = ctx
2615            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2616            .unwrap();
2617        let mut parents = repo
2618            .fetch_all(
2619                &repo
2620                    .select()
2621                    .project("id")
2622                    .project("version")
2623                    .project("name")
2624                    .order_by(teaql_core::OrderBy::asc("id")),
2625            )
2626            .unwrap();
2627
2628        repo.enhance_relations(&mut parents).unwrap();
2629
2630        assert_eq!(parents.len(), 2);
2631        match parents[0].get("lines") {
2632            Some(Value::List(lines)) => assert_eq!(lines.len(), 2),
2633            other => panic!("unexpected first lines payload: {other:?}"),
2634        }
2635        match parents[1].get("lines") {
2636            Some(Value::List(lines)) => assert_eq!(lines.len(), 1),
2637            other => panic!("unexpected second lines payload: {other:?}"),
2638        }
2639    }
2640
2641    #[tokio::test(flavor = "multi_thread")]
2642    async fn sqlite_executor_enhances_nested_relations() {
2643        use sqlx::sqlite::SqlitePoolOptions;
2644
2645        let pool = SqlitePoolOptions::new()
2646            .max_connections(1)
2647            .connect("sqlite::memory:")
2648            .await
2649            .unwrap();
2650
2651        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2652        let order = entity();
2653        let line = line_entity();
2654        let product = product_entity();
2655        mutation_executor
2656            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
2657            .await
2658            .unwrap();
2659
2660        sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1')")
2661            .execute(&pool)
2662            .await
2663            .unwrap();
2664        sqlx::query(
2665            "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2666                (101, 1, 1001, 'l1'),
2667                (102, 1, 1002, 'l2')",
2668        )
2669        .execute(&pool)
2670        .await
2671        .unwrap();
2672        sqlx::query(
2673            "INSERT INTO product (id, name) VALUES
2674                (1001, 'p1'),
2675                (1002, 'p2')",
2676        )
2677        .execute(&pool)
2678        .await
2679        .unwrap();
2680
2681        let executor = SqliteSyncExecutor::new(mutation_executor);
2682        let mut ctx = UserContext::new()
2683            .with_metadata(
2684                InMemoryMetadataStore::new()
2685                    .with_entity(order)
2686                    .with_entity(line)
2687                    .with_entity(product),
2688            )
2689            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
2690            .with_repository_behavior_registry(
2691                InMemoryRepositoryBehaviorRegistry::new()
2692                    .with_behavior("Order", NestedOrderBehavior),
2693            );
2694        ctx.insert_resource(SqliteDialect);
2695        ctx.insert_resource(executor);
2696
2697        let repo = ctx
2698            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2699            .unwrap();
2700        let mut parents = repo
2701            .fetch_all(
2702                &repo
2703                    .select()
2704                    .project("id")
2705                    .project("version")
2706                    .project("name"),
2707            )
2708            .unwrap();
2709
2710        repo.enhance_relations(&mut parents).unwrap();
2711
2712        match parents[0].get("lines") {
2713            Some(Value::List(lines)) => {
2714                assert_eq!(lines.len(), 2);
2715                for line in lines {
2716                    match line {
2717                        Value::Object(line_record) => match line_record.get("product") {
2718                            Some(Value::Object(product)) => {
2719                                assert!(product.get("name").is_some());
2720                            }
2721                            other => panic!("unexpected product payload: {other:?}"),
2722                        },
2723                        other => panic!("unexpected line payload: {other:?}"),
2724                    }
2725                }
2726            }
2727            other => panic!("unexpected nested lines payload: {other:?}"),
2728        }
2729    }
2730
2731    #[tokio::test(flavor = "multi_thread")]
2732    async fn sqlite_executor_enhances_query_relation_with_child_query() {
2733        use sqlx::sqlite::SqlitePoolOptions;
2734
2735        let pool = SqlitePoolOptions::new()
2736            .max_connections(1)
2737            .connect("sqlite::memory:")
2738            .await
2739            .unwrap();
2740
2741        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2742        let order = entity();
2743        let line = line_entity();
2744        let product = product_entity();
2745        mutation_executor
2746            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
2747            .await
2748            .unwrap();
2749
2750        sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1')")
2751            .execute(&pool)
2752            .await
2753            .unwrap();
2754        sqlx::query(
2755            "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2756                (101, 1, 1001, 'keep'),
2757                (102, 1, 1002, 'drop')",
2758        )
2759        .execute(&pool)
2760        .await
2761        .unwrap();
2762        sqlx::query(
2763            "INSERT INTO product (id, name) VALUES
2764                (1001, 'p1'),
2765                (1002, 'p2')",
2766        )
2767        .execute(&pool)
2768        .await
2769        .unwrap();
2770
2771        let executor = SqliteSyncExecutor::new(mutation_executor);
2772        let mut ctx = UserContext::new()
2773            .with_metadata(
2774                InMemoryMetadataStore::new()
2775                    .with_entity(order)
2776                    .with_entity(line)
2777                    .with_entity(product),
2778            )
2779            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
2780        ctx.insert_resource(SqliteDialect);
2781        ctx.insert_resource(executor);
2782
2783        let repo = ctx
2784            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2785            .unwrap();
2786        let query = repo.select().relation_query(
2787            "lines",
2788            SelectQuery::new("OrderLine")
2789                .project("name")
2790                .filter(Expr::eq("name", "keep"))
2791                .order_desc("id")
2792                .page(0, 10)
2793                .relation_query("product", SelectQuery::new("Product").project("name")),
2794        );
2795        let mut parents = repo
2796            .fetch_all(
2797                &repo
2798                    .select()
2799                    .project("id")
2800                    .project("version")
2801                    .project("name"),
2802            )
2803            .unwrap();
2804
2805        repo.enhance_query_relations(&mut parents, &query).unwrap();
2806
2807        match parents[0].get("lines") {
2808            Some(Value::List(lines)) => {
2809                assert_eq!(lines.len(), 1);
2810                let Value::Object(line) = &lines[0] else {
2811                    panic!("unexpected line payload: {:?}", lines[0]);
2812                };
2813                assert_eq!(line.get("name"), Some(&Value::Text("keep".to_owned())));
2814                assert_eq!(line.get("order_id"), Some(&Value::I64(1)));
2815                assert_eq!(line.get("product_id"), Some(&Value::I64(1001)));
2816                match line.get("product") {
2817                    Some(Value::Object(product)) => {
2818                        assert_eq!(product.get("name"), Some(&Value::Text("p1".to_owned())));
2819                        assert_eq!(product.get("id"), Some(&Value::I64(1001)));
2820                    }
2821                    other => panic!("unexpected product payload: {other:?}"),
2822                }
2823            }
2824            other => panic!("unexpected query relation payload: {other:?}"),
2825        }
2826    }
2827
2828    #[tokio::test]
2829    async fn sqlite_executor_ensure_schema_adds_missing_columns() {
2830        use sqlx::Row;
2831        use sqlx::sqlite::SqlitePoolOptions;
2832
2833        let pool = SqlitePoolOptions::new()
2834            .max_connections(1)
2835            .connect("sqlite::memory:")
2836            .await
2837            .unwrap();
2838
2839        sqlx::query(
2840            "CREATE TABLE orders (
2841                id INTEGER PRIMARY KEY
2842            )",
2843        )
2844        .execute(&pool)
2845        .await
2846        .unwrap();
2847
2848        let executor = SqliteMutationExecutor::new(pool.clone());
2849        let dialect = SqliteDialect;
2850        let entity = entity();
2851
2852        executor.ensure_schema(&dialect, &[&entity]).await.unwrap();
2853
2854        let columns = sqlx::query("PRAGMA table_info(\"orders\")")
2855            .fetch_all(&pool)
2856            .await
2857            .unwrap()
2858            .into_iter()
2859            .map(|row| row.try_get::<String, _>("name").unwrap())
2860            .collect::<Vec<_>>();
2861
2862        assert!(columns.contains(&"id".to_owned()));
2863        assert!(columns.contains(&"version".to_owned()));
2864        assert!(columns.contains(&"name".to_owned()));
2865    }
2866
2867    #[tokio::test]
2868    async fn user_context_can_ensure_sqlite_schema_from_runtime_module() {
2869        use sqlx::Row;
2870        use sqlx::sqlite::SqlitePoolOptions;
2871
2872        let pool = SqlitePoolOptions::new()
2873            .max_connections(1)
2874            .connect("sqlite::memory:")
2875            .await
2876            .unwrap();
2877
2878        let module = super::RuntimeModule::new()
2879            .descriptor(entity())
2880            .descriptor(line_entity())
2881            .descriptor(product_entity())
2882            .initial_graph(
2883                GraphNode::new("Order")
2884                    .value("id", 1_u64)
2885                    .value("version", 1_i64)
2886                    .value("name", "seed-order"),
2887            );
2888        let mut ctx = super::UserContext::new().with_module(module);
2889        ctx.insert_resource(SqliteDialect);
2890        ctx.insert_resource(SqliteMutationExecutor::new(pool.clone()));
2891
2892        ctx.ensure_sqlite_schema().await.unwrap();
2893        sqlx::query("UPDATE orders SET name = 'stale-seed-order' WHERE id = 1")
2894            .execute(&pool)
2895            .await
2896            .unwrap();
2897        ctx.ensure_sqlite_schema().await.unwrap();
2898
2899        let tables = sqlx::query(
2900            "SELECT name FROM sqlite_master WHERE type = 'table' AND name IN ('orders', 'orderline', 'product') ORDER BY name",
2901        )
2902        .fetch_all(&pool)
2903        .await
2904        .unwrap()
2905        .into_iter()
2906        .map(|row| row.try_get::<String, _>("name").unwrap())
2907        .collect::<Vec<_>>();
2908
2909        assert_eq!(
2910            tables,
2911            vec![
2912                "orderline".to_owned(),
2913                "orders".to_owned(),
2914                "product".to_owned()
2915            ]
2916        );
2917
2918        let seed_count: i64 =
2919            sqlx::query_scalar("SELECT COUNT(1) FROM orders WHERE id = 1 AND name = 'seed-order'")
2920                .fetch_one(&pool)
2921                .await
2922                .unwrap();
2923        assert_eq!(seed_count, 1);
2924    }
2925
2926    #[tokio::test]
2927    async fn sqlite_executor_roundtrips_json_decimal_date_and_timestamp() {
2928        use sqlx::sqlite::SqlitePoolOptions;
2929
2930        let pool = SqlitePoolOptions::new()
2931            .max_connections(1)
2932            .connect("sqlite::memory:")
2933            .await
2934            .unwrap();
2935
2936        let executor = SqliteMutationExecutor::new(pool.clone());
2937        let dialect = SqliteDialect;
2938        let entity = typed_entity();
2939        executor.ensure_schema(&dialect, &[&entity]).await.unwrap();
2940
2941        let birthday = NaiveDate::from_ymd_opt(2024, 2, 3).unwrap();
2942        let happened_at = Utc.with_ymd_and_hms(2024, 2, 3, 4, 5, 6).unwrap();
2943        let payload = serde_json::json!({"name": "teaql", "count": 2});
2944        let amount = Decimal::new(12345, 2);
2945
2946        let insert = dialect
2947            .compile_insert(
2948                &entity,
2949                &InsertCommand::new("TypedValue")
2950                    .value("id", 1_u64)
2951                    .value("payload", payload.clone())
2952                    .value("amount", amount)
2953                    .value("birthday", birthday)
2954                    .value("happened_at", happened_at),
2955            )
2956            .unwrap();
2957        assert_eq!(executor.execute(&insert).await.unwrap(), 1);
2958
2959        let select = dialect
2960            .compile_select(
2961                &entity,
2962                &SelectQuery::new("TypedValue")
2963                    .project("id")
2964                    .project("payload")
2965                    .project("amount")
2966                    .project("birthday")
2967                    .project("happened_at")
2968                    .filter(Expr::eq("id", 1_u64)),
2969            )
2970            .unwrap();
2971        let rows = executor.fetch_all(&select).await.unwrap();
2972        assert_eq!(rows.len(), 1);
2973        assert_eq!(rows[0].get("payload"), Some(&Value::Json(payload)));
2974        assert_eq!(rows[0].get("amount"), Some(&Value::Decimal(amount)));
2975        assert_eq!(rows[0].get("birthday"), Some(&Value::Date(birthday)));
2976        assert_eq!(
2977            rows[0].get("happened_at"),
2978            Some(&Value::Timestamp(happened_at))
2979        );
2980    }
2981
2982    #[tokio::test(flavor = "multi_thread")]
2983    async fn sqlite_fetches_enhanced_typed_entities() {
2984        use sqlx::sqlite::SqlitePoolOptions;
2985
2986        let pool = SqlitePoolOptions::new()
2987            .max_connections(1)
2988            .connect("sqlite::memory:")
2989            .await
2990            .unwrap();
2991
2992        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2993        let order = entity();
2994        let line = line_entity();
2995        let product = product_entity();
2996        mutation_executor
2997            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
2998            .await
2999            .unwrap();
3000
3001        sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1')")
3002            .execute(&pool)
3003            .await
3004            .unwrap();
3005        sqlx::query(
3006            "INSERT INTO orderline (id, order_id, product_id, name) VALUES
3007                (101, 1, 1001, 'l1'),
3008                (102, 1, 1002, 'l2')",
3009        )
3010        .execute(&pool)
3011        .await
3012        .unwrap();
3013        sqlx::query(
3014            "INSERT INTO product (id, name) VALUES
3015                (1001, 'p1'),
3016                (1002, 'p2')",
3017        )
3018        .execute(&pool)
3019        .await
3020        .unwrap();
3021
3022        let executor = SqliteSyncExecutor::new(mutation_executor);
3023        let mut ctx = UserContext::new()
3024            .with_metadata(
3025                InMemoryMetadataStore::new()
3026                    .with_entity(order)
3027                    .with_entity(line)
3028                    .with_entity(product),
3029            )
3030            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
3031            .with_repository_behavior_registry(
3032                InMemoryRepositoryBehaviorRegistry::new()
3033                    .with_behavior("Order", NestedOrderBehavior),
3034            );
3035        ctx.insert_resource(SqliteDialect);
3036        ctx.insert_resource(executor);
3037
3038        let repo = ctx
3039            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3040            .unwrap();
3041        let rows = repo
3042            .fetch_enhanced_entities::<OrderAggregateRow>(
3043                &repo
3044                    .select()
3045                    .project("id")
3046                    .project("version")
3047                    .project("name"),
3048            )
3049            .unwrap();
3050
3051        assert_eq!(rows.len(), 1);
3052        let order = rows.first().unwrap();
3053        assert_eq!(order.id, 1);
3054        assert_eq!(order.lines.len(), 2);
3055        assert_eq!(order.lines.data[0].product.as_ref().unwrap().name, "p1");
3056        assert_eq!(order.lines.data[1].product.as_ref().unwrap().name, "p2");
3057    }
3058
3059    #[tokio::test(flavor = "multi_thread")]
3060    async fn sqlite_fetches_typed_smart_list_entities() {
3061        use sqlx::sqlite::SqlitePoolOptions;
3062
3063        let pool = SqlitePoolOptions::new()
3064            .max_connections(1)
3065            .connect("sqlite::memory:")
3066            .await
3067            .unwrap();
3068
3069        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3070        let order = entity();
3071        mutation_executor
3072            .ensure_schema(&SqliteDialect, &[&order])
3073            .await
3074            .unwrap();
3075
3076        sqlx::query(
3077            "INSERT INTO orders (id, version, name) VALUES
3078                (1, 1, 'o1'),
3079                (2, 3, 'o2')",
3080        )
3081        .execute(&pool)
3082        .await
3083        .unwrap();
3084
3085        let executor = SqliteSyncExecutor::new(mutation_executor);
3086        let mut ctx = UserContext::new()
3087            .with_metadata(InMemoryMetadataStore::new().with_entity(order))
3088            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3089        ctx.insert_resource(SqliteDialect);
3090        ctx.insert_resource(executor);
3091
3092        let repo = ctx
3093            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3094            .unwrap();
3095        let rows = repo
3096            .fetch_entities::<OrderAggregateRow>(
3097                &repo
3098                    .select()
3099                    .project("id")
3100                    .project("version")
3101                    .project("name"),
3102            )
3103            .unwrap();
3104
3105        assert_eq!(rows.len(), 2);
3106        assert_eq!(rows.ids(), vec![Value::U64(1), Value::U64(2)]);
3107        assert_eq!(rows.versions(), vec![1, 3]);
3108        assert!(rows.data[0].lines.is_empty());
3109        assert!(rows.data[1].lines.is_empty());
3110    }
3111
3112    #[tokio::test(flavor = "multi_thread")]
3113    async fn sqlite_fetches_smart_list_of_order_entities() {
3114        use sqlx::sqlite::SqlitePoolOptions;
3115
3116        let pool = SqlitePoolOptions::new()
3117            .max_connections(1)
3118            .connect("sqlite::memory:")
3119            .await
3120            .unwrap();
3121
3122        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3123        let order = entity();
3124        mutation_executor
3125            .ensure_schema(&SqliteDialect, &[&order])
3126            .await
3127            .unwrap();
3128
3129        sqlx::query(
3130            "INSERT INTO orders (id, version, name) VALUES
3131                (1, 1, 'o1'),
3132                (2, 3, 'o2')",
3133        )
3134        .execute(&pool)
3135        .await
3136        .unwrap();
3137
3138        let executor = SqliteSyncExecutor::new(mutation_executor);
3139        let mut ctx = UserContext::new()
3140            .with_metadata(InMemoryMetadataStore::new().with_entity(order))
3141            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3142        ctx.insert_resource(SqliteDialect);
3143        ctx.insert_resource(executor);
3144
3145        let repo = ctx
3146            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3147            .unwrap();
3148        let rows = repo
3149            .fetch_entities::<Order>(
3150                &repo
3151                    .select()
3152                    .project("id")
3153                    .project("version")
3154                    .project("name"),
3155            )
3156            .unwrap();
3157
3158        assert_eq!(
3159            rows.data,
3160            vec![
3161                Order {
3162                    id: 1,
3163                    version: 1,
3164                    name: "o1".to_owned(),
3165                },
3166                Order {
3167                    id: 2,
3168                    version: 3,
3169                    name: "o2".to_owned(),
3170                }
3171            ]
3172        );
3173        assert_eq!(rows.ids(), vec![Value::U64(1), Value::U64(2)]);
3174        assert_eq!(rows.versions(), vec![1, 3]);
3175    }
3176
3177    #[tokio::test(flavor = "multi_thread")]
3178    async fn sqlite_insert_generates_missing_id() {
3179        use sqlx::sqlite::SqlitePoolOptions;
3180
3181        let pool = SqlitePoolOptions::new()
3182            .max_connections(1)
3183            .connect("sqlite::memory:")
3184            .await
3185            .unwrap();
3186
3187        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3188        let order = entity();
3189        mutation_executor
3190            .ensure_schema(&SqliteDialect, &[&order])
3191            .await
3192            .unwrap();
3193
3194        let executor = SqliteSyncExecutor::new(mutation_executor);
3195        let mut ctx = UserContext::new()
3196            .with_metadata(InMemoryMetadataStore::new().with_entity(order))
3197            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3198        ctx.insert_resource(SqliteDialect);
3199        ctx.insert_resource(executor);
3200
3201        let repo = ctx
3202            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3203            .unwrap();
3204        let affected = repo
3205            .insert(
3206                &repo
3207                    .insert_command()
3208                    .value("version", 1_i64)
3209                    .value("name", "generated"),
3210            )
3211            .unwrap();
3212        assert_eq!(affected, 1);
3213
3214        let rows = repo
3215            .fetch_entities::<Order>(
3216                &repo
3217                    .select()
3218                    .project("id")
3219                    .project("version")
3220                    .project("name")
3221                    .filter(Expr::eq("name", "generated")),
3222            )
3223            .unwrap();
3224        assert_eq!(rows.len(), 1);
3225        let order = rows.first().unwrap();
3226        assert!(order.id > 0);
3227        assert_eq!(order.version, 1);
3228        assert_eq!(order.name, "generated");
3229    }
3230
3231    #[tokio::test(flavor = "multi_thread")]
3232    async fn sqlite_save_graph_inserts_nested_rows() {
3233        use sqlx::{Row, sqlite::SqlitePoolOptions};
3234
3235        let pool = SqlitePoolOptions::new()
3236            .max_connections(1)
3237            .connect("sqlite::memory:?cache=shared")
3238            .await
3239            .unwrap();
3240
3241        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3242        let order = entity();
3243        let line = line_entity();
3244        let product = product_entity();
3245        mutation_executor
3246            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3247            .await
3248            .unwrap();
3249
3250        let executor = SqliteSyncExecutor::new(mutation_executor);
3251        let mut ctx = UserContext::new()
3252            .with_metadata(
3253                InMemoryMetadataStore::new()
3254                    .with_entity(order)
3255                    .with_entity(line)
3256                    .with_entity(product),
3257            )
3258            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3259        ctx.insert_resource(SqliteDialect);
3260        ctx.insert_resource(executor);
3261
3262        let repo = ctx
3263            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3264            .unwrap();
3265        let graph = GraphNode::new("Order")
3266            .value("id", 1_u64)
3267            .value("version", 1_i64)
3268            .value("name", "root")
3269            .relation(
3270                "lines",
3271                GraphNode::new("OrderLine")
3272                    .value("id", 10_u64)
3273                    .value("name", "line-1")
3274                    .relation(
3275                        "product",
3276                        GraphNode::new("Product")
3277                            .value("id", 100_u64)
3278                            .value("name", "sku-1"),
3279                    ),
3280            );
3281
3282        let saved = repo.save_graph(graph).unwrap();
3283        assert_eq!(
3284            saved.relations["lines"][0].values.get("order_id"),
3285            Some(&Value::U64(1))
3286        );
3287        assert_eq!(
3288            saved.relations["lines"][0].values.get("product_id"),
3289            Some(&Value::U64(100))
3290        );
3291
3292        let order_count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders")
3293            .fetch_one(&pool)
3294            .await
3295            .unwrap()
3296            .try_get("count")
3297            .unwrap();
3298        let line = sqlx::query("SELECT order_id, product_id FROM orderline WHERE id = 10")
3299            .fetch_one(&pool)
3300            .await
3301            .unwrap();
3302        let product_count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM product")
3303            .fetch_one(&pool)
3304            .await
3305            .unwrap()
3306            .try_get("count")
3307            .unwrap();
3308
3309        assert_eq!(order_count, 1);
3310        assert_eq!(line.try_get::<i64, _>("order_id").unwrap(), 1);
3311        assert_eq!(line.try_get::<i64, _>("product_id").unwrap(), 100);
3312        assert_eq!(product_count, 1);
3313    }
3314
3315    #[tokio::test(flavor = "multi_thread")]
3316    async fn sqlite_save_typed_entity_graph_create_inserts_nested_rows() {
3317        use sqlx::{Row, sqlite::SqlitePoolOptions};
3318
3319        let pool = SqlitePoolOptions::new()
3320            .max_connections(1)
3321            .connect("sqlite::memory:")
3322            .await
3323            .unwrap();
3324
3325        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3326        let order = entity();
3327        let line = line_entity();
3328        let product = product_entity();
3329        mutation_executor
3330            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3331            .await
3332            .unwrap();
3333
3334        let executor = SqliteSyncExecutor::new(mutation_executor);
3335        let mut ctx = UserContext::new()
3336            .with_metadata(
3337                InMemoryMetadataStore::new()
3338                    .with_entity(order)
3339                    .with_entity(line)
3340                    .with_entity(product),
3341            )
3342            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3343        ctx.insert_resource(SqliteDialect);
3344        ctx.insert_resource(executor);
3345
3346        let repo = ctx
3347            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3348            .unwrap();
3349        let saved = repo
3350            .save_entity_graph(OrderAggregateRow {
3351                id: 2,
3352                version: 1,
3353                name: "typed-root".to_owned(),
3354                lines: teaql_core::SmartList::from(vec![OrderLineEntityRow {
3355                    id: 20,
3356                    order_id: 0,
3357                    name: "typed-line".to_owned(),
3358                    product_id: 200,
3359                    product: Some(ProductEntityRow {
3360                        id: 200,
3361                        name: "typed-sku".to_owned(),
3362                    }),
3363                }]),
3364            })
3365            .unwrap();
3366
3367        assert_eq!(saved.values.get("id"), Some(&Value::U64(2)));
3368        assert_eq!(
3369            saved.relations["lines"][0].values.get("order_id"),
3370            Some(&Value::U64(2))
3371        );
3372        assert_eq!(
3373            saved.relations["lines"][0].values.get("product_id"),
3374            Some(&Value::U64(200))
3375        );
3376
3377        let order_count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders")
3378            .fetch_one(&pool)
3379            .await
3380            .unwrap()
3381            .try_get("count")
3382            .unwrap();
3383        let line = sqlx::query("SELECT order_id, product_id FROM orderline WHERE id = 20")
3384            .fetch_one(&pool)
3385            .await
3386            .unwrap();
3387        let product_name: String = sqlx::query_scalar("SELECT name FROM product WHERE id = 200")
3388            .fetch_one(&pool)
3389            .await
3390            .unwrap();
3391
3392        assert_eq!(order_count, 1);
3393        assert_eq!(line.try_get::<i64, _>("order_id").unwrap(), 2);
3394        assert_eq!(line.try_get::<i64, _>("product_id").unwrap(), 200);
3395        assert_eq!(product_name, "typed-sku");
3396    }
3397
3398    #[tokio::test(flavor = "multi_thread")]
3399    async fn sqlite_plan_for_save_graph_assigns_ids_and_batches_before_execution() {
3400        use sqlx::sqlite::SqlitePoolOptions;
3401        use std::time::{SystemTime, UNIX_EPOCH};
3402
3403        let db_path = std::env::temp_dir().join(format!(
3404            "teaql-plan-{}-{}.db",
3405            std::process::id(),
3406            SystemTime::now()
3407                .duration_since(UNIX_EPOCH)
3408                .unwrap()
3409                .as_nanos()
3410        ));
3411        let db_url = format!("sqlite://{}?mode=rwc", db_path.display());
3412        let pool = SqlitePoolOptions::new()
3413            .max_connections(1)
3414            .connect(&db_url)
3415            .await
3416            .unwrap();
3417
3418        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3419        let order = entity();
3420        let line = line_entity();
3421        let product = product_entity();
3422        mutation_executor
3423            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3424            .await
3425            .unwrap();
3426
3427        sqlx::query("INSERT INTO orders (id, version, name) VALUES (100, 1, 'existing')")
3428            .execute(&pool)
3429            .await
3430            .unwrap();
3431        sqlx::query(
3432            "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
3433                (200, 1, 100, 301, 'line-a'),
3434                (201, 1, 100, 302, 'line-b')",
3435        )
3436        .execute(&pool)
3437        .await
3438        .unwrap();
3439        let seeded_lines: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM orderline")
3440            .fetch_one(&pool)
3441            .await
3442            .unwrap();
3443        assert_eq!(seeded_lines, 2);
3444
3445        let executor = SqliteSyncExecutor::new(mutation_executor);
3446        let mut ctx = UserContext::new()
3447            .with_metadata(
3448                InMemoryMetadataStore::new()
3449                    .with_entity(order)
3450                    .with_entity(line)
3451                    .with_entity(product),
3452            )
3453            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
3454            .with_internal_id_generator(SqliteIdSpaceGenerator::new(pool.clone()));
3455        ctx.insert_resource(SqliteDialect);
3456        ctx.insert_resource(executor);
3457
3458        let graph = GraphNode::new("Order")
3459            .value("id", 100_u64)
3460            .value("version", 1_i64)
3461            .value("name", "existing-updated")
3462            .relation(
3463                "lines",
3464                GraphNode::new("OrderLine")
3465                    .value("name", "new-line-a")
3466                    .value("product_id", 401_u64),
3467            )
3468            .relation(
3469                "lines",
3470                GraphNode::new("OrderLine")
3471                    .value("name", "new-line-b")
3472                    .value("product_id", 402_u64),
3473            )
3474            .relation(
3475                "lines",
3476                GraphNode::new("OrderLine")
3477                    .value("id", 200_u64)
3478                    .value("version", 1_i64)
3479                    .value("name", "line-a-updated"),
3480            )
3481            .relation(
3482                "lines",
3483                GraphNode::new("OrderLine")
3484                    .value("id", 201_u64)
3485                    .value("version", 1_i64)
3486                    .value("name", "line-b-updated"),
3487            );
3488
3489        let plan = ctx
3490            .plan_for_save_graph::<SqliteDialect, SqliteSyncExecutor>(graph)
3491            .unwrap();
3492        let counts = plan.grouped_counts();
3493        assert_eq!(
3494            counts.get(&("Order".to_owned(), GraphMutationKind::Update)),
3495            Some(&1)
3496        );
3497        assert_eq!(
3498            counts.get(&("OrderLine".to_owned(), GraphMutationKind::Create)),
3499            Some(&2)
3500        );
3501        assert_eq!(
3502            counts.get(&("OrderLine".to_owned(), GraphMutationKind::Update)),
3503            Some(&2)
3504        );
3505
3506        let create_batch = plan
3507            .batches
3508            .iter()
3509            .find(|batch| batch.entity == "OrderLine" && batch.kind == GraphMutationKind::Create)
3510            .unwrap();
3511        assert_eq!(create_batch.items.len(), 2);
3512        assert_eq!(create_batch.items[0].values.get("id"), Some(&Value::U64(1)));
3513        assert_eq!(create_batch.items[1].values.get("id"), Some(&Value::U64(2)));
3514
3515        let update_batch = plan
3516            .batches
3517            .iter()
3518            .find(|batch| {
3519                batch.entity == "OrderLine"
3520                    && batch.kind == GraphMutationKind::Update
3521                    && batch.update_fields == vec!["name".to_owned()]
3522            })
3523            .unwrap();
3524        assert_eq!(update_batch.items.len(), 2);
3525
3526        let repo = ctx
3527            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3528            .unwrap();
3529        let saved = repo.execute_graph_plan(plan).unwrap();
3530        let lines = saved.relations.get("lines").unwrap();
3531        assert_eq!(lines.len(), 4);
3532
3533        let generated_count: i64 = sqlx::query_scalar(
3534            "SELECT COUNT(*) FROM orderline WHERE id IN (1, 2) AND order_id = 100",
3535        )
3536        .fetch_one(&pool)
3537        .await
3538        .unwrap();
3539        assert_eq!(generated_count, 2);
3540        let updated_name: String = sqlx::query_scalar("SELECT name FROM orderline WHERE id = 200")
3541            .fetch_one(&pool)
3542            .await
3543            .unwrap();
3544        assert_eq!(updated_name, "line-a-updated");
3545        pool.close().await;
3546        let _ = std::fs::remove_file(db_path);
3547    }
3548
3549    #[tokio::test(flavor = "multi_thread")]
3550    async fn sqlite_save_graph_updates_nested_rows_and_deletes_missing_children() {
3551        use sqlx::{Row, sqlite::SqlitePoolOptions};
3552
3553        let pool = SqlitePoolOptions::new()
3554            .max_connections(1)
3555            .connect("sqlite::memory:")
3556            .await
3557            .unwrap();
3558
3559        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3560        let order = entity();
3561        let line = line_entity();
3562        let product = product_entity();
3563        mutation_executor
3564            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3565            .await
3566            .unwrap();
3567
3568        sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'old-root')")
3569            .execute(&pool)
3570            .await
3571            .unwrap();
3572        sqlx::query(
3573            "INSERT INTO product (id, name) VALUES
3574                (100, 'old-sku'),
3575                (101, 'removed-sku')",
3576        )
3577        .execute(&pool)
3578        .await
3579        .unwrap();
3580        sqlx::query(
3581            "INSERT INTO orderline (id, order_id, product_id, name) VALUES
3582                (10, 1, 100, 'old-line'),
3583                (11, 1, 101, 'removed-line')",
3584        )
3585        .execute(&pool)
3586        .await
3587        .unwrap();
3588        sqlx::query("UPDATE orderline SET version = 1")
3589            .execute(&pool)
3590            .await
3591            .unwrap();
3592
3593        let executor = SqliteSyncExecutor::new(mutation_executor);
3594        let mut ctx = UserContext::new()
3595            .with_metadata(
3596                InMemoryMetadataStore::new()
3597                    .with_entity(order)
3598                    .with_entity(line)
3599                    .with_entity(product),
3600            )
3601            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3602        ctx.insert_resource(SqliteDialect);
3603        ctx.insert_resource(executor);
3604
3605        let repo = ctx
3606            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3607            .unwrap();
3608        let graph = GraphNode::new("Order")
3609            .value("id", 1_u64)
3610            .value("version", 1_i64)
3611            .value("name", "new-root")
3612            .relation(
3613                "lines",
3614                GraphNode::new("OrderLine")
3615                    .value("id", 10_u64)
3616                    .value("version", 1_i64)
3617                    .value("name", "new-line")
3618                    .relation(
3619                        "product",
3620                        GraphNode::new("Product")
3621                            .value("id", 100_u64)
3622                            .value("name", "new-sku"),
3623                    ),
3624            )
3625            .relation(
3626                "lines",
3627                GraphNode::new("OrderLine")
3628                    .value("id", 12_u64)
3629                    .value("version", 1_i64)
3630                    .value("name", "added-line")
3631                    .relation(
3632                        "product",
3633                        GraphNode::new("Product")
3634                            .value("id", 102_u64)
3635                            .value("name", "added-sku"),
3636                    ),
3637            );
3638
3639        let saved = repo.save_graph(graph).unwrap();
3640        assert_eq!(saved.values.get("version"), Some(&Value::I64(2)));
3641
3642        let order_row = sqlx::query("SELECT version, name FROM orders WHERE id = 1")
3643            .fetch_one(&pool)
3644            .await
3645            .unwrap();
3646        assert_eq!(order_row.try_get::<i64, _>("version").unwrap(), 2);
3647        assert_eq!(order_row.try_get::<String, _>("name").unwrap(), "new-root");
3648
3649        let updated_line =
3650            sqlx::query("SELECT version, product_id, name FROM orderline WHERE id = 10")
3651                .fetch_one(&pool)
3652                .await
3653                .unwrap();
3654        assert_eq!(updated_line.try_get::<i64, _>("version").unwrap(), 2);
3655        assert_eq!(updated_line.try_get::<i64, _>("product_id").unwrap(), 100);
3656        assert_eq!(
3657            updated_line.try_get::<String, _>("name").unwrap(),
3658            "new-line"
3659        );
3660
3661        let added_line =
3662            sqlx::query("SELECT order_id, product_id, name FROM orderline WHERE id = 12")
3663                .fetch_one(&pool)
3664                .await
3665                .unwrap();
3666        assert_eq!(added_line.try_get::<i64, _>("order_id").unwrap(), 1);
3667        assert_eq!(added_line.try_get::<i64, _>("product_id").unwrap(), 102);
3668        assert_eq!(
3669            added_line.try_get::<String, _>("name").unwrap(),
3670            "added-line"
3671        );
3672
3673        let deleted_line = sqlx::query("SELECT version FROM orderline WHERE id = 11")
3674            .fetch_one(&pool)
3675            .await
3676            .unwrap();
3677        assert_eq!(deleted_line.try_get::<i64, _>("version").unwrap(), -2);
3678
3679        let updated_product = sqlx::query("SELECT name FROM product WHERE id = 100")
3680            .fetch_one(&pool)
3681            .await
3682            .unwrap();
3683        assert_eq!(
3684            updated_product.try_get::<String, _>("name").unwrap(),
3685            "new-sku"
3686        );
3687    }
3688
3689    #[tokio::test(flavor = "multi_thread")]
3690    async fn sqlite_save_graph_supports_reference_remove_and_keep_missing() {
3691        use sqlx::{Row, sqlite::SqlitePoolOptions};
3692
3693        let pool = SqlitePoolOptions::new()
3694            .max_connections(1)
3695            .connect("sqlite::memory:")
3696            .await
3697            .unwrap();
3698
3699        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3700        let order = sqlite_entity_keep_missing();
3701        let line = line_entity();
3702        let product = product_entity();
3703        mutation_executor
3704            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3705            .await
3706            .unwrap();
3707
3708        sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'root')")
3709            .execute(&pool)
3710            .await
3711            .unwrap();
3712        sqlx::query("INSERT INTO product (id, name) VALUES (100, 'reference-only')")
3713            .execute(&pool)
3714            .await
3715            .unwrap();
3716        sqlx::query(
3717            "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
3718                (10, 1, 1, 100, 'remove-me'),
3719                (11, 1, 1, 100, 'keep-me')",
3720        )
3721        .execute(&pool)
3722        .await
3723        .unwrap();
3724
3725        let executor = SqliteSyncExecutor::new(mutation_executor);
3726        let mut ctx = UserContext::new()
3727            .with_metadata(
3728                InMemoryMetadataStore::new()
3729                    .with_entity(order)
3730                    .with_entity(line)
3731                    .with_entity(product),
3732            )
3733            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3734        ctx.insert_resource(SqliteDialect);
3735        ctx.insert_resource(executor);
3736
3737        let repo = ctx
3738            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3739            .unwrap();
3740        let graph = GraphNode::new("Order")
3741            .value("id", 1_u64)
3742            .value("version", 1_i64)
3743            .value("name", "root-updated")
3744            .relation(
3745                "lines",
3746                GraphNode::new("OrderLine")
3747                    .value("id", 10_u64)
3748                    .value("version", 1_i64)
3749                    .remove(),
3750            )
3751            .relation(
3752                "lines",
3753                GraphNode::new("OrderLine")
3754                    .value("id", 12_u64)
3755                    .value("version", 1_i64)
3756                    .value("name", "new-reference-line")
3757                    .relation(
3758                        "product",
3759                        GraphNode::new("Product").value("id", 100_u64).reference(),
3760                    ),
3761            );
3762
3763        let saved = repo.save_graph(graph).unwrap();
3764        assert_eq!(
3765            saved.relations["lines"][1].relations["product"][0]
3766                .values
3767                .get("name"),
3768            Some(&Value::Text("reference-only".to_owned()))
3769        );
3770
3771        let removed = sqlx::query("SELECT version FROM orderline WHERE id = 10")
3772            .fetch_one(&pool)
3773            .await
3774            .unwrap();
3775        assert_eq!(removed.try_get::<i64, _>("version").unwrap(), -2);
3776
3777        let kept = sqlx::query("SELECT version, name FROM orderline WHERE id = 11")
3778            .fetch_one(&pool)
3779            .await
3780            .unwrap();
3781        assert_eq!(kept.try_get::<i64, _>("version").unwrap(), 1);
3782        assert_eq!(kept.try_get::<String, _>("name").unwrap(), "keep-me");
3783
3784        let added = sqlx::query("SELECT product_id FROM orderline WHERE id = 12")
3785            .fetch_one(&pool)
3786            .await
3787            .unwrap();
3788        assert_eq!(added.try_get::<i64, _>("product_id").unwrap(), 100);
3789
3790        let product = sqlx::query("SELECT name FROM product WHERE id = 100")
3791            .fetch_one(&pool)
3792            .await
3793            .unwrap();
3794        assert_eq!(
3795            product.try_get::<String, _>("name").unwrap(),
3796            "reference-only"
3797        );
3798    }
3799
3800    #[tokio::test(flavor = "multi_thread")]
3801    async fn sqlite_save_graph_rejects_invalid_reference_and_state_transitions() {
3802        use sqlx::sqlite::SqlitePoolOptions;
3803
3804        let pool = SqlitePoolOptions::new()
3805            .max_connections(1)
3806            .connect("sqlite::memory:")
3807            .await
3808            .unwrap();
3809
3810        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3811        let order = entity();
3812        let line = line_entity();
3813        let product = product_entity();
3814        mutation_executor
3815            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3816            .await
3817            .unwrap();
3818
3819        sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'root')")
3820            .execute(&pool)
3821            .await
3822            .unwrap();
3823        sqlx::query("INSERT INTO product (id, name) VALUES (100, 'valid-reference')")
3824            .execute(&pool)
3825            .await
3826            .unwrap();
3827        sqlx::query(
3828            "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
3829                (10, 1, 1, 100, 'line-10')",
3830        )
3831        .execute(&pool)
3832        .await
3833        .unwrap();
3834
3835        let executor = SqliteSyncExecutor::new(mutation_executor);
3836        let mut ctx = UserContext::new()
3837            .with_metadata(
3838                InMemoryMetadataStore::new()
3839                    .with_entity(order)
3840                    .with_entity(line)
3841                    .with_entity(product),
3842            )
3843            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3844        ctx.insert_resource(SqliteDialect);
3845        ctx.insert_resource(executor);
3846
3847        let repo = ctx
3848            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3849            .unwrap();
3850
3851        let missing_reference = repo.save_graph(
3852            GraphNode::new("Order").value("id", 1_u64).relation(
3853                "lines",
3854                GraphNode::new("OrderLine")
3855                    .value("id", 12_u64)
3856                    .value("version", 1_i64)
3857                    .value("name", "bad-reference")
3858                    .relation(
3859                        "product",
3860                        GraphNode::new("Product").value("id", 999_u64).reference(),
3861                    ),
3862            ),
3863        );
3864        assert!(format!("{missing_reference:?}").contains("does not exist"));
3865
3866        let mutable_reference = repo.save_graph(
3867            GraphNode::new("Order").value("id", 1_u64).relation(
3868                "lines",
3869                GraphNode::new("OrderLine")
3870                    .value("id", 12_u64)
3871                    .value("version", 1_i64)
3872                    .value("name", "mutable-reference")
3873                    .relation(
3874                        "product",
3875                        GraphNode::new("Product")
3876                            .value("id", 100_u64)
3877                            .value("name", "should-not-mutate")
3878                            .reference(),
3879                    ),
3880            ),
3881        );
3882        assert!(format!("{mutable_reference:?}").contains("cannot carry mutable field"));
3883
3884        let duplicate_child = repo.save_graph(
3885            GraphNode::new("Order")
3886                .value("id", 1_u64)
3887                .relation(
3888                    "lines",
3889                    GraphNode::new("OrderLine")
3890                        .value("id", 10_u64)
3891                        .value("version", 1_i64)
3892                        .reference(),
3893                )
3894                .relation(
3895                    "lines",
3896                    GraphNode::new("OrderLine")
3897                        .value("id", 10_u64)
3898                        .value("version", 1_i64)
3899                        .reference(),
3900                ),
3901        );
3902        assert!(format!("{duplicate_child:?}").contains("duplicate child id"));
3903
3904        let reference_version_conflict = repo.save_graph(
3905            GraphNode::new("Order").value("id", 1_u64).relation(
3906                "lines",
3907                GraphNode::new("OrderLine")
3908                    .value("id", 10_u64)
3909                    .value("version", 2_i64)
3910                    .reference(),
3911            ),
3912        );
3913        assert!(format!("{reference_version_conflict:?}").contains("OptimisticLockConflict"));
3914
3915        let remove_with_child = repo.save_graph(
3916            GraphNode::new("Order").value("id", 1_u64).relation(
3917                "lines",
3918                GraphNode::new("OrderLine")
3919                    .value("id", 10_u64)
3920                    .value("version", 1_i64)
3921                    .relation(
3922                        "product",
3923                        GraphNode::new("Product").value("id", 100_u64).reference(),
3924                    )
3925                    .remove(),
3926            ),
3927        );
3928        assert!(format!("{remove_with_child:?}").contains("cannot contain child relations"));
3929
3930        let remove = repo.save_graph(GraphNode::new("Order").value("id", 999_u64).remove());
3931        assert!(format!("{remove:?}").contains("does not exist"));
3932    }
3933
3934    #[tokio::test(flavor = "multi_thread")]
3935    async fn sqlite_graph_write_rolls_back_all_batches_on_failure() {
3936        use sqlx::{Row, sqlite::SqlitePoolOptions};
3937
3938        let pool = SqlitePoolOptions::new()
3939            .max_connections(1)
3940            .connect("sqlite::memory:")
3941            .await
3942            .unwrap();
3943
3944        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3945        let order = entity();
3946        let line = line_entity();
3947        let product = product_entity();
3948        mutation_executor
3949            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3950            .await
3951            .unwrap();
3952
3953        let executor = SqliteSyncExecutor::new(mutation_executor.clone());
3954        let mut ctx = UserContext::new()
3955            .with_metadata(
3956                InMemoryMetadataStore::new()
3957                    .with_entity(order)
3958                    .with_entity(line)
3959                    .with_entity(product),
3960            )
3961            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3962        ctx.insert_resource(SqliteDialect);
3963        ctx.insert_resource(executor);
3964
3965        let repo = ctx
3966            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3967            .unwrap();
3968        let error = repo.save_graph(
3969            GraphNode::new("Order")
3970                .value("id", 1_u64)
3971                .value("version", 1_i64)
3972                .value("name", "rollback-root")
3973                .relation(
3974                    "lines",
3975                    GraphNode::new("OrderLine")
3976                        .value("id", 10_u64)
3977                        .value("version", 1_i64)
3978                        .value("name", "rollback-line")
3979                        .value("product_id", 999_u64)
3980                        .relation(
3981                            "product",
3982                            GraphNode::new("Product").value("id", 999_u64).reference(),
3983                        ),
3984                ),
3985        );
3986        assert!(format!("{error:?}").contains("does not exist"));
3987
3988        let count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders")
3989            .fetch_one(&pool)
3990            .await
3991            .unwrap()
3992            .try_get("count")
3993            .unwrap();
3994        assert_eq!(count, 0);
3995    }
3996
3997    #[tokio::test]
3998    async fn postgres_executor_ensure_schema_roundtrip_when_database_is_available() {
3999        use sqlx::{PgPool, Row};
4000
4001        let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4002            return;
4003        };
4004
4005        let pool = PgPool::connect(&database_url).await.unwrap();
4006        sqlx::query("DROP TABLE IF EXISTS orderline")
4007            .execute(&pool)
4008            .await
4009            .unwrap();
4010        sqlx::query("DROP TABLE IF EXISTS orders")
4011            .execute(&pool)
4012            .await
4013            .unwrap();
4014
4015        let executor = PgMutationExecutor::new(pool.clone());
4016        let dialect = PostgresDialect;
4017        let order = entity();
4018        let line = line_entity();
4019
4020        executor
4021            .ensure_schema(&dialect, &[&order, &line])
4022            .await
4023            .unwrap();
4024
4025        let tables = sqlx::query(
4026            "SELECT table_name
4027             FROM information_schema.tables
4028             WHERE table_schema = current_schema()
4029               AND table_name IN ('orders', 'orderline')
4030             ORDER BY table_name",
4031        )
4032        .fetch_all(&pool)
4033        .await
4034        .unwrap()
4035        .into_iter()
4036        .map(|row| row.try_get::<String, _>("table_name").unwrap())
4037        .collect::<Vec<_>>();
4038        assert_eq!(tables, vec!["orderline".to_owned(), "orders".to_owned()]);
4039
4040        let soundex: String = sqlx::query_scalar("SELECT soundex('Robert')")
4041            .fetch_one(&pool)
4042            .await
4043            .unwrap();
4044        assert_eq!(soundex, "R163");
4045
4046        sqlx::query(
4047            "INSERT INTO orders (id, version, name) VALUES
4048                (1, 1, 'draft'),
4049                (2, 1, 'submitted'),
4050                (3, 1, 'archived')",
4051        )
4052        .execute(&pool)
4053        .await
4054        .unwrap();
4055
4056        let array_bound_query = dialect
4057            .compile_select(
4058                &order,
4059                &SelectQuery::new("Order")
4060                    .filter(
4061                        Expr::in_large(
4062                            "id",
4063                            vec![Value::from(1_u64), Value::from(2_u64), Value::from(3_u64)],
4064                        )
4065                        .and_expr(Expr::not_in_large("name", vec![Value::from("archived")])),
4066                    )
4067                    .order_asc("id"),
4068            )
4069            .unwrap();
4070        assert_eq!(
4071            array_bound_query.sql,
4072            format!(
4073                "SELECT {ORDER_DEFAULT_PROJECTION} FROM \"orders\" WHERE ((\"id\" = ANY($1)) AND (\"name\" <> ALL($2))) ORDER BY \"id\" ASC"
4074            )
4075        );
4076        let rows = executor.fetch_all(&array_bound_query).await.unwrap();
4077        assert_eq!(rows.len(), 2);
4078        assert_eq!(rows[0].get("id"), Some(&Value::I64(1)));
4079        assert_eq!(rows[1].get("id"), Some(&Value::I64(2)));
4080
4081        sqlx::query(
4082            "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
4083                (11, 1, 1, 101, 'line-1'),
4084                (12, 1, 2, 102, 'line-2'),
4085                (13, 1, 3, 103, 'archived-line')",
4086        )
4087        .execute(&pool)
4088        .await
4089        .unwrap();
4090
4091        let subquery = dialect
4092            .compile_select(
4093                &order,
4094                &SelectQuery::new("Order")
4095                    .filter(Expr::in_subquery(
4096                        "id",
4097                        line.clone(),
4098                        SelectQuery::new("OrderLine").filter(Expr::contain("name", "line-")),
4099                        "order_id",
4100                    ))
4101                    .order_asc("id"),
4102            )
4103            .unwrap();
4104        assert_eq!(
4105            subquery.sql,
4106            format!(
4107                "SELECT {ORDER_DEFAULT_PROJECTION} FROM \"orders\" WHERE (\"id\" IN (SELECT \"order_id\" FROM \"orderline\" WHERE (\"name\" LIKE $1))) ORDER BY \"id\" ASC"
4108            )
4109        );
4110        let rows = executor.fetch_all(&subquery).await.unwrap();
4111        assert_eq!(rows.len(), 2);
4112        assert_eq!(rows[0].get("id"), Some(&Value::I64(1)));
4113        assert_eq!(rows[1].get("id"), Some(&Value::I64(2)));
4114
4115        let projected = dialect
4116            .compile_select(
4117                &order,
4118                &SelectQuery::new("Order")
4119                    .project("id")
4120                    .project_expr("nameSound", Expr::soundex(Expr::column("name")))
4121                    .order_gbk_asc("name")
4122                    .limit(1),
4123            )
4124            .unwrap();
4125        assert_eq!(
4126            projected.sql,
4127            "SELECT \"id\", SOUNDEX(\"name\") AS \"nameSound\" FROM \"orders\" ORDER BY convert_to(\"name\", 'GBK') ASC LIMIT 1"
4128        );
4129        let rows = executor.fetch_all(&projected).await.unwrap();
4130        assert_eq!(rows.len(), 1);
4131        assert!(rows[0].contains_key("nameSound"));
4132
4133        let aggregate = dialect
4134            .compile_select(
4135                &order,
4136                &SelectQuery::new("Order")
4137                    .count("total")
4138                    .stddev("version", "stddevVersion")
4139                    .var_pop("version", "varPopVersion")
4140                    .bit_or("version", "bitOrVersion")
4141                    .having(Expr::binary(
4142                        Expr::count_all(),
4143                        teaql_core::BinaryOp::Gt,
4144                        Expr::value(2_i64),
4145                    )),
4146            )
4147            .unwrap();
4148        assert_eq!(
4149            aggregate.sql,
4150            "SELECT COUNT(*) AS \"total\", STDDEV(\"version\") AS \"stddevVersion\", VAR_POP(\"version\") AS \"varPopVersion\", BIT_OR(\"version\") AS \"bitOrVersion\" FROM \"orders\" HAVING (COUNT(*) > $1)"
4151        );
4152        let rows = executor.fetch_all(&aggregate).await.unwrap();
4153        assert_eq!(rows.len(), 1);
4154        assert_eq!(rows[0].get("total"), Some(&Value::I64(3)));
4155        assert!(matches!(
4156            rows[0].get("stddevVersion"),
4157            Some(Value::Decimal(_))
4158        ));
4159        assert!(matches!(
4160            rows[0].get("varPopVersion"),
4161            Some(Value::Decimal(_))
4162        ));
4163        assert_eq!(rows[0].get("bitOrVersion"), Some(&Value::I64(1)));
4164
4165        sqlx::query("DROP TABLE orderline")
4166            .execute(&pool)
4167            .await
4168            .unwrap();
4169        sqlx::query("CREATE TABLE orderline (id BIGINT PRIMARY KEY)")
4170            .execute(&pool)
4171            .await
4172            .unwrap();
4173
4174        executor.ensure_schema(&dialect, &[&line]).await.unwrap();
4175
4176        let columns = sqlx::query(
4177            "SELECT column_name
4178             FROM information_schema.columns
4179             WHERE table_schema = current_schema()
4180               AND table_name = 'orderline'
4181             ORDER BY column_name",
4182        )
4183        .fetch_all(&pool)
4184        .await
4185        .unwrap()
4186        .into_iter()
4187        .map(|row| row.try_get::<String, _>("column_name").unwrap())
4188        .collect::<Vec<_>>();
4189        assert!(columns.contains(&"id".to_owned()));
4190        assert!(columns.contains(&"order_id".to_owned()));
4191        assert!(columns.contains(&"product_id".to_owned()));
4192        assert!(columns.contains(&"name".to_owned()));
4193
4194        sqlx::query("DROP TABLE IF EXISTS orderline")
4195            .execute(&pool)
4196            .await
4197            .unwrap();
4198        sqlx::query("DROP TABLE IF EXISTS orders")
4199            .execute(&pool)
4200            .await
4201            .unwrap();
4202    }
4203
4204    #[tokio::test(flavor = "multi_thread")]
4205    async fn postgres_id_space_generator_prepares_repository_insert_when_database_is_available() {
4206        use sqlx::{PgPool, Row};
4207
4208        let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4209            return;
4210        };
4211
4212        let pool = PgPool::connect(&database_url).await.unwrap();
4213        sqlx::query("DROP TABLE IF EXISTS orders_idgen")
4214            .execute(&pool)
4215            .await
4216            .unwrap();
4217        sqlx::query("DROP TABLE IF EXISTS teaql_id_space")
4218            .execute(&pool)
4219            .await
4220            .unwrap();
4221
4222        let order = entity().table_name("orders_idgen");
4223        let executor = PgMutationExecutor::new(pool.clone());
4224        executor
4225            .ensure_schema(&PostgresDialect, &[&order])
4226            .await
4227            .unwrap();
4228
4229        let mut ctx = UserContext::new()
4230            .with_metadata(InMemoryMetadataStore::new().with_entity(order))
4231            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
4232            .with_internal_id_generator(PgIdSpaceGenerator::new(pool.clone()));
4233        ctx.insert_resource(PostgresDialect);
4234        ctx.insert_resource(PgSyncExecutor::new(executor));
4235
4236        let repo = ctx
4237            .resolve_repository::<PostgresDialect, PgSyncExecutor>("Order")
4238            .unwrap();
4239        repo.insert(
4240            &repo
4241                .insert_command()
4242                .value("version", 1_i64)
4243                .value("name", "generated-1"),
4244        )
4245        .unwrap();
4246        repo.insert(
4247            &repo
4248                .insert_command()
4249                .value("version", 1_i64)
4250                .value("name", "generated-2"),
4251        )
4252        .unwrap();
4253
4254        let ids = sqlx::query("SELECT id FROM orders_idgen ORDER BY id")
4255            .fetch_all(&pool)
4256            .await
4257            .unwrap()
4258            .into_iter()
4259            .map(|row| row.try_get::<i64, _>("id").unwrap())
4260            .collect::<Vec<_>>();
4261        assert_eq!(ids, vec![1, 2]);
4262
4263        let current: i64 = sqlx::query_scalar(
4264            "SELECT current_level FROM teaql_id_space WHERE type_name = 'Order'",
4265        )
4266        .fetch_one(&pool)
4267        .await
4268        .unwrap();
4269        assert_eq!(current, 2);
4270
4271        sqlx::query("DROP TABLE IF EXISTS orders_idgen")
4272            .execute(&pool)
4273            .await
4274            .unwrap();
4275        sqlx::query("DROP TABLE IF EXISTS teaql_id_space")
4276            .execute(&pool)
4277            .await
4278            .unwrap();
4279    }
4280
4281    #[tokio::test]
4282    async fn postgres_executor_runs_extended_aggregates_when_database_is_available() {
4283        use sqlx::PgPool;
4284
4285        let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4286            return;
4287        };
4288
4289        let pool = PgPool::connect(&database_url).await.unwrap();
4290        sqlx::query("DROP TABLE IF EXISTS orders_agg")
4291            .execute(&pool)
4292            .await
4293            .unwrap();
4294
4295        let order = entity().table_name("orders_agg");
4296        let executor = PgMutationExecutor::new(pool.clone());
4297        let dialect = PostgresDialect;
4298        executor.ensure_schema(&dialect, &[&order]).await.unwrap();
4299
4300        sqlx::query(
4301            "INSERT INTO orders_agg (id, version, name) VALUES
4302                (1, 1, 'A'),
4303                (2, 2, 'A'),
4304                (3, 3, 'B'),
4305                (4, 4, 'B')",
4306        )
4307        .execute(&pool)
4308        .await
4309        .unwrap();
4310
4311        let aggregate = dialect
4312            .compile_select(
4313                &order,
4314                &SelectQuery::new("Order")
4315                    .count("total")
4316                    .count_field("version", "versionCount")
4317                    .sum("version", "sumVersion")
4318                    .avg("version", "avgVersion")
4319                    .min("version", "minVersion")
4320                    .max("version", "maxVersion")
4321                    .stddev("version", "stddevVersion")
4322                    .stddev_pop("version", "stddevPopVersion")
4323                    .var_samp("version", "varSampVersion")
4324                    .var_pop("version", "varPopVersion")
4325                    .bit_and("version", "bitAndVersion")
4326                    .bit_or("version", "bitOrVersion")
4327                    .bit_xor("version", "bitXorVersion")
4328                    .having(Expr::binary(
4329                        Expr::count_all(),
4330                        teaql_core::BinaryOp::Gt,
4331                        Expr::value(3_i64),
4332                    )),
4333            )
4334            .unwrap();
4335        let rows = executor.fetch_all(&aggregate).await.unwrap();
4336        assert_eq!(rows.len(), 1);
4337        let row = &rows[0];
4338        assert_eq!(row.get("total"), Some(&Value::I64(4)));
4339        assert_eq!(row.get("versionCount"), Some(&Value::I64(4)));
4340        assert_eq!(
4341            row.get("sumVersion"),
4342            Some(&Value::Decimal(Decimal::new(10, 0)))
4343        );
4344        assert_eq!(
4345            row.get("avgVersion"),
4346            Some(&Value::Decimal(Decimal::new(25, 1)))
4347        );
4348        assert_eq!(row.get("minVersion"), Some(&Value::I64(1)));
4349        assert_eq!(row.get("maxVersion"), Some(&Value::I64(4)));
4350        assert!(matches!(row.get("stddevVersion"), Some(Value::Decimal(_))));
4351        assert!(matches!(
4352            row.get("stddevPopVersion"),
4353            Some(Value::Decimal(_))
4354        ));
4355        assert!(matches!(row.get("varSampVersion"), Some(Value::Decimal(_))));
4356        assert!(matches!(row.get("varPopVersion"), Some(Value::Decimal(_))));
4357        assert_eq!(row.get("bitAndVersion"), Some(&Value::I64(0)));
4358        assert_eq!(row.get("bitOrVersion"), Some(&Value::I64(7)));
4359        assert_eq!(row.get("bitXorVersion"), Some(&Value::I64(4)));
4360
4361        let grouped = dialect
4362            .compile_select(
4363                &order,
4364                &SelectQuery::new("Order")
4365                    .group_by("name")
4366                    .count("total")
4367                    .sum("version", "sumVersion")
4368                    .having(Expr::binary(
4369                        Expr::count_all(),
4370                        teaql_core::BinaryOp::Gt,
4371                        Expr::value(1_i64),
4372                    ))
4373                    .order_asc("name"),
4374            )
4375            .unwrap();
4376        let rows = executor.fetch_all(&grouped).await.unwrap();
4377        assert_eq!(rows.len(), 2);
4378        assert_eq!(rows[0].get("name"), Some(&Value::Text("A".to_owned())));
4379        assert_eq!(rows[0].get("total"), Some(&Value::I64(2)));
4380        assert_eq!(
4381            rows[0].get("sumVersion"),
4382            Some(&Value::Decimal(Decimal::new(3, 0)))
4383        );
4384        assert_eq!(rows[1].get("name"), Some(&Value::Text("B".to_owned())));
4385        assert_eq!(rows[1].get("total"), Some(&Value::I64(2)));
4386        assert_eq!(
4387            rows[1].get("sumVersion"),
4388            Some(&Value::Decimal(Decimal::new(7, 0)))
4389        );
4390
4391        sqlx::query("DROP TABLE IF EXISTS orders_agg")
4392            .execute(&pool)
4393            .await
4394            .unwrap();
4395    }
4396
4397    #[tokio::test]
4398    async fn user_context_can_ensure_postgres_schema_when_database_is_available() {
4399        use sqlx::{PgPool, Row};
4400
4401        let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4402            return;
4403        };
4404
4405        let pool = PgPool::connect(&database_url).await.unwrap();
4406        sqlx::query("DROP TABLE IF EXISTS product_ctx")
4407            .execute(&pool)
4408            .await
4409            .unwrap();
4410        sqlx::query("DROP TABLE IF EXISTS orderline_ctx")
4411            .execute(&pool)
4412            .await
4413            .unwrap();
4414        sqlx::query("DROP TABLE IF EXISTS orders_ctx")
4415            .execute(&pool)
4416            .await
4417            .unwrap();
4418
4419        let order = entity().table_name("orders_ctx");
4420        let line = line_entity().table_name("orderline_ctx");
4421        let product = product_entity().table_name("product_ctx");
4422        let module = super::RuntimeModule::new()
4423            .descriptor(order)
4424            .descriptor(line)
4425            .descriptor(product);
4426        let mut ctx = super::UserContext::new().with_module(module);
4427        ctx.insert_resource(PostgresDialect);
4428        ctx.insert_resource(PgMutationExecutor::new(pool.clone()));
4429
4430        ctx.ensure_postgres_schema().await.unwrap();
4431
4432        let tables = sqlx::query(
4433            "SELECT table_name
4434             FROM information_schema.tables
4435             WHERE table_schema = current_schema()
4436               AND table_name IN ('orders_ctx', 'orderline_ctx', 'product_ctx')
4437             ORDER BY table_name",
4438        )
4439        .fetch_all(&pool)
4440        .await
4441        .unwrap()
4442        .into_iter()
4443        .map(|row| row.try_get::<String, _>("table_name").unwrap())
4444        .collect::<Vec<_>>();
4445
4446        assert_eq!(
4447            tables,
4448            vec![
4449                "orderline_ctx".to_owned(),
4450                "orders_ctx".to_owned(),
4451                "product_ctx".to_owned()
4452            ]
4453        );
4454
4455        sqlx::query("DROP TABLE IF EXISTS product_ctx")
4456            .execute(&pool)
4457            .await
4458            .unwrap();
4459        sqlx::query("DROP TABLE IF EXISTS orderline_ctx")
4460            .execute(&pool)
4461            .await
4462            .unwrap();
4463        sqlx::query("DROP TABLE IF EXISTS orders_ctx")
4464            .execute(&pool)
4465            .await
4466            .unwrap();
4467    }
4468
4469    #[tokio::test(flavor = "multi_thread")]
4470    async fn postgres_graph_write_transaction_rolls_back_when_database_is_available() {
4471        use sqlx::{PgPool, Row};
4472
4473        let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4474            return;
4475        };
4476
4477        let pool = PgPool::connect(&database_url).await.unwrap();
4478        sqlx::query("DROP TABLE IF EXISTS orderline_tx")
4479            .execute(&pool)
4480            .await
4481            .unwrap();
4482        sqlx::query("DROP TABLE IF EXISTS product_tx")
4483            .execute(&pool)
4484            .await
4485            .unwrap();
4486        sqlx::query("DROP TABLE IF EXISTS orders_tx")
4487            .execute(&pool)
4488            .await
4489            .unwrap();
4490
4491        let order = entity().table_name("orders_tx");
4492        let line = line_entity().table_name("orderline_tx");
4493        let product = product_entity().table_name("product_tx");
4494        let schema_executor = PgMutationExecutor::new(pool.clone());
4495        schema_executor
4496            .ensure_schema(&PostgresDialect, &[&order, &line, &product])
4497            .await
4498            .unwrap();
4499
4500        let tx_executor = PgTransactionExecutor::begin(&pool).await.unwrap();
4501        let sync_executor = PgTxSyncExecutor::new(tx_executor.clone());
4502        let mut ctx = UserContext::new()
4503            .with_metadata(
4504                InMemoryMetadataStore::new()
4505                    .with_entity(order)
4506                    .with_entity(line)
4507                    .with_entity(product),
4508            )
4509            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
4510        ctx.insert_resource(PostgresDialect);
4511        ctx.insert_resource(sync_executor);
4512
4513        let repo = ctx
4514            .resolve_repository::<PostgresDialect, PgTxSyncExecutor>("Order")
4515            .unwrap();
4516        repo.save_graph(
4517            GraphNode::new("Order")
4518                .value("id", 1_u64)
4519                .value("version", 1_i64)
4520                .value("name", "pg-rollback-root")
4521                .relation(
4522                    "lines",
4523                    GraphNode::new("OrderLine")
4524                        .value("id", 10_u64)
4525                        .value("version", 1_i64)
4526                        .value("name", "pg-rollback-line")
4527                        .relation(
4528                            "product",
4529                            GraphNode::new("Product")
4530                                .value("id", 100_u64)
4531                                .value("name", "pg-rollback-sku"),
4532                        ),
4533                ),
4534        )
4535        .unwrap();
4536
4537        tx_executor.rollback().await.unwrap();
4538
4539        let count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders_tx")
4540            .fetch_one(&pool)
4541            .await
4542            .unwrap()
4543            .try_get("count")
4544            .unwrap();
4545        assert_eq!(count, 0);
4546
4547        sqlx::query("DROP TABLE IF EXISTS orderline_tx")
4548            .execute(&pool)
4549            .await
4550            .unwrap();
4551        sqlx::query("DROP TABLE IF EXISTS product_tx")
4552            .execute(&pool)
4553            .await
4554            .unwrap();
4555        sqlx::query("DROP TABLE IF EXISTS orders_tx")
4556            .execute(&pool)
4557            .await
4558            .unwrap();
4559    }
4560}
4561pub use checker::{
4562    CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResult, CheckResults, CheckRule, Checker,
4563    CheckerRegistry, InMemoryCheckerRegistry, LocationSegment, ObjectLocation, clear_record_status,
4564    mark_record_status,
4565};