Skip to main content

kellnr_db/database/
mod.rs

1mod operations;
2pub mod test_utils;
3
4use std::cmp::max;
5use std::collections::BTreeMap;
6use std::path::Path;
7
8use chrono::{DateTime, Utc};
9use kellnr_common::crate_data::{CrateData, CrateRegistryDep, CrateVersionData};
10use kellnr_common::crate_overview::CrateOverview;
11use kellnr_common::cratesio_prefetch_msg::{CratesioPrefetchMsg, UpdateData};
12use kellnr_common::index_metadata::{IndexDep, IndexMetadata};
13use kellnr_common::normalized_name::NormalizedName;
14use kellnr_common::original_name::OriginalName;
15use kellnr_common::prefetch::Prefetch;
16use kellnr_common::publish_metadata::PublishMetadata;
17use kellnr_common::version::Version;
18use kellnr_common::webhook::{Webhook, WebhookEvent, WebhookQueue};
19use kellnr_entity::prelude::*;
20use kellnr_entity::{
21    auth_token, crate_author, crate_author_to_crate, crate_category, crate_category_to_crate,
22    crate_group, crate_index, crate_keyword, crate_keyword_to_crate, crate_meta, crate_user,
23    cratesio_crate, cratesio_index, cratesio_meta, doc_queue, group, group_user, krate,
24    oauth2_identity, oauth2_state, owner, session, toolchain, toolchain_target, user, webhook,
25    webhook_queue,
26};
27use kellnr_migration::iden::{
28    AuthTokenIden, CrateIden, CrateMetaIden, CratesIoIden, CratesIoMetaIden, GroupIden,
29};
30use sea_orm::entity::prelude::Uuid;
31use sea_orm::prelude::async_trait::async_trait;
32use sea_orm::query::{QueryOrder, QuerySelect, TransactionTrait};
33use sea_orm::sea_query::{Alias, Cond, Expr, Iden, JoinType, Order, Query, UnionType};
34use sea_orm::{
35    ActiveModelTrait, ActiveValue, ColumnTrait, DatabaseConnection, EntityTrait, ExprTrait,
36    FromQueryResult, ModelTrait, QueryFilter, RelationTrait, Set,
37};
38
39use crate::error::DbError;
40use crate::password::{generate_salt, hash_pwd, hash_token};
41use crate::provider::{
42    ChannelInfo, DbResult, OAuth2StateData, PrefetchState, ToolchainWithTargets,
43};
44use crate::tables::init_database;
45use crate::{
46    AuthToken, ConString, CrateMeta, CrateSummary, DbProvider, DocQueueEntry, Group, User,
47};
48
49const DB_DATE_FORMAT: &str = "%Y-%m-%d %H:%M:%S";
50
51pub struct Database {
52    db_con: DatabaseConnection,
53}
54
55impl Database {
56    pub fn existing(db_con: DatabaseConnection) -> Self {
57        Self { db_con }
58    }
59
60    pub async fn new(con: &ConString, max_con: u32) -> Result<Self, DbError> {
61        let db_con = init_database(con, max_con)
62            .await
63            .map_err(|e| DbError::InitializationError(e.to_string()))?;
64
65        if operations::no_user_exists(&db_con).await? {
66            operations::insert_admin_credentials(&db_con, con).await?;
67        }
68
69        Ok(Self { db_con })
70    }
71
72    /// Looks up a user by name; returns the entity model or [`DbError::UserNotFound`].
73    async fn get_user_model(&self, name: &str) -> DbResult<user::Model> {
74        user::Entity::find()
75            .filter(user::Column::Name.eq(name))
76            .one(&self.db_con)
77            .await?
78            .ok_or_else(|| DbError::UserNotFound(name.to_owned()))
79    }
80
81    /// Looks up a krate by normalized name; returns the entity model or [`DbError::CrateNotFound`].
82    async fn get_krate_model(&self, crate_name: &NormalizedName) -> DbResult<krate::Model> {
83        krate::Entity::find()
84            .filter(krate::Column::Name.eq(crate_name))
85            .one(&self.db_con)
86            .await?
87            .ok_or_else(|| DbError::CrateNotFound(crate_name.to_string()))
88    }
89
90    /// Looks up a group by name; returns the entity model or [`DbError::GroupNotFound`].
91    async fn get_group_model(&self, name: &str) -> DbResult<group::Model> {
92        group::Entity::find()
93            .filter(group::Column::Name.eq(name))
94            .one(&self.db_con)
95            .await?
96            .ok_or_else(|| DbError::GroupNotFound(name.to_owned()))
97    }
98
99    /// Loads targets for a toolchain and returns a `ToolchainWithTargets`.
100    async fn toolchain_with_targets(&self, tc: toolchain::Model) -> DbResult<ToolchainWithTargets> {
101        let targets = toolchain_target::Entity::find()
102            .filter(toolchain_target::Column::ToolchainFk.eq(tc.id))
103            .all(&self.db_con)
104            .await?;
105        Ok(ToolchainWithTargets::from_model(tc, targets))
106    }
107
108    /// Looks up a crate index by name and version; returns the entity model or [`DbError::CrateIndexNotFound`].
109    async fn get_crate_index_model(
110        &self,
111        crate_name: &NormalizedName,
112        version: &Version,
113    ) -> DbResult<crate_index::Model> {
114        crate_index::Entity::find()
115            .filter(crate_index::Column::Name.eq(crate_name))
116            .filter(crate_index::Column::Vers.eq(version))
117            .one(&self.db_con)
118            .await?
119            .ok_or_else(|| DbError::CrateIndexNotFound(crate_name.to_string(), version.to_string()))
120    }
121
122    /// Looks up a toolchain by name and version; returns `None` if not found.
123    async fn get_toolchain_by_name_version(
124        &self,
125        name: &str,
126        version: &str,
127    ) -> DbResult<Option<toolchain::Model>> {
128        toolchain::Entity::find()
129            .filter(toolchain::Column::Name.eq(name))
130            .filter(toolchain::Column::Version.eq(version))
131            .one(&self.db_con)
132            .await
133            .map_err(Into::into)
134    }
135
136    /// Looks up a `crate_group` by crate name and group name; returns `None` if not found.
137    async fn get_crate_group_by_name_and_group(
138        &self,
139        crate_name: &NormalizedName,
140        group: &str,
141    ) -> DbResult<Option<crate_group::Model>> {
142        crate_group::Entity::find()
143            .join(JoinType::InnerJoin, crate_group::Relation::Krate.def())
144            .join(JoinType::InnerJoin, crate_group::Relation::Group.def())
145            .filter(
146                Cond::all()
147                    .add(krate::Column::Name.eq(crate_name))
148                    .add(group::Column::Name.eq(group)),
149            )
150            .one(&self.db_con)
151            .await
152            .map_err(Into::into)
153    }
154
155    /// Looks up a `group_user` by group name and user name; returns `None` if not found.
156    async fn get_group_user_by_group_and_user(
157        &self,
158        group_name: &str,
159        user: &str,
160    ) -> DbResult<Option<group_user::Model>> {
161        group_user::Entity::find()
162            .join(JoinType::InnerJoin, group_user::Relation::Group.def())
163            .join(JoinType::InnerJoin, group_user::Relation::User.def())
164            .filter(
165                Cond::all()
166                    .add(group::Column::Name.eq(group_name))
167                    .add(user::Column::Name.eq(user)),
168            )
169            .one(&self.db_con)
170            .await
171            .map_err(Into::into)
172    }
173
174    /// Looks up a `crate_user` by crate name and user name; returns `None` if not found.
175    async fn get_crate_user_by_crate_and_user(
176        &self,
177        crate_name: &NormalizedName,
178        user: &str,
179    ) -> DbResult<Option<crate_user::Model>> {
180        crate_user::Entity::find()
181            .join(JoinType::InnerJoin, crate_user::Relation::Krate.def())
182            .join(JoinType::InnerJoin, crate_user::Relation::User.def())
183            .filter(
184                Cond::all()
185                    .add(krate::Column::Name.eq(crate_name))
186                    .add(user::Column::Name.eq(user)),
187            )
188            .one(&self.db_con)
189            .await
190            .map_err(Into::into)
191    }
192
193    /// Shared implementation for crate overview listing: optional name filter, optional limit/offset, cache union.
194    async fn query_crates(
195        &self,
196        contains: Option<&str>,
197        limit_offset: Option<(u64, u64)>,
198        cache: bool,
199    ) -> DbResult<Vec<CrateOverview>> {
200        let mut query = Query::select();
201        query
202            .expr_as(Expr::col(CrateIden::OriginalName), Alias::new("name"))
203            .expr_as(Expr::col(CrateIden::MaxVersion), Alias::new("version"))
204            .expr_as(Expr::col(CrateIden::LastUpdated), Alias::new("date"))
205            .expr_as(
206                Expr::col(CrateIden::TotalDownloads),
207                Alias::new("total_downloads"),
208            )
209            .expr_as(Expr::col(CrateIden::Description), Alias::new("description"))
210            .expr_as(
211                Expr::col(CrateMetaIden::Documentation),
212                Alias::new("documentation"),
213            )
214            .expr_as(Expr::cust("false"), Alias::new("is_cache"))
215            .from(CrateMetaIden::Table)
216            .inner_join(
217                CrateIden::Table,
218                Expr::col((CrateMetaIden::Table, CrateMetaIden::CrateFk))
219                    .equals((CrateIden::Table, CrateIden::Id)),
220            )
221            .and_where(
222                Expr::col((CrateMetaIden::Table, CrateMetaIden::Version))
223                    .equals((CrateIden::Table, CrateIden::MaxVersion)),
224            );
225
226        // Optional name filter
227        if let Some(contains) = contains {
228            query.and_where(
229                Expr::col((CrateIden::Table, CrateIden::Name)).like(format!("%{contains}%")),
230            );
231        }
232
233        // UNION with cached crates
234        if cache {
235            let mut query2 = Query::select();
236            query2
237                .expr_as(Expr::col(CratesIoIden::OriginalName), Alias::new("name"))
238                .expr_as(Expr::col(CratesIoIden::MaxVersion), Alias::new("version"))
239                .expr_as(Expr::col(CratesIoIden::LastModified), Alias::new("date"))
240                .expr_as(
241                    Expr::col(CratesIoIden::TotalDownloads),
242                    Alias::new("total_downloads"),
243                )
244                .expr_as(
245                    Expr::col(CratesIoIden::Description),
246                    Alias::new("description"),
247                )
248                .expr_as(
249                    Expr::col(CratesIoMetaIden::Documentation),
250                    Alias::new("documentation"),
251                )
252                .expr_as(Expr::cust("true"), Alias::new("is_cache"))
253                .from(CratesIoMetaIden::Table)
254                .inner_join(
255                    CratesIoIden::Table,
256                    Expr::col((CratesIoMetaIden::Table, CratesIoMetaIden::CratesIoFk))
257                        .equals((CratesIoIden::Table, CratesIoIden::Id)),
258                )
259                .and_where(
260                    Expr::col((CratesIoMetaIden::Table, CratesIoMetaIden::Version))
261                        .equals((CratesIoIden::Table, CratesIoIden::MaxVersion)),
262                );
263            if let Some(contains) = contains {
264                query2.and_where(
265                    Expr::col((CratesIoIden::Table, CratesIoIden::OriginalName))
266                        .like(format!("%{contains}%")),
267                );
268            }
269            query.union(UnionType::All, query2);
270        }
271
272        // Ordering and optional limit/offset
273        query.order_by(Alias::new("name"), Order::Asc);
274        if let Some((limit, offset)) = limit_offset {
275            query.limit(limit).offset(offset);
276        }
277
278        let builder = self.db_con.get_database_backend();
279        CrateOverview::find_by_statement(builder.build(&query))
280            .all(&self.db_con)
281            .await
282            .map_err(DbError::from)
283    }
284
285    /// Executes a count query `SELECT COUNT(id_column) FROM table` and returns the count as u64.
286    async fn count<T>(&self, table: T, id_column: T, error: DbError) -> DbResult<u64>
287    where
288        T: Iden + Copy,
289    {
290        #[derive(Debug, PartialEq, FromQueryResult)]
291        struct CountResult {
292            count: Option<i64>,
293        }
294
295        let statement = self.db_con.get_database_backend().build(
296            Query::select()
297                .expr_as(Expr::col((table, id_column)).count(), Alias::new("count"))
298                .from(table),
299        );
300        let Some(result) = CountResult::find_by_statement(statement)
301            .one(&self.db_con)
302            .await?
303        else {
304            return Err(error);
305        };
306        result
307            .count
308            .and_then(|v| u64::try_from(v).ok())
309            .ok_or(error)
310    }
311
312    async fn get_webhook_model(&self, id: &str) -> DbResult<webhook::Model> {
313        webhook::Entity::find()
314            .filter(webhook::Column::Id.eq(
315                TryInto::<Uuid>::try_into(id).map_err(|_| DbError::InvalidId(id.to_string()))?,
316            ))
317            .one(&self.db_con)
318            .await?
319            .ok_or(DbError::WebhookNotFound)
320    }
321
322    async fn get_webhook_queue_model(&self, id: &str) -> DbResult<webhook_queue::Model> {
323        webhook_queue::Entity::find()
324            .filter(webhook_queue::Column::Id.eq(
325                TryInto::<Uuid>::try_into(id).map_err(|_| DbError::InvalidId(id.to_string()))?,
326            ))
327            .one(&self.db_con)
328            .await?
329            .ok_or(DbError::WebhookNotFound)
330    }
331
332    async fn get_owner_by_crate_and_user(
333        &self,
334        crate_name: &str,
335        owner_name: &str,
336    ) -> DbResult<Option<owner::Model>> {
337        owner::Entity::find()
338            .join(JoinType::InnerJoin, owner::Relation::Krate.def())
339            .join(JoinType::InnerJoin, owner::Relation::User.def())
340            .filter(
341                Cond::all()
342                    .add(krate::Column::Name.eq(crate_name))
343                    .add(user::Column::Name.eq(owner_name)),
344            )
345            .one(&self.db_con)
346            .await
347            .map_err(Into::into)
348    }
349
350    async fn delete_or_not_found<M, A>(&self, model: Option<M>, err: DbError) -> DbResult<()>
351    where
352        M: ModelTrait + sea_orm::IntoActiveModel<A>,
353        A: ActiveModelTrait<Entity = M::Entity> + sea_orm::ActiveModelBehavior + Send,
354    {
355        model.ok_or(err)?.delete(&self.db_con).await?;
356        Ok(())
357    }
358}
359
360fn webhook_model_to_obj(w: webhook::Model) -> DbResult<Webhook> {
361    let event = w
362        .event
363        .as_str()
364        .try_into()
365        .map_err(|_| DbError::InvalidWebhookEvent(w.event))?;
366    Ok(Webhook {
367        id: Some(w.id.into()),
368        name: w.name,
369        event,
370        callback_url: w.callback_url,
371    })
372}
373
374/// Generates an `add_*` method that links two entities via a join table.
375///
376/// # Parameters
377/// - `$method`: The method name to generate (e.g., `add_crate_user_impl`)
378/// - `$relation_entity`: The join table entity module (e.g., `crate_user`)
379/// - `$insert_entity`: The Sea ORM Entity type for insertion (e.g., `CrateUser`)
380/// - `$fk1`: The foreign key field for the first entity
381/// - `$fk2`: The foreign key field for the second entity
382macro_rules! impl_add_relation {
383    ($method:ident, $relation_entity:ident, $insert_entity:ident, $fk1:ident, $fk2:ident) => {
384        async fn $method(&self, fk1: i64, fk2: i64) -> DbResult<()> {
385            let model = $relation_entity::ActiveModel {
386                $fk1: Set(fk1),
387                $fk2: Set(fk2),
388                ..Default::default()
389            };
390            $insert_entity::insert(model).exec(&self.db_con).await?;
391            Ok(())
392        }
393    };
394}
395
396impl Database {
397    impl_add_relation!(
398        add_crate_user_impl,
399        crate_user,
400        CrateUser,
401        crate_fk,
402        user_fk
403    );
404    impl_add_relation!(
405        add_crate_group_impl,
406        crate_group,
407        CrateGroup,
408        crate_fk,
409        group_fk
410    );
411    impl_add_relation!(
412        add_group_user_impl,
413        group_user,
414        GroupUser,
415        group_fk,
416        user_fk
417    );
418    impl_add_relation!(add_owner_impl, owner, Owner, crate_fk, user_fk);
419}
420
421#[async_trait]
422impl DbProvider for Database {
423    async fn get_total_unique_cached_crates(&self) -> DbResult<u64> {
424        self.count(
425            CratesIoIden::Table,
426            CratesIoIden::Id,
427            DbError::FailedToCountCrates,
428        )
429        .await
430    }
431
432    async fn get_total_cached_crate_versions(&self) -> DbResult<u64> {
433        self.count(
434            CratesIoMetaIden::Table,
435            CratesIoMetaIden::Id,
436            DbError::FailedToCountCrateVersions,
437        )
438        .await
439    }
440
441    async fn get_total_cached_downloads(&self) -> DbResult<u64> {
442        #[derive(FromQueryResult)]
443        struct Model {
444            total_downloads: i64,
445        }
446
447        let total_downloads = cratesio_crate::Entity::find()
448            .select_only()
449            .column(cratesio_crate::Column::TotalDownloads)
450            .into_model::<Model>()
451            .all(&self.db_con)
452            .await?;
453
454        Ok(total_downloads
455            .iter()
456            .map(|m| m.total_downloads as u64)
457            .sum())
458    }
459
460    async fn authenticate_user(&self, name: &str, pwd: &str) -> DbResult<User> {
461        let user = self.get_user(name).await?;
462
463        if hash_pwd(pwd, &user.salt) == user.pwd {
464            Ok(user)
465        } else {
466            Err(DbError::PasswordMismatch)
467        }
468    }
469
470    async fn increase_download_counter(
471        &self,
472        crate_name: &NormalizedName,
473        crate_version: &Version,
474    ) -> DbResult<()> {
475        let txn = self.db_con.begin().await?;
476
477        krate::Entity::update_many()
478            .col_expr(
479                krate::Column::TotalDownloads,
480                Expr::col(krate::Column::TotalDownloads).add(1),
481            )
482            .filter(krate::Column::Name.eq(crate_name))
483            .exec(&txn)
484            .await?;
485
486        let crate_id = krate::Entity::find()
487            .filter(krate::Column::Name.eq(crate_name))
488            .one(&txn)
489            .await?
490            .ok_or_else(|| DbError::CrateNotFound(crate_name.to_string()))?
491            .id;
492
493        crate_meta::Entity::update_many()
494            .col_expr(
495                crate_meta::Column::Downloads,
496                Expr::col(crate_meta::Column::Downloads).add(1),
497            )
498            .filter(
499                Cond::all()
500                    .add(crate_meta::Column::Version.eq(crate_version))
501                    .add(crate_meta::Column::CrateFk.eq(crate_id)),
502            )
503            .exec(&txn)
504            .await?;
505
506        txn.commit().await?;
507        Ok(())
508    }
509
510    async fn increase_cached_download_counter(
511        &self,
512        crate_name: &NormalizedName,
513        crate_version: &Version,
514    ) -> DbResult<()> {
515        let txn = self.db_con.begin().await?;
516
517        cratesio_crate::Entity::update_many()
518            .col_expr(
519                cratesio_crate::Column::TotalDownloads,
520                Expr::col(cratesio_crate::Column::TotalDownloads).add(1),
521            )
522            .filter(cratesio_crate::Column::Name.eq(crate_name))
523            .exec(&txn)
524            .await?;
525
526        let crate_id = cratesio_crate::Entity::find()
527            .filter(cratesio_crate::Column::Name.eq(crate_name))
528            .one(&txn)
529            .await?
530            .ok_or_else(|| DbError::CrateNotFound(crate_name.to_string()))?
531            .id;
532
533        cratesio_meta::Entity::update_many()
534            .col_expr(
535                cratesio_meta::Column::Downloads,
536                Expr::col(cratesio_meta::Column::Downloads).add(1),
537            )
538            .filter(
539                Cond::all()
540                    .add(cratesio_meta::Column::Version.eq(crate_version))
541                    .add(cratesio_meta::Column::CratesIoFk.eq(crate_id)),
542            )
543            .exec(&txn)
544            .await?;
545
546        txn.commit().await?;
547        Ok(())
548    }
549
550    async fn increase_download_counter_by(
551        &self,
552        crate_name: &NormalizedName,
553        crate_version: &Version,
554        count: u64,
555    ) -> DbResult<()> {
556        let txn = self.db_con.begin().await?;
557
558        krate::Entity::update_many()
559            .col_expr(
560                krate::Column::TotalDownloads,
561                Expr::col(krate::Column::TotalDownloads).add(count as i64),
562            )
563            .filter(krate::Column::Name.eq(crate_name))
564            .exec(&txn)
565            .await?;
566
567        let crate_id = krate::Entity::find()
568            .filter(krate::Column::Name.eq(crate_name))
569            .one(&txn)
570            .await?
571            .ok_or_else(|| DbError::CrateNotFound(crate_name.to_string()))?
572            .id;
573
574        crate_meta::Entity::update_many()
575            .col_expr(
576                crate_meta::Column::Downloads,
577                Expr::col(crate_meta::Column::Downloads).add(count as i64),
578            )
579            .filter(
580                Cond::all()
581                    .add(crate_meta::Column::Version.eq(crate_version))
582                    .add(crate_meta::Column::CrateFk.eq(crate_id)),
583            )
584            .exec(&txn)
585            .await?;
586
587        txn.commit().await?;
588        Ok(())
589    }
590
591    async fn increase_cached_download_counter_by(
592        &self,
593        crate_name: &NormalizedName,
594        crate_version: &Version,
595        count: u64,
596    ) -> DbResult<()> {
597        let txn = self.db_con.begin().await?;
598
599        cratesio_crate::Entity::update_many()
600            .col_expr(
601                cratesio_crate::Column::TotalDownloads,
602                Expr::col(cratesio_crate::Column::TotalDownloads).add(count as i64),
603            )
604            .filter(cratesio_crate::Column::Name.eq(crate_name))
605            .exec(&txn)
606            .await?;
607
608        let crate_id = cratesio_crate::Entity::find()
609            .filter(cratesio_crate::Column::Name.eq(crate_name))
610            .one(&txn)
611            .await?
612            .ok_or_else(|| DbError::CrateNotFound(crate_name.to_string()))?
613            .id;
614
615        cratesio_meta::Entity::update_many()
616            .col_expr(
617                cratesio_meta::Column::Downloads,
618                Expr::col(cratesio_meta::Column::Downloads).add(count as i64),
619            )
620            .filter(
621                Cond::all()
622                    .add(cratesio_meta::Column::Version.eq(crate_version))
623                    .add(cratesio_meta::Column::CratesIoFk.eq(crate_id)),
624            )
625            .exec(&txn)
626            .await?;
627
628        txn.commit().await?;
629        Ok(())
630    }
631
632    async fn get_last_updated_crate(&self) -> DbResult<Option<(OriginalName, Version)>> {
633        let krate = krate::Entity::find()
634            .order_by_desc(krate::Column::LastUpdated)
635            .one(&self.db_con)
636            .await?;
637
638        if let Some(krate) = krate {
639            // SAFETY: Unchecked is ok, as only valid crate names are inserted into the database
640            let name = OriginalName::from_unchecked(krate.original_name);
641            // SAFETY: Unchecked is ok, as only valid versions are inserted into the database
642            let version = Version::from_unchecked_str(&krate.max_version);
643            Ok(Some((name, version)))
644        } else {
645            Ok(None)
646        }
647    }
648
649    async fn validate_session(&self, session_token: &str) -> DbResult<(String, bool)> {
650        let u = user::Entity::find()
651            .join(JoinType::InnerJoin, user::Relation::Session.def())
652            .filter(session::Column::Token.eq(session_token))
653            .one(&self.db_con)
654            .await?
655            .ok_or(DbError::SessionNotFound)?;
656
657        Ok((u.name, u.is_admin))
658    }
659
660    async fn add_session_token(&self, name: &str, session_token: &str) -> DbResult<()> {
661        let user = self.get_user(name).await?;
662        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
663
664        let s = session::ActiveModel {
665            token: Set(session_token.to_owned()),
666            created: Set(created),
667            user_fk: Set(user.id as i64),
668            ..Default::default()
669        };
670
671        s.insert(&self.db_con).await?;
672        Ok(())
673    }
674
675    async fn add_crate_user(&self, crate_name: &NormalizedName, user: &str) -> DbResult<()> {
676        let crate_fk = self.get_krate_model(crate_name).await?.id;
677        let user_fk = self.get_user_model(user).await?.id;
678        self.add_crate_user_impl(crate_fk, user_fk).await
679    }
680
681    async fn add_crate_group(&self, crate_name: &NormalizedName, group: &str) -> DbResult<()> {
682        let crate_fk = self.get_krate_model(crate_name).await?.id;
683        let group_fk = self.get_group_model(group).await?.id;
684        self.add_crate_group_impl(crate_fk, group_fk).await
685    }
686
687    async fn add_group_user(&self, group_name: &str, user: &str) -> DbResult<()> {
688        let group_fk = self.get_group_model(group_name).await?.id;
689        let user_fk = self.get_user_model(user).await?.id;
690        self.add_group_user_impl(group_fk, user_fk).await
691    }
692
693    async fn add_owner(&self, crate_name: &NormalizedName, owner: &str) -> DbResult<()> {
694        let crate_fk = self.get_krate_model(crate_name).await?.id;
695        let user_fk = self.get_user_model(owner).await?.id;
696        self.add_owner_impl(crate_fk, user_fk).await
697    }
698
699    async fn is_download_restricted(&self, crate_name: &NormalizedName) -> DbResult<bool> {
700        Ok(krate::Entity::find()
701            .filter(krate::Column::Name.eq(crate_name))
702            .one(&self.db_con)
703            .await?
704            .is_some_and(|model| model.restricted_download))
705    }
706
707    async fn change_download_restricted(
708        &self,
709        crate_name: &NormalizedName,
710        restricted: bool,
711    ) -> DbResult<()> {
712        let mut krate: krate::ActiveModel = self.get_krate_model(crate_name).await?.into();
713        krate.restricted_download = Set(restricted);
714        krate.update(&self.db_con).await?;
715        Ok(())
716    }
717
718    async fn is_crate_user(&self, crate_name: &NormalizedName, user: &str) -> DbResult<bool> {
719        Ok(self
720            .get_crate_user_by_crate_and_user(crate_name, user)
721            .await?
722            .is_some())
723    }
724
725    async fn is_crate_group(&self, crate_name: &NormalizedName, group: &str) -> DbResult<bool> {
726        Ok(self
727            .get_crate_group_by_name_and_group(crate_name, group)
728            .await?
729            .is_some())
730    }
731
732    async fn is_crate_group_user(&self, crate_name: &NormalizedName, user: &str) -> DbResult<bool> {
733        let user = user::Entity::find()
734            .join(JoinType::InnerJoin, user::Relation::GroupUser.def())
735            .join(JoinType::InnerJoin, group_user::Relation::Group.def())
736            .join(JoinType::InnerJoin, group::Relation::CrateGroup.def())
737            .join(JoinType::InnerJoin, crate_group::Relation::Krate.def())
738            .filter(
739                Cond::all()
740                    .add(krate::Column::Name.eq(crate_name))
741                    .add(user::Column::Name.eq(user)),
742            )
743            .one(&self.db_con)
744            .await?;
745        Ok(user.is_some())
746    }
747
748    async fn is_group_user(&self, group_name: &str, user: &str) -> DbResult<bool> {
749        Ok(self
750            .get_group_user_by_group_and_user(group_name, user)
751            .await?
752            .is_some())
753    }
754
755    async fn is_owner(&self, crate_name: &NormalizedName, user: &str) -> DbResult<bool> {
756        let owner = owner::Entity::find()
757            .join(JoinType::InnerJoin, owner::Relation::Krate.def())
758            .join(JoinType::InnerJoin, owner::Relation::User.def())
759            .filter(
760                Cond::all()
761                    .add(krate::Column::Name.eq(crate_name))
762                    .add(user::Column::Name.eq(user)),
763            )
764            .one(&self.db_con)
765            .await?;
766
767        Ok(owner.is_some())
768    }
769
770    async fn get_crate_id(&self, crate_name: &NormalizedName) -> DbResult<Option<i64>> {
771        let id = krate::Entity::find()
772            .filter(krate::Column::Name.eq(crate_name))
773            .one(&self.db_con)
774            .await?
775            .map(|model| model.id);
776
777        Ok(id)
778    }
779
780    async fn get_crate_owners(&self, crate_name: &NormalizedName) -> DbResult<Vec<User>> {
781        let u = user::Entity::find()
782            .join(JoinType::InnerJoin, user::Relation::Owner.def())
783            .join(JoinType::InnerJoin, owner::Relation::Krate.def())
784            .filter(Expr::col((CrateIden::Table, krate::Column::Name)).eq(crate_name))
785            .all(&self.db_con)
786            .await?;
787
788        Ok(u.into_iter().map(User::from).collect())
789    }
790
791    async fn get_crate_users(&self, crate_name: &NormalizedName) -> DbResult<Vec<User>> {
792        let u = user::Entity::find()
793            .join(JoinType::InnerJoin, user::Relation::CrateUser.def())
794            .join(JoinType::InnerJoin, crate_user::Relation::Krate.def())
795            .filter(Expr::col((CrateIden::Table, krate::Column::Name)).eq(crate_name))
796            .all(&self.db_con)
797            .await?;
798
799        Ok(u.into_iter().map(User::from).collect())
800    }
801
802    async fn get_crate_groups(&self, crate_name: &NormalizedName) -> DbResult<Vec<Group>> {
803        let u = group::Entity::find()
804            .join(JoinType::InnerJoin, group::Relation::CrateGroup.def())
805            .join(JoinType::InnerJoin, crate_group::Relation::Krate.def())
806            .filter(Expr::col((CrateIden::Table, krate::Column::Name)).eq(crate_name))
807            .all(&self.db_con)
808            .await?;
809
810        Ok(u.into_iter().map(Group::from).collect())
811    }
812
813    async fn get_group_users(&self, group_name: &str) -> DbResult<Vec<User>> {
814        let u = user::Entity::find()
815            .join(JoinType::InnerJoin, user::Relation::GroupUser.def())
816            .join(JoinType::InnerJoin, group_user::Relation::Group.def())
817            .filter(Expr::col((GroupIden::Table, group::Column::Name)).eq(group_name))
818            .all(&self.db_con)
819            .await?;
820
821        Ok(u.into_iter().map(User::from).collect())
822    }
823
824    async fn get_crate_versions(&self, crate_name: &NormalizedName) -> DbResult<Vec<Version>> {
825        let u = crate_meta::Entity::find()
826            .join(JoinType::InnerJoin, crate_meta::Relation::Krate.def())
827            .filter(Expr::col((CrateIden::Table, krate::Column::Name)).eq(crate_name))
828            .all(&self.db_con)
829            .await?;
830
831        Ok(u.into_iter()
832            .map(|meta| Version::from_unchecked_str(&meta.version))
833            .collect())
834    }
835
836    async fn delete_session_token(&self, session_token: &str) -> DbResult<()> {
837        if let Some(s) = session::Entity::find()
838            .filter(session::Column::Token.eq(session_token))
839            .one(&self.db_con)
840            .await?
841        {
842            s.delete(&self.db_con).await?;
843        }
844
845        Ok(())
846    }
847
848    async fn delete_user(&self, user_name: &str) -> DbResult<()> {
849        self.get_user_model(user_name)
850            .await?
851            .delete(&self.db_con)
852            .await?;
853        Ok(())
854    }
855
856    async fn delete_group(&self, group_name: &str) -> DbResult<()> {
857        self.get_group_model(group_name)
858            .await?
859            .delete(&self.db_con)
860            .await?;
861        Ok(())
862    }
863
864    async fn change_pwd(&self, user_name: &str, new_pwd: &str) -> DbResult<()> {
865        let salt = generate_salt();
866        let hashed = hash_pwd(new_pwd, &salt);
867
868        let mut u: user::ActiveModel = self.get_user_model(user_name).await?.into();
869        u.pwd = Set(hashed);
870        u.salt = Set(salt);
871
872        u.update(&self.db_con).await?;
873        Ok(())
874    }
875
876    async fn change_read_only_state(&self, user_name: &str, state: bool) -> DbResult<()> {
877        let mut u: user::ActiveModel = self.get_user_model(user_name).await?.into();
878        u.is_read_only = Set(state);
879
880        u.update(&self.db_con).await?;
881        Ok(())
882    }
883
884    async fn change_admin_state(&self, user_name: &str, state: bool) -> DbResult<()> {
885        let mut u: user::ActiveModel = self.get_user_model(user_name).await?.into();
886        u.is_admin = Set(state);
887
888        u.update(&self.db_con).await?;
889        Ok(())
890    }
891
892    async fn crate_version_exists(&self, crate_id: i64, version: &str) -> DbResult<bool> {
893        let cm = crate_meta::Entity::find()
894            .filter(
895                Cond::all()
896                    .add(crate_meta::Column::CrateFk.eq(crate_id))
897                    .add(crate_meta::Column::Version.eq(version)),
898            )
899            .one(&self.db_con)
900            .await?;
901
902        Ok(cm.is_some())
903    }
904
905    async fn get_max_version_from_id(&self, crate_id: i64) -> DbResult<Version> {
906        operations::get_max_version_from_id(&self.db_con, crate_id).await
907    }
908
909    async fn get_max_version_from_name(&self, crate_name: &NormalizedName) -> DbResult<Version> {
910        let k = self.get_krate_model(crate_name).await?;
911        let v = Version::try_from(&k.max_version)
912            .map_err(|_| DbError::FailedToGetMaxVersionByName(crate_name.to_string()))?;
913        Ok(v)
914    }
915
916    async fn update_max_version(&self, crate_id: i64, version: &Version) -> DbResult<()> {
917        let krate = krate::Entity::find_by_id(crate_id)
918            .one(&self.db_con)
919            .await?
920            .ok_or(DbError::CrateNotFoundWithId(crate_id))?;
921
922        let mut k: krate::ActiveModel = krate.into();
923        k.max_version = Set(version.to_string());
924        k.update(&self.db_con).await?;
925
926        Ok(())
927    }
928
929    async fn add_auth_token(&self, name: &str, token: &str, user: &str) -> DbResult<()> {
930        let hashed_token = hash_token(token);
931        let user = self.get_user_model(user).await?;
932
933        let at = auth_token::ActiveModel {
934            name: Set(name.to_owned()),
935            token: Set(hashed_token),
936            user_fk: Set(user.id),
937            ..Default::default()
938        };
939
940        at.insert(&self.db_con).await?;
941
942        Ok(())
943    }
944
945    async fn get_user_from_token(&self, token: &str) -> DbResult<User> {
946        let token = hash_token(token);
947
948        let u = user::Entity::find()
949            .join(JoinType::InnerJoin, user::Relation::AuthToken.def())
950            .filter(Expr::col((AuthTokenIden::Table, AuthTokenIden::Token)).eq(token))
951            .one(&self.db_con)
952            .await?
953            .ok_or(DbError::TokenNotFound)?;
954
955        Ok(User::from(u))
956    }
957
958    async fn get_user(&self, name: &str) -> DbResult<User> {
959        Ok(User::from(self.get_user_model(name).await?))
960    }
961
962    async fn get_group(&self, name: &str) -> DbResult<Group> {
963        Ok(Group::from(self.get_group_model(name).await?))
964    }
965
966    async fn get_auth_tokens(&self, user_name: &str) -> DbResult<Vec<AuthToken>> {
967        let at: Vec<auth_token::Model> = auth_token::Entity::find()
968            .join(JoinType::InnerJoin, auth_token::Relation::User.def())
969            .filter(user::Column::Name.eq(user_name))
970            .all(&self.db_con)
971            .await?;
972
973        Ok(at.into_iter().map(AuthToken::from).collect())
974    }
975
976    async fn delete_auth_token(&self, id: i32) -> DbResult<()> {
977        auth_token::Entity::delete_by_id(id as i64)
978            .exec(&self.db_con)
979            .await?;
980        Ok(())
981    }
982
983    async fn delete_owner(&self, crate_name: &str, owner: &str) -> DbResult<()> {
984        let model = self.get_owner_by_crate_and_user(crate_name, owner).await?;
985        self.delete_or_not_found(model, DbError::OwnerNotFound(owner.to_string()))
986            .await
987    }
988
989    async fn delete_crate_user(&self, crate_name: &NormalizedName, user: &str) -> DbResult<()> {
990        let model = self
991            .get_crate_user_by_crate_and_user(crate_name, user)
992            .await?;
993        self.delete_or_not_found(model, DbError::UserNotFound(user.to_string()))
994            .await
995    }
996
997    async fn delete_crate_group(&self, crate_name: &NormalizedName, group: &str) -> DbResult<()> {
998        let model = self
999            .get_crate_group_by_name_and_group(crate_name, group)
1000            .await?;
1001        self.delete_or_not_found(model, DbError::GroupNotFound(group.to_string()))
1002            .await
1003    }
1004
1005    async fn delete_group_user(&self, group_name: &str, user: &str) -> DbResult<()> {
1006        let model = self
1007            .get_group_user_by_group_and_user(group_name, user)
1008            .await?;
1009        self.delete_or_not_found(model, DbError::UserNotFound(user.to_string()))
1010            .await
1011    }
1012
1013    async fn add_user(
1014        &self,
1015        name: &str,
1016        pwd: &str,
1017        salt: &str,
1018        is_admin: bool,
1019        is_read_only: bool,
1020    ) -> DbResult<()> {
1021        let hashed_pwd = hash_pwd(pwd, salt);
1022        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
1023
1024        let u = user::ActiveModel {
1025            name: Set(name.to_owned()),
1026            pwd: Set(hashed_pwd),
1027            salt: Set(salt.to_owned()),
1028            is_admin: Set(is_admin),
1029            is_read_only: Set(is_read_only),
1030            created: Set(created),
1031            ..Default::default()
1032        };
1033
1034        u.insert(&self.db_con).await?;
1035        Ok(())
1036    }
1037
1038    async fn add_group(&self, name: &str) -> DbResult<()> {
1039        let g = group::ActiveModel {
1040            name: Set(name.to_owned()),
1041            ..Default::default()
1042        };
1043
1044        g.insert(&self.db_con).await?;
1045        Ok(())
1046    }
1047
1048    async fn get_users(&self) -> DbResult<Vec<User>> {
1049        let users = user::Entity::find()
1050            .order_by_asc(user::Column::Name)
1051            .all(&self.db_con)
1052            .await?;
1053
1054        Ok(users.into_iter().map(User::from).collect())
1055    }
1056
1057    async fn get_groups(&self) -> DbResult<Vec<Group>> {
1058        let groups = group::Entity::find()
1059            .order_by_asc(group::Column::Name)
1060            .all(&self.db_con)
1061            .await?;
1062
1063        Ok(groups.into_iter().map(Group::from).collect())
1064    }
1065
1066    async fn get_total_unique_crates(&self) -> DbResult<u64> {
1067        self.count(
1068            CrateIden::Table,
1069            CrateIden::Id,
1070            DbError::FailedToCountCrates,
1071        )
1072        .await
1073    }
1074
1075    async fn get_total_crate_versions(&self) -> DbResult<u64> {
1076        self.count(
1077            CrateMetaIden::Table,
1078            CrateMetaIden::Id,
1079            DbError::FailedToCountCrateVersions,
1080        )
1081        .await
1082    }
1083
1084    async fn get_total_downloads(&self) -> DbResult<u64> {
1085        #[derive(FromQueryResult)]
1086        struct Model {
1087            total_downloads: i64,
1088        }
1089
1090        Ok(krate::Entity::find()
1091            .select_only()
1092            .column(krate::Column::TotalDownloads)
1093            .into_model::<Model>()
1094            .all(&self.db_con)
1095            .await?
1096            .into_iter()
1097            .map(|m| m.total_downloads as u64)
1098            .sum())
1099    }
1100
1101    async fn get_top_crates_downloads(&self, top: u64) -> DbResult<Vec<(String, u64)>> {
1102        #[derive(Debug, PartialEq, FromQueryResult)]
1103        struct SelectResult {
1104            original_name: String,
1105            total_downloads: i64,
1106        }
1107
1108        let stmt = self.db_con.get_database_backend().build(
1109            Query::select()
1110                .columns(vec![CrateIden::OriginalName, CrateIden::TotalDownloads])
1111                .from(CrateIden::Table)
1112                .order_by(CrateIden::TotalDownloads, Order::Desc)
1113                .limit(top),
1114        );
1115
1116        Ok(SelectResult::find_by_statement(stmt)
1117            .all(&self.db_con)
1118            .await?
1119            .into_iter()
1120            .map(|x| (x.original_name, x.total_downloads as u64))
1121            .collect())
1122    }
1123
1124    async fn get_crate_summaries(&self) -> DbResult<Vec<CrateSummary>> {
1125        let krates = krate::Entity::find()
1126            .order_by(krate::Column::Name, Order::Asc)
1127            .all(&self.db_con)
1128            .await?;
1129
1130        Ok(krates.into_iter().map(CrateSummary::from).collect())
1131    }
1132
1133    async fn add_doc_queue(
1134        &self,
1135        krate: &NormalizedName,
1136        version: &Version,
1137        path: &Path,
1138    ) -> DbResult<()> {
1139        let s = doc_queue::ActiveModel {
1140            krate: Set(krate.to_string()),
1141            version: Set(version.to_string()),
1142            // FIXME: Convert Path to String properly, handle errors
1143            path: Set(path.to_string_lossy().to_string()),
1144            ..Default::default()
1145        };
1146
1147        s.insert(&self.db_con).await?;
1148        Ok(())
1149    }
1150
1151    async fn delete_doc_queue(&self, id: i64) -> DbResult<()> {
1152        DocQueue::delete_by_id(id).exec(&self.db_con).await?;
1153        Ok(())
1154    }
1155
1156    async fn get_doc_queue(&self) -> DbResult<Vec<DocQueueEntry>> {
1157        let entities = DocQueue::find().all(&self.db_con).await?;
1158
1159        Ok(entities.into_iter().map(DocQueueEntry::from).collect())
1160    }
1161
1162    async fn delete_crate(&self, krate: &NormalizedName, version: &Version) -> DbResult<()> {
1163        let txn = self.db_con.begin().await?;
1164
1165        // Delete the entry from the "crate_meta" table
1166        let crate_meta_version = crate_meta::Entity::find()
1167            .join(JoinType::InnerJoin, crate_meta::Relation::Krate.def())
1168            .filter(krate::Column::Name.eq(krate))
1169            .filter(crate_meta::Column::Version.eq(version))
1170            .one(&txn)
1171            .await?
1172            .ok_or_else(|| DbError::CrateMetaNotFound(krate.to_string(), version.to_string()))?;
1173        let crate_id = crate_meta_version.crate_fk;
1174        let current_max_version = operations::get_max_version_from_id(&txn, crate_id).await?;
1175        crate_meta_version.delete(&txn).await?;
1176
1177        // Delete the crate index entry from "crate_index" table
1178        let crate_index_version = crate_index::Entity::find()
1179            .join(JoinType::InnerJoin, crate_index::Relation::Krate.def())
1180            .filter(krate::Column::Name.eq(krate))
1181            .filter(crate_index::Column::Vers.eq(version))
1182            .one(&txn)
1183            .await?
1184            .ok_or_else(|| DbError::CrateIndexNotFound(krate.to_string(), version.to_string()))?;
1185        crate_index_version.delete(&txn).await?;
1186
1187        // If it was the last entry in the "crate_meta" table, delete the entry
1188        // in the "crate" table as well
1189        let crate_meta_rows = crate_meta::Entity::find()
1190            .join(JoinType::InnerJoin, crate_meta::Relation::Krate.def())
1191            .filter(krate::Column::Name.eq(krate))
1192            .all(&txn)
1193            .await?;
1194
1195        if crate_meta_rows.is_empty() {
1196            krate::Entity::delete_many()
1197                .filter(krate::Column::Name.eq(krate))
1198                .exec(&txn)
1199                .await?;
1200        } else {
1201            let c = krate::Entity::find_by_id(crate_id)
1202                .one(&txn)
1203                .await?
1204                .ok_or(DbError::CrateNotFoundWithId(crate_id))?;
1205            let mut c: krate::ActiveModel = c.into();
1206
1207            // Update the max. version if the deleted version was the max. version.
1208            if version == &current_max_version {
1209                let new_max_version = crate_meta_rows
1210                    .into_iter()
1211                    .map(|cm| Version::from_unchecked_str(&cm.version))
1212                    .max()
1213                    .unwrap(); // Safe to unwrap, as crate_meta_rows is not empty
1214                c.max_version = Set(new_max_version.into_inner());
1215            }
1216            // Update the ETag value of the crate index.
1217            let etag = operations::compute_etag(&txn, krate, crate_id).await?;
1218            c.e_tag = Set(etag);
1219            c.update(&txn).await?;
1220        }
1221
1222        txn.commit().await?;
1223
1224        Ok(())
1225    }
1226
1227    async fn get_crate_meta_list(&self, crate_name: &NormalizedName) -> DbResult<Vec<CrateMeta>> {
1228        let crate_meta = crate_meta::Entity::find()
1229            .join(JoinType::InnerJoin, crate_meta::Relation::Krate.def())
1230            .filter(krate::Column::Name.eq(crate_name))
1231            .all(&self.db_con)
1232            .await?;
1233
1234        let crate_meta = crate_meta
1235            .into_iter()
1236            .map(|cm| CrateMeta {
1237                name: crate_name.to_string(),
1238                id: cm.id,
1239                version: cm.version,
1240                created: cm.created,
1241                downloads: cm.downloads,
1242                crate_fk: cm.crate_fk,
1243            })
1244            .collect();
1245
1246        Ok(crate_meta)
1247    }
1248
1249    async fn update_last_updated(&self, id: i64, last_updated: &DateTime<Utc>) -> DbResult<()> {
1250        let krate = krate::Entity::find_by_id(id)
1251            .one(&self.db_con)
1252            .await?
1253            .ok_or(DbError::CrateNotFoundWithId(id))?;
1254
1255        let date = last_updated.format(DB_DATE_FORMAT).to_string();
1256
1257        let mut krate: krate::ActiveModel = krate.into();
1258        krate.last_updated = Set(date);
1259        krate.update(&self.db_con).await?;
1260
1261        Ok(())
1262    }
1263
1264    async fn search_in_crate_name(
1265        &self,
1266        contains: &str,
1267        cache: bool,
1268    ) -> DbResult<Vec<CrateOverview>> {
1269        self.query_crates(Some(contains), None, cache).await
1270    }
1271
1272    async fn get_crate_overview_list(
1273        &self,
1274        limit: u64,
1275        offset: u64,
1276        cache: bool,
1277    ) -> DbResult<Vec<CrateOverview>> {
1278        self.query_crates(None, Some((limit, offset)), cache).await
1279    }
1280
1281    async fn get_crate_data(&self, crate_name: &NormalizedName) -> DbResult<CrateData> {
1282        let krate = self.get_krate_model(crate_name).await?;
1283
1284        let owners: Vec<String> = krate
1285            .find_related(owner::Entity)
1286            .find_also_related(user::Entity)
1287            .all(&self.db_con)
1288            .await?
1289            .into_iter()
1290            .filter_map(|(_, v)| v.map(|v| v.name))
1291            .collect();
1292        let categories: Vec<String> = krate
1293            .find_related(crate_category_to_crate::Entity)
1294            .find_also_related(crate_category::Entity)
1295            .all(&self.db_con)
1296            .await?
1297            .into_iter()
1298            .filter_map(|(_, v)| v.map(|v| v.category))
1299            .collect();
1300        let keywords: Vec<String> = krate
1301            .find_related(crate_keyword_to_crate::Entity)
1302            .find_also_related(crate_keyword::Entity)
1303            .all(&self.db_con)
1304            .await?
1305            .into_iter()
1306            .filter_map(|(_, v)| v.map(|v| v.keyword))
1307            .collect();
1308        let authors: Vec<String> = krate
1309            .find_related(crate_author_to_crate::Entity)
1310            .find_also_related(crate_author::Entity)
1311            .all(&self.db_con)
1312            .await?
1313            .into_iter()
1314            .filter_map(|(_, v)| v.map(|v| v.author))
1315            .collect();
1316        let crate_metas = krate
1317            .find_related(crate_meta::Entity)
1318            .all(&self.db_con)
1319            .await?;
1320        let crate_indices = krate
1321            .find_related(crate_index::Entity)
1322            .all(&self.db_con)
1323            .await?;
1324
1325        let mut versions = Vec::new();
1326        for cm in crate_metas {
1327            let ci = crate_indices
1328                .iter()
1329                .find(|ci| ci.vers == cm.version)
1330                .ok_or_else(|| {
1331                    DbError::CrateIndexNotFound(krate.name.clone(), cm.version.clone())
1332                })?;
1333            let dependencies: Vec<CrateRegistryDep> = match ci.deps.clone() {
1334                Some(deps) => {
1335                    let ix = serde_json::from_value::<Vec<IndexDep>>(deps)
1336                        .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?;
1337
1338                    let mut ft = Vec::new();
1339                    for dep in ix {
1340                        ft.push(CrateRegistryDep::from_index(
1341                            operations::get_desc_for_crate_dep(
1342                                &self.db_con,
1343                                &dep.name,
1344                                dep.registry.as_deref(),
1345                            )
1346                            .await?,
1347                            dep,
1348                        ));
1349                    }
1350
1351                    ft
1352                }
1353                None => Vec::default(),
1354            };
1355            let features: BTreeMap<String, Vec<String>> = match ci.features.clone() {
1356                Some(features) => serde_json::from_value::<BTreeMap<String, Vec<String>>>(features)
1357                    .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?,
1358                None => BTreeMap::default(),
1359            };
1360
1361            versions.push(CrateVersionData {
1362                version: cm.version,
1363                created: cm.created,
1364                downloads: cm.downloads,
1365                readme: cm.readme,
1366                license: cm.license,
1367                license_file: cm.license_file,
1368                documentation: cm.documentation,
1369                dependencies,
1370                checksum: ci.cksum.clone(),
1371                features,
1372                yanked: ci.yanked,
1373                links: ci.links.clone(),
1374                v: ci.v,
1375            });
1376        }
1377        versions.sort_by(|a, b| {
1378            Version::from_unchecked_str(&b.version).cmp(&Version::from_unchecked_str(&a.version))
1379        });
1380
1381        let crate_data = CrateData {
1382            name: krate.original_name,
1383            owners,
1384            max_version: krate.max_version,
1385            total_downloads: krate.total_downloads,
1386            last_updated: krate.last_updated,
1387            homepage: krate.homepage,
1388            description: krate.description,
1389            repository: krate.repository,
1390            categories,
1391            keywords,
1392            authors,
1393            versions,
1394        };
1395
1396        Ok(crate_data)
1397    }
1398
1399    async fn add_empty_crate(&self, name: &str, created: &DateTime<Utc>) -> DbResult<i64> {
1400        let created = created.format(DB_DATE_FORMAT).to_string();
1401        let normalized_name = NormalizedName::from(
1402            OriginalName::try_from(name)
1403                .map_err(|_| DbError::InvalidCrateName(name.to_string()))?,
1404        );
1405        let krate = krate::ActiveModel {
1406            id: ActiveValue::default(),
1407            name: Set(normalized_name.to_string()),
1408            original_name: Set(name.to_string()),
1409            max_version: Set("0.0.0".to_string()),
1410            last_updated: Set(created),
1411            total_downloads: Set(0),
1412            homepage: Set(None),
1413            description: Set(None),
1414            repository: Set(None),
1415            e_tag: Set(String::new()), // Set to empty string, as it can be computed, when the crate index is inserted
1416            restricted_download: Set(false),
1417        };
1418        Ok(krate.insert(&self.db_con).await?.id)
1419    }
1420
1421    async fn add_crate(
1422        &self,
1423        pub_metadata: &PublishMetadata,
1424        cksum: &str,
1425        created: &DateTime<Utc>,
1426        owner: &str,
1427    ) -> DbResult<i64> {
1428        let created = created.format(DB_DATE_FORMAT).to_string();
1429        let normalized_name = NormalizedName::from(
1430            OriginalName::try_from(&pub_metadata.name)
1431                .map_err(|_| DbError::InvalidCrateName(pub_metadata.name.clone()))?,
1432        );
1433
1434        let existing = krate::Entity::find()
1435            .filter(krate::Column::Name.eq(&pub_metadata.name))
1436            .one(&self.db_con)
1437            .await?;
1438
1439        let txn = self.db_con.begin().await?;
1440
1441        let crate_id = if let Some(krate) = existing {
1442            let krate_id = krate.id;
1443            let max_version = max(
1444                parse_db_version(&krate.max_version)?,
1445                parse_db_version(&pub_metadata.vers)?,
1446            );
1447
1448            let mut krate: krate::ActiveModel = krate.into();
1449            krate.last_updated = Set(created.clone());
1450            krate.max_version = Set(max_version.into_inner());
1451            krate.homepage = Set(pub_metadata.homepage.clone());
1452            krate.description = Set(pub_metadata.description.clone());
1453            krate.repository = Set(pub_metadata.repository.clone());
1454            krate.e_tag = Set(String::new()); // Set to empty string, as it can be computed, when the crate index is inserted
1455            krate.update(&txn).await?;
1456            krate_id
1457        } else {
1458            let krate = krate::ActiveModel {
1459                id: ActiveValue::default(),
1460                name: Set(normalized_name.to_string()),
1461                original_name: Set(pub_metadata.name.clone()),
1462                max_version: Set(pub_metadata.vers.clone()),
1463                last_updated: Set(created.clone()),
1464                total_downloads: Set(0),
1465                homepage: Set(pub_metadata.homepage.clone()),
1466                description: Set(pub_metadata.description.clone()),
1467                repository: Set(pub_metadata.repository.clone()),
1468                e_tag: Set(String::new()), // Set to empty string, as it can be computed, when the crate index is inserted
1469                restricted_download: Set(false),
1470            };
1471            let krate = krate.insert(&txn).await?;
1472            krate.id
1473        };
1474
1475        operations::add_owner_if_not_exists(&txn, owner, crate_id).await?;
1476        operations::add_crate_metadata(&txn, pub_metadata, &created, crate_id).await?;
1477        operations::add_crate_index(&txn, pub_metadata, cksum, crate_id).await?;
1478        operations::update_etag(&txn, &pub_metadata.name, crate_id).await?;
1479        operations::update_crate_categories(&txn, pub_metadata, crate_id).await?;
1480        operations::update_crate_keywords(&txn, pub_metadata, crate_id).await?;
1481        operations::update_crate_authors(&txn, pub_metadata, crate_id).await?;
1482
1483        txn.commit().await?;
1484        Ok(crate_id)
1485    }
1486
1487    async fn update_docs_link(
1488        &self,
1489        crate_name: &NormalizedName,
1490        version: &Version,
1491        docs_link: &str,
1492    ) -> DbResult<()> {
1493        let (cm, _c) = crate_meta::Entity::find()
1494            .find_also_related(krate::Entity)
1495            .filter(
1496                Cond::all()
1497                    .add(krate::Column::Name.eq(crate_name))
1498                    .add(crate_meta::Column::Version.eq(version)),
1499            )
1500            .one(&self.db_con)
1501            .await?
1502            .ok_or_else(|| DbError::CrateNotFound(crate_name.to_string()))?;
1503
1504        let mut cm: crate_meta::ActiveModel = cm.into();
1505        cm.documentation = Set(Some(docs_link.to_string()));
1506        cm.update(&self.db_con).await?;
1507        Ok(())
1508    }
1509
1510    async fn add_crate_metadata(
1511        &self,
1512        pub_metadata: &PublishMetadata,
1513        created: &str,
1514        crate_id: i64,
1515    ) -> DbResult<()> {
1516        operations::add_crate_metadata(&self.db_con, pub_metadata, created, crate_id).await
1517    }
1518
1519    async fn get_prefetch_data(&self, crate_name: &str) -> DbResult<Prefetch> {
1520        let mut krate = krate::Entity::find()
1521            .filter(krate::Column::Name.eq(crate_name))
1522            .find_with_related(crate_index::Entity)
1523            .all(&self.db_con)
1524            .await?
1525            .into_iter();
1526
1527        // Expecting only one crate with its related indices
1528        let (Some((krate, crate_indices)), None) = (krate.next(), krate.next()) else {
1529            return Err(DbError::CrateNotFound(crate_name.to_string()));
1530        };
1531
1532        let index_metadata =
1533            operations::crate_index_model_to_index_metadata(crate_name, crate_indices)?;
1534        let data = operations::index_metadata_to_bytes(&index_metadata)?;
1535
1536        Ok(Prefetch {
1537            data,
1538            etag: krate.e_tag,
1539            last_modified: krate.last_updated,
1540        })
1541    }
1542
1543    async fn is_cratesio_cache_up_to_date(
1544        &self,
1545        crate_name: &NormalizedName,
1546        etag: Option<String>,
1547        last_modified: Option<String>,
1548    ) -> DbResult<PrefetchState> {
1549        let Some(krate) = cratesio_crate::Entity::find()
1550            .filter(cratesio_crate::Column::Name.eq(crate_name))
1551            .one(&self.db_con)
1552            .await?
1553        else {
1554            return Ok(PrefetchState::NotFound);
1555        };
1556
1557        let needs_update = match (etag, last_modified) {
1558            (Some(etag), Some(last_modified)) => {
1559                krate.e_tag != etag || krate.last_modified != last_modified
1560            }
1561            (Some(etag), None) => krate.e_tag != etag,
1562            (None, Some(last_modified)) => krate.last_modified != last_modified,
1563            (None, None) => true,
1564        };
1565
1566        if !needs_update {
1567            Ok(PrefetchState::UpToDate)
1568        } else {
1569            let crate_indices = krate
1570                .find_related(cratesio_index::Entity)
1571                .all(&self.db_con)
1572                .await?;
1573            let index_metadata =
1574                operations::cratesio_index_model_to_index_metadata(crate_name, crate_indices)?;
1575            let data = operations::index_metadata_to_bytes(&index_metadata)?;
1576
1577            Ok(PrefetchState::NeedsUpdate(Prefetch {
1578                data,
1579                etag: krate.e_tag.clone(),
1580                last_modified: krate.last_modified,
1581            }))
1582        }
1583    }
1584
1585    async fn add_cratesio_prefetch_data(
1586        &self,
1587        crate_name: &OriginalName,
1588        etag: &str,
1589        last_modified: &str,
1590        description: Option<String>,
1591        indices: &[IndexMetadata],
1592    ) -> DbResult<Prefetch> {
1593        let normalized_name = crate_name.to_normalized();
1594
1595        let max_version = indices
1596            .iter()
1597            .map(|i| Version::from_unchecked_str(&i.vers))
1598            .max()
1599            .ok_or_else(|| DbError::FailedToGetMaxVersionByName(crate_name.to_string()))?;
1600
1601        // Use a transaction to ensure the etag update and all index inserts are
1602        // atomic. Without this, a partial failure would leave the crate record
1603        // with the latest etag but incomplete index data, causing stale cache
1604        // entries that can never self-heal (the background updater would get 304
1605        // from crates.io because the etag matches).
1606        let txn = self.db_con.begin().await?;
1607
1608        let krate = cratesio_crate::Entity::find()
1609            .filter(cratesio_crate::Column::Name.eq(&normalized_name))
1610            .one(&txn)
1611            .await?;
1612
1613        let krate = if let Some(krate) = krate {
1614            let mut krate: cratesio_crate::ActiveModel = krate.into();
1615            krate.e_tag = Set(etag.to_string());
1616            krate.last_modified = Set(last_modified.to_string());
1617            krate.max_version = Set(max_version.into_inner());
1618            krate.update(&txn).await?
1619        } else {
1620            let krate = cratesio_crate::ActiveModel {
1621                id: ActiveValue::default(),
1622                name: Set(normalized_name.to_string()),
1623                original_name: Set(crate_name.to_string()),
1624                description: Set(description),
1625                e_tag: Set(etag.to_string()),
1626                last_modified: Set(last_modified.to_string()),
1627                total_downloads: Set(0),
1628                max_version: Set(max_version.to_string()),
1629            };
1630            krate.insert(&txn).await?
1631        };
1632
1633        let current_indices = cratesio_index::Entity::find()
1634            .filter(cratesio_index::Column::CratesIoFk.eq(krate.id))
1635            .all(&txn)
1636            .await?;
1637
1638        for index in indices {
1639            // Check if the version was yanked or un-yanked and update if so.
1640            if let Some(current_index) = current_indices.iter().find(|ci| index.vers == ci.vers) {
1641                if index.yanked != current_index.yanked {
1642                    let mut ci: cratesio_index::ActiveModel = current_index.to_owned().into();
1643                    ci.yanked = Set(index.yanked);
1644                    ci.update(&txn).await?;
1645                }
1646            } else {
1647                let deps = if index.deps.is_empty() {
1648                    None
1649                } else {
1650                    let deps = serde_json::to_value(&index.deps)
1651                        .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?;
1652                    Some(deps)
1653                };
1654
1655                let features = serde_json::to_value(&index.features)
1656                    .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?;
1657
1658                let features2 = serde_json::to_value(&index.features2)
1659                    .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?;
1660
1661                let new_index = cratesio_index::ActiveModel {
1662                    id: ActiveValue::default(),
1663                    name: Set(index.name.clone()),
1664                    vers: Set(index.vers.clone()),
1665                    deps: Set(deps),
1666                    cksum: Set(index.cksum.clone()),
1667                    features: Set(Some(features)),
1668                    features2: Set(Some(features2)),
1669                    yanked: Set(index.yanked),
1670                    links: Set(index.links.clone()),
1671                    pubtime: Set(index.pubtime.map(|dt| dt.naive_utc())),
1672                    v: Set(index.v.unwrap_or(1) as i32),
1673                    crates_io_fk: Set(krate.id),
1674                };
1675
1676                new_index.insert(&txn).await?;
1677
1678                // Add the meta data for the crate version.
1679                let meta = cratesio_meta::ActiveModel {
1680                    id: ActiveValue::default(),
1681                    version: Set(index.vers.clone()),
1682                    downloads: Set(0),
1683                    crates_io_fk: Set(krate.id),
1684                    documentation: Set(Some(format!(
1685                        "https://docs.rs/{normalized_name}/{}",
1686                        index.vers,
1687                    ))),
1688                };
1689
1690                meta.insert(&txn).await?;
1691            }
1692        }
1693
1694        txn.commit().await?;
1695
1696        Ok(Prefetch {
1697            data: operations::index_metadata_to_bytes(indices)?,
1698            etag: etag.to_string(),
1699            last_modified: last_modified.to_string(),
1700        })
1701    }
1702
1703    async fn get_cratesio_index_update_list(&self) -> DbResult<Vec<CratesioPrefetchMsg>> {
1704        let crates = cratesio_crate::Entity::find().all(&self.db_con).await?;
1705        let msgs = crates
1706            .into_iter()
1707            .map(|krate| {
1708                CratesioPrefetchMsg::Update(UpdateData {
1709                    name: OriginalName::from_unchecked(krate.original_name),
1710                    etag: Some(krate.e_tag),
1711                    last_modified: Some(krate.last_modified),
1712                })
1713            })
1714            .collect();
1715        Ok(msgs)
1716    }
1717
1718    async fn unyank_crate(&self, crate_name: &NormalizedName, version: &Version) -> DbResult<()> {
1719        let mut ci: crate_index::ActiveModel = self
1720            .get_crate_index_model(crate_name, version)
1721            .await?
1722            .into();
1723        ci.yanked = Set(false);
1724        ci.save(&self.db_con).await?;
1725        Ok(())
1726    }
1727
1728    async fn yank_crate(&self, crate_name: &NormalizedName, version: &Version) -> DbResult<()> {
1729        let mut ci: crate_index::ActiveModel = self
1730            .get_crate_index_model(crate_name, version)
1731            .await?
1732            .into();
1733        ci.yanked = Set(true);
1734        ci.save(&self.db_con).await?;
1735        Ok(())
1736    }
1737
1738    async fn register_webhook(&self, webhook: Webhook) -> DbResult<String> {
1739        let w = webhook::ActiveModel {
1740            event: Set(Into::<&str>::into(webhook.event).to_string()),
1741            callback_url: Set(webhook.callback_url),
1742            name: Set(webhook.name),
1743            ..Default::default()
1744        };
1745
1746        let w: webhook::Model = w.insert(&self.db_con).await?;
1747        Ok(w.id.to_string())
1748    }
1749
1750    async fn delete_webhook(&self, id: &str) -> DbResult<()> {
1751        self.get_webhook_model(id)
1752            .await?
1753            .delete(&self.db_con)
1754            .await?;
1755        Ok(())
1756    }
1757
1758    async fn get_webhook(&self, id: &str) -> DbResult<Webhook> {
1759        webhook_model_to_obj(self.get_webhook_model(id).await?)
1760    }
1761
1762    async fn get_all_webhooks(&self) -> DbResult<Vec<Webhook>> {
1763        Ok(webhook::Entity::find()
1764            .all(&self.db_con)
1765            .await?
1766            .into_iter()
1767            .filter_map(|w| webhook_model_to_obj(w).ok())
1768            .collect())
1769    }
1770
1771    async fn add_webhook_queue(
1772        &self,
1773        event: WebhookEvent,
1774        payload: serde_json::Value,
1775    ) -> DbResult<()> {
1776        let w = webhook::Entity::find()
1777            .filter(webhook::Column::Event.eq(Into::<&str>::into(event)))
1778            .all(&self.db_con)
1779            .await?;
1780
1781        if w.is_empty() {
1782            return Ok(());
1783        }
1784
1785        let now = Utc::now();
1786
1787        let entries = w.into_iter().map(|w| webhook_queue::ActiveModel {
1788            webhook_fk: Set(w.id),
1789            payload: Set(payload.clone()),
1790            next_attempt: Set(now.into()),
1791            last_attempt: Set(None),
1792            ..Default::default()
1793        });
1794
1795        webhook_queue::Entity::insert_many(entries)
1796            .exec(&self.db_con)
1797            .await?;
1798        Ok(())
1799    }
1800    async fn get_pending_webhook_queue_entries(
1801        &self,
1802        timestamp: DateTime<Utc>,
1803    ) -> DbResult<Vec<WebhookQueue>> {
1804        let w = webhook_queue::Entity::find()
1805            .find_with_related(webhook::Entity)
1806            .filter(webhook_queue::Column::NextAttempt.lte(timestamp))
1807            .all(&self.db_con)
1808            .await?;
1809
1810        Ok(w.into_iter()
1811            .filter_map(|w| {
1812                Some(WebhookQueue {
1813                    id: w.0.id.to_string(),
1814                    callback_url: w.1.first()?.callback_url.clone(),
1815                    payload: w.0.payload,
1816                    last_attempt: w.0.last_attempt.map(Into::into),
1817                    next_attempt: w.0.next_attempt.into(),
1818                })
1819            })
1820            .collect())
1821    }
1822
1823    async fn update_webhook_queue(
1824        &self,
1825        id: &str,
1826        last_attempt: DateTime<Utc>,
1827        next_attempt: DateTime<Utc>,
1828    ) -> DbResult<()> {
1829        let mut w: webhook_queue::ActiveModel = self.get_webhook_queue_model(id).await?.into();
1830        w.last_attempt = Set(Some(last_attempt.into()));
1831        w.next_attempt = Set(next_attempt.into());
1832        w.update(&self.db_con).await?;
1833        Ok(())
1834    }
1835
1836    async fn delete_webhook_queue(&self, id: &str) -> DbResult<()> {
1837        self.get_webhook_queue_model(id)
1838            .await?
1839            .delete(&self.db_con)
1840            .await?;
1841        Ok(())
1842    }
1843
1844    // OAuth2 identity methods
1845
1846    async fn get_user_by_oauth2_identity(
1847        &self,
1848        issuer: &str,
1849        subject: &str,
1850    ) -> DbResult<Option<User>> {
1851        let identity = oauth2_identity::Entity::find()
1852            .filter(oauth2_identity::Column::ProviderIssuer.eq(issuer))
1853            .filter(oauth2_identity::Column::Subject.eq(subject))
1854            .one(&self.db_con)
1855            .await?;
1856
1857        if let Some(identity) = identity {
1858            let u = user::Entity::find_by_id(identity.user_fk)
1859                .one(&self.db_con)
1860                .await?
1861                .ok_or_else(|| DbError::UserNotFound(format!("user_id={}", identity.user_fk)))?;
1862
1863            Ok(Some(User::from(u)))
1864        } else {
1865            Ok(None)
1866        }
1867    }
1868
1869    async fn create_oauth2_user(
1870        &self,
1871        username: &str,
1872        issuer: &str,
1873        subject: &str,
1874        email: Option<String>,
1875        is_admin: bool,
1876        is_read_only: bool,
1877    ) -> DbResult<User> {
1878        // Use a transaction to ensure atomicity
1879        let txn = self.db_con.begin().await?;
1880
1881        // Generate a random password and salt for OAuth2 users
1882        // They won't use password auth, but we need valid values
1883        let salt = generate_salt();
1884        let random_pwd = Uuid::new_v4().to_string();
1885        let hashed_pwd = hash_pwd(&random_pwd, &salt);
1886        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
1887
1888        // Create the user
1889        let new_user = user::ActiveModel {
1890            name: Set(username.to_string()),
1891            pwd: Set(hashed_pwd),
1892            salt: Set(salt),
1893            is_admin: Set(is_admin),
1894            is_read_only: Set(is_read_only),
1895            created: Set(created.clone()),
1896            ..Default::default()
1897        };
1898
1899        let res = user::Entity::insert(new_user).exec(&txn).await?;
1900        let user_id = res.last_insert_id;
1901
1902        // Link the OAuth2 identity
1903        let identity = oauth2_identity::ActiveModel {
1904            user_fk: Set(user_id),
1905            provider_issuer: Set(issuer.to_string()),
1906            subject: Set(subject.to_string()),
1907            email: Set(email),
1908            created: Set(created.clone()),
1909            ..Default::default()
1910        };
1911
1912        oauth2_identity::Entity::insert(identity).exec(&txn).await?;
1913
1914        txn.commit().await?;
1915
1916        Ok(User {
1917            id: user_id as i32,
1918            name: username.to_string(),
1919            pwd: String::new(),  // Don't expose password hash
1920            salt: String::new(), // Don't expose salt
1921            is_admin,
1922            is_read_only,
1923            created,
1924        })
1925    }
1926
1927    async fn link_oauth2_identity(
1928        &self,
1929        user_id: i64,
1930        issuer: &str,
1931        subject: &str,
1932        email: Option<String>,
1933    ) -> DbResult<()> {
1934        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
1935        let identity = oauth2_identity::ActiveModel {
1936            user_fk: Set(user_id),
1937            provider_issuer: Set(issuer.to_string()),
1938            subject: Set(subject.to_string()),
1939            email: Set(email),
1940            created: Set(created),
1941            ..Default::default()
1942        };
1943
1944        oauth2_identity::Entity::insert(identity)
1945            .exec(&self.db_con)
1946            .await?;
1947
1948        Ok(())
1949    }
1950
1951    // OAuth2 state methods (CSRF/PKCE during auth flow)
1952
1953    async fn store_oauth2_state(
1954        &self,
1955        state: &str,
1956        pkce_verifier: &str,
1957        nonce: &str,
1958    ) -> DbResult<()> {
1959        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
1960        let s = oauth2_state::ActiveModel {
1961            state: Set(state.to_string()),
1962            pkce_verifier: Set(pkce_verifier.to_string()),
1963            nonce: Set(nonce.to_string()),
1964            created: Set(created),
1965            ..Default::default()
1966        };
1967
1968        oauth2_state::Entity::insert(s).exec(&self.db_con).await?;
1969        Ok(())
1970    }
1971
1972    async fn get_and_delete_oauth2_state(&self, state: &str) -> DbResult<Option<OAuth2StateData>> {
1973        let s = oauth2_state::Entity::find()
1974            .filter(oauth2_state::Column::State.eq(state))
1975            .one(&self.db_con)
1976            .await?;
1977
1978        if let Some(s) = s {
1979            let data = OAuth2StateData {
1980                state: s.state.clone(),
1981                pkce_verifier: s.pkce_verifier.clone(),
1982                nonce: s.nonce.clone(),
1983            };
1984
1985            // Delete after retrieving (single use)
1986            s.delete(&self.db_con).await?;
1987
1988            Ok(Some(data))
1989        } else {
1990            Ok(None)
1991        }
1992    }
1993
1994    async fn cleanup_expired_oauth2_states(&self) -> DbResult<u64> {
1995        // States older than 10 minutes are expired
1996        let expiry = Utc::now() - chrono::Duration::minutes(10);
1997        let expiry_str = expiry.format(DB_DATE_FORMAT).to_string();
1998
1999        let result = oauth2_state::Entity::delete_many()
2000            .filter(oauth2_state::Column::Created.lt(expiry_str))
2001            .exec(&self.db_con)
2002            .await?;
2003
2004        Ok(result.rows_affected)
2005    }
2006
2007    async fn is_username_available(&self, username: &str) -> DbResult<bool> {
2008        let existing = user::Entity::find()
2009            .filter(user::Column::Name.eq(username))
2010            .one(&self.db_con)
2011            .await?;
2012
2013        Ok(existing.is_none())
2014    }
2015
2016    // Toolchain distribution methods
2017
2018    async fn add_toolchain(
2019        &self,
2020        name: &str,
2021        version: &str,
2022        date: &str,
2023        channel: Option<String>,
2024    ) -> DbResult<i64> {
2025        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
2026
2027        let model = toolchain::ActiveModel {
2028            id: ActiveValue::NotSet,
2029            name: Set(name.to_string()),
2030            version: Set(version.to_string()),
2031            date: Set(date.to_string()),
2032            channel: Set(channel),
2033            created: Set(created),
2034        };
2035
2036        let result = model.insert(&self.db_con).await?;
2037        Ok(result.id)
2038    }
2039
2040    async fn add_toolchain_target(
2041        &self,
2042        toolchain_id: i64,
2043        target: &str,
2044        storage_path: &str,
2045        hash: &str,
2046        size: i64,
2047    ) -> DbResult<()> {
2048        let model = toolchain_target::ActiveModel {
2049            id: ActiveValue::NotSet,
2050            toolchain_fk: Set(toolchain_id),
2051            target: Set(target.to_string()),
2052            storage_path: Set(storage_path.to_string()),
2053            hash: Set(hash.to_string()),
2054            size: Set(size),
2055        };
2056
2057        model.insert(&self.db_con).await?;
2058        Ok(())
2059    }
2060
2061    async fn get_toolchain_by_channel(
2062        &self,
2063        channel: &str,
2064    ) -> DbResult<Option<ToolchainWithTargets>> {
2065        let tc = toolchain::Entity::find()
2066            .filter(toolchain::Column::Channel.eq(channel))
2067            .one(&self.db_con)
2068            .await?;
2069
2070        Ok(match tc {
2071            Some(tc) => Some(self.toolchain_with_targets(tc).await?),
2072            None => None,
2073        })
2074    }
2075
2076    async fn get_toolchain_by_version(
2077        &self,
2078        name: &str,
2079        version: &str,
2080    ) -> DbResult<Option<ToolchainWithTargets>> {
2081        Ok(
2082            match self.get_toolchain_by_name_version(name, version).await? {
2083                Some(tc) => Some(self.toolchain_with_targets(tc).await?),
2084                None => None,
2085            },
2086        )
2087    }
2088
2089    async fn list_toolchains(&self) -> DbResult<Vec<ToolchainWithTargets>> {
2090        let toolchains = toolchain::Entity::find()
2091            .order_by_desc(toolchain::Column::Created)
2092            .all(&self.db_con)
2093            .await?;
2094
2095        let mut result = Vec::with_capacity(toolchains.len());
2096
2097        for tc in toolchains {
2098            result.push(self.toolchain_with_targets(tc).await?);
2099        }
2100
2101        Ok(result)
2102    }
2103
2104    async fn delete_toolchain(&self, name: &str, version: &str) -> DbResult<()> {
2105        toolchain::Entity::delete_many()
2106            .filter(toolchain::Column::Name.eq(name))
2107            .filter(toolchain::Column::Version.eq(version))
2108            .exec(&self.db_con)
2109            .await?;
2110
2111        Ok(())
2112    }
2113
2114    async fn delete_toolchain_target(
2115        &self,
2116        name: &str,
2117        version: &str,
2118        target: &str,
2119    ) -> DbResult<()> {
2120        if let Some(tc) = self.get_toolchain_by_name_version(name, version).await? {
2121            toolchain_target::Entity::delete_many()
2122                .filter(toolchain_target::Column::ToolchainFk.eq(tc.id))
2123                .filter(toolchain_target::Column::Target.eq(target))
2124                .exec(&self.db_con)
2125                .await?;
2126        }
2127
2128        Ok(())
2129    }
2130
2131    async fn set_channel(&self, channel: &str, name: &str, version: &str) -> DbResult<()> {
2132        // First, clear the channel from any other toolchain that has it
2133        let toolchains_with_channel = toolchain::Entity::find()
2134            .filter(toolchain::Column::Channel.eq(channel))
2135            .all(&self.db_con)
2136            .await?;
2137
2138        for tc in toolchains_with_channel {
2139            let mut model: toolchain::ActiveModel = tc.into();
2140            model.channel = Set(None);
2141            model.update(&self.db_con).await?;
2142        }
2143
2144        // Then set the channel on the target toolchain
2145        if let Some(tc) = self.get_toolchain_by_name_version(name, version).await? {
2146            let mut model: toolchain::ActiveModel = tc.into();
2147            model.channel = Set(Some(channel.to_string()));
2148            model.update(&self.db_con).await?;
2149        }
2150
2151        Ok(())
2152    }
2153
2154    async fn get_channels(&self) -> DbResult<Vec<ChannelInfo>> {
2155        let toolchains = toolchain::Entity::find()
2156            .filter(toolchain::Column::Channel.is_not_null())
2157            .all(&self.db_con)
2158            .await?;
2159
2160        Ok(toolchains
2161            .into_iter()
2162            .filter_map(|tc| {
2163                tc.channel.map(|channel| ChannelInfo {
2164                    name: channel,
2165                    version: tc.version,
2166                    date: tc.date,
2167                })
2168            })
2169            .collect())
2170    }
2171}
2172
2173fn parse_db_version(value: &str) -> DbResult<Version> {
2174    Version::try_from(value).map_err(|_| DbError::InvalidVersion(value.to_owned()))
2175}