sochdb-kernel 2.0.8

SochDB Kernel - Minimal ACID core with plugin architecture
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
// SPDX-License-Identifier: AGPL-3.0-or-later
// SochDB - LLM-Optimized Embedded Database
// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Transaction Management
//!
//! Core transaction manager with MVCC support.
//! This is the minimal ACID transaction implementation for the kernel.

use crate::error::{KernelError, KernelResult, TransactionErrorKind};
use crate::wal::LogSequenceNumber;
use parking_lot::{Mutex, RwLock};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};

/// Identifier of a transaction, as handed out by `TxnManager::begin`.
pub type TransactionId = u64;

/// Logical-clock value used for MVCC snapshot and commit timestamps.
pub type Timestamp = u64;

/// Isolation level requested when a transaction begins.
///
/// NOTE(review): the kernel's `TxnManager` fixes the snapshot timestamp at
/// `begin` for every level. Only `Serializable` changes its bookkeeping: it
/// records the read set and runs the serialization check on commit. The
/// per-variant semantics below are presumably enforced by the storage layer —
/// confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IsolationLevel {
    /// Read uncommitted: may observe changes of transactions that have not
    /// committed yet (rarely used).
    ReadUncommitted,
    /// Read committed: observes committed changes only.
    ReadCommitted,
    /// Repeatable read: the snapshot is fixed at the transaction's first read.
    RepeatableRead,
    /// Snapshot isolation: the snapshot is fixed when the transaction starts.
    /// This is the default level.
    #[default]
    SnapshotIsolation,
    /// Serializable: full serializability via serializable snapshot isolation (SSI).
    Serializable,
}

/// Lifecycle state of a transaction tracked by `TxnManager`.
///
/// `Active` and `Preparing` are in-flight states. `Committed` and `Aborted` are
/// terminal: a second commit or abort of a finished transaction is rejected,
/// except that abort on an already-aborted transaction is a no-op.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionState {
    /// Running; may still read and write.
    Active,
    /// Preparing to commit.
    /// NOTE(review): nothing in this module transitions into this state.
    Preparing,
    /// Committed; a commit timestamp has been assigned.
    Committed,
    /// Rolled back.
    Aborted,
}

/// Per-transaction bookkeeping kept by `TxnManager`.
#[derive(Debug)]
struct TransactionInfo {
    /// ID of this transaction (also its key in the manager's table).
    id: TransactionId,
    /// Current lifecycle state.
    state: TransactionState,
    /// Logical clock value captured at `begin`; determines MVCC visibility.
    snapshot_ts: Timestamp,
    /// Commit timestamp; `None` until the transaction commits.
    commit_ts: Option<Timestamp>,
    /// Isolation level requested at `begin`.
    isolation: IsolationLevel,
    /// Wall-clock start, used for timeout detection and cleanup retention.
    start_time: Instant,
    /// Most recent WAL LSN reported via `set_last_lsn`.
    /// NOTE(review): not read within this module.
    last_lsn: Option<LogSequenceNumber>,
    /// `(table_id, row_id)` pairs read; only populated for `Serializable`
    /// transactions (input to the SSI check).
    read_set: Vec<(u32, u64)>,
    /// `(table_id, row_id)` pairs written, recorded at every isolation level.
    write_set: Vec<(u32, u64)>,
}

/// Kernel transaction manager.
///
/// Hands out transaction IDs and owns the MVCC logical clock. It tracks every
/// transaction it has begun, including finished ones, until `cleanup` removes
/// them.
pub struct TxnManager {
    /// ID that the next `begin` will hand out.
    next_txn_id: AtomicU64,
    /// Logical clock. New transactions snapshot this value, and `commit`
    /// advances it.
    current_ts: AtomicU64,
    /// Tracked transactions keyed by ID. Despite the name, this also holds
    /// committed and aborted entries until `cleanup` drops them.
    active_txns: RwLock<HashMap<TransactionId, TransactionInfo>>,
    /// Age beyond which `check_timeouts` reports an active transaction.
    timeout: Duration,
    /// Held for the whole of `commit` so that commits are serialized.
    commit_lock: Mutex<()>,
}

impl Default for TxnManager {
    fn default() -> Self {
        Self::new()
    }
}

impl TxnManager {
    /// Create a transaction manager with a 60-second transaction timeout.
    pub fn new() -> Self {
        Self::with_timeout(Duration::from_secs(60))
    }

