1use crate::{
4 repo::{Dereference, Repo},
5 session::Session,
6};
7use std::{rc::Rc, sync::Arc};
8use url::{Host, Url};
9
10#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
20pub enum Authority {
21 None,
23
24 Server(Url),
26
27 Actor(Url),
29}
30
31#[async_trait::async_trait(?Send)]
35pub trait Ingest<Object> {
36 type ActorId;
42
43 type Local: Repo;
45
46 type Error: From<<Self::Local as Repo>::Error>;
48
49 fn local_repo(&self) -> &Self::Local;
51
52 fn is_local(&self, url: &Url) -> bool;
54
55 async fn ingest<Remote: Repo, S: Session>(
64 &self,
65 authority: Authority,
66 actor_id: Self::ActorId,
67 activity: &Object,
68 remote_repo: Remote,
69 session: S,
70 ) -> Result<(), Self::Error>
71 where
72 Self::Error: From<Remote::Error>;
73
74 async fn fetch<D: Dereference<Output = Object>, Remote: Repo, S: Session>(
82 &self,
83 id: D,
84 actor_id: Self::ActorId,
85 remote_repo: Remote,
86 mut session: S,
87 ) -> Result<Option<D::Output>, Self::Error>
88 where
89 Self::ActorId: 'static,
90 Self::Error: From<Remote::Error>,
91 {
92 let opt = self.local_repo().fetch(&id, &mut session).await?;
93
94 if self.is_local(id.url()) {
95 return Ok(opt);
96 }
97
98 let opt = remote_repo.fetch(&id, &mut session).await?;
99
100 if let Some(object) = opt.as_ref() {
101 let authority = Authority::Server(id.url().clone());
102 self.ingest(authority, actor_id, object, remote_repo, session)
103 .await?;
104 }
105
106 Ok(opt)
107 }
108}
109
110pub trait IngestFactory<A> {
112 type Ingest: Ingest<A>;
114
115 fn build_ingest(&self) -> Self::Ingest;
117}
118
119impl std::fmt::Display for Authority {
120 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121 match self {
122 Authority::Actor(actor_id) => write!(f, "Actor - '{}'", actor_id),
123 Authority::Server(url) => write!(f, "Server URL - '{}'", url),
124 Authority::None => write!(f, "None"),
125 }
126 }
127}
128
129pub fn is_local(local_host: &Host<String>, local_port: Option<u16>, url: &Url) -> bool {
131 Some(borrow_host(local_host)) == url.host() && local_port == url.port()
132}
133
134fn borrow_host(host: &Host<String>) -> Host<&str> {
135 match host {
136 Host::Ipv4(ip) => Host::Ipv4(*ip),
137 Host::Ipv6(ip) => Host::Ipv6(*ip),
138 Host::Domain(ref domain) => Host::Domain(domain),
139 }
140}
141
142#[async_trait::async_trait(?Send)]
143impl<'a, Object, T> Ingest<Object> for &'a T
144where
145 T: Ingest<Object>,
146 Object: 'static,
147{
148 type Local = T::Local;
149 type ActorId = T::ActorId;
150 type Error = T::Error;
151
152 fn local_repo(&self) -> &Self::Local {
153 T::local_repo(self)
154 }
155
156 fn is_local(&self, url: &Url) -> bool {
157 T::is_local(self, url)
158 }
159
160 async fn ingest<Remote: Repo, S: Session>(
161 &self,
162 authority: Authority,
163 actor_id: Self::ActorId,
164 activity: &Object,
165 remote_repo: Remote,
166 session: S,
167 ) -> Result<(), Self::Error>
168 where
169 Self::Error: From<Remote::Error>,
170 {
171 T::ingest(self, authority, actor_id, activity, remote_repo, session).await
172 }
173}
174
175#[async_trait::async_trait(?Send)]
176impl<'a, Object, T> Ingest<Object> for &'a mut T
177where
178 T: Ingest<Object>,
179 Object: 'static,
180{
181 type Local = T::Local;
182 type Error = T::Error;
183 type ActorId = T::ActorId;
184
185 fn local_repo(&self) -> &Self::Local {
186 T::local_repo(self)
187 }
188
189 fn is_local(&self, url: &Url) -> bool {
190 T::is_local(self, url)
191 }
192
193 async fn ingest<Remote: Repo, S: Session>(
194 &self,
195 authority: Authority,
196 actor_id: Self::ActorId,
197 activity: &Object,
198 remote_repo: Remote,
199 session: S,
200 ) -> Result<(), Self::Error>
201 where
202 Self::Error: From<Remote::Error>,
203 {
204 T::ingest(self, authority, actor_id, activity, remote_repo, session).await
205 }
206}
207
208#[async_trait::async_trait(?Send)]
209impl<Object, T> Ingest<Object> for Box<T>
210where
211 T: Ingest<Object>,
212 Object: 'static,
213{
214 type Local = T::Local;
215 type Error = T::Error;
216 type ActorId = T::ActorId;
217
218 fn local_repo(&self) -> &Self::Local {
219 T::local_repo(self)
220 }
221
222 fn is_local(&self, url: &Url) -> bool {
223 T::is_local(self, url)
224 }
225
226 async fn ingest<Remote: Repo, S: Session>(
227 &self,
228 authority: Authority,
229 actor_id: Self::ActorId,
230 activity: &Object,
231 remote_repo: Remote,
232 session: S,
233 ) -> Result<(), Self::Error>
234 where
235 Self::Error: From<Remote::Error>,
236 {
237 T::ingest(self, authority, actor_id, activity, remote_repo, session).await
238 }
239}
240
241#[async_trait::async_trait(?Send)]
242impl<Object, T> Ingest<Object> for Rc<T>
243where
244 T: Ingest<Object>,
245 Object: 'static,
246{
247 type Local = T::Local;
248 type Error = T::Error;
249 type ActorId = T::ActorId;
250
251 fn local_repo(&self) -> &Self::Local {
252 T::local_repo(self)
253 }
254
255 fn is_local(&self, url: &Url) -> bool {
256 T::is_local(self, url)
257 }
258
259 async fn ingest<Remote: Repo, S: Session>(
260 &self,
261 authority: Authority,
262 actor_id: Self::ActorId,
263 activity: &Object,
264 remote_repo: Remote,
265 session: S,
266 ) -> Result<(), Self::Error>
267 where
268 Self::Error: From<Remote::Error>,
269 {
270 T::ingest(self, authority, actor_id, activity, remote_repo, session).await
271 }
272}
273
274#[async_trait::async_trait(?Send)]
275impl<Object, T> Ingest<Object> for Arc<T>
276where
277 T: Ingest<Object>,
278 Object: 'static,
279{
280 type Local = T::Local;
281 type Error = T::Error;
282 type ActorId = T::ActorId;
283
284 fn local_repo(&self) -> &Self::Local {
285 T::local_repo(self)
286 }
287
288 fn is_local(&self, url: &Url) -> bool {
289 T::is_local(self, url)
290 }
291
292 async fn ingest<Remote: Repo, S: Session>(
293 &self,
294 authority: Authority,
295 actor_id: Self::ActorId,
296 activity: &Object,
297 remote_repo: Remote,
298 session: S,
299 ) -> Result<(), Self::Error>
300 where
301 Self::Error: From<Remote::Error>,
302 {
303 T::ingest(self, authority, actor_id, activity, remote_repo, session).await
304 }
305}