teaql_runtime/repository/
context.rs1use std::sync::Arc;
2
3use teaql_core::{
4 DeleteCommand, Entity, InsertCommand, Record, RecoverCommand, SelectQuery, SmartList,
5 UpdateCommand,
6};
7
8use crate::{
9 ContextError, GraphMutationPlan, GraphNode, RepositoryError, RuntimeError,
10 UserContext,
11};
12
13use super::{
14 AggregationCacheBackend, ContextRepository, InMemoryAggregationCache,
15 Repository, ResolvedRepository, UserContextMetadata,
16 helpers::invalidate_aggregation_cache_namespace,
17};
18
19impl UserContext {
20 pub fn repository<E>(&self) -> Result<ContextRepository<'_, E>, ContextError>
21 where
22 E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
23 {
24 if self.metadata.is_none() {
25 return Err(ContextError::MissingResource("metadata".to_owned()));
26 }
27
28 let executor = self.require_resource::<E>()?;
29 Ok(ContextRepository {
30 metadata: UserContextMetadata { context: self },
31 executor,
32 })
33 }
34
35 #[doc(hidden)]
36 pub fn resolve_repository<E>(
37 &self,
38 entity: impl Into<String>,
39 ) -> Result<ResolvedRepository<'_, E>, ContextError>
40 where
41 E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
42 {
43 let entity = entity.into();
44 if !self.has_repository(&entity) {
45 return Err(ContextError::MissingRepository(entity));
46 }
47 Ok(ResolvedRepository {
48 entity,
49 repository: self.repository::<E>()?,
50 trace_context: Vec::new(),
51 })
52 }
53
54
55}
56
57impl<'a, E> ContextRepository<'a, E>
58where
59 E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
60{
61 fn repository(&self) -> Repository<'_, UserContextMetadata<'_>, E> {
62 Repository::new(&self.metadata, self.executor)
63 }
64
65 pub async fn fetch_all(&self, mut query: SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
66 let final_comment = self.resolve_final_comment(&query.trace_chain, query.comment.clone());
67 query.comment = final_comment;
68 self.repository().fetch_all(&query).await
69 }
70
71 pub async fn fetch_smart_list(
72 &self,
73 query: &SelectQuery,
74 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
75 self.repository().fetch_smart_list(query).await
76 }
77
78 pub async fn fetch_entities<T>(
79 &self,
80 query: &SelectQuery,
81 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
82 where
83 T: Entity,
84 {
85 self.repository().fetch_entities(query).await
86 }
87
88 pub async fn fetch_enhanced_entities<T>(
89 &self,
90 query: &SelectQuery,
91 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
92 where
93 T: Entity,
94 {
95 self.repository().fetch_enhanced_entities(query).await
96 }
97
98 pub async fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
99 let affected = self.repository().insert(command).await?;
100 self.invalidate_aggregation_cache_for(&command.entity);
101 Ok(affected)
102 }
103
104 pub async fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
105 let affected = self.repository().update(command).await?;
106 self.invalidate_aggregation_cache_for(&command.entity);
107 Ok(affected)
108 }
109
110 pub async fn batch_insert(&self, command: &teaql_core::BatchInsertCommand) -> Result<u64, RepositoryError<E::Error>> {
111 let affected = self.repository().batch_insert(command).await?;
112 self.invalidate_aggregation_cache_for(&command.entity);
113 Ok(affected)
114 }
115
116 pub async fn batch_update(&self, command: &teaql_core::BatchUpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
117 let affected = self.repository().batch_update(command).await?;
118 self.invalidate_aggregation_cache_for(&command.entity);
119 Ok(affected)
120 }
121
122 pub async fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
123 let affected = self.repository().delete(command).await?;
124 self.invalidate_aggregation_cache_for(&command.entity);
125 Ok(affected)
126 }
127
128 pub async fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
129 let affected = self.repository().recover(command).await?;
130 self.invalidate_aggregation_cache_for(&command.entity);
131 Ok(affected)
132 }
133
134 pub(super) fn invalidate_aggregation_cache_for(&self, entity: &str) {
135 if let Some(cache) = self
136 .metadata
137 .context
138 .get_resource::<Arc<dyn AggregationCacheBackend>>()
139 {
140 invalidate_aggregation_cache_namespace(cache.as_ref(), entity);
141 }
142 if let Some(cache) = self
143 .metadata
144 .context
145 .get_resource::<InMemoryAggregationCache>()
146 {
147 invalidate_aggregation_cache_namespace(cache, entity);
148 }
149 }
150
151 pub(crate) fn resolve_final_comment(&self, trace_chain: &[teaql_core::TraceNode], comment: Option<String>) -> Option<String> {
152 let chain_str = if trace_chain.is_empty() {
153 None
154 } else {
155 let formatted = trace_chain.iter().map(|n| {
156 format!("{}({}): {}", n.entity_type, n.entity_id.map(|id| id.to_string()).unwrap_or_else(|| "pending".to_owned()), n.comment)
157 }).collect::<Vec<_>>().join(" -> ");
158 Some(formatted)
159 };
160
161 let business_comment = chain_str.or(comment);
162 let user_id = self.metadata.context.user_identifier().map(|s| s.to_owned());
163
164 match (user_id, business_comment) {
165 (Some(user), Some(bus)) if !user.is_empty() && !bus.is_empty() => {
166 Some(format!("[{user}] {bus}"))
167 }
168 (Some(user), _) if !user.is_empty() => {
169 Some(format!("[{user}]"))
170 }
171 (_, Some(bus)) if !bus.is_empty() => {
172 Some(bus)
173 }
174 _ => None,
175 }
176 }
177}