1mod checker;
2mod context;
3mod entity_runtime;
4mod error;
5mod event;
6mod graph;
7mod id;
8mod language;
9mod memory;
10mod registry;
11mod repository;
12
13pub use context::{SchemaProvider, SqlLogEntry, SqlLogOperation, SqlLogOptions, UserContext};
14pub use entity_runtime::{ChangeSetStack, EntityChangeSet, EntityKey, EntityRoot, RootContext};
15pub use error::{ContextError, RepositoryError, RuntimeError};
16pub use event::{
17 EntityEvent, EntityEventKind, EntityEventSink, EntityPropertyChange, InMemoryEntityEventSink,
18};
19pub use graph::{
20 GraphMutationBatch, GraphMutationKind, GraphMutationPlan, GraphMutationPlanItem, GraphNode,
21 GraphOperation, sorted_update_fields,
22};
23pub(crate) use id::local_id_generator;
24pub use id::{InternalIdGenerator, SnowflakeIdGenerator};
25pub use language::{
26 BuiltinTranslator, Language, MessageTranslator, translate_check_result, translate_location,
27};
28pub use memory::{MemoryRepository, MemoryRepositoryError};
29pub use registry::{
30 InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry,
31 MetadataStore, RepositoryBehavior, RepositoryBehaviorRegistry, RepositoryRegistry,
32 RequestPolicy, RuntimeModule,
33};
34pub use repository::{
35 AggregationCacheBackend, ContextRepository, GraphTransactionBoundary, InMemoryAggregationCache,
36 QueryExecutor, RelationLoadPlan, Repository, ResolvedRepository,
37};
38
39#[cfg(test)]
40mod tests {
41 use std::collections::{BTreeMap, VecDeque};
42 use std::sync::{Arc, Mutex};
43
44 use super::{
45 AggregationCacheBackend, CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResult,
46 CheckResults, CheckRule, Checker, EntityEvent, EntityEventKind, EntityEventSink,
47 GraphMutationKind, GraphNode, GraphTransactionBoundary, InMemoryAggregationCache,
48 InMemoryCheckerRegistry, InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry,
49 InMemoryRepositoryRegistry, InternalIdGenerator, Language, MemoryRepository, MetadataStore,
50 ObjectLocation, QueryExecutor, Repository, RepositoryBehavior, RepositoryError,
51 RequestPolicy, RuntimeError, RuntimeModule, SqlLogOperation, SqlLogOptions, TypedChecker,
52 TypedEntityChecker, UserContext, translate_check_result,
53 };
54 use teaql_core::{
55 Aggregate, AggregateFunction, BinaryOp, DataType, Decimal, DeleteCommand, Entity,
56 EntityDescriptor, EntityError, Expr, InsertCommand, OrderBy, PropertyDescriptor, Record,
57 RecoverCommand, RelationAggregate, SelectQuery, TeaqlEntity, UpdateCommand, Value,
58 };
59 use teaql_macros::TeaqlEntity as DeriveTeaqlEntity;
60 use teaql_sql::{CompiledQuery, DatabaseKind, SqlCompileError, SqlDialect};
61
/// Quoted column list that the tests expect in the `SELECT` clause of `Order`
/// queries when comparing against logged debug SQL.
const ORDER_DEFAULT_PROJECTION: &str = r#""id", "version", "name""#;
63
64 #[derive(Debug, Default, Clone, Copy)]
65 struct PostgresDialect;
66
67 impl SqlDialect for PostgresDialect {
68 fn kind(&self) -> DatabaseKind {
69 DatabaseKind::PostgreSql
70 }
71
72 fn quote_ident(&self, ident: &str) -> String {
73 format!("\"{}\"", ident.replace('"', "\"\""))
74 }
75
76 fn placeholder(&self, index: usize) -> String {
77 format!("${index}")
78 }
79
80 fn schema_type_sql(
81 &self,
82 data_type: DataType,
83 _property: &PropertyDescriptor,
84 ) -> Result<&'static str, SqlCompileError> {
85 match data_type {
86 DataType::Bool => Ok("BOOLEAN"),
87 DataType::I64 | DataType::U64 => Ok("BIGINT"),
88 DataType::F64 => Ok("DOUBLE PRECISION"),
89 DataType::Decimal => Ok("NUMERIC"),
90 DataType::Text => Ok("TEXT"),
91 DataType::Json => Ok("JSONB"),
92 DataType::Date => Ok("DATE"),
93 DataType::Timestamp => Ok("TIMESTAMPTZ"),
94 }
95 }
96 }
97
98 fn entity() -> EntityDescriptor {
99 EntityDescriptor::new("Order")
100 .table_name("orders")
101 .property(
102 PropertyDescriptor::new("id", DataType::U64)
103 .column_name("id")
104 .id()
105 .not_null(),
106 )
107 .property(
108 PropertyDescriptor::new("version", DataType::I64)
109 .column_name("version")
110 .version()
111 .not_null(),
112 )
113 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
114 .relation(
115 teaql_core::RelationDescriptor::new("lines", "OrderLine")
116 .local_key("id")
117 .foreign_key("order_id")
118 .many(),
119 )
120 }
121
122 fn line_entity() -> EntityDescriptor {
123 EntityDescriptor::new("OrderLine")
124 .table_name("orderline")
125 .property(
126 PropertyDescriptor::new("id", DataType::U64)
127 .column_name("id")
128 .id()
129 .not_null(),
130 )
131 .property(
132 PropertyDescriptor::new("version", DataType::I64)
133 .column_name("version")
134 .version(),
135 )
136 .property(
137 PropertyDescriptor::new("order_id", DataType::U64)
138 .column_name("order_id")
139 .not_null(),
140 )
141 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
142 .property(
143 PropertyDescriptor::new("product_id", DataType::U64)
144 .column_name("product_id")
145 .not_null(),
146 )
147 .relation(
148 teaql_core::RelationDescriptor::new("product", "Product")
149 .local_key("product_id")
150 .foreign_key("id"),
151 )
152 }
153
154 fn product_entity() -> EntityDescriptor {
155 EntityDescriptor::new("Product")
156 .table_name("product")
157 .property(
158 PropertyDescriptor::new("id", DataType::U64)
159 .column_name("id")
160 .id()
161 .not_null(),
162 )
163 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
164 }
165
/// Executor stub with canned results: its `QueryExecutor` impl returns a clone
/// of `rows` for every fetch and `affected` for every execute.
#[derive(Debug, Default)]
struct StubExecutor {
    // Row count reported by `execute`.
    affected: u64,
    // Rows handed back by `fetch_all`, regardless of the query.
    rows: Vec<Record>,
}
171
/// Executor stub that serves a different batch of rows per fetch and records
/// the SQL text of every fetched query.
#[derive(Debug, Default)]
struct QueueExecutor {
    // Row count reported by `execute`.
    affected: u64,
    // Batches popped front-first, one per `fetch_all`; an empty queue yields no rows.
    rows: Mutex<VecDeque<Vec<Record>>>,
    // SQL strings of the queries passed to `fetch_all`, in call order.
    queries: Mutex<Vec<String>>,
}
178
/// Marker type carrying the `RepositoryBehavior` hooks used by the `Order` tests.
struct OrderBehavior;
180
/// Derived entity `CatalogProduct` (table `catalog_product`) used to exercise
/// module registration.
// The visible tests only pass this type as a type argument and never read its
// fields, hence the `dead_code` allowance.
#[allow(dead_code)]
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "CatalogProduct", table = "catalog_product")]
struct CatalogProductRow {
    #[teaql(id)]
    id: u64,
    name: String,
}
189
/// Derived entity `OrderAggregate` over the `orders` table with an id plus a
/// map marked `#[teaql(dynamic)]`.
// NOTE(review): not exercised in the visible part of this module; presumably
// the dynamic map receives aggregate columns — confirm against the derive macro.
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "OrderAggregate", table = "orders")]
struct OrderAggregateDynamic {
    #[teaql(id)]
    id: u64,
    #[teaql(dynamic)]
    dynamic: BTreeMap<String, Value>,
}
198
/// Derived row type for the `Product` entity (table `product`); used as the
/// `product` relation target of `OrderLineEntityRow`.
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "Product", table = "product")]
struct ProductEntityRow {
    #[teaql(id)]
    id: u64,
    name: String,
}
206
/// Derived row type for the `OrderLine` entity (table `orderline`) with an
/// optional nested `Product`.
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "OrderLine", table = "orderline")]
struct OrderLineEntityRow {
    #[teaql(id)]
    id: u64,
    // Key referencing the owning order.
    #[teaql(column = "order_id")]
    order_id: u64,
    name: String,
    // Key referencing the related product.
    #[teaql(column = "product_id")]
    product_id: u64,
    // Related product, joined on `product_id -> Product.id`.
    #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
    product: Option<ProductEntityRow>,
}
220
/// Derived row type for the `Order` entity (table `orders`) including its
/// to-many `lines` relation.
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "Order", table = "orders")]
struct OrderAggregateRow {
    #[teaql(id)]
    id: u64,
    #[teaql(version)]
    version: i64,
    name: String,
    // Child lines, joined on `id -> OrderLine.order_id`.
    #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
    lines: teaql_core::SmartList<OrderLineEntityRow>,
}
232
/// Flat derived `Order` entity (table `orders`) without relations; registered
/// through `crate::module!(Order)` and checked by `TypedOrderChecker`.
#[derive(Debug, Clone, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "Order", table = "orders")]
struct Order {
    #[teaql(id)]
    id: u64,
    #[teaql(version)]
    version: i64,
    name: String,
}
242
/// Leaf node of the typed graph fixtures: a `Product` with id and name.
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "Product", table = "product")]
struct TypedGraphProduct {
    #[teaql(id)]
    id: u64,
    name: String,
}
250
/// Middle node of the typed graph fixtures: an `OrderLine` whose foreign keys
/// are optional so the tests can leave them unset before saving.
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "OrderLine", table = "orderline")]
struct TypedGraphLine {
    #[teaql(id)]
    id: u64,
    // `None` in the fixtures; the graph-save tests assert it is set afterwards.
    #[teaql(column = "order_id")]
    order_id: Option<u64>,
    name: String,
    // `None` in the fixtures; the graph-save tests assert it is set afterwards.
    #[teaql(column = "product_id")]
    product_id: Option<u64>,
    // Nested product, joined on `product_id -> Product.id`.
    #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
    product: Option<TypedGraphProduct>,
}
264
/// Root node of the typed graph fixtures: an `Order` owning a list of lines.
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "Order", table = "orders")]
struct TypedGraphOrder {
    #[teaql(id)]
    id: u64,
    #[teaql(version)]
    version: i64,
    name: String,
    // Child lines, joined on `id -> OrderLine.order_id`.
    #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
    lines: teaql_core::SmartList<TypedGraphLine>,
}
276
277 #[derive(Debug, PartialEq, Eq)]
278 struct OrderEntity {
279 id: u64,
280 version: i64,
281 name: String,
282 }
283
284 impl teaql_core::TeaqlEntity for OrderEntity {
285 fn entity_descriptor() -> EntityDescriptor {
286 entity()
287 }
288 }
289
290 impl Entity for OrderEntity {
291 fn from_record(record: Record) -> Result<Self, EntityError> {
292 let id = match record.get("id") {
293 Some(Value::U64(v)) => *v,
294 Some(Value::I64(v)) if *v >= 0 => *v as u64,
295 other => {
296 return Err(EntityError::new(
297 "Order",
298 format!("invalid id field: {other:?}"),
299 ));
300 }
301 };
302 let version = match record.get("version") {
303 Some(Value::I64(v)) => *v,
304 other => {
305 return Err(EntityError::new(
306 "Order",
307 format!("invalid version field: {other:?}"),
308 ));
309 }
310 };
311 let name = match record.get("name") {
312 Some(Value::Text(v)) => v.clone(),
313 other => {
314 return Err(EntityError::new(
315 "Order",
316 format!("invalid name field: {other:?}"),
317 ));
318 }
319 };
320 Ok(Self { id, version, name })
321 }
322
323 fn into_record(self) -> Record {
324 Record::from([
325 (String::from("id"), Value::U64(self.id)),
326 (String::from("version"), Value::I64(self.version)),
327 (String::from("name"), Value::Text(self.name)),
328 ])
329 }
330 }
331
/// Error type of the stub executors; carries no data.
#[derive(Debug)]
struct StubError;

