1mod operations;
2pub mod test_utils;
3
4use std::cmp::max;
5use std::collections::BTreeMap;
6use std::path::Path;
7
8use chrono::{DateTime, Utc};
9use kellnr_common::crate_data::{CrateData, CrateRegistryDep, CrateVersionData};
10use kellnr_common::crate_overview::CrateOverview;
11use kellnr_common::cratesio_prefetch_msg::{CratesioPrefetchMsg, UpdateData};
12use kellnr_common::index_metadata::{IndexDep, IndexMetadata};
13use kellnr_common::normalized_name::NormalizedName;
14use kellnr_common::original_name::OriginalName;
15use kellnr_common::prefetch::Prefetch;
16use kellnr_common::publish_metadata::PublishMetadata;
17use kellnr_common::version::Version;
18use kellnr_common::webhook::{Webhook, WebhookEvent, WebhookQueue};
19use kellnr_entity::prelude::*;
20use kellnr_entity::{
21 auth_token, crate_author, crate_author_to_crate, crate_category, crate_category_to_crate,
22 crate_group, crate_index, crate_keyword, crate_keyword_to_crate, crate_meta, crate_user,
23 cratesio_crate, cratesio_index, cratesio_meta, doc_queue, group, group_user, krate,
24 oauth2_identity, oauth2_state, owner, session, toolchain, toolchain_target, user, webhook,
25 webhook_queue,
26};
27use kellnr_migration::iden::{
28 AuthTokenIden, CrateIden, CrateMetaIden, CratesIoIden, CratesIoMetaIden, GroupIden,
29};
30use sea_orm::entity::prelude::Uuid;
31use sea_orm::prelude::async_trait::async_trait;
32use sea_orm::query::{QueryOrder, QuerySelect, TransactionTrait};
33use sea_orm::sea_query::{Alias, Cond, Expr, Iden, JoinType, Order, Query, UnionType};
34use sea_orm::{
35 ActiveModelTrait, ActiveValue, ColumnTrait, DatabaseConnection, EntityTrait, ExprTrait,
36 FromQueryResult, ModelTrait, QueryFilter, RelationTrait, Set,
37};
38
39use crate::error::DbError;
40use crate::password::{generate_salt, hash_pwd, hash_token};
41use crate::provider::{
42 ChannelInfo, DbResult, OAuth2StateData, PrefetchState, ToolchainWithTargets,
43};
44use crate::tables::init_database;
45use crate::{
46 AuthToken, ConString, CrateMeta, CrateSummary, DbProvider, DocQueueEntry, Group, User,
47};
48
49const DB_DATE_FORMAT: &str = "%Y-%m-%d %H:%M:%S";
50
51pub struct Database {
52 db_con: DatabaseConnection,
53}
54
55impl Database {
56 pub fn existing(db_con: DatabaseConnection) -> Self {
57 Self { db_con }
58 }
59
60 pub async fn new(con: &ConString, max_con: u32) -> Result<Self, DbError> {
61 let db_con = init_database(con, max_con)
62 .await
63 .map_err(|e| DbError::InitializationError(e.to_string()))?;
64
65 if operations::no_user_exists(&db_con).await? {
66 operations::insert_admin_credentials(&db_con, con).await?;
67 }
68
69 Ok(Self { db_con })
70 }
71
72 async fn get_user_model(&self, name: &str) -> DbResult<user::Model> {
74 user::Entity::find()
75 .filter(user::Column::Name.eq(name))
76 .one(&self.db_con)
77 .await?
78 .ok_or_else(|| DbError::UserNotFound(name.to_owned()))
79 }
80
81 async fn get_krate_model(&self, crate_name: &NormalizedName) -> DbResult<krate::Model> {
83 krate::Entity::find()
84 .filter(krate::Column::Name.eq(crate_name))
85 .one(&self.db_con)
86 .await?
87 .ok_or_else(|| DbError::CrateNotFound(crate_name.to_string()))
88 }
89
90 async fn get_group_model(&self, name: &str) -> DbResult<group::Model> {
92 group::Entity::find()
93 .filter(group::Column::Name.eq(name))
94 .one(&self.db_con)
95 .await?
96 .ok_or_else(|| DbError::GroupNotFound(name.to_owned()))
97 }
98
99 async fn toolchain_with_targets(&self, tc: toolchain::Model) -> DbResult<ToolchainWithTargets> {
101 let targets = toolchain_target::Entity::find()
102 .filter(toolchain_target::Column::ToolchainFk.eq(tc.id))
103 .all(&self.db_con)
104 .await?;
105 Ok(ToolchainWithTargets::from_model(tc, targets))
106 }
107
108 async fn get_crate_index_model(
110 &self,
111 crate_name: &NormalizedName,
112 version: &Version,
113 ) -> DbResult<crate_index::Model> {
114 crate_index::Entity::find()
115 .filter(crate_index::Column::Name.eq(crate_name))
116 .filter(crate_index::Column::Vers.eq(version))
117 .one(&self.db_con)
118 .await?
119 .ok_or_else(|| DbError::CrateIndexNotFound(crate_name.to_string(), version.to_string()))
120 }
121
122 async fn get_toolchain_by_name_version(
124 &self,
125 name: &str,
126 version: &str,
127 ) -> DbResult<Option<toolchain::Model>> {
128 toolchain::Entity::find()
129 .filter(toolchain::Column::Name.eq(name))
130 .filter(toolchain::Column::Version.eq(version))
131 .one(&self.db_con)
132 .await
133 .map_err(Into::into)
134 }
135
136 async fn get_crate_group_by_name_and_group(
138 &self,
139 crate_name: &NormalizedName,
140 group: &str,
141 ) -> DbResult<Option<crate_group::Model>> {
142 crate_group::Entity::find()
143 .join(JoinType::InnerJoin, crate_group::Relation::Krate.def())
144 .join(JoinType::InnerJoin, crate_group::Relation::Group.def())
145 .filter(
146 Cond::all()
147 .add(krate::Column::Name.eq(crate_name))
148 .add(group::Column::Name.eq(group)),
149 )
150 .one(&self.db_con)
151 .await
152 .map_err(Into::into)
153 }
154
155 async fn get_group_user_by_group_and_user(
157 &self,
158 group_name: &str,
159 user: &str,
160 ) -> DbResult<Option<group_user::Model>> {
161 group_user::Entity::find()
162 .join(JoinType::InnerJoin, group_user::Relation::Group.def())
163 .join(JoinType::InnerJoin, group_user::Relation::User.def())
164 .filter(
165 Cond::all()
166 .add(group::Column::Name.eq(group_name))
167 .add(user::Column::Name.eq(user)),
168 )
169 .one(&self.db_con)
170 .await
171 .map_err(Into::into)
172 }
173
174 async fn get_crate_user_by_crate_and_user(
176 &self,
177 crate_name: &NormalizedName,
178 user: &str,
179 ) -> DbResult<Option<crate_user::Model>> {
180 crate_user::Entity::find()
181 .join(JoinType::InnerJoin, crate_user::Relation::Krate.def())
182 .join(JoinType::InnerJoin, crate_user::Relation::User.def())
183 .filter(
184 Cond::all()
185 .add(krate::Column::Name.eq(crate_name))
186 .add(user::Column::Name.eq(user)),
187 )
188 .one(&self.db_con)
189 .await
190 .map_err(Into::into)
191 }
192
193 async fn query_crates(
195 &self,
196 contains: Option<&str>,
197 limit_offset: Option<(u64, u64)>,
198 cache: bool,
199 ) -> DbResult<Vec<CrateOverview>> {
200 let mut query = Query::select();
201 query
202 .expr_as(Expr::col(CrateIden::OriginalName), Alias::new("name"))
203 .expr_as(Expr::col(CrateIden::MaxVersion), Alias::new("version"))
204 .expr_as(Expr::col(CrateIden::LastUpdated), Alias::new("date"))
205 .expr_as(
206 Expr::col(CrateIden::TotalDownloads),
207 Alias::new("total_downloads"),
208 )
209 .expr_as(Expr::col(CrateIden::Description), Alias::new("description"))
210 .expr_as(
211 Expr::col(CrateMetaIden::Documentation),
212 Alias::new("documentation"),
213 )
214 .expr_as(Expr::cust("false"), Alias::new("is_cache"))
215 .from(CrateMetaIden::Table)
216 .inner_join(
217 CrateIden::Table,
218 Expr::col((CrateMetaIden::Table, CrateMetaIden::CrateFk))
219 .equals((CrateIden::Table, CrateIden::Id)),
220 )
221 .and_where(
222 Expr::col((CrateMetaIden::Table, CrateMetaIden::Version))
223 .equals((CrateIden::Table, CrateIden::MaxVersion)),
224 );
225
226 if let Some(contains) = contains {
228 query.and_where(
229 Expr::col((CrateIden::Table, CrateIden::Name)).like(format!("%{contains}%")),
230 );
231 }
232
233 if cache {
235 let mut query2 = Query::select();
236 query2
237 .expr_as(Expr::col(CratesIoIden::OriginalName), Alias::new("name"))
238 .expr_as(Expr::col(CratesIoIden::MaxVersion), Alias::new("version"))
239 .expr_as(Expr::col(CratesIoIden::LastModified), Alias::new("date"))
240 .expr_as(
241 Expr::col(CratesIoIden::TotalDownloads),
242 Alias::new("total_downloads"),
243 )
244 .expr_as(
245 Expr::col(CratesIoIden::Description),
246 Alias::new("description"),
247 )
248 .expr_as(
249 Expr::col(CratesIoMetaIden::Documentation),
250 Alias::new("documentation"),
251 )
252 .expr_as(Expr::cust("true"), Alias::new("is_cache"))
253 .from(CratesIoMetaIden::Table)
254 .inner_join(
255 CratesIoIden::Table,
256 Expr::col((CratesIoMetaIden::Table, CratesIoMetaIden::CratesIoFk))
257 .equals((CratesIoIden::Table, CratesIoIden::Id)),
258 )
259 .and_where(
260 Expr::col((CratesIoMetaIden::Table, CratesIoMetaIden::Version))
261 .equals((CratesIoIden::Table, CratesIoIden::MaxVersion)),
262 );
263 if let Some(contains) = contains {
264 query2.and_where(
265 Expr::col((CratesIoIden::Table, CratesIoIden::OriginalName))
266 .like(format!("%{contains}%")),
267 );
268 }
269 query.union(UnionType::All, query2);
270 }
271
272 query.order_by(Alias::new("name"), Order::Asc);
274 if let Some((limit, offset)) = limit_offset {
275 query.limit(limit).offset(offset);
276 }
277
278 let builder = self.db_con.get_database_backend();
279 CrateOverview::find_by_statement(builder.build(&query))
280 .all(&self.db_con)
281 .await
282 .map_err(DbError::from)
283 }
284
285 async fn count<T>(&self, table: T, id_column: T, error: DbError) -> DbResult<u64>
287 where
288 T: Iden + Copy,
289 {
290 #[derive(Debug, PartialEq, FromQueryResult)]
291 struct CountResult {
292 count: Option<i64>,
293 }
294
295 let statement = self.db_con.get_database_backend().build(
296 Query::select()
297 .expr_as(Expr::col((table, id_column)).count(), Alias::new("count"))
298 .from(table),
299 );
300 let Some(result) = CountResult::find_by_statement(statement)
301 .one(&self.db_con)
302 .await?
303 else {
304 return Err(error);
305 };
306 result
307 .count
308 .and_then(|v| u64::try_from(v).ok())
309 .ok_or(error)
310 }
311
312 async fn get_webhook_model(&self, id: &str) -> DbResult<webhook::Model> {
313 webhook::Entity::find()
314 .filter(webhook::Column::Id.eq(
315 TryInto::<Uuid>::try_into(id).map_err(|_| DbError::InvalidId(id.to_string()))?,
316 ))
317 .one(&self.db_con)
318 .await?
319 .ok_or(DbError::WebhookNotFound)
320 }
321
322 async fn get_webhook_queue_model(&self, id: &str) -> DbResult<webhook_queue::Model> {
323 webhook_queue::Entity::find()
324 .filter(webhook_queue::Column::Id.eq(
325 TryInto::<Uuid>::try_into(id).map_err(|_| DbError::InvalidId(id.to_string()))?,
326 ))
327 .one(&self.db_con)
328 .await?
329 .ok_or(DbError::WebhookNotFound)
330 }
331
332 async fn get_owner_by_crate_and_user(
333 &self,
334 crate_name: &str,
335 owner_name: &str,
336 ) -> DbResult<Option<owner::Model>> {
337 owner::Entity::find()
338 .join(JoinType::InnerJoin, owner::Relation::Krate.def())
339 .join(JoinType::InnerJoin, owner::Relation::User.def())
340 .filter(
341 Cond::all()
342 .add(krate::Column::Name.eq(crate_name))
343 .add(user::Column::Name.eq(owner_name)),
344 )
345 .one(&self.db_con)
346 .await
347 .map_err(Into::into)
348 }
349
350 async fn delete_or_not_found<M, A>(&self, model: Option<M>, err: DbError) -> DbResult<()>
351 where
352 M: ModelTrait + sea_orm::IntoActiveModel<A>,
353 A: ActiveModelTrait<Entity = M::Entity> + sea_orm::ActiveModelBehavior + Send,
354 {
355 model.ok_or(err)?.delete(&self.db_con).await?;
356 Ok(())
357 }
358}
359
360fn webhook_model_to_obj(w: webhook::Model) -> DbResult<Webhook> {
361 let event = w
362 .event
363 .as_str()
364 .try_into()
365 .map_err(|_| DbError::InvalidWebhookEvent(w.event))?;
366 Ok(Webhook {
367 id: Some(w.id.into()),
368 name: w.name,
369 event,
370 callback_url: w.callback_url,
371 })
372}
373
374macro_rules! impl_add_relation {
383 ($method:ident, $relation_entity:ident, $insert_entity:ident, $fk1:ident, $fk2:ident) => {
384 async fn $method(&self, fk1: i64, fk2: i64) -> DbResult<()> {
385 let model = $relation_entity::ActiveModel {
386 $fk1: Set(fk1),
387 $fk2: Set(fk2),
388 ..Default::default()
389 };
390 $insert_entity::insert(model).exec(&self.db_con).await?;
391 Ok(())
392 }
393 };
394}
395
396impl Database {
397 impl_add_relation!(
398 add_crate_user_impl,
399 crate_user,
400 CrateUser,
401 crate_fk,
402 user_fk
403 );
404 impl_add_relation!(
405 add_crate_group_impl,
406 crate_group,
407 CrateGroup,
408 crate_fk,
409 group_fk
410 );
411 impl_add_relation!(
412 add_group_user_impl,
413 group_user,
414 GroupUser,
415 group_fk,
416 user_fk
417 );
418 impl_add_relation!(add_owner_impl, owner, Owner, crate_fk, user_fk);
419}
420
421#[async_trait]
422impl DbProvider for Database {
423 async fn get_total_unique_cached_crates(&self) -> DbResult<u64> {
424 self.count(
425 CratesIoIden::Table,
426 CratesIoIden::Id,
427 DbError::FailedToCountCrates,
428 )
429 .await
430 }
431
432 async fn get_total_cached_crate_versions(&self) -> DbResult<u64> {
433 self.count(
434 CratesIoMetaIden::Table,
435 CratesIoMetaIden::Id,
436 DbError::FailedToCountCrateVersions,
437 )
438 .await
439 }
440
441 async fn get_total_cached_downloads(&self) -> DbResult<u64> {
442 #[derive(FromQueryResult)]
443 struct Model {
444 total_downloads: i64,
445 }
446
447 let total_downloads = cratesio_crate::Entity::find()
448 .select_only()
449 .column(cratesio_crate::Column::TotalDownloads)
450 .into_model::<Model>()
451 .all(&self.db_con)
452 .await?;
453
454 Ok(total_downloads
455 .iter()
456 .map(|m| m.total_downloads as u64)
457 .sum())
458 }
459
460 async fn authenticate_user(&self, name: &str, pwd: &str) -> DbResult<User> {
461 let user = self.get_user(name).await?;
462
463 if hash_pwd(pwd, &user.salt) == user.pwd {
464 Ok(user)
465 } else {
466 Err(DbError::PasswordMismatch)
467 }
468 }
469
470 async fn increase_download_counter(
471 &self,
472 crate_name: &NormalizedName,
473 crate_version: &Version,
474 ) -> DbResult<()> {
475 let krate = self.get_krate_model(crate_name).await?;
476 let crate_id = krate.id;
477 let crate_total_downloads = krate.total_downloads;
478
479 let mut k: krate::ActiveModel = krate.into();
481 k.total_downloads = Set(crate_total_downloads + 1);
482 k.update(&self.db_con).await?;
483
484 crate_meta::Entity::update_many()
486 .col_expr(
487 crate_meta::Column::Downloads,
488 Expr::col(crate_meta::Column::Downloads).add(1),
489 )
490 .filter(
491 Cond::all()
492 .add(crate_meta::Column::Version.eq(crate_version))
493 .add(crate_meta::Column::CrateFk.eq(crate_id)),
494 )
495 .exec(&self.db_con)
496 .await?;
497
498 Ok(())
499 }
500
501 async fn increase_cached_download_counter(
502 &self,
503 crate_name: &NormalizedName,
504 crate_version: &Version,
505 ) -> DbResult<()> {
506 let krate: cratesio_crate::Model = cratesio_crate::Entity::find()
507 .filter(cratesio_crate::Column::Name.eq(crate_name))
508 .one(&self.db_con)
509 .await?
510 .ok_or_else(|| DbError::CrateNotFound(crate_name.to_string()))?;
511 let crate_id = krate.id;
512 let crate_total_downloads = krate.total_downloads;
513
514 let mut k: cratesio_crate::ActiveModel = krate.into();
516 k.total_downloads = Set(crate_total_downloads + 1);
517 k.update(&self.db_con).await?;
518
519 cratesio_meta::Entity::update_many()
521 .col_expr(
522 cratesio_meta::Column::Downloads,
523 Expr::col(cratesio_meta::Column::Downloads).add(1),
524 )
525 .filter(
526 Cond::all()
527 .add(cratesio_meta::Column::Version.eq(crate_version))
528 .add(cratesio_meta::Column::CratesIoFk.eq(crate_id)),
529 )
530 .exec(&self.db_con)
531 .await?;
532
533 Ok(())
534 }
535
536 async fn get_last_updated_crate(&self) -> DbResult<Option<(OriginalName, Version)>> {
537 let krate = krate::Entity::find()
538 .order_by_desc(krate::Column::LastUpdated)
539 .one(&self.db_con)
540 .await?;
541
542 if let Some(krate) = krate {
543 let name = OriginalName::from_unchecked(krate.original_name);
545 let version = Version::from_unchecked_str(&krate.max_version);
547 Ok(Some((name, version)))
548 } else {
549 Ok(None)
550 }
551 }
552
553 async fn validate_session(&self, session_token: &str) -> DbResult<(String, bool)> {
554 let u = user::Entity::find()
555 .join(JoinType::InnerJoin, user::Relation::Session.def())
556 .filter(session::Column::Token.eq(session_token))
557 .one(&self.db_con)
558 .await?
559 .ok_or(DbError::SessionNotFound)?;
560
561 Ok((u.name, u.is_admin))
562 }
563
564 async fn add_session_token(&self, name: &str, session_token: &str) -> DbResult<()> {
565 let user = self.get_user(name).await?;
566 let created = Utc::now().format(DB_DATE_FORMAT).to_string();
567
568 let s = session::ActiveModel {
569 token: Set(session_token.to_owned()),
570 created: Set(created),
571 user_fk: Set(user.id as i64),
572 ..Default::default()
573 };
574
575 s.insert(&self.db_con).await?;
576 Ok(())
577 }
578
579 async fn add_crate_user(&self, crate_name: &NormalizedName, user: &str) -> DbResult<()> {
580 let crate_fk = self.get_krate_model(crate_name).await?.id;
581 let user_fk = self.get_user_model(user).await?.id;
582 self.add_crate_user_impl(crate_fk, user_fk).await
583 }
584
585 async fn add_crate_group(&self, crate_name: &NormalizedName, group: &str) -> DbResult<()> {
586 let crate_fk = self.get_krate_model(crate_name).await?.id;
587 let group_fk = self.get_group_model(group).await?.id;
588 self.add_crate_group_impl(crate_fk, group_fk).await
589 }
590
591 async fn add_group_user(&self, group_name: &str, user: &str) -> DbResult<()> {
592 let group_fk = self.get_group_model(group_name).await?.id;
593 let user_fk = self.get_user_model(user).await?.id;
594 self.add_group_user_impl(group_fk, user_fk).await
595 }
596
597 async fn add_owner(&self, crate_name: &NormalizedName, owner: &str) -> DbResult<()> {
598 let crate_fk = self.get_krate_model(crate_name).await?.id;
599 let user_fk = self.get_user_model(owner).await?.id;
600 self.add_owner_impl(crate_fk, user_fk).await
601 }
602
603 async fn is_download_restricted(&self, crate_name: &NormalizedName) -> DbResult<bool> {
604 Ok(krate::Entity::find()
605 .filter(krate::Column::Name.eq(crate_name))
606 .one(&self.db_con)
607 .await?
608 .is_some_and(|model| model.restricted_download))
609 }
610
611 async fn change_download_restricted(
612 &self,
613 crate_name: &NormalizedName,
614 restricted: bool,
615 ) -> DbResult<()> {
616 let mut krate: krate::ActiveModel = self.get_krate_model(crate_name).await?.into();
617 krate.restricted_download = Set(restricted);
618 krate.update(&self.db_con).await?;
619 Ok(())
620 }
621
622 async fn is_crate_user(&self, crate_name: &NormalizedName, user: &str) -> DbResult<bool> {
623 Ok(self
624 .get_crate_user_by_crate_and_user(crate_name, user)
625 .await?
626 .is_some())
627 }
628
629 async fn is_crate_group(&self, crate_name: &NormalizedName, group: &str) -> DbResult<bool> {
630 Ok(self
631 .get_crate_group_by_name_and_group(crate_name, group)
632 .await?
633 .is_some())
634 }
635
636 async fn is_crate_group_user(&self, crate_name: &NormalizedName, user: &str) -> DbResult<bool> {
637 let user = user::Entity::find()
638 .join(JoinType::InnerJoin, user::Relation::GroupUser.def())
639 .join(JoinType::InnerJoin, group_user::Relation::Group.def())
640 .join(JoinType::InnerJoin, group::Relation::CrateGroup.def())
641 .join(JoinType::InnerJoin, crate_group::Relation::Krate.def())
642 .filter(
643 Cond::all()
644 .add(krate::Column::Name.eq(crate_name))
645 .add(user::Column::Name.eq(user)),
646 )
647 .one(&self.db_con)
648 .await?;
649 Ok(user.is_some())
650 }
651
652 async fn is_group_user(&self, group_name: &str, user: &str) -> DbResult<bool> {
653 Ok(self
654 .get_group_user_by_group_and_user(group_name, user)
655 .await?
656 .is_some())
657 }
658
659 async fn is_owner(&self, crate_name: &NormalizedName, user: &str) -> DbResult<bool> {
660 let owner = owner::Entity::find()
661 .join(JoinType::InnerJoin, owner::Relation::Krate.def())
662 .join(JoinType::InnerJoin, owner::Relation::User.def())
663 .filter(
664 Cond::all()
665 .add(krate::Column::Name.eq(crate_name))
666 .add(user::Column::Name.eq(user)),
667 )
668 .one(&self.db_con)
669 .await?;
670
671 Ok(owner.is_some())
672 }
673
674 async fn get_crate_id(&self, crate_name: &NormalizedName) -> DbResult<Option<i64>> {
675 let id = krate::Entity::find()
676 .filter(krate::Column::Name.eq(crate_name))
677 .one(&self.db_con)
678 .await?
679 .map(|model| model.id);
680
681 Ok(id)
682 }
683
684 async fn get_crate_owners(&self, crate_name: &NormalizedName) -> DbResult<Vec<User>> {
685 let u = user::Entity::find()
686 .join(JoinType::InnerJoin, user::Relation::Owner.def())
687 .join(JoinType::InnerJoin, owner::Relation::Krate.def())
688 .filter(Expr::col((CrateIden::Table, krate::Column::Name)).eq(crate_name))
689 .all(&self.db_con)
690 .await?;
691
692 Ok(u.into_iter().map(User::from).collect())
693 }
694
695 async fn get_crate_users(&self, crate_name: &NormalizedName) -> DbResult<Vec<User>> {
696 let u = user::Entity::find()
697 .join(JoinType::InnerJoin, user::Relation::CrateUser.def())
698 .join(JoinType::InnerJoin, crate_user::Relation::Krate.def())
699 .filter(Expr::col((CrateIden::Table, krate::Column::Name)).eq(crate_name))
700 .all(&self.db_con)
701 .await?;
702
703 Ok(u.into_iter().map(User::from).collect())
704 }
705
706 async fn get_crate_groups(&self, crate_name: &NormalizedName) -> DbResult<Vec<Group>> {
707 let u = group::Entity::find()
708 .join(JoinType::InnerJoin, group::Relation::CrateGroup.def())
709 .join(JoinType::InnerJoin, crate_group::Relation::Krate.def())
710 .filter(Expr::col((CrateIden::Table, krate::Column::Name)).eq(crate_name))
711 .all(&self.db_con)
712 .await?;
713
714 Ok(u.into_iter().map(Group::from).collect())
715 }
716
717 async fn get_group_users(&self, group_name: &str) -> DbResult<Vec<User>> {
718 let u = user::Entity::find()
719 .join(JoinType::InnerJoin, user::Relation::GroupUser.def())
720 .join(JoinType::InnerJoin, group_user::Relation::Group.def())
721 .filter(Expr::col((GroupIden::Table, group::Column::Name)).eq(group_name))
722 .all(&self.db_con)
723 .await?;
724
725 Ok(u.into_iter().map(User::from).collect())
726 }
727
728 async fn get_crate_versions(&self, crate_name: &NormalizedName) -> DbResult<Vec<Version>> {
729 let u = crate_meta::Entity::find()
730 .join(JoinType::InnerJoin, crate_meta::Relation::Krate.def())
731 .filter(Expr::col((CrateIden::Table, krate::Column::Name)).eq(crate_name))
732 .all(&self.db_con)
733 .await?;
734
735 Ok(u.into_iter()
736 .map(|meta| Version::from_unchecked_str(&meta.version))
737 .collect())
738 }
739
740 async fn delete_session_token(&self, session_token: &str) -> DbResult<()> {
741 if let Some(s) = session::Entity::find()
742 .filter(session::Column::Token.eq(session_token))
743 .one(&self.db_con)
744 .await?
745 {
746 s.delete(&self.db_con).await?;
747 }
748
749 Ok(())
750 }
751
752 async fn delete_user(&self, user_name: &str) -> DbResult<()> {
753 self.get_user_model(user_name)
754 .await?
755 .delete(&self.db_con)
756 .await?;
757 Ok(())
758 }
759
760 async fn delete_group(&self, group_name: &str) -> DbResult<()> {
761 self.get_group_model(group_name)
762 .await?
763 .delete(&self.db_con)
764 .await?;
765 Ok(())
766 }
767
768 async fn change_pwd(&self, user_name: &str, new_pwd: &str) -> DbResult<()> {
769 let salt = generate_salt();
770 let hashed = hash_pwd(new_pwd, &salt);
771
772 let mut u: user::ActiveModel = self.get_user_model(user_name).await?.into();
773 u.pwd = Set(hashed);
774 u.salt = Set(salt);
775
776 u.update(&self.db_con).await?;
777 Ok(())
778 }
779
780 async fn change_read_only_state(&self, user_name: &str, state: bool) -> DbResult<()> {
781 let mut u: user::ActiveModel = self.get_user_model(user_name).await?.into();
782 u.is_read_only = Set(state);
783
784 u.update(&self.db_con).await?;
785 Ok(())
786 }
787
788 async fn change_admin_state(&self, user_name: &str, state: bool) -> DbResult<()> {
789 let mut u: user::ActiveModel = self.get_user_model(user_name).await?.into();
790 u.is_admin = Set(state);
791
792 u.update(&self.db_con).await?;
793 Ok(())
794 }
795
796 async fn crate_version_exists(&self, crate_id: i64, version: &str) -> DbResult<bool> {
797 let cm = crate_meta::Entity::find()
798 .filter(
799 Cond::all()
800 .add(crate_meta::Column::CrateFk.eq(crate_id))
801 .add(crate_meta::Column::Version.eq(version)),
802 )
803 .one(&self.db_con)
804 .await?;
805
806 Ok(cm.is_some())
807 }
808
809 async fn get_max_version_from_id(&self, crate_id: i64) -> DbResult<Version> {
810 operations::get_max_version_from_id(&self.db_con, crate_id).await
811 }
812
813 async fn get_max_version_from_name(&self, crate_name: &NormalizedName) -> DbResult<Version> {
814 let k = self.get_krate_model(crate_name).await?;
815 let v = Version::try_from(&k.max_version)
816 .map_err(|_| DbError::FailedToGetMaxVersionByName(crate_name.to_string()))?;
817 Ok(v)
818 }
819
820 async fn update_max_version(&self, crate_id: i64, version: &Version) -> DbResult<()> {
821 let krate = krate::Entity::find_by_id(crate_id)
822 .one(&self.db_con)
823 .await?
824 .ok_or(DbError::CrateNotFoundWithId(crate_id))?;
825
826 let mut k: krate::ActiveModel = krate.into();
827 k.max_version = Set(version.to_string());
828 k.update(&self.db_con).await?;
829
830 Ok(())
831 }
832
833 async fn add_auth_token(&self, name: &str, token: &str, user: &str) -> DbResult<()> {
834 let hashed_token = hash_token(token);
835 let user = self.get_user_model(user).await?;
836
837 let at = auth_token::ActiveModel {
838 name: Set(name.to_owned()),
839 token: Set(hashed_token),
840 user_fk: Set(user.id),
841 ..Default::default()
842 };
843
844 at.insert(&self.db_con).await?;
845
846 Ok(())
847 }
848
849 async fn get_user_from_token(&self, token: &str) -> DbResult<User> {
850 let token = hash_token(token);
851
852 let u = user::Entity::find()
853 .join(JoinType::InnerJoin, user::Relation::AuthToken.def())
854 .filter(Expr::col((AuthTokenIden::Table, AuthTokenIden::Token)).eq(token))
855 .one(&self.db_con)
856 .await?
857 .ok_or(DbError::TokenNotFound)?;
858
859 Ok(User::from(u))
860 }
861
862 async fn get_user(&self, name: &str) -> DbResult<User> {
863 Ok(User::from(self.get_user_model(name).await?))
864 }
865
866 async fn get_group(&self, name: &str) -> DbResult<Group> {
867 Ok(Group::from(self.get_group_model(name).await?))
868 }
869
870 async fn get_auth_tokens(&self, user_name: &str) -> DbResult<Vec<AuthToken>> {
871 let at: Vec<auth_token::Model> = auth_token::Entity::find()
872 .join(JoinType::InnerJoin, auth_token::Relation::User.def())
873 .filter(user::Column::Name.eq(user_name))
874 .all(&self.db_con)
875 .await?;
876
877 Ok(at.into_iter().map(AuthToken::from).collect())
878 }
879
880 async fn delete_auth_token(&self, id: i32) -> DbResult<()> {
881 auth_token::Entity::delete_by_id(id as i64)
882 .exec(&self.db_con)
883 .await?;
884 Ok(())
885 }
886
887 async fn delete_owner(&self, crate_name: &str, owner: &str) -> DbResult<()> {
888 let model = self.get_owner_by_crate_and_user(crate_name, owner).await?;
889 self.delete_or_not_found(model, DbError::OwnerNotFound(owner.to_string()))
890 .await
891 }
892
893 async fn delete_crate_user(&self, crate_name: &NormalizedName, user: &str) -> DbResult<()> {
894 let model = self
895 .get_crate_user_by_crate_and_user(crate_name, user)
896 .await?;
897 self.delete_or_not_found(model, DbError::UserNotFound(user.to_string()))
898 .await
899 }
900
901 async fn delete_crate_group(&self, crate_name: &NormalizedName, group: &str) -> DbResult<()> {
902 let model = self
903 .get_crate_group_by_name_and_group(crate_name, group)
904 .await?;
905 self.delete_or_not_found(model, DbError::GroupNotFound(group.to_string()))
906 .await
907 }
908
909 async fn delete_group_user(&self, group_name: &str, user: &str) -> DbResult<()> {
910 let model = self
911 .get_group_user_by_group_and_user(group_name, user)
912 .await?;
913 self.delete_or_not_found(model, DbError::UserNotFound(user.to_string()))
914 .await
915 }
916
917 async fn add_user(
918 &self,
919 name: &str,
920 pwd: &str,
921 salt: &str,
922 is_admin: bool,
923 is_read_only: bool,
924 ) -> DbResult<()> {
925 let hashed_pwd = hash_pwd(pwd, salt);
926 let created = Utc::now().format(DB_DATE_FORMAT).to_string();
927
928 let u = user::ActiveModel {
929 name: Set(name.to_owned()),
930 pwd: Set(hashed_pwd),
931 salt: Set(salt.to_owned()),
932 is_admin: Set(is_admin),
933 is_read_only: Set(is_read_only),
934 created: Set(created),
935 ..Default::default()
936 };
937
938 u.insert(&self.db_con).await?;
939 Ok(())
940 }
941
942 async fn add_group(&self, name: &str) -> DbResult<()> {
943 let g = group::ActiveModel {
944 name: Set(name.to_owned()),
945 ..Default::default()
946 };
947
948 g.insert(&self.db_con).await?;
949 Ok(())
950 }
951
952 async fn get_users(&self) -> DbResult<Vec<User>> {
953 let users = user::Entity::find()
954 .order_by_asc(user::Column::Name)
955 .all(&self.db_con)
956 .await?;
957
958 Ok(users.into_iter().map(User::from).collect())
959 }
960
961 async fn get_groups(&self) -> DbResult<Vec<Group>> {
962 let groups = group::Entity::find()
963 .order_by_asc(group::Column::Name)
964 .all(&self.db_con)
965 .await?;
966
967 Ok(groups.into_iter().map(Group::from).collect())
968 }
969
970 async fn get_total_unique_crates(&self) -> DbResult<u64> {
971 self.count(
972 CrateIden::Table,
973 CrateIden::Id,
974 DbError::FailedToCountCrates,
975 )
976 .await
977 }
978
979 async fn get_total_crate_versions(&self) -> DbResult<u64> {
980 self.count(
981 CrateMetaIden::Table,
982 CrateMetaIden::Id,
983 DbError::FailedToCountCrateVersions,
984 )
985 .await
986 }
987
988 async fn get_total_downloads(&self) -> DbResult<u64> {
989 #[derive(FromQueryResult)]
990 struct Model {
991 total_downloads: i64,
992 }
993
994 Ok(krate::Entity::find()
995 .select_only()
996 .column(krate::Column::TotalDownloads)
997 .into_model::<Model>()
998 .all(&self.db_con)
999 .await?
1000 .into_iter()
1001 .map(|m| m.total_downloads as u64)
1002 .sum())
1003 }
1004
1005 async fn get_top_crates_downloads(&self, top: u64) -> DbResult<Vec<(String, u64)>> {
1006 #[derive(Debug, PartialEq, FromQueryResult)]
1007 struct SelectResult {
1008 original_name: String,
1009 total_downloads: i64,
1010 }
1011
1012 let stmt = self.db_con.get_database_backend().build(
1013 Query::select()
1014 .columns(vec![CrateIden::OriginalName, CrateIden::TotalDownloads])
1015 .from(CrateIden::Table)
1016 .order_by(CrateIden::TotalDownloads, Order::Desc)
1017 .limit(top),
1018 );
1019
1020 Ok(SelectResult::find_by_statement(stmt)
1021 .all(&self.db_con)
1022 .await?
1023 .into_iter()
1024 .map(|x| (x.original_name, x.total_downloads as u64))
1025 .collect())
1026 }
1027
1028 async fn get_crate_summaries(&self) -> DbResult<Vec<CrateSummary>> {
1029 let krates = krate::Entity::find()
1030 .order_by(krate::Column::Name, Order::Asc)
1031 .all(&self.db_con)
1032 .await?;
1033
1034 Ok(krates.into_iter().map(CrateSummary::from).collect())
1035 }
1036
1037 async fn add_doc_queue(
1038 &self,
1039 krate: &NormalizedName,
1040 version: &Version,
1041 path: &Path,
1042 ) -> DbResult<()> {
1043 let s = doc_queue::ActiveModel {
1044 krate: Set(krate.to_string()),
1045 version: Set(version.to_string()),
1046 path: Set(path.to_string_lossy().to_string()),
1048 ..Default::default()
1049 };
1050
1051 s.insert(&self.db_con).await?;
1052 Ok(())
1053 }
1054
1055 async fn delete_doc_queue(&self, id: i64) -> DbResult<()> {
1056 DocQueue::delete_by_id(id).exec(&self.db_con).await?;
1057 Ok(())
1058 }
1059
1060 async fn get_doc_queue(&self) -> DbResult<Vec<DocQueueEntry>> {
1061 let entities = DocQueue::find().all(&self.db_con).await?;
1062
1063 Ok(entities.into_iter().map(DocQueueEntry::from).collect())
1064 }
1065
1066 async fn delete_crate(&self, krate: &NormalizedName, version: &Version) -> DbResult<()> {
1067 let txn = self.db_con.begin().await?;
1068
1069 let crate_meta_version = crate_meta::Entity::find()
1071 .join(JoinType::InnerJoin, crate_meta::Relation::Krate.def())
1072 .filter(krate::Column::Name.eq(krate))
1073 .filter(crate_meta::Column::Version.eq(version))
1074 .one(&txn)
1075 .await?
1076 .ok_or_else(|| DbError::CrateMetaNotFound(krate.to_string(), version.to_string()))?;
1077 let crate_id = crate_meta_version.crate_fk;
1078 let current_max_version = operations::get_max_version_from_id(&txn, crate_id).await?;
1079 crate_meta_version.delete(&txn).await?;
1080
1081 let crate_index_version = crate_index::Entity::find()
1083 .join(JoinType::InnerJoin, crate_index::Relation::Krate.def())
1084 .filter(krate::Column::Name.eq(krate))
1085 .filter(crate_index::Column::Vers.eq(version))
1086 .one(&txn)
1087 .await?
1088 .ok_or_else(|| DbError::CrateIndexNotFound(krate.to_string(), version.to_string()))?;
1089 crate_index_version.delete(&txn).await?;
1090
1091 let crate_meta_rows = crate_meta::Entity::find()
1094 .join(JoinType::InnerJoin, crate_meta::Relation::Krate.def())
1095 .filter(krate::Column::Name.eq(krate))
1096 .all(&txn)
1097 .await?;
1098
1099 if crate_meta_rows.is_empty() {
1100 krate::Entity::delete_many()
1101 .filter(krate::Column::Name.eq(krate))
1102 .exec(&txn)
1103 .await?;
1104 } else {
1105 let c = krate::Entity::find_by_id(crate_id)
1106 .one(&txn)
1107 .await?
1108 .ok_or(DbError::CrateNotFoundWithId(crate_id))?;
1109 let mut c: krate::ActiveModel = c.into();
1110
1111 if version == ¤t_max_version {
1113 let new_max_version = crate_meta_rows
1114 .into_iter()
1115 .map(|cm| Version::from_unchecked_str(&cm.version))
1116 .max()
1117 .unwrap(); c.max_version = Set(new_max_version.into_inner());
1119 }
1120 let etag = operations::compute_etag(&txn, krate, crate_id).await?;
1122 c.e_tag = Set(etag);
1123 c.update(&txn).await?;
1124 }
1125
1126 txn.commit().await?;
1127
1128 Ok(())
1129 }
1130
1131 async fn get_crate_meta_list(&self, crate_name: &NormalizedName) -> DbResult<Vec<CrateMeta>> {
1132 let crate_meta = crate_meta::Entity::find()
1133 .join(JoinType::InnerJoin, crate_meta::Relation::Krate.def())
1134 .filter(krate::Column::Name.eq(crate_name))
1135 .all(&self.db_con)
1136 .await?;
1137
1138 let crate_meta = crate_meta
1139 .into_iter()
1140 .map(|cm| CrateMeta {
1141 name: crate_name.to_string(),
1142 id: cm.id,
1143 version: cm.version,
1144 created: cm.created,
1145 downloads: cm.downloads,
1146 crate_fk: cm.crate_fk,
1147 })
1148 .collect();
1149
1150 Ok(crate_meta)
1151 }
1152
1153 async fn update_last_updated(&self, id: i64, last_updated: &DateTime<Utc>) -> DbResult<()> {
1154 let krate = krate::Entity::find_by_id(id)
1155 .one(&self.db_con)
1156 .await?
1157 .ok_or(DbError::CrateNotFoundWithId(id))?;
1158
1159 let date = last_updated.format(DB_DATE_FORMAT).to_string();
1160
1161 let mut krate: krate::ActiveModel = krate.into();
1162 krate.last_updated = Set(date);
1163 krate.update(&self.db_con).await?;
1164
1165 Ok(())
1166 }
1167
1168 async fn search_in_crate_name(
1169 &self,
1170 contains: &str,
1171 cache: bool,
1172 ) -> DbResult<Vec<CrateOverview>> {
1173 self.query_crates(Some(contains), None, cache).await
1174 }
1175
1176 async fn get_crate_overview_list(
1177 &self,
1178 limit: u64,
1179 offset: u64,
1180 cache: bool,
1181 ) -> DbResult<Vec<CrateOverview>> {
1182 self.query_crates(None, Some((limit, offset)), cache).await
1183 }
1184
1185 async fn get_crate_data(&self, crate_name: &NormalizedName) -> DbResult<CrateData> {
1186 let krate = self.get_krate_model(crate_name).await?;
1187
1188 let owners: Vec<String> = krate
1189 .find_related(owner::Entity)
1190 .find_also_related(user::Entity)
1191 .all(&self.db_con)
1192 .await?
1193 .into_iter()
1194 .filter_map(|(_, v)| v.map(|v| v.name))
1195 .collect();
1196 let categories: Vec<String> = krate
1197 .find_related(crate_category_to_crate::Entity)
1198 .find_also_related(crate_category::Entity)
1199 .all(&self.db_con)
1200 .await?
1201 .into_iter()
1202 .filter_map(|(_, v)| v.map(|v| v.category))
1203 .collect();
1204 let keywords: Vec<String> = krate
1205 .find_related(crate_keyword_to_crate::Entity)
1206 .find_also_related(crate_keyword::Entity)
1207 .all(&self.db_con)
1208 .await?
1209 .into_iter()
1210 .filter_map(|(_, v)| v.map(|v| v.keyword))
1211 .collect();
1212 let authors: Vec<String> = krate
1213 .find_related(crate_author_to_crate::Entity)
1214 .find_also_related(crate_author::Entity)
1215 .all(&self.db_con)
1216 .await?
1217 .into_iter()
1218 .filter_map(|(_, v)| v.map(|v| v.author))
1219 .collect();
1220 let crate_metas = krate
1221 .find_related(crate_meta::Entity)
1222 .all(&self.db_con)
1223 .await?;
1224 let crate_indices = krate
1225 .find_related(crate_index::Entity)
1226 .all(&self.db_con)
1227 .await?;
1228
1229 let mut versions = Vec::new();
1230 for cm in crate_metas {
1231 let ci = crate_indices
1232 .iter()
1233 .find(|ci| ci.vers == cm.version)
1234 .ok_or_else(|| {
1235 DbError::CrateIndexNotFound(krate.name.clone(), cm.version.clone())
1236 })?;
1237 let dependencies: Vec<CrateRegistryDep> = match ci.deps.clone() {
1238 Some(deps) => {
1239 let ix = serde_json::from_value::<Vec<IndexDep>>(deps)
1240 .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?;
1241
1242 let mut ft = Vec::new();
1243 for dep in ix {
1244 ft.push(CrateRegistryDep::from_index(
1245 operations::get_desc_for_crate_dep(
1246 &self.db_con,
1247 &dep.name,
1248 dep.registry.as_deref(),
1249 )
1250 .await?,
1251 dep,
1252 ));
1253 }
1254
1255 ft
1256 }
1257 None => Vec::default(),
1258 };
1259 let features: BTreeMap<String, Vec<String>> = match ci.features.clone() {
1260 Some(features) => serde_json::from_value::<BTreeMap<String, Vec<String>>>(features)
1261 .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?,
1262 None => BTreeMap::default(),
1263 };
1264
1265 versions.push(CrateVersionData {
1266 version: cm.version,
1267 created: cm.created,
1268 downloads: cm.downloads,
1269 readme: cm.readme,
1270 license: cm.license,
1271 license_file: cm.license_file,
1272 documentation: cm.documentation,
1273 dependencies,
1274 checksum: ci.cksum.clone(),
1275 features,
1276 yanked: ci.yanked,
1277 links: ci.links.clone(),
1278 v: ci.v,
1279 });
1280 }
1281 versions.sort_by(|a, b| {
1282 Version::from_unchecked_str(&b.version).cmp(&Version::from_unchecked_str(&a.version))
1283 });
1284
1285 let crate_data = CrateData {
1286 name: krate.original_name,
1287 owners,
1288 max_version: krate.max_version,
1289 total_downloads: krate.total_downloads,
1290 last_updated: krate.last_updated,
1291 homepage: krate.homepage,
1292 description: krate.description,
1293 repository: krate.repository,
1294 categories,
1295 keywords,
1296 authors,
1297 versions,
1298 };
1299
1300 Ok(crate_data)
1301 }
1302
1303 async fn add_empty_crate(&self, name: &str, created: &DateTime<Utc>) -> DbResult<i64> {
1304 let created = created.format(DB_DATE_FORMAT).to_string();
1305 let normalized_name = NormalizedName::from(
1306 OriginalName::try_from(name)
1307 .map_err(|_| DbError::InvalidCrateName(name.to_string()))?,
1308 );
1309 let krate = krate::ActiveModel {
1310 id: ActiveValue::default(),
1311 name: Set(normalized_name.to_string()),
1312 original_name: Set(name.to_string()),
1313 max_version: Set("0.0.0".to_string()),
1314 last_updated: Set(created),
1315 total_downloads: Set(0),
1316 homepage: Set(None),
1317 description: Set(None),
1318 repository: Set(None),
1319 e_tag: Set(String::new()), restricted_download: Set(false),
1321 };
1322 Ok(krate.insert(&self.db_con).await?.id)
1323 }
1324
1325 async fn add_crate(
1326 &self,
1327 pub_metadata: &PublishMetadata,
1328 cksum: &str,
1329 created: &DateTime<Utc>,
1330 owner: &str,
1331 ) -> DbResult<i64> {
1332 let created = created.format(DB_DATE_FORMAT).to_string();
1333 let normalized_name = NormalizedName::from(
1334 OriginalName::try_from(&pub_metadata.name)
1335 .map_err(|_| DbError::InvalidCrateName(pub_metadata.name.clone()))?,
1336 );
1337
1338 let existing = krate::Entity::find()
1339 .filter(krate::Column::Name.eq(&pub_metadata.name))
1340 .one(&self.db_con)
1341 .await?;
1342
1343 let txn = self.db_con.begin().await?;
1344
1345 let crate_id = if let Some(krate) = existing {
1346 let krate_id = krate.id;
1347 let max_version = max(
1348 parse_db_version(&krate.max_version)?,
1349 parse_db_version(&pub_metadata.vers)?,
1350 );
1351
1352 let mut krate: krate::ActiveModel = krate.into();
1353 krate.last_updated = Set(created.clone());
1354 krate.max_version = Set(max_version.into_inner());
1355 krate.homepage = Set(pub_metadata.homepage.clone());
1356 krate.description = Set(pub_metadata.description.clone());
1357 krate.repository = Set(pub_metadata.repository.clone());
1358 krate.e_tag = Set(String::new()); krate.update(&txn).await?;
1360 krate_id
1361 } else {
1362 let krate = krate::ActiveModel {
1363 id: ActiveValue::default(),
1364 name: Set(normalized_name.to_string()),
1365 original_name: Set(pub_metadata.name.clone()),
1366 max_version: Set(pub_metadata.vers.clone()),
1367 last_updated: Set(created.clone()),
1368 total_downloads: Set(0),
1369 homepage: Set(pub_metadata.homepage.clone()),
1370 description: Set(pub_metadata.description.clone()),
1371 repository: Set(pub_metadata.repository.clone()),
1372 e_tag: Set(String::new()), restricted_download: Set(false),
1374 };
1375 let krate = krate.insert(&txn).await?;
1376 krate.id
1377 };
1378
1379 operations::add_owner_if_not_exists(&txn, owner, crate_id).await?;
1380 operations::add_crate_metadata(&txn, pub_metadata, &created, crate_id).await?;
1381 operations::add_crate_index(&txn, pub_metadata, cksum, crate_id).await?;
1382 operations::update_etag(&txn, &pub_metadata.name, crate_id).await?;
1383 operations::update_crate_categories(&txn, pub_metadata, crate_id).await?;
1384 operations::update_crate_keywords(&txn, pub_metadata, crate_id).await?;
1385 operations::update_crate_authors(&txn, pub_metadata, crate_id).await?;
1386
1387 txn.commit().await?;
1388 Ok(crate_id)
1389 }
1390
1391 async fn update_docs_link(
1392 &self,
1393 crate_name: &NormalizedName,
1394 version: &Version,
1395 docs_link: &str,
1396 ) -> DbResult<()> {
1397 let (cm, _c) = crate_meta::Entity::find()
1398 .find_also_related(krate::Entity)
1399 .filter(
1400 Cond::all()
1401 .add(krate::Column::Name.eq(crate_name))
1402 .add(crate_meta::Column::Version.eq(version)),
1403 )
1404 .one(&self.db_con)
1405 .await?
1406 .ok_or_else(|| DbError::CrateNotFound(crate_name.to_string()))?;
1407
1408 let mut cm: crate_meta::ActiveModel = cm.into();
1409 cm.documentation = Set(Some(docs_link.to_string()));
1410 cm.update(&self.db_con).await?;
1411 Ok(())
1412 }
1413
1414 async fn add_crate_metadata(
1415 &self,
1416 pub_metadata: &PublishMetadata,
1417 created: &str,
1418 crate_id: i64,
1419 ) -> DbResult<()> {
1420 operations::add_crate_metadata(&self.db_con, pub_metadata, created, crate_id).await
1421 }
1422
1423 async fn get_prefetch_data(&self, crate_name: &str) -> DbResult<Prefetch> {
1424 let mut krate = krate::Entity::find()
1425 .filter(krate::Column::Name.eq(crate_name))
1426 .find_with_related(crate_index::Entity)
1427 .all(&self.db_con)
1428 .await?
1429 .into_iter();
1430
1431 let (Some((krate, crate_indices)), None) = (krate.next(), krate.next()) else {
1433 return Err(DbError::CrateNotFound(crate_name.to_string()));
1434 };
1435
1436 let index_metadata =
1437 operations::crate_index_model_to_index_metadata(crate_name, crate_indices)?;
1438 let data = operations::index_metadata_to_bytes(&index_metadata)?;
1439
1440 Ok(Prefetch {
1441 data,
1442 etag: krate.e_tag,
1443 last_modified: krate.last_updated,
1444 })
1445 }
1446
1447 async fn is_cratesio_cache_up_to_date(
1448 &self,
1449 crate_name: &NormalizedName,
1450 etag: Option<String>,
1451 last_modified: Option<String>,
1452 ) -> DbResult<PrefetchState> {
1453 let Some(krate) = cratesio_crate::Entity::find()
1454 .filter(cratesio_crate::Column::Name.eq(crate_name))
1455 .one(&self.db_con)
1456 .await?
1457 else {
1458 return Ok(PrefetchState::NotFound);
1459 };
1460
1461 let needs_update = match (etag, last_modified) {
1462 (Some(etag), Some(last_modified)) => {
1463 krate.e_tag != etag || krate.last_modified != last_modified
1464 }
1465 (Some(etag), None) => krate.e_tag != etag,
1466 (None, Some(last_modified)) => krate.last_modified != last_modified,
1467 (None, None) => true,
1468 };
1469
1470 if !needs_update {
1471 Ok(PrefetchState::UpToDate)
1472 } else {
1473 let crate_indices = krate
1474 .find_related(cratesio_index::Entity)
1475 .all(&self.db_con)
1476 .await?;
1477 let index_metadata =
1478 operations::cratesio_index_model_to_index_metadata(crate_name, crate_indices)?;
1479 let data = operations::index_metadata_to_bytes(&index_metadata)?;
1480
1481 Ok(PrefetchState::NeedsUpdate(Prefetch {
1482 data,
1483 etag: krate.e_tag.clone(),
1484 last_modified: krate.last_modified,
1485 }))
1486 }
1487 }
1488
1489 async fn add_cratesio_prefetch_data(
1490 &self,
1491 crate_name: &OriginalName,
1492 etag: &str,
1493 last_modified: &str,
1494 description: Option<String>,
1495 indices: &[IndexMetadata],
1496 ) -> DbResult<Prefetch> {
1497 let normalized_name = crate_name.to_normalized();
1498
1499 let max_version = indices
1500 .iter()
1501 .map(|i| Version::from_unchecked_str(&i.vers))
1502 .max()
1503 .ok_or_else(|| DbError::FailedToGetMaxVersionByName(crate_name.to_string()))?;
1504
1505 let txn = self.db_con.begin().await?;
1511
1512 let krate = cratesio_crate::Entity::find()
1513 .filter(cratesio_crate::Column::Name.eq(&normalized_name))
1514 .one(&txn)
1515 .await?;
1516
1517 let krate = if let Some(krate) = krate {
1518 let mut krate: cratesio_crate::ActiveModel = krate.into();
1519 krate.e_tag = Set(etag.to_string());
1520 krate.last_modified = Set(last_modified.to_string());
1521 krate.max_version = Set(max_version.into_inner());
1522 krate.update(&txn).await?
1523 } else {
1524 let krate = cratesio_crate::ActiveModel {
1525 id: ActiveValue::default(),
1526 name: Set(normalized_name.to_string()),
1527 original_name: Set(crate_name.to_string()),
1528 description: Set(description),
1529 e_tag: Set(etag.to_string()),
1530 last_modified: Set(last_modified.to_string()),
1531 total_downloads: Set(0),
1532 max_version: Set(max_version.to_string()),
1533 };
1534 krate.insert(&txn).await?
1535 };
1536
1537 let current_indices = cratesio_index::Entity::find()
1538 .filter(cratesio_index::Column::CratesIoFk.eq(krate.id))
1539 .all(&txn)
1540 .await?;
1541
1542 for index in indices {
1543 if let Some(current_index) = current_indices.iter().find(|ci| index.vers == ci.vers) {
1545 if index.yanked != current_index.yanked {
1546 let mut ci: cratesio_index::ActiveModel = current_index.to_owned().into();
1547 ci.yanked = Set(index.yanked);
1548 ci.update(&txn).await?;
1549 }
1550 } else {
1551 let deps = if index.deps.is_empty() {
1552 None
1553 } else {
1554 let deps = serde_json::to_value(&index.deps)
1555 .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?;
1556 Some(deps)
1557 };
1558
1559 let features = serde_json::to_value(&index.features)
1560 .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?;
1561
1562 let features2 = serde_json::to_value(&index.features2)
1563 .map_err(|e| DbError::FailedToConvertToJson(e.to_string()))?;
1564
1565 let new_index = cratesio_index::ActiveModel {
1566 id: ActiveValue::default(),
1567 name: Set(index.name.clone()),
1568 vers: Set(index.vers.clone()),
1569 deps: Set(deps),
1570 cksum: Set(index.cksum.clone()),
1571 features: Set(Some(features)),
1572 features2: Set(Some(features2)),
1573 yanked: Set(index.yanked),
1574 links: Set(index.links.clone()),
1575 pubtime: Set(index.pubtime.map(|dt| dt.naive_utc())),
1576 v: Set(index.v.unwrap_or(1) as i32),
1577 crates_io_fk: Set(krate.id),
1578 };
1579
1580 new_index.insert(&txn).await?;
1581
1582 let meta = cratesio_meta::ActiveModel {
1584 id: ActiveValue::default(),
1585 version: Set(index.vers.clone()),
1586 downloads: Set(0),
1587 crates_io_fk: Set(krate.id),
1588 documentation: Set(Some(format!(
1589 "https://docs.rs/{normalized_name}/{}",
1590 index.vers,
1591 ))),
1592 };
1593
1594 meta.insert(&txn).await?;
1595 }
1596 }
1597
1598 txn.commit().await?;
1599
1600 Ok(Prefetch {
1601 data: operations::index_metadata_to_bytes(indices)?,
1602 etag: etag.to_string(),
1603 last_modified: last_modified.to_string(),
1604 })
1605 }
1606
1607 async fn get_cratesio_index_update_list(&self) -> DbResult<Vec<CratesioPrefetchMsg>> {
1608 let crates = cratesio_crate::Entity::find().all(&self.db_con).await?;
1609 let msgs = crates
1610 .into_iter()
1611 .map(|krate| {
1612 CratesioPrefetchMsg::Update(UpdateData {
1613 name: OriginalName::from_unchecked(krate.original_name),
1614 etag: Some(krate.e_tag),
1615 last_modified: Some(krate.last_modified),
1616 })
1617 })
1618 .collect();
1619 Ok(msgs)
1620 }
1621
1622 async fn unyank_crate(&self, crate_name: &NormalizedName, version: &Version) -> DbResult<()> {
1623 let mut ci: crate_index::ActiveModel = self
1624 .get_crate_index_model(crate_name, version)
1625 .await?
1626 .into();
1627 ci.yanked = Set(false);
1628 ci.save(&self.db_con).await?;
1629 Ok(())
1630 }
1631
1632 async fn yank_crate(&self, crate_name: &NormalizedName, version: &Version) -> DbResult<()> {
1633 let mut ci: crate_index::ActiveModel = self
1634 .get_crate_index_model(crate_name, version)
1635 .await?
1636 .into();
1637 ci.yanked = Set(true);
1638 ci.save(&self.db_con).await?;
1639 Ok(())
1640 }
1641
1642 async fn register_webhook(&self, webhook: Webhook) -> DbResult<String> {
1643 let w = webhook::ActiveModel {
1644 event: Set(Into::<&str>::into(webhook.event).to_string()),
1645 callback_url: Set(webhook.callback_url),
1646 name: Set(webhook.name),
1647 ..Default::default()
1648 };
1649
1650 let w: webhook::Model = w.insert(&self.db_con).await?;
1651 Ok(w.id.to_string())
1652 }
1653
1654 async fn delete_webhook(&self, id: &str) -> DbResult<()> {
1655 self.get_webhook_model(id)
1656 .await?
1657 .delete(&self.db_con)
1658 .await?;
1659 Ok(())
1660 }
1661
1662 async fn get_webhook(&self, id: &str) -> DbResult<Webhook> {
1663 webhook_model_to_obj(self.get_webhook_model(id).await?)
1664 }
1665
1666 async fn get_all_webhooks(&self) -> DbResult<Vec<Webhook>> {
1667 Ok(webhook::Entity::find()
1668 .all(&self.db_con)
1669 .await?
1670 .into_iter()
1671 .filter_map(|w| webhook_model_to_obj(w).ok())
1672 .collect())
1673 }
1674
1675 async fn add_webhook_queue(
1676 &self,
1677 event: WebhookEvent,
1678 payload: serde_json::Value,
1679 ) -> DbResult<()> {
1680 let w = webhook::Entity::find()
1681 .filter(webhook::Column::Event.eq(Into::<&str>::into(event)))
1682 .all(&self.db_con)
1683 .await?;
1684
1685 if w.is_empty() {
1686 return Ok(());
1687 }
1688
1689 let now = Utc::now();
1690
1691 let entries = w.into_iter().map(|w| webhook_queue::ActiveModel {
1692 webhook_fk: Set(w.id),
1693 payload: Set(payload.clone()),
1694 next_attempt: Set(now.into()),
1695 last_attempt: Set(None),
1696 ..Default::default()
1697 });
1698
1699 webhook_queue::Entity::insert_many(entries)
1700 .exec(&self.db_con)
1701 .await?;
1702 Ok(())
1703 }
1704 async fn get_pending_webhook_queue_entries(
1705 &self,
1706 timestamp: DateTime<Utc>,
1707 ) -> DbResult<Vec<WebhookQueue>> {
1708 let w = webhook_queue::Entity::find()
1709 .find_with_related(webhook::Entity)
1710 .filter(webhook_queue::Column::NextAttempt.lte(timestamp))
1711 .all(&self.db_con)
1712 .await?;
1713
1714 Ok(w.into_iter()
1715 .filter_map(|w| {
1716 Some(WebhookQueue {
1717 id: w.0.id.to_string(),
1718 callback_url: w.1.first()?.callback_url.clone(),
1719 payload: w.0.payload,
1720 last_attempt: w.0.last_attempt.map(Into::into),
1721 next_attempt: w.0.next_attempt.into(),
1722 })
1723 })
1724 .collect())
1725 }
1726
1727 async fn update_webhook_queue(
1728 &self,
1729 id: &str,
1730 last_attempt: DateTime<Utc>,
1731 next_attempt: DateTime<Utc>,
1732 ) -> DbResult<()> {
1733 let mut w: webhook_queue::ActiveModel = self.get_webhook_queue_model(id).await?.into();
1734 w.last_attempt = Set(Some(last_attempt.into()));
1735 w.next_attempt = Set(next_attempt.into());
1736 w.update(&self.db_con).await?;
1737 Ok(())
1738 }
1739
1740 async fn delete_webhook_queue(&self, id: &str) -> DbResult<()> {
1741 self.get_webhook_queue_model(id)
1742 .await?
1743 .delete(&self.db_con)
1744 .await?;
1745 Ok(())
1746 }
1747
1748 async fn get_user_by_oauth2_identity(
1751 &self,
1752 issuer: &str,
1753 subject: &str,
1754 ) -> DbResult<Option<User>> {
1755 let identity = oauth2_identity::Entity::find()
1756 .filter(oauth2_identity::Column::ProviderIssuer.eq(issuer))
1757 .filter(oauth2_identity::Column::Subject.eq(subject))
1758 .one(&self.db_con)
1759 .await?;
1760
1761 if let Some(identity) = identity {
1762 let u = user::Entity::find_by_id(identity.user_fk)
1763 .one(&self.db_con)
1764 .await?
1765 .ok_or_else(|| DbError::UserNotFound(format!("user_id={}", identity.user_fk)))?;
1766
1767 Ok(Some(User::from(u)))
1768 } else {
1769 Ok(None)
1770 }
1771 }
1772
1773 async fn create_oauth2_user(
1774 &self,
1775 username: &str,
1776 issuer: &str,
1777 subject: &str,
1778 email: Option<String>,
1779 is_admin: bool,
1780 is_read_only: bool,
1781 ) -> DbResult<User> {
1782 let txn = self.db_con.begin().await?;
1784
1785 let salt = generate_salt();
1788 let random_pwd = Uuid::new_v4().to_string();
1789 let hashed_pwd = hash_pwd(&random_pwd, &salt);
1790 let created = Utc::now().format(DB_DATE_FORMAT).to_string();
1791
1792 let new_user = user::ActiveModel {
1794 name: Set(username.to_string()),
1795 pwd: Set(hashed_pwd),
1796 salt: Set(salt),
1797 is_admin: Set(is_admin),
1798 is_read_only: Set(is_read_only),
1799 created: Set(created.clone()),
1800 ..Default::default()
1801 };
1802
1803 let res = user::Entity::insert(new_user).exec(&txn).await?;
1804 let user_id = res.last_insert_id;
1805
1806 let identity = oauth2_identity::ActiveModel {
1808 user_fk: Set(user_id),
1809 provider_issuer: Set(issuer.to_string()),
1810 subject: Set(subject.to_string()),
1811 email: Set(email),
1812 created: Set(created.clone()),
1813 ..Default::default()
1814 };
1815
1816 oauth2_identity::Entity::insert(identity).exec(&txn).await?;
1817
1818 txn.commit().await?;
1819
1820 Ok(User {
1821 id: user_id as i32,
1822 name: username.to_string(),
1823 pwd: String::new(), salt: String::new(), is_admin,
1826 is_read_only,
1827 created,
1828 })
1829 }
1830
1831 async fn link_oauth2_identity(
1832 &self,
1833 user_id: i64,
1834 issuer: &str,
1835 subject: &str,
1836 email: Option<String>,
1837 ) -> DbResult<()> {
1838 let created = Utc::now().format(DB_DATE_FORMAT).to_string();
1839 let identity = oauth2_identity::ActiveModel {
1840 user_fk: Set(user_id),
1841 provider_issuer: Set(issuer.to_string()),
1842 subject: Set(subject.to_string()),
1843 email: Set(email),
1844 created: Set(created),
1845 ..Default::default()
1846 };
1847
1848 oauth2_identity::Entity::insert(identity)
1849 .exec(&self.db_con)
1850 .await?;
1851
1852 Ok(())
1853 }
1854
1855 async fn store_oauth2_state(
1858 &self,
1859 state: &str,
1860 pkce_verifier: &str,
1861 nonce: &str,
1862 ) -> DbResult<()> {
1863 let created = Utc::now().format(DB_DATE_FORMAT).to_string();
1864 let s = oauth2_state::ActiveModel {
1865 state: Set(state.to_string()),
1866 pkce_verifier: Set(pkce_verifier.to_string()),
1867 nonce: Set(nonce.to_string()),
1868 created: Set(created),
1869 ..Default::default()
1870 };
1871
1872 oauth2_state::Entity::insert(s).exec(&self.db_con).await?;
1873 Ok(())
1874 }
1875
1876 async fn get_and_delete_oauth2_state(&self, state: &str) -> DbResult<Option<OAuth2StateData>> {
1877 let s = oauth2_state::Entity::find()
1878 .filter(oauth2_state::Column::State.eq(state))
1879 .one(&self.db_con)
1880 .await?;
1881
1882 if let Some(s) = s {
1883 let data = OAuth2StateData {
1884 state: s.state.clone(),
1885 pkce_verifier: s.pkce_verifier.clone(),
1886 nonce: s.nonce.clone(),
1887 };
1888
1889 s.delete(&self.db_con).await?;
1891
1892 Ok(Some(data))
1893 } else {
1894 Ok(None)
1895 }
1896 }
1897
1898 async fn cleanup_expired_oauth2_states(&self) -> DbResult<u64> {
1899 let expiry = Utc::now() - chrono::Duration::minutes(10);
1901 let expiry_str = expiry.format(DB_DATE_FORMAT).to_string();
1902
1903 let result = oauth2_state::Entity::delete_many()
1904 .filter(oauth2_state::Column::Created.lt(expiry_str))
1905 .exec(&self.db_con)
1906 .await?;
1907
1908 Ok(result.rows_affected)
1909 }
1910
1911 async fn is_username_available(&self, username: &str) -> DbResult<bool> {
1912 let existing = user::Entity::find()
1913 .filter(user::Column::Name.eq(username))
1914 .one(&self.db_con)
1915 .await?;
1916
1917 Ok(existing.is_none())
1918 }
1919
1920 async fn add_toolchain(
1923 &self,
1924 name: &str,
1925 version: &str,
1926 date: &str,
1927 channel: Option<String>,
1928 ) -> DbResult<i64> {
1929 let created = Utc::now().format(DB_DATE_FORMAT).to_string();
1930
1931 let model = toolchain::ActiveModel {
1932 id: ActiveValue::NotSet,
1933 name: Set(name.to_string()),
1934 version: Set(version.to_string()),
1935 date: Set(date.to_string()),
1936 channel: Set(channel),
1937 created: Set(created),
1938 };
1939
1940 let result = model.insert(&self.db_con).await?;
1941 Ok(result.id)
1942 }
1943
1944 async fn add_toolchain_target(
1945 &self,
1946 toolchain_id: i64,
1947 target: &str,
1948 storage_path: &str,
1949 hash: &str,
1950 size: i64,
1951 ) -> DbResult<()> {
1952 let model = toolchain_target::ActiveModel {
1953 id: ActiveValue::NotSet,
1954 toolchain_fk: Set(toolchain_id),
1955 target: Set(target.to_string()),
1956 storage_path: Set(storage_path.to_string()),
1957 hash: Set(hash.to_string()),
1958 size: Set(size),
1959 };
1960
1961 model.insert(&self.db_con).await?;
1962 Ok(())
1963 }
1964
1965 async fn get_toolchain_by_channel(
1966 &self,
1967 channel: &str,
1968 ) -> DbResult<Option<ToolchainWithTargets>> {
1969 let tc = toolchain::Entity::find()
1970 .filter(toolchain::Column::Channel.eq(channel))
1971 .one(&self.db_con)
1972 .await?;
1973
1974 Ok(match tc {
1975 Some(tc) => Some(self.toolchain_with_targets(tc).await?),
1976 None => None,
1977 })
1978 }
1979
1980 async fn get_toolchain_by_version(
1981 &self,
1982 name: &str,
1983 version: &str,
1984 ) -> DbResult<Option<ToolchainWithTargets>> {
1985 Ok(
1986 match self.get_toolchain_by_name_version(name, version).await? {
1987 Some(tc) => Some(self.toolchain_with_targets(tc).await?),
1988 None => None,
1989 },
1990 )
1991 }
1992
1993 async fn list_toolchains(&self) -> DbResult<Vec<ToolchainWithTargets>> {
1994 let toolchains = toolchain::Entity::find()
1995 .order_by_desc(toolchain::Column::Created)
1996 .all(&self.db_con)
1997 .await?;
1998
1999 let mut result = Vec::with_capacity(toolchains.len());
2000
2001 for tc in toolchains {
2002 result.push(self.toolchain_with_targets(tc).await?);
2003 }
2004
2005 Ok(result)
2006 }
2007
2008 async fn delete_toolchain(&self, name: &str, version: &str) -> DbResult<()> {
2009 toolchain::Entity::delete_many()
2010 .filter(toolchain::Column::Name.eq(name))
2011 .filter(toolchain::Column::Version.eq(version))
2012 .exec(&self.db_con)
2013 .await?;
2014
2015 Ok(())
2016 }
2017
2018 async fn delete_toolchain_target(
2019 &self,
2020 name: &str,
2021 version: &str,
2022 target: &str,
2023 ) -> DbResult<()> {
2024 if let Some(tc) = self.get_toolchain_by_name_version(name, version).await? {
2025 toolchain_target::Entity::delete_many()
2026 .filter(toolchain_target::Column::ToolchainFk.eq(tc.id))
2027 .filter(toolchain_target::Column::Target.eq(target))
2028 .exec(&self.db_con)
2029 .await?;
2030 }
2031
2032 Ok(())
2033 }
2034
2035 async fn set_channel(&self, channel: &str, name: &str, version: &str) -> DbResult<()> {
2036 let toolchains_with_channel = toolchain::Entity::find()
2038 .filter(toolchain::Column::Channel.eq(channel))
2039 .all(&self.db_con)
2040 .await?;
2041
2042 for tc in toolchains_with_channel {
2043 let mut model: toolchain::ActiveModel = tc.into();
2044 model.channel = Set(None);
2045 model.update(&self.db_con).await?;
2046 }
2047
2048 if let Some(tc) = self.get_toolchain_by_name_version(name, version).await? {
2050 let mut model: toolchain::ActiveModel = tc.into();
2051 model.channel = Set(Some(channel.to_string()));
2052 model.update(&self.db_con).await?;
2053 }
2054
2055 Ok(())
2056 }
2057
2058 async fn get_channels(&self) -> DbResult<Vec<ChannelInfo>> {
2059 let toolchains = toolchain::Entity::find()
2060 .filter(toolchain::Column::Channel.is_not_null())
2061 .all(&self.db_con)
2062 .await?;
2063
2064 Ok(toolchains
2065 .into_iter()
2066 .filter_map(|tc| {
2067 tc.channel.map(|channel| ChannelInfo {
2068 name: channel,
2069 version: tc.version,
2070 date: tc.date,
2071 })
2072 })
2073 .collect())
2074 }
2075}
2076
2077fn parse_db_version(value: &str) -> DbResult<Version> {
2078 Version::try_from(value).map_err(|_| DbError::InvalidVersion(value.to_owned()))
2079}