1use futures::TryStreamExt;
2use sea_orm::{ActiveModelTrait, ActiveValue::{Unchanged, Set}, ColumnTrait, EntityTrait, ModelTrait, QueryFilter, QueryOrder};
3use upub::traits::Fetcher;
4
5pub async fn update_users(ctx: upub::Context, days: i64, limit: Option<u64>) -> Result<(), sea_orm::DbErr> {
6 let mut count = 0;
7 let mut stream = upub::model::actor::Entity::find()
8 .filter(upub::model::actor::Column::Updated.lt(chrono::Utc::now() - chrono::Duration::days(days)))
9 .order_by_asc(upub::model::actor::Column::Updated)
10 .stream(ctx.db())
11 .await?;
12
13
14 while let Some(user) = stream.try_next().await? {
15 if ctx.is_local(&user.id) { continue }
16 if let Some(limit) = limit {
17 if count >= limit { break }
18 }
19 let server = upub::Context::server(&user.id);
20 if upub::downtime::get(ctx.db(), &server).await?.is_some() { continue }
21 match ctx.pull(&user.id).await.and_then(|x| x.actor()) {
22 Err(upub::traits::fetch::RequestError::Fetch(status, msg)) => {
23 if status.as_u16() == 410 {
24 tracing::info!("user {} has been deleted", user.id);
25 user.delete(ctx.db()).await?;
26 }
27 else if status.as_u16() == 404 {
28 tracing::info!("user {} does not exist anymore", user.id);
29 user.delete(ctx.db()).await?;
30 }
31 else {
32 upub::downtime::set(ctx.db(), &server).await?;
33 tracing::warn!("could not fetch user {}: failed with status {status} -- {msg}", user.id);
34 }
35 },
36 Err(e) => {
37 upub::downtime::set(ctx.db(), &server).await?;
38 tracing::warn!("could not fetch user {}: {e}", user.id)
39 },
40 Ok(doc) => match ctx.resolve_user(doc, ctx.db()).await {
41 Err(e) => {
42 upub::downtime::set(ctx.db(), &server).await?;
43 tracing::warn!("failed deserializing user '{}': {e}", user.id)
44 },
45 Ok(mut u) => {
46 tracing::info!("updating user {}", user.id);
47 u.internal = Unchanged(user.internal);
48 u.updated = Set(chrono::Utc::now());
49 u.update(ctx.db()).await?;
50 count += 1;
51 },
52 },
53 }
54 }
55
56 tracing::info!("updated {count} users");
57
58 Ok(())
59}