Skip to main content

orbok_db/repo/
files.rs

1//! File catalog repository (RFC-002 §7.3, RFC-004).
2//!
3//! The scanner drives these operations: upsert on discovery, metadata
4//! comparison for change detection, missing-marking for unseen files.
5//! File catalog records are persistent catalog data (RFC-001 §5.3) and
6//! survive index cleanup.
7
8use crate::catalog::{Catalog, db_err};
9use orbok_core::{FileId, FileStatus, OrbokError, OrbokResult, SourceId, now_iso8601};
10use rusqlite::{Row, params};
11
/// A cataloged file: one row of the `files` table, decoded from the columns
/// listed in `COLUMNS` by `row_to_record`.
#[derive(Debug, Clone)]
pub struct FileRecord {
    /// Primary key of the row (`file_id` column).
    pub file_id: FileId,
    /// Source the file belongs to; together with `canonical_path` it forms
    /// the identity key used by `FileRepository::get_by_path`.
    pub source_id: SourceId,
    // NOTE(review): presumably the path exactly as discovered — confirm
    // against the scanner.
    pub original_path: String,
    /// Second half of the `(source_id, canonical_path)` lookup key.
    pub canonical_path: String,
    // NOTE(review): presumably the user-facing form of the path — confirm.
    pub display_path: String,
    /// Nullable `extension` column.
    pub extension: Option<String>,
    /// Stored as a signed SQLite integer and cast back to `u64` on read.
    pub file_size_bytes: u64,
    /// Nullable `modified_at` column, kept as text.
    pub modified_at: Option<String>,
    /// Nullable `platform_file_key` column.
    pub platform_file_key: Option<String>,
    /// Nullable `content_hash` column.
    pub content_hash: Option<String>,
    /// The repository writes `"sha256"` here whenever it is given a content
    /// hash (see `insert` and `update_observed`).
    pub hash_algorithm: Option<String>,
    /// Parsed from the textual `file_status` column via `FileStatus::parse`.
    pub file_status: FileStatus,
    /// Timestamp written by `insert`, `touch_seen` and `update_observed`.
    pub last_seen_at: String,
    /// Nullable; none of the repository methods visible in this file
    /// write it.
    pub last_indexed_at: Option<String>,
}
30
/// Parameters for inserting a newly discovered file.
///
/// Consumed by `FileRepository::insert`, which supplies the `file_id` and
/// all timestamps itself.
#[derive(Debug, Clone)]
pub struct NewFile {
    /// Source the file was discovered under.
    pub source_id: SourceId,
    /// Stored verbatim in the `original_path` column.
    pub original_path: String,
    /// Stored verbatim in the `canonical_path` column; with `source_id` it
    /// is the key `FileRepository::get_by_path` looks rows up by.
    pub canonical_path: String,
    /// Stored verbatim in the `display_path` column.
    pub display_path: String,
    /// Stored in the nullable `extension` column.
    pub extension: Option<String>,
    /// Size, mtime, platform key and optional hash observed on disk.
    pub metadata: ObservedMetadata,
    /// Initial value of the `file_status` column.
    pub status: FileStatus,
}
42
/// Metadata observed on disk during a scan (RFC-004 §9.1 fast check).
///
/// `Default` yields a zero size and `None` for every optional field.
#[derive(Debug, Clone, Default)]
pub struct ObservedMetadata {
    /// Written to the `file_size_bytes` column (cast to `i64` for SQLite).
    pub file_size_bytes: u64,
    /// Written to the nullable `modified_at` column.
    pub modified_at: Option<String>,
    /// Written to the nullable `platform_file_key` column.
    pub platform_file_key: Option<String>,
    /// When `Some`, the repository also records `"sha256"` as the hash
    /// algorithm. When `None`, `update_observed` keeps the stored hash.
    pub content_hash: Option<String>,
}
51
/// Column list shared by the `SELECT` statements in this module.
///
/// The order is load-bearing: `row_to_record` reads columns by position
/// (0 = `file_id` … 13 = `last_indexed_at`), so any change here must be
/// mirrored there.
const COLUMNS: &str = "file_id, source_id, original_path, canonical_path, display_path, \
     extension, file_size_bytes, modified_at, platform_file_key, content_hash, hash_algorithm, \
     file_status, last_seen_at, last_indexed_at";
