turso_core 0.6.1

The Turso database library
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
// Simplified DBSP integration for incremental view maintenance
// For now, we'll use a basic approach and can expand to full DBSP later

use crate::numeric::Numeric;
use crate::Value;
use std::collections::{BTreeMap, HashMap};
use std::hash::{Hash, Hasher};

/// A 128-bit hash value stored as a [`uuid::Uuid`].
///
/// A UUID is used because it is a standard 128-bit type we already depend on.
/// All derived comparison and hashing traits operate on the wrapped UUID.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Hash128 {
    // The 16 hash bytes, held as a UUID for an efficient 128-bit representation
    uuid: uuid::Uuid,
}

impl Hash128 {
    /// Builds a hash whose first eight bytes are `high` and whose last eight bytes are
    /// `low`, both laid out big-endian.
    pub fn new(high: u64, low: u64) -> Self {
        // Glue the halves into one 128-bit integer; its big-endian byte image is
        // exactly `high` followed by `low`.
        let combined = (u128::from(high) << 64) | u128::from(low);
        Self {
            uuid: uuid::Uuid::from_bytes(combined.to_be_bytes()),
        }
    }

    /// Returns the low 64 bits (bytes 8..16, big-endian) reinterpreted as an `i64`,
    /// for the cases where a rowid is needed.
    pub fn as_i64(&self) -> i64 {
        let mut tail = [0u8; 8];
        tail.copy_from_slice(&self.uuid.as_bytes()[8..16]);
        // Same bit pattern as decoding a `u64` and casting it with `as i64`.
        i64::from_be_bytes(tail)
    }

    /// Computes a deterministic 128-bit hash of a sequence of values.
    ///
    /// The values are serialized into one string (a type tag plus payload per value,
    /// separated by NUL characters) which is then hashed with [`Hash128::hash_str`].
    ///
    /// NOTE(review): text payloads are appended without escaping, so a text value that
    /// itself contains a NUL character can produce the same encoding as a different
    /// sequence of values. Confirm whether such input can reach this function.
    pub fn hash_values(values: &[Value]) -> Self {
        let mut encoded = String::new();
        for (position, value) in values.iter().enumerate() {
            // The NUL separator goes between values only, never before the first one.
            if position != 0 {
                encoded.push('\0');
            }
            Self::push_encoded(&mut encoded, value);
        }

        Self::hash_str(&encoded)
    }

    /// Appends the type-tagged textual form of one value to `buf`.
    ///
    /// The tag (`N:`, `I:`, `F:`, `T:`, `B:`) keeps values of different types from
    /// sharing an encoding.
    fn push_encoded(buf: &mut String, value: &Value) {
        match value {
            Value::Null => buf.push_str("N:"),
            Value::Numeric(Numeric::Integer(n)) => {
                buf.push_str("I:");
                buf.push_str(&n.to_string());
            }
            Value::Numeric(Numeric::Float(f)) => {
                buf.push_str("F:");
                // The raw bit pattern gives a consistent representation of the float.
                let bits = f64::from(*f).to_bits();
                buf.push_str(&bits.to_string());
            }
            Value::Text(t) => {
                buf.push_str("T:");
                buf.push_str(t.as_str());
            }
            Value::Blob(b) => {
                buf.push_str("B:");
                buf.push_str(&hex::encode(b));
            }
        }
    }

    /// Hashes a string to 128 bits using UUID v5.
    ///
    /// A fixed namespace (the standard DNS namespace) is used so the same input
    /// always yields the same hash.
    pub fn hash_str(s: &str) -> Self {
        let namespace = uuid::Uuid::NAMESPACE_DNS;
        Self {
            uuid: uuid::Uuid::new_v5(&namespace, s.as_bytes()),
        }
    }

    /// Returns the 16 hash bytes (big-endian) as an owned vector for storage.
    pub fn to_blob(self) -> Vec<u8> {
        Vec::from(&self.uuid.as_bytes()[..])
    }

