convert_invert/internals/database/
manager.rs1use anyhow::Context;
2use diesel::{
3 PgConnection, dsl::insert_into, prelude::*, sql_types::Integer, sql_types::Nullable,
4 sql_types::Text,
5};
6
7use crate::internals::context::context_manager::{RejectedTrack, RetryRequest, Track};
8use crate::internals::database::{model, schema};
9use crate::internals::search::search_manager::{
10 DownloadableFile as RuntimeDownloadableFile, JudgeSubmission as RuntimeJudgeSubmission,
11 SearchItem as RuntimeSearchItem,
12};
13
14#[derive(QueryableByName)]
15struct IdRow {
16 #[diesel(sql_type = Integer)]
17 id: i32,
18}
19
20fn reject_reason_name(reason: model::RejectReasonRow) -> &'static str {
21 match reason {
22 model::RejectReasonRow::AlreadyDownloaded => "already_downloaded",
23 model::RejectReasonRow::LowScore => "low_score",
24 model::RejectReasonRow::NotMusic => "not_music",
25 model::RejectReasonRow::Banned => "banned",
26 model::RejectReasonRow::AbandonedAttemptingSearch => "abandoned_attempting_search",
27 }
28}
29
30pub struct DatabaseManager<'a> {
31 pub connection: &'a mut PgConnection,
32}
33
34impl<'a> DatabaseManager<'a> {
35 pub fn new(connection: &'a mut PgConnection) -> Self {
36 Self { connection }
37 }
38
39 fn insert_search_item(
40 connection: &mut PgConnection,
41 search_item: &RuntimeSearchItem,
42 ) -> anyhow::Result<i32> {
43 let value = model::NewSearchItemRow::from(search_item);
44 insert_into(schema::search_items::table)
45 .values(&value)
46 .on_conflict(schema::search_items::track_id)
47 .do_nothing()
48 .execute(connection)
49 .context("Insert search item")?;
50
51 let inserted_id = Self::get_search_item_id(connection, search_item)
52 .context("Fetch inserted or existing search item")?;
53 Ok(inserted_id)
54 }
55
56 fn insert_downloadable_file(
57 connection: &mut PgConnection,
58 downloadable_file: &RuntimeDownloadableFile,
59 ) -> anyhow::Result<i32> {
60 let value = model::NewDownloadableFileRow::from(downloadable_file);
61 insert_into(schema::downloadable_files::table)
62 .values(&value)
63 .on_conflict((
64 schema::downloadable_files::filename,
65 schema::downloadable_files::username,
66 schema::downloadable_files::size,
67 ))
68 .do_nothing()
69 .execute(connection)
70 .context("Insert downloadable file")?;
71
72 let inserted_id = Self::get_downloadable_file_id(connection, downloadable_file)
73 .context("Fetch inserted or existing downloadable file")?;
74 Ok(inserted_id)
75 }
76
77 fn get_downloadable_file_id(
78 connection: &mut PgConnection,
79 downloadable_file: &RuntimeDownloadableFile,
80 ) -> anyhow::Result<i32> {
81 use schema::downloadable_files::dsl as df;
82 let downloadable_id = schema::downloadable_files::table
83 .filter(df::filename.eq(&downloadable_file.filename))
84 .filter(df::username.eq(&downloadable_file.username))
85 .filter(df::size.eq(downloadable_file.size))
86 .select(df::id)
87 .get_result(connection)
88 .context("Fetch downloadable file id")?;
89 Ok(downloadable_id)
90 }
91
92 fn upsert_downloaded_file(
93 connection: &mut PgConnection,
94 downloaded_file: &crate::internals::context::context_manager::DownloadedFile,
95 ) -> anyhow::Result<()> {
96 use schema::downloaded_file::dsl as dl;
97 let track_id = Some(
98 Self::get_search_item_id(connection, &downloaded_file.track)
99 .context("Fetch downloaded file track id")?,
100 );
101 let value = model::NewDownloadedFileRow::from_runtime(downloaded_file, track_id);
102 let Some(track_id) = value.track else {
103 insert_into(schema::downloaded_file::table)
104 .values(&value)
105 .on_conflict(dl::filename)
106 .do_update()
107 .set(dl::track.eq(value.track))
108 .execute(connection)
109 .context("Insert downloaded file without track")?;
110 return Ok(());
111 };
112 diesel::sql_query(
113 "INSERT INTO downloaded_file (filename, track)
114 VALUES ($1, $2)
115 ON CONFLICT (track) WHERE track IS NOT NULL
116 DO UPDATE SET filename = EXCLUDED.filename",
117 )
118 .bind::<Text, _>(&value.filename)
119 .bind::<Integer, _>(track_id)
120 .execute(connection)
121 .context("Insert downloaded file")?;
122 Ok(())
123 }
124
125 pub fn is_search_item_downloaded(
126 &mut self,
127 search_item: &RuntimeSearchItem,
128 ) -> anyhow::Result<bool> {
129 use schema::downloaded_file::dsl as dl;
130 let search_id = match Self::get_search_item_id(self.connection, search_item) {
131 Ok(search_id) => search_id,
132 Err(_) => return Ok(false),
133 };
134 let count: i64 = schema::downloaded_file::table
135 .filter(dl::track.eq(search_id))
136 .count()
137 .get_result(self.connection)
138 .context("Check downloaded track")?;
139 Ok(count > 0)
140 }
141
142 fn existing_rejected_track_id(
143 connection: &mut PgConnection,
144 track_id: i32,
145 value: &model::NewRejectedTrackRow,
146 ) -> anyhow::Result<Option<i32>> {
147 let existing = diesel::sql_query(
148 "SELECT id
149 FROM rejected_track
150 WHERE track = $1
151 AND reason = $2::reject_reason
152 AND value IS NOT DISTINCT FROM $3
153 LIMIT 1",
154 )
155 .bind::<Integer, _>(track_id)
156 .bind::<Text, _>(reject_reason_name(value.reason))
157 .bind::<Nullable<Text>, _>(value.value.as_deref())
158 .get_result::<IdRow>(connection)
159 .optional()
160 .map(|row| row.map(|row| row.id))
161 .context("Fetch existing rejected track")?;
162 Ok(existing)
163 }
164
165 fn existing_retry_request_id(
166 connection: &mut PgConnection,
167 value: &model::NewRetryRequestRow,
168 ) -> anyhow::Result<Option<i32>> {
169 use schema::retry_request::dsl as rr;
170 let existing = schema::retry_request::table
171 .filter(rr::request.eq(value.request))
172 .filter(rr::retry_attempts.eq(value.retry_attempts))
173 .filter(rr::failed_download_result.eq(value.failed_download_result))
174 .select(rr::id)
175 .first::<i32>(connection)
176 .optional()
177 .context("Fetch existing retry request")?;
178 Ok(existing)
179 }
180
181 fn update_jugde_submission_score(
182 connection: &mut PgConnection,
183 judge_submission: &RuntimeJudgeSubmission,
184 ) -> anyhow::Result<()> {
185 let judge_submission_id = Self::get_judge_submission_id(connection, judge_submission)?;
186 diesel::update(
187 schema::judge_submissions::table
188 .filter(schema::judge_submissions::id.eq(&judge_submission_id)),
189 )
190 .set(schema::judge_submissions::score.eq(judge_submission.score))
191 .execute(connection)
192 .context("update and set score")?;
193 Ok(())
194 }
195 fn insert_judge_submission(
196 connection: &mut PgConnection,
197 judge_submission: &RuntimeJudgeSubmission,
198 ) -> anyhow::Result<i32> {
199 use schema::judge_submissions::dsl as js;
200 let track_id = Self::get_search_item_id(connection, &judge_submission.track)
201 .with_context(|| format!("fetching track_id={:?}", judge_submission.track))?;
202 let query_id = Self::insert_downloadable_file(connection, &judge_submission.query)
203 .with_context(|| format!("fetching track_id={:?}", judge_submission.query))?;
204
205 let value = model::NewJudgeSubmissionRow {
206 track: track_id,
207 query: query_id,
208 score: None,
209 };
210 insert_into(schema::judge_submissions::table)
211 .values(&value)
212 .on_conflict((js::track, js::query))
213 .do_nothing()
214 .execute(connection)
215 .context("Insert judge submission")?;
216 let inserted_id = schema::judge_submissions::table
217 .filter(js::track.eq(track_id))
218 .filter(js::query.eq(query_id))
219 .select(js::id)
220 .get_result(connection)
221 .context("Fetch inserted or existing judge submission")?;
222 Ok(inserted_id)
223 }
224 pub fn get_judge_submission_id(
225 connection: &mut PgConnection,
226 judge_submission: &RuntimeJudgeSubmission,
227 ) -> anyhow::Result<i32> {
228 use schema::downloadable_files::dsl as df;
229 use schema::judge_submissions::dsl as js;
230 let query_id: i32 = schema::downloadable_files::table
231 .filter(df::filename.eq(&judge_submission.query.filename))
232 .filter(df::username.eq(&judge_submission.query.username))
233 .filter(df::size.eq(judge_submission.query.size))
234 .select(df::id)
235 .get_result(connection)
236 .context("Getting download id in js")?;
237 let search_id = Self::get_search_item_id(connection, &judge_submission.track)?;
238
239 let judge_id = schema::judge_submissions::table
240 .filter(js::track.eq(search_id))
241 .filter(js::query.eq(query_id))
242 .select(js::id)
243 .get_result(connection)
244 .with_context(|| {
245 format!(
246 "Fetch judge submission id for track_id={} query_id={}",
247 search_id, query_id
248 )
249 })?;
250 Ok(judge_id)
251 }
252
253 fn insert_retry_request(
254 connection: &mut PgConnection,
255 retry_request: &RetryRequest,
256 ) -> anyhow::Result<()> {
257 let request_id = Self::get_judge_submission_id(connection, &retry_request.request)?;
258 let failed_download_result =
259 Self::get_downloadable_file_id(connection, &retry_request.failed_download_result)?;
260 let value = model::NewRetryRequestRow {
261 request: request_id,
262 retry_attempts: i32::from(retry_request.retry_attempts),
263 failed_download_result,
264 };
265 if Self::existing_retry_request_id(connection, &value)?.is_none() {
266 insert_into(schema::retry_request::table)
267 .values(&value)
268 .execute(connection)
269 .context("Insert retry request")?;
270 }
271 Ok(())
272 }
273
274 fn insert_rejected_track(
275 connection: &mut PgConnection,
276 rejected_track: &RejectedTrack,
277 ) -> anyhow::Result<()> {
278 let (judge_submission, _) = rejected_track.parts();
279 let track_id = Self::get_judge_submission_id(connection, judge_submission)?;
280 let value = model::NewRejectedTrackRow::from_runtime(track_id, rejected_track);
281 if Self::existing_rejected_track_id(connection, track_id, &value)?.is_none() {
282 insert_into(schema::rejected_track::table)
283 .values(&value)
284 .execute(connection)
285 .context("Insert rejected track")?;
286 }
287 Ok(())
288 }
289 pub fn get_search_item_id(
290 connection: &mut PgConnection,
291 search_item: &RuntimeSearchItem,
292 ) -> anyhow::Result<i32> {
293 use schema::search_items::dsl as sl;
294 let search_id = schema::search_items::table
295 .filter(sl::track_id.eq(&search_item.track_id))
296 .select(schema::search_items::id)
297 .get_result(connection)
298 .context("database fetch search_id in get seatch id func")?;
299 Ok(search_id)
300 }
301
302 pub fn load_item_to_database(&mut self, item: &Track) -> anyhow::Result<()> {
303 self.connection
304 .transaction::<_, anyhow::Error, _>(|connection| {
305 match item {
306 Track::Query(search_item) | Track::SearchRetry(search_item) => {
307 Self::insert_search_item(connection, search_item)?;
308 }
309 Track::Result(judge_submission) => {
310 Self::insert_judge_submission(connection, judge_submission)?;
311 }
312 Track::Downloadable(judge_submission) => {
313 Self::update_jugde_submission_score(connection, judge_submission)?;
314 }
315 Track::File(downloaded_file) => {
316 Self::upsert_downloaded_file(connection, downloaded_file)?;
317 }
318 Track::Retry(retry_request) => {
319 Self::insert_retry_request(connection, retry_request)?;
320 }
321 Track::Reject(rejected_track) => {
322 Self::insert_rejected_track(connection, rejected_track)?;
323 }
324 }
325 Ok(())
326 })
327 .context("Persist track into database")?;
328 Ok(())
329 }
330}