upub_cli/
fix_activities.rs1use futures::TryStreamExt;
2use sea_orm::{ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, ActiveModelTrait};
3
/// Unwraps an `Option` inside a loop body, moving on to the next iteration on `None`.
///
/// Expands to an expression that evaluates to the value held by `Some`. On `None` it
/// executes a bare `continue`, so it is only usable directly inside a loop.
macro_rules! ok_or_continue {
    ($x:expr) => {
        if let Some(inner) = $x {
            inner
        } else {
            continue
        }
    };
}
12
13pub async fn fix_activities(ctx: upub::Context, likes: bool, announces: bool) -> Result<(), Box<dyn std::error::Error>> {
14 if likes {
15 tracing::info!("fixing like activities...");
16 let mut stream = upub::model::activity::Entity::find()
17 .filter(upub::model::activity::Column::ActivityType.eq(apb::ActivityType::Like))
18 .filter(upub::model::activity::Column::Object.is_not_null())
19 .stream(ctx.db())
20 .await?;
21
22 while let Some(activity) = stream.try_next().await? {
23 let oid = ok_or_continue!(activity.object);
24 let internal_oid = ok_or_continue!(upub::model::object::Entity::ap_to_internal(&oid, ctx.db()).await?);
25 let uid = activity.actor;
26 let internal_uid = ok_or_continue!(upub::model::actor::Entity::ap_to_internal(&uid, ctx.db()).await?);
27 if let Some(like) = upub::model::like::Entity::find()
28 .filter(upub::model::like::Column::Object.eq(internal_oid))
29 .filter(upub::model::like::Column::Actor.eq(internal_uid))
30 .one(ctx.db())
32 .await?
33 {
34 tracing::info!("joining like {} to activity {}", like.internal, activity.id);
35 let mut active = like.into_active_model();
36 active.activity = sea_orm::Set(Some(activity.internal));
37 active.update(ctx.db()).await?;
38 }
39 }
40 }
41
42 if announces {
43 tracing::info!("fixing announce activities...");
44 let mut stream = upub::model::activity::Entity::find()
45 .filter(upub::model::activity::Column::ActivityType.eq(apb::ActivityType::Announce))
46 .filter(upub::model::activity::Column::Object.is_not_null())
47 .stream(ctx.db())
48 .await?;
49
50 while let Some(activity) = stream.try_next().await? {
51 let oid = ok_or_continue!(activity.object);
52 let internal_oid = ok_or_continue!(upub::model::object::Entity::ap_to_internal(&oid, ctx.db()).await?);
53 let uid = activity.actor;
54 let internal_uid = ok_or_continue!(upub::model::actor::Entity::ap_to_internal(&uid, ctx.db()).await?);
55 if let Some(announce) = upub::model::announce::Entity::find()
56 .filter(upub::model::announce::Column::Object.eq(internal_oid))
57 .filter(upub::model::announce::Column::Actor.eq(internal_uid))
58 .filter(upub::model::announce::Column::Published.eq(activity.published))
59 .one(ctx.db())
60 .await?
61 {
62 tracing::info!("joining announce {} to activity {}", announce.internal, activity.id);
63 let mut active = announce.into_active_model();
64 active.activity = sea_orm::Set(Some(activity.internal));
65 active.update(ctx.db()).await?;
66 }
67 }
68 }
69
70 tracing::info!("done");
71
72 Ok(())
73}