1use crate::catalog::{Catalog, db_err};
9use orbok_core::{FileId, FileStatus, OrbokError, OrbokResult, SourceId, now_iso8601};
10use rusqlite::{Row, params};
11
12#[derive(Debug, Clone)]
14pub struct FileRecord {
15 pub file_id: FileId,
16 pub source_id: SourceId,
17 pub original_path: String,
18 pub canonical_path: String,
19 pub display_path: String,
20 pub extension: Option<String>,
21 pub file_size_bytes: u64,
22 pub modified_at: Option<String>,
23 pub platform_file_key: Option<String>,
24 pub content_hash: Option<String>,
25 pub hash_algorithm: Option<String>,
26 pub file_status: FileStatus,
27 pub last_seen_at: String,
28 pub last_indexed_at: Option<String>,
29}
30
31#[derive(Debug, Clone)]
33pub struct NewFile {
34 pub source_id: SourceId,
35 pub original_path: String,
36 pub canonical_path: String,
37 pub display_path: String,
38 pub extension: Option<String>,
39 pub metadata: ObservedMetadata,
40 pub status: FileStatus,
41}
42
43#[derive(Debug, Clone, Default)]
45pub struct ObservedMetadata {
46 pub file_size_bytes: u64,
47 pub modified_at: Option<String>,
48 pub platform_file_key: Option<String>,
49 pub content_hash: Option<String>,
50}
51
52const COLUMNS: &str = "file_id, source_id, original_path, canonical_path, display_path, \
53 extension, file_size_bytes, modified_at, platform_file_key, content_hash, hash_algorithm, \
54 file_status, last_seen_at, last_indexed_at";
55
56pub struct FileRepository<'a> {
58 catalog: &'a Catalog,
59}
60
61impl<'a> FileRepository<'a> {
62 pub fn new(catalog: &'a Catalog) -> Self {
63 Self { catalog }
64 }
65
66
67 pub fn get_by_id(&self, id: &FileId) -> OrbokResult<Option<FileRecord>> {
69 let conn = self.catalog.lock();
70 let mut stmt = conn
71 .prepare(&format!("SELECT {COLUMNS} FROM files WHERE file_id = ?1"))
72 .map_err(db_err)?;
73 let mut rows = stmt
74 .query_map(params![id.as_str()], row_to_record)
75 .map_err(db_err)?;
76 match rows.next() {
77 Some(r) => Ok(Some(r.map_err(db_err)??)),
78 None => Ok(None),
79 }
80 }
81
82
83 pub fn get_by_path_str(&self, display_path_tail: &str) -> OrbokResult<Option<FileRecord>> {
85 let conn = self.catalog.lock();
86 let mut stmt = conn
87 .prepare(&format!(
88 "SELECT {COLUMNS} FROM files WHERE display_path LIKE ?1 LIMIT 1"
89 ))
90 .map_err(db_err)?;
91 let mut rows = stmt
92 .query_map(rusqlite::params![format!("%{display_path_tail}")], row_to_record)
93 .map_err(db_err)?;
94 match rows.next() {
95 Some(r) => Ok(Some(r.map_err(db_err)??)),
96 None => Ok(None),
97 }
98 }
99
100 pub fn get_by_path(
102 &self,
103 source_id: &SourceId,
104 canonical_path: &str,
105 ) -> OrbokResult<Option<FileRecord>> {
106 let conn = self.catalog.lock();
107 let mut stmt = conn
108 .prepare(&format!(
109 "SELECT {COLUMNS} FROM files WHERE source_id = ?1 AND canonical_path = ?2"
110 ))
111 .map_err(db_err)?;
112 let mut rows = stmt
113 .query_map(params![source_id.as_str(), canonical_path], row_to_record)
114 .map_err(db_err)?;
115 match rows.next() {
116 Some(r) => Ok(Some(r.map_err(db_err)??)),
117 None => Ok(None),
118 }
119 }
120
121 pub fn insert(&self, new: NewFile) -> OrbokResult<FileRecord> {
123 let id = FileId::generate();
124 let now = now_iso8601();
125 let hash_algorithm = new.metadata.content_hash.as_ref().map(|_| "sha256");
126 let conn = self.catalog.lock();
127 conn.execute(
128 "INSERT INTO files (file_id, source_id, original_path, canonical_path, display_path, \
129 extension, file_size_bytes, modified_at, platform_file_key, content_hash, \
130 hash_algorithm, file_status, last_seen_at, last_scanned_at, created_at, updated_at) \
131 VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?13,?13,?13)",
132 params![
133 id.as_str(),
134 new.source_id.as_str(),
135 new.original_path,
136 new.canonical_path,
137 new.display_path,
138 new.extension,
139 new.metadata.file_size_bytes as i64,
140 new.metadata.modified_at,
141 new.metadata.platform_file_key,
142 new.metadata.content_hash,
143 hash_algorithm,
144 new.status.as_str(),
145 now,
146 ],
147 )
148 .map_err(db_err)?;
149 drop(conn);
150 self.get_by_path_id(&id)
151 }
152
153 fn get_by_path_id(&self, id: &FileId) -> OrbokResult<FileRecord> {
154 let conn = self.catalog.lock();
155 let mut stmt = conn
156 .prepare(&format!("SELECT {COLUMNS} FROM files WHERE file_id = ?1"))
157 .map_err(db_err)?;
158 let mut rows = stmt
159 .query_map(params![id.as_str()], row_to_record)
160 .map_err(db_err)?;
161 match rows.next() {
162 Some(r) => r.map_err(db_err)?,
163 None => Err(OrbokError::FileNotFound),
164 }
165 }
166
167 pub fn touch_seen(&self, id: &FileId) -> OrbokResult<()> {
169 let now = now_iso8601();
170 let conn = self.catalog.lock();
171 conn.execute(
172 "UPDATE files SET last_seen_at = ?2, last_scanned_at = ?2, updated_at = ?2 \
173 WHERE file_id = ?1",
174 params![id.as_str(), now],
175 )
176 .map_err(db_err)?;
177 Ok(())
178 }
179
180 pub fn update_observed(
183 &self,
184 id: &FileId,
185 metadata: &ObservedMetadata,
186 status: FileStatus,
187 ) -> OrbokResult<()> {
188 let now = now_iso8601();
189 let hash_algorithm = metadata.content_hash.as_ref().map(|_| "sha256");
190 let conn = self.catalog.lock();
191 conn.execute(
192 "UPDATE files SET file_size_bytes = ?2, modified_at = ?3, platform_file_key = ?4, \
193 content_hash = COALESCE(?5, content_hash), \
194 hash_algorithm = COALESCE(?6, hash_algorithm), file_status = ?7, \
195 last_seen_at = ?8, last_scanned_at = ?8, updated_at = ?8 WHERE file_id = ?1",
196 params![
197 id.as_str(),
198 metadata.file_size_bytes as i64,
199 metadata.modified_at,
200 metadata.platform_file_key,
201 metadata.content_hash,
202 hash_algorithm,
203 status.as_str(),
204 now,
205 ],
206 )
207 .map_err(db_err)?;
208 Ok(())
209 }
210
211 pub fn set_status(&self, id: &FileId, status: FileStatus) -> OrbokResult<()> {
213 let conn = self.catalog.lock();
214 conn.execute(
215 "UPDATE files SET file_status = ?2, updated_at = ?3 WHERE file_id = ?1",
216 params![id.as_str(), status.as_str(), now_iso8601()],
217 )
218 .map_err(db_err)?;
219 Ok(())
220 }
221
222 pub fn mark_missing_unseen(&self, source_id: &SourceId, cutoff: &str) -> OrbokResult<u64> {
226 let conn = self.catalog.lock();
227 let n = conn
228 .execute(
229 "UPDATE files SET file_status = 'missing', updated_at = ?3 \
230 WHERE source_id = ?1 AND last_seen_at < ?2 \
231 AND file_status NOT IN ('missing', 'deleted')",
232 params![source_id.as_str(), cutoff, now_iso8601()],
233 )
234 .map_err(db_err)?;
235 Ok(n as u64)
236 }
237
238 pub fn count_by_status(&self, source_id: &SourceId) -> OrbokResult<Vec<(FileStatus, u64)>> {
240 let conn = self.catalog.lock();
241 let mut stmt = conn
242 .prepare(
243 "SELECT file_status, COUNT(*) FROM files WHERE source_id = ?1 GROUP BY file_status",
244 )
245 .map_err(db_err)?;
246 let rows = stmt
247 .query_map(params![source_id.as_str()], |row| {
248 Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
249 })
250 .map_err(db_err)?;
251 let mut out = Vec::new();
252 for row in rows {
253 let (status, count) = row.map_err(db_err)?;
254 out.push((FileStatus::parse(&status)?, count as u64));
255 }
256 Ok(out)
257 }
258}
259
260fn row_to_record(row: &Row<'_>) -> rusqlite::Result<OrbokResult<FileRecord>> {
261 let status: String = row.get(11)?;
262 let size: i64 = row.get(6)?;
263 Ok((|| {
264 Ok(FileRecord {
265 file_id: FileId::from_string(row.get::<_, String>(0).map_err(db_err)?),
266 source_id: SourceId::from_string(row.get::<_, String>(1).map_err(db_err)?),
267 original_path: row.get(2).map_err(db_err)?,
268 canonical_path: row.get(3).map_err(db_err)?,
269 display_path: row.get(4).map_err(db_err)?,
270 extension: row.get(5).map_err(db_err)?,
271 file_size_bytes: size as u64,
272 modified_at: row.get(7).map_err(db_err)?,
273 platform_file_key: row.get(8).map_err(db_err)?,
274 content_hash: row.get(9).map_err(db_err)?,
275 hash_algorithm: row.get(10).map_err(db_err)?,
276 file_status: FileStatus::parse(&status)?,
277 last_seen_at: row.get(12).map_err(db_err)?,
278 last_indexed_at: row.get(13).map_err(db_err)?,
279 })
280 })())
281}