    /// Rebuilds a hash from a big-endian byte slice.
    ///
    /// Returns `None` unless `bytes` is exactly 16 bytes long.
    pub fn from_blob(bytes: &[u8]) -> Option<Self> {
        // The slice-to-array conversion fails for any length other than 16.
        let raw: [u8; 16] = std::convert::TryFrom::try_from(bytes).ok()?;
        Some(Self {
            uuid: uuid::Uuid::from_bytes(raw),
        })
    }

    /// Wraps the hash bytes in a `Value::Blob` for storage.
    pub fn to_value(self) -> Value {
        let blob = self.to_blob();
        Value::Blob(blob)
    }

    /// Extracts a hash from a value; only a 16-byte `Value::Blob` yields `Some`.
    pub fn from_value(value: &Value) -> Option<Self> {
        if let Value::Blob(bytes) = value {
            Self::from_blob(bytes)
        } else {
            None
        }
    }
}

impl std::fmt::Display for Hash128 {
    /// Writes the hash using the wrapped UUID's default `Display` output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Formatting through fresh arguments means width/fill flags given for the
        // `Hash128` itself are not forwarded to the UUID.
        f.write_fmt(format_args!("{}", self.uuid))
    }
}

// The DBSP paper uses the whole record as the key, with both the row key and the values. This is
// a bit confusing for us in databases, because when you say "key", it is easy to understand that
// as being the row key.
//
// Empirically speaking, using row keys as the ZSet keys will waste a competent but not brilliant
// engineer somewhere between 82 and 88 hours, depending on how you count. Hours that are never
// coming back.
//
// One of the situations in which using row keys completely breaks are table updates. If the "key"
// is the row key, let's say "5", then an update is a delete + insert. Imagine a table that had k =
// 5, v = 5, and a view that filters v > 2.
//
// Now we will do an update that changes v => 1. If the "key" is 5, then inside the Delta set, we
// will have (5, weight = -1), (5, weight = +1), and the whole thing just disappears. The Delta
// set, therefore, has to contain ((5, 5), weight = -1), ((5, 1), weight = +1).
//
// It is theoretically possible to use the rowkey in the ZSet and then use a hash of key ->
// Vec(changes) in the Delta set. But deviating from the paper here is just asking for trouble, as
// I am sure it would break somewhere else.
/// A full record (rowid plus column values) bundled with a hash computed at construction.
///
/// NOTE(review): `rowid` and `values` are public, so mutating them after construction
/// leaves `cached_hash` out of date; treat instances as immutable.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HashableRow {
    /// Row identifier of the record.
    pub rowid: i64,
    /// Column values of the record.
    pub values: Vec<Value>,
    // Pre-computed hash: DBSP rows are immutable and frequently hashed during joins,
    // making caching worthwhile despite the memory overhead
    cached_hash: Hash128,
}

impl HashableRow {
    /// Builds a row and computes its hash over `rowid` and `values` up front.
    pub fn new(rowid: i64, values: Vec<Value>) -> Self {
        Self {
            // Evaluated first, while `values` is only borrowed; it is moved just below.
            cached_hash: Self::compute_hash(rowid, &values),
            rowid,
            values,
        }
    }

    /// Hashes the rowid followed by every value, so rows that differ only in rowid
    /// still hash differently.
    fn compute_hash(rowid: i64, values: &[Value]) -> Hash128 {
        let keyed: Vec<Value> = std::iter::once(Value::from_i64(rowid))
            .chain(values.iter().cloned())
            .collect();
        Hash128::hash_values(&keyed)
    }

    /// Returns the hash that was computed when the row was created.
    pub fn cached_hash(&self) -> Hash128 {
        self.cached_hash
    }
}

impl Hash for HashableRow {
    /// Feeds the pre-computed 128-bit hash into `state`.
    ///
    /// Only `cached_hash` is hashed. This stays consistent with the derived `Eq`,
    /// because equal rows carry equal cached hashes.
    fn hash<H: Hasher>(&self, state: &mut H) {
        // `Hash128` derives `Hash`, so hash it directly instead of copying the bytes
        // into a freshly allocated `Vec` on every call.
        self.cached_hash.hash(state);
    }
}

