upub_cli/
nuke.rs

1use std::collections::HashSet;
2
3use apb::{ActivityMut, BaseMut, ObjectMut};
4use futures::TryStreamExt;
5use sea_orm::{ActiveValue::{Set, NotSet}, ColumnTrait, EntityTrait, QueryFilter, QuerySelect, SelectColumns};
6
7
8pub async fn nuke(ctx: upub::Context, for_real: bool, delete_posts: bool) -> Result<(), sea_orm::DbErr> {
9	if !for_real {
10		tracing::warn!("THIS IS A DRY RUN! pass --for-real to actually nuke this instance");
11	}
12
13	let mut to_undo = Vec::new();
14
15	// TODO rather expensive to find all local users with a LIKE query, should add an isLocal flag
16	let local_users_vec = upub::model::actor::Entity::find()
17		.filter(upub::model::actor::Column::Id.like(format!("{}%", ctx.base())))
18		.select_only()
19		.select_column(upub::model::actor::Column::Internal)
20		.into_tuple::<i64>()
21		.all(ctx.db())
22		.await?;
23	
24	let local_users : HashSet<i64> = HashSet::from_iter(local_users_vec);
25
26	{
27		let mut stream = upub::model::relation::Entity::find().stream(ctx.db()).await?;
28		while let Some(like) = stream.try_next().await? {
29			if local_users.contains(&like.follower) {
30				to_undo.push(like.activity);
31			} else if local_users.contains(&like.following) {
32				if let Some(accept) = like.accept {
33					to_undo.push(accept);
34				}
35			}
36		}
37	}
38
39	for internal in to_undo {
40		let Some(activity) = upub::model::activity::Entity::find_by_id(internal)
41			.one(ctx.db())
42			.await?
43		else {
44			tracing::error!("could not load activity #{internal}");
45			continue;
46		};
47
48		let Some(ref oid) = activity.object
49		else {
50			tracing::error!("can't undo activity without object");
51			continue;
52		};
53
54		let (target, undone) = if matches!(activity.activity_type, apb::ActivityType::Follow) {
55			(oid.clone(), ctx.ap(activity.clone()))
56		} else {
57			let follow_activity = upub::model::activity::Entity::find_by_ap_id(oid)
58				.one(ctx.db())
59				.await?
60				.ok_or(sea_orm::DbErr::RecordNotFound(oid.clone()))?;
61			(follow_activity.clone().object.unwrap_or_default(), ctx.ap(follow_activity))
62		};
63
64		let aid = ctx.aid(&upub::Context::new_id());
65		let undo_activity = apb::new()
66			.set_id(Some(aid.clone()))
67			.set_activity_type(Some(apb::ActivityType::Undo))
68			.set_actor(apb::Node::link(activity.actor.clone()))
69			.set_object(apb::Node::object(undone))
70			.set_to(apb::Node::links(vec![target]))
71			.set_published(Some(chrono::Utc::now()));
72
73
74		let job = upub::model::job::ActiveModel {
75			internal: NotSet,
76			activity: Set(aid),
77			job_type: Set(upub::model::job::JobType::Outbound),
78			actor: Set(activity.actor),
79			target: Set(None),
80			published: Set(chrono::Utc::now()),
81			not_before: Set(chrono::Utc::now()),
82			attempt: Set(0),
83			payload: Set(Some(undo_activity)),
84			error: Set(None),
85		};
86
87		tracing::info!("undoing {}", activity.id);
88
89		if for_real {
90			upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
91		}
92	}
93
94	if delete_posts {
95		let mut stream = upub::model::object::Entity::find()
96			.filter(upub::model::object::Column::Id.like(format!("{}%", ctx.base())))
97			.stream(ctx.db())
98			.await?;
99
100		while let Some(object) = stream.try_next().await? {
101			let aid = ctx.aid(&upub::Context::new_id());
102			let actor = object.attributed_to.unwrap_or_else(|| ctx.domain().to_string());
103			let undo_activity = apb::new()
104				.set_id(Some(aid.clone()))
105				.set_activity_type(Some(apb::ActivityType::Delete))
106				.set_actor(apb::Node::link(actor.clone()))
107				.set_object(apb::Node::link(object.id.clone()))
108				.set_to(apb::Node::links(object.to.0))
109				.set_cc(apb::Node::links(object.cc.0))
110				.set_bto(apb::Node::links(object.bto.0))
111				.set_bcc(apb::Node::links(object.bcc.0))
112				.set_published(Some(chrono::Utc::now()));
113
114
115			let job = upub::model::job::ActiveModel {
116				internal: NotSet,
117				activity: Set(aid),
118				job_type: Set(upub::model::job::JobType::Outbound),
119				actor: Set(actor),
120				target: Set(None),
121				published: Set(chrono::Utc::now()),
122				not_before: Set(chrono::Utc::now()),
123				attempt: Set(0),
124				payload: Set(Some(undo_activity)),
125				error: Set(None),
126			};
127
128			tracing::info!("deleting {}", object.id);
129
130			if for_real {
131				upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
132			}
133		}
134	}
135
136	Ok(())
137}