    /// Create a transaction manager with a custom transaction timeout.
    ///
    /// `timeout` is only consulted by [`check_timeouts`](Self::check_timeouts).
    /// The transaction-ID counter and the logical clock both start at 1.
    pub fn with_timeout(timeout: Duration) -> Self {
        Self {
            next_txn_id: AtomicU64::new(1),
            current_ts: AtomicU64::new(1),
            active_txns: RwLock::new(HashMap::new()),
            timeout,
            commit_lock: Mutex::new(()),
        }
    }

    /// Begin a new transaction with the default isolation level
    /// ([`IsolationLevel::SnapshotIsolation`]).
    pub fn begin(&self) -> TransactionId {
        self.begin_with_isolation(IsolationLevel::default())
    }

    /// Begin a new transaction with a specific isolation level and return its ID.
    ///
    /// For every isolation level, the snapshot timestamp is the logical clock
    /// value at the moment the transaction is registered.
    pub fn begin_with_isolation(&self, isolation: IsolationLevel) -> TransactionId {
        let txn_id = self.next_txn_id.fetch_add(1, Ordering::SeqCst);

        // Read the clock and register the transaction while holding the
        // table's write lock. If the snapshot were taken before locking,
        // `min_active_snapshot` could run in between, miss this transaction,
        // and report a horizon newer than its snapshot. `commit` advances the
        // clock under this same lock, so the two cannot interleave.
        let mut txns = self.active_txns.write();
        let snapshot_ts = self.current_ts.load(Ordering::SeqCst);
        txns.insert(
            txn_id,
            TransactionInfo {
                id: txn_id,
                state: TransactionState::Active,
                snapshot_ts,
                commit_ts: None,
                isolation,
                start_time: Instant::now(),
                last_lsn: None,
                read_set: Vec::new(),
                write_set: Vec::new(),
            },
        );
        txn_id
    }

    /// Commit a transaction and return its commit timestamp.
    ///
    /// The commit timestamp is the logical clock value *before* it is advanced.
    /// As a result:
    /// - a transaction that begins afterwards has `snapshot_ts > commit_ts`;
    /// - a transaction that began earlier has `snapshot_ts <= commit_ts`.
    ///
    /// NOTE(review): this gives correct snapshot visibility only if readers
    /// treat a version as visible when `commit_ts < snapshot_ts` (strictly) —
    /// confirm against the storage layer.
    ///
    /// The transaction stays in the table, in the `Committed` state, until
    /// [`cleanup`](Self::cleanup) removes it.
    ///
    /// # Errors
    ///
    /// Returns `KernelError::Transaction` with kind:
    /// - `NotFound(txn_id)` if the ID is unknown (never begun, or cleaned up);
    /// - `AlreadyCommitted` / `AlreadyAborted` if the transaction already finished.
    ///
    /// For `Serializable` transactions, any error from the serialization check
    /// is also propagated (the current check never fails).
    pub fn commit(&self, txn_id: TransactionId) -> KernelResult<Timestamp> {
        // Serializes commits against each other.
        let _guard = self.commit_lock.lock();
        let mut txns = self.active_txns.write();

        let info = txns.get_mut(&txn_id).ok_or_else(|| KernelError::Transaction {
            kind: TransactionErrorKind::NotFound(txn_id),
        })?;

        match info.state {
            TransactionState::Active | TransactionState::Preparing => {
                // The read/write sets are borrowed straight from the entry;
                // `self` and the table guard are disjoint, so no copy is needed.
                if info.isolation == IsolationLevel::Serializable {
                    self.check_serialization_conflicts_cloned(&info.read_set, &info.write_set)?;
                }

                // `fetch_add` returns the previous clock value; that value
                // becomes the commit timestamp.
                let commit_ts = self.current_ts.fetch_add(1, Ordering::SeqCst);
                info.commit_ts = Some(commit_ts);
                info.state = TransactionState::Committed;

                Ok(commit_ts)
            }
            TransactionState::Committed => Err(KernelError::Transaction {
                kind: TransactionErrorKind::AlreadyCommitted,
            }),
            TransactionState::Aborted => Err(KernelError::Transaction {
                kind: TransactionErrorKind::AlreadyAborted,
            }),
        }
    }

