upub_cli/
relay.rs

1use apb::{ActivityMut, BaseMut, ObjectMut};
2use sea_orm::{ActiveValue::{NotSet, Set}, DbErr, EntityTrait, QueryFilter, ColumnTrait};
3use upub::traits::{fetch::RequestError, Fetcher};
4
5#[derive(Debug, Clone, clap::Subcommand)]
6/// available actions to take on relays
7pub enum RelayCommand {
8	/// get all current pending and accepted relays
9	Status,
10	/// request to follow a specific relay
11	Follow {
12		/// relay actor to follow (must be full AP id, like for pleroma)
13		actor: String,
14	},
15	/// accept a pending relay request
16	Accept {
17		/// relay actor to accept (must be full AP id, like for pleroma)
18		actor: String,
19	},
20	/// retract a follow relation to a relay, stopping receiving content
21	Unfollow {
22		/// relay actor to unfollow (must be full AP id, like for pleroma)
23		actor: String,
24	},
25	/// remove a follow relation from a relay, stopping sending content
26	Remove {
27		/// relay actor to unfollow (must be full AP id, like for pleroma)
28		actor: String,
29	},
30}
31
32pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), RequestError> {
33	let my_internal = upub::model::actor::Entity::ap_to_internal(ctx.base(), ctx.db())
34		.await?
35		.ok_or_else(|| DbErr::RecordNotFound(ctx.base().to_string()))?;
36
37	let their_internal = match &action {
38		RelayCommand::Status => 0,
39		RelayCommand::Follow { actor }
40		| RelayCommand::Accept { actor }
41		| RelayCommand::Unfollow { actor }
42		| RelayCommand::Remove { actor }
43		=> ctx.fetch_user(actor, ctx.db()).await?.internal,
44	};
45
46	match action {
47		RelayCommand::Status => {
48			tracing::info!("active sinks:");
49			for sink in upub::Query::related(None, Some(my_internal), false)
50				.into_model::<upub::model::actor::Model>()
51				.all(ctx.db())
52				.await?
53			{
54				tracing::info!("[>>] {} {}", sink.name.unwrap_or_default(), sink.id);
55			}
56
57			tracing::info!("active sources:");
58			for source in  upub::Query::related(Some(my_internal), None, false)
59				.into_model::<upub::model::actor::Model>()
60				.all(ctx.db())
61				.await?
62			{
63				tracing::info!("[<<] {} {}", source.name.unwrap_or_default(), source.id);
64			}
65		},
66
67		RelayCommand::Follow { actor } => {
68			let aid = ctx.aid(&upub::Context::new_id());
69			let payload = apb::new()
70				.set_id(Some(aid.clone()))
71				.set_activity_type(Some(apb::ActivityType::Follow))
72				.set_actor(apb::Node::link(ctx.base().to_string()))
73				.set_object(apb::Node::link(actor.clone()))
74				.set_to(apb::Node::links(vec![actor.clone()]))
75				.set_cc(apb::Node::links(vec![apb::target::PUBLIC.to_string()]))
76				.set_published(Some(chrono::Utc::now()));
77			let job = upub::model::job::ActiveModel {
78				internal: NotSet,
79				activity: Set(aid),
80				job_type: Set(upub::model::job::JobType::Outbound),
81				actor: Set(ctx.base().to_string()),
82				target: Set(None),
83				payload: Set(Some(payload)),
84				attempt: Set(0),
85				published: Set(chrono::Utc::now()),
86				not_before: Set(chrono::Utc::now()),
87				error: Set(None),
88			};
89			tracing::info!("following relay {actor}");
90			upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
91		},
92
93		RelayCommand::Accept { actor } => {
94			let relation = upub::model::relation::Entity::find()
95				.filter(upub::model::relation::Column::Follower.eq(their_internal))
96				.filter(upub::model::relation::Column::Following.eq(my_internal))
97				.one(ctx.db())
98				.await?
99				.ok_or_else(|| DbErr::RecordNotFound(format!("relation-{their_internal}-{my_internal}")))?;
100			let activity = upub::model::activity::Entity::find_by_id(relation.activity)
101				.one(ctx.db())
102				.await?
103				.ok_or_else(|| DbErr::RecordNotFound(format!("activity#{}", relation.activity)))?;
104			let aid = ctx.aid(&upub::Context::new_id());
105			let payload = apb::new()
106				.set_id(Some(aid.clone()))
107				.set_activity_type(Some(apb::ActivityType::Accept(apb::AcceptType::Accept)))
108				.set_actor(apb::Node::link(ctx.base().to_string()))
109				.set_object(apb::Node::link(activity.id))
110				.set_to(apb::Node::links(vec![actor.clone()]))
111				.set_cc(apb::Node::links(vec![apb::target::PUBLIC.to_string()]))
112				.set_published(Some(chrono::Utc::now()));
113			let job = upub::model::job::ActiveModel {
114				internal: NotSet,
115				activity: Set(aid),
116				job_type: Set(upub::model::job::JobType::Outbound),
117				actor: Set(ctx.base().to_string()),
118				target: Set(None),
119				payload: Set(Some(payload)),
120				attempt: Set(0),
121				published: Set(chrono::Utc::now()),
122				not_before: Set(chrono::Utc::now()),
123				error: Set(None),
124			};
125			tracing::info!("accepting relay {actor}");
126			upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
127		},
128
129		RelayCommand::Remove { actor } => {
130			let relation = upub::model::relation::Entity::find()
131				.filter(upub::model::relation::Column::Follower.eq(their_internal))
132				.filter(upub::model::relation::Column::Following.eq(my_internal))
133				.one(ctx.db())
134				.await?
135				.ok_or_else(|| DbErr::RecordNotFound(format!("relation-{their_internal}-{my_internal}")))?;
136			let accept_activity_id = relation.accept.ok_or(DbErr::RecordNotFound(format!("accept-{their_internal}-{my_internal}")))?;
137			let activity = upub::model::activity::Entity::find_by_id(accept_activity_id)
138				.one(ctx.db())
139				.await?
140				.ok_or_else(|| DbErr::RecordNotFound(format!("activity#{}", accept_activity_id)))?;
141			let aid = ctx.aid(&upub::Context::new_id());
142			let payload = apb::new()
143				.set_id(Some(aid.clone()))
144				.set_activity_type(Some(apb::ActivityType::Undo))
145				.set_actor(apb::Node::link(ctx.base().to_string()))
146				.set_object(apb::Node::object(ctx.ap(activity)))
147				.set_to(apb::Node::links(vec![actor.clone()]))
148				.set_cc(apb::Node::links(vec![apb::target::PUBLIC.to_string()]))
149				.set_published(Some(chrono::Utc::now()));
150			let job = upub::model::job::ActiveModel {
151				internal: NotSet,
152				activity: Set(aid),
153				job_type: Set(upub::model::job::JobType::Outbound),
154				actor: Set(ctx.base().to_string()),
155				target: Set(None),
156				payload: Set(Some(payload)),
157				attempt: Set(0),
158				published: Set(chrono::Utc::now()),
159				not_before: Set(chrono::Utc::now()),
160				error: Set(None),
161			};
162			tracing::info!("unfollowing relay {actor}");
163			upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
164		},
165
166		RelayCommand::Unfollow { actor } => {
167			let relation = upub::model::relation::Entity::find()
168				.filter(upub::model::relation::Column::Follower.eq(my_internal))
169				.filter(upub::model::relation::Column::Following.eq(their_internal))
170				.one(ctx.db())
171				.await?
172				.ok_or_else(|| DbErr::RecordNotFound(format!("relation-{my_internal}-{their_internal}")))?;
173			let activity = upub::model::activity::Entity::find_by_id(relation.activity)
174				.one(ctx.db())
175				.await?
176				.ok_or_else(|| DbErr::RecordNotFound(format!("activity#{}", relation.activity)))?;
177			let aid = ctx.aid(&upub::Context::new_id());
178			let payload = apb::new()
179				.set_id(Some(aid.clone()))
180				.set_activity_type(Some(apb::ActivityType::Undo))
181				.set_actor(apb::Node::link(ctx.base().to_string()))
182				.set_object(apb::Node::object(ctx.ap(activity)))
183				.set_to(apb::Node::links(vec![actor.clone()]))
184				.set_cc(apb::Node::links(vec![apb::target::PUBLIC.to_string()]))
185				.set_published(Some(chrono::Utc::now()));
186			let job = upub::model::job::ActiveModel {
187				internal: NotSet,
188				activity: Set(aid),
189				job_type: Set(upub::model::job::JobType::Outbound),
190				actor: Set(ctx.base().to_string()),
191				target: Set(None),
192				payload: Set(Some(payload)),
193				attempt: Set(0),
194				published: Set(chrono::Utc::now()),
195				not_before: Set(chrono::Utc::now()),
196				error: Set(None),
197			};
198			tracing::info!("unfollowing relay {actor}");
199			upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
200		},
201	}
202
203	Ok(())
204}