chie_core/
transaction.rs

1//! Transactional chunk operations for atomic storage writes.
2//!
3//! This module provides ACID-compliant transaction support for chunk storage,
4//! ensuring that multi-chunk writes are atomic (all-or-nothing). If any chunk
5//! fails to write, all previously written chunks in the transaction are rolled back.
6//!
7//! # Example
8//!
9//! ```rust
10//! use chie_core::transaction::{Transaction, TransactionManager};
11//! use chie_core::ChunkStorage;
12//! use chie_crypto::{generate_key, generate_nonce};
13//! use std::path::PathBuf;
14//!
15//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
16//! let mut storage = ChunkStorage::new(PathBuf::from("/tmp/storage"), 1_000_000_000).await?;
17//! let mut tx_mgr = TransactionManager::new();
18//!
19//! // Begin a transaction
20//! let tx_id = tx_mgr.begin_transaction();
21//!
22//! let key = generate_key();
23//! let nonce = generate_nonce();
24//! let chunks = vec![vec![1, 2, 3], vec![4, 5, 6]];
25//!
26//! // Perform transactional write
27//! match tx_mgr.transactional_write(&mut storage, tx_id, "QmTest", &chunks, &key, &nonce).await {
28//!     Ok(()) => {
29//!         // Commit transaction
30//!         tx_mgr.commit(tx_id)?;
31//!     }
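//!     Err(e) => {
//!         // A failed `transactional_write` has already removed the transaction
//!         // from the manager, so calling `rollback` here would itself fail
//!         // with `TransactionNotFound`; just propagate the original error.
//!         return Err(e.into());
//!     }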
37//! }
38//! # Ok(())
39//! # }
40//! ```
41
42use crate::storage::{ChunkStorage, StorageError};
43use chie_crypto::{EncryptionKey, EncryptionNonce};
44use std::collections::HashMap;
45use std::path::PathBuf;
46use thiserror::Error;
47use tokio::fs;
48
49/// Transaction error types.
50#[derive(Debug, Error)]
51pub enum TransactionError {
52    #[error("Storage error: {0}")]
53    Storage(#[from] StorageError),
54
55    #[error("Transaction not found: {0}")]
56    TransactionNotFound(u64),
57
58    #[error("Transaction already committed: {0}")]
59    AlreadyCommitted(u64),
60
61    #[error("Transaction already rolled back: {0}")]
62    AlreadyRolledBack(u64),
63
64    #[error("Concurrent transaction conflict")]
65    ConcurrentConflict,
66
67    #[error("IO error: {0}")]
68    Io(#[from] std::io::Error),
69}
70
/// Lifecycle stage of a transaction.
///
/// A transaction is created as [`TransactionState::Active`] and moves to
/// exactly one of the other two states when it is committed or rolled back.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TransactionState {
    /// Still open: chunk writes may be recorded against the transaction.
    Active,
    /// Finished successfully; the written chunks are kept.
    Committed,
    /// Abandoned; the chunk files it recorded have been removed.
    RolledBack,
}
81
/// Bookkeeping record for one chunk that a transaction has written to disk.
///
/// Only the two paths are read back (by rollback, to delete the files); the
/// remaining fields are stored but not read in this module, hence the
/// per-field `dead_code` allowances.
#[derive(Clone, Debug)]
struct WrittenChunk {
    /// Content identifier the chunk belongs to (stored, not read here).
    #[allow(dead_code)]
    cid: String,
    /// Index of the chunk within its content (stored, not read here).
    #[allow(dead_code)]
    chunk_index: u64,
    /// Location of the chunk data file; deleted on rollback.
    chunk_path: PathBuf,
    /// Location of the chunk metadata file; deleted on rollback.
    meta_path: PathBuf,
    /// Size accounted for this chunk, in bytes (stored, not read here).
    #[allow(dead_code)]
    size_bytes: u64,
}
94
95/// A transaction for atomic chunk operations.
96#[derive(Debug)]
97pub struct Transaction {
98    id: u64,
99    state: TransactionState,
100    written_chunks: Vec<WrittenChunk>,
101    content_dirs: Vec<PathBuf>,
102    total_bytes: u64,
103}
104
105impl Transaction {
106    /// Create a new transaction.
107    #[must_use]
108    fn new(id: u64) -> Self {
109        Self {
110            id,
111            state: TransactionState::Active,
112            written_chunks: Vec::new(),
113            content_dirs: Vec::new(),
114            total_bytes: 0,
115        }
116    }
117
118    /// Get the transaction ID.
119    #[must_use]
120    #[inline]
121    pub const fn id(&self) -> u64 {
122        self.id
123    }
124
125    /// Get the transaction state.
126    #[must_use]
127    #[inline]
128    pub const fn state(&self) -> TransactionState {
129        self.state
130    }
131
132    /// Get the total bytes written in this transaction.
133    #[must_use]
134    #[inline]
135    pub const fn total_bytes(&self) -> u64 {
136        self.total_bytes
137    }
138
139    /// Check if the transaction is active.
140    #[must_use]
141    #[inline]
142    pub const fn is_active(&self) -> bool {
143        matches!(self.state, TransactionState::Active)
144    }
145
146    /// Record a written chunk.
147    fn record_chunk(
148        &mut self,
149        cid: String,
150        chunk_index: u64,
151        chunk_path: PathBuf,
152        meta_path: PathBuf,
153        size_bytes: u64,
154    ) {
155        self.written_chunks.push(WrittenChunk {
156            cid,
157            chunk_index,
158            chunk_path,
159            meta_path,
160            size_bytes,
161        });
162        self.total_bytes += size_bytes;
163    }
164
165    /// Record a content directory.
166    fn record_content_dir(&mut self, dir: PathBuf) {
167        if !self.content_dirs.contains(&dir) {
168            self.content_dirs.push(dir);
169        }
170    }
171
172    /// Rollback this transaction by deleting all written chunks.
173    async fn rollback(&mut self) -> Result<(), TransactionError> {
174        if self.state != TransactionState::Active {
175            return Err(TransactionError::AlreadyRolledBack(self.id));
176        }
177
178        // Delete all written chunks and metadata
179        for chunk in &self.written_chunks {
180            let _ = fs::remove_file(&chunk.chunk_path).await;
181            let _ = fs::remove_file(&chunk.meta_path).await;
182        }
183
184        // Delete content directories if empty
185        for dir in &self.content_dirs {
186            let _ = fs::remove_dir(dir).await;
187        }
188
189        self.state = TransactionState::RolledBack;
190        self.written_chunks.clear();
191        self.content_dirs.clear();
192        self.total_bytes = 0;
193
194        Ok(())
195    }
196
197    /// Commit this transaction.
198    fn commit(&mut self) -> Result<(), TransactionError> {
199        if self.state != TransactionState::Active {
200            return Err(TransactionError::AlreadyCommitted(self.id));
201        }
202
203        self.state = TransactionState::Committed;
204        Ok(())
205    }
206}
207
208/// Manages transactions for atomic chunk operations.
209pub struct TransactionManager {
210    next_id: u64,
211    active_transactions: HashMap<u64, Transaction>,
212}
213
214impl TransactionManager {
215    /// Create a new transaction manager.
216    #[must_use]
217    pub fn new() -> Self {
218        Self {
219            next_id: 1,
220            active_transactions: HashMap::new(),
221        }
222    }
223
224    /// Begin a new transaction.
225    pub fn begin_transaction(&mut self) -> u64 {
226        let id = self.next_id;
227        self.next_id += 1;
228
229        let tx = Transaction::new(id);
230        self.active_transactions.insert(id, tx);
231
232        id
233    }
234
235    /// Get a transaction by ID.
236    #[must_use]
237    pub fn get_transaction(&self, id: u64) -> Option<&Transaction> {
238        self.active_transactions.get(&id)
239    }
240
241    /// Commit a transaction.
242    pub fn commit(&mut self, id: u64) -> Result<(), TransactionError> {
243        let tx = self
244            .active_transactions
245            .get_mut(&id)
246            .ok_or(TransactionError::TransactionNotFound(id))?;
247
248        tx.commit()?;
249        self.active_transactions.remove(&id);
250        Ok(())
251    }
252
253    /// Rollback a transaction.
254    pub async fn rollback(
255        &mut self,
256        storage: &mut ChunkStorage,
257        id: u64,
258    ) -> Result<(), TransactionError> {
259        let mut tx = self
260            .active_transactions
261            .remove(&id)
262            .ok_or(TransactionError::TransactionNotFound(id))?;
263
264        // Rollback the transaction
265        tx.rollback().await?;
266
267        // Update storage used_bytes
268        storage.decrease_used_bytes(tx.total_bytes);
269
270        Ok(())
271    }
272
273    /// Perform a transactional write of chunks.
274    ///
275    /// This method writes all chunks atomically. If any chunk fails to write,
276    /// all previously written chunks are rolled back.
277    pub async fn transactional_write(
278        &mut self,
279        storage: &mut ChunkStorage,
280        tx_id: u64,
281        cid: &str,
282        chunks: &[Vec<u8>],
283        key: &EncryptionKey,
284        nonce: &EncryptionNonce,
285    ) -> Result<(), TransactionError> {
286        let tx = self
287            .active_transactions
288            .get_mut(&tx_id)
289            .ok_or(TransactionError::TransactionNotFound(tx_id))?;
290
291        if !tx.is_active() {
292            return Err(TransactionError::AlreadyCommitted(tx_id));
293        }
294
295        // Calculate total size
296        let total_size: u64 = chunks.iter().map(|c| c.len() as u64).sum();
297
298        // Check quota
299        if storage.used_bytes() + total_size > storage.max_bytes() {
300            // Remove transaction on quota error
301            self.active_transactions.remove(&tx_id);
302            return Err(TransactionError::Storage(StorageError::QuotaExceeded {
303                used: storage.used_bytes(),
304                max: storage.max_bytes(),
305            }));
306        }
307
308        // Create content directory and record it
309        let content_dir = storage.get_chunk_dir(cid);
310        if let Err(e) = fs::create_dir_all(&content_dir).await {
311            // Remove transaction on IO error
312            self.active_transactions.remove(&tx_id);
313            return Err(TransactionError::Io(e));
314        }
315
316        // Record content dir in transaction
317        let tx = self
318            .active_transactions
319            .get_mut(&tx_id)
320            .ok_or(TransactionError::TransactionNotFound(tx_id))?;
321        tx.record_content_dir(content_dir);
322
323        // Write chunks transactionally
324        match storage
325            .write_chunks_for_transaction(cid, chunks, key, nonce)
326            .await
327        {
328            Ok(written_chunks) => {
329                // Record all written chunks in the transaction
330                let tx = self
331                    .active_transactions
332                    .get_mut(&tx_id)
333                    .ok_or(TransactionError::TransactionNotFound(tx_id))?;
334
335                for (chunk_index, chunk_path, meta_path, size_bytes) in written_chunks {
336                    tx.record_chunk(
337                        cid.to_string(),
338                        chunk_index,
339                        chunk_path,
340                        meta_path,
341                        size_bytes,
342                    );
343                }
344                Ok(())
345            }
346            Err(e) => {
347                // Rollback on error and remove transaction
348                let mut tx = self
349                    .active_transactions
350                    .remove(&tx_id)
351                    .ok_or(TransactionError::TransactionNotFound(tx_id))?;
352                tx.rollback().await?;
353                storage.decrease_used_bytes(tx.total_bytes);
354                Err(TransactionError::Storage(e))
355            }
356        }
357    }
358
359    /// Get the number of active transactions.
360    #[must_use]
361    #[inline]
362    pub fn active_transaction_count(&self) -> usize {
363        self.active_transactions.len()
364    }
365}
366
367impl Default for TransactionManager {
368    fn default() -> Self {
369        Self::new()
370    }
371}
372
#[cfg(test)]
mod tests {
    use super::*;
    use chie_crypto::{generate_key, generate_nonce};
    use tempfile::TempDir;

    /// Builds a `ChunkStorage` rooted in a fresh temporary directory with a
    /// 10_000_000-byte quota. The `TempDir` guard is returned as well so the
    /// directory stays alive for as long as the test holds on to it.
    async fn create_test_storage() -> (TempDir, ChunkStorage) {
        let guard = TempDir::new().unwrap();
        let root = guard.path().to_path_buf();
        let storage = ChunkStorage::new(root, 10_000_000).await.unwrap();
        (guard, storage)
    }

    /// A begun transaction is tracked and active; committing it stops the
    /// manager from tracking it.
    #[tokio::test]
    async fn test_transaction_begin_commit() {
        let mut manager = TransactionManager::new();

        let tx_id = manager.begin_transaction();
        assert_eq!(manager.active_transaction_count(), 1);

        let tx = manager.get_transaction(tx_id).unwrap();
        assert_eq!(tx.id(), tx_id);
        assert_eq!(tx.state(), TransactionState::Active);

        manager.commit(tx_id).unwrap();
        assert_eq!(manager.active_transaction_count(), 0);
    }

    /// Rolling back an empty transaction succeeds and stops tracking it.
    #[tokio::test]
    async fn test_transaction_rollback() {
        let (_guard, mut storage) = create_test_storage().await;
        let mut manager = TransactionManager::new();

        let tx_id = manager.begin_transaction();
        manager.rollback(&mut storage, tx_id).await.unwrap();

        assert_eq!(manager.active_transaction_count(), 0);
    }

    /// A successful write records a non-zero byte total and can be committed.
    #[tokio::test]
    async fn test_transactional_write_success() {
        let (_guard, mut storage) = create_test_storage().await;
        let mut manager = TransactionManager::new();
        let tx_id = manager.begin_transaction();

        let key = generate_key();
        let nonce = generate_nonce();
        let chunks = vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]];

        let outcome = manager
            .transactional_write(&mut storage, tx_id, "QmTest", &chunks, &key, &nonce)
            .await;
        outcome.unwrap();

        let tx = manager.get_transaction(tx_id).unwrap();
        assert!(tx.total_bytes() > 0);

        manager.commit(tx_id).unwrap();
    }

    /// A write larger than the quota fails and leaves no tracked transaction.
    #[tokio::test]
    async fn test_transactional_write_rollback_on_quota_exceeded() {
        let guard = TempDir::new().unwrap();
        // Storage whose quota (100 bytes) is far below the payload size.
        let mut storage = ChunkStorage::new(guard.path().to_path_buf(), 100)
            .await
            .unwrap();
        let mut manager = TransactionManager::new();
        let tx_id = manager.begin_transaction();

        let key = generate_key();
        let nonce = generate_nonce();
        // Two 1000-byte chunks: well over the quota.
        let chunks = vec![vec![0u8; 1000], vec![0u8; 1000]];

        let outcome = manager
            .transactional_write(&mut storage, tx_id, "QmTest", &chunks, &key, &nonce)
            .await;

        assert!(outcome.is_err());
        // The failed transaction must no longer be tracked.
        assert_eq!(manager.active_transaction_count(), 0);
    }

    /// Committing an ID that was never issued reports `TransactionNotFound`.
    #[tokio::test]
    async fn test_commit_nonexistent_transaction() {
        let mut manager = TransactionManager::new();

        let outcome = manager.commit(999);
        assert!(matches!(
            outcome,
            Err(TransactionError::TransactionNotFound(999))
        ));
    }

    /// Committing the same transaction twice fails the second time.
    #[tokio::test]
    async fn test_double_commit() {
        let mut manager = TransactionManager::new();

        let tx_id = manager.begin_transaction();
        manager.commit(tx_id).unwrap();

        assert!(manager.commit(tx_id).is_err());
    }
}