Skip to main content

kellnr_db/database/
mod.rs

1mod operations;
2pub mod test_utils;
3
4use std::cmp::max;
5use std::collections::BTreeMap;
6use std::path::Path;
7
8use chrono::{DateTime, Utc};
9use kellnr_common::crate_data::{CrateData, CrateRegistryDep, CrateVersionData};
10use kellnr_common::crate_overview::CrateOverview;
11use kellnr_common::cratesio_prefetch_msg::{CratesioPrefetchMsg, UpdateData};
12use kellnr_common::index_metadata::{IndexDep, IndexMetadata};
13use kellnr_common::normalized_name::NormalizedName;
14use kellnr_common::original_name::OriginalName;
15use kellnr_common::prefetch::Prefetch;
16use kellnr_common::publish_metadata::PublishMetadata;
17use kellnr_common::version::Version;
18use kellnr_common::webhook::{Webhook, WebhookEvent, WebhookQueue};
19use kellnr_entity::prelude::*;
20use kellnr_entity::{
21    auth_token, crate_author, crate_author_to_crate, crate_category, crate_category_to_crate,
22    crate_group, crate_index, crate_keyword, crate_keyword_to_crate, crate_meta, crate_user,
23    cratesio_crate, cratesio_index, cratesio_meta, doc_queue, group, group_user, krate,
24    oauth2_identity, oauth2_state, owner, session, toolchain, toolchain_component,
25    toolchain_target, user, webhook, webhook_queue,
26};
27use kellnr_migration::iden::{
28    AuthTokenIden, CrateIden, CrateMetaIden, CratesIoIden, CratesIoMetaIden, GroupIden,
29};
30use sea_orm::entity::prelude::Uuid;
31use sea_orm::prelude::async_trait::async_trait;
32use sea_orm::query::{QueryOrder, QuerySelect, TransactionTrait};
33use sea_orm::sea_query::{Alias, Cond, Expr, Iden, JoinType, Order, Query, UnionType};
34use sea_orm::{
35    ActiveModelTrait, ActiveValue, ColumnTrait, DatabaseConnection, EntityTrait, ExprTrait,
36    FromQueryResult, ModelTrait, QueryFilter, RelationTrait, Set,
37};
38
39use crate::error::DbError;
40use crate::password::{generate_salt, hash_pwd, hash_token};
41use crate::provider::{
42    ChannelInfo, DbResult, OAuth2StateData, PrefetchState, ToolchainComponentInfo,
43    ToolchainTargetInfo, ToolchainWithTargets,
44};
45use crate::tables::init_database;
46use crate::{
47    AuthToken, ConString, CrateMeta, CrateSummary, DbProvider, DocQueueEntry, Group, User,
48};
49
50const DB_DATE_FORMAT: &str = "%Y-%m-%d %H:%M:%S";
51
52pub struct Database {
53    db_con: DatabaseConnection,
54}
55
56impl Database {
57    pub fn existing(db_con: DatabaseConnection) -> Self {
58        Self { db_con }
59    }
60
61    pub async fn new(con: &ConString, max_con: u32) -> Result<Self, DbError> {
62        let db_con = init_database(con, max_con)
63            .await
64            .map_err(|e| DbError::InitializationError(e.to_string()))?;
65
66        if operations::no_user_exists(&db_con).await? {
67            operations::insert_admin_credentials(&db_con, con).await?;
68        }
69
70        Ok(Self { db_con })
71    }
72
73    /// Looks up a user by name; returns the entity model or [`DbError::UserNotFound`].
74    async fn get_user_model(&self, name: &str) -> DbResult<user::Model> {
75        user::Entity::find()
76            .filter(user::Column::Name.eq(name))
77            .one(&self.db_con)
78            .await?
79            .ok_or_else(|| DbError::UserNotFound(name.to_owned()))
80    }
81
82    /// Looks up a krate by normalized name; returns the entity model or [`DbError::CrateNotFound`].
83    async fn get_krate_model(&self, crate_name: &NormalizedName) -> DbResult<krate::Model> {
84        krate::Entity::find()
85            .filter(krate::Column::Name.eq(crate_name))
86            .one(&self.db_con)
87            .await?
88            .ok_or_else(|| DbError::CrateNotFound(crate_name.to_string()))
89    }
90
91    /// Looks up a group by name; returns the entity model or [`DbError::GroupNotFound`].
92    async fn get_group_model(&self, name: &str) -> DbResult<group::Model> {
93        group::Entity::find()
94            .filter(group::Column::Name.eq(name))
95            .one(&self.db_con)
96            .await?
97            .ok_or_else(|| DbError::GroupNotFound(name.to_owned()))
98    }
99
100    /// Loads targets (with their components) for a toolchain and returns a `ToolchainWithTargets`.
101    async fn toolchain_with_targets(&self, tc: toolchain::Model) -> DbResult<ToolchainWithTargets> {
102        let targets = toolchain_target::Entity::find()
103            .filter(toolchain_target::Column::ToolchainFk.eq(tc.id))
104            .all(&self.db_con)
105            .await?;
106
107        let mut target_infos = Vec::with_capacity(targets.len());
108        for t in targets {
109            let components = toolchain_component::Entity::find()
110                .filter(toolchain_component::Column::ToolchainTargetFk.eq(t.id))
111                .all(&self.db_con)
112                .await?;
113            target_infos.push(ToolchainTargetInfo {
114                id: t.id,
115                target: t.target,
116                storage_path: t.storage_path,
117                hash: t.hash,
118                size: t.size,
119                status: t.status,
120                components: components
121                    .into_iter()
122                    .map(ToolchainComponentInfo::from)
123                    .collect(),
124            });
125        }
126
127        Ok(ToolchainWithTargets {
128            id: tc.id,
129            name: tc.name,
130            version: tc.version,
131            date: tc.date,
132            channel: tc.channel,
133            created: tc.created,
134            targets: target_infos,
135        })
136    }
137
138    /// Looks up a crate index by name and version; returns the entity model or [`DbError::CrateIndexNotFound`].
139    async fn get_crate_index_model(
140        &self,
141        crate_name: &NormalizedName,
142        version: &Version,
143    ) -> DbResult<crate_index::Model> {
144        crate_index::Entity::find()
145            .filter(crate_index::Column::Name.eq(crate_name))
146            .filter(crate_index::Column::Vers.eq(version))
147            .one(&self.db_con)
148            .await?
149            .ok_or_else(|| DbError::CrateIndexNotFound(crate_name.to_string(), version.to_string()))
150    }
151
152    /// Looks up a toolchain by name and version; returns `None` if not found.
153    async fn get_toolchain_by_name_version(
154        &self,
155        name: &str,
156        version: &str,
157    ) -> DbResult<Option<toolchain::Model>> {
158        toolchain::Entity::find()
159            .filter(toolchain::Column::Name.eq(name))
160            .filter(toolchain::Column::Version.eq(version))
161            .one(&self.db_con)
162            .await
163            .map_err(Into::into)
164    }
165
166    /// Looks up a `crate_group` by crate name and group name; returns `None` if not found.
167    async fn get_crate_group_by_name_and_group(
168        &self,
169        crate_name: &NormalizedName,
170        group: &str,
171    ) -> DbResult<Option<crate_group::Model>> {
172        crate_group::Entity::find()
173            .join(JoinType::InnerJoin, crate_group::Relation::Krate.def())
174            .join(JoinType::InnerJoin, crate_group::Relation::Group.def())
175            .filter(
176                Cond::all()
177                    .add(krate::Column::Name.eq(crate_name))
178                    .add(group::Column::Name.eq(group)),
179            )
180            .one(&self.db_con)
181            .await
182            .map_err(Into::into)
183    }
184
185    /// Looks up a `group_user` by group name and user name; returns `None` if not found.
186    async fn get_group_user_by_group_and_user(
187        &self,
188        group_name: &str,
189        user: &str,
190    ) -> DbResult<Option<group_user::Model>> {
191        group_user::Entity::find()
192            .join(JoinType::InnerJoin, group_user::Relation::Group.def())
193            .join(JoinType::InnerJoin, group_user::Relation::User.def())
194            .filter(
195                Cond::all()
196                    .add(group::Column::Name.eq(group_name))
197                    .add(user::Column::Name.eq(user)),
198            )
199            .one(&self.db_con)
200            .await
201            .map_err(Into::into)
202    }
203
204    /// Looks up a `crate_user` by crate name and user name; returns `None` if not found.
205    async fn get_crate_user_by_crate_and_user(
206        &self,
207        crate_name: &NormalizedName,
208        user: &str,
209    ) -> DbResult<Option<crate_user::Model>> {
210        crate_user::Entity::find()
211            .join(JoinType::InnerJoin, crate_user::Relation::Krate.def())
212            .join(JoinType::InnerJoin, crate_user::Relation::User.def())
213            .filter(
214                Cond::all()
215                    .add(krate::Column::Name.eq(crate_name))
216                    .add(user::Column::Name.eq(user)),
217            )
218            .one(&self.db_con)
219            .await
220            .map_err(Into::into)
221    }
222
223    /// Shared implementation for crate overview listing: optional name filter, optional limit/offset, cache union.
224    async fn query_crates(
225        &self,
226        contains: Option<&str>,
227        limit_offset: Option<(u64, u64)>,
228        cache: bool,
229    ) -> DbResult<Vec<CrateOverview>> {
230        let mut query = Query::select();
231        query
232            .expr_as(Expr::col(CrateIden::OriginalName), Alias::new("name"))
233            .expr_as(Expr::col(CrateIden::MaxVersion), Alias::new("version"))
234            .expr_as(Expr::col(CrateIden::LastUpdated), Alias::new("date"))
235            .expr_as(
236                Expr::col(CrateIden::TotalDownloads),
237                Alias::new("total_downloads"),
238            )
239            .expr_as(Expr::col(CrateIden::Description), Alias::new("description"))
240            .expr_as(
241                Expr::col(CrateMetaIden::Documentation),
242                Alias::new("documentation"),
243            )
244            .expr_as(Expr::cust("false"), Alias::new("is_cache"))
245            .from(CrateMetaIden::Table)
246            .inner_join(
247                CrateIden::Table,
248                Expr::col((CrateMetaIden::Table, CrateMetaIden::CrateFk))
249                    .equals((CrateIden::Table, CrateIden::Id)),
250            )
251            .and_where(
252                Expr::col((CrateMetaIden::Table, CrateMetaIden::Version))
253                    .equals((CrateIden::Table, CrateIden::MaxVersion)),
254            );
255
256        // Optional name filter
257        if let Some(contains) = contains {
258            query.and_where(
259                Expr::col((CrateIden::Table, CrateIden::Name)).like(format!("%{contains}%")),
260            );
261        }
262
263        // UNION with cached crates
264        if cache {
265            let mut query2 = Query::select();
266            query2
267                .expr_as(Expr::col(CratesIoIden::OriginalName), Alias::new("name"))
268                .expr_as(Expr::col(CratesIoIden::MaxVersion), Alias::new("version"))
269                .expr_as(Expr::col(CratesIoIden::LastModified), Alias::new("date"))
270                .expr_as(
271                    Expr::col(CratesIoIden::TotalDownloads),
272                    Alias::new("total_downloads"),
273                )
274                .expr_as(
275                    Expr::col(CratesIoIden::Description),
276                    Alias::new("description"),
277                )
278                .expr_as(
279                    Expr::col(CratesIoMetaIden::Documentation),
280                    Alias::new("documentation"),
281                )
282                .expr_as(Expr::cust("true"), Alias::new("is_cache"))
283                .from(CratesIoMetaIden::Table)
284                .inner_join(
285                    CratesIoIden::Table,
286                    Expr::col((CratesIoMetaIden::Table, CratesIoMetaIden::CratesIoFk))
287                        .equals((CratesIoIden::Table, CratesIoIden::Id)),
288                )
289                .and_where(
290                    Expr::col((CratesIoMetaIden::Table, CratesIoMetaIden::Version))
291                        .equals((CratesIoIden::Table, CratesIoIden::MaxVersion)),
292                );
293            if let Some(contains) = contains {
294                query2.and_where(
295                    Expr::col((CratesIoIden::Table, CratesIoIden::OriginalName))
296                        .like(format!("%{contains}%")),
297                );
298            }
299            query.union(UnionType::All, query2);
300        }
301
302        // Ordering and optional limit/offset
303        query.order_by(Alias::new("name"), Order::Asc);
304        if let Some((limit, offset)) = limit_offset {
305            query.limit(limit).offset(offset);
306        }
307
308        let builder = self.db_con.get_database_backend();
309        CrateOverview::find_by_statement(builder.build(&query))
310            .all(&self.db_con)
311            .await
312            .map_err(DbError::from)
313    }
314
315    /// Executes a count query `SELECT COUNT(id_column) FROM table` and returns the count as u64.
316    async fn count<T>(&self, table: T, id_column: T, error: DbError) -> DbResult<u64>
317    where
318        T: Iden + Copy,
319    {
320        #[derive(Debug, PartialEq, FromQueryResult)]
321        struct CountResult {
322            count: Option<i64>,
323        }
324
325        let statement = self.db_con.get_database_backend().build(
326            Query::select()
327                .expr_as(Expr::col((table, id_column)).count(), Alias::new("count"))
328                .from(table),
329        );
330        let Some(result) = CountResult::find_by_statement(statement)
331            .one(&self.db_con)
332            .await?
333        else {
334            return Err(error);
335        };
336        result
337            .count
338            .and_then(|v| u64::try_from(v).ok())
339            .ok_or(error)
340    }
341
342    async fn get_webhook_model(&self, id: &str) -> DbResult<webhook::Model> {
343        webhook::Entity::find()
344            .filter(webhook::Column::Id.eq(
345                TryInto::<Uuid>::try_into(id).map_err(|_| DbError::InvalidId(id.to_string()))?,
346            ))
347            .one(&self.db_con)
348            .await?
349            .ok_or(DbError::WebhookNotFound)
350    }
351
352    async fn get_webhook_queue_model(&self, id: &str) -> DbResult<webhook_queue::Model> {
353        webhook_queue::Entity::find()
354            .filter(webhook_queue::Column::Id.eq(
355                TryInto::<Uuid>::try_into(id).map_err(|_| DbError::InvalidId(id.to_string()))?,
356            ))
357            .one(&self.db_con)
358            .await?
359            .ok_or(DbError::WebhookNotFound)
360    }
361
362    async fn get_owner_by_crate_and_user(
363        &self,
364        crate_name: &str,
365        owner_name: &str,
366    ) -> DbResult<Option<owner::Model>> {
367        owner::Entity::find()
368            .join(JoinType::InnerJoin, owner::Relation::Krate.def())
369            .join(JoinType::InnerJoin, owner::Relation::User.def())
370            .filter(
371                Cond::all()
372                    .add(krate::Column::Name.eq(crate_name))
373                    .add(user::Column::Name.eq(owner_name)),
374            )
375            .one(&self.db_con)
376            .await
377            .map_err(Into::into)
378    }
379
380    async fn delete_or_not_found<M, A>(&self, model: Option<M>, err: DbError) -> DbResult<()>
381    where
382        M: ModelTrait + sea_orm::IntoActiveModel<A>,
383        A: ActiveModelTrait<Entity = M::Entity> + sea_orm::ActiveModelBehavior + Send,
384    {
385        model.ok_or(err)?.delete(&self.db_con).await?;
386        Ok(())
387    }
388}
389
390fn webhook_model_to_obj(w: webhook::Model) -> DbResult<Webhook> {
391    let event = w
392        .event
393        .as_str()
394        .try_into()
395        .map_err(|_| DbError::InvalidWebhookEvent(w.event))?;
396    Ok(Webhook {
397        id: Some(w.id.into()),
398        name: w.name,
399        event,
400        callback_url: w.callback_url,
401    })
402}
403
404/// Generates an `add_*` method that links two entities via a join table.
405///
406/// # Parameters
407/// - `$method`: The method name to generate (e.g., `add_crate_user_impl`)
408/// - `$relation_entity`: The join table entity module (e.g., `crate_user`)
409/// - `$insert_entity`: The Sea ORM Entity type for insertion (e.g., `CrateUser`)
410/// - `$fk1`: The foreign key field for the first entity
411/// - `$fk2`: The foreign key field for the second entity
412macro_rules! impl_add_relation {
413    ($method:ident, $relation_entity:ident, $insert_entity:ident, $fk1:ident, $fk2:ident) => {
414        async fn $method(&self, fk1: i64, fk2: i64) -> DbResult<()> {
415            let model = $relation_entity::ActiveModel {
416                $fk1: Set(fk1),
417                $fk2: Set(fk2),
418                ..Default::default()
419            };
420            $insert_entity::insert(model).exec(&self.db_con).await?;
421            Ok(())
422        }
423    };
424}
425
426impl Database {
427    impl_add_relation!(
428        add_crate_user_impl,
429        crate_user,
430        CrateUser,
431        crate_fk,
432        user_fk
433    );
434    impl_add_relation!(
435        add_crate_group_impl,
436        crate_group,
437        CrateGroup,
438        crate_fk,
439        group_fk
440    );
441    impl_add_relation!(
442        add_group_user_impl,
443        group_user,
444        GroupUser,
445        group_fk,
446        user_fk
447    );
448    impl_add_relation!(add_owner_impl, owner, Owner, crate_fk, user_fk);
449}
450
451#[async_trait]
452impl DbProvider for Database {
453    async fn get_total_unique_cached_crates(&self) -> DbResult<u64> {
454        self.count(
455            CratesIoIden::Table,
456            CratesIoIden::Id,
457            DbError::FailedToCountCrates,
458        )
459        .await
460    }
461
462    async fn get_total_cached_crate_versions(&self) -> DbResult<u64> {
463        self.count(
464            CratesIoMetaIden::Table,
465            CratesIoMetaIden::Id,
466            DbError::FailedToCountCrateVersions,
467        )
468        .await
469    }
470
471    async fn get_total_cached_downloads(&self) -> DbResult<u64> {
472        #[derive(FromQueryResult)]
473        struct Model {
474            total_downloads: i64,
475        }
476
477        let total_downloads = cratesio_crate::Entity::find()
478            .select_only()
479            .column(cratesio_crate::Column::TotalDownloads)
480            .into_model::<Model>()
481            .all(&self.db_con)
482            .await?;
483
484        Ok(total_downloads
485            .iter()
486            .map(|m| m.total_downloads as u64)
487            .sum())
488    }
489
490    async fn authenticate_user(&self, name: &str, pwd: &str) -> DbResult<User> {
491        let user = self.get_user(name).await?;
492
493        if hash_pwd(pwd, &user.salt) == user.pwd {
494            Ok(user)
495        } else {
496            Err(DbError::PasswordMismatch)
497        }
498    }
499
500    async fn increase_download_counter(
501        &self,
502        crate_name: &NormalizedName,
503        crate_version: &Version,
504    ) -> DbResult<()> {
505        let txn = self.db_con.begin().await?;
506
507        krate::Entity::update_many()
508            .col_expr(
509                krate::Column::TotalDownloads,
510                Expr::col(krate::Column::TotalDownloads).add(1),
511            )
512            .filter(krate::Column::Name.eq(crate_name))
513            .exec(&txn)
514            .await?;
515
516        let crate_id = krate::Entity::find()
517            .filter(krate::Column::Name.eq(crate_name))
518            .one(&txn)
519            .await?
520            .ok_or_else(|| DbError::CrateNotFound(crate_name.to_string()))?
521            .id;
522
523        crate_meta::Entity::update_many()
524            .col_expr(
525                crate_meta::Column::Downloads,
526                Expr::col(crate_meta::Column::Downloads).add(1),
527            )
528            .filter(
529                Cond::all()
530                    .add(crate_meta::Column::Version.eq(crate_version))
531                    .add(crate_meta::Column::CrateFk.eq(crate_id)),
532            )
533            .exec(&txn)
534            .await?;
535
536        txn.commit().await?;
537        Ok(())
538    }
539
540    async fn increase_cached_download_counter(
541        &self,
542        crate_name: &NormalizedName,
543        crate_version: &Version,
544    ) -> DbResult<()> {
545        let txn = self.db_con.begin().await?;
546
547        cratesio_crate::Entity::update_many()
548            .col_expr(
549                cratesio_crate::Column::TotalDownloads,
550                Expr::col(cratesio_crate::Column::TotalDownloads).add(1),
551            )
552            .filter(cratesio_crate::Column::Name.eq(crate_name))
553            .exec(&txn)
554            .await?;
555
556        let crate_id = cratesio_crate::Entity::find()
557            .filter(cratesio_crate::Column::Name.eq(crate_name))
558            .one(&txn)
559            .await?
560            .ok_or_else(|| DbError::CrateNotFound(crate_name.to_string()))?
561            .id;
562
563        cratesio_meta::Entity::update_many()
564            .col_expr(
565                cratesio_meta::Column::Downloads,
566                Expr::col(cratesio_meta::Column::Downloads).add(1),
567            )
568            .filter(
569                Cond::all()
570                    .add(cratesio_meta::Column::Version.eq(crate_version))
571                    .add(cratesio_meta::Column::CratesIoFk.eq(crate_id)),
572            )
573            .exec(&txn)
574            .await?;
575
576        txn.commit().await?;
577        Ok(())
578    }
579
580    async fn increase_download_counter_by(
581        &self,
582        crate_name: &NormalizedName,
583        crate_version: &Version,
584        count: u64,
585    ) -> DbResult<()> {
586        let txn = self.db_con.begin().await?;
587
588        krate::Entity::update_many()
589            .col_expr(
590                krate::Column::TotalDownloads,
591                Expr::col(krate::Column::TotalDownloads).add(count as i64),
592            )
593            .filter(krate::Column::Name.eq(crate_name))
594            .exec(&txn)
595            .await?;
596
597        let crate_id = krate::Entity::find()
598            .filter(krate::Column::Name.eq(crate_name))
599            .one(&txn)
600            .await?
601            .ok_or_else(|| DbError::CrateNotFound(crate_name.to_string()))?
602            .id;
603
604        crate_meta::Entity::update_many()
605            .col_expr(
606                crate_meta::Column::Downloads,
607                Expr::col(crate_meta::Column::Downloads).add(count as i64),
608            )
609            .filter(
610                Cond::all()
611                    .add(crate_meta::Column::Version.eq(crate_version))
612                    .add(crate_meta::Column::CrateFk.eq(crate_id)),
613            )
614            .exec(&txn)
615            .await?;
616
617        txn.commit().await?;
618        Ok(())
619    }
620
621    async fn increase_cached_download_counter_by(
622        &self,
623        crate_name: &NormalizedName,
624        crate_version: &Version,
625        count: u64,
626    ) -> DbResult<()> {
627        let txn = self.db_con.begin().await?;
628
629        cratesio_crate::Entity::update_many()
630            .col_expr(
631                cratesio_crate::Column::TotalDownloads,
632                Expr::col(cratesio_crate::Column::TotalDownloads).add(count as i64),
633            )
634            .filter(cratesio_crate::Column::Name.eq(crate_name))
635            .exec(&txn)
636            .await?;
637
638        let crate_id = cratesio_crate::Entity::find()
639            .filter(cratesio_crate::Column::Name.eq(crate_name))
640            .one(&txn)
641            .await?
642            .ok_or_else(|| DbError::CrateNotFound(crate_name.to_string()))?
643            .id;
644
645        cratesio_meta::Entity::update_many()
646            .col_expr(
647                cratesio_meta::Column::Downloads,
648                Expr::col(cratesio_meta::Column::Downloads).add(count as i64),
649            )
650            .filter(
651                Cond::all()
652                    .add(cratesio_meta::Column::Version.eq(crate_version))
653                    .add(cratesio_meta::Column::CratesIoFk.eq(crate_id)),
654            )
655            .exec(&txn)
656            .await?;
657
658        txn.commit().await?;
659        Ok(())
660    }
661
662    async fn get_last_updated_crate(&self) -> DbResult<Option<(OriginalName, Version)>> {
663        let krate = krate::Entity::find()
664            .order_by_desc(krate::Column::LastUpdated)
665            .one(&self.db_con)
666            .await?;
667
668        if let Some(krate) = krate {
669            // SAFETY: Unchecked is ok, as only valid crate names are inserted into the database
670            let name = OriginalName::from_unchecked(krate.original_name);
671            // SAFETY: Unchecked is ok, as only valid versions are inserted into the database
672            let version = Version::from_unchecked_str(&krate.max_version);
673            Ok(Some((name, version)))
674        } else {
675            Ok(None)
676        }
677    }
678
679    async fn validate_session(&self, session_token: &str) -> DbResult<(String, bool)> {
680        let u = user::Entity::find()
681            .join(JoinType::InnerJoin, user::Relation::Session.def())
682            .filter(session::Column::Token.eq(session_token))
683            .one(&self.db_con)
684            .await?
685            .ok_or(DbError::SessionNotFound)?;
686
687        Ok((u.name, u.is_admin))
688    }
689
690    async fn add_session_token(&self, name: &str, session_token: &str) -> DbResult<()> {
691        let user = self.get_user(name).await?;
692        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
693
694        let s = session::ActiveModel {
695            token: Set(session_token.to_owned()),
696            created: Set(created),
697            user_fk: Set(user.id as i64),
698            ..Default::default()
699        };
700
701        s.insert(&self.db_con).await?;
702        Ok(())
703    }
704
705    async fn add_crate_user(&self, crate_name: &NormalizedName, user: &str) -> DbResult<()> {
706        let crate_fk = self.get_krate_model(crate_name).await?.id;
707        let user_fk = self.get_user_model(user).await?.id;
708        self.add_crate_user_impl(crate_fk, user_fk).await
709    }
710
711    async fn add_crate_group(&self, crate_name: &NormalizedName, group: &str) -> DbResult<()> {
712        let crate_fk = self.get_krate_model(crate_name).await?.id;
713        let group_fk = self.get_group_model(group).await?.id;
714        self.add_crate_group_impl(crate_fk, group_fk).await
715    }
716
717    async fn add_group_user(&self, group_name: &str, user: &str) -> DbResult<()> {
718        let group_fk = self.get_group_model(group_name).await?.id;
719        let user_fk = self.get_user_model(user).await?.id;
720        self.add_group_user_impl(group_fk, user_fk).await
721    }
722
723    async fn add_owner(&self, crate_name: &NormalizedName, owner: &str) -> DbResult<()> {
724        let crate_fk = self.get_krate_model(crate_name).await?.id;
725        let user_fk = self.get_user_model(owner).await?.id;
726        self.add_owner_impl(crate_fk, user_fk).await
727    }
728
729    async fn is_download_restricted(&self, crate_name: &NormalizedName) -> DbResult<bool> {
730        Ok(krate::Entity::find()
731            .filter(krate::Column::Name.eq(crate_name))
732            .one(&self.db_con)
733            .await?
734            .is_some_and(|model| model.restricted_download))
735    }
736
737    async fn change_download_restricted(
738        &self,
739        crate_name: &NormalizedName,
740        restricted: bool,
741    ) -> DbResult<()> {
742        let mut krate: krate::ActiveModel = self.get_krate_model(crate_name).await?.into();
743        krate.restricted_download = Set(restricted);
744        krate.update(&self.db_con).await?;
745        Ok(())
746    }
747
748    async fn is_crate_user(&self, crate_name: &NormalizedName, user: &str) -> DbResult<bool> {
749        Ok(self
750            .get_crate_user_by_crate_and_user(crate_name, user)
751            .await?
752            .is_some())
753    }
754
755    async fn is_crate_group(&self, crate_name: &NormalizedName, group: &str) -> DbResult<bool> {
756        Ok(self
757            .get_crate_group_by_name_and_group(crate_name, group)
758            .await?
759            .is_some())
760    }
761
762    async fn is_crate_group_user(&self, crate_name: &NormalizedName, user: &str) -> DbResult<bool> {
763        let user = user::Entity::find()
764            .join(JoinType::InnerJoin, user::Relation::GroupUser.def())
765            .join(JoinType::InnerJoin, group_user::Relation::Group.def())
766            .join(JoinType::InnerJoin, group::Relation::CrateGroup.def())
767            .join(JoinType::InnerJoin, crate_group::Relation::Krate.def())
768            .filter(
769                Cond::all()
770                    .add(krate::Column::Name.eq(crate_name))
771                    .add(user::Column::Name.eq(user)),
772            )
773            .one(&self.db_con)
774            .await?;
775        Ok(user.is_some())
776    }
777
778    async fn is_group_user(&self, group_name: &str, user: &str) -> DbResult<bool> {
779        Ok(self
780            .get_group_user_by_group_and_user(group_name, user)
781            .await?
782            .is_some())
783    }
784
785    async fn is_owner(&self, crate_name: &NormalizedName, user: &str) -> DbResult<bool> {
786        let owner = owner::Entity::find()
787            .join(JoinType::InnerJoin, owner::Relation::Krate.def())
788            .join(JoinType::InnerJoin, owner::Relation::User.def())
789            .filter(
790                Cond::all()
791                    .add(krate::Column::Name.eq(crate_name))
792                    .add(user::Column::Name.eq(user)),
793            )
794            .one(&self.db_con)
795            .await?;
796
797        Ok(owner.is_some())
798    }
799
800    async fn get_crate_id(&self, crate_name: &NormalizedName) -> DbResult<Option<i64>> {
801        let id = krate::Entity::find()
802            .filter(krate::Column::Name.eq(crate_name))
803            .one(&self.db_con)
804            .await?
805            .map(|model| model.id);
806
807        Ok(id)
808    }
809
810    async fn get_crate_owners(&self, crate_name: &NormalizedName) -> DbResult<Vec<User>> {
811        let u = user::Entity::find()
812            .join(JoinType::InnerJoin, user::Relation::Owner.def())
813            .join(JoinType::InnerJoin, owner::Relation::Krate.def())
814            .filter(Expr::col((CrateIden::Table, krate::Column::Name)).eq(crate_name))
815            .all(&self.db_con)
816            .await?;
817
818        Ok(u.into_iter().map(User::from).collect())
819    }
820
821    async fn get_crate_users(&self, crate_name: &NormalizedName) -> DbResult<Vec<User>> {
822        let u = user::Entity::find()
823            .join(JoinType::InnerJoin, user::Relation::CrateUser.def())
824            .join(JoinType::InnerJoin, crate_user::Relation::Krate.def())
825            .filter(Expr::col((CrateIden::Table, krate::Column::Name)).eq(crate_name))
826            .all(&self.db_con)
827            .await?;
828
829        Ok(u.into_iter().map(User::from).collect())
830    }
831
832    async fn get_crate_groups(&self, crate_name: &NormalizedName) -> DbResult<Vec<Group>> {
833        let u = group::Entity::find()
834            .join(JoinType::InnerJoin, group::Relation::CrateGroup.def())
835            .join(JoinType::InnerJoin, crate_group::Relation::Krate.def())
836            .filter(Expr::col((CrateIden::Table, krate::Column::Name)).eq(crate_name))
837            .all(&self.db_con)
838            .await?;
839
840        Ok(u.into_iter().map(Group::from).collect())
841    }
842
843    async fn get_group_users(&self, group_name: &str) -> DbResult<Vec<User>> {
844        let u = user::Entity::find()
845            .join(JoinType::InnerJoin, user::Relation::GroupUser.def())
846            .join(JoinType::InnerJoin, group_user::Relation::Group.def())
847            .filter(Expr::col((GroupIden::Table, group::Column::Name)).eq(group_name))
848            .all(&self.db_con)
849            .await?;
850
851        Ok(u.into_iter().map(User::from).collect())
852    }
853
854    async fn get_crate_versions(&self, crate_name: &NormalizedName) -> DbResult<Vec<Version>> {
855        let u = crate_meta::Entity::find()
856            .join(JoinType::InnerJoin, crate_meta::Relation::Krate.def())
857            .filter(Expr::col((CrateIden::Table, krate::Column::Name)).eq(crate_name))
858            .all(&self.db_con)
859            .await?;
860
861        Ok(u.into_iter()
862            .map(|meta| Version::from_unchecked_str(&meta.version))
863            .collect())
864    }
865
866    async fn delete_session_token(&self, session_token: &str) -> DbResult<()> {
867        if let Some(s) = session::Entity::find()
868            .filter(session::Column::Token.eq(session_token))
869            .one(&self.db_con)
870            .await?
871        {
872            s.delete(&self.db_con).await?;
873        }
874
875        Ok(())
876    }
877
878    async fn delete_user(&self, user_name: &str) -> DbResult<()> {
879        self.get_user_model(user_name)
880            .await?
881            .delete(&self.db_con)
882            .await?;
883        Ok(())
884    }
885
886    async fn delete_group(&self, group_name: &str) -> DbResult<()> {
887        self.get_group_model(group_name)
888            .await?
889            .delete(&self.db_con)
890            .await?;
891        Ok(())
892    }
893
894    async fn change_pwd(&self, user_name: &str, new_pwd: &str) -> DbResult<()> {
895        let salt = generate_salt();
896        let hashed = hash_pwd(new_pwd, &salt);
897
898        let mut u: user::ActiveModel = self.get_user_model(user_name).await?.into();
899        u.pwd = Set(hashed);
900        u.salt = Set(salt);
901
902        u.update(&self.db_con).await?;
903        Ok(())
904    }
905
906    async fn change_read_only_state(&self, user_name: &str, state: bool) -> DbResult<()> {
907        let mut u: user::ActiveModel = self.get_user_model(user_name).await?.into();
908        u.is_read_only = Set(state);
909
910        u.update(&self.db_con).await?;
911        Ok(())
912    }
913
914    async fn change_admin_state(&self, user_name: &str, state: bool) -> DbResult<()> {
915        let mut u: user::ActiveModel = self.get_user_model(user_name).await?.into();
916        u.is_admin = Set(state);
917
918        u.update(&self.db_con).await?;
919        Ok(())
920    }
921
922    async fn crate_version_exists(&self, crate_id: i64, version: &str) -> DbResult<bool> {
923        let cm = crate_meta::Entity::find()
924            .filter(
925                Cond::all()
926                    .add(crate_meta::Column::CrateFk.eq(crate_id))
927                    .add(crate_meta::Column::Version.eq(version)),
928            )
929            .one(&self.db_con)
930            .await?;
931
932        Ok(cm.is_some())
933    }
934
935    async fn get_max_version_from_id(&self, crate_id: i64) -> DbResult<Version> {
936        operations::get_max_version_from_id(&self.db_con, crate_id).await
937    }
938
939    async fn get_max_version_from_name(&self, crate_name: &NormalizedName) -> DbResult<Version> {
940        let k = self.get_krate_model(crate_name).await?;
941        let v = Version::try_from(&k.max_version)
942            .map_err(|_| DbError::FailedToGetMaxVersionByName(crate_name.to_string()))?;
943        Ok(v)
944    }
945
946    async fn update_max_version(&self, crate_id: i64, version: &Version) -> DbResult<()> {
947        let krate = krate::Entity::find_by_id(crate_id)
948            .one(&self.db_con)
949            .await?
950            .ok_or(DbError::CrateNotFoundWithId(crate_id))?;
951
952        let mut k: krate::ActiveModel = krate.into();
953        k.max_version = Set(version.to_string());
954        k.update(&self.db_con).await?;
955
956        Ok(())
957    }
958
959    async fn add_auth_token(&self, name: &str, token: &str, user: &str) -> DbResult<()> {
960        let hashed_token = hash_token(token);
961        let user = self.get_user_model(user).await?;
962
963        let at = auth_token::ActiveModel {
964            name: Set(name.to_owned()),
965            token: Set(hashed_token),
966            user_fk: Set(user.id),
967            ..Default::default()
968        };
969
970        at.insert(&self.db_con).await?;
971
972        Ok(())
973    }
974
975    async fn get_user_from_token(&self, token: &str) -> DbResult<User> {
976        let token = hash_token(token);
977
978        let u = user::Entity::find()
979            .join(JoinType::InnerJoin, user::Relation::AuthToken.def())
980            .filter(Expr::col((AuthTokenIden::Table, AuthTokenIden::Token)).eq(token))
981            .one(&self.db_con)
982            .await?
983            .ok_or(DbError::TokenNotFound)?;
984
985        Ok(User::from(u))
986    }
987
988    async fn get_user(&self, name: &str) -> DbResult<User> {
989        Ok(User::from(self.get_user_model(name).await?))
990    }
991
992    async fn get_group(&self, name: &str) -> DbResult<Group> {
993        Ok(Group::from(self.get_group_model(name).await?))
994    }
995
996    async fn get_auth_tokens(&self, user_name: &str) -> DbResult<Vec<AuthToken>> {
997        let at: Vec<auth_token::Model> = auth_token::Entity::find()
998            .join(JoinType::InnerJoin, auth_token::Relation::User.def())
999            .filter(user::Column::Name.eq(user_name))
1000            .all(&self.db_con)
1001            .await?;
1002
1003        Ok(at.into_iter().map(AuthToken::from).collect())
1004    }
1005
1006    async fn delete_auth_token(&self, id: i32) -> DbResult<()> {
1007        auth_token::Entity::delete_by_id(id as i64)
1008            .exec(&self.db_con)
1009            .await?;
1010        Ok(())
1011    }
1012
1013    async fn delete_owner(&self, crate_name: &str, owner: &str) -> DbResult<()> {
1014        let model = self.get_owner_by_crate_and_user(crate_name, owner).await?;
1015        self.delete_or_not_found(model, DbError::OwnerNotFound(owner.to_string()))
1016            .await
1017    }
1018
1019    async fn delete_crate_user(&self, crate_name: &NormalizedName, user: &str) -> DbResult<()> {
1020        let model = self
1021            .get_crate_user_by_crate_and_user(crate_name, user)
1022            .await?;
1023        self.delete_or_not_found(model, DbError::UserNotFound(user.to_string()))
1024            .await
1025    }
1026
1027    async fn delete_crate_group(&self, crate_name: &NormalizedName, group: &str) -> DbResult<()> {
1028        let model = self
1029            .get_crate_group_by_name_and_group(crate_name, group)
1030            .await?;
1031        self.delete_or_not_found(model, DbError::GroupNotFound(group.to_string()))
1032            .await
1033    }
1034
1035    async fn delete_group_user(&self, group_name: &str, user: &str) -> DbResult<()> {
1036        let model = self
1037            .get_group_user_by_group_and_user(group_name, user)
1038            .await?;
1039        self.delete_or_not_found(model, DbError::UserNotFound(user.to_string()))
1040            .await
1041    }
1042
1043    async fn add_user(
1044        &self,
1045        name: &str,
1046        pwd: &str,
1047        salt: &str,
1048        is_admin: bool,
1049        is_read_only: bool,
1050    ) -> DbResult<()> {
1051        let hashed_pwd = hash_pwd(pwd, salt);
1052        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
1053
1054        let u = user::ActiveModel {
1055            name: Set(name.to_owned()),
1056            pwd: Set(hashed_pwd),
1057            salt: Set(salt.to_owned()),
1058            is_admin: Set(is_admin),
1059            is_read_only: Set(is_read_only),
1060            created: Set(created),
1061            ..Default::default()
1062        };
1063
1064        u.insert(&self.db_con).await?;
1065        Ok(())
1066    }
1067
1068    async fn add_group(&self, name: &str) -> DbResult<()> {
1069        let g = group::ActiveModel {
1070            name: Set(name.to_owned()),
1071            ..Default::default()
1072        };
1073
1074        g.insert(&self.db_con).await?;
1075        Ok(())
1076    }
1077
1078    async fn get_users(&self) -> DbResult<Vec<User>> {
1079        let users = user::Entity::find()
1080            .order_by_asc(user::Column::Name)
1081            .all(&self.db_con)
1082            .await?;
1083
1084        Ok(users.into_iter().map(User::from).collect())
1085    }
1086
1087    async fn get_groups(&self) -> DbResult<Vec<Group>> {
1088        let groups = group::Entity::find()
1089            .order_by_asc(group::Column::Name)
1090            .all(&self.db_con)
1091            .await?;
1092
1093        Ok(groups.into_iter().map(Group::from).collect())
1094    }
1095
1096    async fn get_total_unique_crates(&self) -> DbResult<u64> {
1097        self.count(
1098            CrateIden::Table,
1099            CrateIden::Id,
1100            DbError::FailedToCountCrates,
1101        )
1102        .await
1103    }
1104
1105    async fn get_total_crate_versions(&self) -> DbResult<u64> {
1106        self.count(
1107            CrateMetaIden::Table,
1108            CrateMetaIden::Id,
1109            DbError::FailedToCountCrateVersions,
1110        )
1111        .await
1112    }
1113
1114    async fn get_total_downloads(&self) -> DbResult<u64> {
1115        #[derive(FromQueryResult)]
1116        struct Model {
1117            total_downloads: i64,
1118        }
1119
1120        Ok(krate::Entity::find()
1121            .select_only()
1122            .column(krate::Column::TotalDownloads)
1123            .into_model::<Model>()
1124            .all(&self.db_con)
1125            .await?
1126            .into_iter()
1127            .map(|m| m.total_downloads as u64)
1128            .sum())
1129    }
1130
1131    async fn get_top_crates_downloads(&self, top: u64) -> DbResult<Vec<(String, u64)>> {
1132        #[derive(Debug, PartialEq, FromQueryResult)]
1133        struct SelectResult {
1134            original_name: String,
1135            total_downloads: i64,
1136        }
1137
1138        let stmt = self.db_con.get_database_backend().build(
1139            Query::select()
1140                .columns(vec![CrateIden::OriginalName, CrateIden::TotalDownloads])
1141                .from(CrateIden::Table)
1142                .order_by(CrateIden::TotalDownloads, Order::Desc)
1143                .limit(top),
1144        );
1145
1146        Ok(SelectResult::find_by_statement(stmt)
1147            .all(&self.db_con)
1148            .await?
1149            .into_iter()
1150            .map(|x| (x.original_name, x.total_downloads as u64))
1151            .collect())
1152    }
1153
1154    async fn get_crate_summaries(&self) -> DbResult<Vec<CrateSummary>> {
1155        let krates = krate::Entity::find()
1156            .order_by(krate::Column::Name, Order::Asc)
1157            .all(&self.db_con)
1158            .await?;
1159
1160        Ok(krates.into_iter().map(CrateSummary::from).collect())
1161    }
1162
1163    async fn add_doc_queue(
1164        &self,
1165        krate: &NormalizedName,
1166        version: &Version,
1167        path: &Path,
1168    ) -> DbResult<()> {
1169        let s = doc_queue::ActiveModel {
1170            krate: Set(krate.to_string()),
1171            version: Set(version.to_string()),
1172            // FIXME: Convert Path to String properly, handle errors
1173            path: Set(path.to_string_lossy().to_string()),
1174            ..Default::default()
1175        };
1176
1177        s.insert(&self.db_con).await?;
1178        Ok(())
1179    }
1180
1181    async fn delete_doc_queue(&self, id: i64) -> DbResult<()> {
1182        DocQueue::delete_by_id(id).exec(&self.db_con).await?;
1183        Ok(())
1184    }
1185
1186    async fn get_doc_queue(&self) -> DbResult<Vec<DocQueueEntry>> {
1187        let entities = DocQueue::find().all(&self.db_con).await?;
1188
1189        Ok(entities.into_iter().map(DocQueueEntry::from).collect())
1190    }
1191
1192    async fn delete_crate(&self, krate: &NormalizedName, version: &Version) -> DbResult<()> {
1193        let txn = self.db_con.begin().await?;
1194
1195        // Delete the entry from the "crate_meta" table
1196        let crate_meta_version = crate_meta::Entity::find()
1197            .join(JoinType::InnerJoin, crate_meta::Relation::Krate.def())
1198            .filter(krate::Column::Name.eq(krate))
1199            .filter(crate_meta::Column::Version.eq(version))
1200            .one(&txn)
1201            .await?
1202            .ok_or_else(|| DbError::CrateMetaNotFound(krate.to_string(), version.to_string()))?;
1203        let crate_id = crate_meta_version.crate_fk;
1204        let current_max_version = operations::get_max_version_from_id(&txn, crate_id).await?;
1205        crate_meta_version.delete(&txn).await?;
1206
1207        // Delete the crate index entry from "crate_index" table
1208        let crate_index_version = crate_index::Entity::find()
1209            .join(JoinType::InnerJoin, crate_index::Relation::Krate.def())
1210            .filter(krate::Column::Name.eq(krate))
1211            .filter(crate_index::Column::Vers.eq(version))
1212            .one(&txn)
1213            .await?
1214            .ok_or_else(|| DbError::CrateIndexNotFound(krate.to_string(), version.to_string()))?;
1215        crate_index_version.delete(&txn).await?;
1216
1217        // If it was the last entry in the "crate_meta" table, delete the entry
1218        // in the "crate" table as well
1219        let crate_meta_rows = crate_meta::Entity::find()
1220            .join(JoinType::InnerJoin, crate_meta::Relation::Krate.def())
1221            .filter(krate::Column::Name.eq(krate))
1222            .all(&txn)
1223            .await?;
1224
1225        if crate_meta_rows.is_empty() {
1226            krate::Entity::delete_many()
1227                .filter(krate::Column::Name.eq(krate))
1228                .exec(&txn)
1229                .await?;
1230        } else {
1231            let c = krate::Entity::find_by_id(crate_id)
1232                .one(&txn)
1233                .await?
1234                .ok_or(DbError::CrateNotFoundWithId(crate_id))?;
1235            let mut c: krate::ActiveModel = c.into();
1236
1237            // Update the max. version if the deleted version was the max. version.
1238            if version == &current_max_version {
1239                let new_max_version = crate_meta_rows
1240                    .into_iter()
1241                    .map(|cm| Version::from_unchecked_str(&cm.version))
1242                    .max()
1243                    .unwrap(); // Safe to unwrap, as crate_meta_rows is not empty
1244                c.max_version = Set(new_max_version.into_inner());
1245            }
1246            // Update the ETag value of the crate index.
1247            let etag = operations::compute_etag(&txn, krate, crate_id).await?;
1248            c.e_tag = Set(etag);
1249            c.update(&txn).await?;
1250        }
1251
1252        txn.commit().await?;
1253
1254        Ok(())
1255    }
1256
1257    async fn get_crate_meta_list(&self, crate_name: &NormalizedName) -> DbResult<Vec<CrateMeta>> {
1258        let crate_meta = crate_meta::Entity::find()
1259            .join(JoinType::InnerJoin, crate_meta::Relation::Krate.def())
1260            .filter(krate::Column::Name.eq(crate_name))
1261            .all(&self.db_con)
1262            .await?;
1263
1264        let crate_meta = crate_meta
1265            .into_iter()
1266            .map(|cm| CrateMeta {
1267                name: crate_name.to_string(),
1268                id: cm.id,
1269                version: cm.version,
1270                created: cm.created,
1271                downloads: cm.downloads,
1272                crate_fk: cm.crate_fk,
1273            })
1274            .collect();
1275
1276        Ok(crate_meta)
1277    }
1278
1279    async fn update_last_updated(&self, id: i64, last_updated: &DateTime<Utc>) -> DbResult<()> {
1280        let krate = krate::Entity::find_by_id(id)
1281            .one(&self.db_con)
1282            .await?
1283            .ok_or(DbError::CrateNotFoundWithId(id))?;
1284
1285        let date = last_updated.format(DB_DATE_FORMAT).to_string();
1286
1287        let mut krate: krate::ActiveModel = krate.into();
1288        krate.last_updated = Set(date);
1289        krate.update(&self.db_con).await?;
1290
1291        Ok(())
1292    }
1293
1294    async fn search_in_crate_name(
1295        &self,
1296        contains: &str,
1297        cache: bool,
1298    ) -> DbResult<Vec<CrateOverview>> {
1299        self.query_crates(Some(contains), None, cache).await
1300    }
1301
1302    async fn get_crate_overview_list(
1303        &self,
1304        limit: u64,
1305        offset: u64,
1306        cache: bool,
1307    ) -> DbResult<Vec<CrateOverview>> {
1308        self.query_crates(None, Some((limit, offset)), cache).await
1309    }
1310
1311    async fn get_crate_data(&self, crate_name: &NormalizedName) -> DbResult<CrateData> {
1312        let krate = self.get_krate_model(crate_name).await?;
1313
1314        let owners: Vec<String> = krate
1315            .find_related(owner::Entity)
1316            .find_also_related(user::Entity)
1317            .all(&self.db_con)
1318            .await?
1319            .into_iter()
1320            .filter_map(|(_, v)| v.map(|v| v.name))
1321            .collect();
1322        let categories: Vec<String> = krate
1323            .find_related(crate_category_to_crate::Entity)
1324            .find_also_related(crate_category::Entity)
1325            .all(&self.db_con)
1326            .await?
1327            .into_iter()
1328            .filter_map(|(_, v)| v.map(|v| v.category))
1329            .collect();
1330        let keywords: Vec<String> = krate
1331            .find_related(crate_keyword_to_crate::Entity)
1332            .find_also_related(crate_keyword::Entity)
1333            .all(&self.db_con)
1334            .await?
1335            .into_iter()
1336            .filter_map(|(_, v)| v.map(|v| v.keyword))
1337            .collect();
1338        let authors: Vec<String> = krate
1339            .find_related(crate_author_to_crate::Entity)
1340            .find_also_related(crate_author::Entity)
1341            .all(&self.db_con)
1342            .await?
1343            .into_iter()
1344            .filter_map(|(_, v)| v.map(|v| v.author))
1345            .collect();
1346        let crate_metas = krate
1347            .find_related(crate_meta::Entity)
1348            .all(&self.db_con)
1349            .await?;
1350        let crate_indices = krate
1351            .find_related(crate_index::Entity)
1352            .all(&self.db_con)
1353            .await?;
1354
1355        let mut versions = Vec::new();
1356        for cm in crate_metas {
1357            let ci = crate_indices
1358                .iter()
1359                .find(|ci| ci.vers == cm.version)
1360                .ok_or_else(|| {
1361                    DbError::CrateIndexNotFound(krate.name.clone(), cm.version.clone())
1362                })?;
1363            let dependencies: Vec<CrateRegistryDep> = match ci.deps.clone() {
1364                Some(deps) => {
1365                    let ix = serde_json::from_value::<Vec<IndexDep>>(deps)
1366                        .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?;
1367
1368                    let mut ft = Vec::new();
1369                    for dep in ix {
1370                        ft.push(CrateRegistryDep::from_index(
1371                            operations::get_desc_for_crate_dep(
1372                                &self.db_con,
1373                                &dep.name,
1374                                dep.registry.as_deref(),
1375                            )
1376                            .await?,
1377                            dep,
1378                        ));
1379                    }
1380
1381                    ft
1382                }
1383                None => Vec::default(),
1384            };
1385            let features: BTreeMap<String, Vec<String>> = match ci.features.clone() {
1386                Some(features) => serde_json::from_value::<BTreeMap<String, Vec<String>>>(features)
1387                    .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?,
1388                None => BTreeMap::default(),
1389            };
1390
1391            versions.push(CrateVersionData {
1392                version: cm.version,
1393                created: cm.created,
1394                downloads: cm.downloads,
1395                readme: cm.readme,
1396                license: cm.license,
1397                license_file: cm.license_file,
1398                documentation: cm.documentation,
1399                dependencies,
1400                checksum: ci.cksum.clone(),
1401                features,
1402                yanked: ci.yanked,
1403                links: ci.links.clone(),
1404                v: ci.v,
1405            });
1406        }
1407        versions.sort_by(|a, b| {
1408            Version::from_unchecked_str(&b.version).cmp(&Version::from_unchecked_str(&a.version))
1409        });
1410
1411        let crate_data = CrateData {
1412            name: krate.original_name,
1413            owners,
1414            max_version: krate.max_version,
1415            total_downloads: krate.total_downloads,
1416            last_updated: krate.last_updated,
1417            homepage: krate.homepage,
1418            description: krate.description,
1419            repository: krate.repository,
1420            categories,
1421            keywords,
1422            authors,
1423            versions,
1424        };
1425
1426        Ok(crate_data)
1427    }
1428
1429    async fn add_empty_crate(&self, name: &str, created: &DateTime<Utc>) -> DbResult<i64> {
1430        let created = created.format(DB_DATE_FORMAT).to_string();
1431        let normalized_name = NormalizedName::from(
1432            OriginalName::try_from(name)
1433                .map_err(|_| DbError::InvalidCrateName(name.to_string()))?,
1434        );
1435        let krate = krate::ActiveModel {
1436            id: ActiveValue::default(),
1437            name: Set(normalized_name.to_string()),
1438            original_name: Set(name.to_string()),
1439            max_version: Set("0.0.0".to_string()),
1440            last_updated: Set(created),
1441            total_downloads: Set(0),
1442            homepage: Set(None),
1443            description: Set(None),
1444            repository: Set(None),
1445            e_tag: Set(String::new()), // Set to empty string, as it can be computed, when the crate index is inserted
1446            restricted_download: Set(false),
1447        };
1448        Ok(krate.insert(&self.db_con).await?.id)
1449    }
1450
1451    async fn add_crate(
1452        &self,
1453        pub_metadata: &PublishMetadata,
1454        cksum: &str,
1455        created: &DateTime<Utc>,
1456        owner: &str,
1457    ) -> DbResult<i64> {
1458        let created = created.format(DB_DATE_FORMAT).to_string();
1459        let normalized_name = NormalizedName::from(
1460            OriginalName::try_from(&pub_metadata.name)
1461                .map_err(|_| DbError::InvalidCrateName(pub_metadata.name.clone()))?,
1462        );
1463
1464        let existing = krate::Entity::find()
1465            .filter(krate::Column::Name.eq(&pub_metadata.name))
1466            .one(&self.db_con)
1467            .await?;
1468
1469        let txn = self.db_con.begin().await?;
1470
1471        let crate_id = if let Some(krate) = existing {
1472            let krate_id = krate.id;
1473            let max_version = max(
1474                parse_db_version(&krate.max_version)?,
1475                parse_db_version(&pub_metadata.vers)?,
1476            );
1477
1478            let mut krate: krate::ActiveModel = krate.into();
1479            krate.last_updated = Set(created.clone());
1480            krate.max_version = Set(max_version.into_inner());
1481            krate.homepage = Set(pub_metadata.homepage.clone());
1482            krate.description = Set(pub_metadata.description.clone());
1483            krate.repository = Set(pub_metadata.repository.clone());
1484            krate.e_tag = Set(String::new()); // Set to empty string, as it can be computed, when the crate index is inserted
1485            krate.update(&txn).await?;
1486            krate_id
1487        } else {
1488            let krate = krate::ActiveModel {
1489                id: ActiveValue::default(),
1490                name: Set(normalized_name.to_string()),
1491                original_name: Set(pub_metadata.name.clone()),
1492                max_version: Set(pub_metadata.vers.clone()),
1493                last_updated: Set(created.clone()),
1494                total_downloads: Set(0),
1495                homepage: Set(pub_metadata.homepage.clone()),
1496                description: Set(pub_metadata.description.clone()),
1497                repository: Set(pub_metadata.repository.clone()),
1498                e_tag: Set(String::new()), // Set to empty string, as it can be computed, when the crate index is inserted
1499                restricted_download: Set(false),
1500            };
1501            let krate = krate.insert(&txn).await?;
1502            krate.id
1503        };
1504
1505        operations::add_owner_if_not_exists(&txn, owner, crate_id).await?;
1506        operations::add_crate_metadata(&txn, pub_metadata, &created, crate_id).await?;
1507        operations::add_crate_index(&txn, pub_metadata, cksum, crate_id).await?;
1508        operations::update_etag(&txn, &pub_metadata.name, crate_id).await?;
1509        operations::update_crate_categories(&txn, pub_metadata, crate_id).await?;
1510        operations::update_crate_keywords(&txn, pub_metadata, crate_id).await?;
1511        operations::update_crate_authors(&txn, pub_metadata, crate_id).await?;
1512
1513        txn.commit().await?;
1514        Ok(crate_id)
1515    }
1516
1517    async fn update_docs_link(
1518        &self,
1519        crate_name: &NormalizedName,
1520        version: &Version,
1521        docs_link: &str,
1522    ) -> DbResult<()> {
1523        let (cm, _c) = crate_meta::Entity::find()
1524            .find_also_related(krate::Entity)
1525            .filter(
1526                Cond::all()
1527                    .add(krate::Column::Name.eq(crate_name))
1528                    .add(crate_meta::Column::Version.eq(version)),
1529            )
1530            .one(&self.db_con)
1531            .await?
1532            .ok_or_else(|| DbError::CrateNotFound(crate_name.to_string()))?;
1533
1534        let mut cm: crate_meta::ActiveModel = cm.into();
1535        cm.documentation = Set(Some(docs_link.to_string()));
1536        cm.update(&self.db_con).await?;
1537        Ok(())
1538    }
1539
1540    async fn add_crate_metadata(
1541        &self,
1542        pub_metadata: &PublishMetadata,
1543        created: &str,
1544        crate_id: i64,
1545    ) -> DbResult<()> {
1546        operations::add_crate_metadata(&self.db_con, pub_metadata, created, crate_id).await
1547    }
1548
1549    async fn get_prefetch_data(&self, crate_name: &str) -> DbResult<Prefetch> {
1550        let mut krate = krate::Entity::find()
1551            .filter(krate::Column::Name.eq(crate_name))
1552            .find_with_related(crate_index::Entity)
1553            .all(&self.db_con)
1554            .await?
1555            .into_iter();
1556
1557        // Expecting only one crate with its related indices
1558        let (Some((krate, crate_indices)), None) = (krate.next(), krate.next()) else {
1559            return Err(DbError::CrateNotFound(crate_name.to_string()));
1560        };
1561
1562        let index_metadata =
1563            operations::crate_index_model_to_index_metadata(crate_name, crate_indices)?;
1564        let data = operations::index_metadata_to_bytes(&index_metadata)?;
1565
1566        Ok(Prefetch {
1567            data,
1568            etag: krate.e_tag,
1569            last_modified: krate.last_updated,
1570        })
1571    }
1572
1573    async fn is_cratesio_cache_up_to_date(
1574        &self,
1575        crate_name: &NormalizedName,
1576        etag: Option<String>,
1577        last_modified: Option<String>,
1578    ) -> DbResult<PrefetchState> {
1579        let Some(krate) = cratesio_crate::Entity::find()
1580            .filter(cratesio_crate::Column::Name.eq(crate_name))
1581            .one(&self.db_con)
1582            .await?
1583        else {
1584            return Ok(PrefetchState::NotFound);
1585        };
1586
1587        let needs_update = match (etag, last_modified) {
1588            (Some(etag), Some(last_modified)) => {
1589                krate.e_tag != etag || krate.last_modified != last_modified
1590            }
1591            (Some(etag), None) => krate.e_tag != etag,
1592            (None, Some(last_modified)) => krate.last_modified != last_modified,
1593            (None, None) => true,
1594        };
1595
1596        if !needs_update {
1597            Ok(PrefetchState::UpToDate)
1598        } else {
1599            let crate_indices = krate
1600                .find_related(cratesio_index::Entity)
1601                .all(&self.db_con)
1602                .await?;
1603            let index_metadata =
1604                operations::cratesio_index_model_to_index_metadata(crate_name, crate_indices)?;
1605            let data = operations::index_metadata_to_bytes(&index_metadata)?;
1606
1607            Ok(PrefetchState::NeedsUpdate(Prefetch {
1608                data,
1609                etag: krate.e_tag.clone(),
1610                last_modified: krate.last_modified,
1611            }))
1612        }
1613    }
1614
1615    async fn add_cratesio_prefetch_data(
1616        &self,
1617        crate_name: &OriginalName,
1618        etag: &str,
1619        last_modified: &str,
1620        description: Option<String>,
1621        indices: &[IndexMetadata],
1622    ) -> DbResult<Prefetch> {
1623        let normalized_name = crate_name.to_normalized();
1624
1625        let max_version = indices
1626            .iter()
1627            .map(|i| Version::from_unchecked_str(&i.vers))
1628            .max()
1629            .ok_or_else(|| DbError::FailedToGetMaxVersionByName(crate_name.to_string()))?;
1630
1631        // Use a transaction to ensure the etag update and all index inserts are
1632        // atomic. Without this, a partial failure would leave the crate record
1633        // with the latest etag but incomplete index data, causing stale cache
1634        // entries that can never self-heal (the background updater would get 304
1635        // from crates.io because the etag matches).
1636        let txn = self.db_con.begin().await?;
1637
1638        let krate = cratesio_crate::Entity::find()
1639            .filter(cratesio_crate::Column::Name.eq(&normalized_name))
1640            .one(&txn)
1641            .await?;
1642
1643        let krate = if let Some(krate) = krate {
1644            let mut krate: cratesio_crate::ActiveModel = krate.into();
1645            krate.e_tag = Set(etag.to_string());
1646            krate.last_modified = Set(last_modified.to_string());
1647            krate.max_version = Set(max_version.into_inner());
1648            krate.update(&txn).await?
1649        } else {
1650            let krate = cratesio_crate::ActiveModel {
1651                id: ActiveValue::default(),
1652                name: Set(normalized_name.to_string()),
1653                original_name: Set(crate_name.to_string()),
1654                description: Set(description),
1655                e_tag: Set(etag.to_string()),
1656                last_modified: Set(last_modified.to_string()),
1657                total_downloads: Set(0),
1658                max_version: Set(max_version.to_string()),
1659            };
1660            krate.insert(&txn).await?
1661        };
1662
1663        let current_indices = cratesio_index::Entity::find()
1664            .filter(cratesio_index::Column::CratesIoFk.eq(krate.id))
1665            .all(&txn)
1666            .await?;
1667
1668        for index in indices {
1669            // Check if the version was yanked or un-yanked and update if so.
1670            if let Some(current_index) = current_indices.iter().find(|ci| index.vers == ci.vers) {
1671                if index.yanked != current_index.yanked {
1672                    let mut ci: cratesio_index::ActiveModel = current_index.to_owned().into();
1673                    ci.yanked = Set(index.yanked);
1674                    ci.update(&txn).await?;
1675                }
1676            } else {
1677                let deps = if index.deps.is_empty() {
1678                    None
1679                } else {
1680                    let deps = serde_json::to_value(&index.deps)
1681                        .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?;
1682                    Some(deps)
1683                };
1684
1685                let features = serde_json::to_value(&index.features)
1686                    .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?;
1687
1688                let features2 = serde_json::to_value(&index.features2)
1689                    .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?;
1690
1691                let new_index = cratesio_index::ActiveModel {
1692                    id: ActiveValue::default(),
1693                    name: Set(index.name.clone()),
1694                    vers: Set(index.vers.clone()),
1695                    deps: Set(deps),
1696                    cksum: Set(index.cksum.clone()),
1697                    features: Set(Some(features)),
1698                    features2: Set(Some(features2)),
1699                    yanked: Set(index.yanked),
1700                    links: Set(index.links.clone()),
1701                    pubtime: Set(index.pubtime.map(|dt| dt.naive_utc())),
1702                    v: Set(index.v.unwrap_or(1) as i32),
1703                    crates_io_fk: Set(krate.id),
1704                };
1705
1706                new_index.insert(&txn).await?;
1707
1708                // Add the meta data for the crate version.
1709                let meta = cratesio_meta::ActiveModel {
1710                    id: ActiveValue::default(),
1711                    version: Set(index.vers.clone()),
1712                    downloads: Set(0),
1713                    crates_io_fk: Set(krate.id),
1714                    documentation: Set(Some(format!(
1715                        "https://docs.rs/{normalized_name}/{}",
1716                        index.vers,
1717                    ))),
1718                };
1719
1720                meta.insert(&txn).await?;
1721            }
1722        }
1723
1724        txn.commit().await?;
1725
1726        Ok(Prefetch {
1727            data: operations::index_metadata_to_bytes(indices)?,
1728            etag: etag.to_string(),
1729            last_modified: last_modified.to_string(),
1730        })
1731    }
1732
1733    async fn get_cratesio_index_update_list(&self) -> DbResult<Vec<CratesioPrefetchMsg>> {
1734        let crates = cratesio_crate::Entity::find().all(&self.db_con).await?;
1735        let msgs = crates
1736            .into_iter()
1737            .map(|krate| {
1738                CratesioPrefetchMsg::Update(UpdateData {
1739                    name: OriginalName::from_unchecked(krate.original_name),
1740                    etag: Some(krate.e_tag),
1741                    last_modified: Some(krate.last_modified),
1742                })
1743            })
1744            .collect();
1745        Ok(msgs)
1746    }
1747
1748    async fn unyank_crate(&self, crate_name: &NormalizedName, version: &Version) -> DbResult<()> {
1749        let mut ci: crate_index::ActiveModel = self
1750            .get_crate_index_model(crate_name, version)
1751            .await?
1752            .into();
1753        ci.yanked = Set(false);
1754        ci.save(&self.db_con).await?;
1755        Ok(())
1756    }
1757
1758    async fn yank_crate(&self, crate_name: &NormalizedName, version: &Version) -> DbResult<()> {
1759        let mut ci: crate_index::ActiveModel = self
1760            .get_crate_index_model(crate_name, version)
1761            .await?
1762            .into();
1763        ci.yanked = Set(true);
1764        ci.save(&self.db_con).await?;
1765        Ok(())
1766    }
1767
1768    async fn register_webhook(&self, webhook: Webhook) -> DbResult<String> {
1769        let w = webhook::ActiveModel {
1770            event: Set(Into::<&str>::into(webhook.event).to_string()),
1771            callback_url: Set(webhook.callback_url),
1772            name: Set(webhook.name),
1773            ..Default::default()
1774        };
1775
1776        let w: webhook::Model = w.insert(&self.db_con).await?;
1777        Ok(w.id.to_string())
1778    }
1779
1780    async fn delete_webhook(&self, id: &str) -> DbResult<()> {
1781        self.get_webhook_model(id)
1782            .await?
1783            .delete(&self.db_con)
1784            .await?;
1785        Ok(())
1786    }
1787
1788    async fn get_webhook(&self, id: &str) -> DbResult<Webhook> {
1789        webhook_model_to_obj(self.get_webhook_model(id).await?)
1790    }
1791
1792    async fn get_all_webhooks(&self) -> DbResult<Vec<Webhook>> {
1793        Ok(webhook::Entity::find()
1794            .all(&self.db_con)
1795            .await?
1796            .into_iter()
1797            .filter_map(|w| webhook_model_to_obj(w).ok())
1798            .collect())
1799    }
1800
1801    async fn add_webhook_queue(
1802        &self,
1803        event: WebhookEvent,
1804        payload: serde_json::Value,
1805    ) -> DbResult<()> {
1806        let w = webhook::Entity::find()
1807            .filter(webhook::Column::Event.eq(Into::<&str>::into(event)))
1808            .all(&self.db_con)
1809            .await?;
1810
1811        if w.is_empty() {
1812            return Ok(());
1813        }
1814
1815        let now = Utc::now();
1816
1817        let entries = w.into_iter().map(|w| webhook_queue::ActiveModel {
1818            webhook_fk: Set(w.id),
1819            payload: Set(payload.clone()),
1820            next_attempt: Set(now.into()),
1821            last_attempt: Set(None),
1822            ..Default::default()
1823        });
1824
1825        webhook_queue::Entity::insert_many(entries)
1826            .exec(&self.db_con)
1827            .await?;
1828        Ok(())
1829    }
1830    async fn get_pending_webhook_queue_entries(
1831        &self,
1832        timestamp: DateTime<Utc>,
1833    ) -> DbResult<Vec<WebhookQueue>> {
1834        let w = webhook_queue::Entity::find()
1835            .find_with_related(webhook::Entity)
1836            .filter(webhook_queue::Column::NextAttempt.lte(timestamp))
1837            .all(&self.db_con)
1838            .await?;
1839
1840        Ok(w.into_iter()
1841            .filter_map(|w| {
1842                Some(WebhookQueue {
1843                    id: w.0.id.to_string(),
1844                    callback_url: w.1.first()?.callback_url.clone(),
1845                    payload: w.0.payload,
1846                    last_attempt: w.0.last_attempt.map(Into::into),
1847                    next_attempt: w.0.next_attempt.into(),
1848                })
1849            })
1850            .collect())
1851    }
1852
1853    async fn update_webhook_queue(
1854        &self,
1855        id: &str,
1856        last_attempt: DateTime<Utc>,
1857        next_attempt: DateTime<Utc>,
1858    ) -> DbResult<()> {
1859        let mut w: webhook_queue::ActiveModel = self.get_webhook_queue_model(id).await?.into();
1860        w.last_attempt = Set(Some(last_attempt.into()));
1861        w.next_attempt = Set(next_attempt.into());
1862        w.update(&self.db_con).await?;
1863        Ok(())
1864    }
1865
1866    async fn delete_webhook_queue(&self, id: &str) -> DbResult<()> {
1867        self.get_webhook_queue_model(id)
1868            .await?
1869            .delete(&self.db_con)
1870            .await?;
1871        Ok(())
1872    }
1873
1874    // OAuth2 identity methods
1875
1876    async fn get_user_by_oauth2_identity(
1877        &self,
1878        issuer: &str,
1879        subject: &str,
1880    ) -> DbResult<Option<User>> {
1881        let identity = oauth2_identity::Entity::find()
1882            .filter(oauth2_identity::Column::ProviderIssuer.eq(issuer))
1883            .filter(oauth2_identity::Column::Subject.eq(subject))
1884            .one(&self.db_con)
1885            .await?;
1886
1887        if let Some(identity) = identity {
1888            let u = user::Entity::find_by_id(identity.user_fk)
1889                .one(&self.db_con)
1890                .await?
1891                .ok_or_else(|| DbError::UserNotFound(format!("user_id={}", identity.user_fk)))?;
1892
1893            Ok(Some(User::from(u)))
1894        } else {
1895            Ok(None)
1896        }
1897    }
1898
1899    async fn create_oauth2_user(
1900        &self,
1901        username: &str,
1902        issuer: &str,
1903        subject: &str,
1904        email: Option<String>,
1905        is_admin: bool,
1906        is_read_only: bool,
1907    ) -> DbResult<User> {
1908        // Use a transaction to ensure atomicity
1909        let txn = self.db_con.begin().await?;
1910
1911        // Generate a random password and salt for OAuth2 users
1912        // They won't use password auth, but we need valid values
1913        let salt = generate_salt();
1914        let random_pwd = Uuid::new_v4().to_string();
1915        let hashed_pwd = hash_pwd(&random_pwd, &salt);
1916        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
1917
1918        // Create the user
1919        let new_user = user::ActiveModel {
1920            name: Set(username.to_string()),
1921            pwd: Set(hashed_pwd),
1922            salt: Set(salt),
1923            is_admin: Set(is_admin),
1924            is_read_only: Set(is_read_only),
1925            created: Set(created.clone()),
1926            ..Default::default()
1927        };
1928
1929        let res = user::Entity::insert(new_user).exec(&txn).await?;
1930        let user_id = res.last_insert_id;
1931
1932        // Link the OAuth2 identity
1933        let identity = oauth2_identity::ActiveModel {
1934            user_fk: Set(user_id),
1935            provider_issuer: Set(issuer.to_string()),
1936            subject: Set(subject.to_string()),
1937            email: Set(email),
1938            created: Set(created.clone()),
1939            ..Default::default()
1940        };
1941
1942        oauth2_identity::Entity::insert(identity).exec(&txn).await?;
1943
1944        txn.commit().await?;
1945
1946        Ok(User {
1947            id: user_id as i32,
1948            name: username.to_string(),
1949            pwd: String::new(),  // Don't expose password hash
1950            salt: String::new(), // Don't expose salt
1951            is_admin,
1952            is_read_only,
1953            created,
1954        })
1955    }
1956
1957    async fn link_oauth2_identity(
1958        &self,
1959        user_id: i64,
1960        issuer: &str,
1961        subject: &str,
1962        email: Option<String>,
1963    ) -> DbResult<()> {
1964        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
1965        let identity = oauth2_identity::ActiveModel {
1966            user_fk: Set(user_id),
1967            provider_issuer: Set(issuer.to_string()),
1968            subject: Set(subject.to_string()),
1969            email: Set(email),
1970            created: Set(created),
1971            ..Default::default()
1972        };
1973
1974        oauth2_identity::Entity::insert(identity)
1975            .exec(&self.db_con)
1976            .await?;
1977
1978        Ok(())
1979    }
1980
1981    // OAuth2 state methods (CSRF/PKCE during auth flow)
1982
1983    async fn store_oauth2_state(
1984        &self,
1985        state: &str,
1986        pkce_verifier: &str,
1987        nonce: &str,
1988    ) -> DbResult<()> {
1989        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
1990        let s = oauth2_state::ActiveModel {
1991            state: Set(state.to_string()),
1992            pkce_verifier: Set(pkce_verifier.to_string()),
1993            nonce: Set(nonce.to_string()),
1994            created: Set(created),
1995            ..Default::default()
1996        };
1997
1998        oauth2_state::Entity::insert(s).exec(&self.db_con).await?;
1999        Ok(())
2000    }
2001
2002    async fn get_and_delete_oauth2_state(&self, state: &str) -> DbResult<Option<OAuth2StateData>> {
2003        let s = oauth2_state::Entity::find()
2004            .filter(oauth2_state::Column::State.eq(state))
2005            .one(&self.db_con)
2006            .await?;
2007
2008        if let Some(s) = s {
2009            let data = OAuth2StateData {
2010                state: s.state.clone(),
2011                pkce_verifier: s.pkce_verifier.clone(),
2012                nonce: s.nonce.clone(),
2013            };
2014
2015            // Delete after retrieving (single use)
2016            s.delete(&self.db_con).await?;
2017
2018            Ok(Some(data))
2019        } else {
2020            Ok(None)
2021        }
2022    }
2023
2024    async fn cleanup_expired_oauth2_states(&self) -> DbResult<u64> {
2025        // States older than 10 minutes are expired
2026        let expiry = Utc::now() - chrono::Duration::minutes(10);
2027        let expiry_str = expiry.format(DB_DATE_FORMAT).to_string();
2028
2029        let result = oauth2_state::Entity::delete_many()
2030            .filter(oauth2_state::Column::Created.lt(expiry_str))
2031            .exec(&self.db_con)
2032            .await?;
2033
2034        Ok(result.rows_affected)
2035    }
2036
2037    async fn is_username_available(&self, username: &str) -> DbResult<bool> {
2038        let existing = user::Entity::find()
2039            .filter(user::Column::Name.eq(username))
2040            .one(&self.db_con)
2041            .await?;
2042
2043        Ok(existing.is_none())
2044    }
2045
2046    // Toolchain distribution methods
2047
2048    async fn add_toolchain(
2049        &self,
2050        name: &str,
2051        version: &str,
2052        date: &str,
2053        channel: Option<String>,
2054    ) -> DbResult<i64> {
2055        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
2056
2057        let model = toolchain::ActiveModel {
2058            id: ActiveValue::NotSet,
2059            name: Set(name.to_string()),
2060            version: Set(version.to_string()),
2061            date: Set(date.to_string()),
2062            channel: Set(channel),
2063            created: Set(created),
2064        };
2065
2066        let result = model.insert(&self.db_con).await?;
2067        Ok(result.id)
2068    }
2069
2070    async fn add_toolchain_target(
2071        &self,
2072        toolchain_id: i64,
2073        target: &str,
2074        storage_path: &str,
2075        hash: &str,
2076        size: i64,
2077    ) -> DbResult<i64> {
2078        let model = toolchain_target::ActiveModel {
2079            id: ActiveValue::NotSet,
2080            toolchain_fk: Set(toolchain_id),
2081            target: Set(target.to_string()),
2082            storage_path: Set(storage_path.to_string()),
2083            hash: Set(hash.to_string()),
2084            size: Set(size),
2085            status: Set("processing".to_string()),
2086        };
2087
2088        let result = model.insert(&self.db_con).await?;
2089        Ok(result.id)
2090    }
2091
2092    async fn set_target_status(&self, target_id: i64, status: &str) -> DbResult<()> {
2093        toolchain_target::Entity::update_many()
2094            .col_expr(toolchain_target::Column::Status, Expr::value(status))
2095            .filter(toolchain_target::Column::Id.eq(target_id))
2096            .exec(&self.db_con)
2097            .await?;
2098        Ok(())
2099    }
2100
2101    async fn add_toolchain_component(
2102        &self,
2103        target_id: i64,
2104        name: &str,
2105        storage_path: &str,
2106        hash: &str,
2107        size: i64,
2108    ) -> DbResult<()> {
2109        let model = toolchain_component::ActiveModel {
2110            id: ActiveValue::NotSet,
2111            toolchain_target_fk: Set(target_id),
2112            name: Set(name.to_string()),
2113            storage_path: Set(storage_path.to_string()),
2114            hash: Set(hash.to_string()),
2115            size: Set(size),
2116        };
2117
2118        model.insert(&self.db_con).await?;
2119        Ok(())
2120    }
2121
2122    async fn get_toolchain_by_channel(
2123        &self,
2124        channel: &str,
2125    ) -> DbResult<Option<ToolchainWithTargets>> {
2126        let tc = toolchain::Entity::find()
2127            .filter(toolchain::Column::Channel.eq(channel))
2128            .one(&self.db_con)
2129            .await?;
2130
2131        Ok(match tc {
2132            Some(tc) => Some(self.toolchain_with_targets(tc).await?),
2133            None => None,
2134        })
2135    }
2136
2137    async fn get_toolchain_by_version(
2138        &self,
2139        name: &str,
2140        version: &str,
2141    ) -> DbResult<Option<ToolchainWithTargets>> {
2142        Ok(
2143            match self.get_toolchain_by_name_version(name, version).await? {
2144                Some(tc) => Some(self.toolchain_with_targets(tc).await?),
2145                None => None,
2146            },
2147        )
2148    }
2149
2150    async fn list_toolchains(&self) -> DbResult<Vec<ToolchainWithTargets>> {
2151        let toolchains = toolchain::Entity::find()
2152            .order_by_desc(toolchain::Column::Created)
2153            .all(&self.db_con)
2154            .await?;
2155
2156        let mut result = Vec::with_capacity(toolchains.len());
2157
2158        for tc in toolchains {
2159            result.push(self.toolchain_with_targets(tc).await?);
2160        }
2161
2162        Ok(result)
2163    }
2164
2165    async fn delete_toolchain(&self, name: &str, version: &str) -> DbResult<()> {
2166        toolchain::Entity::delete_many()
2167            .filter(toolchain::Column::Name.eq(name))
2168            .filter(toolchain::Column::Version.eq(version))
2169            .exec(&self.db_con)
2170            .await?;
2171
2172        Ok(())
2173    }
2174
2175    async fn delete_toolchain_target(
2176        &self,
2177        name: &str,
2178        version: &str,
2179        target: &str,
2180    ) -> DbResult<()> {
2181        if let Some(tc) = self.get_toolchain_by_name_version(name, version).await? {
2182            toolchain_target::Entity::delete_many()
2183                .filter(toolchain_target::Column::ToolchainFk.eq(tc.id))
2184                .filter(toolchain_target::Column::Target.eq(target))
2185                .exec(&self.db_con)
2186                .await?;
2187        }
2188
2189        Ok(())
2190    }
2191
2192    async fn set_channel(&self, channel: &str, name: &str, version: &str) -> DbResult<()> {
2193        // First, clear the channel from any other toolchain that has it
2194        let toolchains_with_channel = toolchain::Entity::find()
2195            .filter(toolchain::Column::Channel.eq(channel))
2196            .all(&self.db_con)
2197            .await?;
2198
2199        for tc in toolchains_with_channel {
2200            let mut model: toolchain::ActiveModel = tc.into();
2201            model.channel = Set(None);
2202            model.update(&self.db_con).await?;
2203        }
2204
2205        // Then set the channel on the target toolchain
2206        if let Some(tc) = self.get_toolchain_by_name_version(name, version).await? {
2207            let mut model: toolchain::ActiveModel = tc.into();
2208            model.channel = Set(Some(channel.to_string()));
2209            model.update(&self.db_con).await?;
2210        }
2211
2212        Ok(())
2213    }
2214
2215    async fn get_channels(&self) -> DbResult<Vec<ChannelInfo>> {
2216        let toolchains = toolchain::Entity::find()
2217            .filter(toolchain::Column::Channel.is_not_null())
2218            .all(&self.db_con)
2219            .await?;
2220
2221        Ok(toolchains
2222            .into_iter()
2223            .filter_map(|tc| {
2224                tc.channel.map(|channel| ChannelInfo {
2225                    name: channel,
2226                    version: tc.version,
2227                    date: tc.date,
2228                })
2229            })
2230            .collect())
2231    }
2232}
2233
2234fn parse_db_version(value: &str) -> DbResult<Version> {
2235    Version::try_from(value).map_err(|_| DbError::InvalidVersion(value.to_owned()))
2236}