impl std::fmt::Display for StubError {
    /// Always renders the fixed text `stub error`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("stub error")
    }
}

impl std::error::Error for StubError {}
342
343 impl QueryExecutor for StubExecutor {
344 type Error = StubError;
345
346 fn fetch_all(&self, _query: &CompiledQuery) -> Result<Vec<Record>, Self::Error> {
347 Ok(self.rows.clone())
348 }
349
350 fn execute(&self, _query: &CompiledQuery) -> Result<u64, Self::Error> {
351 Ok(self.affected)
352 }
353
354 fn begin_transaction(&self) -> Result<GraphTransactionBoundary, Self::Error> {
355 Ok(GraphTransactionBoundary::Started)
356 }
357 }
358
359 impl QueryExecutor for QueueExecutor {
360 type Error = StubError;
361
362 fn fetch_all(&self, query: &CompiledQuery) -> Result<Vec<Record>, Self::Error> {
363 self.queries.lock().unwrap().push(query.sql.clone());
364 Ok(self.rows.lock().unwrap().pop_front().unwrap_or_default())
365 }
366
367 fn execute(&self, _query: &CompiledQuery) -> Result<u64, Self::Error> {
368 Ok(self.affected)
369 }
370 }
371
/// With `select_only` options only the select is logged; after switching to
/// `mutation_only` and clearing, only the update is logged.
#[test]
fn user_context_records_configured_sql_logs() {
    let executor = StubExecutor {
        affected: 1,
        ..StubExecutor::default()
    };
    let mut ctx = UserContext::new()
        .with_module(crate::module!(Order))
        .with_sql_log_options(SqlLogOptions::select_only());
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(executor);

    // One select and one insert; only the select is expected in the log.
    {
        let orders = ctx
            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
            .unwrap();
        let by_name = SelectQuery::new("Order").filter(Expr::eq("name", "Bob's Shop"));
        orders.fetch_all(&by_name).unwrap();
        let create = InsertCommand::new("Order").value("name", "created");
        orders.insert(&create).unwrap();
    }

    let logs = ctx.sql_logs();
    assert_eq!(logs.len(), 1);
    let select_log = &logs[0];
    assert_eq!(select_log.operation, SqlLogOperation::Select);
    assert_eq!(select_log.result_count, Some(0));
    assert_eq!(select_log.result_type.as_deref(), Some("Order"));
    assert_eq!(select_log.result_summary, "0 x Order");
    assert!(select_log.ended_at >= select_log.started_at);
    assert!(select_log.pretty_sql.contains("\nFROM \"orders\""));
    assert!(
        select_log
            .pretty_sql
            .contains("\nWHERE (\"name\" = 'Bob''s Shop')")
    );
    let expected_debug_sql = format!(
        "SELECT {ORDER_DEFAULT_PROJECTION} FROM \"orders\" WHERE (\"name\" = 'Bob''s Shop')"
    );
    assert_eq!(select_log.debug_sql, expected_debug_sql);

    ctx.set_sql_log_options(SqlLogOptions::mutation_only());
    ctx.clear_sql_logs();
    let rename = UpdateCommand::new("Order", 1_u64)
        .value("name", "updated")
        .expected_version(1);
    ctx.resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap()
        .update(&rename)
        .unwrap();

    let logs = ctx.sql_logs();
    assert_eq!(logs.len(), 1);
    let update_log = &logs[0];
    assert_eq!(update_log.operation, SqlLogOperation::Update);
    assert_eq!(update_log.affected_rows, Some(1));
    assert_eq!(update_log.result_summary, "1 rows affected");
    assert!(update_log.debug_sql.contains("UPDATE \"orders\" SET"));
    assert!(update_log.debug_sql.contains("'updated'"));
}
432
/// Without explicit options both the select and the insert are logged; after
/// `disable_sql_log` the log is empty even though another select runs.
#[test]
fn user_context_records_all_sql_logs_by_default() {
    let executor = StubExecutor {
        affected: 1,
        ..StubExecutor::default()
    };
    let mut ctx = UserContext::new().with_module(crate::module!(Order));
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(executor);

    {
        let orders = ctx
            .resolve_repository::<PostgresDialect, StubExecutor>("Order")
            .unwrap();
        let select_all = SelectQuery::new("Order");
        orders.fetch_all(&select_all).unwrap();
        let create = InsertCommand::new("Order").value("name", "created");
        orders.insert(&create).unwrap();
    }

    let logs = ctx.sql_logs();
    assert_eq!(logs.len(), 2);
    let (first, second) = (&logs[0], &logs[1]);
    assert_eq!(first.operation, SqlLogOperation::Select);
    assert_eq!(second.operation, SqlLogOperation::Insert);

    ctx.disable_sql_log();
    let select_again = SelectQuery::new("Order");
    ctx.resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap()
        .fetch_all(&select_again)
        .unwrap();
    assert!(ctx.sql_logs().is_empty());
}
463
464 impl RepositoryBehavior for OrderBehavior {
465 fn before_select(
466 &self,
467 _ctx: &UserContext,
468 query: &mut teaql_core::SelectQuery,
469 ) -> Result<(), RuntimeError> {
470 query.filter = Some(Expr::eq("version", 1_i64));
471 Ok(())
472 }
473
474 fn before_insert(
475 &self,
476 _ctx: &UserContext,
477 command: &mut InsertCommand,
478 ) -> Result<(), RuntimeError> {
479 command
480 .values
481 .entry("version".to_owned())
482 .or_insert(Value::I64(1));
483 Ok(())
484 }
485
486 fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
487 vec!["lines".to_owned()]
488 }
489 }
490
/// Behavior whose `before_insert` hook fills defaults from named context
/// resources and the `trace_id` local.
struct ContextAwareOrderBehavior;
/// Request policy that reads the `tenant_id` named resource to constrain
/// `Order` selects and to set the version on `Order` inserts.
struct TenantRequestPolicy;
/// Record-based checker for the `Order` entity.
struct OrderChecker;
/// Typed checker operating directly on the derived `Order` struct.
struct TypedOrderChecker;
/// Event sink that appends a clone of every received event to a shared vector.
#[derive(Clone)]
struct RecordingEventSink {
    // Shared so a clone of the sink and the test can observe the same events.
    events: Arc<Mutex<Vec<EntityEvent>>>,
}
499
500 impl RepositoryBehavior for ContextAwareOrderBehavior {
501 fn before_insert(
502 &self,
503 ctx: &UserContext,
504 command: &mut InsertCommand,
505 ) -> Result<(), RuntimeError> {
506 let tenant = ctx
507 .get_named_resource::<String>("tenant")
508 .cloned()
509 .ok_or_else(|| RuntimeError::Behavior("missing tenant resource".to_owned()))?;
510 let version = *ctx
511 .get_named_resource::<i64>("initial_version")
512 .ok_or_else(|| {
513 RuntimeError::Behavior("missing initial_version resource".to_owned())
514 })?;
515 let trace_id = match ctx.local("trace_id") {
516 Some(Value::Text(value)) => value.clone(),
517 other => {
518 return Err(RuntimeError::Behavior(format!(
519 "missing trace_id local, got {other:?}"
520 )));
521 }
522 };
523
524 command
525 .values
526 .entry("name".to_owned())
527 .or_insert(Value::Text(format!("{tenant}:{trace_id}")));
528 command
529 .values
530 .entry("version".to_owned())
531 .or_insert(Value::I64(version));
532 Ok(())
533 }
534 }
535
536 impl RequestPolicy for TenantRequestPolicy {
537 fn enforce_select(
538 &self,
539 ctx: &UserContext,
540 query: &mut SelectQuery,
541 ) -> Result<(), RuntimeError> {
542 if query.entity == "Order" {
543 let tenant_id = ctx
544 .get_named_resource::<u64>("tenant_id")
545 .copied()
546 .ok_or_else(|| RuntimeError::Policy("missing tenant_id".to_owned()))?;
547 query.filter = Some(match query.filter.take() {
548 Some(filter) => filter.and_expr(Expr::eq("id", tenant_id)),
549 None => Expr::eq("id", tenant_id),
550 });
551 }
552 Ok(())
553 }
554
555 fn enforce_insert(
556 &self,
557 ctx: &UserContext,
558 command: &mut InsertCommand,
559 ) -> Result<(), RuntimeError> {
560 if command.entity == "Order" {
561 let tenant_id = ctx
562 .get_named_resource::<u64>("tenant_id")
563 .copied()
564 .ok_or_else(|| RuntimeError::Policy("missing tenant_id".to_owned()))?;
565 command
566 .values
567 .insert("version".to_owned(), Value::I64(tenant_id as i64));
568 }
569 Ok(())
570 }
571 }
572
573 impl Checker for OrderChecker {
574 fn entity(&self) -> &str {
575 "Order"
576 }
577
578 fn check_and_fix(
579 &self,
580 _ctx: &UserContext,
581 record: &mut Record,
582 location: &ObjectLocation,
583 results: &mut CheckResults,
584 ) {
585 let status = CheckObjectStatus::from_record(record);
586 if status.is_create() {
587 self.required(record, "name", location, results);
588 record.entry("version".to_owned()).or_insert(Value::I64(1));
589 }
590 if status.is_update()
591 && record.get("name") == Some(&Value::Text("graph-update".to_owned()))
592 {
593 record.insert(
594 "name".to_owned(),
595 Value::Text("graph-update-checked".to_owned()),
596 );
597 }
598 self.min_string_length(record, "name", 3, location, results);
599 }
600 }
601
602 impl TypedChecker<Order> for TypedOrderChecker {
603 fn check_and_fix_typed(
604 &self,
605 _ctx: &UserContext,
606 entity: &mut Order,
607 status: CheckObjectStatus,
608 location: &ObjectLocation,
609 results: &mut CheckResults,
610 ) {
611 if status.is_create() {
612 if entity.name.is_empty() {
613 results.push(CheckResult::required(location.clone().member("name")));
614 }
615 }
616 if entity.name.chars().count() < 3 {
617 results.push(CheckResult::min_str(
618 location.clone().member("name"),
619 3,
620 entity.name.clone(),
621 ));
622 }
623 if entity.name == "fix" {
624 entity.name = "fixed".to_owned();
625 }
626 }
627 }
628
629 impl EntityEventSink for RecordingEventSink {
630 fn on_event(&self, _ctx: &UserContext, event: &EntityEvent) -> Result<(), RuntimeError> {
631 self.events.lock().unwrap().push(event.clone());
632 Ok(())
633 }
634 }
635
636 struct FixedIdGenerator(u64);
637
638 impl InternalIdGenerator for FixedIdGenerator {
639 fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
640 Ok(self.0)
641 }
642 }
643
644 struct SequentialIdGenerator {
645 next: Mutex<u64>,
646 }
647
648 impl SequentialIdGenerator {
649 fn new(next: u64) -> Self {
650 Self {
651 next: Mutex::new(next),
652 }
653 }
654 }
655
656 impl InternalIdGenerator for SequentialIdGenerator {
657 fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
658 let mut next = self
659 .next
660 .lock()
661 .map_err(|err| RuntimeError::IdGeneration(err.to_string()))?;
662 let id = *next;
663 *next += 1;
664 Ok(id)
665 }
666 }
667
/// An entity added to the in-memory store can be looked up by name.
#[test]
fn metadata_store_registers_entities() {
    let store = InMemoryMetadataStore::new().with_entity(entity());
    let registered = store.entity("Order");
    assert!(registered.is_some());
}
673
/// A module built from a raw descriptor exposes both the entity metadata and
/// a repository entry on the context.
#[test]
fn runtime_module_registers_descriptor_into_context() {
    let module = RuntimeModule::new().descriptor(entity());
    let ctx = UserContext::new().with_module(module);

    assert!(ctx.entity("Order").is_some());
    assert!(ctx.has_repository("Order"));
}
680
/// Registering a derived entity together with a behavior makes the metadata,
/// the repository entry and the behavior available on the context.
#[test]
fn runtime_module_registers_derived_entity_and_behavior() {
    let module =
        RuntimeModule::new().entity_with_behavior::<CatalogProductRow, _>(OrderBehavior);
    let ctx = UserContext::new().with_module(module);

    assert!(ctx.entity("CatalogProduct").is_some());
    assert!(ctx.has_repository("CatalogProduct"));
    assert!(ctx.repository_behavior("CatalogProduct").is_some());
}
690
/// The `module!` macro registers the listed entity's metadata and repository.
#[test]
fn module_macro_registers_multiple_entities() {
    let module = crate::module!(CatalogProductRow);
    let ctx = UserContext::new().with_module(module);

    assert!(ctx.entity("CatalogProduct").is_some());
    assert!(ctx.has_repository("CatalogProduct"));
}
697
/// The `Entity => Behavior` form of `module!` registers both the entity and
/// its behavior.
#[test]
fn module_macro_registers_entity_behavior_pairs() {
    let module = crate::module!(CatalogProductRow => OrderBehavior);
    let ctx = UserContext::new().with_module(module);

    assert!(ctx.entity("CatalogProduct").is_some());
    assert!(ctx.repository_behavior("CatalogProduct").is_some());
}
705
/// A versioned update that affects zero rows surfaces as an optimistic lock
/// conflict.
#[test]
fn repository_returns_optimistic_lock_conflict() {
    let store = InMemoryMetadataStore::new().with_entity(entity());
    // Zero affected rows simulates a stale expected version.
    let executor = StubExecutor {
        affected: 0,
        ..StubExecutor::default()
    };
    let repo = Repository::new(&PostgresDialect, &store, &executor);

    let stale_update = UpdateCommand::new("Order", 1_u64)
        .expected_version(3)
        .value("name", "next");
    let err = repo.update(&stale_update).unwrap_err();

    let is_conflict = matches!(
        err,
        RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. })
    );
    if !is_conflict {
        panic!("unexpected error: {err}");
    }
}
728
/// Typed resources, named resources and locals stored on the context can be
/// read back, alongside the registered metadata.
#[test]
fn user_context_indexes_resources_and_locals() {
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let mut ctx = UserContext::new().with_metadata(metadata);
    ctx.insert_resource::<u64>(42);
    ctx.insert_named_resource("tenant", String::from("acme"));
    ctx.put_local("trace_id", "req-1");

    assert!(ctx.entity("Order").is_some());
    assert_eq!(ctx.get_resource::<u64>(), Some(&42));

    let expected_tenant = String::from("acme");
    assert_eq!(
        ctx.get_named_resource::<String>("tenant"),
        Some(&expected_tenant)
    );

    let expected_trace = Value::Text("req-1".to_owned());
    assert_eq!(ctx.local("trace_id"), Some(&expected_trace));
}
748
/// A repository built from the context's dialect and executor resources can
/// run a versioned update and report the affected-row count.
#[test]
fn user_context_builds_context_repository() {
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let mut ctx = UserContext::new().with_metadata(metadata);
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        ..StubExecutor::default()
    });

    let repo = ctx.repository::<PostgresDialect, StubExecutor>().unwrap();
    let update = UpdateCommand::new("Order", 1_u64)
        .expected_version(3)
        .value("name", "next");

    assert_eq!(repo.update(&update).unwrap(), 1);
}
770
/// A repository resolved by entity name is bound to that entity for both
/// select building and inserts.
#[test]
fn user_context_resolves_repository_by_entity_type() {
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let repositories = InMemoryRepositoryRegistry::new().with_entity("Order");
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(repositories);
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        ..StubExecutor::default()
    });

    let repo = ctx
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();
    assert_eq!(repo.entity(), "Order");
    assert_eq!(repo.select().entity, "Order");

    let command = repo
        .insert_command()
        .value("id", 1_u64)
        .value("version", 1_i64)
        .value("name", "n");
    assert_eq!(repo.insert(&command).unwrap(), 1);
}
799
/// The registered `OrderBehavior` shapes compiled selects, lets an insert
/// without a version succeed, and contributes its relation loads.
#[test]
fn resolved_repository_applies_behavior_hooks() {
    let metadata = InMemoryMetadataStore::new()
        .with_entity(entity())
        .with_entity(line_entity())
        .with_entity(product_entity());
    let repositories = InMemoryRepositoryRegistry::new().with_entity("Order");
    let behaviors =
        InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior);
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(repositories)
        .with_repository_behavior_registry(behaviors);
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        ..StubExecutor::default()
    });

    let repo = ctx
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();

    let select = repo.select();
    let compiled = repo.compile(&select).unwrap();
    assert!(compiled.sql.contains("WHERE (\"version\" = $1)"));

    let insert = repo.insert_command().value("id", 1_u64).value("name", "n");
    assert_eq!(repo.insert(&insert).unwrap(), 1);

    let expected_loads = vec!["lines".to_owned()];
    assert_eq!(repo.relation_loads(), expected_loads);
}
831
/// The request policy runs after the behavior hooks: its select condition
/// takes the second placeholder and its insert value wins over the behavior's
/// default version.
#[test]
fn resolved_repository_applies_request_policy_after_behavior_hooks() {
    let metadata = InMemoryMetadataStore::new()
        .with_entity(entity())
        .with_entity(line_entity())
        .with_entity(product_entity());
    let repositories = InMemoryRepositoryRegistry::new().with_entity("Order");
    let behaviors =
        InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior);
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(repositories)
        .with_repository_behavior_registry(behaviors)
        .with_request_policy(TenantRequestPolicy);
    ctx.insert_named_resource("tenant_id", 9_u64);
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        ..StubExecutor::default()
    });

    let repo = ctx
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();

    let select = repo.select();
    let sql = repo.compile(&select).unwrap().sql;
    assert!(sql.contains("\"version\" = $1"));
    assert!(sql.contains("\"id\" = $2"));

    let insert = repo.insert_command().value("id", 1_u64).value("name", "n");
    let prepared = repo.prepare_insert_command(&insert).unwrap();
    let expected_version = Value::I64(9);
    assert_eq!(prepared.values.get("version"), Some(&expected_version));
}
865
/// Preparing an insert whose id is zero replaces the id with a generated one,
/// applies the behavior's default version, and keeps the supplied name.
#[test]
fn resolved_repository_prepares_insert_command_with_generated_id() {
    let metadata = InMemoryMetadataStore::new()
        .with_entity(entity())
        .with_entity(line_entity())
        .with_entity(product_entity());
    let repositories = InMemoryRepositoryRegistry::new().with_entity("Order");
    let behaviors =
        InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior);
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(repositories)
        .with_repository_behavior_registry(behaviors)
        .with_internal_id_generator(FixedIdGenerator(42));
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        ..StubExecutor::default()
    });

    let repo = ctx
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();

    let draft = repo.insert_command().value("id", 0_u64).value("name", "n");
    let prepared = repo.prepare_insert_command(&draft).unwrap();
    let values = &prepared.values;

    assert_eq!(values.get("id"), Some(&Value::U64(42)));
    assert_eq!(values.get("version"), Some(&Value::I64(1)));
    let expected_name = Value::Text("n".to_owned());
    assert_eq!(values.get("name"), Some(&expected_name));
}
901
/// Saving a three-level graph assigns sequential ids (root 500, line 501,
/// product 502) and wires `order_id` / `product_id` on the line accordingly.
#[test]
fn resolved_repository_saves_create_graph_and_maintains_relation_keys() {
    let metadata = InMemoryMetadataStore::new()
        .with_entity(entity())
        .with_entity(line_entity())
        .with_entity(product_entity());
    let repositories = InMemoryRepositoryRegistry::new().with_entity("Order");
    let behaviors =
        InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior);
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(repositories)
        .with_repository_behavior_registry(behaviors)
        .with_internal_id_generator(SequentialIdGenerator::new(500));
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        ..StubExecutor::default()
    });

    let repo = ctx
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();

    // Assemble the graph bottom-up: product -> line -> order.
    let product_node = GraphNode::new("Product").value("name", "sku-1");
    let line_node = GraphNode::new("OrderLine")
        .value("name", "line-1")
        .relation("product", product_node);
    let graph = GraphNode::new("Order")
        .value("name", "root")
        .relation("lines", line_node);

    let saved = repo.save_graph(graph).unwrap();

    assert_eq!(saved.values.get("id"), Some(&Value::U64(500)));
    assert_eq!(saved.values.get("version"), Some(&Value::I64(1)));

    let saved_lines = saved.relations.get("lines").unwrap();
    assert_eq!(saved_lines.len(), 1);
    let saved_line = &saved_lines[0];
    assert_eq!(saved_line.values.get("id"), Some(&Value::U64(501)));
    assert_eq!(saved_line.values.get("order_id"), Some(&Value::U64(500)));
    assert_eq!(saved_line.values.get("product_id"), Some(&Value::U64(502)));

    let saved_products = saved_line.relations.get("product").unwrap();
    assert_eq!(saved_products[0].values.get("id"), Some(&Value::U64(502)));
}
944
/// A typed entity tree is first converted into a graph node (keeping its
/// values and nested relations) and then saved, receiving ids 700..=702 with
/// the relation keys filled in.
#[test]
fn resolved_repository_extracts_and_saves_typed_entity_graph() {
    let metadata = InMemoryMetadataStore::new()
        .with_entity(entity())
        .with_entity(line_entity())
        .with_entity(product_entity());
    let repositories = InMemoryRepositoryRegistry::new().with_entity("Order");
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(repositories)
        .with_internal_id_generator(SequentialIdGenerator::new(700));
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        ..StubExecutor::default()
    });

    let repo = ctx
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();

    // Assemble the typed tree bottom-up: product -> line -> order.
    let product = TypedGraphProduct {
        id: 0,
        name: "typed-product".to_owned(),
    };
    let line = TypedGraphLine {
        id: 0,
        order_id: None,
        name: "typed-line".to_owned(),
        product_id: None,
        product: Some(product),
    };
    let order = TypedGraphOrder {
        id: 0,
        version: 1,
        name: "typed-root".to_owned(),
        lines: teaql_core::SmartList::from(vec![line]),
    };

    let extracted = repo.graph_node_from_entity(order).unwrap();
    assert_eq!(extracted.entity, "Order");
    let expected_root_name = Value::Text("typed-root".to_owned());
    assert_eq!(extracted.values.get("name"), Some(&expected_root_name));
    assert_eq!(extracted.values.get("id"), Some(&Value::U64(0)));
    assert_eq!(extracted.relations["lines"].len(), 1);
    let expected_line_name = Value::Text("typed-line".to_owned());
    assert_eq!(
        extracted.relations["lines"][0].values.get("name"),
        Some(&expected_line_name)
    );
    assert_eq!(
        extracted.relations["lines"][0].relations["product"].len(),
        1
    );

    let saved = repo.save_graph(extracted).unwrap();
    assert_eq!(saved.values.get("id"), Some(&Value::U64(700)));
    let saved_lines = saved.relations.get("lines").unwrap();
    let saved_line = &saved_lines[0];
    assert_eq!(saved_line.values.get("id"), Some(&Value::U64(701)));
    assert_eq!(saved_line.values.get("order_id"), Some(&Value::U64(700)));
    assert_eq!(saved_line.values.get("product_id"), Some(&Value::U64(702)));
    assert_eq!(
        saved_line.relations["product"][0].values.get("id"),
        Some(&Value::U64(702))
    );
}
1009
/// `save_entity_graph` accepts the typed tree directly and yields the same
/// id assignment and key wiring as the two-step extract-then-save path.
#[test]
fn resolved_repository_saves_typed_entity_graph_directly() {
    let metadata = InMemoryMetadataStore::new()
        .with_entity(entity())
        .with_entity(line_entity())
        .with_entity(product_entity());
    let repositories = InMemoryRepositoryRegistry::new().with_entity("Order");
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(repositories)
        .with_internal_id_generator(SequentialIdGenerator::new(800));
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        ..StubExecutor::default()
    });

    let repo = ctx
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();

    // Assemble the typed tree bottom-up: product -> line -> order.
    let product = TypedGraphProduct {
        id: 0,
        name: "typed-product".to_owned(),
    };
    let line = TypedGraphLine {
        id: 0,
        order_id: None,
        name: "typed-line".to_owned(),
        product_id: None,
        product: Some(product),
    };
    let order = TypedGraphOrder {
        id: 0,
        version: 1,
        name: "typed-direct".to_owned(),
        lines: teaql_core::SmartList::from(vec![line]),
    };

    let saved = repo.save_entity_graph(order).unwrap();

    assert_eq!(saved.values.get("id"), Some(&Value::U64(800)));
    let saved_line = &saved.relations["lines"][0];
    assert_eq!(saved_line.values.get("order_id"), Some(&Value::U64(800)));
    assert_eq!(saved_line.values.get("product_id"), Some(&Value::U64(802)));
}
1058
/// With `ContextAwareOrderBehavior` registered, an empty insert command is
/// completed from the context: generated id, version from the
/// `initial_version` resource, and a name built from `tenant` and `trace_id`.
#[test]
fn custom_user_context_can_drive_insert_preparation() {
    let metadata = InMemoryMetadataStore::new().with_entity(entity());
    let repositories = InMemoryRepositoryRegistry::new().with_entity("Order");
    let behaviors = InMemoryRepositoryBehaviorRegistry::new()
        .with_behavior("Order", ContextAwareOrderBehavior);
    let mut ctx = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(repositories)
        .with_repository_behavior_registry(behaviors)
        .with_internal_id_generator(FixedIdGenerator(99));
    ctx.insert_named_resource("tenant", String::from("acme"));
    ctx.insert_named_resource("initial_version", 7_i64);
    ctx.put_local("trace_id", "req-9");
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        ..StubExecutor::default()
    });

    let repo = ctx
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();
    let empty_command = repo.insert_command();
    let prepared = repo.prepare_insert_command(&empty_command).unwrap();
    let values = &prepared.values;

    assert_eq!(values.get("id"), Some(&Value::U64(99)));
    assert_eq!(values.get("version"), Some(&Value::I64(7)));
    let expected_name = Value::Text("acme:req-9".to_owned());
    assert_eq!(values.get("name"), Some(&expected_name));
}
1090
/// Exercises insert preparation with `OrderChecker` registered in the checker registry.
///
/// A command named "valid" must prepare successfully with id 77 and version 1 and
/// without the internal `CHECK_OBJECT_STATUS_FIELD`; a command named "no" must fail
/// with exactly one check result located at "name".
#[test]
fn checker_registry_validates_and_fixes_insert_commands() {
    let checkers = InMemoryCheckerRegistry::new().with_checker(OrderChecker);
    let mut context = UserContext::new()
        .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
        .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
        .with_checker_registry(checkers)
        .with_internal_id_generator(FixedIdGenerator(77));
    context.insert_resource(PostgresDialect);
    context.insert_resource(StubExecutor {
        affected: 1,
        rows: vec![],
    });

    let orders = context
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();

    // Accepted command: preparation succeeds and the status marker is not exposed.
    let accepted = orders.insert_command().value("name", "valid");
    let prepared = orders.prepare_insert_command(&accepted).unwrap();
    assert_eq!(prepared.values.get("id"), Some(&Value::U64(77)));
    assert_eq!(prepared.values.get("version"), Some(&Value::I64(1)));
    assert!(!prepared.values.contains_key(CHECK_OBJECT_STATUS_FIELD));

    // Rejected command: the failure must surface as a check error on "name".
    let rejected = orders.insert_command().value("name", "no");
    let results = match orders.prepare_insert_command(&rejected).unwrap_err() {
        RuntimeError::Check(results) => results,
        other => panic!("unexpected checker error: {other:?}"),
    };
    assert_eq!(results.len(), 1);
    assert_eq!(results[0].location.to_string(), "name");
}
1126
/// Exercises a `TypedEntityChecker` wrapping `TypedOrderChecker` for the derived
/// `Order` entity.
///
/// A command named "fix" must come back prepared with the name "fixed", id 79 and no
/// `CHECK_OBJECT_STATUS_FIELD`; a command without a name must fail with a
/// `CheckRule::Required` result located at "name".
#[test]
fn typed_checker_validates_and_fixes_derived_entities_without_record_access() {
    let typed_checker = TypedEntityChecker::<Order, _>::new(TypedOrderChecker);
    let mut context = UserContext::new()
        .with_metadata(InMemoryMetadataStore::new().with_entity(Order::entity_descriptor()))
        .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
        .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(typed_checker))
        .with_internal_id_generator(FixedIdGenerator(79));
    context.insert_resource(PostgresDialect);
    context.insert_resource(StubExecutor {
        affected: 1,
        rows: vec![],
    });

    let orders = context
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();

    // Fixable command: the prepared name differs from the submitted one.
    let fixable = orders
        .insert_command()
        .value("name", "fix")
        .value("version", 1_i64);
    let prepared = orders.prepare_insert_command(&fixable).unwrap();
    let fixed_name = Value::Text("fixed".to_owned());
    assert_eq!(prepared.values.get("name"), Some(&fixed_name));
    assert_eq!(prepared.values.get("id"), Some(&Value::U64(79)));
    assert!(!prepared.values.contains_key(CHECK_OBJECT_STATUS_FIELD));

    // Nameless command: a required-rule violation on "name" must be among the results.
    let nameless = orders.insert_command().value("version", 1_i64);
    let results = match orders.prepare_insert_command(&nameless).unwrap_err() {
        RuntimeError::Check(results) => results,
        other => panic!("unexpected typed checker error: {other:?}"),
    };
    let reports_missing_name = results.iter().any(|result| {
        result.rule == CheckRule::Required && result.location.to_string() == "name"
    });
    assert!(reports_missing_name);
}
1176
/// Exercises `update` with `OrderChecker` registered.
///
/// An update that only sets "version" (and therefore supplies no name) must succeed,
/// while an update setting the name to "no" must fail with a single check result
/// located at "name", wrapped in `RepositoryError::Runtime`.
#[test]
fn checker_registry_validates_update_commands_without_required_insert_checks() {
    let checkers = InMemoryCheckerRegistry::new().with_checker(OrderChecker);
    let mut context = UserContext::new()
        .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
        .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
        .with_checker_registry(checkers);
    context.insert_resource(PostgresDialect);
    context.insert_resource(StubExecutor {
        affected: 1,
        rows: vec![],
    });

    let orders = context
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();

    // No name is supplied here, yet the update is expected to pass.
    let version_only = orders.update_command(1_u64).value("version", 1_i64);
    orders.update(&version_only).unwrap();

    let bad_name = orders.update_command(1_u64).value("name", "no");
    let results = match orders.update(&bad_name).unwrap_err() {
        RepositoryError::Runtime(RuntimeError::Check(results)) => results,
        other => panic!("unexpected checker error: {other:?}"),
    };
    assert_eq!(results.len(), 1);
    assert_eq!(results[0].location.to_string(), "name");
}
1206
/// Exercises `check_and_fix_record_at` on a record flagged with
/// `CheckObjectStatus::Create` at the nested location `lines[0]`.
///
/// The first call must fail with one `CheckRule::Required` result located at
/// "lines[0].name" while still leaving version 1 in the record; once a name is
/// inserted, the same check must pass.
#[test]
fn checker_registry_reports_nested_create_locations_and_fixes_records() {
    let context = UserContext::new()
        .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker));
    let first_line = ObjectLocation::hash_root("lines").element(0);

    let status_entry = (
        String::from(CHECK_OBJECT_STATUS_FIELD),
        Value::from(CheckObjectStatus::Create),
    );
    let mut child = Record::from([("id".to_owned(), Value::U64(10)), status_entry]);

    let failure = context
        .check_and_fix_record_at("Order", &mut child, &first_line)
        .unwrap_err();

    // The record is mutated even though the check as a whole failed.
    assert_eq!(child.get("version"), Some(&Value::I64(1)));
    let results = match failure {
        RuntimeError::Check(results) => results,
        other => panic!("unexpected checker error: {other:?}"),
    };
    assert_eq!(results.len(), 1);
    assert_eq!(results[0].rule, CheckRule::Required);
    assert_eq!(results[0].location.to_string(), "lines[0].name");

    child.insert("name".to_owned(), Value::Text("valid child".to_owned()));
    context
        .check_and_fix_record_at("Order", &mut child, &first_line)
        .unwrap();
}
1245
/// Checks that every entry of `Language::ALL` (expected to be 15) yields a non-empty
/// translation of a "required" check result, and that English, Chinese and French
/// wording appears among the translations.
///
/// Also checks that `Language::from_code` maps "zh-CN" to `Language::Chinese` and
/// "zh-TW" to `Language::TraditionalChinese`.
#[test]
fn built_in_language_translators_cover_fifteen_languages() {
    assert_eq!(Language::ALL.len(), 15);
    let result = super::CheckResult::required(ObjectLocation::hash_root("name"));
    let messages = Language::ALL
        .iter()
        .map(|language| translate_check_result(*language, &result))
        .collect::<Vec<_>>();

    assert!(messages.iter().all(|message| !message.is_empty()));
    assert!(messages.iter().any(|message| message.contains("required")));
    // The Chinese needle must be real UTF-8 text; the previous literal was the same
    // bytes mis-decoded as windows-1252, which no translation could ever contain.
    assert!(messages.iter().any(|message| message.contains("必填")));
    assert!(
        messages
            .iter()
            .any(|message| message.contains("obligatoire"))
    );
    assert_eq!(Language::from_code("zh-CN"), Some(Language::Chinese));
    assert_eq!(
        Language::from_code("zh-TW"),
        Some(Language::TraditionalChinese)
    );
}
1269
/// Checks that the language configured on a `UserContext` is applied to checker
/// error messages, and that `set_language_code` switches the language.
///
/// With `Language::Chinese` configured and `OrderChecker` registered, preparing an
/// empty insert command must fail with a single check result whose message contains
/// the Chinese text "必填". Separately, calling `set_language_code("es")` on an
/// English context must make `language()` return `Language::Spanish`.
#[test]
fn user_context_language_switch_translates_checker_errors() {
    let mut ctx = UserContext::new()
        .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
        .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
        .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker))
        .with_internal_id_generator(FixedIdGenerator(77))
        .with_language(Language::Chinese);
    ctx.insert_resource(PostgresDialect);
    ctx.insert_resource(StubExecutor {
        affected: 1,
        rows: Vec::new(),
    });

    let repo = ctx
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();
    let error = repo
        .prepare_insert_command(&repo.insert_command())
        .unwrap_err();
    match error {
        RuntimeError::Check(results) => {
            assert_eq!(results.len(), 1);
            // The needle must be real UTF-8 text; the previous literal was the same
            // bytes mis-decoded as windows-1252 and could never match.
            assert!(
                results[0]
                    .message
                    .as_ref()
                    .is_some_and(|message| message.contains("必填"))
            );
        }
        other => panic!("unexpected checker error: {other:?}"),
    }

    // A fresh context: switching by language code replaces the configured language.
    let mut ctx = UserContext::new().with_language(Language::English);
    ctx.set_language_code("es").unwrap();
    assert_eq!(ctx.language(), Language::Spanish);
}
1307
/// Saves an "Order" graph node that already carries an id and version while
/// `OrderChecker` is registered and the stub executor returns one stored row.
///
/// The saved node must carry the name "graph-update-checked", version 2, and must not
/// expose `CHECK_OBJECT_STATUS_FIELD`.
#[test]
fn checker_registry_merges_graph_update_fixes_by_object_status() {
    let stored_row = Record::from([
        ("id".to_owned(), Value::U64(1)),
        ("version".to_owned(), Value::I64(1)),
        ("name".to_owned(), Value::Text("old".to_owned())),
    ]);
    let checkers = InMemoryCheckerRegistry::new().with_checker(OrderChecker);
    let mut context = UserContext::new()
        .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
        .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
        .with_checker_registry(checkers);
    context.insert_resource(PostgresDialect);
    context.insert_resource(StubExecutor {
        affected: 1,
        rows: vec![stored_row],
    });

    let orders = context
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();
    let node = GraphNode::new("Order")
        .value("id", 1_u64)
        .value("version", 1_i64)
        .value("name", "graph-update");
    let saved = orders.save_graph(node).unwrap();

    let checked_name = Value::Text("graph-update-checked".to_owned());
    assert_eq!(saved.values.get("name"), Some(&checked_name));
    assert_eq!(saved.values.get("version"), Some(&Value::I64(2)));
    assert!(!saved.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
}
1343
/// Runs insert, update, delete and recover against an "Order" repository whose
/// context has a `RecordingEventSink`, then inspects the four recorded events.
///
/// Expected sequence: Created, Updated (with old/new values and a single "name"
/// change), Deleted (old values only) and Recovered (version going from 1 to 4).
#[test]
fn user_context_event_sink_receives_repository_mutation_events() {
    let captured = Arc::new(Mutex::new(Vec::new()));
    let sink = RecordingEventSink {
        events: Arc::clone(&captured),
    };
    let stored_row = Record::from([
        ("id".to_owned(), Value::U64(88)),
        ("version".to_owned(), Value::I64(1)),
        ("name".to_owned(), Value::Text("old".to_owned())),
    ]);
    let mut context = UserContext::new()
        .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
        .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
        .with_internal_id_generator(FixedIdGenerator(88))
        .with_event_sink(sink);
    context.insert_resource(PostgresDialect);
    context.insert_resource(StubExecutor {
        affected: 1,
        rows: vec![stored_row],
    });

    let orders = context
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();

    // One mutation of each kind, in the order the events are asserted below.
    let creation = orders.insert_command().value("name", "created");
    orders.insert(&creation).unwrap();
    let rename = orders
        .update_command(88_u64)
        .expected_version(1)
        .value("name", "updated");
    orders.update(&rename).unwrap();
    let removal = orders.delete_command(88_u64).expected_version(2);
    orders.delete(&removal).unwrap();
    let recovery = orders.recover_command(88_u64, -3);
    orders.recover(&recovery).unwrap();

    let recorded = captured.lock().unwrap();
    assert_eq!(recorded.len(), 4);

    let created = &recorded[0];
    assert_eq!(created.kind, EntityEventKind::Created);
    assert_eq!(created.entity, "Order");
    assert_eq!(created.values.get("id"), Some(&Value::U64(88)));

    let updated = &recorded[1];
    let old_name = Value::Text("old".to_owned());
    let new_name = Value::Text("updated".to_owned());
    assert_eq!(updated.kind, EntityEventKind::Updated);
    assert_eq!(updated.values.get("id"), Some(&Value::U64(88)));
    assert_eq!(updated.values.get("version"), Some(&Value::I64(2)));
    assert_eq!(updated.updated_fields, vec!["name".to_owned()]);
    assert_eq!(
        updated
            .old_values
            .as_ref()
            .and_then(|values| values.get("name")),
        Some(&old_name)
    );
    assert_eq!(
        updated
            .new_values
            .as_ref()
            .and_then(|values| values.get("name")),
        Some(&new_name)
    );
    assert_eq!(updated.changes.len(), 1);
    assert_eq!(updated.changes[0].field, "name");
    assert_eq!(
        updated.changes[0].old_value,
        Some(Value::Text("old".to_owned()))
    );
    assert_eq!(
        updated.changes[0].new_value,
        Some(Value::Text("updated".to_owned()))
    );

    let deleted = &recorded[2];
    assert_eq!(deleted.kind, EntityEventKind::Deleted);
    assert!(deleted.old_values.is_some());
    assert!(deleted.new_values.is_none());

    let recovered = &recorded[3];
    assert_eq!(recovered.kind, EntityEventKind::Recovered);
    assert_eq!(
        recovered
            .old_values
            .as_ref()
            .and_then(|values| values.get("version")),
        Some(&Value::I64(1))
    );
    assert_eq!(
        recovered
            .new_values
            .as_ref()
            .and_then(|values| values.get("version")),
        Some(&Value::I64(4))
    );
    assert_eq!(recovered.changes[0].field, "version");
}
1433
/// Saves an "Order" graph with one "OrderLine" relation through a context that has a
/// `RecordingEventSink`, then checks the three recorded events.
///
/// Expected sequence: Updated "Order", Updated "OrderLine" (with `order_id` 1) and
/// Deleted "OrderLine".
#[test]
fn user_context_event_sink_receives_mixed_graph_mutation_events() {
    let captured = Arc::new(Mutex::new(Vec::new()));
    let sink = RecordingEventSink {
        events: Arc::clone(&captured),
    };
    let stored_row = Record::from([
        ("id".to_owned(), Value::U64(1)),
        ("version".to_owned(), Value::I64(1)),
        ("name".to_owned(), Value::Text("old".to_owned())),
    ]);
    let metadata = InMemoryMetadataStore::new()
        .with_entity(entity())
        .with_entity(line_entity())
        .with_entity(product_entity());
    let mut context = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
        .with_event_sink(sink);
    context.insert_resource(PostgresDialect);
    context.insert_resource(StubExecutor {
        affected: 1,
        rows: vec![stored_row],
    });

    let orders = context
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();
    let order_node = GraphNode::new("Order")
        .value("id", 1_u64)
        .value("version", 1_i64)
        .value("name", "updated");
    let line_node = GraphNode::new("OrderLine")
        .value("name", "line")
        .value("product_id", 3_u64);
    orders
        .save_graph(order_node.relation("lines", line_node))
        .unwrap();

    let recorded = captured.lock().unwrap();
    assert_eq!(recorded.len(), 3);

    let order_event = &recorded[0];
    assert_eq!(order_event.kind, EntityEventKind::Updated);
    assert_eq!(order_event.entity, "Order");

    let line_update = &recorded[1];
    assert_eq!(line_update.kind, EntityEventKind::Updated);
    assert_eq!(line_update.entity, "OrderLine");
    assert_eq!(line_update.values.get("order_id"), Some(&Value::U64(1)));

    let line_delete = &recorded[2];
    assert_eq!(line_delete.kind, EntityEventKind::Deleted);
    assert_eq!(line_delete.entity, "OrderLine");
}
1485
/// Builds a save plan for an "Order" graph with six "lines" relations (two new, two
/// updated, one removed, one referenced) and checks how the plan groups them.
///
/// Expected grouped counts: Order/Update 1, OrderLine/Create 2, OrderLine/Update 2,
/// OrderLine/Delete 1, OrderLine/Reference 1. The create batch must hold ids 500 and
/// 501 from the sequential generator, and the update batch restricted to the "name"
/// field must hold both updated lines.
#[test]
fn save_graph_builds_plan_grouped_by_entity_and_operation() {
    let metadata = InMemoryMetadataStore::new()
        .with_entity(entity())
        .with_entity(line_entity())
        .with_entity(product_entity());
    let stored_row = Record::from([
        ("id".to_owned(), Value::U64(1)),
        ("version".to_owned(), Value::I64(1)),
        ("name".to_owned(), Value::Text("old".to_owned())),
    ]);
    let mut context = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
        .with_internal_id_generator(SequentialIdGenerator::new(500));
    context.insert_resource(PostgresDialect);
    context.insert_resource(StubExecutor {
        affected: 1,
        rows: vec![stored_row],
    });

    // Root node first, then the related lines in the order they are attached.
    let order_node = GraphNode::new("Order")
        .value("id", 1_u64)
        .value("version", 1_i64)
        .value("name", "updated");
    let new_line_a = GraphNode::new("OrderLine")
        .value("name", "new-line-a")
        .value("product_id", 2_u64);
    let new_line_b = GraphNode::new("OrderLine")
        .value("name", "new-line-b")
        .value("product_id", 2_u64);
    let updated_line_a = GraphNode::new("OrderLine")
        .value("id", 5_u64)
        .value("version", 1_i64)
        .value("name", "same-update-a");
    let updated_line_b = GraphNode::new("OrderLine")
        .value("id", 6_u64)
        .value("version", 1_i64)
        .value("name", "same-update-b");
    let removed_line = GraphNode::new("OrderLine").value("id", 3_u64).remove();
    let referenced_line = GraphNode::new("OrderLine").value("id", 4_u64).reference();
    let graph = order_node
        .relation("lines", new_line_a)
        .relation("lines", new_line_b)
        .relation("lines", updated_line_a)
        .relation("lines", updated_line_b)
        .relation("lines", removed_line)
        .relation("lines", referenced_line);

    let plan = context
        .plan_for_save_graph::<PostgresDialect, StubExecutor>(graph)
        .unwrap();
    let counts = plan.grouped_counts();
    let count_of = |entity_name: &str, kind: GraphMutationKind| {
        counts.get(&(entity_name.to_owned(), kind)).copied()
    };

    assert_eq!(count_of("Order", GraphMutationKind::Update), Some(1));
    assert_eq!(count_of("OrderLine", GraphMutationKind::Create), Some(2));
    assert_eq!(count_of("OrderLine", GraphMutationKind::Update), Some(2));
    assert_eq!(count_of("OrderLine", GraphMutationKind::Delete), Some(1));
    assert_eq!(count_of("OrderLine", GraphMutationKind::Reference), Some(1));

    let line_creates = plan
        .batches
        .iter()
        .find(|candidate| {
            candidate.entity == "OrderLine" && candidate.kind == GraphMutationKind::Create
        })
        .unwrap();
    assert_eq!(line_creates.items.len(), 2);
    assert_eq!(
        line_creates.items[0].values.get("id"),
        Some(&Value::U64(500))
    );
    assert_eq!(
        line_creates.items[1].values.get("id"),
        Some(&Value::U64(501))
    );

    let name_only = vec!["name".to_owned()];
    let line_updates = plan
        .batches
        .iter()
        .find(|candidate| {
            candidate.entity == "OrderLine"
                && candidate.kind == GraphMutationKind::Update
                && candidate.update_fields == name_only
        })
        .unwrap();
    assert_eq!(line_updates.items.len(), 2);
}
1596
/// Checks that `relation_plans` on the "Order" repository yields exactly one plan:
/// relation "lines" targeting "OrderLine", joining local key "id" to foreign key
/// "order_id", flagged as `many`.
#[test]
fn resolved_repository_builds_relation_plans() {
    let metadata = InMemoryMetadataStore::new()
        .with_entity(entity())
        .with_entity(line_entity())
        .with_entity(product_entity());
    let behaviors =
        InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior);
    let mut context = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
        .with_repository_behavior_registry(behaviors);
    context.insert_resource(PostgresDialect);
    context.insert_resource(StubExecutor {
        affected: 1,
        rows: vec![],
    });

    let orders = context
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();
    let plans = orders.relation_plans().unwrap();

    assert_eq!(plans.len(), 1);
    let lines_plan = &plans[0];
    assert_eq!(lines_plan.relation_name, "lines");
    assert_eq!(lines_plan.target_entity, "OrderLine");
    assert_eq!(lines_plan.local_key, "id");
    assert_eq!(lines_plan.foreign_key, "order_id");
    assert!(lines_plan.many);
}
1628
/// Checks that `relation_query("lines", …)` built from two parent rows compiles to
/// SQL selecting from "orderline" with an `IN ($1, $2)` filter on "order_id", bound
/// to the parents' ids 11 and 12.
#[test]
fn resolved_repository_builds_relation_query_from_parent_rows() {
    let metadata = InMemoryMetadataStore::new()
        .with_entity(entity())
        .with_entity(line_entity())
        .with_entity(product_entity());
    let behaviors =
        InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior);
    let mut context = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
        .with_repository_behavior_registry(behaviors);
    context.insert_resource(PostgresDialect);
    context.insert_resource(StubExecutor {
        affected: 1,
        rows: vec![],
    });

    let orders = context
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();
    let parent = |id: u64| Record::from([("id".to_owned(), Value::U64(id))]);
    let parent_rows = vec![parent(11), parent(12)];

    let lines_query = orders.relation_query("lines", &parent_rows).unwrap();
    let compiled = orders.compile(&lines_query).unwrap();

    let sql = &compiled.sql;
    assert!(sql.contains("FROM \"orderline\""));
    assert!(sql.contains("\"order_id\" IN ($1, $2)"));
    assert_eq!(compiled.params, vec![Value::U64(11), Value::U64(12)]);
}
1662
/// Checks that `enhance_relations` attaches a "lines" list to each parent row.
///
/// The stub executor returns three line rows (two with `order_id` 11, one with
/// `order_id` 12); parent 11 must end up with two lines and parent 12 with one.
#[test]
fn resolved_repository_enhances_parent_rows_with_relations() {
    let line_row = |id: u64, order_id: u64, name: &str| {
        Record::from([
            ("id".to_owned(), Value::U64(id)),
            ("order_id".to_owned(), Value::U64(order_id)),
            ("name".to_owned(), Value::Text(name.to_owned())),
        ])
    };
    let metadata = InMemoryMetadataStore::new()
        .with_entity(entity())
        .with_entity(line_entity())
        .with_entity(product_entity());
    let behaviors =
        InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior);
    let mut context = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
        .with_repository_behavior_registry(behaviors);
    context.insert_resource(PostgresDialect);
    context.insert_resource(StubExecutor {
        affected: 1,
        rows: vec![
            line_row(101, 11, "l1"),
            line_row(102, 11, "l2"),
            line_row(201, 12, "l3"),
        ],
    });

    let orders = context
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();
    let mut parents = vec![
        Record::from([("id".to_owned(), Value::U64(11))]),
        Record::from([("id".to_owned(), Value::U64(12))]),
    ];

    orders.enhance_relations(&mut parents).unwrap();

    // Anything other than a list under "lines" is a test failure.
    let attached_lines = |parent: &Record| -> usize {
        match parent.get("lines") {
            Some(Value::List(lines)) => lines.len(),
            other => panic!("unexpected lines payload: {other:?}"),
        }
    };
    assert_eq!(attached_lines(&parents[0]), 2);
    assert_eq!(attached_lines(&parents[1]), 1);
}
1717
/// Checks that `fetch_entities::<OrderEntity>` turns the single row returned by the
/// stub executor into one `OrderEntity` with id 7, version 2 and name "typed".
#[test]
fn resolved_repository_fetches_smart_list_of_entities() {
    let stored_row = Record::from([
        ("id".to_owned(), Value::U64(7)),
        ("version".to_owned(), Value::I64(2)),
        ("name".to_owned(), Value::Text("typed".to_owned())),
    ]);
    let mut context = UserContext::new()
        .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
        .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
    context.insert_resource(PostgresDialect);
    context.insert_resource(StubExecutor {
        affected: 1,
        rows: vec![stored_row],
    });

    let orders = context
        .resolve_repository::<PostgresDialect, StubExecutor>("Order")
        .unwrap();
    let select_all = orders.select();
    let fetched = orders.fetch_entities::<OrderEntity>(&select_all).unwrap();

    let expected = OrderEntity {
        id: 7,
        version: 2,
        name: "typed".to_owned(),
    };
    assert_eq!(fetched.len(), 1);
    assert_eq!(fetched.first(), Some(&expected));
}
1748
/// Checks that `fetch_entities::<CatalogProductRow>` works for an entity whose
/// descriptor comes from `CatalogProductRow::entity_descriptor()`: the single stub row
/// must map to a `CatalogProductRow` with id 9 and name "derived".
#[test]
fn resolved_repository_fetches_smart_list_of_derived_entities() {
    let metadata =
        InMemoryMetadataStore::new().with_entity(CatalogProductRow::entity_descriptor());
    let repositories = InMemoryRepositoryRegistry::new().with_entity("CatalogProduct");
    let stored_row = Record::from([
        ("id".to_owned(), Value::U64(9)),
        ("name".to_owned(), Value::Text("derived".to_owned())),
    ]);
    let mut context = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(repositories);
    context.insert_resource(PostgresDialect);
    context.insert_resource(StubExecutor {
        affected: 1,
        rows: vec![stored_row],
    });

    let products = context
        .resolve_repository::<PostgresDialect, StubExecutor>("CatalogProduct")
        .unwrap();
    let select_all = products.select();
    let fetched = products
        .fetch_entities::<CatalogProductRow>(&select_all)
        .unwrap();

    let expected = CatalogProductRow {
        id: 9,
        name: "derived".to_owned(),
    };
    assert_eq!(fetched.len(), 1);
    assert_eq!(fetched.first(), Some(&expected));
}
1783
/// Checks that columns beyond "id" ("lineCount" and "amount") end up in the `dynamic`
/// map of a fetched `OrderAggregateDynamic`, and that `into_json` emits them next to
/// "id" in a flat JSON object.
#[test]
fn resolved_repository_collects_dynamic_properties_for_aggregate_output() {
    let metadata =
        InMemoryMetadataStore::new().with_entity(OrderAggregateDynamic::entity_descriptor());
    let repositories = InMemoryRepositoryRegistry::new().with_entity("OrderAggregate");
    let aggregate_row = Record::from([
        ("id".to_owned(), Value::U64(1)),
        ("lineCount".to_owned(), Value::I64(3)),
        ("amount".to_owned(), Value::F64(18.5)),
    ]);
    let mut context = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(repositories);
    context.insert_resource(PostgresDialect);
    context.insert_resource(StubExecutor {
        affected: 1,
        rows: vec![aggregate_row],
    });

    let aggregates = context
        .resolve_repository::<PostgresDialect, StubExecutor>("OrderAggregate")
        .unwrap();
    let select_all = aggregates.select();
    let fetched = aggregates
        .fetch_entities::<OrderAggregateDynamic>(&select_all)
        .unwrap();

    assert_eq!(fetched.len(), 1);
    let first = &fetched.data[0];
    assert_eq!(first.id, 1);
    assert_eq!(first.dynamic.get("lineCount"), Some(&Value::I64(3)));
    assert_eq!(first.dynamic.get("amount"), Some(&Value::F64(18.5)));

    let expected_json = serde_json::json!({
        "id": 1,
        "lineCount": 3,
        "amount": 18.5
    });
    let first_owned = fetched.into_vec().into_iter().next().unwrap();
    assert_eq!(first_owned.into_json(), expected_json);
}
1824
/// Checks that `fetch_all_with_relation_aggregates` merges a per-parent count into
/// the parent rows under "lineCount".
///
/// The queue executor serves two order rows, then one aggregate row for order 1.
/// Order 1 must get `I64(3)`, order 2 (no aggregate row) must get `U64(0)`, and the
/// second recorded query must be the grouped COUNT statement asserted below.
#[test]
fn resolved_repository_executes_relation_aggregates_into_dynamic_properties() {
    let order_row = |id: u64, name: &str| {
        Record::from([
            ("id".to_owned(), Value::U64(id)),
            ("version".to_owned(), Value::I64(1)),
            ("name".to_owned(), Value::Text(name.to_owned())),
        ])
    };
    let order_rows = vec![order_row(1, "first"), order_row(2, "second")];
    let aggregate_rows = vec![Record::from([
        ("order_id".to_owned(), Value::U64(1)),
        ("lineCount".to_owned(), Value::I64(3)),
    ])];
    let executor = QueueExecutor {
        affected: 1,
        rows: Mutex::new(VecDeque::from([order_rows, aggregate_rows])),
        queries: Mutex::new(Vec::new()),
    };
    let metadata = InMemoryMetadataStore::new()
        .with_entity(entity())
        .with_entity(line_entity());
    let mut context = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
    context.insert_resource(PostgresDialect);
    context.insert_resource(executor);

    let orders = context
        .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
        .unwrap();
    let base_query = orders
        .select()
        .project("id")
        .project("version")
        .project("name");
    let line_count =
        RelationAggregate::new("lines", "lineCount", SelectQuery::new("OrderLine"), true);
    let fetched = orders
        .fetch_all_with_relation_aggregates(&base_query, &[line_count])
        .unwrap();

    assert_eq!(fetched[0].get("lineCount"), Some(&Value::I64(3)));
    assert_eq!(fetched[1].get("lineCount"), Some(&Value::U64(0)));

    let recorded = context.get_resource::<QueueExecutor>().unwrap();
    let queries = recorded.queries.lock().unwrap();
    assert_eq!(queries.len(), 2);
    assert_eq!(
        queries[1],
        "SELECT \"order_id\", COUNT(*) AS \"lineCount\" FROM \"orderline\" WHERE (\"order_id\" IN ($1, $2)) GROUP BY \"order_id\""
    );
}
1889
/// Checks relation aggregates when the foreign-key property is stored under another
/// column name.
///
/// The "order_id" property of the line entity is remapped to column "order_ref"; the
/// aggregate row is keyed by "order_ref" with an `I64` value. The parent row must
/// still receive "lineCount" 3, and the recorded aggregate SQL must use "order_ref".
#[test]
fn resolved_repository_maps_relation_aggregate_storage_key_to_property_key() {
    let mut line = line_entity();
    let foreign_key = line
        .properties
        .iter_mut()
        .find(|property| property.name == "order_id")
        .unwrap();
    foreign_key.column_name = "order_ref".to_owned();

    let order_rows = vec![Record::from([
        ("id".to_owned(), Value::U64(1)),
        ("version".to_owned(), Value::I64(1)),
        ("name".to_owned(), Value::Text("first".to_owned())),
    ])];
    let aggregate_rows = vec![Record::from([
        ("order_ref".to_owned(), Value::I64(1)),
        ("lineCount".to_owned(), Value::I64(3)),
    ])];
    let executor = QueueExecutor {
        affected: 1,
        rows: Mutex::new(VecDeque::from([order_rows, aggregate_rows])),
        queries: Mutex::new(Vec::new()),
    };
    let metadata = InMemoryMetadataStore::new()
        .with_entity(entity())
        .with_entity(line);
    let mut context = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
    context.insert_resource(PostgresDialect);
    context.insert_resource(executor);

    let orders = context
        .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
        .unwrap();
    let base_query = orders
        .select()
        .project("id")
        .project("version")
        .project("name");
    let line_count =
        RelationAggregate::new("lines", "lineCount", SelectQuery::new("OrderLine"), true);
    let fetched = orders
        .fetch_all_with_relation_aggregates(&base_query, &[line_count])
        .unwrap();

    assert_eq!(fetched[0].get("lineCount"), Some(&Value::I64(3)));
    let recorded = context.get_resource::<QueueExecutor>().unwrap();
    let queries = recorded.queries.lock().unwrap();
    assert_eq!(
        queries[1],
        "SELECT \"order_ref\", COUNT(*) AS \"lineCount\" FROM \"orderline\" WHERE (\"order_ref\" IN ($1)) GROUP BY \"order_ref\""
    );
}
1949
/// Checks that a count query with aggregation caching enabled reaches the executor
/// only once when an `InMemoryAggregationCache` resource is registered: two
/// `fetch_all` calls must return equal rows while only one query is recorded.
#[test]
fn resolved_repository_uses_aggregation_cache_when_resource_is_registered() {
    // Only one result set is queued for the two fetches below.
    let count_rows = vec![Record::from([("count".to_owned(), Value::I64(2))])];
    let executor = QueueExecutor {
        affected: 1,
        rows: Mutex::new(VecDeque::from([count_rows])),
        queries: Mutex::new(Vec::new()),
    };
    let mut context = UserContext::new()
        .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
        .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
    context.insert_resource(PostgresDialect);
    context.insert_resource(executor);
    context.insert_resource(InMemoryAggregationCache::default());

    let orders = context
        .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
        .unwrap();
    let cached_count = orders
        .select()
        .count("count")
        .enable_aggregation_cache_for(60_000);

    let first = orders.fetch_all(&cached_count).unwrap();
    let second = orders.fetch_all(&cached_count).unwrap();

    assert_eq!(first, second);
    let recorded = context.get_resource::<QueueExecutor>().unwrap();
    let executed = recorded.queries.lock().unwrap().len();
    assert_eq!(executed, 1);
}
1982
/// Checks that a namespaced aggregation cache, registered as
/// `Arc<dyn AggregationCacheBackend>`, serves repeated reads and is refreshed after
/// an insert.
///
/// The executor queues counts 2 and 3. The first two fetches must be equal, the fetch
/// after the insert must differ, and exactly two queries must be recorded.
#[test]
fn aggregation_cache_is_namespaced_and_invalidated_after_write() {
    let count_row = |count: i64| Record::from([("count".to_owned(), Value::I64(count))]);
    let executor = QueueExecutor {
        affected: 1,
        rows: Mutex::new(VecDeque::from([vec![count_row(2)], vec![count_row(3)]])),
        queries: Mutex::new(Vec::new()),
    };
    let cache: Arc<dyn AggregationCacheBackend> =
        Arc::new(InMemoryAggregationCache::with_namespace("tenant-a"));
    let mut context = UserContext::new()
        .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
        .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
    context.insert_resource(PostgresDialect);
    context.insert_resource(executor);
    context.insert_resource(cache);

    let orders = context
        .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
        .unwrap();
    let cached_count = orders
        .select()
        .count("count")
        .enable_aggregation_cache_for(60_000);

    let first = orders.fetch_all(&cached_count).unwrap();
    let cached = orders.fetch_all(&cached_count).unwrap();

    // The write between the reads is what should force the third fetch to re-query.
    let new_order = InsertCommand::new("Order")
        .value("id", 9_u64)
        .value("version", 1_i64)
        .value("name", "new");
    orders.insert(&new_order).unwrap();
    let refreshed = orders.fetch_all(&cached_count).unwrap();

    assert_eq!(first, cached);
    assert_ne!(cached, refreshed);
    let recorded = context.get_resource::<QueueExecutor>().unwrap();
    let executed = recorded.queries.lock().unwrap().len();
    assert_eq!(executed, 2);
}
2027
/// Checks that a parent query with aggregation caching enabled and propagated also
/// caches the relation aggregate query.
///
/// The executor queues one parent result set and one aggregate result set. Running
/// the same fetch twice must return equal rows while only two queries are recorded.
#[test]
fn aggregation_cache_propagates_to_relation_aggregates() {
    let order_row = |id: u64, name: &str| {
        Record::from([
            ("id".to_owned(), Value::U64(id)),
            ("version".to_owned(), Value::I64(1)),
            ("name".to_owned(), Value::Text(name.to_owned())),
        ])
    };
    let parent_rows = vec![order_row(1, "first"), order_row(2, "second")];
    let aggregate_rows = vec![Record::from([
        ("order_id".to_owned(), Value::U64(1)),
        ("lineCount".to_owned(), Value::I64(3)),
    ])];
    let executor = QueueExecutor {
        affected: 1,
        rows: Mutex::new(VecDeque::from([parent_rows, aggregate_rows])),
        queries: Mutex::new(Vec::new()),
    };
    let metadata = InMemoryMetadataStore::new()
        .with_entity(entity())
        .with_entity(line_entity());
    let mut context = UserContext::new()
        .with_metadata(metadata)
        .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
    context.insert_resource(PostgresDialect);
    context.insert_resource(executor);
    context.insert_resource(InMemoryAggregationCache::default());

    let orders = context
        .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
        .unwrap();
    let cached_query = orders
        .select()
        .project("id")
        .project("version")
        .project("name")
        .enable_aggregation_cache_for(60_000)
        .propagate_aggregation_cache(60_000);
    let line_count =
        RelationAggregate::new("lines", "lineCount", SelectQuery::new("OrderLine"), true);

    let first = orders
        .fetch_all_with_relation_aggregates(&cached_query, &[line_count.clone()])
        .unwrap();
    let second = orders
        .fetch_all_with_relation_aggregates(&cached_query, &[line_count])
        .unwrap();

    assert_eq!(first, second);
    let recorded = context.get_resource::<QueueExecutor>().unwrap();
    let executed = recorded.queries.lock().unwrap().len();
    assert_eq!(executed, 2);
}
2086
/// Seeds three `Order` rows and queries `id >= 2`, ordered by `id` descending
/// with a limit of one. The single typed entity returned is expected to be the
/// `gamma` row (id 3, version 1).
#[test]
fn memory_repository_fetches_smart_list_entities_with_query_features() {
    let seed: Vec<_> = [(1_u64, "alpha"), (2, "beta"), (3, "gamma")]
        .into_iter()
        .map(|(id, name)| {
            Record::from([
                (String::from("id"), Value::U64(id)),
                (String::from("version"), Value::I64(1)),
                (String::from("name"), Value::Text(String::from(name))),
            ])
        })
        .collect();
    let repository = MemoryRepository::new(InMemoryMetadataStore::new().with_entity(entity()))
        .with_rows("Order", seed);

    let id_at_least_two = Expr::Binary {
        left: Box::new(Expr::column("id")),
        op: teaql_core::BinaryOp::Gte,
        right: Box::new(Expr::value(2_u64)),
    };
    let query = teaql_core::SelectQuery::new("Order")
        .filter(id_at_least_two)
        .order_by(OrderBy::desc("id"))
        .limit(1);

    let matched = repository.fetch_entities::<Order>(&query).unwrap();

    assert_eq!(matched.ids(), vec![Value::U64(3)]);
    assert_eq!(matched.versions(), vec![1]);
    assert_eq!(matched.first().unwrap().name, "gamma");
}
2126
/// Runs an ungrouped query carrying a `Count` on `id` and a `Sum` on `version`
/// over two seeded rows. Expects a single result row with `count` = 2 and
/// `versionSum` = 3, both reported as `Value::U64`.
#[test]
fn memory_repository_runs_aggregates() {
    let order_row = |id: u64, version: i64, name: &str| {
        Record::from([
            (String::from("id"), Value::U64(id)),
            (String::from("version"), Value::I64(version)),
            (String::from("name"), Value::Text(String::from(name))),
        ])
    };
    let repository = MemoryRepository::new(InMemoryMetadataStore::new().with_entity(entity()))
        .with_rows(
            "Order",
            vec![order_row(1, 1, "alpha"), order_row(2, 2, "beta")],
        );

    let aggregates = vec![
        Aggregate {
            function: AggregateFunction::Count,
            field: "id".to_owned(),
            alias: "count".to_owned(),
        },
        Aggregate {
            function: AggregateFunction::Sum,
            field: "version".to_owned(),
            alias: "versionSum".to_owned(),
        },
    ];
    // The query is spelled out field by field; everything except the entity
    // name and the aggregates is left empty.
    let query = teaql_core::SelectQuery {
        entity: "Order".to_owned(),
        projection: Vec::new(),
        expr_projection: Vec::new(),
        filter: None,
        having: None,
        order_by: Vec::new(),
        slice: None,
        aggregates,
        group_by: Vec::new(),
        relations: Vec::new(),
        aggregation_cache: None,
        comment: None,
        raw_sql: None,
        raw_sql_search_criteria: Vec::new(),
        dynamic_properties: Vec::new(),
        raw_projections: Vec::new(),
        object_group_bys: Vec::new(),
        child_enhancements: Vec::new(),
    };

    let rows = repository.fetch_all(&query).unwrap();

    assert_eq!(rows.len(), 1);
    let summary = &rows[0];
    assert_eq!(summary.get("count"), Some(&Value::U64(2)));
    assert_eq!(summary.get("versionSum"), Some(&Value::U64(3)));
}
2184
/// Combines `between`, `not_like` and `not_in_list` in one filter, then groups
/// by `name` with a count and a version sum. Only the `alpha` group is
/// expected to remain, holding two rows whose versions add up to 3.
#[test]
fn memory_repository_runs_grouped_aggregates_and_extended_filters() {
    let order_row = |id: u64, version: i64, name: &str| {
        Record::from([
            (String::from("id"), Value::U64(id)),
            (String::from("version"), Value::I64(version)),
            (String::from("name"), Value::Text(String::from(name))),
        ])
    };
    let repository = MemoryRepository::new(InMemoryMetadataStore::new().with_entity(entity()))
        .with_rows(
            "Order",
            vec![
                order_row(1, 1, "alpha"),
                order_row(2, 2, "alpha"),
                order_row(3, 3, "tmp-beta"),
            ],
        );

    let criteria = Expr::between("version", 1_i64, 3_i64)
        .and_expr(Expr::not_like("name", "tmp%"))
        .and_expr(Expr::not_in_list("name", vec![Value::from("deleted")]));
    let query = teaql_core::SelectQuery::new("Order")
        .filter(criteria)
        .group_by("name")
        .count("total")
        .sum("version", "versionSum");

    let rows = repository.fetch_all(&query).unwrap();

    assert_eq!(rows.len(), 1);
    let group = &rows[0];
    assert_eq!(group.get("name"), Some(&Value::Text("alpha".to_owned())));
    assert_eq!(group.get("total"), Some(&Value::U64(2)));
    assert_eq!(group.get("versionSum"), Some(&Value::U64(3)));
}
2231
/// Groups by `name` and requests count, `stddev`, `var_pop` and `bit_or` over
/// `version`, keeping only groups whose `total` is greater than 1. The single
/// surviving group is expected to be `alpha` (versions 1 and 3).
#[test]
fn memory_repository_runs_extended_aggregates_and_having() {
    let order_row = |id: u64, version: i64, name: &str| {
        Record::from([
            (String::from("id"), Value::U64(id)),
            (String::from("version"), Value::I64(version)),
            (String::from("name"), Value::Text(String::from(name))),
        ])
    };
    let repository = MemoryRepository::new(InMemoryMetadataStore::new().with_entity(entity()))
        .with_rows(
            "Order",
            vec![
                order_row(1, 1, "alpha"),
                order_row(2, 3, "alpha"),
                order_row(3, 7, "beta"),
            ],
        );

    let query = teaql_core::SelectQuery::new("Order")
        .group_by("name")
        .count("total")
        .stddev("version", "stddevVersion")
        .var_pop("version", "varPopVersion")
        .bit_or("version", "bitOrVersion")
        .having(Expr::gt("total", 1_i64));

    let rows = repository.fetch_all(&query).unwrap();

    assert_eq!(rows.len(), 1);
    let group = &rows[0];
    assert_eq!(group.get("name"), Some(&Value::Text("alpha".to_owned())));
    assert_eq!(group.get("total"), Some(&Value::U64(2)));
    // The standard deviation is compared through its JSON string rendering.
    let stddev_json = group.get("stddevVersion").map(Value::to_json_value);
    assert_eq!(
        stddev_json,
        Some(serde_json::Value::String(String::from(
            "1.4142135623730951454746218583"
        )))
    );
    assert_eq!(
        group.get("varPopVersion"),
        Some(&Value::Decimal(Decimal::ONE))
    );
    assert_eq!(group.get("bitOrVersion"), Some(&Value::I64(3)));
}
2286
/// Filters three names with `sound_like("name", "Robert")`, ordered by `id`
/// ascending. Expects exactly `Robert` and `Rupert` back, in that order.
#[test]
fn memory_repository_runs_sound_like_filter() {
    let seed: Vec<_> = [(1_u64, "Robert"), (2, "Rupert"), (3, "Ashcraft")]
        .into_iter()
        .map(|(id, name)| {
            Record::from([
                (String::from("id"), Value::U64(id)),
                (String::from("version"), Value::I64(1)),
                (String::from("name"), Value::Text(String::from(name))),
            ])
        })
        .collect();
    let repository = MemoryRepository::new(InMemoryMetadataStore::new().with_entity(entity()))
        .with_rows("Order", seed);

    let query = teaql_core::SelectQuery::new("Order")
        .filter(Expr::sound_like("name", "Robert"))
        .order_asc("id");
    let rows = repository.fetch_all(&query).unwrap();

    // The length is pinned first, so the zip below covers every returned row.
    assert_eq!(rows.len(), 2);
    for (row, expected) in rows.iter().zip(["Robert", "Rupert"]) {
        assert_eq!(row.get("name"), Some(&Value::Text(expected.to_owned())));
    }
}
2323
/// Chains `contain`, `begin_with`, `end_with` and their negated counterparts
/// on `name` in a single filter. Of the three seeded rows only `tea-order` is
/// expected to match.
#[test]
fn memory_repository_runs_java_style_string_match_filters() {
    let seed: Vec<_> = [
        (1_u64, "tea-order"),
        (2, "coffee-order"),
        (3, "tea-archived"),
    ]
    .into_iter()
    .map(|(id, name)| {
        Record::from([
            (String::from("id"), Value::U64(id)),
            (String::from("version"), Value::I64(1)),
            (String::from("name"), Value::Text(String::from(name))),
        ])
    })
    .collect();
    let repository = MemoryRepository::new(InMemoryMetadataStore::new().with_entity(entity()))
        .with_rows("Order", seed);

    let name_rules = Expr::contain("name", "tea")
        .and_expr(Expr::begin_with("name", "tea"))
        .and_expr(Expr::end_with("name", "order"))
        .and_expr(Expr::not_contain("name", "coffee"))
        .and_expr(Expr::not_begin_with("name", "archived"))
        .and_expr(Expr::not_end_with("name", "draft"));
    let query = teaql_core::SelectQuery::new("Order")
        .filter(name_rules)
        .order_asc("id");

    let rows = repository.fetch_all(&query).unwrap();

    assert_eq!(rows.len(), 1);
    let only = &rows[0];
    assert_eq!(only.get("name"), Some(&Value::Text(String::from("tea-order"))));
}
2375
/// Compares two columns of the same row (`version >= id`). The row with
/// version 2 / id 1 is expected to be returned; the row with version 1 / id 2
/// is expected to be filtered out.
#[test]
fn memory_repository_runs_property_to_property_filters() {
    let order_row = |id: u64, version: i64, name: &str| {
        Record::from([
            (String::from("id"), Value::U64(id)),
            (String::from("version"), Value::I64(version)),
            (String::from("name"), Value::Text(String::from(name))),
        ])
    };
    let repository = MemoryRepository::new(InMemoryMetadataStore::new().with_entity(entity()))
        .with_rows(
            "Order",
            vec![order_row(1, 2, "keep"), order_row(2, 1, "skip")],
        );

    let query = teaql_core::SelectQuery::new("Order")
        .filter(Expr::compare_columns("version", BinaryOp::Gte, "id"))
        .order_asc("id");
    let rows = repository.fetch_all(&query).unwrap();

    assert_eq!(rows.len(), 1);
    let survivor = &rows[0];
    assert_eq!(survivor.get("name"), Some(&Value::Text(String::from("keep"))));
}
2406
/// Walks one `Order` row through insert, update, a stale update, delete and
/// recover, checking the stored version after each step.
///
/// Observed expectations: the update moves the version from 1 to 2, an update
/// that still expects version 1 fails with `OptimisticLockConflict`, the
/// delete leaves the row fetchable with version -3, and recovering from -3
/// yields version 4.
#[test]
fn memory_repository_supports_mutations_and_optimistic_locking() {
    let repository = MemoryRepository::new(InMemoryMetadataStore::new().with_entity(entity()));
    // Every read in this test looks up the same row by id.
    let by_id = teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64));
    let load_order = || repository.fetch_all(&by_id).unwrap().pop().unwrap();

    let create = InsertCommand::new("Order")
        .value("id", 10_u64)
        .value("version", 1_i64)
        .value("name", "draft");
    repository.insert(&create).unwrap();

    let submit = UpdateCommand::new("Order", 10_u64)
        .expected_version(1)
        .value("name", "submitted");
    repository.update(&submit).unwrap();

    let updated = load_order();
    assert_eq!(
        updated.get("name"),
        Some(&Value::Text("submitted".to_owned()))
    );
    assert_eq!(updated.get("version"), Some(&Value::I64(2)));

    // A second writer still holding version 1 must be rejected.
    let stale_update = UpdateCommand::new("Order", 10_u64)
        .expected_version(1)
        .value("name", "stale");
    let conflict = repository.update(&stale_update).unwrap_err();
    assert!(matches!(
        conflict,
        RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. })
    ));

    let remove = DeleteCommand::new("Order", 10_u64).expected_version(2);
    repository.delete(&remove).unwrap();
    let deleted = load_order();
    assert_eq!(deleted.get("version"), Some(&Value::I64(-3)));

    let restore = RecoverCommand::new("Order", 10_u64, -3);
    repository.recover(&restore).unwrap();
    let recovered = load_order();
    assert_eq!(recovered.get("version"), Some(&Value::I64(4)));
}
2471
2472 #[tokio::test]
2473 async fn user_context_reports_missing_schema_provider() {
2474 let err = UserContext::new().ensure_schema().await.unwrap_err();
2475 assert!(
2476 matches!(err, RuntimeError::Schema(message) if message == "missing schema provider")
2477 );
2478 }
2479}
2480
2481pub use checker::{
2482 CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResult, CheckResults, CheckRule, Checker,
2483 CheckerRegistry, InMemoryCheckerRegistry, LocationSegment, ObjectLocation, TypedChecker,
2484 TypedEntityChecker, clear_record_status, mark_record_status,
2485};