1#![allow(warnings)]
2mod checker;
3mod context;
4mod entity_runtime;
5mod entity_status;
6mod error;
7mod event;
8mod graph;
9mod id;
10mod language;
11mod memory;
12mod registry;
13mod repository;
14
15pub use context::{
16 InfoLogEntry, LogPayload, SchemaProvider, SqlLogEntry, SqlLogOperation,
17 SqlLogOptions, UnifiedLogBuffer, UnifiedLogEntry, UserContext,
18};
19pub use entity_status::{EntityAction, EntityStatus};
20pub use entity_runtime::{ChangeSetStack, EntityChangeSet, EntityKey, EntityRoot, RootContext};
21pub use error::{ContextError, RepositoryError, RuntimeError};
22pub use event::{
23 EntityEvent, EntityEventKind, EntityEventSink, EntityPropertyChange, InMemoryEntityEventSink,
24};
25pub use graph::{
26 GraphMutationBatch, GraphMutationKind, GraphMutationPlan, GraphMutationPlanItem,
27 GraphNode, GraphOperation, ScopedCommentNode, TraceScopeToken, sorted_update_fields,
28};
29pub(crate) use id::local_id_generator;
30pub use id::{InternalIdGenerator, SnowflakeIdGenerator};
31pub use language::{
32 BuiltinTranslator, Language, MessageTranslator, translate_check_result, translate_location,
33};
34pub use memory::{MemoryRepository, MemoryRepositoryError};
35pub use registry::{
36 InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry,
37 MetadataStore, RepositoryBehavior, RepositoryBehaviorRegistry, RepositoryRegistry,
38 RequestPolicy, RuntimeModule,
39};
40pub use repository::{
41 AggregationCacheBackend, ContextRepository, GraphTransactionBoundary, InMemoryAggregationCache,
42 RelationLoadPlan, Repository, ResolvedRepository,
43};
44
45#[cfg(test)]
46mod tests {
47 use std::collections::{BTreeMap, VecDeque};
48 use std::sync::{Arc, Mutex};
49
50 use super::{
51 AggregationCacheBackend, CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResult,
52 CheckResults, CheckRule, Checker, EntityEvent, EntityEventKind, EntityEventSink,
53 GraphMutationKind, GraphNode, InMemoryAggregationCache,
54 InMemoryCheckerRegistry, InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry,
55 InMemoryRepositoryRegistry, InternalIdGenerator, Language, MemoryRepository, MetadataStore,
56 ObjectLocation, Repository, RepositoryBehavior, RepositoryError,
57 RequestPolicy, RuntimeError, RuntimeModule, SqlLogOperation, SqlLogOptions, TypedChecker,
58 TypedEntityChecker, UserContext, translate_check_result,
59 };
60 use teaql_data_service::{
61 DataServiceCapabilities, DataServiceExecutor, ExecutionMetadata, MutationExecutor,
62 MutationRequest, MutationResult, QueryExecutor, QueryRequest, QueryResult, DataServiceOperation,
63 };
64 use teaql_core::{
65 Aggregate, AggregateFunction, BinaryOp, DataType, Decimal, DeleteCommand, Entity,
66 EntityDescriptor, EntityError, Expr, InsertCommand, OrderBy, PropertyDescriptor, Record,
67 RecoverCommand, RelationAggregate, SelectQuery, TeaqlEntity, UpdateCommand, Value,
68 };
69 use teaql_macros::TeaqlEntity as DeriveTeaqlEntity;
70 use teaql_sql::{CompiledQuery, DatabaseKind, SqlCompileError, SqlDialect, quote_identifier_if_needed};
71
// Comma-separated column list matching the three properties declared by `entity()`
// (`id`, `version`, `name`). Presumably the projection expected in generated
// `Order` queries — TODO confirm against the usages further down the module.
const ORDER_DEFAULT_PROJECTION: &str = "id, version, name";
73
74 #[derive(Debug, Default, Clone, Copy)]
75 struct PostgresDialect;
76
77 impl SqlDialect for PostgresDialect {
78 fn kind(&self) -> DatabaseKind {
79 DatabaseKind::PostgreSql
80 }
81
82 fn quote_ident(&self, ident: &str) -> String {
83 quote_identifier_if_needed(ident, '"')
84 }
85
86 fn placeholder(&self, index: usize) -> String {
87 format!("${index}")
88 }
89
90 fn schema_type_sql(
91 &self,
92 data_type: DataType,
93 _property: &PropertyDescriptor,
94 ) -> Result<&'static str, SqlCompileError> {
95 match data_type {
96 DataType::Bool => Ok("BOOLEAN"),
97 DataType::I64 | DataType::U64 => Ok("BIGINT"),
98 DataType::F64 => Ok("DOUBLE PRECISION"),
99 DataType::Decimal => Ok("NUMERIC"),
100 DataType::Text => Ok("TEXT"),
101 DataType::Json => Ok("JSONB"),
102 DataType::Date => Ok("DATE"),
103 DataType::Timestamp => Ok("TIMESTAMPTZ"),
104 }
105 }
106 }
107
108 fn entity() -> EntityDescriptor {
109 EntityDescriptor::new("Order")
110 .table_name("orders")
111 .property(
112 PropertyDescriptor::new("id", DataType::U64)
113 .column_name("id")
114 .id()
115 .not_null(),
116 )
117 .property(
118 PropertyDescriptor::new("version", DataType::I64)
119 .column_name("version")
120 .version()
121 .not_null(),
122 )
123 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
124 .relation(
125 teaql_core::RelationDescriptor::new("lines", "OrderLine")
126 .local_key("id")
127 .foreign_key("order_id")
128 .many(),
129 )
130 }
131
132 fn line_entity() -> EntityDescriptor {
133 EntityDescriptor::new("OrderLine")
134 .table_name("orderline")
135 .property(
136 PropertyDescriptor::new("id", DataType::U64)
137 .column_name("id")
138 .id()
139 .not_null(),
140 )
141 .property(
142 PropertyDescriptor::new("version", DataType::I64)
143 .column_name("version")
144 .version(),
145 )
146 .property(
147 PropertyDescriptor::new("order_id", DataType::U64)
148 .column_name("order_id")
149 .not_null(),
150 )
151 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
152 .property(
153 PropertyDescriptor::new("product_id", DataType::U64)
154 .column_name("product_id")
155 .not_null(),
156 )
157 .relation(
158 teaql_core::RelationDescriptor::new("product", "Product")
159 .local_key("product_id")
160 .foreign_key("id"),
161 )
162 }
163
164 fn product_entity() -> EntityDescriptor {
165 EntityDescriptor::new("Product")
166 .table_name("product")
167 .property(
168 PropertyDescriptor::new("id", DataType::U64)
169 .column_name("id")
170 .id()
171 .not_null(),
172 )
173 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
174 }
175
/// Canned executor for tests: every query answers with a clone of `rows`, and every
/// mutation reports `affected` as its affected-row count.
#[derive(Debug, Default)]
struct StubExecutor {
    // Affected-row count returned by `mutate`.
    affected: u64,
    // Rows returned (cloned) by every `query` call.
    rows: Vec<Record>,
}
181
/// Executor for tests that serves queued row batches in FIFO order and records an
/// approximate SQL string for each query it receives.
#[derive(Debug, Default)]
struct QueueExecutor {
    // Affected-row count returned by `mutate`.
    affected: u64,
    // Pending result batches; each `query` pops one from the front (empty batch once drained).
    rows: Mutex<VecDeque<Vec<Record>>>,
    // One approximate `SELECT ... FROM <entity> ...` string per query, in call order.
    queries: Mutex<Vec<String>>,
}
188
/// Repository behavior for the tests: forces a `version = 1` select filter, defaults
/// `version` to 1 on insert and requests the `lines` relation.
struct OrderBehavior;
190
/// Derived entity declared as `CatalogProduct` on table `catalog_product`; used by the
/// module-registration tests.
// NOTE(review): `dead_code` is presumably allowed because the fields are never read
// directly by the tests — confirm before removing.
#[allow(dead_code)]
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "CatalogProduct", table = "catalog_product")]
struct CatalogProductRow {
    #[teaql(id)]
    id: u64,
    name: String,
}
199
/// Derived entity declared as `OrderAggregate` on table `orders`: an id plus a
/// `#[teaql(dynamic)]` map from string keys to [`Value`]s.
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "OrderAggregate", table = "orders")]
struct OrderAggregateDynamic {
    #[teaql(id)]
    id: u64,
    // Presumably receives columns that have no dedicated field — TODO confirm against the derive.
    #[teaql(dynamic)]
    dynamic: BTreeMap<String, Value>,
}
208
/// Derived entity declared as `Product` on table `product`, with an id and a name.
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "Product", table = "product")]
struct ProductEntityRow {
    #[teaql(id)]
    id: u64,
    name: String,
}
216
/// Derived entity declared as `OrderLine` on table `orderline`, carrying an optional
/// `product` relation typed as [`ProductEntityRow`].
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "OrderLine", table = "orderline")]
struct OrderLineEntityRow {
    #[teaql(id)]
    id: u64,
    #[teaql(column = "order_id")]
    order_id: u64,
    name: String,
    #[teaql(column = "product_id")]
    product_id: u64,
    // Relation to `Product`, joined on `product_id` (local) = `id` (foreign).
    #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
    product: Option<ProductEntityRow>,
}
230
/// Derived entity declared as `OrderLine` on table `orderline` without any relation
/// fields; it is the element type of `ProductWithLinesEntityRow::lines`.
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "OrderLine", table = "orderline")]
struct ProductLineEntityRow {
    #[teaql(id)]
    id: u64,
    #[teaql(column = "order_id")]
    order_id: u64,
    name: String,
    #[teaql(column = "product_id")]
    product_id: u64,
}
242
/// Derived entity declared as `Product` on table `product`, with a to-many `lines`
/// relation back to `OrderLine`.
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "Product", table = "product")]
struct ProductWithLinesEntityRow {
    #[teaql(id)]
    id: u64,
    name: String,
    // To-many relation to `OrderLine`, joined on `id` (local) = `product_id` (foreign).
    #[teaql(relation(
        target = "OrderLine",
        local_key = "id",
        foreign_key = "product_id",
        many
    ))]
    lines: teaql_core::SmartList<ProductLineEntityRow>,
}
257
/// Derived entity declared as `OrderLine` on table `orderline` whose `product` relation
/// is typed as [`ProductWithLinesEntityRow`], i.e. the product in turn carries its lines.
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "OrderLine", table = "orderline")]
struct OrderLineWithProductEntityRow {
    #[teaql(id)]
    id: u64,
    #[teaql(column = "order_id")]
    order_id: u64,
    name: String,
    #[teaql(column = "product_id")]
    product_id: u64,
    // Relation to `Product`, joined on `product_id` (local) = `id` (foreign).
    #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
    product: Option<ProductWithLinesEntityRow>,
}
271
/// Derived entity declared as `Order` on table `orders`, with id, version, name and a
/// to-many `lines` relation of [`OrderLineEntityRow`].
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "Order", table = "orders")]
struct OrderAggregateRow {
    #[teaql(id)]
    id: u64,
    #[teaql(version)]
    version: i64,
    name: String,
    // To-many relation to `OrderLine`, joined on `id` (local) = `order_id` (foreign).
    #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
    lines: teaql_core::SmartList<OrderLineEntityRow>,
}
283
/// Derived, cloneable entity declared as `Order` on table `orders` with no relations;
/// this is the type that `TypedOrderChecker` validates.
#[derive(Debug, Clone, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "Order", table = "orders")]
struct Order {
    #[teaql(id)]
    id: u64,
    #[teaql(version)]
    version: i64,
    name: String,
}
293
/// Leaf node of the typed graph fixtures: derived entity declared as `Product` on table `product`.
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "Product", table = "product")]
struct TypedGraphProduct {
    #[teaql(id)]
    id: u64,
    name: String,
}
301
/// Middle node of the typed graph fixtures: derived entity declared as `OrderLine` on
/// table `orderline`. Both key columns are optional; the typed graph tests pass `None`
/// and later assert the saved graph contains the generated keys.
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "OrderLine", table = "orderline")]
struct TypedGraphLine {
    #[teaql(id)]
    id: u64,
    #[teaql(column = "order_id")]
    order_id: Option<u64>,
    name: String,
    #[teaql(column = "product_id")]
    product_id: Option<u64>,
    // Relation to `Product`, joined on `product_id` (local) = `id` (foreign).
    #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
    product: Option<TypedGraphProduct>,
}
315
/// Root node of the typed graph fixtures: derived entity declared as `Order` on table
/// `orders` with a to-many `lines` relation of [`TypedGraphLine`].
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "Order", table = "orders")]
struct TypedGraphOrder {
    #[teaql(id)]
    id: u64,
    #[teaql(version)]
    version: i64,
    name: String,
    // To-many relation to `OrderLine`, joined on `id` (local) = `order_id` (foreign).
    #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
    lines: teaql_core::SmartList<TypedGraphLine>,
}
327
328 #[derive(Debug, PartialEq, Eq)]
329 struct OrderEntity {
330 id: u64,
331 version: i64,
332 name: String,
333 }
334
335 impl teaql_core::TeaqlEntity for OrderEntity {
336 fn entity_descriptor() -> EntityDescriptor {
337 entity()
338 }
339 }
340
341 impl Entity for OrderEntity {
342 fn from_record(record: Record) -> Result<Self, EntityError> {
343 let id = match record.get("id") {
344 Some(Value::U64(v)) => *v,
345 Some(Value::I64(v)) if *v >= 0 => *v as u64,
346 other => {
347 return Err(EntityError::new(
348 "Order",
349 format!("invalid id field: {other:?}"),
350 ));
351 }
352 };
353 let version = match record.get("version") {
354 Some(Value::I64(v)) => *v,
355 other => {
356 return Err(EntityError::new(
357 "Order",
358 format!("invalid version field: {other:?}"),
359 ));
360 }
361 };
362 let name = match record.get("name") {
363 Some(Value::Text(v)) => v.clone(),
364 other => {
365 return Err(EntityError::new(
366 "Order",
367 format!("invalid name field: {other:?}"),
368 ));
369 }
370 };
371 Ok(Self { id, version, name })
372 }
373
374 fn into_record(self) -> Record {
375 Record::from([
376 (String::from("id"), Value::U64(self.id)),
377 (String::from("version"), Value::I64(self.version)),
378 (String::from("name"), Value::Text(self.name)),
379 ])
380 }
381 }
382
/// Error type shared by the stub executors; it carries no data.
#[derive(Debug)]
struct StubError;

