1use sea_orm::{ActiveModelTrait, EntityTrait};
2
3pub async fn count(ctx: upub::Context, likes: bool, shares: bool, replies: bool) -> Result<(), sea_orm::DbErr> {
4 use futures::TryStreamExt;
5 let db = ctx.db();
6
7 if likes {
8 tracing::info!("counting likes...");
9 let mut store = std::collections::HashMap::new();
10 {
11 let mut stream = upub::model::like::Entity::find().stream(db).await?;
12 while let Some(like) = stream.try_next().await? {
13 store.insert(like.object, store.get(&like.object).unwrap_or(&0) + 1);
14 }
15 }
16
17 for (k, v) in store {
18 let m = upub::model::object::ActiveModel {
19 internal: sea_orm::Unchanged(k),
20 likes: sea_orm::Set(v),
21 ..Default::default()
22 };
23 if let Err(e) = m.update(db).await {
24 tracing::warn!("record not updated ({k}): {e}");
25 }
26 }
27 }
28
29 if shares {
30 tracing::info!("counting shares...");
31 let mut store = std::collections::HashMap::new();
32 {
33 let mut stream = upub::model::announce::Entity::find().stream(db).await?;
34 while let Some(share) = stream.try_next().await? {
35 store.insert(share.object, store.get(&share.object).unwrap_or(&0) + 1);
36 }
37 }
38
39 for (k, v) in store {
40 let m = upub::model::object::ActiveModel {
41 internal: sea_orm::Unchanged(k),
42 announces: sea_orm::Set(v),
43 ..Default::default()
44 };
45 if let Err(e) = m.update(db).await {
46 tracing::warn!("record not updated ({k}): {e}");
47 }
48 }
49 }
50
51 if replies {
52 tracing::info!("counting replies...");
53 let mut store = std::collections::HashMap::new();
54 {
55 let mut stream = upub::model::object::Entity::find().stream(db).await?;
56 while let Some(object) = stream.try_next().await? {
57 if let Some(reply) = object.in_reply_to {
58 let before = store.get(&reply).unwrap_or(&0);
59 store.insert(reply, before + 1);
60 }
61 }
62 }
63
64 for (k, v) in store {
65 let m = upub::model::object::ActiveModel {
66 id: sea_orm::Unchanged(k.clone()),
67 replies: sea_orm::Set(v),
68 ..Default::default()
69 };
70 if let Err(e) = m.update(db).await {
72 tracing::warn!("record not updated ({k}): {e}");
73 }
74 }
75 }
76
77 tracing::info!("done running fix tasks");
78 Ok(())
79}