// teaql_runtime/lib.rs

1mod checker;
2mod context;
3mod error;
4mod event;
5mod graph;
6mod id;
7mod language;
8mod memory;
9mod registry;
10mod repository;
11
12pub use context::{SqlLogEntry, SqlLogOperation, SqlLogOptions, UserContext};
13pub use error::{ContextError, RepositoryError, RuntimeError};
14pub use event::{EntityEvent, EntityEventKind, EntityEventSink, InMemoryEntityEventSink};
15pub use graph::{
16    GraphMutationBatch, GraphMutationKind, GraphMutationPlan, GraphMutationPlanItem, GraphNode,
17    GraphOperation, sorted_update_fields,
18};
19pub(crate) use id::local_id_generator;
20pub use id::{InternalIdGenerator, SnowflakeIdGenerator};
21pub use language::{
22    BuiltinTranslator, Language, MessageTranslator, translate_check_result, translate_location,
23};
24pub use memory::{MemoryRepository, MemoryRepositoryError};
25pub use registry::{
26    InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry,
27    MetadataStore, RepositoryBehavior, RepositoryBehaviorRegistry, RepositoryRegistry,
28    RuntimeModule,
29};
30pub use repository::{
31    AggregationCacheBackend, ContextRepository, GraphTransactionBoundary, InMemoryAggregationCache,
32    QueryExecutor, RelationLoadPlan, Repository, ResolvedRepository,
33};
34
35#[cfg(feature = "sqlx")]
36pub mod sqlx_support;
37
38#[cfg(test)]
39mod tests {
40    use std::collections::{BTreeMap, VecDeque};
41    use std::sync::{Arc, Mutex};
42
43    use super::{
44        AggregationCacheBackend, CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResults,
45        Checker, EntityEvent, EntityEventKind, EntityEventSink, GraphMutationKind, GraphNode,
46        GraphTransactionBoundary, InMemoryAggregationCache, InMemoryCheckerRegistry,
47        InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry,
48        InternalIdGenerator, Language, MemoryRepository, MetadataStore, ObjectLocation,
49        QueryExecutor, Repository, RepositoryBehavior, RepositoryError, RuntimeError,
50        RuntimeModule, SqlLogOperation, SqlLogOptions, UserContext, translate_check_result,
51    };
52    use teaql_core::{
53        Aggregate, AggregateFunction, BinaryOp, DataType, Decimal, DeleteCommand, Entity,
54        EntityDescriptor, EntityError, Expr, InsertCommand, OrderBy, PropertyDescriptor, Record,
55        RecoverCommand, RelationAggregate, SelectQuery, TeaqlEntity, UpdateCommand, Value,
56    };
57    use teaql_dialect_pg::PostgresDialect;
58    use teaql_macros::TeaqlEntity as DeriveTeaqlEntity;
59    use teaql_sql::CompiledQuery;
60
61    const ORDER_DEFAULT_PROJECTION: &str = "\"id\", \"version\", \"name\"";
62
63    fn entity() -> EntityDescriptor {
64        EntityDescriptor::new("Order")
65            .table_name("orders")
66            .property(
67                PropertyDescriptor::new("id", DataType::U64)
68                    .column_name("id")
69                    .id()
70                    .not_null(),
71            )
72            .property(
73                PropertyDescriptor::new("version", DataType::I64)
74                    .column_name("version")
75                    .version()
76                    .not_null(),
77            )
78            .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
79            .relation(
80                teaql_core::RelationDescriptor::new("lines", "OrderLine")
81                    .local_key("id")
82                    .foreign_key("order_id")
83                    .many(),
84            )
85    }
86
87    fn line_entity() -> EntityDescriptor {
88        EntityDescriptor::new("OrderLine")
89            .table_name("orderline")
90            .property(
91                PropertyDescriptor::new("id", DataType::U64)
92                    .column_name("id")
93                    .id()
94                    .not_null(),
95            )
96            .property(
97                PropertyDescriptor::new("version", DataType::I64)
98                    .column_name("version")
99                    .version(),
100            )
101            .property(
102                PropertyDescriptor::new("order_id", DataType::U64)
103                    .column_name("order_id")
104                    .not_null(),
105            )
106            .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
107            .property(
108                PropertyDescriptor::new("product_id", DataType::U64)
109                    .column_name("product_id")
110                    .not_null(),
111            )
112            .relation(
113                teaql_core::RelationDescriptor::new("product", "Product")
114                    .local_key("product_id")
115                    .foreign_key("id"),
116            )
117    }
118
119    fn product_entity() -> EntityDescriptor {
120        EntityDescriptor::new("Product")
121            .table_name("product")
122            .property(
123                PropertyDescriptor::new("id", DataType::U64)
124                    .column_name("id")
125                    .id()
126                    .not_null(),
127            )
128            .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
129    }
130
131    #[derive(Debug, Default)]
132    struct StubExecutor {
133        affected: u64,
134        rows: Vec<Record>,
135    }
136
137    #[derive(Debug, Default)]
138    struct QueueExecutor {
139        affected: u64,
140        rows: Mutex<VecDeque<Vec<Record>>>,
141        queries: Mutex<Vec<String>>,
142    }
143
144    struct OrderBehavior;
145
146    #[allow(dead_code)]
147    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
148    #[teaql(entity = "CatalogProduct", table = "catalog_product")]
149    struct CatalogProductRow {
150        #[teaql(id)]
151        id: u64,
152        name: String,
153    }
154
155    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
156    #[teaql(entity = "OrderAggregate", table = "orders")]
157    struct OrderAggregateDynamic {
158        #[teaql(id)]
159        id: u64,
160        #[teaql(dynamic)]
161        dynamic: BTreeMap<String, Value>,
162    }
163
164    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
165    #[teaql(entity = "Product", table = "product")]
166    struct ProductEntityRow {
167        #[teaql(id)]
168        id: u64,
169        name: String,
170    }
171
172    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
173    #[teaql(entity = "OrderLine", table = "orderline")]
174    struct OrderLineEntityRow {
175        #[teaql(id)]
176        id: u64,
177        #[teaql(column = "order_id")]
178        order_id: u64,
179        name: String,
180        #[teaql(column = "product_id")]
181        product_id: u64,
182        #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
183        product: Option<ProductEntityRow>,
184    }
185
186    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
187    #[teaql(entity = "Order", table = "orders")]
188    struct OrderAggregateRow {
189        #[teaql(id)]
190        id: u64,
191        #[teaql(version)]
192        version: i64,
193        name: String,
194        #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
195        lines: teaql_core::SmartList<OrderLineEntityRow>,
196    }
197
198    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
199    #[teaql(entity = "Order", table = "orders")]
200    struct Order {
201        #[teaql(id)]
202        id: u64,
203        #[teaql(version)]
204        version: i64,
205        name: String,
206    }
207
208    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
209    #[teaql(entity = "Product", table = "product")]
210    struct TypedGraphProduct {
211        #[teaql(id)]
212        id: Option<u64>,
213        name: String,
214    }
215
216    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
217    #[teaql(entity = "OrderLine", table = "orderline")]
218    struct TypedGraphLine {
219        #[teaql(id)]
220        id: Option<u64>,
221        #[teaql(column = "order_id")]
222        order_id: Option<u64>,
223        name: String,
224        #[teaql(column = "product_id")]
225        product_id: Option<u64>,
226        #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
227        product: Option<TypedGraphProduct>,
228    }
229
230    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
231    #[teaql(entity = "Order", table = "orders")]
232    struct TypedGraphOrder {
233        #[teaql(id)]
234        id: Option<u64>,
235        #[teaql(version)]
236        version: i64,
237        name: String,
238        #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
239        lines: teaql_core::SmartList<TypedGraphLine>,
240    }
241
242    #[derive(Debug, PartialEq, Eq)]
243    struct OrderEntity {
244        id: u64,
245        version: i64,
246        name: String,
247    }
248
249    impl teaql_core::TeaqlEntity for OrderEntity {
250        fn entity_descriptor() -> EntityDescriptor {
251            entity()
252        }
253    }
254
255    impl Entity for OrderEntity {
256        fn from_record(record: Record) -> Result<Self, EntityError> {
257            let id = match record.get("id") {
258                Some(Value::U64(v)) => *v,
259                Some(Value::I64(v)) if *v >= 0 => *v as u64,
260                other => {
261                    return Err(EntityError::new(
262                        "Order",
263                        format!("invalid id field: {other:?}"),
264                    ));
265                }
266            };
267            let version = match record.get("version") {
268                Some(Value::I64(v)) => *v,
269                other => {
270                    return Err(EntityError::new(
271                        "Order",
272                        format!("invalid version field: {other:?}"),
273                    ));
274                }
275            };
276            let name = match record.get("name") {
277                Some(Value::Text(v)) => v.clone(),
278                other => {
279                    return Err(EntityError::new(
280                        "Order",
281                        format!("invalid name field: {other:?}"),
282                    ));
283                }
284            };
285            Ok(Self { id, version, name })
286        }
287
288        fn into_record(self) -> Record {
289            Record::from([
290                (String::from("id"), Value::U64(self.id)),
291                (String::from("version"), Value::I64(self.version)),
292                (String::from("name"), Value::Text(self.name)),
293            ])
294        }
295    }
296
297    #[derive(Debug)]
298    struct StubError;
299
300    impl std::fmt::Display for StubError {
301        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
302            write!(f, "stub error")
303        }
304    }
305
306    impl std::error::Error for StubError {}
307
308    impl QueryExecutor for StubExecutor {
309        type Error = StubError;
310
311        fn fetch_all(&self, _query: &CompiledQuery) -> Result<Vec<Record>, Self::Error> {
312            Ok(self.rows.clone())
313        }
314
315        fn execute(&self, _query: &CompiledQuery) -> Result<u64, Self::Error> {
316            Ok(self.affected)
317        }
318
319        fn begin_transaction(&self) -> Result<GraphTransactionBoundary, Self::Error> {
320            Ok(GraphTransactionBoundary::Started)
321        }
322    }
323
324    impl QueryExecutor for QueueExecutor {
325        type Error = StubError;
326
327        fn fetch_all(&self, query: &CompiledQuery) -> Result<Vec<Record>, Self::Error> {
328            self.queries.lock().unwrap().push(query.sql.clone());
329            Ok(self.rows.lock().unwrap().pop_front().unwrap_or_default())
330        }
331
332        fn execute(&self, _query: &CompiledQuery) -> Result<u64, Self::Error> {
333            Ok(self.affected)
334        }
335    }
336
337    #[test]
338    fn user_context_records_configured_sql_logs() {
339        let mut ctx = UserContext::new()
340            .with_module(crate::module!(Order))
341            .with_sql_log_options(SqlLogOptions::select_only());
342        ctx.insert_resource(PostgresDialect);
343        ctx.insert_resource(StubExecutor {
344            affected: 1,
345            rows: Vec::new(),
346        });
347
348        {
349            let repo = ctx
350                .resolve_repository::<PostgresDialect, StubExecutor>("Order")
351                .unwrap();
352            repo.fetch_all(&SelectQuery::new("Order").filter(Expr::eq("name", "Bob's Shop")))
353                .unwrap();
354            repo.insert(&InsertCommand::new("Order").value("name", "created"))
355                .unwrap();
356        }
357
358        let logs = ctx.sql_logs();
359        assert_eq!(logs.len(), 1);
360        assert_eq!(logs[0].operation, SqlLogOperation::Select);
361        assert_eq!(
362            logs[0].debug_sql,
363            format!(
364                "SELECT {ORDER_DEFAULT_PROJECTION} FROM \"orders\" WHERE (\"name\" = 'Bob''s Shop')"
365            )
366        );
367
368        ctx.set_sql_log_options(SqlLogOptions::mutation_only());
369        ctx.clear_sql_logs();
370        ctx.resolve_repository::<PostgresDialect, StubExecutor>("Order")
371            .unwrap()
372            .update(
373                &UpdateCommand::new("Order", 1_u64)
374                    .value("name", "updated")
375                    .expected_version(1),
376            )
377            .unwrap();
378
379        let logs = ctx.sql_logs();
380        assert_eq!(logs.len(), 1);
381        assert_eq!(logs[0].operation, SqlLogOperation::Update);
382        assert!(logs[0].debug_sql.contains("UPDATE \"orders\" SET"));
383        assert!(logs[0].debug_sql.contains("'updated'"));
384    }
385
386    impl RepositoryBehavior for OrderBehavior {
387        fn before_select(
388            &self,
389            _ctx: &UserContext,
390            query: &mut teaql_core::SelectQuery,
391        ) -> Result<(), RuntimeError> {
392            query.filter = Some(Expr::eq("version", 1_i64));
393            Ok(())
394        }
395
396        fn before_insert(
397            &self,
398            _ctx: &UserContext,
399            command: &mut InsertCommand,
400        ) -> Result<(), RuntimeError> {
401            command
402                .values
403                .entry("version".to_owned())
404                .or_insert(Value::I64(1));
405            Ok(())
406        }
407
408        fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
409            vec!["lines".to_owned()]
410        }
411    }
412
413    struct ContextAwareOrderBehavior;
414    struct OrderChecker;
415    #[derive(Clone)]
416    struct RecordingEventSink {
417        events: Arc<Mutex<Vec<EntityEvent>>>,
418    }
419
420    impl RepositoryBehavior for ContextAwareOrderBehavior {
421        fn before_insert(
422            &self,
423            ctx: &UserContext,
424            command: &mut InsertCommand,
425        ) -> Result<(), RuntimeError> {
426            let tenant = ctx
427                .get_named_resource::<String>("tenant")
428                .cloned()
429                .ok_or_else(|| RuntimeError::Behavior("missing tenant resource".to_owned()))?;
430            let version = *ctx
431                .get_named_resource::<i64>("initial_version")
432                .ok_or_else(|| {
433                    RuntimeError::Behavior("missing initial_version resource".to_owned())
434                })?;
435            let trace_id = match ctx.local("trace_id") {
436                Some(Value::Text(value)) => value.clone(),
437                other => {
438                    return Err(RuntimeError::Behavior(format!(
439                        "missing trace_id local, got {other:?}"
440                    )));
441                }
442            };
443
444            command
445                .values
446                .entry("name".to_owned())
447                .or_insert(Value::Text(format!("{tenant}:{trace_id}")));
448            command
449                .values
450                .entry("version".to_owned())
451                .or_insert(Value::I64(version));
452            Ok(())
453        }
454    }
455
456    impl Checker for OrderChecker {
457        fn entity(&self) -> &str {
458            "Order"
459        }
460
461        fn check_and_fix(
462            &self,
463            _ctx: &UserContext,
464            record: &mut Record,
465            location: &ObjectLocation,
466            results: &mut CheckResults,
467        ) {
468            let status = CheckObjectStatus::from_record(record);
469            if status.is_create() {
470                self.required(record, "name", location, results);
471                record.entry("version".to_owned()).or_insert(Value::I64(1));
472            }
473            if status.is_update()
474                && record.get("name") == Some(&Value::Text("graph-update".to_owned()))
475            {
476                record.insert(
477                    "name".to_owned(),
478                    Value::Text("graph-update-checked".to_owned()),
479                );
480            }
481            self.min_string_length(record, "name", 3, location, results);
482        }
483    }
484
485    impl EntityEventSink for RecordingEventSink {
486        fn on_event(&self, _ctx: &UserContext, event: &EntityEvent) -> Result<(), RuntimeError> {
487            self.events.lock().unwrap().push(event.clone());
488            Ok(())
489        }
490    }
491
492    struct FixedIdGenerator(u64);
493
494    impl InternalIdGenerator for FixedIdGenerator {
495        fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
496            Ok(self.0)
497        }
498    }
499
500    struct SequentialIdGenerator {
501        next: Mutex<u64>,
502    }
503
504    impl SequentialIdGenerator {
505        fn new(next: u64) -> Self {
506            Self {
507                next: Mutex::new(next),
508            }
509        }
510    }
511
512    impl InternalIdGenerator for SequentialIdGenerator {
513        fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
514            let mut next = self
515                .next
516                .lock()
517                .map_err(|err| RuntimeError::IdGeneration(err.to_string()))?;
518            let id = *next;
519            *next += 1;
520            Ok(id)
521        }
522    }
523
524    #[test]
525    fn metadata_store_registers_entities() {
526        let store = InMemoryMetadataStore::new().with_entity(entity());
527        assert!(store.entity("Order").is_some());
528    }
529
530    #[test]
531    fn runtime_module_registers_descriptor_into_context() {
532        let ctx = UserContext::new().with_module(RuntimeModule::new().descriptor(entity()));
533        assert!(ctx.entity("Order").is_some());
534        assert!(ctx.has_repository("Order"));
535    }
536
537    #[test]
538    fn runtime_module_registers_derived_entity_and_behavior() {
539        let ctx = UserContext::new().with_module(
540            RuntimeModule::new().entity_with_behavior::<CatalogProductRow, _>(OrderBehavior),
541        );
542        assert!(ctx.entity("CatalogProduct").is_some());
543        assert!(ctx.has_repository("CatalogProduct"));
544        assert!(ctx.repository_behavior("CatalogProduct").is_some());
545    }
546
547    #[test]
548    fn module_macro_registers_multiple_entities() {
549        let ctx = UserContext::new().with_module(crate::module!(CatalogProductRow));
550        assert!(ctx.entity("CatalogProduct").is_some());
551        assert!(ctx.has_repository("CatalogProduct"));
552    }
553
554    #[test]
555    fn module_macro_registers_entity_behavior_pairs() {
556        let ctx =
557            UserContext::new().with_module(crate::module!(CatalogProductRow => OrderBehavior));
558        assert!(ctx.entity("CatalogProduct").is_some());
559        assert!(ctx.repository_behavior("CatalogProduct").is_some());
560    }
561
562    #[test]
563    fn repository_returns_optimistic_lock_conflict() {
564        let store = InMemoryMetadataStore::new().with_entity(entity());
565        let executor = StubExecutor {
566            affected: 0,
567            rows: Vec::new(),
568        };
569        let repo = Repository::new(&PostgresDialect, &store, &executor);
570
571        let err = repo
572            .update(
573                &UpdateCommand::new("Order", 1_u64)
574                    .expected_version(3)
575                    .value("name", "next"),
576            )
577            .unwrap_err();
578
579        match err {
580            RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. }) => {}
581            other => panic!("unexpected error: {other}"),
582        }
583    }
584
585    #[test]
586    fn user_context_indexes_resources_and_locals() {
587        let mut ctx =
588            UserContext::new().with_metadata(InMemoryMetadataStore::new().with_entity(entity()));
589        ctx.insert_resource::<u64>(42);
590        ctx.insert_named_resource("tenant", String::from("acme"));
591        ctx.put_local("trace_id", "req-1");
592
593        assert!(ctx.entity("Order").is_some());
594        assert_eq!(ctx.get_resource::<u64>(), Some(&42));
595        assert_eq!(
596            ctx.get_named_resource::<String>("tenant"),
597            Some(&String::from("acme"))
598        );
599        assert_eq!(
600            ctx.local("trace_id"),
601            Some(&Value::Text("req-1".to_owned()))
602        );
603    }
604
605    #[test]
606    fn user_context_builds_context_repository() {
607        let mut ctx =
608            UserContext::new().with_metadata(InMemoryMetadataStore::new().with_entity(entity()));
609        ctx.insert_resource(PostgresDialect);
610        ctx.insert_resource(StubExecutor {
611            affected: 1,
612            rows: Vec::new(),
613        });
614
615        let repo = ctx.repository::<PostgresDialect, StubExecutor>().unwrap();
616        let affected = repo
617            .update(
618                &UpdateCommand::new("Order", 1_u64)
619                    .expected_version(3)
620                    .value("name", "next"),
621            )
622            .unwrap();
623
624        assert_eq!(affected, 1);
625    }
626
627    #[test]
628    fn user_context_resolves_repository_by_entity_type() {
629        let mut ctx = UserContext::new()
630            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
631            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
632        ctx.insert_resource(PostgresDialect);
633        ctx.insert_resource(StubExecutor {
634            affected: 1,
635            rows: Vec::new(),
636        });
637
638        let repo = ctx
639            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
640            .unwrap();
641        assert_eq!(repo.entity(), "Order");
642        assert_eq!(repo.select().entity, "Order");
643
644        let affected = repo
645            .insert(
646                &repo
647                    .insert_command()
648                    .value("id", 1_u64)
649                    .value("version", 1_i64)
650                    .value("name", "n"),
651            )
652            .unwrap();
653        assert_eq!(affected, 1);
654    }
655
656    #[test]
657    fn resolved_repository_applies_behavior_hooks() {
658        let mut ctx = UserContext::new()
659            .with_metadata(
660                InMemoryMetadataStore::new()
661                    .with_entity(entity())
662                    .with_entity(line_entity())
663                    .with_entity(product_entity()),
664            )
665            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
666            .with_repository_behavior_registry(
667                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
668            );
669        ctx.insert_resource(PostgresDialect);
670        ctx.insert_resource(StubExecutor {
671            affected: 1,
672            rows: Vec::new(),
673        });
674
675        let repo = ctx
676            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
677            .unwrap();
678
679        let compiled = repo.compile(&repo.select()).unwrap();
680        assert!(compiled.sql.contains("WHERE (\"version\" = $1)"));
681
682        let insert = repo.insert_command().value("id", 1_u64).value("name", "n");
683        let affected = repo.insert(&insert).unwrap();
684        assert_eq!(affected, 1);
685        assert_eq!(repo.relation_loads(), vec!["lines".to_owned()]);
686    }
687
688    #[test]
689    fn resolved_repository_prepares_insert_command_with_generated_id() {
690        let mut ctx = UserContext::new()
691            .with_metadata(
692                InMemoryMetadataStore::new()
693                    .with_entity(entity())
694                    .with_entity(line_entity())
695                    .with_entity(product_entity()),
696            )
697            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
698            .with_repository_behavior_registry(
699                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
700            )
701            .with_internal_id_generator(FixedIdGenerator(42));
702        ctx.insert_resource(PostgresDialect);
703        ctx.insert_resource(StubExecutor {
704            affected: 1,
705            rows: Vec::new(),
706        });
707
708        let repo = ctx
709            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
710            .unwrap();
711
712        let prepared = repo
713            .prepare_insert_command(&repo.insert_command().value("name", "n"))
714            .unwrap();
715
716        assert_eq!(prepared.values.get("id"), Some(&Value::U64(42)));
717        assert_eq!(prepared.values.get("version"), Some(&Value::I64(1)));
718        assert_eq!(
719            prepared.values.get("name"),
720            Some(&Value::Text("n".to_owned()))
721        );
722    }
723
724    #[test]
725    fn resolved_repository_saves_create_graph_and_maintains_relation_keys() {
726        let mut ctx = UserContext::new()
727            .with_metadata(
728                InMemoryMetadataStore::new()
729                    .with_entity(entity())
730                    .with_entity(line_entity())
731                    .with_entity(product_entity()),
732            )
733            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
734            .with_repository_behavior_registry(
735                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
736            )
737            .with_internal_id_generator(SequentialIdGenerator::new(500));
738        ctx.insert_resource(PostgresDialect);
739        ctx.insert_resource(StubExecutor {
740            affected: 1,
741            rows: Vec::new(),
742        });
743
744        let repo = ctx
745            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
746            .unwrap();
747        let graph = GraphNode::new("Order").value("name", "root").relation(
748            "lines",
749            GraphNode::new("OrderLine")
750                .value("name", "line-1")
751                .relation("product", GraphNode::new("Product").value("name", "sku-1")),
752        );
753
754        let saved = repo.save_graph(graph).unwrap();
755
756        assert_eq!(saved.values.get("id"), Some(&Value::U64(500)));
757        assert_eq!(saved.values.get("version"), Some(&Value::I64(1)));
758        let lines = saved.relations.get("lines").unwrap();
759        assert_eq!(lines.len(), 1);
760        assert_eq!(lines[0].values.get("id"), Some(&Value::U64(501)));
761        assert_eq!(lines[0].values.get("order_id"), Some(&Value::U64(500)));
762        assert_eq!(lines[0].values.get("product_id"), Some(&Value::U64(502)));
763        let product = lines[0].relations.get("product").unwrap();
764        assert_eq!(product[0].values.get("id"), Some(&Value::U64(502)));
765    }
766
767    #[test]
768    fn resolved_repository_extracts_and_saves_typed_entity_graph() {
769        let mut ctx = UserContext::new()
770            .with_metadata(
771                InMemoryMetadataStore::new()
772                    .with_entity(entity())
773                    .with_entity(line_entity())
774                    .with_entity(product_entity()),
775            )
776            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
777            .with_internal_id_generator(SequentialIdGenerator::new(700));
778        ctx.insert_resource(PostgresDialect);
779        ctx.insert_resource(StubExecutor {
780            affected: 1,
781            rows: Vec::new(),
782        });
783
784        let repo = ctx
785            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
786            .unwrap();
787        let order = TypedGraphOrder {
788            id: None,
789            version: 1,
790            name: "typed-root".to_owned(),
791            lines: teaql_core::SmartList::from(vec![TypedGraphLine {
792                id: None,
793                order_id: None,
794                name: "typed-line".to_owned(),
795                product_id: None,
796                product: Some(TypedGraphProduct {
797                    id: None,
798                    name: "typed-product".to_owned(),
799                }),
800            }]),
801        };
802
803        let extracted = repo.graph_node_from_entity(order).unwrap();
804        assert_eq!(extracted.entity, "Order");
805        assert_eq!(
806            extracted.values.get("name"),
807            Some(&Value::Text("typed-root".to_owned()))
808        );
809        assert_eq!(extracted.values.get("id"), Some(&Value::Null));
810        assert_eq!(extracted.relations["lines"].len(), 1);
811        assert_eq!(
812            extracted.relations["lines"][0].values.get("name"),
813            Some(&Value::Text("typed-line".to_owned()))
814        );
815        assert_eq!(
816            extracted.relations["lines"][0].relations["product"].len(),
817            1
818        );
819
820        let saved = repo.save_graph(extracted).unwrap();
821        assert_eq!(saved.values.get("id"), Some(&Value::U64(700)));
822        let lines = saved.relations.get("lines").unwrap();
823        assert_eq!(lines[0].values.get("id"), Some(&Value::U64(701)));
824        assert_eq!(lines[0].values.get("order_id"), Some(&Value::U64(700)));
825        assert_eq!(lines[0].values.get("product_id"), Some(&Value::U64(702)));
826        assert_eq!(
827            lines[0].relations["product"][0].values.get("id"),
828            Some(&Value::U64(702))
829        );
830    }
831
832    #[test]
833    fn resolved_repository_saves_typed_entity_graph_directly() {
834        let mut ctx = UserContext::new()
835            .with_metadata(
836                InMemoryMetadataStore::new()
837                    .with_entity(entity())
838                    .with_entity(line_entity())
839                    .with_entity(product_entity()),
840            )
841            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
842            .with_internal_id_generator(SequentialIdGenerator::new(800));
843        ctx.insert_resource(PostgresDialect);
844        ctx.insert_resource(StubExecutor {
845            affected: 1,
846            rows: Vec::new(),
847        });
848
849        let repo = ctx
850            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
851            .unwrap();
852        let saved = repo
853            .save_entity_graph(TypedGraphOrder {
854                id: None,
855                version: 1,
856                name: "typed-direct".to_owned(),
857                lines: teaql_core::SmartList::from(vec![TypedGraphLine {
858                    id: None,
859                    order_id: None,
860                    name: "typed-line".to_owned(),
861                    product_id: None,
862                    product: Some(TypedGraphProduct {
863                        id: None,
864                        name: "typed-product".to_owned(),
865                    }),
866                }]),
867            })
868            .unwrap();
869
870        assert_eq!(saved.values.get("id"), Some(&Value::U64(800)));
871        assert_eq!(
872            saved.relations["lines"][0].values.get("order_id"),
873            Some(&Value::U64(800))
874        );
875        assert_eq!(
876            saved.relations["lines"][0].values.get("product_id"),
877            Some(&Value::U64(802))
878        );
879    }
880
881    #[test]
882    fn custom_user_context_can_drive_insert_preparation() {
883        let mut ctx = UserContext::new()
884            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
885            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
886            .with_repository_behavior_registry(
887                InMemoryRepositoryBehaviorRegistry::new()
888                    .with_behavior("Order", ContextAwareOrderBehavior),
889            )
890            .with_internal_id_generator(FixedIdGenerator(99));
891        ctx.insert_named_resource("tenant", String::from("acme"));
892        ctx.insert_named_resource("initial_version", 7_i64);
893        ctx.put_local("trace_id", "req-9");
894        ctx.insert_resource(PostgresDialect);
895        ctx.insert_resource(StubExecutor {
896            affected: 1,
897            rows: Vec::new(),
898        });
899
900        let repo = ctx
901            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
902            .unwrap();
903        let prepared = repo.prepare_insert_command(&repo.insert_command()).unwrap();
904
905        assert_eq!(prepared.values.get("id"), Some(&Value::U64(99)));
906        assert_eq!(prepared.values.get("version"), Some(&Value::I64(7)));
907        assert_eq!(
908            prepared.values.get("name"),
909            Some(&Value::Text("acme:req-9".to_owned()))
910        );
911    }
912
913    #[test]
914    fn checker_registry_validates_and_fixes_insert_commands() {
915        let mut ctx = UserContext::new()
916            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
917            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
918            .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker))
919            .with_internal_id_generator(FixedIdGenerator(77));
920        ctx.insert_resource(PostgresDialect);
921        ctx.insert_resource(StubExecutor {
922            affected: 1,
923            rows: Vec::new(),
924        });
925
926        let repo = ctx
927            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
928            .unwrap();
929        let prepared = repo
930            .prepare_insert_command(&repo.insert_command().value("name", "valid"))
931            .unwrap();
932
933        assert_eq!(prepared.values.get("id"), Some(&Value::U64(77)));
934        assert_eq!(prepared.values.get("version"), Some(&Value::I64(1)));
935        assert!(!prepared.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
936
937        let error = repo
938            .prepare_insert_command(&repo.insert_command().value("name", "no"))
939            .unwrap_err();
940        match error {
941            RuntimeError::Check(results) => {
942                assert_eq!(results.len(), 1);
943                assert_eq!(results[0].location.to_string(), "name");
944            }
945            other => panic!("unexpected checker error: {other:?}"),
946        }
947    }
948
949    #[test]
950    fn checker_registry_validates_update_commands_without_required_insert_checks() {
951        let mut ctx = UserContext::new()
952            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
953            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
954            .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker));
955        ctx.insert_resource(PostgresDialect);
956        ctx.insert_resource(StubExecutor {
957            affected: 1,
958            rows: Vec::new(),
959        });
960
961        let repo = ctx
962            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
963            .unwrap();
964        repo.update(&repo.update_command(1_u64).value("version", 1_i64))
965            .unwrap();
966
967        let error = repo
968            .update(&repo.update_command(1_u64).value("name", "no"))
969            .unwrap_err();
970        match error {
971            RepositoryError::Runtime(RuntimeError::Check(results)) => {
972                assert_eq!(results.len(), 1);
973                assert_eq!(results[0].location.to_string(), "name");
974            }
975            other => panic!("unexpected checker error: {other:?}"),
976        }
977    }
978
979    #[test]
980    fn built_in_language_translators_cover_fifteen_languages() {
981        assert_eq!(Language::ALL.len(), 15);
982        let result = super::CheckResult::required(ObjectLocation::hash_root("name"));
983        let messages = Language::ALL
984            .iter()
985            .map(|language| translate_check_result(*language, &result))
986            .collect::<Vec<_>>();
987
988        assert!(messages.iter().all(|message| !message.is_empty()));
989        assert!(messages.iter().any(|message| message.contains("required")));
990        assert!(messages.iter().any(|message| message.contains("å¿…å¡«")));
991        assert!(
992            messages
993                .iter()
994                .any(|message| message.contains("obligatoire"))
995        );
996        assert_eq!(Language::from_code("zh-CN"), Some(Language::Chinese));
997        assert_eq!(
998            Language::from_code("zh-TW"),
999            Some(Language::TraditionalChinese)
1000        );
1001    }
1002
1003    #[test]
1004    fn user_context_language_switch_translates_checker_errors() {
1005        let mut ctx = UserContext::new()
1006            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1007            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1008            .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker))
1009            .with_internal_id_generator(FixedIdGenerator(77))
1010            .with_language(Language::Chinese);
1011        ctx.insert_resource(PostgresDialect);
1012        ctx.insert_resource(StubExecutor {
1013            affected: 1,
1014            rows: Vec::new(),
1015        });
1016
1017        let repo = ctx
1018            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1019            .unwrap();
1020        let error = repo
1021            .prepare_insert_command(&repo.insert_command())
1022            .unwrap_err();
1023        match error {
1024            RuntimeError::Check(results) => {
1025                assert_eq!(results.len(), 1);
1026                assert!(
1027                    results[0]
1028                        .message
1029                        .as_ref()
1030                        .is_some_and(|message| message.contains("å¿…å¡«"))
1031                );
1032            }
1033            other => panic!("unexpected checker error: {other:?}"),
1034        }
1035
1036        let mut ctx = UserContext::new().with_language(Language::English);
1037        ctx.set_language_code("es").unwrap();
1038        assert_eq!(ctx.language(), Language::Spanish);
1039    }
1040
1041    #[test]
1042    fn checker_registry_merges_graph_update_fixes_by_object_status() {
1043        let mut ctx = UserContext::new()
1044            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1045            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1046            .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker));
1047        ctx.insert_resource(PostgresDialect);
1048        ctx.insert_resource(StubExecutor {
1049            affected: 1,
1050            rows: vec![Record::from([
1051                ("id".to_owned(), Value::U64(1)),
1052                ("version".to_owned(), Value::I64(1)),
1053                ("name".to_owned(), Value::Text("old".to_owned())),
1054            ])],
1055        });
1056
1057        let repo = ctx
1058            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1059            .unwrap();
1060        let saved = repo
1061            .save_graph(
1062                GraphNode::new("Order")
1063                    .value("id", 1_u64)
1064                    .value("version", 1_i64)
1065                    .value("name", "graph-update"),
1066            )
1067            .unwrap();
1068
1069        assert_eq!(
1070            saved.values.get("name"),
1071            Some(&Value::Text("graph-update-checked".to_owned()))
1072        );
1073        assert_eq!(saved.values.get("version"), Some(&Value::I64(2)));
1074        assert!(!saved.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
1075    }
1076
1077    #[test]
1078    fn user_context_event_sink_receives_repository_mutation_events() {
1079        let events = Arc::new(Mutex::new(Vec::new()));
1080        let mut ctx = UserContext::new()
1081            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1082            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1083            .with_internal_id_generator(FixedIdGenerator(88))
1084            .with_event_sink(RecordingEventSink {
1085                events: events.clone(),
1086            });
1087        ctx.insert_resource(PostgresDialect);
1088        ctx.insert_resource(StubExecutor {
1089            affected: 1,
1090            rows: Vec::new(),
1091        });
1092
1093        let repo = ctx
1094            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1095            .unwrap();
1096        repo.insert(&repo.insert_command().value("name", "created"))
1097            .unwrap();
1098        repo.update(
1099            &repo
1100                .update_command(88_u64)
1101                .expected_version(1)
1102                .value("name", "updated"),
1103        )
1104        .unwrap();
1105        repo.delete(&repo.delete_command(88_u64).expected_version(2))
1106            .unwrap();
1107        repo.recover(&repo.recover_command(88_u64, -3)).unwrap();
1108
1109        let events = events.lock().unwrap();
1110        assert_eq!(events.len(), 4);
1111        assert_eq!(events[0].kind, EntityEventKind::Created);
1112        assert_eq!(events[0].entity, "Order");
1113        assert_eq!(events[0].values.get("id"), Some(&Value::U64(88)));
1114        assert_eq!(events[1].kind, EntityEventKind::Updated);
1115        assert_eq!(events[1].values.get("id"), Some(&Value::U64(88)));
1116        assert_eq!(events[1].values.get("version"), Some(&Value::I64(2)));
1117        assert_eq!(events[1].updated_fields, vec!["name".to_owned()]);
1118        assert_eq!(events[2].kind, EntityEventKind::Deleted);
1119        assert_eq!(events[3].kind, EntityEventKind::Recovered);
1120    }
1121
1122    #[test]
1123    fn user_context_event_sink_receives_mixed_graph_mutation_events() {
1124        let events = Arc::new(Mutex::new(Vec::new()));
1125        let mut ctx = UserContext::new()
1126            .with_metadata(
1127                InMemoryMetadataStore::new()
1128                    .with_entity(entity())
1129                    .with_entity(line_entity())
1130                    .with_entity(product_entity()),
1131            )
1132            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1133            .with_event_sink(RecordingEventSink {
1134                events: events.clone(),
1135            });
1136        ctx.insert_resource(PostgresDialect);
1137        ctx.insert_resource(StubExecutor {
1138            affected: 1,
1139            rows: vec![Record::from([
1140                ("id".to_owned(), Value::U64(1)),
1141                ("version".to_owned(), Value::I64(1)),
1142                ("name".to_owned(), Value::Text("old".to_owned())),
1143            ])],
1144        });
1145
1146        let repo = ctx
1147            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1148            .unwrap();
1149        repo.save_graph(
1150            GraphNode::new("Order")
1151                .value("id", 1_u64)
1152                .value("version", 1_i64)
1153                .value("name", "updated")
1154                .relation(
1155                    "lines",
1156                    GraphNode::new("OrderLine")
1157                        .value("name", "line")
1158                        .value("product_id", 3_u64),
1159                ),
1160        )
1161        .unwrap();
1162
1163        let events = events.lock().unwrap();
1164        assert_eq!(events.len(), 3);
1165        assert_eq!(events[0].kind, EntityEventKind::Updated);
1166        assert_eq!(events[0].entity, "Order");
1167        assert_eq!(events[1].kind, EntityEventKind::Updated);
1168        assert_eq!(events[1].entity, "OrderLine");
1169        assert_eq!(events[1].values.get("order_id"), Some(&Value::U64(1)));
1170        assert_eq!(events[2].kind, EntityEventKind::Deleted);
1171        assert_eq!(events[2].entity, "OrderLine");
1172    }
1173
1174    #[test]
1175    fn save_graph_builds_plan_grouped_by_entity_and_operation() {
1176        let mut ctx = UserContext::new()
1177            .with_metadata(
1178                InMemoryMetadataStore::new()
1179                    .with_entity(entity())
1180                    .with_entity(line_entity())
1181                    .with_entity(product_entity()),
1182            )
1183            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1184            .with_internal_id_generator(SequentialIdGenerator::new(500));
1185        ctx.insert_resource(PostgresDialect);
1186        ctx.insert_resource(StubExecutor {
1187            affected: 1,
1188            rows: vec![Record::from([
1189                ("id".to_owned(), Value::U64(1)),
1190                ("version".to_owned(), Value::I64(1)),
1191                ("name".to_owned(), Value::Text("old".to_owned())),
1192            ])],
1193        });
1194
1195        let plan = ctx
1196            .plan_for_save_graph::<PostgresDialect, StubExecutor>(
1197                GraphNode::new("Order")
1198                    .value("id", 1_u64)
1199                    .value("version", 1_i64)
1200                    .value("name", "updated")
1201                    .relation(
1202                        "lines",
1203                        GraphNode::new("OrderLine")
1204                            .value("name", "new-line-a")
1205                            .value("product_id", 2_u64),
1206                    )
1207                    .relation(
1208                        "lines",
1209                        GraphNode::new("OrderLine")
1210                            .value("name", "new-line-b")
1211                            .value("product_id", 2_u64),
1212                    )
1213                    .relation(
1214                        "lines",
1215                        GraphNode::new("OrderLine")
1216                            .value("id", 5_u64)
1217                            .value("version", 1_i64)
1218                            .value("name", "same-update-a"),
1219                    )
1220                    .relation(
1221                        "lines",
1222                        GraphNode::new("OrderLine")
1223                            .value("id", 6_u64)
1224                            .value("version", 1_i64)
1225                            .value("name", "same-update-b"),
1226                    )
1227                    .relation(
1228                        "lines",
1229                        GraphNode::new("OrderLine").value("id", 3_u64).remove(),
1230                    )
1231                    .relation(
1232                        "lines",
1233                        GraphNode::new("OrderLine").value("id", 4_u64).reference(),
1234                    ),
1235            )
1236            .unwrap();
1237        let counts = plan.grouped_counts();
1238
1239        assert_eq!(
1240            counts.get(&("Order".to_owned(), GraphMutationKind::Update)),
1241            Some(&1)
1242        );
1243        assert_eq!(
1244            counts.get(&("OrderLine".to_owned(), GraphMutationKind::Create)),
1245            Some(&2)
1246        );
1247        assert_eq!(
1248            counts.get(&("OrderLine".to_owned(), GraphMutationKind::Update)),
1249            Some(&2)
1250        );
1251        assert_eq!(
1252            counts.get(&("OrderLine".to_owned(), GraphMutationKind::Delete)),
1253            Some(&1)
1254        );
1255        assert_eq!(
1256            counts.get(&("OrderLine".to_owned(), GraphMutationKind::Reference)),
1257            Some(&1)
1258        );
1259        let create_batch = plan
1260            .batches
1261            .iter()
1262            .find(|batch| batch.entity == "OrderLine" && batch.kind == GraphMutationKind::Create)
1263            .unwrap();
1264        assert_eq!(create_batch.items.len(), 2);
1265        assert_eq!(
1266            create_batch.items[0].values.get("id"),
1267            Some(&Value::U64(500))
1268        );
1269        assert_eq!(
1270            create_batch.items[1].values.get("id"),
1271            Some(&Value::U64(501))
1272        );
1273        let update_batch = plan
1274            .batches
1275            .iter()
1276            .find(|batch| {
1277                batch.entity == "OrderLine"
1278                    && batch.kind == GraphMutationKind::Update
1279                    && batch.update_fields == vec!["name".to_owned()]
1280            })
1281            .unwrap();
1282        assert_eq!(update_batch.items.len(), 2);
1283    }
1284
1285    #[test]
1286    fn resolved_repository_builds_relation_plans() {
1287        let mut ctx = UserContext::new()
1288            .with_metadata(
1289                InMemoryMetadataStore::new()
1290                    .with_entity(entity())
1291                    .with_entity(line_entity())
1292                    .with_entity(product_entity()),
1293            )
1294            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1295            .with_repository_behavior_registry(
1296                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1297            );
1298        ctx.insert_resource(PostgresDialect);
1299        ctx.insert_resource(StubExecutor {
1300            affected: 1,
1301            rows: Vec::new(),
1302        });
1303
1304        let repo = ctx
1305            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1306            .unwrap();
1307        let plans = repo.relation_plans().unwrap();
1308
1309        assert_eq!(plans.len(), 1);
1310        assert_eq!(plans[0].relation_name, "lines");
1311        assert_eq!(plans[0].target_entity, "OrderLine");
1312        assert_eq!(plans[0].local_key, "id");
1313        assert_eq!(plans[0].foreign_key, "order_id");
1314        assert!(plans[0].many);
1315    }
1316
1317    #[test]
1318    fn resolved_repository_builds_relation_query_from_parent_rows() {
1319        let mut ctx = UserContext::new()
1320            .with_metadata(
1321                InMemoryMetadataStore::new()
1322                    .with_entity(entity())
1323                    .with_entity(line_entity())
1324                    .with_entity(product_entity()),
1325            )
1326            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1327            .with_repository_behavior_registry(
1328                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1329            );
1330        ctx.insert_resource(PostgresDialect);
1331        ctx.insert_resource(StubExecutor {
1332            affected: 1,
1333            rows: Vec::new(),
1334        });
1335
1336        let repo = ctx
1337            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1338            .unwrap();
1339        let parent_rows = vec![
1340            Record::from([(String::from("id"), Value::U64(11))]),
1341            Record::from([(String::from("id"), Value::U64(12))]),
1342        ];
1343
1344        let query = repo.relation_query("lines", &parent_rows).unwrap();
1345        let compiled = repo.compile(&query).unwrap();
1346        assert!(compiled.sql.contains("FROM \"orderline\""));
1347        assert!(compiled.sql.contains("\"order_id\" IN ($1, $2)"));
1348        assert_eq!(compiled.params, vec![Value::U64(11), Value::U64(12)]);
1349    }
1350
1351    #[test]
1352    fn resolved_repository_enhances_parent_rows_with_relations() {
1353        let mut ctx = UserContext::new()
1354            .with_metadata(
1355                InMemoryMetadataStore::new()
1356                    .with_entity(entity())
1357                    .with_entity(line_entity())
1358                    .with_entity(product_entity()),
1359            )
1360            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1361            .with_repository_behavior_registry(
1362                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1363            );
1364        ctx.insert_resource(PostgresDialect);
1365        ctx.insert_resource(StubExecutor {
1366            affected: 1,
1367            rows: vec![
1368                Record::from([
1369                    (String::from("id"), Value::U64(101)),
1370                    (String::from("order_id"), Value::U64(11)),
1371                    (String::from("name"), Value::Text(String::from("l1"))),
1372                ]),
1373                Record::from([
1374                    (String::from("id"), Value::U64(102)),
1375                    (String::from("order_id"), Value::U64(11)),
1376                    (String::from("name"), Value::Text(String::from("l2"))),
1377                ]),
1378                Record::from([
1379                    (String::from("id"), Value::U64(201)),
1380                    (String::from("order_id"), Value::U64(12)),
1381                    (String::from("name"), Value::Text(String::from("l3"))),
1382                ]),
1383            ],
1384        });
1385
1386        let repo = ctx
1387            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1388            .unwrap();
1389        let mut parents = vec![
1390            Record::from([(String::from("id"), Value::U64(11))]),
1391            Record::from([(String::from("id"), Value::U64(12))]),
1392        ];
1393
1394        repo.enhance_relations(&mut parents).unwrap();
1395
1396        match parents[0].get("lines") {
1397            Some(Value::List(lines)) => assert_eq!(lines.len(), 2),
1398            other => panic!("unexpected lines payload: {other:?}"),
1399        }
1400        match parents[1].get("lines") {
1401            Some(Value::List(lines)) => assert_eq!(lines.len(), 1),
1402            other => panic!("unexpected lines payload: {other:?}"),
1403        }
1404    }
1405
1406    #[test]
1407    fn resolved_repository_fetches_smart_list_of_entities() {
1408        let mut ctx = UserContext::new()
1409            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1410            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1411        ctx.insert_resource(PostgresDialect);
1412        ctx.insert_resource(StubExecutor {
1413            affected: 1,
1414            rows: vec![Record::from([
1415                (String::from("id"), Value::U64(7)),
1416                (String::from("version"), Value::I64(2)),
1417                (String::from("name"), Value::Text(String::from("typed"))),
1418            ])],
1419        });
1420
1421        let repo = ctx
1422            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1423            .unwrap();
1424        let rows = repo.fetch_entities::<OrderEntity>(&repo.select()).unwrap();
1425
1426        assert_eq!(rows.len(), 1);
1427        assert_eq!(
1428            rows.first(),
1429            Some(&OrderEntity {
1430                id: 7,
1431                version: 2,
1432                name: String::from("typed"),
1433            })
1434        );
1435    }
1436
1437    #[test]
1438    fn resolved_repository_fetches_smart_list_of_derived_entities() {
1439        let mut ctx = UserContext::new()
1440            .with_metadata(
1441                InMemoryMetadataStore::new().with_entity(CatalogProductRow::entity_descriptor()),
1442            )
1443            .with_repository_registry(
1444                InMemoryRepositoryRegistry::new().with_entity("CatalogProduct"),
1445            );
1446        ctx.insert_resource(PostgresDialect);
1447        ctx.insert_resource(StubExecutor {
1448            affected: 1,
1449            rows: vec![Record::from([
1450                (String::from("id"), Value::U64(9)),
1451                (String::from("name"), Value::Text(String::from("derived"))),
1452            ])],
1453        });
1454
1455        let repo = ctx
1456            .resolve_repository::<PostgresDialect, StubExecutor>("CatalogProduct")
1457            .unwrap();
1458        let rows = repo
1459            .fetch_entities::<CatalogProductRow>(&repo.select())
1460            .unwrap();
1461
1462        assert_eq!(rows.len(), 1);
1463        assert_eq!(
1464            rows.first(),
1465            Some(&CatalogProductRow {
1466                id: 9,
1467                name: String::from("derived"),
1468            })
1469        );
1470    }
1471
1472    #[test]
1473    fn resolved_repository_collects_dynamic_properties_for_aggregate_output() {
1474        let mut ctx = UserContext::new()
1475            .with_metadata(
1476                InMemoryMetadataStore::new()
1477                    .with_entity(OrderAggregateDynamic::entity_descriptor()),
1478            )
1479            .with_repository_registry(
1480                InMemoryRepositoryRegistry::new().with_entity("OrderAggregate"),
1481            );
1482        ctx.insert_resource(PostgresDialect);
1483        ctx.insert_resource(StubExecutor {
1484            affected: 1,
1485            rows: vec![Record::from([
1486                (String::from("id"), Value::U64(1)),
1487                (String::from("lineCount"), Value::I64(3)),
1488                (String::from("amount"), Value::F64(18.5)),
1489            ])],
1490        });
1491
1492        let repo = ctx
1493            .resolve_repository::<PostgresDialect, StubExecutor>("OrderAggregate")
1494            .unwrap();
1495        let rows = repo
1496            .fetch_entities::<OrderAggregateDynamic>(&repo.select())
1497            .unwrap();
1498
1499        assert_eq!(rows.len(), 1);
1500        assert_eq!(rows.data[0].id, 1);
1501        assert_eq!(rows.data[0].dynamic.get("lineCount"), Some(&Value::I64(3)));
1502        assert_eq!(rows.data[0].dynamic.get("amount"), Some(&Value::F64(18.5)));
1503        assert_eq!(
1504            rows.into_vec().into_iter().next().unwrap().into_json(),
1505            serde_json::json!({
1506                "id": 1,
1507                "lineCount": 3,
1508                "amount": 18.5
1509            })
1510        );
1511    }
1512
1513    #[test]
1514    fn resolved_repository_executes_relation_aggregates_into_dynamic_properties() {
1515        let executor = QueueExecutor {
1516            affected: 1,
1517            rows: Mutex::new(VecDeque::from([
1518                vec![
1519                    Record::from([
1520                        (String::from("id"), Value::U64(1)),
1521                        (String::from("version"), Value::I64(1)),
1522                        (String::from("name"), Value::Text(String::from("first"))),
1523                    ]),
1524                    Record::from([
1525                        (String::from("id"), Value::U64(2)),
1526                        (String::from("version"), Value::I64(1)),
1527                        (String::from("name"), Value::Text(String::from("second"))),
1528                    ]),
1529                ],
1530                vec![Record::from([
1531                    (String::from("order_id"), Value::U64(1)),
1532                    (String::from("lineCount"), Value::I64(3)),
1533                ])],
1534            ])),
1535            queries: Mutex::new(Vec::new()),
1536        };
1537        let mut ctx = UserContext::new()
1538            .with_metadata(
1539                InMemoryMetadataStore::new()
1540                    .with_entity(entity())
1541                    .with_entity(line_entity()),
1542            )
1543            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1544        ctx.insert_resource(PostgresDialect);
1545        ctx.insert_resource(executor);
1546
1547        let repo = ctx
1548            .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1549            .unwrap();
1550        let rows = repo
1551            .fetch_all_with_relation_aggregates(
1552                &repo
1553                    .select()
1554                    .project("id")
1555                    .project("version")
1556                    .project("name"),
1557                &[RelationAggregate::new(
1558                    "lines",
1559                    "lineCount",
1560                    SelectQuery::new("OrderLine"),
1561                    true,
1562                )],
1563            )
1564            .unwrap();
1565
1566        assert_eq!(rows[0].get("lineCount"), Some(&Value::I64(3)));
1567        assert_eq!(rows[1].get("lineCount"), Some(&Value::U64(0)));
1568
1569        let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1570        let queries = executor.queries.lock().unwrap();
1571        assert_eq!(queries.len(), 2);
1572        assert_eq!(
1573            queries[1],
1574            "SELECT \"order_id\", COUNT(*) AS \"lineCount\" FROM \"orderline\" WHERE (\"order_id\" IN ($1, $2)) GROUP BY \"order_id\""
1575        );
1576    }
1577
/// With an `InMemoryAggregationCache` registered as a context resource, running
/// the same cache-enabled count query twice yields equal rows while the
/// executor's query log records a single entry.
#[test]
fn resolved_repository_uses_aggregation_cache_when_resource_is_registered() {
    // Exactly one result set is queued for the executor.
    let count_row = Record::from([("count".to_owned(), Value::I64(2))]);
    let executor = QueueExecutor {
        affected: 1,
        rows: Mutex::new(VecDeque::from([vec![count_row]])),
        queries: Mutex::new(Vec::new()),
    };

    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let registry = InMemoryRepositoryRegistry::new().with_entity("Order");
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(registry);
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(executor);
    ctx.insert_resource(InMemoryAggregationCache::default());

    let repo = ctx
        .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
        .unwrap();
    let cached_count = repo
        .select()
        .count("count")
        .enable_aggregation_cache_for(60_000);

    let first = repo.fetch_all(&cached_count).unwrap();
    let second = repo.fetch_all(&cached_count).unwrap();
    assert_eq!(first, second);

    // Inspect the executor stored in the context to see how many queries ran.
    let recorder = ctx.get_resource::<QueueExecutor>().unwrap();
    let issued = recorder.queries.lock().unwrap();
    assert_eq!(issued.len(), 1);
}
1610
/// Registers a namespaced cache as `Arc<dyn AggregationCacheBackend>` and checks
/// that a repeated read matches the first one, while a read issued after an
/// insert returns different rows and the query log ends with two entries.
#[test]
fn aggregation_cache_is_namespaced_and_invalidated_after_write() {
    // Two result sets are queued, each carrying a different count value.
    let count_of = |n: i64| vec![Record::from([("count".to_owned(), Value::I64(n))])];
    let executor = QueueExecutor {
        affected: 1,
        rows: Mutex::new(VecDeque::from([count_of(2), count_of(3)])),
        queries: Mutex::new(Vec::new()),
    };

    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let registry = InMemoryRepositoryRegistry::new().with_entity("Order");
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(registry);
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(executor);
    // The cache is stored behind the trait object rather than the concrete type.
    let cache: Arc<dyn AggregationCacheBackend> =
        Arc::new(InMemoryAggregationCache::with_namespace("tenant-a"));
    ctx.insert_resource(cache);

    let repo = ctx
        .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
        .unwrap();
    let cached_count = repo
        .select()
        .count("count")
        .enable_aggregation_cache_for(60_000);

    let first = repo.fetch_all(&cached_count).unwrap();
    let cached = repo.fetch_all(&cached_count).unwrap();

    let new_order = InsertCommand::new("Order")
        .value("id", 9_u64)
        .value("version", 1_i64)
        .value("name", "new");
    repo.insert(&new_order).unwrap();
    let refreshed = repo.fetch_all(&cached_count).unwrap();

    assert_eq!(first, cached);
    assert_ne!(cached, refreshed);

    let recorder = ctx.get_resource::<QueueExecutor>().unwrap();
    let issued = recorder.queries.lock().unwrap();
    assert_eq!(issued.len(), 2);
}
1655
/// Runs the same relation-aggregate fetch twice with aggregation caching
/// enabled and propagated; both results must match and the executor's query
/// log must still hold only two entries afterwards.
#[test]
fn aggregation_cache_propagates_to_relation_aggregates() {
    let order_row = |id: u64, name: &str| {
        Record::from([
            ("id".to_owned(), Value::U64(id)),
            ("version".to_owned(), Value::I64(1)),
            ("name".to_owned(), Value::Text(name.to_owned())),
        ])
    };
    let parent_rows = vec![order_row(1, "first"), order_row(2, "second")];
    let aggregate_rows = vec![Record::from([
        ("order_id".to_owned(), Value::U64(1)),
        ("lineCount".to_owned(), Value::I64(3)),
    ])];
    // Queue order: parent rows first, then the aggregate rows.
    let executor = QueueExecutor {
        affected: 1,
        rows: Mutex::new(VecDeque::from([parent_rows, aggregate_rows])),
        queries: Mutex::new(Vec::new()),
    };

    let metadata = InMemoryMetadataStore::new()
        .with_entity(entity())
        .with_entity(line_entity());
    let registry = InMemoryRepositoryRegistry::new().with_entity("Order");
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(registry);
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(executor);
    ctx.insert_resource(InMemoryAggregationCache::default());

    let repo = ctx
        .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
        .unwrap();
    let query = repo
        .select()
        .project("id")
        .project("version")
        .project("name")
        .enable_aggregation_cache_for(60_000)
        .propagate_aggregation_cache(60_000);
    let line_count =
        RelationAggregate::new("lines", "lineCount", SelectQuery::new("OrderLine"), true);

    let first = repo
        .fetch_all_with_relation_aggregates(&query, &[line_count.clone()])
        .unwrap();
    let second = repo
        .fetch_all_with_relation_aggregates(&query, &[line_count])
        .unwrap();
    assert_eq!(first, second);

    let recorder = ctx.get_resource::<QueueExecutor>().unwrap();
    let issued = recorder.queries.lock().unwrap();
    assert_eq!(issued.len(), 2);
}
1714
/// Fetches typed `Order` entities from a `MemoryRepository` using a filter
/// (`id >= 2`), descending order on `id` and a limit of one, and expects only
/// the row with id 3 to come back.
#[test]
fn memory_repository_fetches_smart_list_entities_with_query_features() {
    let order_row = |id: u64, name: &str| {
        Record::from([
            ("id".to_owned(), Value::U64(id)),
            ("version".to_owned(), Value::I64(1)),
            ("name".to_owned(), Value::Text(name.to_owned())),
        ])
    };
    let seeded = vec![
        order_row(1, "alpha"),
        order_row(2, "beta"),
        order_row(3, "gamma"),
    ];
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let repository = MemoryRepository::new(metadata).with_rows("Order", seeded);

    let id_at_least_two = Expr::Binary {
        left: Box::new(Expr::column("id")),
        op: teaql_core::BinaryOp::Gte,
        right: Box::new(Expr::value(2_u64)),
    };
    let query = teaql_core::SelectQuery::new("Order")
        .filter(id_at_least_two)
        .order_by(OrderBy::desc("id"))
        .limit(1);

    let orders = repository.fetch_entities::<Order>(&query).unwrap();

    assert_eq!(orders.ids(), vec![Value::U64(3)]);
    assert_eq!(orders.versions(), vec![1]);
    let top = orders.first().unwrap();
    assert_eq!(top.name, "gamma");
}
1754
/// Runs an ungrouped `Count(id)` / `Sum(version)` query, built as a full
/// `SelectQuery` struct literal, against two in-memory rows and checks the
/// single aggregate row that comes back.
#[test]
fn memory_repository_runs_aggregates() {
    let order_row = |id: u64, version: i64, name: &str| {
        Record::from([
            ("id".to_owned(), Value::U64(id)),
            ("version".to_owned(), Value::I64(version)),
            ("name".to_owned(), Value::Text(name.to_owned())),
        ])
    };
    let seeded = vec![order_row(1, 1, "alpha"), order_row(2, 2, "beta")];
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let repository = MemoryRepository::new(metadata).with_rows("Order", seeded);

    let count_ids = Aggregate {
        function: AggregateFunction::Count,
        field: "id".to_owned(),
        alias: "count".to_owned(),
    };
    let sum_versions = Aggregate {
        function: AggregateFunction::Sum,
        field: "version".to_owned(),
        alias: "versionSum".to_owned(),
    };
    // Every field is spelled out; only `entity` and `aggregates` carry data.
    let query = teaql_core::SelectQuery {
        entity: "Order".to_owned(),
        projection: Vec::new(),
        expr_projection: Vec::new(),
        filter: None,
        having: None,
        order_by: Vec::new(),
        slice: None,
        aggregates: vec![count_ids, sum_versions],
        group_by: Vec::new(),
        relations: Vec::new(),
        aggregation_cache: None,
        comment: None,
        raw_sql: None,
        raw_sql_search_criteria: Vec::new(),
        json_expr: None,
        dynamic_properties: Vec::new(),
        raw_projections: Vec::new(),
        object_group_bys: Vec::new(),
        child_enhancements: Vec::new(),
    };

    let rows = repository.fetch_all(&query).unwrap();

    assert_eq!(rows.len(), 1);
    let totals = &rows[0];
    assert_eq!(totals.get("count"), Some(&Value::U64(2)));
    assert_eq!(totals.get("versionSum"), Some(&Value::U64(3)));
}
1813
/// Combines `between`, `not_like` and `not_in_list` filters with a `group_by`
/// on `name` plus count/sum aggregates, and expects a single `alpha` group
/// with a total of 2 and a version sum of 3.
#[test]
fn memory_repository_runs_grouped_aggregates_and_extended_filters() {
    let order_row = |id: u64, version: i64, name: &str| {
        Record::from([
            ("id".to_owned(), Value::U64(id)),
            ("version".to_owned(), Value::I64(version)),
            ("name".to_owned(), Value::Text(name.to_owned())),
        ])
    };
    let seeded = vec![
        order_row(1, 1, "alpha"),
        order_row(2, 2, "alpha"),
        order_row(3, 3, "tmp-beta"),
    ];
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let repository = MemoryRepository::new(metadata).with_rows("Order", seeded);

    let keep = Expr::between("version", 1_i64, 3_i64)
        .and_expr(Expr::not_like("name", "tmp%"))
        .and_expr(Expr::not_in_list("name", vec![Value::from("deleted")]));
    let grouped = teaql_core::SelectQuery::new("Order")
        .filter(keep)
        .group_by("name")
        .count("total")
        .sum("version", "versionSum");

    let rows = repository.fetch_all(&grouped).unwrap();

    assert_eq!(rows.len(), 1);
    let group = &rows[0];
    assert_eq!(group.get("name"), Some(&Value::Text("alpha".to_owned())));
    assert_eq!(group.get("total"), Some(&Value::U64(2)));
    assert_eq!(group.get("versionSum"), Some(&Value::U64(3)));
}
1860
/// Groups by `name`, computes count / stddev / var_pop / bit_or over `version`
/// and keeps groups whose `total` exceeds 1 via `having`; only the `alpha`
/// group (versions 1 and 3) is expected in the result.
#[test]
fn memory_repository_runs_extended_aggregates_and_having() {
    let order_row = |id: u64, version: i64, name: &str| {
        Record::from([
            ("id".to_owned(), Value::U64(id)),
            ("version".to_owned(), Value::I64(version)),
            ("name".to_owned(), Value::Text(name.to_owned())),
        ])
    };
    let seeded = vec![
        order_row(1, 1, "alpha"),
        order_row(2, 3, "alpha"),
        order_row(3, 7, "beta"),
    ];
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let repository = MemoryRepository::new(metadata).with_rows("Order", seeded);

    let stats = teaql_core::SelectQuery::new("Order")
        .group_by("name")
        .count("total")
        .stddev("version", "stddevVersion")
        .var_pop("version", "varPopVersion")
        .bit_or("version", "bitOrVersion")
        .having(Expr::gt("total", 1_i64));

    let rows = repository.fetch_all(&stats).unwrap();

    assert_eq!(rows.len(), 1);
    let group = &rows[0];
    assert_eq!(group.get("name"), Some(&Value::Text("alpha".to_owned())));
    assert_eq!(group.get("total"), Some(&Value::U64(2)));
    // The standard deviation is compared through its JSON string rendering.
    let stddev_json = group.get("stddevVersion").map(Value::to_json_value);
    assert_eq!(
        stddev_json,
        Some(serde_json::Value::String(String::from(
            "1.4142135623730951454746218583"
        )))
    );
    assert_eq!(
        group.get("varPopVersion"),
        Some(&Value::Decimal(Decimal::ONE))
    );
    assert_eq!(group.get("bitOrVersion"), Some(&Value::I64(3)));
}
1915
/// Filters three names with `Expr::sound_like("name", "Robert")`, ordered by
/// `id`, and expects `Robert` and `Rupert` back but not `Ashcraft`.
#[test]
fn memory_repository_runs_sound_like_filter() {
    let order_row = |id: u64, name: &str| {
        Record::from([
            ("id".to_owned(), Value::U64(id)),
            ("version".to_owned(), Value::I64(1)),
            ("name".to_owned(), Value::Text(name.to_owned())),
        ])
    };
    let seeded = vec![
        order_row(1, "Robert"),
        order_row(2, "Rupert"),
        order_row(3, "Ashcraft"),
    ];
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let repository = MemoryRepository::new(metadata).with_rows("Order", seeded);

    let query = teaql_core::SelectQuery::new("Order")
        .filter(Expr::sound_like("name", "Robert"))
        .order_asc("id");
    let rows = repository.fetch_all(&query).unwrap();

    assert_eq!(rows.len(), 2);
    assert_eq!(rows[0].get("name"), Some(&Value::Text(String::from("Robert"))));
    assert_eq!(rows[1].get("name"), Some(&Value::Text(String::from("Rupert"))));
}
1952
/// Chains the contain / begin_with / end_with predicates and their negated
/// forms on `name`; of the three seeded rows only `tea-order` is expected to
/// satisfy all of them.
#[test]
fn memory_repository_runs_java_style_string_match_filters() {
    let order_row = |id: u64, name: &str| {
        Record::from([
            ("id".to_owned(), Value::U64(id)),
            ("version".to_owned(), Value::I64(1)),
            ("name".to_owned(), Value::Text(name.to_owned())),
        ])
    };
    let seeded = vec![
        order_row(1, "tea-order"),
        order_row(2, "coffee-order"),
        order_row(3, "tea-archived"),
    ];
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let repository = MemoryRepository::new(metadata).with_rows("Order", seeded);

    let name_matches = Expr::contain("name", "tea")
        .and_expr(Expr::begin_with("name", "tea"))
        .and_expr(Expr::end_with("name", "order"))
        .and_expr(Expr::not_contain("name", "coffee"))
        .and_expr(Expr::not_begin_with("name", "archived"))
        .and_expr(Expr::not_end_with("name", "draft"));
    let query = teaql_core::SelectQuery::new("Order")
        .filter(name_matches)
        .order_asc("id");

    let rows = repository.fetch_all(&query).unwrap();

    assert_eq!(rows.len(), 1);
    let survivor = &rows[0];
    assert_eq!(
        survivor.get("name"),
        Some(&Value::Text(String::from("tea-order")))
    );
}
2004
/// Compares two columns of the same row (`version >= id`) and expects only the
/// row named `keep` (id 1, version 2) to pass.
#[test]
fn memory_repository_runs_property_to_property_filters() {
    let order_row = |id: u64, version: i64, name: &str| {
        Record::from([
            ("id".to_owned(), Value::U64(id)),
            ("version".to_owned(), Value::I64(version)),
            ("name".to_owned(), Value::Text(name.to_owned())),
        ])
    };
    let seeded = vec![order_row(1, 2, "keep"), order_row(2, 1, "skip")];
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let repository = MemoryRepository::new(metadata).with_rows("Order", seeded);

    let version_covers_id = Expr::compare_columns("version", BinaryOp::Gte, "id");
    let query = teaql_core::SelectQuery::new("Order")
        .filter(version_covers_id)
        .order_asc("id");
    let rows = repository.fetch_all(&query).unwrap();

    assert_eq!(rows.len(), 1);
    assert_eq!(rows[0].get("name"), Some(&Value::Text(String::from("keep"))));
}
2035
/// Walks one `Order` row through insert, update, a stale update, delete and
/// recover on a `MemoryRepository`, checking the stored `version` after each
/// successful step (2, then -3, then 4) and the conflict error for the stale
/// update.
#[test]
fn memory_repository_supports_mutations_and_optimistic_locking() {
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let repository = MemoryRepository::new(metadata);
    // Re-reads the row with id 10; panics if it is missing.
    let load_order = || {
        let by_id = teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64));
        repository.fetch_all(&by_id).unwrap().pop().unwrap()
    };

    let create = InsertCommand::new("Order")
        .value("id", 10_u64)
        .value("version", 1_i64)
        .value("name", "draft");
    repository.insert(&create).unwrap();

    let submit = UpdateCommand::new("Order", 10_u64)
        .expected_version(1)
        .value("name", "submitted");
    repository.update(&submit).unwrap();

    let updated = load_order();
    assert_eq!(
        updated.get("name"),
        Some(&Value::Text("submitted".to_owned()))
    );
    assert_eq!(updated.get("version"), Some(&Value::I64(2)));

    // Same expected version as the update that already succeeded.
    let stale = UpdateCommand::new("Order", 10_u64)
        .expected_version(1)
        .value("name", "stale");
    let conflict = repository.update(&stale).unwrap_err();
    assert!(matches!(
        conflict,
        RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. })
    ));

    let remove = DeleteCommand::new("Order", 10_u64).expected_version(2);
    repository.delete(&remove).unwrap();
    // The row is still returned after delete, now with a negative version.
    let deleted = load_order();
    assert_eq!(deleted.get("version"), Some(&Value::I64(-3)));

    let restore = RecoverCommand::new("Order", 10_u64, -3);
    repository.recover(&restore).unwrap();
    let recovered = load_order();
    assert_eq!(recovered.get("version"), Some(&Value::I64(4)));
}
2100}
2101
2102#[cfg(all(test, feature = "sqlx"))]
2103mod sqlx_integration_tests {
2104    use super::sqlx_support::{
2105        MutationExecutorError, PgIdSpaceGenerator, PgMutationExecutor, PgTransactionExecutor,
2106        SqliteIdSpaceGenerator, SqliteMutationExecutor,
2107    };
2108    use super::{
2109        GraphMutationKind, GraphNode, GraphTransactionBoundary, InMemoryMetadataStore,
2110        InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry, QueryExecutor,
2111        RepositoryBehavior, UserContext,
2112    };
2113    use chrono::{NaiveDate, TimeZone, Utc};
2114    use teaql_core::{
2115        DataType, Decimal, DeleteCommand, EntityDescriptor, Expr, InsertCommand,
2116        PropertyDescriptor, Record, RecoverCommand, SelectQuery, UpdateCommand, Value,
2117    };
2118    use teaql_dialect_pg::PostgresDialect;
2119    use teaql_dialect_sqlite::SqliteDialect;
2120    use teaql_macros::TeaqlEntity as DeriveTeaqlEntity;
2121    use teaql_sql::SqlDialect;
2122    use tokio::runtime::Handle;
2123    use tokio::task::block_in_place;
2124
// Double-quoted, comma-separated column list `id`, `version`, `name` — the
// three properties of the `Order` descriptor built by `entity()` below.
// NOTE(review): presumably the projection expected in compiled SELECT
// statements; its uses are outside this excerpt — confirm.
const ORDER_DEFAULT_PROJECTION: &str = "\"id\", \"version\", \"name\"";
2126
2127    fn entity() -> EntityDescriptor {
2128        EntityDescriptor::new("Order")
2129            .table_name("orders")
2130            .property(
2131                PropertyDescriptor::new("id", DataType::U64)
2132                    .column_name("id")
2133                    .id()
2134                    .not_null(),
2135            )
2136            .property(
2137                PropertyDescriptor::new("version", DataType::I64)
2138                    .column_name("version")
2139                    .version()
2140                    .not_null(),
2141            )
2142            .property(
2143                PropertyDescriptor::new("name", DataType::Text)
2144                    .column_name("name")
2145                    .not_null(),
2146            )
2147            .relation(
2148                teaql_core::RelationDescriptor::new("lines", "OrderLine")
2149                    .local_key("id")
2150                    .foreign_key("order_id")
2151                    .many(),
2152            )
2153    }
2154
2155    fn sqlite_entity_keep_missing() -> EntityDescriptor {
2156        EntityDescriptor::new("Order")
2157            .table_name("orders")
2158            .property(
2159                PropertyDescriptor::new("id", DataType::U64)
2160                    .column_name("id")
2161                    .id()
2162                    .not_null(),
2163            )
2164            .property(
2165                PropertyDescriptor::new("version", DataType::I64)
2166                    .column_name("version")
2167                    .version()
2168                    .not_null(),
2169            )
2170            .property(
2171                PropertyDescriptor::new("name", DataType::Text)
2172                    .column_name("name")
2173                    .not_null(),
2174            )
2175            .relation(
2176                teaql_core::RelationDescriptor::new("lines", "OrderLine")
2177                    .local_key("id")
2178                    .foreign_key("order_id")
2179                    .many()
2180                    .keep_missing(),
2181            )
2182    }
2183
2184    fn line_entity() -> EntityDescriptor {
2185        EntityDescriptor::new("OrderLine")
2186            .table_name("orderline")
2187            .property(
2188                PropertyDescriptor::new("id", DataType::U64)
2189                    .column_name("id")
2190                    .id()
2191                    .not_null(),
2192            )
2193            .property(
2194                PropertyDescriptor::new("version", DataType::I64)
2195                    .column_name("version")
2196                    .version(),
2197            )
2198            .property(
2199                PropertyDescriptor::new("order_id", DataType::U64)
2200                    .column_name("order_id")
2201                    .not_null(),
2202            )
2203            .property(
2204                PropertyDescriptor::new("name", DataType::Text)
2205                    .column_name("name")
2206                    .not_null(),
2207            )
2208            .property(
2209                PropertyDescriptor::new("product_id", DataType::U64)
2210                    .column_name("product_id")
2211                    .not_null(),
2212            )
2213            .relation(
2214                teaql_core::RelationDescriptor::new("product", "Product")
2215                    .local_key("product_id")
2216                    .foreign_key("id"),
2217            )
2218    }
2219
2220    fn product_entity() -> EntityDescriptor {
2221        EntityDescriptor::new("Product")
2222            .table_name("product")
2223            .property(
2224                PropertyDescriptor::new("id", DataType::U64)
2225                    .column_name("id")
2226                    .id()
2227                    .not_null(),
2228            )
2229            .property(
2230                PropertyDescriptor::new("name", DataType::Text)
2231                    .column_name("name")
2232                    .not_null(),
2233            )
2234    }
2235
2236    fn typed_entity() -> EntityDescriptor {
2237        EntityDescriptor::new("TypedValue")
2238            .table_name("typed_value")
2239            .property(
2240                PropertyDescriptor::new("id", DataType::U64)
2241                    .column_name("id")
2242                    .id()
2243                    .not_null(),
2244            )
2245            .property(
2246                PropertyDescriptor::new("payload", DataType::Json)
2247                    .column_name("payload")
2248                    .not_null(),
2249            )
2250            .property(
2251                PropertyDescriptor::new("amount", DataType::Decimal)
2252                    .column_name("amount")
2253                    .not_null(),
2254            )
2255            .property(
2256                PropertyDescriptor::new("birthday", DataType::Date)
2257                    .column_name("birthday")
2258                    .not_null(),
2259            )
2260            .property(
2261                PropertyDescriptor::new("happened_at", DataType::Timestamp)
2262                    .column_name("happened_at")
2263                    .not_null(),
2264            )
2265    }
2266
// Field-less marker type; its `RepositoryBehavior` impl further down in this
// module requests the `lines` relation.
struct OrderBehavior;
// Field-less marker type; its `RepositoryBehavior` impl further down in this
// module requests the nested `lines.product` relation path.
struct NestedOrderBehavior;
2269
// Typed row for the `Product` entity (table `product`), mapped through the
// `TeaqlEntity` derive imported here as `DeriveTeaqlEntity`.
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "Product", table = "product")]
struct ProductEntityRow {
    // Marked as the entity id.
    #[teaql(id)]
    id: u64,
    name: String,
}
2277
// Typed row for the `OrderLine` entity (table `orderline`), including an
// optional nested `Product` row.
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "OrderLine", table = "orderline")]
struct OrderLineEntityRow {
    // Marked as the entity id.
    #[teaql(id)]
    id: u64,
    // Column name is given explicitly as `order_id`.
    #[teaql(column = "order_id")]
    order_id: u64,
    name: String,
    // Column name is given explicitly as `product_id`.
    #[teaql(column = "product_id")]
    product_id: u64,
    // Single-valued relation to `Product`, joined on `product_id` = `id`;
    // `Option` leaves room for the product being absent.
    #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
    product: Option<ProductEntityRow>,
}
2291
// Typed row for the `Order` entity (table `orders`) that also carries its
// order lines as a nested collection.
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "Order", table = "orders")]
struct OrderAggregateRow {
    // Marked as the entity id.
    #[teaql(id)]
    id: u64,
    // Marked as the version column.
    #[teaql(version)]
    version: i64,
    name: String,
    // To-many relation to `OrderLine`, joined on `id` = `order_id`.
    #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
    lines: teaql_core::SmartList<OrderLineEntityRow>,
}
2303
// Flat typed row for the `Order` entity (table `orders`) with no relations:
// just id, version and name.
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "Order", table = "orders")]
struct Order {
    // Marked as the entity id.
    #[teaql(id)]
    id: u64,
    // Marked as the version column.
    #[teaql(version)]
    version: i64,
    name: String,
}
2313
2314    impl RepositoryBehavior for OrderBehavior {
2315        fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
2316            vec!["lines".to_owned()]
2317        }
2318    }
2319
2320    impl RepositoryBehavior for NestedOrderBehavior {
2321        fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
2322            vec!["lines.product".to_owned()]
2323        }
2324    }
2325
2326    #[derive(Clone)]
2327    struct SqliteSyncExecutor {
2328        inner: SqliteMutationExecutor,
2329    }
2330
2331    impl SqliteSyncExecutor {
2332        fn new(inner: SqliteMutationExecutor) -> Self {
2333            Self { inner }
2334        }
2335    }
2336
2337    impl QueryExecutor for SqliteSyncExecutor {
2338        type Error = MutationExecutorError;
2339
2340        fn fetch_all(&self, query: &teaql_sql::CompiledQuery) -> Result<Vec<Record>, Self::Error> {
2341            let handle = Handle::current();
2342            block_in_place(|| handle.block_on(self.inner.fetch_all(query)))
2343        }
2344
2345        fn execute(&self, query: &teaql_sql::CompiledQuery) -> Result<u64, Self::Error> {
2346            let handle = Handle::current();
2347            block_in_place(|| handle.block_on(self.inner.execute(query)))
2348        }
2349
2350        fn begin_transaction(&self) -> Result<GraphTransactionBoundary, Self::Error> {
2351            let handle = Handle::current();
2352            block_in_place(|| handle.block_on(self.inner.begin_transaction()))?;
2353            Ok(GraphTransactionBoundary::Started)
2354        }
2355
2356        fn commit_transaction(&self) -> Result<(), Self::Error> {
2357            let handle = Handle::current();
2358            block_in_place(|| handle.block_on(self.inner.commit_transaction()))
2359        }
2360
2361        fn rollback_transaction(&self) -> Result<(), Self::Error> {
2362            let handle = Handle::current();
2363            block_in_place(|| handle.block_on(self.inner.rollback_transaction()))
2364        }
2365    }
2366
2367    #[derive(Clone)]
2368    struct PgSyncExecutor {
2369        inner: PgMutationExecutor,
2370    }
2371
2372    impl PgSyncExecutor {
2373        fn new(inner: PgMutationExecutor) -> Self {
2374            Self { inner }
2375        }
2376    }
2377
2378    impl QueryExecutor for PgSyncExecutor {
2379        type Error = MutationExecutorError;
2380
2381        fn fetch_all(&self, query: &teaql_sql::CompiledQuery) -> Result<Vec<Record>, Self::Error> {
2382            let handle = Handle::current();
2383            block_in_place(|| handle.block_on(self.inner.fetch_all(query)))
2384        }
2385
2386        fn execute(&self, query: &teaql_sql::CompiledQuery) -> Result<u64, Self::Error> {
2387            let handle = Handle::current();
2388            block_in_place(|| handle.block_on(self.inner.execute(query)))
2389        }
2390    }
2391
2392    #[derive(Clone)]
2393    struct PgTxSyncExecutor {
2394        inner: PgTransactionExecutor,
2395    }
2396
2397    impl PgTxSyncExecutor {
2398        fn new(inner: PgTransactionExecutor) -> Self {
2399            Self { inner }
2400        }
2401    }
2402
2403    impl QueryExecutor for PgTxSyncExecutor {
2404        type Error = MutationExecutorError;
2405
2406        fn fetch_all(&self, query: &teaql_sql::CompiledQuery) -> Result<Vec<Record>, Self::Error> {
2407            let handle = Handle::current();
2408            block_in_place(|| handle.block_on(self.inner.fetch_all(query)))
2409        }
2410
2411        fn execute(&self, query: &teaql_sql::CompiledQuery) -> Result<u64, Self::Error> {
2412            let handle = Handle::current();
2413            block_in_place(|| handle.block_on(self.inner.execute(query)))
2414        }
2415
2416        fn begin_transaction(&self) -> Result<GraphTransactionBoundary, Self::Error> {
2417            Ok(GraphTransactionBoundary::AlreadyActive)
2418        }
2419
2420        fn rollback_transaction(&self) -> Result<(), Self::Error> {
2421            let handle = Handle::current();
2422            block_in_place(|| handle.block_on(self.inner.rollback()))
2423        }
2424    }
2425
2426    #[tokio::test]
2427    async fn sqlite_executor_runs_crud_flow() {
2428        use sqlx::sqlite::SqlitePoolOptions;
2429
2430        let pool = SqlitePoolOptions::new()
2431            .max_connections(1)
2432            .connect("sqlite::memory:")
2433            .await
2434            .unwrap();
2435
2436        let executor = SqliteMutationExecutor::new(pool.clone());
2437        let dialect = SqliteDialect;
2438        let entity = entity();
2439        executor.ensure_schema(&dialect, &[&entity]).await.unwrap();
2440
2441        let insert = dialect
2442            .compile_insert(
2443                &entity,
2444                &InsertCommand::new("Order")
2445                    .value("id", 1_u64)
2446                    .value("version", 1_i64)
2447                    .value("name", "first"),
2448            )
2449            .unwrap();
2450        assert_eq!(executor.execute(&insert).await.unwrap(), 1);
2451
2452        let select = dialect
2453            .compile_select(
2454                &entity,
2455                &SelectQuery::new("Order")
2456                    .project("id")
2457                    .project("version")
2458                    .project("name")
2459                    .filter(Expr::eq("id", 1_u64)),
2460            )
2461            .unwrap();
2462        let rows = executor.fetch_all(&select).await.unwrap();
2463        assert_eq!(rows.len(), 1);
2464        assert_eq!(rows[0].get("id"), Some(&Value::I64(1)));
2465        assert_eq!(rows[0].get("version"), Some(&Value::I64(1)));
2466        assert_eq!(rows[0].get("name"), Some(&Value::Text("first".to_owned())));
2467
2468        let update = dialect
2469            .compile_update(
2470                &entity,
2471                &UpdateCommand::new("Order", 1_u64)
2472                    .expected_version(1)
2473                    .value("name", "second"),
2474            )
2475            .unwrap();
2476        assert_eq!(executor.execute(&update).await.unwrap(), 1);
2477
2478        let after_update = executor.fetch_all(&select).await.unwrap();
2479        assert_eq!(after_update[0].get("version"), Some(&Value::I64(2)));
2480        assert_eq!(
2481            after_update[0].get("name"),
2482            Some(&Value::Text("second".to_owned()))
2483        );
2484
2485        let delete = dialect
2486            .compile_delete(
2487                &entity,
2488                &DeleteCommand::new("Order", 1_u64).expected_version(2),
2489            )
2490            .unwrap();
2491        assert_eq!(executor.execute(&delete).await.unwrap(), 1);
2492
2493        let after_delete = executor.fetch_all(&select).await.unwrap();
2494        assert_eq!(after_delete[0].get("version"), Some(&Value::I64(-3)));
2495
2496        let recover = dialect
2497            .compile_recover(&entity, &RecoverCommand::new("Order", 1_u64, -3))
2498            .unwrap();
2499        assert_eq!(executor.execute(&recover).await.unwrap(), 1);
2500
2501        let after_recover = executor.fetch_all(&select).await.unwrap();
2502        assert_eq!(after_recover[0].get("version"), Some(&Value::I64(4)));
2503    }
2504
2505    #[tokio::test(flavor = "multi_thread")]
2506    async fn sqlite_executor_enhances_relations() {
2507        use sqlx::sqlite::SqlitePoolOptions;
2508
2509        let pool = SqlitePoolOptions::new()
2510            .max_connections(1)
2511            .connect("sqlite::memory:")
2512            .await
2513            .unwrap();
2514
2515        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2516        let order = entity();
2517        let line = line_entity();
2518        mutation_executor
2519            .ensure_schema(&SqliteDialect, &[&order, &line])
2520            .await
2521            .unwrap();
2522
2523        sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1'), (2, 1, 'o2')")
2524            .execute(&pool)
2525            .await
2526            .unwrap();
2527        sqlx::query(
2528            "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2529                (101, 1, 1001, 'l1'),
2530                (102, 1, 1002, 'l2'),
2531                (201, 2, 1003, 'l3')",
2532        )
2533        .execute(&pool)
2534        .await
2535        .unwrap();
2536
2537        let executor = SqliteSyncExecutor::new(mutation_executor);
2538        let mut ctx = UserContext::new()
2539            .with_metadata(
2540                InMemoryMetadataStore::new()
2541                    .with_entity(order)
2542                    .with_entity(line)
2543                    .with_entity(product_entity()),
2544            )
2545            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
2546            .with_repository_behavior_registry(
2547                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
2548            );
2549        ctx.insert_resource(SqliteDialect);
2550        ctx.insert_resource(executor);
2551
2552        let repo = ctx
2553            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2554            .unwrap();
2555        let mut parents = repo
2556            .fetch_all(
2557                &repo
2558                    .select()
2559                    .project("id")
2560                    .project("version")
2561                    .project("name")
2562                    .order_by(teaql_core::OrderBy::asc("id")),
2563            )
2564            .unwrap();
2565
2566        repo.enhance_relations(&mut parents).unwrap();
2567
2568        assert_eq!(parents.len(), 2);
2569        match parents[0].get("lines") {
2570            Some(Value::List(lines)) => assert_eq!(lines.len(), 2),
2571            other => panic!("unexpected first lines payload: {other:?}"),
2572        }
2573        match parents[1].get("lines") {
2574            Some(Value::List(lines)) => assert_eq!(lines.len(), 1),
2575            other => panic!("unexpected second lines payload: {other:?}"),
2576        }
2577    }
2578
2579    #[tokio::test(flavor = "multi_thread")]
2580    async fn sqlite_executor_enhances_nested_relations() {
2581        use sqlx::sqlite::SqlitePoolOptions;
2582
2583        let pool = SqlitePoolOptions::new()
2584            .max_connections(1)
2585            .connect("sqlite::memory:")
2586            .await
2587            .unwrap();
2588
2589        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2590        let order = entity();
2591        let line = line_entity();
2592        let product = product_entity();
2593        mutation_executor
2594            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
2595            .await
2596            .unwrap();
2597
2598        sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1')")
2599            .execute(&pool)
2600            .await
2601            .unwrap();
2602        sqlx::query(
2603            "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2604                (101, 1, 1001, 'l1'),
2605                (102, 1, 1002, 'l2')",
2606        )
2607        .execute(&pool)
2608        .await
2609        .unwrap();
2610        sqlx::query(
2611            "INSERT INTO product (id, name) VALUES
2612                (1001, 'p1'),
2613                (1002, 'p2')",
2614        )
2615        .execute(&pool)
2616        .await
2617        .unwrap();
2618
2619        let executor = SqliteSyncExecutor::new(mutation_executor);
2620        let mut ctx = UserContext::new()
2621            .with_metadata(
2622                InMemoryMetadataStore::new()
2623                    .with_entity(order)
2624                    .with_entity(line)
2625                    .with_entity(product),
2626            )
2627            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
2628            .with_repository_behavior_registry(
2629                InMemoryRepositoryBehaviorRegistry::new()
2630                    .with_behavior("Order", NestedOrderBehavior),
2631            );
2632        ctx.insert_resource(SqliteDialect);
2633        ctx.insert_resource(executor);
2634
2635        let repo = ctx
2636            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2637            .unwrap();
2638        let mut parents = repo
2639            .fetch_all(
2640                &repo
2641                    .select()
2642                    .project("id")
2643                    .project("version")
2644                    .project("name"),
2645            )
2646            .unwrap();
2647
2648        repo.enhance_relations(&mut parents).unwrap();
2649
2650        match parents[0].get("lines") {
2651            Some(Value::List(lines)) => {
2652                assert_eq!(lines.len(), 2);
2653                for line in lines {
2654                    match line {
2655                        Value::Object(line_record) => match line_record.get("product") {
2656                            Some(Value::Object(product)) => {
2657                                assert!(product.get("name").is_some());
2658                            }
2659                            other => panic!("unexpected product payload: {other:?}"),
2660                        },
2661                        other => panic!("unexpected line payload: {other:?}"),
2662                    }
2663                }
2664            }
2665            other => panic!("unexpected nested lines payload: {other:?}"),
2666        }
2667    }
2668
2669    #[tokio::test(flavor = "multi_thread")]
2670    async fn sqlite_executor_enhances_query_relation_with_child_query() {
2671        use sqlx::sqlite::SqlitePoolOptions;
2672
2673        let pool = SqlitePoolOptions::new()
2674            .max_connections(1)
2675            .connect("sqlite::memory:")
2676            .await
2677            .unwrap();
2678
2679        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2680        let order = entity();
2681        let line = line_entity();
2682        let product = product_entity();
2683        mutation_executor
2684            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
2685            .await
2686            .unwrap();
2687
2688        sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1')")
2689            .execute(&pool)
2690            .await
2691            .unwrap();
2692        sqlx::query(
2693            "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2694                (101, 1, 1001, 'keep'),
2695                (102, 1, 1002, 'drop')",
2696        )
2697        .execute(&pool)
2698        .await
2699        .unwrap();
2700        sqlx::query(
2701            "INSERT INTO product (id, name) VALUES
2702                (1001, 'p1'),
2703                (1002, 'p2')",
2704        )
2705        .execute(&pool)
2706        .await
2707        .unwrap();
2708
2709        let executor = SqliteSyncExecutor::new(mutation_executor);
2710        let mut ctx = UserContext::new()
2711            .with_metadata(
2712                InMemoryMetadataStore::new()
2713                    .with_entity(order)
2714                    .with_entity(line)
2715                    .with_entity(product),
2716            )
2717            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
2718        ctx.insert_resource(SqliteDialect);
2719        ctx.insert_resource(executor);
2720
2721        let repo = ctx
2722            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2723            .unwrap();
2724        let query = repo.select().relation_query(
2725            "lines",
2726            SelectQuery::new("OrderLine")
2727                .project("name")
2728                .filter(Expr::eq("name", "keep"))
2729                .order_desc("id")
2730                .page(0, 10)
2731                .relation_query("product", SelectQuery::new("Product").project("name")),
2732        );
2733        let mut parents = repo
2734            .fetch_all(
2735                &repo
2736                    .select()
2737                    .project("id")
2738                    .project("version")
2739                    .project("name"),
2740            )
2741            .unwrap();
2742
2743        repo.enhance_query_relations(&mut parents, &query).unwrap();
2744
2745        match parents[0].get("lines") {
2746            Some(Value::List(lines)) => {
2747                assert_eq!(lines.len(), 1);
2748                let Value::Object(line) = &lines[0] else {
2749                    panic!("unexpected line payload: {:?}", lines[0]);
2750                };
2751                assert_eq!(line.get("name"), Some(&Value::Text("keep".to_owned())));
2752                assert_eq!(line.get("order_id"), Some(&Value::I64(1)));
2753                assert_eq!(line.get("product_id"), Some(&Value::I64(1001)));
2754                match line.get("product") {
2755                    Some(Value::Object(product)) => {
2756                        assert_eq!(product.get("name"), Some(&Value::Text("p1".to_owned())));
2757                        assert_eq!(product.get("id"), Some(&Value::I64(1001)));
2758                    }
2759                    other => panic!("unexpected product payload: {other:?}"),
2760                }
2761            }
2762            other => panic!("unexpected query relation payload: {other:?}"),
2763        }
2764    }
2765
2766    #[tokio::test]
2767    async fn sqlite_executor_ensure_schema_adds_missing_columns() {
2768        use sqlx::Row;
2769        use sqlx::sqlite::SqlitePoolOptions;
2770
2771        let pool = SqlitePoolOptions::new()
2772            .max_connections(1)
2773            .connect("sqlite::memory:")
2774            .await
2775            .unwrap();
2776
2777        sqlx::query(
2778            "CREATE TABLE orders (
2779                id INTEGER PRIMARY KEY
2780            )",
2781        )
2782        .execute(&pool)
2783        .await
2784        .unwrap();
2785
2786        let executor = SqliteMutationExecutor::new(pool.clone());
2787        let dialect = SqliteDialect;
2788        let entity = entity();
2789
2790        executor.ensure_schema(&dialect, &[&entity]).await.unwrap();
2791
2792        let columns = sqlx::query("PRAGMA table_info(\"orders\")")
2793            .fetch_all(&pool)
2794            .await
2795            .unwrap()
2796            .into_iter()
2797            .map(|row| row.try_get::<String, _>("name").unwrap())
2798            .collect::<Vec<_>>();
2799
2800        assert!(columns.contains(&"id".to_owned()));
2801        assert!(columns.contains(&"version".to_owned()));
2802        assert!(columns.contains(&"name".to_owned()));
2803    }
2804
2805    #[tokio::test]
2806    async fn user_context_can_ensure_sqlite_schema_from_runtime_module() {
2807        use sqlx::Row;
2808        use sqlx::sqlite::SqlitePoolOptions;
2809
2810        let pool = SqlitePoolOptions::new()
2811            .max_connections(1)
2812            .connect("sqlite::memory:")
2813            .await
2814            .unwrap();
2815
2816        let module = super::RuntimeModule::new()
2817            .descriptor(entity())
2818            .descriptor(line_entity())
2819            .descriptor(product_entity())
2820            .initial_graph(
2821                GraphNode::new("Order")
2822                    .value("id", 1_u64)
2823                    .value("version", 1_i64)
2824                    .value("name", "seed-order"),
2825            );
2826        let mut ctx = super::UserContext::new().with_module(module);
2827        ctx.insert_resource(SqliteDialect);
2828        ctx.insert_resource(SqliteMutationExecutor::new(pool.clone()));
2829
2830        ctx.ensure_sqlite_schema().await.unwrap();
2831        sqlx::query("UPDATE orders SET name = 'stale-seed-order' WHERE id = 1")
2832            .execute(&pool)
2833            .await
2834            .unwrap();
2835        ctx.ensure_sqlite_schema().await.unwrap();
2836
2837        let tables = sqlx::query(
2838            "SELECT name FROM sqlite_master WHERE type = 'table' AND name IN ('orders', 'orderline', 'product') ORDER BY name",
2839        )
2840        .fetch_all(&pool)
2841        .await
2842        .unwrap()
2843        .into_iter()
2844        .map(|row| row.try_get::<String, _>("name").unwrap())
2845        .collect::<Vec<_>>();
2846
2847        assert_eq!(
2848            tables,
2849            vec![
2850                "orderline".to_owned(),
2851                "orders".to_owned(),
2852                "product".to_owned()
2853            ]
2854        );
2855
2856        let seed_count: i64 =
2857            sqlx::query_scalar("SELECT COUNT(1) FROM orders WHERE id = 1 AND name = 'seed-order'")
2858                .fetch_one(&pool)
2859                .await
2860                .unwrap();
2861        assert_eq!(seed_count, 1);
2862    }
2863
2864    #[tokio::test]
2865    async fn sqlite_executor_roundtrips_json_decimal_date_and_timestamp() {
2866        use sqlx::sqlite::SqlitePoolOptions;
2867
2868        let pool = SqlitePoolOptions::new()
2869            .max_connections(1)
2870            .connect("sqlite::memory:")
2871            .await
2872            .unwrap();
2873
2874        let executor = SqliteMutationExecutor::new(pool.clone());
2875        let dialect = SqliteDialect;
2876        let entity = typed_entity();
2877        executor.ensure_schema(&dialect, &[&entity]).await.unwrap();
2878
2879        let birthday = NaiveDate::from_ymd_opt(2024, 2, 3).unwrap();
2880        let happened_at = Utc.with_ymd_and_hms(2024, 2, 3, 4, 5, 6).unwrap();
2881        let payload = serde_json::json!({"name": "teaql", "count": 2});
2882        let amount = Decimal::new(12345, 2);
2883
2884        let insert = dialect
2885            .compile_insert(
2886                &entity,
2887                &InsertCommand::new("TypedValue")
2888                    .value("id", 1_u64)
2889                    .value("payload", payload.clone())
2890                    .value("amount", amount)
2891                    .value("birthday", birthday)
2892                    .value("happened_at", happened_at),
2893            )
2894            .unwrap();
2895        assert_eq!(executor.execute(&insert).await.unwrap(), 1);
2896
2897        let select = dialect
2898            .compile_select(
2899                &entity,
2900                &SelectQuery::new("TypedValue")
2901                    .project("id")
2902                    .project("payload")
2903                    .project("amount")
2904                    .project("birthday")
2905                    .project("happened_at")
2906                    .filter(Expr::eq("id", 1_u64)),
2907            )
2908            .unwrap();
2909        let rows = executor.fetch_all(&select).await.unwrap();
2910        assert_eq!(rows.len(), 1);
2911        assert_eq!(rows[0].get("payload"), Some(&Value::Json(payload)));
2912        assert_eq!(rows[0].get("amount"), Some(&Value::Decimal(amount)));
2913        assert_eq!(rows[0].get("birthday"), Some(&Value::Date(birthday)));
2914        assert_eq!(
2915            rows[0].get("happened_at"),
2916            Some(&Value::Timestamp(happened_at))
2917        );
2918    }
2919
2920    #[tokio::test(flavor = "multi_thread")]
2921    async fn sqlite_fetches_enhanced_typed_entities() {
2922        use sqlx::sqlite::SqlitePoolOptions;
2923
2924        let pool = SqlitePoolOptions::new()
2925            .max_connections(1)
2926            .connect("sqlite::memory:")
2927            .await
2928            .unwrap();
2929
2930        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
2931        let order = entity();
2932        let line = line_entity();
2933        let product = product_entity();
2934        mutation_executor
2935            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
2936            .await
2937            .unwrap();
2938
2939        sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'o1')")
2940            .execute(&pool)
2941            .await
2942            .unwrap();
2943        sqlx::query(
2944            "INSERT INTO orderline (id, order_id, product_id, name) VALUES
2945                (101, 1, 1001, 'l1'),
2946                (102, 1, 1002, 'l2')",
2947        )
2948        .execute(&pool)
2949        .await
2950        .unwrap();
2951        sqlx::query(
2952            "INSERT INTO product (id, name) VALUES
2953                (1001, 'p1'),
2954                (1002, 'p2')",
2955        )
2956        .execute(&pool)
2957        .await
2958        .unwrap();
2959
2960        let executor = SqliteSyncExecutor::new(mutation_executor);
2961        let mut ctx = UserContext::new()
2962            .with_metadata(
2963                InMemoryMetadataStore::new()
2964                    .with_entity(order)
2965                    .with_entity(line)
2966                    .with_entity(product),
2967            )
2968            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
2969            .with_repository_behavior_registry(
2970                InMemoryRepositoryBehaviorRegistry::new()
2971                    .with_behavior("Order", NestedOrderBehavior),
2972            );
2973        ctx.insert_resource(SqliteDialect);
2974        ctx.insert_resource(executor);
2975
2976        let repo = ctx
2977            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
2978            .unwrap();
2979        let rows = repo
2980            .fetch_enhanced_entities::<OrderAggregateRow>(
2981                &repo
2982                    .select()
2983                    .project("id")
2984                    .project("version")
2985                    .project("name"),
2986            )
2987            .unwrap();
2988
2989        assert_eq!(rows.len(), 1);
2990        let order = rows.first().unwrap();
2991        assert_eq!(order.id, 1);
2992        assert_eq!(order.lines.len(), 2);
2993        assert_eq!(order.lines.data[0].product.as_ref().unwrap().name, "p1");
2994        assert_eq!(order.lines.data[1].product.as_ref().unwrap().name, "p2");
2995    }
2996
2997    #[tokio::test(flavor = "multi_thread")]
2998    async fn sqlite_fetches_typed_smart_list_entities() {
2999        use sqlx::sqlite::SqlitePoolOptions;
3000
3001        let pool = SqlitePoolOptions::new()
3002            .max_connections(1)
3003            .connect("sqlite::memory:")
3004            .await
3005            .unwrap();
3006
3007        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3008        let order = entity();
3009        mutation_executor
3010            .ensure_schema(&SqliteDialect, &[&order])
3011            .await
3012            .unwrap();
3013
3014        sqlx::query(
3015            "INSERT INTO orders (id, version, name) VALUES
3016                (1, 1, 'o1'),
3017                (2, 3, 'o2')",
3018        )
3019        .execute(&pool)
3020        .await
3021        .unwrap();
3022
3023        let executor = SqliteSyncExecutor::new(mutation_executor);
3024        let mut ctx = UserContext::new()
3025            .with_metadata(InMemoryMetadataStore::new().with_entity(order))
3026            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3027        ctx.insert_resource(SqliteDialect);
3028        ctx.insert_resource(executor);
3029
3030        let repo = ctx
3031            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3032            .unwrap();
3033        let rows = repo
3034            .fetch_entities::<OrderAggregateRow>(
3035                &repo
3036                    .select()
3037                    .project("id")
3038                    .project("version")
3039                    .project("name"),
3040            )
3041            .unwrap();
3042
3043        assert_eq!(rows.len(), 2);
3044        assert_eq!(rows.ids(), vec![Value::U64(1), Value::U64(2)]);
3045        assert_eq!(rows.versions(), vec![1, 3]);
3046        assert!(rows.data[0].lines.is_empty());
3047        assert!(rows.data[1].lines.is_empty());
3048    }
3049
3050    #[tokio::test(flavor = "multi_thread")]
3051    async fn sqlite_fetches_smart_list_of_order_entities() {
3052        use sqlx::sqlite::SqlitePoolOptions;
3053
3054        let pool = SqlitePoolOptions::new()
3055            .max_connections(1)
3056            .connect("sqlite::memory:")
3057            .await
3058            .unwrap();
3059
3060        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3061        let order = entity();
3062        mutation_executor
3063            .ensure_schema(&SqliteDialect, &[&order])
3064            .await
3065            .unwrap();
3066
3067        sqlx::query(
3068            "INSERT INTO orders (id, version, name) VALUES
3069                (1, 1, 'o1'),
3070                (2, 3, 'o2')",
3071        )
3072        .execute(&pool)
3073        .await
3074        .unwrap();
3075
3076        let executor = SqliteSyncExecutor::new(mutation_executor);
3077        let mut ctx = UserContext::new()
3078            .with_metadata(InMemoryMetadataStore::new().with_entity(order))
3079            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3080        ctx.insert_resource(SqliteDialect);
3081        ctx.insert_resource(executor);
3082
3083        let repo = ctx
3084            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3085            .unwrap();
3086        let rows = repo
3087            .fetch_entities::<Order>(
3088                &repo
3089                    .select()
3090                    .project("id")
3091                    .project("version")
3092                    .project("name"),
3093            )
3094            .unwrap();
3095
3096        assert_eq!(
3097            rows.data,
3098            vec![
3099                Order {
3100                    id: 1,
3101                    version: 1,
3102                    name: "o1".to_owned(),
3103                },
3104                Order {
3105                    id: 2,
3106                    version: 3,
3107                    name: "o2".to_owned(),
3108                }
3109            ]
3110        );
3111        assert_eq!(rows.ids(), vec![Value::U64(1), Value::U64(2)]);
3112        assert_eq!(rows.versions(), vec![1, 3]);
3113    }
3114
3115    #[tokio::test(flavor = "multi_thread")]
3116    async fn sqlite_insert_generates_missing_id() {
3117        use sqlx::sqlite::SqlitePoolOptions;
3118
3119        let pool = SqlitePoolOptions::new()
3120            .max_connections(1)
3121            .connect("sqlite::memory:")
3122            .await
3123            .unwrap();
3124
3125        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3126        let order = entity();
3127        mutation_executor
3128            .ensure_schema(&SqliteDialect, &[&order])
3129            .await
3130            .unwrap();
3131
3132        let executor = SqliteSyncExecutor::new(mutation_executor);
3133        let mut ctx = UserContext::new()
3134            .with_metadata(InMemoryMetadataStore::new().with_entity(order))
3135            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3136        ctx.insert_resource(SqliteDialect);
3137        ctx.insert_resource(executor);
3138
3139        let repo = ctx
3140            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3141            .unwrap();
3142        let affected = repo
3143            .insert(
3144                &repo
3145                    .insert_command()
3146                    .value("version", 1_i64)
3147                    .value("name", "generated"),
3148            )
3149            .unwrap();
3150        assert_eq!(affected, 1);
3151
3152        let rows = repo
3153            .fetch_entities::<Order>(
3154                &repo
3155                    .select()
3156                    .project("id")
3157                    .project("version")
3158                    .project("name")
3159                    .filter(Expr::eq("name", "generated")),
3160            )
3161            .unwrap();
3162        assert_eq!(rows.len(), 1);
3163        let order = rows.first().unwrap();
3164        assert!(order.id > 0);
3165        assert_eq!(order.version, 1);
3166        assert_eq!(order.name, "generated");
3167    }
3168
3169    #[tokio::test(flavor = "multi_thread")]
3170    async fn sqlite_save_graph_inserts_nested_rows() {
3171        use sqlx::{Row, sqlite::SqlitePoolOptions};
3172
3173        let pool = SqlitePoolOptions::new()
3174            .max_connections(1)
3175            .connect("sqlite::memory:?cache=shared")
3176            .await
3177            .unwrap();
3178
3179        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3180        let order = entity();
3181        let line = line_entity();
3182        let product = product_entity();
3183        mutation_executor
3184            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3185            .await
3186            .unwrap();
3187
3188        let executor = SqliteSyncExecutor::new(mutation_executor);
3189        let mut ctx = UserContext::new()
3190            .with_metadata(
3191                InMemoryMetadataStore::new()
3192                    .with_entity(order)
3193                    .with_entity(line)
3194                    .with_entity(product),
3195            )
3196            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3197        ctx.insert_resource(SqliteDialect);
3198        ctx.insert_resource(executor);
3199
3200        let repo = ctx
3201            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3202            .unwrap();
3203        let graph = GraphNode::new("Order")
3204            .value("id", 1_u64)
3205            .value("version", 1_i64)
3206            .value("name", "root")
3207            .relation(
3208                "lines",
3209                GraphNode::new("OrderLine")
3210                    .value("id", 10_u64)
3211                    .value("name", "line-1")
3212                    .relation(
3213                        "product",
3214                        GraphNode::new("Product")
3215                            .value("id", 100_u64)
3216                            .value("name", "sku-1"),
3217                    ),
3218            );
3219
3220        let saved = repo.save_graph(graph).unwrap();
3221        assert_eq!(
3222            saved.relations["lines"][0].values.get("order_id"),
3223            Some(&Value::U64(1))
3224        );
3225        assert_eq!(
3226            saved.relations["lines"][0].values.get("product_id"),
3227            Some(&Value::U64(100))
3228        );
3229
3230        let order_count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders")
3231            .fetch_one(&pool)
3232            .await
3233            .unwrap()
3234            .try_get("count")
3235            .unwrap();
3236        let line = sqlx::query("SELECT order_id, product_id FROM orderline WHERE id = 10")
3237            .fetch_one(&pool)
3238            .await
3239            .unwrap();
3240        let product_count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM product")
3241            .fetch_one(&pool)
3242            .await
3243            .unwrap()
3244            .try_get("count")
3245            .unwrap();
3246
3247        assert_eq!(order_count, 1);
3248        assert_eq!(line.try_get::<i64, _>("order_id").unwrap(), 1);
3249        assert_eq!(line.try_get::<i64, _>("product_id").unwrap(), 100);
3250        assert_eq!(product_count, 1);
3251    }
3252
3253    #[tokio::test(flavor = "multi_thread")]
3254    async fn sqlite_save_typed_entity_graph_create_inserts_nested_rows() {
3255        use sqlx::{Row, sqlite::SqlitePoolOptions};
3256
3257        let pool = SqlitePoolOptions::new()
3258            .max_connections(1)
3259            .connect("sqlite::memory:")
3260            .await
3261            .unwrap();
3262
3263        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3264        let order = entity();
3265        let line = line_entity();
3266        let product = product_entity();
3267        mutation_executor
3268            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3269            .await
3270            .unwrap();
3271
3272        let executor = SqliteSyncExecutor::new(mutation_executor);
3273        let mut ctx = UserContext::new()
3274            .with_metadata(
3275                InMemoryMetadataStore::new()
3276                    .with_entity(order)
3277                    .with_entity(line)
3278                    .with_entity(product),
3279            )
3280            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3281        ctx.insert_resource(SqliteDialect);
3282        ctx.insert_resource(executor);
3283
3284        let repo = ctx
3285            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3286            .unwrap();
3287        let saved = repo
3288            .save_entity_graph(OrderAggregateRow {
3289                id: 2,
3290                version: 1,
3291                name: "typed-root".to_owned(),
3292                lines: teaql_core::SmartList::from(vec![OrderLineEntityRow {
3293                    id: 20,
3294                    order_id: 0,
3295                    name: "typed-line".to_owned(),
3296                    product_id: 200,
3297                    product: Some(ProductEntityRow {
3298                        id: 200,
3299                        name: "typed-sku".to_owned(),
3300                    }),
3301                }]),
3302            })
3303            .unwrap();
3304
3305        assert_eq!(saved.values.get("id"), Some(&Value::U64(2)));
3306        assert_eq!(
3307            saved.relations["lines"][0].values.get("order_id"),
3308            Some(&Value::U64(2))
3309        );
3310        assert_eq!(
3311            saved.relations["lines"][0].values.get("product_id"),
3312            Some(&Value::U64(200))
3313        );
3314
3315        let order_count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders")
3316            .fetch_one(&pool)
3317            .await
3318            .unwrap()
3319            .try_get("count")
3320            .unwrap();
3321        let line = sqlx::query("SELECT order_id, product_id FROM orderline WHERE id = 20")
3322            .fetch_one(&pool)
3323            .await
3324            .unwrap();
3325        let product_name: String = sqlx::query_scalar("SELECT name FROM product WHERE id = 200")
3326            .fetch_one(&pool)
3327            .await
3328            .unwrap();
3329
3330        assert_eq!(order_count, 1);
3331        assert_eq!(line.try_get::<i64, _>("order_id").unwrap(), 2);
3332        assert_eq!(line.try_get::<i64, _>("product_id").unwrap(), 200);
3333        assert_eq!(product_name, "typed-sku");
3334    }
3335
3336    #[tokio::test(flavor = "multi_thread")]
3337    async fn sqlite_plan_for_save_graph_assigns_ids_and_batches_before_execution() {
3338        use sqlx::sqlite::SqlitePoolOptions;
3339        use std::time::{SystemTime, UNIX_EPOCH};
3340
3341        let db_path = std::env::temp_dir().join(format!(
3342            "teaql-plan-{}-{}.db",
3343            std::process::id(),
3344            SystemTime::now()
3345                .duration_since(UNIX_EPOCH)
3346                .unwrap()
3347                .as_nanos()
3348        ));
3349        let db_url = format!("sqlite://{}?mode=rwc", db_path.display());
3350        let pool = SqlitePoolOptions::new()
3351            .max_connections(1)
3352            .connect(&db_url)
3353            .await
3354            .unwrap();
3355
3356        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3357        let order = entity();
3358        let line = line_entity();
3359        let product = product_entity();
3360        mutation_executor
3361            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3362            .await
3363            .unwrap();
3364
3365        sqlx::query("INSERT INTO orders (id, version, name) VALUES (100, 1, 'existing')")
3366            .execute(&pool)
3367            .await
3368            .unwrap();
3369        sqlx::query(
3370            "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
3371                (200, 1, 100, 301, 'line-a'),
3372                (201, 1, 100, 302, 'line-b')",
3373        )
3374        .execute(&pool)
3375        .await
3376        .unwrap();
3377        let seeded_lines: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM orderline")
3378            .fetch_one(&pool)
3379            .await
3380            .unwrap();
3381        assert_eq!(seeded_lines, 2);
3382
3383        let executor = SqliteSyncExecutor::new(mutation_executor);
3384        let mut ctx = UserContext::new()
3385            .with_metadata(
3386                InMemoryMetadataStore::new()
3387                    .with_entity(order)
3388                    .with_entity(line)
3389                    .with_entity(product),
3390            )
3391            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
3392            .with_internal_id_generator(SqliteIdSpaceGenerator::new(pool.clone()));
3393        ctx.insert_resource(SqliteDialect);
3394        ctx.insert_resource(executor);
3395
3396        let graph = GraphNode::new("Order")
3397            .value("id", 100_u64)
3398            .value("version", 1_i64)
3399            .value("name", "existing-updated")
3400            .relation(
3401                "lines",
3402                GraphNode::new("OrderLine")
3403                    .value("name", "new-line-a")
3404                    .value("product_id", 401_u64),
3405            )
3406            .relation(
3407                "lines",
3408                GraphNode::new("OrderLine")
3409                    .value("name", "new-line-b")
3410                    .value("product_id", 402_u64),
3411            )
3412            .relation(
3413                "lines",
3414                GraphNode::new("OrderLine")
3415                    .value("id", 200_u64)
3416                    .value("version", 1_i64)
3417                    .value("name", "line-a-updated"),
3418            )
3419            .relation(
3420                "lines",
3421                GraphNode::new("OrderLine")
3422                    .value("id", 201_u64)
3423                    .value("version", 1_i64)
3424                    .value("name", "line-b-updated"),
3425            );
3426
3427        let plan = ctx
3428            .plan_for_save_graph::<SqliteDialect, SqliteSyncExecutor>(graph)
3429            .unwrap();
3430        let counts = plan.grouped_counts();
3431        assert_eq!(
3432            counts.get(&("Order".to_owned(), GraphMutationKind::Update)),
3433            Some(&1)
3434        );
3435        assert_eq!(
3436            counts.get(&("OrderLine".to_owned(), GraphMutationKind::Create)),
3437            Some(&2)
3438        );
3439        assert_eq!(
3440            counts.get(&("OrderLine".to_owned(), GraphMutationKind::Update)),
3441            Some(&2)
3442        );
3443
3444        let create_batch = plan
3445            .batches
3446            .iter()
3447            .find(|batch| batch.entity == "OrderLine" && batch.kind == GraphMutationKind::Create)
3448            .unwrap();
3449        assert_eq!(create_batch.items.len(), 2);
3450        assert_eq!(create_batch.items[0].values.get("id"), Some(&Value::U64(1)));
3451        assert_eq!(create_batch.items[1].values.get("id"), Some(&Value::U64(2)));
3452
3453        let update_batch = plan
3454            .batches
3455            .iter()
3456            .find(|batch| {
3457                batch.entity == "OrderLine"
3458                    && batch.kind == GraphMutationKind::Update
3459                    && batch.update_fields == vec!["name".to_owned()]
3460            })
3461            .unwrap();
3462        assert_eq!(update_batch.items.len(), 2);
3463
3464        let repo = ctx
3465            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3466            .unwrap();
3467        let saved = repo.execute_graph_plan(plan).unwrap();
3468        let lines = saved.relations.get("lines").unwrap();
3469        assert_eq!(lines.len(), 4);
3470
3471        let generated_count: i64 = sqlx::query_scalar(
3472            "SELECT COUNT(*) FROM orderline WHERE id IN (1, 2) AND order_id = 100",
3473        )
3474        .fetch_one(&pool)
3475        .await
3476        .unwrap();
3477        assert_eq!(generated_count, 2);
3478        let updated_name: String = sqlx::query_scalar("SELECT name FROM orderline WHERE id = 200")
3479            .fetch_one(&pool)
3480            .await
3481            .unwrap();
3482        assert_eq!(updated_name, "line-a-updated");
3483        pool.close().await;
3484        let _ = std::fs::remove_file(db_path);
3485    }
3486
3487    #[tokio::test(flavor = "multi_thread")]
3488    async fn sqlite_save_graph_updates_nested_rows_and_deletes_missing_children() {
3489        use sqlx::{Row, sqlite::SqlitePoolOptions};
3490
3491        let pool = SqlitePoolOptions::new()
3492            .max_connections(1)
3493            .connect("sqlite::memory:")
3494            .await
3495            .unwrap();
3496
3497        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3498        let order = entity();
3499        let line = line_entity();
3500        let product = product_entity();
3501        mutation_executor
3502            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3503            .await
3504            .unwrap();
3505
3506        sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'old-root')")
3507            .execute(&pool)
3508            .await
3509            .unwrap();
3510        sqlx::query(
3511            "INSERT INTO product (id, name) VALUES
3512                (100, 'old-sku'),
3513                (101, 'removed-sku')",
3514        )
3515        .execute(&pool)
3516        .await
3517        .unwrap();
3518        sqlx::query(
3519            "INSERT INTO orderline (id, order_id, product_id, name) VALUES
3520                (10, 1, 100, 'old-line'),
3521                (11, 1, 101, 'removed-line')",
3522        )
3523        .execute(&pool)
3524        .await
3525        .unwrap();
3526        sqlx::query("UPDATE orderline SET version = 1")
3527            .execute(&pool)
3528            .await
3529            .unwrap();
3530
3531        let executor = SqliteSyncExecutor::new(mutation_executor);
3532        let mut ctx = UserContext::new()
3533            .with_metadata(
3534                InMemoryMetadataStore::new()
3535                    .with_entity(order)
3536                    .with_entity(line)
3537                    .with_entity(product),
3538            )
3539            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3540        ctx.insert_resource(SqliteDialect);
3541        ctx.insert_resource(executor);
3542
3543        let repo = ctx
3544            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3545            .unwrap();
3546        let graph = GraphNode::new("Order")
3547            .value("id", 1_u64)
3548            .value("version", 1_i64)
3549            .value("name", "new-root")
3550            .relation(
3551                "lines",
3552                GraphNode::new("OrderLine")
3553                    .value("id", 10_u64)
3554                    .value("version", 1_i64)
3555                    .value("name", "new-line")
3556                    .relation(
3557                        "product",
3558                        GraphNode::new("Product")
3559                            .value("id", 100_u64)
3560                            .value("name", "new-sku"),
3561                    ),
3562            )
3563            .relation(
3564                "lines",
3565                GraphNode::new("OrderLine")
3566                    .value("id", 12_u64)
3567                    .value("version", 1_i64)
3568                    .value("name", "added-line")
3569                    .relation(
3570                        "product",
3571                        GraphNode::new("Product")
3572                            .value("id", 102_u64)
3573                            .value("name", "added-sku"),
3574                    ),
3575            );
3576
3577        let saved = repo.save_graph(graph).unwrap();
3578        assert_eq!(saved.values.get("version"), Some(&Value::I64(2)));
3579
3580        let order_row = sqlx::query("SELECT version, name FROM orders WHERE id = 1")
3581            .fetch_one(&pool)
3582            .await
3583            .unwrap();
3584        assert_eq!(order_row.try_get::<i64, _>("version").unwrap(), 2);
3585        assert_eq!(order_row.try_get::<String, _>("name").unwrap(), "new-root");
3586
3587        let updated_line =
3588            sqlx::query("SELECT version, product_id, name FROM orderline WHERE id = 10")
3589                .fetch_one(&pool)
3590                .await
3591                .unwrap();
3592        assert_eq!(updated_line.try_get::<i64, _>("version").unwrap(), 2);
3593        assert_eq!(updated_line.try_get::<i64, _>("product_id").unwrap(), 100);
3594        assert_eq!(
3595            updated_line.try_get::<String, _>("name").unwrap(),
3596            "new-line"
3597        );
3598
3599        let added_line =
3600            sqlx::query("SELECT order_id, product_id, name FROM orderline WHERE id = 12")
3601                .fetch_one(&pool)
3602                .await
3603                .unwrap();
3604        assert_eq!(added_line.try_get::<i64, _>("order_id").unwrap(), 1);
3605        assert_eq!(added_line.try_get::<i64, _>("product_id").unwrap(), 102);
3606        assert_eq!(
3607            added_line.try_get::<String, _>("name").unwrap(),
3608            "added-line"
3609        );
3610
3611        let deleted_line = sqlx::query("SELECT version FROM orderline WHERE id = 11")
3612            .fetch_one(&pool)
3613            .await
3614            .unwrap();
3615        assert_eq!(deleted_line.try_get::<i64, _>("version").unwrap(), -2);
3616
3617        let updated_product = sqlx::query("SELECT name FROM product WHERE id = 100")
3618            .fetch_one(&pool)
3619            .await
3620            .unwrap();
3621        assert_eq!(
3622            updated_product.try_get::<String, _>("name").unwrap(),
3623            "new-sku"
3624        );
3625    }
3626
3627    #[tokio::test(flavor = "multi_thread")]
3628    async fn sqlite_save_graph_supports_reference_remove_and_keep_missing() {
3629        use sqlx::{Row, sqlite::SqlitePoolOptions};
3630
3631        let pool = SqlitePoolOptions::new()
3632            .max_connections(1)
3633            .connect("sqlite::memory:")
3634            .await
3635            .unwrap();
3636
3637        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3638        let order = sqlite_entity_keep_missing();
3639        let line = line_entity();
3640        let product = product_entity();
3641        mutation_executor
3642            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3643            .await
3644            .unwrap();
3645
3646        sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'root')")
3647            .execute(&pool)
3648            .await
3649            .unwrap();
3650        sqlx::query("INSERT INTO product (id, name) VALUES (100, 'reference-only')")
3651            .execute(&pool)
3652            .await
3653            .unwrap();
3654        sqlx::query(
3655            "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
3656                (10, 1, 1, 100, 'remove-me'),
3657                (11, 1, 1, 100, 'keep-me')",
3658        )
3659        .execute(&pool)
3660        .await
3661        .unwrap();
3662
3663        let executor = SqliteSyncExecutor::new(mutation_executor);
3664        let mut ctx = UserContext::new()
3665            .with_metadata(
3666                InMemoryMetadataStore::new()
3667                    .with_entity(order)
3668                    .with_entity(line)
3669                    .with_entity(product),
3670            )
3671            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3672        ctx.insert_resource(SqliteDialect);
3673        ctx.insert_resource(executor);
3674
3675        let repo = ctx
3676            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3677            .unwrap();
3678        let graph = GraphNode::new("Order")
3679            .value("id", 1_u64)
3680            .value("version", 1_i64)
3681            .value("name", "root-updated")
3682            .relation(
3683                "lines",
3684                GraphNode::new("OrderLine")
3685                    .value("id", 10_u64)
3686                    .value("version", 1_i64)
3687                    .remove(),
3688            )
3689            .relation(
3690                "lines",
3691                GraphNode::new("OrderLine")
3692                    .value("id", 12_u64)
3693                    .value("version", 1_i64)
3694                    .value("name", "new-reference-line")
3695                    .relation(
3696                        "product",
3697                        GraphNode::new("Product").value("id", 100_u64).reference(),
3698                    ),
3699            );
3700
3701        let saved = repo.save_graph(graph).unwrap();
3702        assert_eq!(
3703            saved.relations["lines"][1].relations["product"][0]
3704                .values
3705                .get("name"),
3706            Some(&Value::Text("reference-only".to_owned()))
3707        );
3708
3709        let removed = sqlx::query("SELECT version FROM orderline WHERE id = 10")
3710            .fetch_one(&pool)
3711            .await
3712            .unwrap();
3713        assert_eq!(removed.try_get::<i64, _>("version").unwrap(), -2);
3714
3715        let kept = sqlx::query("SELECT version, name FROM orderline WHERE id = 11")
3716            .fetch_one(&pool)
3717            .await
3718            .unwrap();
3719        assert_eq!(kept.try_get::<i64, _>("version").unwrap(), 1);
3720        assert_eq!(kept.try_get::<String, _>("name").unwrap(), "keep-me");
3721
3722        let added = sqlx::query("SELECT product_id FROM orderline WHERE id = 12")
3723            .fetch_one(&pool)
3724            .await
3725            .unwrap();
3726        assert_eq!(added.try_get::<i64, _>("product_id").unwrap(), 100);
3727
3728        let product = sqlx::query("SELECT name FROM product WHERE id = 100")
3729            .fetch_one(&pool)
3730            .await
3731            .unwrap();
3732        assert_eq!(
3733            product.try_get::<String, _>("name").unwrap(),
3734            "reference-only"
3735        );
3736    }
3737
3738    #[tokio::test(flavor = "multi_thread")]
3739    async fn sqlite_save_graph_rejects_invalid_reference_and_state_transitions() {
3740        use sqlx::sqlite::SqlitePoolOptions;
3741
3742        let pool = SqlitePoolOptions::new()
3743            .max_connections(1)
3744            .connect("sqlite::memory:")
3745            .await
3746            .unwrap();
3747
3748        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3749        let order = entity();
3750        let line = line_entity();
3751        let product = product_entity();
3752        mutation_executor
3753            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3754            .await
3755            .unwrap();
3756
3757        sqlx::query("INSERT INTO orders (id, version, name) VALUES (1, 1, 'root')")
3758            .execute(&pool)
3759            .await
3760            .unwrap();
3761        sqlx::query("INSERT INTO product (id, name) VALUES (100, 'valid-reference')")
3762            .execute(&pool)
3763            .await
3764            .unwrap();
3765        sqlx::query(
3766            "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
3767                (10, 1, 1, 100, 'line-10')",
3768        )
3769        .execute(&pool)
3770        .await
3771        .unwrap();
3772
3773        let executor = SqliteSyncExecutor::new(mutation_executor);
3774        let mut ctx = UserContext::new()
3775            .with_metadata(
3776                InMemoryMetadataStore::new()
3777                    .with_entity(order)
3778                    .with_entity(line)
3779                    .with_entity(product),
3780            )
3781            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3782        ctx.insert_resource(SqliteDialect);
3783        ctx.insert_resource(executor);
3784
3785        let repo = ctx
3786            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3787            .unwrap();
3788
3789        let missing_reference = repo.save_graph(
3790            GraphNode::new("Order").value("id", 1_u64).relation(
3791                "lines",
3792                GraphNode::new("OrderLine")
3793                    .value("id", 12_u64)
3794                    .value("version", 1_i64)
3795                    .value("name", "bad-reference")
3796                    .relation(
3797                        "product",
3798                        GraphNode::new("Product").value("id", 999_u64).reference(),
3799                    ),
3800            ),
3801        );
3802        assert!(format!("{missing_reference:?}").contains("does not exist"));
3803
3804        let mutable_reference = repo.save_graph(
3805            GraphNode::new("Order").value("id", 1_u64).relation(
3806                "lines",
3807                GraphNode::new("OrderLine")
3808                    .value("id", 12_u64)
3809                    .value("version", 1_i64)
3810                    .value("name", "mutable-reference")
3811                    .relation(
3812                        "product",
3813                        GraphNode::new("Product")
3814                            .value("id", 100_u64)
3815                            .value("name", "should-not-mutate")
3816                            .reference(),
3817                    ),
3818            ),
3819        );
3820        assert!(format!("{mutable_reference:?}").contains("cannot carry mutable field"));
3821
3822        let duplicate_child = repo.save_graph(
3823            GraphNode::new("Order")
3824                .value("id", 1_u64)
3825                .relation(
3826                    "lines",
3827                    GraphNode::new("OrderLine")
3828                        .value("id", 10_u64)
3829                        .value("version", 1_i64)
3830                        .reference(),
3831                )
3832                .relation(
3833                    "lines",
3834                    GraphNode::new("OrderLine")
3835                        .value("id", 10_u64)
3836                        .value("version", 1_i64)
3837                        .reference(),
3838                ),
3839        );
3840        assert!(format!("{duplicate_child:?}").contains("duplicate child id"));
3841
3842        let reference_version_conflict = repo.save_graph(
3843            GraphNode::new("Order").value("id", 1_u64).relation(
3844                "lines",
3845                GraphNode::new("OrderLine")
3846                    .value("id", 10_u64)
3847                    .value("version", 2_i64)
3848                    .reference(),
3849            ),
3850        );
3851        assert!(format!("{reference_version_conflict:?}").contains("OptimisticLockConflict"));
3852
3853        let remove_with_child = repo.save_graph(
3854            GraphNode::new("Order").value("id", 1_u64).relation(
3855                "lines",
3856                GraphNode::new("OrderLine")
3857                    .value("id", 10_u64)
3858                    .value("version", 1_i64)
3859                    .relation(
3860                        "product",
3861                        GraphNode::new("Product").value("id", 100_u64).reference(),
3862                    )
3863                    .remove(),
3864            ),
3865        );
3866        assert!(format!("{remove_with_child:?}").contains("cannot contain child relations"));
3867
3868        let remove = repo.save_graph(GraphNode::new("Order").value("id", 999_u64).remove());
3869        assert!(format!("{remove:?}").contains("does not exist"));
3870    }
3871
3872    #[tokio::test(flavor = "multi_thread")]
3873    async fn sqlite_graph_write_rolls_back_all_batches_on_failure() {
3874        use sqlx::{Row, sqlite::SqlitePoolOptions};
3875
3876        let pool = SqlitePoolOptions::new()
3877            .max_connections(1)
3878            .connect("sqlite::memory:")
3879            .await
3880            .unwrap();
3881
3882        let mutation_executor = SqliteMutationExecutor::new(pool.clone());
3883        let order = entity();
3884        let line = line_entity();
3885        let product = product_entity();
3886        mutation_executor
3887            .ensure_schema(&SqliteDialect, &[&order, &line, &product])
3888            .await
3889            .unwrap();
3890
3891        let executor = SqliteSyncExecutor::new(mutation_executor.clone());
3892        let mut ctx = UserContext::new()
3893            .with_metadata(
3894                InMemoryMetadataStore::new()
3895                    .with_entity(order)
3896                    .with_entity(line)
3897                    .with_entity(product),
3898            )
3899            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
3900        ctx.insert_resource(SqliteDialect);
3901        ctx.insert_resource(executor);
3902
3903        let repo = ctx
3904            .resolve_repository::<SqliteDialect, SqliteSyncExecutor>("Order")
3905            .unwrap();
3906        let error = repo.save_graph(
3907            GraphNode::new("Order")
3908                .value("id", 1_u64)
3909                .value("version", 1_i64)
3910                .value("name", "rollback-root")
3911                .relation(
3912                    "lines",
3913                    GraphNode::new("OrderLine")
3914                        .value("id", 10_u64)
3915                        .value("version", 1_i64)
3916                        .value("name", "rollback-line")
3917                        .value("product_id", 999_u64)
3918                        .relation(
3919                            "product",
3920                            GraphNode::new("Product").value("id", 999_u64).reference(),
3921                        ),
3922                ),
3923        );
3924        assert!(format!("{error:?}").contains("does not exist"));
3925
3926        let count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders")
3927            .fetch_one(&pool)
3928            .await
3929            .unwrap()
3930            .try_get("count")
3931            .unwrap();
3932        assert_eq!(count, 0);
3933    }
3934
3935    #[tokio::test]
3936    async fn postgres_executor_ensure_schema_roundtrip_when_database_is_available() {
3937        use sqlx::{PgPool, Row};
3938
3939        let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
3940            return;
3941        };
3942
3943        let pool = PgPool::connect(&database_url).await.unwrap();
3944        sqlx::query("DROP TABLE IF EXISTS orderline")
3945            .execute(&pool)
3946            .await
3947            .unwrap();
3948        sqlx::query("DROP TABLE IF EXISTS orders")
3949            .execute(&pool)
3950            .await
3951            .unwrap();
3952
3953        let executor = PgMutationExecutor::new(pool.clone());
3954        let dialect = PostgresDialect;
3955        let order = entity();
3956        let line = line_entity();
3957
3958        executor
3959            .ensure_schema(&dialect, &[&order, &line])
3960            .await
3961            .unwrap();
3962
3963        let tables = sqlx::query(
3964            "SELECT table_name
3965             FROM information_schema.tables
3966             WHERE table_schema = current_schema()
3967               AND table_name IN ('orders', 'orderline')
3968             ORDER BY table_name",
3969        )
3970        .fetch_all(&pool)
3971        .await
3972        .unwrap()
3973        .into_iter()
3974        .map(|row| row.try_get::<String, _>("table_name").unwrap())
3975        .collect::<Vec<_>>();
3976        assert_eq!(tables, vec!["orderline".to_owned(), "orders".to_owned()]);
3977
3978        let soundex: String = sqlx::query_scalar("SELECT soundex('Robert')")
3979            .fetch_one(&pool)
3980            .await
3981            .unwrap();
3982        assert_eq!(soundex, "R163");
3983
3984        sqlx::query(
3985            "INSERT INTO orders (id, version, name) VALUES
3986                (1, 1, 'draft'),
3987                (2, 1, 'submitted'),
3988                (3, 1, 'archived')",
3989        )
3990        .execute(&pool)
3991        .await
3992        .unwrap();
3993
3994        let array_bound_query = dialect
3995            .compile_select(
3996                &order,
3997                &SelectQuery::new("Order")
3998                    .filter(
3999                        Expr::in_large(
4000                            "id",
4001                            vec![Value::from(1_u64), Value::from(2_u64), Value::from(3_u64)],
4002                        )
4003                        .and_expr(Expr::not_in_large("name", vec![Value::from("archived")])),
4004                    )
4005                    .order_asc("id"),
4006            )
4007            .unwrap();
4008        assert_eq!(
4009            array_bound_query.sql,
4010            format!(
4011                "SELECT {ORDER_DEFAULT_PROJECTION} FROM \"orders\" WHERE ((\"id\" = ANY($1)) AND (\"name\" <> ALL($2))) ORDER BY \"id\" ASC"
4012            )
4013        );
4014        let rows = executor.fetch_all(&array_bound_query).await.unwrap();
4015        assert_eq!(rows.len(), 2);
4016        assert_eq!(rows[0].get("id"), Some(&Value::I64(1)));
4017        assert_eq!(rows[1].get("id"), Some(&Value::I64(2)));
4018
4019        sqlx::query(
4020            "INSERT INTO orderline (id, version, order_id, product_id, name) VALUES
4021                (11, 1, 1, 101, 'line-1'),
4022                (12, 1, 2, 102, 'line-2'),
4023                (13, 1, 3, 103, 'archived-line')",
4024        )
4025        .execute(&pool)
4026        .await
4027        .unwrap();
4028
4029        let subquery = dialect
4030            .compile_select(
4031                &order,
4032                &SelectQuery::new("Order")
4033                    .filter(Expr::in_subquery(
4034                        "id",
4035                        line.clone(),
4036                        SelectQuery::new("OrderLine").filter(Expr::contain("name", "line-")),
4037                        "order_id",
4038                    ))
4039                    .order_asc("id"),
4040            )
4041            .unwrap();
4042        assert_eq!(
4043            subquery.sql,
4044            format!(
4045                "SELECT {ORDER_DEFAULT_PROJECTION} FROM \"orders\" WHERE (\"id\" IN (SELECT \"order_id\" FROM \"orderline\" WHERE (\"name\" LIKE $1))) ORDER BY \"id\" ASC"
4046            )
4047        );
4048        let rows = executor.fetch_all(&subquery).await.unwrap();
4049        assert_eq!(rows.len(), 2);
4050        assert_eq!(rows[0].get("id"), Some(&Value::I64(1)));
4051        assert_eq!(rows[1].get("id"), Some(&Value::I64(2)));
4052
4053        let projected = dialect
4054            .compile_select(
4055                &order,
4056                &SelectQuery::new("Order")
4057                    .project("id")
4058                    .project_expr("nameSound", Expr::soundex(Expr::column("name")))
4059                    .order_gbk_asc("name")
4060                    .limit(1),
4061            )
4062            .unwrap();
4063        assert_eq!(
4064            projected.sql,
4065            "SELECT \"id\", SOUNDEX(\"name\") AS \"nameSound\" FROM \"orders\" ORDER BY convert_to(\"name\", 'GBK') ASC LIMIT 1"
4066        );
4067        let rows = executor.fetch_all(&projected).await.unwrap();
4068        assert_eq!(rows.len(), 1);
4069        assert!(rows[0].contains_key("nameSound"));
4070
4071        let aggregate = dialect
4072            .compile_select(
4073                &order,
4074                &SelectQuery::new("Order")
4075                    .count("total")
4076                    .stddev("version", "stddevVersion")
4077                    .var_pop("version", "varPopVersion")
4078                    .bit_or("version", "bitOrVersion")
4079                    .having(Expr::binary(
4080                        Expr::count_all(),
4081                        teaql_core::BinaryOp::Gt,
4082                        Expr::value(2_i64),
4083                    )),
4084            )
4085            .unwrap();
4086        assert_eq!(
4087            aggregate.sql,
4088            "SELECT COUNT(*) AS \"total\", STDDEV(\"version\") AS \"stddevVersion\", VAR_POP(\"version\") AS \"varPopVersion\", BIT_OR(\"version\") AS \"bitOrVersion\" FROM \"orders\" HAVING (COUNT(*) > $1)"
4089        );
4090        let rows = executor.fetch_all(&aggregate).await.unwrap();
4091        assert_eq!(rows.len(), 1);
4092        assert_eq!(rows[0].get("total"), Some(&Value::I64(3)));
4093        assert!(matches!(
4094            rows[0].get("stddevVersion"),
4095            Some(Value::Decimal(_))
4096        ));
4097        assert!(matches!(
4098            rows[0].get("varPopVersion"),
4099            Some(Value::Decimal(_))
4100        ));
4101        assert_eq!(rows[0].get("bitOrVersion"), Some(&Value::I64(1)));
4102
4103        sqlx::query("DROP TABLE orderline")
4104            .execute(&pool)
4105            .await
4106            .unwrap();
4107        sqlx::query("CREATE TABLE orderline (id BIGINT PRIMARY KEY)")
4108            .execute(&pool)
4109            .await
4110            .unwrap();
4111
4112        executor.ensure_schema(&dialect, &[&line]).await.unwrap();
4113
4114        let columns = sqlx::query(
4115            "SELECT column_name
4116             FROM information_schema.columns
4117             WHERE table_schema = current_schema()
4118               AND table_name = 'orderline'
4119             ORDER BY column_name",
4120        )
4121        .fetch_all(&pool)
4122        .await
4123        .unwrap()
4124        .into_iter()
4125        .map(|row| row.try_get::<String, _>("column_name").unwrap())
4126        .collect::<Vec<_>>();
4127        assert!(columns.contains(&"id".to_owned()));
4128        assert!(columns.contains(&"order_id".to_owned()));
4129        assert!(columns.contains(&"product_id".to_owned()));
4130        assert!(columns.contains(&"name".to_owned()));
4131
4132        sqlx::query("DROP TABLE IF EXISTS orderline")
4133            .execute(&pool)
4134            .await
4135            .unwrap();
4136        sqlx::query("DROP TABLE IF EXISTS orders")
4137            .execute(&pool)
4138            .await
4139            .unwrap();
4140    }
4141
4142    #[tokio::test(flavor = "multi_thread")]
4143    async fn postgres_id_space_generator_prepares_repository_insert_when_database_is_available() {
4144        use sqlx::{PgPool, Row};
4145        // Integration test: two inserts that supply no `id` must be stored with ids 1 and 2.
4146        let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4147            return; // TEAQL_TEST_PG_URL could not be read: return early without touching any database
4148        };
4149        // Start from a clean slate: drop both tables a previous run may have left behind.
4150        let pool = PgPool::connect(&database_url).await.unwrap();
4151        sqlx::query("DROP TABLE IF EXISTS orders_idgen")
4152            .execute(&pool)
4153            .await
4154            .unwrap();
4155        sqlx::query("DROP TABLE IF EXISTS teaql_id_space")
4156            .execute(&pool)
4157            .await
4158            .unwrap();
4159        // Map the `entity()` descriptor onto a dedicated table and ask the executor to ensure its schema.
4160        let order = entity().table_name("orders_idgen");
4161        let executor = PgMutationExecutor::new(pool.clone());
4162        executor
4163            .ensure_schema(&PostgresDialect, &[&order])
4164            .await
4165            .unwrap();
4166        // Build a context whose internal id generator is a PgIdSpaceGenerator on the same pool.
4167        let mut ctx = UserContext::new()
4168            .with_metadata(InMemoryMetadataStore::new().with_entity(order))
4169            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
4170            .with_internal_id_generator(PgIdSpaceGenerator::new(pool.clone()));
4171        ctx.insert_resource(PostgresDialect);
4172        ctx.insert_resource(PgSyncExecutor::new(executor)); // `executor` is moved into the sync wrapper here
4173        // Resolve the "Order" repository and insert two rows; neither command sets `id`.
4174        let repo = ctx
4175            .resolve_repository::<PostgresDialect, PgSyncExecutor>("Order")
4176            .unwrap();
4177        repo.insert(
4178            &repo
4179                .insert_command()
4180                .value("version", 1_i64)
4181                .value("name", "generated-1"),
4182        )
4183        .unwrap();
4184        repo.insert(
4185            &repo
4186                .insert_command()
4187                .value("version", 1_i64)
4188                .value("name", "generated-2"),
4189        )
4190        .unwrap();
4191        // Read the stored ids back with raw SQL through the pool rather than through the repository.
4192        let ids = sqlx::query("SELECT id FROM orders_idgen ORDER BY id")
4193            .fetch_all(&pool)
4194            .await
4195            .unwrap()
4196            .into_iter()
4197            .map(|row| row.try_get::<i64, _>("id").unwrap())
4198            .collect::<Vec<_>>();
4199        assert_eq!(ids, vec![1, 2]);
4200        // The `teaql_id_space` row for type 'Order' is expected to hold 2 as its current level.
4201        let current: i64 = sqlx::query_scalar(
4202            "SELECT current_level FROM teaql_id_space WHERE type_name = 'Order'",
4203        )
4204        .fetch_one(&pool)
4205        .await
4206        .unwrap();
4207        assert_eq!(current, 2);
4208        // Clean up both tables (not reached when an assertion or unwrap above panics).
4209        sqlx::query("DROP TABLE IF EXISTS orders_idgen")
4210            .execute(&pool)
4211            .await
4212            .unwrap();
4213        sqlx::query("DROP TABLE IF EXISTS teaql_id_space")
4214            .execute(&pool)
4215            .await
4216            .unwrap();
4217    }
4218
4219    #[tokio::test]
4220    async fn postgres_executor_runs_extended_aggregates_when_database_is_available() {
4221        use sqlx::PgPool;
4222        // Integration test: compiles aggregate queries, runs them on four known rows, checks each result.
4223        let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4224            return; // TEAQL_TEST_PG_URL could not be read: return early without touching any database
4225        };
4226        // Drop any leftover `orders_agg` table before the schema is ensured again.
4227        let pool = PgPool::connect(&database_url).await.unwrap();
4228        sqlx::query("DROP TABLE IF EXISTS orders_agg")
4229            .execute(&pool)
4230            .await
4231            .unwrap();
4232
4233        let order = entity().table_name("orders_agg");
4234        let executor = PgMutationExecutor::new(pool.clone());
4235        let dialect = PostgresDialect;
4236        executor.ensure_schema(&dialect, &[&order]).await.unwrap();
4237        // Seed four rows: versions 1, 2, 3, 4; two named 'A' and two named 'B'.
4238        sqlx::query(
4239            "INSERT INTO orders_agg (id, version, name) VALUES
4240                (1, 1, 'A'),
4241                (2, 2, 'A'),
4242                (3, 3, 'B'),
4243                (4, 4, 'B')",
4244        )
4245        .execute(&pool)
4246        .await
4247        .unwrap();
4248        // Ungrouped aggregate over every row; the HAVING condition (count > 3) holds because 4 rows exist.
4249        let aggregate = dialect
4250            .compile_select(
4251                &order,
4252                &SelectQuery::new("Order")
4253                    .count("total")
4254                    .count_field("version", "versionCount")
4255                    .sum("version", "sumVersion")
4256                    .avg("version", "avgVersion")
4257                    .min("version", "minVersion")
4258                    .max("version", "maxVersion")
4259                    .stddev("version", "stddevVersion")
4260                    .stddev_pop("version", "stddevPopVersion")
4261                    .var_samp("version", "varSampVersion")
4262                    .var_pop("version", "varPopVersion")
4263                    .bit_and("version", "bitAndVersion")
4264                    .bit_or("version", "bitOrVersion")
4265                    .bit_xor("version", "bitXorVersion")
4266                    .having(Expr::binary(
4267                        Expr::count_all(),
4268                        teaql_core::BinaryOp::Gt,
4269                        Expr::value(3_i64),
4270                    )),
4271            )
4272            .unwrap();
4273        let rows = executor.fetch_all(&aggregate).await.unwrap();
4274        assert_eq!(rows.len(), 1);
4275        let row = &rows[0];
4276        assert_eq!(row.get("total"), Some(&Value::I64(4)));
4277        assert_eq!(row.get("versionCount"), Some(&Value::I64(4)));
4278        assert_eq!(
4279            row.get("sumVersion"),
4280            Some(&Value::Decimal(Decimal::new(10, 0))) // 1 + 2 + 3 + 4
4281        );
4282        assert_eq!(
4283            row.get("avgVersion"),
4284            Some(&Value::Decimal(Decimal::new(25, 1))) // 10 / 4 = 2.5
4285        );
4286        assert_eq!(row.get("minVersion"), Some(&Value::I64(1)));
4287        assert_eq!(row.get("maxVersion"), Some(&Value::I64(4)));
4288        assert!(matches!(row.get("stddevVersion"), Some(Value::Decimal(_)))); // value type only, not the number
4289        assert!(matches!(
4290            row.get("stddevPopVersion"),
4291            Some(Value::Decimal(_))
4292        ));
4293        assert!(matches!(row.get("varSampVersion"), Some(Value::Decimal(_))));
4294        assert!(matches!(row.get("varPopVersion"), Some(Value::Decimal(_))));
4295        assert_eq!(row.get("bitAndVersion"), Some(&Value::I64(0))); // 1 & 2 & 3 & 4
4296        assert_eq!(row.get("bitOrVersion"), Some(&Value::I64(7))); // 1 | 2 | 3 | 4
4297        assert_eq!(row.get("bitXorVersion"), Some(&Value::I64(4))); // 1 ^ 2 ^ 3 ^ 4
4298        // Grouped by name; both groups have 2 rows, so both pass the HAVING condition (count > 1).
4299        let grouped = dialect
4300            .compile_select(
4301                &order,
4302                &SelectQuery::new("Order")
4303                    .group_by("name")
4304                    .count("total")
4305                    .sum("version", "sumVersion")
4306                    .having(Expr::binary(
4307                        Expr::count_all(),
4308                        teaql_core::BinaryOp::Gt,
4309                        Expr::value(1_i64),
4310                    ))
4311                    .order_asc("name"),
4312            )
4313            .unwrap();
4314        let rows = executor.fetch_all(&grouped).await.unwrap();
4315        assert_eq!(rows.len(), 2);
4316        assert_eq!(rows[0].get("name"), Some(&Value::Text("A".to_owned())));
4317        assert_eq!(rows[0].get("total"), Some(&Value::I64(2)));
4318        assert_eq!(
4319            rows[0].get("sumVersion"),
4320            Some(&Value::Decimal(Decimal::new(3, 0))) // group 'A': 1 + 2
4321        );
4322        assert_eq!(rows[1].get("name"), Some(&Value::Text("B".to_owned())));
4323        assert_eq!(rows[1].get("total"), Some(&Value::I64(2)));
4324        assert_eq!(
4325            rows[1].get("sumVersion"),
4326            Some(&Value::Decimal(Decimal::new(7, 0))) // group 'B': 3 + 4
4327        );
4328        // Clean up the table (not reached when an assertion or unwrap above panics).
4329        sqlx::query("DROP TABLE IF EXISTS orders_agg")
4330            .execute(&pool)
4331            .await
4332            .unwrap();
4333    }
4334
4335    #[tokio::test]
4336    async fn user_context_can_ensure_postgres_schema_when_database_is_available() {
4337        use sqlx::{PgPool, Row};
4338        // Integration test: after `ensure_postgres_schema`, a table must exist for each module descriptor.
4339        let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4340            return; // TEAQL_TEST_PG_URL could not be read: return early without touching any database
4341        };
4342        // Drop leftovers first so the existence check below reflects tables created during this run.
4343        let pool = PgPool::connect(&database_url).await.unwrap();
4344        sqlx::query("DROP TABLE IF EXISTS product_ctx")
4345            .execute(&pool)
4346            .await
4347            .unwrap();
4348        sqlx::query("DROP TABLE IF EXISTS orderline_ctx")
4349            .execute(&pool)
4350            .await
4351            .unwrap();
4352        sqlx::query("DROP TABLE IF EXISTS orders_ctx")
4353            .execute(&pool)
4354            .await
4355            .unwrap();
4356        // Register three descriptors, each mapped to a `_ctx`-suffixed table, through a RuntimeModule.
4357        let order = entity().table_name("orders_ctx");
4358        let line = line_entity().table_name("orderline_ctx");
4359        let product = product_entity().table_name("product_ctx");
4360        let module = super::RuntimeModule::new()
4361            .descriptor(order)
4362            .descriptor(line)
4363            .descriptor(product);
4364        let mut ctx = super::UserContext::new().with_module(module);
4365        ctx.insert_resource(PostgresDialect);
4366        ctx.insert_resource(PgMutationExecutor::new(pool.clone()));
4367
4368        ctx.ensure_postgres_schema().await.unwrap(); // call under test
4369        // Ask information_schema which of the three tables now exist in the current schema.
4370        let tables = sqlx::query(
4371            "SELECT table_name
4372             FROM information_schema.tables
4373             WHERE table_schema = current_schema()
4374               AND table_name IN ('orders_ctx', 'orderline_ctx', 'product_ctx')
4375             ORDER BY table_name",
4376        )
4377        .fetch_all(&pool)
4378        .await
4379        .unwrap()
4380        .into_iter()
4381        .map(|row| row.try_get::<String, _>("table_name").unwrap())
4382        .collect::<Vec<_>>();
4383        // All three names must be present; they are listed in the order the ORDER BY is expected to yield.
4384        assert_eq!(
4385            tables,
4386            vec![
4387                "orderline_ctx".to_owned(),
4388                "orders_ctx".to_owned(),
4389                "product_ctx".to_owned()
4390            ]
4391        );
4392        // Clean up all three tables (not reached when an assertion or unwrap above panics).
4393        sqlx::query("DROP TABLE IF EXISTS product_ctx")
4394            .execute(&pool)
4395            .await
4396            .unwrap();
4397        sqlx::query("DROP TABLE IF EXISTS orderline_ctx")
4398            .execute(&pool)
4399            .await
4400            .unwrap();
4401        sqlx::query("DROP TABLE IF EXISTS orders_ctx")
4402            .execute(&pool)
4403            .await
4404            .unwrap();
4405    }
4406
4407    #[tokio::test(flavor = "multi_thread")]
4408    async fn postgres_graph_write_transaction_rolls_back_when_database_is_available() {
4409        use sqlx::{PgPool, Row};
4410        // Integration test: a graph saved inside a transaction must leave `orders_tx` empty after rollback.
4411        let Some(database_url) = std::env::var("TEAQL_TEST_PG_URL").ok() else {
4412            return; // TEAQL_TEST_PG_URL could not be read: return early without touching any database
4413        };
4414        // Drop leftovers from earlier runs so the final row count starts from empty tables.
4415        let pool = PgPool::connect(&database_url).await.unwrap();
4416        sqlx::query("DROP TABLE IF EXISTS orderline_tx")
4417            .execute(&pool)
4418            .await
4419            .unwrap();
4420        sqlx::query("DROP TABLE IF EXISTS product_tx")
4421            .execute(&pool)
4422            .await
4423            .unwrap();
4424        sqlx::query("DROP TABLE IF EXISTS orders_tx")
4425            .execute(&pool)
4426            .await
4427            .unwrap();
4428        // Ensure the three tables before the transaction begins, using a separate pool-backed executor.
4429        let order = entity().table_name("orders_tx");
4430        let line = line_entity().table_name("orderline_tx");
4431        let product = product_entity().table_name("product_tx");
4432        let schema_executor = PgMutationExecutor::new(pool.clone());
4433        schema_executor
4434            .ensure_schema(&PostgresDialect, &[&order, &line, &product])
4435            .await
4436            .unwrap();
4437        // Begin the transaction; the sync wrapper gets a clone so `tx_executor` can be rolled back below.
4438        let tx_executor = PgTransactionExecutor::begin(&pool).await.unwrap();
4439        let sync_executor = PgTxSyncExecutor::new(tx_executor.clone());
4440        let mut ctx = UserContext::new()
4441            .with_metadata(
4442                InMemoryMetadataStore::new()
4443                    .with_entity(order)
4444                    .with_entity(line)
4445                    .with_entity(product),
4446            )
4447            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
4448        ctx.insert_resource(PostgresDialect);
4449        ctx.insert_resource(sync_executor);
4450        // Save Order(1) -> OrderLine(10) -> Product(100) as one nested graph through the repository.
4451        let repo = ctx
4452            .resolve_repository::<PostgresDialect, PgTxSyncExecutor>("Order")
4453            .unwrap();
4454        repo.save_graph(
4455            GraphNode::new("Order")
4456                .value("id", 1_u64)
4457                .value("version", 1_i64)
4458                .value("name", "pg-rollback-root")
4459                .relation(
4460                    "lines",
4461                    GraphNode::new("OrderLine")
4462                        .value("id", 10_u64)
4463                        .value("version", 1_i64)
4464                        .value("name", "pg-rollback-line")
4465                        .relation(
4466                            "product",
4467                            GraphNode::new("Product")
4468                                .value("id", 100_u64)
4469                                .value("name", "pg-rollback-sku"),
4470                        ),
4471                ),
4472        )
4473        .unwrap(); // the save itself must succeed; only the rollback below should undo it
4474
4475        tx_executor.rollback().await.unwrap();
4476        // Count rows through the pool. Only `orders_tx` is checked; line and product tables are not.
4477        let count: i64 = sqlx::query("SELECT COUNT(*) AS count FROM orders_tx")
4478            .fetch_one(&pool)
4479            .await
4480            .unwrap()
4481            .try_get("count")
4482            .unwrap();
4483        assert_eq!(count, 0);
4484        // Clean up all three tables (not reached when an assertion or unwrap above panics).
4485        sqlx::query("DROP TABLE IF EXISTS orderline_tx")
4486            .execute(&pool)
4487            .await
4488            .unwrap();
4489        sqlx::query("DROP TABLE IF EXISTS product_tx")
4490            .execute(&pool)
4491            .await
4492            .unwrap();
4493        sqlx::query("DROP TABLE IF EXISTS orders_tx")
4494            .execute(&pool)
4495            .await
4496            .unwrap();
4497    }
4498}
4499pub use checker::{
4500    CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResult, CheckResults, CheckRule, Checker,
4501    CheckerRegistry, InMemoryCheckerRegistry, LocationSegment, ObjectLocation, clear_record_status,
4502    mark_record_status,
4503};