1use std::sync::Arc;
2use std::time::{Instant, SystemTime};
3
4use teaql_core::{
5 DeleteCommand, Entity, InsertCommand, Record, RecoverCommand, SelectQuery, SmartList,
6 UpdateCommand,
7};
8use teaql_sql::{CompiledQuery, SqlDialect};
9
10use crate::{
11 ContextError, GraphMutationPlan, GraphNode, RepositoryError, RuntimeError, SqlLogOperation,
12 UserContext,
13};
14
15use super::{
16 AggregationCacheBackend, ContextRepository, InMemoryAggregationCache, QueryExecutor,
17 Repository, ResolvedRepository, UserContextMetadata,
18 helpers::invalidate_aggregation_cache_namespace,
19};
20
21impl UserContext {
22 pub fn repository<D, E>(&self) -> Result<ContextRepository<'_, D, E>, ContextError>
23 where
24 D: SqlDialect + Send + Sync + 'static,
25 E: QueryExecutor + Send + Sync + 'static,
26 {
27 if self.metadata.is_none() {
28 return Err(ContextError::MissingResource("metadata".to_owned()));
29 }
30
31 let dialect = self.require_resource::<D>()?;
32 let executor = self.require_resource::<E>()?;
33 Ok(ContextRepository {
34 metadata: UserContextMetadata { context: self },
35 dialect,
36 executor,
37 })
38 }
39
40 pub fn resolve_repository<D, E>(
41 &self,
42 entity: impl Into<String>,
43 ) -> Result<ResolvedRepository<'_, D, E>, ContextError>
44 where
45 D: SqlDialect + Send + Sync + 'static,
46 E: QueryExecutor + Send + Sync + 'static,
47 {
48 let entity = entity.into();
49 if !self.has_repository(&entity) {
50 return Err(ContextError::MissingRepository(entity));
51 }
52 Ok(ResolvedRepository {
53 entity,
54 repository: self.repository::<D, E>()?,
55 })
56 }
57
58 pub fn plan_for_save_graph<D, E>(
59 &self,
60 node: GraphNode,
61 ) -> Result<GraphMutationPlan, RepositoryError<E::Error>>
62 where
63 D: SqlDialect + Send + Sync + 'static,
64 E: QueryExecutor + Send + Sync + 'static,
65 {
66 let repository = self
67 .resolve_repository::<D, E>(node.entity.clone())
68 .map_err(|err| RepositoryError::Runtime(RuntimeError::Graph(err.to_string())))?;
69 repository.plan_graph(node)
70 }
71}
72
73impl<'a, D, E> ContextRepository<'a, D, E>
74where
75 D: SqlDialect,
76 E: QueryExecutor,
77{
78 fn repository(&self) -> Repository<'_, D, UserContextMetadata<'_>, E> {
79 Repository::new(self.dialect, &self.metadata, self.executor)
80 }
81
82 pub fn compile(&self, query: &SelectQuery) -> Result<CompiledQuery, RuntimeError> {
83 self.repository().compile(query)
84 }
85
86 pub fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
87 let mut compiled = self.compile(query).map_err(RepositoryError::Runtime)?;
88 let final_comment = self.resolve_final_comment(query.comment.clone());
89 compiled.comment = final_comment;
90
91 let started_at = SystemTime::now();
92 let started = Instant::now();
93 let rows = self
94 .executor
95 .fetch_all(&compiled)
96 .map_err(RepositoryError::Executor)?;
97 self.log_sql_result(
98 SqlLogOperation::Select,
99 &compiled,
100 started_at,
101 started,
102 Some(rows.len()),
103 Some(query.entity.clone()),
104 None,
105 query.comment.clone(),
106 );
107 Ok(rows)
108 }
109
110 pub fn fetch_smart_list(
111 &self,
112 query: &SelectQuery,
113 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
114 self.repository().fetch_smart_list(query)
115 }
116
117 pub fn fetch_entities<T>(
118 &self,
119 query: &SelectQuery,
120 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
121 where
122 T: Entity,
123 {
124 self.repository().fetch_entities(query)
125 }
126
127 pub fn fetch_enhanced_entities<T>(
128 &self,
129 query: &SelectQuery,
130 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
131 where
132 T: Entity,
133 {
134 self.repository().fetch_enhanced_entities(query)
135 }
136
137 pub fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
138 let mut compiled = self
139 .repository()
140 .compile_insert(command)
141 .map_err(RepositoryError::Runtime)?;
142 let final_comment = self.resolve_final_comment(None);
143 compiled.comment = final_comment;
144
145 let started_at = SystemTime::now();
146 let started = Instant::now();
147 let affected = self
148 .executor
149 .execute(&compiled)
150 .map_err(RepositoryError::Executor)?;
151 self.log_sql_result(
152 SqlLogOperation::Insert,
153 &compiled,
154 started_at,
155 started,
156 None,
157 None,
158 Some(affected),
159 None,
160 );
161 self.invalidate_aggregation_cache_for(&command.entity);
162 Ok(affected)
163 }
164
165 pub fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
166 let affected = self.execute_mutation(
167 SqlLogOperation::Update,
168 &command.entity,
169 self.repository()
170 .compile_update(command)
171 .map_err(RepositoryError::Runtime)?,
172 )?;
173 if command.expected_version.is_some() && affected == 0 {
174 return Err(RepositoryError::Runtime(
175 RuntimeError::OptimisticLockConflict {
176 entity: command.entity.clone(),
177 id: format!("{:?}", command.id),
178 },
179 ));
180 }
181 Ok(affected)
182 }
183
184 pub fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
185 let affected = self.execute_mutation(
186 SqlLogOperation::Delete,
187 &command.entity,
188 self.repository()
189 .compile_delete(command)
190 .map_err(RepositoryError::Runtime)?,
191 )?;
192 if command.expected_version.is_some() && affected == 0 {
193 return Err(RepositoryError::Runtime(
194 RuntimeError::OptimisticLockConflict {
195 entity: command.entity.clone(),
196 id: format!("{:?}", command.id),
197 },
198 ));
199 }
200 Ok(affected)
201 }
202
203 pub fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
204 let affected = self.execute_mutation(
205 SqlLogOperation::Recover,
206 &command.entity,
207 self.repository()
208 .compile_recover(command)
209 .map_err(RepositoryError::Runtime)?,
210 )?;
211 if affected == 0 {
212 return Err(RepositoryError::Runtime(
213 RuntimeError::OptimisticLockConflict {
214 entity: command.entity.clone(),
215 id: format!("{:?}", command.id),
216 },
217 ));
218 }
219 Ok(affected)
220 }
221
222 fn execute_mutation(
223 &self,
224 operation: SqlLogOperation,
225 entity: &str,
226 mut compiled: CompiledQuery,
227 ) -> Result<u64, RepositoryError<E::Error>> {
228 let final_comment = self.resolve_final_comment(None);
229 compiled.comment = final_comment;
230
231 let started_at = SystemTime::now();
232 let started = Instant::now();
233 let affected = self
234 .executor
235 .execute(&compiled)
236 .map_err(RepositoryError::Executor)?;
237 self.log_sql_result(
238 operation,
239 &compiled,
240 started_at,
241 started,
242 None,
243 None,
244 Some(affected),
245 None,
246 );
247 self.invalidate_aggregation_cache_for(entity);
248 Ok(affected)
249 }
250
251 pub(super) fn log_sql_result(
252 &self,
253 operation: SqlLogOperation,
254 compiled: &CompiledQuery,
255 started_at: SystemTime,
256 started: Instant,
257 result_count: Option<usize>,
258 result_type: Option<String>,
259 affected_rows: Option<u64>,
260 comment: Option<String>,
261 ) {
262 self.metadata.context.record_sql_log(
263 operation,
264 compiled,
265 self.dialect.kind(),
266 started_at,
267 SystemTime::now(),
268 started.elapsed(),
269 result_count,
270 result_type,
271 affected_rows,
272 comment,
273 );
274 }
275
276 pub(super) fn invalidate_aggregation_cache_for(&self, entity: &str) {
277 if let Some(cache) = self
278 .metadata
279 .context
280 .get_resource::<Arc<dyn AggregationCacheBackend>>()
281 {
282 invalidate_aggregation_cache_namespace(cache.as_ref(), entity);
283 }
284 if let Some(cache) = self
285 .metadata
286 .context
287 .get_resource::<InMemoryAggregationCache>()
288 {
289 invalidate_aggregation_cache_namespace(cache, entity);
290 }
291 }
292
293 pub(crate) fn resolve_final_comment(&self, comment: Option<String>) -> Option<String> {
294 let stack_comment = self.metadata.context.comment_stack.lock().ok().and_then(|stack| {
295 if stack.is_empty() {
296 None
297 } else {
298 Some(stack.join("->"))
299 }
300 });
301 let business_comment = stack_comment.or(comment);
302 let user_id = self.metadata.context.user_identifier().map(|s| s.to_owned());
303
304 match (user_id, business_comment) {
305 (Some(user), Some(bus)) if !user.is_empty() && !bus.is_empty() => {
306 Some(format!("[{user}] {bus}"))
307 }
308 (Some(user), _) if !user.is_empty() => {
309 Some(format!("[{user}]"))
310 }
311 (_, Some(bus)) if !bus.is_empty() => {
312 Some(bus)
313 }
314 _ => None,
315 }
316 }
317}