Skip to main content

kellnr_db/database/
mod.rs

1mod operations;
2pub mod test_utils;
3
4use std::cmp::max;
5use std::collections::BTreeMap;
6use std::path::Path;
7
8use chrono::{DateTime, Utc};
9use kellnr_common::crate_data::{CrateData, CrateRegistryDep, CrateVersionData};
10use kellnr_common::crate_overview::CrateOverview;
11use kellnr_common::cratesio_prefetch_msg::{CratesioPrefetchMsg, UpdateData};
12use kellnr_common::index_metadata::{IndexDep, IndexMetadata};
13use kellnr_common::normalized_name::NormalizedName;
14use kellnr_common::original_name::OriginalName;
15use kellnr_common::prefetch::Prefetch;
16use kellnr_common::publish_metadata::PublishMetadata;
17use kellnr_common::version::Version;
18use kellnr_common::webhook::{Webhook, WebhookEvent, WebhookQueue};
19use kellnr_entity::prelude::*;
20use kellnr_entity::{
21    auth_token, crate_author, crate_author_to_crate, crate_category, crate_category_to_crate,
22    crate_group, crate_index, crate_keyword, crate_keyword_to_crate, crate_meta, crate_user,
23    cratesio_crate, cratesio_index, cratesio_meta, doc_queue, group, group_user, krate,
24    oauth2_identity, oauth2_state, owner, session, toolchain, toolchain_target, user, webhook,
25    webhook_queue,
26};
27use kellnr_migration::iden::{
28    AuthTokenIden, CrateIden, CrateMetaIden, CratesIoIden, CratesIoMetaIden, GroupIden,
29};
30use sea_orm::entity::prelude::Uuid;
31use sea_orm::prelude::async_trait::async_trait;
32use sea_orm::query::{QueryOrder, QuerySelect, TransactionTrait};
33use sea_orm::sea_query::{Alias, Cond, Expr, Iden, JoinType, Order, Query, UnionType};
34use sea_orm::{
35    ActiveModelTrait, ActiveValue, ColumnTrait, DatabaseConnection, EntityTrait, ExprTrait,
36    FromQueryResult, ModelTrait, QueryFilter, RelationTrait, Set,
37};
38
39use crate::error::DbError;
40use crate::password::{generate_salt, hash_pwd, hash_token};
41use crate::provider::{
42    ChannelInfo, DbResult, OAuth2StateData, PrefetchState, ToolchainWithTargets,
43};
44use crate::tables::init_database;
45use crate::{
46    AuthToken, ConString, CrateMeta, CrateSummary, DbProvider, DocQueueEntry, Group, User,
47};
48
49const DB_DATE_FORMAT: &str = "%Y-%m-%d %H:%M:%S";
50
51pub struct Database {
52    db_con: DatabaseConnection,
53}
54
55impl Database {
56    pub fn existing(db_con: DatabaseConnection) -> Self {
57        Self { db_con }
58    }
59
60    pub async fn new(con: &ConString, max_con: u32) -> Result<Self, DbError> {
61        let db_con = init_database(con, max_con)
62            .await
63            .map_err(|e| DbError::InitializationError(e.to_string()))?;
64
65        if operations::no_user_exists(&db_con).await? {
66            operations::insert_admin_credentials(&db_con, con).await?;
67        }
68
69        Ok(Self { db_con })
70    }
71
72    /// Looks up a user by name; returns the entity model or [`DbError::UserNotFound`].
73    async fn get_user_model(&self, name: &str) -> DbResult<user::Model> {
74        user::Entity::find()
75            .filter(user::Column::Name.eq(name))
76            .one(&self.db_con)
77            .await?
78            .ok_or_else(|| DbError::UserNotFound(name.to_owned()))
79    }
80
81    /// Looks up a krate by normalized name; returns the entity model or [`DbError::CrateNotFound`].
82    async fn get_krate_model(&self, crate_name: &NormalizedName) -> DbResult<krate::Model> {
83        krate::Entity::find()
84            .filter(krate::Column::Name.eq(crate_name))
85            .one(&self.db_con)
86            .await?
87            .ok_or_else(|| DbError::CrateNotFound(crate_name.to_string()))
88    }
89
90    /// Looks up a group by name; returns the entity model or [`DbError::GroupNotFound`].
91    async fn get_group_model(&self, name: &str) -> DbResult<group::Model> {
92        group::Entity::find()
93            .filter(group::Column::Name.eq(name))
94            .one(&self.db_con)
95            .await?
96            .ok_or_else(|| DbError::GroupNotFound(name.to_owned()))
97    }
98
99    /// Loads targets for a toolchain and returns a `ToolchainWithTargets`.
100    async fn toolchain_with_targets(&self, tc: toolchain::Model) -> DbResult<ToolchainWithTargets> {
101        let targets = toolchain_target::Entity::find()
102            .filter(toolchain_target::Column::ToolchainFk.eq(tc.id))
103            .all(&self.db_con)
104            .await?;
105        Ok(ToolchainWithTargets::from_model(tc, targets))
106    }
107
108    /// Looks up a crate index by name and version; returns the entity model or [`DbError::CrateIndexNotFound`].
109    async fn get_crate_index_model(
110        &self,
111        crate_name: &NormalizedName,
112        version: &Version,
113    ) -> DbResult<crate_index::Model> {
114        crate_index::Entity::find()
115            .filter(crate_index::Column::Name.eq(crate_name))
116            .filter(crate_index::Column::Vers.eq(version))
117            .one(&self.db_con)
118            .await?
119            .ok_or_else(|| DbError::CrateIndexNotFound(crate_name.to_string(), version.to_string()))
120    }
121
122    /// Looks up a toolchain by name and version; returns `None` if not found.
123    async fn get_toolchain_by_name_version(
124        &self,
125        name: &str,
126        version: &str,
127    ) -> DbResult<Option<toolchain::Model>> {
128        toolchain::Entity::find()
129            .filter(toolchain::Column::Name.eq(name))
130            .filter(toolchain::Column::Version.eq(version))
131            .one(&self.db_con)
132            .await
133            .map_err(Into::into)
134    }
135
136    /// Looks up a `crate_group` by crate name and group name; returns `None` if not found.
137    async fn get_crate_group_by_name_and_group(
138        &self,
139        crate_name: &NormalizedName,
140        group: &str,
141    ) -> DbResult<Option<crate_group::Model>> {
142        crate_group::Entity::find()
143            .join(JoinType::InnerJoin, crate_group::Relation::Krate.def())
144            .join(JoinType::InnerJoin, crate_group::Relation::Group.def())
145            .filter(
146                Cond::all()
147                    .add(krate::Column::Name.eq(crate_name))
148                    .add(group::Column::Name.eq(group)),
149            )
150            .one(&self.db_con)
151            .await
152            .map_err(Into::into)
153    }
154
155    /// Looks up a `group_user` by group name and user name; returns `None` if not found.
156    async fn get_group_user_by_group_and_user(
157        &self,
158        group_name: &str,
159        user: &str,
160    ) -> DbResult<Option<group_user::Model>> {
161        group_user::Entity::find()
162            .join(JoinType::InnerJoin, group_user::Relation::Group.def())
163            .join(JoinType::InnerJoin, group_user::Relation::User.def())
164            .filter(
165                Cond::all()
166                    .add(group::Column::Name.eq(group_name))
167                    .add(user::Column::Name.eq(user)),
168            )
169            .one(&self.db_con)
170            .await
171            .map_err(Into::into)
172    }
173
174    /// Looks up a `crate_user` by crate name and user name; returns `None` if not found.
175    async fn get_crate_user_by_crate_and_user(
176        &self,
177        crate_name: &NormalizedName,
178        user: &str,
179    ) -> DbResult<Option<crate_user::Model>> {
180        crate_user::Entity::find()
181            .join(JoinType::InnerJoin, crate_user::Relation::Krate.def())
182            .join(JoinType::InnerJoin, crate_user::Relation::User.def())
183            .filter(
184                Cond::all()
185                    .add(krate::Column::Name.eq(crate_name))
186                    .add(user::Column::Name.eq(user)),
187            )
188            .one(&self.db_con)
189            .await
190            .map_err(Into::into)
191    }
192
193    /// Shared implementation for crate overview listing: optional name filter, optional limit/offset, cache union.
194    async fn query_crates(
195        &self,
196        contains: Option<&str>,
197        limit_offset: Option<(u64, u64)>,
198        cache: bool,
199    ) -> DbResult<Vec<CrateOverview>> {
200        let mut query = Query::select();
201        query
202            .expr_as(Expr::col(CrateIden::OriginalName), Alias::new("name"))
203            .expr_as(Expr::col(CrateIden::MaxVersion), Alias::new("version"))
204            .expr_as(Expr::col(CrateIden::LastUpdated), Alias::new("date"))
205            .expr_as(
206                Expr::col(CrateIden::TotalDownloads),
207                Alias::new("total_downloads"),
208            )
209            .expr_as(Expr::col(CrateIden::Description), Alias::new("description"))
210            .expr_as(
211                Expr::col(CrateMetaIden::Documentation),
212                Alias::new("documentation"),
213            )
214            .expr_as(Expr::cust("false"), Alias::new("is_cache"))
215            .from(CrateMetaIden::Table)
216            .inner_join(
217                CrateIden::Table,
218                Expr::col((CrateMetaIden::Table, CrateMetaIden::CrateFk))
219                    .equals((CrateIden::Table, CrateIden::Id)),
220            )
221            .and_where(
222                Expr::col((CrateMetaIden::Table, CrateMetaIden::Version))
223                    .equals((CrateIden::Table, CrateIden::MaxVersion)),
224            );
225
226        // Optional name filter
227        if let Some(contains) = contains {
228            query.and_where(
229                Expr::col((CrateIden::Table, CrateIden::Name)).like(format!("%{contains}%")),
230            );
231        }
232
233        // UNION with cached crates
234        if cache {
235            let mut query2 = Query::select();
236            query2
237                .expr_as(Expr::col(CratesIoIden::OriginalName), Alias::new("name"))
238                .expr_as(Expr::col(CratesIoIden::MaxVersion), Alias::new("version"))
239                .expr_as(Expr::col(CratesIoIden::LastModified), Alias::new("date"))
240                .expr_as(
241                    Expr::col(CratesIoIden::TotalDownloads),
242                    Alias::new("total_downloads"),
243                )
244                .expr_as(
245                    Expr::col(CratesIoIden::Description),
246                    Alias::new("description"),
247                )
248                .expr_as(
249                    Expr::col(CratesIoMetaIden::Documentation),
250                    Alias::new("documentation"),
251                )
252                .expr_as(Expr::cust("true"), Alias::new("is_cache"))
253                .from(CratesIoMetaIden::Table)
254                .inner_join(
255                    CratesIoIden::Table,
256                    Expr::col((CratesIoMetaIden::Table, CratesIoMetaIden::CratesIoFk))
257                        .equals((CratesIoIden::Table, CratesIoIden::Id)),
258                )
259                .and_where(
260                    Expr::col((CratesIoMetaIden::Table, CratesIoMetaIden::Version))
261                        .equals((CratesIoIden::Table, CratesIoIden::MaxVersion)),
262                );
263            if let Some(contains) = contains {
264                query2.and_where(
265                    Expr::col((CratesIoIden::Table, CratesIoIden::OriginalName))
266                        .like(format!("%{contains}%")),
267                );
268            }
269            query.union(UnionType::All, query2);
270        }
271
272        // Ordering and optional limit/offset
273        query.order_by(Alias::new("name"), Order::Asc);
274        if let Some((limit, offset)) = limit_offset {
275            query.limit(limit).offset(offset);
276        }
277
278        let builder = self.db_con.get_database_backend();
279        CrateOverview::find_by_statement(builder.build(&query))
280            .all(&self.db_con)
281            .await
282            .map_err(DbError::from)
283    }
284
285    /// Executes a count query `SELECT COUNT(id_column) FROM table` and returns the count as u64.
286    async fn count<T>(&self, table: T, id_column: T, error: DbError) -> DbResult<u64>
287    where
288        T: Iden + Copy,
289    {
290        #[derive(Debug, PartialEq, FromQueryResult)]
291        struct CountResult {
292            count: Option<i64>,
293        }
294
295        let statement = self.db_con.get_database_backend().build(
296            Query::select()
297                .expr_as(Expr::col((table, id_column)).count(), Alias::new("count"))
298                .from(table),
299        );
300        let Some(result) = CountResult::find_by_statement(statement)
301            .one(&self.db_con)
302            .await?
303        else {
304            return Err(error);
305        };
306        result
307            .count
308            .and_then(|v| u64::try_from(v).ok())
309            .ok_or(error)
310    }
311
312    async fn get_webhook_model(&self, id: &str) -> DbResult<webhook::Model> {
313        webhook::Entity::find()
314            .filter(webhook::Column::Id.eq(
315                TryInto::<Uuid>::try_into(id).map_err(|_| DbError::InvalidId(id.to_string()))?,
316            ))
317            .one(&self.db_con)
318            .await?
319            .ok_or(DbError::WebhookNotFound)
320    }
321
322    async fn get_webhook_queue_model(&self, id: &str) -> DbResult<webhook_queue::Model> {
323        webhook_queue::Entity::find()
324            .filter(webhook_queue::Column::Id.eq(
325                TryInto::<Uuid>::try_into(id).map_err(|_| DbError::InvalidId(id.to_string()))?,
326            ))
327            .one(&self.db_con)
328            .await?
329            .ok_or(DbError::WebhookNotFound)
330    }
331
332    async fn get_owner_by_crate_and_user(
333        &self,
334        crate_name: &str,
335        owner_name: &str,
336    ) -> DbResult<Option<owner::Model>> {
337        owner::Entity::find()
338            .join(JoinType::InnerJoin, owner::Relation::Krate.def())
339            .join(JoinType::InnerJoin, owner::Relation::User.def())
340            .filter(
341                Cond::all()
342                    .add(krate::Column::Name.eq(crate_name))
343                    .add(user::Column::Name.eq(owner_name)),
344            )
345            .one(&self.db_con)
346            .await
347            .map_err(Into::into)
348    }
349
350    async fn delete_or_not_found<M, A>(&self, model: Option<M>, err: DbError) -> DbResult<()>
351    where
352        M: ModelTrait + sea_orm::IntoActiveModel<A>,
353        A: ActiveModelTrait<Entity = M::Entity> + sea_orm::ActiveModelBehavior + Send,
354    {
355        model.ok_or(err)?.delete(&self.db_con).await?;
356        Ok(())
357    }
358}
359
360fn webhook_model_to_obj(w: webhook::Model) -> DbResult<Webhook> {
361    let event = w
362        .event
363        .as_str()
364        .try_into()
365        .map_err(|_| DbError::InvalidWebhookEvent(w.event))?;
366    Ok(Webhook {
367        id: Some(w.id.into()),
368        name: w.name,
369        event,
370        callback_url: w.callback_url,
371    })
372}
373
374/// Generates an `add_*` method that links two entities via a join table.
375///
376/// # Parameters
377/// - `$method`: The method name to generate (e.g., `add_crate_user_impl`)
378/// - `$relation_entity`: The join table entity module (e.g., `crate_user`)
379/// - `$insert_entity`: The Sea ORM Entity type for insertion (e.g., `CrateUser`)
380/// - `$fk1`: The foreign key field for the first entity
381/// - `$fk2`: The foreign key field for the second entity
382macro_rules! impl_add_relation {
383    ($method:ident, $relation_entity:ident, $insert_entity:ident, $fk1:ident, $fk2:ident) => {
384        async fn $method(&self, fk1: i64, fk2: i64) -> DbResult<()> {
385            let model = $relation_entity::ActiveModel {
386                $fk1: Set(fk1),
387                $fk2: Set(fk2),
388                ..Default::default()
389            };
390            $insert_entity::insert(model).exec(&self.db_con).await?;
391            Ok(())
392        }
393    };
394}
395
396impl Database {
397    impl_add_relation!(
398        add_crate_user_impl,
399        crate_user,
400        CrateUser,
401        crate_fk,
402        user_fk
403    );
404    impl_add_relation!(
405        add_crate_group_impl,
406        crate_group,
407        CrateGroup,
408        crate_fk,
409        group_fk
410    );
411    impl_add_relation!(
412        add_group_user_impl,
413        group_user,
414        GroupUser,
415        group_fk,
416        user_fk
417    );
418    impl_add_relation!(add_owner_impl, owner, Owner, crate_fk, user_fk);
419}
420
421#[async_trait]
422impl DbProvider for Database {
423    async fn get_total_unique_cached_crates(&self) -> DbResult<u64> {
424        self.count(
425            CratesIoIden::Table,
426            CratesIoIden::Id,
427            DbError::FailedToCountCrates,
428        )
429        .await
430    }
431
432    async fn get_total_cached_crate_versions(&self) -> DbResult<u64> {
433        self.count(
434            CratesIoMetaIden::Table,
435            CratesIoMetaIden::Id,
436            DbError::FailedToCountCrateVersions,
437        )
438        .await
439    }
440
441    async fn get_total_cached_downloads(&self) -> DbResult<u64> {
442        #[derive(FromQueryResult)]
443        struct Model {
444            total_downloads: i64,
445        }
446
447        let total_downloads = cratesio_crate::Entity::find()
448            .select_only()
449            .column(cratesio_crate::Column::TotalDownloads)
450            .into_model::<Model>()
451            .all(&self.db_con)
452            .await?;
453
454        Ok(total_downloads
455            .iter()
456            .map(|m| m.total_downloads as u64)
457            .sum())
458    }
459
460    async fn authenticate_user(&self, name: &str, pwd: &str) -> DbResult<User> {
461        let user = self.get_user(name).await?;
462
463        if hash_pwd(pwd, &user.salt) == user.pwd {
464            Ok(user)
465        } else {
466            Err(DbError::PasswordMismatch)
467        }
468    }
469
470    async fn increase_download_counter(
471        &self,
472        crate_name: &NormalizedName,
473        crate_version: &Version,
474    ) -> DbResult<()> {
475        let krate = self.get_krate_model(crate_name).await?;
476        let crate_id = krate.id;
477        let crate_total_downloads = krate.total_downloads;
478
479        // Update the total downloads for the whole crate (all versions)
480        let mut k: krate::ActiveModel = krate.into();
481        k.total_downloads = Set(crate_total_downloads + 1);
482        k.update(&self.db_con).await?;
483
484        // Update the downloads for the specific version
485        crate_meta::Entity::update_many()
486            .col_expr(
487                crate_meta::Column::Downloads,
488                Expr::col(crate_meta::Column::Downloads).add(1),
489            )
490            .filter(
491                Cond::all()
492                    .add(crate_meta::Column::Version.eq(crate_version))
493                    .add(crate_meta::Column::CrateFk.eq(crate_id)),
494            )
495            .exec(&self.db_con)
496            .await?;
497
498        Ok(())
499    }
500
501    async fn increase_cached_download_counter(
502        &self,
503        crate_name: &NormalizedName,
504        crate_version: &Version,
505    ) -> DbResult<()> {
506        let krate: cratesio_crate::Model = cratesio_crate::Entity::find()
507            .filter(cratesio_crate::Column::Name.eq(crate_name))
508            .one(&self.db_con)
509            .await?
510            .ok_or_else(|| DbError::CrateNotFound(crate_name.to_string()))?;
511        let crate_id = krate.id;
512        let crate_total_downloads = krate.total_downloads;
513
514        // Update the total downloads for the whole crate (all versions)
515        let mut k: cratesio_crate::ActiveModel = krate.into();
516        k.total_downloads = Set(crate_total_downloads + 1);
517        k.update(&self.db_con).await?;
518
519        // Update the downloads for the specific version
520        cratesio_meta::Entity::update_many()
521            .col_expr(
522                cratesio_meta::Column::Downloads,
523                Expr::col(cratesio_meta::Column::Downloads).add(1),
524            )
525            .filter(
526                Cond::all()
527                    .add(cratesio_meta::Column::Version.eq(crate_version))
528                    .add(cratesio_meta::Column::CratesIoFk.eq(crate_id)),
529            )
530            .exec(&self.db_con)
531            .await?;
532
533        Ok(())
534    }
535
536    async fn get_last_updated_crate(&self) -> DbResult<Option<(OriginalName, Version)>> {
537        let krate = krate::Entity::find()
538            .order_by_desc(krate::Column::LastUpdated)
539            .one(&self.db_con)
540            .await?;
541
542        if let Some(krate) = krate {
543            // SAFETY: Unchecked is ok, as only valid crate names are inserted into the database
544            let name = OriginalName::from_unchecked(krate.original_name);
545            // SAFETY: Unchecked is ok, as only valid versions are inserted into the database
546            let version = Version::from_unchecked_str(&krate.max_version);
547            Ok(Some((name, version)))
548        } else {
549            Ok(None)
550        }
551    }
552
553    async fn validate_session(&self, session_token: &str) -> DbResult<(String, bool)> {
554        let u = user::Entity::find()
555            .join(JoinType::InnerJoin, user::Relation::Session.def())
556            .filter(session::Column::Token.eq(session_token))
557            .one(&self.db_con)
558            .await?
559            .ok_or(DbError::SessionNotFound)?;
560
561        Ok((u.name, u.is_admin))
562    }
563
564    async fn add_session_token(&self, name: &str, session_token: &str) -> DbResult<()> {
565        let user = self.get_user(name).await?;
566        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
567
568        let s = session::ActiveModel {
569            token: Set(session_token.to_owned()),
570            created: Set(created),
571            user_fk: Set(user.id as i64),
572            ..Default::default()
573        };
574
575        s.insert(&self.db_con).await?;
576        Ok(())
577    }
578
579    async fn add_crate_user(&self, crate_name: &NormalizedName, user: &str) -> DbResult<()> {
580        let crate_fk = self.get_krate_model(crate_name).await?.id;
581        let user_fk = self.get_user_model(user).await?.id;
582        self.add_crate_user_impl(crate_fk, user_fk).await
583    }
584
585    async fn add_crate_group(&self, crate_name: &NormalizedName, group: &str) -> DbResult<()> {
586        let crate_fk = self.get_krate_model(crate_name).await?.id;
587        let group_fk = self.get_group_model(group).await?.id;
588        self.add_crate_group_impl(crate_fk, group_fk).await
589    }
590
591    async fn add_group_user(&self, group_name: &str, user: &str) -> DbResult<()> {
592        let group_fk = self.get_group_model(group_name).await?.id;
593        let user_fk = self.get_user_model(user).await?.id;
594        self.add_group_user_impl(group_fk, user_fk).await
595    }
596
597    async fn add_owner(&self, crate_name: &NormalizedName, owner: &str) -> DbResult<()> {
598        let crate_fk = self.get_krate_model(crate_name).await?.id;
599        let user_fk = self.get_user_model(owner).await?.id;
600        self.add_owner_impl(crate_fk, user_fk).await
601    }
602
603    async fn is_download_restricted(&self, crate_name: &NormalizedName) -> DbResult<bool> {
604        Ok(krate::Entity::find()
605            .filter(krate::Column::Name.eq(crate_name))
606            .one(&self.db_con)
607            .await?
608            .is_some_and(|model| model.restricted_download))
609    }
610
611    async fn change_download_restricted(
612        &self,
613        crate_name: &NormalizedName,
614        restricted: bool,
615    ) -> DbResult<()> {
616        let mut krate: krate::ActiveModel = self.get_krate_model(crate_name).await?.into();
617        krate.restricted_download = Set(restricted);
618        krate.update(&self.db_con).await?;
619        Ok(())
620    }
621
622    async fn is_crate_user(&self, crate_name: &NormalizedName, user: &str) -> DbResult<bool> {
623        Ok(self
624            .get_crate_user_by_crate_and_user(crate_name, user)
625            .await?
626            .is_some())
627    }
628
629    async fn is_crate_group(&self, crate_name: &NormalizedName, group: &str) -> DbResult<bool> {
630        Ok(self
631            .get_crate_group_by_name_and_group(crate_name, group)
632            .await?
633            .is_some())
634    }
635
636    async fn is_crate_group_user(&self, crate_name: &NormalizedName, user: &str) -> DbResult<bool> {
637        let user = user::Entity::find()
638            .join(JoinType::InnerJoin, user::Relation::GroupUser.def())
639            .join(JoinType::InnerJoin, group_user::Relation::Group.def())
640            .join(JoinType::InnerJoin, group::Relation::CrateGroup.def())
641            .join(JoinType::InnerJoin, crate_group::Relation::Krate.def())
642            .filter(
643                Cond::all()
644                    .add(krate::Column::Name.eq(crate_name))
645                    .add(user::Column::Name.eq(user)),
646            )
647            .one(&self.db_con)
648            .await?;
649        Ok(user.is_some())
650    }
651
652    async fn is_group_user(&self, group_name: &str, user: &str) -> DbResult<bool> {
653        Ok(self
654            .get_group_user_by_group_and_user(group_name, user)
655            .await?
656            .is_some())
657    }
658
659    async fn is_owner(&self, crate_name: &NormalizedName, user: &str) -> DbResult<bool> {
660        let owner = owner::Entity::find()
661            .join(JoinType::InnerJoin, owner::Relation::Krate.def())
662            .join(JoinType::InnerJoin, owner::Relation::User.def())
663            .filter(
664                Cond::all()
665                    .add(krate::Column::Name.eq(crate_name))
666                    .add(user::Column::Name.eq(user)),
667            )
668            .one(&self.db_con)
669            .await?;
670
671        Ok(owner.is_some())
672    }
673
674    async fn get_crate_id(&self, crate_name: &NormalizedName) -> DbResult<Option<i64>> {
675        let id = krate::Entity::find()
676            .filter(krate::Column::Name.eq(crate_name))
677            .one(&self.db_con)
678            .await?
679            .map(|model| model.id);
680
681        Ok(id)
682    }
683
684    async fn get_crate_owners(&self, crate_name: &NormalizedName) -> DbResult<Vec<User>> {
685        let u = user::Entity::find()
686            .join(JoinType::InnerJoin, user::Relation::Owner.def())
687            .join(JoinType::InnerJoin, owner::Relation::Krate.def())
688            .filter(Expr::col((CrateIden::Table, krate::Column::Name)).eq(crate_name))
689            .all(&self.db_con)
690            .await?;
691
692        Ok(u.into_iter().map(User::from).collect())
693    }
694
695    async fn get_crate_users(&self, crate_name: &NormalizedName) -> DbResult<Vec<User>> {
696        let u = user::Entity::find()
697            .join(JoinType::InnerJoin, user::Relation::CrateUser.def())
698            .join(JoinType::InnerJoin, crate_user::Relation::Krate.def())
699            .filter(Expr::col((CrateIden::Table, krate::Column::Name)).eq(crate_name))
700            .all(&self.db_con)
701            .await?;
702
703        Ok(u.into_iter().map(User::from).collect())
704    }
705
706    async fn get_crate_groups(&self, crate_name: &NormalizedName) -> DbResult<Vec<Group>> {
707        let u = group::Entity::find()
708            .join(JoinType::InnerJoin, group::Relation::CrateGroup.def())
709            .join(JoinType::InnerJoin, crate_group::Relation::Krate.def())
710            .filter(Expr::col((CrateIden::Table, krate::Column::Name)).eq(crate_name))
711            .all(&self.db_con)
712            .await?;
713
714        Ok(u.into_iter().map(Group::from).collect())
715    }
716
717    async fn get_group_users(&self, group_name: &str) -> DbResult<Vec<User>> {
718        let u = user::Entity::find()
719            .join(JoinType::InnerJoin, user::Relation::GroupUser.def())
720            .join(JoinType::InnerJoin, group_user::Relation::Group.def())
721            .filter(Expr::col((GroupIden::Table, group::Column::Name)).eq(group_name))
722            .all(&self.db_con)
723            .await?;
724
725        Ok(u.into_iter().map(User::from).collect())
726    }
727
728    async fn get_crate_versions(&self, crate_name: &NormalizedName) -> DbResult<Vec<Version>> {
729        let u = crate_meta::Entity::find()
730            .join(JoinType::InnerJoin, crate_meta::Relation::Krate.def())
731            .filter(Expr::col((CrateIden::Table, krate::Column::Name)).eq(crate_name))
732            .all(&self.db_con)
733            .await?;
734
735        Ok(u.into_iter()
736            .map(|meta| Version::from_unchecked_str(&meta.version))
737            .collect())
738    }
739
740    async fn delete_session_token(&self, session_token: &str) -> DbResult<()> {
741        if let Some(s) = session::Entity::find()
742            .filter(session::Column::Token.eq(session_token))
743            .one(&self.db_con)
744            .await?
745        {
746            s.delete(&self.db_con).await?;
747        }
748
749        Ok(())
750    }
751
752    async fn delete_user(&self, user_name: &str) -> DbResult<()> {
753        self.get_user_model(user_name)
754            .await?
755            .delete(&self.db_con)
756            .await?;
757        Ok(())
758    }
759
760    async fn delete_group(&self, group_name: &str) -> DbResult<()> {
761        self.get_group_model(group_name)
762            .await?
763            .delete(&self.db_con)
764            .await?;
765        Ok(())
766    }
767
768    async fn change_pwd(&self, user_name: &str, new_pwd: &str) -> DbResult<()> {
769        let salt = generate_salt();
770        let hashed = hash_pwd(new_pwd, &salt);
771
772        let mut u: user::ActiveModel = self.get_user_model(user_name).await?.into();
773        u.pwd = Set(hashed);
774        u.salt = Set(salt);
775
776        u.update(&self.db_con).await?;
777        Ok(())
778    }
779
780    async fn change_read_only_state(&self, user_name: &str, state: bool) -> DbResult<()> {
781        let mut u: user::ActiveModel = self.get_user_model(user_name).await?.into();
782        u.is_read_only = Set(state);
783
784        u.update(&self.db_con).await?;
785        Ok(())
786    }
787
788    async fn change_admin_state(&self, user_name: &str, state: bool) -> DbResult<()> {
789        let mut u: user::ActiveModel = self.get_user_model(user_name).await?.into();
790        u.is_admin = Set(state);
791
792        u.update(&self.db_con).await?;
793        Ok(())
794    }
795
796    async fn crate_version_exists(&self, crate_id: i64, version: &str) -> DbResult<bool> {
797        let cm = crate_meta::Entity::find()
798            .filter(
799                Cond::all()
800                    .add(crate_meta::Column::CrateFk.eq(crate_id))
801                    .add(crate_meta::Column::Version.eq(version)),
802            )
803            .one(&self.db_con)
804            .await?;
805
806        Ok(cm.is_some())
807    }
808
809    async fn get_max_version_from_id(&self, crate_id: i64) -> DbResult<Version> {
810        operations::get_max_version_from_id(&self.db_con, crate_id).await
811    }
812
813    async fn get_max_version_from_name(&self, crate_name: &NormalizedName) -> DbResult<Version> {
814        let k = self.get_krate_model(crate_name).await?;
815        let v = Version::try_from(&k.max_version)
816            .map_err(|_| DbError::FailedToGetMaxVersionByName(crate_name.to_string()))?;
817        Ok(v)
818    }
819
820    async fn update_max_version(&self, crate_id: i64, version: &Version) -> DbResult<()> {
821        let krate = krate::Entity::find_by_id(crate_id)
822            .one(&self.db_con)
823            .await?
824            .ok_or(DbError::CrateNotFoundWithId(crate_id))?;
825
826        let mut k: krate::ActiveModel = krate.into();
827        k.max_version = Set(version.to_string());
828        k.update(&self.db_con).await?;
829
830        Ok(())
831    }
832
833    async fn add_auth_token(&self, name: &str, token: &str, user: &str) -> DbResult<()> {
834        let hashed_token = hash_token(token);
835        let user = self.get_user_model(user).await?;
836
837        let at = auth_token::ActiveModel {
838            name: Set(name.to_owned()),
839            token: Set(hashed_token),
840            user_fk: Set(user.id),
841            ..Default::default()
842        };
843
844        at.insert(&self.db_con).await?;
845
846        Ok(())
847    }
848
849    async fn get_user_from_token(&self, token: &str) -> DbResult<User> {
850        let token = hash_token(token);
851
852        let u = user::Entity::find()
853            .join(JoinType::InnerJoin, user::Relation::AuthToken.def())
854            .filter(Expr::col((AuthTokenIden::Table, AuthTokenIden::Token)).eq(token))
855            .one(&self.db_con)
856            .await?
857            .ok_or(DbError::TokenNotFound)?;
858
859        Ok(User::from(u))
860    }
861
862    async fn get_user(&self, name: &str) -> DbResult<User> {
863        Ok(User::from(self.get_user_model(name).await?))
864    }
865
866    async fn get_group(&self, name: &str) -> DbResult<Group> {
867        Ok(Group::from(self.get_group_model(name).await?))
868    }
869
870    async fn get_auth_tokens(&self, user_name: &str) -> DbResult<Vec<AuthToken>> {
871        let at: Vec<auth_token::Model> = auth_token::Entity::find()
872            .join(JoinType::InnerJoin, auth_token::Relation::User.def())
873            .filter(user::Column::Name.eq(user_name))
874            .all(&self.db_con)
875            .await?;
876
877        Ok(at.into_iter().map(AuthToken::from).collect())
878    }
879
880    async fn delete_auth_token(&self, id: i32) -> DbResult<()> {
881        auth_token::Entity::delete_by_id(id as i64)
882            .exec(&self.db_con)
883            .await?;
884        Ok(())
885    }
886
887    async fn delete_owner(&self, crate_name: &str, owner: &str) -> DbResult<()> {
888        let model = self.get_owner_by_crate_and_user(crate_name, owner).await?;
889        self.delete_or_not_found(model, DbError::OwnerNotFound(owner.to_string()))
890            .await
891    }
892
893    async fn delete_crate_user(&self, crate_name: &NormalizedName, user: &str) -> DbResult<()> {
894        let model = self
895            .get_crate_user_by_crate_and_user(crate_name, user)
896            .await?;
897        self.delete_or_not_found(model, DbError::UserNotFound(user.to_string()))
898            .await
899    }
900
901    async fn delete_crate_group(&self, crate_name: &NormalizedName, group: &str) -> DbResult<()> {
902        let model = self
903            .get_crate_group_by_name_and_group(crate_name, group)
904            .await?;
905        self.delete_or_not_found(model, DbError::GroupNotFound(group.to_string()))
906            .await
907    }
908
909    async fn delete_group_user(&self, group_name: &str, user: &str) -> DbResult<()> {
910        let model = self
911            .get_group_user_by_group_and_user(group_name, user)
912            .await?;
913        self.delete_or_not_found(model, DbError::UserNotFound(user.to_string()))
914            .await
915    }
916
917    async fn add_user(
918        &self,
919        name: &str,
920        pwd: &str,
921        salt: &str,
922        is_admin: bool,
923        is_read_only: bool,
924    ) -> DbResult<()> {
925        let hashed_pwd = hash_pwd(pwd, salt);
926        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
927
928        let u = user::ActiveModel {
929            name: Set(name.to_owned()),
930            pwd: Set(hashed_pwd),
931            salt: Set(salt.to_owned()),
932            is_admin: Set(is_admin),
933            is_read_only: Set(is_read_only),
934            created: Set(created),
935            ..Default::default()
936        };
937
938        u.insert(&self.db_con).await?;
939        Ok(())
940    }
941
942    async fn add_group(&self, name: &str) -> DbResult<()> {
943        let g = group::ActiveModel {
944            name: Set(name.to_owned()),
945            ..Default::default()
946        };
947
948        g.insert(&self.db_con).await?;
949        Ok(())
950    }
951
952    async fn get_users(&self) -> DbResult<Vec<User>> {
953        let users = user::Entity::find()
954            .order_by_asc(user::Column::Name)
955            .all(&self.db_con)
956            .await?;
957
958        Ok(users.into_iter().map(User::from).collect())
959    }
960
961    async fn get_groups(&self) -> DbResult<Vec<Group>> {
962        let groups = group::Entity::find()
963            .order_by_asc(group::Column::Name)
964            .all(&self.db_con)
965            .await?;
966
967        Ok(groups.into_iter().map(Group::from).collect())
968    }
969
970    async fn get_total_unique_crates(&self) -> DbResult<u64> {
971        self.count(
972            CrateIden::Table,
973            CrateIden::Id,
974            DbError::FailedToCountCrates,
975        )
976        .await
977    }
978
979    async fn get_total_crate_versions(&self) -> DbResult<u64> {
980        self.count(
981            CrateMetaIden::Table,
982            CrateMetaIden::Id,
983            DbError::FailedToCountCrateVersions,
984        )
985        .await
986    }
987
988    async fn get_total_downloads(&self) -> DbResult<u64> {
989        #[derive(FromQueryResult)]
990        struct Model {
991            total_downloads: i64,
992        }
993
994        Ok(krate::Entity::find()
995            .select_only()
996            .column(krate::Column::TotalDownloads)
997            .into_model::<Model>()
998            .all(&self.db_con)
999            .await?
1000            .into_iter()
1001            .map(|m| m.total_downloads as u64)
1002            .sum())
1003    }
1004
1005    async fn get_top_crates_downloads(&self, top: u64) -> DbResult<Vec<(String, u64)>> {
1006        #[derive(Debug, PartialEq, FromQueryResult)]
1007        struct SelectResult {
1008            original_name: String,
1009            total_downloads: i64,
1010        }
1011
1012        let stmt = self.db_con.get_database_backend().build(
1013            Query::select()
1014                .columns(vec![CrateIden::OriginalName, CrateIden::TotalDownloads])
1015                .from(CrateIden::Table)
1016                .order_by(CrateIden::TotalDownloads, Order::Desc)
1017                .limit(top),
1018        );
1019
1020        Ok(SelectResult::find_by_statement(stmt)
1021            .all(&self.db_con)
1022            .await?
1023            .into_iter()
1024            .map(|x| (x.original_name, x.total_downloads as u64))
1025            .collect())
1026    }
1027
1028    async fn get_crate_summaries(&self) -> DbResult<Vec<CrateSummary>> {
1029        let krates = krate::Entity::find()
1030            .order_by(krate::Column::Name, Order::Asc)
1031            .all(&self.db_con)
1032            .await?;
1033
1034        Ok(krates.into_iter().map(CrateSummary::from).collect())
1035    }
1036
1037    async fn add_doc_queue(
1038        &self,
1039        krate: &NormalizedName,
1040        version: &Version,
1041        path: &Path,
1042    ) -> DbResult<()> {
1043        let s = doc_queue::ActiveModel {
1044            krate: Set(krate.to_string()),
1045            version: Set(version.to_string()),
1046            // FIXME: Convert Path to String properly, handle errors
1047            path: Set(path.to_string_lossy().to_string()),
1048            ..Default::default()
1049        };
1050
1051        s.insert(&self.db_con).await?;
1052        Ok(())
1053    }
1054
1055    async fn delete_doc_queue(&self, id: i64) -> DbResult<()> {
1056        DocQueue::delete_by_id(id).exec(&self.db_con).await?;
1057        Ok(())
1058    }
1059
1060    async fn get_doc_queue(&self) -> DbResult<Vec<DocQueueEntry>> {
1061        let entities = DocQueue::find().all(&self.db_con).await?;
1062
1063        Ok(entities.into_iter().map(DocQueueEntry::from).collect())
1064    }
1065
1066    async fn delete_crate(&self, krate: &NormalizedName, version: &Version) -> DbResult<()> {
1067        let txn = self.db_con.begin().await?;
1068
1069        // Delete the entry from the "crate_meta" table
1070        let crate_meta_version = crate_meta::Entity::find()
1071            .join(JoinType::InnerJoin, crate_meta::Relation::Krate.def())
1072            .filter(krate::Column::Name.eq(krate))
1073            .filter(crate_meta::Column::Version.eq(version))
1074            .one(&txn)
1075            .await?
1076            .ok_or_else(|| DbError::CrateMetaNotFound(krate.to_string(), version.to_string()))?;
1077        let crate_id = crate_meta_version.crate_fk;
1078        let current_max_version = operations::get_max_version_from_id(&txn, crate_id).await?;
1079        crate_meta_version.delete(&txn).await?;
1080
1081        // Delete the crate index entry from "crate_index" table
1082        let crate_index_version = crate_index::Entity::find()
1083            .join(JoinType::InnerJoin, crate_index::Relation::Krate.def())
1084            .filter(krate::Column::Name.eq(krate))
1085            .filter(crate_index::Column::Vers.eq(version))
1086            .one(&txn)
1087            .await?
1088            .ok_or_else(|| DbError::CrateIndexNotFound(krate.to_string(), version.to_string()))?;
1089        crate_index_version.delete(&txn).await?;
1090
1091        // If it was the last entry in the "crate_meta" table, delete the entry
1092        // in the "crate" table as well
1093        let crate_meta_rows = crate_meta::Entity::find()
1094            .join(JoinType::InnerJoin, crate_meta::Relation::Krate.def())
1095            .filter(krate::Column::Name.eq(krate))
1096            .all(&txn)
1097            .await?;
1098
1099        if crate_meta_rows.is_empty() {
1100            krate::Entity::delete_many()
1101                .filter(krate::Column::Name.eq(krate))
1102                .exec(&txn)
1103                .await?;
1104        } else {
1105            let c = krate::Entity::find_by_id(crate_id)
1106                .one(&txn)
1107                .await?
1108                .ok_or(DbError::CrateNotFoundWithId(crate_id))?;
1109            let mut c: krate::ActiveModel = c.into();
1110
1111            // Update the max. version if the deleted version was the max. version.
1112            if version == &current_max_version {
1113                let new_max_version = crate_meta_rows
1114                    .into_iter()
1115                    .map(|cm| Version::from_unchecked_str(&cm.version))
1116                    .max()
1117                    .unwrap(); // Safe to unwrap, as crate_meta_rows is not empty
1118                c.max_version = Set(new_max_version.into_inner());
1119            }
1120            // Update the ETag value of the crate index.
1121            let etag = operations::compute_etag(&txn, krate, crate_id).await?;
1122            c.e_tag = Set(etag);
1123            c.update(&txn).await?;
1124        }
1125
1126        txn.commit().await?;
1127
1128        Ok(())
1129    }
1130
1131    async fn get_crate_meta_list(&self, crate_name: &NormalizedName) -> DbResult<Vec<CrateMeta>> {
1132        let crate_meta = crate_meta::Entity::find()
1133            .join(JoinType::InnerJoin, crate_meta::Relation::Krate.def())
1134            .filter(krate::Column::Name.eq(crate_name))
1135            .all(&self.db_con)
1136            .await?;
1137
1138        let crate_meta = crate_meta
1139            .into_iter()
1140            .map(|cm| CrateMeta {
1141                name: crate_name.to_string(),
1142                id: cm.id,
1143                version: cm.version,
1144                created: cm.created,
1145                downloads: cm.downloads,
1146                crate_fk: cm.crate_fk,
1147            })
1148            .collect();
1149
1150        Ok(crate_meta)
1151    }
1152
1153    async fn update_last_updated(&self, id: i64, last_updated: &DateTime<Utc>) -> DbResult<()> {
1154        let krate = krate::Entity::find_by_id(id)
1155            .one(&self.db_con)
1156            .await?
1157            .ok_or(DbError::CrateNotFoundWithId(id))?;
1158
1159        let date = last_updated.format(DB_DATE_FORMAT).to_string();
1160
1161        let mut krate: krate::ActiveModel = krate.into();
1162        krate.last_updated = Set(date);
1163        krate.update(&self.db_con).await?;
1164
1165        Ok(())
1166    }
1167
1168    async fn search_in_crate_name(
1169        &self,
1170        contains: &str,
1171        cache: bool,
1172    ) -> DbResult<Vec<CrateOverview>> {
1173        self.query_crates(Some(contains), None, cache).await
1174    }
1175
1176    async fn get_crate_overview_list(
1177        &self,
1178        limit: u64,
1179        offset: u64,
1180        cache: bool,
1181    ) -> DbResult<Vec<CrateOverview>> {
1182        self.query_crates(None, Some((limit, offset)), cache).await
1183    }
1184
1185    async fn get_crate_data(&self, crate_name: &NormalizedName) -> DbResult<CrateData> {
1186        let krate = self.get_krate_model(crate_name).await?;
1187
1188        let owners: Vec<String> = krate
1189            .find_related(owner::Entity)
1190            .find_also_related(user::Entity)
1191            .all(&self.db_con)
1192            .await?
1193            .into_iter()
1194            .filter_map(|(_, v)| v.map(|v| v.name))
1195            .collect();
1196        let categories: Vec<String> = krate
1197            .find_related(crate_category_to_crate::Entity)
1198            .find_also_related(crate_category::Entity)
1199            .all(&self.db_con)
1200            .await?
1201            .into_iter()
1202            .filter_map(|(_, v)| v.map(|v| v.category))
1203            .collect();
1204        let keywords: Vec<String> = krate
1205            .find_related(crate_keyword_to_crate::Entity)
1206            .find_also_related(crate_keyword::Entity)
1207            .all(&self.db_con)
1208            .await?
1209            .into_iter()
1210            .filter_map(|(_, v)| v.map(|v| v.keyword))
1211            .collect();
1212        let authors: Vec<String> = krate
1213            .find_related(crate_author_to_crate::Entity)
1214            .find_also_related(crate_author::Entity)
1215            .all(&self.db_con)
1216            .await?
1217            .into_iter()
1218            .filter_map(|(_, v)| v.map(|v| v.author))
1219            .collect();
1220        let crate_metas = krate
1221            .find_related(crate_meta::Entity)
1222            .all(&self.db_con)
1223            .await?;
1224        let crate_indices = krate
1225            .find_related(crate_index::Entity)
1226            .all(&self.db_con)
1227            .await?;
1228
1229        let mut versions = Vec::new();
1230        for cm in crate_metas {
1231            let ci = crate_indices
1232                .iter()
1233                .find(|ci| ci.vers == cm.version)
1234                .ok_or_else(|| {
1235                    DbError::CrateIndexNotFound(krate.name.clone(), cm.version.clone())
1236                })?;
1237            let dependencies: Vec<CrateRegistryDep> = match ci.deps.clone() {
1238                Some(deps) => {
1239                    let ix = serde_json::from_value::<Vec<IndexDep>>(deps)
1240                        .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?;
1241
1242                    let mut ft = Vec::new();
1243                    for dep in ix {
1244                        ft.push(CrateRegistryDep::from_index(
1245                            operations::get_desc_for_crate_dep(
1246                                &self.db_con,
1247                                &dep.name,
1248                                dep.registry.as_deref(),
1249                            )
1250                            .await?,
1251                            dep,
1252                        ));
1253                    }
1254
1255                    ft
1256                }
1257                None => Vec::default(),
1258            };
1259            let features: BTreeMap<String, Vec<String>> = match ci.features.clone() {
1260                Some(features) => serde_json::from_value::<BTreeMap<String, Vec<String>>>(features)
1261                    .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?,
1262                None => BTreeMap::default(),
1263            };
1264
1265            versions.push(CrateVersionData {
1266                version: cm.version,
1267                created: cm.created,
1268                downloads: cm.downloads,
1269                readme: cm.readme,
1270                license: cm.license,
1271                license_file: cm.license_file,
1272                documentation: cm.documentation,
1273                dependencies,
1274                checksum: ci.cksum.clone(),
1275                features,
1276                yanked: ci.yanked,
1277                links: ci.links.clone(),
1278                v: ci.v,
1279            });
1280        }
1281        versions.sort_by(|a, b| {
1282            Version::from_unchecked_str(&b.version).cmp(&Version::from_unchecked_str(&a.version))
1283        });
1284
1285        let crate_data = CrateData {
1286            name: krate.original_name,
1287            owners,
1288            max_version: krate.max_version,
1289            total_downloads: krate.total_downloads,
1290            last_updated: krate.last_updated,
1291            homepage: krate.homepage,
1292            description: krate.description,
1293            repository: krate.repository,
1294            categories,
1295            keywords,
1296            authors,
1297            versions,
1298        };
1299
1300        Ok(crate_data)
1301    }
1302
1303    async fn add_empty_crate(&self, name: &str, created: &DateTime<Utc>) -> DbResult<i64> {
1304        let created = created.format(DB_DATE_FORMAT).to_string();
1305        let normalized_name = NormalizedName::from(
1306            OriginalName::try_from(name)
1307                .map_err(|_| DbError::InvalidCrateName(name.to_string()))?,
1308        );
1309        let krate = krate::ActiveModel {
1310            id: ActiveValue::default(),
1311            name: Set(normalized_name.to_string()),
1312            original_name: Set(name.to_string()),
1313            max_version: Set("0.0.0".to_string()),
1314            last_updated: Set(created),
1315            total_downloads: Set(0),
1316            homepage: Set(None),
1317            description: Set(None),
1318            repository: Set(None),
1319            e_tag: Set(String::new()), // Set to empty string, as it can be computed, when the crate index is inserted
1320            restricted_download: Set(false),
1321        };
1322        Ok(krate.insert(&self.db_con).await?.id)
1323    }
1324
1325    async fn add_crate(
1326        &self,
1327        pub_metadata: &PublishMetadata,
1328        cksum: &str,
1329        created: &DateTime<Utc>,
1330        owner: &str,
1331    ) -> DbResult<i64> {
1332        let created = created.format(DB_DATE_FORMAT).to_string();
1333        let normalized_name = NormalizedName::from(
1334            OriginalName::try_from(&pub_metadata.name)
1335                .map_err(|_| DbError::InvalidCrateName(pub_metadata.name.clone()))?,
1336        );
1337
1338        let existing = krate::Entity::find()
1339            .filter(krate::Column::Name.eq(&pub_metadata.name))
1340            .one(&self.db_con)
1341            .await?;
1342
1343        let txn = self.db_con.begin().await?;
1344
1345        let crate_id = if let Some(krate) = existing {
1346            let krate_id = krate.id;
1347            let max_version = max(
1348                parse_db_version(&krate.max_version)?,
1349                parse_db_version(&pub_metadata.vers)?,
1350            );
1351
1352            let mut krate: krate::ActiveModel = krate.into();
1353            krate.last_updated = Set(created.clone());
1354            krate.max_version = Set(max_version.into_inner());
1355            krate.homepage = Set(pub_metadata.homepage.clone());
1356            krate.description = Set(pub_metadata.description.clone());
1357            krate.repository = Set(pub_metadata.repository.clone());
1358            krate.e_tag = Set(String::new()); // Set to empty string, as it can be computed, when the crate index is inserted
1359            krate.update(&txn).await?;
1360            krate_id
1361        } else {
1362            let krate = krate::ActiveModel {
1363                id: ActiveValue::default(),
1364                name: Set(normalized_name.to_string()),
1365                original_name: Set(pub_metadata.name.clone()),
1366                max_version: Set(pub_metadata.vers.clone()),
1367                last_updated: Set(created.clone()),
1368                total_downloads: Set(0),
1369                homepage: Set(pub_metadata.homepage.clone()),
1370                description: Set(pub_metadata.description.clone()),
1371                repository: Set(pub_metadata.repository.clone()),
1372                e_tag: Set(String::new()), // Set to empty string, as it can be computed, when the crate index is inserted
1373                restricted_download: Set(false),
1374            };
1375            let krate = krate.insert(&txn).await?;
1376            krate.id
1377        };
1378
1379        operations::add_owner_if_not_exists(&txn, owner, crate_id).await?;
1380        operations::add_crate_metadata(&txn, pub_metadata, &created, crate_id).await?;
1381        operations::add_crate_index(&txn, pub_metadata, cksum, crate_id).await?;
1382        operations::update_etag(&txn, &pub_metadata.name, crate_id).await?;
1383        operations::update_crate_categories(&txn, pub_metadata, crate_id).await?;
1384        operations::update_crate_keywords(&txn, pub_metadata, crate_id).await?;
1385        operations::update_crate_authors(&txn, pub_metadata, crate_id).await?;
1386
1387        txn.commit().await?;
1388        Ok(crate_id)
1389    }
1390
1391    async fn update_docs_link(
1392        &self,
1393        crate_name: &NormalizedName,
1394        version: &Version,
1395        docs_link: &str,
1396    ) -> DbResult<()> {
1397        let (cm, _c) = crate_meta::Entity::find()
1398            .find_also_related(krate::Entity)
1399            .filter(
1400                Cond::all()
1401                    .add(krate::Column::Name.eq(crate_name))
1402                    .add(crate_meta::Column::Version.eq(version)),
1403            )
1404            .one(&self.db_con)
1405            .await?
1406            .ok_or_else(|| DbError::CrateNotFound(crate_name.to_string()))?;
1407
1408        let mut cm: crate_meta::ActiveModel = cm.into();
1409        cm.documentation = Set(Some(docs_link.to_string()));
1410        cm.update(&self.db_con).await?;
1411        Ok(())
1412    }
1413
1414    async fn add_crate_metadata(
1415        &self,
1416        pub_metadata: &PublishMetadata,
1417        created: &str,
1418        crate_id: i64,
1419    ) -> DbResult<()> {
1420        operations::add_crate_metadata(&self.db_con, pub_metadata, created, crate_id).await
1421    }
1422
1423    async fn get_prefetch_data(&self, crate_name: &str) -> DbResult<Prefetch> {
1424        let mut krate = krate::Entity::find()
1425            .filter(krate::Column::Name.eq(crate_name))
1426            .find_with_related(crate_index::Entity)
1427            .all(&self.db_con)
1428            .await?
1429            .into_iter();
1430
1431        // Expecting only one crate with its related indices
1432        let (Some((krate, crate_indices)), None) = (krate.next(), krate.next()) else {
1433            return Err(DbError::CrateNotFound(crate_name.to_string()));
1434        };
1435
1436        let index_metadata =
1437            operations::crate_index_model_to_index_metadata(crate_name, crate_indices)?;
1438        let data = operations::index_metadata_to_bytes(&index_metadata)?;
1439
1440        Ok(Prefetch {
1441            data,
1442            etag: krate.e_tag,
1443            last_modified: krate.last_updated,
1444        })
1445    }
1446
1447    async fn is_cratesio_cache_up_to_date(
1448        &self,
1449        crate_name: &NormalizedName,
1450        etag: Option<String>,
1451        last_modified: Option<String>,
1452    ) -> DbResult<PrefetchState> {
1453        let Some(krate) = cratesio_crate::Entity::find()
1454            .filter(cratesio_crate::Column::Name.eq(crate_name))
1455            .one(&self.db_con)
1456            .await?
1457        else {
1458            return Ok(PrefetchState::NotFound);
1459        };
1460
1461        let needs_update = match (etag, last_modified) {
1462            (Some(etag), Some(last_modified)) => {
1463                krate.e_tag != etag || krate.last_modified != last_modified
1464            }
1465            (Some(etag), None) => krate.e_tag != etag,
1466            (None, Some(last_modified)) => krate.last_modified != last_modified,
1467            (None, None) => true,
1468        };
1469
1470        if !needs_update {
1471            Ok(PrefetchState::UpToDate)
1472        } else {
1473            let crate_indices = krate
1474                .find_related(cratesio_index::Entity)
1475                .all(&self.db_con)
1476                .await?;
1477            let index_metadata =
1478                operations::cratesio_index_model_to_index_metadata(crate_name, crate_indices)?;
1479            let data = operations::index_metadata_to_bytes(&index_metadata)?;
1480
1481            Ok(PrefetchState::NeedsUpdate(Prefetch {
1482                data,
1483                etag: krate.e_tag.clone(),
1484                last_modified: krate.last_modified,
1485            }))
1486        }
1487    }
1488
1489    async fn add_cratesio_prefetch_data(
1490        &self,
1491        crate_name: &OriginalName,
1492        etag: &str,
1493        last_modified: &str,
1494        description: Option<String>,
1495        indices: &[IndexMetadata],
1496    ) -> DbResult<Prefetch> {
1497        let normalized_name = crate_name.to_normalized();
1498
1499        let max_version = indices
1500            .iter()
1501            .map(|i| Version::from_unchecked_str(&i.vers))
1502            .max()
1503            .ok_or_else(|| DbError::FailedToGetMaxVersionByName(crate_name.to_string()))?;
1504
1505        let krate = cratesio_crate::Entity::find()
1506            .filter(cratesio_crate::Column::Name.eq(&normalized_name))
1507            .one(&self.db_con)
1508            .await?;
1509
1510        let krate = if let Some(krate) = krate {
1511            let mut krate: cratesio_crate::ActiveModel = krate.into();
1512            krate.e_tag = Set(etag.to_string());
1513            krate.last_modified = Set(last_modified.to_string());
1514            krate.max_version = Set(max_version.into_inner());
1515            krate.update(&self.db_con).await?
1516        } else {
1517            let krate = cratesio_crate::ActiveModel {
1518                id: ActiveValue::default(),
1519                name: Set(normalized_name.to_string()),
1520                original_name: Set(crate_name.to_string()),
1521                description: Set(description),
1522                e_tag: Set(etag.to_string()),
1523                last_modified: Set(last_modified.to_string()),
1524                total_downloads: Set(0),
1525                max_version: Set(max_version.to_string()),
1526            };
1527            krate.insert(&self.db_con).await?
1528        };
1529
1530        let current_indices = cratesio_index::Entity::find()
1531            .filter(cratesio_index::Column::CratesIoFk.eq(krate.id))
1532            .all(&self.db_con)
1533            .await?;
1534
1535        for index in indices {
1536            // Check if the version was yanked or un-yanked and update if so.
1537            if let Some(current_index) = current_indices.iter().find(|ci| index.vers == ci.vers) {
1538                if index.yanked != current_index.yanked {
1539                    let mut ci: cratesio_index::ActiveModel = current_index.to_owned().into();
1540                    ci.yanked = Set(index.yanked);
1541                    ci.update(&self.db_con).await?;
1542                }
1543            } else {
1544                let deps = if index.deps.is_empty() {
1545                    None
1546                } else {
1547                    let deps = serde_json::to_value(&index.deps)
1548                        .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?;
1549                    Some(deps)
1550                };
1551
1552                let features = serde_json::to_value(&index.features)
1553                    .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?;
1554
1555                let features2 = serde_json::to_value(&index.features2)
1556                    .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?;
1557
1558                let new_index = cratesio_index::ActiveModel {
1559                    id: ActiveValue::default(),
1560                    name: Set(index.name.clone()),
1561                    vers: Set(index.vers.clone()),
1562                    deps: Set(deps),
1563                    cksum: Set(index.cksum.clone()),
1564                    features: Set(Some(features)),
1565                    features2: Set(Some(features2)),
1566                    yanked: Set(index.yanked),
1567                    links: Set(index.links.clone()),
1568                    pubtime: Set(index.pubtime.map(|dt| dt.naive_utc())),
1569                    v: Set(index.v.unwrap_or(1) as i32),
1570                    crates_io_fk: Set(krate.id),
1571                };
1572
1573                new_index.insert(&self.db_con).await?;
1574
1575                // Add the meta data for the crate version.
1576                let meta = cratesio_meta::ActiveModel {
1577                    id: ActiveValue::default(),
1578                    version: Set(index.vers.clone()),
1579                    downloads: Set(0),
1580                    crates_io_fk: Set(krate.id),
1581                    documentation: Set(Some(format!(
1582                        "https://docs.rs/{normalized_name}/{}",
1583                        index.vers,
1584                    ))),
1585                };
1586
1587                meta.insert(&self.db_con).await?;
1588            }
1589        }
1590
1591        Ok(Prefetch {
1592            data: operations::index_metadata_to_bytes(indices)?,
1593            etag: etag.to_string(),
1594            last_modified: last_modified.to_string(),
1595        })
1596    }
1597
1598    async fn get_cratesio_index_update_list(&self) -> DbResult<Vec<CratesioPrefetchMsg>> {
1599        let crates = cratesio_crate::Entity::find().all(&self.db_con).await?;
1600        let msgs = crates
1601            .into_iter()
1602            .map(|krate| {
1603                CratesioPrefetchMsg::Update(UpdateData {
1604                    name: OriginalName::from_unchecked(krate.original_name),
1605                    etag: Some(krate.e_tag),
1606                    last_modified: Some(krate.last_modified),
1607                })
1608            })
1609            .collect();
1610        Ok(msgs)
1611    }
1612
1613    async fn unyank_crate(&self, crate_name: &NormalizedName, version: &Version) -> DbResult<()> {
1614        let mut ci: crate_index::ActiveModel = self
1615            .get_crate_index_model(crate_name, version)
1616            .await?
1617            .into();
1618        ci.yanked = Set(false);
1619        ci.save(&self.db_con).await?;
1620        Ok(())
1621    }
1622
1623    async fn yank_crate(&self, crate_name: &NormalizedName, version: &Version) -> DbResult<()> {
1624        let mut ci: crate_index::ActiveModel = self
1625            .get_crate_index_model(crate_name, version)
1626            .await?
1627            .into();
1628        ci.yanked = Set(true);
1629        ci.save(&self.db_con).await?;
1630        Ok(())
1631    }
1632
1633    async fn register_webhook(&self, webhook: Webhook) -> DbResult<String> {
1634        let w = webhook::ActiveModel {
1635            event: Set(Into::<&str>::into(webhook.event).to_string()),
1636            callback_url: Set(webhook.callback_url),
1637            name: Set(webhook.name),
1638            ..Default::default()
1639        };
1640
1641        let w: webhook::Model = w.insert(&self.db_con).await?;
1642        Ok(w.id.to_string())
1643    }
1644
1645    async fn delete_webhook(&self, id: &str) -> DbResult<()> {
1646        self.get_webhook_model(id)
1647            .await?
1648            .delete(&self.db_con)
1649            .await?;
1650        Ok(())
1651    }
1652
1653    async fn get_webhook(&self, id: &str) -> DbResult<Webhook> {
1654        webhook_model_to_obj(self.get_webhook_model(id).await?)
1655    }
1656
1657    async fn get_all_webhooks(&self) -> DbResult<Vec<Webhook>> {
1658        Ok(webhook::Entity::find()
1659            .all(&self.db_con)
1660            .await?
1661            .into_iter()
1662            .filter_map(|w| webhook_model_to_obj(w).ok())
1663            .collect())
1664    }
1665
1666    async fn add_webhook_queue(
1667        &self,
1668        event: WebhookEvent,
1669        payload: serde_json::Value,
1670    ) -> DbResult<()> {
1671        let w = webhook::Entity::find()
1672            .filter(webhook::Column::Event.eq(Into::<&str>::into(event)))
1673            .all(&self.db_con)
1674            .await?;
1675
1676        if w.is_empty() {
1677            return Ok(());
1678        }
1679
1680        let now = Utc::now();
1681
1682        let entries = w.into_iter().map(|w| webhook_queue::ActiveModel {
1683            webhook_fk: Set(w.id),
1684            payload: Set(payload.clone()),
1685            next_attempt: Set(now.into()),
1686            last_attempt: Set(None),
1687            ..Default::default()
1688        });
1689
1690        webhook_queue::Entity::insert_many(entries)
1691            .exec(&self.db_con)
1692            .await?;
1693        Ok(())
1694    }
1695    async fn get_pending_webhook_queue_entries(
1696        &self,
1697        timestamp: DateTime<Utc>,
1698    ) -> DbResult<Vec<WebhookQueue>> {
1699        let w = webhook_queue::Entity::find()
1700            .find_with_related(webhook::Entity)
1701            .filter(webhook_queue::Column::NextAttempt.lte(timestamp))
1702            .all(&self.db_con)
1703            .await?;
1704
1705        Ok(w.into_iter()
1706            .filter_map(|w| {
1707                Some(WebhookQueue {
1708                    id: w.0.id.to_string(),
1709                    callback_url: w.1.first()?.callback_url.clone(),
1710                    payload: w.0.payload,
1711                    last_attempt: w.0.last_attempt.map(Into::into),
1712                    next_attempt: w.0.next_attempt.into(),
1713                })
1714            })
1715            .collect())
1716    }
1717
1718    async fn update_webhook_queue(
1719        &self,
1720        id: &str,
1721        last_attempt: DateTime<Utc>,
1722        next_attempt: DateTime<Utc>,
1723    ) -> DbResult<()> {
1724        let mut w: webhook_queue::ActiveModel = self.get_webhook_queue_model(id).await?.into();
1725        w.last_attempt = Set(Some(last_attempt.into()));
1726        w.next_attempt = Set(next_attempt.into());
1727        w.update(&self.db_con).await?;
1728        Ok(())
1729    }
1730
1731    async fn delete_webhook_queue(&self, id: &str) -> DbResult<()> {
1732        self.get_webhook_queue_model(id)
1733            .await?
1734            .delete(&self.db_con)
1735            .await?;
1736        Ok(())
1737    }
1738
1739    // OAuth2 identity methods
1740
1741    async fn get_user_by_oauth2_identity(
1742        &self,
1743        issuer: &str,
1744        subject: &str,
1745    ) -> DbResult<Option<User>> {
1746        let identity = oauth2_identity::Entity::find()
1747            .filter(oauth2_identity::Column::ProviderIssuer.eq(issuer))
1748            .filter(oauth2_identity::Column::Subject.eq(subject))
1749            .one(&self.db_con)
1750            .await?;
1751
1752        if let Some(identity) = identity {
1753            let u = user::Entity::find_by_id(identity.user_fk)
1754                .one(&self.db_con)
1755                .await?
1756                .ok_or_else(|| DbError::UserNotFound(format!("user_id={}", identity.user_fk)))?;
1757
1758            Ok(Some(User::from(u)))
1759        } else {
1760            Ok(None)
1761        }
1762    }
1763
1764    async fn create_oauth2_user(
1765        &self,
1766        username: &str,
1767        issuer: &str,
1768        subject: &str,
1769        email: Option<String>,
1770        is_admin: bool,
1771        is_read_only: bool,
1772    ) -> DbResult<User> {
1773        // Use a transaction to ensure atomicity
1774        let txn = self.db_con.begin().await?;
1775
1776        // Generate a random password and salt for OAuth2 users
1777        // They won't use password auth, but we need valid values
1778        let salt = generate_salt();
1779        let random_pwd = Uuid::new_v4().to_string();
1780        let hashed_pwd = hash_pwd(&random_pwd, &salt);
1781        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
1782
1783        // Create the user
1784        let new_user = user::ActiveModel {
1785            name: Set(username.to_string()),
1786            pwd: Set(hashed_pwd),
1787            salt: Set(salt),
1788            is_admin: Set(is_admin),
1789            is_read_only: Set(is_read_only),
1790            created: Set(created.clone()),
1791            ..Default::default()
1792        };
1793
1794        let res = user::Entity::insert(new_user).exec(&txn).await?;
1795        let user_id = res.last_insert_id;
1796
1797        // Link the OAuth2 identity
1798        let identity = oauth2_identity::ActiveModel {
1799            user_fk: Set(user_id),
1800            provider_issuer: Set(issuer.to_string()),
1801            subject: Set(subject.to_string()),
1802            email: Set(email),
1803            created: Set(created.clone()),
1804            ..Default::default()
1805        };
1806
1807        oauth2_identity::Entity::insert(identity).exec(&txn).await?;
1808
1809        txn.commit().await?;
1810
1811        Ok(User {
1812            id: user_id as i32,
1813            name: username.to_string(),
1814            pwd: String::new(),  // Don't expose password hash
1815            salt: String::new(), // Don't expose salt
1816            is_admin,
1817            is_read_only,
1818            created,
1819        })
1820    }
1821
1822    async fn link_oauth2_identity(
1823        &self,
1824        user_id: i64,
1825        issuer: &str,
1826        subject: &str,
1827        email: Option<String>,
1828    ) -> DbResult<()> {
1829        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
1830        let identity = oauth2_identity::ActiveModel {
1831            user_fk: Set(user_id),
1832            provider_issuer: Set(issuer.to_string()),
1833            subject: Set(subject.to_string()),
1834            email: Set(email),
1835            created: Set(created),
1836            ..Default::default()
1837        };
1838
1839        oauth2_identity::Entity::insert(identity)
1840            .exec(&self.db_con)
1841            .await?;
1842
1843        Ok(())
1844    }
1845
1846    // OAuth2 state methods (CSRF/PKCE during auth flow)
1847
1848    async fn store_oauth2_state(
1849        &self,
1850        state: &str,
1851        pkce_verifier: &str,
1852        nonce: &str,
1853    ) -> DbResult<()> {
1854        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
1855        let s = oauth2_state::ActiveModel {
1856            state: Set(state.to_string()),
1857            pkce_verifier: Set(pkce_verifier.to_string()),
1858            nonce: Set(nonce.to_string()),
1859            created: Set(created),
1860            ..Default::default()
1861        };
1862
1863        oauth2_state::Entity::insert(s).exec(&self.db_con).await?;
1864        Ok(())
1865    }
1866
1867    async fn get_and_delete_oauth2_state(&self, state: &str) -> DbResult<Option<OAuth2StateData>> {
1868        let s = oauth2_state::Entity::find()
1869            .filter(oauth2_state::Column::State.eq(state))
1870            .one(&self.db_con)
1871            .await?;
1872
1873        if let Some(s) = s {
1874            let data = OAuth2StateData {
1875                state: s.state.clone(),
1876                pkce_verifier: s.pkce_verifier.clone(),
1877                nonce: s.nonce.clone(),
1878            };
1879
1880            // Delete after retrieving (single use)
1881            s.delete(&self.db_con).await?;
1882
1883            Ok(Some(data))
1884        } else {
1885            Ok(None)
1886        }
1887    }
1888
1889    async fn cleanup_expired_oauth2_states(&self) -> DbResult<u64> {
1890        // States older than 10 minutes are expired
1891        let expiry = Utc::now() - chrono::Duration::minutes(10);
1892        let expiry_str = expiry.format(DB_DATE_FORMAT).to_string();
1893
1894        let result = oauth2_state::Entity::delete_many()
1895            .filter(oauth2_state::Column::Created.lt(expiry_str))
1896            .exec(&self.db_con)
1897            .await?;
1898
1899        Ok(result.rows_affected)
1900    }
1901
1902    async fn is_username_available(&self, username: &str) -> DbResult<bool> {
1903        let existing = user::Entity::find()
1904            .filter(user::Column::Name.eq(username))
1905            .one(&self.db_con)
1906            .await?;
1907
1908        Ok(existing.is_none())
1909    }
1910
1911    // Toolchain distribution methods
1912
1913    async fn add_toolchain(
1914        &self,
1915        name: &str,
1916        version: &str,
1917        date: &str,
1918        channel: Option<String>,
1919    ) -> DbResult<i64> {
1920        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
1921
1922        let model = toolchain::ActiveModel {
1923            id: ActiveValue::NotSet,
1924            name: Set(name.to_string()),
1925            version: Set(version.to_string()),
1926            date: Set(date.to_string()),
1927            channel: Set(channel),
1928            created: Set(created),
1929        };
1930
1931        let result = model.insert(&self.db_con).await?;
1932        Ok(result.id)
1933    }
1934
1935    async fn add_toolchain_target(
1936        &self,
1937        toolchain_id: i64,
1938        target: &str,
1939        storage_path: &str,
1940        hash: &str,
1941        size: i64,
1942    ) -> DbResult<()> {
1943        let model = toolchain_target::ActiveModel {
1944            id: ActiveValue::NotSet,
1945            toolchain_fk: Set(toolchain_id),
1946            target: Set(target.to_string()),
1947            storage_path: Set(storage_path.to_string()),
1948            hash: Set(hash.to_string()),
1949            size: Set(size),
1950        };
1951
1952        model.insert(&self.db_con).await?;
1953        Ok(())
1954    }
1955
1956    async fn get_toolchain_by_channel(
1957        &self,
1958        channel: &str,
1959    ) -> DbResult<Option<ToolchainWithTargets>> {
1960        let tc = toolchain::Entity::find()
1961            .filter(toolchain::Column::Channel.eq(channel))
1962            .one(&self.db_con)
1963            .await?;
1964
1965        Ok(match tc {
1966            Some(tc) => Some(self.toolchain_with_targets(tc).await?),
1967            None => None,
1968        })
1969    }
1970
1971    async fn get_toolchain_by_version(
1972        &self,
1973        name: &str,
1974        version: &str,
1975    ) -> DbResult<Option<ToolchainWithTargets>> {
1976        Ok(
1977            match self.get_toolchain_by_name_version(name, version).await? {
1978                Some(tc) => Some(self.toolchain_with_targets(tc).await?),
1979                None => None,
1980            },
1981        )
1982    }
1983
1984    async fn list_toolchains(&self) -> DbResult<Vec<ToolchainWithTargets>> {
1985        let toolchains = toolchain::Entity::find()
1986            .order_by_desc(toolchain::Column::Created)
1987            .all(&self.db_con)
1988            .await?;
1989
1990        let mut result = Vec::with_capacity(toolchains.len());
1991
1992        for tc in toolchains {
1993            result.push(self.toolchain_with_targets(tc).await?);
1994        }
1995
1996        Ok(result)
1997    }
1998
1999    async fn delete_toolchain(&self, name: &str, version: &str) -> DbResult<()> {
2000        toolchain::Entity::delete_many()
2001            .filter(toolchain::Column::Name.eq(name))
2002            .filter(toolchain::Column::Version.eq(version))
2003            .exec(&self.db_con)
2004            .await?;
2005
2006        Ok(())
2007    }
2008
2009    async fn delete_toolchain_target(
2010        &self,
2011        name: &str,
2012        version: &str,
2013        target: &str,
2014    ) -> DbResult<()> {
2015        if let Some(tc) = self.get_toolchain_by_name_version(name, version).await? {
2016            toolchain_target::Entity::delete_many()
2017                .filter(toolchain_target::Column::ToolchainFk.eq(tc.id))
2018                .filter(toolchain_target::Column::Target.eq(target))
2019                .exec(&self.db_con)
2020                .await?;
2021        }
2022
2023        Ok(())
2024    }
2025
2026    async fn set_channel(&self, channel: &str, name: &str, version: &str) -> DbResult<()> {
2027        // First, clear the channel from any other toolchain that has it
2028        let toolchains_with_channel = toolchain::Entity::find()
2029            .filter(toolchain::Column::Channel.eq(channel))
2030            .all(&self.db_con)
2031            .await?;
2032
2033        for tc in toolchains_with_channel {
2034            let mut model: toolchain::ActiveModel = tc.into();
2035            model.channel = Set(None);
2036            model.update(&self.db_con).await?;
2037        }
2038
2039        // Then set the channel on the target toolchain
2040        if let Some(tc) = self.get_toolchain_by_name_version(name, version).await? {
2041            let mut model: toolchain::ActiveModel = tc.into();
2042            model.channel = Set(Some(channel.to_string()));
2043            model.update(&self.db_con).await?;
2044        }
2045
2046        Ok(())
2047    }
2048
2049    async fn get_channels(&self) -> DbResult<Vec<ChannelInfo>> {
2050        let toolchains = toolchain::Entity::find()
2051            .filter(toolchain::Column::Channel.is_not_null())
2052            .all(&self.db_con)
2053            .await?;
2054
2055        Ok(toolchains
2056            .into_iter()
2057            .filter_map(|tc| {
2058                tc.channel.map(|channel| ChannelInfo {
2059                    name: channel,
2060                    version: tc.version,
2061                    date: tc.date,
2062                })
2063            })
2064            .collect())
2065    }
2066}
2067
2068fn parse_db_version(value: &str) -> DbResult<Version> {
2069    Version::try_from(value).map_err(|_| DbError::InvalidVersion(value.to_owned()))
2070}