1use crate::catalog::{Catalog, db_err};
9use orbok_core::{FileId, FileStatus, OrbokError, OrbokResult, SourceId, now_iso8601};
10use rusqlite::{Row, params};
11
12#[derive(Debug, Clone)]
14pub struct FileRecord {
15 pub file_id: FileId,
16 pub source_id: SourceId,
17 pub original_path: String,
18 pub canonical_path: String,
19 pub display_path: String,
20 pub extension: Option<String>,
21 pub file_size_bytes: u64,
22 pub modified_at: Option<String>,
23 pub platform_file_key: Option<String>,
24 pub content_hash: Option<String>,
25 pub hash_algorithm: Option<String>,
26 pub file_status: FileStatus,
27 pub last_seen_at: String,
28 pub last_indexed_at: Option<String>,
29}
30
31#[derive(Debug, Clone)]
33pub struct NewFile {
34 pub source_id: SourceId,
35 pub original_path: String,
36 pub canonical_path: String,
37 pub display_path: String,
38 pub extension: Option<String>,
39 pub metadata: ObservedMetadata,
40 pub status: FileStatus,
41}
42
/// Metadata observed for a file, written by `FileRepository::insert`
/// (through `NewFile`) and `FileRepository::update_observed`.
///
/// The `Default` value is a zero-byte file with no optional fields set.
#[derive(Clone, Debug, Default)]
pub struct ObservedMetadata {
    /// Size of the file in bytes.
    pub file_size_bytes: u64,
    /// Modification timestamp, if known.
    pub modified_at: Option<String>,
    /// Value for the `platform_file_key` column, if known.
    pub platform_file_key: Option<String>,
    /// Content hash, if one was computed. When this is `None`,
    /// `update_observed` leaves the previously stored hash untouched.
    pub content_hash: Option<String>,
}
51
/// Column list shared by every `SELECT` that yields a `FileRecord`.
///
/// The order is load-bearing: `row_to_record` reads these columns by
/// position (0 through 13), so any change here must be mirrored there.
const COLUMNS: &str = concat!(
    "file_id, source_id, original_path, canonical_path, display_path, ",
    "extension, file_size_bytes, modified_at, platform_file_key, content_hash, hash_algorithm, ",
    "file_status, last_seen_at, last_indexed_at",
);
55
56pub struct FileRepository<'a> {
58 catalog: &'a Catalog,
59}
60
61impl<'a> FileRepository<'a> {
62 pub fn new(catalog: &'a Catalog) -> Self {
63 Self { catalog }
64 }
65
66 pub fn get_by_id(&self, id: &FileId) -> OrbokResult<Option<FileRecord>> {
68 let conn = self.catalog.lock();
69 let mut stmt = conn
70 .prepare(&format!("SELECT {COLUMNS} FROM files WHERE file_id = ?1"))
71 .map_err(db_err)?;
72 let mut rows = stmt
73 .query_map(params![id.as_str()], row_to_record)
74 .map_err(db_err)?;
75 match rows.next() {
76 Some(r) => Ok(Some(r.map_err(db_err)??)),
77 None => Ok(None),
78 }
79 }
80
81 pub fn get_by_path_str(&self, display_path_tail: &str) -> OrbokResult<Option<FileRecord>> {
83 let conn = self.catalog.lock();
84 let mut stmt = conn
85 .prepare(&format!(
86 "SELECT {COLUMNS} FROM files WHERE display_path LIKE ?1 LIMIT 1"
87 ))
88 .map_err(db_err)?;
89 let mut rows = stmt
90 .query_map(
91 rusqlite::params![format!("%{display_path_tail}")],
92 row_to_record,
93 )
94 .map_err(db_err)?;
95 match rows.next() {
96 Some(r) => Ok(Some(r.map_err(db_err)??)),
97 None => Ok(None),
98 }
99 }
100
101 pub fn get_by_path(
103 &self,
104 source_id: &SourceId,
105 canonical_path: &str,
106 ) -> OrbokResult<Option<FileRecord>> {
107 let conn = self.catalog.lock();
108 let mut stmt = conn
109 .prepare(&format!(
110 "SELECT {COLUMNS} FROM files WHERE source_id = ?1 AND canonical_path = ?2"
111 ))
112 .map_err(db_err)?;
113 let mut rows = stmt
114 .query_map(params![source_id.as_str(), canonical_path], row_to_record)
115 .map_err(db_err)?;
116 match rows.next() {
117 Some(r) => Ok(Some(r.map_err(db_err)??)),
118 None => Ok(None),
119 }
120 }
121
122 pub fn insert(&self, new: NewFile) -> OrbokResult<FileRecord> {
124 let id = FileId::generate();
125 let now = now_iso8601();
126 let hash_algorithm = new.metadata.content_hash.as_ref().map(|_| "sha256");
127 let conn = self.catalog.lock();
128 conn.execute(
129 "INSERT INTO files (file_id, source_id, original_path, canonical_path, display_path, \
130 extension, file_size_bytes, modified_at, platform_file_key, content_hash, \
131 hash_algorithm, file_status, last_seen_at, last_scanned_at, created_at, updated_at) \
132 VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?13,?13,?13)",
133 params![
134 id.as_str(),
135 new.source_id.as_str(),
136 new.original_path,
137 new.canonical_path,
138 new.display_path,
139 new.extension,
140 new.metadata.file_size_bytes as i64,
141 new.metadata.modified_at,
142 new.metadata.platform_file_key,
143 new.metadata.content_hash,
144 hash_algorithm,
145 new.status.as_str(),
146 now,
147 ],
148 )
149 .map_err(db_err)?;
150 drop(conn);
151 self.get_by_path_id(&id)
152 }
153
154 fn get_by_path_id(&self, id: &FileId) -> OrbokResult<FileRecord> {
155 let conn = self.catalog.lock();
156 let mut stmt = conn
157 .prepare(&format!("SELECT {COLUMNS} FROM files WHERE file_id = ?1"))
158 .map_err(db_err)?;
159 let mut rows = stmt
160 .query_map(params![id.as_str()], row_to_record)
161 .map_err(db_err)?;
162 match rows.next() {
163 Some(r) => r.map_err(db_err)?,
164 None => Err(OrbokError::FileNotFound),
165 }
166 }
167
168 pub fn touch_seen(&self, id: &FileId) -> OrbokResult<()> {
170 let now = now_iso8601();
171 let conn = self.catalog.lock();
172 conn.execute(
173 "UPDATE files SET last_seen_at = ?2, last_scanned_at = ?2, updated_at = ?2 \
174 WHERE file_id = ?1",
175 params![id.as_str(), now],
176 )
177 .map_err(db_err)?;
178 Ok(())
179 }
180
181 pub fn update_observed(
184 &self,
185 id: &FileId,
186 metadata: &ObservedMetadata,
187 status: FileStatus,
188 ) -> OrbokResult<()> {
189 let now = now_iso8601();
190 let hash_algorithm = metadata.content_hash.as_ref().map(|_| "sha256");
191 let conn = self.catalog.lock();
192 conn.execute(
193 "UPDATE files SET file_size_bytes = ?2, modified_at = ?3, platform_file_key = ?4, \
194 content_hash = COALESCE(?5, content_hash), \
195 hash_algorithm = COALESCE(?6, hash_algorithm), file_status = ?7, \
196 last_seen_at = ?8, last_scanned_at = ?8, updated_at = ?8 WHERE file_id = ?1",
197 params![
198 id.as_str(),
199 metadata.file_size_bytes as i64,
200 metadata.modified_at,
201 metadata.platform_file_key,
202 metadata.content_hash,
203 hash_algorithm,
204 status.as_str(),
205 now,
206 ],
207 )
208 .map_err(db_err)?;
209 Ok(())
210 }
211
212 pub fn set_status(&self, id: &FileId, status: FileStatus) -> OrbokResult<()> {
214 let conn = self.catalog.lock();
215 conn.execute(
216 "UPDATE files SET file_status = ?2, updated_at = ?3 WHERE file_id = ?1",
217 params![id.as_str(), status.as_str(), now_iso8601()],
218 )
219 .map_err(db_err)?;
220 Ok(())
221 }
222
223 pub fn mark_missing_unseen(&self, source_id: &SourceId, cutoff: &str) -> OrbokResult<u64> {
227 let conn = self.catalog.lock();
228 let n = conn
229 .execute(
230 "UPDATE files SET file_status = 'missing', updated_at = ?3 \
231 WHERE source_id = ?1 AND last_seen_at < ?2 \
232 AND file_status NOT IN ('missing', 'deleted')",
233 params![source_id.as_str(), cutoff, now_iso8601()],
234 )
235 .map_err(db_err)?;
236 Ok(n as u64)
237 }
238
239 pub fn count_by_status(&self, source_id: &SourceId) -> OrbokResult<Vec<(FileStatus, u64)>> {
241 let conn = self.catalog.lock();
242 let mut stmt = conn
243 .prepare(
244 "SELECT file_status, COUNT(*) FROM files WHERE source_id = ?1 GROUP BY file_status",
245 )
246 .map_err(db_err)?;
247 let rows = stmt
248 .query_map(params![source_id.as_str()], |row| {
249 Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
250 })
251 .map_err(db_err)?;
252 let mut out = Vec::new();
253 for row in rows {
254 let (status, count) = row.map_err(db_err)?;
255 out.push((FileStatus::parse(&status)?, count as u64));
256 }
257 Ok(out)
258 }
259
260 pub fn count_with_status(&self, status: FileStatus) -> OrbokResult<u64> {
262 let conn = self.catalog.lock();
263 let n: i64 = conn
264 .query_row(
265 "SELECT COUNT(*) FROM files WHERE file_status = ?1",
266 rusqlite::params![status.as_str()],
267 |r| r.get(0),
268 )
269 .map_err(crate::catalog::db_err)?;
270 Ok(n as u64)
271 }
272
273 pub fn count_for_source_with_status(
275 &self,
276 source_id: &orbok_core::SourceId,
277 status: FileStatus,
278 ) -> OrbokResult<u64> {
279 let conn = self.catalog.lock();
280 let n: i64 = conn
281 .query_row(
282 "SELECT COUNT(*) FROM files WHERE source_id = ?1 AND file_status = ?2",
283 rusqlite::params![source_id.as_str(), status.as_str()],
284 |r| r.get(0),
285 )
286 .map_err(crate::catalog::db_err)?;
287 Ok(n as u64)
288 }
289}
290
291fn row_to_record(row: &Row<'_>) -> rusqlite::Result<OrbokResult<FileRecord>> {
292 let status: String = row.get(11)?;
293 let size: i64 = row.get(6)?;
294 Ok((|| {
295 Ok(FileRecord {
296 file_id: FileId::from_string(row.get::<_, String>(0).map_err(db_err)?),
297 source_id: SourceId::from_string(row.get::<_, String>(1).map_err(db_err)?),
298 original_path: row.get(2).map_err(db_err)?,
299 canonical_path: row.get(3).map_err(db_err)?,
300 display_path: row.get(4).map_err(db_err)?,
301 extension: row.get(5).map_err(db_err)?,
302 file_size_bytes: size as u64,
303 modified_at: row.get(7).map_err(db_err)?,
304 platform_file_key: row.get(8).map_err(db_err)?,
305 content_hash: row.get(9).map_err(db_err)?,
306 hash_algorithm: row.get(10).map_err(db_err)?,
307 file_status: FileStatus::parse(&status)?,
308 last_seen_at: row.get(12).map_err(db_err)?,
309 last_indexed_at: row.get(13).map_err(db_err)?,
310 })
311 })())
312}