1use std::sync::Arc;
2use std::time::{Instant, SystemTime};
3
4use teaql_core::{
5 AggregationCacheOptions, DeleteCommand, Entity, Expr, InsertCommand, Record, RecoverCommand,
6 RelationAggregate, SelectQuery, SmartList, UpdateCommand, Value,
7};
8use teaql_sql::{CompiledQuery, SqlDialect};
9
10use crate::{
11 CheckObjectStatus, EntityEvent, RepositoryBehavior, RepositoryError, RuntimeError,
12 SqlLogOperation, clear_record_status, mark_record_status,
13};
14
15use super::{
16 AggregationCacheBackend, ContextRepository, InMemoryAggregationCache, QueryExecutor,
17 ResolvedRepository, UserContextMetadata, helpers::*,
18};
19
20impl<'a, D, E> ResolvedRepository<'a, D, E>
21where
22 D: SqlDialect,
23 E: QueryExecutor,
24{
25 pub(super) fn query_behavior(&self, entity: &str) -> Option<Arc<dyn RepositoryBehavior>> {
26 self.repository.metadata.context.repository_behavior(entity)
27 }
28
29 pub(super) fn behavior(&self) -> Option<Arc<dyn RepositoryBehavior>> {
30 self.repository
31 .metadata
32 .context
33 .repository_behavior(&self.entity)
34 }
35
36 pub fn entity(&self) -> &str {
37 &self.entity
38 }
39
40 pub fn select(&self) -> SelectQuery {
41 SelectQuery::new(self.entity.clone())
42 }
43
44 pub fn insert_command(&self) -> InsertCommand {
45 InsertCommand::new(self.entity.clone())
46 }
47
48 fn enforce_insert_policy(&self, command: &mut InsertCommand) -> Result<(), RuntimeError> {
49 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
50 policy.enforce_insert(self.repository.metadata.context, command)?;
51 }
52 Ok(())
53 }
54
55 fn enforce_update_policy(&self, command: &mut UpdateCommand) -> Result<(), RuntimeError> {
56 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
57 policy.enforce_update(self.repository.metadata.context, command)?;
58 }
59 Ok(())
60 }
61
62 fn enforce_delete_policy(&self, command: &mut DeleteCommand) -> Result<(), RuntimeError> {
63 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
64 policy.enforce_delete(self.repository.metadata.context, command)?;
65 }
66 Ok(())
67 }
68
69 fn enforce_recover_policy(&self, command: &mut RecoverCommand) -> Result<(), RuntimeError> {
70 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
71 policy.enforce_recover(self.repository.metadata.context, command)?;
72 }
73 Ok(())
74 }
75
76 fn prepare_select_query(&self, query: &SelectQuery) -> Result<SelectQuery, RuntimeError> {
77 let mut query = query.clone();
78 if let Some(behavior) = self.query_behavior(&query.entity) {
79 behavior.before_select(self.repository.metadata.context, &mut query)?;
80 }
81 if let Some(policy) = self.repository.metadata.context.request_policy.as_ref() {
82 policy.enforce_select(self.repository.metadata.context, &mut query)?;
83 }
84 Ok(query)
85 }
86
87 pub fn prepare_insert_command(
88 &self,
89 command: &InsertCommand,
90 ) -> Result<InsertCommand, RuntimeError> {
91 let mut command = command.clone();
92 if let Some(behavior) = self.behavior() {
93 behavior.before_insert(self.repository.metadata.context, &mut command)?;
94 }
95 self.enforce_insert_policy(&mut command)?;
96
97 let entity = self
98 .repository
99 .metadata
100 .context
101 .require_entity(&command.entity)?;
102 if let Some(id_property) = entity.id_property() {
103 let needs_id = !command.values.contains_key(&id_property.name)
104 || is_unassigned_id(command.values.get(&id_property.name));
105 if needs_id {
106 let id = self.repository.metadata.context.next_id(&command.entity)?;
107 command
108 .values
109 .insert(id_property.name.clone(), Value::U64(id));
110 }
111 }
112 ensure_initial_version(&mut command.values, entity);
113 mark_record_status(&mut command.values, CheckObjectStatus::Create);
114 let check_result = self
115 .repository
116 .metadata
117 .context
118 .check_and_fix_record(&command.entity, &mut command.values);
119 clear_record_status(&mut command.values);
120 check_result?;
121
122 Ok(command)
123 }
124
125 pub fn update_command(&self, id: impl Into<Value>) -> UpdateCommand {
126 UpdateCommand::new(self.entity.clone(), id)
127 }
128
129 pub fn prepare_update_command(
130 &self,
131 command: &UpdateCommand,
132 ) -> Result<UpdateCommand, RuntimeError> {
133 let mut command = command.clone();
134 if let Some(behavior) = self.behavior() {
135 behavior.before_update(self.repository.metadata.context, &mut command)?;
136 }
137 self.enforce_update_policy(&mut command)?;
138 mark_record_status(&mut command.values, CheckObjectStatus::Update);
139 let check_result = self
140 .repository
141 .metadata
142 .context
143 .check_and_fix_record(&command.entity, &mut command.values);
144 clear_record_status(&mut command.values);
145 check_result?;
146 Ok(command)
147 }
148
149 pub fn delete_command(&self, id: impl Into<Value>) -> DeleteCommand {
150 DeleteCommand::new(self.entity.clone(), id)
151 }
152
153 pub fn recover_command(&self, id: impl Into<Value>, expected_version: i64) -> RecoverCommand {
154 RecoverCommand::new(self.entity.clone(), id, expected_version)
155 }
156
157 pub fn compile(&self, query: &SelectQuery) -> Result<CompiledQuery, RuntimeError> {
158 let query = self.prepare_select_query(query)?;
159 self.repository.compile(&query)
160 }
161
162 pub fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
163 let query = self
164 .prepare_select_query(query)
165 .map_err(RepositoryError::Runtime)?;
166 let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
167 let mut rows = self.fetch_prepared_query(&query)?;
168 self.enhance_object_group_bys(&mut rows, &query.object_group_bys)?;
169 self.enhance_child_queries(&mut rows, &query.child_enhancements)?;
170 Ok(rows)
171 }
172
173 fn fetch_prepared_query(
174 &self,
175 query: &SelectQuery,
176 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
177 let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
178 let mut compiled = self
179 .repository
180 .compile(query)
181 .map_err(RepositoryError::Runtime)?;
182 compiled.comment = self.repository.resolve_final_comment(query.comment.clone());
183 if let Some(options) = query.aggregation_cache.filter(|options| options.enabled) {
184 if let Some(cache) = self
185 .repository
186 .metadata
187 .context
188 .get_resource::<Arc<dyn AggregationCacheBackend>>()
189 {
190 return self.fetch_prepared_query_with_cache(
191 query,
192 &compiled,
193 options,
194 cache.as_ref(),
195 );
196 }
197 if let Some(cache) = self
198 .repository
199 .metadata
200 .context
201 .get_resource::<InMemoryAggregationCache>()
202 {
203 return self.fetch_prepared_query_with_cache(query, &compiled, options, cache);
204 }
205 }
206 let started_at = SystemTime::now();
207 let started = Instant::now();
208 let rows = self
209 .repository
210 .executor
211 .fetch_all(&compiled)
212 .map_err(RepositoryError::Executor)?;
213 self.repository.log_sql_result(
214 SqlLogOperation::Select,
215 &compiled,
216 started_at,
217 started,
218 Some(rows.len()),
219 Some(query.entity.clone()),
220 None,
221 query.comment.clone(),
222 );
223 Ok(rows)
224 }
225
226 fn fetch_prepared_query_with_cache(
227 &self,
228 query: &SelectQuery,
229 compiled: &CompiledQuery,
230 options: AggregationCacheOptions,
231 cache: &dyn AggregationCacheBackend,
232 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
233 let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
234 let key = aggregation_cache_key(
235 cache.namespace(),
236 &aggregation_cache_namespace(&query.entity),
237 compiled,
238 );
239 if let Some(rows) = cache.get(&key, options.cache_expired_millis) {
240 return Ok(rows);
241 }
242 let started_at = SystemTime::now();
243 let started = Instant::now();
244 let rows = self
245 .repository
246 .executor
247 .fetch_all(compiled)
248 .map_err(RepositoryError::Executor)?;
249 self.repository.log_sql_result(
250 SqlLogOperation::Select,
251 compiled,
252 started_at,
253 started,
254 Some(rows.len()),
255 Some(query.entity.clone()),
256 None,
257 query.comment.clone(),
258 );
259 cache.put(key, rows.clone());
260 Ok(rows)
261 }
262
263 pub fn fetch_all_with_relation_aggregates(
264 &self,
265 query: &SelectQuery,
266 relation_aggregates: &[RelationAggregate],
267 ) -> Result<Vec<Record>, RepositoryError<E::Error>> {
268 let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
269 let mut rows = self.fetch_all(query)?;
270 self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache)?;
271 Ok(rows)
272 }
273
274 pub fn fetch_smart_list(
275 &self,
276 query: &SelectQuery,
277 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
278 let query = self
279 .prepare_select_query(query)
280 .map_err(RepositoryError::Runtime)?;
281 let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
282 self.repository.fetch_smart_list(&query)
283 }
284
285 pub fn fetch_smart_list_with_relation_aggregates(
286 &self,
287 query: &SelectQuery,
288 relation_aggregates: &[RelationAggregate],
289 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
290 self.fetch_all_with_relation_aggregates(query, relation_aggregates)
291 .map(SmartList::from)
292 }
293
294 pub fn fetch_entities<T>(
295 &self,
296 query: &SelectQuery,
297 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
298 where
299 T: Entity,
300 {
301 let query = self
302 .prepare_select_query(query)
303 .map_err(RepositoryError::Runtime)?;
304 let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
305 self.repository.fetch_entities(&query)
306 }
307
308 pub fn fetch_entities_with_relation_aggregates<T>(
309 &self,
310 query: &SelectQuery,
311 relation_aggregates: &[RelationAggregate],
312 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
313 where
314 T: Entity,
315 {
316 self.fetch_all_with_relation_aggregates(query, relation_aggregates)?
317 .into_iter()
318 .map(|record| {
319 let mut entity = T::from_record(record)?;
320 let root = crate::EntityRoot::default();
321 entity.on_loaded(&root as &dyn std::any::Any);
322 Ok(entity)
323 })
324 .collect::<Result<Vec<_>, _>>()
325 .map(SmartList::from)
326 .map_err(RepositoryError::Entity)
327 }
328
329 pub fn fetch_enhanced_entities_with_relation_aggregates<T>(
330 &self,
331 query: &SelectQuery,
332 relation_aggregates: &[RelationAggregate],
333 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
334 where
335 T: Entity,
336 {
337 let query = self
338 .prepare_select_query(query)
339 .map_err(RepositoryError::Runtime)?;
340
341 let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
342
343 let mut rows = self.repository.fetch_all(&query)?;
344 self.enhance_relation_aggregates(&mut rows, relation_aggregates, query.aggregation_cache)?;
345 self.enhance_query_relations(&mut rows, &query)?;
346 self.enhance_relations(&mut rows)?;
347 rows.into_iter()
348 .map(|record| {
349 let mut entity = T::from_record(record)?;
350 let root = crate::EntityRoot::default();
351 entity.on_loaded(&root as &dyn std::any::Any);
352 Ok(entity)
353 })
354 .collect::<Result<Vec<_>, _>>()
355 .map(SmartList::from)
356 .map_err(RepositoryError::Entity)
357 }
358
359 pub fn fetch_enhanced_entities<T>(
360 &self,
361 query: &SelectQuery,
362 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
363 where
364 T: Entity,
365 {
366 let query = self
367 .prepare_select_query(query)
368 .map_err(RepositoryError::Runtime)?;
369 let _guard = crate::context::QueryCommentGuard::new(self.repository.metadata.context, query.comment.clone());
370
371 let mut rows = self.repository.fetch_all(&query)?;
372 self.enhance_query_relations(&mut rows, &query)?;
373 self.enhance_relations(&mut rows)?;
374 let root = self.repository.metadata.context.get_resource::<crate::EntityRoot>().cloned();
375
376 rows.into_iter()
377 .map(|record| {
378 let mut entity = T::from_record(record)?;
379 if let Some(ref root) = root {
380 entity.on_loaded(root as &dyn std::any::Any);
381 }
382 Ok(entity)
383 })
384 .collect::<Result<Vec<_>, _>>()
385 .map(SmartList::from)
386 .map_err(RepositoryError::Entity)
387 }
388
389 pub fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
390 let command = self
391 .prepare_insert_command(command)
392 .map_err(RepositoryError::Runtime)?;
393 self.execute_prepared_insert(command)
394 }
395
396 pub fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
397 let command = self
398 .prepare_update_command(command)
399 .map_err(RepositoryError::Runtime)?;
400 self.execute_prepared_update(command)
401 }
402
403 pub fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
404 self.delete_scoped(command, None)
405 }
406
407 pub fn delete_scoped(
408 &self,
409 command: &DeleteCommand,
410 comment_lineage: Option<String>,
411 ) -> Result<u64, RepositoryError<E::Error>> {
412 let mut command = command.clone();
413 if let Some(behavior) = self.behavior() {
414 behavior
415 .before_delete(self.repository.metadata.context, &mut command)
416 .map_err(RepositoryError::Runtime)?;
417 }
418 self.enforce_delete_policy(&mut command)
419 .map_err(RepositoryError::Runtime)?;
420 let resolved_lineage = self.format_lineage_comment(&command.id, comment_lineage);
421 let _guard = resolved_lineage.as_ref().map(|lineage| {
422 crate::context::QueryCommentGuard::new(
423 self.repository.metadata.context,
424 Some(lineage.clone()),
425 )
426 });
427 let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
428 let affected = self.repository.delete(&command)?;
429 let mut event = EntityEvent::deleted_with_old_values(
430 command.entity,
431 command.id,
432 command.expected_version,
433 old_values,
434 );
435 event.comment = resolved_lineage;
436 self.emit_event(event)
437 .map_err(RepositoryError::Runtime)?;
438 Ok(affected)
439 }
440
441 pub fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
442 let mut command = command.clone();
443 if let Some(behavior) = self.behavior() {
444 behavior
445 .before_recover(self.repository.metadata.context, &mut command)
446 .map_err(RepositoryError::Runtime)?;
447 }
448 self.enforce_recover_policy(&mut command)
449 .map_err(RepositoryError::Runtime)?;
450 let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
451 let affected = self.repository.recover(&command)?;
452 let resolved_lineage = self.format_lineage_comment(&command.id, None);
453 let mut event = EntityEvent::recovered_with_old_values(
454 command.entity,
455 command.id,
456 command.expected_version,
457 old_values,
458 );
459 event.comment = resolved_lineage;
460 self.emit_event(event)
461 .map_err(RepositoryError::Runtime)?;
462 Ok(affected)
463 }
464
465 fn emit_event(&self, event: EntityEvent) -> Result<(), RuntimeError> {
466 self.repository.metadata.context.send_event(event)
467 }
468
469 pub(super) fn execute_prepared_insert(
470 &self,
471 command: InsertCommand,
472 ) -> Result<u64, RepositoryError<E::Error>> {
473 self.execute_prepared_insert_with_comment(command, None)
474 }
475
476 pub(super) fn execute_prepared_insert_with_comment(
477 &self,
478 command: InsertCommand,
479 comment_lineage: Option<String>,
480 ) -> Result<u64, RepositoryError<E::Error>> {
481 let id_val = command.values.get("id").cloned().unwrap_or(Value::Null);
482 let resolved_lineage = self.format_lineage_comment(&id_val, comment_lineage);
483 let _guard = resolved_lineage.as_ref().map(|lineage| {
484 crate::context::QueryCommentGuard::new(
485 self.repository.metadata.context,
486 Some(lineage.clone()),
487 )
488 });
489 let affected = self.repository.insert(&command)?;
490 let mut event = EntityEvent::created(command.entity, command.values);
491 event.comment = resolved_lineage;
492 self.emit_event(event).map_err(RepositoryError::Runtime)?;
493 Ok(affected)
494 }
495
496 pub(super) fn execute_prepared_update(
497 &self,
498 command: UpdateCommand,
499 ) -> Result<u64, RepositoryError<E::Error>> {
500 self.execute_prepared_update_with_comment(command, None)
501 }
502
503 pub(super) fn execute_prepared_update_with_comment(
504 &self,
505 command: UpdateCommand,
506 comment_lineage: Option<String>,
507 ) -> Result<u64, RepositoryError<E::Error>> {
508 let resolved_lineage = self.format_lineage_comment(&command.id, comment_lineage);
509 let _guard = resolved_lineage.as_ref().map(|lineage| {
510 crate::context::QueryCommentGuard::new(
511 self.repository.metadata.context,
512 Some(lineage.clone()),
513 )
514 });
515 let old_values = self.fetch_current_event_row(&command.entity, &command.id)?;
516 let affected = self.repository.update(&command)?;
517 let updated_fields = command.values.keys().cloned().collect();
518 let mut values = command.values.clone();
519 values.insert("id".to_owned(), command.id.clone());
520 if let Some(version) = command.expected_version {
521 values.insert("version".to_owned(), Value::I64(version + 1));
522 }
523 let mut new_values = old_values.clone().unwrap_or_default();
524 for (field, value) in &values {
525 new_values.insert(field.clone(), value.clone());
526 }
527 let mut event = EntityEvent::updated_with_old_values(
528 command.entity,
529 values,
530 old_values,
531 new_values,
532 updated_fields,
533 );
534 event.comment = resolved_lineage;
535 self.emit_event(event).map_err(RepositoryError::Runtime)?;
536 Ok(affected)
537 }
538
539 fn fetch_current_event_row(
540 &self,
541 entity: &str,
542 id: &Value,
543 ) -> Result<Option<Record>, RepositoryError<E::Error>> {
544 if self.repository.metadata.context.event_sink.is_none() {
545 return Ok(None);
546 }
547 let descriptor = self
548 .repository
549 .metadata
550 .context
551 .require_entity(entity)
552 .map_err(RepositoryError::Runtime)?;
553 let Some(id_property) = descriptor.id_property() else {
554 return Ok(None);
555 };
556 let mut rows = self.fetch_all(
557 &SelectQuery::new(entity).filter(Expr::eq(id_property.name.clone(), id.clone())),
558 )?;
559 Ok(rows.pop())
560 }
561
562 fn format_lineage_comment(
563 &self,
564 id: &Value,
565 comment_lineage: Option<String>,
566 ) -> Option<String> {
567 let raw_comment = comment_lineage.or_else(|| {
568 self.repository.metadata.context.comment_stack.lock().ok().and_then(|stack| {
569 if stack.is_empty() {
570 None
571 } else {
572 Some(stack.join(" -> "))
573 }
574 })
575 })?;
576
577 if raw_comment.is_empty() {
578 return None;
579 }
580
581 let id_str = match id {
582 Value::U64(n) => n.to_string(),
583 Value::I64(n) => n.to_string(),
584 Value::Text(s) => s.clone(),
585 other => format!("{other:?}"),
586 };
587 let prefix = format!("{}({}):", self.entity, id_str);
588 if raw_comment.starts_with(&prefix) || raw_comment.contains(&format!(" -> {}({}):", self.entity, id_str)) {
589 Some(raw_comment)
590 } else {
591 Some(format!("{}({}): {}", self.entity, id_str, raw_comment))
592 }
593 }
594
595 pub(super) fn scoped_repository(&self, entity: String) -> ResolvedRepository<'a, D, E> {
596 ResolvedRepository {
597 entity,
598 repository: ContextRepository {
599 metadata: UserContextMetadata {
600 context: self.repository.metadata.context,
601 },
602 dialect: self.repository.dialect,
603 executor: self.repository.executor,
604 },
605 }
606 }
607}