55
/// Repository over the `files` table.
///
/// Borrows the catalog for `'a`; every operation obtains the connection
/// through `Catalog::lock` for the duration of its statement(s).
pub struct FileRepository<'a> {
    /// Catalog whose connection all queries run against.
    catalog: &'a Catalog,
}
60
61impl<'a> FileRepository<'a> {
62    pub fn new(catalog: &'a Catalog) -> Self {
63        Self { catalog }
64    }
65
66
67    /// Fetch a file record by its primary key.
68    pub fn get_by_id(&self, id: &FileId) -> OrbokResult<Option<FileRecord>> {
69        let conn = self.catalog.lock();
70        let mut stmt = conn
71            .prepare(&format!("SELECT {COLUMNS} FROM files WHERE file_id = ?1"))
72            .map_err(db_err)?;
73        let mut rows = stmt
74            .query_map(params![id.as_str()], row_to_record)
75            .map_err(db_err)?;
76        match rows.next() {
77            Some(r) => Ok(Some(r.map_err(db_err)??)),
78            None => Ok(None),
79        }
80    }
81
82    /// Look up a file by its identity key (source, canonical path).
83    pub fn get_by_path(
84        &self,
85        source_id: &SourceId,
86        canonical_path: &str,
87    ) -> OrbokResult<Option<FileRecord>> {
88        let conn = self.catalog.lock();
89        let mut stmt = conn
90            .prepare(&format!(
91                "SELECT {COLUMNS} FROM files WHERE source_id = ?1 AND canonical_path = ?2"
92            ))
93            .map_err(db_err)?;
94        let mut rows = stmt
95            .query_map(params![source_id.as_str(), canonical_path], row_to_record)
96            .map_err(db_err)?;
97        match rows.next() {
98            Some(r) => Ok(Some(r.map_err(db_err)??)),
99            None => Ok(None),
100        }
101    }
102
103    /// Insert a newly discovered file.
104    pub fn insert(&self, new: NewFile) -> OrbokResult<FileRecord> {
105        let id = FileId::generate();
106        let now = now_iso8601();
107        let hash_algorithm = new.metadata.content_hash.as_ref().map(|_| "sha256");
108        let conn = self.catalog.lock();
109        conn.execute(
110            "INSERT INTO files (file_id, source_id, original_path, canonical_path, display_path, \
111             extension, file_size_bytes, modified_at, platform_file_key, content_hash, \
112             hash_algorithm, file_status, last_seen_at, last_scanned_at, created_at, updated_at) \
113             VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?13,?13,?13)",
114            params![
115                id.as_str(),
116                new.source_id.as_str(),
117                new.original_path,
118                new.canonical_path,
119                new.display_path,
120                new.extension,
121                new.metadata.file_size_bytes as i64,
122                new.metadata.modified_at,
123                new.metadata.platform_file_key,
124                new.metadata.content_hash,
125                hash_algorithm,
126                new.status.as_str(),
127                now,
128            ],
129        )
130        .map_err(db_err)?;
131        drop(conn);
132        self.get_by_path_id(&id)
133    }
134
135    fn get_by_path_id(&self, id: &FileId) -> OrbokResult<FileRecord> {
136        let conn = self.catalog.lock();
137        let mut stmt = conn
138            .prepare(&format!("SELECT {COLUMNS} FROM files WHERE file_id = ?1"))
139            .map_err(db_err)?;
140        let mut rows = stmt
141            .query_map(params![id.as_str()], row_to_record)
142            .map_err(db_err)?;
143        match rows.next() {
144            Some(r) => r.map_err(db_err)?,
145            None => Err(OrbokError::FileNotFound),
146        }
147    }
148
149    /// Touch a file confirmed unchanged by the metadata check.
150    pub fn touch_seen(&self, id: &FileId) -> OrbokResult<()> {
151        let now = now_iso8601();
152        let conn = self.catalog.lock();
153        conn.execute(
154            "UPDATE files SET last_seen_at = ?2, last_scanned_at = ?2, updated_at = ?2 \
155             WHERE file_id = ?1",
156            params![id.as_str(), now],
157        )
158        .map_err(db_err)?;
159        Ok(())
160    }
161
162    /// Record changed on-disk metadata and the resulting status
163    /// transition (RFC-004 §12 stale detection).
164    pub fn update_observed(
165        &self,
166        id: &FileId,
167        metadata: &ObservedMetadata,
168        status: FileStatus,
169    ) -> OrbokResult<()> {
170        let now = now_iso8601();
171        let hash_algorithm = metadata.content_hash.as_ref().map(|_| "sha256");
172        let conn = self.catalog.lock();
173        conn.execute(
174            "UPDATE files SET file_size_bytes = ?2, modified_at = ?3, platform_file_key = ?4, \
175             content_hash = COALESCE(?5, content_hash), \
176             hash_algorithm = COALESCE(?6, hash_algorithm), file_status = ?7, \
177             last_seen_at = ?8, last_scanned_at = ?8, updated_at = ?8 WHERE file_id = ?1",
178            params![
179                id.as_str(),
180                metadata.file_size_bytes as i64,
181                metadata.modified_at,
182                metadata.platform_file_key,
183                metadata.content_hash,
184                hash_algorithm,
185                status.as_str(),
186                now,
187            ],
188        )
189        .map_err(db_err)?;
190        Ok(())
191    }
192
193    /// Set status only (e.g. permission_denied observed mid-scan).
194    pub fn set_status(&self, id: &FileId, status: FileStatus) -> OrbokResult<()> {
195        let conn = self.catalog.lock();
196        conn.execute(
197            "UPDATE files SET file_status = ?2, updated_at = ?3 WHERE file_id = ?1",
198            params![id.as_str(), status.as_str(), now_iso8601()],
199        )
200        .map_err(db_err)?;
201        Ok(())
202    }
203
204    /// RFC-004 §11: mark files of `source_id` not seen since `cutoff`
205    /// as Missing — never Deleted (drives may be disconnected). Returns
206    /// the number of newly missing files.
207    pub fn mark_missing_unseen(&self, source_id: &SourceId, cutoff: &str) -> OrbokResult<u64> {
208        let conn = self.catalog.lock();
209        let n = conn
210            .execute(
211                "UPDATE files SET file_status = 'missing', updated_at = ?3 \
212                 WHERE source_id = ?1 AND last_seen_at < ?2 \
213                 AND file_status NOT IN ('missing', 'deleted')",
214                params![source_id.as_str(), cutoff, now_iso8601()],
215            )
216            .map_err(db_err)?;
217        Ok(n as u64)
218    }
219
220    /// Status counts for one source (Indexing/Sources view summaries).
221    pub fn count_by_status(&self, source_id: &SourceId) -> OrbokResult<Vec<(FileStatus, u64)>> {
222        let conn = self.catalog.lock();
223        let mut stmt = conn
224            .prepare(
225                "SELECT file_status, COUNT(*) FROM files WHERE source_id = ?1 GROUP BY file_status",
226            )
227            .map_err(db_err)?;
228        let rows = stmt
229            .query_map(params![source_id.as_str()], |row| {
230                Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
231            })
232            .map_err(db_err)?;
233        let mut out = Vec::new();
234        for row in rows {
235            let (status, count) = row.map_err(db_err)?;
236            out.push((FileStatus::parse(&status)?, count as u64));
237        }
238        Ok(out)
239    }
240}
241
242fn row_to_record(row: &Row<'_>) -> rusqlite::Result<OrbokResult<FileRecord>> {
243    let status: String = row.get(11)?;
244    let size: i64 = row.get(6)?;
245    Ok((|| {
246        Ok(FileRecord {
247            file_id: FileId::from_string(row.get::<_, String>(0).map_err(db_err)?),
248            source_id: SourceId::from_string(row.get::<_, String>(1).map_err(db_err)?),
249            original_path: row.get(2).map_err(db_err)?,
250            canonical_path: row.get(3).map_err(db_err)?,
251            display_path: row.get(4).map_err(db_err)?,
252            extension: row.get(5).map_err(db_err)?,
253            file_size_bytes: size as u64,
254            modified_at: row.get(7).map_err(db_err)?,
255            platform_file_key: row.get(8).map_err(db_err)?,
256            content_hash: row.get(9).map_err(db_err)?,
257            hash_algorithm: row.get(10).map_err(db_err)?,
258            file_status: FileStatus::parse(&status)?,
259            last_seen_at: row.get(12).map_err(db_err)?,
260            last_indexed_at: row.get(13).map_err(db_err)?,
261        })
262    })())
263}