Skip to main content

modelvault_core/db/
write.rs

1//! Row insert/delete and transaction staging.
2
3use std::collections::BTreeMap;
4use std::sync::Arc;
5
6use crate::catalog::CatalogRecordWire;
7use crate::error::{DbError, FormatError, SchemaError, TransactionError};
8use crate::index::{encode_index_payload, IndexOp};
9use crate::record::{
10    encode_record_payload_v2_op, encode_record_payload_v3_op, non_pk_defs_in_order, RowValue,
11    ScalarValue, OP_DELETE, OP_REPLACE,
12};
13use crate::schema::{CollectionId, FieldDef};
14use crate::storage::Store;
15
16use super::{
17    handle_registry, index_deletes_for_existing_row, plan_insert_row, row_value_at_path_segments,
18    Database,
19};
20
21impl<S: Store> Database<S> {
22    pub(crate) fn next_txn_id(&mut self) -> u64 {
23        self.txn_seq = self.txn_seq.saturating_add(1);
24        self.txn_seq
25    }
26
27    #[inline]
28    pub(crate) fn commit_write_batch(
29        &mut self,
30        txn_id: u64,
31        body: &[(crate::segments::header::SegmentType, &[u8])],
32    ) -> Result<(), DbError> {
33        super::segment_write::commit_write_txn_v6(
34            &mut self.store,
35            self.segment_start,
36            &mut self.format_minor,
37            txn_id,
38            body,
39        )
40    }
41
42    #[inline]
43    pub(crate) fn apply_catalog_record(&mut self, wire: CatalogRecordWire) -> Result<(), DbError> {
44        self.catalog.apply_record(wire)
45    }
46
47    /// Run `f` inside a multi-write transaction: durable segments are written on success.
48    ///
49    /// On error, staged work is discarded and nothing new is appended to the log.
50    pub fn transaction<R>(
51        &mut self,
52        f: impl FnOnce(&mut Self) -> Result<R, DbError>,
53    ) -> Result<R, DbError> {
54        self.begin_transaction()?;
55        match f(self) {
56            Ok(v) => match self.commit_transaction() {
57                Ok(()) => Ok(v),
58                Err(e) => {
59                    self.rollback_transaction();
60                    Err(e)
61                }
62            },
63            Err(e) => {
64                self.rollback_transaction();
65                Err(e)
66            }
67        }
68    }
69
70    /// Start a transaction (for bindings that cannot use the closure API). Pairs with
71    /// [`Self::commit_transaction`] or [`Self::rollback_transaction`].
72    pub fn begin_transaction(&mut self) -> Result<(), DbError> {
73        if self.txn_staging.is_some() {
74            return Err(DbError::Transaction(TransactionError::NestedTransaction));
75        }
76        let tid = self.next_txn_id();
77        self.txn_staging = Some(super::TxnStaging {
78            txn_id: tid,
79            shadow_catalog: self.catalog.clone(),
80            shadow_latest: self.latest.clone(),
81            shadow_indexes: self.indexes.clone(),
82            pending: Vec::new(),
83        });
84        Ok(())
85    }
86
87    /// Commit the active transaction started with [`Self::begin_transaction`].
88    pub fn commit_transaction(&mut self) -> Result<(), DbError> {
89        self.commit_txn_staging()
90    }
91
92    /// Discard the active transaction without writing to the log.
93    pub fn rollback_transaction(&mut self) {
94        self.txn_staging = None;
95    }
96
97    fn commit_txn_staging(&mut self) -> Result<(), DbError> {
98        let Some(st) = self.txn_staging.take() else {
99            return Err(DbError::Transaction(TransactionError::NoActiveTransaction));
100        };
101        if st.pending.is_empty() {
102            self.catalog = st.shadow_catalog;
103            self.latest = st.shadow_latest;
104            self.indexes = st.shadow_indexes;
105            return Ok(());
106        }
107        let batch: Vec<(crate::segments::header::SegmentType, &[u8])> =
108            st.pending.iter().map(|(t, b)| (*t, b.as_slice())).collect();
109        self.commit_write_batch(st.txn_id, &batch)?;
110        self.catalog = st.shadow_catalog;
111        self.latest = st.shadow_latest;
112        self.indexes = st.shadow_indexes;
113        self.push_shared_mirror();
114        Ok(())
115    }
116
117    pub(crate) fn push_shared_mirror(&mut self) {
118        let Some(ref shared) = self.shared_mirror else {
119            return;
120        };
121        if let Ok(mut g) = shared.write() {
122            let generation = g.generation.saturating_add(1);
123            *g = Arc::new(handle_registry::SharedDbState {
124                catalog: self.catalog.clone(),
125                latest: self.latest.clone(),
126                indexes: self.indexes.clone(),
127                segment_start: self.segment_start,
128                format_minor: self.format_minor,
129                generation,
130            });
131        }
132    }
133
134    /// Insert or replace the row for `collection_id` identified by its primary-key field.
135    ///
136    /// `row` maps **top-level** field names to [`RowValue`]. The primary key field must be present.
137    /// Only single-segment field paths are supported in 0.6.x.
138    pub fn insert(
139        &mut self,
140        collection_id: CollectionId,
141        row: BTreeMap<String, RowValue>,
142    ) -> Result<(), DbError> {
143        if self.read_only_attached {
144            return Err(DbError::Io(std::io::Error::new(
145                std::io::ErrorKind::PermissionDenied,
146                "database opened read-only",
147            )));
148        }
149        super::segment_write::ensure_header_v0_5(&mut self.store, &mut self.format_minor)?;
150        let (mut payload, full, mut index_entries, pk_scalar) =
151            plan_insert_row(self.catalog_for_read(), collection_id, row)?;
152        #[cfg(test)]
153        let mut full = full;
154        let existing = self
155            .latest_for_read()
156            .get(&(collection_id.0, full.0.clone()))
157            .cloned();
158        if existing.is_some() {
159            #[cfg(test)]
160            if let Some(poison) = self.test_poison_planned_replace_row.take() {
161                poison(collection_id, &mut full.1);
162            }
163            // Re-encode with explicit replace opcode.
164            let col = self
165                .catalog_for_read()
166                .get(collection_id)
167                .ok_or(DbError::Schema(SchemaError::UnknownCollection {
168                    id: collection_id.0,
169                }))?;
170            let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);
171            let pk_name =
172                col.primary_field
173                    .as_deref()
174                    .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
175                        collection_id: collection_id.0,
176                    }))?;
177            let pk_def = col
178                .fields
179                .iter()
180                .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
181                .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
182                    name: pk_name.to_string(),
183                }))?;
184
185            let non_pk_defs = if has_multi_segment_schema {
186                col.fields
187                    .iter()
188                    .filter(|f| !(f.path.0.len() == 1 && f.path.0[0] == pk_name))
189                    .collect::<Vec<_>>()
190            } else {
191                non_pk_defs_in_order(&col.fields, pk_name)
192            };
193            let mut non_pk: Vec<(FieldDef, RowValue)> = Vec::with_capacity(non_pk_defs.len());
194            for def in &non_pk_defs {
195                let v = row_value_at_path_segments(&full.1, &def.path.0).unwrap_or(RowValue::None);
196                non_pk.push(((*def).clone(), v));
197            }
198            payload = (if has_multi_segment_schema {
199                encode_record_payload_v3_op(
200                    collection_id.0,
201                    col.current_version.0,
202                    OP_REPLACE,
203                    &pk_scalar,
204                    &pk_def.ty,
205                    &non_pk,
206                )
207            } else {
208                encode_record_payload_v2_op(
209                    collection_id.0,
210                    col.current_version.0,
211                    OP_REPLACE,
212                    &pk_scalar,
213                    &pk_def.ty,
214                    &non_pk,
215                )
216            })?;
217            // Prepend index deletes for any existing row.
218            if let Some(ref old_row) = existing {
219                let mut deletes = index_deletes_for_existing_row(
220                    collection_id,
221                    &pk_scalar,
222                    &col.indexes,
223                    old_row,
224                );
225                deletes.append(&mut index_entries);
226                index_entries = deletes;
227            }
228        }
229        for e in &index_entries {
230            if e.kind != crate::schema::IndexKind::Unique {
231                continue;
232            }
233            let Some(existing) =
234                self.indexes_for_read()
235                    .unique_lookup(e.collection_id, &e.index_name, &e.index_key)
236            else {
237                continue;
238            };
239            if e.op != IndexOp::Insert {
240                continue;
241            }
242            if existing == e.pk_key.as_slice() {
243                continue;
244            }
245            return Err(DbError::Schema(SchemaError::UniqueIndexViolation));
246        }
247        if let Some(st) = &mut self.txn_staging {
248            if !index_entries.is_empty() {
249                let b = encode_index_payload(&index_entries);
250                st.pending
251                    .push((crate::segments::header::SegmentType::Index, b));
252            }
253            st.pending.push((
254                crate::segments::header::SegmentType::Record,
255                payload.clone(),
256            ));
257            st.shadow_latest
258                .insert((collection_id.0, full.0.clone()), full.1.clone());
259            for e in index_entries {
260                st.shadow_indexes.apply(e)?;
261            }
262            return Ok(());
263        }
264        let tid = self.next_txn_id();
265        let index_bytes = if index_entries.is_empty() {
266            None
267        } else {
268            Some(encode_index_payload(&index_entries))
269        };
270        let mut batch: Vec<(crate::segments::header::SegmentType, &[u8])> = Vec::new();
271        if let Some(ref b) = index_bytes {
272            batch.push((crate::segments::header::SegmentType::Index, b.as_slice()));
273        }
274        batch.push((
275            crate::segments::header::SegmentType::Record,
276            payload.as_slice(),
277        ));
278        self.commit_write_batch(tid, &batch)?;
279        self.latest.insert((collection_id.0, full.0), full.1);
280        for e in index_entries {
281            self.indexes.apply(e)?;
282        }
283        self.push_shared_mirror();
284        Ok(())
285    }
286
287    /// Delete the row for `collection_id` identified by its primary key.
288    pub fn delete(&mut self, collection_id: CollectionId, pk: &ScalarValue) -> Result<(), DbError> {
289        if self.read_only_attached {
290            return Err(DbError::Io(std::io::Error::new(
291                std::io::ErrorKind::PermissionDenied,
292                "database opened read-only",
293            )));
294        }
295        super::segment_write::ensure_header_v0_5(&mut self.store, &mut self.format_minor)?;
296        let col = self
297            .catalog_for_read()
298            .get(collection_id)
299            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
300                id: collection_id.0,
301            }))?;
302        let pk_name =
303            col.primary_field
304                .as_deref()
305                .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
306                    collection_id: collection_id.0,
307                }))?;
308        let pk_def = col
309            .fields
310            .iter()
311            .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
312            .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
313                name: pk_name.to_string(),
314            }))?;
315        if !pk.ty_matches(&pk_def.ty) {
316            return Err(DbError::Format(FormatError::RecordPayloadTypeMismatch));
317        }
318        let pk_key = pk.canonical_key_bytes();
319        let existing = self
320            .latest_for_read()
321            .get(&(collection_id.0, pk_key.clone()))
322            .cloned();
323        let Some(old_row) = existing else {
324            return Ok(());
325        };
326        let indexes = col.indexes.clone();
327        let schema_ver = col.current_version.0;
328        let pk_ty = pk_def.ty.clone();
329        let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);
330
331        let mut index_entries =
332            index_deletes_for_existing_row(collection_id, pk, &indexes, &old_row);
333        #[cfg(not(test))]
334        let pk_for_record = pk.clone();
335        #[cfg(test)]
336        let pk_for_record = {
337            let mut p = pk.clone();
338            if let Some(poison) = self.test_poison_delete_encode_scalar.take() {
339                p = poison(p);
340            }
341            p
342        };
343        let record_payload = (if has_multi_segment_schema {
344            encode_record_payload_v3_op(
345                collection_id.0,
346                schema_ver,
347                OP_DELETE,
348                &pk_for_record,
349                &pk_ty,
350                &[],
351            )
352        } else {
353            encode_record_payload_v2_op(
354                collection_id.0,
355                schema_ver,
356                OP_DELETE,
357                &pk_for_record,
358                &pk_ty,
359                &[],
360            )
361        })?;
362
363        if let Some(st) = &mut self.txn_staging {
364            if !index_entries.is_empty() {
365                let b = encode_index_payload(&index_entries);
366                st.pending
367                    .push((crate::segments::header::SegmentType::Index, b));
368            }
369            st.pending.push((
370                crate::segments::header::SegmentType::Record,
371                record_payload.clone(),
372            ));
373            st.shadow_latest.remove(&(collection_id.0, pk_key));
374            for e in index_entries.drain(..) {
375                st.shadow_indexes.apply(e)?;
376            }
377            return Ok(());
378        }
379
380        let tid = self.next_txn_id();
381        let index_bytes = if index_entries.is_empty() {
382            None
383        } else {
384            Some(encode_index_payload(&index_entries))
385        };
386        let mut batch: Vec<(crate::segments::header::SegmentType, &[u8])> = Vec::new();
387        if let Some(ref b) = index_bytes {
388            batch.push((crate::segments::header::SegmentType::Index, b.as_slice()));
389        }
390        batch.push((
391            crate::segments::header::SegmentType::Record,
392            record_payload.as_slice(),
393        ));
394        self.commit_write_batch(tid, &batch)?;
395        self.latest.remove(&(collection_id.0, pk_key));
396        for e in index_entries {
397            self.indexes.apply(e)?;
398        }
399        self.push_shared_mirror();
400        Ok(())
401    }
402}