1use std::sync::Arc;
2
3use teaql_core::{
4 AggregationCacheOptions, DeleteCommand, Entity, InsertCommand, Record, RecoverCommand,
5 RelationAggregate, SelectQuery, SmartList, UpdateCommand, Value,
6};
7
8use crate::{
9 CheckObjectStatus, RawAuditEvent, RepositoryBehavior, RepositoryError, RuntimeError,
10 clear_record_status, mark_record_status,
11};
12
13use super::{
14 AggregationCacheBackend, ContextRepository, InMemoryAggregationCache,
15 ResolvedRepository, UserContextMetadata, helpers::*,
16};
17
18impl<'a, E> ResolvedRepository<'a, E>
19where
20 E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
21{
22 pub(super) fn query_behavior(&self, entity: &str) -> Option<Arc<dyn RepositoryBehavior>> {
23 self.repository.metadata.context.repository_behavior(entity)
24 }
25
26 pub(super) fn behavior(&self) -> Option<Arc<dyn RepositoryBehavior>> {
27 self.repository
28 .metadata
29 .context
30 .repository_behavior(&self.entity)
31 }
32
33 pub fn entity(&self) -> &str {
34 &self.entity
35 }
36
37 pub fn select(&self) -> SelectQuery {
38 SelectQuery::new(self.entity.clone())
39 }
40
41 pub fn insert_command(&self) -> InsertCommand {
42 InsertCommand::new(self.entity.clone())
43 }
44
45 fn enforce_insert_policy(&self, command: &mut InsertCommand) -> Result<(), RuntimeError> {
46 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
47 policy.enforce_insert(self.repository.metadata.context, command)?;
48 }
49 Ok(())
50 }
51
52 fn enforce_update_policy(&self, command: &mut UpdateCommand) -> Result<(), RuntimeError> {
53 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
54 policy.enforce_update(self.repository.metadata.context, command)?;
55 }
56 Ok(())
57 }
58
59 fn enforce_delete_policy(&self, command: &mut DeleteCommand) -> Result<(), RuntimeError> {
60 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
61 policy.enforce_delete(self.repository.metadata.context, command)?;
62 }
63 Ok(())
64 }
65
66 fn enforce_recover_policy(&self, command: &mut RecoverCommand) -> Result<(), RuntimeError> {
67 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
68 policy.enforce_recover(self.repository.metadata.context, command)?;
69 }
70 Ok(())
71 }
72
73 fn prepare_select_query(&self, query: &SelectQuery) -> Result<SelectQuery, RuntimeError> {
74 let mut query = query.clone();
75
76 let mut full_trace = self.trace_context.clone();
77 full_trace.extend(query.trace_chain);
78 query.trace_chain = full_trace;
79
80 if let Some(behavior) = self.query_behavior(&query.entity) {
81 behavior.before_select(self.repository.metadata.context, &mut query)?;
82 }
83 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
84 policy.enforce_select(self.repository.metadata.context, &mut query)?;
85 }
86 Ok(query)
87 }
88
89 pub fn prepare_insert_command(
90 &self,
91 command: &InsertCommand,
92 ) -> Result<InsertCommand, RuntimeError> {
93 let mut command = command.clone();
94 if let Some(behavior) = self.behavior() {
95 behavior.before_insert(self.repository.metadata.context, &mut command)?;
96 }
97 self.enforce_insert_policy(&mut command)?;
98
99 let entity = self
100 .repository
101 .metadata
102 .context
103 .require_entity(&command.entity)?;
104 if let Some(id_property) = entity.id_property() {
105 let needs_id = !command.values.contains_key(&id_property.name)
106 || is_unassigned_id(command.values.get(&id_property.name));
107 if needs_id {
108 let id = self.repository.metadata.context.next_id(&command.entity)?;
109 command
110 .values
111 .insert(id_property.name.clone(), Value::U64(id));
112 }
113 }
114 ensure_initial_version(&mut command.values, entity);
115 mark_record_status(&mut command.values, CheckObjectStatus::Create);
116 let check_result = self
117 .repository
118 .metadata
119 .context
120 .check_and_fix_record(&command.entity, &mut command.values);
121 clear_record_status(&mut command.values);
122 check_result?;
123
124 Ok(command)
125 }
126
127 pub fn update_command(&self, id: impl Into<Value>) -> UpdateCommand {
128 UpdateCommand::new(self.entity.clone(), id)
129 }
130
131 pub fn prepare_update_command(
132 &self,
133 command: &UpdateCommand,
134 ) -> Result<UpdateCommand, RuntimeError> {
135 let mut command = command.clone();
136 if let Some(behavior) = self.behavior() {
137 behavior.before_update(self.repository.metadata.context, &mut command)?;
138 }
139 self.enforce_update_policy(&mut command)?;
140
141 Ok(command)
142 }
143
144 pub fn delete_command(&self, id: impl Into<Value>) -> DeleteCommand {
145 DeleteCommand::new(self.entity.clone(), id)
146 }
147
148 pub fn recover_command(&self, id: impl Into<Value>, expected_version: i64) -> RecoverCommand {
149 RecoverCommand::new(self.entity.clone(), id, expected_version)
150 }
151
152 pub async fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
153 let query = self
154 .prepare_select_query(query)
155 .map_err(RepositoryError::Runtime)?;
156 self.fetch_prepared_all(&query).await
157 }
158
159 pub async fn fetch_stream(
164 &self,
165 query: &SelectQuery,
166 ) -> Result<Vec<teaql_data_service::StreamChunk>, RepositoryError<E::Error>>
167 where
168 E: teaql_data_service::StreamQueryExecutor,
169 {
170 let query = self
171 .prepare_select_query(query)
172 .map_err(RepositoryError::Runtime)?;
173
174 let chunk_size = query.stream_config
175 .as_ref()
176 .map(|c| c.chunk_size)
177 .unwrap_or(1000);
178
179 let final_comment = self.repository.resolve_final_comment(&query.trace_chain, query.comment.clone());
180 let mut query = query.clone();
181 query.comment = final_comment;
182
183 let request = teaql_data_service::QueryRequest {
184 query: query.clone(),
185 trace_chain: query.trace_chain.clone(),
186 comment: query.comment.clone(),
187 };
188
189 let chunks = self
190 .repository
191 .executor
192 .query_stream(request, chunk_size)
193 .await
194 .map_err(RepositoryError::Executor)?;
195
196 let mut enhanced_chunks = Vec::with_capacity(chunks.len());
198 for mut chunk in chunks {
199 self.enhance_object_group_bys(&mut chunk.rows, &query.object_group_bys, &query.trace_chain).await?;
200 self.enhance_child_queries(&mut chunk.rows, &query.child_enhancements, &query.trace_chain).await?;
201 self.enhance_query_relations(&mut chunk.rows, &query).await?;
202 enhanced_chunks.push(chunk);
203 }
204
205 Ok(enhanced_chunks)
206 }
207
208 async fn fetch_prepared_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
209 let mut rows = self.fetch_prepared_query(query).await?;
210 self.enhance_object_group_bys(&mut rows, &query.object_group_bys, &query.trace_chain).await?;
211 self.enhance_child_queries(&mut rows, &query.child_enhancements, &query.trace_chain).await?;
212 self.enhance_query_relations(&mut rows, query).await?;
213 Ok(rows)
214 }
215
216 async fn fetch_prepared_query(
217 &self,
218 query: &SelectQuery,
219 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
220
221 let final_comment = self.repository.resolve_final_comment(&query.trace_chain, query.comment.clone());
222 let mut query = query.clone();
223 query.comment = final_comment;
224 if let Some(options) = query.aggregation_cache.filter(|options| options.enabled) {
225 if let Some(cache) = self
226 .repository
227 .metadata
228 .context
229 .get_resource::<Arc<dyn AggregationCacheBackend>>()
230 {
231 return self.fetch_prepared_query_with_cache(
232 &query,
233 options,
234 cache.as_ref(),
235 ).await;
236 }
237 if let Some(cache) = self
238 .repository
239 .metadata
240 .context
241 .get_resource::<InMemoryAggregationCache>()
242 {
243 return self.fetch_prepared_query_with_cache(&query, options, cache).await;
244 }
245 }
246 let request = teaql_data_service::QueryRequest {
247 query: query.clone(),
248 trace_chain: query.trace_chain.clone(),
249 comment: query.comment.clone(),
250 };
251 let res = self
252 .repository
253 .executor
254 .query(request)
255 .await.map_err(RepositoryError::Executor)?;
256 self.repository.metadata.context.record_metadata_log(&res.metadata);
257 Ok(res.rows)
258 }
259
260 async fn fetch_prepared_query_with_cache(
261 &self,
262 query: &SelectQuery,
263 options: AggregationCacheOptions,
264 cache: &dyn AggregationCacheBackend,
265 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
266
267 let key = aggregation_cache_key(
268 cache.namespace(),
269 &aggregation_cache_namespace(&query.entity),
270 query,
271 );
272 if let Some(rows) = cache.get(&key, options.cache_expired_millis) {
273 return Ok(rows);
274 }
275 let request = teaql_data_service::QueryRequest {
276 query: query.clone(),
277 trace_chain: query.trace_chain.clone(),
278 comment: query.comment.clone(),
279 };
280 let res = self
281 .repository
282 .executor
283 .query(request)
284 .await.map_err(RepositoryError::Executor)?;
285 self.repository.metadata.context.record_metadata_log(&res.metadata);
286 let rows = res.rows;
287 cache.put(key, rows.clone());
288 Ok(rows)
289 }
290
291 pub async fn fetch_all_with_relation_aggregates(
292 &self,
293 query: &SelectQuery,
294 relation_aggregates: &[RelationAggregate],
295 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
296 let query = self
297 .prepare_select_query(query)
298 .map_err(RepositoryError::Runtime)?;
299
300 let mut rows = self.fetch_prepared_all(&query).await?;
301 self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache, &query.trace_chain).await?;
302 Ok(rows)
303 }
304
305 pub async fn fetch_smart_list(
306 &self,
307 query: &SelectQuery,
308 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
309 let query = self
310 .prepare_select_query(query)
311 .map_err(RepositoryError::Runtime)?;
312
313 self.repository.fetch_smart_list(&query).await
314 }
315
316 pub async fn fetch_smart_list_with_relation_aggregates(
317 &self,
318 query: &SelectQuery,
319 relation_aggregates: &[RelationAggregate],
320 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
321 self.fetch_all_with_relation_aggregates(query, relation_aggregates).await
322 .map(SmartList::from)
323 }
324
325 pub async fn fetch_entities<T>(
326 &self,
327 query: &SelectQuery,
328 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
329 where
330 T: Entity,
331 {
332 let query = self
333 .prepare_select_query(query)
334 .map_err(RepositoryError::Runtime)?;
335
336 self.repository.fetch_entities(&query).await
337 }
338
339 pub async fn fetch_entities_with_relation_aggregates<T>(
340 &self,
341 query: &SelectQuery,
342 relation_aggregates: &[RelationAggregate],
343 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
344 where
345 T: Entity,
346 {
347 self.fetch_all_with_relation_aggregates(query, relation_aggregates).await?
348 .into_iter()
349 .map(|record| {
350 let mut entity = T::from_record(record)?;
351 let root = crate::EntityRoot::default();
352 entity.on_loaded(&root as &dyn std::any::Any);
353 Ok(entity)
354 })
355 .collect::<Result<Vec<_>, _>>()
356 .map(SmartList::from)
357 .map_err(RepositoryError::Entity)
358 }
359
360 pub async fn fetch_enhanced_entities_with_relation_aggregates<T>(
361 &self,
362 query: &SelectQuery,
363 relation_aggregates: &[RelationAggregate],
364 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
365 where
366 T: Entity,
367 {
368 let query = self
369 .prepare_select_query(query)
370 .map_err(RepositoryError::Runtime)?;
371
372 let mut rows = self.fetch_prepared_all(&query).await?;
373 self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache, &query.trace_chain).await?;
374 self.enhance_relations(&mut rows).await?;
375 rows.into_iter()
376 .map(|record| {
377 let mut entity = T::from_record(record)?;
378 let root = crate::EntityRoot::default();
379 entity.on_loaded(&root as &dyn std::any::Any);
380 Ok(entity)
381 })
382 .collect::<Result<Vec<_>, _>>()
383 .map(SmartList::from)
384 .map_err(RepositoryError::Entity)
385 }
386
387 pub async fn fetch_enhanced_entities<T>(
388 &self,
389 query: &SelectQuery,
390 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
391 where
392 T: Entity,
393 {
394 let query = self
395 .prepare_select_query(query)
396 .map_err(RepositoryError::Runtime)?;
397
398 let mut rows = self.fetch_prepared_all(&query).await?;
399 self.enhance_relations(&mut rows).await?;
400 let root = self.repository.metadata.context.get_resource::<crate::EntityRoot>().cloned();
401 rows.into_iter()
402 .map(|record| {
403 let mut entity = T::from_record(record)?;
404 if let Some(ref root) = root {
405 entity.on_loaded(root as &dyn std::any::Any);
406 }
407 Ok(entity)
408 })
409 .collect::<Result<Vec<_>, _>>()
410 .map(SmartList::from)
411 .map_err(RepositoryError::Entity)
412 }
413
414 pub async fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
415 let command = self
416 .prepare_insert_command(command)
417 .map_err(RepositoryError::Runtime)?;
418 self.execute_prepared_insert_with_comment(command, self.trace_context.clone()).await
419 }
420
421 pub async fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
422 let command = self
423 .prepare_update_command(command)
424 .map_err(RepositoryError::Runtime)?;
425 self.execute_prepared_update_with_comment(command, self.trace_context.clone()).await
426 }
427
428 pub async fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
429 self.delete_scoped(command, self.trace_context.clone()).await
430 }
431
432 pub async fn delete_scoped(
433 &self,
434 command: &DeleteCommand,
435 trace_chain: Vec<teaql_core::TraceNode>,
436 ) -> Result<u64, RepositoryError<E::Error>> {
437 let mut command = command.clone();
438 command.trace_chain = trace_chain.clone();
439 if let Some(behavior) = self.behavior() {
440 behavior
441 .before_delete(self.repository.metadata.context, &mut command)
442 .map_err(RepositoryError::Runtime)?;
443 }
444 self.enforce_delete_policy(&mut command)
445 .map_err(RepositoryError::Runtime)?;
446
447 let old_values = self.fetch_current_event_row(&command.entity, &command.id, trace_chain.clone())?;
448 let affected = self.repository.delete(&command).await?;
449
450 let mut event = RawAuditEvent::deleted_with_old_values(
451 command.entity,
452 command.id,
453 command.expected_version,
454 old_values,
455 );
456 event.trace_chain = trace_chain;
457 self.emit_event(event)
458 .map_err(RepositoryError::Runtime)?;
459 Ok(affected)
460 }
461
462 pub async fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
463 let mut command = command.clone();
464 command.trace_chain = self.trace_context.clone();
465 if let Some(behavior) = self.behavior() {
466 behavior
467 .before_recover(self.repository.metadata.context, &mut command)
468 .map_err(RepositoryError::Runtime)?;
469 }
470 self.enforce_recover_policy(&mut command)
471 .map_err(RepositoryError::Runtime)?;
472 let old_values = self.fetch_current_event_row(&command.entity, &command.id, command.trace_chain.clone())?;
473 let affected = self.repository.recover(&command).await?;
474 let event = RawAuditEvent::recovered_with_old_values(
475 command.entity,
476 command.id,
477 command.expected_version,
478 old_values,
479 );
480 self.emit_event(event)
481 .map_err(RepositoryError::Runtime)?;
482 Ok(affected)
483 }
484
485 fn emit_event(&self, event: RawAuditEvent) -> Result<(), RuntimeError> {
486 self.repository.metadata.context.send_event(event)
487 }
488
489 #[allow(dead_code)]
490 pub(super) async fn execute_prepared_insert(
491 &self,
492 command: InsertCommand,
493 ) -> Result<u64, RepositoryError<E::Error>> {
494 self.execute_prepared_insert_with_comment(command, Vec::new()).await
495 }
496
497 pub(super) async fn execute_prepared_insert_with_comment(
498 &self,
499 mut command: InsertCommand,
500 trace_chain: Vec<teaql_core::TraceNode>,
501 ) -> Result<u64, RepositoryError<E::Error>> {
502 command.trace_chain = trace_chain.clone();
503 let affected = self.repository.insert(&command).await?;
504 let mut event = RawAuditEvent::created(command.entity, command.values);
505 event.trace_chain = trace_chain;
506 self.emit_event(event).map_err(RepositoryError::Runtime)?;
507 Ok(affected)
508 }
509
510 pub(super) async fn execute_prepared_batch_insert(
511 &self,
512 command: teaql_core::BatchInsertCommand,
513 ) -> Result<u64, RepositoryError<E::Error>> {
514 if command.batch_values.is_empty() {
515 return Ok(0);
516 }
517 let affected = self.repository.batch_insert(&command).await?;
518
519 let entity = command.entity.clone();
520 for (i, values) in command.batch_values.into_iter().enumerate() {
521 let mut event = RawAuditEvent::created(entity.clone(), values);
522 if i < command.trace_chains.len() {
523 event.trace_chain = command.trace_chains[i].clone();
524 }
525 self.emit_event(event).map_err(RepositoryError::Runtime)?;
526 }
527 Ok(affected)
528 }
529
530 #[allow(dead_code)]
531 pub(super) async fn execute_prepared_update(
532 &self,
533 command: UpdateCommand,
534 ) -> Result<u64, RepositoryError<E::Error>> {
535 self.execute_prepared_update_with_comment(command, Vec::new()).await
536 }
537
538 pub(super) async fn execute_prepared_update_with_comment(
539 &self,
540 mut command: UpdateCommand,
541 trace_chain: Vec<teaql_core::TraceNode>,
542 ) -> Result<u64, RepositoryError<E::Error>> {
543 command.trace_chain = trace_chain.clone();
544
545 let mut old_values = command.old_values.clone();
546 let needs_fetch = match &old_values {
547 Some(snapshot) => !command.values.keys().all(|k| snapshot.contains_key(k)),
548 None => true,
549 };
550 if needs_fetch {
551 old_values = self.fetch_current_event_row(&command.entity, &command.id, trace_chain.clone())?;
552 }
553
554 let affected = self.repository.update(&command).await?;
555 let updated_fields = command.values.keys().cloned().collect();
556 let mut values = command.values.clone();
557 values.insert("id".to_owned(), command.id.clone());
558 if let Some(version) = command.expected_version {
559 values.insert("version".to_owned(), Value::I64(version + 1));
560 }
561 let mut new_values = old_values.clone().unwrap_or_default();
562 for (field, value) in &values {
563 new_values.insert(field.clone(), value.clone());
564 }
565 let mut event = RawAuditEvent::updated_with_old_values(
566 command.entity,
567 values,
568 old_values,
569 new_values,
570 updated_fields,
571 );
572 event.trace_chain = trace_chain;
573 self.emit_event(event).map_err(RepositoryError::Runtime)?;
574 Ok(affected)
575 }
576
577 pub(super) async fn execute_prepared_batch_update(
578 &self,
579 command: teaql_core::BatchUpdateCommand,
580 ) -> Result<u64, RepositoryError<E::Error>> {
581 if command.batch_values.is_empty() {
582 return Ok(0);
583 }
584 let affected = self.repository.batch_update(&command).await?;
585
586 let entity = command.entity.clone();
587 for (i, values) in command.batch_values.into_iter().enumerate() {
588 let mut full_values = values.clone();
589 full_values.insert("id".to_owned(), command.batch_ids[i].clone());
590 if let Some(Some(version)) = command.batch_expected_versions.get(i) {
591 full_values.insert("version".to_owned(), teaql_core::Value::I64(*version + 1));
592 }
593
594 let old_values = command.batch_old_values.get(i).cloned().unwrap_or(None);
595 let mut new_values = old_values.clone().unwrap_or_default();
596 for (field, value) in &full_values {
597 new_values.insert(field.clone(), value.clone());
598 }
599
600 let mut event = RawAuditEvent::updated_with_old_values(
601 entity.clone(),
602 full_values,
603 old_values,
604 new_values,
605 command.update_fields.clone(),
606 );
607 if i < command.trace_chains.len() {
608 event.trace_chain = command.trace_chains[i].clone();
609 }
610 self.emit_event(event).map_err(RepositoryError::Runtime)?;
611 }
612 Ok(affected)
613 }
614
615 fn fetch_current_event_row(
616 &self,
617 _entity: &str,
618 _id: &Value,
619 _trace_chain: Vec<teaql_core::TraceNode>,
620 ) -> Result<Option<Record>, RepositoryError<E::Error>> {
621 Ok(None)
624 }
625
626
627 pub fn scoped_repository(&self, entity: String) -> ResolvedRepository<'a, E> {
628 ResolvedRepository {
629 entity,
630 repository: ContextRepository {
631 metadata: UserContextMetadata {
632 context: self.repository.metadata.context,
633 },
634 executor: self.repository.executor,
635 },
636 trace_context: Vec::new(),
637 }
638 }
639}