Skip to main content

kellnr_db/database/
mod.rs

1mod operations;
2pub mod test_utils;
3
4use std::cmp::max;
5use std::collections::BTreeMap;
6use std::path::Path;
7
8use chrono::{DateTime, Utc};
9use kellnr_common::crate_data::{CrateData, CrateRegistryDep, CrateVersionData};
10use kellnr_common::crate_overview::CrateOverview;
11use kellnr_common::cratesio_prefetch_msg::{CratesioPrefetchMsg, UpdateData};
12use kellnr_common::index_metadata::{IndexDep, IndexMetadata};
13use kellnr_common::normalized_name::NormalizedName;
14use kellnr_common::original_name::OriginalName;
15use kellnr_common::prefetch::Prefetch;
16use kellnr_common::publish_metadata::PublishMetadata;
17use kellnr_common::version::Version;
18use kellnr_common::webhook::{Webhook, WebhookEvent, WebhookQueue};
19use kellnr_entity::prelude::*;
20use kellnr_entity::{
21    auth_token, crate_author, crate_author_to_crate, crate_category, crate_category_to_crate,
22    crate_group, crate_index, crate_keyword, crate_keyword_to_crate, crate_meta, crate_user,
23    cratesio_crate, cratesio_index, cratesio_meta, doc_queue, group, group_user, krate,
24    oauth2_identity, oauth2_state, owner, session, toolchain, toolchain_target, user, webhook,
25    webhook_queue,
26};
27use kellnr_migration::iden::{
28    AuthTokenIden, CrateIden, CrateMetaIden, CratesIoIden, CratesIoMetaIden, GroupIden,
29};
30use sea_orm::entity::prelude::Uuid;
31use sea_orm::prelude::async_trait::async_trait;
32use sea_orm::query::{QueryOrder, QuerySelect, TransactionTrait};
33use sea_orm::sea_query::{Alias, Cond, Expr, Iden, JoinType, Order, Query, UnionType};
34use sea_orm::{
35    ActiveModelTrait, ActiveValue, ColumnTrait, DatabaseConnection, EntityTrait, ExprTrait,
36    FromQueryResult, ModelTrait, QueryFilter, RelationTrait, Set,
37};
38
39use crate::error::DbError;
40use crate::password::{generate_salt, hash_pwd, hash_token};
41use crate::provider::{
42    ChannelInfo, DbResult, OAuth2StateData, PrefetchState, ToolchainWithTargets,
43};
44use crate::tables::init_database;
45use crate::{
46    AuthToken, ConString, CrateMeta, CrateSummary, DbProvider, DocQueueEntry, Group, User,
47};
48
/// `chrono` format string used when a timestamp is written as text (see `add_session_token`).
const DB_DATE_FORMAT: &str = "%Y-%m-%d %H:%M:%S";
50
51pub struct Database {
52    db_con: DatabaseConnection,
53}
54
55impl Database {
56    pub fn existing(db_con: DatabaseConnection) -> Self {
57        Self { db_con }
58    }
59
60    pub async fn new(con: &ConString, max_con: u32) -> Result<Self, DbError> {
61        let db_con = init_database(con, max_con)
62            .await
63            .map_err(|e| DbError::InitializationError(e.to_string()))?;
64
65        if operations::no_user_exists(&db_con).await? {
66            operations::insert_admin_credentials(&db_con, con).await?;
67        }
68
69        Ok(Self { db_con })
70    }
71
72    /// Looks up a user by name; returns the entity model or [`DbError::UserNotFound`].
73    async fn get_user_model(&self, name: &str) -> DbResult<user::Model> {
74        user::Entity::find()
75            .filter(user::Column::Name.eq(name))
76            .one(&self.db_con)
77            .await?
78            .ok_or_else(|| DbError::UserNotFound(name.to_owned()))
79    }
80
81    /// Looks up a krate by normalized name; returns the entity model or [`DbError::CrateNotFound`].
82    async fn get_krate_model(&self, crate_name: &NormalizedName) -> DbResult<krate::Model> {
83        krate::Entity::find()
84            .filter(krate::Column::Name.eq(crate_name))
85            .one(&self.db_con)
86            .await?
87            .ok_or_else(|| DbError::CrateNotFound(crate_name.to_string()))
88    }
89
90    /// Looks up a group by name; returns the entity model or [`DbError::GroupNotFound`].
91    async fn get_group_model(&self, name: &str) -> DbResult<group::Model> {
92        group::Entity::find()
93            .filter(group::Column::Name.eq(name))
94            .one(&self.db_con)
95            .await?
96            .ok_or_else(|| DbError::GroupNotFound(name.to_owned()))
97    }
98
99    /// Loads targets for a toolchain and returns a `ToolchainWithTargets`.
100    async fn toolchain_with_targets(&self, tc: toolchain::Model) -> DbResult<ToolchainWithTargets> {
101        let targets = toolchain_target::Entity::find()
102            .filter(toolchain_target::Column::ToolchainFk.eq(tc.id))
103            .all(&self.db_con)
104            .await?;
105        Ok(ToolchainWithTargets::from_model(tc, targets))
106    }
107
108    /// Looks up a crate index by name and version; returns the entity model or [`DbError::CrateIndexNotFound`].
109    async fn get_crate_index_model(
110        &self,
111        crate_name: &NormalizedName,
112        version: &Version,
113    ) -> DbResult<crate_index::Model> {
114        crate_index::Entity::find()
115            .filter(crate_index::Column::Name.eq(crate_name))
116            .filter(crate_index::Column::Vers.eq(version))
117            .one(&self.db_con)
118            .await?
119            .ok_or_else(|| DbError::CrateIndexNotFound(crate_name.to_string(), version.to_string()))
120    }
121
122    /// Looks up a toolchain by name and version; returns `None` if not found.
123    async fn get_toolchain_by_name_version(
124        &self,
125        name: &str,
126        version: &str,
127    ) -> DbResult<Option<toolchain::Model>> {
128        toolchain::Entity::find()
129            .filter(toolchain::Column::Name.eq(name))
130            .filter(toolchain::Column::Version.eq(version))
131            .one(&self.db_con)
132            .await
133            .map_err(Into::into)
134    }
135
136    /// Looks up a `crate_group` by crate name and group name; returns `None` if not found.
137    async fn get_crate_group_by_name_and_group(
138        &self,
139        crate_name: &NormalizedName,
140        group: &str,
141    ) -> DbResult<Option<crate_group::Model>> {
142        crate_group::Entity::find()
143            .join(JoinType::InnerJoin, crate_group::Relation::Krate.def())
144            .join(JoinType::InnerJoin, crate_group::Relation::Group.def())
145            .filter(
146                Cond::all()
147                    .add(krate::Column::Name.eq(crate_name))
148                    .add(group::Column::Name.eq(group)),
149            )
150            .one(&self.db_con)
151            .await
152            .map_err(Into::into)
153    }
154
155    /// Looks up a `group_user` by group name and user name; returns `None` if not found.
156    async fn get_group_user_by_group_and_user(
157        &self,
158        group_name: &str,
159        user: &str,
160    ) -> DbResult<Option<group_user::Model>> {
161        group_user::Entity::find()
162            .join(JoinType::InnerJoin, group_user::Relation::Group.def())
163            .join(JoinType::InnerJoin, group_user::Relation::User.def())
164            .filter(
165                Cond::all()
166                    .add(group::Column::Name.eq(group_name))
167                    .add(user::Column::Name.eq(user)),
168            )
169            .one(&self.db_con)
170            .await
171            .map_err(Into::into)
172    }
173
174    /// Looks up a `crate_user` by crate name and user name; returns `None` if not found.
175    async fn get_crate_user_by_crate_and_user(
176        &self,
177        crate_name: &NormalizedName,
178        user: &str,
179    ) -> DbResult<Option<crate_user::Model>> {
180        crate_user::Entity::find()
181            .join(JoinType::InnerJoin, crate_user::Relation::Krate.def())
182            .join(JoinType::InnerJoin, crate_user::Relation::User.def())
183            .filter(
184                Cond::all()
185                    .add(krate::Column::Name.eq(crate_name))
186                    .add(user::Column::Name.eq(user)),
187            )
188            .one(&self.db_con)
189            .await
190            .map_err(Into::into)
191    }
192
193    /// Shared implementation for crate overview listing: optional name filter, optional limit/offset, cache union.
194    async fn query_crates(
195        &self,
196        contains: Option<&str>,
197        limit_offset: Option<(u64, u64)>,
198        cache: bool,
199    ) -> DbResult<Vec<CrateOverview>> {
200        let mut query = Query::select();
201        query
202            .expr_as(Expr::col(CrateIden::OriginalName), Alias::new("name"))
203            .expr_as(Expr::col(CrateIden::MaxVersion), Alias::new("version"))
204            .expr_as(Expr::col(CrateIden::LastUpdated), Alias::new("date"))
205            .expr_as(
206                Expr::col(CrateIden::TotalDownloads),
207                Alias::new("total_downloads"),
208            )
209            .expr_as(Expr::col(CrateIden::Description), Alias::new("description"))
210            .expr_as(
211                Expr::col(CrateMetaIden::Documentation),
212                Alias::new("documentation"),
213            )
214            .expr_as(Expr::cust("false"), Alias::new("is_cache"))
215            .from(CrateMetaIden::Table)
216            .inner_join(
217                CrateIden::Table,
218                Expr::col((CrateMetaIden::Table, CrateMetaIden::CrateFk))
219                    .equals((CrateIden::Table, CrateIden::Id)),
220            )
221            .and_where(
222                Expr::col((CrateMetaIden::Table, CrateMetaIden::Version))
223                    .equals((CrateIden::Table, CrateIden::MaxVersion)),
224            );
225
226        // Optional name filter
227        if let Some(contains) = contains {
228            query.and_where(
229                Expr::col((CrateIden::Table, CrateIden::Name)).like(format!("%{contains}%")),
230            );
231        }
232
233        // UNION with cached crates
234        if cache {
235            let mut query2 = Query::select();
236            query2
237                .expr_as(Expr::col(CratesIoIden::OriginalName), Alias::new("name"))
238                .expr_as(Expr::col(CratesIoIden::MaxVersion), Alias::new("version"))
239                .expr_as(Expr::col(CratesIoIden::LastModified), Alias::new("date"))
240                .expr_as(
241                    Expr::col(CratesIoIden::TotalDownloads),
242                    Alias::new("total_downloads"),
243                )
244                .expr_as(
245                    Expr::col(CratesIoIden::Description),
246                    Alias::new("description"),
247                )
248                .expr_as(
249                    Expr::col(CratesIoMetaIden::Documentation),
250                    Alias::new("documentation"),
251                )
252                .expr_as(Expr::cust("true"), Alias::new("is_cache"))
253                .from(CratesIoMetaIden::Table)
254                .inner_join(
255                    CratesIoIden::Table,
256                    Expr::col((CratesIoMetaIden::Table, CratesIoMetaIden::CratesIoFk))
257                        .equals((CratesIoIden::Table, CratesIoIden::Id)),
258                )
259                .and_where(
260                    Expr::col((CratesIoMetaIden::Table, CratesIoMetaIden::Version))
261                        .equals((CratesIoIden::Table, CratesIoIden::MaxVersion)),
262                );
263            if let Some(contains) = contains {
264                query2.and_where(
265                    Expr::col((CratesIoIden::Table, CratesIoIden::OriginalName))
266                        .like(format!("%{contains}%")),
267                );
268            }
269            query.union(UnionType::All, query2);
270        }
271
272        // Ordering and optional limit/offset
273        query.order_by(Alias::new("name"), Order::Asc);
274        if let Some((limit, offset)) = limit_offset {
275            query.limit(limit).offset(offset);
276        }
277
278        let builder = self.db_con.get_database_backend();
279        CrateOverview::find_by_statement(builder.build(&query))
280            .all(&self.db_con)
281            .await
282            .map_err(DbError::from)
283    }
284
285    /// Executes a count query `SELECT COUNT(id_column) FROM table` and returns the count as u64.
286    async fn count<T>(&self, table: T, id_column: T, error: DbError) -> DbResult<u64>
287    where
288        T: Iden + Copy,
289    {
290        #[derive(Debug, PartialEq, FromQueryResult)]
291        struct CountResult {
292            count: Option<i64>,
293        }
294
295        let statement = self.db_con.get_database_backend().build(
296            Query::select()
297                .expr_as(Expr::col((table, id_column)).count(), Alias::new("count"))
298                .from(table),
299        );
300        let Some(result) = CountResult::find_by_statement(statement)
301            .one(&self.db_con)
302            .await?
303        else {
304            return Err(error);
305        };
306        result
307            .count
308            .and_then(|v| u64::try_from(v).ok())
309            .ok_or(error)
310    }
311
312    async fn get_webhook_model(&self, id: &str) -> DbResult<webhook::Model> {
313        webhook::Entity::find()
314            .filter(webhook::Column::Id.eq(
315                TryInto::<Uuid>::try_into(id).map_err(|_| DbError::InvalidId(id.to_string()))?,
316            ))
317            .one(&self.db_con)
318            .await?
319            .ok_or(DbError::WebhookNotFound)
320    }
321
322    async fn get_webhook_queue_model(&self, id: &str) -> DbResult<webhook_queue::Model> {
323        webhook_queue::Entity::find()
324            .filter(webhook_queue::Column::Id.eq(
325                TryInto::<Uuid>::try_into(id).map_err(|_| DbError::InvalidId(id.to_string()))?,
326            ))
327            .one(&self.db_con)
328            .await?
329            .ok_or(DbError::WebhookNotFound)
330    }
331
332    async fn get_owner_by_crate_and_user(
333        &self,
334        crate_name: &str,
335        owner_name: &str,
336    ) -> DbResult<Option<owner::Model>> {
337        owner::Entity::find()
338            .join(JoinType::InnerJoin, owner::Relation::Krate.def())
339            .join(JoinType::InnerJoin, owner::Relation::User.def())
340            .filter(
341                Cond::all()
342                    .add(krate::Column::Name.eq(crate_name))
343                    .add(user::Column::Name.eq(owner_name)),
344            )
345            .one(&self.db_con)
346            .await
347            .map_err(Into::into)
348    }
349
350    async fn delete_or_not_found<M, A>(&self, model: Option<M>, err: DbError) -> DbResult<()>
351    where
352        M: ModelTrait + sea_orm::IntoActiveModel<A>,
353        A: ActiveModelTrait<Entity = M::Entity> + sea_orm::ActiveModelBehavior + Send,
354    {
355        model.ok_or(err)?.delete(&self.db_con).await?;
356        Ok(())
357    }
358}
359
360fn webhook_model_to_obj(w: webhook::Model) -> DbResult<Webhook> {
361    let event = w
362        .event
363        .as_str()
364        .try_into()
365        .map_err(|_| DbError::InvalidWebhookEvent(w.event))?;
366    Ok(Webhook {
367        id: Some(w.id.into()),
368        name: w.name,
369        event,
370        callback_url: w.callback_url,
371    })
372}
373
/// Expands to an async method that inserts one row into a join table.
///
/// The generated method takes the two foreign keys as `i64` and leaves every other
/// column of the active model at its default.
///
/// # Parameters
/// - `$method`: name of the generated method (e.g., `add_crate_user_impl`)
/// - `$relation_entity`: module of the join table entity (e.g., `crate_user`)
/// - `$insert_entity`: Sea ORM entity type used for the insert (e.g., `CrateUser`)
/// - `$fk1`: active-model field receiving the first foreign key
/// - `$fk2`: active-model field receiving the second foreign key
macro_rules! impl_add_relation {
    ($method:ident, $relation_entity:ident, $insert_entity:ident, $fk1:ident, $fk2:ident) => {
        async fn $method(&self, first_fk: i64, second_fk: i64) -> DbResult<()> {
            let mut link = $relation_entity::ActiveModel::default();
            link.$fk1 = Set(first_fk);
            link.$fk2 = Set(second_fk);

            $insert_entity::insert(link).exec(&self.db_con).await?;
            Ok(())
        }
    };
}
395
396impl Database {
397    impl_add_relation!(
398        add_crate_user_impl,
399        crate_user,
400        CrateUser,
401        crate_fk,
402        user_fk
403    );
404    impl_add_relation!(
405        add_crate_group_impl,
406        crate_group,
407        CrateGroup,
408        crate_fk,
409        group_fk
410    );
411    impl_add_relation!(
412        add_group_user_impl,
413        group_user,
414        GroupUser,
415        group_fk,
416        user_fk
417    );
418    impl_add_relation!(add_owner_impl, owner, Owner, crate_fk, user_fk);
419}
420
421#[async_trait]
422impl DbProvider for Database {
423    async fn get_total_unique_cached_crates(&self) -> DbResult<u64> {
424        self.count(
425            CratesIoIden::Table,
426            CratesIoIden::Id,
427            DbError::FailedToCountCrates,
428        )
429        .await
430    }
431
432    async fn get_total_cached_crate_versions(&self) -> DbResult<u64> {
433        self.count(
434            CratesIoMetaIden::Table,
435            CratesIoMetaIden::Id,
436            DbError::FailedToCountCrateVersions,
437        )
438        .await
439    }
440
441    async fn get_total_cached_downloads(&self) -> DbResult<u64> {
442        #[derive(FromQueryResult)]
443        struct Model {
444            total_downloads: i64,
445        }
446
447        let total_downloads = cratesio_crate::Entity::find()
448            .select_only()
449            .column(cratesio_crate::Column::TotalDownloads)
450            .into_model::<Model>()
451            .all(&self.db_con)
452            .await?;
453
454        Ok(total_downloads
455            .iter()
456            .map(|m| m.total_downloads as u64)
457            .sum())
458    }
459
460    async fn authenticate_user(&self, name: &str, pwd: &str) -> DbResult<User> {
461        let user = self.get_user(name).await?;
462
463        if hash_pwd(pwd, &user.salt) == user.pwd {
464            Ok(user)
465        } else {
466            Err(DbError::PasswordMismatch)
467        }
468    }
469
470    async fn increase_download_counter(
471        &self,
472        crate_name: &NormalizedName,
473        crate_version: &Version,
474    ) -> DbResult<()> {
475        let krate = self.get_krate_model(crate_name).await?;
476        let crate_id = krate.id;
477        let crate_total_downloads = krate.total_downloads;
478
479        // Update the total downloads for the whole crate (all versions)
480        let mut k: krate::ActiveModel = krate.into();
481        k.total_downloads = Set(crate_total_downloads + 1);
482        k.update(&self.db_con).await?;
483
484        // Update the downloads for the specific version
485        crate_meta::Entity::update_many()
486            .col_expr(
487                crate_meta::Column::Downloads,
488                Expr::col(crate_meta::Column::Downloads).add(1),
489            )
490            .filter(
491                Cond::all()
492                    .add(crate_meta::Column::Version.eq(crate_version))
493                    .add(crate_meta::Column::CrateFk.eq(crate_id)),
494            )
495            .exec(&self.db_con)
496            .await?;
497
498        Ok(())
499    }
500
501    async fn increase_cached_download_counter(
502        &self,
503        crate_name: &NormalizedName,
504        crate_version: &Version,
505    ) -> DbResult<()> {
506        let krate: cratesio_crate::Model = cratesio_crate::Entity::find()
507            .filter(cratesio_crate::Column::Name.eq(crate_name))
508            .one(&self.db_con)
509            .await?
510            .ok_or_else(|| DbError::CrateNotFound(crate_name.to_string()))?;
511        let crate_id = krate.id;
512        let crate_total_downloads = krate.total_downloads;
513
514        // Update the total downloads for the whole crate (all versions)
515        let mut k: cratesio_crate::ActiveModel = krate.into();
516        k.total_downloads = Set(crate_total_downloads + 1);
517        k.update(&self.db_con).await?;
518
519        // Update the downloads for the specific version
520        cratesio_meta::Entity::update_many()
521            .col_expr(
522                cratesio_meta::Column::Downloads,
523                Expr::col(cratesio_meta::Column::Downloads).add(1),
524            )
525            .filter(
526                Cond::all()
527                    .add(cratesio_meta::Column::Version.eq(crate_version))
528                    .add(cratesio_meta::Column::CratesIoFk.eq(crate_id)),
529            )
530            .exec(&self.db_con)
531            .await?;
532
533        Ok(())
534    }
535
536    async fn get_last_updated_crate(&self) -> DbResult<Option<(OriginalName, Version)>> {
537        let krate = krate::Entity::find()
538            .order_by_desc(krate::Column::LastUpdated)
539            .one(&self.db_con)
540            .await?;
541
542        if let Some(krate) = krate {
543            // SAFETY: Unchecked is ok, as only valid crate names are inserted into the database
544            let name = OriginalName::from_unchecked(krate.original_name);
545            // SAFETY: Unchecked is ok, as only valid versions are inserted into the database
546            let version = Version::from_unchecked_str(&krate.max_version);
547            Ok(Some((name, version)))
548        } else {
549            Ok(None)
550        }
551    }
552
553    async fn validate_session(&self, session_token: &str) -> DbResult<(String, bool)> {
554        let u = user::Entity::find()
555            .join(JoinType::InnerJoin, user::Relation::Session.def())
556            .filter(session::Column::Token.eq(session_token))
557            .one(&self.db_con)
558            .await?
559            .ok_or(DbError::SessionNotFound)?;
560
561        Ok((u.name, u.is_admin))
562    }
563
564    async fn add_session_token(&self, name: &str, session_token: &str) -> DbResult<()> {
565        let user = self.get_user(name).await?;
566        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
567
568        let s = session::ActiveModel {
569            token: Set(session_token.to_owned()),
570            created: Set(created),
571            user_fk: Set(user.id as i64),
572            ..Default::default()
573        };
574
575        s.insert(&self.db_con).await?;
576        Ok(())
577    }
578
579    async fn add_crate_user(&self, crate_name: &NormalizedName, user: &str) -> DbResult<()> {
580        let crate_fk = self.get_krate_model(crate_name).await?.id;
581        let user_fk = self.get_user_model(user).await?.id;
582        self.add_crate_user_impl(crate_fk, user_fk).await
583    }
584
585    async fn add_crate_group(&self, crate_name: &NormalizedName, group: &str) -> DbResult<()> {
586        let crate_fk = self.get_krate_model(crate_name).await?.id;
587        let group_fk = self.get_group_model(group).await?.id;
588        self.add_crate_group_impl(crate_fk, group_fk).await
589    }
590
591    async fn add_group_user(&self, group_name: &str, user: &str) -> DbResult<()> {
592        let group_fk = self.get_group_model(group_name).await?.id;
593        let user_fk = self.get_user_model(user).await?.id;
594        self.add_group_user_impl(group_fk, user_fk).await
595    }
596
597    async fn add_owner(&self, crate_name: &NormalizedName, owner: &str) -> DbResult<()> {
598        let crate_fk = self.get_krate_model(crate_name).await?.id;
599        let user_fk = self.get_user_model(owner).await?.id;
600        self.add_owner_impl(crate_fk, user_fk).await
601    }
602
603    async fn is_download_restricted(&self, crate_name: &NormalizedName) -> DbResult<bool> {
604        Ok(krate::Entity::find()
605            .filter(krate::Column::Name.eq(crate_name))
606            .one(&self.db_con)
607            .await?
608            .is_some_and(|model| model.restricted_download))
609    }
610
611    async fn change_download_restricted(
612        &self,
613        crate_name: &NormalizedName,
614        restricted: bool,
615    ) -> DbResult<()> {
616        let mut krate: krate::ActiveModel = self.get_krate_model(crate_name).await?.into();
617        krate.restricted_download = Set(restricted);
618        krate.update(&self.db_con).await?;
619        Ok(())
620    }
621
622    async fn is_crate_user(&self, crate_name: &NormalizedName, user: &str) -> DbResult<bool> {
623        Ok(self
624            .get_crate_user_by_crate_and_user(crate_name, user)
625            .await?
626            .is_some())
627    }
628
629    async fn is_crate_group(&self, crate_name: &NormalizedName, group: &str) -> DbResult<bool> {
630        Ok(self
631            .get_crate_group_by_name_and_group(crate_name, group)
632            .await?
633            .is_some())
634    }
635
636    async fn is_crate_group_user(&self, crate_name: &NormalizedName, user: &str) -> DbResult<bool> {
637        let user = user::Entity::find()
638            .join(JoinType::InnerJoin, user::Relation::GroupUser.def())
639            .join(JoinType::InnerJoin, group_user::Relation::Group.def())
640            .join(JoinType::InnerJoin, group::Relation::CrateGroup.def())
641            .join(JoinType::InnerJoin, crate_group::Relation::Krate.def())
642            .filter(
643                Cond::all()
644                    .add(krate::Column::Name.eq(crate_name))
645                    .add(user::Column::Name.eq(user)),
646            )
647            .one(&self.db_con)
648            .await?;
649        Ok(user.is_some())
650    }
651
652    async fn is_group_user(&self, group_name: &str, user: &str) -> DbResult<bool> {
653        Ok(self
654            .get_group_user_by_group_and_user(group_name, user)
655            .await?
656            .is_some())
657    }
658
659    async fn is_owner(&self, crate_name: &NormalizedName, user: &str) -> DbResult<bool> {
660        let owner = owner::Entity::find()
661            .join(JoinType::InnerJoin, owner::Relation::Krate.def())
662            .join(JoinType::InnerJoin, owner::Relation::User.def())
663            .filter(
664                Cond::all()
665                    .add(krate::Column::Name.eq(crate_name))
666                    .add(user::Column::Name.eq(user)),
667            )
668            .one(&self.db_con)
669            .await?;
670
671        Ok(owner.is_some())
672    }
673
674    async fn get_crate_id(&self, crate_name: &NormalizedName) -> DbResult<Option<i64>> {
675        let id = krate::Entity::find()
676            .filter(krate::Column::Name.eq(crate_name))
677            .one(&self.db_con)
678            .await?
679            .map(|model| model.id);
680
681        Ok(id)
682    }
683
684    async fn get_crate_owners(&self, crate_name: &NormalizedName) -> DbResult<Vec<User>> {
685        let u = user::Entity::find()
686            .join(JoinType::InnerJoin, user::Relation::Owner.def())
687            .join(JoinType::InnerJoin, owner::Relation::Krate.def())
688            .filter(Expr::col((CrateIden::Table, krate::Column::Name)).eq(crate_name))
689            .all(&self.db_con)
690            .await?;
691
692        Ok(u.into_iter().map(User::from).collect())
693    }
694
695    async fn get_crate_users(&self, crate_name: &NormalizedName) -> DbResult<Vec<User>> {
696        let u = user::Entity::find()
697            .join(JoinType::InnerJoin, user::Relation::CrateUser.def())
698            .join(JoinType::InnerJoin, crate_user::Relation::Krate.def())
699            .filter(Expr::col((CrateIden::Table, krate::Column::Name)).eq(crate_name))
700            .all(&self.db_con)
701            .await?;
702
703        Ok(u.into_iter().map(User::from).collect())
704    }
705
706    async fn get_crate_groups(&self, crate_name: &NormalizedName) -> DbResult<Vec<Group>> {
707        let u = group::Entity::find()
708            .join(JoinType::InnerJoin, group::Relation::CrateGroup.def())
709            .join(JoinType::InnerJoin, crate_group::Relation::Krate.def())
710            .filter(Expr::col((CrateIden::Table, krate::Column::Name)).eq(crate_name))
711            .all(&self.db_con)
712            .await?;
713
714        Ok(u.into_iter().map(Group::from).collect())
715    }
716
717    async fn get_group_users(&self, group_name: &str) -> DbResult<Vec<User>> {
718        let u = user::Entity::find()
719            .join(JoinType::InnerJoin, user::Relation::GroupUser.def())
720            .join(JoinType::InnerJoin, group_user::Relation::Group.def())
721            .filter(Expr::col((GroupIden::Table, group::Column::Name)).eq(group_name))
722            .all(&self.db_con)
723            .await?;
724
725        Ok(u.into_iter().map(User::from).collect())
726    }
727
728    async fn get_crate_versions(&self, crate_name: &NormalizedName) -> DbResult<Vec<Version>> {
729        let u = crate_meta::Entity::find()
730            .join(JoinType::InnerJoin, crate_meta::Relation::Krate.def())
731            .filter(Expr::col((CrateIden::Table, krate::Column::Name)).eq(crate_name))
732            .all(&self.db_con)
733            .await?;
734
735        Ok(u.into_iter()
736            .map(|meta| Version::from_unchecked_str(&meta.version))
737            .collect())
738    }
739
740    async fn delete_session_token(&self, session_token: &str) -> DbResult<()> {
741        if let Some(s) = session::Entity::find()
742            .filter(session::Column::Token.eq(session_token))
743            .one(&self.db_con)
744            .await?
745        {
746            s.delete(&self.db_con).await?;
747        }
748
749        Ok(())
750    }
751
752    async fn delete_user(&self, user_name: &str) -> DbResult<()> {
753        self.get_user_model(user_name)
754            .await?
755            .delete(&self.db_con)
756            .await?;
757        Ok(())
758    }
759
760    async fn delete_group(&self, group_name: &str) -> DbResult<()> {
761        self.get_group_model(group_name)
762            .await?
763            .delete(&self.db_con)
764            .await?;
765        Ok(())
766    }
767
768    async fn change_pwd(&self, user_name: &str, new_pwd: &str) -> DbResult<()> {
769        let salt = generate_salt();
770        let hashed = hash_pwd(new_pwd, &salt);
771
772        let mut u: user::ActiveModel = self.get_user_model(user_name).await?.into();
773        u.pwd = Set(hashed);
774        u.salt = Set(salt);
775
776        u.update(&self.db_con).await?;
777        Ok(())
778    }
779
780    async fn change_read_only_state(&self, user_name: &str, state: bool) -> DbResult<()> {
781        let mut u: user::ActiveModel = self.get_user_model(user_name).await?.into();
782        u.is_read_only = Set(state);
783
784        u.update(&self.db_con).await?;
785        Ok(())
786    }
787
788    async fn change_admin_state(&self, user_name: &str, state: bool) -> DbResult<()> {
789        let mut u: user::ActiveModel = self.get_user_model(user_name).await?.into();
790        u.is_admin = Set(state);
791
792        u.update(&self.db_con).await?;
793        Ok(())
794    }
795
796    async fn crate_version_exists(&self, crate_id: i64, version: &str) -> DbResult<bool> {
797        let cm = crate_meta::Entity::find()
798            .filter(
799                Cond::all()
800                    .add(crate_meta::Column::CrateFk.eq(crate_id))
801                    .add(crate_meta::Column::Version.eq(version)),
802            )
803            .one(&self.db_con)
804            .await?;
805
806        Ok(cm.is_some())
807    }
808
809    async fn get_max_version_from_id(&self, crate_id: i64) -> DbResult<Version> {
810        operations::get_max_version_from_id(&self.db_con, crate_id).await
811    }
812
813    async fn get_max_version_from_name(&self, crate_name: &NormalizedName) -> DbResult<Version> {
814        let k = self.get_krate_model(crate_name).await?;
815        let v = Version::try_from(&k.max_version)
816            .map_err(|_| DbError::FailedToGetMaxVersionByName(crate_name.to_string()))?;
817        Ok(v)
818    }
819
820    async fn update_max_version(&self, crate_id: i64, version: &Version) -> DbResult<()> {
821        let krate = krate::Entity::find_by_id(crate_id)
822            .one(&self.db_con)
823            .await?
824            .ok_or(DbError::CrateNotFoundWithId(crate_id))?;
825
826        let mut k: krate::ActiveModel = krate.into();
827        k.max_version = Set(version.to_string());
828        k.update(&self.db_con).await?;
829
830        Ok(())
831    }
832
833    async fn add_auth_token(&self, name: &str, token: &str, user: &str) -> DbResult<()> {
834        let hashed_token = hash_token(token);
835        let user = self.get_user_model(user).await?;
836
837        let at = auth_token::ActiveModel {
838            name: Set(name.to_owned()),
839            token: Set(hashed_token),
840            user_fk: Set(user.id),
841            ..Default::default()
842        };
843
844        at.insert(&self.db_con).await?;
845
846        Ok(())
847    }
848
849    async fn get_user_from_token(&self, token: &str) -> DbResult<User> {
850        let token = hash_token(token);
851
852        let u = user::Entity::find()
853            .join(JoinType::InnerJoin, user::Relation::AuthToken.def())
854            .filter(Expr::col((AuthTokenIden::Table, AuthTokenIden::Token)).eq(token))
855            .one(&self.db_con)
856            .await?
857            .ok_or(DbError::TokenNotFound)?;
858
859        Ok(User::from(u))
860    }
861
862    async fn get_user(&self, name: &str) -> DbResult<User> {
863        Ok(User::from(self.get_user_model(name).await?))
864    }
865
866    async fn get_group(&self, name: &str) -> DbResult<Group> {
867        Ok(Group::from(self.get_group_model(name).await?))
868    }
869
870    async fn get_auth_tokens(&self, user_name: &str) -> DbResult<Vec<AuthToken>> {
871        let at: Vec<auth_token::Model> = auth_token::Entity::find()
872            .join(JoinType::InnerJoin, auth_token::Relation::User.def())
873            .filter(user::Column::Name.eq(user_name))
874            .all(&self.db_con)
875            .await?;
876
877        Ok(at.into_iter().map(AuthToken::from).collect())
878    }
879
880    async fn delete_auth_token(&self, id: i32) -> DbResult<()> {
881        auth_token::Entity::delete_by_id(id as i64)
882            .exec(&self.db_con)
883            .await?;
884        Ok(())
885    }
886
887    async fn delete_owner(&self, crate_name: &str, owner: &str) -> DbResult<()> {
888        let model = self.get_owner_by_crate_and_user(crate_name, owner).await?;
889        self.delete_or_not_found(model, DbError::OwnerNotFound(owner.to_string()))
890            .await
891    }
892
893    async fn delete_crate_user(&self, crate_name: &NormalizedName, user: &str) -> DbResult<()> {
894        let model = self
895            .get_crate_user_by_crate_and_user(crate_name, user)
896            .await?;
897        self.delete_or_not_found(model, DbError::UserNotFound(user.to_string()))
898            .await
899    }
900
901    async fn delete_crate_group(&self, crate_name: &NormalizedName, group: &str) -> DbResult<()> {
902        let model = self
903            .get_crate_group_by_name_and_group(crate_name, group)
904            .await?;
905        self.delete_or_not_found(model, DbError::GroupNotFound(group.to_string()))
906            .await
907    }
908
909    async fn delete_group_user(&self, group_name: &str, user: &str) -> DbResult<()> {
910        let model = self
911            .get_group_user_by_group_and_user(group_name, user)
912            .await?;
913        self.delete_or_not_found(model, DbError::UserNotFound(user.to_string()))
914            .await
915    }
916
917    async fn add_user(
918        &self,
919        name: &str,
920        pwd: &str,
921        salt: &str,
922        is_admin: bool,
923        is_read_only: bool,
924    ) -> DbResult<()> {
925        let hashed_pwd = hash_pwd(pwd, salt);
926        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
927
928        let u = user::ActiveModel {
929            name: Set(name.to_owned()),
930            pwd: Set(hashed_pwd),
931            salt: Set(salt.to_owned()),
932            is_admin: Set(is_admin),
933            is_read_only: Set(is_read_only),
934            created: Set(created),
935            ..Default::default()
936        };
937
938        u.insert(&self.db_con).await?;
939        Ok(())
940    }
941
942    async fn add_group(&self, name: &str) -> DbResult<()> {
943        let g = group::ActiveModel {
944            name: Set(name.to_owned()),
945            ..Default::default()
946        };
947
948        g.insert(&self.db_con).await?;
949        Ok(())
950    }
951
952    async fn get_users(&self) -> DbResult<Vec<User>> {
953        let users = user::Entity::find()
954            .order_by_asc(user::Column::Name)
955            .all(&self.db_con)
956            .await?;
957
958        Ok(users.into_iter().map(User::from).collect())
959    }
960
961    async fn get_groups(&self) -> DbResult<Vec<Group>> {
962        let groups = group::Entity::find()
963            .order_by_asc(group::Column::Name)
964            .all(&self.db_con)
965            .await?;
966
967        Ok(groups.into_iter().map(Group::from).collect())
968    }
969
970    async fn get_total_unique_crates(&self) -> DbResult<u64> {
971        self.count(
972            CrateIden::Table,
973            CrateIden::Id,
974            DbError::FailedToCountCrates,
975        )
976        .await
977    }
978
979    async fn get_total_crate_versions(&self) -> DbResult<u64> {
980        self.count(
981            CrateMetaIden::Table,
982            CrateMetaIden::Id,
983            DbError::FailedToCountCrateVersions,
984        )
985        .await
986    }
987
988    async fn get_total_downloads(&self) -> DbResult<u64> {
989        #[derive(FromQueryResult)]
990        struct Model {
991            total_downloads: i64,
992        }
993
994        Ok(krate::Entity::find()
995            .select_only()
996            .column(krate::Column::TotalDownloads)
997            .into_model::<Model>()
998            .all(&self.db_con)
999            .await?
1000            .into_iter()
1001            .map(|m| m.total_downloads as u64)
1002            .sum())
1003    }
1004
1005    async fn get_top_crates_downloads(&self, top: u64) -> DbResult<Vec<(String, u64)>> {
1006        #[derive(Debug, PartialEq, FromQueryResult)]
1007        struct SelectResult {
1008            original_name: String,
1009            total_downloads: i64,
1010        }
1011
1012        let stmt = self.db_con.get_database_backend().build(
1013            Query::select()
1014                .columns(vec![CrateIden::OriginalName, CrateIden::TotalDownloads])
1015                .from(CrateIden::Table)
1016                .order_by(CrateIden::TotalDownloads, Order::Desc)
1017                .limit(top),
1018        );
1019
1020        Ok(SelectResult::find_by_statement(stmt)
1021            .all(&self.db_con)
1022            .await?
1023            .into_iter()
1024            .map(|x| (x.original_name, x.total_downloads as u64))
1025            .collect())
1026    }
1027
1028    async fn get_crate_summaries(&self) -> DbResult<Vec<CrateSummary>> {
1029        let krates = krate::Entity::find()
1030            .order_by(krate::Column::Name, Order::Asc)
1031            .all(&self.db_con)
1032            .await?;
1033
1034        Ok(krates.into_iter().map(CrateSummary::from).collect())
1035    }
1036
1037    async fn add_doc_queue(
1038        &self,
1039        krate: &NormalizedName,
1040        version: &Version,
1041        path: &Path,
1042    ) -> DbResult<()> {
1043        let s = doc_queue::ActiveModel {
1044            krate: Set(krate.to_string()),
1045            version: Set(version.to_string()),
1046            // FIXME: Convert Path to String properly, handle errors
1047            path: Set(path.to_string_lossy().to_string()),
1048            ..Default::default()
1049        };
1050
1051        s.insert(&self.db_con).await?;
1052        Ok(())
1053    }
1054
1055    async fn delete_doc_queue(&self, id: i64) -> DbResult<()> {
1056        DocQueue::delete_by_id(id).exec(&self.db_con).await?;
1057        Ok(())
1058    }
1059
1060    async fn get_doc_queue(&self) -> DbResult<Vec<DocQueueEntry>> {
1061        let entities = DocQueue::find().all(&self.db_con).await?;
1062
1063        Ok(entities.into_iter().map(DocQueueEntry::from).collect())
1064    }
1065
1066    async fn delete_crate(&self, krate: &NormalizedName, version: &Version) -> DbResult<()> {
1067        let txn = self.db_con.begin().await?;
1068
1069        // Delete the entry from the "crate_meta" table
1070        let crate_meta_version = crate_meta::Entity::find()
1071            .join(JoinType::InnerJoin, crate_meta::Relation::Krate.def())
1072            .filter(krate::Column::Name.eq(krate))
1073            .filter(crate_meta::Column::Version.eq(version))
1074            .one(&txn)
1075            .await?
1076            .ok_or_else(|| DbError::CrateMetaNotFound(krate.to_string(), version.to_string()))?;
1077        let crate_id = crate_meta_version.crate_fk;
1078        let current_max_version = operations::get_max_version_from_id(&txn, crate_id).await?;
1079        crate_meta_version.delete(&txn).await?;
1080
1081        // Delete the crate index entry from "crate_index" table
1082        let crate_index_version = crate_index::Entity::find()
1083            .join(JoinType::InnerJoin, crate_index::Relation::Krate.def())
1084            .filter(krate::Column::Name.eq(krate))
1085            .filter(crate_index::Column::Vers.eq(version))
1086            .one(&txn)
1087            .await?
1088            .ok_or_else(|| DbError::CrateIndexNotFound(krate.to_string(), version.to_string()))?;
1089        crate_index_version.delete(&txn).await?;
1090
1091        // If it was the last entry in the "crate_meta" table, delete the entry
1092        // in the "crate" table as well
1093        let crate_meta_rows = crate_meta::Entity::find()
1094            .join(JoinType::InnerJoin, crate_meta::Relation::Krate.def())
1095            .filter(krate::Column::Name.eq(krate))
1096            .all(&txn)
1097            .await?;
1098
1099        if crate_meta_rows.is_empty() {
1100            krate::Entity::delete_many()
1101                .filter(krate::Column::Name.eq(krate))
1102                .exec(&txn)
1103                .await?;
1104        } else {
1105            let c = krate::Entity::find_by_id(crate_id)
1106                .one(&txn)
1107                .await?
1108                .ok_or(DbError::CrateNotFoundWithId(crate_id))?;
1109            let mut c: krate::ActiveModel = c.into();
1110
1111            // Update the max. version if the deleted version was the max. version.
1112            if version == &current_max_version {
1113                let new_max_version = crate_meta_rows
1114                    .into_iter()
1115                    .map(|cm| Version::from_unchecked_str(&cm.version))
1116                    .max()
1117                    .unwrap(); // Safe to unwrap, as crate_meta_rows is not empty
1118                c.max_version = Set(new_max_version.into_inner());
1119            }
1120            // Update the ETag value of the crate index.
1121            let etag = operations::compute_etag(&txn, krate, crate_id).await?;
1122            c.e_tag = Set(etag);
1123            c.update(&txn).await?;
1124        }
1125
1126        txn.commit().await?;
1127
1128        Ok(())
1129    }
1130
1131    async fn get_crate_meta_list(&self, crate_name: &NormalizedName) -> DbResult<Vec<CrateMeta>> {
1132        let crate_meta = crate_meta::Entity::find()
1133            .join(JoinType::InnerJoin, crate_meta::Relation::Krate.def())
1134            .filter(krate::Column::Name.eq(crate_name))
1135            .all(&self.db_con)
1136            .await?;
1137
1138        let crate_meta = crate_meta
1139            .into_iter()
1140            .map(|cm| CrateMeta {
1141                name: crate_name.to_string(),
1142                id: cm.id,
1143                version: cm.version,
1144                created: cm.created,
1145                downloads: cm.downloads,
1146                crate_fk: cm.crate_fk,
1147            })
1148            .collect();
1149
1150        Ok(crate_meta)
1151    }
1152
1153    async fn update_last_updated(&self, id: i64, last_updated: &DateTime<Utc>) -> DbResult<()> {
1154        let krate = krate::Entity::find_by_id(id)
1155            .one(&self.db_con)
1156            .await?
1157            .ok_or(DbError::CrateNotFoundWithId(id))?;
1158
1159        let date = last_updated.format(DB_DATE_FORMAT).to_string();
1160
1161        let mut krate: krate::ActiveModel = krate.into();
1162        krate.last_updated = Set(date);
1163        krate.update(&self.db_con).await?;
1164
1165        Ok(())
1166    }
1167
1168    async fn search_in_crate_name(
1169        &self,
1170        contains: &str,
1171        cache: bool,
1172    ) -> DbResult<Vec<CrateOverview>> {
1173        self.query_crates(Some(contains), None, cache).await
1174    }
1175
1176    async fn get_crate_overview_list(
1177        &self,
1178        limit: u64,
1179        offset: u64,
1180        cache: bool,
1181    ) -> DbResult<Vec<CrateOverview>> {
1182        self.query_crates(None, Some((limit, offset)), cache).await
1183    }
1184
1185    async fn get_crate_data(&self, crate_name: &NormalizedName) -> DbResult<CrateData> {
1186        let krate = self.get_krate_model(crate_name).await?;
1187
1188        let owners: Vec<String> = krate
1189            .find_related(owner::Entity)
1190            .find_also_related(user::Entity)
1191            .all(&self.db_con)
1192            .await?
1193            .into_iter()
1194            .filter_map(|(_, v)| v.map(|v| v.name))
1195            .collect();
1196        let categories: Vec<String> = krate
1197            .find_related(crate_category_to_crate::Entity)
1198            .find_also_related(crate_category::Entity)
1199            .all(&self.db_con)
1200            .await?
1201            .into_iter()
1202            .filter_map(|(_, v)| v.map(|v| v.category))
1203            .collect();
1204        let keywords: Vec<String> = krate
1205            .find_related(crate_keyword_to_crate::Entity)
1206            .find_also_related(crate_keyword::Entity)
1207            .all(&self.db_con)
1208            .await?
1209            .into_iter()
1210            .filter_map(|(_, v)| v.map(|v| v.keyword))
1211            .collect();
1212        let authors: Vec<String> = krate
1213            .find_related(crate_author_to_crate::Entity)
1214            .find_also_related(crate_author::Entity)
1215            .all(&self.db_con)
1216            .await?
1217            .into_iter()
1218            .filter_map(|(_, v)| v.map(|v| v.author))
1219            .collect();
1220        let crate_metas = krate
1221            .find_related(crate_meta::Entity)
1222            .all(&self.db_con)
1223            .await?;
1224        let crate_indices = krate
1225            .find_related(crate_index::Entity)
1226            .all(&self.db_con)
1227            .await?;
1228
1229        let mut versions = Vec::new();
1230        for cm in crate_metas {
1231            let ci = crate_indices
1232                .iter()
1233                .find(|ci| ci.vers == cm.version)
1234                .ok_or_else(|| {
1235                    DbError::CrateIndexNotFound(krate.name.clone(), cm.version.clone())
1236                })?;
1237            let dependencies: Vec<CrateRegistryDep> = match ci.deps.clone() {
1238                Some(deps) => {
1239                    let ix = serde_json::from_value::<Vec<IndexDep>>(deps)
1240                        .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?;
1241
1242                    let mut ft = Vec::new();
1243                    for dep in ix {
1244                        ft.push(CrateRegistryDep::from_index(
1245                            operations::get_desc_for_crate_dep(
1246                                &self.db_con,
1247                                &dep.name,
1248                                dep.registry.as_deref(),
1249                            )
1250                            .await?,
1251                            dep,
1252                        ));
1253                    }
1254
1255                    ft
1256                }
1257                None => Vec::default(),
1258            };
1259            let features: BTreeMap<String, Vec<String>> = match ci.features.clone() {
1260                Some(features) => serde_json::from_value::<BTreeMap<String, Vec<String>>>(features)
1261                    .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?,
1262                None => BTreeMap::default(),
1263            };
1264
1265            versions.push(CrateVersionData {
1266                version: cm.version,
1267                created: cm.created,
1268                downloads: cm.downloads,
1269                readme: cm.readme,
1270                license: cm.license,
1271                license_file: cm.license_file,
1272                documentation: cm.documentation,
1273                dependencies,
1274                checksum: ci.cksum.clone(),
1275                features,
1276                yanked: ci.yanked,
1277                links: ci.links.clone(),
1278                v: ci.v,
1279            });
1280        }
1281        versions.sort_by(|a, b| {
1282            Version::from_unchecked_str(&b.version).cmp(&Version::from_unchecked_str(&a.version))
1283        });
1284
1285        let crate_data = CrateData {
1286            name: krate.original_name,
1287            owners,
1288            max_version: krate.max_version,
1289            total_downloads: krate.total_downloads,
1290            last_updated: krate.last_updated,
1291            homepage: krate.homepage,
1292            description: krate.description,
1293            repository: krate.repository,
1294            categories,
1295            keywords,
1296            authors,
1297            versions,
1298        };
1299
1300        Ok(crate_data)
1301    }
1302
1303    async fn add_empty_crate(&self, name: &str, created: &DateTime<Utc>) -> DbResult<i64> {
1304        let created = created.format(DB_DATE_FORMAT).to_string();
1305        let normalized_name = NormalizedName::from(
1306            OriginalName::try_from(name)
1307                .map_err(|_| DbError::InvalidCrateName(name.to_string()))?,
1308        );
1309        let krate = krate::ActiveModel {
1310            id: ActiveValue::default(),
1311            name: Set(normalized_name.to_string()),
1312            original_name: Set(name.to_string()),
1313            max_version: Set("0.0.0".to_string()),
1314            last_updated: Set(created),
1315            total_downloads: Set(0),
1316            homepage: Set(None),
1317            description: Set(None),
1318            repository: Set(None),
1319            e_tag: Set(String::new()), // Set to empty string, as it can be computed, when the crate index is inserted
1320            restricted_download: Set(false),
1321        };
1322        Ok(krate.insert(&self.db_con).await?.id)
1323    }
1324
1325    async fn add_crate(
1326        &self,
1327        pub_metadata: &PublishMetadata,
1328        cksum: &str,
1329        created: &DateTime<Utc>,
1330        owner: &str,
1331    ) -> DbResult<i64> {
1332        let created = created.format(DB_DATE_FORMAT).to_string();
1333        let normalized_name = NormalizedName::from(
1334            OriginalName::try_from(&pub_metadata.name)
1335                .map_err(|_| DbError::InvalidCrateName(pub_metadata.name.clone()))?,
1336        );
1337
1338        let existing = krate::Entity::find()
1339            .filter(krate::Column::Name.eq(&pub_metadata.name))
1340            .one(&self.db_con)
1341            .await?;
1342
1343        let txn = self.db_con.begin().await?;
1344
1345        let crate_id = if let Some(krate) = existing {
1346            let krate_id = krate.id;
1347            let max_version = max(
1348                parse_db_version(&krate.max_version)?,
1349                parse_db_version(&pub_metadata.vers)?,
1350            );
1351
1352            let mut krate: krate::ActiveModel = krate.into();
1353            krate.last_updated = Set(created.clone());
1354            krate.max_version = Set(max_version.into_inner());
1355            krate.homepage = Set(pub_metadata.homepage.clone());
1356            krate.description = Set(pub_metadata.description.clone());
1357            krate.repository = Set(pub_metadata.repository.clone());
1358            krate.e_tag = Set(String::new()); // Set to empty string, as it can be computed, when the crate index is inserted
1359            krate.update(&txn).await?;
1360            krate_id
1361        } else {
1362            let krate = krate::ActiveModel {
1363                id: ActiveValue::default(),
1364                name: Set(normalized_name.to_string()),
1365                original_name: Set(pub_metadata.name.clone()),
1366                max_version: Set(pub_metadata.vers.clone()),
1367                last_updated: Set(created.clone()),
1368                total_downloads: Set(0),
1369                homepage: Set(pub_metadata.homepage.clone()),
1370                description: Set(pub_metadata.description.clone()),
1371                repository: Set(pub_metadata.repository.clone()),
1372                e_tag: Set(String::new()), // Set to empty string, as it can be computed, when the crate index is inserted
1373                restricted_download: Set(false),
1374            };
1375            let krate = krate.insert(&txn).await?;
1376            krate.id
1377        };
1378
1379        operations::add_owner_if_not_exists(&txn, owner, crate_id).await?;
1380        operations::add_crate_metadata(&txn, pub_metadata, &created, crate_id).await?;
1381        operations::add_crate_index(&txn, pub_metadata, cksum, crate_id).await?;
1382        operations::update_etag(&txn, &pub_metadata.name, crate_id).await?;
1383        operations::update_crate_categories(&txn, pub_metadata, crate_id).await?;
1384        operations::update_crate_keywords(&txn, pub_metadata, crate_id).await?;
1385        operations::update_crate_authors(&txn, pub_metadata, crate_id).await?;
1386
1387        txn.commit().await?;
1388        Ok(crate_id)
1389    }
1390
1391    async fn update_docs_link(
1392        &self,
1393        crate_name: &NormalizedName,
1394        version: &Version,
1395        docs_link: &str,
1396    ) -> DbResult<()> {
1397        let (cm, _c) = crate_meta::Entity::find()
1398            .find_also_related(krate::Entity)
1399            .filter(
1400                Cond::all()
1401                    .add(krate::Column::Name.eq(crate_name))
1402                    .add(crate_meta::Column::Version.eq(version)),
1403            )
1404            .one(&self.db_con)
1405            .await?
1406            .ok_or_else(|| DbError::CrateNotFound(crate_name.to_string()))?;
1407
1408        let mut cm: crate_meta::ActiveModel = cm.into();
1409        cm.documentation = Set(Some(docs_link.to_string()));
1410        cm.update(&self.db_con).await?;
1411        Ok(())
1412    }
1413
1414    async fn add_crate_metadata(
1415        &self,
1416        pub_metadata: &PublishMetadata,
1417        created: &str,
1418        crate_id: i64,
1419    ) -> DbResult<()> {
1420        operations::add_crate_metadata(&self.db_con, pub_metadata, created, crate_id).await
1421    }
1422
1423    async fn get_prefetch_data(&self, crate_name: &str) -> DbResult<Prefetch> {
1424        let mut krate = krate::Entity::find()
1425            .filter(krate::Column::Name.eq(crate_name))
1426            .find_with_related(crate_index::Entity)
1427            .all(&self.db_con)
1428            .await?
1429            .into_iter();
1430
1431        // Expecting only one crate with its related indices
1432        let (Some((krate, crate_indices)), None) = (krate.next(), krate.next()) else {
1433            return Err(DbError::CrateNotFound(crate_name.to_string()));
1434        };
1435
1436        let index_metadata =
1437            operations::crate_index_model_to_index_metadata(crate_name, crate_indices)?;
1438        let data = operations::index_metadata_to_bytes(&index_metadata)?;
1439
1440        Ok(Prefetch {
1441            data,
1442            etag: krate.e_tag,
1443            last_modified: krate.last_updated,
1444        })
1445    }
1446
1447    async fn is_cratesio_cache_up_to_date(
1448        &self,
1449        crate_name: &NormalizedName,
1450        etag: Option<String>,
1451        last_modified: Option<String>,
1452    ) -> DbResult<PrefetchState> {
1453        let Some(krate) = cratesio_crate::Entity::find()
1454            .filter(cratesio_crate::Column::Name.eq(crate_name))
1455            .one(&self.db_con)
1456            .await?
1457        else {
1458            return Ok(PrefetchState::NotFound);
1459        };
1460
1461        let needs_update = match (etag, last_modified) {
1462            (Some(etag), Some(last_modified)) => {
1463                krate.e_tag != etag || krate.last_modified != last_modified
1464            }
1465            (Some(etag), None) => krate.e_tag != etag,
1466            (None, Some(last_modified)) => krate.last_modified != last_modified,
1467            (None, None) => true,
1468        };
1469
1470        if !needs_update {
1471            Ok(PrefetchState::UpToDate)
1472        } else {
1473            let crate_indices = krate
1474                .find_related(cratesio_index::Entity)
1475                .all(&self.db_con)
1476                .await?;
1477            let index_metadata =
1478                operations::cratesio_index_model_to_index_metadata(crate_name, crate_indices)?;
1479            let data = operations::index_metadata_to_bytes(&index_metadata)?;
1480
1481            Ok(PrefetchState::NeedsUpdate(Prefetch {
1482                data,
1483                etag: krate.e_tag.clone(),
1484                last_modified: krate.last_modified,
1485            }))
1486        }
1487    }
1488
1489    async fn add_cratesio_prefetch_data(
1490        &self,
1491        crate_name: &OriginalName,
1492        etag: &str,
1493        last_modified: &str,
1494        description: Option<String>,
1495        indices: &[IndexMetadata],
1496    ) -> DbResult<Prefetch> {
1497        let normalized_name = crate_name.to_normalized();
1498
1499        let max_version = indices
1500            .iter()
1501            .map(|i| Version::from_unchecked_str(&i.vers))
1502            .max()
1503            .ok_or_else(|| DbError::FailedToGetMaxVersionByName(crate_name.to_string()))?;
1504
1505        // Use a transaction to ensure the etag update and all index inserts are
1506        // atomic. Without this, a partial failure would leave the crate record
1507        // with the latest etag but incomplete index data, causing stale cache
1508        // entries that can never self-heal (the background updater would get 304
1509        // from crates.io because the etag matches).
1510        let txn = self.db_con.begin().await?;
1511
1512        let krate = cratesio_crate::Entity::find()
1513            .filter(cratesio_crate::Column::Name.eq(&normalized_name))
1514            .one(&txn)
1515            .await?;
1516
1517        let krate = if let Some(krate) = krate {
1518            let mut krate: cratesio_crate::ActiveModel = krate.into();
1519            krate.e_tag = Set(etag.to_string());
1520            krate.last_modified = Set(last_modified.to_string());
1521            krate.max_version = Set(max_version.into_inner());
1522            krate.update(&txn).await?
1523        } else {
1524            let krate = cratesio_crate::ActiveModel {
1525                id: ActiveValue::default(),
1526                name: Set(normalized_name.to_string()),
1527                original_name: Set(crate_name.to_string()),
1528                description: Set(description),
1529                e_tag: Set(etag.to_string()),
1530                last_modified: Set(last_modified.to_string()),
1531                total_downloads: Set(0),
1532                max_version: Set(max_version.to_string()),
1533            };
1534            krate.insert(&txn).await?
1535        };
1536
1537        let current_indices = cratesio_index::Entity::find()
1538            .filter(cratesio_index::Column::CratesIoFk.eq(krate.id))
1539            .all(&txn)
1540            .await?;
1541
1542        for index in indices {
1543            // Check if the version was yanked or un-yanked and update if so.
1544            if let Some(current_index) = current_indices.iter().find(|ci| index.vers == ci.vers) {
1545                if index.yanked != current_index.yanked {
1546                    let mut ci: cratesio_index::ActiveModel = current_index.to_owned().into();
1547                    ci.yanked = Set(index.yanked);
1548                    ci.update(&txn).await?;
1549                }
1550            } else {
1551                let deps = if index.deps.is_empty() {
1552                    None
1553                } else {
1554                    let deps = serde_json::to_value(&index.deps)
1555                        .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?;
1556                    Some(deps)
1557                };
1558
1559                let features = serde_json::to_value(&index.features)
1560                    .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?;
1561
1562                let features2 = serde_json::to_value(&index.features2)
1563                    .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?;
1564
1565                let new_index = cratesio_index::ActiveModel {
1566                    id: ActiveValue::default(),
1567                    name: Set(index.name.clone()),
1568                    vers: Set(index.vers.clone()),
1569                    deps: Set(deps),
1570                    cksum: Set(index.cksum.clone()),
1571                    features: Set(Some(features)),
1572                    features2: Set(Some(features2)),
1573                    yanked: Set(index.yanked),
1574                    links: Set(index.links.clone()),
1575                    pubtime: Set(index.pubtime.map(|dt| dt.naive_utc())),
1576                    v: Set(index.v.unwrap_or(1) as i32),
1577                    crates_io_fk: Set(krate.id),
1578                };
1579
1580                new_index.insert(&txn).await?;
1581
1582                // Add the meta data for the crate version.
1583                let meta = cratesio_meta::ActiveModel {
1584                    id: ActiveValue::default(),
1585                    version: Set(index.vers.clone()),
1586                    downloads: Set(0),
1587                    crates_io_fk: Set(krate.id),
1588                    documentation: Set(Some(format!(
1589                        "https://docs.rs/{normalized_name}/{}",
1590                        index.vers,
1591                    ))),
1592                };
1593
1594                meta.insert(&txn).await?;
1595            }
1596        }
1597
1598        txn.commit().await?;
1599
1600        Ok(Prefetch {
1601            data: operations::index_metadata_to_bytes(indices)?,
1602            etag: etag.to_string(),
1603            last_modified: last_modified.to_string(),
1604        })
1605    }
1606
1607    async fn get_cratesio_index_update_list(&self) -> DbResult<Vec<CratesioPrefetchMsg>> {
1608        let crates = cratesio_crate::Entity::find().all(&self.db_con).await?;
1609        let msgs = crates
1610            .into_iter()
1611            .map(|krate| {
1612                CratesioPrefetchMsg::Update(UpdateData {
1613                    name: OriginalName::from_unchecked(krate.original_name),
1614                    etag: Some(krate.e_tag),
1615                    last_modified: Some(krate.last_modified),
1616                })
1617            })
1618            .collect();
1619        Ok(msgs)
1620    }
1621
1622    async fn unyank_crate(&self, crate_name: &NormalizedName, version: &Version) -> DbResult<()> {
1623        let mut ci: crate_index::ActiveModel = self
1624            .get_crate_index_model(crate_name, version)
1625            .await?
1626            .into();
1627        ci.yanked = Set(false);
1628        ci.save(&self.db_con).await?;
1629        Ok(())
1630    }
1631
1632    async fn yank_crate(&self, crate_name: &NormalizedName, version: &Version) -> DbResult<()> {
1633        let mut ci: crate_index::ActiveModel = self
1634            .get_crate_index_model(crate_name, version)
1635            .await?
1636            .into();
1637        ci.yanked = Set(true);
1638        ci.save(&self.db_con).await?;
1639        Ok(())
1640    }
1641
1642    async fn register_webhook(&self, webhook: Webhook) -> DbResult<String> {
1643        let w = webhook::ActiveModel {
1644            event: Set(Into::<&str>::into(webhook.event).to_string()),
1645            callback_url: Set(webhook.callback_url),
1646            name: Set(webhook.name),
1647            ..Default::default()
1648        };
1649
1650        let w: webhook::Model = w.insert(&self.db_con).await?;
1651        Ok(w.id.to_string())
1652    }
1653
1654    async fn delete_webhook(&self, id: &str) -> DbResult<()> {
1655        self.get_webhook_model(id)
1656            .await?
1657            .delete(&self.db_con)
1658            .await?;
1659        Ok(())
1660    }
1661
1662    async fn get_webhook(&self, id: &str) -> DbResult<Webhook> {
1663        webhook_model_to_obj(self.get_webhook_model(id).await?)
1664    }
1665
1666    async fn get_all_webhooks(&self) -> DbResult<Vec<Webhook>> {
1667        Ok(webhook::Entity::find()
1668            .all(&self.db_con)
1669            .await?
1670            .into_iter()
1671            .filter_map(|w| webhook_model_to_obj(w).ok())
1672            .collect())
1673    }
1674
1675    async fn add_webhook_queue(
1676        &self,
1677        event: WebhookEvent,
1678        payload: serde_json::Value,
1679    ) -> DbResult<()> {
1680        let w = webhook::Entity::find()
1681            .filter(webhook::Column::Event.eq(Into::<&str>::into(event)))
1682            .all(&self.db_con)
1683            .await?;
1684
1685        if w.is_empty() {
1686            return Ok(());
1687        }
1688
1689        let now = Utc::now();
1690
1691        let entries = w.into_iter().map(|w| webhook_queue::ActiveModel {
1692            webhook_fk: Set(w.id),
1693            payload: Set(payload.clone()),
1694            next_attempt: Set(now.into()),
1695            last_attempt: Set(None),
1696            ..Default::default()
1697        });
1698
1699        webhook_queue::Entity::insert_many(entries)
1700            .exec(&self.db_con)
1701            .await?;
1702        Ok(())
1703    }
1704    async fn get_pending_webhook_queue_entries(
1705        &self,
1706        timestamp: DateTime<Utc>,
1707    ) -> DbResult<Vec<WebhookQueue>> {
1708        let w = webhook_queue::Entity::find()
1709            .find_with_related(webhook::Entity)
1710            .filter(webhook_queue::Column::NextAttempt.lte(timestamp))
1711            .all(&self.db_con)
1712            .await?;
1713
1714        Ok(w.into_iter()
1715            .filter_map(|w| {
1716                Some(WebhookQueue {
1717                    id: w.0.id.to_string(),
1718                    callback_url: w.1.first()?.callback_url.clone(),
1719                    payload: w.0.payload,
1720                    last_attempt: w.0.last_attempt.map(Into::into),
1721                    next_attempt: w.0.next_attempt.into(),
1722                })
1723            })
1724            .collect())
1725    }
1726
1727    async fn update_webhook_queue(
1728        &self,
1729        id: &str,
1730        last_attempt: DateTime<Utc>,
1731        next_attempt: DateTime<Utc>,
1732    ) -> DbResult<()> {
1733        let mut w: webhook_queue::ActiveModel = self.get_webhook_queue_model(id).await?.into();
1734        w.last_attempt = Set(Some(last_attempt.into()));
1735        w.next_attempt = Set(next_attempt.into());
1736        w.update(&self.db_con).await?;
1737        Ok(())
1738    }
1739
1740    async fn delete_webhook_queue(&self, id: &str) -> DbResult<()> {
1741        self.get_webhook_queue_model(id)
1742            .await?
1743            .delete(&self.db_con)
1744            .await?;
1745        Ok(())
1746    }
1747
1748    // OAuth2 identity methods
1749
1750    async fn get_user_by_oauth2_identity(
1751        &self,
1752        issuer: &str,
1753        subject: &str,
1754    ) -> DbResult<Option<User>> {
1755        let identity = oauth2_identity::Entity::find()
1756            .filter(oauth2_identity::Column::ProviderIssuer.eq(issuer))
1757            .filter(oauth2_identity::Column::Subject.eq(subject))
1758            .one(&self.db_con)
1759            .await?;
1760
1761        if let Some(identity) = identity {
1762            let u = user::Entity::find_by_id(identity.user_fk)
1763                .one(&self.db_con)
1764                .await?
1765                .ok_or_else(|| DbError::UserNotFound(format!("user_id={}", identity.user_fk)))?;
1766
1767            Ok(Some(User::from(u)))
1768        } else {
1769            Ok(None)
1770        }
1771    }
1772
1773    async fn create_oauth2_user(
1774        &self,
1775        username: &str,
1776        issuer: &str,
1777        subject: &str,
1778        email: Option<String>,
1779        is_admin: bool,
1780        is_read_only: bool,
1781    ) -> DbResult<User> {
1782        // Use a transaction to ensure atomicity
1783        let txn = self.db_con.begin().await?;
1784
1785        // Generate a random password and salt for OAuth2 users
1786        // They won't use password auth, but we need valid values
1787        let salt = generate_salt();
1788        let random_pwd = Uuid::new_v4().to_string();
1789        let hashed_pwd = hash_pwd(&random_pwd, &salt);
1790        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
1791
1792        // Create the user
1793        let new_user = user::ActiveModel {
1794            name: Set(username.to_string()),
1795            pwd: Set(hashed_pwd),
1796            salt: Set(salt),
1797            is_admin: Set(is_admin),
1798            is_read_only: Set(is_read_only),
1799            created: Set(created.clone()),
1800            ..Default::default()
1801        };
1802
1803        let res = user::Entity::insert(new_user).exec(&txn).await?;
1804        let user_id = res.last_insert_id;
1805
1806        // Link the OAuth2 identity
1807        let identity = oauth2_identity::ActiveModel {
1808            user_fk: Set(user_id),
1809            provider_issuer: Set(issuer.to_string()),
1810            subject: Set(subject.to_string()),
1811            email: Set(email),
1812            created: Set(created.clone()),
1813            ..Default::default()
1814        };
1815
1816        oauth2_identity::Entity::insert(identity).exec(&txn).await?;
1817
1818        txn.commit().await?;
1819
1820        Ok(User {
1821            id: user_id as i32,
1822            name: username.to_string(),
1823            pwd: String::new(),  // Don't expose password hash
1824            salt: String::new(), // Don't expose salt
1825            is_admin,
1826            is_read_only,
1827            created,
1828        })
1829    }
1830
1831    async fn link_oauth2_identity(
1832        &self,
1833        user_id: i64,
1834        issuer: &str,
1835        subject: &str,
1836        email: Option<String>,
1837    ) -> DbResult<()> {
1838        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
1839        let identity = oauth2_identity::ActiveModel {
1840            user_fk: Set(user_id),
1841            provider_issuer: Set(issuer.to_string()),
1842            subject: Set(subject.to_string()),
1843            email: Set(email),
1844            created: Set(created),
1845            ..Default::default()
1846        };
1847
1848        oauth2_identity::Entity::insert(identity)
1849            .exec(&self.db_con)
1850            .await?;
1851
1852        Ok(())
1853    }
1854
1855    // OAuth2 state methods (CSRF/PKCE during auth flow)
1856
1857    async fn store_oauth2_state(
1858        &self,
1859        state: &str,
1860        pkce_verifier: &str,
1861        nonce: &str,
1862    ) -> DbResult<()> {
1863        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
1864        let s = oauth2_state::ActiveModel {
1865            state: Set(state.to_string()),
1866            pkce_verifier: Set(pkce_verifier.to_string()),
1867            nonce: Set(nonce.to_string()),
1868            created: Set(created),
1869            ..Default::default()
1870        };
1871
1872        oauth2_state::Entity::insert(s).exec(&self.db_con).await?;
1873        Ok(())
1874    }
1875
1876    async fn get_and_delete_oauth2_state(&self, state: &str) -> DbResult<Option<OAuth2StateData>> {
1877        let s = oauth2_state::Entity::find()
1878            .filter(oauth2_state::Column::State.eq(state))
1879            .one(&self.db_con)
1880            .await?;
1881
1882        if let Some(s) = s {
1883            let data = OAuth2StateData {
1884                state: s.state.clone(),
1885                pkce_verifier: s.pkce_verifier.clone(),
1886                nonce: s.nonce.clone(),
1887            };
1888
1889            // Delete after retrieving (single use)
1890            s.delete(&self.db_con).await?;
1891
1892            Ok(Some(data))
1893        } else {
1894            Ok(None)
1895        }
1896    }
1897
1898    async fn cleanup_expired_oauth2_states(&self) -> DbResult<u64> {
1899        // States older than 10 minutes are expired
1900        let expiry = Utc::now() - chrono::Duration::minutes(10);
1901        let expiry_str = expiry.format(DB_DATE_FORMAT).to_string();
1902
1903        let result = oauth2_state::Entity::delete_many()
1904            .filter(oauth2_state::Column::Created.lt(expiry_str))
1905            .exec(&self.db_con)
1906            .await?;
1907
1908        Ok(result.rows_affected)
1909    }
1910
1911    async fn is_username_available(&self, username: &str) -> DbResult<bool> {
1912        let existing = user::Entity::find()
1913            .filter(user::Column::Name.eq(username))
1914            .one(&self.db_con)
1915            .await?;
1916
1917        Ok(existing.is_none())
1918    }
1919
1920    // Toolchain distribution methods
1921
1922    async fn add_toolchain(
1923        &self,
1924        name: &str,
1925        version: &str,
1926        date: &str,
1927        channel: Option<String>,
1928    ) -> DbResult<i64> {
1929        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
1930
1931        let model = toolchain::ActiveModel {
1932            id: ActiveValue::NotSet,
1933            name: Set(name.to_string()),
1934            version: Set(version.to_string()),
1935            date: Set(date.to_string()),
1936            channel: Set(channel),
1937            created: Set(created),
1938        };
1939
1940        let result = model.insert(&self.db_con).await?;
1941        Ok(result.id)
1942    }
1943
1944    async fn add_toolchain_target(
1945        &self,
1946        toolchain_id: i64,
1947        target: &str,
1948        storage_path: &str,
1949        hash: &str,
1950        size: i64,
1951    ) -> DbResult<()> {
1952        let model = toolchain_target::ActiveModel {
1953            id: ActiveValue::NotSet,
1954            toolchain_fk: Set(toolchain_id),
1955            target: Set(target.to_string()),
1956            storage_path: Set(storage_path.to_string()),
1957            hash: Set(hash.to_string()),
1958            size: Set(size),
1959        };
1960
1961        model.insert(&self.db_con).await?;
1962        Ok(())
1963    }
1964
1965    async fn get_toolchain_by_channel(
1966        &self,
1967        channel: &str,
1968    ) -> DbResult<Option<ToolchainWithTargets>> {
1969        let tc = toolchain::Entity::find()
1970            .filter(toolchain::Column::Channel.eq(channel))
1971            .one(&self.db_con)
1972            .await?;
1973
1974        Ok(match tc {
1975            Some(tc) => Some(self.toolchain_with_targets(tc).await?),
1976            None => None,
1977        })
1978    }
1979
1980    async fn get_toolchain_by_version(
1981        &self,
1982        name: &str,
1983        version: &str,
1984    ) -> DbResult<Option<ToolchainWithTargets>> {
1985        Ok(
1986            match self.get_toolchain_by_name_version(name, version).await? {
1987                Some(tc) => Some(self.toolchain_with_targets(tc).await?),
1988                None => None,
1989            },
1990        )
1991    }
1992
1993    async fn list_toolchains(&self) -> DbResult<Vec<ToolchainWithTargets>> {
1994        let toolchains = toolchain::Entity::find()
1995            .order_by_desc(toolchain::Column::Created)
1996            .all(&self.db_con)
1997            .await?;
1998
1999        let mut result = Vec::with_capacity(toolchains.len());
2000
2001        for tc in toolchains {
2002            result.push(self.toolchain_with_targets(tc).await?);
2003        }
2004
2005        Ok(result)
2006    }
2007
2008    async fn delete_toolchain(&self, name: &str, version: &str) -> DbResult<()> {
2009        toolchain::Entity::delete_many()
2010            .filter(toolchain::Column::Name.eq(name))
2011            .filter(toolchain::Column::Version.eq(version))
2012            .exec(&self.db_con)
2013            .await?;
2014
2015        Ok(())
2016    }
2017
2018    async fn delete_toolchain_target(
2019        &self,
2020        name: &str,
2021        version: &str,
2022        target: &str,
2023    ) -> DbResult<()> {
2024        if let Some(tc) = self.get_toolchain_by_name_version(name, version).await? {
2025            toolchain_target::Entity::delete_many()
2026                .filter(toolchain_target::Column::ToolchainFk.eq(tc.id))
2027                .filter(toolchain_target::Column::Target.eq(target))
2028                .exec(&self.db_con)
2029                .await?;
2030        }
2031
2032        Ok(())
2033    }
2034
2035    async fn set_channel(&self, channel: &str, name: &str, version: &str) -> DbResult<()> {
2036        // First, clear the channel from any other toolchain that has it
2037        let toolchains_with_channel = toolchain::Entity::find()
2038            .filter(toolchain::Column::Channel.eq(channel))
2039            .all(&self.db_con)
2040            .await?;
2041
2042        for tc in toolchains_with_channel {
2043            let mut model: toolchain::ActiveModel = tc.into();
2044            model.channel = Set(None);
2045            model.update(&self.db_con).await?;
2046        }
2047
2048        // Then set the channel on the target toolchain
2049        if let Some(tc) = self.get_toolchain_by_name_version(name, version).await? {
2050            let mut model: toolchain::ActiveModel = tc.into();
2051            model.channel = Set(Some(channel.to_string()));
2052            model.update(&self.db_con).await?;
2053        }
2054
2055        Ok(())
2056    }
2057
2058    async fn get_channels(&self) -> DbResult<Vec<ChannelInfo>> {
2059        let toolchains = toolchain::Entity::find()
2060            .filter(toolchain::Column::Channel.is_not_null())
2061            .all(&self.db_con)
2062            .await?;
2063
2064        Ok(toolchains
2065            .into_iter()
2066            .filter_map(|tc| {
2067                tc.channel.map(|channel| ChannelInfo {
2068                    name: channel,
2069                    version: tc.version,
2070                    date: tc.date,
2071                })
2072            })
2073            .collect())
2074    }
2075}
2076
2077fn parse_db_version(value: &str) -> DbResult<Version> {
2078    Version::try_from(value).map_err(|_| DbError::InvalidVersion(value.to_owned()))
2079}