Skip to main content

teaql_runtime/
lib.rs

1#![allow(warnings)]
2mod checker;
3mod context;
4mod entity_runtime;
5mod entity_status;
6mod error;
7mod event;
8mod graph;
9mod id;
10mod language;
11mod memory;
12mod registry;
13mod repository;
14
15pub use context::{
16    InfoLogEntry, LogPayload, SchemaProvider, SqlLogEntry, SqlLogOperation,
17    SqlLogOptions, UnifiedLogBuffer, UnifiedLogEntry, UserContext,
18};
19pub use entity_status::{EntityAction, EntityStatus};
20pub use entity_runtime::{ChangeSetStack, EntityChangeSet, EntityKey, EntityRoot, RootContext};
21pub use error::{ContextError, RepositoryError, RuntimeError};
22pub use event::{
23    EntityEvent, EntityEventKind, EntityEventSink, EntityPropertyChange, InMemoryEntityEventSink,
24};
25pub use graph::{
26    GraphMutationBatch, GraphMutationKind, GraphMutationPlan, GraphMutationPlanItem,
27    GraphNode, GraphOperation, ScopedCommentNode, TraceScopeToken, sorted_update_fields,
28};
29pub(crate) use id::local_id_generator;
30pub use id::{InternalIdGenerator, SnowflakeIdGenerator};
31pub use language::{
32    BuiltinTranslator, Language, MessageTranslator, translate_check_result, translate_location,
33};
34pub use memory::{MemoryRepository, MemoryRepositoryError};
35pub use registry::{
36    InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry,
37    MetadataStore, RepositoryBehavior, RepositoryBehaviorRegistry, RepositoryRegistry,
38    RequestPolicy, RuntimeModule,
39};
40pub use repository::{
41    AggregationCacheBackend, ContextRepository, GraphTransactionBoundary, InMemoryAggregationCache,
42    RelationLoadPlan, Repository, ResolvedRepository,
43};
44
45#[cfg(test)]
46mod tests {
47    use std::collections::{BTreeMap, VecDeque};
48    use std::sync::{Arc, Mutex};
49
50    use super::{
51        AggregationCacheBackend, CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResult,
52        CheckResults, CheckRule, Checker, EntityEvent, EntityEventKind, EntityEventSink,
53        GraphMutationKind, GraphNode, InMemoryAggregationCache,
54        InMemoryCheckerRegistry, InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry,
55        InMemoryRepositoryRegistry, InternalIdGenerator, Language, MemoryRepository, MetadataStore,
56        ObjectLocation, Repository, RepositoryBehavior, RepositoryError,
57        RequestPolicy, RuntimeError, RuntimeModule, SqlLogOperation, SqlLogOptions, TypedChecker,
58        TypedEntityChecker, UserContext, translate_check_result,
59    };
60    use teaql_data_service::{
61        DataServiceCapabilities, DataServiceExecutor, ExecutionMetadata, MutationExecutor,
62        MutationRequest, MutationResult, QueryExecutor, QueryRequest, QueryResult, DataServiceOperation,
63    };
64    use teaql_core::{
65        Aggregate, AggregateFunction, BinaryOp, DataType, Decimal, DeleteCommand, Entity,
66        EntityDescriptor, EntityError, Expr, InsertCommand, OrderBy, PropertyDescriptor, Record,
67        RecoverCommand, RelationAggregate, SelectQuery, TeaqlEntity, UpdateCommand, Value,
68    };
69    use teaql_macros::TeaqlEntity as DeriveTeaqlEntity;
70    use teaql_sql::{CompiledQuery, DatabaseKind, SqlCompileError, SqlDialect, quote_identifier_if_needed};
71
72    const ORDER_DEFAULT_PROJECTION: &str = "id, version, name";
73
74    #[derive(Debug, Default, Clone, Copy)]
75    struct PostgresDialect;
76
77    impl SqlDialect for PostgresDialect {
78        fn kind(&self) -> DatabaseKind {
79            DatabaseKind::PostgreSql
80        }
81
82        fn quote_ident(&self, ident: &str) -> String {
83            quote_identifier_if_needed(ident, '"')
84        }
85
86        fn placeholder(&self, index: usize) -> String {
87            format!("${index}")
88        }
89
90        fn schema_type_sql(
91            &self,
92            data_type: DataType,
93            _property: &PropertyDescriptor,
94        ) -> Result<&'static str, SqlCompileError> {
95            match data_type {
96                DataType::Bool => Ok("BOOLEAN"),
97                DataType::I64 | DataType::U64 => Ok("BIGINT"),
98                DataType::F64 => Ok("DOUBLE PRECISION"),
99                DataType::Decimal => Ok("NUMERIC"),
100                DataType::Text => Ok("TEXT"),
101                DataType::Json => Ok("JSONB"),
102                DataType::Date => Ok("DATE"),
103                DataType::Timestamp => Ok("TIMESTAMPTZ"),
104            }
105        }
106    }
107
108    fn entity() -> EntityDescriptor {
109        EntityDescriptor::new("Order")
110            .table_name("orders")
111            .property(
112                PropertyDescriptor::new("id", DataType::U64)
113                    .column_name("id")
114                    .id()
115                    .not_null(),
116            )
117            .property(
118                PropertyDescriptor::new("version", DataType::I64)
119                    .column_name("version")
120                    .version()
121                    .not_null(),
122            )
123            .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
124            .relation(
125                teaql_core::RelationDescriptor::new("lines", "OrderLine")
126                    .local_key("id")
127                    .foreign_key("order_id")
128                    .many(),
129            )
130    }
131
132    fn line_entity() -> EntityDescriptor {
133        EntityDescriptor::new("OrderLine")
134            .table_name("orderline")
135            .property(
136                PropertyDescriptor::new("id", DataType::U64)
137                    .column_name("id")
138                    .id()
139                    .not_null(),
140            )
141            .property(
142                PropertyDescriptor::new("version", DataType::I64)
143                    .column_name("version")
144                    .version(),
145            )
146            .property(
147                PropertyDescriptor::new("order_id", DataType::U64)
148                    .column_name("order_id")
149                    .not_null(),
150            )
151            .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
152            .property(
153                PropertyDescriptor::new("product_id", DataType::U64)
154                    .column_name("product_id")
155                    .not_null(),
156            )
157            .relation(
158                teaql_core::RelationDescriptor::new("product", "Product")
159                    .local_key("product_id")
160                    .foreign_key("id"),
161            )
162    }
163
164    fn product_entity() -> EntityDescriptor {
165        EntityDescriptor::new("Product")
166            .table_name("product")
167            .property(
168                PropertyDescriptor::new("id", DataType::U64)
169                    .column_name("id")
170                    .id()
171                    .not_null(),
172            )
173            .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
174    }
175
176    #[derive(Debug, Default)]
177    struct StubExecutor {
178        affected: u64,
179        rows: Vec<Record>,
180    }
181
182    #[derive(Debug, Default)]
183    struct QueueExecutor {
184        affected: u64,
185        rows: Mutex<VecDeque<Vec<Record>>>,
186        queries: Mutex<Vec<String>>,
187    }
188
189    struct OrderBehavior;
190
191    #[allow(dead_code)]
192    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
193    #[teaql(entity = "CatalogProduct", table = "catalog_product")]
194    struct CatalogProductRow {
195        #[teaql(id)]
196        id: u64,
197        name: String,
198    }
199
200    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
201    #[teaql(entity = "OrderAggregate", table = "orders")]
202    struct OrderAggregateDynamic {
203        #[teaql(id)]
204        id: u64,
205        #[teaql(dynamic)]
206        dynamic: BTreeMap<String, Value>,
207    }
208
209    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
210    #[teaql(entity = "Product", table = "product")]
211    struct ProductEntityRow {
212        #[teaql(id)]
213        id: u64,
214        name: String,
215    }
216
217    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
218    #[teaql(entity = "OrderLine", table = "orderline")]
219    struct OrderLineEntityRow {
220        #[teaql(id)]
221        id: u64,
222        #[teaql(column = "order_id")]
223        order_id: u64,
224        name: String,
225        #[teaql(column = "product_id")]
226        product_id: u64,
227        #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
228        product: Option<ProductEntityRow>,
229    }
230
231    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
232    #[teaql(entity = "OrderLine", table = "orderline")]
233    struct ProductLineEntityRow {
234        #[teaql(id)]
235        id: u64,
236        #[teaql(column = "order_id")]
237        order_id: u64,
238        name: String,
239        #[teaql(column = "product_id")]
240        product_id: u64,
241    }
242
243    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
244    #[teaql(entity = "Product", table = "product")]
245    struct ProductWithLinesEntityRow {
246        #[teaql(id)]
247        id: u64,
248        name: String,
249        #[teaql(relation(
250            target = "OrderLine",
251            local_key = "id",
252            foreign_key = "product_id",
253            many
254        ))]
255        lines: teaql_core::SmartList<ProductLineEntityRow>,
256    }
257
258    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
259    #[teaql(entity = "OrderLine", table = "orderline")]
260    struct OrderLineWithProductEntityRow {
261        #[teaql(id)]
262        id: u64,
263        #[teaql(column = "order_id")]
264        order_id: u64,
265        name: String,
266        #[teaql(column = "product_id")]
267        product_id: u64,
268        #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
269        product: Option<ProductWithLinesEntityRow>,
270    }
271
272    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
273    #[teaql(entity = "Order", table = "orders")]
274    struct OrderAggregateRow {
275        #[teaql(id)]
276        id: u64,
277        #[teaql(version)]
278        version: i64,
279        name: String,
280        #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
281        lines: teaql_core::SmartList<OrderLineEntityRow>,
282    }
283
284    #[derive(Debug, Clone, PartialEq, DeriveTeaqlEntity)]
285    #[teaql(entity = "Order", table = "orders")]
286    struct Order {
287        #[teaql(id)]
288        id: u64,
289        #[teaql(version)]
290        version: i64,
291        name: String,
292    }
293
294    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
295    #[teaql(entity = "Product", table = "product")]
296    struct TypedGraphProduct {
297        #[teaql(id)]
298        id: u64,
299        name: String,
300    }
301
302    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
303    #[teaql(entity = "OrderLine", table = "orderline")]
304    struct TypedGraphLine {
305        #[teaql(id)]
306        id: u64,
307        #[teaql(column = "order_id")]
308        order_id: Option<u64>,
309        name: String,
310        #[teaql(column = "product_id")]
311        product_id: Option<u64>,
312        #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
313        product: Option<TypedGraphProduct>,
314    }
315
316    #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
317    #[teaql(entity = "Order", table = "orders")]
318    struct TypedGraphOrder {
319        #[teaql(id)]
320        id: u64,
321        #[teaql(version)]
322        version: i64,
323        name: String,
324        #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
325        lines: teaql_core::SmartList<TypedGraphLine>,
326    }
327
328    #[derive(Debug, PartialEq, Eq)]
329    struct OrderEntity {
330        id: u64,
331        version: i64,
332        name: String,
333    }
334
335    impl teaql_core::TeaqlEntity for OrderEntity {
336        fn entity_descriptor() -> EntityDescriptor {
337            entity()
338        }
339    }
340
341    impl Entity for OrderEntity {
342        fn from_record(record: Record) -> Result<Self, EntityError> {
343            let id = match record.get("id") {
344                Some(Value::U64(v)) => *v,
345                Some(Value::I64(v)) if *v >= 0 => *v as u64,
346                other => {
347                    return Err(EntityError::new(
348                        "Order",
349                        format!("invalid id field: {other:?}"),
350                    ));
351                }
352            };
353            let version = match record.get("version") {
354                Some(Value::I64(v)) => *v,
355                other => {
356                    return Err(EntityError::new(
357                        "Order",
358                        format!("invalid version field: {other:?}"),
359                    ));
360                }
361            };
362            let name = match record.get("name") {
363                Some(Value::Text(v)) => v.clone(),
364                other => {
365                    return Err(EntityError::new(
366                        "Order",
367                        format!("invalid name field: {other:?}"),
368                    ));
369                }
370            };
371            Ok(Self { id, version, name })
372        }
373
374        fn into_record(self) -> Record {
375            Record::from([
376                (String::from("id"), Value::U64(self.id)),
377                (String::from("version"), Value::I64(self.version)),
378                (String::from("name"), Value::Text(self.name)),
379            ])
380        }
381    }
382
383    #[derive(Debug)]
384    struct StubError;
385
386    impl std::fmt::Display for StubError {
387        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
388            write!(f, "stub error")
389        }
390    }
391
392    impl std::error::Error for StubError {}
393
394    impl DataServiceExecutor for StubExecutor {
395        type Error = StubError;
396
397        fn capabilities(&self) -> DataServiceCapabilities {
398            DataServiceCapabilities::default()
399        }
400    }
401
402    impl QueryExecutor for StubExecutor {
403        async fn query(&self, _request: QueryRequest) -> Result<QueryResult, Self::Error> {
404            Ok(QueryResult {
405                rows: self.rows.clone(),
406                metadata: ExecutionMetadata {
407                    debug_query: None,
408                    backend: "stub".to_owned(),
409                    operation: DataServiceOperation::Query,
410                    started_at: std::time::SystemTime::now(),
411                    ended_at: std::time::SystemTime::now(),
412                    affected_rows: None,
413                    result_count: Some(self.rows.len()),
414                    trace_chain: Vec::new(),
415                    comment: None,
416                    backend_request_id: None,
417                },
418            })
419        }
420    }
421
422    impl MutationExecutor for StubExecutor {
423        async fn mutate(&self, _request: MutationRequest) -> Result<MutationResult, Self::Error> {
424            Ok(MutationResult {
425                affected_rows: self.affected,
426                generated_values: Record::new(),
427                metadata: ExecutionMetadata {
428                    debug_query: None,
429                    backend: "stub".to_owned(),
430                    operation: DataServiceOperation::Update,
431                    started_at: std::time::SystemTime::now(),
432                    ended_at: std::time::SystemTime::now(),
433                    affected_rows: Some(self.affected),
434                    result_count: None,
435                    trace_chain: Vec::new(),
436                    comment: None,
437                    backend_request_id: None,
438                },
439            })
440        }
441    }
442
443    impl DataServiceExecutor for QueueExecutor {
444        type Error = StubError;
445
446        fn capabilities(&self) -> DataServiceCapabilities {
447            DataServiceCapabilities::default()
448        }
449    }
450
451    impl QueryExecutor for QueueExecutor {
452        async fn query(&self, request: QueryRequest) -> Result<QueryResult, Self::Error> {
453            let sql_approx = format!("SELECT ... FROM {} ...", request.query.entity);
454            self.queries.lock().unwrap().push(sql_approx);
455            Ok(QueryResult {
456                rows: self.rows.lock().unwrap().pop_front().unwrap_or_default(),
457                metadata: ExecutionMetadata {
458                    debug_query: None,
459                    backend: "queue".to_owned(),
460                    operation: DataServiceOperation::Query,
461                    started_at: std::time::SystemTime::now(),
462                    ended_at: std::time::SystemTime::now(),
463                    affected_rows: None,
464                    result_count: Some(0),
465                    trace_chain: Vec::new(),
466                    comment: None,
467                    backend_request_id: None,
468                },
469            })
470        }
471    }
472
473    impl MutationExecutor for QueueExecutor {
474        async fn mutate(&self, _request: MutationRequest) -> Result<MutationResult, Self::Error> {
475            Ok(MutationResult {
476                affected_rows: self.affected,
477                generated_values: Record::new(),
478                metadata: ExecutionMetadata {
479                    debug_query: None,
480                    backend: "queue".to_owned(),
481                    operation: DataServiceOperation::Update,
482                    started_at: std::time::SystemTime::now(),
483                    ended_at: std::time::SystemTime::now(),
484                    affected_rows: Some(self.affected),
485                    result_count: None,
486                    trace_chain: Vec::new(),
487                    comment: None,
488                    backend_request_id: None,
489                },
490            })
491        }
492    }
493
494
495
496    impl RepositoryBehavior for OrderBehavior {
497        fn before_select(
498            &self,
499            _ctx: &UserContext,
500            query: &mut teaql_core::SelectQuery,
501        ) -> Result<(), RuntimeError> {
502            query.filter = Some(Expr::eq("version", 1_i64));
503            Ok(())
504        }
505
506        fn before_insert(
507            &self,
508            _ctx: &UserContext,
509            command: &mut InsertCommand,
510        ) -> Result<(), RuntimeError> {
511            command
512                .values
513                .entry("version".to_owned())
514                .or_insert(Value::I64(1));
515            Ok(())
516        }
517
518        fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
519            vec!["lines".to_owned()]
520        }
521    }
522
523    struct ContextAwareOrderBehavior;
524    struct TenantRequestPolicy;
525    struct OrderChecker;
526    struct TypedOrderChecker;
527    #[derive(Clone)]
528    struct RecordingEventSink {
529        events: Arc<Mutex<Vec<EntityEvent>>>,
530    }
531
532    impl RepositoryBehavior for ContextAwareOrderBehavior {
533        fn before_insert(
534            &self,
535            ctx: &UserContext,
536            command: &mut InsertCommand,
537        ) -> Result<(), RuntimeError> {
538            let tenant = ctx
539                .get_named_resource::<String>("tenant")
540                .cloned()
541                .ok_or_else(|| RuntimeError::Behavior("missing tenant resource".to_owned()))?;
542            let version = *ctx
543                .get_named_resource::<i64>("initial_version")
544                .ok_or_else(|| {
545                    RuntimeError::Behavior("missing initial_version resource".to_owned())
546                })?;
547            let trace_id = match ctx.local("trace_id") {
548                Some(Value::Text(value)) => value.clone(),
549                other => {
550                    return Err(RuntimeError::Behavior(format!(
551                        "missing trace_id local, got {other:?}"
552                    )));
553                }
554            };
555
556            command
557                .values
558                .entry("name".to_owned())
559                .or_insert(Value::Text(format!("{tenant}:{trace_id}")));
560            command
561                .values
562                .entry("version".to_owned())
563                .or_insert(Value::I64(version));
564            Ok(())
565        }
566    }
567
568    impl RequestPolicy for TenantRequestPolicy {
569        fn enforce_select(
570            &self,
571            ctx: &UserContext,
572            query: &mut SelectQuery,
573        ) -> Result<(), RuntimeError> {
574            if query.entity == "Order" {
575                let tenant_id = ctx
576                    .get_named_resource::<u64>("tenant_id")
577                    .copied()
578                    .ok_or_else(|| RuntimeError::Policy("missing tenant_id".to_owned()))?;
579                query.filter = Some(match query.filter.take() {
580                    Some(filter) => filter.and_expr(Expr::eq("id", tenant_id)),
581                    None => Expr::eq("id", tenant_id),
582                });
583            }
584            Ok(())
585        }
586
587        fn enforce_insert(
588            &self,
589            ctx: &UserContext,
590            command: &mut InsertCommand,
591        ) -> Result<(), RuntimeError> {
592            if command.entity == "Order" {
593                let tenant_id = ctx
594                    .get_named_resource::<u64>("tenant_id")
595                    .copied()
596                    .ok_or_else(|| RuntimeError::Policy("missing tenant_id".to_owned()))?;
597                command
598                    .values
599                    .insert("version".to_owned(), Value::I64(tenant_id as i64));
600            }
601            Ok(())
602        }
603    }
604
605    impl Checker for OrderChecker {
606        fn entity(&self) -> &str {
607            "Order"
608        }
609
610        fn check_and_fix(
611            &self,
612            _ctx: &UserContext,
613            record: &mut Record,
614            location: &ObjectLocation,
615            results: &mut CheckResults,
616        ) {
617            let status = CheckObjectStatus::from_record(record);
618            if status.is_create() {
619                self.required(record, "name", location, results);
620                record.entry("version".to_owned()).or_insert(Value::I64(1));
621            }
622            if status.is_update()
623                && record.get("name") == Some(&Value::Text("graph-update".to_owned()))
624            {
625                record.insert(
626                    "name".to_owned(),
627                    Value::Text("graph-update-checked".to_owned()),
628                );
629            }
630            self.min_string_length(record, "name", 3, location, results);
631        }
632    }
633
634    impl TypedChecker<Order> for TypedOrderChecker {
635        fn check_and_fix_typed(
636            &self,
637            _ctx: &UserContext,
638            entity: &mut Order,
639            status: CheckObjectStatus,
640            location: &ObjectLocation,
641            results: &mut CheckResults,
642        ) {
643            if status.is_create() {
644                if entity.name.is_empty() {
645                    results.push(CheckResult::required(location.clone().member("name")));
646                }
647            }
648            if entity.name.chars().count() < 3 {
649                results.push(CheckResult::min_str(
650                    location.clone().member("name"),
651                    3,
652                    entity.name.clone(),
653                ));
654            }
655            if entity.name == "fix" {
656                entity.name = "fixed".to_owned();
657            }
658        }
659    }
660
661    impl EntityEventSink for RecordingEventSink {
662        fn on_event(&self, _ctx: &UserContext, event: &EntityEvent) -> Result<(), RuntimeError> {
663            self.events.lock().unwrap().push(event.clone());
664            Ok(())
665        }
666    }
667
668    struct FixedIdGenerator(u64);
669
670    impl InternalIdGenerator for FixedIdGenerator {
671        fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
672            Ok(self.0)
673        }
674    }
675
676    struct SequentialIdGenerator {
677        next: Mutex<u64>,
678    }
679
680    impl SequentialIdGenerator {
681        fn new(next: u64) -> Self {
682            Self {
683                next: Mutex::new(next),
684            }
685        }
686    }
687
688    impl InternalIdGenerator for SequentialIdGenerator {
689        fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
690            let mut next = self
691                .next
692                .lock()
693                .map_err(|err| RuntimeError::IdGeneration(err.to_string()))?;
694            let id = *next;
695            *next += 1;
696            Ok(id)
697        }
698    }
699
700    #[tokio::test]
701    async fn metadata_store_registers_entities() {
702        let store = InMemoryMetadataStore::new().with_entity(entity());
703        assert!(store.entity("Order").is_some());
704    }
705
706    #[tokio::test]
707    async fn runtime_module_registers_descriptor_into_context() {
708        let ctx = UserContext::new().with_module(RuntimeModule::new().descriptor(entity()));
709        assert!(ctx.entity("Order").is_some());
710        assert!(ctx.has_repository("Order"));
711    }
712
713    #[tokio::test]
714    async fn runtime_module_registers_derived_entity_and_behavior() {
715        let ctx = UserContext::new().with_module(
716            RuntimeModule::new().entity_with_behavior::<CatalogProductRow, _>(OrderBehavior),
717        );
718        assert!(ctx.entity("CatalogProduct").is_some());
719        assert!(ctx.has_repository("CatalogProduct"));
720        assert!(ctx.repository_behavior("CatalogProduct").is_some());
721    }
722
723    #[tokio::test]
724    async fn module_macro_registers_multiple_entities() {
725        let ctx = UserContext::new().with_module(crate::module!(CatalogProductRow));
726        assert!(ctx.entity("CatalogProduct").is_some());
727        assert!(ctx.has_repository("CatalogProduct"));
728    }
729
730    #[tokio::test]
731    async fn module_macro_registers_entity_behavior_pairs() {
732        let ctx =
733            UserContext::new().with_module(crate::module!(CatalogProductRow => OrderBehavior));
734        assert!(ctx.entity("CatalogProduct").is_some());
735        assert!(ctx.repository_behavior("CatalogProduct").is_some());
736    }
737
738    #[tokio::test]
739    async fn repository_returns_optimistic_lock_conflict() {
740        let store = InMemoryMetadataStore::new().with_entity(entity());
741        let executor = StubExecutor {
742            affected: 0,
743            rows: Vec::new(),
744        };
745        let repo = Repository::new(&store, &executor);
746
747        let err = repo
748            .update(
749                &UpdateCommand::new("Order", 1_u64)
750                    .expected_version(3)
751                    .value("name", "next"),
752            )
753            .await.unwrap_err();
754
755        match err {
756            RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. }) => {}
757            other => panic!("unexpected error: {other}"),
758        }
759    }
760
761    #[tokio::test]
762    async fn user_context_indexes_resources_and_locals() {
763        let mut ctx =
764            UserContext::new().with_metadata(InMemoryMetadataStore::new().with_entity(entity()));
765        ctx.insert_resource::<u64>(42);
766        ctx.insert_named_resource("tenant", String::from("acme"));
767        ctx.put_local("trace_id", "req-1");
768
769        assert!(ctx.entity("Order").is_some());
770        assert_eq!(ctx.get_resource::<u64>(), Some(&42));
771        assert_eq!(
772            ctx.get_named_resource::<String>("tenant"),
773            Some(&String::from("acme"))
774        );
775        assert_eq!(
776            ctx.local("trace_id"),
777            Some(&Value::Text("req-1".to_owned()))
778        );
779    }
780
781    #[tokio::test]
782    async fn user_context_builds_context_repository() {
783        let mut ctx =
784            UserContext::new().with_metadata(InMemoryMetadataStore::new().with_entity(entity()));
785        ctx.insert_resource(PostgresDialect);
786        ctx.insert_resource(StubExecutor {
787            affected: 1,
788            rows: Vec::new(),
789        });
790
791        let repo = ctx.repository::< StubExecutor>().unwrap();
792        let affected = repo
793            .update(
794                &UpdateCommand::new("Order", 1_u64)
795                    .expected_version(3)
796                    .value("name", "next"),
797            )
798            .await.unwrap();
799
800        assert_eq!(affected, 1);
801    }
802
803    #[tokio::test]
804    async fn user_context_resolves_repository_by_entity_type() {
805        let mut ctx = UserContext::new()
806            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
807            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
808        ctx.insert_resource(PostgresDialect);
809        ctx.insert_resource(StubExecutor {
810            affected: 1,
811            rows: Vec::new(),
812        });
813
814        let repo = ctx
815            .resolve_repository::< StubExecutor>("Order")
816            .unwrap();
817        assert_eq!(repo.entity(), "Order");
818        assert_eq!(repo.select().entity, "Order");
819
820        let affected = repo
821            .insert(
822                &repo
823                    .insert_command()
824                    .value("id", 1_u64)
825                    .value("version", 1_i64)
826                    .value("name", "n"),
827            )
828            .await.unwrap();
829        assert_eq!(affected, 1);
830    }
831
832    #[tokio::test]
833    async fn resolved_repository_applies_behavior_hooks() {
834        let mut ctx = UserContext::new()
835            .with_metadata(
836                InMemoryMetadataStore::new()
837                    .with_entity(entity())
838                    .with_entity(line_entity())
839                    .with_entity(product_entity()),
840            )
841            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
842            .with_repository_behavior_registry(
843                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
844            );
845        ctx.insert_resource(PostgresDialect);
846        ctx.insert_resource(StubExecutor {
847            affected: 1,
848            rows: Vec::new(),
849        });
850
851        let repo = ctx
852            .resolve_repository::< StubExecutor>("Order")
853            .unwrap();
854
855        // let compiled = repo.compile(&repo.select()).unwrap();
856        // assert!(compiled.sql.contains("WHERE (version = $1)"));
857
858        let insert = repo.insert_command().value("id", 1_u64).value("name", "n");
859        let affected = repo.insert(&insert).await.unwrap();
860        assert_eq!(affected, 1);
861        assert_eq!(repo.relation_loads(), vec!["lines".to_owned()]);
862    }
863
864    #[tokio::test]
865    async fn resolved_repository_applies_request_policy_after_behavior_hooks() {
866        let mut ctx = UserContext::new()
867            .with_metadata(
868                InMemoryMetadataStore::new()
869                    .with_entity(entity())
870                    .with_entity(line_entity())
871                    .with_entity(product_entity()),
872            )
873            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
874            .with_repository_behavior_registry(
875                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
876            )
877            .with_request_policy(TenantRequestPolicy);
878        ctx.insert_named_resource("tenant_id", 9_u64);
879        ctx.insert_resource(PostgresDialect);
880        ctx.insert_resource(StubExecutor {
881            affected: 1,
882            rows: Vec::new(),
883        });
884
885        let repo = ctx
886            .resolve_repository::< StubExecutor>("Order")
887            .unwrap();
888
889        // let compiled = repo.compile(&repo.select()).unwrap();
890        // assert!(compiled.sql.contains("version = $1"));
891        // assert!(compiled.sql.contains("id = $2"));
892
893        let insert = repo.insert_command().value("id", 1_u64).value("name", "n");
894        let command = repo.prepare_insert_command(&insert).unwrap();
895        assert_eq!(command.values.get("version"), Some(&Value::I64(9)));
896    }
897
898    #[tokio::test]
899    async fn resolved_repository_prepares_insert_command_with_generated_id() {
900        let mut ctx = UserContext::new()
901            .with_metadata(
902                InMemoryMetadataStore::new()
903                    .with_entity(entity())
904                    .with_entity(line_entity())
905                    .with_entity(product_entity()),
906            )
907            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
908            .with_repository_behavior_registry(
909                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
910            )
911            .with_internal_id_generator(FixedIdGenerator(42));
912        ctx.insert_resource(PostgresDialect);
913        ctx.insert_resource(StubExecutor {
914            affected: 1,
915            rows: Vec::new(),
916        });
917
918        let repo = ctx
919            .resolve_repository::< StubExecutor>("Order")
920            .unwrap();
921
922        let prepared = repo
923            .prepare_insert_command(&repo.insert_command().value("id", 0_u64).value("name", "n"))
924            .unwrap();
925
926        assert_eq!(prepared.values.get("id"), Some(&Value::U64(42)));
927        assert_eq!(prepared.values.get("version"), Some(&Value::I64(1)));
928        assert_eq!(
929            prepared.values.get("name"),
930            Some(&Value::Text("n".to_owned()))
931        );
932
933        let prepared_zero_version = repo
934            .prepare_insert_command(
935                &repo
936                    .insert_command()
937                    .value("id", 0_u64)
938                    .value("version", 0_i64)
939                    .value("name", "zero-version"),
940            )
941            .unwrap();
942        assert_eq!(
943            prepared_zero_version.values.get("version"),
944            Some(&Value::I64(1))
945        );
946    }
947
948    #[tokio::test]
949    async fn resolved_repository_saves_create_graph_and_maintains_relation_keys() {
950        let mut ctx = UserContext::new()
951            .with_metadata(
952                InMemoryMetadataStore::new()
953                    .with_entity(entity())
954                    .with_entity(line_entity())
955                    .with_entity(product_entity()),
956            )
957            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
958            .with_repository_behavior_registry(
959                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
960            )
961            .with_internal_id_generator(SequentialIdGenerator::new(500));
962        ctx.insert_resource(PostgresDialect);
963        ctx.insert_resource(StubExecutor {
964            affected: 1,
965            rows: Vec::new(),
966        });
967
968        let repo = ctx
969            .resolve_repository::< StubExecutor>("Order")
970            .unwrap();
971        let graph = GraphNode::new("Order").value("name", "root").relation(
972            "lines",
973            GraphNode::new("OrderLine")
974                .value("name", "line-1")
975                .relation("product", GraphNode::new("Product").value("name", "sku-1")),
976        );
977
978        let saved = repo.save_graph(graph).await.unwrap();
979
980        assert_eq!(saved.values.get("id"), Some(&Value::U64(500)));
981        assert_eq!(saved.values.get("version"), Some(&Value::I64(1)));
982        let lines = saved.relations.get("lines").unwrap();
983        assert_eq!(lines.len(), 1);
984        assert_eq!(lines[0].values.get("id"), Some(&Value::U64(502)));
985        assert_eq!(lines[0].values.get("version"), Some(&Value::I64(1)));
986        assert_eq!(lines[0].values.get("order_id"), Some(&Value::U64(500)));
987        assert_eq!(lines[0].values.get("product_id"), Some(&Value::U64(501)));
988        let product = lines[0].relations.get("product").unwrap();
989        assert_eq!(product[0].values.get("id"), Some(&Value::U64(501)));
990    }
991
992    #[tokio::test]
993    async fn resolved_repository_extracts_and_saves_typed_entity_graph() {
994        let mut ctx = UserContext::new()
995            .with_metadata(
996                InMemoryMetadataStore::new()
997                    .with_entity(entity())
998                    .with_entity(line_entity())
999                    .with_entity(product_entity()),
1000            )
1001            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1002            .with_internal_id_generator(SequentialIdGenerator::new(700));
1003        ctx.insert_resource(PostgresDialect);
1004        ctx.insert_resource(StubExecutor {
1005            affected: 1,
1006            rows: Vec::new(),
1007        });
1008
1009        let repo = ctx
1010            .resolve_repository::< StubExecutor>("Order")
1011            .unwrap();
1012        let order = TypedGraphOrder {
1013            id: 0,
1014            version: 0,
1015            name: "typed-root".to_owned(),
1016            lines: teaql_core::SmartList::from(vec![TypedGraphLine {
1017                id: 0,
1018                order_id: None,
1019                name: "typed-line".to_owned(),
1020                product_id: None,
1021                product: Some(TypedGraphProduct {
1022                    id: 0,
1023                    name: "typed-product".to_owned(),
1024                }),
1025            }]),
1026        };
1027
1028        let extracted = repo.graph_node_from_entity(order).unwrap();
1029        assert_eq!(extracted.entity, "Order");
1030        assert_eq!(
1031            extracted.values.get("name"),
1032            Some(&Value::Text("typed-root".to_owned()))
1033        );
1034        assert_eq!(extracted.values.get("id"), Some(&Value::U64(0)));
1035        assert_eq!(extracted.relations["lines"].len(), 1);
1036        assert_eq!(
1037            extracted.relations["lines"][0].values.get("name"),
1038            Some(&Value::Text("typed-line".to_owned()))
1039        );
1040        assert_eq!(
1041            extracted.relations["lines"][0].relations["product"].len(),
1042            1
1043        );
1044
1045        let saved = repo.save_graph(extracted).await.unwrap();
1046        assert_eq!(saved.values.get("id"), Some(&Value::U64(700)));
1047        assert_eq!(saved.values.get("version"), Some(&Value::I64(1)));
1048        let lines = saved.relations.get("lines").unwrap();
1049        assert_eq!(lines[0].values.get("id"), Some(&Value::U64(702)));
1050        assert_eq!(lines[0].values.get("version"), Some(&Value::I64(1)));
1051        assert_eq!(lines[0].values.get("order_id"), Some(&Value::U64(700)));
1052        assert_eq!(lines[0].values.get("product_id"), Some(&Value::U64(701)));
1053        assert_eq!(
1054            lines[0].relations["product"][0].values.get("id"),
1055            Some(&Value::U64(701))
1056        );
1057    }
1058
1059    #[tokio::test]
1060    async fn resolved_repository_saves_typed_entity_graph_directly() {
1061        let mut ctx = UserContext::new()
1062            .with_metadata(
1063                InMemoryMetadataStore::new()
1064                    .with_entity(entity())
1065                    .with_entity(line_entity())
1066                    .with_entity(product_entity()),
1067            )
1068            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1069            .with_internal_id_generator(SequentialIdGenerator::new(800));
1070        ctx.insert_resource(PostgresDialect);
1071        ctx.insert_resource(StubExecutor {
1072            affected: 1,
1073            rows: Vec::new(),
1074        });
1075
1076        let repo = ctx
1077            .resolve_repository::< StubExecutor>("Order")
1078            .unwrap();
1079        let saved = repo
1080            .save_entity_graph(TypedGraphOrder {
1081                id: 0,
1082                version: 0,
1083                name: "typed-direct".to_owned(),
1084                lines: teaql_core::SmartList::from(vec![TypedGraphLine {
1085                    id: 0,
1086                    order_id: None,
1087                    name: "typed-line".to_owned(),
1088                    product_id: None,
1089                    product: Some(TypedGraphProduct {
1090                        id: 0,
1091                        name: "typed-product".to_owned(),
1092                    }),
1093                }]),
1094            })
1095            .await.unwrap();
1096
1097        assert_eq!(saved.values.get("id"), Some(&Value::U64(800)));
1098        assert_eq!(saved.values.get("version"), Some(&Value::I64(1)));
1099        assert_eq!(
1100            saved.relations["lines"][0].values.get("order_id"),
1101            Some(&Value::U64(800))
1102        );
1103        assert_eq!(
1104            saved.relations["lines"][0].values.get("product_id"),
1105            Some(&Value::U64(801))
1106        );
1107    }
1108
1109    #[tokio::test]
1110    async fn custom_user_context_can_drive_insert_preparation() {
1111        let mut ctx = UserContext::new()
1112            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1113            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1114            .with_repository_behavior_registry(
1115                InMemoryRepositoryBehaviorRegistry::new()
1116                    .with_behavior("Order", ContextAwareOrderBehavior),
1117            )
1118            .with_internal_id_generator(FixedIdGenerator(99));
1119        ctx.insert_named_resource("tenant", String::from("acme"));
1120        ctx.insert_named_resource("initial_version", 7_i64);
1121        ctx.put_local("trace_id", "req-9");
1122        ctx.insert_resource(PostgresDialect);
1123        ctx.insert_resource(StubExecutor {
1124            affected: 1,
1125            rows: Vec::new(),
1126        });
1127
1128        let repo = ctx
1129            .resolve_repository::< StubExecutor>("Order")
1130            .unwrap();
1131        let prepared = repo.prepare_insert_command(&repo.insert_command()).unwrap();
1132
1133        assert_eq!(prepared.values.get("id"), Some(&Value::U64(99)));
1134        assert_eq!(prepared.values.get("version"), Some(&Value::I64(7)));
1135        assert_eq!(
1136            prepared.values.get("name"),
1137            Some(&Value::Text("acme:req-9".to_owned()))
1138        );
1139    }
1140
1141    #[tokio::test]
1142    async fn checker_registry_validates_and_fixes_insert_commands() {
1143        let mut ctx = UserContext::new()
1144            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1145            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1146            .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker))
1147            .with_internal_id_generator(FixedIdGenerator(77));
1148        ctx.insert_resource(PostgresDialect);
1149        ctx.insert_resource(StubExecutor {
1150            affected: 1,
1151            rows: Vec::new(),
1152        });
1153
1154        let repo = ctx
1155            .resolve_repository::< StubExecutor>("Order")
1156            .unwrap();
1157        let prepared = repo
1158            .prepare_insert_command(&repo.insert_command().value("name", "valid"))
1159            .unwrap();
1160
1161        assert_eq!(prepared.values.get("id"), Some(&Value::U64(77)));
1162        assert_eq!(prepared.values.get("version"), Some(&Value::I64(1)));
1163        assert!(!prepared.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
1164
1165        let error = repo
1166            .prepare_insert_command(&repo.insert_command().value("name", "no"))
1167            .unwrap_err();
1168        match error {
1169            RuntimeError::Check(results) => {
1170                assert_eq!(results.len(), 1);
1171                assert_eq!(results[0].location.to_string(), "name");
1172            }
1173            other => panic!("unexpected checker error: {other:?}"),
1174        }
1175    }
1176
1177    #[tokio::test]
1178    async fn typed_checker_validates_and_fixes_derived_entities_without_record_access() {
1179        let mut ctx = UserContext::new()
1180            .with_metadata(InMemoryMetadataStore::new().with_entity(Order::entity_descriptor()))
1181            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1182            .with_checker_registry(
1183                InMemoryCheckerRegistry::new()
1184                    .with_checker(TypedEntityChecker::<Order, _>::new(TypedOrderChecker)),
1185            )
1186            .with_internal_id_generator(FixedIdGenerator(79));
1187        ctx.insert_resource(PostgresDialect);
1188        ctx.insert_resource(StubExecutor {
1189            affected: 1,
1190            rows: Vec::new(),
1191        });
1192
1193        let repo = ctx
1194            .resolve_repository::< StubExecutor>("Order")
1195            .unwrap();
1196        let prepared = repo
1197            .prepare_insert_command(
1198                &repo
1199                    .insert_command()
1200                    .value("name", "fix")
1201                    .value("version", 1_i64),
1202            )
1203            .unwrap();
1204        assert_eq!(
1205            prepared.values.get("name"),
1206            Some(&Value::Text("fixed".to_owned()))
1207        );
1208        assert_eq!(prepared.values.get("id"), Some(&Value::U64(79)));
1209        assert!(!prepared.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
1210
1211        let error = repo
1212            .prepare_insert_command(&repo.insert_command().value("version", 1_i64))
1213            .unwrap_err();
1214        match error {
1215            RuntimeError::Check(results) => {
1216                assert!(
1217                    results
1218                        .iter()
1219                        .any(|result| result.rule == CheckRule::Required
1220                            && result.location.to_string() == "name")
1221                );
1222            }
1223            other => panic!("unexpected typed checker error: {other:?}"),
1224        }
1225    }
1226
1227
1228
1229    #[tokio::test]
1230    async fn checker_registry_reports_nested_create_locations_and_fixes_records() {
1231        let ctx = UserContext::new()
1232            .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker));
1233
1234        let mut child = Record::from([
1235            (String::from("id"), Value::U64(10)),
1236            (
1237                String::from(CHECK_OBJECT_STATUS_FIELD),
1238                Value::from(CheckObjectStatus::Create),
1239            ),
1240        ]);
1241        let error = ctx
1242            .check_and_fix_record_at(
1243                "Order",
1244                &mut child,
1245                &ObjectLocation::hash_root("lines").element(0),
1246            )
1247            .unwrap_err();
1248
1249        assert_eq!(child.get("version"), Some(&Value::I64(1)));
1250        match error {
1251            RuntimeError::Check(results) => {
1252                assert_eq!(results.len(), 1);
1253                assert_eq!(results[0].rule, CheckRule::Required);
1254                assert_eq!(results[0].location.to_string(), "lines[0].name");
1255            }
1256            other => panic!("unexpected checker error: {other:?}"),
1257        }
1258
1259        child.insert("name".to_owned(), Value::Text("valid child".to_owned()));
1260        ctx.check_and_fix_record_at(
1261            "Order",
1262            &mut child,
1263            &ObjectLocation::hash_root("lines").element(0),
1264        )
1265        .unwrap();
1266    }
1267
1268    #[tokio::test]
1269    async fn built_in_language_translators_cover_fifteen_languages() {
1270        assert_eq!(Language::ALL.len(), 15);
1271        let result = super::CheckResult::required(ObjectLocation::hash_root("name"));
1272        let messages = Language::ALL
1273            .iter()
1274            .map(|language| translate_check_result(*language, &result))
1275            .collect::<Vec<_>>();
1276
1277        assert!(messages.iter().all(|message| !message.is_empty()));
1278        assert!(messages.iter().any(|message| message.contains("required")));
1279        assert!(messages.iter().any(|message| message.contains("å¿…å¡«")));
1280        assert!(
1281            messages
1282                .iter()
1283                .any(|message| message.contains("obligatoire"))
1284        );
1285        assert_eq!(Language::from_code("zh-CN"), Some(Language::Chinese));
1286        assert_eq!(
1287            Language::from_code("zh-TW"),
1288            Some(Language::TraditionalChinese)
1289        );
1290    }
1291
1292    #[tokio::test]
1293    async fn user_context_language_switch_translates_checker_errors() {
1294        let mut ctx = UserContext::new()
1295            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1296            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1297            .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker))
1298            .with_internal_id_generator(FixedIdGenerator(77))
1299            .with_language(Language::Chinese);
1300        ctx.insert_resource(PostgresDialect);
1301        ctx.insert_resource(StubExecutor {
1302            affected: 1,
1303            rows: Vec::new(),
1304        });
1305
1306        let repo = ctx
1307            .resolve_repository::< StubExecutor>("Order")
1308            .unwrap();
1309        let error = repo
1310            .prepare_insert_command(&repo.insert_command())
1311            .unwrap_err();
1312        match error {
1313            RuntimeError::Check(results) => {
1314                assert_eq!(results.len(), 1);
1315                assert!(
1316                    results[0]
1317                        .message
1318                        .as_ref()
1319                        .is_some_and(|message| message.contains("å¿…å¡«"))
1320                );
1321            }
1322            other => panic!("unexpected checker error: {other:?}"),
1323        }
1324
1325        let mut ctx = UserContext::new().with_language(Language::English);
1326        ctx.set_language_code("es").unwrap();
1327        assert_eq!(ctx.language(), Language::Spanish);
1328    }
1329
1330    #[tokio::test]
1331    async fn checker_registry_merges_graph_update_fixes_by_object_status() {
1332        let mut ctx = UserContext::new()
1333            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1334            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1335            .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker));
1336        ctx.insert_resource(PostgresDialect);
1337        ctx.insert_resource(StubExecutor {
1338            affected: 1,
1339            rows: vec![Record::from([
1340                ("id".to_owned(), Value::U64(1)),
1341                ("version".to_owned(), Value::I64(1)),
1342                ("name".to_owned(), Value::Text("old".to_owned())),
1343            ])],
1344        });
1345
1346        let repo = ctx
1347            .resolve_repository::< StubExecutor>("Order")
1348            .unwrap();
1349        let saved = repo
1350            .save_graph(
1351                GraphNode::new("Order")
1352                    .value("id", 1_u64)
1353                    .value("version", 1_i64)
1354                    .value("name", "graph-update"),
1355            )
1356            .await.unwrap();
1357
1358        assert_eq!(
1359            saved.values.get("name"),
1360            Some(&Value::Text("graph-update-checked".to_owned()))
1361        );
1362        assert_eq!(saved.values.get("version"), Some(&Value::I64(2)));
1363        assert!(!saved.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
1364    }
1365
1366    #[tokio::test]
1367    async fn user_context_event_sink_receives_repository_mutation_events() {
1368        let events = Arc::new(Mutex::new(Vec::new()));
1369        let mut ctx = UserContext::new()
1370            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1371            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1372            .with_internal_id_generator(FixedIdGenerator(88))
1373            .with_event_sink(RecordingEventSink {
1374                events: events.clone(),
1375            });
1376        ctx.insert_resource(PostgresDialect);
1377        ctx.insert_resource(StubExecutor {
1378            affected: 1,
1379            rows: vec![Record::from([
1380                ("id".to_owned(), Value::U64(88)),
1381                ("version".to_owned(), Value::I64(1)),
1382                ("name".to_owned(), Value::Text("old".to_owned())),
1383            ])],
1384        });
1385
1386        let repo = ctx
1387            .resolve_repository::< StubExecutor>("Order")
1388            .unwrap();
1389        repo.insert(&repo.insert_command().value("name", "created"))
1390            .await.unwrap();
1391        repo.update(
1392            &repo
1393                .update_command(88_u64)
1394                .expected_version(1)
1395                .value("name", "updated"),
1396        )
1397        .await.unwrap();
1398        repo.delete(&repo.delete_command(88_u64).expected_version(2))
1399            .await.unwrap();
1400        repo.recover(&repo.recover_command(88_u64, -3)).await.unwrap();
1401
1402        let events = events.lock().unwrap();
1403        assert_eq!(events.len(), 4);
1404        assert_eq!(events[0].kind, EntityEventKind::Created);
1405        assert_eq!(events[0].entity, "Order");
1406        assert_eq!(events[0].values.get("id"), Some(&Value::U64(88)));
1407        assert_eq!(events[1].kind, EntityEventKind::Updated);
1408        assert_eq!(events[1].values.get("id"), Some(&Value::U64(88)));
1409        assert_eq!(events[1].values.get("version"), Some(&Value::I64(2)));
1410        assert_eq!(events[1].updated_fields, vec!["name".to_owned()]);
1411        assert_eq!(
1412            events[1]
1413                .old_values
1414                .as_ref()
1415                .and_then(|values| values.get("name")),
1416            None // We no longer fetch old_values dynamically
1417        );
1418        assert_eq!(
1419            events[1]
1420                .new_values
1421                .as_ref()
1422                .and_then(|values| values.get("name")),
1423            Some(&Value::Text("updated".to_owned()))
1424        );
1425        assert_eq!(events[1].changes.len(), 1);
1426        assert_eq!(events[1].changes[0].field, "name");
1427        assert_eq!(
1428            events[1].changes[0].old_value,
1429            None // Old value is now absent during blind updates
1430        );
1431        assert_eq!(
1432            events[1].changes[0].new_value,
1433            Some(Value::Text("updated".to_owned()))
1434        );
1435        assert_eq!(events[2].kind, EntityEventKind::Deleted);
1436        assert!(events[2].old_values.is_none()); // No longer fetched
1437        assert!(events[2].new_values.is_none());
1438        assert_eq!(events[3].kind, EntityEventKind::Recovered);
1439        assert_eq!(
1440            events[3]
1441                .old_values
1442                .as_ref()
1443                .and_then(|values| values.get("version")),
1444            None // No longer fetched
1445        );
1446        assert_eq!(
1447            events[3]
1448                .new_values
1449                .as_ref()
1450                .and_then(|values| values.get("version")),
1451            Some(&Value::I64(4))
1452        );
1453        assert_eq!(events[3].changes[0].field, "version");
1454    }
1455
1456    #[tokio::test]
1457    async fn user_context_event_sink_receives_mixed_graph_mutation_events() {
1458        let events = Arc::new(Mutex::new(Vec::new()));
1459        let mut ctx = UserContext::new()
1460            .with_metadata(
1461                InMemoryMetadataStore::new()
1462                    .with_entity(entity())
1463                    .with_entity(line_entity())
1464                    .with_entity(product_entity()),
1465            )
1466            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1467            .with_event_sink(RecordingEventSink {
1468                events: events.clone(),
1469            });
1470        ctx.insert_resource(PostgresDialect);
1471        ctx.insert_resource(StubExecutor {
1472            affected: 1,
1473            rows: vec![Record::from([
1474                ("id".to_owned(), Value::U64(1)),
1475                ("version".to_owned(), Value::I64(1)),
1476                ("name".to_owned(), Value::Text("old".to_owned())),
1477            ])],
1478        });
1479
1480        let repo = ctx
1481            .resolve_repository::< StubExecutor>("Order")
1482            .unwrap();
1483        repo.save_graph(
1484            GraphNode::new("Order")
1485                .value("id", 1_u64)
1486                .value("version", 1_i64)
1487                .value("name", "updated")
1488                .relation(
1489                    "lines",
1490                    GraphNode::new("OrderLine")
1491                        .value("name", "line")
1492                        .value("product_id", 3_u64),
1493                ),
1494        )
1495        .await.unwrap();
1496
1497        let events = events.lock().unwrap();
1498        assert_eq!(events.len(), 2);
1499        assert_eq!(events[0].kind, EntityEventKind::Updated);
1500        assert_eq!(events[0].entity, "Order");
1501        assert_eq!(events[1].kind, EntityEventKind::Created);
1502        assert_eq!(events[1].entity, "OrderLine");
1503        assert_eq!(events[1].values.get("order_id"), Some(&Value::U64(1)));
1504    }
1505
1506    #[tokio::test]
1507    async fn save_graph_builds_plan_grouped_by_entity_and_operation() {
1508        let mut ctx = UserContext::new()
1509            .with_metadata(
1510                InMemoryMetadataStore::new()
1511                    .with_entity(entity())
1512                    .with_entity(line_entity())
1513                    .with_entity(product_entity()),
1514            )
1515            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1516            .with_internal_id_generator(SequentialIdGenerator::new(500));
1517        ctx.insert_resource(PostgresDialect);
1518        ctx.insert_resource(StubExecutor {
1519            affected: 1,
1520            rows: vec![Record::from([
1521                ("id".to_owned(), Value::U64(1)),
1522                ("version".to_owned(), Value::I64(1)),
1523                ("name".to_owned(), Value::Text("old".to_owned())),
1524            ])],
1525        });
1526
1527        let plan = ctx
1528            .plan_for_save_graph::< StubExecutor>(
1529                GraphNode::new("Order")
1530                    .value("id", 1_u64)
1531                    .value("version", 1_i64)
1532                    .value("name", "updated")
1533                    .relation(
1534                        "lines",
1535                        GraphNode::new("OrderLine")
1536                            .value("name", "new-line-a")
1537                            .value("product_id", 2_u64),
1538                    )
1539                    .relation(
1540                        "lines",
1541                        GraphNode::new("OrderLine")
1542                            .value("name", "new-line-b")
1543                            .value("product_id", 2_u64),
1544                    )
1545                    .relation(
1546                        "lines",
1547                        GraphNode::new("OrderLine")
1548                            .value("id", 5_u64)
1549                            .value("version", 1_i64)
1550                            .value("name", "same-update-a"),
1551                    )
1552                    .relation(
1553                        "lines",
1554                        GraphNode::new("OrderLine")
1555                            .value("id", 6_u64)
1556                            .value("version", 1_i64)
1557                            .value("name", "same-update-b"),
1558                    )
1559                    .relation(
1560                        "lines",
1561                        GraphNode::new("OrderLine").value("id", 3_u64).remove(),
1562                    )
1563                    .relation(
1564                        "lines",
1565                        GraphNode::new("OrderLine").value("id", 4_u64).reference(),
1566                    ),
1567            )
1568            .await.unwrap();
1569        let counts = plan.grouped_counts();
1570
1571        assert_eq!(
1572            counts.get(&("Order".to_owned(), GraphMutationKind::Update)),
1573            Some(&1)
1574        );
1575        assert_eq!(
1576            counts.get(&("OrderLine".to_owned(), GraphMutationKind::Create)),
1577            Some(&2)
1578        );
1579        assert_eq!(
1580            counts.get(&("OrderLine".to_owned(), GraphMutationKind::Update)),
1581            Some(&2)
1582        );
1583        assert_eq!(
1584            counts.get(&("OrderLine".to_owned(), GraphMutationKind::Delete)),
1585            Some(&1)
1586        );
1587        assert_eq!(
1588            counts.get(&("OrderLine".to_owned(), GraphMutationKind::Reference)),
1589            Some(&1)
1590        );
1591        let create_batch = plan
1592            .batches
1593            .iter()
1594            .find(|batch| batch.entity == "OrderLine" && batch.kind == GraphMutationKind::Create)
1595            .unwrap();
1596        assert_eq!(create_batch.items.len(), 2);
1597        assert_eq!(
1598            create_batch.items[0].values.get("id"),
1599            Some(&Value::U64(500))
1600        );
1601        assert_eq!(
1602            create_batch.items[1].values.get("id"),
1603            Some(&Value::U64(501))
1604        );
1605        let update_batch = plan
1606            .batches
1607            .iter()
1608            .find(|batch| {
1609                batch.entity == "OrderLine"
1610                    && batch.kind == GraphMutationKind::Update
1611                    && batch.update_fields == vec!["name".to_owned()]
1612            })
1613            .unwrap();
1614        assert_eq!(update_batch.items.len(), 2);
1615    }
1616
1617    #[tokio::test]
1618    async fn resolved_repository_builds_relation_plans() {
1619        let mut ctx = UserContext::new()
1620            .with_metadata(
1621                InMemoryMetadataStore::new()
1622                    .with_entity(entity())
1623                    .with_entity(line_entity())
1624                    .with_entity(product_entity()),
1625            )
1626            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1627            .with_repository_behavior_registry(
1628                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1629            );
1630        ctx.insert_resource(PostgresDialect);
1631        ctx.insert_resource(StubExecutor {
1632            affected: 1,
1633            rows: Vec::new(),
1634        });
1635
1636        let repo = ctx
1637            .resolve_repository::< StubExecutor>("Order")
1638            .unwrap();
1639        let plans = repo.relation_plans().unwrap();
1640
1641        assert_eq!(plans.len(), 1);
1642        assert_eq!(plans[0].relation_name, "lines");
1643        assert_eq!(plans[0].target_entity, "OrderLine");
1644        assert_eq!(plans[0].local_key, "id");
1645        assert_eq!(plans[0].foreign_key, "order_id");
1646        assert!(plans[0].many);
1647    }
1648
1649    #[tokio::test]
1650    async fn resolved_repository_builds_relation_query_from_parent_rows() {
1651        let mut ctx = UserContext::new()
1652            .with_metadata(
1653                InMemoryMetadataStore::new()
1654                    .with_entity(entity())
1655                    .with_entity(line_entity())
1656                    .with_entity(product_entity()),
1657            )
1658            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1659            .with_repository_behavior_registry(
1660                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1661            );
1662        ctx.insert_resource(PostgresDialect);
1663        ctx.insert_resource(StubExecutor {
1664            affected: 1,
1665            rows: Vec::new(),
1666        });
1667
1668        let repo = ctx
1669            .resolve_repository::< StubExecutor>("Order")
1670            .unwrap();
1671        let parent_rows = vec![
1672            Record::from([(String::from("id"), Value::U64(11))]),
1673            Record::from([(String::from("id"), Value::U64(12))]),
1674        ];
1675
1676        let query = repo.relation_query("lines", &parent_rows).unwrap();
1677        // let compiled = repo.compile(&query).unwrap();
1678        // assert!(compiled.sql.contains("FROM orderline"));
1679        // assert!(compiled.sql.contains("order_id IN ($1, $2)"));
1680        // assert_eq!(compiled.params, vec![Value::U64(11), Value::U64(12)]);
1681    }
1682
1683    #[tokio::test]
1684    async fn resolved_repository_enhances_parent_rows_with_relations() {
1685        let mut ctx = UserContext::new()
1686            .with_metadata(
1687                InMemoryMetadataStore::new()
1688                    .with_entity(entity())
1689                    .with_entity(line_entity())
1690                    .with_entity(product_entity()),
1691            )
1692            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1693            .with_repository_behavior_registry(
1694                InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1695            );
1696        ctx.insert_resource(PostgresDialect);
1697        ctx.insert_resource(StubExecutor {
1698            affected: 1,
1699            rows: vec![
1700                Record::from([
1701                    (String::from("id"), Value::U64(101)),
1702                    (String::from("order_id"), Value::U64(11)),
1703                    (String::from("name"), Value::Text(String::from("l1"))),
1704                ]),
1705                Record::from([
1706                    (String::from("id"), Value::U64(102)),
1707                    (String::from("order_id"), Value::U64(11)),
1708                    (String::from("name"), Value::Text(String::from("l2"))),
1709                ]),
1710                Record::from([
1711                    (String::from("id"), Value::U64(201)),
1712                    (String::from("order_id"), Value::U64(12)),
1713                    (String::from("name"), Value::Text(String::from("l3"))),
1714                ]),
1715            ],
1716        });
1717
1718        let repo = ctx
1719            .resolve_repository::< StubExecutor>("Order")
1720            .unwrap();
1721        let mut parents = vec![
1722            Record::from([(String::from("id"), Value::U64(11))]),
1723            Record::from([(String::from("id"), Value::U64(12))]),
1724        ];
1725
1726        repo.enhance_relations(&mut parents).await.unwrap();
1727
1728        match parents[0].get("lines") {
1729            Some(Value::List(lines)) => assert_eq!(lines.len(), 2),
1730            other => panic!("unexpected lines payload: {other:?}"),
1731        }
1732        match parents[1].get("lines") {
1733            Some(Value::List(lines)) => assert_eq!(lines.len(), 1),
1734            other => panic!("unexpected lines payload: {other:?}"),
1735        }
1736    }
1737
1738    #[tokio::test]
1739    async fn relation_enhancement_wraps_inverse_many_relation_as_list() {
1740        let mut ctx = UserContext::new()
1741            .with_metadata(
1742                InMemoryMetadataStore::new()
1743                    .with_entity(OrderLineWithProductEntityRow::entity_descriptor())
1744                    .with_entity(ProductWithLinesEntityRow::entity_descriptor()),
1745            )
1746            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("OrderLine"));
1747        ctx.insert_resource(PostgresDialect);
1748        ctx.insert_resource(QueueExecutor {
1749            affected: 1,
1750            rows: Mutex::new(VecDeque::from([
1751                vec![Record::from([
1752                    (String::from("id"), Value::U64(11)),
1753                    (String::from("order_id"), Value::U64(7)),
1754                    (String::from("name"), Value::Text(String::from("line"))),
1755                    (String::from("product_id"), Value::U64(101)),
1756                ])],
1757                vec![Record::from([
1758                    (String::from("id"), Value::U64(101)),
1759                    (String::from("name"), Value::Text(String::from("sku"))),
1760                ])],
1761            ])),
1762            queries: Mutex::new(Vec::new()),
1763        });
1764
1765        let repo = ctx
1766            .resolve_repository::< QueueExecutor>("OrderLine")
1767            .unwrap();
1768        let rows = repo
1769            .fetch_enhanced_entities::<OrderLineWithProductEntityRow>(
1770                &SelectQuery::new("OrderLine").relation("product"),
1771            )
1772            .await.unwrap();
1773
1774        let product = rows.data[0].product.as_ref().unwrap();
1775        assert_eq!(product.lines.data.len(), 1);
1776        assert_eq!(product.lines.data[0].id, 11);
1777    }
1778
1779    #[tokio::test]
1780    async fn resolved_repository_fetches_smart_list_of_entities() {
1781        let mut ctx = UserContext::new()
1782            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1783            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1784        ctx.insert_resource(PostgresDialect);
1785        ctx.insert_resource(StubExecutor {
1786            affected: 1,
1787            rows: vec![Record::from([
1788                (String::from("id"), Value::U64(7)),
1789                (String::from("version"), Value::I64(2)),
1790                (String::from("name"), Value::Text(String::from("typed"))),
1791            ])],
1792        });
1793
1794        let repo = ctx
1795            .resolve_repository::< StubExecutor>("Order")
1796            .unwrap();
1797        let rows = repo.fetch_entities::<OrderEntity>(&repo.select()).await.unwrap();
1798
1799        assert_eq!(rows.len(), 1);
1800        assert_eq!(
1801            rows.first(),
1802            Some(&OrderEntity {
1803                id: 7,
1804                version: 2,
1805                name: String::from("typed"),
1806            })
1807        );
1808    }
1809
1810    #[tokio::test]
1811    async fn resolved_repository_fetches_smart_list_of_derived_entities() {
1812        let mut ctx = UserContext::new()
1813            .with_metadata(
1814                InMemoryMetadataStore::new().with_entity(CatalogProductRow::entity_descriptor()),
1815            )
1816            .with_repository_registry(
1817                InMemoryRepositoryRegistry::new().with_entity("CatalogProduct"),
1818            );
1819        ctx.insert_resource(PostgresDialect);
1820        ctx.insert_resource(StubExecutor {
1821            affected: 1,
1822            rows: vec![Record::from([
1823                (String::from("id"), Value::U64(9)),
1824                (String::from("name"), Value::Text(String::from("derived"))),
1825            ])],
1826        });
1827
1828        let repo = ctx
1829            .resolve_repository::< StubExecutor>("CatalogProduct")
1830            .unwrap();
1831        let rows = repo
1832            .fetch_entities::<CatalogProductRow>(&repo.select())
1833            .await.unwrap();
1834
1835        assert_eq!(rows.len(), 1);
1836        assert_eq!(
1837            rows.first(),
1838            Some(&CatalogProductRow {
1839                id: 9,
1840                name: String::from("derived"),
1841            })
1842        );
1843    }
1844
1845    #[tokio::test]
1846    async fn resolved_repository_collects_dynamic_properties_for_aggregate_output() {
1847        let mut ctx = UserContext::new()
1848            .with_metadata(
1849                InMemoryMetadataStore::new()
1850                    .with_entity(OrderAggregateDynamic::entity_descriptor()),
1851            )
1852            .with_repository_registry(
1853                InMemoryRepositoryRegistry::new().with_entity("OrderAggregate"),
1854            );
1855        ctx.insert_resource(PostgresDialect);
1856        ctx.insert_resource(StubExecutor {
1857            affected: 1,
1858            rows: vec![Record::from([
1859                (String::from("id"), Value::U64(1)),
1860                (String::from("lineCount"), Value::I64(3)),
1861                (String::from("amount"), Value::F64(18.5)),
1862            ])],
1863        });
1864
1865        let repo = ctx
1866            .resolve_repository::< StubExecutor>("OrderAggregate")
1867            .unwrap();
1868        let rows = repo
1869            .fetch_entities::<OrderAggregateDynamic>(&repo.select())
1870            .await.unwrap();
1871
1872        assert_eq!(rows.len(), 1);
1873        assert_eq!(rows.data[0].id, 1);
1874        assert_eq!(rows.data[0].dynamic.get("lineCount"), Some(&Value::I64(3)));
1875        assert_eq!(rows.data[0].dynamic.get("amount"), Some(&Value::F64(18.5)));
1876        assert_eq!(
1877            rows.into_vec().into_iter().next().unwrap().into_json(),
1878            serde_json::json!({
1879                "id": 1,
1880                "lineCount": 3,
1881                "amount": 18.5
1882            })
1883        );
1884    }
1885
1886    #[tokio::test]
1887    async fn resolved_repository_executes_relation_aggregates_into_dynamic_properties() {
1888        let executor = QueueExecutor {
1889            affected: 1,
1890            rows: Mutex::new(VecDeque::from([
1891                vec![
1892                    Record::from([
1893                        (String::from("id"), Value::U64(1)),
1894                        (String::from("version"), Value::I64(1)),
1895                        (String::from("name"), Value::Text(String::from("first"))),
1896                    ]),
1897                    Record::from([
1898                        (String::from("id"), Value::U64(2)),
1899                        (String::from("version"), Value::I64(1)),
1900                        (String::from("name"), Value::Text(String::from("second"))),
1901                    ]),
1902                ],
1903                vec![Record::from([
1904                    (String::from("order_id"), Value::U64(1)),
1905                    (String::from("lineCount"), Value::I64(3)),
1906                ])],
1907            ])),
1908            queries: Mutex::new(Vec::new()),
1909        };
1910        let mut ctx = UserContext::new()
1911            .with_metadata(
1912                InMemoryMetadataStore::new()
1913                    .with_entity(entity())
1914                    .with_entity(line_entity()),
1915            )
1916            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1917        ctx.insert_resource(PostgresDialect);
1918        ctx.insert_resource(executor);
1919
1920        let repo = ctx
1921            .resolve_repository::< QueueExecutor>("Order")
1922            .unwrap();
1923        let rows = repo
1924            .fetch_all_with_relation_aggregates(
1925                &repo
1926                    .select()
1927                    .project("id")
1928                    .project("version")
1929                    .project("name"),
1930                &[RelationAggregate::new(
1931                    "lines",
1932                    "lineCount",
1933                    SelectQuery::new("OrderLine"),
1934                    true,
1935                )],
1936            )
1937            .await.unwrap();
1938
1939        assert_eq!(rows[0].get("lineCount"), Some(&Value::I64(3)));
1940        assert_eq!(rows[1].get("lineCount"), Some(&Value::U64(0)));
1941
1942        let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1943        let queries = executor.queries.lock().unwrap();
1944        assert_eq!(queries.len(), 2);
1945        assert_eq!(
1946            queries[1],
1947            "SELECT ... FROM OrderLine ..."
1948        );
1949    }
1950
1951    #[tokio::test]
1952    async fn resolved_repository_maps_relation_aggregate_storage_key_to_property_key() {
1953        let mut line = line_entity();
1954        line.properties
1955            .iter_mut()
1956            .find(|property| property.name == "order_id")
1957            .unwrap()
1958            .column_name = "order_ref".to_owned();
1959        let executor = QueueExecutor {
1960            affected: 1,
1961            rows: Mutex::new(VecDeque::from([
1962                vec![Record::from([
1963                    (String::from("id"), Value::U64(1)),
1964                    (String::from("version"), Value::I64(1)),
1965                    (String::from("name"), Value::Text(String::from("first"))),
1966                ])],
1967                vec![Record::from([
1968                    (String::from("order_ref"), Value::I64(1)),
1969                    (String::from("lineCount"), Value::I64(3)),
1970                ])],
1971            ])),
1972            queries: Mutex::new(Vec::new()),
1973        };
1974        let mut ctx = UserContext::new()
1975            .with_metadata(
1976                InMemoryMetadataStore::new()
1977                    .with_entity(entity())
1978                    .with_entity(line),
1979            )
1980            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1981        ctx.insert_resource(PostgresDialect);
1982        ctx.insert_resource(executor);
1983
1984        let repo = ctx
1985            .resolve_repository::< QueueExecutor>("Order")
1986            .unwrap();
1987        let rows = repo
1988            .fetch_all_with_relation_aggregates(
1989                &repo
1990                    .select()
1991                    .project("id")
1992                    .project("version")
1993                    .project("name"),
1994                &[RelationAggregate::new(
1995                    "lines",
1996                    "lineCount",
1997                    SelectQuery::new("OrderLine"),
1998                    true,
1999                )],
2000            )
2001            .await.unwrap();
2002
2003        assert_eq!(rows[0].get("lineCount"), Some(&Value::I64(3)));
2004        let executor = ctx.get_resource::<QueueExecutor>().unwrap();
2005        assert_eq!(
2006            executor.queries.lock().unwrap()[1],
2007            "SELECT ... FROM OrderLine ..."
2008        );
2009    }
2010
2011    #[tokio::test]
2012    async fn resolved_repository_uses_aggregation_cache_when_resource_is_registered() {
2013        let executor = QueueExecutor {
2014            affected: 1,
2015            rows: Mutex::new(VecDeque::from([vec![Record::from([(
2016                String::from("count"),
2017                Value::I64(2),
2018            )])]])),
2019            queries: Mutex::new(Vec::new()),
2020        };
2021        let mut ctx = UserContext::new()
2022            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
2023            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
2024        ctx.insert_resource(PostgresDialect);
2025        ctx.insert_resource(executor);
2026        ctx.insert_resource(InMemoryAggregationCache::default());
2027
2028        let repo = ctx
2029            .resolve_repository::< QueueExecutor>("Order")
2030            .unwrap();
2031        let query = repo
2032            .select()
2033            .count("count")
2034            .enable_aggregation_cache_for(60_000);
2035
2036        let first = repo.fetch_all(&query).await.unwrap();
2037        let second = repo.fetch_all(&query).await.unwrap();
2038
2039        assert_eq!(first, second);
2040        let executor = ctx.get_resource::<QueueExecutor>().unwrap();
2041        assert_eq!(executor.queries.lock().unwrap().len(), 1);
2042    }
2043
2044    #[tokio::test]
2045    async fn aggregation_cache_is_namespaced_and_invalidated_after_write() {
2046        let executor = QueueExecutor {
2047            affected: 1,
2048            rows: Mutex::new(VecDeque::from([
2049                vec![Record::from([(String::from("count"), Value::I64(2))])],
2050                vec![Record::from([(String::from("count"), Value::I64(3))])],
2051            ])),
2052            queries: Mutex::new(Vec::new()),
2053        };
2054        let mut ctx = UserContext::new()
2055            .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
2056            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
2057        ctx.insert_resource(PostgresDialect);
2058        ctx.insert_resource(executor);
2059        ctx.insert_resource(
2060            Arc::new(InMemoryAggregationCache::with_namespace("tenant-a"))
2061                as Arc<dyn AggregationCacheBackend>,
2062        );
2063
2064        let repo = ctx
2065            .resolve_repository::< QueueExecutor>("Order")
2066            .unwrap();
2067        let query = repo
2068            .select()
2069            .count("count")
2070            .enable_aggregation_cache_for(60_000);
2071
2072        let first = repo.fetch_all(&query).await.unwrap();
2073        let cached = repo.fetch_all(&query).await.unwrap();
2074        repo.insert(
2075            &InsertCommand::new("Order")
2076                .value("id", 9_u64)
2077                .value("version", 1_i64)
2078                .value("name", "new"),
2079        )
2080        .await.unwrap();
2081        let refreshed = repo.fetch_all(&query).await.unwrap();
2082
2083        assert_eq!(first, cached);
2084        assert_ne!(cached, refreshed);
2085        let executor = ctx.get_resource::<QueueExecutor>().unwrap();
2086        assert_eq!(executor.queries.lock().unwrap().len(), 2);
2087    }
2088
2089    #[tokio::test]
2090    async fn aggregation_cache_propagates_to_relation_aggregates() {
2091        let parent_rows = vec![
2092            Record::from([
2093                (String::from("id"), Value::U64(1)),
2094                (String::from("version"), Value::I64(1)),
2095                (String::from("name"), Value::Text(String::from("first"))),
2096            ]),
2097            Record::from([
2098                (String::from("id"), Value::U64(2)),
2099                (String::from("version"), Value::I64(1)),
2100                (String::from("name"), Value::Text(String::from("second"))),
2101            ]),
2102        ];
2103        let aggregate_rows = vec![Record::from([
2104            (String::from("order_id"), Value::U64(1)),
2105            (String::from("lineCount"), Value::I64(3)),
2106        ])];
2107        let executor = QueueExecutor {
2108            affected: 1,
2109            rows: Mutex::new(VecDeque::from([parent_rows, aggregate_rows])),
2110            queries: Mutex::new(Vec::new()),
2111        };
2112        let mut ctx = UserContext::new()
2113            .with_metadata(
2114                InMemoryMetadataStore::new()
2115                    .with_entity(entity())
2116                    .with_entity(line_entity()),
2117            )
2118            .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
2119        ctx.insert_resource(PostgresDialect);
2120        ctx.insert_resource(executor);
2121        ctx.insert_resource(InMemoryAggregationCache::default());
2122
2123        let repo = ctx
2124            .resolve_repository::< QueueExecutor>("Order")
2125            .unwrap();
2126        let query = repo
2127            .select()
2128            .project("id")
2129            .project("version")
2130            .project("name")
2131            .enable_aggregation_cache_for(60_000)
2132            .propagate_aggregation_cache(60_000);
2133        let aggregate =
2134            RelationAggregate::new("lines", "lineCount", SelectQuery::new("OrderLine"), true);
2135
2136        let first = repo
2137            .fetch_all_with_relation_aggregates(&query, &[aggregate.clone()])
2138            .await.unwrap();
2139        let second = repo
2140            .fetch_all_with_relation_aggregates(&query, &[aggregate])
2141            .await.unwrap();
2142
2143        assert_eq!(first, second);
2144        let executor = ctx.get_resource::<QueueExecutor>().unwrap();
2145        assert_eq!(executor.queries.lock().unwrap().len(), 2);
2146    }
2147
2148    #[tokio::test]
2149    async fn memory_repository_fetches_smart_list_entities_with_query_features() {
2150        let metadata = InMemoryMetadataStore::new().with_entity(entity());
2151        let repository = MemoryRepository::new(metadata).with_rows(
2152            "Order",
2153            vec![
2154                Record::from([
2155                    (String::from("id"), Value::U64(1)),
2156                    (String::from("version"), Value::I64(1)),
2157                    (String::from("name"), Value::Text(String::from("alpha"))),
2158                ]),
2159                Record::from([
2160                    (String::from("id"), Value::U64(2)),
2161                    (String::from("version"), Value::I64(1)),
2162                    (String::from("name"), Value::Text(String::from("beta"))),
2163                ]),
2164                Record::from([
2165                    (String::from("id"), Value::U64(3)),
2166                    (String::from("version"), Value::I64(1)),
2167                    (String::from("name"), Value::Text(String::from("gamma"))),
2168                ]),
2169            ],
2170        );
2171
2172        let query = teaql_core::SelectQuery::new("Order")
2173            .filter(Expr::Binary {
2174                left: Box::new(Expr::column("id")),
2175                op: teaql_core::BinaryOp::Gte,
2176                right: Box::new(Expr::value(2_u64)),
2177            })
2178            .order_by(OrderBy::desc("id"))
2179            .limit(1);
2180
2181        let orders = repository.fetch_entities::<Order>(&query).unwrap();
2182
2183        assert_eq!(orders.ids(), vec![Value::U64(3)]);
2184        assert_eq!(orders.versions(), vec![1]);
2185        assert_eq!(orders.first().unwrap().name, "gamma");
2186    }
2187
2188    #[tokio::test]
2189    async fn memory_repository_runs_relation_aggregates() {
2190        let metadata = InMemoryMetadataStore::new()
2191            .with_entity(entity())
2192            .with_entity(line_entity());
2193
2194        let repository = MemoryRepository::new(metadata)
2195            .with_rows(
2196                "Order",
2197                vec![
2198                    Record::from([
2199                        (String::from("id"), Value::U64(1)),
2200                        (String::from("version"), Value::I64(1)),
2201                        (String::from("name"), Value::Text(String::from("first"))),
2202                    ]),
2203                    Record::from([
2204                        (String::from("id"), Value::U64(2)),
2205                        (String::from("version"), Value::I64(1)),
2206                        (String::from("name"), Value::Text(String::from("second"))),
2207                    ]),
2208                ],
2209            )
2210            .with_rows(
2211                "OrderLine",
2212                vec![
2213                    Record::from([
2214                        (String::from("id"), Value::U64(10)),
2215                        (String::from("version"), Value::I64(1)),
2216                        (String::from("order_id"), Value::U64(1)),
2217                        (String::from("name"), Value::Text(String::from("line1"))),
2218                    ]),
2219                    Record::from([
2220                        (String::from("id"), Value::U64(11)),
2221                        (String::from("version"), Value::I64(1)),
2222                        (String::from("order_id"), Value::U64(1)),
2223                        (String::from("name"), Value::Text(String::from("line2"))),
2224                    ]),
2225                    Record::from([
2226                        (String::from("id"), Value::U64(12)),
2227                        (String::from("version"), Value::I64(1)),
2228                        (String::from("order_id"), Value::U64(2)),
2229                        (String::from("name"), Value::Text(String::from("line3"))),
2230                    ]),
2231                ],
2232            );
2233
2234        let query = SelectQuery::new("Order").project("id").project("name");
2235        let aggregate = RelationAggregate::new("lines", "lineCount", SelectQuery::new("OrderLine"), true);
2236
2237        let rows = repository
2238            .fetch_all_with_relation_aggregates(&query, &[aggregate])
2239            .unwrap();
2240
2241        assert_eq!(rows.len(), 2);
2242
2243        let first_order = rows.iter().find(|r| r.get("id") == Some(&Value::U64(1))).unwrap();
2244        assert_eq!(first_order.get("lineCount"), Some(&Value::U64(2)));
2245
2246        let second_order = rows.iter().find(|r| r.get("id") == Some(&Value::U64(2))).unwrap();
2247        assert_eq!(second_order.get("lineCount"), Some(&Value::U64(1)));
2248    }
2249
2250    #[tokio::test]
2251    async fn memory_repository_runs_aggregates() {
2252        let metadata = InMemoryMetadataStore::new().with_entity(entity());
2253        let repository = MemoryRepository::new(metadata).with_rows(
2254            "Order",
2255            vec![
2256                Record::from([
2257                    (String::from("id"), Value::U64(1)),
2258                    (String::from("version"), Value::I64(1)),
2259                    (String::from("name"), Value::Text(String::from("alpha"))),
2260                ]),
2261                Record::from([
2262                    (String::from("id"), Value::U64(2)),
2263                    (String::from("version"), Value::I64(2)),
2264                    (String::from("name"), Value::Text(String::from("beta"))),
2265                ]),
2266            ],
2267        );
2268
2269        let query = teaql_core::SelectQuery {
2270            entity: String::from("Order"),
2271            projection: Vec::new(),
2272            expr_projection: Vec::new(),
2273            filter: None,
2274            having: None,
2275            order_by: Vec::new(),
2276            slice: None,
2277            trace_chain: Vec::new(),
2278            aggregates: vec![
2279                Aggregate {
2280                    function: AggregateFunction::Count,
2281                    field: String::from("id"),
2282                    alias: String::from("count"),
2283                },
2284                Aggregate {
2285                    function: AggregateFunction::Sum,
2286                    field: String::from("version"),
2287                    alias: String::from("versionSum"),
2288                },
2289            ],
2290            group_by: Vec::new(),
2291            relations: Vec::new(),
2292            aggregation_cache: None,
2293            comment: None,
2294            raw_sql: None,
2295            raw_sql_search_criteria: Vec::new(),
2296            dynamic_properties: Vec::new(),
2297            raw_projections: Vec::new(),
2298            object_group_bys: Vec::new(),
2299            child_enhancements: Vec::new(),
2300        };
2301
2302        let rows = repository.fetch_all(&query).unwrap();
2303
2304        assert_eq!(rows.len(), 1);
2305        assert_eq!(rows[0].get("count"), Some(&Value::U64(2)));
2306        assert_eq!(rows[0].get("versionSum"), Some(&Value::U64(3)));
2307    }
2308
2309    #[tokio::test]
2310    async fn memory_repository_runs_grouped_aggregates_and_extended_filters() {
2311        let metadata = InMemoryMetadataStore::new().with_entity(entity());
2312        let repository = MemoryRepository::new(metadata).with_rows(
2313            "Order",
2314            vec![
2315                Record::from([
2316                    (String::from("id"), Value::U64(1)),
2317                    (String::from("version"), Value::I64(1)),
2318                    (String::from("name"), Value::Text(String::from("alpha"))),
2319                ]),
2320                Record::from([
2321                    (String::from("id"), Value::U64(2)),
2322                    (String::from("version"), Value::I64(2)),
2323                    (String::from("name"), Value::Text(String::from("alpha"))),
2324                ]),
2325                Record::from([
2326                    (String::from("id"), Value::U64(3)),
2327                    (String::from("version"), Value::I64(3)),
2328                    (String::from("name"), Value::Text(String::from("tmp-beta"))),
2329                ]),
2330            ],
2331        );
2332
2333        let rows = repository
2334            .fetch_all(
2335                &teaql_core::SelectQuery::new("Order")
2336                    .filter(
2337                        Expr::between("version", 1_i64, 3_i64)
2338                            .and_expr(Expr::not_like("name", "tmp%"))
2339                            .and_expr(Expr::not_in_list("name", vec![Value::from("deleted")])),
2340                    )
2341                    .group_by("name")
2342                    .count("total")
2343                    .sum("version", "versionSum"),
2344            )
2345            .unwrap();
2346
2347        assert_eq!(rows.len(), 1);
2348        assert_eq!(
2349            rows[0].get("name"),
2350            Some(&Value::Text(String::from("alpha")))
2351        );
2352        assert_eq!(rows[0].get("total"), Some(&Value::U64(2)));
2353        assert_eq!(rows[0].get("versionSum"), Some(&Value::U64(3)));
2354    }
2355
2356    #[tokio::test]
2357    async fn memory_repository_runs_extended_aggregates_and_having() {
2358        let metadata = InMemoryMetadataStore::new().with_entity(entity());
2359        let repository = MemoryRepository::new(metadata).with_rows(
2360            "Order",
2361            vec![
2362                Record::from([
2363                    (String::from("id"), Value::U64(1)),
2364                    (String::from("version"), Value::I64(1)),
2365                    (String::from("name"), Value::Text(String::from("alpha"))),
2366                ]),
2367                Record::from([
2368                    (String::from("id"), Value::U64(2)),
2369                    (String::from("version"), Value::I64(3)),
2370                    (String::from("name"), Value::Text(String::from("alpha"))),
2371                ]),
2372                Record::from([
2373                    (String::from("id"), Value::U64(3)),
2374                    (String::from("version"), Value::I64(7)),
2375                    (String::from("name"), Value::Text(String::from("beta"))),
2376                ]),
2377            ],
2378        );
2379
2380        let rows = repository
2381            .fetch_all(
2382                &teaql_core::SelectQuery::new("Order")
2383                    .group_by("name")
2384                    .count("total")
2385                    .stddev("version", "stddevVersion")
2386                    .var_pop("version", "varPopVersion")
2387                    .bit_or("version", "bitOrVersion")
2388                    .having(Expr::gt("total", 1_i64)),
2389            )
2390            .unwrap();
2391
2392        assert_eq!(rows.len(), 1);
2393        assert_eq!(
2394            rows[0].get("name"),
2395            Some(&Value::Text(String::from("alpha")))
2396        );
2397        assert_eq!(rows[0].get("total"), Some(&Value::U64(2)));
2398        assert_eq!(
2399            rows[0].get("stddevVersion").map(Value::to_json_value),
2400            Some(serde_json::Value::String(
2401                "1.4142135623730951454746218583".to_owned()
2402            ))
2403        );
2404        assert_eq!(
2405            rows[0].get("varPopVersion"),
2406            Some(&Value::Decimal(Decimal::ONE))
2407        );
2408        assert_eq!(rows[0].get("bitOrVersion"), Some(&Value::I64(3)));
2409    }
2410
2411    #[tokio::test]
2412    async fn memory_repository_runs_sound_like_filter() {
2413        let metadata = InMemoryMetadataStore::new().with_entity(entity());
2414        let repository = MemoryRepository::new(metadata).with_rows(
2415            "Order",
2416            vec![
2417                Record::from([
2418                    (String::from("id"), Value::U64(1)),
2419                    (String::from("version"), Value::I64(1)),
2420                    (String::from("name"), Value::Text(String::from("Robert"))),
2421                ]),
2422                Record::from([
2423                    (String::from("id"), Value::U64(2)),
2424                    (String::from("version"), Value::I64(1)),
2425                    (String::from("name"), Value::Text(String::from("Rupert"))),
2426                ]),
2427                Record::from([
2428                    (String::from("id"), Value::U64(3)),
2429                    (String::from("version"), Value::I64(1)),
2430                    (String::from("name"), Value::Text(String::from("Ashcraft"))),
2431                ]),
2432            ],
2433        );
2434
2435        let rows = repository
2436            .fetch_all(
2437                &teaql_core::SelectQuery::new("Order")
2438                    .filter(Expr::sound_like("name", "Robert"))
2439                    .order_asc("id"),
2440            )
2441            .unwrap();
2442
2443        assert_eq!(rows.len(), 2);
2444        assert_eq!(rows[0].get("name"), Some(&Value::Text("Robert".to_owned())));
2445        assert_eq!(rows[1].get("name"), Some(&Value::Text("Rupert".to_owned())));
2446    }
2447
2448    #[tokio::test]
2449    async fn memory_repository_runs_java_style_string_match_filters() {
2450        let metadata = InMemoryMetadataStore::new().with_entity(entity());
2451        let repository = MemoryRepository::new(metadata).with_rows(
2452            "Order",
2453            vec![
2454                Record::from([
2455                    (String::from("id"), Value::U64(1)),
2456                    (String::from("version"), Value::I64(1)),
2457                    (String::from("name"), Value::Text(String::from("tea-order"))),
2458                ]),
2459                Record::from([
2460                    (String::from("id"), Value::U64(2)),
2461                    (String::from("version"), Value::I64(1)),
2462                    (
2463                        String::from("name"),
2464                        Value::Text(String::from("coffee-order")),
2465                    ),
2466                ]),
2467                Record::from([
2468                    (String::from("id"), Value::U64(3)),
2469                    (String::from("version"), Value::I64(1)),
2470                    (
2471                        String::from("name"),
2472                        Value::Text(String::from("tea-archived")),
2473                    ),
2474                ]),
2475            ],
2476        );
2477
2478        let rows = repository
2479            .fetch_all(
2480                &teaql_core::SelectQuery::new("Order")
2481                    .filter(
2482                        Expr::contain("name", "tea")
2483                            .and_expr(Expr::begin_with("name", "tea"))
2484                            .and_expr(Expr::end_with("name", "order"))
2485                            .and_expr(Expr::not_contain("name", "coffee"))
2486                            .and_expr(Expr::not_begin_with("name", "archived"))
2487                            .and_expr(Expr::not_end_with("name", "draft")),
2488                    )
2489                    .order_asc("id"),
2490            )
2491            .unwrap();
2492
2493        assert_eq!(rows.len(), 1);
2494        assert_eq!(
2495            rows[0].get("name"),
2496            Some(&Value::Text("tea-order".to_owned()))
2497        );
2498    }
2499
2500    #[tokio::test]
2501    async fn memory_repository_runs_property_to_property_filters() {
2502        let metadata = InMemoryMetadataStore::new().with_entity(entity());
2503        let repository = MemoryRepository::new(metadata).with_rows(
2504            "Order",
2505            vec![
2506                Record::from([
2507                    (String::from("id"), Value::U64(1)),
2508                    (String::from("version"), Value::I64(2)),
2509                    (String::from("name"), Value::Text(String::from("keep"))),
2510                ]),
2511                Record::from([
2512                    (String::from("id"), Value::U64(2)),
2513                    (String::from("version"), Value::I64(1)),
2514                    (String::from("name"), Value::Text(String::from("skip"))),
2515                ]),
2516            ],
2517        );
2518
2519        let rows = repository
2520            .fetch_all(
2521                &teaql_core::SelectQuery::new("Order")
2522                    .filter(Expr::compare_columns("version", BinaryOp::Gte, "id"))
2523                    .order_asc("id"),
2524            )
2525            .unwrap();
2526
2527        assert_eq!(rows.len(), 1);
2528        assert_eq!(rows[0].get("name"), Some(&Value::Text("keep".to_owned())));
2529    }
2530
2531    #[tokio::test]
2532    async fn memory_repository_supports_mutations_and_optimistic_locking() {
2533        let metadata = InMemoryMetadataStore::new().with_entity(entity());
2534        let repository = MemoryRepository::new(metadata);
2535
2536        repository
2537            .insert(
2538                &InsertCommand::new("Order")
2539                    .value("id", 10_u64)
2540                    .value("version", 1_i64)
2541                    .value("name", "draft"),
2542            )
2543            .unwrap();
2544        repository
2545            .update(
2546                &UpdateCommand::new("Order", 10_u64)
2547                    .expected_version(1)
2548                    .value("name", "submitted"),
2549            )
2550            .unwrap();
2551
2552        let row = repository
2553            .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2554            .unwrap()
2555            .pop()
2556            .unwrap();
2557        assert_eq!(
2558            row.get("name"),
2559            Some(&Value::Text(String::from("submitted")))
2560        );
2561        assert_eq!(row.get("version"), Some(&Value::I64(2)));
2562
2563        let conflict = repository
2564            .update(
2565                &UpdateCommand::new("Order", 10_u64)
2566                    .expected_version(1)
2567                    .value("name", "stale"),
2568            )
2569            .unwrap_err();
2570        assert!(matches!(
2571            conflict,
2572            RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. })
2573        ));
2574
2575        repository
2576            .delete(&DeleteCommand::new("Order", 10_u64).expected_version(2))
2577            .unwrap();
2578        let row = repository
2579            .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2580            .unwrap()
2581            .pop()
2582            .unwrap();
2583        assert_eq!(row.get("version"), Some(&Value::I64(-3)));
2584
2585        repository
2586            .recover(&RecoverCommand::new("Order", 10_u64, -3))
2587            .unwrap();
2588        let row = repository
2589            .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2590            .unwrap()
2591            .pop()
2592            .unwrap();
2593        assert_eq!(row.get("version"), Some(&Value::I64(4)));
2594    }
2595
2596    #[tokio::test]
2597    async fn user_context_reports_missing_schema_provider() {
2598        let err = UserContext::new().ensure_schema().await.unwrap_err();
2599        assert!(
2600            matches!(err, RuntimeError::Schema(message) if message == "missing schema provider")
2601        );
2602    }
2603
2604    #[tokio::test]
2605    async fn user_context_stores_and_exposes_user_identifier() {
2606        let mut ctx = UserContext::new();
2607        let pid = std::process::id();
2608        let thread_id_str = format!("{:?}", std::thread::current().id());
2609        let numeric_thread_id = thread_id_str
2610            .strip_prefix("ThreadId(")
2611            .and_then(|s| s.strip_suffix(")"))
2612            .unwrap_or(&thread_id_str);
2613        let os_user = std::env::var("USER")
2614            .or_else(|_| std::env::var("USERNAME"))
2615            .unwrap_or_else(|_| "main".to_owned());
2616        let expected_default = format!("{os_user}@pid-{pid}.tid-{numeric_thread_id}");
2617        assert_eq!(ctx.user_identifier(), Some(expected_default.as_str()));
2618
2619        ctx.set_user_identifier("user-123");
2620        assert_eq!(ctx.user_identifier(), Some("user-123"));
2621
2622        let ctx2 = UserContext::new().with_user_identifier("user-456");
2623        assert_eq!(ctx2.user_identifier(), Some("user-456"));
2624
2625        let mut ctx3 = UserContext::new();
2626        ctx3.set_user_identifier_option(Some("user-789".to_owned()));
2627        assert_eq!(ctx3.user_identifier(), Some("user-789"));
2628        ctx3.set_user_identifier_option(None);
2629        assert_eq!(ctx3.user_identifier(), None);
2630
2631        let ctx4 = UserContext::new().with_user_identifier_option(Some("user-abc".to_owned()));
2632        assert_eq!(ctx4.user_identifier(), Some("user-abc"));
2633
2634    }
2635}
2636
2637pub use checker::{
2638    CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResult, CheckResults, CheckRule, Checker,
2639    CheckerRegistry, InMemoryCheckerRegistry, LocationSegment, ObjectLocation, TypedChecker,
2640    TypedEntityChecker, clear_record_status, mark_record_status,
2641};