    /// Abort a transaction.
    ///
    /// Aborting an already-aborted transaction is a no-op and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns `KernelError::Transaction` with kind:
    /// - `NotFound(txn_id)` if the ID is unknown;
    /// - `AlreadyCommitted` if the transaction has already committed.
    pub fn abort(&self, txn_id: TransactionId) -> KernelResult<()> {
        let mut txns = self.active_txns.write();
        let info = txns.get_mut(&txn_id).ok_or_else(|| KernelError::Transaction {
            kind: TransactionErrorKind::NotFound(txn_id),
        })?;

        match info.state {
            TransactionState::Active | TransactionState::Preparing => {
                info.state = TransactionState::Aborted;
                Ok(())
            }
            TransactionState::Committed => Err(KernelError::Transaction {
                kind: TransactionErrorKind::AlreadyCommitted,
            }),
            TransactionState::Aborted => Ok(()), // Idempotent
        }
    }

    /// Return `true` only if the transaction is known and in the `Active` state.
    ///
    /// Unknown, preparing, committed and aborted transactions all yield `false`.
    pub fn is_active(&self, txn_id: TransactionId) -> bool {
        self.active_txns
            .read()
            .get(&txn_id)
            .is_some_and(|info| info.state == TransactionState::Active)
    }

    /// Get the snapshot timestamp of a tracked transaction, in any state.
    ///
    /// # Errors
    ///
    /// Returns `KernelError::Transaction` with kind `NotFound(txn_id)` if the
    /// ID is unknown.
    pub fn snapshot_ts(&self, txn_id: TransactionId) -> KernelResult<Timestamp> {
        self.active_txns
            .read()
            .get(&txn_id)
            .map(|info| info.snapshot_ts)
            .ok_or_else(|| KernelError::Transaction {
                kind: TransactionErrorKind::NotFound(txn_id),
            })
    }

    /// Record a read of `(table_id, row_id)` for the SSI check.
    ///
    /// Only `Serializable` transactions record reads. Unknown IDs are silently
    /// ignored, and duplicates are not filtered out.
    pub fn record_read(&self, txn_id: TransactionId, table_id: u32, row_id: u64) {
        if let Some(info) = self.active_txns.write().get_mut(&txn_id)
            && info.isolation == IsolationLevel::Serializable
        {
            info.read_set.push((table_id, row_id));
        }
    }

    /// Record a write of `(table_id, row_id)`, at every isolation level.
    ///
    /// Unknown IDs are silently ignored.
    pub fn record_write(&self, txn_id: TransactionId, table_id: u32, row_id: u64) {
        if let Some(info) = self.active_txns.write().get_mut(&txn_id) {
            info.write_set.push((table_id, row_id));
        }
    }

    /// Store the latest WAL LSN written by a transaction.
    ///
    /// Unknown IDs are silently ignored.
    pub fn set_last_lsn(&self, txn_id: TransactionId, lsn: LogSequenceNumber) {
        if let Some(info) = self.active_txns.write().get_mut(&txn_id) {
            info.last_lsn = Some(lsn);
        }
    }

    /// Get the oldest snapshot timestamp still held by an in-flight transaction
    /// (`Active` or `Preparing`), for garbage collection.
    ///
    /// Returns `None` when no transaction is in flight.
    pub fn min_active_snapshot(&self) -> Option<Timestamp> {
        self.active_txns
            .read()
            .values()
            .filter(|info| {
                matches!(
                    info.state,
                    TransactionState::Active | TransactionState::Preparing
                )
            })
            .map(|info| info.snapshot_ts)
            .min()
    }

    /// Count transactions in the `Active` state.
    pub fn active_count(&self) -> usize {
        self.active_txns
            .read()
            .values()
            .filter(|info| info.state == TransactionState::Active)
            .count()
    }

    /// Drop finished (committed or aborted) transactions older than `retention`.
    ///
    /// In-flight transactions (`Active` or `Preparing`) are always kept. No
    /// completion time is recorded, so a finished transaction's age is measured
    /// from when it *started*. A long-running transaction can therefore be
    /// dropped right after it finishes.
    pub fn cleanup(&self, retention: Duration) {
        let now = Instant::now();
        self.active_txns.write().retain(|_, info| {
            if matches!(
                info.state,
                TransactionState::Active | TransactionState::Preparing
            ) {
                return true;
            }
            now.duration_since(info.start_time) < retention
        });
    }

