multi_machine_dedup/
lib.rs

1use clap::Parser;
2use crc::{Crc, CRC_32_ISCSI};
3use log::{debug, error, info, warn};
4use rusqlite::{params, Connection, Error};
5use std::fs::{self, File};
6use std::io::{BufReader, Read};
7use std::path::{Path, PathBuf};
8use walkdir::WalkDir;
9
// Top-level command-line interface of the `multi-machine-dedup` tool.
// NOTE: plain `//` comments are used here on purpose: clap's derive macros
// turn `///` doc comments into help text, which would change the CLI output.
#[derive(Parser, Debug)]
#[command(name = "multi-machine-dedup")]
pub struct CLI {
    // The subcommand selected on the command line (one of `Commands`).
    #[command(subcommand)]
    pub cmd: Commands,
}
16
// Subcommands understood by the tool; each variant carries the options of
// one subcommand. The user-visible descriptions come from the `about`
// attributes below (plain `//` comments are used so that clap does not pick
// them up as help text).
#[derive(Parser, Debug)]
pub enum Commands {
    // `index`: options for creating or updating a database.
    #[command(name = "index", about = "Use index to create or update a database")]
    Index(IndexOptions),
    // `check-integrity`: options for re-verifying the files of one label.
    #[command(
        name = "check-integrity",
        about = "Use check-integrity to verify all files"
    )]
    CheckIntegrity(CheckIntegrityOptions),
    // `compare`: options for comparing two databases.
    #[command(name = "compare", about = "Use compare to compare two databases")]
    Compare(CompareOptions),
}
29
// Options of the `index` subcommand (consumed by `index`).
// Plain `//` comments are used so that clap does not turn them into help text.
#[derive(Parser, Debug)]
pub struct IndexOptions {
    // Label stored in the `label` column of every `file` row written by this
    // run. Exposed as a short and a long flag derived from the field name.
    #[arg(short, long)]
    pub label: String,
    // Path of the SQLite database file to open. Exposed as a short and a
    // long flag derived from the field name.
    #[arg(short, long)]
    pub db: PathBuf,
    // Root directory to walk recursively (positional argument).
    pub path: PathBuf,
}
38
// Options of the `check-integrity` subcommand (consumed by `check_integrity`).
// Plain `//` comments are used so that clap does not turn them into help text.
#[derive(Parser, Debug)]
pub struct CheckIntegrityOptions {
    // Only `file` rows whose `label` column equals this value are verified.
    #[arg(short, long)]
    pub label: String,
    // Path of the SQLite database file to open.
    #[arg(short, long)]
    pub db: PathBuf,
}
46
// Options of the `compare` subcommand (consumed by `compare`).
// Plain `//` comments are used so that clap does not turn them into help text.
#[derive(Parser, Debug)]
pub struct CompareOptions {
    // Database whose hashes are looked up in `db2` (long flag only).
    #[arg(long)]
    pub db1: PathBuf,
    // Database that is searched for the hashes of `db1` (long flag only).
    #[arg(long)]
    pub db2: PathBuf,
}
54
/// In-memory view of one indexed file, used both when writing rows and when
/// reading them back from the database.
#[derive(Clone, Debug)]
struct Entry {
    /// CRC-32 checksum of the file content, computed with [`CASTAGNOLI`].
    hash: u32,
    /// Path of the file as stored in the `full_path` column.
    full_path: String,
    /// File size in bytes.
    size: u64,
    /// MIME type detected at indexing time; left empty when the entry is
    /// rebuilt from a database row.
    mime: String,
}
62
/// CRC-32 calculator configured with the `CRC_32_ISCSI` parameter set; it is
/// the checksum used to fingerprint file contents throughout this module.
pub const CASTAGNOLI: Crc<u32> = Crc::<u32>::new(&CRC_32_ISCSI);
64
65pub fn create_db(conn: &Connection) {
66    //id              INTEGER PRIMARY KEY AUTOINCREMENT,
67    conn.execute(
68        "CREATE TABLE IF NOT EXISTS file (
69                  label TEXT,
70                  full_path TEXT,
71                  hash              INTEGER,
72                  size INTEGER,              
73                  PRIMARY KEY (label, full_path)
74                  FOREIGN KEY(hash,size) REFERENCES hash(hash,size)
75                  )",
76        [],
77    )
78    .expect("");
79
80    conn.execute(
81        "CREATE TABLE IF NOT EXISTS hash (
82                  hash              INTEGER,
83                  size INTEGER,
84                  mime TEXT,
85                  PRIMARY KEY (hash, size)
86                  )",
87        [],
88    )
89    .expect("");
90}
91
92pub fn index(opt: IndexOptions) {
93    //let conn = Connection::open_in_memory().unwrap();
94    let conn = Connection::open(opt.db).unwrap();
95    create_db(&conn);
96    for entry in WalkDir::new(opt.path) {
97        let entry = entry.unwrap();
98        if entry.file_type().is_dir() {
99            info!("Processsing directory {}", entry.path().display());
100        } else {
101            let x = fs::metadata(entry.path()).unwrap().len();
102            info!("Indexing file {} {:?}", entry.path().display(), x);
103            let crc = hash(entry.path());
104            debug!("The crc is: {} for file {}", crc, entry.path().display());
105            let data = Entry {
106                hash: crc,
107                full_path: entry.path().display().to_string(),
108                size: fs::metadata(entry.path()).unwrap().len(),
109                mime: tree_magic_mini::from_filepath(entry.path())
110                    .get_or_insert("N/A")
111                    .to_string(),
112            };
113            let res = conn.execute(
114                "INSERT INTO hash (hash, size, mime) VALUES (?1, ?2, ?3)",
115                params![data.hash, data.size, data.mime],
116            ); //.expect("req1");
117            match res {
118                Ok(_) => (),
119                Err(error) => match error {
120                    Error::SqliteFailure(error, _msg) => {
121                        if error.extended_code == 1555 {
122                            warn!(
123                                "hash & size '{}' '{}' already indexed",
124                                data.hash, data.size
125                            )
126                        }
127                    }
128                    _ => panic!(
129                        "Unable to index hash & size: '{}' {}",
130                        data.full_path, error
131                    ),
132                },
133            }
134            let res = conn.execute(
135                "INSERT INTO file (label, full_path, hash, size) VALUES (?1, ?2, ?3, ?4)",
136                params![opt.label, data.full_path, data.hash, data.size],
137            ); //.expect("req2");
138            match res {
139                Ok(_) => (),
140                Err(error) => match error {
141                    Error::SqliteFailure(error, _msg) => {
142                        if error.extended_code == 1555 {
143                            error!("path '{}' already indexed", data.full_path)
144                        }
145                    }
146                    _ => panic!("Unable to index file: '{}' {}", data.full_path, error),
147                },
148            }
149        }
150    }
151}
152
153pub fn check_integrity(opt: CheckIntegrityOptions) {
154    let conn = Connection::open(opt.db).unwrap();
155    let mut stmt = conn
156        .prepare("SELECT * FROM file WHERE label=:label")
157        .unwrap();
158    let file_iter = stmt
159        .query_map(&[(":label", &opt.label)], |row| {
160            Ok(Entry {
161                hash: row.get(2)?,
162                full_path: row.get(1)?,
163                size: row.get(3)?,
164                mime: "".to_string(),
165            })
166        })
167        .unwrap();
168
169    let mut ok_count = 0;
170    let mut ko_count = 0;
171    for file in file_iter {
172        let stored_hash: u32 = file.as_ref().unwrap().hash;
173        let path = &file.unwrap().full_path; //diff with &file.as_ref().unwrap() ??
174        if stored_hash != hash(&PathBuf::from(path)) {
175            ko_count += 1;
176            error!("check failed on file: '{}'", path);
177        } else {
178            ok_count += 1;
179            debug!("check ok on file: '{}'", path);
180        }
181    }
182    if ko_count == 0 {
183        info!("Integrity check OK, all {} files verified", ok_count);
184    } else {
185        error!("Integrity check failed, {} files are corrupted", ko_count);
186    }
187}
188
189pub fn compare(opt: CompareOptions) {
190    let conn1 = Connection::open(opt.db1).unwrap();
191    let mut stmt1 = conn1.prepare("SELECT * FROM hash").unwrap();
192    let conn2 = Connection::open(opt.db2).unwrap();
193    let mut stmt2 = conn2.prepare("SELECT * FROM hash").unwrap();
194    let file_iter1 = stmt1
195        .query_map([], |row| {
196            Ok(Entry {
197                hash: row.get(0)?,
198                full_path: "".to_string(),
199                size: row.get(1)?,
200                mime: "".to_string(),
201            })
202        })
203        .unwrap();
204    /* TODO add optionnal filters to command line like --exclude
205        .filter(
206            (|x| {
207                //println!("XX {:?} {}", Path::new(&find_files_from_hash(x.as_ref().unwrap().hash, &conn1)[0]).file_name(), !Path::new(&find_files_from_hash(x.as_ref().unwrap().hash, &conn1)[0]).ends_with("jpg\""));
208                x.as_ref().unwrap().size != 4096
209                    && Path::new(&find_files_from_hash(x.as_ref().unwrap().hash, &conn1)[0])
210                        .file_name()
211                        .unwrap()
212                        != ".DS_Store"
213            }),
214        );
215            println!("XX {:?}",find_files_from_hash(x.as_ref().unwrap().hash, &conn1)[0]);
216            !Path::new(&find_files_from_hash(x.as_ref().unwrap().hash, &conn1)[0]).ends_with("/.DS_Store") }));
217    */
218    let mut count = 0;
219    let mut entries = 0;
220    for file1 in file_iter1 {
221        entries += 1;
222        let mut file_iter2 = stmt2
223            .query_map([], |row| {
224                Ok(Entry {
225                    hash: row.get(0)?,
226                    full_path: "".to_string(),
227                    size: row.get(1)?,
228                    mime: "".to_string(),
229                })
230            })
231            .unwrap();
232        let stored_hash1: u32 = file1.as_ref().unwrap().hash;
233        let f = file_iter2.find(|h| h.as_ref().unwrap().hash == stored_hash1);
234        if f.is_none() {
235            warn!(
236                "hash {} not found in db2, location is {:?}",
237                stored_hash1,
238                find_files_from_hash(stored_hash1, &conn1)
239            );
240            count += 1;
241        }
242    }
243
244    if count == 0 {
245        info!("All {} entries in db1 are also in db2", entries);
246    } else {
247        warn!("Missing/Mismatch {} entries", count);
248    };
249}
250
251fn find_files_from_hash(hash: u32, conn: &Connection) -> Vec<String> {
252    let mut stmt = conn.prepare("SELECT * FROM file WHERE hash=:hash").unwrap();
253    let file_iter = stmt
254        .query_map(&[(":hash", &hash)], |row| {
255            Ok(Entry {
256                hash: row.get(2)?,
257                full_path: row.get(1)?,
258                size: row.get(3)?,
259                mime: "".to_string(),
260            })
261        })
262        .unwrap();
263    let mut files = Vec::new();
264    for file in file_iter {
265        files.push(file.unwrap().full_path.clone());
266    }
267    files
268}
269
/// Reads the file at `path` through a buffered reader and returns its bytes.
///
/// An error reported while reading is discarded, so in that case the returned
/// buffer holds only what had been read before the failure.
///
/// # Panics
///
/// Panics if the file cannot be opened.
fn get_file_content(path: &Path) -> Vec<u8> {
    let file = File::open(path).unwrap();
    let mut bytes = Vec::new();
    // The outcome of the read is intentionally not inspected.
    let _ = BufReader::new(file).read_to_end(&mut bytes);
    bytes
}
276
277fn hash(path: &Path) -> u32 {
278    let mut digest = CASTAGNOLI.digest();
279    let current_file_content = get_file_content(path);
280    digest.update(&current_file_content);
281    digest.finalize()
282}