// upub_cli/fix_activities.rs

1use futures::TryStreamExt;
2use sea_orm::{ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, ActiveModelTrait};
3
/// Unwraps an `Option` inside a loop body.
///
/// Evaluates to the inner value when the expression is `Some`, otherwise
/// `continue`s the innermost enclosing loop. Must therefore only be invoked
/// from within a loop.
macro_rules! ok_or_continue {
	($x:expr) => {
		match $x {
			Some(x) => x,
			None => continue,
		}
	};
}
12
13pub async fn fix_activities(ctx: upub::Context, likes: bool, announces: bool) -> Result<(), Box<dyn std::error::Error>> {
14	if likes {
15		tracing::info!("fixing like activities...");
16		let mut stream = upub::model::activity::Entity::find()
17			.filter(upub::model::activity::Column::ActivityType.eq(apb::ActivityType::Like))
18			.filter(upub::model::activity::Column::Object.is_not_null())
19			.stream(ctx.db())
20			.await?;
21
22		while let Some(activity) = stream.try_next().await? {
23			let oid = ok_or_continue!(activity.object);
24			let internal_oid = ok_or_continue!(upub::model::object::Entity::ap_to_internal(&oid, ctx.db()).await?);
25			let uid = activity.actor;
26			let internal_uid = ok_or_continue!(upub::model::actor::Entity::ap_to_internal(&uid, ctx.db()).await?);
27			if let Some(like) = upub::model::like::Entity::find()
28				.filter(upub::model::like::Column::Object.eq(internal_oid))
29				.filter(upub::model::like::Column::Actor.eq(internal_uid))
30				// .filter(upub::model::like::Column::Published.eq(activity.published))
31				.one(ctx.db())
32				.await?
33			{
34				tracing::info!("joining like {} to activity {}", like.internal, activity.id);
35				let mut active = like.into_active_model();
36				active.activity = sea_orm::Set(Some(activity.internal));
37				active.update(ctx.db()).await?;
38			}
39		}
40	}
41
42	if announces {
43		tracing::info!("fixing announce activities...");
44		let mut stream = upub::model::activity::Entity::find()
45			.filter(upub::model::activity::Column::ActivityType.eq(apb::ActivityType::Announce))
46			.filter(upub::model::activity::Column::Object.is_not_null())
47			.stream(ctx.db())
48			.await?;
49
50		while let Some(activity) = stream.try_next().await? {
51			let oid = ok_or_continue!(activity.object);
52			let internal_oid = ok_or_continue!(upub::model::object::Entity::ap_to_internal(&oid, ctx.db()).await?);
53			let uid = activity.actor;
54			let internal_uid = ok_or_continue!(upub::model::actor::Entity::ap_to_internal(&uid, ctx.db()).await?);
55			if let Some(announce) = upub::model::announce::Entity::find()
56				.filter(upub::model::announce::Column::Object.eq(internal_oid))
57				.filter(upub::model::announce::Column::Actor.eq(internal_uid))
58				.filter(upub::model::announce::Column::Published.eq(activity.published))
59				.one(ctx.db())
60				.await?
61			{
62				tracing::info!("joining announce {} to activity {}", announce.internal, activity.id);
63				let mut active = announce.into_active_model();
64				active.activity = sea_orm::Set(Some(activity.internal));
65				active.update(ctx.db()).await?;
66			}
67		}
68	}
69
70	tracing::info!("done");
71
72	Ok(())
73}