    /// List `Active` transactions that have run longer than the configured
    /// timeout.
    ///
    /// This only reports them; it does not abort anything. The order of the
    /// returned IDs is unspecified.
    pub fn check_timeouts(&self) -> Vec<TransactionId> {
        let now = Instant::now();
        self.active_txns
            .read()
            .values()
            .filter(|info| {
                info.state == TransactionState::Active
                    && now.duration_since(info.start_time) > self.timeout
            })
            .map(|info| info.id)
            .collect()
    }

    /// Placeholder SSI check over the whole transaction table.
    ///
    /// It performs no checks and always returns `Ok(())`. A real implementation
    /// would track rw-antidependencies and reject dangerous structures (two
    /// consecutive rw-antidependencies). Full SSI is noted as living in
    /// sochdb-storage/src/ssi.rs.
    // Unused: `commit` calls the slice-based variant instead. It is kept as the
    // intended table-aware signature, hence the `dead_code` allowance.
    #[allow(dead_code)]
    fn check_serialization_conflicts(
        &self,
        txn: &TransactionInfo,
        _all_txns: &HashMap<TransactionId, TransactionInfo>,
    ) -> KernelResult<()> {
        let _ = txn;
        Ok(())
    }

    /// SSI check invoked by `commit` for `Serializable` transactions, given
    /// that transaction's read and write sets.
    ///
    /// This is currently a placeholder: it performs no conflict detection of
    /// any kind (not even write-write) and always returns `Ok(())`. A full SSI
    /// check would track rw-dependencies and detect dangerous structures. Full
    /// SSI is noted as living in sochdb-storage/src/ssi.rs.
    fn check_serialization_conflicts_cloned(
        &self,
        _read_set: &[(u32, u64)],
        _write_set: &[(u32, u64)],
    ) -> KernelResult<()> {
        Ok(())
    }

    /// Get the current logical clock value.
    ///
    /// This is the snapshot the next `begin` would take and the timestamp the
    /// next `commit` would receive.
    pub fn current_timestamp(&self) -> Timestamp {
        self.current_ts.load(Ordering::SeqCst)
    }

