Skip to main content

convert_invert/internals/database/
manager.rs

1use anyhow::Context;
2use diesel::{
3    PgConnection, dsl::insert_into, prelude::*, sql_types::Integer, sql_types::Nullable,
4    sql_types::Text,
5};
6
7use crate::internals::context::context_manager::{RejectedTrack, RetryRequest, Track};
8use crate::internals::database::{model, schema};
9use crate::internals::search::search_manager::{
10    DownloadableFile as RuntimeDownloadableFile, JudgeSubmission as RuntimeJudgeSubmission,
11    SearchItem as RuntimeSearchItem,
12};
13
14#[derive(QueryableByName)]
15struct IdRow {
16    #[diesel(sql_type = Integer)]
17    id: i32,
18}
19
20fn reject_reason_name(reason: model::RejectReasonRow) -> &'static str {
21    match reason {
22        model::RejectReasonRow::AlreadyDownloaded => "already_downloaded",
23        model::RejectReasonRow::LowScore => "low_score",
24        model::RejectReasonRow::NotMusic => "not_music",
25        model::RejectReasonRow::Banned => "banned",
26        model::RejectReasonRow::AbandonedAttemptingSearch => "abandoned_attempting_search",
27    }
28}
29
30pub struct DatabaseManager<'a> {
31    pub connection: &'a mut PgConnection,
32}
33
34impl<'a> DatabaseManager<'a> {
35    pub fn new(connection: &'a mut PgConnection) -> Self {
36        Self { connection }
37    }
38
39    fn insert_search_item(
40        connection: &mut PgConnection,
41        search_item: &RuntimeSearchItem,
42    ) -> anyhow::Result<i32> {
43        let value = model::NewSearchItemRow::from(search_item);
44        insert_into(schema::search_items::table)
45            .values(&value)
46            .on_conflict(schema::search_items::track_id)
47            .do_nothing()
48            .execute(connection)
49            .context("Insert search item")?;
50
51        let inserted_id = Self::get_search_item_id(connection, search_item)
52            .context("Fetch inserted or existing search item")?;
53        Ok(inserted_id)
54    }
55
56    fn insert_downloadable_file(
57        connection: &mut PgConnection,
58        downloadable_file: &RuntimeDownloadableFile,
59    ) -> anyhow::Result<i32> {
60        let value = model::NewDownloadableFileRow::from(downloadable_file);
61        insert_into(schema::downloadable_files::table)
62            .values(&value)
63            .on_conflict((
64                schema::downloadable_files::filename,
65                schema::downloadable_files::username,
66                schema::downloadable_files::size,
67            ))
68            .do_nothing()
69            .execute(connection)
70            .context("Insert downloadable file")?;
71
72        let inserted_id = Self::get_downloadable_file_id(connection, downloadable_file)
73            .context("Fetch inserted or existing downloadable file")?;
74        Ok(inserted_id)
75    }
76
77    fn get_downloadable_file_id(
78        connection: &mut PgConnection,
79        downloadable_file: &RuntimeDownloadableFile,
80    ) -> anyhow::Result<i32> {
81        use schema::downloadable_files::dsl as df;
82        let downloadable_id = schema::downloadable_files::table
83            .filter(df::filename.eq(&downloadable_file.filename))
84            .filter(df::username.eq(&downloadable_file.username))
85            .filter(df::size.eq(downloadable_file.size))
86            .select(df::id)
87            .get_result(connection)
88            .context("Fetch downloadable file id")?;
89        Ok(downloadable_id)
90    }
91
92    fn upsert_downloaded_file(
93        connection: &mut PgConnection,
94        downloaded_file: &crate::internals::context::context_manager::DownloadedFile,
95    ) -> anyhow::Result<()> {
96        use schema::downloaded_file::dsl as dl;
97        let track_id = Some(
98            Self::get_search_item_id(connection, &downloaded_file.track)
99                .context("Fetch downloaded file track id")?,
100        );
101        let value = model::NewDownloadedFileRow::from_runtime(downloaded_file, track_id);
102        let Some(track_id) = value.track else {
103            insert_into(schema::downloaded_file::table)
104                .values(&value)
105                .on_conflict(dl::filename)
106                .do_update()
107                .set(dl::track.eq(value.track))
108                .execute(connection)
109                .context("Insert downloaded file without track")?;
110            return Ok(());
111        };
112        diesel::sql_query(
113            "INSERT INTO downloaded_file (filename, track)
114             VALUES ($1, $2)
115             ON CONFLICT (track) WHERE track IS NOT NULL
116             DO UPDATE SET filename = EXCLUDED.filename",
117        )
118        .bind::<Text, _>(&value.filename)
119        .bind::<Integer, _>(track_id)
120        .execute(connection)
121        .context("Insert downloaded file")?;
122        Ok(())
123    }
124
125    pub fn is_search_item_downloaded(
126        &mut self,
127        search_item: &RuntimeSearchItem,
128    ) -> anyhow::Result<bool> {
129        use schema::downloaded_file::dsl as dl;
130        let search_id = match Self::get_search_item_id(self.connection, search_item) {
131            Ok(search_id) => search_id,
132            Err(_) => return Ok(false),
133        };
134        let count: i64 = schema::downloaded_file::table
135            .filter(dl::track.eq(search_id))
136            .count()
137            .get_result(self.connection)
138            .context("Check downloaded track")?;
139        Ok(count > 0)
140    }
141
142    fn existing_rejected_track_id(
143        connection: &mut PgConnection,
144        track_id: i32,
145        value: &model::NewRejectedTrackRow,
146    ) -> anyhow::Result<Option<i32>> {
147        let existing = diesel::sql_query(
148            "SELECT id
149             FROM rejected_track
150             WHERE track = $1
151               AND reason = $2::reject_reason
152               AND value IS NOT DISTINCT FROM $3
153             LIMIT 1",
154        )
155        .bind::<Integer, _>(track_id)
156        .bind::<Text, _>(reject_reason_name(value.reason))
157        .bind::<Nullable<Text>, _>(value.value.as_deref())
158        .get_result::<IdRow>(connection)
159        .optional()
160        .map(|row| row.map(|row| row.id))
161        .context("Fetch existing rejected track")?;
162        Ok(existing)
163    }
164
165    fn existing_retry_request_id(
166        connection: &mut PgConnection,
167        value: &model::NewRetryRequestRow,
168    ) -> anyhow::Result<Option<i32>> {
169        use schema::retry_request::dsl as rr;
170        let existing = schema::retry_request::table
171            .filter(rr::request.eq(value.request))
172            .filter(rr::retry_attempts.eq(value.retry_attempts))
173            .filter(rr::failed_download_result.eq(value.failed_download_result))
174            .select(rr::id)
175            .first::<i32>(connection)
176            .optional()
177            .context("Fetch existing retry request")?;
178        Ok(existing)
179    }
180
181    fn update_jugde_submission_score(
182        connection: &mut PgConnection,
183        judge_submission: &RuntimeJudgeSubmission,
184    ) -> anyhow::Result<()> {
185        let judge_submission_id = Self::get_judge_submission_id(connection, judge_submission)?;
186        diesel::update(
187            schema::judge_submissions::table
188                .filter(schema::judge_submissions::id.eq(&judge_submission_id)),
189        )
190        .set(schema::judge_submissions::score.eq(judge_submission.score))
191        .execute(connection)
192        .context("update and set score")?;
193        Ok(())
194    }
195    fn insert_judge_submission(
196        connection: &mut PgConnection,
197        judge_submission: &RuntimeJudgeSubmission,
198    ) -> anyhow::Result<i32> {
199        use schema::judge_submissions::dsl as js;
200        let track_id = Self::get_search_item_id(connection, &judge_submission.track)
201            .with_context(|| format!("fetching track_id={:?}", judge_submission.track))?;
202        let query_id = Self::insert_downloadable_file(connection, &judge_submission.query)
203            .with_context(|| format!("fetching track_id={:?}", judge_submission.query))?;
204
205        let value = model::NewJudgeSubmissionRow {
206            track: track_id,
207            query: query_id,
208            score: None,
209        };
210        insert_into(schema::judge_submissions::table)
211            .values(&value)
212            .on_conflict((js::track, js::query))
213            .do_nothing()
214            .execute(connection)
215            .context("Insert judge submission")?;
216        let inserted_id = schema::judge_submissions::table
217            .filter(js::track.eq(track_id))
218            .filter(js::query.eq(query_id))
219            .select(js::id)
220            .get_result(connection)
221            .context("Fetch inserted or existing judge submission")?;
222        Ok(inserted_id)
223    }
224    pub fn get_judge_submission_id(
225        connection: &mut PgConnection,
226        judge_submission: &RuntimeJudgeSubmission,
227    ) -> anyhow::Result<i32> {
228        use schema::downloadable_files::dsl as df;
229        use schema::judge_submissions::dsl as js;
230        let query_id: i32 = schema::downloadable_files::table
231            .filter(df::filename.eq(&judge_submission.query.filename))
232            .filter(df::username.eq(&judge_submission.query.username))
233            .filter(df::size.eq(judge_submission.query.size))
234            .select(df::id)
235            .get_result(connection)
236            .context("Getting download id in js")?;
237        let search_id = Self::get_search_item_id(connection, &judge_submission.track)?;
238
239        let judge_id = schema::judge_submissions::table
240            .filter(js::track.eq(search_id))
241            .filter(js::query.eq(query_id))
242            .select(js::id)
243            .get_result(connection)
244            .with_context(|| {
245                format!(
246                    "Fetch judge submission id for track_id={} query_id={}",
247                    search_id, query_id
248                )
249            })?;
250        Ok(judge_id)
251    }
252
253    fn insert_retry_request(
254        connection: &mut PgConnection,
255        retry_request: &RetryRequest,
256    ) -> anyhow::Result<()> {
257        let request_id = Self::get_judge_submission_id(connection, &retry_request.request)?;
258        let failed_download_result =
259            Self::get_downloadable_file_id(connection, &retry_request.failed_download_result)?;
260        let value = model::NewRetryRequestRow {
261            request: request_id,
262            retry_attempts: i32::from(retry_request.retry_attempts),
263            failed_download_result,
264        };
265        if Self::existing_retry_request_id(connection, &value)?.is_none() {
266            insert_into(schema::retry_request::table)
267                .values(&value)
268                .execute(connection)
269                .context("Insert retry request")?;
270        }
271        Ok(())
272    }
273
274    fn insert_rejected_track(
275        connection: &mut PgConnection,
276        rejected_track: &RejectedTrack,
277    ) -> anyhow::Result<()> {
278        let (judge_submission, _) = rejected_track.parts();
279        let track_id = Self::get_judge_submission_id(connection, judge_submission)?;
280        let value = model::NewRejectedTrackRow::from_runtime(track_id, rejected_track);
281        if Self::existing_rejected_track_id(connection, track_id, &value)?.is_none() {
282            insert_into(schema::rejected_track::table)
283                .values(&value)
284                .execute(connection)
285                .context("Insert rejected track")?;
286        }
287        Ok(())
288    }
289    pub fn get_search_item_id(
290        connection: &mut PgConnection,
291        search_item: &RuntimeSearchItem,
292    ) -> anyhow::Result<i32> {
293        use schema::search_items::dsl as sl;
294        let search_id = schema::search_items::table
295            .filter(sl::track_id.eq(&search_item.track_id))
296            .select(schema::search_items::id)
297            .get_result(connection)
298            .context("database fetch search_id in get seatch id func")?;
299        Ok(search_id)
300    }
301
302    pub fn load_item_to_database(&mut self, item: &Track) -> anyhow::Result<()> {
303        self.connection
304            .transaction::<_, anyhow::Error, _>(|connection| {
305                match item {
306                    Track::Query(search_item) | Track::SearchRetry(search_item) => {
307                        Self::insert_search_item(connection, search_item)?;
308                    }
309                    Track::Result(judge_submission) => {
310                        Self::insert_judge_submission(connection, judge_submission)?;
311                    }
312                    Track::Downloadable(judge_submission) => {
313                        Self::update_jugde_submission_score(connection, judge_submission)?;
314                    }
315                    Track::File(downloaded_file) => {
316                        Self::upsert_downloaded_file(connection, downloaded_file)?;
317                    }
318                    Track::Retry(retry_request) => {
319                        Self::insert_retry_request(connection, retry_request)?;
320                    }
321                    Track::Reject(rejected_track) => {
322                        Self::insert_rejected_track(connection, rejected_track)?;
323                    }
324                }
325                Ok(())
326            })
327            .context("Persist track into database")?;
328        Ok(())
329    }
330}