impl PartialOrd for HashableRow {
    /// Defers to the total order defined by the `Ord` implementation; never returns `None`.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for HashableRow {
    /// Orders rows by `rowid`, breaking ties by comparing `values`.
    ///
    /// Comparing the values as well keeps `Ord` consistent with `Eq`, which looks at
    /// every field.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.rowid
            .cmp(&other.rowid)
            // Only consulted when the rowids are equal.
            .then_with(|| self.values.cmp(&other.values))
    }
}

/// One change in a delta: the affected row and its signed weight.
type DeltaEntry = (HashableRow, isize);
/// A delta represents ordered changes to data
#[derive(Debug, Clone, Default)]
pub struct Delta {
    /// Ordered list of changes: (row, weight) where weight is +1 for insert, -1 for delete
    /// It is crucial that this is ordered. Imagine the case of an update, which becomes a delete +
    /// insert. If this is not ordered, it would be applied in arbitrary order and break the view.
    pub changes: Vec<DeltaEntry>,
}

impl Delta {
    pub fn new() -> Self {
        Self {
            changes: Vec::new(),
        }
    }

    pub fn insert(&mut self, row_key: i64, values: Vec<Value>) {
        let row = HashableRow::new(row_key, values);
        self.changes.push((row, 1));
    }

    pub fn delete(&mut self, row_key: i64, values: Vec<Value>) {
        let row = HashableRow::new(row_key, values);
        self.changes.push((row, -1));
    }

    pub fn is_empty(&self) -> bool {
        self.changes.is_empty()
    }

    pub fn len(&self) -> usize {
        self.changes.len()
    }

    /// Merge another delta into this one
    /// This preserves the order of operations - no consolidation is done
    /// to maintain the full history of changes
    pub fn merge(&mut self, other: &Delta) {
        // Simply append all changes from other, preserving order
        self.changes.extend(other.changes.iter().cloned());
    }

    /// Consolidate changes by combining entries with the same HashableRow
    pub fn consolidate(&mut self) {
        if self.changes.is_empty() {
            return;
        }

        // Use a HashMap to accumulate weights
        let mut consolidated: HashMap<HashableRow, isize> = HashMap::default();

        for (row, weight) in self.changes.drain(..) {
            *consolidated.entry(row).or_insert(0) += weight;
        }

        // Convert back to vec, filtering out zero weights
        self.changes = consolidated
            .into_iter()
            .filter(|(_, weight)| *weight != 0)
            .collect();
    }
}

/// A pair of deltas for operators that process two inputs
#[derive(Debug, Clone, Default)]
pub struct DeltaPair {
    /// Changes for the left input.
    pub left: Delta,
    /// Changes for the right input.
    pub right: Delta,
}

impl DeltaPair {
    /// Create a new delta pair
    pub fn new(left: Delta, right: Delta) -> Self {
        Self { left, right }
    }
}

impl From<Delta> for DeltaPair {
    /// Convert a single delta into a delta pair with empty right delta
    fn from(delta: Delta) -> Self {
        Self {
            left: delta,
            right: Delta::new(),
        }
    }
}

impl From<&Delta> for DeltaPair {
    /// Clones the referenced delta into the left side and pairs it with an empty right side.
    fn from(delta: &Delta) -> Self {
        let left = delta.clone();
        let right = Delta::new();
        Self { left, right }
    }
}

/// A simplified Z-set for incremental computation.
///
/// Every element carries a signed weight: positive for additions, negative for
/// deletions. Elements whose accumulated weight reaches zero are not stored.
#[derive(Clone, Debug, Default)]
pub struct SimpleZSet<T> {
    data: BTreeMap<T, isize>,
}

#[allow(dead_code)]
impl<T: std::hash::Hash + Eq + Ord + Clone> SimpleZSet<T> {
    /// Creates an empty Z-set.
    pub fn new() -> Self {
        Self {
            data: BTreeMap::new(),
        }
    }

