1use async_trait::async_trait;
4use rusqlite::params;
5
6use crate::domain::file_type::FileType;
7use crate::domain::topology_file::TopologyFile;
8use crate::infra::error::InfraError;
9use crate::infra::topology_file_store::TopologyFileStore;
10
11use super::mapping::{query_topology_files, ts_to_string};
12use super::{map_call_err, SqliteSyncStore};
13
14#[async_trait]
15impl TopologyFileStore for SqliteSyncStore {
16 async fn upsert(&self, file: &TopologyFile) -> Result<(), InfraError> {
17 let file = file.clone();
18 self.conn
19 .call(move |conn| {
20 let registered_at_str = ts_to_string(file.registered_at());
21 let deleted_at_str = file.deleted_at().map(ts_to_string);
22 let params = params![
23 file.id(),
24 file.relative_path(),
25 file.canonical_hash(),
26 file.file_type().as_str(),
27 registered_at_str,
28 deleted_at_str,
29 ];
30
31 let result = conn.execute(
32 "INSERT INTO topology_files (id, relative_path, canonical_hash, file_type, registered_at, deleted_at)
33 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
34 ON CONFLICT (id) DO UPDATE SET
35 relative_path = excluded.relative_path,
36 canonical_hash = excluded.canonical_hash,
37 file_type = excluded.file_type,
38 registered_at = excluded.registered_at,
39 deleted_at = excluded.deleted_at",
40 params,
41 );
42
43 match result {
44 Ok(_) => Ok(()),
45 Err(rusqlite::Error::SqliteFailure(err, _))
46 if err.extended_code == rusqlite::ffi::SQLITE_CONSTRAINT_UNIQUE =>
47 {
48 tracing::warn!(
51 id = file.id(),
52 path = file.relative_path(),
53 "topology_file upsert: path conflict — retiring existing record"
54 );
55 let now_rfc3339 = ts_to_string(chrono::Utc::now());
56 conn.execute(
57 "UPDATE topology_files SET deleted_at = ?
58 WHERE relative_path = ?2 AND id != ?3 AND deleted_at IS NULL",
59 params![now_rfc3339, file.relative_path(), file.id()],
60 )
61 .map_err(|e| InfraError::Store {
62 op: "sqlite",
63 reason: format!("retire conflicting topology_file failed: {e}"),
64 })?;
65
66 conn.execute(
68 "INSERT INTO topology_files (id, relative_path, canonical_hash, file_type, registered_at, deleted_at)
69 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
70 ON CONFLICT (id) DO UPDATE SET
71 relative_path = excluded.relative_path,
72 canonical_hash = excluded.canonical_hash,
73 file_type = excluded.file_type,
74 registered_at = excluded.registered_at,
75 deleted_at = excluded.deleted_at",
76 params![
77 file.id(),
78 file.relative_path(),
79 file.canonical_hash(),
80 file.file_type().as_str(),
81 registered_at_str,
82 deleted_at_str,
83 ],
84 )
85 .map_err(|e| InfraError::Store {
86 op: "sqlite",
87 reason: format!("upsert topology_file retry failed: {e}"),
88 })?;
89 Ok(())
90 }
91 Err(e) => Err(InfraError::Store {
92 op: "sqlite",
93 reason: format!("upsert topology_file failed: {e}"),
94 }),
95 }
96 })
97 .await
98 .map_err(map_call_err)
99 }
100
101 async fn get_by_id(&self, id: &str) -> Result<Option<TopologyFile>, InfraError> {
102 let id = id.to_string();
103 self.conn
104 .call(move |conn| {
105 let files = query_topology_files(
106 conn,
107 "SELECT * FROM topology_files WHERE id = ?",
108 &[&id as &dyn rusqlite::types::ToSql],
109 )?;
110 Ok(files.into_iter().next())
111 })
112 .await
113 .map_err(map_call_err)
114 }
115
116 async fn get_by_path(&self, relative_path: &str) -> Result<Option<TopologyFile>, InfraError> {
117 let path = relative_path.to_string();
118 self.conn
119 .call(move |conn| {
120 let files = query_topology_files(
121 conn,
122 "SELECT * FROM topology_files WHERE relative_path = ? AND deleted_at IS NULL",
123 &[&path as &dyn rusqlite::types::ToSql],
124 )?;
125 Ok(files.into_iter().next())
126 })
127 .await
128 .map_err(map_call_err)
129 }
130
131 async fn find_by_canonical_hash(&self, hash: &str) -> Result<Option<TopologyFile>, InfraError> {
132 let hash = hash.to_string();
133 self.conn
134 .call(move |conn| {
135 let files = query_topology_files(
136 conn,
137 "SELECT * FROM topology_files WHERE canonical_hash = ? AND deleted_at IS NULL",
138 &[&hash as &dyn rusqlite::types::ToSql],
139 )?;
140 Ok(files.into_iter().next())
141 })
142 .await
143 .map_err(map_call_err)
144 }
145
146 async fn list_active(
147 &self,
148 file_type: Option<FileType>,
149 limit: Option<usize>,
150 ) -> Result<Vec<TopologyFile>, InfraError> {
151 self.conn
152 .call(move |conn| {
153 let mut sql = String::from("SELECT * FROM topology_files WHERE deleted_at IS NULL");
154 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
155
156 if let Some(ft) = file_type {
157 sql.push_str(" AND file_type = ?");
158 param_values.push(Box::new(ft.as_str().to_string()));
159 }
160 sql.push_str(" ORDER BY registered_at DESC");
161 if let Some(n) = limit {
162 sql.push_str(" LIMIT ?");
163 let n_i64 = i64::try_from(n).map_err(|_| InfraError::Store {
164 op: "sqlite",
165 reason: format!("limit exceeds i64::MAX: {n}"),
166 })?;
167 param_values.push(Box::new(n_i64));
168 }
169
170 let refs: Vec<&dyn rusqlite::types::ToSql> =
171 param_values.iter().map(|p| p.as_ref()).collect();
172 query_topology_files(conn, &sql, &refs)
173 })
174 .await
175 .map_err(map_call_err)
176 }
177
178 async fn list_deleted(&self) -> Result<Vec<TopologyFile>, InfraError> {
179 self.conn
180 .call(|conn| {
181 query_topology_files(
182 conn,
183 "SELECT * FROM topology_files WHERE deleted_at IS NOT NULL ORDER BY deleted_at DESC",
184 &[],
185 )
186 })
187 .await
188 .map_err(map_call_err)
189 }
190
191 async fn hard_delete(&self, id: &str) -> Result<bool, InfraError> {
192 let id = id.to_string();
193 self.conn
194 .call(move |conn| {
195 let deleted = conn
196 .execute(
197 "DELETE FROM topology_files WHERE id = ? AND deleted_at IS NOT NULL",
198 params![id],
199 )
200 .map_err(|e| InfraError::Store {
201 op: "sqlite",
202 reason: format!("hard_delete topology_file failed: {e}"),
203 })?;
204 Ok(deleted > 0)
205 })
206 .await
207 .map_err(map_call_err)
208 }
209
210 async fn count_active(&self) -> Result<usize, InfraError> {
211 self.conn
212 .call(|conn| {
213 let count: usize = conn
214 .query_row(
215 "SELECT COUNT(*) FROM topology_files WHERE deleted_at IS NULL",
216 [],
217 |row| row.get(0),
218 )
219 .map_err(|e| InfraError::Store {
220 op: "sqlite",
221 reason: format!("count_active topology_files failed: {e}"),
222 })?;
223 Ok(count)
224 })
225 .await
226 .map_err(map_call_err)
227 }
228
229 async fn list_active_paths(&self) -> Result<Vec<String>, InfraError> {
230 self.conn
231 .call(|conn| {
232 let mut stmt = conn
233 .prepare(
234 "SELECT relative_path FROM topology_files WHERE deleted_at IS NULL ORDER BY relative_path",
235 )
236 .map_err(|e| InfraError::Store {
237 op: "sqlite",
238 reason: format!("{e}"),
239 })?;
240 let rows = stmt
241 .query_map([], |row| row.get::<_, String>(0))
242 .map_err(|e| InfraError::Store {
243 op: "sqlite",
244 reason: format!("{e}"),
245 })?;
246 let mut paths = Vec::new();
247 for row in rows {
248 paths.push(row.map_err(|e| InfraError::Store {
249 op: "sqlite",
250 reason: format!("{e}"),
251 })?);
252 }
253 Ok(paths)
254 })
255 .await
256 .map_err(map_call_err)
257 }
258}