1#![allow(warnings)]
2mod checker;
3mod context;
4mod entity_runtime;
5mod entity_status;
6mod error;
7mod event;
8mod graph;
9mod id;
10mod language;
11pub mod log_formatter;
12mod memory;
13mod registry;
14mod repository;
15
16pub use context::{
17 DataStore, InfoLogEntry, InMemoryDataStore, LogPayload, SchemaProvider, SqlLogEntry, SqlLogOperation,
18 SqlLogOptions, UnifiedLogBuffer, UnifiedLogEntry, UserContext,
19};
20pub use entity_status::{EntityAction, EntityStatus};
21pub use entity_runtime::{ChangeSetStack, EntityChangeSet, EntityKey, EntityRoot, RootContext};
22pub use error::{ContextError, RepositoryError, RuntimeError};
23pub use event::{
24 RawAuditEvent, RawAuditEventKind, RawAuditEventSink, EntityPropertyChange, InMemoryRawAuditEventSink,
25 SafeAuditEvent, SafeAuditField, SafeAuditEventSink,
26};
27pub use graph::{
28 GraphMutationBatch, GraphMutationKind, GraphMutationPlan, GraphMutationPlanItem,
29 GraphNode, GraphOperation, ScopedCommentNode, TraceScopeToken, sorted_update_fields,
30};
31pub(crate) use id::local_id_generator;
32pub use id::{InternalIdGenerator, SnowflakeIdGenerator};
33pub use language::{
34 BuiltinTranslator, Language, MessageTranslator, translate_check_result, translate_location,
35};
36pub use memory::{MemoryRepository, MemoryRepositoryError};
37pub use registry::{
38 InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry,
39 MetadataStore, RepositoryBehavior, RepositoryBehaviorRegistry, RepositoryRegistry,
40 RequestPolicy, RuntimeModule,
41};
42pub use repository::{
43 AggregationCacheBackend, ContextRepository, GraphTransactionBoundary, InMemoryAggregationCache,
44 RelationLoadPlan, Repository, ResolvedRepository,
45};
46
47#[cfg(test)]
48mod tests {
49 use std::collections::{BTreeMap, VecDeque};
50 use std::sync::{Arc, Mutex};
51
52 use super::{
53 AggregationCacheBackend, CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResult,
54 CheckResults, CheckRule, Checker, RawAuditEvent, RawAuditEventKind, RawAuditEventSink,
55 GraphMutationKind, GraphNode, InMemoryAggregationCache,
56 InMemoryCheckerRegistry, InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry,
57 InMemoryRepositoryRegistry, InternalIdGenerator, Language, MemoryRepository, MetadataStore,
58 ObjectLocation, Repository, RepositoryBehavior, RepositoryError,
59 RequestPolicy, RuntimeError, RuntimeModule, SqlLogOperation, SqlLogOptions, TypedChecker,
60 TypedEntityChecker, UserContext, translate_check_result,
61 };
62 use teaql_data_service::{
63 DataServiceCapabilities, DataServiceExecutor, ExecutionMetadata, MutationExecutor,
64 MutationRequest, MutationResult, QueryExecutor, QueryRequest, QueryResult, DataServiceOperation,
65 };
66 use teaql_core::{
67 Aggregate, AggregateFunction, BinaryOp, DataType, Decimal, DeleteCommand, Entity,
68 EntityDescriptor, EntityError, Expr, InsertCommand, OrderBy, PropertyDescriptor, Record,
69 RecoverCommand, RelationAggregate, SelectQuery, TeaqlEntity, UpdateCommand, Value,
70 };
71 use teaql_macros::TeaqlEntity as DeriveTeaqlEntity;
72 use teaql_sql::{CompiledQuery, DatabaseKind, SqlCompileError, SqlDialect, quote_identifier_if_needed};
73
/// Comma-separated column list matching the properties declared by `entity()`.
// NOTE(review): not referenced in the visible part of this module — confirm it is still used.
const ORDER_DEFAULT_PROJECTION: &str = "id, version, name";
75
/// Zero-sized SQL dialect used by the tests; its `SqlDialect` impl reports
/// `DatabaseKind::PostgreSql`.
#[derive(Debug, Default, Clone, Copy)]
struct PostgresDialect;
78
79 impl SqlDialect for PostgresDialect {
80 fn kind(&self) -> DatabaseKind {
81 DatabaseKind::PostgreSql
82 }
83
84 fn quote_ident(&self, ident: &str) -> String {
85 quote_identifier_if_needed(ident, '"')
86 }
87
88 fn placeholder(&self, index: usize) -> String {
89 format!("${index}")
90 }
91
92 fn schema_type_sql(
93 &self,
94 data_type: DataType,
95 _property: &PropertyDescriptor,
96 ) -> Result<&'static str, SqlCompileError> {
97 match data_type {
98 DataType::Bool => Ok("BOOLEAN"),
99 DataType::I64 | DataType::U64 => Ok("BIGINT"),
100 DataType::F64 => Ok("DOUBLE PRECISION"),
101 DataType::Decimal => Ok("NUMERIC"),
102 DataType::Text => Ok("TEXT"),
103 DataType::Json => Ok("JSONB"),
104 DataType::Date => Ok("DATE"),
105 DataType::Timestamp => Ok("TIMESTAMPTZ"),
106 }
107 }
108 }
109
110 fn entity() -> EntityDescriptor {
111 EntityDescriptor::new("Order")
112 .table_name("orders")
113 .property(
114 PropertyDescriptor::new("id", DataType::U64)
115 .column_name("id")
116 .id()
117 .not_null(),
118 )
119 .property(
120 PropertyDescriptor::new("version", DataType::I64)
121 .column_name("version")
122 .version()
123 .not_null(),
124 )
125 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
126 .relation(
127 teaql_core::RelationDescriptor::new("lines", "OrderLine")
128 .local_key("id")
129 .foreign_key("order_id")
130 .many(),
131 )
132 }
133
134 fn line_entity() -> EntityDescriptor {
135 EntityDescriptor::new("OrderLine")
136 .table_name("orderline")
137 .property(
138 PropertyDescriptor::new("id", DataType::U64)
139 .column_name("id")
140 .id()
141 .not_null(),
142 )
143 .property(
144 PropertyDescriptor::new("version", DataType::I64)
145 .column_name("version")
146 .version(),
147 )
148 .property(
149 PropertyDescriptor::new("order_id", DataType::U64)
150 .column_name("order_id")
151 .not_null(),
152 )
153 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
154 .property(
155 PropertyDescriptor::new("product_id", DataType::U64)
156 .column_name("product_id")
157 .not_null(),
158 )
159 .relation(
160 teaql_core::RelationDescriptor::new("product", "Product")
161 .local_key("product_id")
162 .foreign_key("id"),
163 )
164 }
165
166 fn product_entity() -> EntityDescriptor {
167 EntityDescriptor::new("Product")
168 .table_name("product")
169 .property(
170 PropertyDescriptor::new("id", DataType::U64)
171 .column_name("id")
172 .id()
173 .not_null(),
174 )
175 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
176 }
177
/// Executor stub with canned answers: every mutation reports `affected` rows
/// and every query returns a clone of `rows`.
#[derive(Debug, Default)]
struct StubExecutor {
    // Row count reported by `mutate`.
    affected: u64,
    // Rows returned (cloned) by every `query` call.
    rows: Vec<Record>,
}
183
/// Executor stub that serves queued row batches and records the queries it saw.
#[derive(Debug, Default)]
struct QueueExecutor {
    // Row count reported by `mutate`.
    affected: u64,
    // One batch is popped from the front per `query` call; an empty batch is
    // returned once the queue is drained.
    rows: Mutex<VecDeque<Vec<Record>>>,
    // Approximate SQL text pushed by every `query` call, in call order.
    queries: Mutex<Vec<String>>,
}
190
/// Repository behavior used by the tests: filters selects on `version = 1`,
/// defaults `version` to 1 on insert, and requests loading of `lines`.
struct OrderBehavior;
192
/// Derived entity registered as `CatalogProduct` on table `catalog_product`.
// NOTE(review): the reason for `allow(dead_code)` is not visible here —
// presumably the fields are only touched through the derived impls; confirm.
#[allow(dead_code)]
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "CatalogProduct", table = "catalog_product")]
struct CatalogProductRow {
    #[teaql(id)]
    id: u64,
    name: String,
}
201
/// Derived entity `OrderAggregate` on table `orders` that keeps an id plus a
/// map of values in the field marked `#[teaql(dynamic)]`.
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "OrderAggregate", table = "orders")]
struct OrderAggregateDynamic {
    #[teaql(id)]
    id: u64,
    #[teaql(dynamic)]
    dynamic: BTreeMap<String, Value>,
}
210
/// Derived typed row for the `Product` entity (table `product`).
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "Product", table = "product")]
struct ProductEntityRow {
    #[teaql(id)]
    id: u64,
    name: String,
}
218
/// Derived typed row for `OrderLine` (table `orderline`) with an optional
/// `product` relation joined on `product_id` -> `id`.
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "OrderLine", table = "orderline")]
struct OrderLineEntityRow {
    #[teaql(id)]
    id: u64,
    #[teaql(column = "order_id")]
    order_id: u64,
    name: String,
    #[teaql(column = "product_id")]
    product_id: u64,
    #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
    product: Option<ProductEntityRow>,
}
232
/// Derived typed row for `OrderLine` (table `orderline`) without any relation
/// fields; used as the element type of `ProductWithLinesEntityRow::lines`.
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "OrderLine", table = "orderline")]
struct ProductLineEntityRow {
    #[teaql(id)]
    id: u64,
    #[teaql(column = "order_id")]
    order_id: u64,
    name: String,
    #[teaql(column = "product_id")]
    product_id: u64,
}
244
/// Derived typed row for `Product` (table `product`) with a to-many `lines`
/// relation to `OrderLine` joined on `id` -> `product_id`.
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "Product", table = "product")]
struct ProductWithLinesEntityRow {
    #[teaql(id)]
    id: u64,
    name: String,
    #[teaql(relation(
        target = "OrderLine",
        local_key = "id",
        foreign_key = "product_id",
        many
    ))]
    lines: teaql_core::SmartList<ProductLineEntityRow>,
}
259
/// Derived typed row for `OrderLine` (table `orderline`) whose `product`
/// relation is a `ProductWithLinesEntityRow`, i.e. a product that itself
/// carries order lines.
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "OrderLine", table = "orderline")]
struct OrderLineWithProductEntityRow {
    #[teaql(id)]
    id: u64,
    #[teaql(column = "order_id")]
    order_id: u64,
    name: String,
    #[teaql(column = "product_id")]
    product_id: u64,
    #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
    product: Option<ProductWithLinesEntityRow>,
}
273
/// Derived typed row for `Order` (table `orders`) with a to-many `lines`
/// relation of `OrderLineEntityRow` joined on `id` -> `order_id`.
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "Order", table = "orders")]
struct OrderAggregateRow {
    #[teaql(id)]
    id: u64,
    #[teaql(version)]
    version: i64,
    name: String,
    #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
    lines: teaql_core::SmartList<OrderLineEntityRow>,
}
285
/// Derived typed row for `Order` (table `orders`) without relations; the
/// entity type checked by `TypedOrderChecker`.
#[derive(Debug, Clone, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "Order", table = "orders")]
struct Order {
    #[teaql(id)]
    id: u64,
    #[teaql(version)]
    version: i64,
    name: String,
}
295
/// Leaf node of the typed graph used by the graph-saving tests: a `Product`
/// row (table `product`).
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "Product", table = "product")]
struct TypedGraphProduct {
    #[teaql(id)]
    id: u64,
    name: String,
}
303
/// Middle node of the typed graph used by the graph-saving tests: an
/// `OrderLine` row whose key columns are optional and which may carry a
/// `product` child.
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "OrderLine", table = "orderline")]
struct TypedGraphLine {
    #[teaql(id)]
    id: u64,
    #[teaql(column = "order_id")]
    order_id: Option<u64>,
    name: String,
    #[teaql(column = "product_id")]
    product_id: Option<u64>,
    #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
    product: Option<TypedGraphProduct>,
}
317
/// Root node of the typed graph used by the graph-saving tests: an `Order`
/// row with a to-many `lines` relation of `TypedGraphLine`.
#[derive(Debug, PartialEq, DeriveTeaqlEntity)]
#[teaql(entity = "Order", table = "orders")]
struct TypedGraphOrder {
    #[teaql(id)]
    id: u64,
    #[teaql(version)]
    version: i64,
    name: String,
    #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
    lines: teaql_core::SmartList<TypedGraphLine>,
}
329
/// Hand-written counterpart of the derived `Order` row: its `TeaqlEntity` and
/// `Entity` impls are written out manually instead of derived.
#[derive(Debug, PartialEq, Eq)]
struct OrderEntity {
    id: u64,
    version: i64,
    name: String,
}
336
337 impl teaql_core::TeaqlEntity for OrderEntity {
338 fn entity_descriptor() -> EntityDescriptor {
339 entity()
340 }
341 }
342
343 impl Entity for OrderEntity {
344 fn from_record(record: Record) -> Result<Self, EntityError> {
345 let id = match record.get("id") {
346 Some(Value::U64(v)) => *v,
347 Some(Value::I64(v)) if *v >= 0 => *v as u64,
348 other => {
349 return Err(EntityError::new(
350 "Order",
351 format!("invalid id field: {other:?}"),
352 ));
353 }
354 };
355 let version = match record.get("version") {
356 Some(Value::I64(v)) => *v,
357 other => {
358 return Err(EntityError::new(
359 "Order",
360 format!("invalid version field: {other:?}"),
361 ));
362 }
363 };
364 let name = match record.get("name") {
365 Some(Value::Text(v)) => v.clone(),
366 other => {
367 return Err(EntityError::new(
368 "Order",
369 format!("invalid name field: {other:?}"),
370 ));
371 }
372 };
373 Ok(Self { id, version, name })
374 }
375
376 fn into_record(self) -> Record {
377 Record::from([
378 (String::from("id"), Value::U64(self.id)),
379 (String::from("version"), Value::I64(self.version)),
380 (String::from("name"), Value::Text(self.name)),
381 ])
382 }
383 }
384
/// Error type of the stub executors; none of the visible executor methods
/// ever constructs it.
#[derive(Debug)]
struct StubError;
387
388 impl std::fmt::Display for StubError {
389 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
390 write!(f, "stub error")
391 }
392 }
393
// Marker impl so `StubError` can be used as an executor's associated `Error` type.
impl std::error::Error for StubError {}
395
396 impl DataServiceExecutor for StubExecutor {
397 type Error = StubError;
398
399 fn capabilities(&self) -> DataServiceCapabilities {
400 DataServiceCapabilities::default()
401 }
402 }
403
404 impl QueryExecutor for StubExecutor {
405 async fn query(&self, _request: QueryRequest) -> Result<QueryResult, Self::Error> {
406 Ok(QueryResult {
407 rows: self.rows.clone(),
408 metadata: ExecutionMetadata {
409 debug_query: None,
410 backend: "stub".to_owned(),
411 operation: DataServiceOperation::Query,
412 started_at: std::time::SystemTime::now(),
413 ended_at: std::time::SystemTime::now(),
414 affected_rows: None,
415 result_count: Some(self.rows.len()),
416 trace_chain: Vec::new(),
417 comment: None,
418 backend_request_id: None,
419 },
420 })
421 }
422 }
423
424 impl MutationExecutor for StubExecutor {
425 async fn mutate(&self, _request: MutationRequest) -> Result<MutationResult, Self::Error> {
426 Ok(MutationResult {
427 affected_rows: self.affected,
428 generated_values: Record::new(),
429 metadata: ExecutionMetadata {
430 debug_query: None,
431 backend: "stub".to_owned(),
432 operation: DataServiceOperation::Update,
433 started_at: std::time::SystemTime::now(),
434 ended_at: std::time::SystemTime::now(),
435 affected_rows: Some(self.affected),
436 result_count: None,
437 trace_chain: Vec::new(),
438 comment: None,
439 backend_request_id: None,
440 },
441 })
442 }
443 }
444
445 impl DataServiceExecutor for QueueExecutor {
446 type Error = StubError;
447
448 fn capabilities(&self) -> DataServiceCapabilities {
449 DataServiceCapabilities::default()
450 }
451 }
452
453 impl QueryExecutor for QueueExecutor {
454 async fn query(&self, request: QueryRequest) -> Result<QueryResult, Self::Error> {
455 let sql_approx = format!("SELECT ... FROM {} ...", request.query.entity);
456 self.queries.lock().unwrap().push(sql_approx);
457 Ok(QueryResult {
458 rows: self.rows.lock().unwrap().pop_front().unwrap_or_default(),
459 metadata: ExecutionMetadata {
460 debug_query: None,
461 backend: "queue".to_owned(),
462 operation: DataServiceOperation::Query,
463 started_at: std::time::SystemTime::now(),
464 ended_at: std::time::SystemTime::now(),
465 affected_rows: None,
466 result_count: Some(0),
467 trace_chain: Vec::new(),
468 comment: None,
469 backend_request_id: None,
470 },
471 })
472 }
473 }
474
475 impl MutationExecutor for QueueExecutor {
476 async fn mutate(&self, _request: MutationRequest) -> Result<MutationResult, Self::Error> {
477 Ok(MutationResult {
478 affected_rows: self.affected,
479 generated_values: Record::new(),
480 metadata: ExecutionMetadata {
481 debug_query: None,
482 backend: "queue".to_owned(),
483 operation: DataServiceOperation::Update,
484 started_at: std::time::SystemTime::now(),
485 ended_at: std::time::SystemTime::now(),
486 affected_rows: Some(self.affected),
487 result_count: None,
488 trace_chain: Vec::new(),
489 comment: None,
490 backend_request_id: None,
491 },
492 })
493 }
494 }
495
496
497
498 impl RepositoryBehavior for OrderBehavior {
499 fn before_select(
500 &self,
501 _ctx: &UserContext,
502 query: &mut teaql_core::SelectQuery,
503 ) -> Result<(), RuntimeError> {
504 query.filter = Some(Expr::eq("version", 1_i64));
505 Ok(())
506 }
507
508 fn before_insert(
509 &self,
510 _ctx: &UserContext,
511 command: &mut InsertCommand,
512 ) -> Result<(), RuntimeError> {
513 command
514 .values
515 .entry("version".to_owned())
516 .or_insert(Value::I64(1));
517 Ok(())
518 }
519
520 fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
521 vec!["lines".to_owned()]
522 }
523 }
524
/// Insert behavior that fills `name` and `version` from context resources and locals.
struct ContextAwareOrderBehavior;
/// Request policy that scopes `Order` selects and inserts by the `tenant_id` resource.
struct TenantRequestPolicy;
/// Record-based checker for the `Order` entity.
struct OrderChecker;
/// Typed checker operating on the derived `Order` struct.
struct TypedOrderChecker;
/// Audit sink that appends a clone of every received event to a shared vector.
#[derive(Clone)]
struct RecordingEventSink {
    // Shared so that clones of the sink write to the same list.
    events: Arc<Mutex<Vec<RawAuditEvent>>>,
}
533
534 impl RepositoryBehavior for ContextAwareOrderBehavior {
535 fn before_insert(
536 &self,
537 ctx: &UserContext,
538 command: &mut InsertCommand,
539 ) -> Result<(), RuntimeError> {
540 let tenant = ctx
541 .get_named_resource::<String>("tenant")
542 .cloned()
543 .ok_or_else(|| RuntimeError::Behavior("missing tenant resource".to_owned()))?;
544 let version = *ctx
545 .get_named_resource::<i64>("initial_version")
546 .ok_or_else(|| {
547 RuntimeError::Behavior("missing initial_version resource".to_owned())
548 })?;
549 let trace_id = match ctx.local("trace_id") {
550 Some(Value::Text(value)) => value.clone(),
551 other => {
552 return Err(RuntimeError::Behavior(format!(
553 "missing trace_id local, got {other:?}"
554 )));
555 }
556 };
557
558 command
559 .values
560 .entry("name".to_owned())
561 .or_insert(Value::Text(format!("{tenant}:{trace_id}")));
562 command
563 .values
564 .entry("version".to_owned())
565 .or_insert(Value::I64(version));
566 Ok(())
567 }
568 }
569
570 impl RequestPolicy for TenantRequestPolicy {
571 fn enforce_select(
572 &self,
573 ctx: &UserContext,
574 query: &mut SelectQuery,
575 ) -> Result<(), RuntimeError> {
576 if query.entity == "Order" {
577 let tenant_id = ctx
578 .get_named_resource::<u64>("tenant_id")
579 .copied()
580 .ok_or_else(|| RuntimeError::Policy("missing tenant_id".to_owned()))?;
581 query.filter = Some(match query.filter.take() {
582 Some(filter) => filter.and_expr(Expr::eq("id", tenant_id)),
583 None => Expr::eq("id", tenant_id),
584 });
585 }
586 Ok(())
587 }
588
589 fn enforce_insert(
590 &self,
591 ctx: &UserContext,
592 command: &mut InsertCommand,
593 ) -> Result<(), RuntimeError> {
594 if command.entity == "Order" {
595 let tenant_id = ctx
596 .get_named_resource::<u64>("tenant_id")
597 .copied()
598 .ok_or_else(|| RuntimeError::Policy("missing tenant_id".to_owned()))?;
599 command
600 .values
601 .insert("version".to_owned(), Value::I64(tenant_id as i64));
602 }
603 Ok(())
604 }
605 }
606
607 impl Checker for OrderChecker {
608 fn entity(&self) -> &str {
609 "Order"
610 }
611
612 fn check_and_fix(
613 &self,
614 _ctx: &UserContext,
615 record: &mut Record,
616 location: &ObjectLocation,
617 results: &mut CheckResults,
618 ) {
619 let status = CheckObjectStatus::from_record(record);
620 if status.is_create() {
621 self.required(record, "name", location, results);
622 record.entry("version".to_owned()).or_insert(Value::I64(1));
623 }
624 if status.is_update()
625 && record.get("name") == Some(&Value::Text("graph-update".to_owned()))
626 {
627 record.insert(
628 "name".to_owned(),
629 Value::Text("graph-update-checked".to_owned()),
630 );
631 }
632 self.min_string_length(record, "name", 3, location, results);
633 }
634 }
635
636 impl TypedChecker<Order> for TypedOrderChecker {
637 fn check_and_fix_typed(
638 &self,
639 _ctx: &UserContext,
640 entity: &mut Order,
641 status: CheckObjectStatus,
642 location: &ObjectLocation,
643 results: &mut CheckResults,
644 ) {
645 if status.is_create() {
646 if entity.name.is_empty() {
647 results.push(CheckResult::required(location.clone().member("name")));
648 }
649 }
650 if entity.name.chars().count() < 3 {
651 results.push(CheckResult::min_str(
652 location.clone().member("name"),
653 3,
654 entity.name.clone(),
655 ));
656 }
657 if entity.name == "fix" {
658 entity.name = "fixed".to_owned();
659 }
660 }
661 }
662
663 impl RawAuditEventSink for RecordingEventSink {
664 fn on_event(&self, _ctx: &UserContext, event: &RawAuditEvent) -> Result<(), RuntimeError> {
665 self.events.lock().unwrap().push(event.clone());
666 Ok(())
667 }
668 }
669
/// Id generator that hands out the same wrapped id on every call.
struct FixedIdGenerator(u64);
671
672 impl InternalIdGenerator for FixedIdGenerator {
673 fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
674 Ok(self.0)
675 }
676 }
677
/// Id generator that hands out consecutive ids from a single counter shared
/// by all entities.
struct SequentialIdGenerator {
    // Next id to hand out.
    next: Mutex<u64>,
}
681
682 impl SequentialIdGenerator {
683 fn new(next: u64) -> Self {
684 Self {
685 next: Mutex::new(next),
686 }
687 }
688 }
689
690 impl InternalIdGenerator for SequentialIdGenerator {
691 fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
692 let mut next = self
693 .next
694 .lock()
695 .map_err(|err| RuntimeError::IdGeneration(err.to_string()))?;
696 let id = *next;
697 *next += 1;
698 Ok(id)
699 }
700 }
701
702 #[tokio::test]
703 async fn metadata_store_registers_entities() {
704 let store = InMemoryMetadataStore::new().with_entity(entity());
705 assert!(store.entity("Order").is_some());
706 }
707
708 #[tokio::test]
709 async fn runtime_module_registers_descriptor_into_context() {
710 let ctx = UserContext::new().with_module(RuntimeModule::new().descriptor(entity()));
711 assert!(ctx.entity("Order").is_some());
712 assert!(ctx.has_repository("Order"));
713 }
714
715 #[tokio::test]
716 async fn runtime_module_registers_derived_entity_and_behavior() {
717 let ctx = UserContext::new().with_module(
718 RuntimeModule::new().entity_with_behavior::<CatalogProductRow, _>(OrderBehavior),
719 );
720 assert!(ctx.entity("CatalogProduct").is_some());
721 assert!(ctx.has_repository("CatalogProduct"));
722 assert!(ctx.repository_behavior("CatalogProduct").is_some());
723 }
724
725 #[tokio::test]
726 async fn module_macro_registers_multiple_entities() {
727 let ctx = UserContext::new().with_module(crate::module!(CatalogProductRow));
728 assert!(ctx.entity("CatalogProduct").is_some());
729 assert!(ctx.has_repository("CatalogProduct"));
730 }
731
732 #[tokio::test]
733 async fn module_macro_registers_entity_behavior_pairs() {
734 let ctx =
735 UserContext::new().with_module(crate::module!(CatalogProductRow => OrderBehavior));
736 assert!(ctx.entity("CatalogProduct").is_some());
737 assert!(ctx.repository_behavior("CatalogProduct").is_some());
738 }
739
740 #[tokio::test]
741 async fn repository_returns_optimistic_lock_conflict() {
742 let store = InMemoryMetadataStore::new().with_entity(entity());
743 let executor = StubExecutor {
744 affected: 0,
745 rows: Vec::new(),
746 };
747 let repo = Repository::new(&store, &executor);
748
749 let err = repo
750 .update(
751 &UpdateCommand::new("Order", 1_u64)
752 .expected_version(3)
753 .value("name", "next"),
754 )
755 .await.unwrap_err();
756
757 match err {
758 RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. }) => {}
759 other => panic!("unexpected error: {other}"),
760 }
761 }
762
763 #[tokio::test]
764 async fn user_context_indexes_resources_and_locals() {
765 let mut ctx =
766 UserContext::new().with_metadata(InMemoryMetadataStore::new().with_entity(entity()));
767 ctx.insert_resource::<u64>(42);
768 ctx.insert_named_resource("tenant", String::from("acme"));
769 ctx.put_local("trace_id", "req-1");
770
771 assert!(ctx.entity("Order").is_some());
772 assert_eq!(ctx.get_resource::<u64>(), Some(&42));
773 assert_eq!(
774 ctx.get_named_resource::<String>("tenant"),
775 Some(&String::from("acme"))
776 );
777 assert_eq!(
778 ctx.local("trace_id"),
779 Some(&Value::Text("req-1".to_owned()))
780 );
781 }
782
783 #[tokio::test]
784 async fn user_context_builds_context_repository() {
785 let mut ctx =
786 UserContext::new().with_metadata(InMemoryMetadataStore::new().with_entity(entity()));
787 ctx.insert_resource(PostgresDialect);
788 ctx.insert_resource(StubExecutor {
789 affected: 1,
790 rows: Vec::new(),
791 });
792
793 let repo = ctx.repository::< StubExecutor>().unwrap();
794 let affected = repo
795 .update(
796 &UpdateCommand::new("Order", 1_u64)
797 .expected_version(3)
798 .value("name", "next"),
799 )
800 .await.unwrap();
801
802 assert_eq!(affected, 1);
803 }
804
805 #[tokio::test]
806 async fn user_context_resolves_repository_by_entity_type() {
807 let mut ctx = UserContext::new()
808 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
809 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
810 ctx.insert_resource(PostgresDialect);
811 ctx.insert_resource(StubExecutor {
812 affected: 1,
813 rows: Vec::new(),
814 });
815
816 let repo = ctx
817 .resolve_repository::< StubExecutor>("Order")
818 .unwrap();
819 assert_eq!(repo.entity(), "Order");
820 assert_eq!(repo.select().entity, "Order");
821
822 let affected = repo
823 .insert(
824 &repo
825 .insert_command()
826 .value("id", 1_u64)
827 .value("version", 1_i64)
828 .value("name", "n"),
829 )
830 .await.unwrap();
831 assert_eq!(affected, 1);
832 }
833
834 #[tokio::test]
835 async fn resolved_repository_applies_behavior_hooks() {
836 let mut ctx = UserContext::new()
837 .with_metadata(
838 InMemoryMetadataStore::new()
839 .with_entity(entity())
840 .with_entity(line_entity())
841 .with_entity(product_entity()),
842 )
843 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
844 .with_repository_behavior_registry(
845 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
846 );
847 ctx.insert_resource(PostgresDialect);
848 ctx.insert_resource(StubExecutor {
849 affected: 1,
850 rows: Vec::new(),
851 });
852
853 let repo = ctx
854 .resolve_repository::< StubExecutor>("Order")
855 .unwrap();
856
857 let insert = repo.insert_command().value("id", 1_u64).value("name", "n");
861 let affected = repo.insert(&insert).await.unwrap();
862 assert_eq!(affected, 1);
863 assert_eq!(repo.relation_loads(), vec!["lines".to_owned()]);
864 }
865
866 #[tokio::test]
867 async fn resolved_repository_applies_request_policy_after_behavior_hooks() {
868 let mut ctx = UserContext::new()
869 .with_metadata(
870 InMemoryMetadataStore::new()
871 .with_entity(entity())
872 .with_entity(line_entity())
873 .with_entity(product_entity()),
874 )
875 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
876 .with_repository_behavior_registry(
877 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
878 )
879 .with_request_policy(TenantRequestPolicy);
880 ctx.insert_named_resource("tenant_id", 9_u64);
881 ctx.insert_resource(PostgresDialect);
882 ctx.insert_resource(StubExecutor {
883 affected: 1,
884 rows: Vec::new(),
885 });
886
887 let repo = ctx
888 .resolve_repository::< StubExecutor>("Order")
889 .unwrap();
890
891 let insert = repo.insert_command().value("id", 1_u64).value("name", "n");
896 let command = repo.prepare_insert_command(&insert).unwrap();
897 assert_eq!(command.values.get("version"), Some(&Value::I64(9)));
898 }
899
900 #[tokio::test]
901 async fn resolved_repository_prepares_insert_command_with_generated_id() {
902 let mut ctx = UserContext::new()
903 .with_metadata(
904 InMemoryMetadataStore::new()
905 .with_entity(entity())
906 .with_entity(line_entity())
907 .with_entity(product_entity()),
908 )
909 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
910 .with_repository_behavior_registry(
911 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
912 )
913 .with_internal_id_generator(FixedIdGenerator(42));
914 ctx.insert_resource(PostgresDialect);
915 ctx.insert_resource(StubExecutor {
916 affected: 1,
917 rows: Vec::new(),
918 });
919
920 let repo = ctx
921 .resolve_repository::< StubExecutor>("Order")
922 .unwrap();
923
924 let prepared = repo
925 .prepare_insert_command(&repo.insert_command().value("id", 0_u64).value("name", "n"))
926 .unwrap();
927
928 assert_eq!(prepared.values.get("id"), Some(&Value::U64(42)));
929 assert_eq!(prepared.values.get("version"), Some(&Value::I64(1)));
930 assert_eq!(
931 prepared.values.get("name"),
932 Some(&Value::Text("n".to_owned()))
933 );
934
935 let prepared_zero_version = repo
936 .prepare_insert_command(
937 &repo
938 .insert_command()
939 .value("id", 0_u64)
940 .value("version", 0_i64)
941 .value("name", "zero-version"),
942 )
943 .unwrap();
944 assert_eq!(
945 prepared_zero_version.values.get("version"),
946 Some(&Value::I64(1))
947 );
948 }
949
950 #[tokio::test]
951 async fn resolved_repository_saves_create_graph_and_maintains_relation_keys() {
952 let mut ctx = UserContext::new()
953 .with_metadata(
954 InMemoryMetadataStore::new()
955 .with_entity(entity())
956 .with_entity(line_entity())
957 .with_entity(product_entity()),
958 )
959 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
960 .with_repository_behavior_registry(
961 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
962 )
963 .with_internal_id_generator(SequentialIdGenerator::new(500));
964 ctx.insert_resource(PostgresDialect);
965 ctx.insert_resource(StubExecutor {
966 affected: 1,
967 rows: Vec::new(),
968 });
969
970 let repo = ctx
971 .resolve_repository::< StubExecutor>("Order")
972 .unwrap();
973 let graph = GraphNode::new("Order").value("name", "root").relation(
974 "lines",
975 GraphNode::new("OrderLine")
976 .value("name", "line-1")
977 .relation("product", GraphNode::new("Product").value("name", "sku-1")),
978 );
979
980 let saved = repo.save_graph(graph).await.unwrap();
981
982 assert_eq!(saved.values.get("id"), Some(&Value::U64(500)));
983 assert_eq!(saved.values.get("version"), Some(&Value::I64(1)));
984 let lines = saved.relations.get("lines").unwrap();
985 assert_eq!(lines.len(), 1);
986 assert_eq!(lines[0].values.get("id"), Some(&Value::U64(502)));
987 assert_eq!(lines[0].values.get("version"), Some(&Value::I64(1)));
988 assert_eq!(lines[0].values.get("order_id"), Some(&Value::U64(500)));
989 assert_eq!(lines[0].values.get("product_id"), Some(&Value::U64(501)));
990 let product = lines[0].relations.get("product").unwrap();
991 assert_eq!(product[0].values.get("id"), Some(&Value::U64(501)));
992 }
993
994 #[tokio::test]
995 async fn resolved_repository_extracts_and_saves_typed_entity_graph() {
996 let mut ctx = UserContext::new()
997 .with_metadata(
998 InMemoryMetadataStore::new()
999 .with_entity(entity())
1000 .with_entity(line_entity())
1001 .with_entity(product_entity()),
1002 )
1003 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1004 .with_internal_id_generator(SequentialIdGenerator::new(700));
1005 ctx.insert_resource(PostgresDialect);
1006 ctx.insert_resource(StubExecutor {
1007 affected: 1,
1008 rows: Vec::new(),
1009 });
1010
1011 let repo = ctx
1012 .resolve_repository::< StubExecutor>("Order")
1013 .unwrap();
1014 let order = TypedGraphOrder {
1015 id: 0,
1016 version: 0,
1017 name: "typed-root".to_owned(),
1018 lines: teaql_core::SmartList::from(vec![TypedGraphLine {
1019 id: 0,
1020 order_id: None,
1021 name: "typed-line".to_owned(),
1022 product_id: None,
1023 product: Some(TypedGraphProduct {
1024 id: 0,
1025 name: "typed-product".to_owned(),
1026 }),
1027 }]),
1028 };
1029
1030 let extracted = repo.graph_node_from_entity(order).unwrap();
1031 assert_eq!(extracted.entity, "Order");
1032 assert_eq!(
1033 extracted.values.get("name"),
1034 Some(&Value::Text("typed-root".to_owned()))
1035 );
1036 assert_eq!(extracted.values.get("id"), Some(&Value::U64(0)));
1037 assert_eq!(extracted.relations["lines"].len(), 1);
1038 assert_eq!(
1039 extracted.relations["lines"][0].values.get("name"),
1040 Some(&Value::Text("typed-line".to_owned()))
1041 );
1042 assert_eq!(
1043 extracted.relations["lines"][0].relations["product"].len(),
1044 1
1045 );
1046
1047 let saved = repo.save_graph(extracted).await.unwrap();
1048 assert_eq!(saved.values.get("id"), Some(&Value::U64(700)));
1049 assert_eq!(saved.values.get("version"), Some(&Value::I64(1)));
1050 let lines = saved.relations.get("lines").unwrap();
1051 assert_eq!(lines[0].values.get("id"), Some(&Value::U64(702)));
1052 assert_eq!(lines[0].values.get("version"), Some(&Value::I64(1)));
1053 assert_eq!(lines[0].values.get("order_id"), Some(&Value::U64(700)));
1054 assert_eq!(lines[0].values.get("product_id"), Some(&Value::U64(701)));
1055 assert_eq!(
1056 lines[0].relations["product"][0].values.get("id"),
1057 Some(&Value::U64(701))
1058 );
1059 }
1060
1061 #[tokio::test]
1062 async fn resolved_repository_saves_typed_entity_graph_directly() {
1063 let mut ctx = UserContext::new()
1064 .with_metadata(
1065 InMemoryMetadataStore::new()
1066 .with_entity(entity())
1067 .with_entity(line_entity())
1068 .with_entity(product_entity()),
1069 )
1070 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1071 .with_internal_id_generator(SequentialIdGenerator::new(800));
1072 ctx.insert_resource(PostgresDialect);
1073 ctx.insert_resource(StubExecutor {
1074 affected: 1,
1075 rows: Vec::new(),
1076 });
1077
1078 let repo = ctx
1079 .resolve_repository::< StubExecutor>("Order")
1080 .unwrap();
1081 let saved = repo
1082 .save_entity_graph(TypedGraphOrder {
1083 id: 0,
1084 version: 0,
1085 name: "typed-direct".to_owned(),
1086 lines: teaql_core::SmartList::from(vec![TypedGraphLine {
1087 id: 0,
1088 order_id: None,
1089 name: "typed-line".to_owned(),
1090 product_id: None,
1091 product: Some(TypedGraphProduct {
1092 id: 0,
1093 name: "typed-product".to_owned(),
1094 }),
1095 }]),
1096 })
1097 .await.unwrap();
1098
1099 assert_eq!(saved.values.get("id"), Some(&Value::U64(800)));
1100 assert_eq!(saved.values.get("version"), Some(&Value::I64(1)));
1101 assert_eq!(
1102 saved.relations["lines"][0].values.get("order_id"),
1103 Some(&Value::U64(800))
1104 );
1105 assert_eq!(
1106 saved.relations["lines"][0].values.get("product_id"),
1107 Some(&Value::U64(801))
1108 );
1109 }
1110
1111 #[tokio::test]
1112 async fn custom_user_context_can_drive_insert_preparation() {
1113 let mut ctx = UserContext::new()
1114 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1115 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1116 .with_repository_behavior_registry(
1117 InMemoryRepositoryBehaviorRegistry::new()
1118 .with_behavior("Order", ContextAwareOrderBehavior),
1119 )
1120 .with_internal_id_generator(FixedIdGenerator(99));
1121 ctx.insert_named_resource("tenant", String::from("acme"));
1122 ctx.insert_named_resource("initial_version", 7_i64);
1123 ctx.put_local("trace_id", "req-9");
1124 ctx.insert_resource(PostgresDialect);
1125 ctx.insert_resource(StubExecutor {
1126 affected: 1,
1127 rows: Vec::new(),
1128 });
1129
1130 let repo = ctx
1131 .resolve_repository::< StubExecutor>("Order")
1132 .unwrap();
1133 let prepared = repo.prepare_insert_command(&repo.insert_command()).unwrap();
1134
1135 assert_eq!(prepared.values.get("id"), Some(&Value::U64(99)));
1136 assert_eq!(prepared.values.get("version"), Some(&Value::I64(7)));
1137 assert_eq!(
1138 prepared.values.get("name"),
1139 Some(&Value::Text("acme:req-9".to_owned()))
1140 );
1141 }
1142
1143 #[tokio::test]
1144 async fn checker_registry_validates_and_fixes_insert_commands() {
1145 let mut ctx = UserContext::new()
1146 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1147 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1148 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker))
1149 .with_internal_id_generator(FixedIdGenerator(77));
1150 ctx.insert_resource(PostgresDialect);
1151 ctx.insert_resource(StubExecutor {
1152 affected: 1,
1153 rows: Vec::new(),
1154 });
1155
1156 let repo = ctx
1157 .resolve_repository::< StubExecutor>("Order")
1158 .unwrap();
1159 let prepared = repo
1160 .prepare_insert_command(&repo.insert_command().value("name", "valid"))
1161 .unwrap();
1162
1163 assert_eq!(prepared.values.get("id"), Some(&Value::U64(77)));
1164 assert_eq!(prepared.values.get("version"), Some(&Value::I64(1)));
1165 assert!(!prepared.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
1166
1167 let error = repo
1168 .prepare_insert_command(&repo.insert_command().value("name", "no"))
1169 .unwrap_err();
1170 match error {
1171 RuntimeError::Check(results) => {
1172 assert_eq!(results.len(), 1);
1173 assert_eq!(results[0].location.to_string(), "name");
1174 }
1175 other => panic!("unexpected checker error: {other:?}"),
1176 }
1177 }
1178
1179 #[tokio::test]
1180 async fn typed_checker_validates_and_fixes_derived_entities_without_record_access() {
1181 let mut ctx = UserContext::new()
1182 .with_metadata(InMemoryMetadataStore::new().with_entity(Order::entity_descriptor()))
1183 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1184 .with_checker_registry(
1185 InMemoryCheckerRegistry::new()
1186 .with_checker(TypedEntityChecker::<Order, _>::new(TypedOrderChecker)),
1187 )
1188 .with_internal_id_generator(FixedIdGenerator(79));
1189 ctx.insert_resource(PostgresDialect);
1190 ctx.insert_resource(StubExecutor {
1191 affected: 1,
1192 rows: Vec::new(),
1193 });
1194
1195 let repo = ctx
1196 .resolve_repository::< StubExecutor>("Order")
1197 .unwrap();
1198 let prepared = repo
1199 .prepare_insert_command(
1200 &repo
1201 .insert_command()
1202 .value("name", "fix")
1203 .value("version", 1_i64),
1204 )
1205 .unwrap();
1206 assert_eq!(
1207 prepared.values.get("name"),
1208 Some(&Value::Text("fixed".to_owned()))
1209 );
1210 assert_eq!(prepared.values.get("id"), Some(&Value::U64(79)));
1211 assert!(!prepared.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
1212
1213 let error = repo
1214 .prepare_insert_command(&repo.insert_command().value("version", 1_i64))
1215 .unwrap_err();
1216 match error {
1217 RuntimeError::Check(results) => {
1218 assert!(
1219 results
1220 .iter()
1221 .any(|result| result.rule == CheckRule::Required
1222 && result.location.to_string() == "name")
1223 );
1224 }
1225 other => panic!("unexpected typed checker error: {other:?}"),
1226 }
1227 }
1228
1229
1230
1231 #[tokio::test]
1232 async fn checker_registry_reports_nested_create_locations_and_fixes_records() {
1233 let ctx = UserContext::new()
1234 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker));
1235
1236 let mut child = Record::from([
1237 (String::from("id"), Value::U64(10)),
1238 (
1239 String::from(CHECK_OBJECT_STATUS_FIELD),
1240 Value::from(CheckObjectStatus::Create),
1241 ),
1242 ]);
1243 let error = ctx
1244 .check_and_fix_record_at(
1245 "Order",
1246 &mut child,
1247 &ObjectLocation::hash_root("lines").element(0),
1248 )
1249 .unwrap_err();
1250
1251 assert_eq!(child.get("version"), Some(&Value::I64(1)));
1252 match error {
1253 RuntimeError::Check(results) => {
1254 assert_eq!(results.len(), 1);
1255 assert_eq!(results[0].rule, CheckRule::Required);
1256 assert_eq!(results[0].location.to_string(), "lines[0].name");
1257 }
1258 other => panic!("unexpected checker error: {other:?}"),
1259 }
1260
1261 child.insert("name".to_owned(), Value::Text("valid child".to_owned()));
1262 ctx.check_and_fix_record_at(
1263 "Order",
1264 &mut child,
1265 &ObjectLocation::hash_root("lines").element(0),
1266 )
1267 .unwrap();
1268 }
1269
1270 #[tokio::test]
1271 async fn built_in_language_translators_cover_fifteen_languages() {
1272 assert_eq!(Language::ALL.len(), 15);
1273 let result = super::CheckResult::required(ObjectLocation::hash_root("name"));
1274 let messages = Language::ALL
1275 .iter()
1276 .map(|language| translate_check_result(*language, &result))
1277 .collect::<Vec<_>>();
1278
1279 assert!(messages.iter().all(|message| !message.is_empty()));
1280 assert!(messages.iter().any(|message| message.contains("required")));
1281 assert!(messages.iter().any(|message| message.contains("å¿…å¡«")));
1282 assert!(
1283 messages
1284 .iter()
1285 .any(|message| message.contains("obligatoire"))
1286 );
1287 assert_eq!(Language::from_code("zh-CN"), Some(Language::Chinese));
1288 assert_eq!(
1289 Language::from_code("zh-TW"),
1290 Some(Language::TraditionalChinese)
1291 );
1292 }
1293
1294 #[tokio::test]
1295 async fn user_context_language_switch_translates_checker_errors() {
1296 let mut ctx = UserContext::new()
1297 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1298 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1299 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker))
1300 .with_internal_id_generator(FixedIdGenerator(77))
1301 .with_language(Language::Chinese);
1302 ctx.insert_resource(PostgresDialect);
1303 ctx.insert_resource(StubExecutor {
1304 affected: 1,
1305 rows: Vec::new(),
1306 });
1307
1308 let repo = ctx
1309 .resolve_repository::< StubExecutor>("Order")
1310 .unwrap();
1311 let error = repo
1312 .prepare_insert_command(&repo.insert_command())
1313 .unwrap_err();
1314 match error {
1315 RuntimeError::Check(results) => {
1316 assert_eq!(results.len(), 1);
1317 assert!(
1318 results[0]
1319 .message
1320 .as_ref()
1321 .is_some_and(|message| message.contains("å¿…å¡«"))
1322 );
1323 }
1324 other => panic!("unexpected checker error: {other:?}"),
1325 }
1326
1327 let mut ctx = UserContext::new().with_language(Language::English);
1328 ctx.set_language_code("es").unwrap();
1329 assert_eq!(ctx.language(), Language::Spanish);
1330 }
1331
1332 #[tokio::test]
1333 async fn checker_registry_merges_graph_update_fixes_by_object_status() {
1334 let mut ctx = UserContext::new()
1335 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1336 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1337 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker));
1338 ctx.insert_resource(PostgresDialect);
1339 ctx.insert_resource(StubExecutor {
1340 affected: 1,
1341 rows: vec![Record::from([
1342 ("id".to_owned(), Value::U64(1)),
1343 ("version".to_owned(), Value::I64(1)),
1344 ("name".to_owned(), Value::Text("old".to_owned())),
1345 ])],
1346 });
1347
1348 let repo = ctx
1349 .resolve_repository::< StubExecutor>("Order")
1350 .unwrap();
1351 let saved = repo
1352 .save_graph(
1353 GraphNode::new("Order")
1354 .value("id", 1_u64)
1355 .value("version", 1_i64)
1356 .value("name", "graph-update"),
1357 )
1358 .await.unwrap();
1359
1360 assert_eq!(
1361 saved.values.get("name"),
1362 Some(&Value::Text("graph-update-checked".to_owned()))
1363 );
1364 assert_eq!(saved.values.get("version"), Some(&Value::I64(2)));
1365 assert!(!saved.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
1366 }
1367
1368 #[tokio::test]
1369 async fn user_context_event_sink_receives_repository_mutation_events() {
1370 let events = Arc::new(Mutex::new(Vec::new()));
1371 let mut ctx = UserContext::new()
1372 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1373 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1374 .with_internal_id_generator(FixedIdGenerator(88))
1375 .with_event_sink(RecordingEventSink {
1376 events: events.clone(),
1377 });
1378 ctx.insert_resource(PostgresDialect);
1379 ctx.insert_resource(StubExecutor {
1380 affected: 1,
1381 rows: vec![Record::from([
1382 ("id".to_owned(), Value::U64(88)),
1383 ("version".to_owned(), Value::I64(1)),
1384 ("name".to_owned(), Value::Text("old".to_owned())),
1385 ])],
1386 });
1387
1388 let repo = ctx
1389 .resolve_repository::< StubExecutor>("Order")
1390 .unwrap();
1391 repo.insert(&repo.insert_command().value("name", "created"))
1392 .await.unwrap();
1393 repo.update(
1394 &repo
1395 .update_command(88_u64)
1396 .expected_version(1)
1397 .value("name", "updated"),
1398 )
1399 .await.unwrap();
1400 repo.delete(&repo.delete_command(88_u64).expected_version(2))
1401 .await.unwrap();
1402 repo.recover(&repo.recover_command(88_u64, -3)).await.unwrap();
1403
1404 let events = events.lock().unwrap();
1405 assert_eq!(events.len(), 4);
1406 assert_eq!(events[0].kind, RawAuditEventKind::Created);
1407 assert_eq!(events[0].entity, "Order");
1408 assert_eq!(events[0].values.get("id"), Some(&Value::U64(88)));
1409 assert_eq!(events[1].kind, RawAuditEventKind::Updated);
1410 assert_eq!(events[1].values.get("id"), Some(&Value::U64(88)));
1411 assert_eq!(events[1].values.get("version"), Some(&Value::I64(2)));
1412 assert_eq!(events[1].updated_fields, vec!["name".to_owned()]);
1413 assert_eq!(
1414 events[1]
1415 .old_values
1416 .as_ref()
1417 .and_then(|values| values.get("name")),
1418 None );
1420 assert_eq!(
1421 events[1]
1422 .new_values
1423 .as_ref()
1424 .and_then(|values| values.get("name")),
1425 Some(&Value::Text("updated".to_owned()))
1426 );
1427 assert_eq!(events[1].changes.len(), 1);
1428 assert_eq!(events[1].changes[0].field, "name");
1429 assert_eq!(
1430 events[1].changes[0].old_value,
1431 None );
1433 assert_eq!(
1434 events[1].changes[0].new_value,
1435 Some(Value::Text("updated".to_owned()))
1436 );
1437 assert_eq!(events[2].kind, RawAuditEventKind::Deleted);
1438 assert!(events[2].old_values.is_none()); assert!(events[2].new_values.is_none());
1440 assert_eq!(events[3].kind, RawAuditEventKind::Recovered);
1441 assert_eq!(
1442 events[3]
1443 .old_values
1444 .as_ref()
1445 .and_then(|values| values.get("version")),
1446 None );
1448 assert_eq!(
1449 events[3]
1450 .new_values
1451 .as_ref()
1452 .and_then(|values| values.get("version")),
1453 Some(&Value::I64(4))
1454 );
1455 assert_eq!(events[3].changes[0].field, "version");
1456 }
1457
1458 #[tokio::test]
1459 async fn user_context_event_sink_receives_mixed_graph_mutation_events() {
1460 let events = Arc::new(Mutex::new(Vec::new()));
1461 let mut ctx = UserContext::new()
1462 .with_metadata(
1463 InMemoryMetadataStore::new()
1464 .with_entity(entity())
1465 .with_entity(line_entity())
1466 .with_entity(product_entity()),
1467 )
1468 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1469 .with_event_sink(RecordingEventSink {
1470 events: events.clone(),
1471 });
1472 ctx.insert_resource(PostgresDialect);
1473 ctx.insert_resource(StubExecutor {
1474 affected: 1,
1475 rows: vec![Record::from([
1476 ("id".to_owned(), Value::U64(1)),
1477 ("version".to_owned(), Value::I64(1)),
1478 ("name".to_owned(), Value::Text("old".to_owned())),
1479 ])],
1480 });
1481
1482 let repo = ctx
1483 .resolve_repository::< StubExecutor>("Order")
1484 .unwrap();
1485 repo.save_graph(
1486 GraphNode::new("Order")
1487 .value("id", 1_u64)
1488 .value("version", 1_i64)
1489 .value("name", "updated")
1490 .relation(
1491 "lines",
1492 GraphNode::new("OrderLine")
1493 .value("name", "line")
1494 .value("product_id", 3_u64),
1495 ),
1496 )
1497 .await.unwrap();
1498
1499 let events = events.lock().unwrap();
1500 assert_eq!(events.len(), 2);
1501 assert_eq!(events[0].kind, RawAuditEventKind::Updated);
1502 assert_eq!(events[0].entity, "Order");
1503 assert_eq!(events[1].kind, RawAuditEventKind::Created);
1504 assert_eq!(events[1].entity, "OrderLine");
1505 assert_eq!(events[1].values.get("order_id"), Some(&Value::U64(1)));
1506 }
1507
1508 #[tokio::test]
1509 async fn save_graph_builds_plan_grouped_by_entity_and_operation() {
1510 let mut ctx = UserContext::new()
1511 .with_metadata(
1512 InMemoryMetadataStore::new()
1513 .with_entity(entity())
1514 .with_entity(line_entity())
1515 .with_entity(product_entity()),
1516 )
1517 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1518 .with_internal_id_generator(SequentialIdGenerator::new(500));
1519 ctx.insert_resource(PostgresDialect);
1520 ctx.insert_resource(StubExecutor {
1521 affected: 1,
1522 rows: vec![Record::from([
1523 ("id".to_owned(), Value::U64(1)),
1524 ("version".to_owned(), Value::I64(1)),
1525 ("name".to_owned(), Value::Text("old".to_owned())),
1526 ])],
1527 });
1528
1529 let plan = ctx
1530 .plan_for_save_graph::< StubExecutor>(
1531 GraphNode::new("Order")
1532 .value("id", 1_u64)
1533 .value("version", 1_i64)
1534 .value("name", "updated")
1535 .relation(
1536 "lines",
1537 GraphNode::new("OrderLine")
1538 .value("name", "new-line-a")
1539 .value("product_id", 2_u64),
1540 )
1541 .relation(
1542 "lines",
1543 GraphNode::new("OrderLine")
1544 .value("name", "new-line-b")
1545 .value("product_id", 2_u64),
1546 )
1547 .relation(
1548 "lines",
1549 GraphNode::new("OrderLine")
1550 .value("id", 5_u64)
1551 .value("version", 1_i64)
1552 .value("name", "same-update-a"),
1553 )
1554 .relation(
1555 "lines",
1556 GraphNode::new("OrderLine")
1557 .value("id", 6_u64)
1558 .value("version", 1_i64)
1559 .value("name", "same-update-b"),
1560 )
1561 .relation(
1562 "lines",
1563 GraphNode::new("OrderLine").value("id", 3_u64).remove(),
1564 )
1565 .relation(
1566 "lines",
1567 GraphNode::new("OrderLine").value("id", 4_u64).reference(),
1568 ),
1569 )
1570 .await.unwrap();
1571 let counts = plan.grouped_counts();
1572
1573 assert_eq!(
1574 counts.get(&("Order".to_owned(), GraphMutationKind::Update)),
1575 Some(&1)
1576 );
1577 assert_eq!(
1578 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Create)),
1579 Some(&2)
1580 );
1581 assert_eq!(
1582 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Update)),
1583 Some(&2)
1584 );
1585 assert_eq!(
1586 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Delete)),
1587 Some(&1)
1588 );
1589 assert_eq!(
1590 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Reference)),
1591 Some(&1)
1592 );
1593 let create_batch = plan
1594 .batches
1595 .iter()
1596 .find(|batch| batch.entity == "OrderLine" && batch.kind == GraphMutationKind::Create)
1597 .unwrap();
1598 assert_eq!(create_batch.items.len(), 2);
1599 assert_eq!(
1600 create_batch.items[0].values.get("id"),
1601 Some(&Value::U64(500))
1602 );
1603 assert_eq!(
1604 create_batch.items[1].values.get("id"),
1605 Some(&Value::U64(501))
1606 );
1607 let update_batch = plan
1608 .batches
1609 .iter()
1610 .find(|batch| {
1611 batch.entity == "OrderLine"
1612 && batch.kind == GraphMutationKind::Update
1613 && batch.update_fields == vec!["name".to_owned()]
1614 })
1615 .unwrap();
1616 assert_eq!(update_batch.items.len(), 2);
1617 }
1618
1619 #[tokio::test]
1620 async fn resolved_repository_builds_relation_plans() {
1621 let mut ctx = UserContext::new()
1622 .with_metadata(
1623 InMemoryMetadataStore::new()
1624 .with_entity(entity())
1625 .with_entity(line_entity())
1626 .with_entity(product_entity()),
1627 )
1628 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1629 .with_repository_behavior_registry(
1630 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1631 );
1632 ctx.insert_resource(PostgresDialect);
1633 ctx.insert_resource(StubExecutor {
1634 affected: 1,
1635 rows: Vec::new(),
1636 });
1637
1638 let repo = ctx
1639 .resolve_repository::< StubExecutor>("Order")
1640 .unwrap();
1641 let plans = repo.relation_plans().unwrap();
1642
1643 assert_eq!(plans.len(), 1);
1644 assert_eq!(plans[0].relation_name, "lines");
1645 assert_eq!(plans[0].target_entity, "OrderLine");
1646 assert_eq!(plans[0].local_key, "id");
1647 assert_eq!(plans[0].foreign_key, "order_id");
1648 assert!(plans[0].many);
1649 }
1650
1651 #[tokio::test]
1652 async fn resolved_repository_builds_relation_query_from_parent_rows() {
1653 let mut ctx = UserContext::new()
1654 .with_metadata(
1655 InMemoryMetadataStore::new()
1656 .with_entity(entity())
1657 .with_entity(line_entity())
1658 .with_entity(product_entity()),
1659 )
1660 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1661 .with_repository_behavior_registry(
1662 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1663 );
1664 ctx.insert_resource(PostgresDialect);
1665 ctx.insert_resource(StubExecutor {
1666 affected: 1,
1667 rows: Vec::new(),
1668 });
1669
1670 let repo = ctx
1671 .resolve_repository::< StubExecutor>("Order")
1672 .unwrap();
1673 let parent_rows = vec![
1674 Record::from([(String::from("id"), Value::U64(11))]),
1675 Record::from([(String::from("id"), Value::U64(12))]),
1676 ];
1677
1678 let query = repo.relation_query("lines", &parent_rows).unwrap();
1679 }
1684
1685 #[tokio::test]
1686 async fn resolved_repository_enhances_parent_rows_with_relations() {
1687 let mut ctx = UserContext::new()
1688 .with_metadata(
1689 InMemoryMetadataStore::new()
1690 .with_entity(entity())
1691 .with_entity(line_entity())
1692 .with_entity(product_entity()),
1693 )
1694 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1695 .with_repository_behavior_registry(
1696 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1697 );
1698 ctx.insert_resource(PostgresDialect);
1699 ctx.insert_resource(StubExecutor {
1700 affected: 1,
1701 rows: vec![
1702 Record::from([
1703 (String::from("id"), Value::U64(101)),
1704 (String::from("order_id"), Value::U64(11)),
1705 (String::from("name"), Value::Text(String::from("l1"))),
1706 ]),
1707 Record::from([
1708 (String::from("id"), Value::U64(102)),
1709 (String::from("order_id"), Value::U64(11)),
1710 (String::from("name"), Value::Text(String::from("l2"))),
1711 ]),
1712 Record::from([
1713 (String::from("id"), Value::U64(201)),
1714 (String::from("order_id"), Value::U64(12)),
1715 (String::from("name"), Value::Text(String::from("l3"))),
1716 ]),
1717 ],
1718 });
1719
1720 let repo = ctx
1721 .resolve_repository::< StubExecutor>("Order")
1722 .unwrap();
1723 let mut parents = vec![
1724 Record::from([(String::from("id"), Value::U64(11))]),
1725 Record::from([(String::from("id"), Value::U64(12))]),
1726 ];
1727
1728 repo.enhance_relations(&mut parents).await.unwrap();
1729
1730 match parents[0].get("lines") {
1731 Some(Value::List(lines)) => assert_eq!(lines.len(), 2),
1732 other => panic!("unexpected lines payload: {other:?}"),
1733 }
1734 match parents[1].get("lines") {
1735 Some(Value::List(lines)) => assert_eq!(lines.len(), 1),
1736 other => panic!("unexpected lines payload: {other:?}"),
1737 }
1738 }
1739
1740 #[tokio::test]
1741 async fn relation_enhancement_wraps_inverse_many_relation_as_list() {
1742 let mut ctx = UserContext::new()
1743 .with_metadata(
1744 InMemoryMetadataStore::new()
1745 .with_entity(OrderLineWithProductEntityRow::entity_descriptor())
1746 .with_entity(ProductWithLinesEntityRow::entity_descriptor()),
1747 )
1748 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("OrderLine"));
1749 ctx.insert_resource(PostgresDialect);
1750 ctx.insert_resource(QueueExecutor {
1751 affected: 1,
1752 rows: Mutex::new(VecDeque::from([
1753 vec![Record::from([
1754 (String::from("id"), Value::U64(11)),
1755 (String::from("order_id"), Value::U64(7)),
1756 (String::from("name"), Value::Text(String::from("line"))),
1757 (String::from("product_id"), Value::U64(101)),
1758 ])],
1759 vec![Record::from([
1760 (String::from("id"), Value::U64(101)),
1761 (String::from("name"), Value::Text(String::from("sku"))),
1762 ])],
1763 ])),
1764 queries: Mutex::new(Vec::new()),
1765 });
1766
1767 let repo = ctx
1768 .resolve_repository::< QueueExecutor>("OrderLine")
1769 .unwrap();
1770 let rows = repo
1771 .fetch_enhanced_entities::<OrderLineWithProductEntityRow>(
1772 &SelectQuery::new("OrderLine").relation("product"),
1773 )
1774 .await.unwrap();
1775
1776 let product = rows.data[0].product.as_ref().unwrap();
1777 assert_eq!(product.lines.data.len(), 1);
1778 assert_eq!(product.lines.data[0].id, 11);
1779 }
1780
1781 #[tokio::test]
1782 async fn resolved_repository_fetches_smart_list_of_entities() {
1783 let mut ctx = UserContext::new()
1784 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1785 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1786 ctx.insert_resource(PostgresDialect);
1787 ctx.insert_resource(StubExecutor {
1788 affected: 1,
1789 rows: vec![Record::from([
1790 (String::from("id"), Value::U64(7)),
1791 (String::from("version"), Value::I64(2)),
1792 (String::from("name"), Value::Text(String::from("typed"))),
1793 ])],
1794 });
1795
1796 let repo = ctx
1797 .resolve_repository::< StubExecutor>("Order")
1798 .unwrap();
1799 let rows = repo.fetch_entities::<OrderEntity>(&repo.select()).await.unwrap();
1800
1801 assert_eq!(rows.len(), 1);
1802 assert_eq!(
1803 rows.first(),
1804 Some(&OrderEntity {
1805 id: 7,
1806 version: 2,
1807 name: String::from("typed"),
1808 })
1809 );
1810 }
1811
1812 #[tokio::test]
1813 async fn resolved_repository_fetches_smart_list_of_derived_entities() {
1814 let mut ctx = UserContext::new()
1815 .with_metadata(
1816 InMemoryMetadataStore::new().with_entity(CatalogProductRow::entity_descriptor()),
1817 )
1818 .with_repository_registry(
1819 InMemoryRepositoryRegistry::new().with_entity("CatalogProduct"),
1820 );
1821 ctx.insert_resource(PostgresDialect);
1822 ctx.insert_resource(StubExecutor {
1823 affected: 1,
1824 rows: vec![Record::from([
1825 (String::from("id"), Value::U64(9)),
1826 (String::from("name"), Value::Text(String::from("derived"))),
1827 ])],
1828 });
1829
1830 let repo = ctx
1831 .resolve_repository::< StubExecutor>("CatalogProduct")
1832 .unwrap();
1833 let rows = repo
1834 .fetch_entities::<CatalogProductRow>(&repo.select())
1835 .await.unwrap();
1836
1837 assert_eq!(rows.len(), 1);
1838 assert_eq!(
1839 rows.first(),
1840 Some(&CatalogProductRow {
1841 id: 9,
1842 name: String::from("derived"),
1843 })
1844 );
1845 }
1846
1847 #[tokio::test]
1848 async fn resolved_repository_collects_dynamic_properties_for_aggregate_output() {
1849 let mut ctx = UserContext::new()
1850 .with_metadata(
1851 InMemoryMetadataStore::new()
1852 .with_entity(OrderAggregateDynamic::entity_descriptor()),
1853 )
1854 .with_repository_registry(
1855 InMemoryRepositoryRegistry::new().with_entity("OrderAggregate"),
1856 );
1857 ctx.insert_resource(PostgresDialect);
1858 ctx.insert_resource(StubExecutor {
1859 affected: 1,
1860 rows: vec![Record::from([
1861 (String::from("id"), Value::U64(1)),
1862 (String::from("lineCount"), Value::I64(3)),
1863 (String::from("amount"), Value::F64(18.5)),
1864 ])],
1865 });
1866
1867 let repo = ctx
1868 .resolve_repository::< StubExecutor>("OrderAggregate")
1869 .unwrap();
1870 let rows = repo
1871 .fetch_entities::<OrderAggregateDynamic>(&repo.select())
1872 .await.unwrap();
1873
1874 assert_eq!(rows.len(), 1);
1875 assert_eq!(rows.data[0].id, 1);
1876 assert_eq!(rows.data[0].dynamic.get("lineCount"), Some(&Value::I64(3)));
1877 assert_eq!(rows.data[0].dynamic.get("amount"), Some(&Value::F64(18.5)));
1878 assert_eq!(
1879 rows.into_vec().into_iter().next().unwrap().into_json(),
1880 serde_json::json!({
1881 "id": 1,
1882 "lineCount": 3,
1883 "amount": 18.5
1884 })
1885 );
1886 }
1887
1888 #[tokio::test]
1889 async fn resolved_repository_executes_relation_aggregates_into_dynamic_properties() {
1890 let executor = QueueExecutor {
1891 affected: 1,
1892 rows: Mutex::new(VecDeque::from([
1893 vec![
1894 Record::from([
1895 (String::from("id"), Value::U64(1)),
1896 (String::from("version"), Value::I64(1)),
1897 (String::from("name"), Value::Text(String::from("first"))),
1898 ]),
1899 Record::from([
1900 (String::from("id"), Value::U64(2)),
1901 (String::from("version"), Value::I64(1)),
1902 (String::from("name"), Value::Text(String::from("second"))),
1903 ]),
1904 ],
1905 vec![Record::from([
1906 (String::from("order_id"), Value::U64(1)),
1907 (String::from("lineCount"), Value::I64(3)),
1908 ])],
1909 ])),
1910 queries: Mutex::new(Vec::new()),
1911 };
1912 let mut ctx = UserContext::new()
1913 .with_metadata(
1914 InMemoryMetadataStore::new()
1915 .with_entity(entity())
1916 .with_entity(line_entity()),
1917 )
1918 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1919 ctx.insert_resource(PostgresDialect);
1920 ctx.insert_resource(executor);
1921
1922 let repo = ctx
1923 .resolve_repository::< QueueExecutor>("Order")
1924 .unwrap();
1925 let rows = repo
1926 .fetch_all_with_relation_aggregates(
1927 &repo
1928 .select()
1929 .project("id")
1930 .project("version")
1931 .project("name"),
1932 &[RelationAggregate::new(
1933 "lines",
1934 "lineCount",
1935 SelectQuery::new("OrderLine"),
1936 true,
1937 )],
1938 )
1939 .await.unwrap();
1940
1941 assert_eq!(rows[0].get("lineCount"), Some(&Value::I64(3)));
1942 assert_eq!(rows[1].get("lineCount"), Some(&Value::U64(0)));
1943
1944 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1945 let queries = executor.queries.lock().unwrap();
1946 assert_eq!(queries.len(), 2);
1947 assert_eq!(
1948 queries[1],
1949 "SELECT ... FROM OrderLine ..."
1950 );
1951 }
1952
1953 #[tokio::test]
1954 async fn resolved_repository_maps_relation_aggregate_storage_key_to_property_key() {
1955 let mut line = line_entity();
1956 line.properties
1957 .iter_mut()
1958 .find(|property| property.name == "order_id")
1959 .unwrap()
1960 .column_name = "order_ref".to_owned();
1961 let executor = QueueExecutor {
1962 affected: 1,
1963 rows: Mutex::new(VecDeque::from([
1964 vec![Record::from([
1965 (String::from("id"), Value::U64(1)),
1966 (String::from("version"), Value::I64(1)),
1967 (String::from("name"), Value::Text(String::from("first"))),
1968 ])],
1969 vec![Record::from([
1970 (String::from("order_ref"), Value::I64(1)),
1971 (String::from("lineCount"), Value::I64(3)),
1972 ])],
1973 ])),
1974 queries: Mutex::new(Vec::new()),
1975 };
1976 let mut ctx = UserContext::new()
1977 .with_metadata(
1978 InMemoryMetadataStore::new()
1979 .with_entity(entity())
1980 .with_entity(line),
1981 )
1982 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1983 ctx.insert_resource(PostgresDialect);
1984 ctx.insert_resource(executor);
1985
1986 let repo = ctx
1987 .resolve_repository::< QueueExecutor>("Order")
1988 .unwrap();
1989 let rows = repo
1990 .fetch_all_with_relation_aggregates(
1991 &repo
1992 .select()
1993 .project("id")
1994 .project("version")
1995 .project("name"),
1996 &[RelationAggregate::new(
1997 "lines",
1998 "lineCount",
1999 SelectQuery::new("OrderLine"),
2000 true,
2001 )],
2002 )
2003 .await.unwrap();
2004
2005 assert_eq!(rows[0].get("lineCount"), Some(&Value::I64(3)));
2006 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
2007 assert_eq!(
2008 executor.queries.lock().unwrap()[1],
2009 "SELECT ... FROM OrderLine ..."
2010 );
2011 }
2012
2013 #[tokio::test]
2014 async fn resolved_repository_uses_aggregation_cache_when_resource_is_registered() {
2015 let executor = QueueExecutor {
2016 affected: 1,
2017 rows: Mutex::new(VecDeque::from([vec![Record::from([(
2018 String::from("count"),
2019 Value::I64(2),
2020 )])]])),
2021 queries: Mutex::new(Vec::new()),
2022 };
2023 let mut ctx = UserContext::new()
2024 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
2025 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
2026 ctx.insert_resource(PostgresDialect);
2027 ctx.insert_resource(executor);
2028 ctx.insert_resource(InMemoryAggregationCache::default());
2029
2030 let repo = ctx
2031 .resolve_repository::< QueueExecutor>("Order")
2032 .unwrap();
2033 let query = repo
2034 .select()
2035 .count("count")
2036 .enable_aggregation_cache_for(60_000);
2037
2038 let first = repo.fetch_all(&query).await.unwrap();
2039 let second = repo.fetch_all(&query).await.unwrap();
2040
2041 assert_eq!(first, second);
2042 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
2043 assert_eq!(executor.queries.lock().unwrap().len(), 1);
2044 }
2045
2046 #[tokio::test]
2047 async fn aggregation_cache_is_namespaced_and_invalidated_after_write() {
2048 let executor = QueueExecutor {
2049 affected: 1,
2050 rows: Mutex::new(VecDeque::from([
2051 vec![Record::from([(String::from("count"), Value::I64(2))])],
2052 vec![Record::from([(String::from("count"), Value::I64(3))])],
2053 ])),
2054 queries: Mutex::new(Vec::new()),
2055 };
2056 let mut ctx = UserContext::new()
2057 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
2058 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
2059 ctx.insert_resource(PostgresDialect);
2060 ctx.insert_resource(executor);
2061 ctx.insert_resource(
2062 Arc::new(InMemoryAggregationCache::with_namespace("tenant-a"))
2063 as Arc<dyn AggregationCacheBackend>,
2064 );
2065
2066 let repo = ctx
2067 .resolve_repository::< QueueExecutor>("Order")
2068 .unwrap();
2069 let query = repo
2070 .select()
2071 .count("count")
2072 .enable_aggregation_cache_for(60_000);
2073
2074 let first = repo.fetch_all(&query).await.unwrap();
2075 let cached = repo.fetch_all(&query).await.unwrap();
2076 repo.insert(
2077 &InsertCommand::new("Order")
2078 .value("id", 9_u64)
2079 .value("version", 1_i64)
2080 .value("name", "new"),
2081 )
2082 .await.unwrap();
2083 let refreshed = repo.fetch_all(&query).await.unwrap();
2084
2085 assert_eq!(first, cached);
2086 assert_ne!(cached, refreshed);
2087 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
2088 assert_eq!(executor.queries.lock().unwrap().len(), 2);
2089 }
2090
2091 #[tokio::test]
2092 async fn aggregation_cache_propagates_to_relation_aggregates() {
2093 let parent_rows = vec![
2094 Record::from([
2095 (String::from("id"), Value::U64(1)),
2096 (String::from("version"), Value::I64(1)),
2097 (String::from("name"), Value::Text(String::from("first"))),
2098 ]),
2099 Record::from([
2100 (String::from("id"), Value::U64(2)),
2101 (String::from("version"), Value::I64(1)),
2102 (String::from("name"), Value::Text(String::from("second"))),
2103 ]),
2104 ];
2105 let aggregate_rows = vec![Record::from([
2106 (String::from("order_id"), Value::U64(1)),
2107 (String::from("lineCount"), Value::I64(3)),
2108 ])];
2109 let executor = QueueExecutor {
2110 affected: 1,
2111 rows: Mutex::new(VecDeque::from([parent_rows, aggregate_rows])),
2112 queries: Mutex::new(Vec::new()),
2113 };
2114 let mut ctx = UserContext::new()
2115 .with_metadata(
2116 InMemoryMetadataStore::new()
2117 .with_entity(entity())
2118 .with_entity(line_entity()),
2119 )
2120 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
2121 ctx.insert_resource(PostgresDialect);
2122 ctx.insert_resource(executor);
2123 ctx.insert_resource(InMemoryAggregationCache::default());
2124
2125 let repo = ctx
2126 .resolve_repository::< QueueExecutor>("Order")
2127 .unwrap();
2128 let query = repo
2129 .select()
2130 .project("id")
2131 .project("version")
2132 .project("name")
2133 .enable_aggregation_cache_for(60_000)
2134 .propagate_aggregation_cache(60_000);
2135 let aggregate =
2136 RelationAggregate::new("lines", "lineCount", SelectQuery::new("OrderLine"), true);
2137
2138 let first = repo
2139 .fetch_all_with_relation_aggregates(&query, &[aggregate.clone()])
2140 .await.unwrap();
2141 let second = repo
2142 .fetch_all_with_relation_aggregates(&query, &[aggregate])
2143 .await.unwrap();
2144
2145 assert_eq!(first, second);
2146 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
2147 assert_eq!(executor.queries.lock().unwrap().len(), 2);
2148 }
2149
2150 #[tokio::test]
2151 async fn memory_repository_fetches_smart_list_entities_with_query_features() {
2152 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2153 let repository = MemoryRepository::new(metadata).with_rows(
2154 "Order",
2155 vec![
2156 Record::from([
2157 (String::from("id"), Value::U64(1)),
2158 (String::from("version"), Value::I64(1)),
2159 (String::from("name"), Value::Text(String::from("alpha"))),
2160 ]),
2161 Record::from([
2162 (String::from("id"), Value::U64(2)),
2163 (String::from("version"), Value::I64(1)),
2164 (String::from("name"), Value::Text(String::from("beta"))),
2165 ]),
2166 Record::from([
2167 (String::from("id"), Value::U64(3)),
2168 (String::from("version"), Value::I64(1)),
2169 (String::from("name"), Value::Text(String::from("gamma"))),
2170 ]),
2171 ],
2172 );
2173
2174 let query = teaql_core::SelectQuery::new("Order")
2175 .filter(Expr::Binary {
2176 left: Box::new(Expr::column("id")),
2177 op: teaql_core::BinaryOp::Gte,
2178 right: Box::new(Expr::value(2_u64)),
2179 })
2180 .order_by(OrderBy::desc("id"))
2181 .limit(1);
2182
2183 let orders = repository.fetch_entities::<Order>(&query).unwrap();
2184
2185 assert_eq!(orders.ids(), vec![Value::U64(3)]);
2186 assert_eq!(orders.versions(), vec![1]);
2187 assert_eq!(orders.first().unwrap().name, "gamma");
2188 }
2189
2190 #[tokio::test]
2191 async fn memory_repository_runs_relation_aggregates() {
2192 let metadata = InMemoryMetadataStore::new()
2193 .with_entity(entity())
2194 .with_entity(line_entity());
2195
2196 let repository = MemoryRepository::new(metadata)
2197 .with_rows(
2198 "Order",
2199 vec![
2200 Record::from([
2201 (String::from("id"), Value::U64(1)),
2202 (String::from("version"), Value::I64(1)),
2203 (String::from("name"), Value::Text(String::from("first"))),
2204 ]),
2205 Record::from([
2206 (String::from("id"), Value::U64(2)),
2207 (String::from("version"), Value::I64(1)),
2208 (String::from("name"), Value::Text(String::from("second"))),
2209 ]),
2210 ],
2211 )
2212 .with_rows(
2213 "OrderLine",
2214 vec![
2215 Record::from([
2216 (String::from("id"), Value::U64(10)),
2217 (String::from("version"), Value::I64(1)),
2218 (String::from("order_id"), Value::U64(1)),
2219 (String::from("name"), Value::Text(String::from("line1"))),
2220 ]),
2221 Record::from([
2222 (String::from("id"), Value::U64(11)),
2223 (String::from("version"), Value::I64(1)),
2224 (String::from("order_id"), Value::U64(1)),
2225 (String::from("name"), Value::Text(String::from("line2"))),
2226 ]),
2227 Record::from([
2228 (String::from("id"), Value::U64(12)),
2229 (String::from("version"), Value::I64(1)),
2230 (String::from("order_id"), Value::U64(2)),
2231 (String::from("name"), Value::Text(String::from("line3"))),
2232 ]),
2233 ],
2234 );
2235
2236 let query = SelectQuery::new("Order").project("id").project("name");
2237 let aggregate = RelationAggregate::new("lines", "lineCount", SelectQuery::new("OrderLine"), true);
2238
2239 let rows = repository
2240 .fetch_all_with_relation_aggregates(&query, &[aggregate])
2241 .unwrap();
2242
2243 assert_eq!(rows.len(), 2);
2244
2245 let first_order = rows.iter().find(|r| r.get("id") == Some(&Value::U64(1))).unwrap();
2246 assert_eq!(first_order.get("lineCount"), Some(&Value::U64(2)));
2247
2248 let second_order = rows.iter().find(|r| r.get("id") == Some(&Value::U64(2))).unwrap();
2249 assert_eq!(second_order.get("lineCount"), Some(&Value::U64(1)));
2250 }
2251
2252 #[tokio::test]
2253 async fn memory_repository_runs_aggregates() {
2254 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2255 let repository = MemoryRepository::new(metadata).with_rows(
2256 "Order",
2257 vec![
2258 Record::from([
2259 (String::from("id"), Value::U64(1)),
2260 (String::from("version"), Value::I64(1)),
2261 (String::from("name"), Value::Text(String::from("alpha"))),
2262 ]),
2263 Record::from([
2264 (String::from("id"), Value::U64(2)),
2265 (String::from("version"), Value::I64(2)),
2266 (String::from("name"), Value::Text(String::from("beta"))),
2267 ]),
2268 ],
2269 );
2270
2271 let query = teaql_core::SelectQuery {
2272 entity: String::from("Order"),
2273 projection: Vec::new(),
2274 expr_projection: Vec::new(),
2275 filter: None,
2276 having: None,
2277 order_by: Vec::new(),
2278 slice: None,
2279 trace_chain: Vec::new(),
2280 aggregates: vec![
2281 Aggregate {
2282 function: AggregateFunction::Count,
2283 field: String::from("id"),
2284 alias: String::from("count"),
2285 },
2286 Aggregate {
2287 function: AggregateFunction::Sum,
2288 field: String::from("version"),
2289 alias: String::from("versionSum"),
2290 },
2291 ],
2292 group_by: Vec::new(),
2293 relations: Vec::new(),
2294 aggregation_cache: None,
2295 comment: None,
2296 raw_sql: None,
2297 raw_sql_search_criteria: Vec::new(),
2298 dynamic_properties: Vec::new(),
2299 raw_projections: Vec::new(),
2300 object_group_bys: Vec::new(),
2301 search_with_text: None,
2302 child_enhancements: Vec::new(),
2303 };
2304
2305 let rows = repository.fetch_all(&query).unwrap();
2306
2307 assert_eq!(rows.len(), 1);
2308 assert_eq!(rows[0].get("count"), Some(&Value::U64(2)));
2309 assert_eq!(rows[0].get("versionSum"), Some(&Value::U64(3)));
2310 }
2311
2312 #[tokio::test]
2313 async fn memory_repository_runs_grouped_aggregates_and_extended_filters() {
2314 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2315 let repository = MemoryRepository::new(metadata).with_rows(
2316 "Order",
2317 vec![
2318 Record::from([
2319 (String::from("id"), Value::U64(1)),
2320 (String::from("version"), Value::I64(1)),
2321 (String::from("name"), Value::Text(String::from("alpha"))),
2322 ]),
2323 Record::from([
2324 (String::from("id"), Value::U64(2)),
2325 (String::from("version"), Value::I64(2)),
2326 (String::from("name"), Value::Text(String::from("alpha"))),
2327 ]),
2328 Record::from([
2329 (String::from("id"), Value::U64(3)),
2330 (String::from("version"), Value::I64(3)),
2331 (String::from("name"), Value::Text(String::from("tmp-beta"))),
2332 ]),
2333 ],
2334 );
2335
2336 let rows = repository
2337 .fetch_all(
2338 &teaql_core::SelectQuery::new("Order")
2339 .filter(
2340 Expr::between("version", 1_i64, 3_i64)
2341 .and_expr(Expr::not_like("name", "tmp%"))
2342 .and_expr(Expr::not_in_list("name", vec![Value::from("deleted")])),
2343 )
2344 .group_by("name")
2345 .count("total")
2346 .sum("version", "versionSum"),
2347 )
2348 .unwrap();
2349
2350 assert_eq!(rows.len(), 1);
2351 assert_eq!(
2352 rows[0].get("name"),
2353 Some(&Value::Text(String::from("alpha")))
2354 );
2355 assert_eq!(rows[0].get("total"), Some(&Value::U64(2)));
2356 assert_eq!(rows[0].get("versionSum"), Some(&Value::U64(3)));
2357 }
2358
2359 #[tokio::test]
2360 async fn memory_repository_runs_extended_aggregates_and_having() {
2361 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2362 let repository = MemoryRepository::new(metadata).with_rows(
2363 "Order",
2364 vec![
2365 Record::from([
2366 (String::from("id"), Value::U64(1)),
2367 (String::from("version"), Value::I64(1)),
2368 (String::from("name"), Value::Text(String::from("alpha"))),
2369 ]),
2370 Record::from([
2371 (String::from("id"), Value::U64(2)),
2372 (String::from("version"), Value::I64(3)),
2373 (String::from("name"), Value::Text(String::from("alpha"))),
2374 ]),
2375 Record::from([
2376 (String::from("id"), Value::U64(3)),
2377 (String::from("version"), Value::I64(7)),
2378 (String::from("name"), Value::Text(String::from("beta"))),
2379 ]),
2380 ],
2381 );
2382
2383 let rows = repository
2384 .fetch_all(
2385 &teaql_core::SelectQuery::new("Order")
2386 .group_by("name")
2387 .count("total")
2388 .stddev("version", "stddevVersion")
2389 .var_pop("version", "varPopVersion")
2390 .bit_or("version", "bitOrVersion")
2391 .having(Expr::gt("total", 1_i64)),
2392 )
2393 .unwrap();
2394
2395 assert_eq!(rows.len(), 1);
2396 assert_eq!(
2397 rows[0].get("name"),
2398 Some(&Value::Text(String::from("alpha")))
2399 );
2400 assert_eq!(rows[0].get("total"), Some(&Value::U64(2)));
2401 assert_eq!(
2402 rows[0].get("stddevVersion").map(Value::to_json_value),
2403 Some(serde_json::Value::String(
2404 "1.4142135623730951454746218583".to_owned()
2405 ))
2406 );
2407 assert_eq!(
2408 rows[0].get("varPopVersion"),
2409 Some(&Value::Decimal(Decimal::ONE))
2410 );
2411 assert_eq!(rows[0].get("bitOrVersion"), Some(&Value::I64(3)));
2412 }
2413
2414 #[tokio::test]
2415 async fn memory_repository_runs_sound_like_filter() {
2416 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2417 let repository = MemoryRepository::new(metadata).with_rows(
2418 "Order",
2419 vec![
2420 Record::from([
2421 (String::from("id"), Value::U64(1)),
2422 (String::from("version"), Value::I64(1)),
2423 (String::from("name"), Value::Text(String::from("Robert"))),
2424 ]),
2425 Record::from([
2426 (String::from("id"), Value::U64(2)),
2427 (String::from("version"), Value::I64(1)),
2428 (String::from("name"), Value::Text(String::from("Rupert"))),
2429 ]),
2430 Record::from([
2431 (String::from("id"), Value::U64(3)),
2432 (String::from("version"), Value::I64(1)),
2433 (String::from("name"), Value::Text(String::from("Ashcraft"))),
2434 ]),
2435 ],
2436 );
2437
2438 let rows = repository
2439 .fetch_all(
2440 &teaql_core::SelectQuery::new("Order")
2441 .filter(Expr::sound_like("name", "Robert"))
2442 .order_asc("id"),
2443 )
2444 .unwrap();
2445
2446 assert_eq!(rows.len(), 2);
2447 assert_eq!(rows[0].get("name"), Some(&Value::Text("Robert".to_owned())));
2448 assert_eq!(rows[1].get("name"), Some(&Value::Text("Rupert".to_owned())));
2449 }
2450
2451 #[tokio::test]
2452 async fn memory_repository_runs_java_style_string_match_filters() {
2453 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2454 let repository = MemoryRepository::new(metadata).with_rows(
2455 "Order",
2456 vec![
2457 Record::from([
2458 (String::from("id"), Value::U64(1)),
2459 (String::from("version"), Value::I64(1)),
2460 (String::from("name"), Value::Text(String::from("tea-order"))),
2461 ]),
2462 Record::from([
2463 (String::from("id"), Value::U64(2)),
2464 (String::from("version"), Value::I64(1)),
2465 (
2466 String::from("name"),
2467 Value::Text(String::from("coffee-order")),
2468 ),
2469 ]),
2470 Record::from([
2471 (String::from("id"), Value::U64(3)),
2472 (String::from("version"), Value::I64(1)),
2473 (
2474 String::from("name"),
2475 Value::Text(String::from("tea-archived")),
2476 ),
2477 ]),
2478 ],
2479 );
2480
2481 let rows = repository
2482 .fetch_all(
2483 &teaql_core::SelectQuery::new("Order")
2484 .filter(
2485 Expr::contain("name", "tea")
2486 .and_expr(Expr::begin_with("name", "tea"))
2487 .and_expr(Expr::end_with("name", "order"))
2488 .and_expr(Expr::not_contain("name", "coffee"))
2489 .and_expr(Expr::not_begin_with("name", "archived"))
2490 .and_expr(Expr::not_end_with("name", "draft")),
2491 )
2492 .order_asc("id"),
2493 )
2494 .unwrap();
2495
2496 assert_eq!(rows.len(), 1);
2497 assert_eq!(
2498 rows[0].get("name"),
2499 Some(&Value::Text("tea-order".to_owned()))
2500 );
2501 }
2502
2503 #[tokio::test]
2504 async fn memory_repository_runs_property_to_property_filters() {
2505 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2506 let repository = MemoryRepository::new(metadata).with_rows(
2507 "Order",
2508 vec![
2509 Record::from([
2510 (String::from("id"), Value::U64(1)),
2511 (String::from("version"), Value::I64(2)),
2512 (String::from("name"), Value::Text(String::from("keep"))),
2513 ]),
2514 Record::from([
2515 (String::from("id"), Value::U64(2)),
2516 (String::from("version"), Value::I64(1)),
2517 (String::from("name"), Value::Text(String::from("skip"))),
2518 ]),
2519 ],
2520 );
2521
2522 let rows = repository
2523 .fetch_all(
2524 &teaql_core::SelectQuery::new("Order")
2525 .filter(Expr::compare_columns("version", BinaryOp::Gte, "id"))
2526 .order_asc("id"),
2527 )
2528 .unwrap();
2529
2530 assert_eq!(rows.len(), 1);
2531 assert_eq!(rows[0].get("name"), Some(&Value::Text("keep".to_owned())));
2532 }
2533
2534 #[tokio::test]
2535 async fn memory_repository_supports_mutations_and_optimistic_locking() {
2536 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2537 let repository = MemoryRepository::new(metadata);
2538
2539 repository
2540 .insert(
2541 &InsertCommand::new("Order")
2542 .value("id", 10_u64)
2543 .value("version", 1_i64)
2544 .value("name", "draft"),
2545 )
2546 .unwrap();
2547 repository
2548 .update(
2549 &UpdateCommand::new("Order", 10_u64)
2550 .expected_version(1)
2551 .value("name", "submitted"),
2552 )
2553 .unwrap();
2554
2555 let row = repository
2556 .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2557 .unwrap()
2558 .pop()
2559 .unwrap();
2560 assert_eq!(
2561 row.get("name"),
2562 Some(&Value::Text(String::from("submitted")))
2563 );
2564 assert_eq!(row.get("version"), Some(&Value::I64(2)));
2565
2566 let conflict = repository
2567 .update(
2568 &UpdateCommand::new("Order", 10_u64)
2569 .expected_version(1)
2570 .value("name", "stale"),
2571 )
2572 .unwrap_err();
2573 assert!(matches!(
2574 conflict,
2575 RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. })
2576 ));
2577
2578 repository
2579 .delete(&DeleteCommand::new("Order", 10_u64).expected_version(2))
2580 .unwrap();
2581 let row = repository
2582 .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2583 .unwrap()
2584 .pop()
2585 .unwrap();
2586 assert_eq!(row.get("version"), Some(&Value::I64(-3)));
2587
2588 repository
2589 .recover(&RecoverCommand::new("Order", 10_u64, -3))
2590 .unwrap();
2591 let row = repository
2592 .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2593 .unwrap()
2594 .pop()
2595 .unwrap();
2596 assert_eq!(row.get("version"), Some(&Value::I64(4)));
2597 }
2598
2599 #[tokio::test]
2600 async fn user_context_reports_missing_schema_provider() {
2601 let err = UserContext::new().ensure_schema().await.unwrap_err();
2602 assert!(
2603 matches!(err, RuntimeError::Schema(message) if message == "missing schema provider")
2604 );
2605 }
2606
2607 #[tokio::test]
2608 async fn user_context_stores_and_exposes_user_identifier() {
2609 let mut ctx = UserContext::new();
2610 let pid = std::process::id();
2611 let thread_id_str = format!("{:?}", std::thread::current().id());
2612 let numeric_thread_id = thread_id_str
2613 .strip_prefix("ThreadId(")
2614 .and_then(|s| s.strip_suffix(")"))
2615 .unwrap_or(&thread_id_str);
2616 let os_user = std::env::var("USER")
2617 .or_else(|_| std::env::var("USERNAME"))
2618 .unwrap_or_else(|_| "main".to_owned());
2619 let expected_default = format!("{os_user}@pid-{pid}.tid-{numeric_thread_id}");
2620 assert_eq!(ctx.user_identifier(), Some(expected_default.as_str()));
2621
2622 ctx.set_user_identifier("user-123");
2623 assert_eq!(ctx.user_identifier(), Some("user-123"));
2624
2625 let ctx2 = UserContext::new().with_user_identifier("user-456");
2626 assert_eq!(ctx2.user_identifier(), Some("user-456"));
2627
2628 let mut ctx3 = UserContext::new();
2629 ctx3.set_user_identifier_option(Some("user-789".to_owned()));
2630 assert_eq!(ctx3.user_identifier(), Some("user-789"));
2631 ctx3.set_user_identifier_option(None);
2632 assert_eq!(ctx3.user_identifier(), None);
2633
2634 let ctx4 = UserContext::new().with_user_identifier_option(Some("user-abc".to_owned()));
2635 assert_eq!(ctx4.user_identifier(), Some("user-abc"));
2636
2637 }
2638}
2639
2640pub use checker::{
2641 CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResult, CheckResults, CheckRule, Checker,
2642 CheckerRegistry, InMemoryCheckerRegistry, LocationSegment, ObjectLocation, TypedChecker,
2643 TypedEntityChecker, clear_record_status, mark_record_status,
2644};