1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
// Copyright 2025 LunaOS Contributors
// SPDX-License-Identifier: Apache-2.0
//
// IO Pipeline
// Compression, dedup, encryption, and disk operations.
use crate::BLOCK_DEVICES;
use crate::LcpfsCrypto;
use crate::cache::arc::ARC;
use crate::compress::compress::Compress;
use crate::dedup::dedup::DDT;
use crate::fscore::impl_::LcpfsController;
use crate::fscore::structs::{Blkptr, Dva};
use crate::integrity::checksum::Checksum;
use crate::util::alloc::METASLAB;
use crate::{FsError, FsResult};
use alloc::vec;
use alloc::vec::Vec;
/// I/O pipeline for compression, checksumming, and encryption.
///
/// Stateless namespace type: every operation is an associated function that
/// acts on the global `BLOCK_DEVICES`, `ARC`, `DDT`, and `METASLAB` state
/// rather than on instance data.
pub struct Pipeline;
impl Pipeline {
    /// Check whether RAID-Z1 mode is available (requires 3+ block devices).
    fn is_raid_mode() -> bool {
        BLOCK_DEVICES.lock().len() >= 3
    }

    /// Write raw data to disk at the given byte offset (single-disk mode).
    ///
    /// Data is written sector by sector (512 bytes each). A short final
    /// chunk is zero-padded to a full sector, so any pre-existing bytes in
    /// the tail of that last sector are overwritten with zeros (there is no
    /// read-modify-write here).
    ///
    /// NOTE(review): `offset / 512` silently truncates a non-aligned offset
    /// to the sector boundary — assumes the allocator hands out 512-byte
    /// aligned offsets; verify against `METASLAB::allocate`.
    fn write_to_disk_single(offset: u64, data: &[u8]) -> Result<(), &'static str> {
        let mut devices = BLOCK_DEVICES.lock();
        let dev = devices.get_mut(0).ok_or("No block device")?;
        let start_sector = (offset / 512) as usize;
        let mut remaining = data;
        let mut sector_idx = start_sector;
        while !remaining.is_empty() {
            let chunk_size = core::cmp::min(512, remaining.len());
            let mut sector_buf = [0u8; 512];
            sector_buf[..chunk_size].copy_from_slice(&remaining[..chunk_size]);
            // FIX: was `§or_buf` — mojibake of `&sector_buf` (the `&sect`
            // prefix was interpreted as the HTML entity `&sect;` = '§').
            // The file did not compile with the garbled token.
            dev.write_block(sector_idx, &sector_buf)?;
            remaining = &remaining[chunk_size..];
            sector_idx += 1;
        }
        Ok(())
    }

    /// Write data using RAID-Z1 striping (3 disks: D0, D1, Parity).
    fn write_to_disk_raid(offset: u64, data: &[u8]) -> Result<(), &'static str> {
        let controller = LcpfsController::new(0, 1, 2);
        controller.write_data(offset, data)
    }

    /// Read raw data from disk at the given byte offset (single-disk mode).
    ///
    /// Reads whole 512-byte sectors covering `size` bytes, then truncates
    /// the result to exactly `size`.
    fn read_from_disk_single(offset: u64, size: usize) -> Result<Vec<u8>, &'static str> {
        let mut devices = BLOCK_DEVICES.lock();
        let dev = devices.get_mut(0).ok_or("No block device")?;
        let start_sector = (offset / 512) as usize;
        let sectors_needed = size.div_ceil(512);
        let mut result = vec![0u8; sectors_needed * 512];
        for i in 0..sectors_needed {
            let mut sector_buf = [0u8; 512];
            dev.read_block(start_sector + i, &mut sector_buf)?;
            // FIX: was `§or_buf` — same `&sect;` mojibake as in
            // `write_to_disk_single`; restored to `&sector_buf`.
            result[i * 512..(i + 1) * 512].copy_from_slice(&sector_buf);
        }
        // Trim the zero-padded tail down to the requested size.
        result.truncate(size);
        Ok(result)
    }

    /// Read data using RAID-Z1 (with self-healing handled inside the
    /// controller's stripe reads).
    ///
    /// NOTE(review): reads start at the containing stripe boundary and
    /// `offset % stripe_size` is discarded — assumes stripe-aligned
    /// allocations; verify against the allocator.
    fn read_from_disk_raid(offset: u64, size: usize) -> Result<Vec<u8>, &'static str> {
        let controller = LcpfsController::new(0, 1, 2);
        let stripe_size = 1024; // 2 data sectors (2 x 512) per stripe
        let start_stripe = (offset / stripe_size as u64) as usize;
        let stripes_needed = size.div_ceil(stripe_size);
        let mut result = Vec::with_capacity(stripes_needed * stripe_size);
        for i in 0..stripes_needed {
            let stripe_data = controller.read_stripe(start_stripe + i)?;
            result.extend_from_slice(&stripe_data);
        }
        result.truncate(size);
        Ok(result)
    }

    /// The Write Path: Data -> Compress -> Dedup -> Encrypt -> Allocate ->
    /// Write -> Cache.
    ///
    /// # Arguments
    ///
    /// * `data` - Plaintext logical block contents
    /// * `key` - 32-byte encryption key
    /// * `txg` - Transaction group number (feeds the encryption nonce)
    ///
    /// # Returns
    ///
    /// `(Dva, compression_type)` where `compression_type` is 0 = none,
    /// 1 = LZ4.
    ///
    /// # Errors
    ///
    /// `FsError::EncryptionFailed` if the cipher rejects the block,
    /// allocation errors from `METASLAB`, or `FsError::IoError` if the
    /// physical write fails (the allocation is freed in that case).
    pub fn write_block(data: &[u8], key: &[u8; 32], txg: u64) -> FsResult<(Dva, u8)> {
        // 1. COMPRESSION (skipped for tiny blocks where the header overhead
        //    would dominate; also skipped when LZ4 fails to save space).
        let (compressed_data, compression_type) = if data.len() >= 64 {
            let compressed = Compress::compress(data).unwrap_or_else(|| data.to_vec());
            if compressed.len() < data.len() {
                (compressed, 1u8) // 1 = LZ4
            } else {
                (data.to_vec(), 0u8) // 0 = No compression
            }
        } else {
            (data.to_vec(), 0u8) // 0 = No compression
        };
        // 2. DEDUPLICATION CHECK — keyed on the *plaintext* block.
        //    NOTE(review): on a hit we return the compression type computed
        //    for THIS write, which may differ from how the existing on-disk
        //    copy was actually compressed — confirm the DDT stores or
        //    implies the stored compression type.
        {
            let mut ddt = DDT.lock();
            if let Some(existing_dva) = ddt.dedup(data) {
                // Block already exists - the DDT has bumped its refcount.
                return Ok((existing_dva, compression_type));
            }
        }
        // 3. ENCRYPTION (nonce derived from txg inside encrypt_block).
        let (encrypted_data, _nonce) = LcpfsCrypto::encrypt_block(key, &compressed_data, txg)
            .map_err(|_| FsError::EncryptionFailed)?;
        // 4. ALLOCATION — scoped so the metaslab lock is dropped before the
        //    (slow) physical write.
        let dva: Dva = {
            let mut metaslab = METASLAB.lock();
            metaslab.allocate(encrypted_data.len() as u64)?
        };
        // 5. PHYSICAL DISK WRITE (RAID-Z1 or single-disk).
        let write_result = if Self::is_raid_mode() {
            Self::write_to_disk_raid(dva.offset, &encrypted_data)
        } else {
            Self::write_to_disk_single(dva.offset, &encrypted_data)
        };
        if let Err(e) = write_result {
            // Roll back the allocation so the space isn't leaked.
            METASLAB.lock().free(dva, encrypted_data.len() as u64);
            return Err(FsError::IoError { vdev: 0, reason: e });
        }
        // 6. REGISTER IN DEDUP TABLE (plaintext -> DVA mapping).
        {
            let mut ddt = DDT.lock();
            ddt.register(data, dva);
        }
        // 7. CACHE IN ARC — plaintext is cached so reads skip decrypt and
        //    decompress on a hit.
        {
            let mut arc = ARC.lock();
            arc.cache(dva, data.to_vec());
        }
        Ok((dva, compression_type))
    }

    /// The Read Path: Cache Check -> Physical Read -> Decrypt ->
    /// Decompress -> Verify.
    ///
    /// This version derives the nonce from the block pointer automatically.
    /// Use this for all new code.
    pub fn read_block_auto_nonce(bp: &Blkptr, key: &[u8; 32]) -> FsResult<Vec<u8>> {
        let nonce = LcpfsCrypto::derive_nonce_from_bp(bp);
        Self::read_block(bp, key, &nonce)
    }

    /// The Read Path: Cache Check -> Physical Read -> Decrypt ->
    /// Decompress -> Verify.
    ///
    /// # Arguments
    ///
    /// * `bp` - Block pointer with DVA, checksum, and compression info
    /// * `key` - 32-byte encryption key
    /// * `nonce` - 12-byte nonce (use `derive_nonce_from_bp` or
    ///   `read_block_auto_nonce`)
    ///
    /// # Errors
    ///
    /// `FsError::IoError` on device failure, `FsError::DecryptionFailed`
    /// if the ciphertext fails authentication, or
    /// `FsError::ChecksumMismatch` when verification fails and RAID-Z1
    /// self-healing (if available) cannot recover the block.
    pub fn read_block(bp: &Blkptr, key: &[u8; 32], nonce: &[u8; 12]) -> FsResult<Vec<u8>> {
        let dva = bp.dva[0];
        // 1. ARC Check (Fast Path) — cache holds verified plaintext.
        {
            let mut arc = ARC.lock();
            if let Some(data) = arc.read(&dva) {
                return Ok(data);
            }
        }
        // 2. Physical Read (RAID-Z1 or single-disk). The bp's `padding`
        //    field doubles as the logical size (set by `write_block_full`);
        //    fall back to 4 KiB when it is absent or implausible (>128 KiB).
        let read_size = if bp.padding > 0 && bp.padding <= 131072 {
            bp.padding as usize
        } else {
            4096 // Default to 4KB (common block size)
        };
        let raw_data = if Self::is_raid_mode() {
            Self::read_from_disk_raid(dva.offset, read_size)
        } else {
            Self::read_from_disk_single(dva.offset, read_size)
        }
        .map_err(|e| FsError::IoError { vdev: 0, reason: e })?;
        // 3. Decryption
        let decrypted = LcpfsCrypto::decrypt_block(key, &raw_data, nonce)
            .map_err(|_| FsError::DecryptionFailed)?;
        // 4. Decompression (if the bp's compression flag is set). On
        //    decompression failure the decrypted bytes pass through and the
        //    checksum below catches the corruption.
        let plaintext = if bp.flags_compression & 0xFF != 0 {
            Compress::decompress(&decrypted).unwrap_or(decrypted)
        } else {
            decrypted
        };
        // 5. Checksum Verification - CRITICAL: fail on mismatch, don't
        //    return corrupt data.
        let computed_checksum = Checksum::calculate(&plaintext);
        if !computed_checksum.matches(&bp.checksum) {
            crate::lcpfs_println!("[ PIPELINE ] Checksum mismatch at DVA {:x}", dva.offset);
            // Try RAID-Z1 self-healing if available
            if Self::is_raid_mode() {
                crate::lcpfs_println!("[ PIPELINE ] Attempting RAID-Z1 self-healing...");
                let controller = LcpfsController::new(0, 1, 2);
                let stripe_idx = (dva.offset / 1024) as usize;
                if let Ok(healed) = controller.read_stripe_with_healing(stripe_idx) {
                    if let Ok(healed_decrypted) = LcpfsCrypto::decrypt_block(key, &healed, nonce) {
                        // FIX: apply the same decompression step as the
                        // normal path. Previously the healed data was
                        // checksummed while still compressed, so compressed
                        // blocks could never pass verification and
                        // self-healing always "failed" for them.
                        let healed_plain = if bp.flags_compression & 0xFF != 0 {
                            Compress::decompress(&healed_decrypted).unwrap_or(healed_decrypted)
                        } else {
                            healed_decrypted
                        };
                        let healed_checksum = Checksum::calculate(&healed_plain);
                        if healed_checksum.matches(&bp.checksum) {
                            crate::lcpfs_println!("[ PIPELINE ] Self-healing successful!");
                            let mut arc = ARC.lock();
                            arc.cache(dva, healed_plain.clone());
                            return Ok(healed_plain);
                        }
                    }
                }
                crate::lcpfs_println!("[ PIPELINE ] Self-healing failed");
            }
            // CRITICAL: return an error instead of corrupt data.
            return Err(FsError::ChecksumMismatch {
                expected: bp.checksum,
                actual: computed_checksum.to_u64_array(),
            });
        }
        // 6. Update ARC with the verified plaintext.
        {
            let mut arc = ARC.lock();
            arc.cache(dva, plaintext.clone());
        }
        Ok(plaintext)
    }

    /// Write a block and return a complete block pointer.
    ///
    /// The checksum is computed over the *plaintext* (pre-compression,
    /// pre-encryption), matching what `read_block` verifies after
    /// decrypt + decompress.
    pub fn write_block_full(data: &[u8], key: &[u8; 32], txg: u64) -> FsResult<Blkptr> {
        let checksum = Checksum::calculate(data);
        let (dva, compression_type) = Self::write_block(data, key, txg)?;
        let mut bp = Blkptr::zero();
        bp.dva[0] = dva;
        bp.birth_txg = txg;
        bp.checksum = checksum.value;
        bp.fill_count = 1;
        bp.padding = data.len() as u64; // Store logical size for reads
        bp.flags_compression = compression_type as u64; // Set compression flag
        Ok(bp)
    }

    /// Read multiple blocks to warm the ARC (prefetch).
    ///
    /// Errors are deliberately ignored: prefetch is best-effort and a
    /// failed read will surface again on the real read.
    pub fn prefetch_blocks(bps: &[Blkptr], key: &[u8; 32], nonce: &[u8; 12]) {
        for bp in bps {
            if !bp.is_hole() {
                // Trigger read to populate ARC
                let _ = Self::read_block(bp, key, nonce);
            }
        }
    }

    /// Sync dirty data to disk.
    ///
    /// This triggers a transaction group sync to ensure all dirty metadata
    /// (dnodes, block pointers) are persisted to disk. Data blocks are
    /// written immediately by `write_block()`, but metadata updates are
    /// batched in transaction groups for efficiency.
    ///
    /// # Returns
    ///
    /// The transaction group number that was synced, or an error if sync
    /// failed.
    pub fn sync_dirty() -> FsResult<u64> {
        use crate::storage::zpl::ZPL;
        // Snapshot the ARC occupancy for logging, dropping the lock before
        // the (potentially slow) txg sync below.
        let arc = ARC.lock();
        let cached_blocks = arc.t1.len() + arc.t2.len();
        drop(arc);
        crate::lcpfs_println!(
            "[ PIPELINE ] Syncing (ARC has {} cached blocks)",
            cached_blocks
        );
        // Trigger ZPL txg_sync which writes all dirty metadata to disk
        let mut zpl = ZPL.lock();
        let txg = zpl.txg_sync(0)?;
        crate::lcpfs_println!("[ PIPELINE ] Sync complete, TXG {}", txg);
        Ok(txg)
    }
}