1mod checker;
2mod context;
3mod entity_runtime;
4mod entity_status;
5mod error;
6mod event;
7mod graph;
8mod id;
9mod language;
10mod memory;
11mod registry;
12mod repository;
13
14pub use context::{
15 InfoLogEntry, LogPayload, SchemaProvider, SqlLogEntry, SqlLogOperation,
16 SqlLogOptions, UnifiedLogBuffer, UnifiedLogEntry, UserContext,
17};
18pub use entity_status::{EntityAction, EntityStatus};
19pub use entity_runtime::{ChangeSetStack, EntityChangeSet, EntityKey, EntityRoot, RootContext};
20pub use error::{ContextError, RepositoryError, RuntimeError};
21pub use event::{
22 EntityEvent, EntityEventKind, EntityEventSink, EntityPropertyChange, InMemoryEntityEventSink,
23};
24pub use graph::{
25 GraphMutationBatch, GraphMutationKind, GraphMutationPlan, GraphMutationPlanItem,
26 GraphNode, GraphOperation, ScopedCommentNode, TraceScopeToken, sorted_update_fields,
27};
28pub(crate) use id::local_id_generator;
29pub use id::{InternalIdGenerator, SnowflakeIdGenerator};
30pub use language::{
31 BuiltinTranslator, Language, MessageTranslator, translate_check_result, translate_location,
32};
33pub use memory::{MemoryRepository, MemoryRepositoryError};
34pub use registry::{
35 InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry,
36 MetadataStore, RepositoryBehavior, RepositoryBehaviorRegistry, RepositoryRegistry,
37 RequestPolicy, RuntimeModule,
38};
39pub use repository::{
40 AggregationCacheBackend, ContextRepository, GraphTransactionBoundary, InMemoryAggregationCache,
41 QueryExecutor, RelationLoadPlan, Repository, ResolvedRepository,
42};
43
44#[cfg(test)]
45mod tests {
46 use std::collections::{BTreeMap, VecDeque};
47 use std::sync::{Arc, Mutex};
48
49 use super::{
50 AggregationCacheBackend, CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResult,
51 CheckResults, CheckRule, Checker, EntityEvent, EntityEventKind, EntityEventSink,
52 GraphMutationKind, GraphNode, GraphTransactionBoundary, InMemoryAggregationCache,
53 InMemoryCheckerRegistry, InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry,
54 InMemoryRepositoryRegistry, InternalIdGenerator, Language, MemoryRepository, MetadataStore,
55 ObjectLocation, QueryExecutor, Repository, RepositoryBehavior, RepositoryError,
56 RequestPolicy, RuntimeError, RuntimeModule, SqlLogOperation, SqlLogOptions, TypedChecker,
57 TypedEntityChecker, UserContext, translate_check_result,
58 };
59 use teaql_core::{
60 Aggregate, AggregateFunction, BinaryOp, DataType, Decimal, DeleteCommand, Entity,
61 EntityDescriptor, EntityError, Expr, InsertCommand, OrderBy, PropertyDescriptor, Record,
62 RecoverCommand, RelationAggregate, SelectQuery, TeaqlEntity, UpdateCommand, Value,
63 };
64 use teaql_macros::TeaqlEntity as DeriveTeaqlEntity;
65 use teaql_sql::{CompiledQuery, DatabaseKind, SqlCompileError, SqlDialect, quote_identifier_if_needed};
66
67 const ORDER_DEFAULT_PROJECTION: &str = "id, version, name";
68
69 #[derive(Debug, Default, Clone, Copy)]
70 struct PostgresDialect;
71
72 impl SqlDialect for PostgresDialect {
73 fn kind(&self) -> DatabaseKind {
74 DatabaseKind::PostgreSql
75 }
76
77 fn quote_ident(&self, ident: &str) -> String {
78 quote_identifier_if_needed(ident, '"')
79 }
80
81 fn placeholder(&self, index: usize) -> String {
82 format!("${index}")
83 }
84
85 fn schema_type_sql(
86 &self,
87 data_type: DataType,
88 _property: &PropertyDescriptor,
89 ) -> Result<&'static str, SqlCompileError> {
90 match data_type {
91 DataType::Bool => Ok("BOOLEAN"),
92 DataType::I64 | DataType::U64 => Ok("BIGINT"),
93 DataType::F64 => Ok("DOUBLE PRECISION"),
94 DataType::Decimal => Ok("NUMERIC"),
95 DataType::Text => Ok("TEXT"),
96 DataType::Json => Ok("JSONB"),
97 DataType::Date => Ok("DATE"),
98 DataType::Timestamp => Ok("TIMESTAMPTZ"),
99 }
100 }
101 }
102
103 fn entity() -> EntityDescriptor {
104 EntityDescriptor::new("Order")
105 .table_name("orders")
106 .property(
107 PropertyDescriptor::new("id", DataType::U64)
108 .column_name("id")
109 .id()
110 .not_null(),
111 )
112 .property(
113 PropertyDescriptor::new("version", DataType::I64)
114 .column_name("version")
115 .version()
116 .not_null(),
117 )
118 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
119 .relation(
120 teaql_core::RelationDescriptor::new("lines", "OrderLine")
121 .local_key("id")
122 .foreign_key("order_id")
123 .many(),
124 )
125 }
126
127 fn line_entity() -> EntityDescriptor {
128 EntityDescriptor::new("OrderLine")
129 .table_name("orderline")
130 .property(
131 PropertyDescriptor::new("id", DataType::U64)
132 .column_name("id")
133 .id()
134 .not_null(),
135 )
136 .property(
137 PropertyDescriptor::new("version", DataType::I64)
138 .column_name("version")
139 .version(),
140 )
141 .property(
142 PropertyDescriptor::new("order_id", DataType::U64)
143 .column_name("order_id")
144 .not_null(),
145 )
146 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
147 .property(
148 PropertyDescriptor::new("product_id", DataType::U64)
149 .column_name("product_id")
150 .not_null(),
151 )
152 .relation(
153 teaql_core::RelationDescriptor::new("product", "Product")
154 .local_key("product_id")
155 .foreign_key("id"),
156 )
157 }
158
159 fn product_entity() -> EntityDescriptor {
160 EntityDescriptor::new("Product")
161 .table_name("product")
162 .property(
163 PropertyDescriptor::new("id", DataType::U64)
164 .column_name("id")
165 .id()
166 .not_null(),
167 )
168 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
169 }
170
171 #[derive(Debug, Default)]
172 struct StubExecutor {
173 affected: u64,
174 rows: Vec<Record>,
175 }
176
177 #[derive(Debug, Default)]
178 struct QueueExecutor {
179 affected: u64,
180 rows: Mutex<VecDeque<Vec<Record>>>,
181 queries: Mutex<Vec<String>>,
182 }
183
184 struct OrderBehavior;
185
186 #[allow(dead_code)]
187 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
188 #[teaql(entity = "CatalogProduct", table = "catalog_product")]
189 struct CatalogProductRow {
190 #[teaql(id)]
191 id: u64,
192 name: String,
193 }
194
195 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
196 #[teaql(entity = "OrderAggregate", table = "orders")]
197 struct OrderAggregateDynamic {
198 #[teaql(id)]
199 id: u64,
200 #[teaql(dynamic)]
201 dynamic: BTreeMap<String, Value>,
202 }
203
204 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
205 #[teaql(entity = "Product", table = "product")]
206 struct ProductEntityRow {
207 #[teaql(id)]
208 id: u64,
209 name: String,
210 }
211
212 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
213 #[teaql(entity = "OrderLine", table = "orderline")]
214 struct OrderLineEntityRow {
215 #[teaql(id)]
216 id: u64,
217 #[teaql(column = "order_id")]
218 order_id: u64,
219 name: String,
220 #[teaql(column = "product_id")]
221 product_id: u64,
222 #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
223 product: Option<ProductEntityRow>,
224 }
225
226 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
227 #[teaql(entity = "Order", table = "orders")]
228 struct OrderAggregateRow {
229 #[teaql(id)]
230 id: u64,
231 #[teaql(version)]
232 version: i64,
233 name: String,
234 #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
235 lines: teaql_core::SmartList<OrderLineEntityRow>,
236 }
237
238 #[derive(Debug, Clone, PartialEq, DeriveTeaqlEntity)]
239 #[teaql(entity = "Order", table = "orders")]
240 struct Order {
241 #[teaql(id)]
242 id: u64,
243 #[teaql(version)]
244 version: i64,
245 name: String,
246 }
247
248 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
249 #[teaql(entity = "Product", table = "product")]
250 struct TypedGraphProduct {
251 #[teaql(id)]
252 id: u64,
253 name: String,
254 }
255
256 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
257 #[teaql(entity = "OrderLine", table = "orderline")]
258 struct TypedGraphLine {
259 #[teaql(id)]
260 id: u64,
261 #[teaql(column = "order_id")]
262 order_id: Option<u64>,
263 name: String,
264 #[teaql(column = "product_id")]
265 product_id: Option<u64>,
266 #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
267 product: Option<TypedGraphProduct>,
268 }
269
270 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
271 #[teaql(entity = "Order", table = "orders")]
272 struct TypedGraphOrder {
273 #[teaql(id)]
274 id: u64,
275 #[teaql(version)]
276 version: i64,
277 name: String,
278 #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
279 lines: teaql_core::SmartList<TypedGraphLine>,
280 }
281
282 #[derive(Debug, PartialEq, Eq)]
283 struct OrderEntity {
284 id: u64,
285 version: i64,
286 name: String,
287 }
288
289 impl teaql_core::TeaqlEntity for OrderEntity {
290 fn entity_descriptor() -> EntityDescriptor {
291 entity()
292 }
293 }
294
295 impl Entity for OrderEntity {
296 fn from_record(record: Record) -> Result<Self, EntityError> {
297 let id = match record.get("id") {
298 Some(Value::U64(v)) => *v,
299 Some(Value::I64(v)) if *v >= 0 => *v as u64,
300 other => {
301 return Err(EntityError::new(
302 "Order",
303 format!("invalid id field: {other:?}"),
304 ));
305 }
306 };
307 let version = match record.get("version") {
308 Some(Value::I64(v)) => *v,
309 other => {
310 return Err(EntityError::new(
311 "Order",
312 format!("invalid version field: {other:?}"),
313 ));
314 }
315 };
316 let name = match record.get("name") {
317 Some(Value::Text(v)) => v.clone(),
318 other => {
319 return Err(EntityError::new(
320 "Order",
321 format!("invalid name field: {other:?}"),
322 ));
323 }
324 };
325 Ok(Self { id, version, name })
326 }
327
328 fn into_record(self) -> Record {
329 Record::from([
330 (String::from("id"), Value::U64(self.id)),
331 (String::from("version"), Value::I64(self.version)),
332 (String::from("name"), Value::Text(self.name)),
333 ])
334 }
335 }
336
337 #[derive(Debug)]
338 struct StubError;
339
340 impl std::fmt::Display for StubError {
341 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
342 write!(f, "stub error")
343 }
344 }
345
346 impl std::error::Error for StubError {}
347
348 impl QueryExecutor for StubExecutor {
349 type Error = StubError;
350
351 fn fetch_all(&self, _query: &CompiledQuery) -> Result<Vec<Record>, Self::Error> {
352 Ok(self.rows.clone())
353 }
354
355 fn execute(&self, _query: &CompiledQuery) -> Result<u64, Self::Error> {
356 Ok(self.affected)
357 }
358
359 fn begin_transaction(&self) -> Result<GraphTransactionBoundary, Self::Error> {
360 Ok(GraphTransactionBoundary::Started)
361 }
362 }
363
364 impl QueryExecutor for QueueExecutor {
365 type Error = StubError;
366
367 fn fetch_all(&self, query: &CompiledQuery) -> Result<Vec<Record>, Self::Error> {
368 self.queries.lock().unwrap().push(query.sql.clone());
369 Ok(self.rows.lock().unwrap().pop_front().unwrap_or_default())
370 }
371
372 fn execute(&self, _query: &CompiledQuery) -> Result<u64, Self::Error> {
373 Ok(self.affected)
374 }
375 }
376
377 #[test]
378 fn user_context_records_configured_sql_logs() {
379 let mut ctx = UserContext::new()
380 .with_module(crate::module!(Order))
381 .with_sql_log_options(SqlLogOptions::select_only());
382 ctx.insert_resource(PostgresDialect);
383 ctx.insert_resource(StubExecutor {
384 affected: 1,
385 rows: Vec::new(),
386 });
387
388 {
389 let repo = ctx
390 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
391 .unwrap();
392 repo.fetch_all(&SelectQuery::new("Order").filter(Expr::eq("name", "Bob's Shop")))
393 .unwrap();
394 repo.insert(&InsertCommand::new("Order").value("name", "created"))
395 .unwrap();
396 }
397
398 let logs = ctx.sql_logs();
399 assert_eq!(logs.len(), 1);
400 assert_eq!(logs[0].operation, SqlLogOperation::Select);
401 assert_eq!(logs[0].result_count, Some(0));
402 assert_eq!(logs[0].result_type.as_deref(), Some("Order"));
403 assert_eq!(logs[0].result_summary, "MISS");
404 assert!(logs[0].ended_at >= logs[0].started_at);
405 assert!(logs[0].pretty_sql.contains("\nFROM orders"));
406 assert!(
407 logs[0]
408 .pretty_sql
409 .contains("\nWHERE (name = 'Bob''s Shop')")
410 );
411 assert_eq!(
412 logs[0].debug_sql,
413 format!(
414 "SELECT {ORDER_DEFAULT_PROJECTION} FROM orders WHERE (name = 'Bob''s Shop')"
415 )
416 );
417
418 ctx.set_sql_log_options(SqlLogOptions::mutation_only());
419 ctx.clear_sql_logs();
420 ctx.resolve_repository::<PostgresDialect, StubExecutor>("Order")
421 .unwrap()
422 .update(
423 &UpdateCommand::new("Order", 1_u64)
424 .value("name", "updated")
425 .expected_version(1),
426 )
427 .unwrap();
428
429 let logs = ctx.sql_logs();
430 assert_eq!(logs.len(), 1);
431 assert_eq!(logs[0].operation, SqlLogOperation::Update);
432 assert_eq!(logs[0].affected_rows, Some(1));
433 assert_eq!(logs[0].result_summary, "1 UPDATED");
434 assert!(logs[0].debug_sql.contains("UPDATE orders SET"));
435 assert!(logs[0].debug_sql.contains("'updated'"));
436 }
437
438 #[test]
439 fn user_context_records_all_sql_logs_by_default() {
440 let mut ctx = UserContext::new().with_module(crate::module!(Order));
441 ctx.insert_resource(PostgresDialect);
442 ctx.insert_resource(StubExecutor {
443 affected: 1,
444 rows: Vec::new(),
445 });
446
447 {
448 let repo = ctx
449 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
450 .unwrap();
451 repo.fetch_all(&SelectQuery::new("Order")).unwrap();
452 repo.insert(&InsertCommand::new("Order").value("name", "created"))
453 .unwrap();
454 }
455
456 let logs = ctx.sql_logs();
457 assert_eq!(logs.len(), 2);
458 assert_eq!(logs[0].operation, SqlLogOperation::Select);
459 assert_eq!(logs[1].operation, SqlLogOperation::Insert);
460
461 ctx.disable_sql_log();
462 ctx.resolve_repository::<PostgresDialect, StubExecutor>("Order")
463 .unwrap()
464 .fetch_all(&SelectQuery::new("Order"))
465 .unwrap();
466 assert!(ctx.sql_logs().is_empty());
467 }
468
469 impl RepositoryBehavior for OrderBehavior {
470 fn before_select(
471 &self,
472 _ctx: &UserContext,
473 query: &mut teaql_core::SelectQuery,
474 ) -> Result<(), RuntimeError> {
475 query.filter = Some(Expr::eq("version", 1_i64));
476 Ok(())
477 }
478
479 fn before_insert(
480 &self,
481 _ctx: &UserContext,
482 command: &mut InsertCommand,
483 ) -> Result<(), RuntimeError> {
484 command
485 .values
486 .entry("version".to_owned())
487 .or_insert(Value::I64(1));
488 Ok(())
489 }
490
491 fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
492 vec!["lines".to_owned()]
493 }
494 }
495
496 struct ContextAwareOrderBehavior;
497 struct TenantRequestPolicy;
498 struct OrderChecker;
499 struct TypedOrderChecker;
500 #[derive(Clone)]
501 struct RecordingEventSink {
502 events: Arc<Mutex<Vec<EntityEvent>>>,
503 }
504
505 impl RepositoryBehavior for ContextAwareOrderBehavior {
506 fn before_insert(
507 &self,
508 ctx: &UserContext,
509 command: &mut InsertCommand,
510 ) -> Result<(), RuntimeError> {
511 let tenant = ctx
512 .get_named_resource::<String>("tenant")
513 .cloned()
514 .ok_or_else(|| RuntimeError::Behavior("missing tenant resource".to_owned()))?;
515 let version = *ctx
516 .get_named_resource::<i64>("initial_version")
517 .ok_or_else(|| {
518 RuntimeError::Behavior("missing initial_version resource".to_owned())
519 })?;
520 let trace_id = match ctx.local("trace_id") {
521 Some(Value::Text(value)) => value.clone(),
522 other => {
523 return Err(RuntimeError::Behavior(format!(
524 "missing trace_id local, got {other:?}"
525 )));
526 }
527 };
528
529 command
530 .values
531 .entry("name".to_owned())
532 .or_insert(Value::Text(format!("{tenant}:{trace_id}")));
533 command
534 .values
535 .entry("version".to_owned())
536 .or_insert(Value::I64(version));
537 Ok(())
538 }
539 }
540
541 impl RequestPolicy for TenantRequestPolicy {
542 fn enforce_select(
543 &self,
544 ctx: &UserContext,
545 query: &mut SelectQuery,
546 ) -> Result<(), RuntimeError> {
547 if query.entity == "Order" {
548 let tenant_id = ctx
549 .get_named_resource::<u64>("tenant_id")
550 .copied()
551 .ok_or_else(|| RuntimeError::Policy("missing tenant_id".to_owned()))?;
552 query.filter = Some(match query.filter.take() {
553 Some(filter) => filter.and_expr(Expr::eq("id", tenant_id)),
554 None => Expr::eq("id", tenant_id),
555 });
556 }
557 Ok(())
558 }
559
560 fn enforce_insert(
561 &self,
562 ctx: &UserContext,
563 command: &mut InsertCommand,
564 ) -> Result<(), RuntimeError> {
565 if command.entity == "Order" {
566 let tenant_id = ctx
567 .get_named_resource::<u64>("tenant_id")
568 .copied()
569 .ok_or_else(|| RuntimeError::Policy("missing tenant_id".to_owned()))?;
570 command
571 .values
572 .insert("version".to_owned(), Value::I64(tenant_id as i64));
573 }
574 Ok(())
575 }
576 }
577
578 impl Checker for OrderChecker {
579 fn entity(&self) -> &str {
580 "Order"
581 }
582
583 fn check_and_fix(
584 &self,
585 _ctx: &UserContext,
586 record: &mut Record,
587 location: &ObjectLocation,
588 results: &mut CheckResults,
589 ) {
590 let status = CheckObjectStatus::from_record(record);
591 if status.is_create() {
592 self.required(record, "name", location, results);
593 record.entry("version".to_owned()).or_insert(Value::I64(1));
594 }
595 if status.is_update()
596 && record.get("name") == Some(&Value::Text("graph-update".to_owned()))
597 {
598 record.insert(
599 "name".to_owned(),
600 Value::Text("graph-update-checked".to_owned()),
601 );
602 }
603 self.min_string_length(record, "name", 3, location, results);
604 }
605 }
606
607 impl TypedChecker<Order> for TypedOrderChecker {
608 fn check_and_fix_typed(
609 &self,
610 _ctx: &UserContext,
611 entity: &mut Order,
612 status: CheckObjectStatus,
613 location: &ObjectLocation,
614 results: &mut CheckResults,
615 ) {
616 if status.is_create() {
617 if entity.name.is_empty() {
618 results.push(CheckResult::required(location.clone().member("name")));
619 }
620 }
621 if entity.name.chars().count() < 3 {
622 results.push(CheckResult::min_str(
623 location.clone().member("name"),
624 3,
625 entity.name.clone(),
626 ));
627 }
628 if entity.name == "fix" {
629 entity.name = "fixed".to_owned();
630 }
631 }
632 }
633
634 impl EntityEventSink for RecordingEventSink {
635 fn on_event(&self, _ctx: &UserContext, event: &EntityEvent) -> Result<(), RuntimeError> {
636 self.events.lock().unwrap().push(event.clone());
637 Ok(())
638 }
639 }
640
641 struct FixedIdGenerator(u64);
642
643 impl InternalIdGenerator for FixedIdGenerator {
644 fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
645 Ok(self.0)
646 }
647 }
648
649 struct SequentialIdGenerator {
650 next: Mutex<u64>,
651 }
652
653 impl SequentialIdGenerator {
654 fn new(next: u64) -> Self {
655 Self {
656 next: Mutex::new(next),
657 }
658 }
659 }
660
661 impl InternalIdGenerator for SequentialIdGenerator {
662 fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
663 let mut next = self
664 .next
665 .lock()
666 .map_err(|err| RuntimeError::IdGeneration(err.to_string()))?;
667 let id = *next;
668 *next += 1;
669 Ok(id)
670 }
671 }
672
673 #[test]
674 fn metadata_store_registers_entities() {
675 let store = InMemoryMetadataStore::new().with_entity(entity());
676 assert!(store.entity("Order").is_some());
677 }
678
679 #[test]
680 fn runtime_module_registers_descriptor_into_context() {
681 let ctx = UserContext::new().with_module(RuntimeModule::new().descriptor(entity()));
682 assert!(ctx.entity("Order").is_some());
683 assert!(ctx.has_repository("Order"));
684 }
685
686 #[test]
687 fn runtime_module_registers_derived_entity_and_behavior() {
688 let ctx = UserContext::new().with_module(
689 RuntimeModule::new().entity_with_behavior::<CatalogProductRow, _>(OrderBehavior),
690 );
691 assert!(ctx.entity("CatalogProduct").is_some());
692 assert!(ctx.has_repository("CatalogProduct"));
693 assert!(ctx.repository_behavior("CatalogProduct").is_some());
694 }
695
696 #[test]
697 fn module_macro_registers_multiple_entities() {
698 let ctx = UserContext::new().with_module(crate::module!(CatalogProductRow));
699 assert!(ctx.entity("CatalogProduct").is_some());
700 assert!(ctx.has_repository("CatalogProduct"));
701 }
702
703 #[test]
704 fn module_macro_registers_entity_behavior_pairs() {
705 let ctx =
706 UserContext::new().with_module(crate::module!(CatalogProductRow => OrderBehavior));
707 assert!(ctx.entity("CatalogProduct").is_some());
708 assert!(ctx.repository_behavior("CatalogProduct").is_some());
709 }
710
711 #[test]
712 fn repository_returns_optimistic_lock_conflict() {
713 let store = InMemoryMetadataStore::new().with_entity(entity());
714 let executor = StubExecutor {
715 affected: 0,
716 rows: Vec::new(),
717 };
718 let repo = Repository::new(&PostgresDialect, &store, &executor);
719
720 let err = repo
721 .update(
722 &UpdateCommand::new("Order", 1_u64)
723 .expected_version(3)
724 .value("name", "next"),
725 )
726 .unwrap_err();
727
728 match err {
729 RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. }) => {}
730 other => panic!("unexpected error: {other}"),
731 }
732 }
733
734 #[test]
735 fn user_context_indexes_resources_and_locals() {
736 let mut ctx =
737 UserContext::new().with_metadata(InMemoryMetadataStore::new().with_entity(entity()));
738 ctx.insert_resource::<u64>(42);
739 ctx.insert_named_resource("tenant", String::from("acme"));
740 ctx.put_local("trace_id", "req-1");
741
742 assert!(ctx.entity("Order").is_some());
743 assert_eq!(ctx.get_resource::<u64>(), Some(&42));
744 assert_eq!(
745 ctx.get_named_resource::<String>("tenant"),
746 Some(&String::from("acme"))
747 );
748 assert_eq!(
749 ctx.local("trace_id"),
750 Some(&Value::Text("req-1".to_owned()))
751 );
752 }
753
754 #[test]
755 fn user_context_builds_context_repository() {
756 let mut ctx =
757 UserContext::new().with_metadata(InMemoryMetadataStore::new().with_entity(entity()));
758 ctx.insert_resource(PostgresDialect);
759 ctx.insert_resource(StubExecutor {
760 affected: 1,
761 rows: Vec::new(),
762 });
763
764 let repo = ctx.repository::<PostgresDialect, StubExecutor>().unwrap();
765 let affected = repo
766 .update(
767 &UpdateCommand::new("Order", 1_u64)
768 .expected_version(3)
769 .value("name", "next"),
770 )
771 .unwrap();
772
773 assert_eq!(affected, 1);
774 }
775
776 #[test]
777 fn user_context_resolves_repository_by_entity_type() {
778 let mut ctx = UserContext::new()
779 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
780 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
781 ctx.insert_resource(PostgresDialect);
782 ctx.insert_resource(StubExecutor {
783 affected: 1,
784 rows: Vec::new(),
785 });
786
787 let repo = ctx
788 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
789 .unwrap();
790 assert_eq!(repo.entity(), "Order");
791 assert_eq!(repo.select().entity, "Order");
792
793 let affected = repo
794 .insert(
795 &repo
796 .insert_command()
797 .value("id", 1_u64)
798 .value("version", 1_i64)
799 .value("name", "n"),
800 )
801 .unwrap();
802 assert_eq!(affected, 1);
803 }
804
805 #[test]
806 fn resolved_repository_applies_behavior_hooks() {
807 let mut ctx = UserContext::new()
808 .with_metadata(
809 InMemoryMetadataStore::new()
810 .with_entity(entity())
811 .with_entity(line_entity())
812 .with_entity(product_entity()),
813 )
814 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
815 .with_repository_behavior_registry(
816 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
817 );
818 ctx.insert_resource(PostgresDialect);
819 ctx.insert_resource(StubExecutor {
820 affected: 1,
821 rows: Vec::new(),
822 });
823
824 let repo = ctx
825 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
826 .unwrap();
827
828 let compiled = repo.compile(&repo.select()).unwrap();
829 assert!(compiled.sql.contains("WHERE (version = $1)"));
830
831 let insert = repo.insert_command().value("id", 1_u64).value("name", "n");
832 let affected = repo.insert(&insert).unwrap();
833 assert_eq!(affected, 1);
834 assert_eq!(repo.relation_loads(), vec!["lines".to_owned()]);
835 }
836
837 #[test]
838 fn resolved_repository_applies_request_policy_after_behavior_hooks() {
839 let mut ctx = UserContext::new()
840 .with_metadata(
841 InMemoryMetadataStore::new()
842 .with_entity(entity())
843 .with_entity(line_entity())
844 .with_entity(product_entity()),
845 )
846 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
847 .with_repository_behavior_registry(
848 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
849 )
850 .with_request_policy(TenantRequestPolicy);
851 ctx.insert_named_resource("tenant_id", 9_u64);
852 ctx.insert_resource(PostgresDialect);
853 ctx.insert_resource(StubExecutor {
854 affected: 1,
855 rows: Vec::new(),
856 });
857
858 let repo = ctx
859 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
860 .unwrap();
861
862 let compiled = repo.compile(&repo.select()).unwrap();
863 assert!(compiled.sql.contains("version = $1"));
864 assert!(compiled.sql.contains("id = $2"));
865
866 let insert = repo.insert_command().value("id", 1_u64).value("name", "n");
867 let command = repo.prepare_insert_command(&insert).unwrap();
868 assert_eq!(command.values.get("version"), Some(&Value::I64(9)));
869 }
870
871 #[test]
872 fn resolved_repository_prepares_insert_command_with_generated_id() {
873 let mut ctx = UserContext::new()
874 .with_metadata(
875 InMemoryMetadataStore::new()
876 .with_entity(entity())
877 .with_entity(line_entity())
878 .with_entity(product_entity()),
879 )
880 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
881 .with_repository_behavior_registry(
882 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
883 )
884 .with_internal_id_generator(FixedIdGenerator(42));
885 ctx.insert_resource(PostgresDialect);
886 ctx.insert_resource(StubExecutor {
887 affected: 1,
888 rows: Vec::new(),
889 });
890
891 let repo = ctx
892 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
893 .unwrap();
894
895 let prepared = repo
896 .prepare_insert_command(&repo.insert_command().value("id", 0_u64).value("name", "n"))
897 .unwrap();
898
899 assert_eq!(prepared.values.get("id"), Some(&Value::U64(42)));
900 assert_eq!(prepared.values.get("version"), Some(&Value::I64(1)));
901 assert_eq!(
902 prepared.values.get("name"),
903 Some(&Value::Text("n".to_owned()))
904 );
905
906 let prepared_zero_version = repo
907 .prepare_insert_command(
908 &repo
909 .insert_command()
910 .value("id", 0_u64)
911 .value("version", 0_i64)
912 .value("name", "zero-version"),
913 )
914 .unwrap();
915 assert_eq!(
916 prepared_zero_version.values.get("version"),
917 Some(&Value::I64(1))
918 );
919 }
920
921 #[test]
922 fn resolved_repository_saves_create_graph_and_maintains_relation_keys() {
923 let mut ctx = UserContext::new()
924 .with_metadata(
925 InMemoryMetadataStore::new()
926 .with_entity(entity())
927 .with_entity(line_entity())
928 .with_entity(product_entity()),
929 )
930 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
931 .with_repository_behavior_registry(
932 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
933 )
934 .with_internal_id_generator(SequentialIdGenerator::new(500));
935 ctx.insert_resource(PostgresDialect);
936 ctx.insert_resource(StubExecutor {
937 affected: 1,
938 rows: Vec::new(),
939 });
940
941 let repo = ctx
942 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
943 .unwrap();
944 let graph = GraphNode::new("Order").value("name", "root").relation(
945 "lines",
946 GraphNode::new("OrderLine")
947 .value("name", "line-1")
948 .relation("product", GraphNode::new("Product").value("name", "sku-1")),
949 );
950
951 let saved = repo.save_graph(graph).unwrap();
952
953 assert_eq!(saved.values.get("id"), Some(&Value::U64(500)));
954 assert_eq!(saved.values.get("version"), Some(&Value::I64(1)));
955 let lines = saved.relations.get("lines").unwrap();
956 assert_eq!(lines.len(), 1);
957 assert_eq!(lines[0].values.get("id"), Some(&Value::U64(502)));
958 assert_eq!(lines[0].values.get("version"), Some(&Value::I64(1)));
959 assert_eq!(lines[0].values.get("order_id"), Some(&Value::U64(500)));
960 assert_eq!(lines[0].values.get("product_id"), Some(&Value::U64(501)));
961 let product = lines[0].relations.get("product").unwrap();
962 assert_eq!(product[0].values.get("id"), Some(&Value::U64(501)));
963 }
964
965 #[test]
966 fn resolved_repository_extracts_and_saves_typed_entity_graph() {
967 let mut ctx = UserContext::new()
968 .with_metadata(
969 InMemoryMetadataStore::new()
970 .with_entity(entity())
971 .with_entity(line_entity())
972 .with_entity(product_entity()),
973 )
974 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
975 .with_internal_id_generator(SequentialIdGenerator::new(700));
976 ctx.insert_resource(PostgresDialect);
977 ctx.insert_resource(StubExecutor {
978 affected: 1,
979 rows: Vec::new(),
980 });
981
982 let repo = ctx
983 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
984 .unwrap();
985 let order = TypedGraphOrder {
986 id: 0,
987 version: 0,
988 name: "typed-root".to_owned(),
989 lines: teaql_core::SmartList::from(vec![TypedGraphLine {
990 id: 0,
991 order_id: None,
992 name: "typed-line".to_owned(),
993 product_id: None,
994 product: Some(TypedGraphProduct {
995 id: 0,
996 name: "typed-product".to_owned(),
997 }),
998 }]),
999 };
1000
1001 let extracted = repo.graph_node_from_entity(order).unwrap();
1002 assert_eq!(extracted.entity, "Order");
1003 assert_eq!(
1004 extracted.values.get("name"),
1005 Some(&Value::Text("typed-root".to_owned()))
1006 );
1007 assert_eq!(extracted.values.get("id"), Some(&Value::U64(0)));
1008 assert_eq!(extracted.relations["lines"].len(), 1);
1009 assert_eq!(
1010 extracted.relations["lines"][0].values.get("name"),
1011 Some(&Value::Text("typed-line".to_owned()))
1012 );
1013 assert_eq!(
1014 extracted.relations["lines"][0].relations["product"].len(),
1015 1
1016 );
1017
1018 let saved = repo.save_graph(extracted).unwrap();
1019 assert_eq!(saved.values.get("id"), Some(&Value::U64(700)));
1020 assert_eq!(saved.values.get("version"), Some(&Value::I64(1)));
1021 let lines = saved.relations.get("lines").unwrap();
1022 assert_eq!(lines[0].values.get("id"), Some(&Value::U64(702)));
1023 assert_eq!(lines[0].values.get("version"), Some(&Value::I64(1)));
1024 assert_eq!(lines[0].values.get("order_id"), Some(&Value::U64(700)));
1025 assert_eq!(lines[0].values.get("product_id"), Some(&Value::U64(701)));
1026 assert_eq!(
1027 lines[0].relations["product"][0].values.get("id"),
1028 Some(&Value::U64(701))
1029 );
1030 }
1031
1032 #[test]
1033 fn resolved_repository_saves_typed_entity_graph_directly() {
1034 let mut ctx = UserContext::new()
1035 .with_metadata(
1036 InMemoryMetadataStore::new()
1037 .with_entity(entity())
1038 .with_entity(line_entity())
1039 .with_entity(product_entity()),
1040 )
1041 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1042 .with_internal_id_generator(SequentialIdGenerator::new(800));
1043 ctx.insert_resource(PostgresDialect);
1044 ctx.insert_resource(StubExecutor {
1045 affected: 1,
1046 rows: Vec::new(),
1047 });
1048
1049 let repo = ctx
1050 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1051 .unwrap();
1052 let saved = repo
1053 .save_entity_graph(TypedGraphOrder {
1054 id: 0,
1055 version: 0,
1056 name: "typed-direct".to_owned(),
1057 lines: teaql_core::SmartList::from(vec![TypedGraphLine {
1058 id: 0,
1059 order_id: None,
1060 name: "typed-line".to_owned(),
1061 product_id: None,
1062 product: Some(TypedGraphProduct {
1063 id: 0,
1064 name: "typed-product".to_owned(),
1065 }),
1066 }]),
1067 })
1068 .unwrap();
1069
1070 assert_eq!(saved.values.get("id"), Some(&Value::U64(800)));
1071 assert_eq!(saved.values.get("version"), Some(&Value::I64(1)));
1072 assert_eq!(
1073 saved.relations["lines"][0].values.get("order_id"),
1074 Some(&Value::U64(800))
1075 );
1076 assert_eq!(
1077 saved.relations["lines"][0].values.get("product_id"),
1078 Some(&Value::U64(801))
1079 );
1080 }
1081
1082 #[test]
1083 fn custom_user_context_can_drive_insert_preparation() {
1084 let mut ctx = UserContext::new()
1085 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1086 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1087 .with_repository_behavior_registry(
1088 InMemoryRepositoryBehaviorRegistry::new()
1089 .with_behavior("Order", ContextAwareOrderBehavior),
1090 )
1091 .with_internal_id_generator(FixedIdGenerator(99));
1092 ctx.insert_named_resource("tenant", String::from("acme"));
1093 ctx.insert_named_resource("initial_version", 7_i64);
1094 ctx.put_local("trace_id", "req-9");
1095 ctx.insert_resource(PostgresDialect);
1096 ctx.insert_resource(StubExecutor {
1097 affected: 1,
1098 rows: Vec::new(),
1099 });
1100
1101 let repo = ctx
1102 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1103 .unwrap();
1104 let prepared = repo.prepare_insert_command(&repo.insert_command()).unwrap();
1105
1106 assert_eq!(prepared.values.get("id"), Some(&Value::U64(99)));
1107 assert_eq!(prepared.values.get("version"), Some(&Value::I64(7)));
1108 assert_eq!(
1109 prepared.values.get("name"),
1110 Some(&Value::Text("acme:req-9".to_owned()))
1111 );
1112 }
1113
1114 #[test]
1115 fn checker_registry_validates_and_fixes_insert_commands() {
1116 let mut ctx = UserContext::new()
1117 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1118 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1119 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker))
1120 .with_internal_id_generator(FixedIdGenerator(77));
1121 ctx.insert_resource(PostgresDialect);
1122 ctx.insert_resource(StubExecutor {
1123 affected: 1,
1124 rows: Vec::new(),
1125 });
1126
1127 let repo = ctx
1128 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1129 .unwrap();
1130 let prepared = repo
1131 .prepare_insert_command(&repo.insert_command().value("name", "valid"))
1132 .unwrap();
1133
1134 assert_eq!(prepared.values.get("id"), Some(&Value::U64(77)));
1135 assert_eq!(prepared.values.get("version"), Some(&Value::I64(1)));
1136 assert!(!prepared.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
1137
1138 let error = repo
1139 .prepare_insert_command(&repo.insert_command().value("name", "no"))
1140 .unwrap_err();
1141 match error {
1142 RuntimeError::Check(results) => {
1143 assert_eq!(results.len(), 1);
1144 assert_eq!(results[0].location.to_string(), "name");
1145 }
1146 other => panic!("unexpected checker error: {other:?}"),
1147 }
1148 }
1149
1150 #[test]
1151 fn typed_checker_validates_and_fixes_derived_entities_without_record_access() {
1152 let mut ctx = UserContext::new()
1153 .with_metadata(InMemoryMetadataStore::new().with_entity(Order::entity_descriptor()))
1154 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1155 .with_checker_registry(
1156 InMemoryCheckerRegistry::new()
1157 .with_checker(TypedEntityChecker::<Order, _>::new(TypedOrderChecker)),
1158 )
1159 .with_internal_id_generator(FixedIdGenerator(79));
1160 ctx.insert_resource(PostgresDialect);
1161 ctx.insert_resource(StubExecutor {
1162 affected: 1,
1163 rows: Vec::new(),
1164 });
1165
1166 let repo = ctx
1167 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1168 .unwrap();
1169 let prepared = repo
1170 .prepare_insert_command(
1171 &repo
1172 .insert_command()
1173 .value("name", "fix")
1174 .value("version", 1_i64),
1175 )
1176 .unwrap();
1177 assert_eq!(
1178 prepared.values.get("name"),
1179 Some(&Value::Text("fixed".to_owned()))
1180 );
1181 assert_eq!(prepared.values.get("id"), Some(&Value::U64(79)));
1182 assert!(!prepared.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
1183
1184 let error = repo
1185 .prepare_insert_command(&repo.insert_command().value("version", 1_i64))
1186 .unwrap_err();
1187 match error {
1188 RuntimeError::Check(results) => {
1189 assert!(
1190 results
1191 .iter()
1192 .any(|result| result.rule == CheckRule::Required
1193 && result.location.to_string() == "name")
1194 );
1195 }
1196 other => panic!("unexpected typed checker error: {other:?}"),
1197 }
1198 }
1199
1200
1201
1202 #[test]
1203 fn checker_registry_reports_nested_create_locations_and_fixes_records() {
1204 let ctx = UserContext::new()
1205 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker));
1206
1207 let mut child = Record::from([
1208 (String::from("id"), Value::U64(10)),
1209 (
1210 String::from(CHECK_OBJECT_STATUS_FIELD),
1211 Value::from(CheckObjectStatus::Create),
1212 ),
1213 ]);
1214 let error = ctx
1215 .check_and_fix_record_at(
1216 "Order",
1217 &mut child,
1218 &ObjectLocation::hash_root("lines").element(0),
1219 )
1220 .unwrap_err();
1221
1222 assert_eq!(child.get("version"), Some(&Value::I64(1)));
1223 match error {
1224 RuntimeError::Check(results) => {
1225 assert_eq!(results.len(), 1);
1226 assert_eq!(results[0].rule, CheckRule::Required);
1227 assert_eq!(results[0].location.to_string(), "lines[0].name");
1228 }
1229 other => panic!("unexpected checker error: {other:?}"),
1230 }
1231
1232 child.insert("name".to_owned(), Value::Text("valid child".to_owned()));
1233 ctx.check_and_fix_record_at(
1234 "Order",
1235 &mut child,
1236 &ObjectLocation::hash_root("lines").element(0),
1237 )
1238 .unwrap();
1239 }
1240
1241 #[test]
1242 fn built_in_language_translators_cover_fifteen_languages() {
1243 assert_eq!(Language::ALL.len(), 15);
1244 let result = super::CheckResult::required(ObjectLocation::hash_root("name"));
1245 let messages = Language::ALL
1246 .iter()
1247 .map(|language| translate_check_result(*language, &result))
1248 .collect::<Vec<_>>();
1249
1250 assert!(messages.iter().all(|message| !message.is_empty()));
1251 assert!(messages.iter().any(|message| message.contains("required")));
1252 assert!(messages.iter().any(|message| message.contains("å¿…å¡«")));
1253 assert!(
1254 messages
1255 .iter()
1256 .any(|message| message.contains("obligatoire"))
1257 );
1258 assert_eq!(Language::from_code("zh-CN"), Some(Language::Chinese));
1259 assert_eq!(
1260 Language::from_code("zh-TW"),
1261 Some(Language::TraditionalChinese)
1262 );
1263 }
1264
1265 #[test]
1266 fn user_context_language_switch_translates_checker_errors() {
1267 let mut ctx = UserContext::new()
1268 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1269 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1270 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker))
1271 .with_internal_id_generator(FixedIdGenerator(77))
1272 .with_language(Language::Chinese);
1273 ctx.insert_resource(PostgresDialect);
1274 ctx.insert_resource(StubExecutor {
1275 affected: 1,
1276 rows: Vec::new(),
1277 });
1278
1279 let repo = ctx
1280 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1281 .unwrap();
1282 let error = repo
1283 .prepare_insert_command(&repo.insert_command())
1284 .unwrap_err();
1285 match error {
1286 RuntimeError::Check(results) => {
1287 assert_eq!(results.len(), 1);
1288 assert!(
1289 results[0]
1290 .message
1291 .as_ref()
1292 .is_some_and(|message| message.contains("å¿…å¡«"))
1293 );
1294 }
1295 other => panic!("unexpected checker error: {other:?}"),
1296 }
1297
1298 let mut ctx = UserContext::new().with_language(Language::English);
1299 ctx.set_language_code("es").unwrap();
1300 assert_eq!(ctx.language(), Language::Spanish);
1301 }
1302
1303 #[test]
1304 fn checker_registry_merges_graph_update_fixes_by_object_status() {
1305 let mut ctx = UserContext::new()
1306 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1307 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1308 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker));
1309 ctx.insert_resource(PostgresDialect);
1310 ctx.insert_resource(StubExecutor {
1311 affected: 1,
1312 rows: vec![Record::from([
1313 ("id".to_owned(), Value::U64(1)),
1314 ("version".to_owned(), Value::I64(1)),
1315 ("name".to_owned(), Value::Text("old".to_owned())),
1316 ])],
1317 });
1318
1319 let repo = ctx
1320 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1321 .unwrap();
1322 let saved = repo
1323 .save_graph(
1324 GraphNode::new("Order")
1325 .value("id", 1_u64)
1326 .value("version", 1_i64)
1327 .value("name", "graph-update"),
1328 )
1329 .unwrap();
1330
1331 assert_eq!(
1332 saved.values.get("name"),
1333 Some(&Value::Text("graph-update-checked".to_owned()))
1334 );
1335 assert_eq!(saved.values.get("version"), Some(&Value::I64(2)));
1336 assert!(!saved.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
1337 }
1338
1339 #[test]
1340 fn user_context_event_sink_receives_repository_mutation_events() {
1341 let events = Arc::new(Mutex::new(Vec::new()));
1342 let mut ctx = UserContext::new()
1343 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1344 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1345 .with_internal_id_generator(FixedIdGenerator(88))
1346 .with_event_sink(RecordingEventSink {
1347 events: events.clone(),
1348 });
1349 ctx.insert_resource(PostgresDialect);
1350 ctx.insert_resource(StubExecutor {
1351 affected: 1,
1352 rows: vec![Record::from([
1353 ("id".to_owned(), Value::U64(88)),
1354 ("version".to_owned(), Value::I64(1)),
1355 ("name".to_owned(), Value::Text("old".to_owned())),
1356 ])],
1357 });
1358
1359 let repo = ctx
1360 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1361 .unwrap();
1362 repo.insert(&repo.insert_command().value("name", "created"))
1363 .unwrap();
1364 repo.update(
1365 &repo
1366 .update_command(88_u64)
1367 .expected_version(1)
1368 .value("name", "updated"),
1369 )
1370 .unwrap();
1371 repo.delete(&repo.delete_command(88_u64).expected_version(2))
1372 .unwrap();
1373 repo.recover(&repo.recover_command(88_u64, -3)).unwrap();
1374
1375 let events = events.lock().unwrap();
1376 assert_eq!(events.len(), 4);
1377 assert_eq!(events[0].kind, EntityEventKind::Created);
1378 assert_eq!(events[0].entity, "Order");
1379 assert_eq!(events[0].values.get("id"), Some(&Value::U64(88)));
1380 assert_eq!(events[1].kind, EntityEventKind::Updated);
1381 assert_eq!(events[1].values.get("id"), Some(&Value::U64(88)));
1382 assert_eq!(events[1].values.get("version"), Some(&Value::I64(2)));
1383 assert_eq!(events[1].updated_fields, vec!["name".to_owned()]);
1384 assert_eq!(
1385 events[1]
1386 .old_values
1387 .as_ref()
1388 .and_then(|values| values.get("name")),
1389 None );
1391 assert_eq!(
1392 events[1]
1393 .new_values
1394 .as_ref()
1395 .and_then(|values| values.get("name")),
1396 Some(&Value::Text("updated".to_owned()))
1397 );
1398 assert_eq!(events[1].changes.len(), 1);
1399 assert_eq!(events[1].changes[0].field, "name");
1400 assert_eq!(
1401 events[1].changes[0].old_value,
1402 None );
1404 assert_eq!(
1405 events[1].changes[0].new_value,
1406 Some(Value::Text("updated".to_owned()))
1407 );
1408 assert_eq!(events[2].kind, EntityEventKind::Deleted);
1409 assert!(events[2].old_values.is_none()); assert!(events[2].new_values.is_none());
1411 assert_eq!(events[3].kind, EntityEventKind::Recovered);
1412 assert_eq!(
1413 events[3]
1414 .old_values
1415 .as_ref()
1416 .and_then(|values| values.get("version")),
1417 None );
1419 assert_eq!(
1420 events[3]
1421 .new_values
1422 .as_ref()
1423 .and_then(|values| values.get("version")),
1424 Some(&Value::I64(4))
1425 );
1426 assert_eq!(events[3].changes[0].field, "version");
1427 }
1428
1429 #[test]
1430 fn user_context_event_sink_receives_mixed_graph_mutation_events() {
1431 let events = Arc::new(Mutex::new(Vec::new()));
1432 let mut ctx = UserContext::new()
1433 .with_metadata(
1434 InMemoryMetadataStore::new()
1435 .with_entity(entity())
1436 .with_entity(line_entity())
1437 .with_entity(product_entity()),
1438 )
1439 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1440 .with_event_sink(RecordingEventSink {
1441 events: events.clone(),
1442 });
1443 ctx.insert_resource(PostgresDialect);
1444 ctx.insert_resource(StubExecutor {
1445 affected: 1,
1446 rows: vec![Record::from([
1447 ("id".to_owned(), Value::U64(1)),
1448 ("version".to_owned(), Value::I64(1)),
1449 ("name".to_owned(), Value::Text("old".to_owned())),
1450 ])],
1451 });
1452
1453 let repo = ctx
1454 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1455 .unwrap();
1456 repo.save_graph(
1457 GraphNode::new("Order")
1458 .value("id", 1_u64)
1459 .value("version", 1_i64)
1460 .value("name", "updated")
1461 .relation(
1462 "lines",
1463 GraphNode::new("OrderLine")
1464 .value("name", "line")
1465 .value("product_id", 3_u64),
1466 ),
1467 )
1468 .unwrap();
1469
1470 let events = events.lock().unwrap();
1471 assert_eq!(events.len(), 2);
1472 assert_eq!(events[0].kind, EntityEventKind::Updated);
1473 assert_eq!(events[0].entity, "Order");
1474 assert_eq!(events[1].kind, EntityEventKind::Created);
1475 assert_eq!(events[1].entity, "OrderLine");
1476 assert_eq!(events[1].values.get("order_id"), Some(&Value::U64(1)));
1477 }
1478
1479 #[test]
1480 fn save_graph_builds_plan_grouped_by_entity_and_operation() {
1481 let mut ctx = UserContext::new()
1482 .with_metadata(
1483 InMemoryMetadataStore::new()
1484 .with_entity(entity())
1485 .with_entity(line_entity())
1486 .with_entity(product_entity()),
1487 )
1488 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1489 .with_internal_id_generator(SequentialIdGenerator::new(500));
1490 ctx.insert_resource(PostgresDialect);
1491 ctx.insert_resource(StubExecutor {
1492 affected: 1,
1493 rows: vec![Record::from([
1494 ("id".to_owned(), Value::U64(1)),
1495 ("version".to_owned(), Value::I64(1)),
1496 ("name".to_owned(), Value::Text("old".to_owned())),
1497 ])],
1498 });
1499
1500 let plan = ctx
1501 .plan_for_save_graph::<PostgresDialect, StubExecutor>(
1502 GraphNode::new("Order")
1503 .value("id", 1_u64)
1504 .value("version", 1_i64)
1505 .value("name", "updated")
1506 .relation(
1507 "lines",
1508 GraphNode::new("OrderLine")
1509 .value("name", "new-line-a")
1510 .value("product_id", 2_u64),
1511 )
1512 .relation(
1513 "lines",
1514 GraphNode::new("OrderLine")
1515 .value("name", "new-line-b")
1516 .value("product_id", 2_u64),
1517 )
1518 .relation(
1519 "lines",
1520 GraphNode::new("OrderLine")
1521 .value("id", 5_u64)
1522 .value("version", 1_i64)
1523 .value("name", "same-update-a"),
1524 )
1525 .relation(
1526 "lines",
1527 GraphNode::new("OrderLine")
1528 .value("id", 6_u64)
1529 .value("version", 1_i64)
1530 .value("name", "same-update-b"),
1531 )
1532 .relation(
1533 "lines",
1534 GraphNode::new("OrderLine").value("id", 3_u64).remove(),
1535 )
1536 .relation(
1537 "lines",
1538 GraphNode::new("OrderLine").value("id", 4_u64).reference(),
1539 ),
1540 )
1541 .unwrap();
1542 let counts = plan.grouped_counts();
1543
1544 assert_eq!(
1545 counts.get(&("Order".to_owned(), GraphMutationKind::Update)),
1546 Some(&1)
1547 );
1548 assert_eq!(
1549 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Create)),
1550 Some(&2)
1551 );
1552 assert_eq!(
1553 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Update)),
1554 Some(&2)
1555 );
1556 assert_eq!(
1557 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Delete)),
1558 Some(&1)
1559 );
1560 assert_eq!(
1561 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Reference)),
1562 Some(&1)
1563 );
1564 let create_batch = plan
1565 .batches
1566 .iter()
1567 .find(|batch| batch.entity == "OrderLine" && batch.kind == GraphMutationKind::Create)
1568 .unwrap();
1569 assert_eq!(create_batch.items.len(), 2);
1570 assert_eq!(
1571 create_batch.items[0].values.get("id"),
1572 Some(&Value::U64(500))
1573 );
1574 assert_eq!(
1575 create_batch.items[1].values.get("id"),
1576 Some(&Value::U64(501))
1577 );
1578 let update_batch = plan
1579 .batches
1580 .iter()
1581 .find(|batch| {
1582 batch.entity == "OrderLine"
1583 && batch.kind == GraphMutationKind::Update
1584 && batch.update_fields == vec!["name".to_owned()]
1585 })
1586 .unwrap();
1587 assert_eq!(update_batch.items.len(), 2);
1588 }
1589
1590 #[test]
1591 fn resolved_repository_builds_relation_plans() {
1592 let mut ctx = UserContext::new()
1593 .with_metadata(
1594 InMemoryMetadataStore::new()
1595 .with_entity(entity())
1596 .with_entity(line_entity())
1597 .with_entity(product_entity()),
1598 )
1599 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1600 .with_repository_behavior_registry(
1601 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1602 );
1603 ctx.insert_resource(PostgresDialect);
1604 ctx.insert_resource(StubExecutor {
1605 affected: 1,
1606 rows: Vec::new(),
1607 });
1608
1609 let repo = ctx
1610 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1611 .unwrap();
1612 let plans = repo.relation_plans().unwrap();
1613
1614 assert_eq!(plans.len(), 1);
1615 assert_eq!(plans[0].relation_name, "lines");
1616 assert_eq!(plans[0].target_entity, "OrderLine");
1617 assert_eq!(plans[0].local_key, "id");
1618 assert_eq!(plans[0].foreign_key, "order_id");
1619 assert!(plans[0].many);
1620 }
1621
1622 #[test]
1623 fn resolved_repository_builds_relation_query_from_parent_rows() {
1624 let mut ctx = UserContext::new()
1625 .with_metadata(
1626 InMemoryMetadataStore::new()
1627 .with_entity(entity())
1628 .with_entity(line_entity())
1629 .with_entity(product_entity()),
1630 )
1631 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1632 .with_repository_behavior_registry(
1633 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1634 );
1635 ctx.insert_resource(PostgresDialect);
1636 ctx.insert_resource(StubExecutor {
1637 affected: 1,
1638 rows: Vec::new(),
1639 });
1640
1641 let repo = ctx
1642 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1643 .unwrap();
1644 let parent_rows = vec![
1645 Record::from([(String::from("id"), Value::U64(11))]),
1646 Record::from([(String::from("id"), Value::U64(12))]),
1647 ];
1648
1649 let query = repo.relation_query("lines", &parent_rows).unwrap();
1650 let compiled = repo.compile(&query).unwrap();
1651 assert!(compiled.sql.contains("FROM orderline"));
1652 assert!(compiled.sql.contains("order_id IN ($1, $2)"));
1653 assert_eq!(compiled.params, vec![Value::U64(11), Value::U64(12)]);
1654 }
1655
1656 #[test]
1657 fn resolved_repository_enhances_parent_rows_with_relations() {
1658 let mut ctx = UserContext::new()
1659 .with_metadata(
1660 InMemoryMetadataStore::new()
1661 .with_entity(entity())
1662 .with_entity(line_entity())
1663 .with_entity(product_entity()),
1664 )
1665 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1666 .with_repository_behavior_registry(
1667 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1668 );
1669 ctx.insert_resource(PostgresDialect);
1670 ctx.insert_resource(StubExecutor {
1671 affected: 1,
1672 rows: vec![
1673 Record::from([
1674 (String::from("id"), Value::U64(101)),
1675 (String::from("order_id"), Value::U64(11)),
1676 (String::from("name"), Value::Text(String::from("l1"))),
1677 ]),
1678 Record::from([
1679 (String::from("id"), Value::U64(102)),
1680 (String::from("order_id"), Value::U64(11)),
1681 (String::from("name"), Value::Text(String::from("l2"))),
1682 ]),
1683 Record::from([
1684 (String::from("id"), Value::U64(201)),
1685 (String::from("order_id"), Value::U64(12)),
1686 (String::from("name"), Value::Text(String::from("l3"))),
1687 ]),
1688 ],
1689 });
1690
1691 let repo = ctx
1692 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1693 .unwrap();
1694 let mut parents = vec![
1695 Record::from([(String::from("id"), Value::U64(11))]),
1696 Record::from([(String::from("id"), Value::U64(12))]),
1697 ];
1698
1699 repo.enhance_relations(&mut parents).unwrap();
1700
1701 match parents[0].get("lines") {
1702 Some(Value::List(lines)) => assert_eq!(lines.len(), 2),
1703 other => panic!("unexpected lines payload: {other:?}"),
1704 }
1705 match parents[1].get("lines") {
1706 Some(Value::List(lines)) => assert_eq!(lines.len(), 1),
1707 other => panic!("unexpected lines payload: {other:?}"),
1708 }
1709 }
1710
1711 #[test]
1712 fn resolved_repository_fetches_smart_list_of_entities() {
1713 let mut ctx = UserContext::new()
1714 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1715 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1716 ctx.insert_resource(PostgresDialect);
1717 ctx.insert_resource(StubExecutor {
1718 affected: 1,
1719 rows: vec![Record::from([
1720 (String::from("id"), Value::U64(7)),
1721 (String::from("version"), Value::I64(2)),
1722 (String::from("name"), Value::Text(String::from("typed"))),
1723 ])],
1724 });
1725
1726 let repo = ctx
1727 .resolve_repository::<PostgresDialect, StubExecutor>("Order")
1728 .unwrap();
1729 let rows = repo.fetch_entities::<OrderEntity>(&repo.select()).unwrap();
1730
1731 assert_eq!(rows.len(), 1);
1732 assert_eq!(
1733 rows.first(),
1734 Some(&OrderEntity {
1735 id: 7,
1736 version: 2,
1737 name: String::from("typed"),
1738 })
1739 );
1740 }
1741
1742 #[test]
1743 fn resolved_repository_fetches_smart_list_of_derived_entities() {
1744 let mut ctx = UserContext::new()
1745 .with_metadata(
1746 InMemoryMetadataStore::new().with_entity(CatalogProductRow::entity_descriptor()),
1747 )
1748 .with_repository_registry(
1749 InMemoryRepositoryRegistry::new().with_entity("CatalogProduct"),
1750 );
1751 ctx.insert_resource(PostgresDialect);
1752 ctx.insert_resource(StubExecutor {
1753 affected: 1,
1754 rows: vec![Record::from([
1755 (String::from("id"), Value::U64(9)),
1756 (String::from("name"), Value::Text(String::from("derived"))),
1757 ])],
1758 });
1759
1760 let repo = ctx
1761 .resolve_repository::<PostgresDialect, StubExecutor>("CatalogProduct")
1762 .unwrap();
1763 let rows = repo
1764 .fetch_entities::<CatalogProductRow>(&repo.select())
1765 .unwrap();
1766
1767 assert_eq!(rows.len(), 1);
1768 assert_eq!(
1769 rows.first(),
1770 Some(&CatalogProductRow {
1771 id: 9,
1772 name: String::from("derived"),
1773 })
1774 );
1775 }
1776
1777 #[test]
1778 fn resolved_repository_collects_dynamic_properties_for_aggregate_output() {
1779 let mut ctx = UserContext::new()
1780 .with_metadata(
1781 InMemoryMetadataStore::new()
1782 .with_entity(OrderAggregateDynamic::entity_descriptor()),
1783 )
1784 .with_repository_registry(
1785 InMemoryRepositoryRegistry::new().with_entity("OrderAggregate"),
1786 );
1787 ctx.insert_resource(PostgresDialect);
1788 ctx.insert_resource(StubExecutor {
1789 affected: 1,
1790 rows: vec![Record::from([
1791 (String::from("id"), Value::U64(1)),
1792 (String::from("lineCount"), Value::I64(3)),
1793 (String::from("amount"), Value::F64(18.5)),
1794 ])],
1795 });
1796
1797 let repo = ctx
1798 .resolve_repository::<PostgresDialect, StubExecutor>("OrderAggregate")
1799 .unwrap();
1800 let rows = repo
1801 .fetch_entities::<OrderAggregateDynamic>(&repo.select())
1802 .unwrap();
1803
1804 assert_eq!(rows.len(), 1);
1805 assert_eq!(rows.data[0].id, 1);
1806 assert_eq!(rows.data[0].dynamic.get("lineCount"), Some(&Value::I64(3)));
1807 assert_eq!(rows.data[0].dynamic.get("amount"), Some(&Value::F64(18.5)));
1808 assert_eq!(
1809 rows.into_vec().into_iter().next().unwrap().into_json(),
1810 serde_json::json!({
1811 "id": 1,
1812 "lineCount": 3,
1813 "amount": 18.5
1814 })
1815 );
1816 }
1817
1818 #[test]
1819 fn resolved_repository_executes_relation_aggregates_into_dynamic_properties() {
1820 let executor = QueueExecutor {
1821 affected: 1,
1822 rows: Mutex::new(VecDeque::from([
1823 vec![
1824 Record::from([
1825 (String::from("id"), Value::U64(1)),
1826 (String::from("version"), Value::I64(1)),
1827 (String::from("name"), Value::Text(String::from("first"))),
1828 ]),
1829 Record::from([
1830 (String::from("id"), Value::U64(2)),
1831 (String::from("version"), Value::I64(1)),
1832 (String::from("name"), Value::Text(String::from("second"))),
1833 ]),
1834 ],
1835 vec![Record::from([
1836 (String::from("order_id"), Value::U64(1)),
1837 (String::from("lineCount"), Value::I64(3)),
1838 ])],
1839 ])),
1840 queries: Mutex::new(Vec::new()),
1841 };
1842 let mut ctx = UserContext::new()
1843 .with_metadata(
1844 InMemoryMetadataStore::new()
1845 .with_entity(entity())
1846 .with_entity(line_entity()),
1847 )
1848 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1849 ctx.insert_resource(PostgresDialect);
1850 ctx.insert_resource(executor);
1851
1852 let repo = ctx
1853 .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1854 .unwrap();
1855 let rows = repo
1856 .fetch_all_with_relation_aggregates(
1857 &repo
1858 .select()
1859 .project("id")
1860 .project("version")
1861 .project("name"),
1862 &[RelationAggregate::new(
1863 "lines",
1864 "lineCount",
1865 SelectQuery::new("OrderLine"),
1866 true,
1867 )],
1868 )
1869 .unwrap();
1870
1871 assert_eq!(rows[0].get("lineCount"), Some(&Value::I64(3)));
1872 assert_eq!(rows[1].get("lineCount"), Some(&Value::U64(0)));
1873
1874 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1875 let queries = executor.queries.lock().unwrap();
1876 assert_eq!(queries.len(), 2);
1877 assert_eq!(
1878 queries[1],
1879 "SELECT order_id, COUNT(*) AS lineCount FROM orderline WHERE (order_id IN ($1, $2)) GROUP BY order_id"
1880 );
1881 }
1882
1883 #[test]
1884 fn resolved_repository_maps_relation_aggregate_storage_key_to_property_key() {
1885 let mut line = line_entity();
1886 line.properties
1887 .iter_mut()
1888 .find(|property| property.name == "order_id")
1889 .unwrap()
1890 .column_name = "order_ref".to_owned();
1891 let executor = QueueExecutor {
1892 affected: 1,
1893 rows: Mutex::new(VecDeque::from([
1894 vec![Record::from([
1895 (String::from("id"), Value::U64(1)),
1896 (String::from("version"), Value::I64(1)),
1897 (String::from("name"), Value::Text(String::from("first"))),
1898 ])],
1899 vec![Record::from([
1900 (String::from("order_ref"), Value::I64(1)),
1901 (String::from("lineCount"), Value::I64(3)),
1902 ])],
1903 ])),
1904 queries: Mutex::new(Vec::new()),
1905 };
1906 let mut ctx = UserContext::new()
1907 .with_metadata(
1908 InMemoryMetadataStore::new()
1909 .with_entity(entity())
1910 .with_entity(line),
1911 )
1912 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1913 ctx.insert_resource(PostgresDialect);
1914 ctx.insert_resource(executor);
1915
1916 let repo = ctx
1917 .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1918 .unwrap();
1919 let rows = repo
1920 .fetch_all_with_relation_aggregates(
1921 &repo
1922 .select()
1923 .project("id")
1924 .project("version")
1925 .project("name"),
1926 &[RelationAggregate::new(
1927 "lines",
1928 "lineCount",
1929 SelectQuery::new("OrderLine"),
1930 true,
1931 )],
1932 )
1933 .unwrap();
1934
1935 assert_eq!(rows[0].get("lineCount"), Some(&Value::I64(3)));
1936 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1937 assert_eq!(
1938 executor.queries.lock().unwrap()[1],
1939 "SELECT order_ref, COUNT(*) AS lineCount FROM orderline WHERE (order_ref IN ($1)) GROUP BY order_ref"
1940 );
1941 }
1942
1943 #[test]
1944 fn resolved_repository_uses_aggregation_cache_when_resource_is_registered() {
1945 let executor = QueueExecutor {
1946 affected: 1,
1947 rows: Mutex::new(VecDeque::from([vec![Record::from([(
1948 String::from("count"),
1949 Value::I64(2),
1950 )])]])),
1951 queries: Mutex::new(Vec::new()),
1952 };
1953 let mut ctx = UserContext::new()
1954 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1955 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1956 ctx.insert_resource(PostgresDialect);
1957 ctx.insert_resource(executor);
1958 ctx.insert_resource(InMemoryAggregationCache::default());
1959
1960 let repo = ctx
1961 .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1962 .unwrap();
1963 let query = repo
1964 .select()
1965 .count("count")
1966 .enable_aggregation_cache_for(60_000);
1967
1968 let first = repo.fetch_all(&query).unwrap();
1969 let second = repo.fetch_all(&query).unwrap();
1970
1971 assert_eq!(first, second);
1972 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1973 assert_eq!(executor.queries.lock().unwrap().len(), 1);
1974 }
1975
1976 #[test]
1977 fn aggregation_cache_is_namespaced_and_invalidated_after_write() {
1978 let executor = QueueExecutor {
1979 affected: 1,
1980 rows: Mutex::new(VecDeque::from([
1981 vec![Record::from([(String::from("count"), Value::I64(2))])],
1982 vec![Record::from([(String::from("count"), Value::I64(3))])],
1983 ])),
1984 queries: Mutex::new(Vec::new()),
1985 };
1986 let mut ctx = UserContext::new()
1987 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1988 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1989 ctx.insert_resource(PostgresDialect);
1990 ctx.insert_resource(executor);
1991 ctx.insert_resource(
1992 Arc::new(InMemoryAggregationCache::with_namespace("tenant-a"))
1993 as Arc<dyn AggregationCacheBackend>,
1994 );
1995
1996 let repo = ctx
1997 .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
1998 .unwrap();
1999 let query = repo
2000 .select()
2001 .count("count")
2002 .enable_aggregation_cache_for(60_000);
2003
2004 let first = repo.fetch_all(&query).unwrap();
2005 let cached = repo.fetch_all(&query).unwrap();
2006 repo.insert(
2007 &InsertCommand::new("Order")
2008 .value("id", 9_u64)
2009 .value("version", 1_i64)
2010 .value("name", "new"),
2011 )
2012 .unwrap();
2013 let refreshed = repo.fetch_all(&query).unwrap();
2014
2015 assert_eq!(first, cached);
2016 assert_ne!(cached, refreshed);
2017 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
2018 assert_eq!(executor.queries.lock().unwrap().len(), 2);
2019 }
2020
2021 #[test]
2022 fn aggregation_cache_propagates_to_relation_aggregates() {
2023 let parent_rows = vec![
2024 Record::from([
2025 (String::from("id"), Value::U64(1)),
2026 (String::from("version"), Value::I64(1)),
2027 (String::from("name"), Value::Text(String::from("first"))),
2028 ]),
2029 Record::from([
2030 (String::from("id"), Value::U64(2)),
2031 (String::from("version"), Value::I64(1)),
2032 (String::from("name"), Value::Text(String::from("second"))),
2033 ]),
2034 ];
2035 let aggregate_rows = vec![Record::from([
2036 (String::from("order_id"), Value::U64(1)),
2037 (String::from("lineCount"), Value::I64(3)),
2038 ])];
2039 let executor = QueueExecutor {
2040 affected: 1,
2041 rows: Mutex::new(VecDeque::from([parent_rows, aggregate_rows])),
2042 queries: Mutex::new(Vec::new()),
2043 };
2044 let mut ctx = UserContext::new()
2045 .with_metadata(
2046 InMemoryMetadataStore::new()
2047 .with_entity(entity())
2048 .with_entity(line_entity()),
2049 )
2050 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
2051 ctx.insert_resource(PostgresDialect);
2052 ctx.insert_resource(executor);
2053 ctx.insert_resource(InMemoryAggregationCache::default());
2054
2055 let repo = ctx
2056 .resolve_repository::<PostgresDialect, QueueExecutor>("Order")
2057 .unwrap();
2058 let query = repo
2059 .select()
2060 .project("id")
2061 .project("version")
2062 .project("name")
2063 .enable_aggregation_cache_for(60_000)
2064 .propagate_aggregation_cache(60_000);
2065 let aggregate =
2066 RelationAggregate::new("lines", "lineCount", SelectQuery::new("OrderLine"), true);
2067
2068 let first = repo
2069 .fetch_all_with_relation_aggregates(&query, &[aggregate.clone()])
2070 .unwrap();
2071 let second = repo
2072 .fetch_all_with_relation_aggregates(&query, &[aggregate])
2073 .unwrap();
2074
2075 assert_eq!(first, second);
2076 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
2077 assert_eq!(executor.queries.lock().unwrap().len(), 2);
2078 }
2079
2080 #[test]
2081 fn memory_repository_fetches_smart_list_entities_with_query_features() {
2082 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2083 let repository = MemoryRepository::new(metadata).with_rows(
2084 "Order",
2085 vec![
2086 Record::from([
2087 (String::from("id"), Value::U64(1)),
2088 (String::from("version"), Value::I64(1)),
2089 (String::from("name"), Value::Text(String::from("alpha"))),
2090 ]),
2091 Record::from([
2092 (String::from("id"), Value::U64(2)),
2093 (String::from("version"), Value::I64(1)),
2094 (String::from("name"), Value::Text(String::from("beta"))),
2095 ]),
2096 Record::from([
2097 (String::from("id"), Value::U64(3)),
2098 (String::from("version"), Value::I64(1)),
2099 (String::from("name"), Value::Text(String::from("gamma"))),
2100 ]),
2101 ],
2102 );
2103
2104 let query = teaql_core::SelectQuery::new("Order")
2105 .filter(Expr::Binary {
2106 left: Box::new(Expr::column("id")),
2107 op: teaql_core::BinaryOp::Gte,
2108 right: Box::new(Expr::value(2_u64)),
2109 })
2110 .order_by(OrderBy::desc("id"))
2111 .limit(1);
2112
2113 let orders = repository.fetch_entities::<Order>(&query).unwrap();
2114
2115 assert_eq!(orders.ids(), vec![Value::U64(3)]);
2116 assert_eq!(orders.versions(), vec![1]);
2117 assert_eq!(orders.first().unwrap().name, "gamma");
2118 }
2119
2120 #[test]
2121 fn memory_repository_runs_relation_aggregates() {
2122 let metadata = InMemoryMetadataStore::new()
2123 .with_entity(entity())
2124 .with_entity(line_entity());
2125
2126 let repository = MemoryRepository::new(metadata)
2127 .with_rows(
2128 "Order",
2129 vec![
2130 Record::from([
2131 (String::from("id"), Value::U64(1)),
2132 (String::from("version"), Value::I64(1)),
2133 (String::from("name"), Value::Text(String::from("first"))),
2134 ]),
2135 Record::from([
2136 (String::from("id"), Value::U64(2)),
2137 (String::from("version"), Value::I64(1)),
2138 (String::from("name"), Value::Text(String::from("second"))),
2139 ]),
2140 ],
2141 )
2142 .with_rows(
2143 "OrderLine",
2144 vec![
2145 Record::from([
2146 (String::from("id"), Value::U64(10)),
2147 (String::from("version"), Value::I64(1)),
2148 (String::from("order_id"), Value::U64(1)),
2149 (String::from("name"), Value::Text(String::from("line1"))),
2150 ]),
2151 Record::from([
2152 (String::from("id"), Value::U64(11)),
2153 (String::from("version"), Value::I64(1)),
2154 (String::from("order_id"), Value::U64(1)),
2155 (String::from("name"), Value::Text(String::from("line2"))),
2156 ]),
2157 Record::from([
2158 (String::from("id"), Value::U64(12)),
2159 (String::from("version"), Value::I64(1)),
2160 (String::from("order_id"), Value::U64(2)),
2161 (String::from("name"), Value::Text(String::from("line3"))),
2162 ]),
2163 ],
2164 );
2165
2166 let query = SelectQuery::new("Order").project("id").project("name");
2167 let aggregate = RelationAggregate::new("lines", "lineCount", SelectQuery::new("OrderLine"), true);
2168
2169 let rows = repository
2170 .fetch_all_with_relation_aggregates(&query, &[aggregate])
2171 .unwrap();
2172
2173 assert_eq!(rows.len(), 2);
2174
2175 let first_order = rows.iter().find(|r| r.get("id") == Some(&Value::U64(1))).unwrap();
2176 assert_eq!(first_order.get("lineCount"), Some(&Value::U64(2)));
2177
2178 let second_order = rows.iter().find(|r| r.get("id") == Some(&Value::U64(2))).unwrap();
2179 assert_eq!(second_order.get("lineCount"), Some(&Value::U64(1)));
2180 }
2181
2182 #[test]
2183 fn memory_repository_runs_aggregates() {
2184 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2185 let repository = MemoryRepository::new(metadata).with_rows(
2186 "Order",
2187 vec![
2188 Record::from([
2189 (String::from("id"), Value::U64(1)),
2190 (String::from("version"), Value::I64(1)),
2191 (String::from("name"), Value::Text(String::from("alpha"))),
2192 ]),
2193 Record::from([
2194 (String::from("id"), Value::U64(2)),
2195 (String::from("version"), Value::I64(2)),
2196 (String::from("name"), Value::Text(String::from("beta"))),
2197 ]),
2198 ],
2199 );
2200
2201 let query = teaql_core::SelectQuery {
2202 entity: String::from("Order"),
2203 projection: Vec::new(),
2204 expr_projection: Vec::new(),
2205 filter: None,
2206 having: None,
2207 order_by: Vec::new(),
2208 slice: None,
2209 trace_chain: Vec::new(),
2210 aggregates: vec![
2211 Aggregate {
2212 function: AggregateFunction::Count,
2213 field: String::from("id"),
2214 alias: String::from("count"),
2215 },
2216 Aggregate {
2217 function: AggregateFunction::Sum,
2218 field: String::from("version"),
2219 alias: String::from("versionSum"),
2220 },
2221 ],
2222 group_by: Vec::new(),
2223 relations: Vec::new(),
2224 aggregation_cache: None,
2225 comment: None,
2226 raw_sql: None,
2227 raw_sql_search_criteria: Vec::new(),
2228 dynamic_properties: Vec::new(),
2229 raw_projections: Vec::new(),
2230 object_group_bys: Vec::new(),
2231 child_enhancements: Vec::new(),
2232 };
2233
2234 let rows = repository.fetch_all(&query).unwrap();
2235
2236 assert_eq!(rows.len(), 1);
2237 assert_eq!(rows[0].get("count"), Some(&Value::U64(2)));
2238 assert_eq!(rows[0].get("versionSum"), Some(&Value::U64(3)));
2239 }
2240
2241 #[test]
2242 fn memory_repository_runs_grouped_aggregates_and_extended_filters() {
2243 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2244 let repository = MemoryRepository::new(metadata).with_rows(
2245 "Order",
2246 vec![
2247 Record::from([
2248 (String::from("id"), Value::U64(1)),
2249 (String::from("version"), Value::I64(1)),
2250 (String::from("name"), Value::Text(String::from("alpha"))),
2251 ]),
2252 Record::from([
2253 (String::from("id"), Value::U64(2)),
2254 (String::from("version"), Value::I64(2)),
2255 (String::from("name"), Value::Text(String::from("alpha"))),
2256 ]),
2257 Record::from([
2258 (String::from("id"), Value::U64(3)),
2259 (String::from("version"), Value::I64(3)),
2260 (String::from("name"), Value::Text(String::from("tmp-beta"))),
2261 ]),
2262 ],
2263 );
2264
2265 let rows = repository
2266 .fetch_all(
2267 &teaql_core::SelectQuery::new("Order")
2268 .filter(
2269 Expr::between("version", 1_i64, 3_i64)
2270 .and_expr(Expr::not_like("name", "tmp%"))
2271 .and_expr(Expr::not_in_list("name", vec![Value::from("deleted")])),
2272 )
2273 .group_by("name")
2274 .count("total")
2275 .sum("version", "versionSum"),
2276 )
2277 .unwrap();
2278
2279 assert_eq!(rows.len(), 1);
2280 assert_eq!(
2281 rows[0].get("name"),
2282 Some(&Value::Text(String::from("alpha")))
2283 );
2284 assert_eq!(rows[0].get("total"), Some(&Value::U64(2)));
2285 assert_eq!(rows[0].get("versionSum"), Some(&Value::U64(3)));
2286 }
2287
2288 #[test]
2289 fn memory_repository_runs_extended_aggregates_and_having() {
2290 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2291 let repository = MemoryRepository::new(metadata).with_rows(
2292 "Order",
2293 vec![
2294 Record::from([
2295 (String::from("id"), Value::U64(1)),
2296 (String::from("version"), Value::I64(1)),
2297 (String::from("name"), Value::Text(String::from("alpha"))),
2298 ]),
2299 Record::from([
2300 (String::from("id"), Value::U64(2)),
2301 (String::from("version"), Value::I64(3)),
2302 (String::from("name"), Value::Text(String::from("alpha"))),
2303 ]),
2304 Record::from([
2305 (String::from("id"), Value::U64(3)),
2306 (String::from("version"), Value::I64(7)),
2307 (String::from("name"), Value::Text(String::from("beta"))),
2308 ]),
2309 ],
2310 );
2311
2312 let rows = repository
2313 .fetch_all(
2314 &teaql_core::SelectQuery::new("Order")
2315 .group_by("name")
2316 .count("total")
2317 .stddev("version", "stddevVersion")
2318 .var_pop("version", "varPopVersion")
2319 .bit_or("version", "bitOrVersion")
2320 .having(Expr::gt("total", 1_i64)),
2321 )
2322 .unwrap();
2323
2324 assert_eq!(rows.len(), 1);
2325 assert_eq!(
2326 rows[0].get("name"),
2327 Some(&Value::Text(String::from("alpha")))
2328 );
2329 assert_eq!(rows[0].get("total"), Some(&Value::U64(2)));
2330 assert_eq!(
2331 rows[0].get("stddevVersion").map(Value::to_json_value),
2332 Some(serde_json::Value::String(
2333 "1.4142135623730951454746218583".to_owned()
2334 ))
2335 );
2336 assert_eq!(
2337 rows[0].get("varPopVersion"),
2338 Some(&Value::Decimal(Decimal::ONE))
2339 );
2340 assert_eq!(rows[0].get("bitOrVersion"), Some(&Value::I64(3)));
2341 }
2342
2343 #[test]
2344 fn memory_repository_runs_sound_like_filter() {
2345 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2346 let repository = MemoryRepository::new(metadata).with_rows(
2347 "Order",
2348 vec![
2349 Record::from([
2350 (String::from("id"), Value::U64(1)),
2351 (String::from("version"), Value::I64(1)),
2352 (String::from("name"), Value::Text(String::from("Robert"))),
2353 ]),
2354 Record::from([
2355 (String::from("id"), Value::U64(2)),
2356 (String::from("version"), Value::I64(1)),
2357 (String::from("name"), Value::Text(String::from("Rupert"))),
2358 ]),
2359 Record::from([
2360 (String::from("id"), Value::U64(3)),
2361 (String::from("version"), Value::I64(1)),
2362 (String::from("name"), Value::Text(String::from("Ashcraft"))),
2363 ]),
2364 ],
2365 );
2366
2367 let rows = repository
2368 .fetch_all(
2369 &teaql_core::SelectQuery::new("Order")
2370 .filter(Expr::sound_like("name", "Robert"))
2371 .order_asc("id"),
2372 )
2373 .unwrap();
2374
2375 assert_eq!(rows.len(), 2);
2376 assert_eq!(rows[0].get("name"), Some(&Value::Text("Robert".to_owned())));
2377 assert_eq!(rows[1].get("name"), Some(&Value::Text("Rupert".to_owned())));
2378 }
2379
2380 #[test]
2381 fn memory_repository_runs_java_style_string_match_filters() {
2382 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2383 let repository = MemoryRepository::new(metadata).with_rows(
2384 "Order",
2385 vec![
2386 Record::from([
2387 (String::from("id"), Value::U64(1)),
2388 (String::from("version"), Value::I64(1)),
2389 (String::from("name"), Value::Text(String::from("tea-order"))),
2390 ]),
2391 Record::from([
2392 (String::from("id"), Value::U64(2)),
2393 (String::from("version"), Value::I64(1)),
2394 (
2395 String::from("name"),
2396 Value::Text(String::from("coffee-order")),
2397 ),
2398 ]),
2399 Record::from([
2400 (String::from("id"), Value::U64(3)),
2401 (String::from("version"), Value::I64(1)),
2402 (
2403 String::from("name"),
2404 Value::Text(String::from("tea-archived")),
2405 ),
2406 ]),
2407 ],
2408 );
2409
2410 let rows = repository
2411 .fetch_all(
2412 &teaql_core::SelectQuery::new("Order")
2413 .filter(
2414 Expr::contain("name", "tea")
2415 .and_expr(Expr::begin_with("name", "tea"))
2416 .and_expr(Expr::end_with("name", "order"))
2417 .and_expr(Expr::not_contain("name", "coffee"))
2418 .and_expr(Expr::not_begin_with("name", "archived"))
2419 .and_expr(Expr::not_end_with("name", "draft")),
2420 )
2421 .order_asc("id"),
2422 )
2423 .unwrap();
2424
2425 assert_eq!(rows.len(), 1);
2426 assert_eq!(
2427 rows[0].get("name"),
2428 Some(&Value::Text("tea-order".to_owned()))
2429 );
2430 }
2431
2432 #[test]
2433 fn memory_repository_runs_property_to_property_filters() {
2434 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2435 let repository = MemoryRepository::new(metadata).with_rows(
2436 "Order",
2437 vec![
2438 Record::from([
2439 (String::from("id"), Value::U64(1)),
2440 (String::from("version"), Value::I64(2)),
2441 (String::from("name"), Value::Text(String::from("keep"))),
2442 ]),
2443 Record::from([
2444 (String::from("id"), Value::U64(2)),
2445 (String::from("version"), Value::I64(1)),
2446 (String::from("name"), Value::Text(String::from("skip"))),
2447 ]),
2448 ],
2449 );
2450
2451 let rows = repository
2452 .fetch_all(
2453 &teaql_core::SelectQuery::new("Order")
2454 .filter(Expr::compare_columns("version", BinaryOp::Gte, "id"))
2455 .order_asc("id"),
2456 )
2457 .unwrap();
2458
2459 assert_eq!(rows.len(), 1);
2460 assert_eq!(rows[0].get("name"), Some(&Value::Text("keep".to_owned())));
2461 }
2462
2463 #[test]
2464 fn memory_repository_supports_mutations_and_optimistic_locking() {
2465 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2466 let repository = MemoryRepository::new(metadata);
2467
2468 repository
2469 .insert(
2470 &InsertCommand::new("Order")
2471 .value("id", 10_u64)
2472 .value("version", 1_i64)
2473 .value("name", "draft"),
2474 )
2475 .unwrap();
2476 repository
2477 .update(
2478 &UpdateCommand::new("Order", 10_u64)
2479 .expected_version(1)
2480 .value("name", "submitted"),
2481 )
2482 .unwrap();
2483
2484 let row = repository
2485 .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2486 .unwrap()
2487 .pop()
2488 .unwrap();
2489 assert_eq!(
2490 row.get("name"),
2491 Some(&Value::Text(String::from("submitted")))
2492 );
2493 assert_eq!(row.get("version"), Some(&Value::I64(2)));
2494
2495 let conflict = repository
2496 .update(
2497 &UpdateCommand::new("Order", 10_u64)
2498 .expected_version(1)
2499 .value("name", "stale"),
2500 )
2501 .unwrap_err();
2502 assert!(matches!(
2503 conflict,
2504 RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. })
2505 ));
2506
2507 repository
2508 .delete(&DeleteCommand::new("Order", 10_u64).expected_version(2))
2509 .unwrap();
2510 let row = repository
2511 .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2512 .unwrap()
2513 .pop()
2514 .unwrap();
2515 assert_eq!(row.get("version"), Some(&Value::I64(-3)));
2516
2517 repository
2518 .recover(&RecoverCommand::new("Order", 10_u64, -3))
2519 .unwrap();
2520 let row = repository
2521 .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2522 .unwrap()
2523 .pop()
2524 .unwrap();
2525 assert_eq!(row.get("version"), Some(&Value::I64(4)));
2526 }
2527
2528 #[tokio::test]
2529 async fn user_context_reports_missing_schema_provider() {
2530 let err = UserContext::new().ensure_schema().await.unwrap_err();
2531 assert!(
2532 matches!(err, RuntimeError::Schema(message) if message == "missing schema provider")
2533 );
2534 }
2535
2536 #[test]
2537 fn user_context_stores_and_exposes_user_identifier() {
2538 let mut ctx = UserContext::new();
2539 let pid = std::process::id();
2540 let thread_id_str = format!("{:?}", std::thread::current().id());
2541 let numeric_thread_id = thread_id_str
2542 .strip_prefix("ThreadId(")
2543 .and_then(|s| s.strip_suffix(")"))
2544 .unwrap_or(&thread_id_str);
2545 let os_user = std::env::var("USER")
2546 .or_else(|_| std::env::var("USERNAME"))
2547 .unwrap_or_else(|_| "main".to_owned());
2548 let expected_default = format!("{os_user}@pid-{pid}.tid-{numeric_thread_id}");
2549 assert_eq!(ctx.user_identifier(), Some(expected_default.as_str()));
2550
2551 ctx.set_user_identifier("user-123");
2552 assert_eq!(ctx.user_identifier(), Some("user-123"));
2553
2554 let ctx2 = UserContext::new().with_user_identifier("user-456");
2555 assert_eq!(ctx2.user_identifier(), Some("user-456"));
2556
2557 let mut ctx3 = UserContext::new();
2558 ctx3.set_user_identifier_option(Some("user-789".to_owned()));
2559 assert_eq!(ctx3.user_identifier(), Some("user-789"));
2560 ctx3.set_user_identifier_option(None);
2561 assert_eq!(ctx3.user_identifier(), None);
2562
2563 let ctx4 = UserContext::new().with_user_identifier_option(Some("user-abc".to_owned()));
2564 assert_eq!(ctx4.user_identifier(), Some("user-abc"));
2565
2566 }
2567}
2568
2569pub use checker::{
2570 CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResult, CheckResults, CheckRule, Checker,
2571 CheckerRegistry, InMemoryCheckerRegistry, LocationSegment, ObjectLocation, TypedChecker,
2572 TypedEntityChecker, clear_record_status, mark_record_status,
2573};