Skip to main content

kellnr_db/database/
mod.rs

1mod operations;
2pub mod test_utils;
3
4use std::cmp::max;
5use std::collections::BTreeMap;
6use std::path::Path;
7
8use chrono::{DateTime, Utc};
9use kellnr_common::crate_data::{CrateData, CrateRegistryDep, CrateVersionData};
10use kellnr_common::crate_overview::CrateOverview;
11use kellnr_common::cratesio_prefetch_msg::{CratesioPrefetchMsg, UpdateData};
12use kellnr_common::index_metadata::{IndexDep, IndexMetadata};
13use kellnr_common::normalized_name::NormalizedName;
14use kellnr_common::original_name::OriginalName;
15use kellnr_common::prefetch::Prefetch;
16use kellnr_common::publish_metadata::PublishMetadata;
17use kellnr_common::version::Version;
18use kellnr_common::webhook::{Webhook, WebhookEvent, WebhookQueue};
19use kellnr_entity::prelude::*;
20use kellnr_entity::{
21    auth_token, crate_author, crate_author_to_crate, crate_category, crate_category_to_crate,
22    crate_group, crate_index, crate_keyword, crate_keyword_to_crate, crate_meta, crate_user,
23    cratesio_crate, cratesio_index, cratesio_meta, doc_queue, group, group_user, krate,
24    oauth2_identity, oauth2_state, owner, session, toolchain, toolchain_component,
25    toolchain_target, user, webhook, webhook_queue,
26};
27use kellnr_migration::iden::{
28    AuthTokenIden, CrateIden, CrateMetaIden, CratesIoIden, CratesIoMetaIden, GroupIden,
29};
30use sea_orm::entity::prelude::Uuid;
31use sea_orm::prelude::async_trait::async_trait;
32use sea_orm::query::{QueryOrder, QuerySelect, TransactionTrait};
33use sea_orm::sea_query::{Alias, Cond, Expr, Iden, JoinType, Order, Query, UnionType};
34use sea_orm::{
35    ActiveModelTrait, ActiveValue, ColumnTrait, DatabaseConnection, EntityTrait, ExprTrait,
36    FromQueryResult, ModelTrait, QueryFilter, RelationTrait, Set,
37};
38
39use crate::error::DbError;
40use crate::password::{generate_salt, hash_pwd, hash_token};
41use crate::provider::{
42    ChannelInfo, DbResult, OAuth2StateData, PrefetchState, ToolchainComponentInfo,
43    ToolchainTargetInfo, ToolchainWithTargets,
44};
45use crate::tables::init_database;
46use crate::{
47    AuthToken, ConString, CrateMeta, CrateSummary, DbProvider, DocQueueEntry, Group, User,
48};
49
/// `chrono` format string (`YYYY-MM-DD HH:MM:SS`) used when timestamps are
/// written to the database as text.
const DB_DATE_FORMAT: &str = "%Y-%m-%d %H:%M:%S";
51
/// Implementation of [`DbProvider`] on top of a sea-orm [`DatabaseConnection`].
pub struct Database {
    /// Connection handle the queries of this type run on.
    db_con: DatabaseConnection,
}
55
56impl Database {
57    pub fn existing(db_con: DatabaseConnection) -> Self {
58        Self { db_con }
59    }
60
61    pub async fn new(con: &ConString, max_con: u32) -> Result<Self, DbError> {
62        let db_con = init_database(con, max_con)
63            .await
64            .map_err(|e| DbError::InitializationError(e.to_string()))?;
65
66        if operations::no_user_exists(&db_con).await? {
67            operations::insert_admin_credentials(&db_con, con).await?;
68        }
69
70        Ok(Self { db_con })
71    }
72
73    /// Looks up a user by name; returns the entity model or [`DbError::UserNotFound`].
74    async fn get_user_model(&self, name: &str) -> DbResult<user::Model> {
75        user::Entity::find()
76            .filter(user::Column::Name.eq(name))
77            .one(&self.db_con)
78            .await?
79            .ok_or_else(|| DbError::UserNotFound(name.to_owned()))
80    }
81
82    /// Looks up a krate by normalized name; returns the entity model or [`DbError::CrateNotFound`].
83    async fn get_krate_model(&self, crate_name: &NormalizedName) -> DbResult<krate::Model> {
84        krate::Entity::find()
85            .filter(krate::Column::Name.eq(crate_name))
86            .one(&self.db_con)
87            .await?
88            .ok_or_else(|| DbError::CrateNotFound(crate_name.to_string()))
89    }
90
91    /// Looks up a group by name; returns the entity model or [`DbError::GroupNotFound`].
92    async fn get_group_model(&self, name: &str) -> DbResult<group::Model> {
93        group::Entity::find()
94            .filter(group::Column::Name.eq(name))
95            .one(&self.db_con)
96            .await?
97            .ok_or_else(|| DbError::GroupNotFound(name.to_owned()))
98    }
99
100    /// Loads targets (with their components) for a toolchain and returns a `ToolchainWithTargets`.
101    async fn toolchain_with_targets(&self, tc: toolchain::Model) -> DbResult<ToolchainWithTargets> {
102        let targets = toolchain_target::Entity::find()
103            .filter(toolchain_target::Column::ToolchainFk.eq(tc.id))
104            .all(&self.db_con)
105            .await?;
106
107        let mut target_infos = Vec::with_capacity(targets.len());
108        for t in targets {
109            let components = toolchain_component::Entity::find()
110                .filter(toolchain_component::Column::ToolchainTargetFk.eq(t.id))
111                .all(&self.db_con)
112                .await?;
113            target_infos.push(ToolchainTargetInfo {
114                id: t.id,
115                target: t.target,
116                storage_path: t.storage_path,
117                hash: t.hash,
118                size: t.size,
119                status: t.status,
120                components: components
121                    .into_iter()
122                    .map(ToolchainComponentInfo::from)
123                    .collect(),
124            });
125        }
126
127        Ok(ToolchainWithTargets {
128            id: tc.id,
129            name: tc.name,
130            version: tc.version,
131            date: tc.date,
132            channel: tc.channel,
133            created: tc.created,
134            targets: target_infos,
135        })
136    }
137
138    /// Looks up a crate index by name and version; returns the entity model or [`DbError::CrateIndexNotFound`].
139    async fn get_crate_index_model(
140        &self,
141        crate_name: &NormalizedName,
142        version: &Version,
143    ) -> DbResult<crate_index::Model> {
144        crate_index::Entity::find()
145            .filter(crate_index::Column::Name.eq(crate_name))
146            .filter(crate_index::Column::Vers.eq(version))
147            .one(&self.db_con)
148            .await?
149            .ok_or_else(|| DbError::CrateIndexNotFound(crate_name.to_string(), version.to_string()))
150    }
151
152    /// Looks up a toolchain by name and version; returns `None` if not found.
153    async fn get_toolchain_by_name_version(
154        &self,
155        name: &str,
156        version: &str,
157    ) -> DbResult<Option<toolchain::Model>> {
158        toolchain::Entity::find()
159            .filter(toolchain::Column::Name.eq(name))
160            .filter(toolchain::Column::Version.eq(version))
161            .one(&self.db_con)
162            .await
163            .map_err(Into::into)
164    }
165
166    /// Looks up a `crate_group` by crate name and group name; returns `None` if not found.
167    async fn get_crate_group_by_name_and_group(
168        &self,
169        crate_name: &NormalizedName,
170        group: &str,
171    ) -> DbResult<Option<crate_group::Model>> {
172        crate_group::Entity::find()
173            .join(JoinType::InnerJoin, crate_group::Relation::Krate.def())
174            .join(JoinType::InnerJoin, crate_group::Relation::Group.def())
175            .filter(
176                Cond::all()
177                    .add(krate::Column::Name.eq(crate_name))
178                    .add(group::Column::Name.eq(group)),
179            )
180            .one(&self.db_con)
181            .await
182            .map_err(Into::into)
183    }
184
185    /// Looks up a `group_user` by group name and user name; returns `None` if not found.
186    async fn get_group_user_by_group_and_user(
187        &self,
188        group_name: &str,
189        user: &str,
190    ) -> DbResult<Option<group_user::Model>> {
191        group_user::Entity::find()
192            .join(JoinType::InnerJoin, group_user::Relation::Group.def())
193            .join(JoinType::InnerJoin, group_user::Relation::User.def())
194            .filter(
195                Cond::all()
196                    .add(group::Column::Name.eq(group_name))
197                    .add(user::Column::Name.eq(user)),
198            )
199            .one(&self.db_con)
200            .await
201            .map_err(Into::into)
202    }
203
204    /// Looks up a `crate_user` by crate name and user name; returns `None` if not found.
205    async fn get_crate_user_by_crate_and_user(
206        &self,
207        crate_name: &NormalizedName,
208        user: &str,
209    ) -> DbResult<Option<crate_user::Model>> {
210        crate_user::Entity::find()
211            .join(JoinType::InnerJoin, crate_user::Relation::Krate.def())
212            .join(JoinType::InnerJoin, crate_user::Relation::User.def())
213            .filter(
214                Cond::all()
215                    .add(krate::Column::Name.eq(crate_name))
216                    .add(user::Column::Name.eq(user)),
217            )
218            .one(&self.db_con)
219            .await
220            .map_err(Into::into)
221    }
222
223    /// Shared implementation for crate overview listing: optional name filter, optional limit/offset, cache union.
224    async fn query_crates(
225        &self,
226        contains: Option<&str>,
227        limit_offset: Option<(u64, u64)>,
228        cache: bool,
229    ) -> DbResult<Vec<CrateOverview>> {
230        let mut query = Query::select();
231        query
232            .expr_as(Expr::col(CrateIden::OriginalName), Alias::new("name"))
233            .expr_as(Expr::col(CrateIden::MaxVersion), Alias::new("version"))
234            .expr_as(Expr::col(CrateIden::LastUpdated), Alias::new("date"))
235            .expr_as(
236                Expr::col(CrateIden::TotalDownloads),
237                Alias::new("total_downloads"),
238            )
239            .expr_as(Expr::col(CrateIden::Description), Alias::new("description"))
240            .expr_as(
241                Expr::col(CrateMetaIden::Documentation),
242                Alias::new("documentation"),
243            )
244            .expr_as(Expr::cust("false"), Alias::new("is_cache"))
245            .from(CrateMetaIden::Table)
246            .inner_join(
247                CrateIden::Table,
248                Expr::col((CrateMetaIden::Table, CrateMetaIden::CrateFk))
249                    .equals((CrateIden::Table, CrateIden::Id)),
250            )
251            .and_where(
252                Expr::col((CrateMetaIden::Table, CrateMetaIden::Version))
253                    .equals((CrateIden::Table, CrateIden::MaxVersion)),
254            );
255
256        // Optional name filter
257        if let Some(contains) = contains {
258            query.and_where(
259                Expr::col((CrateIden::Table, CrateIden::Name)).like(format!("%{contains}%")),
260            );
261        }
262
263        // UNION with cached crates
264        if cache {
265            let mut query2 = Query::select();
266            query2
267                .expr_as(Expr::col(CratesIoIden::OriginalName), Alias::new("name"))
268                .expr_as(Expr::col(CratesIoIden::MaxVersion), Alias::new("version"))
269                .expr_as(Expr::col(CratesIoIden::LastModified), Alias::new("date"))
270                .expr_as(
271                    Expr::col(CratesIoIden::TotalDownloads),
272                    Alias::new("total_downloads"),
273                )
274                .expr_as(
275                    Expr::col(CratesIoIden::Description),
276                    Alias::new("description"),
277                )
278                .expr_as(
279                    Expr::col(CratesIoMetaIden::Documentation),
280                    Alias::new("documentation"),
281                )
282                .expr_as(Expr::cust("true"), Alias::new("is_cache"))
283                .from(CratesIoMetaIden::Table)
284                .inner_join(
285                    CratesIoIden::Table,
286                    Expr::col((CratesIoMetaIden::Table, CratesIoMetaIden::CratesIoFk))
287                        .equals((CratesIoIden::Table, CratesIoIden::Id)),
288                )
289                .and_where(
290                    Expr::col((CratesIoMetaIden::Table, CratesIoMetaIden::Version))
291                        .equals((CratesIoIden::Table, CratesIoIden::MaxVersion)),
292                );
293            if let Some(contains) = contains {
294                query2.and_where(
295                    Expr::col((CratesIoIden::Table, CratesIoIden::OriginalName))
296                        .like(format!("%{contains}%")),
297                );
298            }
299            query.union(UnionType::All, query2);
300        }
301
302        // Ordering and optional limit/offset
303        query.order_by(Alias::new("name"), Order::Asc);
304        if let Some((limit, offset)) = limit_offset {
305            query.limit(limit).offset(offset);
306        }
307
308        let builder = self.db_con.get_database_backend();
309        CrateOverview::find_by_statement(builder.build(&query))
310            .all(&self.db_con)
311            .await
312            .map_err(DbError::from)
313    }
314
315    /// Executes a count query `SELECT COUNT(id_column) FROM table` and returns the count as u64.
316    async fn count<T>(&self, table: T, id_column: T, error: DbError) -> DbResult<u64>
317    where
318        T: Iden + Copy,
319    {
320        #[derive(Debug, PartialEq, FromQueryResult)]
321        struct CountResult {
322            count: Option<i64>,
323        }
324
325        let statement = self.db_con.get_database_backend().build(
326            Query::select()
327                .expr_as(Expr::col((table, id_column)).count(), Alias::new("count"))
328                .from(table),
329        );
330        let Some(result) = CountResult::find_by_statement(statement)
331            .one(&self.db_con)
332            .await?
333        else {
334            return Err(error);
335        };
336        result
337            .count
338            .and_then(|v| u64::try_from(v).ok())
339            .ok_or(error)
340    }
341
342    async fn get_webhook_model(&self, id: &str) -> DbResult<webhook::Model> {
343        webhook::Entity::find()
344            .filter(webhook::Column::Id.eq(
345                TryInto::<Uuid>::try_into(id).map_err(|_| DbError::InvalidId(id.to_string()))?,
346            ))
347            .one(&self.db_con)
348            .await?
349            .ok_or(DbError::WebhookNotFound)
350    }
351
352    async fn get_webhook_queue_model(&self, id: &str) -> DbResult<webhook_queue::Model> {
353        webhook_queue::Entity::find()
354            .filter(webhook_queue::Column::Id.eq(
355                TryInto::<Uuid>::try_into(id).map_err(|_| DbError::InvalidId(id.to_string()))?,
356            ))
357            .one(&self.db_con)
358            .await?
359            .ok_or(DbError::WebhookNotFound)
360    }
361
362    async fn get_owner_by_crate_and_user(
363        &self,
364        crate_name: &str,
365        owner_name: &str,
366    ) -> DbResult<Option<owner::Model>> {
367        owner::Entity::find()
368            .join(JoinType::InnerJoin, owner::Relation::Krate.def())
369            .join(JoinType::InnerJoin, owner::Relation::User.def())
370            .filter(
371                Cond::all()
372                    .add(krate::Column::Name.eq(crate_name))
373                    .add(user::Column::Name.eq(owner_name)),
374            )
375            .one(&self.db_con)
376            .await
377            .map_err(Into::into)
378    }
379
380    async fn delete_or_not_found<M, A>(&self, model: Option<M>, err: DbError) -> DbResult<()>
381    where
382        M: ModelTrait + sea_orm::IntoActiveModel<A>,
383        A: ActiveModelTrait<Entity = M::Entity> + sea_orm::ActiveModelBehavior + Send,
384    {
385        model.ok_or(err)?.delete(&self.db_con).await?;
386        Ok(())
387    }
388}
389
390fn webhook_model_to_obj(w: webhook::Model) -> DbResult<Webhook> {
391    let event = w
392        .event
393        .as_str()
394        .try_into()
395        .map_err(|_| DbError::InvalidWebhookEvent(w.event))?;
396    Ok(Webhook {
397        id: Some(w.id.into()),
398        name: w.name,
399        event,
400        callback_url: w.callback_url,
401    })
402}
403
/// Generates an async `add_*` method that inserts one row into a join table,
/// linking two entities by their ids.
///
/// # Parameters
/// - `$method`: name of the generated method (e.g., `add_crate_user_impl`)
/// - `$relation_entity`: module of the join table entity, whose `ActiveModel` is built
///   (e.g., `crate_user`)
/// - `$insert_entity`: Sea ORM entity type the insert is issued on (e.g., `CrateUser`)
/// - `$fk1`: `ActiveModel` field receiving the first id
/// - `$fk2`: `ActiveModel` field receiving the second id
///
/// All remaining `ActiveModel` fields keep their `Default` value.
macro_rules! impl_add_relation {
    ($method:ident, $relation_entity:ident, $insert_entity:ident, $fk1:ident, $fk2:ident) => {
        async fn $method(&self, first_id: i64, second_id: i64) -> DbResult<()> {
            let link = $relation_entity::ActiveModel {
                $fk1: Set(first_id),
                $fk2: Set(second_id),
                ..Default::default()
            };
            let _inserted = $insert_entity::insert(link).exec(&self.db_con).await?;
            Ok(())
        }
    };
}
425
426impl Database {
427    impl_add_relation!(
428        add_crate_user_impl,
429        crate_user,
430        CrateUser,
431        crate_fk,
432        user_fk
433    );
434    impl_add_relation!(
435        add_crate_group_impl,
436        crate_group,
437        CrateGroup,
438        crate_fk,
439        group_fk
440    );
441    impl_add_relation!(
442        add_group_user_impl,
443        group_user,
444        GroupUser,
445        group_fk,
446        user_fk
447    );
448    impl_add_relation!(add_owner_impl, owner, Owner, crate_fk, user_fk);
449}
450
451#[async_trait]
452impl DbProvider for Database {
453    async fn get_total_unique_cached_crates(&self) -> DbResult<u64> {
454        self.count(
455            CratesIoIden::Table,
456            CratesIoIden::Id,
457            DbError::FailedToCountCrates,
458        )
459        .await
460    }
461
462    async fn get_total_cached_crate_versions(&self) -> DbResult<u64> {
463        self.count(
464            CratesIoMetaIden::Table,
465            CratesIoMetaIden::Id,
466            DbError::FailedToCountCrateVersions,
467        )
468        .await
469    }
470
471    async fn get_total_cached_downloads(&self) -> DbResult<u64> {
472        #[derive(FromQueryResult)]
473        struct Model {
474            total_downloads: i64,
475        }
476
477        let total_downloads = cratesio_crate::Entity::find()
478            .select_only()
479            .column(cratesio_crate::Column::TotalDownloads)
480            .into_model::<Model>()
481            .all(&self.db_con)
482            .await?;
483
484        Ok(total_downloads
485            .iter()
486            .map(|m| m.total_downloads as u64)
487            .sum())
488    }
489
490    async fn authenticate_user(&self, name: &str, pwd: &str) -> DbResult<User> {
491        let user = self.get_user(name).await?;
492
493        if hash_pwd(pwd, &user.salt) == user.pwd {
494            Ok(user)
495        } else {
496            Err(DbError::PasswordMismatch)
497        }
498    }
499
500    async fn increase_download_counter(
501        &self,
502        crate_name: &NormalizedName,
503        crate_version: &Version,
504    ) -> DbResult<()> {
505        let txn = self.db_con.begin().await?;
506
507        krate::Entity::update_many()
508            .col_expr(
509                krate::Column::TotalDownloads,
510                Expr::col(krate::Column::TotalDownloads).add(1),
511            )
512            .filter(krate::Column::Name.eq(crate_name))
513            .exec(&txn)
514            .await?;
515
516        let crate_id = krate::Entity::find()
517            .filter(krate::Column::Name.eq(crate_name))
518            .one(&txn)
519            .await?
520            .ok_or_else(|| DbError::CrateNotFound(crate_name.to_string()))?
521            .id;
522
523        crate_meta::Entity::update_many()
524            .col_expr(
525                crate_meta::Column::Downloads,
526                Expr::col(crate_meta::Column::Downloads).add(1),
527            )
528            .filter(
529                Cond::all()
530                    .add(crate_meta::Column::Version.eq(crate_version))
531                    .add(crate_meta::Column::CrateFk.eq(crate_id)),
532            )
533            .exec(&txn)
534            .await?;
535
536        txn.commit().await?;
537        Ok(())
538    }
539
540    async fn increase_cached_download_counter(
541        &self,
542        crate_name: &NormalizedName,
543        crate_version: &Version,
544    ) -> DbResult<()> {
545        let txn = self.db_con.begin().await?;
546
547        cratesio_crate::Entity::update_many()
548            .col_expr(
549                cratesio_crate::Column::TotalDownloads,
550                Expr::col(cratesio_crate::Column::TotalDownloads).add(1),
551            )
552            .filter(cratesio_crate::Column::Name.eq(crate_name))
553            .exec(&txn)
554            .await?;
555
556        let crate_id = cratesio_crate::Entity::find()
557            .filter(cratesio_crate::Column::Name.eq(crate_name))
558            .one(&txn)
559            .await?
560            .ok_or_else(|| DbError::CrateNotFound(crate_name.to_string()))?
561            .id;
562
563        cratesio_meta::Entity::update_many()
564            .col_expr(
565                cratesio_meta::Column::Downloads,
566                Expr::col(cratesio_meta::Column::Downloads).add(1),
567            )
568            .filter(
569                Cond::all()
570                    .add(cratesio_meta::Column::Version.eq(crate_version))
571                    .add(cratesio_meta::Column::CratesIoFk.eq(crate_id)),
572            )
573            .exec(&txn)
574            .await?;
575
576        txn.commit().await?;
577        Ok(())
578    }
579
580    async fn increase_download_counter_by(
581        &self,
582        crate_name: &NormalizedName,
583        crate_version: &Version,
584        count: u64,
585    ) -> DbResult<()> {
586        let txn = self.db_con.begin().await?;
587
588        krate::Entity::update_many()
589            .col_expr(
590                krate::Column::TotalDownloads,
591                Expr::col(krate::Column::TotalDownloads).add(count as i64),
592            )
593            .filter(krate::Column::Name.eq(crate_name))
594            .exec(&txn)
595            .await?;
596
597        let crate_id = krate::Entity::find()
598            .filter(krate::Column::Name.eq(crate_name))
599            .one(&txn)
600            .await?
601            .ok_or_else(|| DbError::CrateNotFound(crate_name.to_string()))?
602            .id;
603
604        crate_meta::Entity::update_many()
605            .col_expr(
606                crate_meta::Column::Downloads,
607                Expr::col(crate_meta::Column::Downloads).add(count as i64),
608            )
609            .filter(
610                Cond::all()
611                    .add(crate_meta::Column::Version.eq(crate_version))
612                    .add(crate_meta::Column::CrateFk.eq(crate_id)),
613            )
614            .exec(&txn)
615            .await?;
616
617        txn.commit().await?;
618        Ok(())
619    }
620
621    async fn increase_cached_download_counter_by(
622        &self,
623        crate_name: &NormalizedName,
624        crate_version: &Version,
625        count: u64,
626    ) -> DbResult<()> {
627        let txn = self.db_con.begin().await?;
628
629        cratesio_crate::Entity::update_many()
630            .col_expr(
631                cratesio_crate::Column::TotalDownloads,
632                Expr::col(cratesio_crate::Column::TotalDownloads).add(count as i64),
633            )
634            .filter(cratesio_crate::Column::Name.eq(crate_name))
635            .exec(&txn)
636            .await?;
637
638        let crate_id = cratesio_crate::Entity::find()
639            .filter(cratesio_crate::Column::Name.eq(crate_name))
640            .one(&txn)
641            .await?
642            .ok_or_else(|| DbError::CrateNotFound(crate_name.to_string()))?
643            .id;
644
645        cratesio_meta::Entity::update_many()
646            .col_expr(
647                cratesio_meta::Column::Downloads,
648                Expr::col(cratesio_meta::Column::Downloads).add(count as i64),
649            )
650            .filter(
651                Cond::all()
652                    .add(cratesio_meta::Column::Version.eq(crate_version))
653                    .add(cratesio_meta::Column::CratesIoFk.eq(crate_id)),
654            )
655            .exec(&txn)
656            .await?;
657
658        txn.commit().await?;
659        Ok(())
660    }
661
662    async fn get_last_updated_crate(&self) -> DbResult<Option<(OriginalName, Version)>> {
663        let krate = krate::Entity::find()
664            .order_by_desc(krate::Column::LastUpdated)
665            .one(&self.db_con)
666            .await?;
667
668        if let Some(krate) = krate {
669            // SAFETY: Unchecked is ok, as only valid crate names are inserted into the database
670            let name = OriginalName::from_unchecked(krate.original_name);
671            // SAFETY: Unchecked is ok, as only valid versions are inserted into the database
672            let version = Version::from_unchecked_str(&krate.max_version);
673            Ok(Some((name, version)))
674        } else {
675            Ok(None)
676        }
677    }
678
679    async fn validate_session(&self, session_token: &str) -> DbResult<(String, bool)> {
680        let u = user::Entity::find()
681            .join(JoinType::InnerJoin, user::Relation::Session.def())
682            .filter(session::Column::Token.eq(session_token))
683            .one(&self.db_con)
684            .await?
685            .ok_or(DbError::SessionNotFound)?;
686
687        Ok((u.name, u.is_admin))
688    }
689
690    async fn add_session_token(&self, name: &str, session_token: &str) -> DbResult<()> {
691        let user = self.get_user(name).await?;
692        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
693
694        let s = session::ActiveModel {
695            token: Set(session_token.to_owned()),
696            created: Set(created),
697            user_fk: Set(user.id as i64),
698            ..Default::default()
699        };
700
701        s.insert(&self.db_con).await?;
702        Ok(())
703    }
704
705    async fn add_crate_user(&self, crate_name: &NormalizedName, user: &str) -> DbResult<()> {
706        let crate_fk = self.get_krate_model(crate_name).await?.id;
707        let user_fk = self.get_user_model(user).await?.id;
708        self.add_crate_user_impl(crate_fk, user_fk).await
709    }
710
711    async fn add_crate_group(&self, crate_name: &NormalizedName, group: &str) -> DbResult<()> {
712        let crate_fk = self.get_krate_model(crate_name).await?.id;
713        let group_fk = self.get_group_model(group).await?.id;
714        self.add_crate_group_impl(crate_fk, group_fk).await
715    }
716
717    async fn add_group_user(&self, group_name: &str, user: &str) -> DbResult<()> {
718        let group_fk = self.get_group_model(group_name).await?.id;
719        let user_fk = self.get_user_model(user).await?.id;
720        self.add_group_user_impl(group_fk, user_fk).await
721    }
722
723    async fn add_owner(&self, crate_name: &NormalizedName, owner: &str) -> DbResult<()> {
724        let crate_fk = self.get_krate_model(crate_name).await?.id;
725        let user_fk = self.get_user_model(owner).await?.id;
726        self.add_owner_impl(crate_fk, user_fk).await
727    }
728
729    async fn is_download_restricted(&self, crate_name: &NormalizedName) -> DbResult<bool> {
730        Ok(krate::Entity::find()
731            .filter(krate::Column::Name.eq(crate_name))
732            .one(&self.db_con)
733            .await?
734            .is_some_and(|model| model.restricted_download))
735    }
736
737    async fn change_download_restricted(
738        &self,
739        crate_name: &NormalizedName,
740        restricted: bool,
741    ) -> DbResult<()> {
742        let mut krate: krate::ActiveModel = self.get_krate_model(crate_name).await?.into();
743        krate.restricted_download = Set(restricted);
744        krate.update(&self.db_con).await?;
745        Ok(())
746    }
747
748    async fn is_crate_user(&self, crate_name: &NormalizedName, user: &str) -> DbResult<bool> {
749        Ok(self
750            .get_crate_user_by_crate_and_user(crate_name, user)
751            .await?
752            .is_some())
753    }
754
755    async fn is_crate_group(&self, crate_name: &NormalizedName, group: &str) -> DbResult<bool> {
756        Ok(self
757            .get_crate_group_by_name_and_group(crate_name, group)
758            .await?
759            .is_some())
760    }
761
762    async fn is_crate_group_user(&self, crate_name: &NormalizedName, user: &str) -> DbResult<bool> {
763        let user = user::Entity::find()
764            .join(JoinType::InnerJoin, user::Relation::GroupUser.def())
765            .join(JoinType::InnerJoin, group_user::Relation::Group.def())
766            .join(JoinType::InnerJoin, group::Relation::CrateGroup.def())
767            .join(JoinType::InnerJoin, crate_group::Relation::Krate.def())
768            .filter(
769                Cond::all()
770                    .add(krate::Column::Name.eq(crate_name))
771                    .add(user::Column::Name.eq(user)),
772            )
773            .one(&self.db_con)
774            .await?;
775        Ok(user.is_some())
776    }
777
778    async fn is_group_user(&self, group_name: &str, user: &str) -> DbResult<bool> {
779        Ok(self
780            .get_group_user_by_group_and_user(group_name, user)
781            .await?
782            .is_some())
783    }
784
785    async fn is_owner(&self, crate_name: &NormalizedName, user: &str) -> DbResult<bool> {
786        let owner = owner::Entity::find()
787            .join(JoinType::InnerJoin, owner::Relation::Krate.def())
788            .join(JoinType::InnerJoin, owner::Relation::User.def())
789            .filter(
790                Cond::all()
791                    .add(krate::Column::Name.eq(crate_name))
792                    .add(user::Column::Name.eq(user)),
793            )
794            .one(&self.db_con)
795            .await?;
796
797        Ok(owner.is_some())
798    }
799
800    async fn get_crate_id(&self, crate_name: &NormalizedName) -> DbResult<Option<i64>> {
801        let id = krate::Entity::find()
802            .filter(krate::Column::Name.eq(crate_name))
803            .one(&self.db_con)
804            .await?
805            .map(|model| model.id);
806
807        Ok(id)
808    }
809
810    async fn get_crate_owners(&self, crate_name: &NormalizedName) -> DbResult<Vec<User>> {
811        let u = user::Entity::find()
812            .join(JoinType::InnerJoin, user::Relation::Owner.def())
813            .join(JoinType::InnerJoin, owner::Relation::Krate.def())
814            .filter(Expr::col((CrateIden::Table, krate::Column::Name)).eq(crate_name))
815            .all(&self.db_con)
816            .await?;
817
818        Ok(u.into_iter().map(User::from).collect())
819    }
820
821    async fn get_crate_users(&self, crate_name: &NormalizedName) -> DbResult<Vec<User>> {
822        let u = user::Entity::find()
823            .join(JoinType::InnerJoin, user::Relation::CrateUser.def())
824            .join(JoinType::InnerJoin, crate_user::Relation::Krate.def())
825            .filter(Expr::col((CrateIden::Table, krate::Column::Name)).eq(crate_name))
826            .all(&self.db_con)
827            .await?;
828
829        Ok(u.into_iter().map(User::from).collect())
830    }
831
832    async fn get_crate_groups(&self, crate_name: &NormalizedName) -> DbResult<Vec<Group>> {
833        let u = group::Entity::find()
834            .join(JoinType::InnerJoin, group::Relation::CrateGroup.def())
835            .join(JoinType::InnerJoin, crate_group::Relation::Krate.def())
836            .filter(Expr::col((CrateIden::Table, krate::Column::Name)).eq(crate_name))
837            .all(&self.db_con)
838            .await?;
839
840        Ok(u.into_iter().map(Group::from).collect())
841    }
842
843    async fn get_group_users(&self, group_name: &str) -> DbResult<Vec<User>> {
844        let u = user::Entity::find()
845            .join(JoinType::InnerJoin, user::Relation::GroupUser.def())
846            .join(JoinType::InnerJoin, group_user::Relation::Group.def())
847            .filter(Expr::col((GroupIden::Table, group::Column::Name)).eq(group_name))
848            .all(&self.db_con)
849            .await?;
850
851        Ok(u.into_iter().map(User::from).collect())
852    }
853
854    async fn get_crate_versions(&self, crate_name: &NormalizedName) -> DbResult<Vec<Version>> {
855        let u = crate_meta::Entity::find()
856            .join(JoinType::InnerJoin, crate_meta::Relation::Krate.def())
857            .filter(Expr::col((CrateIden::Table, krate::Column::Name)).eq(crate_name))
858            .all(&self.db_con)
859            .await?;
860
861        Ok(u.into_iter()
862            .map(|meta| Version::from_unchecked_str(&meta.version))
863            .collect())
864    }
865
866    async fn delete_session_token(&self, session_token: &str) -> DbResult<()> {
867        if let Some(s) = session::Entity::find()
868            .filter(session::Column::Token.eq(session_token))
869            .one(&self.db_con)
870            .await?
871        {
872            s.delete(&self.db_con).await?;
873        }
874
875        Ok(())
876    }
877
878    async fn delete_user(&self, user_name: &str) -> DbResult<()> {
879        self.get_user_model(user_name)
880            .await?
881            .delete(&self.db_con)
882            .await?;
883        Ok(())
884    }
885
886    async fn delete_group(&self, group_name: &str) -> DbResult<()> {
887        self.get_group_model(group_name)
888            .await?
889            .delete(&self.db_con)
890            .await?;
891        Ok(())
892    }
893
894    async fn change_pwd(&self, user_name: &str, new_pwd: &str) -> DbResult<()> {
895        let salt = generate_salt();
896        let hashed = hash_pwd(new_pwd, &salt);
897
898        let mut u: user::ActiveModel = self.get_user_model(user_name).await?.into();
899        u.pwd = Set(hashed);
900        u.salt = Set(salt);
901
902        u.update(&self.db_con).await?;
903        Ok(())
904    }
905
906    async fn change_read_only_state(&self, user_name: &str, state: bool) -> DbResult<()> {
907        let mut u: user::ActiveModel = self.get_user_model(user_name).await?.into();
908        u.is_read_only = Set(state);
909
910        u.update(&self.db_con).await?;
911        Ok(())
912    }
913
914    async fn change_admin_state(&self, user_name: &str, state: bool) -> DbResult<()> {
915        let mut u: user::ActiveModel = self.get_user_model(user_name).await?.into();
916        u.is_admin = Set(state);
917
918        u.update(&self.db_con).await?;
919        Ok(())
920    }
921
922    async fn crate_version_exists(&self, crate_id: i64, version: &str) -> DbResult<bool> {
923        let cm = crate_meta::Entity::find()
924            .filter(
925                Cond::all()
926                    .add(crate_meta::Column::CrateFk.eq(crate_id))
927                    .add(crate_meta::Column::Version.eq(version)),
928            )
929            .one(&self.db_con)
930            .await?;
931
932        Ok(cm.is_some())
933    }
934
935    async fn get_max_version_from_id(&self, crate_id: i64) -> DbResult<Version> {
936        operations::get_max_version_from_id(&self.db_con, crate_id).await
937    }
938
939    async fn get_max_version_from_name(&self, crate_name: &NormalizedName) -> DbResult<Version> {
940        let k = self.get_krate_model(crate_name).await?;
941        let v = Version::try_from(&k.max_version)
942            .map_err(|_| DbError::FailedToGetMaxVersionByName(crate_name.to_string()))?;
943        Ok(v)
944    }
945
946    async fn update_max_version(&self, crate_id: i64, version: &Version) -> DbResult<()> {
947        let krate = krate::Entity::find_by_id(crate_id)
948            .one(&self.db_con)
949            .await?
950            .ok_or(DbError::CrateNotFoundWithId(crate_id))?;
951
952        let mut k: krate::ActiveModel = krate.into();
953        k.max_version = Set(version.to_string());
954        k.update(&self.db_con).await?;
955
956        Ok(())
957    }
958
959    async fn add_auth_token(&self, name: &str, token: &str, user: &str) -> DbResult<()> {
960        let hashed_token = hash_token(token);
961        let user = self.get_user_model(user).await?;
962
963        let at = auth_token::ActiveModel {
964            name: Set(name.to_owned()),
965            token: Set(hashed_token),
966            user_fk: Set(user.id),
967            ..Default::default()
968        };
969
970        at.insert(&self.db_con).await?;
971
972        Ok(())
973    }
974
975    async fn get_user_from_token(&self, token: &str) -> DbResult<User> {
976        let token = hash_token(token);
977
978        let u = user::Entity::find()
979            .join(JoinType::InnerJoin, user::Relation::AuthToken.def())
980            .filter(Expr::col((AuthTokenIden::Table, AuthTokenIden::Token)).eq(token))
981            .one(&self.db_con)
982            .await?
983            .ok_or(DbError::TokenNotFound)?;
984
985        Ok(User::from(u))
986    }
987
988    async fn get_user(&self, name: &str) -> DbResult<User> {
989        Ok(User::from(self.get_user_model(name).await?))
990    }
991
992    async fn get_group(&self, name: &str) -> DbResult<Group> {
993        Ok(Group::from(self.get_group_model(name).await?))
994    }
995
996    async fn get_auth_tokens(&self, user_name: &str) -> DbResult<Vec<AuthToken>> {
997        let at: Vec<auth_token::Model> = auth_token::Entity::find()
998            .join(JoinType::InnerJoin, auth_token::Relation::User.def())
999            .filter(user::Column::Name.eq(user_name))
1000            .all(&self.db_con)
1001            .await?;
1002
1003        Ok(at.into_iter().map(AuthToken::from).collect())
1004    }
1005
1006    async fn delete_auth_token(&self, id: i32) -> DbResult<()> {
1007        auth_token::Entity::delete_by_id(id as i64)
1008            .exec(&self.db_con)
1009            .await?;
1010        Ok(())
1011    }
1012
1013    async fn delete_owner(&self, crate_name: &str, owner: &str) -> DbResult<()> {
1014        let model = self.get_owner_by_crate_and_user(crate_name, owner).await?;
1015        self.delete_or_not_found(model, DbError::OwnerNotFound(owner.to_string()))
1016            .await
1017    }
1018
1019    async fn delete_crate_user(&self, crate_name: &NormalizedName, user: &str) -> DbResult<()> {
1020        let model = self
1021            .get_crate_user_by_crate_and_user(crate_name, user)
1022            .await?;
1023        self.delete_or_not_found(model, DbError::UserNotFound(user.to_string()))
1024            .await
1025    }
1026
1027    async fn delete_crate_group(&self, crate_name: &NormalizedName, group: &str) -> DbResult<()> {
1028        let model = self
1029            .get_crate_group_by_name_and_group(crate_name, group)
1030            .await?;
1031        self.delete_or_not_found(model, DbError::GroupNotFound(group.to_string()))
1032            .await
1033    }
1034
1035    async fn delete_group_user(&self, group_name: &str, user: &str) -> DbResult<()> {
1036        let model = self
1037            .get_group_user_by_group_and_user(group_name, user)
1038            .await?;
1039        self.delete_or_not_found(model, DbError::UserNotFound(user.to_string()))
1040            .await
1041    }
1042
1043    async fn add_user(
1044        &self,
1045        name: &str,
1046        pwd: &str,
1047        salt: &str,
1048        is_admin: bool,
1049        is_read_only: bool,
1050    ) -> DbResult<()> {
1051        let hashed_pwd = hash_pwd(pwd, salt);
1052        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
1053
1054        let u = user::ActiveModel {
1055            name: Set(name.to_owned()),
1056            pwd: Set(hashed_pwd),
1057            salt: Set(salt.to_owned()),
1058            is_admin: Set(is_admin),
1059            is_read_only: Set(is_read_only),
1060            created: Set(created),
1061            ..Default::default()
1062        };
1063
1064        u.insert(&self.db_con).await?;
1065        Ok(())
1066    }
1067
1068    async fn add_group(&self, name: &str) -> DbResult<()> {
1069        let g = group::ActiveModel {
1070            name: Set(name.to_owned()),
1071            ..Default::default()
1072        };
1073
1074        g.insert(&self.db_con).await?;
1075        Ok(())
1076    }
1077
1078    async fn get_users(&self) -> DbResult<Vec<User>> {
1079        let users = user::Entity::find()
1080            .order_by_asc(user::Column::Name)
1081            .all(&self.db_con)
1082            .await?;
1083
1084        Ok(users.into_iter().map(User::from).collect())
1085    }
1086
1087    async fn get_groups(&self) -> DbResult<Vec<Group>> {
1088        let groups = group::Entity::find()
1089            .order_by_asc(group::Column::Name)
1090            .all(&self.db_con)
1091            .await?;
1092
1093        Ok(groups.into_iter().map(Group::from).collect())
1094    }
1095
1096    async fn get_total_unique_crates(&self) -> DbResult<u64> {
1097        self.count(
1098            CrateIden::Table,
1099            CrateIden::Id,
1100            DbError::FailedToCountCrates,
1101        )
1102        .await
1103    }
1104
1105    async fn get_total_crate_versions(&self) -> DbResult<u64> {
1106        self.count(
1107            CrateMetaIden::Table,
1108            CrateMetaIden::Id,
1109            DbError::FailedToCountCrateVersions,
1110        )
1111        .await
1112    }
1113
1114    async fn get_total_downloads(&self) -> DbResult<u64> {
1115        #[derive(FromQueryResult)]
1116        struct Model {
1117            total_downloads: i64,
1118        }
1119
1120        Ok(krate::Entity::find()
1121            .select_only()
1122            .column(krate::Column::TotalDownloads)
1123            .into_model::<Model>()
1124            .all(&self.db_con)
1125            .await?
1126            .into_iter()
1127            .map(|m| m.total_downloads as u64)
1128            .sum())
1129    }
1130
1131    async fn get_top_crates_downloads(&self, top: u64) -> DbResult<Vec<(String, u64)>> {
1132        #[derive(Debug, PartialEq, FromQueryResult)]
1133        struct SelectResult {
1134            original_name: String,
1135            total_downloads: i64,
1136        }
1137
1138        let stmt = self.db_con.get_database_backend().build(
1139            Query::select()
1140                .columns(vec![CrateIden::OriginalName, CrateIden::TotalDownloads])
1141                .from(CrateIden::Table)
1142                .order_by(CrateIden::TotalDownloads, Order::Desc)
1143                .limit(top),
1144        );
1145
1146        Ok(SelectResult::find_by_statement(stmt)
1147            .all(&self.db_con)
1148            .await?
1149            .into_iter()
1150            .map(|x| (x.original_name, x.total_downloads as u64))
1151            .collect())
1152    }
1153
1154    async fn get_crate_summaries(&self) -> DbResult<Vec<CrateSummary>> {
1155        let krates = krate::Entity::find()
1156            .order_by(krate::Column::Name, Order::Asc)
1157            .all(&self.db_con)
1158            .await?;
1159
1160        Ok(krates.into_iter().map(CrateSummary::from).collect())
1161    }
1162
1163    async fn add_doc_queue(
1164        &self,
1165        krate: &NormalizedName,
1166        version: &Version,
1167        path: &Path,
1168    ) -> DbResult<()> {
1169        let s = doc_queue::ActiveModel {
1170            krate: Set(krate.to_string()),
1171            version: Set(version.to_string()),
1172            // FIXME: Convert Path to String properly, handle errors
1173            path: Set(path.to_string_lossy().to_string()),
1174            ..Default::default()
1175        };
1176
1177        s.insert(&self.db_con).await?;
1178        Ok(())
1179    }
1180
1181    async fn delete_doc_queue(&self, id: i64) -> DbResult<()> {
1182        DocQueue::delete_by_id(id).exec(&self.db_con).await?;
1183        Ok(())
1184    }
1185
1186    async fn get_doc_queue(&self) -> DbResult<Vec<DocQueueEntry>> {
1187        let entities = DocQueue::find().all(&self.db_con).await?;
1188
1189        Ok(entities.into_iter().map(DocQueueEntry::from).collect())
1190    }
1191
1192    async fn delete_crate(&self, krate: &NormalizedName, version: &Version) -> DbResult<()> {
1193        let txn = self.db_con.begin().await?;
1194
1195        // Delete the entry from the "crate_meta" table
1196        let crate_meta_version = crate_meta::Entity::find()
1197            .join(JoinType::InnerJoin, crate_meta::Relation::Krate.def())
1198            .filter(krate::Column::Name.eq(krate))
1199            .filter(crate_meta::Column::Version.eq(version))
1200            .one(&txn)
1201            .await?
1202            .ok_or_else(|| DbError::CrateMetaNotFound(krate.to_string(), version.to_string()))?;
1203        let crate_id = crate_meta_version.crate_fk;
1204        let current_max_version = operations::get_max_version_from_id(&txn, crate_id).await?;
1205        crate_meta_version.delete(&txn).await?;
1206
1207        // Delete the crate index entry from "crate_index" table
1208        let crate_index_version = crate_index::Entity::find()
1209            .join(JoinType::InnerJoin, crate_index::Relation::Krate.def())
1210            .filter(krate::Column::Name.eq(krate))
1211            .filter(crate_index::Column::Vers.eq(version))
1212            .one(&txn)
1213            .await?
1214            .ok_or_else(|| DbError::CrateIndexNotFound(krate.to_string(), version.to_string()))?;
1215        crate_index_version.delete(&txn).await?;
1216
1217        // If it was the last entry in the "crate_meta" table, delete the entry
1218        // in the "crate" table as well
1219        let crate_meta_rows = crate_meta::Entity::find()
1220            .join(JoinType::InnerJoin, crate_meta::Relation::Krate.def())
1221            .filter(krate::Column::Name.eq(krate))
1222            .all(&txn)
1223            .await?;
1224
1225        if crate_meta_rows.is_empty() {
1226            krate::Entity::delete_many()
1227                .filter(krate::Column::Name.eq(krate))
1228                .exec(&txn)
1229                .await?;
1230        } else {
1231            let c = krate::Entity::find_by_id(crate_id)
1232                .one(&txn)
1233                .await?
1234                .ok_or(DbError::CrateNotFoundWithId(crate_id))?;
1235            let mut c: krate::ActiveModel = c.into();
1236
1237            // Update the max. version if the deleted version was the max. version.
1238            if version == &current_max_version {
1239                let new_max_version = crate_meta_rows
1240                    .into_iter()
1241                    .map(|cm| Version::from_unchecked_str(&cm.version))
1242                    .max()
1243                    .unwrap(); // Safe to unwrap, as crate_meta_rows is not empty
1244                c.max_version = Set(new_max_version.into_inner());
1245            }
1246            // Update the ETag value of the crate index.
1247            let etag = operations::compute_etag(&txn, krate, crate_id).await?;
1248            c.e_tag = Set(etag);
1249            c.update(&txn).await?;
1250        }
1251
1252        txn.commit().await?;
1253
1254        Ok(())
1255    }
1256
1257    async fn get_crate_meta_list(&self, crate_name: &NormalizedName) -> DbResult<Vec<CrateMeta>> {
1258        let crate_meta = crate_meta::Entity::find()
1259            .join(JoinType::InnerJoin, crate_meta::Relation::Krate.def())
1260            .filter(krate::Column::Name.eq(crate_name))
1261            .all(&self.db_con)
1262            .await?;
1263
1264        let crate_meta = crate_meta
1265            .into_iter()
1266            .map(|cm| CrateMeta {
1267                name: crate_name.to_string(),
1268                id: cm.id,
1269                version: cm.version,
1270                created: cm.created,
1271                downloads: cm.downloads,
1272                crate_fk: cm.crate_fk,
1273            })
1274            .collect();
1275
1276        Ok(crate_meta)
1277    }
1278
1279    async fn update_last_updated(&self, id: i64, last_updated: &DateTime<Utc>) -> DbResult<()> {
1280        let krate = krate::Entity::find_by_id(id)
1281            .one(&self.db_con)
1282            .await?
1283            .ok_or(DbError::CrateNotFoundWithId(id))?;
1284
1285        let date = last_updated.format(DB_DATE_FORMAT).to_string();
1286
1287        let mut krate: krate::ActiveModel = krate.into();
1288        krate.last_updated = Set(date);
1289        krate.update(&self.db_con).await?;
1290
1291        Ok(())
1292    }
1293
1294    async fn search_in_crate_name(
1295        &self,
1296        contains: &str,
1297        cache: bool,
1298    ) -> DbResult<Vec<CrateOverview>> {
1299        self.query_crates(Some(contains), None, cache).await
1300    }
1301
1302    async fn get_crate_overview_list(
1303        &self,
1304        limit: u64,
1305        offset: u64,
1306        cache: bool,
1307    ) -> DbResult<Vec<CrateOverview>> {
1308        self.query_crates(None, Some((limit, offset)), cache).await
1309    }
1310
1311    async fn get_crate_data(&self, crate_name: &NormalizedName) -> DbResult<CrateData> {
1312        let krate = self.get_krate_model(crate_name).await?;
1313
1314        let owners: Vec<String> = krate
1315            .find_related(owner::Entity)
1316            .find_also_related(user::Entity)
1317            .all(&self.db_con)
1318            .await?
1319            .into_iter()
1320            .filter_map(|(_, v)| v.map(|v| v.name))
1321            .collect();
1322        let categories: Vec<String> = krate
1323            .find_related(crate_category_to_crate::Entity)
1324            .find_also_related(crate_category::Entity)
1325            .all(&self.db_con)
1326            .await?
1327            .into_iter()
1328            .filter_map(|(_, v)| v.map(|v| v.category))
1329            .collect();
1330        let keywords: Vec<String> = krate
1331            .find_related(crate_keyword_to_crate::Entity)
1332            .find_also_related(crate_keyword::Entity)
1333            .all(&self.db_con)
1334            .await?
1335            .into_iter()
1336            .filter_map(|(_, v)| v.map(|v| v.keyword))
1337            .collect();
1338        let authors: Vec<String> = krate
1339            .find_related(crate_author_to_crate::Entity)
1340            .find_also_related(crate_author::Entity)
1341            .all(&self.db_con)
1342            .await?
1343            .into_iter()
1344            .filter_map(|(_, v)| v.map(|v| v.author))
1345            .collect();
1346        let crate_metas = krate
1347            .find_related(crate_meta::Entity)
1348            .all(&self.db_con)
1349            .await?;
1350        let crate_indices = krate
1351            .find_related(crate_index::Entity)
1352            .all(&self.db_con)
1353            .await?;
1354
1355        let mut versions = Vec::new();
1356        for cm in crate_metas {
1357            let ci = crate_indices
1358                .iter()
1359                .find(|ci| ci.vers == cm.version)
1360                .ok_or_else(|| {
1361                    DbError::CrateIndexNotFound(krate.name.clone(), cm.version.clone())
1362                })?;
1363            let dependencies: Vec<CrateRegistryDep> = match ci.deps.clone() {
1364                Some(deps) => {
1365                    let ix = serde_json::from_value::<Vec<IndexDep>>(deps)
1366                        .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?;
1367
1368                    let mut ft = Vec::new();
1369
1370                    let dep_descs = operations::get_desc_for_deps(&self.db_con, ix.iter()).await?;
1371
1372                    for dep in &ix {
1373                        ft.push(CrateRegistryDep::from_index(
1374                            dep_descs.get(&dep.name).cloned(),
1375                            dep.clone(),
1376                        ));
1377                    }
1378
1379                    ft
1380                }
1381                None => Vec::default(),
1382            };
1383            let features: BTreeMap<String, Vec<String>> = match ci.features.clone() {
1384                Some(features) => serde_json::from_value::<BTreeMap<String, Vec<String>>>(features)
1385                    .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?,
1386                None => BTreeMap::default(),
1387            };
1388
1389            versions.push(CrateVersionData {
1390                version: cm.version,
1391                created: cm.created,
1392                downloads: cm.downloads,
1393                readme: cm.readme,
1394                license: cm.license,
1395                license_file: cm.license_file,
1396                documentation: cm.documentation,
1397                dependencies,
1398                checksum: ci.cksum.clone(),
1399                features,
1400                yanked: ci.yanked,
1401                links: ci.links.clone(),
1402                v: ci.v,
1403            });
1404        }
1405        versions.sort_by(|a, b| {
1406            Version::from_unchecked_str(&b.version).cmp(&Version::from_unchecked_str(&a.version))
1407        });
1408
1409        let crate_data = CrateData {
1410            name: krate.original_name,
1411            owners,
1412            max_version: krate.max_version,
1413            total_downloads: krate.total_downloads,
1414            last_updated: krate.last_updated,
1415            homepage: krate.homepage,
1416            description: krate.description,
1417            repository: krate.repository,
1418            categories,
1419            keywords,
1420            authors,
1421            versions,
1422        };
1423
1424        Ok(crate_data)
1425    }
1426
1427    async fn add_empty_crate(&self, name: &str, created: &DateTime<Utc>) -> DbResult<i64> {
1428        let created = created.format(DB_DATE_FORMAT).to_string();
1429        let normalized_name = NormalizedName::from(
1430            OriginalName::try_from(name)
1431                .map_err(|_| DbError::InvalidCrateName(name.to_string()))?,
1432        );
1433        let krate = krate::ActiveModel {
1434            id: ActiveValue::default(),
1435            name: Set(normalized_name.to_string()),
1436            original_name: Set(name.to_string()),
1437            max_version: Set("0.0.0".to_string()),
1438            last_updated: Set(created),
1439            total_downloads: Set(0),
1440            homepage: Set(None),
1441            description: Set(None),
1442            repository: Set(None),
1443            e_tag: Set(String::new()), // Set to empty string, as it can be computed, when the crate index is inserted
1444            restricted_download: Set(false),
1445        };
1446        Ok(krate.insert(&self.db_con).await?.id)
1447    }
1448
1449    async fn add_crate(
1450        &self,
1451        pub_metadata: &PublishMetadata,
1452        cksum: &str,
1453        created: &DateTime<Utc>,
1454        owner: &str,
1455    ) -> DbResult<i64> {
1456        let created = created.format(DB_DATE_FORMAT).to_string();
1457        let normalized_name = NormalizedName::from(
1458            OriginalName::try_from(&pub_metadata.name)
1459                .map_err(|_| DbError::InvalidCrateName(pub_metadata.name.clone()))?,
1460        );
1461
1462        let existing = krate::Entity::find()
1463            .filter(krate::Column::Name.eq(&pub_metadata.name))
1464            .one(&self.db_con)
1465            .await?;
1466
1467        let txn = self.db_con.begin().await?;
1468
1469        let crate_id = if let Some(krate) = existing {
1470            let krate_id = krate.id;
1471            let max_version = max(
1472                parse_db_version(&krate.max_version)?,
1473                parse_db_version(&pub_metadata.vers)?,
1474            );
1475
1476            let mut krate: krate::ActiveModel = krate.into();
1477            krate.last_updated = Set(created.clone());
1478            krate.max_version = Set(max_version.into_inner());
1479            krate.homepage = Set(pub_metadata.homepage.clone());
1480            krate.description = Set(pub_metadata.description.clone());
1481            krate.repository = Set(pub_metadata.repository.clone());
1482            krate.e_tag = Set(String::new()); // Set to empty string, as it can be computed, when the crate index is inserted
1483            krate.update(&txn).await?;
1484            krate_id
1485        } else {
1486            let krate = krate::ActiveModel {
1487                id: ActiveValue::default(),
1488                name: Set(normalized_name.to_string()),
1489                original_name: Set(pub_metadata.name.clone()),
1490                max_version: Set(pub_metadata.vers.clone()),
1491                last_updated: Set(created.clone()),
1492                total_downloads: Set(0),
1493                homepage: Set(pub_metadata.homepage.clone()),
1494                description: Set(pub_metadata.description.clone()),
1495                repository: Set(pub_metadata.repository.clone()),
1496                e_tag: Set(String::new()), // Set to empty string, as it can be computed, when the crate index is inserted
1497                restricted_download: Set(false),
1498            };
1499            let krate = krate.insert(&txn).await?;
1500            krate.id
1501        };
1502
1503        operations::add_owner_if_not_exists(&txn, owner, crate_id).await?;
1504        operations::add_crate_metadata(&txn, pub_metadata, &created, crate_id).await?;
1505        operations::add_crate_index(&txn, pub_metadata, cksum, crate_id).await?;
1506        operations::update_etag(&txn, &pub_metadata.name, crate_id).await?;
1507        operations::update_crate_categories(&txn, pub_metadata, crate_id).await?;
1508        operations::update_crate_keywords(&txn, pub_metadata, crate_id).await?;
1509        operations::update_crate_authors(&txn, pub_metadata, crate_id).await?;
1510
1511        txn.commit().await?;
1512        Ok(crate_id)
1513    }
1514
1515    async fn update_docs_link(
1516        &self,
1517        crate_name: &NormalizedName,
1518        version: &Version,
1519        docs_link: &str,
1520    ) -> DbResult<()> {
1521        let (cm, _c) = crate_meta::Entity::find()
1522            .find_also_related(krate::Entity)
1523            .filter(
1524                Cond::all()
1525                    .add(krate::Column::Name.eq(crate_name))
1526                    .add(crate_meta::Column::Version.eq(version)),
1527            )
1528            .one(&self.db_con)
1529            .await?
1530            .ok_or_else(|| DbError::CrateNotFound(crate_name.to_string()))?;
1531
1532        let mut cm: crate_meta::ActiveModel = cm.into();
1533        cm.documentation = Set(Some(docs_link.to_string()));
1534        cm.update(&self.db_con).await?;
1535        Ok(())
1536    }
1537
1538    async fn add_crate_metadata(
1539        &self,
1540        pub_metadata: &PublishMetadata,
1541        created: &str,
1542        crate_id: i64,
1543    ) -> DbResult<()> {
1544        operations::add_crate_metadata(&self.db_con, pub_metadata, created, crate_id).await
1545    }
1546
1547    async fn get_prefetch_data(&self, crate_name: &str) -> DbResult<Prefetch> {
1548        let mut krate = krate::Entity::find()
1549            .filter(krate::Column::Name.eq(crate_name))
1550            .find_with_related(crate_index::Entity)
1551            .all(&self.db_con)
1552            .await?
1553            .into_iter();
1554
1555        // Expecting only one crate with its related indices
1556        let (Some((krate, crate_indices)), None) = (krate.next(), krate.next()) else {
1557            return Err(DbError::CrateNotFound(crate_name.to_string()));
1558        };
1559
1560        let index_metadata =
1561            operations::crate_index_model_to_index_metadata(crate_name, crate_indices)?;
1562        let data = operations::index_metadata_to_bytes(&index_metadata)?;
1563
1564        Ok(Prefetch {
1565            data,
1566            etag: krate.e_tag,
1567            last_modified: krate.last_updated,
1568        })
1569    }
1570
1571    async fn is_cratesio_cache_up_to_date(
1572        &self,
1573        crate_name: &NormalizedName,
1574        etag: Option<String>,
1575        last_modified: Option<String>,
1576    ) -> DbResult<PrefetchState> {
1577        let Some(krate) = cratesio_crate::Entity::find()
1578            .filter(cratesio_crate::Column::Name.eq(crate_name))
1579            .one(&self.db_con)
1580            .await?
1581        else {
1582            return Ok(PrefetchState::NotFound);
1583        };
1584
1585        let needs_update = match (etag, last_modified) {
1586            (Some(etag), Some(last_modified)) => {
1587                krate.e_tag != etag || krate.last_modified != last_modified
1588            }
1589            (Some(etag), None) => krate.e_tag != etag,
1590            (None, Some(last_modified)) => krate.last_modified != last_modified,
1591            (None, None) => true,
1592        };
1593
1594        if !needs_update {
1595            Ok(PrefetchState::UpToDate)
1596        } else {
1597            let crate_indices = krate
1598                .find_related(cratesio_index::Entity)
1599                .all(&self.db_con)
1600                .await?;
1601            let index_metadata =
1602                operations::cratesio_index_model_to_index_metadata(crate_name, crate_indices)?;
1603            let data = operations::index_metadata_to_bytes(&index_metadata)?;
1604
1605            Ok(PrefetchState::NeedsUpdate(Prefetch {
1606                data,
1607                etag: krate.e_tag.clone(),
1608                last_modified: krate.last_modified,
1609            }))
1610        }
1611    }
1612
1613    async fn add_cratesio_prefetch_data(
1614        &self,
1615        crate_name: &OriginalName,
1616        etag: &str,
1617        last_modified: &str,
1618        description: Option<String>,
1619        indices: &[IndexMetadata],
1620    ) -> DbResult<Prefetch> {
1621        let normalized_name = crate_name.to_normalized();
1622
1623        let max_version = indices
1624            .iter()
1625            .map(|i| Version::from_unchecked_str(&i.vers))
1626            .max()
1627            .ok_or_else(|| DbError::FailedToGetMaxVersionByName(crate_name.to_string()))?;
1628
1629        // Use a transaction to ensure the etag update and all index inserts are
1630        // atomic. Without this, a partial failure would leave the crate record
1631        // with the latest etag but incomplete index data, causing stale cache
1632        // entries that can never self-heal (the background updater would get 304
1633        // from crates.io because the etag matches).
1634        let txn = self.db_con.begin().await?;
1635
1636        let krate = cratesio_crate::Entity::find()
1637            .filter(cratesio_crate::Column::Name.eq(&normalized_name))
1638            .one(&txn)
1639            .await?;
1640
1641        let krate = if let Some(krate) = krate {
1642            let mut krate: cratesio_crate::ActiveModel = krate.into();
1643            krate.e_tag = Set(etag.to_string());
1644            krate.last_modified = Set(last_modified.to_string());
1645            krate.max_version = Set(max_version.into_inner());
1646            krate.update(&txn).await?
1647        } else {
1648            let krate = cratesio_crate::ActiveModel {
1649                id: ActiveValue::default(),
1650                name: Set(normalized_name.to_string()),
1651                original_name: Set(crate_name.to_string()),
1652                description: Set(description),
1653                e_tag: Set(etag.to_string()),
1654                last_modified: Set(last_modified.to_string()),
1655                total_downloads: Set(0),
1656                max_version: Set(max_version.to_string()),
1657            };
1658            krate.insert(&txn).await?
1659        };
1660
1661        let current_indices = cratesio_index::Entity::find()
1662            .filter(cratesio_index::Column::CratesIoFk.eq(krate.id))
1663            .all(&txn)
1664            .await?;
1665
1666        for index in indices {
1667            // Check if the version was yanked or un-yanked and update if so.
1668            if let Some(current_index) = current_indices.iter().find(|ci| index.vers == ci.vers) {
1669                if index.yanked != current_index.yanked {
1670                    let mut ci: cratesio_index::ActiveModel = current_index.to_owned().into();
1671                    ci.yanked = Set(index.yanked);
1672                    ci.update(&txn).await?;
1673                }
1674            } else {
1675                let deps = if index.deps.is_empty() {
1676                    None
1677                } else {
1678                    let deps = serde_json::to_value(&index.deps)
1679                        .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?;
1680                    Some(deps)
1681                };
1682
1683                let features = serde_json::to_value(&index.features)
1684                    .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?;
1685
1686                let features2 = serde_json::to_value(&index.features2)
1687                    .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?;
1688
1689                let new_index = cratesio_index::ActiveModel {
1690                    id: ActiveValue::default(),
1691                    name: Set(index.name.clone()),
1692                    vers: Set(index.vers.clone()),
1693                    deps: Set(deps),
1694                    cksum: Set(index.cksum.clone()),
1695                    features: Set(Some(features)),
1696                    features2: Set(Some(features2)),
1697                    yanked: Set(index.yanked),
1698                    links: Set(index.links.clone()),
1699                    pubtime: Set(index.pubtime.map(|dt| dt.naive_utc())),
1700                    v: Set(index.v.unwrap_or(1) as i32),
1701                    crates_io_fk: Set(krate.id),
1702                };
1703
1704                new_index.insert(&txn).await?;
1705
1706                // Add the meta data for the crate version.
1707                let meta = cratesio_meta::ActiveModel {
1708                    id: ActiveValue::default(),
1709                    version: Set(index.vers.clone()),
1710                    downloads: Set(0),
1711                    crates_io_fk: Set(krate.id),
1712                    documentation: Set(Some(format!(
1713                        "https://docs.rs/{normalized_name}/{}",
1714                        index.vers,
1715                    ))),
1716                };
1717
1718                meta.insert(&txn).await?;
1719            }
1720        }
1721
1722        txn.commit().await?;
1723
1724        Ok(Prefetch {
1725            data: operations::index_metadata_to_bytes(indices)?,
1726            etag: etag.to_string(),
1727            last_modified: last_modified.to_string(),
1728        })
1729    }
1730
1731    async fn get_cratesio_index_update_list(&self) -> DbResult<Vec<CratesioPrefetchMsg>> {
1732        let crates = cratesio_crate::Entity::find().all(&self.db_con).await?;
1733        let msgs = crates
1734            .into_iter()
1735            .map(|krate| {
1736                CratesioPrefetchMsg::Update(UpdateData {
1737                    name: OriginalName::from_unchecked(krate.original_name),
1738                    etag: Some(krate.e_tag),
1739                    last_modified: Some(krate.last_modified),
1740                })
1741            })
1742            .collect();
1743        Ok(msgs)
1744    }
1745
1746    async fn unyank_crate(&self, crate_name: &NormalizedName, version: &Version) -> DbResult<()> {
1747        let mut ci: crate_index::ActiveModel = self
1748            .get_crate_index_model(crate_name, version)
1749            .await?
1750            .into();
1751        ci.yanked = Set(false);
1752        ci.save(&self.db_con).await?;
1753        Ok(())
1754    }
1755
1756    async fn yank_crate(&self, crate_name: &NormalizedName, version: &Version) -> DbResult<()> {
1757        let mut ci: crate_index::ActiveModel = self
1758            .get_crate_index_model(crate_name, version)
1759            .await?
1760            .into();
1761        ci.yanked = Set(true);
1762        ci.save(&self.db_con).await?;
1763        Ok(())
1764    }
1765
1766    async fn register_webhook(&self, webhook: Webhook) -> DbResult<String> {
1767        let w = webhook::ActiveModel {
1768            event: Set(Into::<&str>::into(webhook.event).to_string()),
1769            callback_url: Set(webhook.callback_url),
1770            name: Set(webhook.name),
1771            ..Default::default()
1772        };
1773
1774        let w: webhook::Model = w.insert(&self.db_con).await?;
1775        Ok(w.id.to_string())
1776    }
1777
1778    async fn delete_webhook(&self, id: &str) -> DbResult<()> {
1779        self.get_webhook_model(id)
1780            .await?
1781            .delete(&self.db_con)
1782            .await?;
1783        Ok(())
1784    }
1785
1786    async fn get_webhook(&self, id: &str) -> DbResult<Webhook> {
1787        webhook_model_to_obj(self.get_webhook_model(id).await?)
1788    }
1789
1790    async fn get_all_webhooks(&self) -> DbResult<Vec<Webhook>> {
1791        Ok(webhook::Entity::find()
1792            .all(&self.db_con)
1793            .await?
1794            .into_iter()
1795            .filter_map(|w| webhook_model_to_obj(w).ok())
1796            .collect())
1797    }
1798
1799    async fn add_webhook_queue(
1800        &self,
1801        event: WebhookEvent,
1802        payload: serde_json::Value,
1803    ) -> DbResult<()> {
1804        let w = webhook::Entity::find()
1805            .filter(webhook::Column::Event.eq(Into::<&str>::into(event)))
1806            .all(&self.db_con)
1807            .await?;
1808
1809        if w.is_empty() {
1810            return Ok(());
1811        }
1812
1813        let now = Utc::now();
1814
1815        let entries = w.into_iter().map(|w| webhook_queue::ActiveModel {
1816            webhook_fk: Set(w.id),
1817            payload: Set(payload.clone()),
1818            next_attempt: Set(now.into()),
1819            last_attempt: Set(None),
1820            ..Default::default()
1821        });
1822
1823        webhook_queue::Entity::insert_many(entries)
1824            .exec(&self.db_con)
1825            .await?;
1826        Ok(())
1827    }
1828    async fn get_pending_webhook_queue_entries(
1829        &self,
1830        timestamp: DateTime<Utc>,
1831    ) -> DbResult<Vec<WebhookQueue>> {
1832        let w = webhook_queue::Entity::find()
1833            .find_with_related(webhook::Entity)
1834            .filter(webhook_queue::Column::NextAttempt.lte(timestamp))
1835            .all(&self.db_con)
1836            .await?;
1837
1838        Ok(w.into_iter()
1839            .filter_map(|w| {
1840                Some(WebhookQueue {
1841                    id: w.0.id.to_string(),
1842                    callback_url: w.1.first()?.callback_url.clone(),
1843                    payload: w.0.payload,
1844                    last_attempt: w.0.last_attempt.map(Into::into),
1845                    next_attempt: w.0.next_attempt.into(),
1846                })
1847            })
1848            .collect())
1849    }
1850
1851    async fn update_webhook_queue(
1852        &self,
1853        id: &str,
1854        last_attempt: DateTime<Utc>,
1855        next_attempt: DateTime<Utc>,
1856    ) -> DbResult<()> {
1857        let mut w: webhook_queue::ActiveModel = self.get_webhook_queue_model(id).await?.into();
1858        w.last_attempt = Set(Some(last_attempt.into()));
1859        w.next_attempt = Set(next_attempt.into());
1860        w.update(&self.db_con).await?;
1861        Ok(())
1862    }
1863
1864    async fn delete_webhook_queue(&self, id: &str) -> DbResult<()> {
1865        self.get_webhook_queue_model(id)
1866            .await?
1867            .delete(&self.db_con)
1868            .await?;
1869        Ok(())
1870    }
1871
1872    // OAuth2 identity methods
1873
1874    async fn get_user_by_oauth2_identity(
1875        &self,
1876        issuer: &str,
1877        subject: &str,
1878    ) -> DbResult<Option<User>> {
1879        let identity = oauth2_identity::Entity::find()
1880            .filter(oauth2_identity::Column::ProviderIssuer.eq(issuer))
1881            .filter(oauth2_identity::Column::Subject.eq(subject))
1882            .one(&self.db_con)
1883            .await?;
1884
1885        if let Some(identity) = identity {
1886            let u = user::Entity::find_by_id(identity.user_fk)
1887                .one(&self.db_con)
1888                .await?
1889                .ok_or_else(|| DbError::UserNotFound(format!("user_id={}", identity.user_fk)))?;
1890
1891            Ok(Some(User::from(u)))
1892        } else {
1893            Ok(None)
1894        }
1895    }
1896
1897    async fn create_oauth2_user(
1898        &self,
1899        username: &str,
1900        issuer: &str,
1901        subject: &str,
1902        email: Option<String>,
1903        is_admin: bool,
1904        is_read_only: bool,
1905    ) -> DbResult<User> {
1906        // Use a transaction to ensure atomicity
1907        let txn = self.db_con.begin().await?;
1908
1909        // Generate a random password and salt for OAuth2 users
1910        // They won't use password auth, but we need valid values
1911        let salt = generate_salt();
1912        let random_pwd = Uuid::new_v4().to_string();
1913        let hashed_pwd = hash_pwd(&random_pwd, &salt);
1914        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
1915
1916        // Create the user
1917        let new_user = user::ActiveModel {
1918            name: Set(username.to_string()),
1919            pwd: Set(hashed_pwd),
1920            salt: Set(salt),
1921            is_admin: Set(is_admin),
1922            is_read_only: Set(is_read_only),
1923            created: Set(created.clone()),
1924            ..Default::default()
1925        };
1926
1927        let res = user::Entity::insert(new_user).exec(&txn).await?;
1928        let user_id = res.last_insert_id;
1929
1930        // Link the OAuth2 identity
1931        let identity = oauth2_identity::ActiveModel {
1932            user_fk: Set(user_id),
1933            provider_issuer: Set(issuer.to_string()),
1934            subject: Set(subject.to_string()),
1935            email: Set(email),
1936            created: Set(created.clone()),
1937            ..Default::default()
1938        };
1939
1940        oauth2_identity::Entity::insert(identity).exec(&txn).await?;
1941
1942        txn.commit().await?;
1943
1944        Ok(User {
1945            id: user_id as i32,
1946            name: username.to_string(),
1947            pwd: String::new(),  // Don't expose password hash
1948            salt: String::new(), // Don't expose salt
1949            is_admin,
1950            is_read_only,
1951            created,
1952        })
1953    }
1954
1955    async fn link_oauth2_identity(
1956        &self,
1957        user_id: i64,
1958        issuer: &str,
1959        subject: &str,
1960        email: Option<String>,
1961    ) -> DbResult<()> {
1962        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
1963        let identity = oauth2_identity::ActiveModel {
1964            user_fk: Set(user_id),
1965            provider_issuer: Set(issuer.to_string()),
1966            subject: Set(subject.to_string()),
1967            email: Set(email),
1968            created: Set(created),
1969            ..Default::default()
1970        };
1971
1972        oauth2_identity::Entity::insert(identity)
1973            .exec(&self.db_con)
1974            .await?;
1975
1976        Ok(())
1977    }
1978
1979    // OAuth2 state methods (CSRF/PKCE during auth flow)
1980
1981    async fn store_oauth2_state(
1982        &self,
1983        state: &str,
1984        pkce_verifier: &str,
1985        nonce: &str,
1986    ) -> DbResult<()> {
1987        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
1988        let s = oauth2_state::ActiveModel {
1989            state: Set(state.to_string()),
1990            pkce_verifier: Set(pkce_verifier.to_string()),
1991            nonce: Set(nonce.to_string()),
1992            created: Set(created),
1993            ..Default::default()
1994        };
1995
1996        oauth2_state::Entity::insert(s).exec(&self.db_con).await?;
1997        Ok(())
1998    }
1999
2000    async fn get_and_delete_oauth2_state(&self, state: &str) -> DbResult<Option<OAuth2StateData>> {
2001        let s = oauth2_state::Entity::find()
2002            .filter(oauth2_state::Column::State.eq(state))
2003            .one(&self.db_con)
2004            .await?;
2005
2006        if let Some(s) = s {
2007            let data = OAuth2StateData {
2008                state: s.state.clone(),
2009                pkce_verifier: s.pkce_verifier.clone(),
2010                nonce: s.nonce.clone(),
2011            };
2012
2013            // Delete after retrieving (single use)
2014            s.delete(&self.db_con).await?;
2015
2016            Ok(Some(data))
2017        } else {
2018            Ok(None)
2019        }
2020    }
2021
2022    async fn cleanup_expired_oauth2_states(&self) -> DbResult<u64> {
2023        // States older than 10 minutes are expired
2024        let expiry = Utc::now() - chrono::Duration::minutes(10);
2025        let expiry_str = expiry.format(DB_DATE_FORMAT).to_string();
2026
2027        let result = oauth2_state::Entity::delete_many()
2028            .filter(oauth2_state::Column::Created.lt(expiry_str))
2029            .exec(&self.db_con)
2030            .await?;
2031
2032        Ok(result.rows_affected)
2033    }
2034
2035    async fn is_username_available(&self, username: &str) -> DbResult<bool> {
2036        let existing = user::Entity::find()
2037            .filter(user::Column::Name.eq(username))
2038            .one(&self.db_con)
2039            .await?;
2040
2041        Ok(existing.is_none())
2042    }
2043
2044    // Toolchain distribution methods
2045
2046    async fn add_toolchain(
2047        &self,
2048        name: &str,
2049        version: &str,
2050        date: &str,
2051        channel: Option<String>,
2052    ) -> DbResult<i64> {
2053        let created = Utc::now().format(DB_DATE_FORMAT).to_string();
2054
2055        let model = toolchain::ActiveModel {
2056            id: ActiveValue::NotSet,
2057            name: Set(name.to_string()),
2058            version: Set(version.to_string()),
2059            date: Set(date.to_string()),
2060            channel: Set(channel),
2061            created: Set(created),
2062        };
2063
2064        let result = model.insert(&self.db_con).await?;
2065        Ok(result.id)
2066    }
2067
2068    async fn add_toolchain_target(
2069        &self,
2070        toolchain_id: i64,
2071        target: &str,
2072        storage_path: &str,
2073        hash: &str,
2074        size: i64,
2075    ) -> DbResult<i64> {
2076        let model = toolchain_target::ActiveModel {
2077            id: ActiveValue::NotSet,
2078            toolchain_fk: Set(toolchain_id),
2079            target: Set(target.to_string()),
2080            storage_path: Set(storage_path.to_string()),
2081            hash: Set(hash.to_string()),
2082            size: Set(size),
2083            status: Set("processing".to_string()),
2084        };
2085
2086        let result = model.insert(&self.db_con).await?;
2087        Ok(result.id)
2088    }
2089
2090    async fn set_target_status(&self, target_id: i64, status: &str) -> DbResult<()> {
2091        toolchain_target::Entity::update_many()
2092            .col_expr(toolchain_target::Column::Status, Expr::value(status))
2093            .filter(toolchain_target::Column::Id.eq(target_id))
2094            .exec(&self.db_con)
2095            .await?;
2096        Ok(())
2097    }
2098
2099    async fn add_toolchain_component(
2100        &self,
2101        target_id: i64,
2102        name: &str,
2103        storage_path: &str,
2104        hash: &str,
2105        size: i64,
2106    ) -> DbResult<()> {
2107        let model = toolchain_component::ActiveModel {
2108            id: ActiveValue::NotSet,
2109            toolchain_target_fk: Set(target_id),
2110            name: Set(name.to_string()),
2111            storage_path: Set(storage_path.to_string()),
2112            hash: Set(hash.to_string()),
2113            size: Set(size),
2114        };
2115
2116        model.insert(&self.db_con).await?;
2117        Ok(())
2118    }
2119
2120    async fn get_toolchain_by_channel(
2121        &self,
2122        channel: &str,
2123    ) -> DbResult<Option<ToolchainWithTargets>> {
2124        let tc = toolchain::Entity::find()
2125            .filter(toolchain::Column::Channel.eq(channel))
2126            .one(&self.db_con)
2127            .await?;
2128
2129        Ok(match tc {
2130            Some(tc) => Some(self.toolchain_with_targets(tc).await?),
2131            None => None,
2132        })
2133    }
2134
2135    async fn get_toolchain_by_version(
2136        &self,
2137        name: &str,
2138        version: &str,
2139    ) -> DbResult<Option<ToolchainWithTargets>> {
2140        Ok(
2141            match self.get_toolchain_by_name_version(name, version).await? {
2142                Some(tc) => Some(self.toolchain_with_targets(tc).await?),
2143                None => None,
2144            },
2145        )
2146    }
2147
2148    async fn list_toolchains(&self) -> DbResult<Vec<ToolchainWithTargets>> {
2149        let toolchains = toolchain::Entity::find()
2150            .order_by_desc(toolchain::Column::Created)
2151            .all(&self.db_con)
2152            .await?;
2153
2154        let mut result = Vec::with_capacity(toolchains.len());
2155
2156        for tc in toolchains {
2157            result.push(self.toolchain_with_targets(tc).await?);
2158        }
2159
2160        Ok(result)
2161    }
2162
2163    async fn delete_toolchain(&self, name: &str, version: &str) -> DbResult<()> {
2164        toolchain::Entity::delete_many()
2165            .filter(toolchain::Column::Name.eq(name))
2166            .filter(toolchain::Column::Version.eq(version))
2167            .exec(&self.db_con)
2168            .await?;
2169
2170        Ok(())
2171    }
2172
2173    async fn delete_toolchain_target(
2174        &self,
2175        name: &str,
2176        version: &str,
2177        target: &str,
2178    ) -> DbResult<()> {
2179        if let Some(tc) = self.get_toolchain_by_name_version(name, version).await? {
2180            toolchain_target::Entity::delete_many()
2181                .filter(toolchain_target::Column::ToolchainFk.eq(tc.id))
2182                .filter(toolchain_target::Column::Target.eq(target))
2183                .exec(&self.db_con)
2184                .await?;
2185        }
2186
2187        Ok(())
2188    }
2189
2190    async fn set_channel(&self, channel: &str, name: &str, version: &str) -> DbResult<()> {
2191        // First, clear the channel from any other toolchain that has it
2192        let toolchains_with_channel = toolchain::Entity::find()
2193            .filter(toolchain::Column::Channel.eq(channel))
2194            .all(&self.db_con)
2195            .await?;
2196
2197        for tc in toolchains_with_channel {
2198            let mut model: toolchain::ActiveModel = tc.into();
2199            model.channel = Set(None);
2200            model.update(&self.db_con).await?;
2201        }
2202
2203        // Then set the channel on the target toolchain
2204        if let Some(tc) = self.get_toolchain_by_name_version(name, version).await? {
2205            let mut model: toolchain::ActiveModel = tc.into();
2206            model.channel = Set(Some(channel.to_string()));
2207            model.update(&self.db_con).await?;
2208        }
2209
2210        Ok(())
2211    }
2212
2213    async fn get_channels(&self) -> DbResult<Vec<ChannelInfo>> {
2214        let toolchains = toolchain::Entity::find()
2215            .filter(toolchain::Column::Channel.is_not_null())
2216            .all(&self.db_con)
2217            .await?;
2218
2219        Ok(toolchains
2220            .into_iter()
2221            .filter_map(|tc| {
2222                tc.channel.map(|channel| ChannelInfo {
2223                    name: channel,
2224                    version: tc.version,
2225                    date: tc.date,
2226                })
2227            })
2228            .collect())
2229    }
2230}
2231
2232fn parse_db_version(value: &str) -> DbResult<Version> {
2233    Version::try_from(value).map_err(|_| DbError::InvalidVersion(value.to_owned()))
2234}