Skip to main content

orbok_db/repo/
files.rs

1//! File catalog repository (RFC-002 §7.3, RFC-004).
2//!
3//! The scanner drives these operations: upsert on discovery, metadata
4//! comparison for change detection, missing-marking for unseen files.
5//! File catalog records are persistent catalog data (RFC-001 §5.3) and
6//! survive index cleanup.
7
8use crate::catalog::{Catalog, db_err};
9use orbok_core::{FileId, FileStatus, OrbokError, OrbokResult, SourceId, now_iso8601};
10use rusqlite::{Row, params};
11
12/// A cataloged file.
13#[derive(Debug, Clone)]
14pub struct FileRecord {
15    pub file_id: FileId,
16    pub source_id: SourceId,
17    pub original_path: String,
18    pub canonical_path: String,
19    pub display_path: String,
20    pub extension: Option<String>,
21    pub file_size_bytes: u64,
22    pub modified_at: Option<String>,
23    pub platform_file_key: Option<String>,
24    pub content_hash: Option<String>,
25    pub hash_algorithm: Option<String>,
26    pub file_status: FileStatus,
27    pub last_seen_at: String,
28    pub last_indexed_at: Option<String>,
29}
30
31/// Parameters for inserting a newly discovered file.
32#[derive(Debug, Clone)]
33pub struct NewFile {
34    pub source_id: SourceId,
35    pub original_path: String,
36    pub canonical_path: String,
37    pub display_path: String,
38    pub extension: Option<String>,
39    pub metadata: ObservedMetadata,
40    pub status: FileStatus,
41}
42
/// What a scan observed about a file on disk (RFC-004 §9.1 fast check).
///
/// `Default` yields a zero size and no optional values.
#[derive(Clone, Debug, Default)]
pub struct ObservedMetadata {
    /// Size of the file in bytes.
    pub file_size_bytes: u64,
    /// Modification time, if one was observed.
    pub modified_at: Option<String>,
    /// Platform-specific file key, if one was observed.
    pub platform_file_key: Option<String>,
    /// Content hash, if one was computed. When present, the repository
    /// records the algorithm as `"sha256"`.
    pub content_hash: Option<String>,
}
51
/// Column list shared by every `SELECT` that feeds `row_to_record`.
///
/// The position of each name here is the index `row_to_record` reads, so
/// the two must be changed together.
const COLUMNS: &str = concat!(
    "file_id, source_id, original_path, canonical_path, display_path, ",
    "extension, file_size_bytes, modified_at, platform_file_key, ",
    "content_hash, hash_algorithm, ",
    "file_status, last_seen_at, last_indexed_at",
);
55
56/// Repository over the `files` table.
57pub struct FileRepository<'a> {
58    catalog: &'a Catalog,
59}
60
61impl<'a> FileRepository<'a> {
62    pub fn new(catalog: &'a Catalog) -> Self {
63        Self { catalog }
64    }
65
66    /// Fetch a file record by its primary key.
67    pub fn get_by_id(&self, id: &FileId) -> OrbokResult<Option<FileRecord>> {
68        let conn = self.catalog.lock();
69        let mut stmt = conn
70            .prepare(&format!("SELECT {COLUMNS} FROM files WHERE file_id = ?1"))
71            .map_err(db_err)?;
72        let mut rows = stmt
73            .query_map(params![id.as_str()], row_to_record)
74            .map_err(db_err)?;
75        match rows.next() {
76            Some(r) => Ok(Some(r.map_err(db_err)??)),
77            None => Ok(None),
78        }
79    }
80
81    /// Find a file by the tail of its display_path (test convenience).
82    pub fn get_by_path_str(&self, display_path_tail: &str) -> OrbokResult<Option<FileRecord>> {
83        let conn = self.catalog.lock();
84        let mut stmt = conn
85            .prepare(&format!(
86                "SELECT {COLUMNS} FROM files WHERE display_path LIKE ?1 LIMIT 1"
87            ))
88            .map_err(db_err)?;
89        let mut rows = stmt
90            .query_map(
91                rusqlite::params![format!("%{display_path_tail}")],
92                row_to_record,
93            )
94            .map_err(db_err)?;
95        match rows.next() {
96            Some(r) => Ok(Some(r.map_err(db_err)??)),
97            None => Ok(None),
98        }
99    }
100
101    /// Look up a file by its identity key (source, canonical path).
102    pub fn get_by_path(
103        &self,
104        source_id: &SourceId,
105        canonical_path: &str,
106    ) -> OrbokResult<Option<FileRecord>> {
107        let conn = self.catalog.lock();
108        let mut stmt = conn
109            .prepare(&format!(
110                "SELECT {COLUMNS} FROM files WHERE source_id = ?1 AND canonical_path = ?2"
111            ))
112            .map_err(db_err)?;
113        let mut rows = stmt
114            .query_map(params![source_id.as_str(), canonical_path], row_to_record)
115            .map_err(db_err)?;
116        match rows.next() {
117            Some(r) => Ok(Some(r.map_err(db_err)??)),
118            None => Ok(None),
119        }
120    }
121
122    /// Insert a newly discovered file.
123    pub fn insert(&self, new: NewFile) -> OrbokResult<FileRecord> {
124        let id = FileId::generate();
125        let now = now_iso8601();
126        let hash_algorithm = new.metadata.content_hash.as_ref().map(|_| "sha256");
127        let conn = self.catalog.lock();
128        conn.execute(
129            "INSERT INTO files (file_id, source_id, original_path, canonical_path, display_path, \
130             extension, file_size_bytes, modified_at, platform_file_key, content_hash, \
131             hash_algorithm, file_status, last_seen_at, last_scanned_at, created_at, updated_at) \
132             VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?13,?13,?13)",
133            params![
134                id.as_str(),
135                new.source_id.as_str(),
136                new.original_path,
137                new.canonical_path,
138                new.display_path,
139                new.extension,
140                new.metadata.file_size_bytes as i64,
141                new.metadata.modified_at,
142                new.metadata.platform_file_key,
143                new.metadata.content_hash,
144                hash_algorithm,
145                new.status.as_str(),
146                now,
147            ],
148        )
149        .map_err(db_err)?;
150        drop(conn);
151        self.get_by_path_id(&id)
152    }
153
154    fn get_by_path_id(&self, id: &FileId) -> OrbokResult<FileRecord> {
155        let conn = self.catalog.lock();
156        let mut stmt = conn
157            .prepare(&format!("SELECT {COLUMNS} FROM files WHERE file_id = ?1"))
158            .map_err(db_err)?;
159        let mut rows = stmt
160            .query_map(params![id.as_str()], row_to_record)
161            .map_err(db_err)?;
162        match rows.next() {
163            Some(r) => r.map_err(db_err)?,
164            None => Err(OrbokError::FileNotFound),
165        }
166    }
167
168    /// Touch a file confirmed unchanged by the metadata check.
169    pub fn touch_seen(&self, id: &FileId) -> OrbokResult<()> {
170        let now = now_iso8601();
171        let conn = self.catalog.lock();
172        conn.execute(
173            "UPDATE files SET last_seen_at = ?2, last_scanned_at = ?2, updated_at = ?2 \
174             WHERE file_id = ?1",
175            params![id.as_str(), now],
176        )
177        .map_err(db_err)?;
178        Ok(())
179    }
180
181    /// Record changed on-disk metadata and the resulting status
182    /// transition (RFC-004 §12 stale detection).
183    pub fn update_observed(
184        &self,
185        id: &FileId,
186        metadata: &ObservedMetadata,
187        status: FileStatus,
188    ) -> OrbokResult<()> {
189        let now = now_iso8601();
190        let hash_algorithm = metadata.content_hash.as_ref().map(|_| "sha256");
191        let conn = self.catalog.lock();
192        conn.execute(
193            "UPDATE files SET file_size_bytes = ?2, modified_at = ?3, platform_file_key = ?4, \
194             content_hash = COALESCE(?5, content_hash), \
195             hash_algorithm = COALESCE(?6, hash_algorithm), file_status = ?7, \
196             last_seen_at = ?8, last_scanned_at = ?8, updated_at = ?8 WHERE file_id = ?1",
197            params![
198                id.as_str(),
199                metadata.file_size_bytes as i64,
200                metadata.modified_at,
201                metadata.platform_file_key,
202                metadata.content_hash,
203                hash_algorithm,
204                status.as_str(),
205                now,
206            ],
207        )
208        .map_err(db_err)?;
209        Ok(())
210    }
211
212    /// Set status only (e.g. permission_denied observed mid-scan).
213    pub fn set_status(&self, id: &FileId, status: FileStatus) -> OrbokResult<()> {
214        let conn = self.catalog.lock();
215        conn.execute(
216            "UPDATE files SET file_status = ?2, updated_at = ?3 WHERE file_id = ?1",
217            params![id.as_str(), status.as_str(), now_iso8601()],
218        )
219        .map_err(db_err)?;
220        Ok(())
221    }
222
223    /// RFC-004 §11: mark files of `source_id` not seen since `cutoff`
224    /// as Missing — never Deleted (drives may be disconnected). Returns
225    /// the number of newly missing files.
226    pub fn mark_missing_unseen(&self, source_id: &SourceId, cutoff: &str) -> OrbokResult<u64> {
227        let conn = self.catalog.lock();
228        let n = conn
229            .execute(
230                "UPDATE files SET file_status = 'missing', updated_at = ?3 \
231                 WHERE source_id = ?1 AND last_seen_at < ?2 \
232                 AND file_status NOT IN ('missing', 'deleted')",
233                params![source_id.as_str(), cutoff, now_iso8601()],
234            )
235            .map_err(db_err)?;
236        Ok(n as u64)
237    }
238
239    /// Status counts for one source (Indexing/Sources view summaries).
240    pub fn count_by_status(&self, source_id: &SourceId) -> OrbokResult<Vec<(FileStatus, u64)>> {
241        let conn = self.catalog.lock();
242        let mut stmt = conn
243            .prepare(
244                "SELECT file_status, COUNT(*) FROM files WHERE source_id = ?1 GROUP BY file_status",
245            )
246            .map_err(db_err)?;
247        let rows = stmt
248            .query_map(params![source_id.as_str()], |row| {
249                Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
250            })
251            .map_err(db_err)?;
252        let mut out = Vec::new();
253        for row in rows {
254            let (status, count) = row.map_err(db_err)?;
255            out.push((FileStatus::parse(&status)?, count as u64));
256        }
257        Ok(out)
258    }
259
260    /// Count files with a specific status across all sources.
261    pub fn count_with_status(&self, status: FileStatus) -> OrbokResult<u64> {
262        let conn = self.catalog.lock();
263        let n: i64 = conn
264            .query_row(
265                "SELECT COUNT(*) FROM files WHERE file_status = ?1",
266                rusqlite::params![status.as_str()],
267                |r| r.get(0),
268            )
269            .map_err(crate::catalog::db_err)?;
270        Ok(n as u64)
271    }
272
273    /// Count files for a specific source with a specific status.
274    pub fn count_for_source_with_status(
275        &self,
276        source_id: &orbok_core::SourceId,
277        status: FileStatus,
278    ) -> OrbokResult<u64> {
279        let conn = self.catalog.lock();
280        let n: i64 = conn
281            .query_row(
282                "SELECT COUNT(*) FROM files WHERE source_id = ?1 AND file_status = ?2",
283                rusqlite::params![source_id.as_str(), status.as_str()],
284                |r| r.get(0),
285            )
286            .map_err(crate::catalog::db_err)?;
287        Ok(n as u64)
288    }
289}
290
291fn row_to_record(row: &Row<'_>) -> rusqlite::Result<OrbokResult<FileRecord>> {
292    let status: String = row.get(11)?;
293    let size: i64 = row.get(6)?;
294    Ok((|| {
295        Ok(FileRecord {
296            file_id: FileId::from_string(row.get::<_, String>(0).map_err(db_err)?),
297            source_id: SourceId::from_string(row.get::<_, String>(1).map_err(db_err)?),
298            original_path: row.get(2).map_err(db_err)?,
299            canonical_path: row.get(3).map_err(db_err)?,
300            display_path: row.get(4).map_err(db_err)?,
301            extension: row.get(5).map_err(db_err)?,
302            file_size_bytes: size as u64,
303            modified_at: row.get(7).map_err(db_err)?,
304            platform_file_key: row.get(8).map_err(db_err)?,
305            content_hash: row.get(9).map_err(db_err)?,
306            hash_algorithm: row.get(10).map_err(db_err)?,
307            file_status: FileStatus::parse(&status)?,
308            last_seen_at: row.get(12).map_err(db_err)?,
309            last_indexed_at: row.get(13).map_err(db_err)?,
310        })
311    })())
312}