1use apb::{ActivityMut, BaseMut, ObjectMut};
2use sea_orm::{ActiveValue::{NotSet, Set}, DbErr, EntityTrait, QueryFilter, ColumnTrait};
3use upub::traits::{fetch::RequestError, Fetcher};
4
5#[derive(Debug, Clone, clap::Subcommand)]
6pub enum RelayCommand {
8 Status,
10 Follow {
12 actor: String,
14 },
15 Accept {
17 actor: String,
19 },
20 Unfollow {
22 actor: String,
24 },
25 Remove {
27 actor: String,
29 },
30}
31
32pub async fn relay(ctx: upub::Context, action: RelayCommand) -> Result<(), RequestError> {
33 let my_internal = upub::model::actor::Entity::ap_to_internal(ctx.base(), ctx.db())
34 .await?
35 .ok_or_else(|| DbErr::RecordNotFound(ctx.base().to_string()))?;
36
37 let their_internal = match &action {
38 RelayCommand::Status => 0,
39 RelayCommand::Follow { actor }
40 | RelayCommand::Accept { actor }
41 | RelayCommand::Unfollow { actor }
42 | RelayCommand::Remove { actor }
43 => ctx.fetch_user(actor, ctx.db()).await?.internal,
44 };
45
46 match action {
47 RelayCommand::Status => {
48 tracing::info!("active sinks:");
49 for sink in upub::Query::related(None, Some(my_internal), false)
50 .into_model::<upub::model::actor::Model>()
51 .all(ctx.db())
52 .await?
53 {
54 tracing::info!("[>>] {} {}", sink.name.unwrap_or_default(), sink.id);
55 }
56
57 tracing::info!("active sources:");
58 for source in upub::Query::related(Some(my_internal), None, false)
59 .into_model::<upub::model::actor::Model>()
60 .all(ctx.db())
61 .await?
62 {
63 tracing::info!("[<<] {} {}", source.name.unwrap_or_default(), source.id);
64 }
65 },
66
67 RelayCommand::Follow { actor } => {
68 let aid = ctx.aid(&upub::Context::new_id());
69 let payload = apb::new()
70 .set_id(Some(aid.clone()))
71 .set_activity_type(Some(apb::ActivityType::Follow))
72 .set_actor(apb::Node::link(ctx.base().to_string()))
73 .set_object(apb::Node::link(actor.clone()))
74 .set_to(apb::Node::links(vec![actor.clone()]))
75 .set_cc(apb::Node::links(vec![apb::target::PUBLIC.to_string()]))
76 .set_published(Some(chrono::Utc::now()));
77 let job = upub::model::job::ActiveModel {
78 internal: NotSet,
79 activity: Set(aid),
80 job_type: Set(upub::model::job::JobType::Outbound),
81 actor: Set(ctx.base().to_string()),
82 target: Set(None),
83 payload: Set(Some(payload)),
84 attempt: Set(0),
85 published: Set(chrono::Utc::now()),
86 not_before: Set(chrono::Utc::now()),
87 error: Set(None),
88 };
89 tracing::info!("following relay {actor}");
90 upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
91 },
92
93 RelayCommand::Accept { actor } => {
94 let relation = upub::model::relation::Entity::find()
95 .filter(upub::model::relation::Column::Follower.eq(their_internal))
96 .filter(upub::model::relation::Column::Following.eq(my_internal))
97 .one(ctx.db())
98 .await?
99 .ok_or_else(|| DbErr::RecordNotFound(format!("relation-{their_internal}-{my_internal}")))?;
100 let activity = upub::model::activity::Entity::find_by_id(relation.activity)
101 .one(ctx.db())
102 .await?
103 .ok_or_else(|| DbErr::RecordNotFound(format!("activity#{}", relation.activity)))?;
104 let aid = ctx.aid(&upub::Context::new_id());
105 let payload = apb::new()
106 .set_id(Some(aid.clone()))
107 .set_activity_type(Some(apb::ActivityType::Accept(apb::AcceptType::Accept)))
108 .set_actor(apb::Node::link(ctx.base().to_string()))
109 .set_object(apb::Node::link(activity.id))
110 .set_to(apb::Node::links(vec![actor.clone()]))
111 .set_cc(apb::Node::links(vec![apb::target::PUBLIC.to_string()]))
112 .set_published(Some(chrono::Utc::now()));
113 let job = upub::model::job::ActiveModel {
114 internal: NotSet,
115 activity: Set(aid),
116 job_type: Set(upub::model::job::JobType::Outbound),
117 actor: Set(ctx.base().to_string()),
118 target: Set(None),
119 payload: Set(Some(payload)),
120 attempt: Set(0),
121 published: Set(chrono::Utc::now()),
122 not_before: Set(chrono::Utc::now()),
123 error: Set(None),
124 };
125 tracing::info!("accepting relay {actor}");
126 upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
127 },
128
129 RelayCommand::Remove { actor } => {
130 let relation = upub::model::relation::Entity::find()
131 .filter(upub::model::relation::Column::Follower.eq(their_internal))
132 .filter(upub::model::relation::Column::Following.eq(my_internal))
133 .one(ctx.db())
134 .await?
135 .ok_or_else(|| DbErr::RecordNotFound(format!("relation-{their_internal}-{my_internal}")))?;
136 let accept_activity_id = relation.accept.ok_or(DbErr::RecordNotFound(format!("accept-{their_internal}-{my_internal}")))?;
137 let activity = upub::model::activity::Entity::find_by_id(accept_activity_id)
138 .one(ctx.db())
139 .await?
140 .ok_or_else(|| DbErr::RecordNotFound(format!("activity#{}", accept_activity_id)))?;
141 let aid = ctx.aid(&upub::Context::new_id());
142 let payload = apb::new()
143 .set_id(Some(aid.clone()))
144 .set_activity_type(Some(apb::ActivityType::Undo))
145 .set_actor(apb::Node::link(ctx.base().to_string()))
146 .set_object(apb::Node::object(ctx.ap(activity)))
147 .set_to(apb::Node::links(vec![actor.clone()]))
148 .set_cc(apb::Node::links(vec![apb::target::PUBLIC.to_string()]))
149 .set_published(Some(chrono::Utc::now()));
150 let job = upub::model::job::ActiveModel {
151 internal: NotSet,
152 activity: Set(aid),
153 job_type: Set(upub::model::job::JobType::Outbound),
154 actor: Set(ctx.base().to_string()),
155 target: Set(None),
156 payload: Set(Some(payload)),
157 attempt: Set(0),
158 published: Set(chrono::Utc::now()),
159 not_before: Set(chrono::Utc::now()),
160 error: Set(None),
161 };
162 tracing::info!("unfollowing relay {actor}");
163 upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
164 },
165
166 RelayCommand::Unfollow { actor } => {
167 let relation = upub::model::relation::Entity::find()
168 .filter(upub::model::relation::Column::Follower.eq(my_internal))
169 .filter(upub::model::relation::Column::Following.eq(their_internal))
170 .one(ctx.db())
171 .await?
172 .ok_or_else(|| DbErr::RecordNotFound(format!("relation-{my_internal}-{their_internal}")))?;
173 let activity = upub::model::activity::Entity::find_by_id(relation.activity)
174 .one(ctx.db())
175 .await?
176 .ok_or_else(|| DbErr::RecordNotFound(format!("activity#{}", relation.activity)))?;
177 let aid = ctx.aid(&upub::Context::new_id());
178 let payload = apb::new()
179 .set_id(Some(aid.clone()))
180 .set_activity_type(Some(apb::ActivityType::Undo))
181 .set_actor(apb::Node::link(ctx.base().to_string()))
182 .set_object(apb::Node::object(ctx.ap(activity)))
183 .set_to(apb::Node::links(vec![actor.clone()]))
184 .set_cc(apb::Node::links(vec![apb::target::PUBLIC.to_string()]))
185 .set_published(Some(chrono::Utc::now()));
186 let job = upub::model::job::ActiveModel {
187 internal: NotSet,
188 activity: Set(aid),
189 job_type: Set(upub::model::job::JobType::Outbound),
190 actor: Set(ctx.base().to_string()),
191 target: Set(None),
192 payload: Set(Some(payload)),
193 attempt: Set(0),
194 published: Set(chrono::Utc::now()),
195 not_before: Set(chrono::Utc::now()),
196 error: Set(None),
197 };
198 tracing::info!("unfollowing relay {actor}");
199 upub::model::job::Entity::insert(job).exec(ctx.db()).await?;
200 },
201 }
202
203 Ok(())
204}