1use crate::storage::{ChunkStorage, StorageError};
43use chie_crypto::{EncryptionKey, EncryptionNonce};
44use std::collections::HashMap;
45use std::path::PathBuf;
46use thiserror::Error;
47use tokio::fs;
48
49#[derive(Debug, Error)]
51pub enum TransactionError {
52 #[error("Storage error: {0}")]
53 Storage(#[from] StorageError),
54
55 #[error("Transaction not found: {0}")]
56 TransactionNotFound(u64),
57
58 #[error("Transaction already committed: {0}")]
59 AlreadyCommitted(u64),
60
61 #[error("Transaction already rolled back: {0}")]
62 AlreadyRolledBack(u64),
63
64 #[error("Concurrent transaction conflict")]
65 ConcurrentConflict,
66
67 #[error("IO error: {0}")]
68 Io(#[from] std::io::Error),
69}
70
71#[derive(Debug, Clone, Copy, PartialEq, Eq)]
73pub enum TransactionState {
74 Active,
76 Committed,
78 RolledBack,
80}
81
82#[derive(Debug, Clone)]
84struct WrittenChunk {
85 #[allow(dead_code)]
86 cid: String,
87 #[allow(dead_code)]
88 chunk_index: u64,
89 chunk_path: PathBuf,
90 meta_path: PathBuf,
91 #[allow(dead_code)]
92 size_bytes: u64,
93}
94
95#[derive(Debug)]
97pub struct Transaction {
98 id: u64,
99 state: TransactionState,
100 written_chunks: Vec<WrittenChunk>,
101 content_dirs: Vec<PathBuf>,
102 total_bytes: u64,
103}
104
105impl Transaction {
106 #[must_use]
108 fn new(id: u64) -> Self {
109 Self {
110 id,
111 state: TransactionState::Active,
112 written_chunks: Vec::new(),
113 content_dirs: Vec::new(),
114 total_bytes: 0,
115 }
116 }
117
118 #[must_use]
120 #[inline]
121 pub const fn id(&self) -> u64 {
122 self.id
123 }
124
125 #[must_use]
127 #[inline]
128 pub const fn state(&self) -> TransactionState {
129 self.state
130 }
131
132 #[must_use]
134 #[inline]
135 pub const fn total_bytes(&self) -> u64 {
136 self.total_bytes
137 }
138
139 #[must_use]
141 #[inline]
142 pub const fn is_active(&self) -> bool {
143 matches!(self.state, TransactionState::Active)
144 }
145
146 fn record_chunk(
148 &mut self,
149 cid: String,
150 chunk_index: u64,
151 chunk_path: PathBuf,
152 meta_path: PathBuf,
153 size_bytes: u64,
154 ) {
155 self.written_chunks.push(WrittenChunk {
156 cid,
157 chunk_index,
158 chunk_path,
159 meta_path,
160 size_bytes,
161 });
162 self.total_bytes += size_bytes;
163 }
164
165 fn record_content_dir(&mut self, dir: PathBuf) {
167 if !self.content_dirs.contains(&dir) {
168 self.content_dirs.push(dir);
169 }
170 }
171
172 async fn rollback(&mut self) -> Result<(), TransactionError> {
174 if self.state != TransactionState::Active {
175 return Err(TransactionError::AlreadyRolledBack(self.id));
176 }
177
178 for chunk in &self.written_chunks {
180 let _ = fs::remove_file(&chunk.chunk_path).await;
181 let _ = fs::remove_file(&chunk.meta_path).await;
182 }
183
184 for dir in &self.content_dirs {
186 let _ = fs::remove_dir(dir).await;
187 }
188
189 self.state = TransactionState::RolledBack;
190 self.written_chunks.clear();
191 self.content_dirs.clear();
192 self.total_bytes = 0;
193
194 Ok(())
195 }
196
197 fn commit(&mut self) -> Result<(), TransactionError> {
199 if self.state != TransactionState::Active {
200 return Err(TransactionError::AlreadyCommitted(self.id));
201 }
202
203 self.state = TransactionState::Committed;
204 Ok(())
205 }
206}
207
208pub struct TransactionManager {
210 next_id: u64,
211 active_transactions: HashMap<u64, Transaction>,
212}
213
214impl TransactionManager {
215 #[must_use]
217 pub fn new() -> Self {
218 Self {
219 next_id: 1,
220 active_transactions: HashMap::new(),
221 }
222 }
223
224 pub fn begin_transaction(&mut self) -> u64 {
226 let id = self.next_id;
227 self.next_id += 1;
228
229 let tx = Transaction::new(id);
230 self.active_transactions.insert(id, tx);
231
232 id
233 }
234
235 #[must_use]
237 pub fn get_transaction(&self, id: u64) -> Option<&Transaction> {
238 self.active_transactions.get(&id)
239 }
240
241 pub fn commit(&mut self, id: u64) -> Result<(), TransactionError> {
243 let tx = self
244 .active_transactions
245 .get_mut(&id)
246 .ok_or(TransactionError::TransactionNotFound(id))?;
247
248 tx.commit()?;
249 self.active_transactions.remove(&id);
250 Ok(())
251 }
252
253 pub async fn rollback(
255 &mut self,
256 storage: &mut ChunkStorage,
257 id: u64,
258 ) -> Result<(), TransactionError> {
259 let mut tx = self
260 .active_transactions
261 .remove(&id)
262 .ok_or(TransactionError::TransactionNotFound(id))?;
263
264 tx.rollback().await?;
266
267 storage.decrease_used_bytes(tx.total_bytes);
269
270 Ok(())
271 }
272
273 pub async fn transactional_write(
278 &mut self,
279 storage: &mut ChunkStorage,
280 tx_id: u64,
281 cid: &str,
282 chunks: &[Vec<u8>],
283 key: &EncryptionKey,
284 nonce: &EncryptionNonce,
285 ) -> Result<(), TransactionError> {
286 let tx = self
287 .active_transactions
288 .get_mut(&tx_id)
289 .ok_or(TransactionError::TransactionNotFound(tx_id))?;
290
291 if !tx.is_active() {
292 return Err(TransactionError::AlreadyCommitted(tx_id));
293 }
294
295 let total_size: u64 = chunks.iter().map(|c| c.len() as u64).sum();
297
298 if storage.used_bytes() + total_size > storage.max_bytes() {
300 self.active_transactions.remove(&tx_id);
302 return Err(TransactionError::Storage(StorageError::QuotaExceeded {
303 used: storage.used_bytes(),
304 max: storage.max_bytes(),
305 }));
306 }
307
308 let content_dir = storage.get_chunk_dir(cid);
310 if let Err(e) = fs::create_dir_all(&content_dir).await {
311 self.active_transactions.remove(&tx_id);
313 return Err(TransactionError::Io(e));
314 }
315
316 let tx = self
318 .active_transactions
319 .get_mut(&tx_id)
320 .ok_or(TransactionError::TransactionNotFound(tx_id))?;
321 tx.record_content_dir(content_dir);
322
323 match storage
325 .write_chunks_for_transaction(cid, chunks, key, nonce)
326 .await
327 {
328 Ok(written_chunks) => {
329 let tx = self
331 .active_transactions
332 .get_mut(&tx_id)
333 .ok_or(TransactionError::TransactionNotFound(tx_id))?;
334
335 for (chunk_index, chunk_path, meta_path, size_bytes) in written_chunks {
336 tx.record_chunk(
337 cid.to_string(),
338 chunk_index,
339 chunk_path,
340 meta_path,
341 size_bytes,
342 );
343 }
344 Ok(())
345 }
346 Err(e) => {
347 let mut tx = self
349 .active_transactions
350 .remove(&tx_id)
351 .ok_or(TransactionError::TransactionNotFound(tx_id))?;
352 tx.rollback().await?;
353 storage.decrease_used_bytes(tx.total_bytes);
354 Err(TransactionError::Storage(e))
355 }
356 }
357 }
358
359 #[must_use]
361 #[inline]
362 pub fn active_transaction_count(&self) -> usize {
363 self.active_transactions.len()
364 }
365}
366
367impl Default for TransactionManager {
368 fn default() -> Self {
369 Self::new()
370 }
371}
372
373#[cfg(test)]
374mod tests {
375 use super::*;
376 use chie_crypto::{generate_key, generate_nonce};
377 use tempfile::TempDir;
378
379 async fn create_test_storage() -> (TempDir, ChunkStorage) {
380 let temp_dir = TempDir::new().unwrap();
381 let storage = ChunkStorage::new(temp_dir.path().to_path_buf(), 10_000_000)
382 .await
383 .unwrap();
384 (temp_dir, storage)
385 }
386
387 #[tokio::test]
388 async fn test_transaction_begin_commit() {
389 let mut tx_mgr = TransactionManager::new();
390
391 let tx_id = tx_mgr.begin_transaction();
392 assert_eq!(tx_mgr.active_transaction_count(), 1);
393
394 let tx = tx_mgr.get_transaction(tx_id).unwrap();
395 assert_eq!(tx.id(), tx_id);
396 assert_eq!(tx.state(), TransactionState::Active);
397
398 tx_mgr.commit(tx_id).unwrap();
399 assert_eq!(tx_mgr.active_transaction_count(), 0);
400 }
401
402 #[tokio::test]
403 async fn test_transaction_rollback() {
404 let (_temp_dir, mut storage) = create_test_storage().await;
405 let mut tx_mgr = TransactionManager::new();
406
407 let tx_id = tx_mgr.begin_transaction();
408 tx_mgr.rollback(&mut storage, tx_id).await.unwrap();
409
410 assert_eq!(tx_mgr.active_transaction_count(), 0);
411 }
412
413 #[tokio::test]
414 async fn test_transactional_write_success() {
415 let (_temp_dir, mut storage) = create_test_storage().await;
416 let mut tx_mgr = TransactionManager::new();
417
418 let tx_id = tx_mgr.begin_transaction();
419
420 let key = generate_key();
421 let nonce = generate_nonce();
422 let chunks = vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]];
423
424 tx_mgr
425 .transactional_write(&mut storage, tx_id, "QmTest", &chunks, &key, &nonce)
426 .await
427 .unwrap();
428
429 let tx = tx_mgr.get_transaction(tx_id).unwrap();
430 assert!(tx.total_bytes() > 0);
431
432 tx_mgr.commit(tx_id).unwrap();
433 }
434
435 #[tokio::test]
436 async fn test_transactional_write_rollback_on_quota_exceeded() {
437 let temp_dir = TempDir::new().unwrap();
438 let mut storage = ChunkStorage::new(temp_dir.path().to_path_buf(), 100)
440 .await
441 .unwrap();
442 let mut tx_mgr = TransactionManager::new();
443
444 let tx_id = tx_mgr.begin_transaction();
445
446 let key = generate_key();
447 let nonce = generate_nonce();
448 let chunks = vec![vec![0u8; 1000], vec![0u8; 1000]];
450
451 let result = tx_mgr
452 .transactional_write(&mut storage, tx_id, "QmTest", &chunks, &key, &nonce)
453 .await;
454
455 assert!(result.is_err());
456 assert_eq!(tx_mgr.active_transaction_count(), 0);
458 }
459
460 #[tokio::test]
461 async fn test_commit_nonexistent_transaction() {
462 let mut tx_mgr = TransactionManager::new();
463
464 let result = tx_mgr.commit(999);
465 assert!(result.is_err());
466 assert!(matches!(
467 result.unwrap_err(),
468 TransactionError::TransactionNotFound(999)
469 ));
470 }
471
472 #[tokio::test]
473 async fn test_double_commit() {
474 let mut tx_mgr = TransactionManager::new();
475
476 let tx_id = tx_mgr.begin_transaction();
477 tx_mgr.commit(tx_id).unwrap();
478
479 let result = tx_mgr.commit(tx_id);
480 assert!(result.is_err());
481 }
482}