icydb_core/db/executor/
upsert.rs

1use crate::{
2    IndexSpec,
3    db::{
4        Db,
5        executor::{ExecutorError, SaveExecutor, resolve_unique_pk},
6        store::DataKey,
7    },
8    deserialize,
9    runtime_error::RuntimeError,
10    sanitize,
11    traits::{EntityKind, FromKey},
12};
13use std::marker::PhantomData;
14
15///
16/// UniqueIndexHandle
17/// Validated handle to a unique index for an entity type.
18///
19
20#[derive(Clone, Copy)]
21pub struct UniqueIndexHandle {
22    index: &'static IndexSpec,
23}
24
25impl UniqueIndexHandle {
26    #[must_use]
27    /// Return the underlying index specification.
28    pub const fn index(&self) -> &'static IndexSpec {
29        self.index
30    }
31
32    /// Wrap a unique index for the given entity type.
33    pub fn new<E: EntityKind>(index: &'static IndexSpec) -> Result<Self, RuntimeError> {
34        if !E::INDEXES.iter().any(|cand| **cand == *index) {
35            return Err(
36                ExecutorError::IndexNotFound(E::PATH.to_string(), index.fields.join(", ")).into(),
37            );
38        }
39
40        if !index.unique {
41            return Err(ExecutorError::IndexNotUnique(
42                E::PATH.to_string(),
43                index.fields.join(", "),
44            )
45            .into());
46        }
47
48        Ok(Self { index })
49    }
50
51    /// Resolve a unique index by its field list for the given entity type.
52    pub fn for_fields<E: EntityKind>(fields: &[&str]) -> Result<Self, RuntimeError> {
53        for index in E::INDEXES {
54            if index.fields == fields {
55                return Self::new::<E>(index);
56            }
57        }
58
59        Err(ExecutorError::IndexNotFound(E::PATH.to_string(), fields.join(", ")).into())
60    }
61}
62
63///
64/// UpsertResult
65///
66
67/// Result of an upsert that reports whether the entity was inserted.
68pub struct UpsertResult<E> {
69    pub entity: E,
70    pub inserted: bool,
71}
72
73///
74/// UpsertExecutor
75///
76
77#[derive(Clone, Copy)]
78pub struct UpsertExecutor<E: EntityKind> {
79    db: Db<E::Canister>,
80    debug: bool,
81    _marker: PhantomData<E>,
82}
83
84impl<E: EntityKind> UpsertExecutor<E>
85where
86    E::PrimaryKey: FromKey,
87{
88    /// Construct a new upsert executor.
89    #[must_use]
90    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
91        Self {
92            db,
93            debug,
94            _marker: PhantomData,
95        }
96    }
97
98    /// Enable debug logging for subsequent upsert operations.
99    #[must_use]
100    pub const fn debug(mut self) -> Self {
101        self.debug = true;
102        self
103    }
104
105    /// Upsert using a unique index specification.
106    pub fn by_unique_index(&self, index: UniqueIndexHandle, entity: E) -> Result<E, RuntimeError> {
107        self.upsert(index.index(), entity)
108    }
109
110    /// Upsert using a unique index specification with a merge closure.
111    pub fn by_unique_index_merge<F>(
112        &self,
113        index: UniqueIndexHandle,
114        entity: E,
115        merge: F,
116    ) -> Result<E, RuntimeError>
117    where
118        F: FnOnce(E, E) -> E,
119    {
120        Ok(self
121            .by_unique_index_merge_result(index, entity, merge)?
122            .entity)
123    }
124
125    /// Upsert using a unique index specification with a merge closure, returning an insert/update flag.
126    pub fn by_unique_index_merge_result<F>(
127        &self,
128        index: UniqueIndexHandle,
129        entity: E,
130        merge: F,
131    ) -> Result<UpsertResult<E>, RuntimeError>
132    where
133        F: FnOnce(E, E) -> E,
134    {
135        self.upsert_merge_result(index.index(), entity, merge)
136    }
137
138    /// Upsert using a unique index specification, returning an insert/update flag.
139    pub fn by_unique_index_result(
140        &self,
141        index: UniqueIndexHandle,
142        entity: E,
143    ) -> Result<UpsertResult<E>, RuntimeError> {
144        self.upsert_result(index.index(), entity)
145    }
146
147    /// Upsert using a unique index identified by its field list.
148    pub fn by_unique_fields(&self, fields: &[&str], entity: E) -> Result<E, RuntimeError> {
149        let index = UniqueIndexHandle::for_fields::<E>(fields)?;
150        self.upsert(index.index(), entity)
151    }
152
153    /// Upsert using a unique index identified by its field list with a merge closure.
154    pub fn by_unique_fields_merge<F>(
155        &self,
156        fields: &[&str],
157        entity: E,
158        merge: F,
159    ) -> Result<E, RuntimeError>
160    where
161        F: FnOnce(E, E) -> E,
162    {
163        Ok(self
164            .by_unique_fields_merge_result(fields, entity, merge)?
165            .entity)
166    }
167
168    /// Upsert using a unique index identified by its field list with a merge closure, returning an insert/update flag.
169    pub fn by_unique_fields_merge_result<F>(
170        &self,
171        fields: &[&str],
172        entity: E,
173        merge: F,
174    ) -> Result<UpsertResult<E>, RuntimeError>
175    where
176        F: FnOnce(E, E) -> E,
177    {
178        let index = UniqueIndexHandle::for_fields::<E>(fields)?;
179        self.upsert_merge_result(index.index(), entity, merge)
180    }
181
182    /// Upsert using a unique index identified by its field list, returning an insert/update flag.
183    pub fn by_unique_fields_result(
184        &self,
185        fields: &[&str],
186        entity: E,
187    ) -> Result<UpsertResult<E>, RuntimeError> {
188        let index = UniqueIndexHandle::for_fields::<E>(fields)?;
189        self.upsert_result(index.index(), entity)
190    }
191
192    ///
193    /// --------------------------------- PRIVATE METHODS ------------------------------------------------
194    ///
195
196    /// Compute the lookup entity (sanitized) and resolve the existing pk for the given unique index.
197    ///
198    /// We sanitize the lookup copy to ensure the index key is derived from the canonical (sanitized)
199    /// representation of the unique fields.
200    fn resolve_existing_pk(
201        &self,
202        index: &'static IndexSpec,
203        entity: &E,
204    ) -> Result<Option<E::PrimaryKey>, RuntimeError> {
205        let mut lookup = entity.clone();
206        sanitize(&mut lookup)?;
207        resolve_unique_pk::<E>(&self.db, index, &lookup)
208    }
209
210    fn upsert(&self, index: &'static IndexSpec, entity: E) -> Result<E, RuntimeError> {
211        Ok(self.upsert_result(index, entity)?.entity)
212    }
213
214    fn upsert_result(
215        &self,
216        index: &'static IndexSpec,
217        entity: E,
218    ) -> Result<UpsertResult<E>, RuntimeError> {
219        let existing_pk = self.resolve_existing_pk(index, &entity)?;
220        let inserted = existing_pk.is_none();
221
222        // Keep saver construction local to avoid type/lifetime issues in helpers.
223        let saver = SaveExecutor::new(self.db, self.debug);
224
225        let entity = match existing_pk {
226            Some(pk) => {
227                let mut entity = entity;
228                entity.set_primary_key(pk);
229                saver.update(entity)?
230            }
231            None => saver.insert(entity)?,
232        };
233
234        Ok(UpsertResult { entity, inserted })
235    }
236
237    fn upsert_merge_result<F>(
238        &self,
239        index: &'static IndexSpec,
240        entity: E,
241        merge: F,
242    ) -> Result<UpsertResult<E>, RuntimeError>
243    where
244        F: FnOnce(E, E) -> E,
245    {
246        let existing_pk = self.resolve_existing_pk(index, &entity)?;
247
248        // Keep saver construction local to avoid type/lifetime issues in helpers.
249        let saver = SaveExecutor::new(self.db, self.debug);
250
251        let result = if let Some(pk) = existing_pk {
252            // Load existing entity by pk and merge caller's entity into it.
253            let existing = self.load_existing(index, pk)?;
254            let mut merged = merge(existing, entity);
255            merged.set_primary_key(pk);
256
257            let entity = saver.update(merged)?;
258            UpsertResult {
259                entity,
260                inserted: false,
261            }
262        } else {
263            let entity = saver.insert(entity)?;
264            UpsertResult {
265                entity,
266                inserted: true,
267            }
268        };
269
270        Ok(result)
271    }
272
273    fn load_existing(
274        &self,
275        index: &'static IndexSpec,
276        pk: E::PrimaryKey,
277    ) -> Result<E, RuntimeError> {
278        let data_key = DataKey::new::<E>(pk.into());
279        let bytes = self
280            .db
281            .context::<E>()
282            .with_store(|store| store.get(&data_key))?;
283
284        let Some(bytes) = bytes else {
285            // Index pointed at a key that does not exist in the primary store.
286            return Err(ExecutorError::IndexCorrupted(
287                E::PATH.to_string(),
288                index.fields.join(", "),
289                1,
290            )
291            .into());
292        };
293
294        deserialize::<E>(&bytes)
295    }
296}