Skip to main content

orbok_db/repo/
files.rs

1//! File catalog repository (RFC-002 §7.3, RFC-004).
2//!
3//! The scanner drives these operations: upsert on discovery, metadata
4//! comparison for change detection, missing-marking for unseen files.
5//! File catalog records are persistent catalog data (RFC-001 §5.3) and
6//! survive index cleanup.
7
8use crate::catalog::{Catalog, db_err};
9use orbok_core::{FileId, FileStatus, OrbokError, OrbokResult, SourceId, now_iso8601};
10use rusqlite::{Row, params};
11
12/// A cataloged file.
13#[derive(Debug, Clone)]
14pub struct FileRecord {
15    pub file_id: FileId,
16    pub source_id: SourceId,
17    pub original_path: String,
18    pub canonical_path: String,
19    pub display_path: String,
20    pub extension: Option<String>,
21    pub file_size_bytes: u64,
22    pub modified_at: Option<String>,
23    pub platform_file_key: Option<String>,
24    pub content_hash: Option<String>,
25    pub hash_algorithm: Option<String>,
26    pub file_status: FileStatus,
27    pub last_seen_at: String,
28    pub last_indexed_at: Option<String>,
29}
30
31/// Parameters for inserting a newly discovered file.
32#[derive(Debug, Clone)]
33pub struct NewFile {
34    pub source_id: SourceId,
35    pub original_path: String,
36    pub canonical_path: String,
37    pub display_path: String,
38    pub extension: Option<String>,
39    pub metadata: ObservedMetadata,
40    pub status: FileStatus,
41}
42
/// On-disk metadata captured for a file during a scan (RFC-004 §9.1 fast
/// check).
///
/// `Default` yields a zero size and no optional values.
#[derive(Debug, Clone, Default)]
pub struct ObservedMetadata {
    /// Size of the file in bytes.
    pub file_size_bytes: u64,
    /// Modification timestamp, if one was obtained.
    pub modified_at: Option<String>,
    /// Platform-specific file key, if one was obtained.
    pub platform_file_key: Option<String>,
    /// Content hash, if one was computed. When this is `None`,
    /// [`FileRepository::update_observed`] keeps the hash already stored.
    pub content_hash: Option<String>,
}
51
/// Select list shared by every query that loads a full `FileRecord`.
///
/// `row_to_record` reads columns by position, so the order here and the
/// indices there must be changed together.
const COLUMNS: &str = concat!(
    "file_id, source_id, original_path, canonical_path, display_path, ",
    "extension, file_size_bytes, modified_at, platform_file_key, content_hash, hash_algorithm, ",
    "file_status, last_seen_at, last_indexed_at",
);
55
56/// Repository over the `files` table.
57pub struct FileRepository<'a> {
58    catalog: &'a Catalog,
59}
60
61impl<'a> FileRepository<'a> {
62    pub fn new(catalog: &'a Catalog) -> Self {
63        Self { catalog }
64    }
65
66
67    /// Fetch a file record by its primary key.
68    pub fn get_by_id(&self, id: &FileId) -> OrbokResult<Option<FileRecord>> {
69        let conn = self.catalog.lock();
70        let mut stmt = conn
71            .prepare(&format!("SELECT {COLUMNS} FROM files WHERE file_id = ?1"))
72            .map_err(db_err)?;
73        let mut rows = stmt
74            .query_map(params![id.as_str()], row_to_record)
75            .map_err(db_err)?;
76        match rows.next() {
77            Some(r) => Ok(Some(r.map_err(db_err)??)),
78            None => Ok(None),
79        }
80    }
81
82
83    /// Find a file by the tail of its display_path (test convenience).
84    pub fn get_by_path_str(&self, display_path_tail: &str) -> OrbokResult<Option<FileRecord>> {
85        let conn = self.catalog.lock();
86        let mut stmt = conn
87            .prepare(&format!(
88                "SELECT {COLUMNS} FROM files WHERE display_path LIKE ?1 LIMIT 1"
89            ))
90            .map_err(db_err)?;
91        let mut rows = stmt
92            .query_map(rusqlite::params![format!("%{display_path_tail}")], row_to_record)
93            .map_err(db_err)?;
94        match rows.next() {
95            Some(r) => Ok(Some(r.map_err(db_err)??)),
96            None => Ok(None),
97        }
98    }
99
100    /// Look up a file by its identity key (source, canonical path).
101    pub fn get_by_path(
102        &self,
103        source_id: &SourceId,
104        canonical_path: &str,
105    ) -> OrbokResult<Option<FileRecord>> {
106        let conn = self.catalog.lock();
107        let mut stmt = conn
108            .prepare(&format!(
109                "SELECT {COLUMNS} FROM files WHERE source_id = ?1 AND canonical_path = ?2"
110            ))
111            .map_err(db_err)?;
112        let mut rows = stmt
113            .query_map(params![source_id.as_str(), canonical_path], row_to_record)
114            .map_err(db_err)?;
115        match rows.next() {
116            Some(r) => Ok(Some(r.map_err(db_err)??)),
117            None => Ok(None),
118        }
119    }
120
121    /// Insert a newly discovered file.
122    pub fn insert(&self, new: NewFile) -> OrbokResult<FileRecord> {
123        let id = FileId::generate();
124        let now = now_iso8601();
125        let hash_algorithm = new.metadata.content_hash.as_ref().map(|_| "sha256");
126        let conn = self.catalog.lock();
127        conn.execute(
128            "INSERT INTO files (file_id, source_id, original_path, canonical_path, display_path, \
129             extension, file_size_bytes, modified_at, platform_file_key, content_hash, \
130             hash_algorithm, file_status, last_seen_at, last_scanned_at, created_at, updated_at) \
131             VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?13,?13,?13)",
132            params![
133                id.as_str(),
134                new.source_id.as_str(),
135                new.original_path,
136                new.canonical_path,
137                new.display_path,
138                new.extension,
139                new.metadata.file_size_bytes as i64,
140                new.metadata.modified_at,
141                new.metadata.platform_file_key,
142                new.metadata.content_hash,
143                hash_algorithm,
144                new.status.as_str(),
145                now,
146            ],
147        )
148        .map_err(db_err)?;
149        drop(conn);
150        self.get_by_path_id(&id)
151    }
152
153    fn get_by_path_id(&self, id: &FileId) -> OrbokResult<FileRecord> {
154        let conn = self.catalog.lock();
155        let mut stmt = conn
156            .prepare(&format!("SELECT {COLUMNS} FROM files WHERE file_id = ?1"))
157            .map_err(db_err)?;
158        let mut rows = stmt
159            .query_map(params![id.as_str()], row_to_record)
160            .map_err(db_err)?;
161        match rows.next() {
162            Some(r) => r.map_err(db_err)?,
163            None => Err(OrbokError::FileNotFound),
164        }
165    }
166
167    /// Touch a file confirmed unchanged by the metadata check.
168    pub fn touch_seen(&self, id: &FileId) -> OrbokResult<()> {
169        let now = now_iso8601();
170        let conn = self.catalog.lock();
171        conn.execute(
172            "UPDATE files SET last_seen_at = ?2, last_scanned_at = ?2, updated_at = ?2 \
173             WHERE file_id = ?1",
174            params![id.as_str(), now],
175        )
176        .map_err(db_err)?;
177        Ok(())
178    }
179
180    /// Record changed on-disk metadata and the resulting status
181    /// transition (RFC-004 §12 stale detection).
182    pub fn update_observed(
183        &self,
184        id: &FileId,
185        metadata: &ObservedMetadata,
186        status: FileStatus,
187    ) -> OrbokResult<()> {
188        let now = now_iso8601();
189        let hash_algorithm = metadata.content_hash.as_ref().map(|_| "sha256");
190        let conn = self.catalog.lock();
191        conn.execute(
192            "UPDATE files SET file_size_bytes = ?2, modified_at = ?3, platform_file_key = ?4, \
193             content_hash = COALESCE(?5, content_hash), \
194             hash_algorithm = COALESCE(?6, hash_algorithm), file_status = ?7, \
195             last_seen_at = ?8, last_scanned_at = ?8, updated_at = ?8 WHERE file_id = ?1",
196            params![
197                id.as_str(),
198                metadata.file_size_bytes as i64,
199                metadata.modified_at,
200                metadata.platform_file_key,
201                metadata.content_hash,
202                hash_algorithm,
203                status.as_str(),
204                now,
205            ],
206        )
207        .map_err(db_err)?;
208        Ok(())
209    }
210
211    /// Set status only (e.g. permission_denied observed mid-scan).
212    pub fn set_status(&self, id: &FileId, status: FileStatus) -> OrbokResult<()> {
213        let conn = self.catalog.lock();
214        conn.execute(
215            "UPDATE files SET file_status = ?2, updated_at = ?3 WHERE file_id = ?1",
216            params![id.as_str(), status.as_str(), now_iso8601()],
217        )
218        .map_err(db_err)?;
219        Ok(())
220    }
221
222    /// RFC-004 §11: mark files of `source_id` not seen since `cutoff`
223    /// as Missing — never Deleted (drives may be disconnected). Returns
224    /// the number of newly missing files.
225    pub fn mark_missing_unseen(&self, source_id: &SourceId, cutoff: &str) -> OrbokResult<u64> {
226        let conn = self.catalog.lock();
227        let n = conn
228            .execute(
229                "UPDATE files SET file_status = 'missing', updated_at = ?3 \
230                 WHERE source_id = ?1 AND last_seen_at < ?2 \
231                 AND file_status NOT IN ('missing', 'deleted')",
232                params![source_id.as_str(), cutoff, now_iso8601()],
233            )
234            .map_err(db_err)?;
235        Ok(n as u64)
236    }
237
238    /// Status counts for one source (Indexing/Sources view summaries).
239    pub fn count_by_status(&self, source_id: &SourceId) -> OrbokResult<Vec<(FileStatus, u64)>> {
240        let conn = self.catalog.lock();
241        let mut stmt = conn
242            .prepare(
243                "SELECT file_status, COUNT(*) FROM files WHERE source_id = ?1 GROUP BY file_status",
244            )
245            .map_err(db_err)?;
246        let rows = stmt
247            .query_map(params![source_id.as_str()], |row| {
248                Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
249            })
250            .map_err(db_err)?;
251        let mut out = Vec::new();
252        for row in rows {
253            let (status, count) = row.map_err(db_err)?;
254            out.push((FileStatus::parse(&status)?, count as u64));
255        }
256        Ok(out)
257    }
258
259    /// Count files with a specific status across all sources.
260    pub fn count_with_status(&self, status: FileStatus) -> OrbokResult<u64> {
261        let conn = self.catalog.lock();
262        let n: i64 = conn
263            .query_row(
264                "SELECT COUNT(*) FROM files WHERE file_status = ?1",
265                rusqlite::params![status.as_str()],
266                |r| r.get(0),
267            )
268            .map_err(crate::catalog::db_err)?;
269        Ok(n as u64)
270    }
271
272    /// Count files for a specific source with a specific status.
273    pub fn count_for_source_with_status(
274        &self,
275        source_id: &orbok_core::SourceId,
276        status: FileStatus,
277    ) -> OrbokResult<u64> {
278        let conn = self.catalog.lock();
279        let n: i64 = conn
280            .query_row(
281                "SELECT COUNT(*) FROM files WHERE source_id = ?1 AND file_status = ?2",
282                rusqlite::params![source_id.as_str(), status.as_str()],
283                |r| r.get(0),
284            )
285            .map_err(crate::catalog::db_err)?;
286        Ok(n as u64)
287    }
288
289}
290
291
292fn row_to_record(row: &Row<'_>) -> rusqlite::Result<OrbokResult<FileRecord>> {
293    let status: String = row.get(11)?;
294    let size: i64 = row.get(6)?;
295    Ok((|| {
296        Ok(FileRecord {
297            file_id: FileId::from_string(row.get::<_, String>(0).map_err(db_err)?),
298            source_id: SourceId::from_string(row.get::<_, String>(1).map_err(db_err)?),
299            original_path: row.get(2).map_err(db_err)?,
300            canonical_path: row.get(3).map_err(db_err)?,
301            display_path: row.get(4).map_err(db_err)?,
302            extension: row.get(5).map_err(db_err)?,
303            file_size_bytes: size as u64,
304            modified_at: row.get(7).map_err(db_err)?,
305            platform_file_key: row.get(8).map_err(db_err)?,
306            content_hash: row.get(9).map_err(db_err)?,
307            hash_algorithm: row.get(10).map_err(db_err)?,
308            file_status: FileStatus::parse(&status)?,
309            last_seen_at: row.get(12).map_err(db_err)?,
310            last_indexed_at: row.get(13).map_err(db_err)?,
311        })
312    })())
313}