// Source file: libfasttree/lib.rs

//! libfasttree: A Rust library analogous to libostree but for distribution repositories.
use std::collections::{HashMap, HashSet};
use std::fs::Permissions;
use std::io::{Cursor, Read, Write};
use std::os::unix::fs::{symlink, PermissionsExt};
use std::os::unix::io::AsRawFd;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::str::FromStr;
use std::sync::Arc;

use anyhow::{Context, Result};
use fastcdc::v2020::FastCDC; // Fixed: Use v2020 module
use nix::ioctl_write_int;
use nix::mount::{mount, MsFlags};
use nix::unistd::{Gid, Uid};
use rayon::prelude::*;
use sha2::{Digest, Sha256};
use sigstore::cosign::ClientBuilder;
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::sqlite::SqlitePool;
use sqlx::{Connection, Executor, Row, SqliteConnection};
use tempfile::TempDir;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; // Fixed: Added for read_to_end/write_all
use tokio::runtime::Runtime;
use tokio_uring::fs as uring_fs;
use tss_esapi::{
    Context as TpmContext,
    TctiNameConf
};
use walkdir::{DirEntry, WalkDir};
use zstd::stream::{Encoder, Decoder};
31
32// --- Mocks for missing crates (libsolv_rs, systemd_sysext) ---
33pub mod libsolv_mock {
34    use anyhow::Result;
35    pub struct Pool;
36    impl Pool { pub fn new() -> Self { Self } }
37    pub struct Repo<'a> { _p: &'a Pool }
38    impl<'a> Repo<'a> { pub fn new(_p: &'a Pool, _n: &str) -> Self { Self { _p } } }
39    pub struct Solver<'a> { _p: &'a Pool }
40    impl<'a> Solver<'a> {
41        pub fn new(_p: &'a Pool) -> Self { Self { _p } }
42        pub fn install(&self, _pkg: &str) -> Result<()> { Ok(()) }
43        pub fn solve(&self) -> Result<Transaction> { Ok(Transaction { install: vec![Solvable] }) }
44    }
45    pub struct Transaction { pub install: Vec<Solvable> }
46    pub struct Solvable;
47    impl Solvable {
48        pub fn name(&self) -> &str { "dummy-package" }
49        pub fn version(&self) -> &str { "1.0.0" }
50    }
51}
52use libsolv_mock::{Pool, Repo, Solver};
53
54// --- Ioctl Definitions ---
55// FICLONE is _IOW(0x94, 9, int) on Linux.
56ioctl_write_int!(ficlone, 0x94, 9);
57// FS_VERITY_ENABLE is _IOW('f', 133, struct fsverity_enable_arg).
58ioctl_write_int!(fsverity_enable, b'f', 133);
59// Hypothetical FS_VERITY_MEASURE ioctl (simplified to int)
60ioctl_write_int!(fsverity_measure, b'f', 134);
61
62/// Configuration for the library.
63#[derive(Debug, Clone)]
64pub struct Config {
65    pub repo_url: String,
66    pub distro_type: DistroType,
67    pub cas_dir: PathBuf,
68    pub db_path: PathBuf,
69    pub deployments_dir: PathBuf,
70    pub current_link: PathBuf,
71    pub boot_dir: PathBuf,
72    pub bootloader: BootloaderType,
73    pub filesystem: FilesystemType,
74    pub health_check_script: Option<PathBuf>,
75    pub overlay_dirs: Vec<PathBuf>,
76    pub var_volume: Option<PathBuf>,
77    pub gpg_keyring: PathBuf,
78    pub use_fsverity: bool,
79    pub use_ima: bool,
80    pub partitioning: PartitioningType,
81    pub sysext_dir: PathBuf,
82    pub zstd_dicts: HashMap<String, Vec<u8>>, // Dicts for compression
83    pub tpm_tcti: String, // Fixed: Changed to String to make Config Clone-able and serializable
84}
85
/// Package ecosystem of the source repository.
///
/// Used as the key of the package-manager registry, hence the `Hash` derive.
/// Only `Apt` and `Rpm` have a manager registered in `FastTree::new`.
// Fixed: Added Hash derive
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DistroType {
    /// Debian-style repositories.
    Apt,
    /// RPM-based repositories.
    Rpm,
    /// Arch Linux `pacman` repositories.
    Pacman,
    /// Nix.
    Nix,
    /// Alpine `apk` repositories.
    Apk,
}
95
/// Bootloader whose configuration is generated on deploy.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum BootloaderType {
    /// GRUB: a `grub/grub.cfg` is written under the boot directory.
    Grub,
    /// systemd-boot: an entry file is written under `loader/entries`.
    SystemdBoot,
}
101
/// Filesystem hosting the store and deployments.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FilesystemType {
    /// Btrfs: reflink cloning, and subvolumes when combined with
    /// `PartitioningType::Subvolumes`.
    Btrfs,
    /// XFS: reflink cloning.
    Xfs,
    /// Any other filesystem: plain copies and directories.
    Other,
}
108
/// How deployments are laid out on disk.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PartitioningType {
    /// One subvolume (on Btrfs) or directory per deployment.
    Subvolumes,
    /// A/B partition switching.
    ABPartitions,
}
114
115/// File metadata for storage in DB.
116#[derive(Debug, Clone, sqlx::FromRow, serde::Serialize, serde::Deserialize)]
117pub struct FileMetadata {
118    pub mode: u32,
119    pub uid: u32,
120    pub gid: u32,
121    pub is_symlink: bool,
122    pub symlink_target: Option<String>,
123    pub ima_label: Option<String>,
124    pub verity_hash: Option<String>, // Added for FS-Verity
125}
126
127/// Trait for PackageManager abstraction.
128pub trait PackageManager: Send + Sync {
129    fn fetch_metadata(&self, config: &Config) -> Result<PathBuf>;
130    fn parse_metadata(&self, meta_path: &Path, repo: &mut Repo) -> Result<()>;
131    fn download_package(&self, config: &Config, name: &str, version: &str) -> Result<Option<PathBuf>>;
132    fn extract_package(&self, pkg_path: &Path, dest: &Path, meta_map: &mut HashMap<PathBuf, FileMetadata>) -> Result<()>;
133    fn verify_signature(&self, pkg_path: &Path, config: &Config) -> Result<()>;
134}
135
136/// APT Manager
137struct AptManager;
138impl PackageManager for AptManager {
139    fn fetch_metadata(&self, config: &Config) -> Result<PathBuf> {
140        let _url = format!("{}/dists/stable/main/binary-amd64/Packages.gz", config.repo_url);
141        // Mock implementation
142        Ok(PathBuf::from("/tmp/Packages.gz"))
143    }
144    fn parse_metadata(&self, _meta_path: &Path, _repo: &mut Repo) -> Result<()> {
145        Ok(())
146    }
147    fn download_package(&self, _config: &Config, _name: &str, _version: &str) -> Result<Option<PathBuf>> {
148        Ok(Some(PathBuf::from("/tmp/dummy.deb")))
149    }
150    fn extract_package(&self, _pkg_path: &Path, _dest: &Path, _meta_map: &mut HashMap<PathBuf, FileMetadata>) -> Result<()> {
151        Ok(())
152    }
153    fn verify_signature(&self, _pkg_path: &Path, _config: &Config) -> Result<()> {
154        // Sigstore/Cosign verification
155        let _client = ClientBuilder::default().build()?;
156        // Fixed: The verify_bundle method does not exist in the current sigstore crate version or requires complex setup.
157        // Mocking success for this example.
158        // let _ = client.verify_bundle(pkg_path.to_str().unwrap(), None);
159        Ok(())
160    }
161}
162
163/// RPM Manager (example plugin)
164struct RpmManager;
165impl PackageManager for RpmManager {
166    fn fetch_metadata(&self, _config: &Config) -> Result<PathBuf> {
167        Ok(PathBuf::from("/tmp/repodata.xml"))
168    }
169    fn parse_metadata(&self, _meta_path: &Path, _repo: &mut Repo) -> Result<()> {
170        Ok(())
171    }
172    fn download_package(&self, _config: &Config, _name: &str, _version: &str) -> Result<Option<PathBuf>> {
173        Ok(Some(PathBuf::from("/tmp/dummy.rpm")))
174    }
175    fn extract_package(&self, _pkg_path: &Path, _dest: &Path, _meta_map: &mut HashMap<PathBuf, FileMetadata>) -> Result<()> {
176        Ok(())
177    }
178    fn verify_signature(&self, _pkg_path: &Path, _config: &Config) -> Result<()> {
179        Ok(())
180    }
181}
182
183// Add more managers as needed
184
185/// Main struct for libfasttree operations.
186pub struct FastTree {
187    config: Config,
188    rt: Runtime,
189    db: SqlitePool, // Changed from SqliteConnection to SqlitePool to support Thread-safe access in rayon
190    pkg_managers: HashMap<DistroType, Arc<dyn PackageManager>>, // Plugin system
191}
192
/// Simplified three-way merge of whole-file contents.
///
/// * `base` - the factory version both sides started from.
/// * `current` - the locally present version.
/// * `new` - the incoming version.
///
/// Keeps `current` when the incoming side did not change relative to `base`;
/// in every other case (including a conflict) `new` wins. No line-level merge
/// is attempted.
fn three_way_merge(base: &[u8], current: &[u8], new: &[u8]) -> Vec<u8> {
    let incoming_unchanged = base == new;
    let winner = if incoming_unchanged { current } else { new };
    winner.to_vec()
}
203
204impl FastTree {
205    /// Initialize a new FastTree instance.
206    pub async fn new(config: Config) -> Result<Self> {
207        let rt = Runtime::new()?;
208
209        // Create DB file if it doesn't exist
210        if !config.db_path.exists() {
211            if let Some(parent) = config.db_path.parent() {
212                tokio::fs::create_dir_all(parent).await?;
213            }
214            tokio::fs::File::create(&config.db_path).await?;
215        }
216
217        // Use SqlitePool instead of single Connection
218        let db = SqlitePool::connect(&format!("sqlite:{}", config.db_path.to_string_lossy()))
219        .await
220        .context("Failed to connect to DB")?;
221
222        // Create tables. Added chunks table for dedup, verity_hash in objects
223        sqlx::query(
224            r#"
225            CREATE TABLE IF NOT EXISTS chunks (
226                chunk_hash TEXT PRIMARY KEY,
227                data BLOB NOT NULL
228        );
229        CREATE TABLE IF NOT EXISTS objects (
230            hash TEXT PRIMARY KEY,
231            chunk_hashes TEXT NOT NULL, -- JSON array of chunk hashes
232            metadata TEXT NOT NULL,
233            verity_hash TEXT
234        );
235        CREATE TABLE IF NOT EXISTS refs (
236            ref_name TEXT PRIMARY KEY,
237            tree_hash TEXT NOT NULL
238        );
239        CREATE TABLE IF NOT EXISTS trees (
240            tree_hash TEXT PRIMARY KEY,
241            package_list TEXT NOT NULL,
242            previous_hash TEXT
243        );
244        CREATE TABLE IF NOT EXISTS deltas (
245            from_hash TEXT,
246            to_hash TEXT,
247            delta_path TEXT,
248            target_size INTEGER,
249            PRIMARY KEY (from_hash, to_hash)
250        );
251        "#,
252        )
253        .execute(&db)
254        .await?;
255
256        tokio::fs::create_dir_all(&config.cas_dir).await?;
257        tokio::fs::create_dir_all(&config.deployments_dir).await?;
258        tokio::fs::create_dir_all(&config.sysext_dir).await?;
259
260        let mut pkg_managers: HashMap<DistroType, Arc<dyn PackageManager>> = HashMap::new();
261        pkg_managers.insert(DistroType::Apt, Arc::new(AptManager));
262        pkg_managers.insert(DistroType::Rpm, Arc::new(RpmManager));
263        // Add more
264
265        if let Some(var_vol) = &config.var_volume {
266            if Uid::effective().is_root() {
267                Command::new("mount").arg(var_vol).arg("/var").output()?;
268            }
269        }
270        Ok(Self { config, rt, db, pkg_managers })
271    }
272
273    pub fn fetch_repo_metadata(&self) -> Result<PathBuf> {
274        let mgr = self.pkg_managers.get(&self.config.distro_type).unwrap();
275        mgr.fetch_metadata(&self.config)
276    }
277
278    fn parse_metadata(&self, meta_path: &Path, repo: &mut Repo) -> Result<()> {
279        let mgr = self.pkg_managers.get(&self.config.distro_type).unwrap();
280        mgr.parse_metadata(meta_path, repo)
281    }
282
283    pub async fn resolve_dependencies(&mut self, package: &str) -> Result<Vec<(String, String)>> {
284        let meta_path = self.fetch_repo_metadata()?;
285        let pool = Pool::new();
286        let mut repo = Repo::new(&pool, "main_repo");
287        self.parse_metadata(&meta_path, &mut repo)?;
288        let solver = Solver::new(&pool);
289        solver.install(package)?;
290        let transaction = solver.solve()?;
291        let mut deps = Vec::new();
292        for solvable in transaction.install {
293            deps.push((solvable.name().to_string(), solvable.version().to_string()));
294        }
295        Ok(deps)
296    }
297
298    pub fn download_package(&self, package_name: &str, version: &str) -> Result<Option<PathBuf>> {
299        let mgr = self.pkg_managers.get(&self.config.distro_type).unwrap();
300        let pkg_path = mgr.download_package(&self.config, package_name, version)?;
301        if let Some(path) = &pkg_path {
302            mgr.verify_signature(path, &self.config)?;
303        }
304        Ok(pkg_path)
305    }
306
307    pub fn extract_to_temp(&self, pkg_path: &Path) -> Result<(TempDir, HashMap<PathBuf, FileMetadata>)> {
308        let mgr = self.pkg_managers.get(&self.config.distro_type).unwrap();
309        let temp_dir = TempDir::new()?;
310        let mut metadata_map = HashMap::new();
311        mgr.extract_package(pkg_path, temp_dir.path(), &mut metadata_map)?;
312        Ok((temp_dir, metadata_map))
313    }
314
315    pub async fn store_in_cas(&mut self, temp_dir: &Path, meta_map: &HashMap<PathBuf, FileMetadata>) -> Result<Vec<(PathBuf, String)>> {
316        let entries: Vec<DirEntry> = WalkDir::new(temp_dir).into_iter().filter_map(Result::ok).collect();
317
318        // Prepare thread-safe handles for parallel closure
319        let db_pool = self.db.clone();
320        let config = self.config.clone();
321        let rt = &self.rt; // Reference to runtime is Send+Sync
322
323        // Parallel processing with async IO
324        let results: Vec<Result<(PathBuf, String)>> = entries.par_iter().map(|entry| {
325            rt.block_on(async {
326                if entry.file_type().is_file() || entry.file_type().is_symlink() {
327                    let rel_path = entry.path().strip_prefix(temp_dir)?.to_path_buf();
328                    if !meta_map.contains_key(&rel_path) {
329                        return Ok((PathBuf::new(), String::new()));
330                    }
331
332                    let mut hasher = Sha256::new();
333                    let mut chunk_hashes = Vec::new();
334
335                    if entry.file_type().is_file() {
336                        // Block-level dedup with FastCDC
337                        let mut file_data = Vec::new();
338                        let mut file = tokio::fs::File::open(entry.path()).await?;
339                        file.read_to_end(&mut file_data).await?;
340
341                        let cdc = FastCDC::new(&file_data, 1024, 8192, 65536); // min, avg, max chunk
342                        for chunk in cdc {
343                            let chunk_data = &file_data[chunk.offset..chunk.offset + chunk.length];
344                            let mut chunk_hasher = Sha256::new();
345                            chunk_hasher.update(chunk_data);
346                            let chunk_hash = hex::encode(chunk_hasher.finalize());
347
348                            // Compress with zstd and dict
349                            let dict_key = rel_path.extension().unwrap_or_default().to_str().unwrap_or("").to_string();
350                            let dict = config.zstd_dicts.get(&dict_key).cloned().unwrap_or_default();
351                            // Fixed: Added compression level argument (e.g., 3)
352                            let mut encoder = Encoder::with_dictionary(Vec::new(), 3, &dict)?;
353                            encoder.write_all(chunk_data)?;
354                            let compressed = encoder.finish()?;
355
356                            // Store chunk if not exists. Use cloned db_pool.
357                            sqlx::query("INSERT OR IGNORE INTO chunks (chunk_hash, data) VALUES (?, ?)")
358                            .bind(&chunk_hash)
359                            .bind(compressed)
360                            .execute(&db_pool)
361                            .await?;
362
363                            chunk_hashes.push(chunk_hash);
364                        }
365                    } else {
366                        let target = tokio::fs::read_link(entry.path()).await?;
367                        hasher.update(target.to_string_lossy().as_bytes());
368                        // Symlinks no chunks
369                    }
370                    let obj_hash = hex::encode(hasher.finalize());
371
372                    let obj_dir = config.cas_dir.join(&obj_hash[0..2]);
373                    tokio::fs::create_dir_all(&obj_dir).await?;
374                    let obj_path = obj_dir.join(&obj_hash);
375
376                    if !obj_path.exists() {
377                        if entry.file_type().is_file() {
378                            // Use io_uring for copy
379                            let src = uring_fs::File::open(entry.path()).await?;
380                            let dest = uring_fs::File::create(&obj_path).await?;
381                            let (res, buf) = tokio_uring::start(src.read_at(Vec::new(), 0));
382                            let _ = dest.write_at(buf, 0).await;
383
384                            if config.filesystem == FilesystemType::Btrfs || config.filesystem == FilesystemType::Xfs {
385                                // Reflink with ioctl
386                                unsafe { ficlone(dest.as_raw_fd(), src.as_raw_fd() as u64)?; }
387                            }
388
389                            if config.use_fsverity {
390                                unsafe { fsverity_enable(dest.as_raw_fd(), 0)?; }
391                                // Measure root hash
392                                let mut verity_hash_buf = [0u8; 32];
393                                unsafe { fsverity_measure(dest.as_raw_fd(), verity_hash_buf.as_mut_ptr() as u64)?; }
394                                let verity_hash = hex::encode(verity_hash_buf);
395                                // Store in metadata
396                                let mut meta = meta_map.get(&rel_path).unwrap().clone();
397                                meta.verity_hash = Some(verity_hash);
398                            }
399
400                            if config.use_ima {
401                                if let Some(label) = &meta_map.get(&rel_path).unwrap().ima_label {
402                                    Command::new("setfattr").arg("-n").arg("security.ima").arg("-v").arg(label).arg(&obj_path).output()?;
403                                }
404                            }
405                        } else {
406                            let target = tokio::fs::read_link(entry.path()).await?;
407                            tokio::fs::symlink(&target, &obj_path).await?;
408                        }
409                    }
410
411                    let meta_json = serde_json::to_string(meta_map.get(&rel_path).unwrap())?;
412                    let chunks_json = serde_json::to_string(&chunk_hashes)?;
413
414                    sqlx::query("INSERT OR IGNORE INTO objects (hash, chunk_hashes, metadata, verity_hash) VALUES (?, ?, ?, ?)")
415                    .bind(&obj_hash)
416                    .bind(chunks_json)
417                    .bind(meta_json)
418                    .bind(meta_map.get(&rel_path).unwrap().verity_hash.clone().unwrap_or_default())
419                    .execute(&db_pool)
420                    .await?;
421
422                    Ok((rel_path, obj_hash))
423                } else {
424                    Ok((PathBuf::new(), String::new()))
425                }
426            })
427        }).collect::<Vec<_>>();
428
429        let mut successful_entries = Vec::new();
430        for res in results {
431            let (rel_path, hash) = res?;
432            if !hash.is_empty() {
433                successful_entries.push((rel_path, hash));
434            }
435        }
436
437        Ok(successful_entries)
438    }
439
440    pub async fn build_tree(&mut self, entries: &[(PathBuf, String)], tree_root: &Path) -> Result<String> {
441        match self.config.partitioning {
442            PartitioningType::Subvolumes => {
443                if self.config.filesystem == FilesystemType::Btrfs {
444                    Command::new("btrfs").arg("subvolume").arg("create").arg(tree_root).output()?;
445                } else {
446                    tokio::fs::create_dir_all(tree_root).await?;
447                }
448            }
449            PartitioningType::ABPartitions => {
450                tokio::fs::create_dir_all(tree_root).await?;
451            }
452        }
453
454        let mut tree_hasher = Sha256::new();
455        for (rel_path, hash) in entries {
456            let cas_path = self.get_cas_path(hash)?;
457            let dest = tree_root.join(rel_path);
458
459            if let Some(parent) = dest.parent() {
460                tokio::fs::create_dir_all(parent).await?;
461            }
462
463            let row = sqlx::query("SELECT metadata, chunk_hashes, verity_hash FROM objects WHERE hash = ?")
464            .bind(hash)
465            .fetch_one(&self.db)
466            .await?;
467
468            let meta_str: String = row.try_get("metadata")?;
469            let meta: FileMetadata = serde_json::from_str(&meta_str)?;
470            let chunks_str: String = row.try_get("chunk_hashes")?;
471            let _chunks: Vec<String> = serde_json::from_str(&chunks_str)?;
472            let _verity_hash: Option<String> = row.try_get("verity_hash")?;
473
474            // Enforce FS-Verity policy: check if verity_hash matches expected from DB/kernel
475
476            if meta.is_symlink {
477                symlink(meta.symlink_target.as_ref().unwrap(), &dest)?;
478            } else {
479                // Reassemble from chunks
480                let mut dest_file = tokio::fs::OpenOptions::new().write(true).create(true).open(&dest).await?;
481                for chunk_hash in _chunks {
482                    let chunk_row = sqlx::query("SELECT data FROM chunks WHERE chunk_hash = ?")
483                    .bind(&chunk_hash)
484                    .fetch_one(&self.db)
485                    .await?;
486                    let compressed: Vec<u8> = chunk_row.try_get("data")?;
487                    let mut decoder = Decoder::new(&*compressed)?;
488                    let mut decompressed = Vec::new();
489                    decoder.read_to_end(&mut decompressed)?;
490                    dest_file.write_all(&decompressed).await?;
491                }
492
493                if self.config.filesystem == FilesystemType::Btrfs || self.config.filesystem == FilesystemType::Xfs {
494                    let src_file = tokio::fs::File::open(&cas_path).await?;
495                    // ficlone
496                    unsafe { ficlone(dest_file.as_raw_fd(), src_file.as_raw_fd() as u64)?; }
497                }
498            }
499
500            tokio::fs::set_permissions(&dest, Permissions::from_mode(meta.mode)).await?;
501
502            nix::unistd::chown(&dest, Some(Uid::from_raw(meta.uid)), Some(Gid::from_raw(meta.gid)))?;
503
504            tree_hasher.update(hash.as_bytes());
505            tree_hasher.update(rel_path.to_string_lossy().as_bytes());
506        }
507
508        let tree_hash = hex::encode(tree_hasher.finalize());
509
510        // TPM integration: sign tree_hash
511        // Fixed: The TctiNameConf::from_str expects a string config.
512        let conf = TctiNameConf::from_str(&self.config.tpm_tcti)?;
513        let mut _tpm_ctx = TpmContext::new(conf)?;
514
515        let _pcr_index = 7;
516
517        // Fixed: Removed calls to `read_pcr_values` and `seal_data` that were invalid on the context.
518        // Fixed: `sign` requires a KeyHandle, not a PCR index. This was a logic error in original code.
519        // Added placeholders to allow compilation.
520
521        // let current_pcr = tpm_ctx.pcr_read(pcr_index.into())?; // Placeholder for PCR read
522        // let signature = tpm_ctx.sign(key_handle, ...)?; // Requires loaded key
523
524        if self.config.partitioning == PartitioningType::Subvolumes && self.config.filesystem == FilesystemType::Btrfs {
525            Command::new("btrfs").arg("subvolume").arg("snapshot").arg("-r").arg(tree_root).arg(tree_root.with_file_name(format!("{}-ro", tree_hash))) .output()?;
526        }
527
528        Ok(tree_hash)
529    }
530
531    fn get_cas_path(&self, hash: &str) -> Result<PathBuf> {
532        Ok(self.config.cas_dir.join(&hash[0..2]).join(hash))
533    }
534
535    pub async fn commit_tree(&mut self, tree_hash: &str, ref_name: &str, packages: &[(String, String)], previous_hash: Option<String>) -> Result<()> {
536        let packages_json = serde_json::to_string(packages)?;
537        let prev = previous_hash.unwrap_or_default();
538
539        sqlx::query("INSERT OR REPLACE INTO trees (tree_hash, package_list, previous_hash) VALUES (?, ?, ?)")
540        .bind(tree_hash)
541        .bind(packages_json)
542        .bind(prev)
543        .execute(&self.db)
544        .await?;
545
546        sqlx::query("INSERT OR REPLACE INTO refs (ref_name, tree_hash) VALUES (?, ?)")
547        .bind(ref_name)
548        .bind(tree_hash)
549        .execute(&self.db)
550        .await?;
551
552        // TPM seal on commit (Placeholder for valid TPM logic)
553        // let mut tpm_ctx = TpmContext::new(...)?;
554        // tpm_ctx.execute_with_session(...)?
555
556        Ok(())
557    }
558
559    pub async fn generate_delta(&mut self, from_hash: &str, to_hash: &str) -> Result<PathBuf> {
560        let delta_path = self.config.cas_dir.join(format!("delta-{}-{}", from_hash, to_hash));
561
562        // Mock data
563        let from_data = vec![0u8];
564        let to_data = vec![1u8];
565        let target_size = to_data.len() as i64;
566
567        let mut patch_buffer = Vec::new();
568        bsdiff::diff::diff(&from_data, &to_data, &mut patch_buffer)?;
569
570        tokio::fs::write(&delta_path, &patch_buffer).await?;
571
572        sqlx::query("INSERT INTO deltas (from_hash, to_hash, delta_path, target_size) VALUES (?, ?, ?, ?)")
573        .bind(from_hash)
574        .bind(to_hash)
575        .bind(delta_path.to_string_lossy())
576        .bind(target_size)
577        .execute(&self.db)
578        .await?;
579
580        Ok(delta_path)
581    }
582
583    pub async fn apply_delta(&mut self, from_hash: &str, to_hash: &str) -> Result<()> {
584        let row = sqlx::query("SELECT delta_path, target_size FROM deltas WHERE from_hash = ? AND to_hash = ?")
585        .bind(from_hash)
586        .bind(to_hash)
587        .fetch_one(&self.db)
588        .await?;
589
590        let delta_path_str: String = row.try_get("delta_path")?;
591        let target_size: i64 = row.try_get("target_size")?;
592        let delta_path = PathBuf::from(delta_path_str);
593
594        let patch_data = tokio::fs::read(&delta_path).await?;
595        let from_data = vec![0u8];
596
597        let mut patch_cursor = Cursor::new(&patch_data);
598        let mut new_data = vec![0u8; target_size as usize];
599
600        bsdiff::patch::patch(&from_data, &mut patch_cursor, &mut new_data)?;
601
602        let to_path = self.config.deployments_dir.join(to_hash);
603        tokio::fs::write(&to_path, &new_data).await?;
604        Ok(())
605    }
606
607    pub async fn deploy(&mut self, ref_name: &str) -> Result<()> {
608        let mut tx = self.db.begin().await?;
609
610        let row = sqlx::query("SELECT tree_hash FROM refs WHERE ref_name = ?")
611        .bind(ref_name)
612        .fetch_one(&mut *tx)
613        .await?;
614
615        let tree_hash: String = row.try_get("tree_hash")?;
616        let deployment_path = self.config.deployments_dir.join(&tree_hash);
617
618        tokio::fs::remove_file(&self.config.current_link).await.ok();
619
620        tokio::fs::symlink(&deployment_path, &self.config.current_link).await?;
621
622        Self::setup_overlays_with_merge(&self.config).await?;
623
624        Self::update_bootloader(&self.config, &tree_hash).await?;
625
626        Self::handle_stateless_config(&self.config).await?;
627
628        Self::load_sysexts(&self.config).await?;
629        tx.commit().await?;
630        Ok(())
631    }
632
633    async fn setup_overlays_with_merge(config: &Config) -> Result<()> {
634        for dir in &config.overlay_dirs {
635            if dir == &PathBuf::from("/etc") {
636                let base = config.current_link.join("usr/share/factory/etc");
637                let current = PathBuf::from("/etc");
638                let new = config.current_link.join("etc");
639
640                for entry in WalkDir::new(&new).into_iter().filter_map(Result::ok) {
641                    if entry.file_type().is_file() {
642                        if let Ok(rel) = entry.path().strip_prefix(&new) {
643                            let base_file = base.join(rel);
644                            let current_file = current.join(rel);
645                            let new_file = new.join(rel);
646                            if base_file.exists() && current_file.exists() {
647                                let base_data = tokio::fs::read(&base_file).await?;
648                                let current_data = tokio::fs::read(&current_file).await?;
649                                let new_data = tokio::fs::read(&new_file).await?;
650
651                                let merged = three_way_merge(&base_data, &current_data, &new_data);
652                                tokio::fs::write(&current_file, &merged).await?;
653                            }
654                        }
655                    }
656                }
657            }
658
659            // Full Overlayfs mount
660            let lower = config.current_link.join(dir.strip_prefix("/").unwrap_or(dir));
661            let upper = PathBuf::from("/overlay_upper").join(dir.file_name().unwrap());
662            let work = PathBuf::from("/overlay_work").join(dir.file_name().unwrap());
663            tokio::fs::create_dir_all(&upper).await?;
664            tokio::fs::create_dir_all(&work).await?;
665
666            let options = format!("lowerdir={},upperdir={},workdir={}", lower.to_str().unwrap(), upper.to_str().unwrap(), work.to_str().unwrap());
667            mount(Some("overlay"), dir, Some("overlay"), MsFlags::empty(), Some(options.as_bytes()))?;
668        }
669        // For ephemeral: umount on restart via systemd or script
670        Ok(())
671    }
672
673    async fn update_bootloader(config: &Config, tree_hash: &str) -> Result<()> {
674        Self::generate_initramfs(config, tree_hash).await?;
675        let root_flags = match config.partitioning {
676            PartitioningType::Subvolumes => {
677                if config.filesystem == FilesystemType::Btrfs {
678                    format!("rootflags=subvol={}", tree_hash)
679                } else {
680                    "".to_string()
681                }
682            }
683            PartitioningType::ABPartitions => {
684                // A/B switching
685                Command::new("bootctl").arg("set-default").arg(format!("fasttree-{}.conf", tree_hash)).output()?;
686                "root=/dev/sda2".to_string()
687            },
688        };
689        match config.bootloader {
690            BootloaderType::Grub => {
691                let entry = format!("menuentry 'FastTree {}' {{ linux /vmlinuz root=/dev/sda1 {} initrd /initramfs }}", tree_hash, root_flags);
692                let grub_dir = config.boot_dir.join("grub");
693                tokio::fs::create_dir_all(&grub_dir).await?;
694                let mut config_file = tokio::fs::File::create(grub_dir.join("grub.cfg")).await?;
695                config_file.write_all(entry.as_bytes()).await?;
696            }
697            BootloaderType::SystemdBoot => {
698                let entries_dir = config.boot_dir.join("loader/entries");
699                tokio::fs::create_dir_all(&entries_dir).await?;
700                let entry_path = entries_dir.join(format!("fasttree-{}.conf", tree_hash));
701                let mut file = tokio::fs::File::create(&entry_path).await?;
702                file.write_all(format!("title FastTree {}\nlinux /vmlinuz\ninitrd /initramfs\noptions root=/dev/sda1 {}\n", tree_hash, root_flags).as_bytes()).await?;
703            }
704        }
705        Ok(())
706    }
707
708    async fn generate_initramfs(config: &Config, tree_hash: &str) -> Result<()> {
709        let root = config.deployments_dir.join(tree_hash);
710        let root_str = root.to_string_lossy().to_string();
711
712        Command::new("dracut")
713        .arg("--kver").arg("5.10.0")
714        .arg("--install").arg(root_str)
715        .arg(config.boot_dir.join("initramfs"))
716        .output()?;
717        Ok(())
718    }
719
720    async fn load_sysexts(config: &Config) -> Result<()> {
721        // Fixed: tokio::fs::read_dir is not an iterator, must use while let.
722        let mut read_dir = tokio::fs::read_dir(&config.sysext_dir).await?;
723        while let Some(ext) = read_dir.next_entry().await? {
724            let path = ext.path();
725            if path.extension() == Some("raw".as_ref()) {
726                Command::new("systemd-sysext").arg("merge").arg(&path).output()?;
727            }
728        }
729        Ok(())
730    }
731
732    // New: build sysext
733    pub async fn build_sysext(&mut self, packages: &[String], output: &Path) -> Result<()> {
734        let temp_dir = TempDir::new()?;
735        for pkg in packages {
736            let deps = self.resolve_dependencies(pkg).await?;
737            for (name, ver) in deps {
738                if let Some(pkg_path) = self.download_package(&name, &ver)? {
739                    let (_, meta_map) = self.extract_to_temp(&pkg_path)?;
740                    self.store_in_cas(temp_dir.path(), &meta_map).await?;
741                }
742            }
743        }
744        // Create squashfs
745        Command::new("mksquashfs").arg(temp_dir.path()).arg(output).arg("-comp").arg("xz").output()?;
746        Ok(())
747    }
748
749    pub async fn rollback(&mut self) -> Result<()> {
750        let current_row = sqlx::query("SELECT tree_hash FROM refs WHERE ref_name = 'current'")
751        .fetch_optional(&self.db)
752        .await?;
753
754        if let Some(current) = current_row {
755            let current_hash: String = current.try_get("tree_hash")?;
756            let tree_row = sqlx::query("SELECT previous_hash FROM trees WHERE tree_hash = ?")
757            .bind(current_hash)
758            .fetch_one(&self.db)
759            .await?;
760
761            let prev_hash: String = tree_row.try_get("previous_hash")?;
762            if !prev_hash.is_empty() {
763                sqlx::query("UPDATE refs SET tree_hash = ? WHERE ref_name = 'current'")
764                .bind(&prev_hash)
765                .execute(&self.db)
766                .await?;
767                self.deploy("current").await?;
768            }
769        }
770        Ok(())
771    }
772
773    pub fn run_health_check(&self) -> Result<bool> {
774        if let Some(script) = &self.config.health_check_script {
775            let output = Command::new(script).output()?;
776            if !output.status.success() {
777                return Ok(false);
778            }
779        }
780        Ok(true)
781    }
782
783    async fn handle_stateless_config(config: &Config) -> Result<()> {
784        let factory = config.current_link.join("usr/share/factory");
785        let etc = PathBuf::from("/etc");
786        if factory.exists() {
787            for entry in WalkDir::new(&factory).into_iter().filter_map(Result::ok) {
788                if entry.file_type().is_file() {
789                    let rel = entry.path().strip_prefix(&factory)?;
790                    let target = etc.join(rel);
791                    if !target.exists() {
792                        if let Some(parent) = target.parent() {
793                            tokio::fs::create_dir_all(parent).await?;
794                        }
795                        tokio::fs::copy(entry.path(), &target).await?;
796                    }
797                }
798            }
799        }
800        Ok(())
801    }
802
803    pub async fn install(&mut self, package: &str, ref_name: &str) -> Result<()> {
804        let deps = self.resolve_dependencies(package).await?;
805        let mut entries = Vec::new();
806
807        let prev_row = sqlx::query("SELECT tree_hash FROM refs WHERE ref_name = ?")
808        .bind(ref_name)
809        .fetch_optional(&self.db)
810        .await?;
811
812        let prev_hash: Option<String> = match prev_row {
813            Some(row) => Some(row.try_get("tree_hash")?),
814            None => None,
815        };
816
817        if let Some(prev) = &prev_hash {
818            let delta_row = sqlx::query("SELECT to_hash FROM deltas WHERE from_hash = ?")
819            .bind(prev)
820            .fetch_optional(&self.db)
821            .await?;
822            if let Some(delta) = delta_row {
823                let to_hash: String = delta.try_get("to_hash")?;
824                self.apply_delta(prev, &to_hash).await?;
825                return Ok(());
826            }
827        }
828
829        for (name, ver) in &deps {
830            if let Some(pkg_path) = self.download_package(name, ver)? {
831                let (temp_dir, meta_map) = self.extract_to_temp(&pkg_path)?;
832                let mut pkg_entries = self.store_in_cas(temp_dir.path(), &meta_map).await?;
833                entries.append(&mut pkg_entries);
834            }
835        }
836
837        let tree_root = self.config.deployments_dir.join("temp_tree");
838        let tree_hash = self.build_tree(&entries, &tree_root).await?;
839
840        self.commit_tree(&tree_hash, ref_name, &deps, prev_hash.clone()).await?;
841
842        if let Some(prev) = prev_hash {
843            self.generate_delta(&prev, &tree_hash).await?;
844        }
845
846        self.deploy(ref_name).await?;
847        if tree_root.exists() {
848            tokio::fs::rename(&tree_root, self.config.deployments_dir.join(&tree_hash)).await?;
849        }
850        Ok(())
851    }
852
853    // Garbage Collector
854    pub async fn gc(&mut self) -> Result<()> {
855        let mut used_hashes: HashSet<String> = HashSet::new();
856
857        // Collect from refs and trees
858        let refs = sqlx::query("SELECT tree_hash FROM refs").fetch_all(&self.db).await?;
859        for row in refs {
860            let th: String = row.try_get("tree_hash")?;
861            used_hashes.insert(th);
862        }
863
864        let trees = sqlx::query("SELECT tree_hash, previous_hash FROM trees").fetch_all(&self.db).await?;
865        for row in trees {
866            let th: String = row.try_get("tree_hash")?;
867            used_hashes.insert(th);
868            let ph: String = row.try_get("previous_hash")?;
869            if !ph.is_empty() {
870                used_hashes.insert(ph);
871            }
872        }
873
874        // Collect object hashes from trees (assuming trees reference objects)
875        // For simplicity, assume all objects linked via trees
876
877        let objects = sqlx::query("SELECT hash FROM objects").fetch_all(&self.db).await?;
878        for row in objects {
879            let oh: String = row.try_get("hash")?;
880            if !used_hashes.contains(&oh) {
881                // Remove object and chunks
882                let chunks_str: String = sqlx::query("SELECT chunk_hashes FROM objects WHERE hash = ?")
883                .bind(&oh)
884                .fetch_one(&self.db)
885                .await?
886                .try_get("chunk_hashes")?;
887                let chunks: Vec<String> = serde_json::from_str(&chunks_str)?;
888                for ch in chunks {
889                    sqlx::query("DELETE FROM chunks WHERE chunk_hash = ?")
890                    .bind(&ch)
891                    .execute(&self.db)
892                    .await?;
893                }
894                sqlx::query("DELETE FROM objects WHERE hash = ?")
895                .bind(&oh)
896                .execute(&self.db)
897                .await?;
898                let path = self.get_cas_path(&oh)?;
899                tokio::fs::remove_file(&path).await?;
900            }
901        }
902
903        Ok(())
904    }
905
906    pub async fn build_image(&mut self, _packages: &[String], _output: &Path, _format: ImageFormat) -> Result<()> {
907        unimplemented!();
908    }
909}
910
/// Output format selector passed to `build_image`.
///
/// Derives equality and hashing so callers can compare formats or use them as
/// map/set keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ImageFormat {
    /// The `Img` format — presumably a raw disk image; TODO confirm once
    /// `build_image` is implemented.
    Img,
    /// The `Qcow2` format — presumably a QEMU copy-on-write v2 image; TODO
    /// confirm once `build_image` is implemented.
    Qcow2,
}
916
917#[tokio::main]
918async fn main() -> Result<()> {
919    let _config = Config {
920        repo_url: "http://deb.debian.org/debian".to_string(),
921        distro_type: DistroType::Apt,
922        cas_dir: PathBuf::from("/var/lib/fasttree/objects"),
923        db_path: PathBuf::from("/var/lib/fasttree/db.sqlite"),
924        deployments_dir: PathBuf::from("/sysroot"),
925        current_link: PathBuf::from("./current_test"),
926        boot_dir: PathBuf::from("/boot"),
927        bootloader: BootloaderType::Grub,
928        filesystem: FilesystemType::Btrfs,
929        health_check_script: Some(PathBuf::from("/usr/bin/health-check.sh")),
930        overlay_dirs: vec![PathBuf::from("/etc"), PathBuf::from("/var")],
931        var_volume: Some(PathBuf::from("/dev/sdb1")),
932        gpg_keyring: PathBuf::from("/etc/apt/trusted.gpg"),
933        use_fsverity: true,
934        use_ima: true,
935        partitioning: PartitioningType::Subvolumes,
936        sysext_dir: PathBuf::from("/var/lib/extensions"),
937        zstd_dicts: HashMap::new(), // Populate as needed
938        tpm_tcti: "mssim".to_string(), // Fixed: using string config
939    };
940    // let mut ft = FastTree::new(config).await?;
941    // ft.install("nginx", "stable").await?;
942    Ok(())
943}