1#![allow(warnings)]
2mod checker;
3mod context;
4mod entity_runtime;
5mod entity_status;
6mod error;
7mod event;
8mod graph;
9mod id;
10mod language;
11mod memory;
12mod registry;
13mod repository;
14
15pub use context::{
16 DataStore, InfoLogEntry, InMemoryDataStore, LogPayload, SchemaProvider, SqlLogEntry, SqlLogOperation,
17 SqlLogOptions, UnifiedLogBuffer, UnifiedLogEntry, UserContext,
18};
19pub use entity_status::{EntityAction, EntityStatus};
20pub use entity_runtime::{ChangeSetStack, EntityChangeSet, EntityKey, EntityRoot, RootContext};
21pub use error::{ContextError, RepositoryError, RuntimeError};
22pub use event::{
23 RawAuditEvent, RawAuditEventKind, RawAuditEventSink, EntityPropertyChange, InMemoryRawAuditEventSink,
24 SafeAuditEvent, SafeAuditField, SafeAuditEventSink,
25};
26pub use graph::{
27 GraphMutationBatch, GraphMutationKind, GraphMutationPlan, GraphMutationPlanItem,
28 GraphNode, GraphOperation, ScopedCommentNode, TraceScopeToken, sorted_update_fields,
29};
30pub(crate) use id::local_id_generator;
31pub use id::{InternalIdGenerator, SnowflakeIdGenerator};
32pub use language::{
33 BuiltinTranslator, Language, MessageTranslator, translate_check_result, translate_location,
34};
35pub use memory::{MemoryRepository, MemoryRepositoryError};
36pub use registry::{
37 InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry,
38 MetadataStore, RepositoryBehavior, RepositoryBehaviorRegistry, RepositoryRegistry,
39 RequestPolicy, RuntimeModule,
40};
41pub use repository::{
42 AggregationCacheBackend, ContextRepository, GraphTransactionBoundary, InMemoryAggregationCache,
43 RelationLoadPlan, Repository, ResolvedRepository,
44};
45
46#[cfg(test)]
47mod tests {
48 use std::collections::{BTreeMap, VecDeque};
49 use std::sync::{Arc, Mutex};
50
51 use super::{
52 AggregationCacheBackend, CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResult,
53 CheckResults, CheckRule, Checker, RawAuditEvent, RawAuditEventKind, RawAuditEventSink,
54 GraphMutationKind, GraphNode, InMemoryAggregationCache,
55 InMemoryCheckerRegistry, InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry,
56 InMemoryRepositoryRegistry, InternalIdGenerator, Language, MemoryRepository, MetadataStore,
57 ObjectLocation, Repository, RepositoryBehavior, RepositoryError,
58 RequestPolicy, RuntimeError, RuntimeModule, SqlLogOperation, SqlLogOptions, TypedChecker,
59 TypedEntityChecker, UserContext, translate_check_result,
60 };
61 use teaql_data_service::{
62 DataServiceCapabilities, DataServiceExecutor, ExecutionMetadata, MutationExecutor,
63 MutationRequest, MutationResult, QueryExecutor, QueryRequest, QueryResult, DataServiceOperation,
64 };
65 use teaql_core::{
66 Aggregate, AggregateFunction, BinaryOp, DataType, Decimal, DeleteCommand, Entity,
67 EntityDescriptor, EntityError, Expr, InsertCommand, OrderBy, PropertyDescriptor, Record,
68 RecoverCommand, RelationAggregate, SelectQuery, TeaqlEntity, UpdateCommand, Value,
69 };
70 use teaql_macros::TeaqlEntity as DeriveTeaqlEntity;
71 use teaql_sql::{CompiledQuery, DatabaseKind, SqlCompileError, SqlDialect, quote_identifier_if_needed};
72
73 const ORDER_DEFAULT_PROJECTION: &str = "id, version, name";
74
75 #[derive(Debug, Default, Clone, Copy)]
76 struct PostgresDialect;
77
78 impl SqlDialect for PostgresDialect {
79 fn kind(&self) -> DatabaseKind {
80 DatabaseKind::PostgreSql
81 }
82
83 fn quote_ident(&self, ident: &str) -> String {
84 quote_identifier_if_needed(ident, '"')
85 }
86
87 fn placeholder(&self, index: usize) -> String {
88 format!("${index}")
89 }
90
91 fn schema_type_sql(
92 &self,
93 data_type: DataType,
94 _property: &PropertyDescriptor,
95 ) -> Result<&'static str, SqlCompileError> {
96 match data_type {
97 DataType::Bool => Ok("BOOLEAN"),
98 DataType::I64 | DataType::U64 => Ok("BIGINT"),
99 DataType::F64 => Ok("DOUBLE PRECISION"),
100 DataType::Decimal => Ok("NUMERIC"),
101 DataType::Text => Ok("TEXT"),
102 DataType::Json => Ok("JSONB"),
103 DataType::Date => Ok("DATE"),
104 DataType::Timestamp => Ok("TIMESTAMPTZ"),
105 }
106 }
107 }
108
109 fn entity() -> EntityDescriptor {
110 EntityDescriptor::new("Order")
111 .table_name("orders")
112 .property(
113 PropertyDescriptor::new("id", DataType::U64)
114 .column_name("id")
115 .id()
116 .not_null(),
117 )
118 .property(
119 PropertyDescriptor::new("version", DataType::I64)
120 .column_name("version")
121 .version()
122 .not_null(),
123 )
124 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
125 .relation(
126 teaql_core::RelationDescriptor::new("lines", "OrderLine")
127 .local_key("id")
128 .foreign_key("order_id")
129 .many(),
130 )
131 }
132
133 fn line_entity() -> EntityDescriptor {
134 EntityDescriptor::new("OrderLine")
135 .table_name("orderline")
136 .property(
137 PropertyDescriptor::new("id", DataType::U64)
138 .column_name("id")
139 .id()
140 .not_null(),
141 )
142 .property(
143 PropertyDescriptor::new("version", DataType::I64)
144 .column_name("version")
145 .version(),
146 )
147 .property(
148 PropertyDescriptor::new("order_id", DataType::U64)
149 .column_name("order_id")
150 .not_null(),
151 )
152 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
153 .property(
154 PropertyDescriptor::new("product_id", DataType::U64)
155 .column_name("product_id")
156 .not_null(),
157 )
158 .relation(
159 teaql_core::RelationDescriptor::new("product", "Product")
160 .local_key("product_id")
161 .foreign_key("id"),
162 )
163 }
164
165 fn product_entity() -> EntityDescriptor {
166 EntityDescriptor::new("Product")
167 .table_name("product")
168 .property(
169 PropertyDescriptor::new("id", DataType::U64)
170 .column_name("id")
171 .id()
172 .not_null(),
173 )
174 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
175 }
176
177 #[derive(Debug, Default)]
178 struct StubExecutor {
179 affected: u64,
180 rows: Vec<Record>,
181 }
182
183 #[derive(Debug, Default)]
184 struct QueueExecutor {
185 affected: u64,
186 rows: Mutex<VecDeque<Vec<Record>>>,
187 queries: Mutex<Vec<String>>,
188 }
189
190 struct OrderBehavior;
191
192 #[allow(dead_code)]
193 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
194 #[teaql(entity = "CatalogProduct", table = "catalog_product")]
195 struct CatalogProductRow {
196 #[teaql(id)]
197 id: u64,
198 name: String,
199 }
200
201 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
202 #[teaql(entity = "OrderAggregate", table = "orders")]
203 struct OrderAggregateDynamic {
204 #[teaql(id)]
205 id: u64,
206 #[teaql(dynamic)]
207 dynamic: BTreeMap<String, Value>,
208 }
209
210 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
211 #[teaql(entity = "Product", table = "product")]
212 struct ProductEntityRow {
213 #[teaql(id)]
214 id: u64,
215 name: String,
216 }
217
218 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
219 #[teaql(entity = "OrderLine", table = "orderline")]
220 struct OrderLineEntityRow {
221 #[teaql(id)]
222 id: u64,
223 #[teaql(column = "order_id")]
224 order_id: u64,
225 name: String,
226 #[teaql(column = "product_id")]
227 product_id: u64,
228 #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
229 product: Option<ProductEntityRow>,
230 }
231
232 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
233 #[teaql(entity = "OrderLine", table = "orderline")]
234 struct ProductLineEntityRow {
235 #[teaql(id)]
236 id: u64,
237 #[teaql(column = "order_id")]
238 order_id: u64,
239 name: String,
240 #[teaql(column = "product_id")]
241 product_id: u64,
242 }
243
244 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
245 #[teaql(entity = "Product", table = "product")]
246 struct ProductWithLinesEntityRow {
247 #[teaql(id)]
248 id: u64,
249 name: String,
250 #[teaql(relation(
251 target = "OrderLine",
252 local_key = "id",
253 foreign_key = "product_id",
254 many
255 ))]
256 lines: teaql_core::SmartList<ProductLineEntityRow>,
257 }
258
259 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
260 #[teaql(entity = "OrderLine", table = "orderline")]
261 struct OrderLineWithProductEntityRow {
262 #[teaql(id)]
263 id: u64,
264 #[teaql(column = "order_id")]
265 order_id: u64,
266 name: String,
267 #[teaql(column = "product_id")]
268 product_id: u64,
269 #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
270 product: Option<ProductWithLinesEntityRow>,
271 }
272
273 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
274 #[teaql(entity = "Order", table = "orders")]
275 struct OrderAggregateRow {
276 #[teaql(id)]
277 id: u64,
278 #[teaql(version)]
279 version: i64,
280 name: String,
281 #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
282 lines: teaql_core::SmartList<OrderLineEntityRow>,
283 }
284
285 #[derive(Debug, Clone, PartialEq, DeriveTeaqlEntity)]
286 #[teaql(entity = "Order", table = "orders")]
287 struct Order {
288 #[teaql(id)]
289 id: u64,
290 #[teaql(version)]
291 version: i64,
292 name: String,
293 }
294
295 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
296 #[teaql(entity = "Product", table = "product")]
297 struct TypedGraphProduct {
298 #[teaql(id)]
299 id: u64,
300 name: String,
301 }
302
303 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
304 #[teaql(entity = "OrderLine", table = "orderline")]
305 struct TypedGraphLine {
306 #[teaql(id)]
307 id: u64,
308 #[teaql(column = "order_id")]
309 order_id: Option<u64>,
310 name: String,
311 #[teaql(column = "product_id")]
312 product_id: Option<u64>,
313 #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
314 product: Option<TypedGraphProduct>,
315 }
316
317 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
318 #[teaql(entity = "Order", table = "orders")]
319 struct TypedGraphOrder {
320 #[teaql(id)]
321 id: u64,
322 #[teaql(version)]
323 version: i64,
324 name: String,
325 #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
326 lines: teaql_core::SmartList<TypedGraphLine>,
327 }
328
329 #[derive(Debug, PartialEq, Eq)]
330 struct OrderEntity {
331 id: u64,
332 version: i64,
333 name: String,
334 }
335
336 impl teaql_core::TeaqlEntity for OrderEntity {
337 fn entity_descriptor() -> EntityDescriptor {
338 entity()
339 }
340 }
341
342 impl Entity for OrderEntity {
343 fn from_record(record: Record) -> Result<Self, EntityError> {
344 let id = match record.get("id") {
345 Some(Value::U64(v)) => *v,
346 Some(Value::I64(v)) if *v >= 0 => *v as u64,
347 other => {
348 return Err(EntityError::new(
349 "Order",
350 format!("invalid id field: {other:?}"),
351 ));
352 }
353 };
354 let version = match record.get("version") {
355 Some(Value::I64(v)) => *v,
356 other => {
357 return Err(EntityError::new(
358 "Order",
359 format!("invalid version field: {other:?}"),
360 ));
361 }
362 };
363 let name = match record.get("name") {
364 Some(Value::Text(v)) => v.clone(),
365 other => {
366 return Err(EntityError::new(
367 "Order",
368 format!("invalid name field: {other:?}"),
369 ));
370 }
371 };
372 Ok(Self { id, version, name })
373 }
374
375 fn into_record(self) -> Record {
376 Record::from([
377 (String::from("id"), Value::U64(self.id)),
378 (String::from("version"), Value::I64(self.version)),
379 (String::from("name"), Value::Text(self.name)),
380 ])
381 }
382 }
383
384 #[derive(Debug)]
385 struct StubError;
386
387 impl std::fmt::Display for StubError {
388 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
389 write!(f, "stub error")
390 }
391 }
392
393 impl std::error::Error for StubError {}
394
395 impl DataServiceExecutor for StubExecutor {
396 type Error = StubError;
397
398 fn capabilities(&self) -> DataServiceCapabilities {
399 DataServiceCapabilities::default()
400 }
401 }
402
403 impl QueryExecutor for StubExecutor {
404 async fn query(&self, _request: QueryRequest) -> Result<QueryResult, Self::Error> {
405 Ok(QueryResult {
406 rows: self.rows.clone(),
407 metadata: ExecutionMetadata {
408 debug_query: None,
409 backend: "stub".to_owned(),
410 operation: DataServiceOperation::Query,
411 started_at: std::time::SystemTime::now(),
412 ended_at: std::time::SystemTime::now(),
413 affected_rows: None,
414 result_count: Some(self.rows.len()),
415 trace_chain: Vec::new(),
416 comment: None,
417 backend_request_id: None,
418 },
419 })
420 }
421 }
422
423 impl MutationExecutor for StubExecutor {
424 async fn mutate(&self, _request: MutationRequest) -> Result<MutationResult, Self::Error> {
425 Ok(MutationResult {
426 affected_rows: self.affected,
427 generated_values: Record::new(),
428 metadata: ExecutionMetadata {
429 debug_query: None,
430 backend: "stub".to_owned(),
431 operation: DataServiceOperation::Update,
432 started_at: std::time::SystemTime::now(),
433 ended_at: std::time::SystemTime::now(),
434 affected_rows: Some(self.affected),
435 result_count: None,
436 trace_chain: Vec::new(),
437 comment: None,
438 backend_request_id: None,
439 },
440 })
441 }
442 }
443
444 impl DataServiceExecutor for QueueExecutor {
445 type Error = StubError;
446
447 fn capabilities(&self) -> DataServiceCapabilities {
448 DataServiceCapabilities::default()
449 }
450 }
451
452 impl QueryExecutor for QueueExecutor {
453 async fn query(&self, request: QueryRequest) -> Result<QueryResult, Self::Error> {
454 let sql_approx = format!("SELECT ... FROM {} ...", request.query.entity);
455 self.queries.lock().unwrap().push(sql_approx);
456 Ok(QueryResult {
457 rows: self.rows.lock().unwrap().pop_front().unwrap_or_default(),
458 metadata: ExecutionMetadata {
459 debug_query: None,
460 backend: "queue".to_owned(),
461 operation: DataServiceOperation::Query,
462 started_at: std::time::SystemTime::now(),
463 ended_at: std::time::SystemTime::now(),
464 affected_rows: None,
465 result_count: Some(0),
466 trace_chain: Vec::new(),
467 comment: None,
468 backend_request_id: None,
469 },
470 })
471 }
472 }
473
474 impl MutationExecutor for QueueExecutor {
475 async fn mutate(&self, _request: MutationRequest) -> Result<MutationResult, Self::Error> {
476 Ok(MutationResult {
477 affected_rows: self.affected,
478 generated_values: Record::new(),
479 metadata: ExecutionMetadata {
480 debug_query: None,
481 backend: "queue".to_owned(),
482 operation: DataServiceOperation::Update,
483 started_at: std::time::SystemTime::now(),
484 ended_at: std::time::SystemTime::now(),
485 affected_rows: Some(self.affected),
486 result_count: None,
487 trace_chain: Vec::new(),
488 comment: None,
489 backend_request_id: None,
490 },
491 })
492 }
493 }
494
495
496
497 impl RepositoryBehavior for OrderBehavior {
498 fn before_select(
499 &self,
500 _ctx: &UserContext,
501 query: &mut teaql_core::SelectQuery,
502 ) -> Result<(), RuntimeError> {
503 query.filter = Some(Expr::eq("version", 1_i64));
504 Ok(())
505 }
506
507 fn before_insert(
508 &self,
509 _ctx: &UserContext,
510 command: &mut InsertCommand,
511 ) -> Result<(), RuntimeError> {
512 command
513 .values
514 .entry("version".to_owned())
515 .or_insert(Value::I64(1));
516 Ok(())
517 }
518
519 fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
520 vec!["lines".to_owned()]
521 }
522 }
523
524 struct ContextAwareOrderBehavior;
525 struct TenantRequestPolicy;
526 struct OrderChecker;
527 struct TypedOrderChecker;
528 #[derive(Clone)]
529 struct RecordingEventSink {
530 events: Arc<Mutex<Vec<RawAuditEvent>>>,
531 }
532
533 impl RepositoryBehavior for ContextAwareOrderBehavior {
534 fn before_insert(
535 &self,
536 ctx: &UserContext,
537 command: &mut InsertCommand,
538 ) -> Result<(), RuntimeError> {
539 let tenant = ctx
540 .get_named_resource::<String>("tenant")
541 .cloned()
542 .ok_or_else(|| RuntimeError::Behavior("missing tenant resource".to_owned()))?;
543 let version = *ctx
544 .get_named_resource::<i64>("initial_version")
545 .ok_or_else(|| {
546 RuntimeError::Behavior("missing initial_version resource".to_owned())
547 })?;
548 let trace_id = match ctx.local("trace_id") {
549 Some(Value::Text(value)) => value.clone(),
550 other => {
551 return Err(RuntimeError::Behavior(format!(
552 "missing trace_id local, got {other:?}"
553 )));
554 }
555 };
556
557 command
558 .values
559 .entry("name".to_owned())
560 .or_insert(Value::Text(format!("{tenant}:{trace_id}")));
561 command
562 .values
563 .entry("version".to_owned())
564 .or_insert(Value::I64(version));
565 Ok(())
566 }
567 }
568
569 impl RequestPolicy for TenantRequestPolicy {
570 fn enforce_select(
571 &self,
572 ctx: &UserContext,
573 query: &mut SelectQuery,
574 ) -> Result<(), RuntimeError> {
575 if query.entity == "Order" {
576 let tenant_id = ctx
577 .get_named_resource::<u64>("tenant_id")
578 .copied()
579 .ok_or_else(|| RuntimeError::Policy("missing tenant_id".to_owned()))?;
580 query.filter = Some(match query.filter.take() {
581 Some(filter) => filter.and_expr(Expr::eq("id", tenant_id)),
582 None => Expr::eq("id", tenant_id),
583 });
584 }
585 Ok(())
586 }
587
588 fn enforce_insert(
589 &self,
590 ctx: &UserContext,
591 command: &mut InsertCommand,
592 ) -> Result<(), RuntimeError> {
593 if command.entity == "Order" {
594 let tenant_id = ctx
595 .get_named_resource::<u64>("tenant_id")
596 .copied()
597 .ok_or_else(|| RuntimeError::Policy("missing tenant_id".to_owned()))?;
598 command
599 .values
600 .insert("version".to_owned(), Value::I64(tenant_id as i64));
601 }
602 Ok(())
603 }
604 }
605
606 impl Checker for OrderChecker {
607 fn entity(&self) -> &str {
608 "Order"
609 }
610
611 fn check_and_fix(
612 &self,
613 _ctx: &UserContext,
614 record: &mut Record,
615 location: &ObjectLocation,
616 results: &mut CheckResults,
617 ) {
618 let status = CheckObjectStatus::from_record(record);
619 if status.is_create() {
620 self.required(record, "name", location, results);
621 record.entry("version".to_owned()).or_insert(Value::I64(1));
622 }
623 if status.is_update()
624 && record.get("name") == Some(&Value::Text("graph-update".to_owned()))
625 {
626 record.insert(
627 "name".to_owned(),
628 Value::Text("graph-update-checked".to_owned()),
629 );
630 }
631 self.min_string_length(record, "name", 3, location, results);
632 }
633 }
634
635 impl TypedChecker<Order> for TypedOrderChecker {
636 fn check_and_fix_typed(
637 &self,
638 _ctx: &UserContext,
639 entity: &mut Order,
640 status: CheckObjectStatus,
641 location: &ObjectLocation,
642 results: &mut CheckResults,
643 ) {
644 if status.is_create() {
645 if entity.name.is_empty() {
646 results.push(CheckResult::required(location.clone().member("name")));
647 }
648 }
649 if entity.name.chars().count() < 3 {
650 results.push(CheckResult::min_str(
651 location.clone().member("name"),
652 3,
653 entity.name.clone(),
654 ));
655 }
656 if entity.name == "fix" {
657 entity.name = "fixed".to_owned();
658 }
659 }
660 }
661
662 impl RawAuditEventSink for RecordingEventSink {
663 fn on_event(&self, _ctx: &UserContext, event: &RawAuditEvent) -> Result<(), RuntimeError> {
664 self.events.lock().unwrap().push(event.clone());
665 Ok(())
666 }
667 }
668
669 struct FixedIdGenerator(u64);
670
671 impl InternalIdGenerator for FixedIdGenerator {
672 fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
673 Ok(self.0)
674 }
675 }
676
677 struct SequentialIdGenerator {
678 next: Mutex<u64>,
679 }
680
681 impl SequentialIdGenerator {
682 fn new(next: u64) -> Self {
683 Self {
684 next: Mutex::new(next),
685 }
686 }
687 }
688
689 impl InternalIdGenerator for SequentialIdGenerator {
690 fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
691 let mut next = self
692 .next
693 .lock()
694 .map_err(|err| RuntimeError::IdGeneration(err.to_string()))?;
695 let id = *next;
696 *next += 1;
697 Ok(id)
698 }
699 }
700
701 #[tokio::test]
702 async fn metadata_store_registers_entities() {
703 let store = InMemoryMetadataStore::new().with_entity(entity());
704 assert!(store.entity("Order").is_some());
705 }
706
707 #[tokio::test]
708 async fn runtime_module_registers_descriptor_into_context() {
709 let ctx = UserContext::new().with_module(RuntimeModule::new().descriptor(entity()));
710 assert!(ctx.entity("Order").is_some());
711 assert!(ctx.has_repository("Order"));
712 }
713
714 #[tokio::test]
715 async fn runtime_module_registers_derived_entity_and_behavior() {
716 let ctx = UserContext::new().with_module(
717 RuntimeModule::new().entity_with_behavior::<CatalogProductRow, _>(OrderBehavior),
718 );
719 assert!(ctx.entity("CatalogProduct").is_some());
720 assert!(ctx.has_repository("CatalogProduct"));
721 assert!(ctx.repository_behavior("CatalogProduct").is_some());
722 }
723
724 #[tokio::test]
725 async fn module_macro_registers_multiple_entities() {
726 let ctx = UserContext::new().with_module(crate::module!(CatalogProductRow));
727 assert!(ctx.entity("CatalogProduct").is_some());
728 assert!(ctx.has_repository("CatalogProduct"));
729 }
730
731 #[tokio::test]
732 async fn module_macro_registers_entity_behavior_pairs() {
733 let ctx =
734 UserContext::new().with_module(crate::module!(CatalogProductRow => OrderBehavior));
735 assert!(ctx.entity("CatalogProduct").is_some());
736 assert!(ctx.repository_behavior("CatalogProduct").is_some());
737 }
738
739 #[tokio::test]
740 async fn repository_returns_optimistic_lock_conflict() {
741 let store = InMemoryMetadataStore::new().with_entity(entity());
742 let executor = StubExecutor {
743 affected: 0,
744 rows: Vec::new(),
745 };
746 let repo = Repository::new(&store, &executor);
747
748 let err = repo
749 .update(
750 &UpdateCommand::new("Order", 1_u64)
751 .expected_version(3)
752 .value("name", "next"),
753 )
754 .await.unwrap_err();
755
756 match err {
757 RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. }) => {}
758 other => panic!("unexpected error: {other}"),
759 }
760 }
761
762 #[tokio::test]
763 async fn user_context_indexes_resources_and_locals() {
764 let mut ctx =
765 UserContext::new().with_metadata(InMemoryMetadataStore::new().with_entity(entity()));
766 ctx.insert_resource::<u64>(42);
767 ctx.insert_named_resource("tenant", String::from("acme"));
768 ctx.put_local("trace_id", "req-1");
769
770 assert!(ctx.entity("Order").is_some());
771 assert_eq!(ctx.get_resource::<u64>(), Some(&42));
772 assert_eq!(
773 ctx.get_named_resource::<String>("tenant"),
774 Some(&String::from("acme"))
775 );
776 assert_eq!(
777 ctx.local("trace_id"),
778 Some(&Value::Text("req-1".to_owned()))
779 );
780 }
781
782 #[tokio::test]
783 async fn user_context_builds_context_repository() {
784 let mut ctx =
785 UserContext::new().with_metadata(InMemoryMetadataStore::new().with_entity(entity()));
786 ctx.insert_resource(PostgresDialect);
787 ctx.insert_resource(StubExecutor {
788 affected: 1,
789 rows: Vec::new(),
790 });
791
792 let repo = ctx.repository::< StubExecutor>().unwrap();
793 let affected = repo
794 .update(
795 &UpdateCommand::new("Order", 1_u64)
796 .expected_version(3)
797 .value("name", "next"),
798 )
799 .await.unwrap();
800
801 assert_eq!(affected, 1);
802 }
803
804 #[tokio::test]
805 async fn user_context_resolves_repository_by_entity_type() {
806 let mut ctx = UserContext::new()
807 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
808 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
809 ctx.insert_resource(PostgresDialect);
810 ctx.insert_resource(StubExecutor {
811 affected: 1,
812 rows: Vec::new(),
813 });
814
815 let repo = ctx
816 .resolve_repository::< StubExecutor>("Order")
817 .unwrap();
818 assert_eq!(repo.entity(), "Order");
819 assert_eq!(repo.select().entity, "Order");
820
821 let affected = repo
822 .insert(
823 &repo
824 .insert_command()
825 .value("id", 1_u64)
826 .value("version", 1_i64)
827 .value("name", "n"),
828 )
829 .await.unwrap();
830 assert_eq!(affected, 1);
831 }
832
833 #[tokio::test]
834 async fn resolved_repository_applies_behavior_hooks() {
835 let mut ctx = UserContext::new()
836 .with_metadata(
837 InMemoryMetadataStore::new()
838 .with_entity(entity())
839 .with_entity(line_entity())
840 .with_entity(product_entity()),
841 )
842 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
843 .with_repository_behavior_registry(
844 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
845 );
846 ctx.insert_resource(PostgresDialect);
847 ctx.insert_resource(StubExecutor {
848 affected: 1,
849 rows: Vec::new(),
850 });
851
852 let repo = ctx
853 .resolve_repository::< StubExecutor>("Order")
854 .unwrap();
855
856 let insert = repo.insert_command().value("id", 1_u64).value("name", "n");
860 let affected = repo.insert(&insert).await.unwrap();
861 assert_eq!(affected, 1);
862 assert_eq!(repo.relation_loads(), vec!["lines".to_owned()]);
863 }
864
865 #[tokio::test]
866 async fn resolved_repository_applies_request_policy_after_behavior_hooks() {
867 let mut ctx = UserContext::new()
868 .with_metadata(
869 InMemoryMetadataStore::new()
870 .with_entity(entity())
871 .with_entity(line_entity())
872 .with_entity(product_entity()),
873 )
874 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
875 .with_repository_behavior_registry(
876 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
877 )
878 .with_request_policy(TenantRequestPolicy);
879 ctx.insert_named_resource("tenant_id", 9_u64);
880 ctx.insert_resource(PostgresDialect);
881 ctx.insert_resource(StubExecutor {
882 affected: 1,
883 rows: Vec::new(),
884 });
885
886 let repo = ctx
887 .resolve_repository::< StubExecutor>("Order")
888 .unwrap();
889
890 let insert = repo.insert_command().value("id", 1_u64).value("name", "n");
895 let command = repo.prepare_insert_command(&insert).unwrap();
896 assert_eq!(command.values.get("version"), Some(&Value::I64(9)));
897 }
898
899 #[tokio::test]
900 async fn resolved_repository_prepares_insert_command_with_generated_id() {
901 let mut ctx = UserContext::new()
902 .with_metadata(
903 InMemoryMetadataStore::new()
904 .with_entity(entity())
905 .with_entity(line_entity())
906 .with_entity(product_entity()),
907 )
908 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
909 .with_repository_behavior_registry(
910 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
911 )
912 .with_internal_id_generator(FixedIdGenerator(42));
913 ctx.insert_resource(PostgresDialect);
914 ctx.insert_resource(StubExecutor {
915 affected: 1,
916 rows: Vec::new(),
917 });
918
919 let repo = ctx
920 .resolve_repository::< StubExecutor>("Order")
921 .unwrap();
922
923 let prepared = repo
924 .prepare_insert_command(&repo.insert_command().value("id", 0_u64).value("name", "n"))
925 .unwrap();
926
927 assert_eq!(prepared.values.get("id"), Some(&Value::U64(42)));
928 assert_eq!(prepared.values.get("version"), Some(&Value::I64(1)));
929 assert_eq!(
930 prepared.values.get("name"),
931 Some(&Value::Text("n".to_owned()))
932 );
933
934 let prepared_zero_version = repo
935 .prepare_insert_command(
936 &repo
937 .insert_command()
938 .value("id", 0_u64)
939 .value("version", 0_i64)
940 .value("name", "zero-version"),
941 )
942 .unwrap();
943 assert_eq!(
944 prepared_zero_version.values.get("version"),
945 Some(&Value::I64(1))
946 );
947 }
948
949 #[tokio::test]
950 async fn resolved_repository_saves_create_graph_and_maintains_relation_keys() {
951 let mut ctx = UserContext::new()
952 .with_metadata(
953 InMemoryMetadataStore::new()
954 .with_entity(entity())
955 .with_entity(line_entity())
956 .with_entity(product_entity()),
957 )
958 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
959 .with_repository_behavior_registry(
960 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
961 )
962 .with_internal_id_generator(SequentialIdGenerator::new(500));
963 ctx.insert_resource(PostgresDialect);
964 ctx.insert_resource(StubExecutor {
965 affected: 1,
966 rows: Vec::new(),
967 });
968
969 let repo = ctx
970 .resolve_repository::< StubExecutor>("Order")
971 .unwrap();
972 let graph = GraphNode::new("Order").value("name", "root").relation(
973 "lines",
974 GraphNode::new("OrderLine")
975 .value("name", "line-1")
976 .relation("product", GraphNode::new("Product").value("name", "sku-1")),
977 );
978
979 let saved = repo.save_graph(graph).await.unwrap();
980
981 assert_eq!(saved.values.get("id"), Some(&Value::U64(500)));
982 assert_eq!(saved.values.get("version"), Some(&Value::I64(1)));
983 let lines = saved.relations.get("lines").unwrap();
984 assert_eq!(lines.len(), 1);
985 assert_eq!(lines[0].values.get("id"), Some(&Value::U64(502)));
986 assert_eq!(lines[0].values.get("version"), Some(&Value::I64(1)));
987 assert_eq!(lines[0].values.get("order_id"), Some(&Value::U64(500)));
988 assert_eq!(lines[0].values.get("product_id"), Some(&Value::U64(501)));
989 let product = lines[0].relations.get("product").unwrap();
990 assert_eq!(product[0].values.get("id"), Some(&Value::U64(501)));
991 }
992
993 #[tokio::test]
994 async fn resolved_repository_extracts_and_saves_typed_entity_graph() {
995 let mut ctx = UserContext::new()
996 .with_metadata(
997 InMemoryMetadataStore::new()
998 .with_entity(entity())
999 .with_entity(line_entity())
1000 .with_entity(product_entity()),
1001 )
1002 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1003 .with_internal_id_generator(SequentialIdGenerator::new(700));
1004 ctx.insert_resource(PostgresDialect);
1005 ctx.insert_resource(StubExecutor {
1006 affected: 1,
1007 rows: Vec::new(),
1008 });
1009
1010 let repo = ctx
1011 .resolve_repository::< StubExecutor>("Order")
1012 .unwrap();
1013 let order = TypedGraphOrder {
1014 id: 0,
1015 version: 0,
1016 name: "typed-root".to_owned(),
1017 lines: teaql_core::SmartList::from(vec![TypedGraphLine {
1018 id: 0,
1019 order_id: None,
1020 name: "typed-line".to_owned(),
1021 product_id: None,
1022 product: Some(TypedGraphProduct {
1023 id: 0,
1024 name: "typed-product".to_owned(),
1025 }),
1026 }]),
1027 };
1028
1029 let extracted = repo.graph_node_from_entity(order).unwrap();
1030 assert_eq!(extracted.entity, "Order");
1031 assert_eq!(
1032 extracted.values.get("name"),
1033 Some(&Value::Text("typed-root".to_owned()))
1034 );
1035 assert_eq!(extracted.values.get("id"), Some(&Value::U64(0)));
1036 assert_eq!(extracted.relations["lines"].len(), 1);
1037 assert_eq!(
1038 extracted.relations["lines"][0].values.get("name"),
1039 Some(&Value::Text("typed-line".to_owned()))
1040 );
1041 assert_eq!(
1042 extracted.relations["lines"][0].relations["product"].len(),
1043 1
1044 );
1045
1046 let saved = repo.save_graph(extracted).await.unwrap();
1047 assert_eq!(saved.values.get("id"), Some(&Value::U64(700)));
1048 assert_eq!(saved.values.get("version"), Some(&Value::I64(1)));
1049 let lines = saved.relations.get("lines").unwrap();
1050 assert_eq!(lines[0].values.get("id"), Some(&Value::U64(702)));
1051 assert_eq!(lines[0].values.get("version"), Some(&Value::I64(1)));
1052 assert_eq!(lines[0].values.get("order_id"), Some(&Value::U64(700)));
1053 assert_eq!(lines[0].values.get("product_id"), Some(&Value::U64(701)));
1054 assert_eq!(
1055 lines[0].relations["product"][0].values.get("id"),
1056 Some(&Value::U64(701))
1057 );
1058 }
1059
1060 #[tokio::test]
1061 async fn resolved_repository_saves_typed_entity_graph_directly() {
1062 let mut ctx = UserContext::new()
1063 .with_metadata(
1064 InMemoryMetadataStore::new()
1065 .with_entity(entity())
1066 .with_entity(line_entity())
1067 .with_entity(product_entity()),
1068 )
1069 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1070 .with_internal_id_generator(SequentialIdGenerator::new(800));
1071 ctx.insert_resource(PostgresDialect);
1072 ctx.insert_resource(StubExecutor {
1073 affected: 1,
1074 rows: Vec::new(),
1075 });
1076
1077 let repo = ctx
1078 .resolve_repository::< StubExecutor>("Order")
1079 .unwrap();
1080 let saved = repo
1081 .save_entity_graph(TypedGraphOrder {
1082 id: 0,
1083 version: 0,
1084 name: "typed-direct".to_owned(),
1085 lines: teaql_core::SmartList::from(vec![TypedGraphLine {
1086 id: 0,
1087 order_id: None,
1088 name: "typed-line".to_owned(),
1089 product_id: None,
1090 product: Some(TypedGraphProduct {
1091 id: 0,
1092 name: "typed-product".to_owned(),
1093 }),
1094 }]),
1095 })
1096 .await.unwrap();
1097
1098 assert_eq!(saved.values.get("id"), Some(&Value::U64(800)));
1099 assert_eq!(saved.values.get("version"), Some(&Value::I64(1)));
1100 assert_eq!(
1101 saved.relations["lines"][0].values.get("order_id"),
1102 Some(&Value::U64(800))
1103 );
1104 assert_eq!(
1105 saved.relations["lines"][0].values.get("product_id"),
1106 Some(&Value::U64(801))
1107 );
1108 }
1109
1110 #[tokio::test]
1111 async fn custom_user_context_can_drive_insert_preparation() {
1112 let mut ctx = UserContext::new()
1113 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1114 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1115 .with_repository_behavior_registry(
1116 InMemoryRepositoryBehaviorRegistry::new()
1117 .with_behavior("Order", ContextAwareOrderBehavior),
1118 )
1119 .with_internal_id_generator(FixedIdGenerator(99));
1120 ctx.insert_named_resource("tenant", String::from("acme"));
1121 ctx.insert_named_resource("initial_version", 7_i64);
1122 ctx.put_local("trace_id", "req-9");
1123 ctx.insert_resource(PostgresDialect);
1124 ctx.insert_resource(StubExecutor {
1125 affected: 1,
1126 rows: Vec::new(),
1127 });
1128
1129 let repo = ctx
1130 .resolve_repository::< StubExecutor>("Order")
1131 .unwrap();
1132 let prepared = repo.prepare_insert_command(&repo.insert_command()).unwrap();
1133
1134 assert_eq!(prepared.values.get("id"), Some(&Value::U64(99)));
1135 assert_eq!(prepared.values.get("version"), Some(&Value::I64(7)));
1136 assert_eq!(
1137 prepared.values.get("name"),
1138 Some(&Value::Text("acme:req-9".to_owned()))
1139 );
1140 }
1141
1142 #[tokio::test]
1143 async fn checker_registry_validates_and_fixes_insert_commands() {
1144 let mut ctx = UserContext::new()
1145 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1146 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1147 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker))
1148 .with_internal_id_generator(FixedIdGenerator(77));
1149 ctx.insert_resource(PostgresDialect);
1150 ctx.insert_resource(StubExecutor {
1151 affected: 1,
1152 rows: Vec::new(),
1153 });
1154
1155 let repo = ctx
1156 .resolve_repository::< StubExecutor>("Order")
1157 .unwrap();
1158 let prepared = repo
1159 .prepare_insert_command(&repo.insert_command().value("name", "valid"))
1160 .unwrap();
1161
1162 assert_eq!(prepared.values.get("id"), Some(&Value::U64(77)));
1163 assert_eq!(prepared.values.get("version"), Some(&Value::I64(1)));
1164 assert!(!prepared.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
1165
1166 let error = repo
1167 .prepare_insert_command(&repo.insert_command().value("name", "no"))
1168 .unwrap_err();
1169 match error {
1170 RuntimeError::Check(results) => {
1171 assert_eq!(results.len(), 1);
1172 assert_eq!(results[0].location.to_string(), "name");
1173 }
1174 other => panic!("unexpected checker error: {other:?}"),
1175 }
1176 }
1177
1178 #[tokio::test]
1179 async fn typed_checker_validates_and_fixes_derived_entities_without_record_access() {
1180 let mut ctx = UserContext::new()
1181 .with_metadata(InMemoryMetadataStore::new().with_entity(Order::entity_descriptor()))
1182 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1183 .with_checker_registry(
1184 InMemoryCheckerRegistry::new()
1185 .with_checker(TypedEntityChecker::<Order, _>::new(TypedOrderChecker)),
1186 )
1187 .with_internal_id_generator(FixedIdGenerator(79));
1188 ctx.insert_resource(PostgresDialect);
1189 ctx.insert_resource(StubExecutor {
1190 affected: 1,
1191 rows: Vec::new(),
1192 });
1193
1194 let repo = ctx
1195 .resolve_repository::< StubExecutor>("Order")
1196 .unwrap();
1197 let prepared = repo
1198 .prepare_insert_command(
1199 &repo
1200 .insert_command()
1201 .value("name", "fix")
1202 .value("version", 1_i64),
1203 )
1204 .unwrap();
1205 assert_eq!(
1206 prepared.values.get("name"),
1207 Some(&Value::Text("fixed".to_owned()))
1208 );
1209 assert_eq!(prepared.values.get("id"), Some(&Value::U64(79)));
1210 assert!(!prepared.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
1211
1212 let error = repo
1213 .prepare_insert_command(&repo.insert_command().value("version", 1_i64))
1214 .unwrap_err();
1215 match error {
1216 RuntimeError::Check(results) => {
1217 assert!(
1218 results
1219 .iter()
1220 .any(|result| result.rule == CheckRule::Required
1221 && result.location.to_string() == "name")
1222 );
1223 }
1224 other => panic!("unexpected typed checker error: {other:?}"),
1225 }
1226 }
1227
1228
1229
1230 #[tokio::test]
1231 async fn checker_registry_reports_nested_create_locations_and_fixes_records() {
1232 let ctx = UserContext::new()
1233 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker));
1234
1235 let mut child = Record::from([
1236 (String::from("id"), Value::U64(10)),
1237 (
1238 String::from(CHECK_OBJECT_STATUS_FIELD),
1239 Value::from(CheckObjectStatus::Create),
1240 ),
1241 ]);
1242 let error = ctx
1243 .check_and_fix_record_at(
1244 "Order",
1245 &mut child,
1246 &ObjectLocation::hash_root("lines").element(0),
1247 )
1248 .unwrap_err();
1249
1250 assert_eq!(child.get("version"), Some(&Value::I64(1)));
1251 match error {
1252 RuntimeError::Check(results) => {
1253 assert_eq!(results.len(), 1);
1254 assert_eq!(results[0].rule, CheckRule::Required);
1255 assert_eq!(results[0].location.to_string(), "lines[0].name");
1256 }
1257 other => panic!("unexpected checker error: {other:?}"),
1258 }
1259
1260 child.insert("name".to_owned(), Value::Text("valid child".to_owned()));
1261 ctx.check_and_fix_record_at(
1262 "Order",
1263 &mut child,
1264 &ObjectLocation::hash_root("lines").element(0),
1265 )
1266 .unwrap();
1267 }
1268
1269 #[tokio::test]
1270 async fn built_in_language_translators_cover_fifteen_languages() {
1271 assert_eq!(Language::ALL.len(), 15);
1272 let result = super::CheckResult::required(ObjectLocation::hash_root("name"));
1273 let messages = Language::ALL
1274 .iter()
1275 .map(|language| translate_check_result(*language, &result))
1276 .collect::<Vec<_>>();
1277
1278 assert!(messages.iter().all(|message| !message.is_empty()));
1279 assert!(messages.iter().any(|message| message.contains("required")));
1280 assert!(messages.iter().any(|message| message.contains("å¿…å¡«")));
1281 assert!(
1282 messages
1283 .iter()
1284 .any(|message| message.contains("obligatoire"))
1285 );
1286 assert_eq!(Language::from_code("zh-CN"), Some(Language::Chinese));
1287 assert_eq!(
1288 Language::from_code("zh-TW"),
1289 Some(Language::TraditionalChinese)
1290 );
1291 }
1292
1293 #[tokio::test]
1294 async fn user_context_language_switch_translates_checker_errors() {
1295 let mut ctx = UserContext::new()
1296 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1297 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1298 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker))
1299 .with_internal_id_generator(FixedIdGenerator(77))
1300 .with_language(Language::Chinese);
1301 ctx.insert_resource(PostgresDialect);
1302 ctx.insert_resource(StubExecutor {
1303 affected: 1,
1304 rows: Vec::new(),
1305 });
1306
1307 let repo = ctx
1308 .resolve_repository::< StubExecutor>("Order")
1309 .unwrap();
1310 let error = repo
1311 .prepare_insert_command(&repo.insert_command())
1312 .unwrap_err();
1313 match error {
1314 RuntimeError::Check(results) => {
1315 assert_eq!(results.len(), 1);
1316 assert!(
1317 results[0]
1318 .message
1319 .as_ref()
1320 .is_some_and(|message| message.contains("å¿…å¡«"))
1321 );
1322 }
1323 other => panic!("unexpected checker error: {other:?}"),
1324 }
1325
1326 let mut ctx = UserContext::new().with_language(Language::English);
1327 ctx.set_language_code("es").unwrap();
1328 assert_eq!(ctx.language(), Language::Spanish);
1329 }
1330
1331 #[tokio::test]
1332 async fn checker_registry_merges_graph_update_fixes_by_object_status() {
1333 let mut ctx = UserContext::new()
1334 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1335 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1336 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker));
1337 ctx.insert_resource(PostgresDialect);
1338 ctx.insert_resource(StubExecutor {
1339 affected: 1,
1340 rows: vec![Record::from([
1341 ("id".to_owned(), Value::U64(1)),
1342 ("version".to_owned(), Value::I64(1)),
1343 ("name".to_owned(), Value::Text("old".to_owned())),
1344 ])],
1345 });
1346
1347 let repo = ctx
1348 .resolve_repository::< StubExecutor>("Order")
1349 .unwrap();
1350 let saved = repo
1351 .save_graph(
1352 GraphNode::new("Order")
1353 .value("id", 1_u64)
1354 .value("version", 1_i64)
1355 .value("name", "graph-update"),
1356 )
1357 .await.unwrap();
1358
1359 assert_eq!(
1360 saved.values.get("name"),
1361 Some(&Value::Text("graph-update-checked".to_owned()))
1362 );
1363 assert_eq!(saved.values.get("version"), Some(&Value::I64(2)));
1364 assert!(!saved.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
1365 }
1366
1367 #[tokio::test]
1368 async fn user_context_event_sink_receives_repository_mutation_events() {
1369 let events = Arc::new(Mutex::new(Vec::new()));
1370 let mut ctx = UserContext::new()
1371 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1372 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1373 .with_internal_id_generator(FixedIdGenerator(88))
1374 .with_event_sink(RecordingEventSink {
1375 events: events.clone(),
1376 });
1377 ctx.insert_resource(PostgresDialect);
1378 ctx.insert_resource(StubExecutor {
1379 affected: 1,
1380 rows: vec![Record::from([
1381 ("id".to_owned(), Value::U64(88)),
1382 ("version".to_owned(), Value::I64(1)),
1383 ("name".to_owned(), Value::Text("old".to_owned())),
1384 ])],
1385 });
1386
1387 let repo = ctx
1388 .resolve_repository::< StubExecutor>("Order")
1389 .unwrap();
1390 repo.insert(&repo.insert_command().value("name", "created"))
1391 .await.unwrap();
1392 repo.update(
1393 &repo
1394 .update_command(88_u64)
1395 .expected_version(1)
1396 .value("name", "updated"),
1397 )
1398 .await.unwrap();
1399 repo.delete(&repo.delete_command(88_u64).expected_version(2))
1400 .await.unwrap();
1401 repo.recover(&repo.recover_command(88_u64, -3)).await.unwrap();
1402
1403 let events = events.lock().unwrap();
1404 assert_eq!(events.len(), 4);
1405 assert_eq!(events[0].kind, RawAuditEventKind::Created);
1406 assert_eq!(events[0].entity, "Order");
1407 assert_eq!(events[0].values.get("id"), Some(&Value::U64(88)));
1408 assert_eq!(events[1].kind, RawAuditEventKind::Updated);
1409 assert_eq!(events[1].values.get("id"), Some(&Value::U64(88)));
1410 assert_eq!(events[1].values.get("version"), Some(&Value::I64(2)));
1411 assert_eq!(events[1].updated_fields, vec!["name".to_owned()]);
1412 assert_eq!(
1413 events[1]
1414 .old_values
1415 .as_ref()
1416 .and_then(|values| values.get("name")),
1417 None );
1419 assert_eq!(
1420 events[1]
1421 .new_values
1422 .as_ref()
1423 .and_then(|values| values.get("name")),
1424 Some(&Value::Text("updated".to_owned()))
1425 );
1426 assert_eq!(events[1].changes.len(), 1);
1427 assert_eq!(events[1].changes[0].field, "name");
1428 assert_eq!(
1429 events[1].changes[0].old_value,
1430 None );
1432 assert_eq!(
1433 events[1].changes[0].new_value,
1434 Some(Value::Text("updated".to_owned()))
1435 );
1436 assert_eq!(events[2].kind, RawAuditEventKind::Deleted);
1437 assert!(events[2].old_values.is_none()); assert!(events[2].new_values.is_none());
1439 assert_eq!(events[3].kind, RawAuditEventKind::Recovered);
1440 assert_eq!(
1441 events[3]
1442 .old_values
1443 .as_ref()
1444 .and_then(|values| values.get("version")),
1445 None );
1447 assert_eq!(
1448 events[3]
1449 .new_values
1450 .as_ref()
1451 .and_then(|values| values.get("version")),
1452 Some(&Value::I64(4))
1453 );
1454 assert_eq!(events[3].changes[0].field, "version");
1455 }
1456
1457 #[tokio::test]
1458 async fn user_context_event_sink_receives_mixed_graph_mutation_events() {
1459 let events = Arc::new(Mutex::new(Vec::new()));
1460 let mut ctx = UserContext::new()
1461 .with_metadata(
1462 InMemoryMetadataStore::new()
1463 .with_entity(entity())
1464 .with_entity(line_entity())
1465 .with_entity(product_entity()),
1466 )
1467 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1468 .with_event_sink(RecordingEventSink {
1469 events: events.clone(),
1470 });
1471 ctx.insert_resource(PostgresDialect);
1472 ctx.insert_resource(StubExecutor {
1473 affected: 1,
1474 rows: vec![Record::from([
1475 ("id".to_owned(), Value::U64(1)),
1476 ("version".to_owned(), Value::I64(1)),
1477 ("name".to_owned(), Value::Text("old".to_owned())),
1478 ])],
1479 });
1480
1481 let repo = ctx
1482 .resolve_repository::< StubExecutor>("Order")
1483 .unwrap();
1484 repo.save_graph(
1485 GraphNode::new("Order")
1486 .value("id", 1_u64)
1487 .value("version", 1_i64)
1488 .value("name", "updated")
1489 .relation(
1490 "lines",
1491 GraphNode::new("OrderLine")
1492 .value("name", "line")
1493 .value("product_id", 3_u64),
1494 ),
1495 )
1496 .await.unwrap();
1497
1498 let events = events.lock().unwrap();
1499 assert_eq!(events.len(), 2);
1500 assert_eq!(events[0].kind, RawAuditEventKind::Updated);
1501 assert_eq!(events[0].entity, "Order");
1502 assert_eq!(events[1].kind, RawAuditEventKind::Created);
1503 assert_eq!(events[1].entity, "OrderLine");
1504 assert_eq!(events[1].values.get("order_id"), Some(&Value::U64(1)));
1505 }
1506
1507 #[tokio::test]
1508 async fn save_graph_builds_plan_grouped_by_entity_and_operation() {
1509 let mut ctx = UserContext::new()
1510 .with_metadata(
1511 InMemoryMetadataStore::new()
1512 .with_entity(entity())
1513 .with_entity(line_entity())
1514 .with_entity(product_entity()),
1515 )
1516 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1517 .with_internal_id_generator(SequentialIdGenerator::new(500));
1518 ctx.insert_resource(PostgresDialect);
1519 ctx.insert_resource(StubExecutor {
1520 affected: 1,
1521 rows: vec![Record::from([
1522 ("id".to_owned(), Value::U64(1)),
1523 ("version".to_owned(), Value::I64(1)),
1524 ("name".to_owned(), Value::Text("old".to_owned())),
1525 ])],
1526 });
1527
1528 let plan = ctx
1529 .plan_for_save_graph::< StubExecutor>(
1530 GraphNode::new("Order")
1531 .value("id", 1_u64)
1532 .value("version", 1_i64)
1533 .value("name", "updated")
1534 .relation(
1535 "lines",
1536 GraphNode::new("OrderLine")
1537 .value("name", "new-line-a")
1538 .value("product_id", 2_u64),
1539 )
1540 .relation(
1541 "lines",
1542 GraphNode::new("OrderLine")
1543 .value("name", "new-line-b")
1544 .value("product_id", 2_u64),
1545 )
1546 .relation(
1547 "lines",
1548 GraphNode::new("OrderLine")
1549 .value("id", 5_u64)
1550 .value("version", 1_i64)
1551 .value("name", "same-update-a"),
1552 )
1553 .relation(
1554 "lines",
1555 GraphNode::new("OrderLine")
1556 .value("id", 6_u64)
1557 .value("version", 1_i64)
1558 .value("name", "same-update-b"),
1559 )
1560 .relation(
1561 "lines",
1562 GraphNode::new("OrderLine").value("id", 3_u64).remove(),
1563 )
1564 .relation(
1565 "lines",
1566 GraphNode::new("OrderLine").value("id", 4_u64).reference(),
1567 ),
1568 )
1569 .await.unwrap();
1570 let counts = plan.grouped_counts();
1571
1572 assert_eq!(
1573 counts.get(&("Order".to_owned(), GraphMutationKind::Update)),
1574 Some(&1)
1575 );
1576 assert_eq!(
1577 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Create)),
1578 Some(&2)
1579 );
1580 assert_eq!(
1581 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Update)),
1582 Some(&2)
1583 );
1584 assert_eq!(
1585 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Delete)),
1586 Some(&1)
1587 );
1588 assert_eq!(
1589 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Reference)),
1590 Some(&1)
1591 );
1592 let create_batch = plan
1593 .batches
1594 .iter()
1595 .find(|batch| batch.entity == "OrderLine" && batch.kind == GraphMutationKind::Create)
1596 .unwrap();
1597 assert_eq!(create_batch.items.len(), 2);
1598 assert_eq!(
1599 create_batch.items[0].values.get("id"),
1600 Some(&Value::U64(500))
1601 );
1602 assert_eq!(
1603 create_batch.items[1].values.get("id"),
1604 Some(&Value::U64(501))
1605 );
1606 let update_batch = plan
1607 .batches
1608 .iter()
1609 .find(|batch| {
1610 batch.entity == "OrderLine"
1611 && batch.kind == GraphMutationKind::Update
1612 && batch.update_fields == vec!["name".to_owned()]
1613 })
1614 .unwrap();
1615 assert_eq!(update_batch.items.len(), 2);
1616 }
1617
1618 #[tokio::test]
1619 async fn resolved_repository_builds_relation_plans() {
1620 let mut ctx = UserContext::new()
1621 .with_metadata(
1622 InMemoryMetadataStore::new()
1623 .with_entity(entity())
1624 .with_entity(line_entity())
1625 .with_entity(product_entity()),
1626 )
1627 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1628 .with_repository_behavior_registry(
1629 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1630 );
1631 ctx.insert_resource(PostgresDialect);
1632 ctx.insert_resource(StubExecutor {
1633 affected: 1,
1634 rows: Vec::new(),
1635 });
1636
1637 let repo = ctx
1638 .resolve_repository::< StubExecutor>("Order")
1639 .unwrap();
1640 let plans = repo.relation_plans().unwrap();
1641
1642 assert_eq!(plans.len(), 1);
1643 assert_eq!(plans[0].relation_name, "lines");
1644 assert_eq!(plans[0].target_entity, "OrderLine");
1645 assert_eq!(plans[0].local_key, "id");
1646 assert_eq!(plans[0].foreign_key, "order_id");
1647 assert!(plans[0].many);
1648 }
1649
1650 #[tokio::test]
1651 async fn resolved_repository_builds_relation_query_from_parent_rows() {
1652 let mut ctx = UserContext::new()
1653 .with_metadata(
1654 InMemoryMetadataStore::new()
1655 .with_entity(entity())
1656 .with_entity(line_entity())
1657 .with_entity(product_entity()),
1658 )
1659 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1660 .with_repository_behavior_registry(
1661 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1662 );
1663 ctx.insert_resource(PostgresDialect);
1664 ctx.insert_resource(StubExecutor {
1665 affected: 1,
1666 rows: Vec::new(),
1667 });
1668
1669 let repo = ctx
1670 .resolve_repository::< StubExecutor>("Order")
1671 .unwrap();
1672 let parent_rows = vec![
1673 Record::from([(String::from("id"), Value::U64(11))]),
1674 Record::from([(String::from("id"), Value::U64(12))]),
1675 ];
1676
1677 let query = repo.relation_query("lines", &parent_rows).unwrap();
1678 }
1683
1684 #[tokio::test]
1685 async fn resolved_repository_enhances_parent_rows_with_relations() {
1686 let mut ctx = UserContext::new()
1687 .with_metadata(
1688 InMemoryMetadataStore::new()
1689 .with_entity(entity())
1690 .with_entity(line_entity())
1691 .with_entity(product_entity()),
1692 )
1693 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1694 .with_repository_behavior_registry(
1695 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1696 );
1697 ctx.insert_resource(PostgresDialect);
1698 ctx.insert_resource(StubExecutor {
1699 affected: 1,
1700 rows: vec![
1701 Record::from([
1702 (String::from("id"), Value::U64(101)),
1703 (String::from("order_id"), Value::U64(11)),
1704 (String::from("name"), Value::Text(String::from("l1"))),
1705 ]),
1706 Record::from([
1707 (String::from("id"), Value::U64(102)),
1708 (String::from("order_id"), Value::U64(11)),
1709 (String::from("name"), Value::Text(String::from("l2"))),
1710 ]),
1711 Record::from([
1712 (String::from("id"), Value::U64(201)),
1713 (String::from("order_id"), Value::U64(12)),
1714 (String::from("name"), Value::Text(String::from("l3"))),
1715 ]),
1716 ],
1717 });
1718
1719 let repo = ctx
1720 .resolve_repository::< StubExecutor>("Order")
1721 .unwrap();
1722 let mut parents = vec![
1723 Record::from([(String::from("id"), Value::U64(11))]),
1724 Record::from([(String::from("id"), Value::U64(12))]),
1725 ];
1726
1727 repo.enhance_relations(&mut parents).await.unwrap();
1728
1729 match parents[0].get("lines") {
1730 Some(Value::List(lines)) => assert_eq!(lines.len(), 2),
1731 other => panic!("unexpected lines payload: {other:?}"),
1732 }
1733 match parents[1].get("lines") {
1734 Some(Value::List(lines)) => assert_eq!(lines.len(), 1),
1735 other => panic!("unexpected lines payload: {other:?}"),
1736 }
1737 }
1738
1739 #[tokio::test]
1740 async fn relation_enhancement_wraps_inverse_many_relation_as_list() {
1741 let mut ctx = UserContext::new()
1742 .with_metadata(
1743 InMemoryMetadataStore::new()
1744 .with_entity(OrderLineWithProductEntityRow::entity_descriptor())
1745 .with_entity(ProductWithLinesEntityRow::entity_descriptor()),
1746 )
1747 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("OrderLine"));
1748 ctx.insert_resource(PostgresDialect);
1749 ctx.insert_resource(QueueExecutor {
1750 affected: 1,
1751 rows: Mutex::new(VecDeque::from([
1752 vec![Record::from([
1753 (String::from("id"), Value::U64(11)),
1754 (String::from("order_id"), Value::U64(7)),
1755 (String::from("name"), Value::Text(String::from("line"))),
1756 (String::from("product_id"), Value::U64(101)),
1757 ])],
1758 vec![Record::from([
1759 (String::from("id"), Value::U64(101)),
1760 (String::from("name"), Value::Text(String::from("sku"))),
1761 ])],
1762 ])),
1763 queries: Mutex::new(Vec::new()),
1764 });
1765
1766 let repo = ctx
1767 .resolve_repository::< QueueExecutor>("OrderLine")
1768 .unwrap();
1769 let rows = repo
1770 .fetch_enhanced_entities::<OrderLineWithProductEntityRow>(
1771 &SelectQuery::new("OrderLine").relation("product"),
1772 )
1773 .await.unwrap();
1774
1775 let product = rows.data[0].product.as_ref().unwrap();
1776 assert_eq!(product.lines.data.len(), 1);
1777 assert_eq!(product.lines.data[0].id, 11);
1778 }
1779
1780 #[tokio::test]
1781 async fn resolved_repository_fetches_smart_list_of_entities() {
1782 let mut ctx = UserContext::new()
1783 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1784 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1785 ctx.insert_resource(PostgresDialect);
1786 ctx.insert_resource(StubExecutor {
1787 affected: 1,
1788 rows: vec![Record::from([
1789 (String::from("id"), Value::U64(7)),
1790 (String::from("version"), Value::I64(2)),
1791 (String::from("name"), Value::Text(String::from("typed"))),
1792 ])],
1793 });
1794
1795 let repo = ctx
1796 .resolve_repository::< StubExecutor>("Order")
1797 .unwrap();
1798 let rows = repo.fetch_entities::<OrderEntity>(&repo.select()).await.unwrap();
1799
1800 assert_eq!(rows.len(), 1);
1801 assert_eq!(
1802 rows.first(),
1803 Some(&OrderEntity {
1804 id: 7,
1805 version: 2,
1806 name: String::from("typed"),
1807 })
1808 );
1809 }
1810
1811 #[tokio::test]
1812 async fn resolved_repository_fetches_smart_list_of_derived_entities() {
1813 let mut ctx = UserContext::new()
1814 .with_metadata(
1815 InMemoryMetadataStore::new().with_entity(CatalogProductRow::entity_descriptor()),
1816 )
1817 .with_repository_registry(
1818 InMemoryRepositoryRegistry::new().with_entity("CatalogProduct"),
1819 );
1820 ctx.insert_resource(PostgresDialect);
1821 ctx.insert_resource(StubExecutor {
1822 affected: 1,
1823 rows: vec![Record::from([
1824 (String::from("id"), Value::U64(9)),
1825 (String::from("name"), Value::Text(String::from("derived"))),
1826 ])],
1827 });
1828
1829 let repo = ctx
1830 .resolve_repository::< StubExecutor>("CatalogProduct")
1831 .unwrap();
1832 let rows = repo
1833 .fetch_entities::<CatalogProductRow>(&repo.select())
1834 .await.unwrap();
1835
1836 assert_eq!(rows.len(), 1);
1837 assert_eq!(
1838 rows.first(),
1839 Some(&CatalogProductRow {
1840 id: 9,
1841 name: String::from("derived"),
1842 })
1843 );
1844 }
1845
1846 #[tokio::test]
1847 async fn resolved_repository_collects_dynamic_properties_for_aggregate_output() {
1848 let mut ctx = UserContext::new()
1849 .with_metadata(
1850 InMemoryMetadataStore::new()
1851 .with_entity(OrderAggregateDynamic::entity_descriptor()),
1852 )
1853 .with_repository_registry(
1854 InMemoryRepositoryRegistry::new().with_entity("OrderAggregate"),
1855 );
1856 ctx.insert_resource(PostgresDialect);
1857 ctx.insert_resource(StubExecutor {
1858 affected: 1,
1859 rows: vec![Record::from([
1860 (String::from("id"), Value::U64(1)),
1861 (String::from("lineCount"), Value::I64(3)),
1862 (String::from("amount"), Value::F64(18.5)),
1863 ])],
1864 });
1865
1866 let repo = ctx
1867 .resolve_repository::< StubExecutor>("OrderAggregate")
1868 .unwrap();
1869 let rows = repo
1870 .fetch_entities::<OrderAggregateDynamic>(&repo.select())
1871 .await.unwrap();
1872
1873 assert_eq!(rows.len(), 1);
1874 assert_eq!(rows.data[0].id, 1);
1875 assert_eq!(rows.data[0].dynamic.get("lineCount"), Some(&Value::I64(3)));
1876 assert_eq!(rows.data[0].dynamic.get("amount"), Some(&Value::F64(18.5)));
1877 assert_eq!(
1878 rows.into_vec().into_iter().next().unwrap().into_json(),
1879 serde_json::json!({
1880 "id": 1,
1881 "lineCount": 3,
1882 "amount": 18.5
1883 })
1884 );
1885 }
1886
1887 #[tokio::test]
1888 async fn resolved_repository_executes_relation_aggregates_into_dynamic_properties() {
1889 let executor = QueueExecutor {
1890 affected: 1,
1891 rows: Mutex::new(VecDeque::from([
1892 vec![
1893 Record::from([
1894 (String::from("id"), Value::U64(1)),
1895 (String::from("version"), Value::I64(1)),
1896 (String::from("name"), Value::Text(String::from("first"))),
1897 ]),
1898 Record::from([
1899 (String::from("id"), Value::U64(2)),
1900 (String::from("version"), Value::I64(1)),
1901 (String::from("name"), Value::Text(String::from("second"))),
1902 ]),
1903 ],
1904 vec![Record::from([
1905 (String::from("order_id"), Value::U64(1)),
1906 (String::from("lineCount"), Value::I64(3)),
1907 ])],
1908 ])),
1909 queries: Mutex::new(Vec::new()),
1910 };
1911 let mut ctx = UserContext::new()
1912 .with_metadata(
1913 InMemoryMetadataStore::new()
1914 .with_entity(entity())
1915 .with_entity(line_entity()),
1916 )
1917 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1918 ctx.insert_resource(PostgresDialect);
1919 ctx.insert_resource(executor);
1920
1921 let repo = ctx
1922 .resolve_repository::< QueueExecutor>("Order")
1923 .unwrap();
1924 let rows = repo
1925 .fetch_all_with_relation_aggregates(
1926 &repo
1927 .select()
1928 .project("id")
1929 .project("version")
1930 .project("name"),
1931 &[RelationAggregate::new(
1932 "lines",
1933 "lineCount",
1934 SelectQuery::new("OrderLine"),
1935 true,
1936 )],
1937 )
1938 .await.unwrap();
1939
1940 assert_eq!(rows[0].get("lineCount"), Some(&Value::I64(3)));
1941 assert_eq!(rows[1].get("lineCount"), Some(&Value::U64(0)));
1942
1943 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1944 let queries = executor.queries.lock().unwrap();
1945 assert_eq!(queries.len(), 2);
1946 assert_eq!(
1947 queries[1],
1948 "SELECT ... FROM OrderLine ..."
1949 );
1950 }
1951
1952 #[tokio::test]
1953 async fn resolved_repository_maps_relation_aggregate_storage_key_to_property_key() {
1954 let mut line = line_entity();
1955 line.properties
1956 .iter_mut()
1957 .find(|property| property.name == "order_id")
1958 .unwrap()
1959 .column_name = "order_ref".to_owned();
1960 let executor = QueueExecutor {
1961 affected: 1,
1962 rows: Mutex::new(VecDeque::from([
1963 vec![Record::from([
1964 (String::from("id"), Value::U64(1)),
1965 (String::from("version"), Value::I64(1)),
1966 (String::from("name"), Value::Text(String::from("first"))),
1967 ])],
1968 vec![Record::from([
1969 (String::from("order_ref"), Value::I64(1)),
1970 (String::from("lineCount"), Value::I64(3)),
1971 ])],
1972 ])),
1973 queries: Mutex::new(Vec::new()),
1974 };
1975 let mut ctx = UserContext::new()
1976 .with_metadata(
1977 InMemoryMetadataStore::new()
1978 .with_entity(entity())
1979 .with_entity(line),
1980 )
1981 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1982 ctx.insert_resource(PostgresDialect);
1983 ctx.insert_resource(executor);
1984
1985 let repo = ctx
1986 .resolve_repository::< QueueExecutor>("Order")
1987 .unwrap();
1988 let rows = repo
1989 .fetch_all_with_relation_aggregates(
1990 &repo
1991 .select()
1992 .project("id")
1993 .project("version")
1994 .project("name"),
1995 &[RelationAggregate::new(
1996 "lines",
1997 "lineCount",
1998 SelectQuery::new("OrderLine"),
1999 true,
2000 )],
2001 )
2002 .await.unwrap();
2003
2004 assert_eq!(rows[0].get("lineCount"), Some(&Value::I64(3)));
2005 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
2006 assert_eq!(
2007 executor.queries.lock().unwrap()[1],
2008 "SELECT ... FROM OrderLine ..."
2009 );
2010 }
2011
2012 #[tokio::test]
2013 async fn resolved_repository_uses_aggregation_cache_when_resource_is_registered() {
2014 let executor = QueueExecutor {
2015 affected: 1,
2016 rows: Mutex::new(VecDeque::from([vec![Record::from([(
2017 String::from("count"),
2018 Value::I64(2),
2019 )])]])),
2020 queries: Mutex::new(Vec::new()),
2021 };
2022 let mut ctx = UserContext::new()
2023 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
2024 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
2025 ctx.insert_resource(PostgresDialect);
2026 ctx.insert_resource(executor);
2027 ctx.insert_resource(InMemoryAggregationCache::default());
2028
2029 let repo = ctx
2030 .resolve_repository::< QueueExecutor>("Order")
2031 .unwrap();
2032 let query = repo
2033 .select()
2034 .count("count")
2035 .enable_aggregation_cache_for(60_000);
2036
2037 let first = repo.fetch_all(&query).await.unwrap();
2038 let second = repo.fetch_all(&query).await.unwrap();
2039
2040 assert_eq!(first, second);
2041 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
2042 assert_eq!(executor.queries.lock().unwrap().len(), 1);
2043 }
2044
2045 #[tokio::test]
2046 async fn aggregation_cache_is_namespaced_and_invalidated_after_write() {
2047 let executor = QueueExecutor {
2048 affected: 1,
2049 rows: Mutex::new(VecDeque::from([
2050 vec![Record::from([(String::from("count"), Value::I64(2))])],
2051 vec![Record::from([(String::from("count"), Value::I64(3))])],
2052 ])),
2053 queries: Mutex::new(Vec::new()),
2054 };
2055 let mut ctx = UserContext::new()
2056 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
2057 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
2058 ctx.insert_resource(PostgresDialect);
2059 ctx.insert_resource(executor);
2060 ctx.insert_resource(
2061 Arc::new(InMemoryAggregationCache::with_namespace("tenant-a"))
2062 as Arc<dyn AggregationCacheBackend>,
2063 );
2064
2065 let repo = ctx
2066 .resolve_repository::< QueueExecutor>("Order")
2067 .unwrap();
2068 let query = repo
2069 .select()
2070 .count("count")
2071 .enable_aggregation_cache_for(60_000);
2072
2073 let first = repo.fetch_all(&query).await.unwrap();
2074 let cached = repo.fetch_all(&query).await.unwrap();
2075 repo.insert(
2076 &InsertCommand::new("Order")
2077 .value("id", 9_u64)
2078 .value("version", 1_i64)
2079 .value("name", "new"),
2080 )
2081 .await.unwrap();
2082 let refreshed = repo.fetch_all(&query).await.unwrap();
2083
2084 assert_eq!(first, cached);
2085 assert_ne!(cached, refreshed);
2086 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
2087 assert_eq!(executor.queries.lock().unwrap().len(), 2);
2088 }
2089
2090 #[tokio::test]
2091 async fn aggregation_cache_propagates_to_relation_aggregates() {
2092 let parent_rows = vec![
2093 Record::from([
2094 (String::from("id"), Value::U64(1)),
2095 (String::from("version"), Value::I64(1)),
2096 (String::from("name"), Value::Text(String::from("first"))),
2097 ]),
2098 Record::from([
2099 (String::from("id"), Value::U64(2)),
2100 (String::from("version"), Value::I64(1)),
2101 (String::from("name"), Value::Text(String::from("second"))),
2102 ]),
2103 ];
2104 let aggregate_rows = vec![Record::from([
2105 (String::from("order_id"), Value::U64(1)),
2106 (String::from("lineCount"), Value::I64(3)),
2107 ])];
2108 let executor = QueueExecutor {
2109 affected: 1,
2110 rows: Mutex::new(VecDeque::from([parent_rows, aggregate_rows])),
2111 queries: Mutex::new(Vec::new()),
2112 };
2113 let mut ctx = UserContext::new()
2114 .with_metadata(
2115 InMemoryMetadataStore::new()
2116 .with_entity(entity())
2117 .with_entity(line_entity()),
2118 )
2119 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
2120 ctx.insert_resource(PostgresDialect);
2121 ctx.insert_resource(executor);
2122 ctx.insert_resource(InMemoryAggregationCache::default());
2123
2124 let repo = ctx
2125 .resolve_repository::< QueueExecutor>("Order")
2126 .unwrap();
2127 let query = repo
2128 .select()
2129 .project("id")
2130 .project("version")
2131 .project("name")
2132 .enable_aggregation_cache_for(60_000)
2133 .propagate_aggregation_cache(60_000);
2134 let aggregate =
2135 RelationAggregate::new("lines", "lineCount", SelectQuery::new("OrderLine"), true);
2136
2137 let first = repo
2138 .fetch_all_with_relation_aggregates(&query, &[aggregate.clone()])
2139 .await.unwrap();
2140 let second = repo
2141 .fetch_all_with_relation_aggregates(&query, &[aggregate])
2142 .await.unwrap();
2143
2144 assert_eq!(first, second);
2145 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
2146 assert_eq!(executor.queries.lock().unwrap().len(), 2);
2147 }
2148
2149 #[tokio::test]
2150 async fn memory_repository_fetches_smart_list_entities_with_query_features() {
2151 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2152 let repository = MemoryRepository::new(metadata).with_rows(
2153 "Order",
2154 vec![
2155 Record::from([
2156 (String::from("id"), Value::U64(1)),
2157 (String::from("version"), Value::I64(1)),
2158 (String::from("name"), Value::Text(String::from("alpha"))),
2159 ]),
2160 Record::from([
2161 (String::from("id"), Value::U64(2)),
2162 (String::from("version"), Value::I64(1)),
2163 (String::from("name"), Value::Text(String::from("beta"))),
2164 ]),
2165 Record::from([
2166 (String::from("id"), Value::U64(3)),
2167 (String::from("version"), Value::I64(1)),
2168 (String::from("name"), Value::Text(String::from("gamma"))),
2169 ]),
2170 ],
2171 );
2172
2173 let query = teaql_core::SelectQuery::new("Order")
2174 .filter(Expr::Binary {
2175 left: Box::new(Expr::column("id")),
2176 op: teaql_core::BinaryOp::Gte,
2177 right: Box::new(Expr::value(2_u64)),
2178 })
2179 .order_by(OrderBy::desc("id"))
2180 .limit(1);
2181
2182 let orders = repository.fetch_entities::<Order>(&query).unwrap();
2183
2184 assert_eq!(orders.ids(), vec![Value::U64(3)]);
2185 assert_eq!(orders.versions(), vec![1]);
2186 assert_eq!(orders.first().unwrap().name, "gamma");
2187 }
2188
2189 #[tokio::test]
2190 async fn memory_repository_runs_relation_aggregates() {
2191 let metadata = InMemoryMetadataStore::new()
2192 .with_entity(entity())
2193 .with_entity(line_entity());
2194
2195 let repository = MemoryRepository::new(metadata)
2196 .with_rows(
2197 "Order",
2198 vec![
2199 Record::from([
2200 (String::from("id"), Value::U64(1)),
2201 (String::from("version"), Value::I64(1)),
2202 (String::from("name"), Value::Text(String::from("first"))),
2203 ]),
2204 Record::from([
2205 (String::from("id"), Value::U64(2)),
2206 (String::from("version"), Value::I64(1)),
2207 (String::from("name"), Value::Text(String::from("second"))),
2208 ]),
2209 ],
2210 )
2211 .with_rows(
2212 "OrderLine",
2213 vec![
2214 Record::from([
2215 (String::from("id"), Value::U64(10)),
2216 (String::from("version"), Value::I64(1)),
2217 (String::from("order_id"), Value::U64(1)),
2218 (String::from("name"), Value::Text(String::from("line1"))),
2219 ]),
2220 Record::from([
2221 (String::from("id"), Value::U64(11)),
2222 (String::from("version"), Value::I64(1)),
2223 (String::from("order_id"), Value::U64(1)),
2224 (String::from("name"), Value::Text(String::from("line2"))),
2225 ]),
2226 Record::from([
2227 (String::from("id"), Value::U64(12)),
2228 (String::from("version"), Value::I64(1)),
2229 (String::from("order_id"), Value::U64(2)),
2230 (String::from("name"), Value::Text(String::from("line3"))),
2231 ]),
2232 ],
2233 );
2234
2235 let query = SelectQuery::new("Order").project("id").project("name");
2236 let aggregate = RelationAggregate::new("lines", "lineCount", SelectQuery::new("OrderLine"), true);
2237
2238 let rows = repository
2239 .fetch_all_with_relation_aggregates(&query, &[aggregate])
2240 .unwrap();
2241
2242 assert_eq!(rows.len(), 2);
2243
2244 let first_order = rows.iter().find(|r| r.get("id") == Some(&Value::U64(1))).unwrap();
2245 assert_eq!(first_order.get("lineCount"), Some(&Value::U64(2)));
2246
2247 let second_order = rows.iter().find(|r| r.get("id") == Some(&Value::U64(2))).unwrap();
2248 assert_eq!(second_order.get("lineCount"), Some(&Value::U64(1)));
2249 }
2250
2251 #[tokio::test]
2252 async fn memory_repository_runs_aggregates() {
2253 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2254 let repository = MemoryRepository::new(metadata).with_rows(
2255 "Order",
2256 vec![
2257 Record::from([
2258 (String::from("id"), Value::U64(1)),
2259 (String::from("version"), Value::I64(1)),
2260 (String::from("name"), Value::Text(String::from("alpha"))),
2261 ]),
2262 Record::from([
2263 (String::from("id"), Value::U64(2)),
2264 (String::from("version"), Value::I64(2)),
2265 (String::from("name"), Value::Text(String::from("beta"))),
2266 ]),
2267 ],
2268 );
2269
2270 let query = teaql_core::SelectQuery {
2271 entity: String::from("Order"),
2272 projection: Vec::new(),
2273 expr_projection: Vec::new(),
2274 filter: None,
2275 having: None,
2276 order_by: Vec::new(),
2277 slice: None,
2278 trace_chain: Vec::new(),
2279 aggregates: vec![
2280 Aggregate {
2281 function: AggregateFunction::Count,
2282 field: String::from("id"),
2283 alias: String::from("count"),
2284 },
2285 Aggregate {
2286 function: AggregateFunction::Sum,
2287 field: String::from("version"),
2288 alias: String::from("versionSum"),
2289 },
2290 ],
2291 group_by: Vec::new(),
2292 relations: Vec::new(),
2293 aggregation_cache: None,
2294 comment: None,
2295 raw_sql: None,
2296 raw_sql_search_criteria: Vec::new(),
2297 dynamic_properties: Vec::new(),
2298 raw_projections: Vec::new(),
2299 object_group_bys: Vec::new(),
2300 child_enhancements: Vec::new(),
2301 };
2302
2303 let rows = repository.fetch_all(&query).unwrap();
2304
2305 assert_eq!(rows.len(), 1);
2306 assert_eq!(rows[0].get("count"), Some(&Value::U64(2)));
2307 assert_eq!(rows[0].get("versionSum"), Some(&Value::U64(3)));
2308 }
2309
2310 #[tokio::test]
2311 async fn memory_repository_runs_grouped_aggregates_and_extended_filters() {
2312 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2313 let repository = MemoryRepository::new(metadata).with_rows(
2314 "Order",
2315 vec![
2316 Record::from([
2317 (String::from("id"), Value::U64(1)),
2318 (String::from("version"), Value::I64(1)),
2319 (String::from("name"), Value::Text(String::from("alpha"))),
2320 ]),
2321 Record::from([
2322 (String::from("id"), Value::U64(2)),
2323 (String::from("version"), Value::I64(2)),
2324 (String::from("name"), Value::Text(String::from("alpha"))),
2325 ]),
2326 Record::from([
2327 (String::from("id"), Value::U64(3)),
2328 (String::from("version"), Value::I64(3)),
2329 (String::from("name"), Value::Text(String::from("tmp-beta"))),
2330 ]),
2331 ],
2332 );
2333
2334 let rows = repository
2335 .fetch_all(
2336 &teaql_core::SelectQuery::new("Order")
2337 .filter(
2338 Expr::between("version", 1_i64, 3_i64)
2339 .and_expr(Expr::not_like("name", "tmp%"))
2340 .and_expr(Expr::not_in_list("name", vec![Value::from("deleted")])),
2341 )
2342 .group_by("name")
2343 .count("total")
2344 .sum("version", "versionSum"),
2345 )
2346 .unwrap();
2347
2348 assert_eq!(rows.len(), 1);
2349 assert_eq!(
2350 rows[0].get("name"),
2351 Some(&Value::Text(String::from("alpha")))
2352 );
2353 assert_eq!(rows[0].get("total"), Some(&Value::U64(2)));
2354 assert_eq!(rows[0].get("versionSum"), Some(&Value::U64(3)));
2355 }
2356
2357 #[tokio::test]
2358 async fn memory_repository_runs_extended_aggregates_and_having() {
2359 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2360 let repository = MemoryRepository::new(metadata).with_rows(
2361 "Order",
2362 vec![
2363 Record::from([
2364 (String::from("id"), Value::U64(1)),
2365 (String::from("version"), Value::I64(1)),
2366 (String::from("name"), Value::Text(String::from("alpha"))),
2367 ]),
2368 Record::from([
2369 (String::from("id"), Value::U64(2)),
2370 (String::from("version"), Value::I64(3)),
2371 (String::from("name"), Value::Text(String::from("alpha"))),
2372 ]),
2373 Record::from([
2374 (String::from("id"), Value::U64(3)),
2375 (String::from("version"), Value::I64(7)),
2376 (String::from("name"), Value::Text(String::from("beta"))),
2377 ]),
2378 ],
2379 );
2380
2381 let rows = repository
2382 .fetch_all(
2383 &teaql_core::SelectQuery::new("Order")
2384 .group_by("name")
2385 .count("total")
2386 .stddev("version", "stddevVersion")
2387 .var_pop("version", "varPopVersion")
2388 .bit_or("version", "bitOrVersion")
2389 .having(Expr::gt("total", 1_i64)),
2390 )
2391 .unwrap();
2392
2393 assert_eq!(rows.len(), 1);
2394 assert_eq!(
2395 rows[0].get("name"),
2396 Some(&Value::Text(String::from("alpha")))
2397 );
2398 assert_eq!(rows[0].get("total"), Some(&Value::U64(2)));
2399 assert_eq!(
2400 rows[0].get("stddevVersion").map(Value::to_json_value),
2401 Some(serde_json::Value::String(
2402 "1.4142135623730951454746218583".to_owned()
2403 ))
2404 );
2405 assert_eq!(
2406 rows[0].get("varPopVersion"),
2407 Some(&Value::Decimal(Decimal::ONE))
2408 );
2409 assert_eq!(rows[0].get("bitOrVersion"), Some(&Value::I64(3)));
2410 }
2411
2412 #[tokio::test]
2413 async fn memory_repository_runs_sound_like_filter() {
2414 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2415 let repository = MemoryRepository::new(metadata).with_rows(
2416 "Order",
2417 vec![
2418 Record::from([
2419 (String::from("id"), Value::U64(1)),
2420 (String::from("version"), Value::I64(1)),
2421 (String::from("name"), Value::Text(String::from("Robert"))),
2422 ]),
2423 Record::from([
2424 (String::from("id"), Value::U64(2)),
2425 (String::from("version"), Value::I64(1)),
2426 (String::from("name"), Value::Text(String::from("Rupert"))),
2427 ]),
2428 Record::from([
2429 (String::from("id"), Value::U64(3)),
2430 (String::from("version"), Value::I64(1)),
2431 (String::from("name"), Value::Text(String::from("Ashcraft"))),
2432 ]),
2433 ],
2434 );
2435
2436 let rows = repository
2437 .fetch_all(
2438 &teaql_core::SelectQuery::new("Order")
2439 .filter(Expr::sound_like("name", "Robert"))
2440 .order_asc("id"),
2441 )
2442 .unwrap();
2443
2444 assert_eq!(rows.len(), 2);
2445 assert_eq!(rows[0].get("name"), Some(&Value::Text("Robert".to_owned())));
2446 assert_eq!(rows[1].get("name"), Some(&Value::Text("Rupert".to_owned())));
2447 }
2448
2449 #[tokio::test]
2450 async fn memory_repository_runs_java_style_string_match_filters() {
2451 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2452 let repository = MemoryRepository::new(metadata).with_rows(
2453 "Order",
2454 vec![
2455 Record::from([
2456 (String::from("id"), Value::U64(1)),
2457 (String::from("version"), Value::I64(1)),
2458 (String::from("name"), Value::Text(String::from("tea-order"))),
2459 ]),
2460 Record::from([
2461 (String::from("id"), Value::U64(2)),
2462 (String::from("version"), Value::I64(1)),
2463 (
2464 String::from("name"),
2465 Value::Text(String::from("coffee-order")),
2466 ),
2467 ]),
2468 Record::from([
2469 (String::from("id"), Value::U64(3)),
2470 (String::from("version"), Value::I64(1)),
2471 (
2472 String::from("name"),
2473 Value::Text(String::from("tea-archived")),
2474 ),
2475 ]),
2476 ],
2477 );
2478
2479 let rows = repository
2480 .fetch_all(
2481 &teaql_core::SelectQuery::new("Order")
2482 .filter(
2483 Expr::contain("name", "tea")
2484 .and_expr(Expr::begin_with("name", "tea"))
2485 .and_expr(Expr::end_with("name", "order"))
2486 .and_expr(Expr::not_contain("name", "coffee"))
2487 .and_expr(Expr::not_begin_with("name", "archived"))
2488 .and_expr(Expr::not_end_with("name", "draft")),
2489 )
2490 .order_asc("id"),
2491 )
2492 .unwrap();
2493
2494 assert_eq!(rows.len(), 1);
2495 assert_eq!(
2496 rows[0].get("name"),
2497 Some(&Value::Text("tea-order".to_owned()))
2498 );
2499 }
2500
2501 #[tokio::test]
2502 async fn memory_repository_runs_property_to_property_filters() {
2503 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2504 let repository = MemoryRepository::new(metadata).with_rows(
2505 "Order",
2506 vec![
2507 Record::from([
2508 (String::from("id"), Value::U64(1)),
2509 (String::from("version"), Value::I64(2)),
2510 (String::from("name"), Value::Text(String::from("keep"))),
2511 ]),
2512 Record::from([
2513 (String::from("id"), Value::U64(2)),
2514 (String::from("version"), Value::I64(1)),
2515 (String::from("name"), Value::Text(String::from("skip"))),
2516 ]),
2517 ],
2518 );
2519
2520 let rows = repository
2521 .fetch_all(
2522 &teaql_core::SelectQuery::new("Order")
2523 .filter(Expr::compare_columns("version", BinaryOp::Gte, "id"))
2524 .order_asc("id"),
2525 )
2526 .unwrap();
2527
2528 assert_eq!(rows.len(), 1);
2529 assert_eq!(rows[0].get("name"), Some(&Value::Text("keep".to_owned())));
2530 }
2531
2532 #[tokio::test]
2533 async fn memory_repository_supports_mutations_and_optimistic_locking() {
2534 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2535 let repository = MemoryRepository::new(metadata);
2536
2537 repository
2538 .insert(
2539 &InsertCommand::new("Order")
2540 .value("id", 10_u64)
2541 .value("version", 1_i64)
2542 .value("name", "draft"),
2543 )
2544 .unwrap();
2545 repository
2546 .update(
2547 &UpdateCommand::new("Order", 10_u64)
2548 .expected_version(1)
2549 .value("name", "submitted"),
2550 )
2551 .unwrap();
2552
2553 let row = repository
2554 .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2555 .unwrap()
2556 .pop()
2557 .unwrap();
2558 assert_eq!(
2559 row.get("name"),
2560 Some(&Value::Text(String::from("submitted")))
2561 );
2562 assert_eq!(row.get("version"), Some(&Value::I64(2)));
2563
2564 let conflict = repository
2565 .update(
2566 &UpdateCommand::new("Order", 10_u64)
2567 .expected_version(1)
2568 .value("name", "stale"),
2569 )
2570 .unwrap_err();
2571 assert!(matches!(
2572 conflict,
2573 RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. })
2574 ));
2575
2576 repository
2577 .delete(&DeleteCommand::new("Order", 10_u64).expected_version(2))
2578 .unwrap();
2579 let row = repository
2580 .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2581 .unwrap()
2582 .pop()
2583 .unwrap();
2584 assert_eq!(row.get("version"), Some(&Value::I64(-3)));
2585
2586 repository
2587 .recover(&RecoverCommand::new("Order", 10_u64, -3))
2588 .unwrap();
2589 let row = repository
2590 .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2591 .unwrap()
2592 .pop()
2593 .unwrap();
2594 assert_eq!(row.get("version"), Some(&Value::I64(4)));
2595 }
2596
2597 #[tokio::test]
2598 async fn user_context_reports_missing_schema_provider() {
2599 let err = UserContext::new().ensure_schema().await.unwrap_err();
2600 assert!(
2601 matches!(err, RuntimeError::Schema(message) if message == "missing schema provider")
2602 );
2603 }
2604
2605 #[tokio::test]
2606 async fn user_context_stores_and_exposes_user_identifier() {
2607 let mut ctx = UserContext::new();
2608 let pid = std::process::id();
2609 let thread_id_str = format!("{:?}", std::thread::current().id());
2610 let numeric_thread_id = thread_id_str
2611 .strip_prefix("ThreadId(")
2612 .and_then(|s| s.strip_suffix(")"))
2613 .unwrap_or(&thread_id_str);
2614 let os_user = std::env::var("USER")
2615 .or_else(|_| std::env::var("USERNAME"))
2616 .unwrap_or_else(|_| "main".to_owned());
2617 let expected_default = format!("{os_user}@pid-{pid}.tid-{numeric_thread_id}");
2618 assert_eq!(ctx.user_identifier(), Some(expected_default.as_str()));
2619
2620 ctx.set_user_identifier("user-123");
2621 assert_eq!(ctx.user_identifier(), Some("user-123"));
2622
2623 let ctx2 = UserContext::new().with_user_identifier("user-456");
2624 assert_eq!(ctx2.user_identifier(), Some("user-456"));
2625
2626 let mut ctx3 = UserContext::new();
2627 ctx3.set_user_identifier_option(Some("user-789".to_owned()));
2628 assert_eq!(ctx3.user_identifier(), Some("user-789"));
2629 ctx3.set_user_identifier_option(None);
2630 assert_eq!(ctx3.user_identifier(), None);
2631
2632 let ctx4 = UserContext::new().with_user_identifier_option(Some("user-abc".to_owned()));
2633 assert_eq!(ctx4.user_identifier(), Some("user-abc"));
2634
2635 }
2636}
2637
2638pub use checker::{
2639 CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResult, CheckResults, CheckRule, Checker,
2640 CheckerRegistry, InMemoryCheckerRegistry, LocationSegment, ObjectLocation, TypedChecker,
2641 TypedEntityChecker, clear_record_status, mark_record_status,
2642};