1use crate::{
2 IndexSpec,
3 db::{
4 Db,
5 executor::{ExecutorError, SaveExecutor, resolve_unique_pk},
6 store::DataKey,
7 },
8 deserialize,
9 runtime_error::RuntimeError,
10 sanitize,
11 traits::{EntityKind, FromKey},
12};
13use std::marker::PhantomData;
14
15#[derive(Clone, Copy)]
21pub struct UniqueIndexHandle {
22 index: &'static IndexSpec,
23}
24
25impl UniqueIndexHandle {
26 #[must_use]
27 pub const fn index(&self) -> &'static IndexSpec {
29 self.index
30 }
31
32 pub fn new<E: EntityKind>(index: &'static IndexSpec) -> Result<Self, RuntimeError> {
34 if !E::INDEXES.iter().any(|cand| **cand == *index) {
35 return Err(
36 ExecutorError::IndexNotFound(E::PATH.to_string(), index.fields.join(", ")).into(),
37 );
38 }
39
40 if !index.unique {
41 return Err(ExecutorError::IndexNotUnique(
42 E::PATH.to_string(),
43 index.fields.join(", "),
44 )
45 .into());
46 }
47
48 Ok(Self { index })
49 }
50
51 pub fn for_fields<E: EntityKind>(fields: &[&str]) -> Result<Self, RuntimeError> {
53 for index in E::INDEXES {
54 if index.fields == fields {
55 return Self::new::<E>(index);
56 }
57 }
58
59 Err(ExecutorError::IndexNotFound(E::PATH.to_string(), fields.join(", ")).into())
60 }
61}
62
63pub struct UpsertResult<E> {
69 pub entity: E,
70 pub inserted: bool,
71}
72
73#[derive(Clone, Copy)]
78pub struct UpsertExecutor<E: EntityKind> {
79 db: Db<E::Canister>,
80 debug: bool,
81 _marker: PhantomData<E>,
82}
83
84impl<E: EntityKind> UpsertExecutor<E>
85where
86 E::PrimaryKey: FromKey,
87{
88 #[must_use]
90 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
91 Self {
92 db,
93 debug,
94 _marker: PhantomData,
95 }
96 }
97
98 #[must_use]
100 pub const fn debug(mut self) -> Self {
101 self.debug = true;
102 self
103 }
104
105 pub fn by_unique_index(&self, index: UniqueIndexHandle, entity: E) -> Result<E, RuntimeError> {
107 self.upsert(index.index(), entity)
108 }
109
110 pub fn by_unique_index_merge<F>(
112 &self,
113 index: UniqueIndexHandle,
114 entity: E,
115 merge: F,
116 ) -> Result<E, RuntimeError>
117 where
118 F: FnOnce(E, E) -> E,
119 {
120 Ok(self
121 .by_unique_index_merge_result(index, entity, merge)?
122 .entity)
123 }
124
125 pub fn by_unique_index_merge_result<F>(
127 &self,
128 index: UniqueIndexHandle,
129 entity: E,
130 merge: F,
131 ) -> Result<UpsertResult<E>, RuntimeError>
132 where
133 F: FnOnce(E, E) -> E,
134 {
135 self.upsert_merge_result(index.index(), entity, merge)
136 }
137
138 pub fn by_unique_index_result(
140 &self,
141 index: UniqueIndexHandle,
142 entity: E,
143 ) -> Result<UpsertResult<E>, RuntimeError> {
144 self.upsert_result(index.index(), entity)
145 }
146
147 pub fn by_unique_fields(&self, fields: &[&str], entity: E) -> Result<E, RuntimeError> {
149 let index = UniqueIndexHandle::for_fields::<E>(fields)?;
150 self.upsert(index.index(), entity)
151 }
152
153 pub fn by_unique_fields_merge<F>(
155 &self,
156 fields: &[&str],
157 entity: E,
158 merge: F,
159 ) -> Result<E, RuntimeError>
160 where
161 F: FnOnce(E, E) -> E,
162 {
163 Ok(self
164 .by_unique_fields_merge_result(fields, entity, merge)?
165 .entity)
166 }
167
168 pub fn by_unique_fields_merge_result<F>(
170 &self,
171 fields: &[&str],
172 entity: E,
173 merge: F,
174 ) -> Result<UpsertResult<E>, RuntimeError>
175 where
176 F: FnOnce(E, E) -> E,
177 {
178 let index = UniqueIndexHandle::for_fields::<E>(fields)?;
179 self.upsert_merge_result(index.index(), entity, merge)
180 }
181
182 pub fn by_unique_fields_result(
184 &self,
185 fields: &[&str],
186 entity: E,
187 ) -> Result<UpsertResult<E>, RuntimeError> {
188 let index = UniqueIndexHandle::for_fields::<E>(fields)?;
189 self.upsert_result(index.index(), entity)
190 }
191
192 fn resolve_existing_pk(
201 &self,
202 index: &'static IndexSpec,
203 entity: &E,
204 ) -> Result<Option<E::PrimaryKey>, RuntimeError> {
205 let mut lookup = entity.clone();
206 sanitize(&mut lookup)?;
207 resolve_unique_pk::<E>(&self.db, index, &lookup)
208 }
209
210 fn upsert(&self, index: &'static IndexSpec, entity: E) -> Result<E, RuntimeError> {
211 Ok(self.upsert_result(index, entity)?.entity)
212 }
213
214 fn upsert_result(
215 &self,
216 index: &'static IndexSpec,
217 entity: E,
218 ) -> Result<UpsertResult<E>, RuntimeError> {
219 let existing_pk = self.resolve_existing_pk(index, &entity)?;
220 let inserted = existing_pk.is_none();
221
222 let saver = SaveExecutor::new(self.db, self.debug);
224
225 let entity = match existing_pk {
226 Some(pk) => {
227 let mut entity = entity;
228 entity.set_primary_key(pk);
229 saver.update(entity)?
230 }
231 None => saver.insert(entity)?,
232 };
233
234 Ok(UpsertResult { entity, inserted })
235 }
236
237 fn upsert_merge_result<F>(
238 &self,
239 index: &'static IndexSpec,
240 entity: E,
241 merge: F,
242 ) -> Result<UpsertResult<E>, RuntimeError>
243 where
244 F: FnOnce(E, E) -> E,
245 {
246 let existing_pk = self.resolve_existing_pk(index, &entity)?;
247
248 let saver = SaveExecutor::new(self.db, self.debug);
250
251 let result = if let Some(pk) = existing_pk {
252 let existing = self.load_existing(index, pk)?;
254 let mut merged = merge(existing, entity);
255 merged.set_primary_key(pk);
256
257 let entity = saver.update(merged)?;
258 UpsertResult {
259 entity,
260 inserted: false,
261 }
262 } else {
263 let entity = saver.insert(entity)?;
264 UpsertResult {
265 entity,
266 inserted: true,
267 }
268 };
269
270 Ok(result)
271 }
272
273 fn load_existing(
274 &self,
275 index: &'static IndexSpec,
276 pk: E::PrimaryKey,
277 ) -> Result<E, RuntimeError> {
278 let data_key = DataKey::new::<E>(pk.into());
279 let bytes = self
280 .db
281 .context::<E>()
282 .with_store(|store| store.get(&data_key))?;
283
284 let Some(bytes) = bytes else {
285 return Err(ExecutorError::IndexCorrupted(
287 E::PATH.to_string(),
288 index.fields.join(", "),
289 1,
290 )
291 .into());
292 };
293
294 deserialize::<E>(&bytes)
295 }
296}