1use std::sync::Arc;
2
3use teaql_core::{
4 AggregationCacheOptions, DeleteCommand, Entity, InsertCommand, Record, RecoverCommand,
5 RelationAggregate, SelectQuery, SmartList, UpdateCommand, Value,
6};
7
8use crate::{
9 CheckObjectStatus, RawAuditEvent, RepositoryBehavior, RepositoryError, RuntimeError,
10 clear_record_status, mark_record_status,
11};
12
13use super::{
14 AggregationCacheBackend, ContextRepository, InMemoryAggregationCache,
15 ResolvedRepository, UserContextMetadata, helpers::*,
16};
17
18impl<'a, E> ResolvedRepository<'a, E>
19where
20 E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
21{
22 pub(super) fn query_behavior(&self, entity: &str) -> Option<Arc<dyn RepositoryBehavior>> {
23 self.repository.metadata.context.repository_behavior(entity)
24 }
25
26 pub(super) fn behavior(&self) -> Option<Arc<dyn RepositoryBehavior>> {
27 self.repository
28 .metadata
29 .context
30 .repository_behavior(&self.entity)
31 }
32
33 pub fn entity(&self) -> &str {
34 &self.entity
35 }
36
37 pub fn select(&self) -> SelectQuery {
38 SelectQuery::new(self.entity.clone())
39 }
40
41 pub fn insert_command(&self) -> InsertCommand {
42 InsertCommand::new(self.entity.clone())
43 }
44
45 fn enforce_insert_policy(&self, command: &mut InsertCommand) -> Result<(), RuntimeError> {
46 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
47 policy.enforce_insert(self.repository.metadata.context, command)?;
48 }
49 Ok(())
50 }
51
52 fn enforce_update_policy(&self, command: &mut UpdateCommand) -> Result<(), RuntimeError> {
53 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
54 policy.enforce_update(self.repository.metadata.context, command)?;
55 }
56 Ok(())
57 }
58
59 fn enforce_delete_policy(&self, command: &mut DeleteCommand) -> Result<(), RuntimeError> {
60 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
61 policy.enforce_delete(self.repository.metadata.context, command)?;
62 }
63 Ok(())
64 }
65
66 fn enforce_recover_policy(&self, command: &mut RecoverCommand) -> Result<(), RuntimeError> {
67 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
68 policy.enforce_recover(self.repository.metadata.context, command)?;
69 }
70 Ok(())
71 }
72
73 fn prepare_select_query(&self, query: &SelectQuery) -> Result<SelectQuery, RuntimeError> {
74 let mut query = query.clone();
75
76 let mut full_trace = self.trace_context.clone();
77 full_trace.extend(query.trace_chain);
78 query.trace_chain = full_trace;
79
80 if let Some(behavior) = self.query_behavior(&query.entity) {
81 behavior.before_select(self.repository.metadata.context, &mut query)?;
82 }
83 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
84 policy.enforce_select(self.repository.metadata.context, &mut query)?;
85 }
86 Ok(query)
87 }
88
89 pub fn prepare_insert_command(
90 &self,
91 command: &InsertCommand,
92 ) -> Result<InsertCommand, RuntimeError> {
93 let mut command = command.clone();
94 if let Some(behavior) = self.behavior() {
95 behavior.before_insert(self.repository.metadata.context, &mut command)?;
96 }
97 self.enforce_insert_policy(&mut command)?;
98
99 let entity = self
100 .repository
101 .metadata
102 .context
103 .require_entity(&command.entity)?;
104 if let Some(id_property) = entity.id_property() {
105 let needs_id = !command.values.contains_key(&id_property.name)
106 || is_unassigned_id(command.values.get(&id_property.name));
107 if needs_id {
108 let id = self.repository.metadata.context.next_id(&command.entity)?;
109 command
110 .values
111 .insert(id_property.name.clone(), Value::U64(id));
112 }
113 }
114 ensure_initial_version(&mut command.values, entity);
115 mark_record_status(&mut command.values, CheckObjectStatus::Create);
116 let check_result = self
117 .repository
118 .metadata
119 .context
120 .check_and_fix_record(&command.entity, &mut command.values);
121 clear_record_status(&mut command.values);
122 check_result?;
123
124 Ok(command)
125 }
126
127 pub fn update_command(&self, id: impl Into<Value>) -> UpdateCommand {
128 UpdateCommand::new(self.entity.clone(), id)
129 }
130
131 pub fn prepare_update_command(
132 &self,
133 command: &UpdateCommand,
134 ) -> Result<UpdateCommand, RuntimeError> {
135 let mut command = command.clone();
136 if let Some(behavior) = self.behavior() {
137 behavior.before_update(self.repository.metadata.context, &mut command)?;
138 }
139 self.enforce_update_policy(&mut command)?;
140
141 Ok(command)
142 }
143
144 pub fn delete_command(&self, id: impl Into<Value>) -> DeleteCommand {
145 DeleteCommand::new(self.entity.clone(), id)
146 }
147
148 pub fn recover_command(&self, id: impl Into<Value>, expected_version: i64) -> RecoverCommand {
149 RecoverCommand::new(self.entity.clone(), id, expected_version)
150 }
151
152 pub async fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
153 let query = self
154 .prepare_select_query(query)
155 .map_err(RepositoryError::Runtime)?;
156 self.fetch_prepared_all(&query).await
157 }
158
159 async fn fetch_prepared_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
160 let mut rows = self.fetch_prepared_query(query).await?;
161 self.enhance_object_group_bys(&mut rows, &query.object_group_bys, &query.trace_chain).await?;
162 self.enhance_child_queries(&mut rows, &query.child_enhancements, &query.trace_chain).await?;
163 self.enhance_query_relations(&mut rows, query).await?;
164 Ok(rows)
165 }
166
167 async fn fetch_prepared_query(
168 &self,
169 query: &SelectQuery,
170 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
171
172 let final_comment = self.repository.resolve_final_comment(&query.trace_chain, query.comment.clone());
173 let mut query = query.clone();
174 query.comment = final_comment;
175 if let Some(options) = query.aggregation_cache.filter(|options| options.enabled) {
176 if let Some(cache) = self
177 .repository
178 .metadata
179 .context
180 .get_resource::<Arc<dyn AggregationCacheBackend>>()
181 {
182 return self.fetch_prepared_query_with_cache(
183 &query,
184 options,
185 cache.as_ref(),
186 ).await;
187 }
188 if let Some(cache) = self
189 .repository
190 .metadata
191 .context
192 .get_resource::<InMemoryAggregationCache>()
193 {
194 return self.fetch_prepared_query_with_cache(&query, options, cache).await;
195 }
196 }
197 let request = teaql_data_service::QueryRequest {
198 query: query.clone(),
199 trace_chain: query.trace_chain.clone(),
200 comment: query.comment.clone(),
201 };
202 let res = self
203 .repository
204 .executor
205 .query(request)
206 .await.map_err(RepositoryError::Executor)?;
207 self.repository.metadata.context.record_metadata_log(&res.metadata);
208 Ok(res.rows)
209 }
210
211 async fn fetch_prepared_query_with_cache(
212 &self,
213 query: &SelectQuery,
214 options: AggregationCacheOptions,
215 cache: &dyn AggregationCacheBackend,
216 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
217
218 let key = aggregation_cache_key(
219 cache.namespace(),
220 &aggregation_cache_namespace(&query.entity),
221 query,
222 );
223 if let Some(rows) = cache.get(&key, options.cache_expired_millis) {
224 return Ok(rows);
225 }
226 let request = teaql_data_service::QueryRequest {
227 query: query.clone(),
228 trace_chain: query.trace_chain.clone(),
229 comment: query.comment.clone(),
230 };
231 let res = self
232 .repository
233 .executor
234 .query(request)
235 .await.map_err(RepositoryError::Executor)?;
236 self.repository.metadata.context.record_metadata_log(&res.metadata);
237 let rows = res.rows;
238 cache.put(key, rows.clone());
239 Ok(rows)
240 }
241
242 pub async fn fetch_all_with_relation_aggregates(
243 &self,
244 query: &SelectQuery,
245 relation_aggregates: &[RelationAggregate],
246 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
247 let query = self
248 .prepare_select_query(query)
249 .map_err(RepositoryError::Runtime)?;
250
251 let mut rows = self.fetch_prepared_all(&query).await?;
252 self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache, &query.trace_chain).await?;
253 Ok(rows)
254 }
255
256 pub async fn fetch_smart_list(
257 &self,
258 query: &SelectQuery,
259 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
260 let query = self
261 .prepare_select_query(query)
262 .map_err(RepositoryError::Runtime)?;
263
264 self.repository.fetch_smart_list(&query).await
265 }
266
267 pub async fn fetch_smart_list_with_relation_aggregates(
268 &self,
269 query: &SelectQuery,
270 relation_aggregates: &[RelationAggregate],
271 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
272 self.fetch_all_with_relation_aggregates(query, relation_aggregates).await
273 .map(SmartList::from)
274 }
275
276 pub async fn fetch_entities<T>(
277 &self,
278 query: &SelectQuery,
279 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
280 where
281 T: Entity,
282 {
283 let query = self
284 .prepare_select_query(query)
285 .map_err(RepositoryError::Runtime)?;
286
287 self.repository.fetch_entities(&query).await
288 }
289
290 pub async fn fetch_entities_with_relation_aggregates<T>(
291 &self,
292 query: &SelectQuery,
293 relation_aggregates: &[RelationAggregate],
294 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
295 where
296 T: Entity,
297 {
298 self.fetch_all_with_relation_aggregates(query, relation_aggregates).await?
299 .into_iter()
300 .map(|record| {
301 let mut entity = T::from_record(record)?;
302 let root = crate::EntityRoot::default();
303 entity.on_loaded(&root as &dyn std::any::Any);
304 Ok(entity)
305 })
306 .collect::<Result<Vec<_>, _>>()
307 .map(SmartList::from)
308 .map_err(RepositoryError::Entity)
309 }
310
311 pub async fn fetch_enhanced_entities_with_relation_aggregates<T>(
312 &self,
313 query: &SelectQuery,
314 relation_aggregates: &[RelationAggregate],
315 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
316 where
317 T: Entity,
318 {
319 let query = self
320 .prepare_select_query(query)
321 .map_err(RepositoryError::Runtime)?;
322
323 let mut rows = self.fetch_prepared_all(&query).await?;
324 self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache, &query.trace_chain).await?;
325 self.enhance_relations(&mut rows).await?;
326 rows.into_iter()
327 .map(|record| {
328 let mut entity = T::from_record(record)?;
329 let root = crate::EntityRoot::default();
330 entity.on_loaded(&root as &dyn std::any::Any);
331 Ok(entity)
332 })
333 .collect::<Result<Vec<_>, _>>()
334 .map(SmartList::from)
335 .map_err(RepositoryError::Entity)
336 }
337
338 pub async fn fetch_enhanced_entities<T>(
339 &self,
340 query: &SelectQuery,
341 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
342 where
343 T: Entity,
344 {
345 let query = self
346 .prepare_select_query(query)
347 .map_err(RepositoryError::Runtime)?;
348
349 let mut rows = self.fetch_prepared_all(&query).await?;
350 self.enhance_relations(&mut rows).await?;
351 let root = self.repository.metadata.context.get_resource::<crate::EntityRoot>().cloned();
352 rows.into_iter()
353 .map(|record| {
354 let mut entity = T::from_record(record)?;
355 if let Some(ref root) = root {
356 entity.on_loaded(root as &dyn std::any::Any);
357 }
358 Ok(entity)
359 })
360 .collect::<Result<Vec<_>, _>>()
361 .map(SmartList::from)
362 .map_err(RepositoryError::Entity)
363 }
364
365 pub async fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
366 let command = self
367 .prepare_insert_command(command)
368 .map_err(RepositoryError::Runtime)?;
369 self.execute_prepared_insert_with_comment(command, self.trace_context.clone()).await
370 }
371
372 pub async fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
373 let command = self
374 .prepare_update_command(command)
375 .map_err(RepositoryError::Runtime)?;
376 self.execute_prepared_update_with_comment(command, self.trace_context.clone()).await
377 }
378
379 pub async fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
380 self.delete_scoped(command, self.trace_context.clone()).await
381 }
382
383 pub async fn delete_scoped(
384 &self,
385 command: &DeleteCommand,
386 trace_chain: Vec<teaql_core::TraceNode>,
387 ) -> Result<u64, RepositoryError<E::Error>> {
388 let mut command = command.clone();
389 command.trace_chain = trace_chain.clone();
390 if let Some(behavior) = self.behavior() {
391 behavior
392 .before_delete(self.repository.metadata.context, &mut command)
393 .map_err(RepositoryError::Runtime)?;
394 }
395 self.enforce_delete_policy(&mut command)
396 .map_err(RepositoryError::Runtime)?;
397
398 let old_values = self.fetch_current_event_row(&command.entity, &command.id, trace_chain.clone())?;
399 let affected = self.repository.delete(&command).await?;
400
401 let mut event = RawAuditEvent::deleted_with_old_values(
402 command.entity,
403 command.id,
404 command.expected_version,
405 old_values,
406 );
407 event.trace_chain = trace_chain;
408 self.emit_event(event)
409 .map_err(RepositoryError::Runtime)?;
410 Ok(affected)
411 }
412
413 pub async fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
414 let mut command = command.clone();
415 command.trace_chain = self.trace_context.clone();
416 if let Some(behavior) = self.behavior() {
417 behavior
418 .before_recover(self.repository.metadata.context, &mut command)
419 .map_err(RepositoryError::Runtime)?;
420 }
421 self.enforce_recover_policy(&mut command)
422 .map_err(RepositoryError::Runtime)?;
423 let old_values = self.fetch_current_event_row(&command.entity, &command.id, command.trace_chain.clone())?;
424 let affected = self.repository.recover(&command).await?;
425 let event = RawAuditEvent::recovered_with_old_values(
426 command.entity,
427 command.id,
428 command.expected_version,
429 old_values,
430 );
431 self.emit_event(event)
432 .map_err(RepositoryError::Runtime)?;
433 Ok(affected)
434 }
435
436 fn emit_event(&self, event: RawAuditEvent) -> Result<(), RuntimeError> {
437 self.repository.metadata.context.send_event(event)
438 }
439
440 #[allow(dead_code)]
441 pub(super) async fn execute_prepared_insert(
442 &self,
443 command: InsertCommand,
444 ) -> Result<u64, RepositoryError<E::Error>> {
445 self.execute_prepared_insert_with_comment(command, Vec::new()).await
446 }
447
448 pub(super) async fn execute_prepared_insert_with_comment(
449 &self,
450 mut command: InsertCommand,
451 trace_chain: Vec<teaql_core::TraceNode>,
452 ) -> Result<u64, RepositoryError<E::Error>> {
453 command.trace_chain = trace_chain.clone();
454 let affected = self.repository.insert(&command).await?;
455 let mut event = RawAuditEvent::created(command.entity, command.values);
456 event.trace_chain = trace_chain;
457 self.emit_event(event).map_err(RepositoryError::Runtime)?;
458 Ok(affected)
459 }
460
461 pub(super) async fn execute_prepared_batch_insert(
462 &self,
463 command: teaql_core::BatchInsertCommand,
464 ) -> Result<u64, RepositoryError<E::Error>> {
465 if command.batch_values.is_empty() {
466 return Ok(0);
467 }
468 let affected = self.repository.batch_insert(&command).await?;
469
470 let entity = command.entity.clone();
471 for (i, values) in command.batch_values.into_iter().enumerate() {
472 let mut event = RawAuditEvent::created(entity.clone(), values);
473 if i < command.trace_chains.len() {
474 event.trace_chain = command.trace_chains[i].clone();
475 }
476 self.emit_event(event).map_err(RepositoryError::Runtime)?;
477 }
478 Ok(affected)
479 }
480
481 #[allow(dead_code)]
482 pub(super) async fn execute_prepared_update(
483 &self,
484 command: UpdateCommand,
485 ) -> Result<u64, RepositoryError<E::Error>> {
486 self.execute_prepared_update_with_comment(command, Vec::new()).await
487 }
488
489 pub(super) async fn execute_prepared_update_with_comment(
490 &self,
491 mut command: UpdateCommand,
492 trace_chain: Vec<teaql_core::TraceNode>,
493 ) -> Result<u64, RepositoryError<E::Error>> {
494 command.trace_chain = trace_chain.clone();
495
496 let mut old_values = command.old_values.clone();
497 let needs_fetch = match &old_values {
498 Some(snapshot) => !command.values.keys().all(|k| snapshot.contains_key(k)),
499 None => true,
500 };
501 if needs_fetch {
502 old_values = self.fetch_current_event_row(&command.entity, &command.id, trace_chain.clone())?;
503 }
504
505 let affected = self.repository.update(&command).await?;
506 let updated_fields = command.values.keys().cloned().collect();
507 let mut values = command.values.clone();
508 values.insert("id".to_owned(), command.id.clone());
509 if let Some(version) = command.expected_version {
510 values.insert("version".to_owned(), Value::I64(version + 1));
511 }
512 let mut new_values = old_values.clone().unwrap_or_default();
513 for (field, value) in &values {
514 new_values.insert(field.clone(), value.clone());
515 }
516 let mut event = RawAuditEvent::updated_with_old_values(
517 command.entity,
518 values,
519 old_values,
520 new_values,
521 updated_fields,
522 );
523 event.trace_chain = trace_chain;
524 self.emit_event(event).map_err(RepositoryError::Runtime)?;
525 Ok(affected)
526 }
527
528 pub(super) async fn execute_prepared_batch_update(
529 &self,
530 command: teaql_core::BatchUpdateCommand,
531 ) -> Result<u64, RepositoryError<E::Error>> {
532 if command.batch_values.is_empty() {
533 return Ok(0);
534 }
535 let affected = self.repository.batch_update(&command).await?;
536
537 let entity = command.entity.clone();
538 for (i, values) in command.batch_values.into_iter().enumerate() {
539 let mut full_values = values.clone();
540 full_values.insert("id".to_owned(), command.batch_ids[i].clone());
541 if let Some(Some(version)) = command.batch_expected_versions.get(i) {
542 full_values.insert("version".to_owned(), teaql_core::Value::I64(*version + 1));
543 }
544
545 let old_values = command.batch_old_values.get(i).cloned().unwrap_or(None);
546 let mut new_values = old_values.clone().unwrap_or_default();
547 for (field, value) in &full_values {
548 new_values.insert(field.clone(), value.clone());
549 }
550
551 let mut event = RawAuditEvent::updated_with_old_values(
552 entity.clone(),
553 full_values,
554 old_values,
555 new_values,
556 command.update_fields.clone(),
557 );
558 if i < command.trace_chains.len() {
559 event.trace_chain = command.trace_chains[i].clone();
560 }
561 self.emit_event(event).map_err(RepositoryError::Runtime)?;
562 }
563 Ok(affected)
564 }
565
566 fn fetch_current_event_row(
567 &self,
568 _entity: &str,
569 _id: &Value,
570 _trace_chain: Vec<teaql_core::TraceNode>,
571 ) -> Result<Option<Record>, RepositoryError<E::Error>> {
572 Ok(None)
575 }
576
577
578 pub fn scoped_repository(&self, entity: String) -> ResolvedRepository<'a, E> {
579 ResolvedRepository {
580 entity,
581 repository: ContextRepository {
582 metadata: UserContextMetadata {
583 context: self.repository.metadata.context,
584 },
585 executor: self.repository.executor,
586 },
587 trace_context: Vec::new(),
588 }
589 }
590}