impl std::fmt::Display for StubError {
    /// Always renders the fixed text `stub error`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("stub error")
    }
}

impl std::error::Error for StubError {}
393
/// Executor plumbing for `StubExecutor`: errors are `StubError` and the advertised
/// capabilities are whatever `DataServiceCapabilities::default()` returns.
impl DataServiceExecutor for StubExecutor {
    type Error = StubError;

    fn capabilities(&self) -> DataServiceCapabilities {
        DataServiceCapabilities::default()
    }
}
401
402 impl QueryExecutor for StubExecutor {
403 async fn query(&self, _request: QueryRequest) -> Result<QueryResult, Self::Error> {
404 Ok(QueryResult {
405 rows: self.rows.clone(),
406 metadata: ExecutionMetadata {
407 debug_query: None,
408 backend: "stub".to_owned(),
409 operation: DataServiceOperation::Query,
410 started_at: std::time::SystemTime::now(),
411 ended_at: std::time::SystemTime::now(),
412 affected_rows: None,
413 result_count: Some(self.rows.len()),
414 trace_chain: Vec::new(),
415 comment: None,
416 backend_request_id: None,
417 },
418 })
419 }
420 }
421
422 impl MutationExecutor for StubExecutor {
423 async fn mutate(&self, _request: MutationRequest) -> Result<MutationResult, Self::Error> {
424 Ok(MutationResult {
425 affected_rows: self.affected,
426 generated_values: Record::new(),
427 metadata: ExecutionMetadata {
428 debug_query: None,
429 backend: "stub".to_owned(),
430 operation: DataServiceOperation::Update,
431 started_at: std::time::SystemTime::now(),
432 ended_at: std::time::SystemTime::now(),
433 affected_rows: Some(self.affected),
434 result_count: None,
435 trace_chain: Vec::new(),
436 comment: None,
437 backend_request_id: None,
438 },
439 })
440 }
441 }
442
/// Executor plumbing for `QueueExecutor`: errors are `StubError` and the advertised
/// capabilities are whatever `DataServiceCapabilities::default()` returns.
impl DataServiceExecutor for QueueExecutor {
    type Error = StubError;

