1use std::collections::BTreeMap;
4use std::sync::Arc;
5
6use crate::catalog::CatalogRecordWire;
7use crate::error::{DbError, FormatError, SchemaError, TransactionError};
8use crate::index::{encode_index_payload, IndexOp};
9use crate::record::{
10 encode_record_payload_v2_op, encode_record_payload_v3_op, non_pk_defs_in_order, RowValue,
11 ScalarValue, OP_DELETE, OP_REPLACE,
12};
13use crate::schema::{CollectionId, FieldDef};
14use crate::storage::Store;
15
16use super::{
17 handle_registry, index_deletes_for_existing_row, plan_insert_row, row_value_at_path_segments,
18 Database,
19};
20
21impl<S: Store> Database<S> {
22 pub(crate) fn next_txn_id(&mut self) -> u64 {
23 self.txn_seq = self.txn_seq.saturating_add(1);
24 self.txn_seq
25 }
26
27 #[inline]
28 pub(crate) fn commit_write_batch(
29 &mut self,
30 txn_id: u64,
31 body: &[(crate::segments::header::SegmentType, &[u8])],
32 ) -> Result<(), DbError> {
33 super::segment_write::commit_write_txn_v6(
34 &mut self.store,
35 self.segment_start,
36 &mut self.format_minor,
37 txn_id,
38 body,
39 )
40 }
41
42 #[inline]
43 pub(crate) fn apply_catalog_record(&mut self, wire: CatalogRecordWire) -> Result<(), DbError> {
44 self.catalog.apply_record(wire)
45 }
46
47 pub fn transaction<R>(
51 &mut self,
52 f: impl FnOnce(&mut Self) -> Result<R, DbError>,
53 ) -> Result<R, DbError> {
54 self.begin_transaction()?;
55 match f(self) {
56 Ok(v) => match self.commit_transaction() {
57 Ok(()) => Ok(v),
58 Err(e) => {
59 self.rollback_transaction();
60 Err(e)
61 }
62 },
63 Err(e) => {
64 self.rollback_transaction();
65 Err(e)
66 }
67 }
68 }
69
70 pub fn begin_transaction(&mut self) -> Result<(), DbError> {
73 if self.txn_staging.is_some() {
74 return Err(DbError::Transaction(TransactionError::NestedTransaction));
75 }
76 let tid = self.next_txn_id();
77 self.txn_staging = Some(super::TxnStaging {
78 txn_id: tid,
79 shadow_catalog: self.catalog.clone(),
80 shadow_latest: self.latest.clone(),
81 shadow_indexes: self.indexes.clone(),
82 pending: Vec::new(),
83 });
84 Ok(())
85 }
86
87 pub fn commit_transaction(&mut self) -> Result<(), DbError> {
89 self.commit_txn_staging()
90 }
91
92 pub fn rollback_transaction(&mut self) {
94 self.txn_staging = None;
95 }
96
97 fn commit_txn_staging(&mut self) -> Result<(), DbError> {
98 let Some(st) = self.txn_staging.take() else {
99 return Err(DbError::Transaction(TransactionError::NoActiveTransaction));
100 };
101 if st.pending.is_empty() {
102 self.catalog = st.shadow_catalog;
103 self.latest = st.shadow_latest;
104 self.indexes = st.shadow_indexes;
105 return Ok(());
106 }
107 let batch: Vec<(crate::segments::header::SegmentType, &[u8])> =
108 st.pending.iter().map(|(t, b)| (*t, b.as_slice())).collect();
109 self.commit_write_batch(st.txn_id, &batch)?;
110 self.catalog = st.shadow_catalog;
111 self.latest = st.shadow_latest;
112 self.indexes = st.shadow_indexes;
113 self.push_shared_mirror();
114 Ok(())
115 }
116
117 pub(crate) fn push_shared_mirror(&mut self) {
118 let Some(ref shared) = self.shared_mirror else {
119 return;
120 };
121 if let Ok(mut g) = shared.write() {
122 let generation = g.generation.saturating_add(1);
123 *g = Arc::new(handle_registry::SharedDbState {
124 catalog: self.catalog.clone(),
125 latest: self.latest.clone(),
126 indexes: self.indexes.clone(),
127 segment_start: self.segment_start,
128 format_minor: self.format_minor,
129 generation,
130 });
131 }
132 }
133
134 pub fn insert(
139 &mut self,
140 collection_id: CollectionId,
141 row: BTreeMap<String, RowValue>,
142 ) -> Result<(), DbError> {
143 if self.read_only_attached {
144 return Err(DbError::Io(std::io::Error::new(
145 std::io::ErrorKind::PermissionDenied,
146 "database opened read-only",
147 )));
148 }
149 super::segment_write::ensure_header_v0_5(&mut self.store, &mut self.format_minor)?;
150 let (mut payload, full, mut index_entries, pk_scalar) =
151 plan_insert_row(self.catalog_for_read(), collection_id, row)?;
152 #[cfg(test)]
153 let mut full = full;
154 let existing = self
155 .latest_for_read()
156 .get(&(collection_id.0, full.0.clone()))
157 .cloned();
158 if existing.is_some() {
159 #[cfg(test)]
160 if let Some(poison) = self.test_poison_planned_replace_row.take() {
161 poison(collection_id, &mut full.1);
162 }
163 let col = self
165 .catalog_for_read()
166 .get(collection_id)
167 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
168 id: collection_id.0,
169 }))?;
170 let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);
171 let pk_name =
172 col.primary_field
173 .as_deref()
174 .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
175 collection_id: collection_id.0,
176 }))?;
177 let pk_def = col
178 .fields
179 .iter()
180 .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
181 .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
182 name: pk_name.to_string(),
183 }))?;
184
185 let non_pk_defs = if has_multi_segment_schema {
186 col.fields
187 .iter()
188 .filter(|f| !(f.path.0.len() == 1 && f.path.0[0] == pk_name))
189 .collect::<Vec<_>>()
190 } else {
191 non_pk_defs_in_order(&col.fields, pk_name)
192 };
193 let mut non_pk: Vec<(FieldDef, RowValue)> = Vec::with_capacity(non_pk_defs.len());
194 for def in &non_pk_defs {
195 let v = row_value_at_path_segments(&full.1, &def.path.0).unwrap_or(RowValue::None);
196 non_pk.push(((*def).clone(), v));
197 }
198 payload = (if has_multi_segment_schema {
199 encode_record_payload_v3_op(
200 collection_id.0,
201 col.current_version.0,
202 OP_REPLACE,
203 &pk_scalar,
204 &pk_def.ty,
205 &non_pk,
206 )
207 } else {
208 encode_record_payload_v2_op(
209 collection_id.0,
210 col.current_version.0,
211 OP_REPLACE,
212 &pk_scalar,
213 &pk_def.ty,
214 &non_pk,
215 )
216 })?;
217 if let Some(ref old_row) = existing {
219 let mut deletes = index_deletes_for_existing_row(
220 collection_id,
221 &pk_scalar,
222 &col.indexes,
223 old_row,
224 );
225 deletes.append(&mut index_entries);
226 index_entries = deletes;
227 }
228 }
229 for e in &index_entries {
230 if e.kind != crate::schema::IndexKind::Unique {
231 continue;
232 }
233 let Some(existing) =
234 self.indexes_for_read()
235 .unique_lookup(e.collection_id, &e.index_name, &e.index_key)
236 else {
237 continue;
238 };
239 if e.op != IndexOp::Insert {
240 continue;
241 }
242 if existing == e.pk_key.as_slice() {
243 continue;
244 }
245 return Err(DbError::Schema(SchemaError::UniqueIndexViolation));
246 }
247 if let Some(st) = &mut self.txn_staging {
248 if !index_entries.is_empty() {
249 let b = encode_index_payload(&index_entries);
250 st.pending
251 .push((crate::segments::header::SegmentType::Index, b));
252 }
253 st.pending.push((
254 crate::segments::header::SegmentType::Record,
255 payload.clone(),
256 ));
257 st.shadow_latest
258 .insert((collection_id.0, full.0.clone()), full.1.clone());
259 for e in index_entries {
260 st.shadow_indexes.apply(e)?;
261 }
262 return Ok(());
263 }
264 let tid = self.next_txn_id();
265 let index_bytes = if index_entries.is_empty() {
266 None
267 } else {
268 Some(encode_index_payload(&index_entries))
269 };
270 let mut batch: Vec<(crate::segments::header::SegmentType, &[u8])> = Vec::new();
271 if let Some(ref b) = index_bytes {
272 batch.push((crate::segments::header::SegmentType::Index, b.as_slice()));
273 }
274 batch.push((
275 crate::segments::header::SegmentType::Record,
276 payload.as_slice(),
277 ));
278 self.commit_write_batch(tid, &batch)?;
279 self.latest.insert((collection_id.0, full.0), full.1);
280 for e in index_entries {
281 self.indexes.apply(e)?;
282 }
283 self.push_shared_mirror();
284 Ok(())
285 }
286
287 pub fn delete(&mut self, collection_id: CollectionId, pk: &ScalarValue) -> Result<(), DbError> {
289 if self.read_only_attached {
290 return Err(DbError::Io(std::io::Error::new(
291 std::io::ErrorKind::PermissionDenied,
292 "database opened read-only",
293 )));
294 }
295 super::segment_write::ensure_header_v0_5(&mut self.store, &mut self.format_minor)?;
296 let col = self
297 .catalog_for_read()
298 .get(collection_id)
299 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
300 id: collection_id.0,
301 }))?;
302 let pk_name =
303 col.primary_field
304 .as_deref()
305 .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
306 collection_id: collection_id.0,
307 }))?;
308 let pk_def = col
309 .fields
310 .iter()
311 .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
312 .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
313 name: pk_name.to_string(),
314 }))?;
315 if !pk.ty_matches(&pk_def.ty) {
316 return Err(DbError::Format(FormatError::RecordPayloadTypeMismatch));
317 }
318 let pk_key = pk.canonical_key_bytes();
319 let existing = self
320 .latest_for_read()
321 .get(&(collection_id.0, pk_key.clone()))
322 .cloned();
323 let Some(old_row) = existing else {
324 return Ok(());
325 };
326 let indexes = col.indexes.clone();
327 let schema_ver = col.current_version.0;
328 let pk_ty = pk_def.ty.clone();
329 let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);
330
331 let mut index_entries =
332 index_deletes_for_existing_row(collection_id, pk, &indexes, &old_row);
333 #[cfg(not(test))]
334 let pk_for_record = pk.clone();
335 #[cfg(test)]
336 let pk_for_record = {
337 let mut p = pk.clone();
338 if let Some(poison) = self.test_poison_delete_encode_scalar.take() {
339 p = poison(p);
340 }
341 p
342 };
343 let record_payload = (if has_multi_segment_schema {
344 encode_record_payload_v3_op(
345 collection_id.0,
346 schema_ver,
347 OP_DELETE,
348 &pk_for_record,
349 &pk_ty,
350 &[],
351 )
352 } else {
353 encode_record_payload_v2_op(
354 collection_id.0,
355 schema_ver,
356 OP_DELETE,
357 &pk_for_record,
358 &pk_ty,
359 &[],
360 )
361 })?;
362
363 if let Some(st) = &mut self.txn_staging {
364 if !index_entries.is_empty() {
365 let b = encode_index_payload(&index_entries);
366 st.pending
367 .push((crate::segments::header::SegmentType::Index, b));
368 }
369 st.pending.push((
370 crate::segments::header::SegmentType::Record,
371 record_payload.clone(),
372 ));
373 st.shadow_latest.remove(&(collection_id.0, pk_key));
374 for e in index_entries.drain(..) {
375 st.shadow_indexes.apply(e)?;
376 }
377 return Ok(());
378 }
379
380 let tid = self.next_txn_id();
381 let index_bytes = if index_entries.is_empty() {
382 None
383 } else {
384 Some(encode_index_payload(&index_entries))
385 };
386 let mut batch: Vec<(crate::segments::header::SegmentType, &[u8])> = Vec::new();
387 if let Some(ref b) = index_bytes {
388 batch.push((crate::segments::header::SegmentType::Index, b.as_slice()));
389 }
390 batch.push((
391 crate::segments::header::SegmentType::Record,
392 record_payload.as_slice(),
393 ));
394 self.commit_write_batch(tid, &batch)?;
395 self.latest.remove(&(collection_id.0, pk_key));
396 for e in index_entries {
397 self.indexes.apply(e)?;
398 }
399 self.push_shared_mirror();
400 Ok(())
401 }
402}