1use async_trait::async_trait;
4use rusqlite::params;
5
6use crate::domain::location::LocationId;
7use crate::domain::location_file::LocationFile;
8use crate::infra::error::InfraError;
9use crate::infra::location_file_store::LocationFileStore;
10
11use super::mapping::{query_location_files, ts_to_string};
12use super::{map_call_err, SqliteSyncStore};
13
14#[async_trait]
15impl LocationFileStore for SqliteSyncStore {
16 async fn upsert(&self, file: &LocationFile) -> Result<(), InfraError> {
17 let file = file.clone();
18 self.conn
19 .call(move |conn| {
20 let size_i64 = i64::try_from(file.fingerprint().size).map_err(|_| {
21 InfraError::Store {
22 op: "sqlite",
23 reason: format!(
24 "size exceeds i64::MAX: {} (file_id {})",
25 file.fingerprint().size,
26 file.file_id()
27 ),
28 }
29 })?;
30 let modified_at_str = file.fingerprint().modified_at.map(ts_to_string);
31 let updated_at_str = ts_to_string(file.updated_at());
32 conn.execute(
33 "INSERT INTO location_files (file_id, location_id, relative_path, file_hash, content_hash, meta_hash, size, modified_at, state, embedded_id, updated_at)
34 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
35 ON CONFLICT (file_id, location_id) DO UPDATE SET
36 relative_path = excluded.relative_path,
37 file_hash = excluded.file_hash,
38 content_hash = excluded.content_hash,
39 meta_hash = excluded.meta_hash,
40 size = excluded.size,
41 modified_at = excluded.modified_at,
42 state = excluded.state,
43 embedded_id = excluded.embedded_id,
44 updated_at = excluded.updated_at",
45 params![
46 file.file_id(),
47 file.location_id().as_str(),
48 file.relative_path(),
49 file.fingerprint().byte_digest.as_ref().map(|d| d.to_prefixed_string()),
50 file.fingerprint().content_digest.as_ref().map(|d| d.0.clone()),
51 file.fingerprint().meta_digest.as_ref().map(|d| d.0.clone()),
52 size_i64,
53 modified_at_str,
54 file.state().as_str(),
55 file.embedded_id(),
56 updated_at_str,
57 ],
58 )
59 .map_err(|e| InfraError::Store {
60 op: "sqlite",
61 reason: format!("upsert location_file failed: {e}"),
62 })?;
63 Ok(())
64 })
65 .await
66 .map_err(map_call_err)
67 }
68
69 async fn get(
70 &self,
71 file_id: &str,
72 location_id: &LocationId,
73 ) -> Result<Option<LocationFile>, InfraError> {
74 let file_id = file_id.to_string();
75 let loc_str = location_id.as_str().to_string();
76 self.conn
77 .call(move |conn| {
78 let files = query_location_files(
79 conn,
80 "SELECT * FROM location_files WHERE file_id = ? AND location_id = ?",
81 &[
82 &file_id as &dyn rusqlite::types::ToSql,
83 &loc_str as &dyn rusqlite::types::ToSql,
84 ],
85 )?;
86 Ok(files.into_iter().next())
87 })
88 .await
89 .map_err(map_call_err)
90 }
91
92 async fn list_by_file(&self, file_id: &str) -> Result<Vec<LocationFile>, InfraError> {
93 let file_id = file_id.to_string();
94 self.conn
95 .call(move |conn| {
96 query_location_files(
97 conn,
98 "SELECT * FROM location_files WHERE file_id = ? ORDER BY location_id",
99 &[&file_id as &dyn rusqlite::types::ToSql],
100 )
101 })
102 .await
103 .map_err(map_call_err)
104 }
105
106 async fn list_by_location(
107 &self,
108 location_id: &LocationId,
109 ) -> Result<Vec<LocationFile>, InfraError> {
110 let loc_str = location_id.as_str().to_string();
111 self.conn
112 .call(move |conn| {
113 query_location_files(
114 conn,
115 "SELECT * FROM location_files WHERE location_id = ? ORDER BY relative_path",
116 &[&loc_str as &dyn rusqlite::types::ToSql],
117 )
118 })
119 .await
120 .map_err(map_call_err)
121 }
122
123 async fn list_by_files(
124 &self,
125 file_ids: &[&str],
126 ) -> Result<std::collections::HashMap<String, Vec<LocationFile>>, InfraError> {
127 let file_ids: Vec<String> = file_ids.iter().map(|s| s.to_string()).collect();
128 self.conn
129 .call(move |conn| {
130 let mut result: std::collections::HashMap<String, Vec<LocationFile>> =
131 std::collections::HashMap::new();
132 for chunk in file_ids.chunks(999) {
134 let placeholders: Vec<&str> =
135 chunk.iter().map(|_| "?").collect();
136 let sql = format!(
137 "SELECT * FROM location_files WHERE file_id IN ({}) ORDER BY file_id, location_id",
138 placeholders.join(",")
139 );
140 let params: Vec<&dyn rusqlite::types::ToSql> =
141 chunk.iter().map(|s| s as &dyn rusqlite::types::ToSql).collect();
142 let files = query_location_files(conn, &sql, ¶ms)?;
143 for file in files {
144 result
145 .entry(file.file_id().to_string())
146 .or_default()
147 .push(file);
148 }
149 }
150 Ok(result)
151 })
152 .await
153 .map_err(map_call_err)
154 }
155
156 async fn delete(&self, file_id: &str, location_id: &LocationId) -> Result<bool, InfraError> {
157 let file_id = file_id.to_string();
158 let loc_str = location_id.as_str().to_string();
159 self.conn
160 .call(move |conn| {
161 let changes = conn
162 .execute(
163 "DELETE FROM location_files WHERE file_id = ? AND location_id = ?",
164 params![file_id, loc_str],
165 )
166 .map_err(|e| InfraError::Store {
167 op: "sqlite",
168 reason: format!("delete location_file failed: {e}"),
169 })?;
170 Ok(changes > 0)
171 })
172 .await
173 .map_err(map_call_err)
174 }
175
176 async fn count_by_location(&self, location_id: &LocationId) -> Result<usize, InfraError> {
177 let loc_str = location_id.as_str().to_string();
178 self.conn
179 .call(move |conn| {
180 let count: usize = conn
181 .query_row(
182 "SELECT COUNT(*) FROM location_files WHERE location_id = ?",
183 params![loc_str],
184 |row| row.get(0),
185 )
186 .map_err(|e| InfraError::Store {
187 op: "sqlite",
188 reason: format!("count_by_location failed: {e}"),
189 })?;
190 Ok(count)
191 })
192 .await
193 .map_err(map_call_err)
194 }
195}