Skip to main content

teaql_runtime/repository/
base.rs

1use teaql_core::{
2    BatchInsertCommand, BatchUpdateCommand, DeleteCommand, Entity, InsertCommand, Record,
3    RecoverCommand, SelectQuery, SmartList, UpdateCommand,
4};
5use teaql_data_service::{MutationRequest, QueryRequest};
6
7use crate::{MetadataStore, RepositoryError, RuntimeError};
8
9use super::Repository;
10
11impl<'a, M, E> Repository<'a, M, E>
12where
13    M: MetadataStore,
14    E: teaql_data_service::QueryExecutor + teaql_data_service::MutationExecutor,
15{
16    pub fn new(metadata: &'a M, executor: &'a E) -> Self {
17        Self {
18            metadata,
19            executor,
20        }
21    }
22
23    pub async fn fetch_all(&self, query: &SelectQuery) -> Result<Vec<Record>, RepositoryError<E::Error>> {
24        let request = QueryRequest {
25            query: query.clone(),
26            trace_chain: query.trace_chain.clone(),
27            comment: query.comment.clone(),
28        };
29        let res = self.executor.query(request).await.map_err(RepositoryError::Executor)?;
30        Ok(res.rows)
31    }
32
33    pub async fn fetch_smart_list(
34        &self,
35        query: &SelectQuery,
36    ) -> Result<SmartList<Record>, RepositoryError<E::Error>> {
37        let request = QueryRequest {
38            query: query.clone(),
39            trace_chain: query.trace_chain.clone(),
40            comment: query.comment.clone(),
41        };
42        let res = self.executor.query(request).await.map_err(RepositoryError::Executor)?;
43        self.metadata.record_metadata_log(&res.metadata);
44        Ok(SmartList::from(res.rows))
45    }
46
47    pub async fn fetch_entities<T>(
48        &self,
49        query: &SelectQuery,
50    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
51    where
52        T: Entity,
53    {
54        self.fetch_all(query).await?
55            .into_iter()
56            .map(T::from_record)
57            .collect::<Result<Vec<_>, _>>()
58            .map(SmartList::from)
59            .map_err(RepositoryError::Entity)
60    }
61
62    pub async fn fetch_enhanced_entities<T>(
63        &self,
64        query: &SelectQuery,
65    ) -> Result<SmartList<T>, RepositoryError<E::Error>>
66    where
67        T: Entity,
68    {
69        self.fetch_entities(query).await
70    }
71
72    pub async fn insert(&self, command: &InsertCommand) -> Result<u64, RepositoryError<E::Error>> {
73        let request = MutationRequest::Insert(command.clone());
74        let res = self.executor.mutate(request).await.map_err(RepositoryError::Executor)?;
75        self.metadata.record_metadata_log(&res.metadata);
76        Ok(res.affected_rows)
77    }
78
79    pub async fn update(&self, command: &UpdateCommand) -> Result<u64, RepositoryError<E::Error>> {
80        let request = MutationRequest::Update(command.clone());
81        let res = self.executor.mutate(request).await.map_err(RepositoryError::Executor)?;
82        self.metadata.record_metadata_log(&res.metadata);
83        let affected = res.affected_rows;
84
85        if command.expected_version.is_some() && affected == 0 {
86            println!("OptimisticLockConflict in base.rs update! entity={}, id={:?}", command.entity, command.id);
87            println!("Backtrace: {:#?}", std::backtrace::Backtrace::force_capture());
88            return Err(RepositoryError::Runtime(
89                RuntimeError::OptimisticLockConflict {
90                    entity: command.entity.clone(),
91                    id: format!("{:?}", command.id),
92                },
93            ));
94        }
95
96        Ok(affected)
97    }
98
99    pub async fn delete(&self, command: &DeleteCommand) -> Result<u64, RepositoryError<E::Error>> {
100        let request = MutationRequest::Delete(command.clone());
101        let res = self.executor.mutate(request).await.map_err(RepositoryError::Executor)?;
102        self.metadata.record_metadata_log(&res.metadata);
103        let affected = res.affected_rows;
104
105        if command.expected_version.is_some() && affected == 0 {
106            return Err(RepositoryError::Runtime(
107                RuntimeError::OptimisticLockConflict {
108                    entity: command.entity.clone(),
109                    id: format!("{:?}", command.id),
110                },
111            ));
112        }
113
114        Ok(affected)
115    }
116
117    pub async fn batch_insert(
118        &self,
119        command: &teaql_core::BatchInsertCommand,
120    ) -> Result<u64, RepositoryError<E::Error>> {
121        // Build individual InsertCommands for now, or use BatchMutation if appropriate
122        let mut affected = 0;
123        for (i, val) in command.batch_values.iter().enumerate() {
124            let mut insert_cmd = InsertCommand::new(command.entity.clone());
125            insert_cmd.values = val.clone();
126            if i < command.trace_chains.len() {
127                insert_cmd.trace_chain = command.trace_chains[i].clone();
128            }
129            let res = self.executor.mutate(MutationRequest::Insert(insert_cmd)).await.map_err(RepositoryError::Executor)?;
130            self.metadata.record_metadata_log(&res.metadata);
131            affected += res.affected_rows;
132        }
133        Ok(affected)
134    }
135
136    pub async fn batch_update(
137        &self,
138        command: &teaql_core::BatchUpdateCommand,
139    ) -> Result<u64, RepositoryError<E::Error>> {
140        let mut affected = 0;
141        for (i, val) in command.batch_values.iter().enumerate() {
142            let mut update_cmd = UpdateCommand::new(command.entity.clone(), command.batch_ids[i].clone());
143            
144            let mut filtered_values = Record::new();
145            for field in &command.update_fields {
146                if let Some(v) = val.get(field) {
147                    filtered_values.insert(field.clone(), v.clone());
148                }
149            }
150            update_cmd.values = filtered_values;
151            if let Some(Some(v)) = command.batch_expected_versions.get(i) {
152                update_cmd.expected_version = Some(*v);
153            }
154            if let Some(old) = command.batch_old_values.get(i) {
155                update_cmd.old_values = old.clone();
156            }
157            if i < command.trace_chains.len() {
158                update_cmd.trace_chain = command.trace_chains[i].clone();
159            }
160            let res = self.executor.mutate(MutationRequest::Update(update_cmd)).await.map_err(RepositoryError::Executor)?;
161            self.metadata.record_metadata_log(&res.metadata);
162            affected += res.affected_rows;
163        }
164
165        if command.batch_expected_versions.iter().any(|v| v.is_some()) {
166            if affected != command.batch_ids.len() as u64 {
167                println!("OptimisticLockConflict in batch_update! entity={}, affected={}, expected={}", command.entity, affected, command.batch_ids.len());
168                return Err(RepositoryError::Runtime(
169                    RuntimeError::OptimisticLockConflict {
170                        entity: command.entity.clone(),
171                        id: "BATCH".to_owned(),
172                    },
173                ));
174            }
175        }
176
177        Ok(affected)
178    }
179
180    pub async fn recover(&self, command: &RecoverCommand) -> Result<u64, RepositoryError<E::Error>> {
181        let request = MutationRequest::Recover(command.clone());
182        let res = self.executor.mutate(request).await.map_err(RepositoryError::Executor)?;
183        self.metadata.record_metadata_log(&res.metadata);
184        let affected = res.affected_rows;
185
186        if affected == 0 {
187            return Err(RepositoryError::Runtime(
188                RuntimeError::OptimisticLockConflict {
189                    entity: command.entity.clone(),
190                    id: format!("{:?}", command.id),
191                },
192            ));
193        }
194
195        Ok(affected)
196    }
197
198    pub async fn insert_many(
199        &self,
200        commands: &[InsertCommand],
201    ) -> Result<u64, RepositoryError<E::Error>> {
202        let mut total = 0;
203        for command in commands {
204            total += self.insert(command).await?;
205        }
206        Ok(total)
207    }
208
209    pub async fn update_many(
210        &self,
211        commands: &[UpdateCommand],
212    ) -> Result<u64, RepositoryError<E::Error>> {
213        let mut total = 0;
214        for command in commands {
215            total += self.update(command).await?;
216        }
217        Ok(total)
218    }
219
220    pub async fn delete_many(
221        &self,
222        commands: &[DeleteCommand],
223    ) -> Result<u64, RepositoryError<E::Error>> {
224        let mut total = 0;
225        for command in commands {
226            total += self.delete(command).await?;
227        }
228        Ok(total)
229    }
230
231    pub async fn recover_many(
232        &self,
233        commands: &[RecoverCommand],
234    ) -> Result<u64, RepositoryError<E::Error>> {
235        let mut total = 0;
236        for command in commands {
237            total += self.recover(command).await?;
238        }
239        Ok(total)
240    }
241}