teaql_runtime/repository/
context.rs1use std::sync::Arc;
2
3use teaql_core::{
4 DeleteCommand, Entity, InsertCommand, Record, RecoverCommand, SelectQuery, SmartList,
5 UpdateCommand,
6};
7
8use crate::{
9 ContextError, GraphMutationPlan, GraphNode, RepositoryError, RuntimeError,
10 UserContext,
11};
12
13use super::{
14 AggregationCacheBackend, ContextRepository, InMemoryAggregationCache,
15 Repository, ResolvedRepository, UserContextMetadata,
16 helpers::invalidate_aggregation_cache_namespace,
17};
18
19impl UserContext {
20 pub fn repository<E>(&self) -> Result<ContextRepository<'_, E>, ContextError>
21 where
22 E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
23 {
24 if self.metadata.is_none() {
25 return Err(ContextError::MissingResource("metadata".to_owned()));
26 }
27
28 let executor = self.require_resource::<E>()?;
29 Ok(ContextRepository {
30 metadata: UserContextMetadata { context: self },
31 executor,
32 })
33 }
34
35 #[doc(hidden)]
36 pub fn resolve_repository<E>(
37 &self,
38 entity: impl Into<String>,
39 ) -> Result<ResolvedRepository<'_, E>, ContextError>
40 where
41 E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
42 {
43 let entity = entity.into();
44 if !self.has_repository(&entity) {
45 return Err(ContextError::MissingRepository(entity));
46 }
47 Ok(ResolvedRepository {
48 entity,
49 repository: self.repository::<E>()?,
50 trace_context: Vec::new(),
51 })
52 }
53
54 pub async fn plan_for_save_graph<E>(
55 &self,
56 node: GraphNode,
57 ) -> Result<GraphMutationPlan, RepositoryError<E::Error>>
58 where
59 E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
60 {
61 let repository = self
62 .resolve_repository::<E>(node.entity.clone())
63 .map_err(|err| RepositoryError::Runtime(RuntimeError::Graph(err.to_string())))?;
64 repository.plan_graph(node).await
65 }
66}
67
68impl<'a, E> ContextRepository<'a, E>
69where
70 E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
71{
72 fn repository(&self) -> Repository<'_, UserContextMetadata<'_>, E> {
73 Repository::new(&self.metadata, self.executor)
74 }
75
76 pub async fn fetch_all(&self, mut query: SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
77 let final_comment = self.resolve_final_comment(&query.trace_chain, query.comment.clone());
78 query.comment = final_comment;
79 self.repository().fetch_all(&query).await
80 }
81
82 pub async fn fetch_smart_list(
83 &self,
84 query: &SelectQuery,
85 ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
86 self.repository().fetch_smart_list(query).await
87 }
88
89 pub async fn fetch_entities<T>(
90 &self,
91 query: &SelectQuery,
92 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
93 where
94 T: Entity,
95 {
96 self.repository().fetch_entities(query).await
97 }
98
99 pub async fn fetch_enhanced_entities<T>(
100 &self,
101 query: &SelectQuery,
102 ) -> Result<SmartList<T>, RepositoryError<E::Error>>
103 where
104 T: Entity,
105 {
106 self.repository().fetch_enhanced_entities(query).await
107 }
108
109 pub async fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
110 let affected = self.repository().insert(command).await?;
111 self.invalidate_aggregation_cache_for(&command.entity);
112 Ok(affected)
113 }
114
115 pub async fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
116 let affected = self.repository().update(command).await?;
117 self.invalidate_aggregation_cache_for(&command.entity);
118 Ok(affected)
119 }
120
121 pub async fn batch_insert(&self, command: &teaql_core::BatchInsertCommand) -> Result<u64, RepositoryError<E::Error>> {
122 let affected = self.repository().batch_insert(command).await?;
123 self.invalidate_aggregation_cache_for(&command.entity);
124 Ok(affected)
125 }
126
127 pub async fn batch_update(&self, command: &teaql_core::BatchUpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
128 let affected = self.repository().batch_update(command).await?;
129 self.invalidate_aggregation_cache_for(&command.entity);
130 Ok(affected)
131 }
132
133 pub async fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
134 let affected = self.repository().delete(command).await?;
135 self.invalidate_aggregation_cache_for(&command.entity);
136 Ok(affected)
137 }
138
139 pub async fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
140 let affected = self.repository().recover(command).await?;
141 self.invalidate_aggregation_cache_for(&command.entity);
142 Ok(affected)
143 }
144
145 pub(super) fn invalidate_aggregation_cache_for(&self, entity: &str) {
146 if let Some(cache) = self
147 .metadata
148 .context
149 .get_resource::<Arc<dyn AggregationCacheBackend>>()
150 {
151 invalidate_aggregation_cache_namespace(cache.as_ref(), entity);
152 }
153 if let Some(cache) = self
154 .metadata
155 .context
156 .get_resource::<InMemoryAggregationCache>()
157 {
158 invalidate_aggregation_cache_namespace(cache, entity);
159 }
160 }
161
162 pub(crate) fn resolve_final_comment(&self, trace_chain: &[teaql_core::TraceNode], comment: Option<String>) -> Option<String> {
163 let chain_str = if trace_chain.is_empty() {
164 None
165 } else {
166 let formatted = trace_chain.iter().map(|n| {
167 format!("{}({}): {}", n.entity_type, n.entity_id.map(|id| id.to_string()).unwrap_or_else(|| "pending".to_owned()), n.comment)
168 }).collect::<Vec<_>>().join(" -> ");
169 Some(formatted)
170 };
171
172 let business_comment = chain_str.or(comment);
173 let user_id = self.metadata.context.user_identifier().map(|s| s.to_owned());
174
175 match (user_id, business_comment) {
176 (Some(user), Some(bus)) if !user.is_empty() && !bus.is_empty() => {
177 Some(format!("[{user}] {bus}"))
178 }
179 (Some(user), _) if !user.is_empty() => {
180 Some(format!("[{user}]"))
181 }
182 (_, Some(bus)) if !bus.is_empty() => {
183 Some(bus)
184 }
185 _ => None,
186 }
187 }
188}