    /// Restore both counters after recovery, overwriting them unconditionally.
    ///
    /// Tracked transactions are left untouched. NOTE(review): presumably called
    /// before any transaction begins, with values past everything already
    /// issued — confirm with the recovery code.
    pub fn restore(&self, next_txn_id: TransactionId, current_ts: Timestamp) {
        self.next_txn_id.store(next_txn_id, Ordering::SeqCst);
        self.current_ts.store(current_ts, Ordering::SeqCst);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    #[test]
    fn test_begin_commit() {
        let mgr = TxnManager::new();

        let txn1 = mgr.begin();
        assert!(mgr.is_active(txn1));
        assert_eq!(mgr.active_count(), 1);

        let commit_ts = mgr.commit(txn1).unwrap();
        assert!(!mgr.is_active(txn1));
        assert_eq!(mgr.active_count(), 0);
        assert!(commit_ts > 0);
    }

    #[test]
    fn test_begin_abort() {
        let mgr = TxnManager::new();

        let txn1 = mgr.begin();
        assert!(mgr.is_active(txn1));

        mgr.abort(txn1).unwrap();
        assert!(!mgr.is_active(txn1));
        assert_eq!(mgr.active_count(), 0);
    }

    #[test]
    fn test_snapshot_isolation() {
        let mgr = TxnManager::new();

        let txn1 = mgr.begin();
        let ts1 = mgr.snapshot_ts(txn1).unwrap();

        // Committing advances the logical clock.
        let commit_ts = mgr.commit(txn1).unwrap();

        let txn2 = mgr.begin();
        let ts2 = mgr.snapshot_ts(txn2).unwrap();

        // A transaction begun after the commit gets a strictly later snapshot,
        // and that snapshot is strictly past the commit it must observe.
        assert!(ts2 > ts1);
        assert!(ts2 > commit_ts);
    }

    #[test]
    fn test_concurrent_snapshot_precedes_commit() {
        let mgr = TxnManager::new();

        let writer = mgr.begin();
        let reader = mgr.begin();
        let commit_ts = mgr.commit(writer).unwrap();

        // The reader's snapshot was taken before the writer committed.
        assert!(mgr.snapshot_ts(reader).unwrap() <= commit_ts);
    }

    #[test]
    fn test_double_commit_fails() {
        let mgr = TxnManager::new();
        let txn1 = mgr.begin();

        mgr.commit(txn1).unwrap();
        assert!(matches!(
            mgr.commit(txn1),
            Err(KernelError::Transaction {
                kind: TransactionErrorKind::AlreadyCommitted
            })
        ));
    }

    #[test]
    fn test_commit_after_abort_fails() {
        let mgr = TxnManager::new();
        let txn = mgr.begin();

        mgr.abort(txn).unwrap();
        assert!(matches!(
            mgr.commit(txn),
            Err(KernelError::Transaction {
                kind: TransactionErrorKind::AlreadyAborted
            })
        ));
    }

    #[test]
    fn test_abort_is_idempotent_but_rejected_after_commit() {
        let mgr = TxnManager::new();

        let aborted = mgr.begin();
        mgr.abort(aborted).unwrap();
        mgr.abort(aborted).unwrap();

        let committed = mgr.begin();
        mgr.commit(committed).unwrap();
        assert!(matches!(
            mgr.abort(committed),
            Err(KernelError::Transaction {
                kind: TransactionErrorKind::AlreadyCommitted
            })
        ));
    }

    #[test]
    fn test_unknown_transaction_is_not_found() {
        let mgr = TxnManager::new();

        assert!(!mgr.is_active(999));
        assert!(matches!(
            mgr.commit(999),
            Err(KernelError::Transaction {
                kind: TransactionErrorKind::NotFound(999)
            })
        ));
        assert!(matches!(
            mgr.abort(999),
            Err(KernelError::Transaction {
                kind: TransactionErrorKind::NotFound(999)
            })
        ));
        assert!(matches!(
            mgr.snapshot_ts(999),
            Err(KernelError::Transaction {
                kind: TransactionErrorKind::NotFound(999)
            })
        ));
    }

    #[test]
    fn test_min_active_snapshot() {
        let mgr = TxnManager::new();
        assert_eq!(mgr.min_active_snapshot(), None);

        let txn1 = mgr.begin();
        let txn2 = mgr.begin();

        let min = mgr.min_active_snapshot().unwrap();
        assert_eq!(min, mgr.snapshot_ts(txn1).unwrap());

        mgr.commit(txn1).unwrap();
        let min = mgr.min_active_snapshot().unwrap();
        assert_eq!(min, mgr.snapshot_ts(txn2).unwrap());

        mgr.commit(txn2).unwrap();
        assert_eq!(mgr.min_active_snapshot(), None);
    }

    #[test]
    fn test_cleanup_drops_finished_and_keeps_active() {
        let mgr = TxnManager::new();

        let committed = mgr.begin();
        let aborted = mgr.begin();
        let active = mgr.begin();
        mgr.commit(committed).unwrap();
        mgr.abort(aborted).unwrap();

        mgr.cleanup(Duration::ZERO);

        assert!(mgr.snapshot_ts(committed).is_err());
        assert!(mgr.snapshot_ts(aborted).is_err());
        assert!(mgr.is_active(active));
    }

    #[test]
    fn test_check_timeouts_reports_only_active() {
        assert!(TxnManager::new().check_timeouts().is_empty());

        let mgr = TxnManager::with_timeout(Duration::ZERO);
        let finished = mgr.begin();
        let stuck = mgr.begin();
        mgr.commit(finished).unwrap();

        // Make sure some time has elapsed so `> timeout` holds.
        thread::sleep(Duration::from_millis(2));
        assert_eq!(mgr.check_timeouts(), vec![stuck]);
    }

    #[test]
    fn test_serializable_commit() {
        let mgr = TxnManager::new();
        let txn = mgr.begin_with_isolation(IsolationLevel::Serializable);

        mgr.record_read(txn, 1, 10);
        mgr.record_write(txn, 1, 11);
        assert!(mgr.commit(txn).is_ok());
        assert!(!mgr.is_active(txn));
    }

    #[test]
    fn test_restore_sets_counters() {
        let mgr = TxnManager::new();
        mgr.restore(100, 50);
        assert_eq!(mgr.current_timestamp(), 50);

        let txn = mgr.begin();
        assert_eq!(txn, 100);
        assert_eq!(mgr.snapshot_ts(txn).unwrap(), 50);
        assert_eq!(mgr.commit(txn).unwrap(), 50);
        assert_eq!(mgr.current_timestamp(), 51);
    }
}