Skip to main content

vdsl_sync/infra/sqlite/
topology_file_store_impl.rs

1//! TopologyFileStore trait implementation for SqliteSyncStore.
2
3use async_trait::async_trait;
4use rusqlite::params;
5
6use crate::domain::file_type::FileType;
7use crate::domain::topology_file::TopologyFile;
8use crate::infra::error::InfraError;
9use crate::infra::topology_file_store::TopologyFileStore;
10
11use super::mapping::{query_topology_files, ts_to_string};
12use super::{map_call_err, SqliteSyncStore};
13
14#[async_trait]
15impl TopologyFileStore for SqliteSyncStore {
16    async fn upsert(&self, file: &TopologyFile) -> Result<(), InfraError> {
17        let file = file.clone();
18        self.conn
19            .call(move |conn| {
20                let registered_at_str = ts_to_string(file.registered_at());
21                let deleted_at_str = file.deleted_at().map(ts_to_string);
22                let params = params![
23                    file.id(),
24                    file.relative_path(),
25                    file.canonical_hash(),
26                    file.file_type().as_str(),
27                    registered_at_str,
28                    deleted_at_str,
29                ];
30
31                let result = conn.execute(
32                    "INSERT INTO topology_files (id, relative_path, canonical_hash, file_type, registered_at, deleted_at)
33                     VALUES (?1, ?2, ?3, ?4, ?5, ?6)
34                     ON CONFLICT (id) DO UPDATE SET
35                         relative_path = excluded.relative_path,
36                         canonical_hash = excluded.canonical_hash,
37                         file_type = excluded.file_type,
38                         registered_at = excluded.registered_at,
39                         deleted_at = excluded.deleted_at",
40                    params,
41                );
42
43                match result {
44                    Ok(_) => Ok(()),
45                    Err(rusqlite::Error::SqliteFailure(err, _))
46                        if err.extended_code == rusqlite::ffi::SQLITE_CONSTRAINT_UNIQUE =>
47                    {
48                        // path衝突: 同一relative_pathの別IDが存在。
49                        // 既存レコードをsoft delete(Rename後の旧pathに新ファイルが来た等)。
50                        tracing::warn!(
51                            id = file.id(),
52                            path = file.relative_path(),
53                            "topology_file upsert: path conflict — retiring existing record"
54                        );
55                        let now_rfc3339 = ts_to_string(chrono::Utc::now());
56                        conn.execute(
57                            "UPDATE topology_files SET deleted_at = ?
58                             WHERE relative_path = ?2 AND id != ?3 AND deleted_at IS NULL",
59                            params![now_rfc3339, file.relative_path(), file.id()],
60                        )
61                        .map_err(|e| InfraError::Store {
62                            op: "sqlite",
63                            reason: format!("retire conflicting topology_file failed: {e}"),
64                        })?;
65
66                        // リトライ
67                        conn.execute(
68                            "INSERT INTO topology_files (id, relative_path, canonical_hash, file_type, registered_at, deleted_at)
69                             VALUES (?1, ?2, ?3, ?4, ?5, ?6)
70                             ON CONFLICT (id) DO UPDATE SET
71                                 relative_path = excluded.relative_path,
72                                 canonical_hash = excluded.canonical_hash,
73                                 file_type = excluded.file_type,
74                                 registered_at = excluded.registered_at,
75                                 deleted_at = excluded.deleted_at",
76                            params![
77                                file.id(),
78                                file.relative_path(),
79                                file.canonical_hash(),
80                                file.file_type().as_str(),
81                                registered_at_str,
82                                deleted_at_str,
83                            ],
84                        )
85                        .map_err(|e| InfraError::Store {
86                            op: "sqlite",
87                            reason: format!("upsert topology_file retry failed: {e}"),
88                        })?;
89                        Ok(())
90                    }
91                    Err(e) => Err(InfraError::Store {
92                        op: "sqlite",
93                        reason: format!("upsert topology_file failed: {e}"),
94                    }),
95                }
96            })
97            .await
98            .map_err(map_call_err)
99    }
100
101    async fn get_by_id(&self, id: &str) -> Result<Option<TopologyFile>, InfraError> {
102        let id = id.to_string();
103        self.conn
104            .call(move |conn| {
105                let files = query_topology_files(
106                    conn,
107                    "SELECT * FROM topology_files WHERE id = ?",
108                    &[&id as &dyn rusqlite::types::ToSql],
109                )?;
110                Ok(files.into_iter().next())
111            })
112            .await
113            .map_err(map_call_err)
114    }
115
116    async fn get_by_path(&self, relative_path: &str) -> Result<Option<TopologyFile>, InfraError> {
117        let path = relative_path.to_string();
118        self.conn
119            .call(move |conn| {
120                let files = query_topology_files(
121                    conn,
122                    "SELECT * FROM topology_files WHERE relative_path = ? AND deleted_at IS NULL",
123                    &[&path as &dyn rusqlite::types::ToSql],
124                )?;
125                Ok(files.into_iter().next())
126            })
127            .await
128            .map_err(map_call_err)
129    }
130
131    async fn find_by_canonical_hash(&self, hash: &str) -> Result<Option<TopologyFile>, InfraError> {
132        let hash = hash.to_string();
133        self.conn
134            .call(move |conn| {
135                let files = query_topology_files(
136                    conn,
137                    "SELECT * FROM topology_files WHERE canonical_hash = ? AND deleted_at IS NULL",
138                    &[&hash as &dyn rusqlite::types::ToSql],
139                )?;
140                Ok(files.into_iter().next())
141            })
142            .await
143            .map_err(map_call_err)
144    }
145
146    async fn list_active(
147        &self,
148        file_type: Option<FileType>,
149        limit: Option<usize>,
150    ) -> Result<Vec<TopologyFile>, InfraError> {
151        self.conn
152            .call(move |conn| {
153                let mut sql = String::from("SELECT * FROM topology_files WHERE deleted_at IS NULL");
154                let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
155
156                if let Some(ft) = file_type {
157                    sql.push_str(" AND file_type = ?");
158                    param_values.push(Box::new(ft.as_str().to_string()));
159                }
160                sql.push_str(" ORDER BY registered_at DESC");
161                if let Some(n) = limit {
162                    sql.push_str(" LIMIT ?");
163                    let n_i64 = i64::try_from(n).map_err(|_| InfraError::Store {
164                        op: "sqlite",
165                        reason: format!("limit exceeds i64::MAX: {n}"),
166                    })?;
167                    param_values.push(Box::new(n_i64));
168                }
169
170                let refs: Vec<&dyn rusqlite::types::ToSql> =
171                    param_values.iter().map(|p| p.as_ref()).collect();
172                query_topology_files(conn, &sql, &refs)
173            })
174            .await
175            .map_err(map_call_err)
176    }
177
178    async fn list_deleted(&self) -> Result<Vec<TopologyFile>, InfraError> {
179        self.conn
180            .call(|conn| {
181                query_topology_files(
182                    conn,
183                    "SELECT * FROM topology_files WHERE deleted_at IS NOT NULL ORDER BY deleted_at DESC",
184                    &[],
185                )
186            })
187            .await
188            .map_err(map_call_err)
189    }
190
191    async fn hard_delete(&self, id: &str) -> Result<bool, InfraError> {
192        let id = id.to_string();
193        self.conn
194            .call(move |conn| {
195                let deleted = conn
196                    .execute(
197                        "DELETE FROM topology_files WHERE id = ? AND deleted_at IS NOT NULL",
198                        params![id],
199                    )
200                    .map_err(|e| InfraError::Store {
201                        op: "sqlite",
202                        reason: format!("hard_delete topology_file failed: {e}"),
203                    })?;
204                Ok(deleted > 0)
205            })
206            .await
207            .map_err(map_call_err)
208    }
209
210    async fn count_active(&self) -> Result<usize, InfraError> {
211        self.conn
212            .call(|conn| {
213                let count: usize = conn
214                    .query_row(
215                        "SELECT COUNT(*) FROM topology_files WHERE deleted_at IS NULL",
216                        [],
217                        |row| row.get(0),
218                    )
219                    .map_err(|e| InfraError::Store {
220                        op: "sqlite",
221                        reason: format!("count_active topology_files failed: {e}"),
222                    })?;
223                Ok(count)
224            })
225            .await
226            .map_err(map_call_err)
227    }
228
229    async fn list_active_paths(&self) -> Result<Vec<String>, InfraError> {
230        self.conn
231            .call(|conn| {
232                let mut stmt = conn
233                    .prepare(
234                        "SELECT relative_path FROM topology_files WHERE deleted_at IS NULL ORDER BY relative_path",
235                    )
236                    .map_err(|e| InfraError::Store {
237                        op: "sqlite",
238                        reason: format!("{e}"),
239                    })?;
240                let rows = stmt
241                    .query_map([], |row| row.get::<_, String>(0))
242                    .map_err(|e| InfraError::Store {
243                        op: "sqlite",
244                        reason: format!("{e}"),
245                    })?;
246                let mut paths = Vec::new();
247                for row in rows {
248                    paths.push(row.map_err(|e| InfraError::Store {
249                        op: "sqlite",
250                        reason: format!("{e}"),
251                    })?);
252                }
253                Ok(paths)
254            })
255            .await
256            .map_err(map_call_err)
257    }
258}