Skip to main content

teaql_runtime/repository/
context.rs

1use std::sync::Arc;
2
3use teaql_core::{
4    DeleteCommand, Entity, InsertCommand, Record, RecoverCommand, SelectQuery, SmartList,
5    UpdateCommand,
6};
7
8use crate::{
9    ContextError, GraphMutationPlan, GraphNode, RepositoryError, RuntimeError,
10    UserContext,
11};
12
13use super::{
14    AggregationCacheBackend, ContextRepository, InMemoryAggregationCache,
15    Repository, ResolvedRepository, UserContextMetadata,
16    helpers::invalidate_aggregation_cache_namespace,
17};
18
19impl UserContext {
20    pub fn repository<E>(&self) -> Result<ContextRepository<'_, E>, ContextError>
21    where
22        E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
23    {
24        if self.metadata.is_none() {
25            return Err(ContextError::MissingResource("metadata".to_owned()));
26        }
27
28        let executor = self.require_resource::<E>()?;
29        Ok(ContextRepository {
30            metadata: UserContextMetadata { context: self },
31            executor,
32        })
33    }
34
35    #[doc(hidden)]
36    pub fn resolve_repository<E>(
37        &self,
38        entity: impl Into<String>,
39    ) -> Result<ResolvedRepository<'_, E>, ContextError>
40    where
41        E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
42    {
43        let entity = entity.into();
44        if !self.has_repository(&entity) {
45            return Err(ContextError::MissingRepository(entity));
46        }
47        Ok(ResolvedRepository {
48            entity,
49            repository: self.repository::<E>()?,
50            trace_context: Vec::new(),
51        })
52    }
53
54
55}
56
57impl<'a, E> ContextRepository<'a, E>
58where
59    E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor + Send + Sync + 'static,
60{
61    fn repository(&self) -> Repository<'_, UserContextMetadata<'_>, E> {
62        Repository::new(&self.metadata, self.executor)
63    }
64
65    pub async fn fetch_all(&self, mut query: SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
66        let final_comment = self.resolve_final_comment(&query.trace_chain, query.comment.clone());
67        query.comment = final_comment;
68        self.repository().fetch_all(&query).await
69    }
70
71    pub async fn fetch_smart_list(
72        &self,
73        query: &SelectQuery,
74    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
75        self.repository().fetch_smart_list(query).await
76    }
77
78    pub async fn fetch_entities<T>(
79        &self,
80        query: &SelectQuery,
81    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
82    where
83        T: Entity,
84    {
85        self.repository().fetch_entities(query).await
86    }
87
88    pub async fn fetch_enhanced_entities<T>(
89        &self,
90        query: &SelectQuery,
91    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
92    where
93        T: Entity,
94    {
95        self.repository().fetch_enhanced_entities(query).await
96    }
97
98    pub async fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
99        let affected = self.repository().insert(command).await?;
100        self.invalidate_aggregation_cache_for(&command.entity);
101        Ok(affected)
102    }
103
104    pub async fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
105        let affected = self.repository().update(command).await?;
106        self.invalidate_aggregation_cache_for(&command.entity);
107        Ok(affected)
108    }
109
110    pub async fn batch_insert(&self, command: &teaql_core::BatchInsertCommand) -> Result<u64, RepositoryError<E::Error>> {
111        let affected = self.repository().batch_insert(command).await?;
112        self.invalidate_aggregation_cache_for(&command.entity);
113        Ok(affected)
114    }
115
116    pub async fn batch_update(&self, command: &teaql_core::BatchUpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
117        let affected = self.repository().batch_update(command).await?;
118        self.invalidate_aggregation_cache_for(&command.entity);
119        Ok(affected)
120    }
121
122    pub async fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
123        let affected = self.repository().delete(command).await?;
124        self.invalidate_aggregation_cache_for(&command.entity);
125        Ok(affected)
126    }
127
128    pub async fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
129        let affected = self.repository().recover(command).await?;
130        self.invalidate_aggregation_cache_for(&command.entity);
131        Ok(affected)
132    }
133
134    pub(super) fn invalidate_aggregation_cache_for(&self, entity: &str) {
135        if let Some(cache) = self
136            .metadata
137            .context
138            .get_resource::<Arc<dyn AggregationCacheBackend>>()
139        {
140            invalidate_aggregation_cache_namespace(cache.as_ref(), entity);
141        }
142        if let Some(cache) = self
143            .metadata
144            .context
145            .get_resource::<InMemoryAggregationCache>()
146        {
147            invalidate_aggregation_cache_namespace(cache, entity);
148        }
149    }
150
151    pub(crate) fn resolve_final_comment(&self, trace_chain: &[teaql_core::TraceNode], comment: Option<String>) -> Option<String> {
152        let chain_str = if trace_chain.is_empty() {
153            None
154        } else {
155            let formatted = trace_chain.iter().map(|n| {
156                format!("{}({}): {}", n.entity_type, n.entity_id.map(|id| id.to_string()).unwrap_or_else(|| "pending".to_owned()), n.comment)
157            }).collect::<Vec<_>>().join(" -> ");
158            Some(formatted)
159        };
160
161        let business_comment = chain_str.or(comment);
162        let user_id = self.metadata.context.user_identifier().map(|s| s.to_owned());
163
164        match (user_id, business_comment) {
165            (Some(user), Some(bus)) if !user.is_empty() && !bus.is_empty() => {
166                Some(format!("[{user}] {bus}"))
167            }
168            (Some(user), _) if !user.is_empty() => {
169                Some(format!("[{user}]"))
170            }
171            (_, Some(bus)) if !bus.is_empty() => {
172                Some(bus)
173            }
174            _ => None,
175        }
176    }
177}