    /// Adds `weight` to the weight of `item`, removing the item when the total is zero.
    pub fn insert(&mut self, item: T, weight: isize) {
        use std::collections::btree_map::Entry;

        match self.data.entry(item) {
            Entry::Occupied(mut slot) => {
                let total = *slot.get() + weight;
                if total == 0 {
                    slot.remove();
                } else {
                    *slot.get_mut() = total;
                }
            }
            Entry::Vacant(slot) => {
                // An absent item counts as weight 0, so a zero weight stores nothing.
                if weight != 0 {
                    slot.insert(weight);
                }
            }
        }
    }

    /// Iterates over `(item, weight)` pairs in ascending item order.
    pub fn iter(&self) -> impl Iterator<Item = (&T, isize)> {
        self.data.iter().map(|(item, weight)| (item, *weight))
    }

    /// Returns clones of all items whose weight is strictly positive.
    pub fn to_vec(&self) -> Vec<T> {
        self.data
            .iter()
            .filter_map(|(item, weight)| (*weight > 0).then(|| item.clone()))
            .collect()
    }

    /// Adds every `(item, weight)` of `other` into this Z-set.
    pub fn merge(&mut self, other: &SimpleZSet<T>) {
        for (item, weight) in &other.data {
            self.insert(item.clone(), *weight);
        }
    }

    /// Returns the weight of `item`, or 0 when it is not present.
    pub fn get(&self, item: &T) -> isize {
        self.data.get(item).map_or(0, |weight| *weight)
    }

    /// Returns the entry with the smallest item, if any.
    pub fn first(&self) -> Option<(&T, isize)> {
        let (item, weight) = self.data.iter().next()?;
        Some((item, *weight))
    }

    /// Returns the entry with the largest item, if any.
    pub fn last(&self) -> Option<(&T, isize)> {
        let (item, weight) = self.data.iter().next_back()?;
        Some((item, *weight))
    }

    /// Iterates over the entries whose items fall inside `range`, in ascending order.
    pub fn range<R>(&self, range: R) -> impl Iterator<Item = (&T, isize)> + '_
    where
        R: std::ops::RangeBounds<T>,
    {
        self.data
            .range(range)
            .map(|(item, weight)| (item, *weight))
    }

    /// Returns `true` when no item is stored.
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    /// Returns the number of stored items.
    pub fn len(&self) -> usize {
        self.data.len()
    }
}

// Type aliases for convenience
/// Key type used for row Z-sets: the whole row (rowid plus values).
pub type RowKey = HashableRow;
/// A Z-set keyed by [`RowKey`].
pub type RowKeyZSet = SimpleZSet<RowKey>;

impl RowKeyZSet {
    /// Create a Z-set from a Delta by consolidating all changes
    pub fn from_delta(delta: &Delta) -> Self {
        let mut zset = Self::new();

        // Add all changes from the delta, consolidating as we go
        for (row, weight) in &delta.changes {
            zset.insert(row.clone(), *weight);
        }

        zset
    }