    fn capabilities(&self) -> DataServiceCapabilities {
        DataServiceCapabilities::default()
    }
}
450
451 impl QueryExecutor for QueueExecutor {
452 async fn query(&self, request: QueryRequest) -> Result<QueryResult, Self::Error> {
453 let sql_approx = format!("SELECT ... FROM {} ...", request.query.entity);
454 self.queries.lock().unwrap().push(sql_approx);
455 Ok(QueryResult {
456 rows: self.rows.lock().unwrap().pop_front().unwrap_or_default(),
457 metadata: ExecutionMetadata {
458 debug_query: None,
459 backend: "queue".to_owned(),
460 operation: DataServiceOperation::Query,
461 started_at: std::time::SystemTime::now(),
462 ended_at: std::time::SystemTime::now(),
463 affected_rows: None,
464 result_count: Some(0),
465 trace_chain: Vec::new(),
466 comment: None,
467 backend_request_id: None,
468 },
469 })
470 }
471 }
472
473 impl MutationExecutor for QueueExecutor {
474 async fn mutate(&self, _request: MutationRequest) -> Result<MutationResult, Self::Error> {
475 Ok(MutationResult {
476 affected_rows: self.affected,
477 generated_values: Record::new(),
478 metadata: ExecutionMetadata {
479 debug_query: None,
480 backend: "queue".to_owned(),
481 operation: DataServiceOperation::Update,
482 started_at: std::time::SystemTime::now(),
483 ended_at: std::time::SystemTime::now(),
484 affected_rows: Some(self.affected),
485 result_count: None,
486 trace_chain: Vec::new(),
487 comment: None,
488 backend_request_id: None,
489 },
490 })
491 }
492 }
493
494
495
496 impl RepositoryBehavior for OrderBehavior {
497 fn before_select(
498 &self,
499 _ctx: &UserContext,
500 query: &mut teaql_core::SelectQuery,
501 ) -> Result<(), RuntimeError> {
502 query.filter = Some(Expr::eq("version", 1_i64));
503 Ok(())
504 }
505
506 fn before_insert(
507 &self,
508 _ctx: &UserContext,
509 command: &mut InsertCommand,
510 ) -> Result<(), RuntimeError> {
511 command
512 .values
513 .entry("version".to_owned())
514 .or_insert(Value::I64(1));
515 Ok(())
516 }
517
518 fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
519 vec!["lines".to_owned()]
520 }
521 }
522
/// Behavior whose `before_insert` fills `name` and `version` from context resources and locals.
struct ContextAwareOrderBehavior;
/// Request policy that constrains `Order` selects and inserts using the `tenant_id` named resource.
struct TenantRequestPolicy;
/// Record-based checker for the `Order` entity.
struct OrderChecker;
/// Typed checker that operates on [`Order`] values.
struct TypedOrderChecker;
/// Event sink that appends a clone of every received event to a shared vector.
#[derive(Clone)]
struct RecordingEventSink {
    // Shared so that clones of the sink write into the same list.
    events: Arc<Mutex<Vec<EntityEvent>>>,
}
531
532 impl RepositoryBehavior for ContextAwareOrderBehavior {
533 fn before_insert(
534 &self,
535 ctx: &UserContext,
536 command: &mut InsertCommand,
537 ) -> Result<(), RuntimeError> {
538 let tenant = ctx
539 .get_named_resource::<String>("tenant")
540 .cloned()
541 .ok_or_else(|| RuntimeError::Behavior("missing tenant resource".to_owned()))?;
542 let version = *ctx
543 .get_named_resource::<i64>("initial_version")
544 .ok_or_else(|| {
545 RuntimeError::Behavior("missing initial_version resource".to_owned())
546 })?;
547 let trace_id = match ctx.local("trace_id") {
548 Some(Value::Text(value)) => value.clone(),
549 other => {
550 return Err(RuntimeError::Behavior(format!(
551 "missing trace_id local, got {other:?}"
552 )));
553 }
554 };
555
556 command
557 .values
558 .entry("name".to_owned())
559 .or_insert(Value::Text(format!("{tenant}:{trace_id}")));
560 command
561 .values
562 .entry("version".to_owned())
563 .or_insert(Value::I64(version));
564 Ok(())
565 }
566 }
567
568 impl RequestPolicy for TenantRequestPolicy {
569 fn enforce_select(
570 &self,
571 ctx: &UserContext,
572 query: &mut SelectQuery,
573 ) -> Result<(), RuntimeError> {
574 if query.entity == "Order" {
575 let tenant_id = ctx
576 .get_named_resource::<u64>("tenant_id")
577 .copied()
578 .ok_or_else(|| RuntimeError::Policy("missing tenant_id".to_owned()))?;
579 query.filter = Some(match query.filter.take() {
580 Some(filter) => filter.and_expr(Expr::eq("id", tenant_id)),
581 None => Expr::eq("id", tenant_id),
582 });
583 }
584 Ok(())
585 }
586
587 fn enforce_insert(
588 &self,
589 ctx: &UserContext,
590 command: &mut InsertCommand,
591 ) -> Result<(), RuntimeError> {
592 if command.entity == "Order" {
593 let tenant_id = ctx
594 .get_named_resource::<u64>("tenant_id")
595 .copied()
596 .ok_or_else(|| RuntimeError::Policy("missing tenant_id".to_owned()))?;
597 command
598 .values
599 .insert("version".to_owned(), Value::I64(tenant_id as i64));
600 }
601 Ok(())
602 }
603 }
604
605 impl Checker for OrderChecker {
606 fn entity(&self) -> &str {
607 "Order"
608 }
609
610 fn check_and_fix(
611 &self,
612 _ctx: &UserContext,
613 record: &mut Record,
614 location: &ObjectLocation,
615 results: &mut CheckResults,
616 ) {
617 let status = CheckObjectStatus::from_record(record);
618 if status.is_create() {
619 self.required(record, "name", location, results);
620 record.entry("version".to_owned()).or_insert(Value::I64(1));
621 }
622 if status.is_update()
623 && record.get("name") == Some(&Value::Text("graph-update".to_owned()))
624 {
625 record.insert(
626 "name".to_owned(),
627 Value::Text("graph-update-checked".to_owned()),
628 );
629 }
630 self.min_string_length(record, "name", 3, location, results);
631 }
632 }
633
634 impl TypedChecker<Order> for TypedOrderChecker {
635 fn check_and_fix_typed(
636 &self,
637 _ctx: &UserContext,
638 entity: &mut Order,
639 status: CheckObjectStatus,
640 location: &ObjectLocation,
641 results: &mut CheckResults,
642 ) {
643 if status.is_create() {
644 if entity.name.is_empty() {
645 results.push(CheckResult::required(location.clone().member("name")));
646 }
647 }
648 if entity.name.chars().count() < 3 {
649 results.push(CheckResult::min_str(
650 location.clone().member("name"),
651 3,
652 entity.name.clone(),
653 ));
654 }
655 if entity.name == "fix" {
656 entity.name = "fixed".to_owned();
657 }
658 }
659 }
660
661 impl EntityEventSink for RecordingEventSink {
662 fn on_event(&self, _ctx: &UserContext, event: &EntityEvent) -> Result<(), RuntimeError> {
663 self.events.lock().unwrap().push(event.clone());
664 Ok(())
665 }
666 }
667
668 struct FixedIdGenerator(u64);
669
670 impl InternalIdGenerator for FixedIdGenerator {
671 fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
672 Ok(self.0)
673 }
674 }
675
676 struct SequentialIdGenerator {
677 next: Mutex<u64>,
678 }
679
680 impl SequentialIdGenerator {
681 fn new(next: u64) -> Self {
682 Self {
683 next: Mutex::new(next),
684 }
685 }
686 }
687
688 impl InternalIdGenerator for SequentialIdGenerator {
689 fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
690 let mut next = self
691 .next
692 .lock()
693 .map_err(|err| RuntimeError::IdGeneration(err.to_string()))?;
694 let id = *next;
695 *next += 1;
696 Ok(id)
697 }
698 }
699
700 #[tokio::test]
701 async fn metadata_store_registers_entities() {
702 let store = InMemoryMetadataStore::new().with_entity(entity());
703 assert!(store.entity("Order").is_some());
704 }
705
706 #[tokio::test]
707 async fn runtime_module_registers_descriptor_into_context() {
708 let ctx = UserContext::new().with_module(RuntimeModule::new().descriptor(entity()));
709 assert!(ctx.entity("Order").is_some());
710 assert!(ctx.has_repository("Order"));
711 }
712
713 #[tokio::test]
714 async fn runtime_module_registers_derived_entity_and_behavior() {
715 let ctx = UserContext::new().with_module(
716 RuntimeModule::new().entity_with_behavior::<CatalogProductRow, _>(OrderBehavior),
717 );
718 assert!(ctx.entity("CatalogProduct").is_some());
719 assert!(ctx.has_repository("CatalogProduct"));
720 assert!(ctx.repository_behavior("CatalogProduct").is_some());
721 }
722
723 #[tokio::test]
724 async fn module_macro_registers_multiple_entities() {
725 let ctx = UserContext::new().with_module(crate::module!(CatalogProductRow));
726 assert!(ctx.entity("CatalogProduct").is_some());
727 assert!(ctx.has_repository("CatalogProduct"));
728 }
729
730 #[tokio::test]
731 async fn module_macro_registers_entity_behavior_pairs() {
732 let ctx =
733 UserContext::new().with_module(crate::module!(CatalogProductRow => OrderBehavior));
734 assert!(ctx.entity("CatalogProduct").is_some());
735 assert!(ctx.repository_behavior("CatalogProduct").is_some());
736 }
737
738 #[tokio::test]
739 async fn repository_returns_optimistic_lock_conflict() {
740 let store = InMemoryMetadataStore::new().with_entity(entity());
741 let executor = StubExecutor {
742 affected: 0,
743 rows: Vec::new(),
744 };
745 let repo = Repository::new(&store, &executor);
746
747 let err = repo
748 .update(
749 &UpdateCommand::new("Order", 1_u64)
750 .expected_version(3)
751 .value("name", "next"),
752 )
753 .await.unwrap_err();
754
755 match err {
756 RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. }) => {}
757 other => panic!("unexpected error: {other}"),
758 }
759 }
760
761 #[tokio::test]
762 async fn user_context_indexes_resources_and_locals() {
763 let mut ctx =
764 UserContext::new().with_metadata(InMemoryMetadataStore::new().with_entity(entity()));
765 ctx.insert_resource::<u64>(42);
766 ctx.insert_named_resource("tenant", String::from("acme"));
767 ctx.put_local("trace_id", "req-1");
768
769 assert!(ctx.entity("Order").is_some());
770 assert_eq!(ctx.get_resource::<u64>(), Some(&42));
771 assert_eq!(
772 ctx.get_named_resource::<String>("tenant"),
773 Some(&String::from("acme"))
774 );
775 assert_eq!(
776 ctx.local("trace_id"),
777 Some(&Value::Text("req-1".to_owned()))
778 );
779 }
780
781 #[tokio::test]
782 async fn user_context_builds_context_repository() {
783 let mut ctx =
784 UserContext::new().with_metadata(InMemoryMetadataStore::new().with_entity(entity()));
785 ctx.insert_resource(PostgresDialect);
786 ctx.insert_resource(StubExecutor {
787 affected: 1,
788 rows: Vec::new(),
789 });
790
791 let repo = ctx.repository::< StubExecutor>().unwrap();
792 let affected = repo
793 .update(
794 &UpdateCommand::new("Order", 1_u64)
795 .expected_version(3)
796 .value("name", "next"),
797 )
798 .await.unwrap();
799
800 assert_eq!(affected, 1);
801 }
802
803 #[tokio::test]
804 async fn user_context_resolves_repository_by_entity_type() {
805 let mut ctx = UserContext::new()
806 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
807 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
808 ctx.insert_resource(PostgresDialect);
809 ctx.insert_resource(StubExecutor {
810 affected: 1,
811 rows: Vec::new(),
812 });
813
814 let repo = ctx
815 .resolve_repository::< StubExecutor>("Order")
816 .unwrap();
817 assert_eq!(repo.entity(), "Order");
818 assert_eq!(repo.select().entity, "Order");
819
820 let affected = repo
821 .insert(
822 &repo
823 .insert_command()
824 .value("id", 1_u64)
825 .value("version", 1_i64)
826 .value("name", "n"),
827 )
828 .await.unwrap();
829 assert_eq!(affected, 1);
830 }
831
832 #[tokio::test]
833 async fn resolved_repository_applies_behavior_hooks() {
834 let mut ctx = UserContext::new()
835 .with_metadata(
836 InMemoryMetadataStore::new()
837 .with_entity(entity())
838 .with_entity(line_entity())
839 .with_entity(product_entity()),
840 )
841 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
842 .with_repository_behavior_registry(
843 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
844 );
845 ctx.insert_resource(PostgresDialect);
846 ctx.insert_resource(StubExecutor {
847 affected: 1,
848 rows: Vec::new(),
849 });
850
851 let repo = ctx
852 .resolve_repository::< StubExecutor>("Order")
853 .unwrap();
854
855 let insert = repo.insert_command().value("id", 1_u64).value("name", "n");
859 let affected = repo.insert(&insert).await.unwrap();
860 assert_eq!(affected, 1);
861 assert_eq!(repo.relation_loads(), vec!["lines".to_owned()]);
862 }
863
864 #[tokio::test]
865 async fn resolved_repository_applies_request_policy_after_behavior_hooks() {
866 let mut ctx = UserContext::new()
867 .with_metadata(
868 InMemoryMetadataStore::new()
869 .with_entity(entity())
870 .with_entity(line_entity())
871 .with_entity(product_entity()),
872 )
873 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
874 .with_repository_behavior_registry(
875 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
876 )
877 .with_request_policy(TenantRequestPolicy);
878 ctx.insert_named_resource("tenant_id", 9_u64);
879 ctx.insert_resource(PostgresDialect);
880 ctx.insert_resource(StubExecutor {
881 affected: 1,
882 rows: Vec::new(),
883 });
884
885 let repo = ctx
886 .resolve_repository::< StubExecutor>("Order")
887 .unwrap();
888
889 let insert = repo.insert_command().value("id", 1_u64).value("name", "n");
894 let command = repo.prepare_insert_command(&insert).unwrap();
895 assert_eq!(command.values.get("version"), Some(&Value::I64(9)));
896 }
897
898 #[tokio::test]
899 async fn resolved_repository_prepares_insert_command_with_generated_id() {
900 let mut ctx = UserContext::new()
901 .with_metadata(
902 InMemoryMetadataStore::new()
903 .with_entity(entity())
904 .with_entity(line_entity())
905 .with_entity(product_entity()),
906 )
907 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
908 .with_repository_behavior_registry(
909 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
910 )
911 .with_internal_id_generator(FixedIdGenerator(42));
912 ctx.insert_resource(PostgresDialect);
913 ctx.insert_resource(StubExecutor {
914 affected: 1,
915 rows: Vec::new(),
916 });
917
918 let repo = ctx
919 .resolve_repository::< StubExecutor>("Order")
920 .unwrap();
921
922 let prepared = repo
923 .prepare_insert_command(&repo.insert_command().value("id", 0_u64).value("name", "n"))
924 .unwrap();
925
926 assert_eq!(prepared.values.get("id"), Some(&Value::U64(42)));
927 assert_eq!(prepared.values.get("version"), Some(&Value::I64(1)));
928 assert_eq!(
929 prepared.values.get("name"),
930 Some(&Value::Text("n".to_owned()))
931 );
932
933 let prepared_zero_version = repo
934 .prepare_insert_command(
935 &repo
936 .insert_command()
937 .value("id", 0_u64)
938 .value("version", 0_i64)
939 .value("name", "zero-version"),
940 )
941 .unwrap();
942 assert_eq!(
943 prepared_zero_version.values.get("version"),
944 Some(&Value::I64(1))
945 );
946 }
947
948 #[tokio::test]
949 async fn resolved_repository_saves_create_graph_and_maintains_relation_keys() {
950 let mut ctx = UserContext::new()
951 .with_metadata(
952 InMemoryMetadataStore::new()
953 .with_entity(entity())
954 .with_entity(line_entity())
955 .with_entity(product_entity()),
956 )
957 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
958 .with_repository_behavior_registry(
959 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
960 )
961 .with_internal_id_generator(SequentialIdGenerator::new(500));
962 ctx.insert_resource(PostgresDialect);
963 ctx.insert_resource(StubExecutor {
964 affected: 1,
965 rows: Vec::new(),
966 });
967
968 let repo = ctx
969 .resolve_repository::< StubExecutor>("Order")
970 .unwrap();
971 let graph = GraphNode::new("Order").value("name", "root").relation(
972 "lines",
973 GraphNode::new("OrderLine")
974 .value("name", "line-1")
975 .relation("product", GraphNode::new("Product").value("name", "sku-1")),
976 );
977
978 let saved = repo.save_graph(graph).await.unwrap();
979
980 assert_eq!(saved.values.get("id"), Some(&Value::U64(500)));
981 assert_eq!(saved.values.get("version"), Some(&Value::I64(1)));
982 let lines = saved.relations.get("lines").unwrap();
983 assert_eq!(lines.len(), 1);
984 assert_eq!(lines[0].values.get("id"), Some(&Value::U64(502)));
985 assert_eq!(lines[0].values.get("version"), Some(&Value::I64(1)));
986 assert_eq!(lines[0].values.get("order_id"), Some(&Value::U64(500)));
987 assert_eq!(lines[0].values.get("product_id"), Some(&Value::U64(501)));
988 let product = lines[0].relations.get("product").unwrap();
989 assert_eq!(product[0].values.get("id"), Some(&Value::U64(501)));
990 }
991
992 #[tokio::test]
993 async fn resolved_repository_extracts_and_saves_typed_entity_graph() {
994 let mut ctx = UserContext::new()
995 .with_metadata(
996 InMemoryMetadataStore::new()
997 .with_entity(entity())
998 .with_entity(line_entity())
999 .with_entity(product_entity()),
1000 )
1001 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1002 .with_internal_id_generator(SequentialIdGenerator::new(700));
1003 ctx.insert_resource(PostgresDialect);
1004 ctx.insert_resource(StubExecutor {
1005 affected: 1,
1006 rows: Vec::new(),
1007 });
1008
1009 let repo = ctx
1010 .resolve_repository::< StubExecutor>("Order")
1011 .unwrap();
1012 let order = TypedGraphOrder {
1013 id: 0,
1014 version: 0,
1015 name: "typed-root".to_owned(),
1016 lines: teaql_core::SmartList::from(vec![TypedGraphLine {
1017 id: 0,
1018 order_id: None,
1019 name: "typed-line".to_owned(),
1020 product_id: None,
1021 product: Some(TypedGraphProduct {
1022 id: 0,
1023 name: "typed-product".to_owned(),
1024 }),
1025 }]),
1026 };
1027
1028 let extracted = repo.graph_node_from_entity(order).unwrap();
1029 assert_eq!(extracted.entity, "Order");
1030 assert_eq!(
1031 extracted.values.get("name"),
1032 Some(&Value::Text("typed-root".to_owned()))
1033 );
1034 assert_eq!(extracted.values.get("id"), Some(&Value::U64(0)));
1035 assert_eq!(extracted.relations["lines"].len(), 1);
1036 assert_eq!(
1037 extracted.relations["lines"][0].values.get("name"),
1038 Some(&Value::Text("typed-line".to_owned()))
1039 );
1040 assert_eq!(
1041 extracted.relations["lines"][0].relations["product"].len(),
1042 1
1043 );
1044
1045 let saved = repo.save_graph(extracted).await.unwrap();
1046 assert_eq!(saved.values.get("id"), Some(&Value::U64(700)));
1047 assert_eq!(saved.values.get("version"), Some(&Value::I64(1)));
1048 let lines = saved.relations.get("lines").unwrap();
1049 assert_eq!(lines[0].values.get("id"), Some(&Value::U64(702)));
1050 assert_eq!(lines[0].values.get("version"), Some(&Value::I64(1)));
1051 assert_eq!(lines[0].values.get("order_id"), Some(&Value::U64(700)));
1052 assert_eq!(lines[0].values.get("product_id"), Some(&Value::U64(701)));
1053 assert_eq!(
1054 lines[0].relations["product"][0].values.get("id"),
1055 Some(&Value::U64(701))
1056 );
1057 }
1058
1059 #[tokio::test]
1060 async fn resolved_repository_saves_typed_entity_graph_directly() {
1061 let mut ctx = UserContext::new()
1062 .with_metadata(
1063 InMemoryMetadataStore::new()
1064 .with_entity(entity())
1065 .with_entity(line_entity())
1066 .with_entity(product_entity()),
1067 )
1068 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1069 .with_internal_id_generator(SequentialIdGenerator::new(800));
1070 ctx.insert_resource(PostgresDialect);
1071 ctx.insert_resource(StubExecutor {
1072 affected: 1,
1073 rows: Vec::new(),
1074 });
1075
1076 let repo = ctx
1077 .resolve_repository::< StubExecutor>("Order")
1078 .unwrap();
1079 let saved = repo
1080 .save_entity_graph(TypedGraphOrder {
1081 id: 0,
1082 version: 0,
1083 name: "typed-direct".to_owned(),
1084 lines: teaql_core::SmartList::from(vec![TypedGraphLine {
1085 id: 0,
1086 order_id: None,
1087 name: "typed-line".to_owned(),
1088 product_id: None,
1089 product: Some(TypedGraphProduct {
1090 id: 0,
1091 name: "typed-product".to_owned(),
1092 }),
1093 }]),
1094 })
1095 .await.unwrap();
1096
1097 assert_eq!(saved.values.get("id"), Some(&Value::U64(800)));
1098 assert_eq!(saved.values.get("version"), Some(&Value::I64(1)));
1099 assert_eq!(
1100 saved.relations["lines"][0].values.get("order_id"),
1101 Some(&Value::U64(800))
1102 );
1103 assert_eq!(
1104 saved.relations["lines"][0].values.get("product_id"),
1105 Some(&Value::U64(801))
1106 );
1107 }
1108
1109 #[tokio::test]
1110 async fn custom_user_context_can_drive_insert_preparation() {
1111 let mut ctx = UserContext::new()
1112 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1113 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1114 .with_repository_behavior_registry(
1115 InMemoryRepositoryBehaviorRegistry::new()
1116 .with_behavior("Order", ContextAwareOrderBehavior),
1117 )
1118 .with_internal_id_generator(FixedIdGenerator(99));
1119 ctx.insert_named_resource("tenant", String::from("acme"));
1120 ctx.insert_named_resource("initial_version", 7_i64);
1121 ctx.put_local("trace_id", "req-9");
1122 ctx.insert_resource(PostgresDialect);
1123 ctx.insert_resource(StubExecutor {
1124 affected: 1,
1125 rows: Vec::new(),
1126 });
1127
1128 let repo = ctx
1129 .resolve_repository::< StubExecutor>("Order")
1130 .unwrap();
1131 let prepared = repo.prepare_insert_command(&repo.insert_command()).unwrap();
1132
1133 assert_eq!(prepared.values.get("id"), Some(&Value::U64(99)));
1134 assert_eq!(prepared.values.get("version"), Some(&Value::I64(7)));
1135 assert_eq!(
1136 prepared.values.get("name"),
1137 Some(&Value::Text("acme:req-9".to_owned()))
1138 );
1139 }
1140
1141 #[tokio::test]
1142 async fn checker_registry_validates_and_fixes_insert_commands() {
1143 let mut ctx = UserContext::new()
1144 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1145 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1146 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker))
1147 .with_internal_id_generator(FixedIdGenerator(77));
1148 ctx.insert_resource(PostgresDialect);
1149 ctx.insert_resource(StubExecutor {
1150 affected: 1,
1151 rows: Vec::new(),
1152 });
1153
1154 let repo = ctx
1155 .resolve_repository::< StubExecutor>("Order")
1156 .unwrap();
1157 let prepared = repo
1158 .prepare_insert_command(&repo.insert_command().value("name", "valid"))
1159 .unwrap();
1160
1161 assert_eq!(prepared.values.get("id"), Some(&Value::U64(77)));
1162 assert_eq!(prepared.values.get("version"), Some(&Value::I64(1)));
1163 assert!(!prepared.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
1164
1165 let error = repo
1166 .prepare_insert_command(&repo.insert_command().value("name", "no"))
1167 .unwrap_err();
1168 match error {
1169 RuntimeError::Check(results) => {
1170 assert_eq!(results.len(), 1);
1171 assert_eq!(results[0].location.to_string(), "name");
1172 }
1173 other => panic!("unexpected checker error: {other:?}"),
1174 }
1175 }
1176
1177 #[tokio::test]
1178 async fn typed_checker_validates_and_fixes_derived_entities_without_record_access() {
1179 let mut ctx = UserContext::new()
1180 .with_metadata(InMemoryMetadataStore::new().with_entity(Order::entity_descriptor()))
1181 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1182 .with_checker_registry(
1183 InMemoryCheckerRegistry::new()
1184 .with_checker(TypedEntityChecker::<Order, _>::new(TypedOrderChecker)),
1185 )
1186 .with_internal_id_generator(FixedIdGenerator(79));
1187 ctx.insert_resource(PostgresDialect);
1188 ctx.insert_resource(StubExecutor {
1189 affected: 1,
1190 rows: Vec::new(),
1191 });
1192
1193 let repo = ctx
1194 .resolve_repository::< StubExecutor>("Order")
1195 .unwrap();
1196 let prepared = repo
1197 .prepare_insert_command(
1198 &repo
1199 .insert_command()
1200 .value("name", "fix")
1201 .value("version", 1_i64),
1202 )
1203 .unwrap();
1204 assert_eq!(
1205 prepared.values.get("name"),
1206 Some(&Value::Text("fixed".to_owned()))
1207 );
1208 assert_eq!(prepared.values.get("id"), Some(&Value::U64(79)));
1209 assert!(!prepared.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
1210
1211 let error = repo
1212 .prepare_insert_command(&repo.insert_command().value("version", 1_i64))
1213 .unwrap_err();
1214 match error {
1215 RuntimeError::Check(results) => {
1216 assert!(
1217 results
1218 .iter()
1219 .any(|result| result.rule == CheckRule::Required
1220 && result.location.to_string() == "name")
1221 );
1222 }
1223 other => panic!("unexpected typed checker error: {other:?}"),
1224 }
1225 }
1226
1227
1228
1229 #[tokio::test]
1230 async fn checker_registry_reports_nested_create_locations_and_fixes_records() {
1231 let ctx = UserContext::new()
1232 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker));
1233
1234 let mut child = Record::from([
1235 (String::from("id"), Value::U64(10)),
1236 (
1237 String::from(CHECK_OBJECT_STATUS_FIELD),
1238 Value::from(CheckObjectStatus::Create),
1239 ),
1240 ]);
1241 let error = ctx
1242 .check_and_fix_record_at(
1243 "Order",
1244 &mut child,
1245 &ObjectLocation::hash_root("lines").element(0),
1246 )
1247 .unwrap_err();
1248
1249 assert_eq!(child.get("version"), Some(&Value::I64(1)));
1250 match error {
1251 RuntimeError::Check(results) => {
1252 assert_eq!(results.len(), 1);
1253 assert_eq!(results[0].rule, CheckRule::Required);
1254 assert_eq!(results[0].location.to_string(), "lines[0].name");
1255 }
1256 other => panic!("unexpected checker error: {other:?}"),
1257 }
1258
1259 child.insert("name".to_owned(), Value::Text("valid child".to_owned()));
1260 ctx.check_and_fix_record_at(
1261 "Order",
1262 &mut child,
1263 &ObjectLocation::hash_root("lines").element(0),
1264 )
1265 .unwrap();
1266 }
1267
1268 #[tokio::test]
1269 async fn built_in_language_translators_cover_fifteen_languages() {
1270 assert_eq!(Language::ALL.len(), 15);
1271 let result = super::CheckResult::required(ObjectLocation::hash_root("name"));
1272 let messages = Language::ALL
1273 .iter()
1274 .map(|language| translate_check_result(*language, &result))
1275 .collect::<Vec<_>>();
1276
1277 assert!(messages.iter().all(|message| !message.is_empty()));
1278 assert!(messages.iter().any(|message| message.contains("required")));
1279 assert!(messages.iter().any(|message| message.contains("å¿…å¡«")));
1280 assert!(
1281 messages
1282 .iter()
1283 .any(|message| message.contains("obligatoire"))
1284 );
1285 assert_eq!(Language::from_code("zh-CN"), Some(Language::Chinese));
1286 assert_eq!(
1287 Language::from_code("zh-TW"),
1288 Some(Language::TraditionalChinese)
1289 );
1290 }
1291
1292 #[tokio::test]
1293 async fn user_context_language_switch_translates_checker_errors() {
1294 let mut ctx = UserContext::new()
1295 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1296 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1297 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker))
1298 .with_internal_id_generator(FixedIdGenerator(77))
1299 .with_language(Language::Chinese);
1300 ctx.insert_resource(PostgresDialect);
1301 ctx.insert_resource(StubExecutor {
1302 affected: 1,
1303 rows: Vec::new(),
1304 });
1305
1306 let repo = ctx
1307 .resolve_repository::< StubExecutor>("Order")
1308 .unwrap();
1309 let error = repo
1310 .prepare_insert_command(&repo.insert_command())
1311 .unwrap_err();
1312 match error {
1313 RuntimeError::Check(results) => {
1314 assert_eq!(results.len(), 1);
1315 assert!(
1316 results[0]
1317 .message
1318 .as_ref()
1319 .is_some_and(|message| message.contains("å¿…å¡«"))
1320 );
1321 }
1322 other => panic!("unexpected checker error: {other:?}"),
1323 }
1324
1325 let mut ctx = UserContext::new().with_language(Language::English);
1326 ctx.set_language_code("es").unwrap();
1327 assert_eq!(ctx.language(), Language::Spanish);
1328 }
1329
1330 #[tokio::test]
1331 async fn checker_registry_merges_graph_update_fixes_by_object_status() {
1332 let mut ctx = UserContext::new()
1333 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1334 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1335 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker));
1336 ctx.insert_resource(PostgresDialect);
1337 ctx.insert_resource(StubExecutor {
1338 affected: 1,
1339 rows: vec![Record::from([
1340 ("id".to_owned(), Value::U64(1)),
1341 ("version".to_owned(), Value::I64(1)),
1342 ("name".to_owned(), Value::Text("old".to_owned())),
1343 ])],
1344 });
1345
1346 let repo = ctx
1347 .resolve_repository::< StubExecutor>("Order")
1348 .unwrap();
1349 let saved = repo
1350 .save_graph(
1351 GraphNode::new("Order")
1352 .value("id", 1_u64)
1353 .value("version", 1_i64)
1354 .value("name", "graph-update"),
1355 )
1356 .await.unwrap();
1357
1358 assert_eq!(
1359 saved.values.get("name"),
1360 Some(&Value::Text("graph-update-checked".to_owned()))
1361 );
1362 assert_eq!(saved.values.get("version"), Some(&Value::I64(2)));
1363 assert!(!saved.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
1364 }
1365
1366 #[tokio::test]
1367 async fn user_context_event_sink_receives_repository_mutation_events() {
1368 let events = Arc::new(Mutex::new(Vec::new()));
1369 let mut ctx = UserContext::new()
1370 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1371 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1372 .with_internal_id_generator(FixedIdGenerator(88))
1373 .with_event_sink(RecordingEventSink {
1374 events: events.clone(),
1375 });
1376 ctx.insert_resource(PostgresDialect);
1377 ctx.insert_resource(StubExecutor {
1378 affected: 1,
1379 rows: vec![Record::from([
1380 ("id".to_owned(), Value::U64(88)),
1381 ("version".to_owned(), Value::I64(1)),
1382 ("name".to_owned(), Value::Text("old".to_owned())),
1383 ])],
1384 });
1385
1386 let repo = ctx
1387 .resolve_repository::< StubExecutor>("Order")
1388 .unwrap();
1389 repo.insert(&repo.insert_command().value("name", "created"))
1390 .await.unwrap();
1391 repo.update(
1392 &repo
1393 .update_command(88_u64)
1394 .expected_version(1)
1395 .value("name", "updated"),
1396 )
1397 .await.unwrap();
1398 repo.delete(&repo.delete_command(88_u64).expected_version(2))
1399 .await.unwrap();
1400 repo.recover(&repo.recover_command(88_u64, -3)).await.unwrap();
1401
1402 let events = events.lock().unwrap();
1403 assert_eq!(events.len(), 4);
1404 assert_eq!(events[0].kind, EntityEventKind::Created);
1405 assert_eq!(events[0].entity, "Order");
1406 assert_eq!(events[0].values.get("id"), Some(&Value::U64(88)));
1407 assert_eq!(events[1].kind, EntityEventKind::Updated);
1408 assert_eq!(events[1].values.get("id"), Some(&Value::U64(88)));
1409 assert_eq!(events[1].values.get("version"), Some(&Value::I64(2)));
1410 assert_eq!(events[1].updated_fields, vec!["name".to_owned()]);
1411 assert_eq!(
1412 events[1]
1413 .old_values
1414 .as_ref()
1415 .and_then(|values| values.get("name")),
1416 None );
1418 assert_eq!(
1419 events[1]
1420 .new_values
1421 .as_ref()
1422 .and_then(|values| values.get("name")),
1423 Some(&Value::Text("updated".to_owned()))
1424 );
1425 assert_eq!(events[1].changes.len(), 1);
1426 assert_eq!(events[1].changes[0].field, "name");
1427 assert_eq!(
1428 events[1].changes[0].old_value,
1429 None );
1431 assert_eq!(
1432 events[1].changes[0].new_value,
1433 Some(Value::Text("updated".to_owned()))
1434 );
1435 assert_eq!(events[2].kind, EntityEventKind::Deleted);
1436 assert!(events[2].old_values.is_none()); assert!(events[2].new_values.is_none());
1438 assert_eq!(events[3].kind, EntityEventKind::Recovered);
1439 assert_eq!(
1440 events[3]
1441 .old_values
1442 .as_ref()
1443 .and_then(|values| values.get("version")),
1444 None );
1446 assert_eq!(
1447 events[3]
1448 .new_values
1449 .as_ref()
1450 .and_then(|values| values.get("version")),
1451 Some(&Value::I64(4))
1452 );
1453 assert_eq!(events[3].changes[0].field, "version");
1454 }
1455
1456 #[tokio::test]
1457 async fn user_context_event_sink_receives_mixed_graph_mutation_events() {
1458 let events = Arc::new(Mutex::new(Vec::new()));
1459 let mut ctx = UserContext::new()
1460 .with_metadata(
1461 InMemoryMetadataStore::new()
1462 .with_entity(entity())
1463 .with_entity(line_entity())
1464 .with_entity(product_entity()),
1465 )
1466 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1467 .with_event_sink(RecordingEventSink {
1468 events: events.clone(),
1469 });
1470 ctx.insert_resource(PostgresDialect);
1471 ctx.insert_resource(StubExecutor {
1472 affected: 1,
1473 rows: vec![Record::from([
1474 ("id".to_owned(), Value::U64(1)),
1475 ("version".to_owned(), Value::I64(1)),
1476 ("name".to_owned(), Value::Text("old".to_owned())),
1477 ])],
1478 });
1479
1480 let repo = ctx
1481 .resolve_repository::< StubExecutor>("Order")
1482 .unwrap();
1483 repo.save_graph(
1484 GraphNode::new("Order")
1485 .value("id", 1_u64)
1486 .value("version", 1_i64)
1487 .value("name", "updated")
1488 .relation(
1489 "lines",
1490 GraphNode::new("OrderLine")
1491 .value("name", "line")
1492 .value("product_id", 3_u64),
1493 ),
1494 )
1495 .await.unwrap();
1496
1497 let events = events.lock().unwrap();
1498 assert_eq!(events.len(), 2);
1499 assert_eq!(events[0].kind, EntityEventKind::Updated);
1500 assert_eq!(events[0].entity, "Order");
1501 assert_eq!(events[1].kind, EntityEventKind::Created);
1502 assert_eq!(events[1].entity, "OrderLine");
1503 assert_eq!(events[1].values.get("order_id"), Some(&Value::U64(1)));
1504 }
1505
1506 #[tokio::test]
1507 async fn save_graph_builds_plan_grouped_by_entity_and_operation() {
1508 let mut ctx = UserContext::new()
1509 .with_metadata(
1510 InMemoryMetadataStore::new()
1511 .with_entity(entity())
1512 .with_entity(line_entity())
1513 .with_entity(product_entity()),
1514 )
1515 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1516 .with_internal_id_generator(SequentialIdGenerator::new(500));
1517 ctx.insert_resource(PostgresDialect);
1518 ctx.insert_resource(StubExecutor {
1519 affected: 1,
1520 rows: vec![Record::from([
1521 ("id".to_owned(), Value::U64(1)),
1522 ("version".to_owned(), Value::I64(1)),
1523 ("name".to_owned(), Value::Text("old".to_owned())),
1524 ])],
1525 });
1526
1527 let plan = ctx
1528 .plan_for_save_graph::< StubExecutor>(
1529 GraphNode::new("Order")
1530 .value("id", 1_u64)
1531 .value("version", 1_i64)
1532 .value("name", "updated")
1533 .relation(
1534 "lines",
1535 GraphNode::new("OrderLine")
1536 .value("name", "new-line-a")
1537 .value("product_id", 2_u64),
1538 )
1539 .relation(
1540 "lines",
1541 GraphNode::new("OrderLine")
1542 .value("name", "new-line-b")
1543 .value("product_id", 2_u64),
1544 )
1545 .relation(
1546 "lines",
1547 GraphNode::new("OrderLine")
1548 .value("id", 5_u64)
1549 .value("version", 1_i64)
1550 .value("name", "same-update-a"),
1551 )
1552 .relation(
1553 "lines",
1554 GraphNode::new("OrderLine")
1555 .value("id", 6_u64)
1556 .value("version", 1_i64)
1557 .value("name", "same-update-b"),
1558 )
1559 .relation(
1560 "lines",
1561 GraphNode::new("OrderLine").value("id", 3_u64).remove(),
1562 )
1563 .relation(
1564 "lines",
1565 GraphNode::new("OrderLine").value("id", 4_u64).reference(),
1566 ),
1567 )
1568 .await.unwrap();
1569 let counts = plan.grouped_counts();
1570
1571 assert_eq!(
1572 counts.get(&("Order".to_owned(), GraphMutationKind::Update)),
1573 Some(&1)
1574 );
1575 assert_eq!(
1576 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Create)),
1577 Some(&2)
1578 );
1579 assert_eq!(
1580 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Update)),
1581 Some(&2)
1582 );
1583 assert_eq!(
1584 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Delete)),
1585 Some(&1)
1586 );
1587 assert_eq!(
1588 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Reference)),
1589 Some(&1)
1590 );
1591 let create_batch = plan
1592 .batches
1593 .iter()
1594 .find(|batch| batch.entity == "OrderLine" && batch.kind == GraphMutationKind::Create)
1595 .unwrap();
1596 assert_eq!(create_batch.items.len(), 2);
1597 assert_eq!(
1598 create_batch.items[0].values.get("id"),
1599 Some(&Value::U64(500))
1600 );
1601 assert_eq!(
1602 create_batch.items[1].values.get("id"),
1603 Some(&Value::U64(501))
1604 );
1605 let update_batch = plan
1606 .batches
1607 .iter()
1608 .find(|batch| {
1609 batch.entity == "OrderLine"
1610 && batch.kind == GraphMutationKind::Update
1611 && batch.update_fields == vec!["name".to_owned()]
1612 })
1613 .unwrap();
1614 assert_eq!(update_batch.items.len(), 2);
1615 }
1616
1617 #[tokio::test]
1618 async fn resolved_repository_builds_relation_plans() {
1619 let mut ctx = UserContext::new()
1620 .with_metadata(
1621 InMemoryMetadataStore::new()
1622 .with_entity(entity())
1623 .with_entity(line_entity())
1624 .with_entity(product_entity()),
1625 )
1626 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1627 .with_repository_behavior_registry(
1628 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1629 );
1630 ctx.insert_resource(PostgresDialect);
1631 ctx.insert_resource(StubExecutor {
1632 affected: 1,
1633 rows: Vec::new(),
1634 });
1635
1636 let repo = ctx
1637 .resolve_repository::< StubExecutor>("Order")
1638 .unwrap();
1639 let plans = repo.relation_plans().unwrap();
1640
1641 assert_eq!(plans.len(), 1);
1642 assert_eq!(plans[0].relation_name, "lines");
1643 assert_eq!(plans[0].target_entity, "OrderLine");
1644 assert_eq!(plans[0].local_key, "id");
1645 assert_eq!(plans[0].foreign_key, "order_id");
1646 assert!(plans[0].many);
1647 }
1648
1649 #[tokio::test]
1650 async fn resolved_repository_builds_relation_query_from_parent_rows() {
1651 let mut ctx = UserContext::new()
1652 .with_metadata(
1653 InMemoryMetadataStore::new()
1654 .with_entity(entity())
1655 .with_entity(line_entity())
1656 .with_entity(product_entity()),
1657 )
1658 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1659 .with_repository_behavior_registry(
1660 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1661 );
1662 ctx.insert_resource(PostgresDialect);
1663 ctx.insert_resource(StubExecutor {
1664 affected: 1,
1665 rows: Vec::new(),
1666 });
1667
1668 let repo = ctx
1669 .resolve_repository::< StubExecutor>("Order")
1670 .unwrap();
1671 let parent_rows = vec![
1672 Record::from([(String::from("id"), Value::U64(11))]),
1673 Record::from([(String::from("id"), Value::U64(12))]),
1674 ];
1675
1676 let query = repo.relation_query("lines", &parent_rows).unwrap();
1677 }
1682
1683 #[tokio::test]
1684 async fn resolved_repository_enhances_parent_rows_with_relations() {
1685 let mut ctx = UserContext::new()
1686 .with_metadata(
1687 InMemoryMetadataStore::new()
1688 .with_entity(entity())
1689 .with_entity(line_entity())
1690 .with_entity(product_entity()),
1691 )
1692 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1693 .with_repository_behavior_registry(
1694 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1695 );
1696 ctx.insert_resource(PostgresDialect);
1697 ctx.insert_resource(StubExecutor {
1698 affected: 1,
1699 rows: vec![
1700 Record::from([
1701 (String::from("id"), Value::U64(101)),
1702 (String::from("order_id"), Value::U64(11)),
1703 (String::from("name"), Value::Text(String::from("l1"))),
1704 ]),
1705 Record::from([
1706 (String::from("id"), Value::U64(102)),
1707 (String::from("order_id"), Value::U64(11)),
1708 (String::from("name"), Value::Text(String::from("l2"))),
1709 ]),
1710 Record::from([
1711 (String::from("id"), Value::U64(201)),
1712 (String::from("order_id"), Value::U64(12)),
1713 (String::from("name"), Value::Text(String::from("l3"))),
1714 ]),
1715 ],
1716 });
1717
1718 let repo = ctx
1719 .resolve_repository::< StubExecutor>("Order")
1720 .unwrap();
1721 let mut parents = vec![
1722 Record::from([(String::from("id"), Value::U64(11))]),
1723 Record::from([(String::from("id"), Value::U64(12))]),
1724 ];
1725
1726 repo.enhance_relations(&mut parents).await.unwrap();
1727
1728 match parents[0].get("lines") {
1729 Some(Value::List(lines)) => assert_eq!(lines.len(), 2),
1730 other => panic!("unexpected lines payload: {other:?}"),
1731 }
1732 match parents[1].get("lines") {
1733 Some(Value::List(lines)) => assert_eq!(lines.len(), 1),
1734 other => panic!("unexpected lines payload: {other:?}"),
1735 }
1736 }
1737
1738 #[tokio::test]
1739 async fn relation_enhancement_wraps_inverse_many_relation_as_list() {
1740 let mut ctx = UserContext::new()
1741 .with_metadata(
1742 InMemoryMetadataStore::new()
1743 .with_entity(OrderLineWithProductEntityRow::entity_descriptor())
1744 .with_entity(ProductWithLinesEntityRow::entity_descriptor()),
1745 )
1746 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("OrderLine"));
1747 ctx.insert_resource(PostgresDialect);
1748 ctx.insert_resource(QueueExecutor {
1749 affected: 1,
1750 rows: Mutex::new(VecDeque::from([
1751 vec![Record::from([
1752 (String::from("id"), Value::U64(11)),
1753 (String::from("order_id"), Value::U64(7)),
1754 (String::from("name"), Value::Text(String::from("line"))),
1755 (String::from("product_id"), Value::U64(101)),
1756 ])],
1757 vec![Record::from([
1758 (String::from("id"), Value::U64(101)),
1759 (String::from("name"), Value::Text(String::from("sku"))),
1760 ])],
1761 ])),
1762 queries: Mutex::new(Vec::new()),
1763 });
1764
1765 let repo = ctx
1766 .resolve_repository::< QueueExecutor>("OrderLine")
1767 .unwrap();
1768 let rows = repo
1769 .fetch_enhanced_entities::<OrderLineWithProductEntityRow>(
1770 &SelectQuery::new("OrderLine").relation("product"),
1771 )
1772 .await.unwrap();
1773
1774 let product = rows.data[0].product.as_ref().unwrap();
1775 assert_eq!(product.lines.data.len(), 1);
1776 assert_eq!(product.lines.data[0].id, 11);
1777 }
1778
1779 #[tokio::test]
1780 async fn resolved_repository_fetches_smart_list_of_entities() {
1781 let mut ctx = UserContext::new()
1782 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1783 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1784 ctx.insert_resource(PostgresDialect);
1785 ctx.insert_resource(StubExecutor {
1786 affected: 1,
1787 rows: vec![Record::from([
1788 (String::from("id"), Value::U64(7)),
1789 (String::from("version"), Value::I64(2)),
1790 (String::from("name"), Value::Text(String::from("typed"))),
1791 ])],
1792 });
1793
1794 let repo = ctx
1795 .resolve_repository::< StubExecutor>("Order")
1796 .unwrap();
1797 let rows = repo.fetch_entities::<OrderEntity>(&repo.select()).await.unwrap();
1798
1799 assert_eq!(rows.len(), 1);
1800 assert_eq!(
1801 rows.first(),
1802 Some(&OrderEntity {
1803 id: 7,
1804 version: 2,
1805 name: String::from("typed"),
1806 })
1807 );
1808 }
1809
1810 #[tokio::test]
1811 async fn resolved_repository_fetches_smart_list_of_derived_entities() {
1812 let mut ctx = UserContext::new()
1813 .with_metadata(
1814 InMemoryMetadataStore::new().with_entity(CatalogProductRow::entity_descriptor()),
1815 )
1816 .with_repository_registry(
1817 InMemoryRepositoryRegistry::new().with_entity("CatalogProduct"),
1818 );
1819 ctx.insert_resource(PostgresDialect);
1820 ctx.insert_resource(StubExecutor {
1821 affected: 1,
1822 rows: vec![Record::from([
1823 (String::from("id"), Value::U64(9)),
1824 (String::from("name"), Value::Text(String::from("derived"))),
1825 ])],
1826 });
1827
1828 let repo = ctx
1829 .resolve_repository::< StubExecutor>("CatalogProduct")
1830 .unwrap();
1831 let rows = repo
1832 .fetch_entities::<CatalogProductRow>(&repo.select())
1833 .await.unwrap();
1834
1835 assert_eq!(rows.len(), 1);
1836 assert_eq!(
1837 rows.first(),
1838 Some(&CatalogProductRow {
1839 id: 9,
1840 name: String::from("derived"),
1841 })
1842 );
1843 }
1844
1845 #[tokio::test]
1846 async fn resolved_repository_collects_dynamic_properties_for_aggregate_output() {
1847 let mut ctx = UserContext::new()
1848 .with_metadata(
1849 InMemoryMetadataStore::new()
1850 .with_entity(OrderAggregateDynamic::entity_descriptor()),
1851 )
1852 .with_repository_registry(
1853 InMemoryRepositoryRegistry::new().with_entity("OrderAggregate"),
1854 );
1855 ctx.insert_resource(PostgresDialect);
1856 ctx.insert_resource(StubExecutor {
1857 affected: 1,
1858 rows: vec![Record::from([
1859 (String::from("id"), Value::U64(1)),
1860 (String::from("lineCount"), Value::I64(3)),
1861 (String::from("amount"), Value::F64(18.5)),
1862 ])],
1863 });
1864
1865 let repo = ctx
1866 .resolve_repository::< StubExecutor>("OrderAggregate")
1867 .unwrap();
1868 let rows = repo
1869 .fetch_entities::<OrderAggregateDynamic>(&repo.select())
1870 .await.unwrap();
1871
1872 assert_eq!(rows.len(), 1);
1873 assert_eq!(rows.data[0].id, 1);
1874 assert_eq!(rows.data[0].dynamic.get("lineCount"), Some(&Value::I64(3)));
1875 assert_eq!(rows.data[0].dynamic.get("amount"), Some(&Value::F64(18.5)));
1876 assert_eq!(
1877 rows.into_vec().into_iter().next().unwrap().into_json(),
1878 serde_json::json!({
1879 "id": 1,
1880 "lineCount": 3,
1881 "amount": 18.5
1882 })
1883 );
1884 }
1885
1886 #[tokio::test]
1887 async fn resolved_repository_executes_relation_aggregates_into_dynamic_properties() {
1888 let executor = QueueExecutor {
1889 affected: 1,
1890 rows: Mutex::new(VecDeque::from([
1891 vec![
1892 Record::from([
1893 (String::from("id"), Value::U64(1)),
1894 (String::from("version"), Value::I64(1)),
1895 (String::from("name"), Value::Text(String::from("first"))),
1896 ]),
1897 Record::from([
1898 (String::from("id"), Value::U64(2)),
1899 (String::from("version"), Value::I64(1)),
1900 (String::from("name"), Value::Text(String::from("second"))),
1901 ]),
1902 ],
1903 vec![Record::from([
1904 (String::from("order_id"), Value::U64(1)),
1905 (String::from("lineCount"), Value::I64(3)),
1906 ])],
1907 ])),
1908 queries: Mutex::new(Vec::new()),
1909 };
1910 let mut ctx = UserContext::new()
1911 .with_metadata(
1912 InMemoryMetadataStore::new()
1913 .with_entity(entity())
1914 .with_entity(line_entity()),
1915 )
1916 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1917 ctx.insert_resource(PostgresDialect);
1918 ctx.insert_resource(executor);
1919
1920 let repo = ctx
1921 .resolve_repository::< QueueExecutor>("Order")
1922 .unwrap();
1923 let rows = repo
1924 .fetch_all_with_relation_aggregates(
1925 &repo
1926 .select()
1927 .project("id")
1928 .project("version")
1929 .project("name"),
1930 &[RelationAggregate::new(
1931 "lines",
1932 "lineCount",
1933 SelectQuery::new("OrderLine"),
1934 true,
1935 )],
1936 )
1937 .await.unwrap();
1938
1939 assert_eq!(rows[0].get("lineCount"), Some(&Value::I64(3)));
1940 assert_eq!(rows[1].get("lineCount"), Some(&Value::U64(0)));
1941
1942 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1943 let queries = executor.queries.lock().unwrap();
1944 assert_eq!(queries.len(), 2);
1945 assert_eq!(
1946 queries[1],
1947 "SELECT ... FROM OrderLine ..."
1948 );
1949 }
1950
1951 #[tokio::test]
1952 async fn resolved_repository_maps_relation_aggregate_storage_key_to_property_key() {
1953 let mut line = line_entity();
1954 line.properties
1955 .iter_mut()
1956 .find(|property| property.name == "order_id")
1957 .unwrap()
1958 .column_name = "order_ref".to_owned();
1959 let executor = QueueExecutor {
1960 affected: 1,
1961 rows: Mutex::new(VecDeque::from([
1962 vec![Record::from([
1963 (String::from("id"), Value::U64(1)),
1964 (String::from("version"), Value::I64(1)),
1965 (String::from("name"), Value::Text(String::from("first"))),
1966 ])],
1967 vec![Record::from([
1968 (String::from("order_ref"), Value::I64(1)),
1969 (String::from("lineCount"), Value::I64(3)),
1970 ])],
1971 ])),
1972 queries: Mutex::new(Vec::new()),
1973 };
1974 let mut ctx = UserContext::new()
1975 .with_metadata(
1976 InMemoryMetadataStore::new()
1977 .with_entity(entity())
1978 .with_entity(line),
1979 )
1980 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1981 ctx.insert_resource(PostgresDialect);
1982 ctx.insert_resource(executor);
1983
1984 let repo = ctx
1985 .resolve_repository::< QueueExecutor>("Order")
1986 .unwrap();
1987 let rows = repo
1988 .fetch_all_with_relation_aggregates(
1989 &repo
1990 .select()
1991 .project("id")
1992 .project("version")
1993 .project("name"),
1994 &[RelationAggregate::new(
1995 "lines",
1996 "lineCount",
1997 SelectQuery::new("OrderLine"),
1998 true,
1999 )],
2000 )
2001 .await.unwrap();
2002
2003 assert_eq!(rows[0].get("lineCount"), Some(&Value::I64(3)));
2004 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
2005 assert_eq!(
2006 executor.queries.lock().unwrap()[1],
2007 "SELECT ... FROM OrderLine ..."
2008 );
2009 }
2010
2011 #[tokio::test]
2012 async fn resolved_repository_uses_aggregation_cache_when_resource_is_registered() {
2013 let executor = QueueExecutor {
2014 affected: 1,
2015 rows: Mutex::new(VecDeque::from([vec![Record::from([(
2016 String::from("count"),
2017 Value::I64(2),
2018 )])]])),
2019 queries: Mutex::new(Vec::new()),
2020 };
2021 let mut ctx = UserContext::new()
2022 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
2023 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
2024 ctx.insert_resource(PostgresDialect);
2025 ctx.insert_resource(executor);
2026 ctx.insert_resource(InMemoryAggregationCache::default());
2027
2028 let repo = ctx
2029 .resolve_repository::< QueueExecutor>("Order")
2030 .unwrap();
2031 let query = repo
2032 .select()
2033 .count("count")
2034 .enable_aggregation_cache_for(60_000);
2035
2036 let first = repo.fetch_all(&query).await.unwrap();
2037 let second = repo.fetch_all(&query).await.unwrap();
2038
2039 assert_eq!(first, second);
2040 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
2041 assert_eq!(executor.queries.lock().unwrap().len(), 1);
2042 }
2043
2044 #[tokio::test]
2045 async fn aggregation_cache_is_namespaced_and_invalidated_after_write() {
2046 let executor = QueueExecutor {
2047 affected: 1,
2048 rows: Mutex::new(VecDeque::from([
2049 vec![Record::from([(String::from("count"), Value::I64(2))])],
2050 vec![Record::from([(String::from("count"), Value::I64(3))])],
2051 ])),
2052 queries: Mutex::new(Vec::new()),
2053 };
2054 let mut ctx = UserContext::new()
2055 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
2056 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
2057 ctx.insert_resource(PostgresDialect);
2058 ctx.insert_resource(executor);
2059 ctx.insert_resource(
2060 Arc::new(InMemoryAggregationCache::with_namespace("tenant-a"))
2061 as Arc<dyn AggregationCacheBackend>,
2062 );
2063
2064 let repo = ctx
2065 .resolve_repository::< QueueExecutor>("Order")
2066 .unwrap();
2067 let query = repo
2068 .select()
2069 .count("count")
2070 .enable_aggregation_cache_for(60_000);
2071
2072 let first = repo.fetch_all(&query).await.unwrap();
2073 let cached = repo.fetch_all(&query).await.unwrap();
2074 repo.insert(
2075 &InsertCommand::new("Order")
2076 .value("id", 9_u64)
2077 .value("version", 1_i64)
2078 .value("name", "new"),
2079 )
2080 .await.unwrap();
2081 let refreshed = repo.fetch_all(&query).await.unwrap();
2082
2083 assert_eq!(first, cached);
2084 assert_ne!(cached, refreshed);
2085 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
2086 assert_eq!(executor.queries.lock().unwrap().len(), 2);
2087 }
2088
2089 #[tokio::test]
2090 async fn aggregation_cache_propagates_to_relation_aggregates() {
2091 let parent_rows = vec![
2092 Record::from([
2093 (String::from("id"), Value::U64(1)),
2094 (String::from("version"), Value::I64(1)),
2095 (String::from("name"), Value::Text(String::from("first"))),
2096 ]),
2097 Record::from([
2098 (String::from("id"), Value::U64(2)),
2099 (String::from("version"), Value::I64(1)),
2100 (String::from("name"), Value::Text(String::from("second"))),
2101 ]),
2102 ];
2103 let aggregate_rows = vec![Record::from([
2104 (String::from("order_id"), Value::U64(1)),
2105 (String::from("lineCount"), Value::I64(3)),
2106 ])];
2107 let executor = QueueExecutor {
2108 affected: 1,
2109 rows: Mutex::new(VecDeque::from([parent_rows, aggregate_rows])),
2110 queries: Mutex::new(Vec::new()),
2111 };
2112 let mut ctx = UserContext::new()
2113 .with_metadata(
2114 InMemoryMetadataStore::new()
2115 .with_entity(entity())
2116 .with_entity(line_entity()),
2117 )
2118 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
2119 ctx.insert_resource(PostgresDialect);
2120 ctx.insert_resource(executor);
2121 ctx.insert_resource(InMemoryAggregationCache::default());
2122
2123 let repo = ctx
2124 .resolve_repository::< QueueExecutor>("Order")
2125 .unwrap();
2126 let query = repo
2127 .select()
2128 .project("id")
2129 .project("version")
2130 .project("name")
2131 .enable_aggregation_cache_for(60_000)
2132 .propagate_aggregation_cache(60_000);
2133 let aggregate =
2134 RelationAggregate::new("lines", "lineCount", SelectQuery::new("OrderLine"), true);
2135
2136 let first = repo
2137 .fetch_all_with_relation_aggregates(&query, &[aggregate.clone()])
2138 .await.unwrap();
2139 let second = repo
2140 .fetch_all_with_relation_aggregates(&query, &[aggregate])
2141 .await.unwrap();
2142
2143 assert_eq!(first, second);
2144 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
2145 assert_eq!(executor.queries.lock().unwrap().len(), 2);
2146 }
2147
2148 #[tokio::test]
2149 async fn memory_repository_fetches_smart_list_entities_with_query_features() {
2150 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2151 let repository = MemoryRepository::new(metadata).with_rows(
2152 "Order",
2153 vec![
2154 Record::from([
2155 (String::from("id"), Value::U64(1)),
2156 (String::from("version"), Value::I64(1)),
2157 (String::from("name"), Value::Text(String::from("alpha"))),
2158 ]),
2159 Record::from([
2160 (String::from("id"), Value::U64(2)),
2161 (String::from("version"), Value::I64(1)),
2162 (String::from("name"), Value::Text(String::from("beta"))),
2163 ]),
2164 Record::from([
2165 (String::from("id"), Value::U64(3)),
2166 (String::from("version"), Value::I64(1)),
2167 (String::from("name"), Value::Text(String::from("gamma"))),
2168 ]),
2169 ],
2170 );
2171
2172 let query = teaql_core::SelectQuery::new("Order")
2173 .filter(Expr::Binary {
2174 left: Box::new(Expr::column("id")),
2175 op: teaql_core::BinaryOp::Gte,
2176 right: Box::new(Expr::value(2_u64)),
2177 })
2178 .order_by(OrderBy::desc("id"))
2179 .limit(1);
2180
2181 let orders = repository.fetch_entities::<Order>(&query).unwrap();
2182
2183 assert_eq!(orders.ids(), vec![Value::U64(3)]);
2184 assert_eq!(orders.versions(), vec![1]);
2185 assert_eq!(orders.first().unwrap().name, "gamma");
2186 }
2187
2188 #[tokio::test]
2189 async fn memory_repository_runs_relation_aggregates() {
2190 let metadata = InMemoryMetadataStore::new()
2191 .with_entity(entity())
2192 .with_entity(line_entity());
2193
2194 let repository = MemoryRepository::new(metadata)
2195 .with_rows(
2196 "Order",
2197 vec![
2198 Record::from([
2199 (String::from("id"), Value::U64(1)),
2200 (String::from("version"), Value::I64(1)),
2201 (String::from("name"), Value::Text(String::from("first"))),
2202 ]),
2203 Record::from([
2204 (String::from("id"), Value::U64(2)),
2205 (String::from("version"), Value::I64(1)),
2206 (String::from("name"), Value::Text(String::from("second"))),
2207 ]),
2208 ],
2209 )
2210 .with_rows(
2211 "OrderLine",
2212 vec![
2213 Record::from([
2214 (String::from("id"), Value::U64(10)),
2215 (String::from("version"), Value::I64(1)),
2216 (String::from("order_id"), Value::U64(1)),
2217 (String::from("name"), Value::Text(String::from("line1"))),
2218 ]),
2219 Record::from([
2220 (String::from("id"), Value::U64(11)),
2221 (String::from("version"), Value::I64(1)),
2222 (String::from("order_id"), Value::U64(1)),
2223 (String::from("name"), Value::Text(String::from("line2"))),
2224 ]),
2225 Record::from([
2226 (String::from("id"), Value::U64(12)),
2227 (String::from("version"), Value::I64(1)),
2228 (String::from("order_id"), Value::U64(2)),
2229 (String::from("name"), Value::Text(String::from("line3"))),
2230 ]),
2231 ],
2232 );
2233
2234 let query = SelectQuery::new("Order").project("id").project("name");
2235 let aggregate = RelationAggregate::new("lines", "lineCount", SelectQuery::new("OrderLine"), true);
2236
2237 let rows = repository
2238 .fetch_all_with_relation_aggregates(&query, &[aggregate])
2239 .unwrap();
2240
2241 assert_eq!(rows.len(), 2);
2242
2243 let first_order = rows.iter().find(|r| r.get("id") == Some(&Value::U64(1))).unwrap();
2244 assert_eq!(first_order.get("lineCount"), Some(&Value::U64(2)));
2245
2246 let second_order = rows.iter().find(|r| r.get("id") == Some(&Value::U64(2))).unwrap();
2247 assert_eq!(second_order.get("lineCount"), Some(&Value::U64(1)));
2248 }
2249
2250 #[tokio::test]
2251 async fn memory_repository_runs_aggregates() {
2252 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2253 let repository = MemoryRepository::new(metadata).with_rows(
2254 "Order",
2255 vec![
2256 Record::from([
2257 (String::from("id"), Value::U64(1)),
2258 (String::from("version"), Value::I64(1)),
2259 (String::from("name"), Value::Text(String::from("alpha"))),
2260 ]),
2261 Record::from([
2262 (String::from("id"), Value::U64(2)),
2263 (String::from("version"), Value::I64(2)),
2264 (String::from("name"), Value::Text(String::from("beta"))),
2265 ]),
2266 ],
2267 );
2268
2269 let query = teaql_core::SelectQuery {
2270 entity: String::from("Order"),
2271 projection: Vec::new(),
2272 expr_projection: Vec::new(),
2273 filter: None,
2274 having: None,
2275 order_by: Vec::new(),
2276 slice: None,
2277 trace_chain: Vec::new(),
2278 aggregates: vec![
2279 Aggregate {
2280 function: AggregateFunction::Count,
2281 field: String::from("id"),
2282 alias: String::from("count"),
2283 },
2284 Aggregate {
2285 function: AggregateFunction::Sum,
2286 field: String::from("version"),
2287 alias: String::from("versionSum"),
2288 },
2289 ],
2290 group_by: Vec::new(),
2291 relations: Vec::new(),
2292 aggregation_cache: None,
2293 comment: None,
2294 raw_sql: None,
2295 raw_sql_search_criteria: Vec::new(),
2296 dynamic_properties: Vec::new(),
2297 raw_projections: Vec::new(),
2298 object_group_bys: Vec::new(),
2299 child_enhancements: Vec::new(),
2300 };
2301
2302 let rows = repository.fetch_all(&query).unwrap();
2303
2304 assert_eq!(rows.len(), 1);
2305 assert_eq!(rows[0].get("count"), Some(&Value::U64(2)));
2306 assert_eq!(rows[0].get("versionSum"), Some(&Value::U64(3)));
2307 }
2308
2309 #[tokio::test]
2310 async fn memory_repository_runs_grouped_aggregates_and_extended_filters() {
2311 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2312 let repository = MemoryRepository::new(metadata).with_rows(
2313 "Order",
2314 vec![
2315 Record::from([
2316 (String::from("id"), Value::U64(1)),
2317 (String::from("version"), Value::I64(1)),
2318 (String::from("name"), Value::Text(String::from("alpha"))),
2319 ]),
2320 Record::from([
2321 (String::from("id"), Value::U64(2)),
2322 (String::from("version"), Value::I64(2)),
2323 (String::from("name"), Value::Text(String::from("alpha"))),
2324 ]),
2325 Record::from([
2326 (String::from("id"), Value::U64(3)),
2327 (String::from("version"), Value::I64(3)),
2328 (String::from("name"), Value::Text(String::from("tmp-beta"))),
2329 ]),
2330 ],
2331 );
2332
2333 let rows = repository
2334 .fetch_all(
2335 &teaql_core::SelectQuery::new("Order")
2336 .filter(
2337 Expr::between("version", 1_i64, 3_i64)
2338 .and_expr(Expr::not_like("name", "tmp%"))
2339 .and_expr(Expr::not_in_list("name", vec![Value::from("deleted")])),
2340 )
2341 .group_by("name")
2342 .count("total")
2343 .sum("version", "versionSum"),
2344 )
2345 .unwrap();
2346
2347 assert_eq!(rows.len(), 1);
2348 assert_eq!(
2349 rows[0].get("name"),
2350 Some(&Value::Text(String::from("alpha")))
2351 );
2352 assert_eq!(rows[0].get("total"), Some(&Value::U64(2)));
2353 assert_eq!(rows[0].get("versionSum"), Some(&Value::U64(3)));
2354 }
2355
2356 #[tokio::test]
2357 async fn memory_repository_runs_extended_aggregates_and_having() {
2358 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2359 let repository = MemoryRepository::new(metadata).with_rows(
2360 "Order",
2361 vec![
2362 Record::from([
2363 (String::from("id"), Value::U64(1)),
2364 (String::from("version"), Value::I64(1)),
2365 (String::from("name"), Value::Text(String::from("alpha"))),
2366 ]),
2367 Record::from([
2368 (String::from("id"), Value::U64(2)),
2369 (String::from("version"), Value::I64(3)),
2370 (String::from("name"), Value::Text(String::from("alpha"))),
2371 ]),
2372 Record::from([
2373 (String::from("id"), Value::U64(3)),
2374 (String::from("version"), Value::I64(7)),
2375 (String::from("name"), Value::Text(String::from("beta"))),
2376 ]),
2377 ],
2378 );
2379
2380 let rows = repository
2381 .fetch_all(
2382 &teaql_core::SelectQuery::new("Order")
2383 .group_by("name")
2384 .count("total")
2385 .stddev("version", "stddevVersion")
2386 .var_pop("version", "varPopVersion")
2387 .bit_or("version", "bitOrVersion")
2388 .having(Expr::gt("total", 1_i64)),
2389 )
2390 .unwrap();
2391
2392 assert_eq!(rows.len(), 1);
2393 assert_eq!(
2394 rows[0].get("name"),
2395 Some(&Value::Text(String::from("alpha")))
2396 );
2397 assert_eq!(rows[0].get("total"), Some(&Value::U64(2)));
2398 assert_eq!(
2399 rows[0].get("stddevVersion").map(Value::to_json_value),
2400 Some(serde_json::Value::String(
2401 "1.4142135623730951454746218583".to_owned()
2402 ))
2403 );
2404 assert_eq!(
2405 rows[0].get("varPopVersion"),
2406 Some(&Value::Decimal(Decimal::ONE))
2407 );
2408 assert_eq!(rows[0].get("bitOrVersion"), Some(&Value::I64(3)));
2409 }
2410
2411 #[tokio::test]
2412 async fn memory_repository_runs_sound_like_filter() {
2413 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2414 let repository = MemoryRepository::new(metadata).with_rows(
2415 "Order",
2416 vec![
2417 Record::from([
2418 (String::from("id"), Value::U64(1)),
2419 (String::from("version"), Value::I64(1)),
2420 (String::from("name"), Value::Text(String::from("Robert"))),
2421 ]),
2422 Record::from([
2423 (String::from("id"), Value::U64(2)),
2424 (String::from("version"), Value::I64(1)),
2425 (String::from("name"), Value::Text(String::from("Rupert"))),
2426 ]),
2427 Record::from([
2428 (String::from("id"), Value::U64(3)),
2429 (String::from("version"), Value::I64(1)),
2430 (String::from("name"), Value::Text(String::from("Ashcraft"))),
2431 ]),
2432 ],
2433 );
2434
2435 let rows = repository
2436 .fetch_all(
2437 &teaql_core::SelectQuery::new("Order")
2438 .filter(Expr::sound_like("name", "Robert"))
2439 .order_asc("id"),
2440 )
2441 .unwrap();
2442
2443 assert_eq!(rows.len(), 2);
2444 assert_eq!(rows[0].get("name"), Some(&Value::Text("Robert".to_owned())));
2445 assert_eq!(rows[1].get("name"), Some(&Value::Text("Rupert".to_owned())));
2446 }
2447
2448 #[tokio::test]
2449 async fn memory_repository_runs_java_style_string_match_filters() {
2450 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2451 let repository = MemoryRepository::new(metadata).with_rows(
2452 "Order",
2453 vec![
2454 Record::from([
2455 (String::from("id"), Value::U64(1)),
2456 (String::from("version"), Value::I64(1)),
2457 (String::from("name"), Value::Text(String::from("tea-order"))),
2458 ]),
2459 Record::from([
2460 (String::from("id"), Value::U64(2)),
2461 (String::from("version"), Value::I64(1)),
2462 (
2463 String::from("name"),
2464 Value::Text(String::from("coffee-order")),
2465 ),
2466 ]),
2467 Record::from([
2468 (String::from("id"), Value::U64(3)),
2469 (String::from("version"), Value::I64(1)),
2470 (
2471 String::from("name"),
2472 Value::Text(String::from("tea-archived")),
2473 ),
2474 ]),
2475 ],
2476 );
2477
2478 let rows = repository
2479 .fetch_all(
2480 &teaql_core::SelectQuery::new("Order")
2481 .filter(
2482 Expr::contain("name", "tea")
2483 .and_expr(Expr::begin_with("name", "tea"))
2484 .and_expr(Expr::end_with("name", "order"))
2485 .and_expr(Expr::not_contain("name", "coffee"))
2486 .and_expr(Expr::not_begin_with("name", "archived"))
2487 .and_expr(Expr::not_end_with("name", "draft")),
2488 )
2489 .order_asc("id"),
2490 )
2491 .unwrap();
2492
2493 assert_eq!(rows.len(), 1);
2494 assert_eq!(
2495 rows[0].get("name"),
2496 Some(&Value::Text("tea-order".to_owned()))
2497 );
2498 }
2499
2500 #[tokio::test]
2501 async fn memory_repository_runs_property_to_property_filters() {
2502 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2503 let repository = MemoryRepository::new(metadata).with_rows(
2504 "Order",
2505 vec![
2506 Record::from([
2507 (String::from("id"), Value::U64(1)),
2508 (String::from("version"), Value::I64(2)),
2509 (String::from("name"), Value::Text(String::from("keep"))),
2510 ]),
2511 Record::from([
2512 (String::from("id"), Value::U64(2)),
2513 (String::from("version"), Value::I64(1)),
2514 (String::from("name"), Value::Text(String::from("skip"))),
2515 ]),
2516 ],
2517 );
2518
2519 let rows = repository
2520 .fetch_all(
2521 &teaql_core::SelectQuery::new("Order")
2522 .filter(Expr::compare_columns("version", BinaryOp::Gte, "id"))
2523 .order_asc("id"),
2524 )
2525 .unwrap();
2526
2527 assert_eq!(rows.len(), 1);
2528 assert_eq!(rows[0].get("name"), Some(&Value::Text("keep".to_owned())));
2529 }
2530
2531 #[tokio::test]
2532 async fn memory_repository_supports_mutations_and_optimistic_locking() {
2533 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2534 let repository = MemoryRepository::new(metadata);
2535
2536 repository
2537 .insert(
2538 &InsertCommand::new("Order")
2539 .value("id", 10_u64)
2540 .value("version", 1_i64)
2541 .value("name", "draft"),
2542 )
2543 .unwrap();
2544 repository
2545 .update(
2546 &UpdateCommand::new("Order", 10_u64)
2547 .expected_version(1)
2548 .value("name", "submitted"),
2549 )
2550 .unwrap();
2551
2552 let row = repository
2553 .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2554 .unwrap()
2555 .pop()
2556 .unwrap();
2557 assert_eq!(
2558 row.get("name"),
2559 Some(&Value::Text(String::from("submitted")))
2560 );
2561 assert_eq!(row.get("version"), Some(&Value::I64(2)));
2562
2563 let conflict = repository
2564 .update(
2565 &UpdateCommand::new("Order", 10_u64)
2566 .expected_version(1)
2567 .value("name", "stale"),
2568 )
2569 .unwrap_err();
2570 assert!(matches!(
2571 conflict,
2572 RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. })
2573 ));
2574
2575 repository
2576 .delete(&DeleteCommand::new("Order", 10_u64).expected_version(2))
2577 .unwrap();
2578 let row = repository
2579 .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2580 .unwrap()
2581 .pop()
2582 .unwrap();
2583 assert_eq!(row.get("version"), Some(&Value::I64(-3)));
2584
2585 repository
2586 .recover(&RecoverCommand::new("Order", 10_u64, -3))
2587 .unwrap();
2588 let row = repository
2589 .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2590 .unwrap()
2591 .pop()
2592 .unwrap();
2593 assert_eq!(row.get("version"), Some(&Value::I64(4)));
2594 }
2595
2596 #[tokio::test]
2597 async fn user_context_reports_missing_schema_provider() {
2598 let err = UserContext::new().ensure_schema().await.unwrap_err();
2599 assert!(
2600 matches!(err, RuntimeError::Schema(message) if message == "missing schema provider")
2601 );
2602 }
2603
2604 #[tokio::test]
2605 async fn user_context_stores_and_exposes_user_identifier() {
2606 let mut ctx = UserContext::new();
2607 let pid = std::process::id();
2608 let thread_id_str = format!("{:?}", std::thread::current().id());
2609 let numeric_thread_id = thread_id_str
2610 .strip_prefix("ThreadId(")
2611 .and_then(|s| s.strip_suffix(")"))
2612 .unwrap_or(&thread_id_str);
2613 let os_user = std::env::var("USER")
2614 .or_else(|_| std::env::var("USERNAME"))
2615 .unwrap_or_else(|_| "main".to_owned());
2616 let expected_default = format!("{os_user}@pid-{pid}.tid-{numeric_thread_id}");
2617 assert_eq!(ctx.user_identifier(), Some(expected_default.as_str()));
2618
2619 ctx.set_user_identifier("user-123");
2620 assert_eq!(ctx.user_identifier(), Some("user-123"));
2621
2622 let ctx2 = UserContext::new().with_user_identifier("user-456");
2623 assert_eq!(ctx2.user_identifier(), Some("user-456"));
2624
2625 let mut ctx3 = UserContext::new();
2626 ctx3.set_user_identifier_option(Some("user-789".to_owned()));
2627 assert_eq!(ctx3.user_identifier(), Some("user-789"));
2628 ctx3.set_user_identifier_option(None);
2629 assert_eq!(ctx3.user_identifier(), None);
2630
2631 let ctx4 = UserContext::new().with_user_identifier_option(Some("user-abc".to_owned()));
2632 assert_eq!(ctx4.user_identifier(), Some("user-abc"));
2633
2634 }
2635}
2636
2637pub use checker::{
2638 CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResult, CheckResults, CheckRule, Checker,
2639 CheckerRegistry, InMemoryCheckerRegistry, LocationSegment, ObjectLocation, TypedChecker,
2640 TypedEntityChecker, clear_record_status, mark_record_status,
2641};