1#![allow(warnings)]
2mod checker;
3mod context;
4mod entity_runtime;
5mod entity_status;
6mod error;
7mod event;
8mod graph;
9mod id;
10mod language;
11mod memory;
12mod registry;
13mod repository;
14
15pub use context::{
16 InfoLogEntry, LogPayload, SchemaProvider, SqlLogEntry, SqlLogOperation,
17 SqlLogOptions, UnifiedLogBuffer, UnifiedLogEntry, UserContext,
18};
19pub use entity_status::{EntityAction, EntityStatus};
20pub use entity_runtime::{ChangeSetStack, EntityChangeSet, EntityKey, EntityRoot, RootContext};
21pub use error::{ContextError, RepositoryError, RuntimeError};
22pub use event::{
23 EntityEvent, EntityEventKind, EntityEventSink, EntityPropertyChange, InMemoryEntityEventSink,
24};
25pub use graph::{
26 GraphMutationBatch, GraphMutationKind, GraphMutationPlan, GraphMutationPlanItem,
27 GraphNode, GraphOperation, ScopedCommentNode, TraceScopeToken, sorted_update_fields,
28};
29pub(crate) use id::local_id_generator;
30pub use id::{InternalIdGenerator, SnowflakeIdGenerator};
31pub use language::{
32 BuiltinTranslator, Language, MessageTranslator, translate_check_result, translate_location,
33};
34pub use memory::{MemoryRepository, MemoryRepositoryError};
35pub use registry::{
36 InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry, InMemoryRepositoryRegistry,
37 MetadataStore, RepositoryBehavior, RepositoryBehaviorRegistry, RepositoryRegistry,
38 RequestPolicy, RuntimeModule,
39};
40pub use repository::{
41 AggregationCacheBackend, ContextRepository, GraphTransactionBoundary, InMemoryAggregationCache,
42 RelationLoadPlan, Repository, ResolvedRepository,
43};
44
45#[cfg(test)]
46mod tests {
47 use std::collections::{BTreeMap, VecDeque};
48 use std::sync::{Arc, Mutex};
49
50 use super::{
51 AggregationCacheBackend, CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResult,
52 CheckResults, CheckRule, Checker, EntityEvent, EntityEventKind, EntityEventSink,
53 GraphMutationKind, GraphNode, InMemoryAggregationCache,
54 InMemoryCheckerRegistry, InMemoryMetadataStore, InMemoryRepositoryBehaviorRegistry,
55 InMemoryRepositoryRegistry, InternalIdGenerator, Language, MemoryRepository, MetadataStore,
56 ObjectLocation, Repository, RepositoryBehavior, RepositoryError,
57 RequestPolicy, RuntimeError, RuntimeModule, SqlLogOperation, SqlLogOptions, TypedChecker,
58 TypedEntityChecker, UserContext, translate_check_result,
59 };
60 use teaql_data_service::{
61 DataServiceCapabilities, DataServiceExecutor, ExecutionMetadata, MutationExecutor,
62 MutationRequest, MutationResult, QueryExecutor, QueryRequest, QueryResult, DataServiceOperation,
63 };
64 use teaql_core::{
65 Aggregate, AggregateFunction, BinaryOp, DataType, Decimal, DeleteCommand, Entity,
66 EntityDescriptor, EntityError, Expr, InsertCommand, OrderBy, PropertyDescriptor, Record,
67 RecoverCommand, RelationAggregate, SelectQuery, TeaqlEntity, UpdateCommand, Value,
68 };
69 use teaql_macros::TeaqlEntity as DeriveTeaqlEntity;
70 use teaql_sql::{CompiledQuery, DatabaseKind, SqlCompileError, SqlDialect, quote_identifier_if_needed};
71
72 const ORDER_DEFAULT_PROJECTION: &str = "id, version, name";
73
74 #[derive(Debug, Default, Clone, Copy)]
75 struct PostgresDialect;
76
77 impl SqlDialect for PostgresDialect {
78 fn kind(&self) -> DatabaseKind {
79 DatabaseKind::PostgreSql
80 }
81
82 fn quote_ident(&self, ident: &str) -> String {
83 quote_identifier_if_needed(ident, '"')
84 }
85
86 fn placeholder(&self, index: usize) -> String {
87 format!("${index}")
88 }
89
90 fn schema_type_sql(
91 &self,
92 data_type: DataType,
93 _property: &PropertyDescriptor,
94 ) -> Result<&'static str, SqlCompileError> {
95 match data_type {
96 DataType::Bool => Ok("BOOLEAN"),
97 DataType::I64 | DataType::U64 => Ok("BIGINT"),
98 DataType::F64 => Ok("DOUBLE PRECISION"),
99 DataType::Decimal => Ok("NUMERIC"),
100 DataType::Text => Ok("TEXT"),
101 DataType::Json => Ok("JSONB"),
102 DataType::Date => Ok("DATE"),
103 DataType::Timestamp => Ok("TIMESTAMPTZ"),
104 }
105 }
106 }
107
108 fn entity() -> EntityDescriptor {
109 EntityDescriptor::new("Order")
110 .table_name("orders")
111 .property(
112 PropertyDescriptor::new("id", DataType::U64)
113 .column_name("id")
114 .id()
115 .not_null(),
116 )
117 .property(
118 PropertyDescriptor::new("version", DataType::I64)
119 .column_name("version")
120 .version()
121 .not_null(),
122 )
123 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
124 .relation(
125 teaql_core::RelationDescriptor::new("lines", "OrderLine")
126 .local_key("id")
127 .foreign_key("order_id")
128 .many(),
129 )
130 }
131
132 fn line_entity() -> EntityDescriptor {
133 EntityDescriptor::new("OrderLine")
134 .table_name("orderline")
135 .property(
136 PropertyDescriptor::new("id", DataType::U64)
137 .column_name("id")
138 .id()
139 .not_null(),
140 )
141 .property(
142 PropertyDescriptor::new("version", DataType::I64)
143 .column_name("version")
144 .version(),
145 )
146 .property(
147 PropertyDescriptor::new("order_id", DataType::U64)
148 .column_name("order_id")
149 .not_null(),
150 )
151 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
152 .property(
153 PropertyDescriptor::new("product_id", DataType::U64)
154 .column_name("product_id")
155 .not_null(),
156 )
157 .relation(
158 teaql_core::RelationDescriptor::new("product", "Product")
159 .local_key("product_id")
160 .foreign_key("id"),
161 )
162 }
163
164 fn product_entity() -> EntityDescriptor {
165 EntityDescriptor::new("Product")
166 .table_name("product")
167 .property(
168 PropertyDescriptor::new("id", DataType::U64)
169 .column_name("id")
170 .id()
171 .not_null(),
172 )
173 .property(PropertyDescriptor::new("name", DataType::Text).column_name("name"))
174 }
175
176 #[derive(Debug, Default)]
177 struct StubExecutor {
178 affected: u64,
179 rows: Vec<Record>,
180 }
181
182 #[derive(Debug, Default)]
183 struct QueueExecutor {
184 affected: u64,
185 rows: Mutex<VecDeque<Vec<Record>>>,
186 queries: Mutex<Vec<String>>,
187 }
188
189 struct OrderBehavior;
190
191 #[allow(dead_code)]
192 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
193 #[teaql(entity = "CatalogProduct", table = "catalog_product")]
194 struct CatalogProductRow {
195 #[teaql(id)]
196 id: u64,
197 name: String,
198 }
199
200 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
201 #[teaql(entity = "OrderAggregate", table = "orders")]
202 struct OrderAggregateDynamic {
203 #[teaql(id)]
204 id: u64,
205 #[teaql(dynamic)]
206 dynamic: BTreeMap<String, Value>,
207 }
208
209 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
210 #[teaql(entity = "Product", table = "product")]
211 struct ProductEntityRow {
212 #[teaql(id)]
213 id: u64,
214 name: String,
215 }
216
217 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
218 #[teaql(entity = "OrderLine", table = "orderline")]
219 struct OrderLineEntityRow {
220 #[teaql(id)]
221 id: u64,
222 #[teaql(column = "order_id")]
223 order_id: u64,
224 name: String,
225 #[teaql(column = "product_id")]
226 product_id: u64,
227 #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
228 product: Option<ProductEntityRow>,
229 }
230
231 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
232 #[teaql(entity = "OrderLine", table = "orderline")]
233 struct ProductLineEntityRow {
234 #[teaql(id)]
235 id: u64,
236 #[teaql(column = "order_id")]
237 order_id: u64,
238 name: String,
239 #[teaql(column = "product_id")]
240 product_id: u64,
241 }
242
243 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
244 #[teaql(entity = "Product", table = "product")]
245 struct ProductWithLinesEntityRow {
246 #[teaql(id)]
247 id: u64,
248 name: String,
249 #[teaql(relation(
250 target = "OrderLine",
251 local_key = "id",
252 foreign_key = "product_id",
253 many
254 ))]
255 lines: teaql_core::SmartList<ProductLineEntityRow>,
256 }
257
258 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
259 #[teaql(entity = "OrderLine", table = "orderline")]
260 struct OrderLineWithProductEntityRow {
261 #[teaql(id)]
262 id: u64,
263 #[teaql(column = "order_id")]
264 order_id: u64,
265 name: String,
266 #[teaql(column = "product_id")]
267 product_id: u64,
268 #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
269 product: Option<ProductWithLinesEntityRow>,
270 }
271
272 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
273 #[teaql(entity = "Order", table = "orders")]
274 struct OrderAggregateRow {
275 #[teaql(id)]
276 id: u64,
277 #[teaql(version)]
278 version: i64,
279 name: String,
280 #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
281 lines: teaql_core::SmartList<OrderLineEntityRow>,
282 }
283
284 #[derive(Debug, Clone, PartialEq, DeriveTeaqlEntity)]
285 #[teaql(entity = "Order", table = "orders")]
286 struct Order {
287 #[teaql(id)]
288 id: u64,
289 #[teaql(version)]
290 version: i64,
291 name: String,
292 }
293
294 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
295 #[teaql(entity = "Product", table = "product")]
296 struct TypedGraphProduct {
297 #[teaql(id)]
298 id: u64,
299 name: String,
300 }
301
302 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
303 #[teaql(entity = "OrderLine", table = "orderline")]
304 struct TypedGraphLine {
305 #[teaql(id)]
306 id: u64,
307 #[teaql(column = "order_id")]
308 order_id: Option<u64>,
309 name: String,
310 #[teaql(column = "product_id")]
311 product_id: Option<u64>,
312 #[teaql(relation(target = "Product", local_key = "product_id", foreign_key = "id"))]
313 product: Option<TypedGraphProduct>,
314 }
315
316 #[derive(Debug, PartialEq, DeriveTeaqlEntity)]
317 #[teaql(entity = "Order", table = "orders")]
318 struct TypedGraphOrder {
319 #[teaql(id)]
320 id: u64,
321 #[teaql(version)]
322 version: i64,
323 name: String,
324 #[teaql(relation(target = "OrderLine", local_key = "id", foreign_key = "order_id", many))]
325 lines: teaql_core::SmartList<TypedGraphLine>,
326 }
327
328 #[derive(Debug, PartialEq, Eq)]
329 struct OrderEntity {
330 id: u64,
331 version: i64,
332 name: String,
333 }
334
335 impl teaql_core::TeaqlEntity for OrderEntity {
336 fn entity_descriptor() -> EntityDescriptor {
337 entity()
338 }
339 }
340
341 impl Entity for OrderEntity {
342 fn from_record(record: Record) -> Result<Self, EntityError> {
343 let id = match record.get("id") {
344 Some(Value::U64(v)) => *v,
345 Some(Value::I64(v)) if *v >= 0 => *v as u64,
346 other => {
347 return Err(EntityError::new(
348 "Order",
349 format!("invalid id field: {other:?}"),
350 ));
351 }
352 };
353 let version = match record.get("version") {
354 Some(Value::I64(v)) => *v,
355 other => {
356 return Err(EntityError::new(
357 "Order",
358 format!("invalid version field: {other:?}"),
359 ));
360 }
361 };
362 let name = match record.get("name") {
363 Some(Value::Text(v)) => v.clone(),
364 other => {
365 return Err(EntityError::new(
366 "Order",
367 format!("invalid name field: {other:?}"),
368 ));
369 }
370 };
371 Ok(Self { id, version, name })
372 }
373
374 fn into_record(self) -> Record {
375 Record::from([
376 (String::from("id"), Value::U64(self.id)),
377 (String::from("version"), Value::I64(self.version)),
378 (String::from("name"), Value::Text(self.name)),
379 ])
380 }
381 }
382
383 #[derive(Debug)]
384 struct StubError;
385
386 impl std::fmt::Display for StubError {
387 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
388 write!(f, "stub error")
389 }
390 }
391
392 impl std::error::Error for StubError {}
393
394 impl DataServiceExecutor for StubExecutor {
395 type Error = StubError;
396
397 fn capabilities(&self) -> DataServiceCapabilities {
398 DataServiceCapabilities::default()
399 }
400 }
401
402 impl QueryExecutor for StubExecutor {
403 fn query(&self, _request: QueryRequest) -> Result<QueryResult, Self::Error> {
404 Ok(QueryResult {
405 rows: self.rows.clone(),
406 metadata: ExecutionMetadata {
407 backend: "stub".to_owned(),
408 operation: DataServiceOperation::Query,
409 started_at: std::time::SystemTime::now(),
410 ended_at: std::time::SystemTime::now(),
411 affected_rows: None,
412 result_count: Some(self.rows.len()),
413 trace_chain: Vec::new(),
414 comment: None,
415 backend_request_id: None,
416 },
417 })
418 }
419 }
420
421 impl MutationExecutor for StubExecutor {
422 fn mutate(&self, _request: MutationRequest) -> Result<MutationResult, Self::Error> {
423 Ok(MutationResult {
424 affected_rows: self.affected,
425 generated_values: Record::new(),
426 metadata: ExecutionMetadata {
427 backend: "stub".to_owned(),
428 operation: DataServiceOperation::Update,
429 started_at: std::time::SystemTime::now(),
430 ended_at: std::time::SystemTime::now(),
431 affected_rows: Some(self.affected),
432 result_count: None,
433 trace_chain: Vec::new(),
434 comment: None,
435 backend_request_id: None,
436 },
437 })
438 }
439 }
440
441 impl DataServiceExecutor for QueueExecutor {
442 type Error = StubError;
443
444 fn capabilities(&self) -> DataServiceCapabilities {
445 DataServiceCapabilities::default()
446 }
447 }
448
449 impl QueryExecutor for QueueExecutor {
450 fn query(&self, request: QueryRequest) -> Result<QueryResult, Self::Error> {
451 let sql_approx = format!("SELECT ... FROM {} ...", request.query.entity);
452 self.queries.lock().unwrap().push(sql_approx);
453 Ok(QueryResult {
454 rows: self.rows.lock().unwrap().pop_front().unwrap_or_default(),
455 metadata: ExecutionMetadata {
456 backend: "queue".to_owned(),
457 operation: DataServiceOperation::Query,
458 started_at: std::time::SystemTime::now(),
459 ended_at: std::time::SystemTime::now(),
460 affected_rows: None,
461 result_count: Some(0),
462 trace_chain: Vec::new(),
463 comment: None,
464 backend_request_id: None,
465 },
466 })
467 }
468 }
469
470 impl MutationExecutor for QueueExecutor {
471 fn mutate(&self, _request: MutationRequest) -> Result<MutationResult, Self::Error> {
472 Ok(MutationResult {
473 affected_rows: self.affected,
474 generated_values: Record::new(),
475 metadata: ExecutionMetadata {
476 backend: "queue".to_owned(),
477 operation: DataServiceOperation::Update,
478 started_at: std::time::SystemTime::now(),
479 ended_at: std::time::SystemTime::now(),
480 affected_rows: Some(self.affected),
481 result_count: None,
482 trace_chain: Vec::new(),
483 comment: None,
484 backend_request_id: None,
485 },
486 })
487 }
488 }
489
490
491
492 impl RepositoryBehavior for OrderBehavior {
493 fn before_select(
494 &self,
495 _ctx: &UserContext,
496 query: &mut teaql_core::SelectQuery,
497 ) -> Result<(), RuntimeError> {
498 query.filter = Some(Expr::eq("version", 1_i64));
499 Ok(())
500 }
501
502 fn before_insert(
503 &self,
504 _ctx: &UserContext,
505 command: &mut InsertCommand,
506 ) -> Result<(), RuntimeError> {
507 command
508 .values
509 .entry("version".to_owned())
510 .or_insert(Value::I64(1));
511 Ok(())
512 }
513
514 fn relation_loads(&self, _ctx: &UserContext) -> Vec<String> {
515 vec!["lines".to_owned()]
516 }
517 }
518
519 struct ContextAwareOrderBehavior;
520 struct TenantRequestPolicy;
521 struct OrderChecker;
522 struct TypedOrderChecker;
523 #[derive(Clone)]
524 struct RecordingEventSink {
525 events: Arc<Mutex<Vec<EntityEvent>>>,
526 }
527
528 impl RepositoryBehavior for ContextAwareOrderBehavior {
529 fn before_insert(
530 &self,
531 ctx: &UserContext,
532 command: &mut InsertCommand,
533 ) -> Result<(), RuntimeError> {
534 let tenant = ctx
535 .get_named_resource::<String>("tenant")
536 .cloned()
537 .ok_or_else(|| RuntimeError::Behavior("missing tenant resource".to_owned()))?;
538 let version = *ctx
539 .get_named_resource::<i64>("initial_version")
540 .ok_or_else(|| {
541 RuntimeError::Behavior("missing initial_version resource".to_owned())
542 })?;
543 let trace_id = match ctx.local("trace_id") {
544 Some(Value::Text(value)) => value.clone(),
545 other => {
546 return Err(RuntimeError::Behavior(format!(
547 "missing trace_id local, got {other:?}"
548 )));
549 }
550 };
551
552 command
553 .values
554 .entry("name".to_owned())
555 .or_insert(Value::Text(format!("{tenant}:{trace_id}")));
556 command
557 .values
558 .entry("version".to_owned())
559 .or_insert(Value::I64(version));
560 Ok(())
561 }
562 }
563
564 impl RequestPolicy for TenantRequestPolicy {
565 fn enforce_select(
566 &self,
567 ctx: &UserContext,
568 query: &mut SelectQuery,
569 ) -> Result<(), RuntimeError> {
570 if query.entity == "Order" {
571 let tenant_id = ctx
572 .get_named_resource::<u64>("tenant_id")
573 .copied()
574 .ok_or_else(|| RuntimeError::Policy("missing tenant_id".to_owned()))?;
575 query.filter = Some(match query.filter.take() {
576 Some(filter) => filter.and_expr(Expr::eq("id", tenant_id)),
577 None => Expr::eq("id", tenant_id),
578 });
579 }
580 Ok(())
581 }
582
583 fn enforce_insert(
584 &self,
585 ctx: &UserContext,
586 command: &mut InsertCommand,
587 ) -> Result<(), RuntimeError> {
588 if command.entity == "Order" {
589 let tenant_id = ctx
590 .get_named_resource::<u64>("tenant_id")
591 .copied()
592 .ok_or_else(|| RuntimeError::Policy("missing tenant_id".to_owned()))?;
593 command
594 .values
595 .insert("version".to_owned(), Value::I64(tenant_id as i64));
596 }
597 Ok(())
598 }
599 }
600
601 impl Checker for OrderChecker {
602 fn entity(&self) -> &str {
603 "Order"
604 }
605
606 fn check_and_fix(
607 &self,
608 _ctx: &UserContext,
609 record: &mut Record,
610 location: &ObjectLocation,
611 results: &mut CheckResults,
612 ) {
613 let status = CheckObjectStatus::from_record(record);
614 if status.is_create() {
615 self.required(record, "name", location, results);
616 record.entry("version".to_owned()).or_insert(Value::I64(1));
617 }
618 if status.is_update()
619 && record.get("name") == Some(&Value::Text("graph-update".to_owned()))
620 {
621 record.insert(
622 "name".to_owned(),
623 Value::Text("graph-update-checked".to_owned()),
624 );
625 }
626 self.min_string_length(record, "name", 3, location, results);
627 }
628 }
629
630 impl TypedChecker<Order> for TypedOrderChecker {
631 fn check_and_fix_typed(
632 &self,
633 _ctx: &UserContext,
634 entity: &mut Order,
635 status: CheckObjectStatus,
636 location: &ObjectLocation,
637 results: &mut CheckResults,
638 ) {
639 if status.is_create() {
640 if entity.name.is_empty() {
641 results.push(CheckResult::required(location.clone().member("name")));
642 }
643 }
644 if entity.name.chars().count() < 3 {
645 results.push(CheckResult::min_str(
646 location.clone().member("name"),
647 3,
648 entity.name.clone(),
649 ));
650 }
651 if entity.name == "fix" {
652 entity.name = "fixed".to_owned();
653 }
654 }
655 }
656
657 impl EntityEventSink for RecordingEventSink {
658 fn on_event(&self, _ctx: &UserContext, event: &EntityEvent) -> Result<(), RuntimeError> {
659 self.events.lock().unwrap().push(event.clone());
660 Ok(())
661 }
662 }
663
664 struct FixedIdGenerator(u64);
665
666 impl InternalIdGenerator for FixedIdGenerator {
667 fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
668 Ok(self.0)
669 }
670 }
671
672 struct SequentialIdGenerator {
673 next: Mutex<u64>,
674 }
675
676 impl SequentialIdGenerator {
677 fn new(next: u64) -> Self {
678 Self {
679 next: Mutex::new(next),
680 }
681 }
682 }
683
684 impl InternalIdGenerator for SequentialIdGenerator {
685 fn generate_id(&self, _entity: &str) -> Result<u64, RuntimeError> {
686 let mut next = self
687 .next
688 .lock()
689 .map_err(|err| RuntimeError::IdGeneration(err.to_string()))?;
690 let id = *next;
691 *next += 1;
692 Ok(id)
693 }
694 }
695
696 #[test]
697 fn metadata_store_registers_entities() {
698 let store = InMemoryMetadataStore::new().with_entity(entity());
699 assert!(store.entity("Order").is_some());
700 }
701
702 #[test]
703 fn runtime_module_registers_descriptor_into_context() {
704 let ctx = UserContext::new().with_module(RuntimeModule::new().descriptor(entity()));
705 assert!(ctx.entity("Order").is_some());
706 assert!(ctx.has_repository("Order"));
707 }
708
709 #[test]
710 fn runtime_module_registers_derived_entity_and_behavior() {
711 let ctx = UserContext::new().with_module(
712 RuntimeModule::new().entity_with_behavior::<CatalogProductRow, _>(OrderBehavior),
713 );
714 assert!(ctx.entity("CatalogProduct").is_some());
715 assert!(ctx.has_repository("CatalogProduct"));
716 assert!(ctx.repository_behavior("CatalogProduct").is_some());
717 }
718
719 #[test]
720 fn module_macro_registers_multiple_entities() {
721 let ctx = UserContext::new().with_module(crate::module!(CatalogProductRow));
722 assert!(ctx.entity("CatalogProduct").is_some());
723 assert!(ctx.has_repository("CatalogProduct"));
724 }
725
726 #[test]
727 fn module_macro_registers_entity_behavior_pairs() {
728 let ctx =
729 UserContext::new().with_module(crate::module!(CatalogProductRow => OrderBehavior));
730 assert!(ctx.entity("CatalogProduct").is_some());
731 assert!(ctx.repository_behavior("CatalogProduct").is_some());
732 }
733
734 #[test]
735 fn repository_returns_optimistic_lock_conflict() {
736 let store = InMemoryMetadataStore::new().with_entity(entity());
737 let executor = StubExecutor {
738 affected: 0,
739 rows: Vec::new(),
740 };
741 let repo = Repository::new(&store, &executor);
742
743 let err = repo
744 .update(
745 &UpdateCommand::new("Order", 1_u64)
746 .expected_version(3)
747 .value("name", "next"),
748 )
749 .unwrap_err();
750
751 match err {
752 RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. }) => {}
753 other => panic!("unexpected error: {other}"),
754 }
755 }
756
757 #[test]
758 fn user_context_indexes_resources_and_locals() {
759 let mut ctx =
760 UserContext::new().with_metadata(InMemoryMetadataStore::new().with_entity(entity()));
761 ctx.insert_resource::<u64>(42);
762 ctx.insert_named_resource("tenant", String::from("acme"));
763 ctx.put_local("trace_id", "req-1");
764
765 assert!(ctx.entity("Order").is_some());
766 assert_eq!(ctx.get_resource::<u64>(), Some(&42));
767 assert_eq!(
768 ctx.get_named_resource::<String>("tenant"),
769 Some(&String::from("acme"))
770 );
771 assert_eq!(
772 ctx.local("trace_id"),
773 Some(&Value::Text("req-1".to_owned()))
774 );
775 }
776
777 #[test]
778 fn user_context_builds_context_repository() {
779 let mut ctx =
780 UserContext::new().with_metadata(InMemoryMetadataStore::new().with_entity(entity()));
781 ctx.insert_resource(PostgresDialect);
782 ctx.insert_resource(StubExecutor {
783 affected: 1,
784 rows: Vec::new(),
785 });
786
787 let repo = ctx.repository::< StubExecutor>().unwrap();
788 let affected = repo
789 .update(
790 &UpdateCommand::new("Order", 1_u64)
791 .expected_version(3)
792 .value("name", "next"),
793 )
794 .unwrap();
795
796 assert_eq!(affected, 1);
797 }
798
799 #[test]
800 fn user_context_resolves_repository_by_entity_type() {
801 let mut ctx = UserContext::new()
802 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
803 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
804 ctx.insert_resource(PostgresDialect);
805 ctx.insert_resource(StubExecutor {
806 affected: 1,
807 rows: Vec::new(),
808 });
809
810 let repo = ctx
811 .resolve_repository::< StubExecutor>("Order")
812 .unwrap();
813 assert_eq!(repo.entity(), "Order");
814 assert_eq!(repo.select().entity, "Order");
815
816 let affected = repo
817 .insert(
818 &repo
819 .insert_command()
820 .value("id", 1_u64)
821 .value("version", 1_i64)
822 .value("name", "n"),
823 )
824 .unwrap();
825 assert_eq!(affected, 1);
826 }
827
828 #[test]
829 fn resolved_repository_applies_behavior_hooks() {
830 let mut ctx = UserContext::new()
831 .with_metadata(
832 InMemoryMetadataStore::new()
833 .with_entity(entity())
834 .with_entity(line_entity())
835 .with_entity(product_entity()),
836 )
837 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
838 .with_repository_behavior_registry(
839 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
840 );
841 ctx.insert_resource(PostgresDialect);
842 ctx.insert_resource(StubExecutor {
843 affected: 1,
844 rows: Vec::new(),
845 });
846
847 let repo = ctx
848 .resolve_repository::< StubExecutor>("Order")
849 .unwrap();
850
851 let insert = repo.insert_command().value("id", 1_u64).value("name", "n");
855 let affected = repo.insert(&insert).unwrap();
856 assert_eq!(affected, 1);
857 assert_eq!(repo.relation_loads(), vec!["lines".to_owned()]);
858 }
859
860 #[test]
861 fn resolved_repository_applies_request_policy_after_behavior_hooks() {
862 let mut ctx = UserContext::new()
863 .with_metadata(
864 InMemoryMetadataStore::new()
865 .with_entity(entity())
866 .with_entity(line_entity())
867 .with_entity(product_entity()),
868 )
869 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
870 .with_repository_behavior_registry(
871 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
872 )
873 .with_request_policy(TenantRequestPolicy);
874 ctx.insert_named_resource("tenant_id", 9_u64);
875 ctx.insert_resource(PostgresDialect);
876 ctx.insert_resource(StubExecutor {
877 affected: 1,
878 rows: Vec::new(),
879 });
880
881 let repo = ctx
882 .resolve_repository::< StubExecutor>("Order")
883 .unwrap();
884
885 let insert = repo.insert_command().value("id", 1_u64).value("name", "n");
890 let command = repo.prepare_insert_command(&insert).unwrap();
891 assert_eq!(command.values.get("version"), Some(&Value::I64(9)));
892 }
893
894 #[test]
895 fn resolved_repository_prepares_insert_command_with_generated_id() {
896 let mut ctx = UserContext::new()
897 .with_metadata(
898 InMemoryMetadataStore::new()
899 .with_entity(entity())
900 .with_entity(line_entity())
901 .with_entity(product_entity()),
902 )
903 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
904 .with_repository_behavior_registry(
905 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
906 )
907 .with_internal_id_generator(FixedIdGenerator(42));
908 ctx.insert_resource(PostgresDialect);
909 ctx.insert_resource(StubExecutor {
910 affected: 1,
911 rows: Vec::new(),
912 });
913
914 let repo = ctx
915 .resolve_repository::< StubExecutor>("Order")
916 .unwrap();
917
918 let prepared = repo
919 .prepare_insert_command(&repo.insert_command().value("id", 0_u64).value("name", "n"))
920 .unwrap();
921
922 assert_eq!(prepared.values.get("id"), Some(&Value::U64(42)));
923 assert_eq!(prepared.values.get("version"), Some(&Value::I64(1)));
924 assert_eq!(
925 prepared.values.get("name"),
926 Some(&Value::Text("n".to_owned()))
927 );
928
929 let prepared_zero_version = repo
930 .prepare_insert_command(
931 &repo
932 .insert_command()
933 .value("id", 0_u64)
934 .value("version", 0_i64)
935 .value("name", "zero-version"),
936 )
937 .unwrap();
938 assert_eq!(
939 prepared_zero_version.values.get("version"),
940 Some(&Value::I64(1))
941 );
942 }
943
944 #[test]
945 fn resolved_repository_saves_create_graph_and_maintains_relation_keys() {
946 let mut ctx = UserContext::new()
947 .with_metadata(
948 InMemoryMetadataStore::new()
949 .with_entity(entity())
950 .with_entity(line_entity())
951 .with_entity(product_entity()),
952 )
953 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
954 .with_repository_behavior_registry(
955 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
956 )
957 .with_internal_id_generator(SequentialIdGenerator::new(500));
958 ctx.insert_resource(PostgresDialect);
959 ctx.insert_resource(StubExecutor {
960 affected: 1,
961 rows: Vec::new(),
962 });
963
964 let repo = ctx
965 .resolve_repository::< StubExecutor>("Order")
966 .unwrap();
967 let graph = GraphNode::new("Order").value("name", "root").relation(
968 "lines",
969 GraphNode::new("OrderLine")
970 .value("name", "line-1")
971 .relation("product", GraphNode::new("Product").value("name", "sku-1")),
972 );
973
974 let saved = repo.save_graph(graph).unwrap();
975
976 assert_eq!(saved.values.get("id"), Some(&Value::U64(500)));
977 assert_eq!(saved.values.get("version"), Some(&Value::I64(1)));
978 let lines = saved.relations.get("lines").unwrap();
979 assert_eq!(lines.len(), 1);
980 assert_eq!(lines[0].values.get("id"), Some(&Value::U64(502)));
981 assert_eq!(lines[0].values.get("version"), Some(&Value::I64(1)));
982 assert_eq!(lines[0].values.get("order_id"), Some(&Value::U64(500)));
983 assert_eq!(lines[0].values.get("product_id"), Some(&Value::U64(501)));
984 let product = lines[0].relations.get("product").unwrap();
985 assert_eq!(product[0].values.get("id"), Some(&Value::U64(501)));
986 }
987
988 #[test]
989 fn resolved_repository_extracts_and_saves_typed_entity_graph() {
990 let mut ctx = UserContext::new()
991 .with_metadata(
992 InMemoryMetadataStore::new()
993 .with_entity(entity())
994 .with_entity(line_entity())
995 .with_entity(product_entity()),
996 )
997 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
998 .with_internal_id_generator(SequentialIdGenerator::new(700));
999 ctx.insert_resource(PostgresDialect);
1000 ctx.insert_resource(StubExecutor {
1001 affected: 1,
1002 rows: Vec::new(),
1003 });
1004
1005 let repo = ctx
1006 .resolve_repository::< StubExecutor>("Order")
1007 .unwrap();
1008 let order = TypedGraphOrder {
1009 id: 0,
1010 version: 0,
1011 name: "typed-root".to_owned(),
1012 lines: teaql_core::SmartList::from(vec![TypedGraphLine {
1013 id: 0,
1014 order_id: None,
1015 name: "typed-line".to_owned(),
1016 product_id: None,
1017 product: Some(TypedGraphProduct {
1018 id: 0,
1019 name: "typed-product".to_owned(),
1020 }),
1021 }]),
1022 };
1023
1024 let extracted = repo.graph_node_from_entity(order).unwrap();
1025 assert_eq!(extracted.entity, "Order");
1026 assert_eq!(
1027 extracted.values.get("name"),
1028 Some(&Value::Text("typed-root".to_owned()))
1029 );
1030 assert_eq!(extracted.values.get("id"), Some(&Value::U64(0)));
1031 assert_eq!(extracted.relations["lines"].len(), 1);
1032 assert_eq!(
1033 extracted.relations["lines"][0].values.get("name"),
1034 Some(&Value::Text("typed-line".to_owned()))
1035 );
1036 assert_eq!(
1037 extracted.relations["lines"][0].relations["product"].len(),
1038 1
1039 );
1040
1041 let saved = repo.save_graph(extracted).unwrap();
1042 assert_eq!(saved.values.get("id"), Some(&Value::U64(700)));
1043 assert_eq!(saved.values.get("version"), Some(&Value::I64(1)));
1044 let lines = saved.relations.get("lines").unwrap();
1045 assert_eq!(lines[0].values.get("id"), Some(&Value::U64(702)));
1046 assert_eq!(lines[0].values.get("version"), Some(&Value::I64(1)));
1047 assert_eq!(lines[0].values.get("order_id"), Some(&Value::U64(700)));
1048 assert_eq!(lines[0].values.get("product_id"), Some(&Value::U64(701)));
1049 assert_eq!(
1050 lines[0].relations["product"][0].values.get("id"),
1051 Some(&Value::U64(701))
1052 );
1053 }
1054
1055 #[test]
1056 fn resolved_repository_saves_typed_entity_graph_directly() {
1057 let mut ctx = UserContext::new()
1058 .with_metadata(
1059 InMemoryMetadataStore::new()
1060 .with_entity(entity())
1061 .with_entity(line_entity())
1062 .with_entity(product_entity()),
1063 )
1064 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1065 .with_internal_id_generator(SequentialIdGenerator::new(800));
1066 ctx.insert_resource(PostgresDialect);
1067 ctx.insert_resource(StubExecutor {
1068 affected: 1,
1069 rows: Vec::new(),
1070 });
1071
1072 let repo = ctx
1073 .resolve_repository::< StubExecutor>("Order")
1074 .unwrap();
1075 let saved = repo
1076 .save_entity_graph(TypedGraphOrder {
1077 id: 0,
1078 version: 0,
1079 name: "typed-direct".to_owned(),
1080 lines: teaql_core::SmartList::from(vec![TypedGraphLine {
1081 id: 0,
1082 order_id: None,
1083 name: "typed-line".to_owned(),
1084 product_id: None,
1085 product: Some(TypedGraphProduct {
1086 id: 0,
1087 name: "typed-product".to_owned(),
1088 }),
1089 }]),
1090 })
1091 .unwrap();
1092
1093 assert_eq!(saved.values.get("id"), Some(&Value::U64(800)));
1094 assert_eq!(saved.values.get("version"), Some(&Value::I64(1)));
1095 assert_eq!(
1096 saved.relations["lines"][0].values.get("order_id"),
1097 Some(&Value::U64(800))
1098 );
1099 assert_eq!(
1100 saved.relations["lines"][0].values.get("product_id"),
1101 Some(&Value::U64(801))
1102 );
1103 }
1104
1105 #[test]
1106 fn custom_user_context_can_drive_insert_preparation() {
1107 let mut ctx = UserContext::new()
1108 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1109 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1110 .with_repository_behavior_registry(
1111 InMemoryRepositoryBehaviorRegistry::new()
1112 .with_behavior("Order", ContextAwareOrderBehavior),
1113 )
1114 .with_internal_id_generator(FixedIdGenerator(99));
1115 ctx.insert_named_resource("tenant", String::from("acme"));
1116 ctx.insert_named_resource("initial_version", 7_i64);
1117 ctx.put_local("trace_id", "req-9");
1118 ctx.insert_resource(PostgresDialect);
1119 ctx.insert_resource(StubExecutor {
1120 affected: 1,
1121 rows: Vec::new(),
1122 });
1123
1124 let repo = ctx
1125 .resolve_repository::< StubExecutor>("Order")
1126 .unwrap();
1127 let prepared = repo.prepare_insert_command(&repo.insert_command()).unwrap();
1128
1129 assert_eq!(prepared.values.get("id"), Some(&Value::U64(99)));
1130 assert_eq!(prepared.values.get("version"), Some(&Value::I64(7)));
1131 assert_eq!(
1132 prepared.values.get("name"),
1133 Some(&Value::Text("acme:req-9".to_owned()))
1134 );
1135 }
1136
1137 #[test]
1138 fn checker_registry_validates_and_fixes_insert_commands() {
1139 let mut ctx = UserContext::new()
1140 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1141 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1142 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker))
1143 .with_internal_id_generator(FixedIdGenerator(77));
1144 ctx.insert_resource(PostgresDialect);
1145 ctx.insert_resource(StubExecutor {
1146 affected: 1,
1147 rows: Vec::new(),
1148 });
1149
1150 let repo = ctx
1151 .resolve_repository::< StubExecutor>("Order")
1152 .unwrap();
1153 let prepared = repo
1154 .prepare_insert_command(&repo.insert_command().value("name", "valid"))
1155 .unwrap();
1156
1157 assert_eq!(prepared.values.get("id"), Some(&Value::U64(77)));
1158 assert_eq!(prepared.values.get("version"), Some(&Value::I64(1)));
1159 assert!(!prepared.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
1160
1161 let error = repo
1162 .prepare_insert_command(&repo.insert_command().value("name", "no"))
1163 .unwrap_err();
1164 match error {
1165 RuntimeError::Check(results) => {
1166 assert_eq!(results.len(), 1);
1167 assert_eq!(results[0].location.to_string(), "name");
1168 }
1169 other => panic!("unexpected checker error: {other:?}"),
1170 }
1171 }
1172
1173 #[test]
1174 fn typed_checker_validates_and_fixes_derived_entities_without_record_access() {
1175 let mut ctx = UserContext::new()
1176 .with_metadata(InMemoryMetadataStore::new().with_entity(Order::entity_descriptor()))
1177 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1178 .with_checker_registry(
1179 InMemoryCheckerRegistry::new()
1180 .with_checker(TypedEntityChecker::<Order, _>::new(TypedOrderChecker)),
1181 )
1182 .with_internal_id_generator(FixedIdGenerator(79));
1183 ctx.insert_resource(PostgresDialect);
1184 ctx.insert_resource(StubExecutor {
1185 affected: 1,
1186 rows: Vec::new(),
1187 });
1188
1189 let repo = ctx
1190 .resolve_repository::< StubExecutor>("Order")
1191 .unwrap();
1192 let prepared = repo
1193 .prepare_insert_command(
1194 &repo
1195 .insert_command()
1196 .value("name", "fix")
1197 .value("version", 1_i64),
1198 )
1199 .unwrap();
1200 assert_eq!(
1201 prepared.values.get("name"),
1202 Some(&Value::Text("fixed".to_owned()))
1203 );
1204 assert_eq!(prepared.values.get("id"), Some(&Value::U64(79)));
1205 assert!(!prepared.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
1206
1207 let error = repo
1208 .prepare_insert_command(&repo.insert_command().value("version", 1_i64))
1209 .unwrap_err();
1210 match error {
1211 RuntimeError::Check(results) => {
1212 assert!(
1213 results
1214 .iter()
1215 .any(|result| result.rule == CheckRule::Required
1216 && result.location.to_string() == "name")
1217 );
1218 }
1219 other => panic!("unexpected typed checker error: {other:?}"),
1220 }
1221 }
1222
1223
1224
1225 #[test]
1226 fn checker_registry_reports_nested_create_locations_and_fixes_records() {
1227 let ctx = UserContext::new()
1228 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker));
1229
1230 let mut child = Record::from([
1231 (String::from("id"), Value::U64(10)),
1232 (
1233 String::from(CHECK_OBJECT_STATUS_FIELD),
1234 Value::from(CheckObjectStatus::Create),
1235 ),
1236 ]);
1237 let error = ctx
1238 .check_and_fix_record_at(
1239 "Order",
1240 &mut child,
1241 &ObjectLocation::hash_root("lines").element(0),
1242 )
1243 .unwrap_err();
1244
1245 assert_eq!(child.get("version"), Some(&Value::I64(1)));
1246 match error {
1247 RuntimeError::Check(results) => {
1248 assert_eq!(results.len(), 1);
1249 assert_eq!(results[0].rule, CheckRule::Required);
1250 assert_eq!(results[0].location.to_string(), "lines[0].name");
1251 }
1252 other => panic!("unexpected checker error: {other:?}"),
1253 }
1254
1255 child.insert("name".to_owned(), Value::Text("valid child".to_owned()));
1256 ctx.check_and_fix_record_at(
1257 "Order",
1258 &mut child,
1259 &ObjectLocation::hash_root("lines").element(0),
1260 )
1261 .unwrap();
1262 }
1263
1264 #[test]
1265 fn built_in_language_translators_cover_fifteen_languages() {
1266 assert_eq!(Language::ALL.len(), 15);
1267 let result = super::CheckResult::required(ObjectLocation::hash_root("name"));
1268 let messages = Language::ALL
1269 .iter()
1270 .map(|language| translate_check_result(*language, &result))
1271 .collect::<Vec<_>>();
1272
1273 assert!(messages.iter().all(|message| !message.is_empty()));
1274 assert!(messages.iter().any(|message| message.contains("required")));
1275 assert!(messages.iter().any(|message| message.contains("å¿…å¡«")));
1276 assert!(
1277 messages
1278 .iter()
1279 .any(|message| message.contains("obligatoire"))
1280 );
1281 assert_eq!(Language::from_code("zh-CN"), Some(Language::Chinese));
1282 assert_eq!(
1283 Language::from_code("zh-TW"),
1284 Some(Language::TraditionalChinese)
1285 );
1286 }
1287
1288 #[test]
1289 fn user_context_language_switch_translates_checker_errors() {
1290 let mut ctx = UserContext::new()
1291 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1292 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1293 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker))
1294 .with_internal_id_generator(FixedIdGenerator(77))
1295 .with_language(Language::Chinese);
1296 ctx.insert_resource(PostgresDialect);
1297 ctx.insert_resource(StubExecutor {
1298 affected: 1,
1299 rows: Vec::new(),
1300 });
1301
1302 let repo = ctx
1303 .resolve_repository::< StubExecutor>("Order")
1304 .unwrap();
1305 let error = repo
1306 .prepare_insert_command(&repo.insert_command())
1307 .unwrap_err();
1308 match error {
1309 RuntimeError::Check(results) => {
1310 assert_eq!(results.len(), 1);
1311 assert!(
1312 results[0]
1313 .message
1314 .as_ref()
1315 .is_some_and(|message| message.contains("å¿…å¡«"))
1316 );
1317 }
1318 other => panic!("unexpected checker error: {other:?}"),
1319 }
1320
1321 let mut ctx = UserContext::new().with_language(Language::English);
1322 ctx.set_language_code("es").unwrap();
1323 assert_eq!(ctx.language(), Language::Spanish);
1324 }
1325
1326 #[test]
1327 fn checker_registry_merges_graph_update_fixes_by_object_status() {
1328 let mut ctx = UserContext::new()
1329 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1330 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1331 .with_checker_registry(InMemoryCheckerRegistry::new().with_checker(OrderChecker));
1332 ctx.insert_resource(PostgresDialect);
1333 ctx.insert_resource(StubExecutor {
1334 affected: 1,
1335 rows: vec![Record::from([
1336 ("id".to_owned(), Value::U64(1)),
1337 ("version".to_owned(), Value::I64(1)),
1338 ("name".to_owned(), Value::Text("old".to_owned())),
1339 ])],
1340 });
1341
1342 let repo = ctx
1343 .resolve_repository::< StubExecutor>("Order")
1344 .unwrap();
1345 let saved = repo
1346 .save_graph(
1347 GraphNode::new("Order")
1348 .value("id", 1_u64)
1349 .value("version", 1_i64)
1350 .value("name", "graph-update"),
1351 )
1352 .unwrap();
1353
1354 assert_eq!(
1355 saved.values.get("name"),
1356 Some(&Value::Text("graph-update-checked".to_owned()))
1357 );
1358 assert_eq!(saved.values.get("version"), Some(&Value::I64(2)));
1359 assert!(!saved.values.contains_key(CHECK_OBJECT_STATUS_FIELD));
1360 }
1361
1362 #[test]
1363 fn user_context_event_sink_receives_repository_mutation_events() {
1364 let events = Arc::new(Mutex::new(Vec::new()));
1365 let mut ctx = UserContext::new()
1366 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1367 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1368 .with_internal_id_generator(FixedIdGenerator(88))
1369 .with_event_sink(RecordingEventSink {
1370 events: events.clone(),
1371 });
1372 ctx.insert_resource(PostgresDialect);
1373 ctx.insert_resource(StubExecutor {
1374 affected: 1,
1375 rows: vec![Record::from([
1376 ("id".to_owned(), Value::U64(88)),
1377 ("version".to_owned(), Value::I64(1)),
1378 ("name".to_owned(), Value::Text("old".to_owned())),
1379 ])],
1380 });
1381
1382 let repo = ctx
1383 .resolve_repository::< StubExecutor>("Order")
1384 .unwrap();
1385 repo.insert(&repo.insert_command().value("name", "created"))
1386 .unwrap();
1387 repo.update(
1388 &repo
1389 .update_command(88_u64)
1390 .expected_version(1)
1391 .value("name", "updated"),
1392 )
1393 .unwrap();
1394 repo.delete(&repo.delete_command(88_u64).expected_version(2))
1395 .unwrap();
1396 repo.recover(&repo.recover_command(88_u64, -3)).unwrap();
1397
1398 let events = events.lock().unwrap();
1399 assert_eq!(events.len(), 4);
1400 assert_eq!(events[0].kind, EntityEventKind::Created);
1401 assert_eq!(events[0].entity, "Order");
1402 assert_eq!(events[0].values.get("id"), Some(&Value::U64(88)));
1403 assert_eq!(events[1].kind, EntityEventKind::Updated);
1404 assert_eq!(events[1].values.get("id"), Some(&Value::U64(88)));
1405 assert_eq!(events[1].values.get("version"), Some(&Value::I64(2)));
1406 assert_eq!(events[1].updated_fields, vec!["name".to_owned()]);
1407 assert_eq!(
1408 events[1]
1409 .old_values
1410 .as_ref()
1411 .and_then(|values| values.get("name")),
1412 None );
1414 assert_eq!(
1415 events[1]
1416 .new_values
1417 .as_ref()
1418 .and_then(|values| values.get("name")),
1419 Some(&Value::Text("updated".to_owned()))
1420 );
1421 assert_eq!(events[1].changes.len(), 1);
1422 assert_eq!(events[1].changes[0].field, "name");
1423 assert_eq!(
1424 events[1].changes[0].old_value,
1425 None );
1427 assert_eq!(
1428 events[1].changes[0].new_value,
1429 Some(Value::Text("updated".to_owned()))
1430 );
1431 assert_eq!(events[2].kind, EntityEventKind::Deleted);
1432 assert!(events[2].old_values.is_none()); assert!(events[2].new_values.is_none());
1434 assert_eq!(events[3].kind, EntityEventKind::Recovered);
1435 assert_eq!(
1436 events[3]
1437 .old_values
1438 .as_ref()
1439 .and_then(|values| values.get("version")),
1440 None );
1442 assert_eq!(
1443 events[3]
1444 .new_values
1445 .as_ref()
1446 .and_then(|values| values.get("version")),
1447 Some(&Value::I64(4))
1448 );
1449 assert_eq!(events[3].changes[0].field, "version");
1450 }
1451
1452 #[test]
1453 fn user_context_event_sink_receives_mixed_graph_mutation_events() {
1454 let events = Arc::new(Mutex::new(Vec::new()));
1455 let mut ctx = UserContext::new()
1456 .with_metadata(
1457 InMemoryMetadataStore::new()
1458 .with_entity(entity())
1459 .with_entity(line_entity())
1460 .with_entity(product_entity()),
1461 )
1462 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1463 .with_event_sink(RecordingEventSink {
1464 events: events.clone(),
1465 });
1466 ctx.insert_resource(PostgresDialect);
1467 ctx.insert_resource(StubExecutor {
1468 affected: 1,
1469 rows: vec![Record::from([
1470 ("id".to_owned(), Value::U64(1)),
1471 ("version".to_owned(), Value::I64(1)),
1472 ("name".to_owned(), Value::Text("old".to_owned())),
1473 ])],
1474 });
1475
1476 let repo = ctx
1477 .resolve_repository::< StubExecutor>("Order")
1478 .unwrap();
1479 repo.save_graph(
1480 GraphNode::new("Order")
1481 .value("id", 1_u64)
1482 .value("version", 1_i64)
1483 .value("name", "updated")
1484 .relation(
1485 "lines",
1486 GraphNode::new("OrderLine")
1487 .value("name", "line")
1488 .value("product_id", 3_u64),
1489 ),
1490 )
1491 .unwrap();
1492
1493 let events = events.lock().unwrap();
1494 assert_eq!(events.len(), 2);
1495 assert_eq!(events[0].kind, EntityEventKind::Updated);
1496 assert_eq!(events[0].entity, "Order");
1497 assert_eq!(events[1].kind, EntityEventKind::Created);
1498 assert_eq!(events[1].entity, "OrderLine");
1499 assert_eq!(events[1].values.get("order_id"), Some(&Value::U64(1)));
1500 }
1501
1502 #[test]
1503 fn save_graph_builds_plan_grouped_by_entity_and_operation() {
1504 let mut ctx = UserContext::new()
1505 .with_metadata(
1506 InMemoryMetadataStore::new()
1507 .with_entity(entity())
1508 .with_entity(line_entity())
1509 .with_entity(product_entity()),
1510 )
1511 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1512 .with_internal_id_generator(SequentialIdGenerator::new(500));
1513 ctx.insert_resource(PostgresDialect);
1514 ctx.insert_resource(StubExecutor {
1515 affected: 1,
1516 rows: vec![Record::from([
1517 ("id".to_owned(), Value::U64(1)),
1518 ("version".to_owned(), Value::I64(1)),
1519 ("name".to_owned(), Value::Text("old".to_owned())),
1520 ])],
1521 });
1522
1523 let plan = ctx
1524 .plan_for_save_graph::< StubExecutor>(
1525 GraphNode::new("Order")
1526 .value("id", 1_u64)
1527 .value("version", 1_i64)
1528 .value("name", "updated")
1529 .relation(
1530 "lines",
1531 GraphNode::new("OrderLine")
1532 .value("name", "new-line-a")
1533 .value("product_id", 2_u64),
1534 )
1535 .relation(
1536 "lines",
1537 GraphNode::new("OrderLine")
1538 .value("name", "new-line-b")
1539 .value("product_id", 2_u64),
1540 )
1541 .relation(
1542 "lines",
1543 GraphNode::new("OrderLine")
1544 .value("id", 5_u64)
1545 .value("version", 1_i64)
1546 .value("name", "same-update-a"),
1547 )
1548 .relation(
1549 "lines",
1550 GraphNode::new("OrderLine")
1551 .value("id", 6_u64)
1552 .value("version", 1_i64)
1553 .value("name", "same-update-b"),
1554 )
1555 .relation(
1556 "lines",
1557 GraphNode::new("OrderLine").value("id", 3_u64).remove(),
1558 )
1559 .relation(
1560 "lines",
1561 GraphNode::new("OrderLine").value("id", 4_u64).reference(),
1562 ),
1563 )
1564 .unwrap();
1565 let counts = plan.grouped_counts();
1566
1567 assert_eq!(
1568 counts.get(&("Order".to_owned(), GraphMutationKind::Update)),
1569 Some(&1)
1570 );
1571 assert_eq!(
1572 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Create)),
1573 Some(&2)
1574 );
1575 assert_eq!(
1576 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Update)),
1577 Some(&2)
1578 );
1579 assert_eq!(
1580 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Delete)),
1581 Some(&1)
1582 );
1583 assert_eq!(
1584 counts.get(&("OrderLine".to_owned(), GraphMutationKind::Reference)),
1585 Some(&1)
1586 );
1587 let create_batch = plan
1588 .batches
1589 .iter()
1590 .find(|batch| batch.entity == "OrderLine" && batch.kind == GraphMutationKind::Create)
1591 .unwrap();
1592 assert_eq!(create_batch.items.len(), 2);
1593 assert_eq!(
1594 create_batch.items[0].values.get("id"),
1595 Some(&Value::U64(500))
1596 );
1597 assert_eq!(
1598 create_batch.items[1].values.get("id"),
1599 Some(&Value::U64(501))
1600 );
1601 let update_batch = plan
1602 .batches
1603 .iter()
1604 .find(|batch| {
1605 batch.entity == "OrderLine"
1606 && batch.kind == GraphMutationKind::Update
1607 && batch.update_fields == vec!["name".to_owned()]
1608 })
1609 .unwrap();
1610 assert_eq!(update_batch.items.len(), 2);
1611 }
1612
1613 #[test]
1614 fn resolved_repository_builds_relation_plans() {
1615 let mut ctx = UserContext::new()
1616 .with_metadata(
1617 InMemoryMetadataStore::new()
1618 .with_entity(entity())
1619 .with_entity(line_entity())
1620 .with_entity(product_entity()),
1621 )
1622 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1623 .with_repository_behavior_registry(
1624 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1625 );
1626 ctx.insert_resource(PostgresDialect);
1627 ctx.insert_resource(StubExecutor {
1628 affected: 1,
1629 rows: Vec::new(),
1630 });
1631
1632 let repo = ctx
1633 .resolve_repository::< StubExecutor>("Order")
1634 .unwrap();
1635 let plans = repo.relation_plans().unwrap();
1636
1637 assert_eq!(plans.len(), 1);
1638 assert_eq!(plans[0].relation_name, "lines");
1639 assert_eq!(plans[0].target_entity, "OrderLine");
1640 assert_eq!(plans[0].local_key, "id");
1641 assert_eq!(plans[0].foreign_key, "order_id");
1642 assert!(plans[0].many);
1643 }
1644
1645 #[test]
1646 fn resolved_repository_builds_relation_query_from_parent_rows() {
1647 let mut ctx = UserContext::new()
1648 .with_metadata(
1649 InMemoryMetadataStore::new()
1650 .with_entity(entity())
1651 .with_entity(line_entity())
1652 .with_entity(product_entity()),
1653 )
1654 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1655 .with_repository_behavior_registry(
1656 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1657 );
1658 ctx.insert_resource(PostgresDialect);
1659 ctx.insert_resource(StubExecutor {
1660 affected: 1,
1661 rows: Vec::new(),
1662 });
1663
1664 let repo = ctx
1665 .resolve_repository::< StubExecutor>("Order")
1666 .unwrap();
1667 let parent_rows = vec![
1668 Record::from([(String::from("id"), Value::U64(11))]),
1669 Record::from([(String::from("id"), Value::U64(12))]),
1670 ];
1671
1672 let query = repo.relation_query("lines", &parent_rows).unwrap();
1673 }
1678
1679 #[test]
1680 fn resolved_repository_enhances_parent_rows_with_relations() {
1681 let mut ctx = UserContext::new()
1682 .with_metadata(
1683 InMemoryMetadataStore::new()
1684 .with_entity(entity())
1685 .with_entity(line_entity())
1686 .with_entity(product_entity()),
1687 )
1688 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"))
1689 .with_repository_behavior_registry(
1690 InMemoryRepositoryBehaviorRegistry::new().with_behavior("Order", OrderBehavior),
1691 );
1692 ctx.insert_resource(PostgresDialect);
1693 ctx.insert_resource(StubExecutor {
1694 affected: 1,
1695 rows: vec![
1696 Record::from([
1697 (String::from("id"), Value::U64(101)),
1698 (String::from("order_id"), Value::U64(11)),
1699 (String::from("name"), Value::Text(String::from("l1"))),
1700 ]),
1701 Record::from([
1702 (String::from("id"), Value::U64(102)),
1703 (String::from("order_id"), Value::U64(11)),
1704 (String::from("name"), Value::Text(String::from("l2"))),
1705 ]),
1706 Record::from([
1707 (String::from("id"), Value::U64(201)),
1708 (String::from("order_id"), Value::U64(12)),
1709 (String::from("name"), Value::Text(String::from("l3"))),
1710 ]),
1711 ],
1712 });
1713
1714 let repo = ctx
1715 .resolve_repository::< StubExecutor>("Order")
1716 .unwrap();
1717 let mut parents = vec![
1718 Record::from([(String::from("id"), Value::U64(11))]),
1719 Record::from([(String::from("id"), Value::U64(12))]),
1720 ];
1721
1722 repo.enhance_relations(&mut parents).unwrap();
1723
1724 match parents[0].get("lines") {
1725 Some(Value::List(lines)) => assert_eq!(lines.len(), 2),
1726 other => panic!("unexpected lines payload: {other:?}"),
1727 }
1728 match parents[1].get("lines") {
1729 Some(Value::List(lines)) => assert_eq!(lines.len(), 1),
1730 other => panic!("unexpected lines payload: {other:?}"),
1731 }
1732 }
1733
1734 #[test]
1735 fn relation_enhancement_wraps_inverse_many_relation_as_list() {
1736 let mut ctx = UserContext::new()
1737 .with_metadata(
1738 InMemoryMetadataStore::new()
1739 .with_entity(OrderLineWithProductEntityRow::entity_descriptor())
1740 .with_entity(ProductWithLinesEntityRow::entity_descriptor()),
1741 )
1742 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("OrderLine"));
1743 ctx.insert_resource(PostgresDialect);
1744 ctx.insert_resource(QueueExecutor {
1745 affected: 1,
1746 rows: Mutex::new(VecDeque::from([
1747 vec![Record::from([
1748 (String::from("id"), Value::U64(11)),
1749 (String::from("order_id"), Value::U64(7)),
1750 (String::from("name"), Value::Text(String::from("line"))),
1751 (String::from("product_id"), Value::U64(101)),
1752 ])],
1753 vec![Record::from([
1754 (String::from("id"), Value::U64(101)),
1755 (String::from("name"), Value::Text(String::from("sku"))),
1756 ])],
1757 ])),
1758 queries: Mutex::new(Vec::new()),
1759 });
1760
1761 let repo = ctx
1762 .resolve_repository::< QueueExecutor>("OrderLine")
1763 .unwrap();
1764 let rows = repo
1765 .fetch_enhanced_entities::<OrderLineWithProductEntityRow>(
1766 &SelectQuery::new("OrderLine").relation("product"),
1767 )
1768 .unwrap();
1769
1770 let product = rows.data[0].product.as_ref().unwrap();
1771 assert_eq!(product.lines.data.len(), 1);
1772 assert_eq!(product.lines.data[0].id, 11);
1773 }
1774
1775 #[test]
1776 fn resolved_repository_fetches_smart_list_of_entities() {
1777 let mut ctx = UserContext::new()
1778 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
1779 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1780 ctx.insert_resource(PostgresDialect);
1781 ctx.insert_resource(StubExecutor {
1782 affected: 1,
1783 rows: vec![Record::from([
1784 (String::from("id"), Value::U64(7)),
1785 (String::from("version"), Value::I64(2)),
1786 (String::from("name"), Value::Text(String::from("typed"))),
1787 ])],
1788 });
1789
1790 let repo = ctx
1791 .resolve_repository::< StubExecutor>("Order")
1792 .unwrap();
1793 let rows = repo.fetch_entities::<OrderEntity>(&repo.select()).unwrap();
1794
1795 assert_eq!(rows.len(), 1);
1796 assert_eq!(
1797 rows.first(),
1798 Some(&OrderEntity {
1799 id: 7,
1800 version: 2,
1801 name: String::from("typed"),
1802 })
1803 );
1804 }
1805
1806 #[test]
1807 fn resolved_repository_fetches_smart_list_of_derived_entities() {
1808 let mut ctx = UserContext::new()
1809 .with_metadata(
1810 InMemoryMetadataStore::new().with_entity(CatalogProductRow::entity_descriptor()),
1811 )
1812 .with_repository_registry(
1813 InMemoryRepositoryRegistry::new().with_entity("CatalogProduct"),
1814 );
1815 ctx.insert_resource(PostgresDialect);
1816 ctx.insert_resource(StubExecutor {
1817 affected: 1,
1818 rows: vec![Record::from([
1819 (String::from("id"), Value::U64(9)),
1820 (String::from("name"), Value::Text(String::from("derived"))),
1821 ])],
1822 });
1823
1824 let repo = ctx
1825 .resolve_repository::< StubExecutor>("CatalogProduct")
1826 .unwrap();
1827 let rows = repo
1828 .fetch_entities::<CatalogProductRow>(&repo.select())
1829 .unwrap();
1830
1831 assert_eq!(rows.len(), 1);
1832 assert_eq!(
1833 rows.first(),
1834 Some(&CatalogProductRow {
1835 id: 9,
1836 name: String::from("derived"),
1837 })
1838 );
1839 }
1840
1841 #[test]
1842 fn resolved_repository_collects_dynamic_properties_for_aggregate_output() {
1843 let mut ctx = UserContext::new()
1844 .with_metadata(
1845 InMemoryMetadataStore::new()
1846 .with_entity(OrderAggregateDynamic::entity_descriptor()),
1847 )
1848 .with_repository_registry(
1849 InMemoryRepositoryRegistry::new().with_entity("OrderAggregate"),
1850 );
1851 ctx.insert_resource(PostgresDialect);
1852 ctx.insert_resource(StubExecutor {
1853 affected: 1,
1854 rows: vec![Record::from([
1855 (String::from("id"), Value::U64(1)),
1856 (String::from("lineCount"), Value::I64(3)),
1857 (String::from("amount"), Value::F64(18.5)),
1858 ])],
1859 });
1860
1861 let repo = ctx
1862 .resolve_repository::< StubExecutor>("OrderAggregate")
1863 .unwrap();
1864 let rows = repo
1865 .fetch_entities::<OrderAggregateDynamic>(&repo.select())
1866 .unwrap();
1867
1868 assert_eq!(rows.len(), 1);
1869 assert_eq!(rows.data[0].id, 1);
1870 assert_eq!(rows.data[0].dynamic.get("lineCount"), Some(&Value::I64(3)));
1871 assert_eq!(rows.data[0].dynamic.get("amount"), Some(&Value::F64(18.5)));
1872 assert_eq!(
1873 rows.into_vec().into_iter().next().unwrap().into_json(),
1874 serde_json::json!({
1875 "id": 1,
1876 "lineCount": 3,
1877 "amount": 18.5
1878 })
1879 );
1880 }
1881
1882 #[test]
1883 fn resolved_repository_executes_relation_aggregates_into_dynamic_properties() {
1884 let executor = QueueExecutor {
1885 affected: 1,
1886 rows: Mutex::new(VecDeque::from([
1887 vec![
1888 Record::from([
1889 (String::from("id"), Value::U64(1)),
1890 (String::from("version"), Value::I64(1)),
1891 (String::from("name"), Value::Text(String::from("first"))),
1892 ]),
1893 Record::from([
1894 (String::from("id"), Value::U64(2)),
1895 (String::from("version"), Value::I64(1)),
1896 (String::from("name"), Value::Text(String::from("second"))),
1897 ]),
1898 ],
1899 vec![Record::from([
1900 (String::from("order_id"), Value::U64(1)),
1901 (String::from("lineCount"), Value::I64(3)),
1902 ])],
1903 ])),
1904 queries: Mutex::new(Vec::new()),
1905 };
1906 let mut ctx = UserContext::new()
1907 .with_metadata(
1908 InMemoryMetadataStore::new()
1909 .with_entity(entity())
1910 .with_entity(line_entity()),
1911 )
1912 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1913 ctx.insert_resource(PostgresDialect);
1914 ctx.insert_resource(executor);
1915
1916 let repo = ctx
1917 .resolve_repository::< QueueExecutor>("Order")
1918 .unwrap();
1919 let rows = repo
1920 .fetch_all_with_relation_aggregates(
1921 &repo
1922 .select()
1923 .project("id")
1924 .project("version")
1925 .project("name"),
1926 &[RelationAggregate::new(
1927 "lines",
1928 "lineCount",
1929 SelectQuery::new("OrderLine"),
1930 true,
1931 )],
1932 )
1933 .unwrap();
1934
1935 assert_eq!(rows[0].get("lineCount"), Some(&Value::I64(3)));
1936 assert_eq!(rows[1].get("lineCount"), Some(&Value::U64(0)));
1937
1938 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
1939 let queries = executor.queries.lock().unwrap();
1940 assert_eq!(queries.len(), 2);
1941 assert_eq!(
1942 queries[1],
1943 "SELECT ... FROM OrderLine ..."
1944 );
1945 }
1946
1947 #[test]
1948 fn resolved_repository_maps_relation_aggregate_storage_key_to_property_key() {
1949 let mut line = line_entity();
1950 line.properties
1951 .iter_mut()
1952 .find(|property| property.name == "order_id")
1953 .unwrap()
1954 .column_name = "order_ref".to_owned();
1955 let executor = QueueExecutor {
1956 affected: 1,
1957 rows: Mutex::new(VecDeque::from([
1958 vec![Record::from([
1959 (String::from("id"), Value::U64(1)),
1960 (String::from("version"), Value::I64(1)),
1961 (String::from("name"), Value::Text(String::from("first"))),
1962 ])],
1963 vec![Record::from([
1964 (String::from("order_ref"), Value::I64(1)),
1965 (String::from("lineCount"), Value::I64(3)),
1966 ])],
1967 ])),
1968 queries: Mutex::new(Vec::new()),
1969 };
1970 let mut ctx = UserContext::new()
1971 .with_metadata(
1972 InMemoryMetadataStore::new()
1973 .with_entity(entity())
1974 .with_entity(line),
1975 )
1976 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
1977 ctx.insert_resource(PostgresDialect);
1978 ctx.insert_resource(executor);
1979
1980 let repo = ctx
1981 .resolve_repository::< QueueExecutor>("Order")
1982 .unwrap();
1983 let rows = repo
1984 .fetch_all_with_relation_aggregates(
1985 &repo
1986 .select()
1987 .project("id")
1988 .project("version")
1989 .project("name"),
1990 &[RelationAggregate::new(
1991 "lines",
1992 "lineCount",
1993 SelectQuery::new("OrderLine"),
1994 true,
1995 )],
1996 )
1997 .unwrap();
1998
1999 assert_eq!(rows[0].get("lineCount"), Some(&Value::I64(3)));
2000 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
2001 assert_eq!(
2002 executor.queries.lock().unwrap()[1],
2003 "SELECT ... FROM OrderLine ..."
2004 );
2005 }
2006
2007 #[test]
2008 fn resolved_repository_uses_aggregation_cache_when_resource_is_registered() {
2009 let executor = QueueExecutor {
2010 affected: 1,
2011 rows: Mutex::new(VecDeque::from([vec![Record::from([(
2012 String::from("count"),
2013 Value::I64(2),
2014 )])]])),
2015 queries: Mutex::new(Vec::new()),
2016 };
2017 let mut ctx = UserContext::new()
2018 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
2019 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
2020 ctx.insert_resource(PostgresDialect);
2021 ctx.insert_resource(executor);
2022 ctx.insert_resource(InMemoryAggregationCache::default());
2023
2024 let repo = ctx
2025 .resolve_repository::< QueueExecutor>("Order")
2026 .unwrap();
2027 let query = repo
2028 .select()
2029 .count("count")
2030 .enable_aggregation_cache_for(60_000);
2031
2032 let first = repo.fetch_all(&query).unwrap();
2033 let second = repo.fetch_all(&query).unwrap();
2034
2035 assert_eq!(first, second);
2036 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
2037 assert_eq!(executor.queries.lock().unwrap().len(), 1);
2038 }
2039
2040 #[test]
2041 fn aggregation_cache_is_namespaced_and_invalidated_after_write() {
2042 let executor = QueueExecutor {
2043 affected: 1,
2044 rows: Mutex::new(VecDeque::from([
2045 vec![Record::from([(String::from("count"), Value::I64(2))])],
2046 vec![Record::from([(String::from("count"), Value::I64(3))])],
2047 ])),
2048 queries: Mutex::new(Vec::new()),
2049 };
2050 let mut ctx = UserContext::new()
2051 .with_metadata(InMemoryMetadataStore::new().with_entity(entity()))
2052 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
2053 ctx.insert_resource(PostgresDialect);
2054 ctx.insert_resource(executor);
2055 ctx.insert_resource(
2056 Arc::new(InMemoryAggregationCache::with_namespace("tenant-a"))
2057 as Arc<dyn AggregationCacheBackend>,
2058 );
2059
2060 let repo = ctx
2061 .resolve_repository::< QueueExecutor>("Order")
2062 .unwrap();
2063 let query = repo
2064 .select()
2065 .count("count")
2066 .enable_aggregation_cache_for(60_000);
2067
2068 let first = repo.fetch_all(&query).unwrap();
2069 let cached = repo.fetch_all(&query).unwrap();
2070 repo.insert(
2071 &InsertCommand::new("Order")
2072 .value("id", 9_u64)
2073 .value("version", 1_i64)
2074 .value("name", "new"),
2075 )
2076 .unwrap();
2077 let refreshed = repo.fetch_all(&query).unwrap();
2078
2079 assert_eq!(first, cached);
2080 assert_ne!(cached, refreshed);
2081 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
2082 assert_eq!(executor.queries.lock().unwrap().len(), 2);
2083 }
2084
2085 #[test]
2086 fn aggregation_cache_propagates_to_relation_aggregates() {
2087 let parent_rows = vec![
2088 Record::from([
2089 (String::from("id"), Value::U64(1)),
2090 (String::from("version"), Value::I64(1)),
2091 (String::from("name"), Value::Text(String::from("first"))),
2092 ]),
2093 Record::from([
2094 (String::from("id"), Value::U64(2)),
2095 (String::from("version"), Value::I64(1)),
2096 (String::from("name"), Value::Text(String::from("second"))),
2097 ]),
2098 ];
2099 let aggregate_rows = vec![Record::from([
2100 (String::from("order_id"), Value::U64(1)),
2101 (String::from("lineCount"), Value::I64(3)),
2102 ])];
2103 let executor = QueueExecutor {
2104 affected: 1,
2105 rows: Mutex::new(VecDeque::from([parent_rows, aggregate_rows])),
2106 queries: Mutex::new(Vec::new()),
2107 };
2108 let mut ctx = UserContext::new()
2109 .with_metadata(
2110 InMemoryMetadataStore::new()
2111 .with_entity(entity())
2112 .with_entity(line_entity()),
2113 )
2114 .with_repository_registry(InMemoryRepositoryRegistry::new().with_entity("Order"));
2115 ctx.insert_resource(PostgresDialect);
2116 ctx.insert_resource(executor);
2117 ctx.insert_resource(InMemoryAggregationCache::default());
2118
2119 let repo = ctx
2120 .resolve_repository::< QueueExecutor>("Order")
2121 .unwrap();
2122 let query = repo
2123 .select()
2124 .project("id")
2125 .project("version")
2126 .project("name")
2127 .enable_aggregation_cache_for(60_000)
2128 .propagate_aggregation_cache(60_000);
2129 let aggregate =
2130 RelationAggregate::new("lines", "lineCount", SelectQuery::new("OrderLine"), true);
2131
2132 let first = repo
2133 .fetch_all_with_relation_aggregates(&query, &[aggregate.clone()])
2134 .unwrap();
2135 let second = repo
2136 .fetch_all_with_relation_aggregates(&query, &[aggregate])
2137 .unwrap();
2138
2139 assert_eq!(first, second);
2140 let executor = ctx.get_resource::<QueueExecutor>().unwrap();
2141 assert_eq!(executor.queries.lock().unwrap().len(), 2);
2142 }
2143
2144 #[test]
2145 fn memory_repository_fetches_smart_list_entities_with_query_features() {
2146 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2147 let repository = MemoryRepository::new(metadata).with_rows(
2148 "Order",
2149 vec![
2150 Record::from([
2151 (String::from("id"), Value::U64(1)),
2152 (String::from("version"), Value::I64(1)),
2153 (String::from("name"), Value::Text(String::from("alpha"))),
2154 ]),
2155 Record::from([
2156 (String::from("id"), Value::U64(2)),
2157 (String::from("version"), Value::I64(1)),
2158 (String::from("name"), Value::Text(String::from("beta"))),
2159 ]),
2160 Record::from([
2161 (String::from("id"), Value::U64(3)),
2162 (String::from("version"), Value::I64(1)),
2163 (String::from("name"), Value::Text(String::from("gamma"))),
2164 ]),
2165 ],
2166 );
2167
2168 let query = teaql_core::SelectQuery::new("Order")
2169 .filter(Expr::Binary {
2170 left: Box::new(Expr::column("id")),
2171 op: teaql_core::BinaryOp::Gte,
2172 right: Box::new(Expr::value(2_u64)),
2173 })
2174 .order_by(OrderBy::desc("id"))
2175 .limit(1);
2176
2177 let orders = repository.fetch_entities::<Order>(&query).unwrap();
2178
2179 assert_eq!(orders.ids(), vec![Value::U64(3)]);
2180 assert_eq!(orders.versions(), vec![1]);
2181 assert_eq!(orders.first().unwrap().name, "gamma");
2182 }
2183
2184 #[test]
2185 fn memory_repository_runs_relation_aggregates() {
2186 let metadata = InMemoryMetadataStore::new()
2187 .with_entity(entity())
2188 .with_entity(line_entity());
2189
2190 let repository = MemoryRepository::new(metadata)
2191 .with_rows(
2192 "Order",
2193 vec![
2194 Record::from([
2195 (String::from("id"), Value::U64(1)),
2196 (String::from("version"), Value::I64(1)),
2197 (String::from("name"), Value::Text(String::from("first"))),
2198 ]),
2199 Record::from([
2200 (String::from("id"), Value::U64(2)),
2201 (String::from("version"), Value::I64(1)),
2202 (String::from("name"), Value::Text(String::from("second"))),
2203 ]),
2204 ],
2205 )
2206 .with_rows(
2207 "OrderLine",
2208 vec![
2209 Record::from([
2210 (String::from("id"), Value::U64(10)),
2211 (String::from("version"), Value::I64(1)),
2212 (String::from("order_id"), Value::U64(1)),
2213 (String::from("name"), Value::Text(String::from("line1"))),
2214 ]),
2215 Record::from([
2216 (String::from("id"), Value::U64(11)),
2217 (String::from("version"), Value::I64(1)),
2218 (String::from("order_id"), Value::U64(1)),
2219 (String::from("name"), Value::Text(String::from("line2"))),
2220 ]),
2221 Record::from([
2222 (String::from("id"), Value::U64(12)),
2223 (String::from("version"), Value::I64(1)),
2224 (String::from("order_id"), Value::U64(2)),
2225 (String::from("name"), Value::Text(String::from("line3"))),
2226 ]),
2227 ],
2228 );
2229
2230 let query = SelectQuery::new("Order").project("id").project("name");
2231 let aggregate = RelationAggregate::new("lines", "lineCount", SelectQuery::new("OrderLine"), true);
2232
2233 let rows = repository
2234 .fetch_all_with_relation_aggregates(&query, &[aggregate])
2235 .unwrap();
2236
2237 assert_eq!(rows.len(), 2);
2238
2239 let first_order = rows.iter().find(|r| r.get("id") == Some(&Value::U64(1))).unwrap();
2240 assert_eq!(first_order.get("lineCount"), Some(&Value::U64(2)));
2241
2242 let second_order = rows.iter().find(|r| r.get("id") == Some(&Value::U64(2))).unwrap();
2243 assert_eq!(second_order.get("lineCount"), Some(&Value::U64(1)));
2244 }
2245
2246 #[test]
2247 fn memory_repository_runs_aggregates() {
2248 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2249 let repository = MemoryRepository::new(metadata).with_rows(
2250 "Order",
2251 vec![
2252 Record::from([
2253 (String::from("id"), Value::U64(1)),
2254 (String::from("version"), Value::I64(1)),
2255 (String::from("name"), Value::Text(String::from("alpha"))),
2256 ]),
2257 Record::from([
2258 (String::from("id"), Value::U64(2)),
2259 (String::from("version"), Value::I64(2)),
2260 (String::from("name"), Value::Text(String::from("beta"))),
2261 ]),
2262 ],
2263 );
2264
2265 let query = teaql_core::SelectQuery {
2266 entity: String::from("Order"),
2267 projection: Vec::new(),
2268 expr_projection: Vec::new(),
2269 filter: None,
2270 having: None,
2271 order_by: Vec::new(),
2272 slice: None,
2273 trace_chain: Vec::new(),
2274 aggregates: vec![
2275 Aggregate {
2276 function: AggregateFunction::Count,
2277 field: String::from("id"),
2278 alias: String::from("count"),
2279 },
2280 Aggregate {
2281 function: AggregateFunction::Sum,
2282 field: String::from("version"),
2283 alias: String::from("versionSum"),
2284 },
2285 ],
2286 group_by: Vec::new(),
2287 relations: Vec::new(),
2288 aggregation_cache: None,
2289 comment: None,
2290 raw_sql: None,
2291 raw_sql_search_criteria: Vec::new(),
2292 dynamic_properties: Vec::new(),
2293 raw_projections: Vec::new(),
2294 object_group_bys: Vec::new(),
2295 child_enhancements: Vec::new(),
2296 };
2297
2298 let rows = repository.fetch_all(&query).unwrap();
2299
2300 assert_eq!(rows.len(), 1);
2301 assert_eq!(rows[0].get("count"), Some(&Value::U64(2)));
2302 assert_eq!(rows[0].get("versionSum"), Some(&Value::U64(3)));
2303 }
2304
2305 #[test]
2306 fn memory_repository_runs_grouped_aggregates_and_extended_filters() {
2307 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2308 let repository = MemoryRepository::new(metadata).with_rows(
2309 "Order",
2310 vec![
2311 Record::from([
2312 (String::from("id"), Value::U64(1)),
2313 (String::from("version"), Value::I64(1)),
2314 (String::from("name"), Value::Text(String::from("alpha"))),
2315 ]),
2316 Record::from([
2317 (String::from("id"), Value::U64(2)),
2318 (String::from("version"), Value::I64(2)),
2319 (String::from("name"), Value::Text(String::from("alpha"))),
2320 ]),
2321 Record::from([
2322 (String::from("id"), Value::U64(3)),
2323 (String::from("version"), Value::I64(3)),
2324 (String::from("name"), Value::Text(String::from("tmp-beta"))),
2325 ]),
2326 ],
2327 );
2328
2329 let rows = repository
2330 .fetch_all(
2331 &teaql_core::SelectQuery::new("Order")
2332 .filter(
2333 Expr::between("version", 1_i64, 3_i64)
2334 .and_expr(Expr::not_like("name", "tmp%"))
2335 .and_expr(Expr::not_in_list("name", vec![Value::from("deleted")])),
2336 )
2337 .group_by("name")
2338 .count("total")
2339 .sum("version", "versionSum"),
2340 )
2341 .unwrap();
2342
2343 assert_eq!(rows.len(), 1);
2344 assert_eq!(
2345 rows[0].get("name"),
2346 Some(&Value::Text(String::from("alpha")))
2347 );
2348 assert_eq!(rows[0].get("total"), Some(&Value::U64(2)));
2349 assert_eq!(rows[0].get("versionSum"), Some(&Value::U64(3)));
2350 }
2351
2352 #[test]
2353 fn memory_repository_runs_extended_aggregates_and_having() {
2354 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2355 let repository = MemoryRepository::new(metadata).with_rows(
2356 "Order",
2357 vec![
2358 Record::from([
2359 (String::from("id"), Value::U64(1)),
2360 (String::from("version"), Value::I64(1)),
2361 (String::from("name"), Value::Text(String::from("alpha"))),
2362 ]),
2363 Record::from([
2364 (String::from("id"), Value::U64(2)),
2365 (String::from("version"), Value::I64(3)),
2366 (String::from("name"), Value::Text(String::from("alpha"))),
2367 ]),
2368 Record::from([
2369 (String::from("id"), Value::U64(3)),
2370 (String::from("version"), Value::I64(7)),
2371 (String::from("name"), Value::Text(String::from("beta"))),
2372 ]),
2373 ],
2374 );
2375
2376 let rows = repository
2377 .fetch_all(
2378 &teaql_core::SelectQuery::new("Order")
2379 .group_by("name")
2380 .count("total")
2381 .stddev("version", "stddevVersion")
2382 .var_pop("version", "varPopVersion")
2383 .bit_or("version", "bitOrVersion")
2384 .having(Expr::gt("total", 1_i64)),
2385 )
2386 .unwrap();
2387
2388 assert_eq!(rows.len(), 1);
2389 assert_eq!(
2390 rows[0].get("name"),
2391 Some(&Value::Text(String::from("alpha")))
2392 );
2393 assert_eq!(rows[0].get("total"), Some(&Value::U64(2)));
2394 assert_eq!(
2395 rows[0].get("stddevVersion").map(Value::to_json_value),
2396 Some(serde_json::Value::String(
2397 "1.4142135623730951454746218583".to_owned()
2398 ))
2399 );
2400 assert_eq!(
2401 rows[0].get("varPopVersion"),
2402 Some(&Value::Decimal(Decimal::ONE))
2403 );
2404 assert_eq!(rows[0].get("bitOrVersion"), Some(&Value::I64(3)));
2405 }
2406
2407 #[test]
2408 fn memory_repository_runs_sound_like_filter() {
2409 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2410 let repository = MemoryRepository::new(metadata).with_rows(
2411 "Order",
2412 vec![
2413 Record::from([
2414 (String::from("id"), Value::U64(1)),
2415 (String::from("version"), Value::I64(1)),
2416 (String::from("name"), Value::Text(String::from("Robert"))),
2417 ]),
2418 Record::from([
2419 (String::from("id"), Value::U64(2)),
2420 (String::from("version"), Value::I64(1)),
2421 (String::from("name"), Value::Text(String::from("Rupert"))),
2422 ]),
2423 Record::from([
2424 (String::from("id"), Value::U64(3)),
2425 (String::from("version"), Value::I64(1)),
2426 (String::from("name"), Value::Text(String::from("Ashcraft"))),
2427 ]),
2428 ],
2429 );
2430
2431 let rows = repository
2432 .fetch_all(
2433 &teaql_core::SelectQuery::new("Order")
2434 .filter(Expr::sound_like("name", "Robert"))
2435 .order_asc("id"),
2436 )
2437 .unwrap();
2438
2439 assert_eq!(rows.len(), 2);
2440 assert_eq!(rows[0].get("name"), Some(&Value::Text("Robert".to_owned())));
2441 assert_eq!(rows[1].get("name"), Some(&Value::Text("Rupert".to_owned())));
2442 }
2443
2444 #[test]
2445 fn memory_repository_runs_java_style_string_match_filters() {
2446 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2447 let repository = MemoryRepository::new(metadata).with_rows(
2448 "Order",
2449 vec![
2450 Record::from([
2451 (String::from("id"), Value::U64(1)),
2452 (String::from("version"), Value::I64(1)),
2453 (String::from("name"), Value::Text(String::from("tea-order"))),
2454 ]),
2455 Record::from([
2456 (String::from("id"), Value::U64(2)),
2457 (String::from("version"), Value::I64(1)),
2458 (
2459 String::from("name"),
2460 Value::Text(String::from("coffee-order")),
2461 ),
2462 ]),
2463 Record::from([
2464 (String::from("id"), Value::U64(3)),
2465 (String::from("version"), Value::I64(1)),
2466 (
2467 String::from("name"),
2468 Value::Text(String::from("tea-archived")),
2469 ),
2470 ]),
2471 ],
2472 );
2473
2474 let rows = repository
2475 .fetch_all(
2476 &teaql_core::SelectQuery::new("Order")
2477 .filter(
2478 Expr::contain("name", "tea")
2479 .and_expr(Expr::begin_with("name", "tea"))
2480 .and_expr(Expr::end_with("name", "order"))
2481 .and_expr(Expr::not_contain("name", "coffee"))
2482 .and_expr(Expr::not_begin_with("name", "archived"))
2483 .and_expr(Expr::not_end_with("name", "draft")),
2484 )
2485 .order_asc("id"),
2486 )
2487 .unwrap();
2488
2489 assert_eq!(rows.len(), 1);
2490 assert_eq!(
2491 rows[0].get("name"),
2492 Some(&Value::Text("tea-order".to_owned()))
2493 );
2494 }
2495
2496 #[test]
2497 fn memory_repository_runs_property_to_property_filters() {
2498 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2499 let repository = MemoryRepository::new(metadata).with_rows(
2500 "Order",
2501 vec![
2502 Record::from([
2503 (String::from("id"), Value::U64(1)),
2504 (String::from("version"), Value::I64(2)),
2505 (String::from("name"), Value::Text(String::from("keep"))),
2506 ]),
2507 Record::from([
2508 (String::from("id"), Value::U64(2)),
2509 (String::from("version"), Value::I64(1)),
2510 (String::from("name"), Value::Text(String::from("skip"))),
2511 ]),
2512 ],
2513 );
2514
2515 let rows = repository
2516 .fetch_all(
2517 &teaql_core::SelectQuery::new("Order")
2518 .filter(Expr::compare_columns("version", BinaryOp::Gte, "id"))
2519 .order_asc("id"),
2520 )
2521 .unwrap();
2522
2523 assert_eq!(rows.len(), 1);
2524 assert_eq!(rows[0].get("name"), Some(&Value::Text("keep".to_owned())));
2525 }
2526
2527 #[test]
2528 fn memory_repository_supports_mutations_and_optimistic_locking() {
2529 let metadata = InMemoryMetadataStore::new().with_entity(entity());
2530 let repository = MemoryRepository::new(metadata);
2531
2532 repository
2533 .insert(
2534 &InsertCommand::new("Order")
2535 .value("id", 10_u64)
2536 .value("version", 1_i64)
2537 .value("name", "draft"),
2538 )
2539 .unwrap();
2540 repository
2541 .update(
2542 &UpdateCommand::new("Order", 10_u64)
2543 .expected_version(1)
2544 .value("name", "submitted"),
2545 )
2546 .unwrap();
2547
2548 let row = repository
2549 .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2550 .unwrap()
2551 .pop()
2552 .unwrap();
2553 assert_eq!(
2554 row.get("name"),
2555 Some(&Value::Text(String::from("submitted")))
2556 );
2557 assert_eq!(row.get("version"), Some(&Value::I64(2)));
2558
2559 let conflict = repository
2560 .update(
2561 &UpdateCommand::new("Order", 10_u64)
2562 .expected_version(1)
2563 .value("name", "stale"),
2564 )
2565 .unwrap_err();
2566 assert!(matches!(
2567 conflict,
2568 RepositoryError::Runtime(RuntimeError::OptimisticLockConflict { .. })
2569 ));
2570
2571 repository
2572 .delete(&DeleteCommand::new("Order", 10_u64).expected_version(2))
2573 .unwrap();
2574 let row = repository
2575 .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2576 .unwrap()
2577 .pop()
2578 .unwrap();
2579 assert_eq!(row.get("version"), Some(&Value::I64(-3)));
2580
2581 repository
2582 .recover(&RecoverCommand::new("Order", 10_u64, -3))
2583 .unwrap();
2584 let row = repository
2585 .fetch_all(&teaql_core::SelectQuery::new("Order").filter(Expr::eq("id", 10_u64)))
2586 .unwrap()
2587 .pop()
2588 .unwrap();
2589 assert_eq!(row.get("version"), Some(&Value::I64(4)));
2590 }
2591
2592 #[tokio::test]
2593 async fn user_context_reports_missing_schema_provider() {
2594 let err = UserContext::new().ensure_schema().await.unwrap_err();
2595 assert!(
2596 matches!(err, RuntimeError::Schema(message) if message == "missing schema provider")
2597 );
2598 }
2599
2600 #[test]
2601 fn user_context_stores_and_exposes_user_identifier() {
2602 let mut ctx = UserContext::new();
2603 let pid = std::process::id();
2604 let thread_id_str = format!("{:?}", std::thread::current().id());
2605 let numeric_thread_id = thread_id_str
2606 .strip_prefix("ThreadId(")
2607 .and_then(|s| s.strip_suffix(")"))
2608 .unwrap_or(&thread_id_str);
2609 let os_user = std::env::var("USER")
2610 .or_else(|_| std::env::var("USERNAME"))
2611 .unwrap_or_else(|_| "main".to_owned());
2612 let expected_default = format!("{os_user}@pid-{pid}.tid-{numeric_thread_id}");
2613 assert_eq!(ctx.user_identifier(), Some(expected_default.as_str()));
2614
2615 ctx.set_user_identifier("user-123");
2616 assert_eq!(ctx.user_identifier(), Some("user-123"));
2617
2618 let ctx2 = UserContext::new().with_user_identifier("user-456");
2619 assert_eq!(ctx2.user_identifier(), Some("user-456"));
2620
2621 let mut ctx3 = UserContext::new();
2622 ctx3.set_user_identifier_option(Some("user-789".to_owned()));
2623 assert_eq!(ctx3.user_identifier(), Some("user-789"));
2624 ctx3.set_user_identifier_option(None);
2625 assert_eq!(ctx3.user_identifier(), None);
2626
2627 let ctx4 = UserContext::new().with_user_identifier_option(Some("user-abc".to_owned()));
2628 assert_eq!(ctx4.user_identifier(), Some("user-abc"));
2629
2630 }
2631}
2632
2633pub use checker::{
2634 CHECK_OBJECT_STATUS_FIELD, CheckObjectStatus, CheckResult, CheckResults, CheckRule, Checker,
2635 CheckerRegistry, InMemoryCheckerRegistry, LocationSegment, ObjectLocation, TypedChecker,
2636 TypedEntityChecker, clear_record_status, mark_record_status,
2637};