    /// Seek to find ALL entries for the best matching rowid
    /// For GT/GE: returns all entries for the smallest rowid that satisfies the condition
    /// For LT/LE: returns all entries for the largest rowid that satisfies the condition
    /// Returns empty vec if no match found
    pub fn seek(&self, target: i64, op: crate::types::SeekOp) -> Vec<(HashableRow, isize)> {
        use crate::types::SeekOp;

        // First find the best matching rowid
        let best_rowid = match op {
            SeekOp::GT => {
                // Find smallest rowid > target
                self.data
                    .iter()
                    .filter(|(row, _)| row.rowid > target)
                    .map(|(row, _)| row.rowid)
                    .min()
            }
            SeekOp::GE { eq_only: false } => {
                // Find smallest rowid >= target
                self.data
                    .iter()
                    .filter(|(row, _)| row.rowid >= target)
                    .map(|(row, _)| row.rowid)
                    .min()
            }
            SeekOp::GE { eq_only: true } | SeekOp::LE { eq_only: true } => {
                // Need exact match
                if self.data.iter().any(|(row, _)| row.rowid == target) {
                    Some(target)
                } else {
                    None
                }
            }
            SeekOp::LT => {
                // Find largest rowid < target
                self.data
                    .iter()
                    .filter(|(row, _)| row.rowid < target)
                    .map(|(row, _)| row.rowid)
                    .max()
            }
            SeekOp::LE { eq_only: false } => {
                // Find largest rowid <= target
                self.data
                    .iter()
                    .filter(|(row, _)| row.rowid <= target)
                    .map(|(row, _)| row.rowid)
                    .max()
            }
        };

        // Now get ALL entries with that rowid
        match best_rowid {
            Some(rowid) => self
                .data
                .iter()
                .filter(|(row, _)| row.rowid == rowid)
                .map(|(k, &v)| (k.clone(), v))
                .collect(),
            None => Vec::new(),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Merging two Z-sets sums weights per item and drops items that cancel out.
    #[test]
    fn test_zset_merge_with_weights() {
        let mut base = SimpleZSet::new();
        base.insert(1, 1); // Row 1 with weight +1
        base.insert(2, 1); // Row 2 with weight +1

        let mut incoming = SimpleZSet::new();
        incoming.insert(2, -1); // Row 2 with weight -1 (delete)
        incoming.insert(3, 1); // Row 3 with weight +1 (insert)

        base.merge(&incoming);

        // Row 1 is unchanged, row 2 cancels to weight 0, row 3 is newly inserted.
        let keys: Vec<_> = base.iter().map(|(key, _)| *key).collect();
        assert_eq!(keys.len(), 2); // Only rows 1 and 3
        assert!(keys.contains(&1));
        assert!(keys.contains(&3));
        assert!(!keys.contains(&2)); // Row 2 removed
    }

    /// An update expressed as delete + insert of the same item leaves weight 1.
    #[test]
    fn test_zset_represents_updates_as_delete_plus_insert() {
        let mut zset = SimpleZSet::new();

        // Initial insert, then the update: delete old version, insert new version.
        for weight in [1, -1, 1] {
            zset.insert(1, weight);
        }

        // Weight should be 1 (not 2)
        let weight = zset
            .iter()
            .find_map(|(key, weight)| (*key == 1).then(|| weight));
        assert_eq!(weight, Some(1));
    }

    /// A delta records operations in order and consolidation cancels matching pairs.
    #[test]
    fn test_hashable_row_delta_operations() {
        let old_values = || vec![Value::from_i64(1), Value::from_i64(100)];
        let new_values = || vec![Value::from_i64(1), Value::from_i64(200)];
        let mut delta = Delta::new();

        // Test INSERT
        delta.insert(1, old_values());
        assert_eq!(delta.len(), 1);

        // Test UPDATE (DELETE + INSERT) - order matters!
        delta.delete(1, old_values());
        delta.insert(1, new_values());
        assert_eq!(delta.len(), 3); // Should have 3 operations before consolidation

        // Verify order is preserved: first insert, delete, second insert
        let weights: Vec<isize> = delta.changes.iter().map(|(_, weight)| *weight).collect();
        assert_eq!(weights[0], 1);
        assert_eq!(weights[1], -1);
        assert_eq!(weights[2], 1);

        // After consolidation the first insert and the delete cancel out,
        // leaving only the second insert.
        delta.consolidate();
        assert_eq!(delta.len(), 1);

        let (row, weight) = &delta.changes[0];
        assert_eq!(row.rowid, 1);
        assert_eq!(row.values, new_values());
        assert_eq!(*weight, 1);
    }

    /// Inserting the same row twice consolidates into one entry with weight 2.
    #[test]
    fn test_duplicate_row_consolidation() {
        let mut delta = Delta::new();

        // Insert same row twice
        for _ in 0..2 {
            delta.insert(2, vec![Value::from_i64(2), Value::from_i64(300)]);
        }
        assert_eq!(delta.len(), 2);

        delta.consolidate();
        assert_eq!(delta.len(), 1);

        // Weight should be 2 (sum of both inserts)
        let (row, weight) = &delta.changes[0];
        assert_eq!(row.rowid, 2);
        assert_eq!(*weight, 2);
    }
}