apub_core/
ingest.rs

1//! traits around accepting activities
2
3use crate::{
4    repo::{Dereference, Repo},
5    session::Session,
6};
7use std::{rc::Rc, sync::Arc};
8use url::{Host, Url};
9
10/// The Authority that is providing the ingested data
11///
12/// - An Authority of None means the data is untrustworthy, as no entity can be verified to have
13///    provided this data
14/// - An Authority of Server means the data to be ingested has been provided on behalf of it's
15///   origin server. A URL is provided in the Server variant to describe the specific URL the data
16///   originates from.
17/// - An Authority of Actor(Url) means the data to be ingested has been provided on behalf of the
18///   Actor identified by the associated URL
19#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
20pub enum Authority {
21    /// No Authority provided
22    None,
23
24    /// The Authority for ingested data is the server the data is hosted on
25    Server(Url),
26
27    /// The Authority for the ingested data is the provided url
28    Actor(Url),
29}
30
31/// Describes accepting a new Object into the system
32///
33/// This type is implemented by users of `apub` to hook into provied inbox methods
34#[async_trait::async_trait(?Send)]
35pub trait Ingest<Object> {
36    /// The actor that is receiving the activity
37    ///
38    /// e.g.
39    /// - the community that is receiving a new post
40    /// - the server actor (in the case of the shared inbox)
41    type ActorId;
42
43    /// The local repository
44    type Local: Repo;
45
46    /// The error produced when ingesting activities
47    type Error: From<<Self::Local as Repo>::Error>;
48
49    /// Get the local repository
50    fn local_repo(&self) -> &Self::Local;
51
52    /// Determine if a given URL is local
53    fn is_local(&self, url: &Url) -> bool;
54
55    /// Accept and process a given activity
56    ///
57    /// Args:
58    /// - authority: the source of the information, either the Actor that provided the object, or the URL it was fetched from
59    /// - actor_id: the ID of the actor accepting the object.
60    /// - activity: the Object being ingested
61    /// - remote_repo: a handle to a remote repository (probably an HTTP client) for ingesting further objects
62    /// - session: the request session associated with the Remote Repo
63    async fn ingest<Remote: Repo, S: Session>(
64        &self,
65        authority: Authority,
66        actor_id: Self::ActorId,
67        activity: &Object,
68        remote_repo: Remote,
69        session: S,
70    ) -> Result<(), Self::Error>
71    where
72        Self::Error: From<Remote::Error>;
73
74    /// Dereference an ID from the local or remote repo
75    ///
76    /// Args:
77    /// - id: the ID of the object to be fetched
78    /// - actor_id: the ID of the actor fetching the object.
79    /// - remote_repo: a handle to a remote repository (probably an HTTP client) for ingesting further objects
80    /// - session: the request session associated with the Remote Repo
81    async fn fetch<D: Dereference<Output = Object>, Remote: Repo, S: Session>(
82        &self,
83        id: D,
84        actor_id: Self::ActorId,
85        remote_repo: Remote,
86        mut session: S,
87    ) -> Result<Option<D::Output>, Self::Error>
88    where
89        Self::ActorId: 'static,
90        Self::Error: From<Remote::Error>,
91    {
92        let opt = self.local_repo().fetch(&id, &mut session).await?;
93
94        if self.is_local(id.url()) {
95            return Ok(opt);
96        }
97
98        let opt = remote_repo.fetch(&id, &mut session).await?;
99
100        if let Some(object) = opt.as_ref() {
101            let authority = Authority::Server(id.url().clone());
102            self.ingest(authority, actor_id, object, remote_repo, session)
103                .await?;
104        }
105
106        Ok(opt)
107    }
108}
109
110/// Describes a type that can produce an Ingest
111pub trait IngestFactory<A> {
112    /// The Ingest type
113    type Ingest: Ingest<A>;
114
115    /// Build the ingest type
116    fn build_ingest(&self) -> Self::Ingest;
117}
118
119impl std::fmt::Display for Authority {
120    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121        match self {
122            Authority::Actor(actor_id) => write!(f, "Actor - '{}'", actor_id),
123            Authority::Server(url) => write!(f, "Server URL - '{}'", url),
124            Authority::None => write!(f, "None"),
125        }
126    }
127}
128
129/// A helper for determining if a given URL matches a Host and Port
130pub fn is_local(local_host: &Host<String>, local_port: Option<u16>, url: &Url) -> bool {
131    Some(borrow_host(local_host)) == url.host() && local_port == url.port()
132}
133
134fn borrow_host(host: &Host<String>) -> Host<&str> {
135    match host {
136        Host::Ipv4(ip) => Host::Ipv4(*ip),
137        Host::Ipv6(ip) => Host::Ipv6(*ip),
138        Host::Domain(ref domain) => Host::Domain(domain),
139    }
140}
141
142#[async_trait::async_trait(?Send)]
143impl<'a, Object, T> Ingest<Object> for &'a T
144where
145    T: Ingest<Object>,
146    Object: 'static,
147{
148    type Local = T::Local;
149    type ActorId = T::ActorId;
150    type Error = T::Error;
151
152    fn local_repo(&self) -> &Self::Local {
153        T::local_repo(self)
154    }
155
156    fn is_local(&self, url: &Url) -> bool {
157        T::is_local(self, url)
158    }
159
160    async fn ingest<Remote: Repo, S: Session>(
161        &self,
162        authority: Authority,
163        actor_id: Self::ActorId,
164        activity: &Object,
165        remote_repo: Remote,
166        session: S,
167    ) -> Result<(), Self::Error>
168    where
169        Self::Error: From<Remote::Error>,
170    {
171        T::ingest(self, authority, actor_id, activity, remote_repo, session).await
172    }
173}
174
175#[async_trait::async_trait(?Send)]
176impl<'a, Object, T> Ingest<Object> for &'a mut T
177where
178    T: Ingest<Object>,
179    Object: 'static,
180{
181    type Local = T::Local;
182    type Error = T::Error;
183    type ActorId = T::ActorId;
184
185    fn local_repo(&self) -> &Self::Local {
186        T::local_repo(self)
187    }
188
189    fn is_local(&self, url: &Url) -> bool {
190        T::is_local(self, url)
191    }
192
193    async fn ingest<Remote: Repo, S: Session>(
194        &self,
195        authority: Authority,
196        actor_id: Self::ActorId,
197        activity: &Object,
198        remote_repo: Remote,
199        session: S,
200    ) -> Result<(), Self::Error>
201    where
202        Self::Error: From<Remote::Error>,
203    {
204        T::ingest(self, authority, actor_id, activity, remote_repo, session).await
205    }
206}
207
208#[async_trait::async_trait(?Send)]
209impl<Object, T> Ingest<Object> for Box<T>
210where
211    T: Ingest<Object>,
212    Object: 'static,
213{
214    type Local = T::Local;
215    type Error = T::Error;
216    type ActorId = T::ActorId;
217
218    fn local_repo(&self) -> &Self::Local {
219        T::local_repo(self)
220    }
221
222    fn is_local(&self, url: &Url) -> bool {
223        T::is_local(self, url)
224    }
225
226    async fn ingest<Remote: Repo, S: Session>(
227        &self,
228        authority: Authority,
229        actor_id: Self::ActorId,
230        activity: &Object,
231        remote_repo: Remote,
232        session: S,
233    ) -> Result<(), Self::Error>
234    where
235        Self::Error: From<Remote::Error>,
236    {
237        T::ingest(self, authority, actor_id, activity, remote_repo, session).await
238    }
239}
240
241#[async_trait::async_trait(?Send)]
242impl<Object, T> Ingest<Object> for Rc<T>
243where
244    T: Ingest<Object>,
245    Object: 'static,
246{
247    type Local = T::Local;
248    type Error = T::Error;
249    type ActorId = T::ActorId;
250
251    fn local_repo(&self) -> &Self::Local {
252        T::local_repo(self)
253    }
254
255    fn is_local(&self, url: &Url) -> bool {
256        T::is_local(self, url)
257    }
258
259    async fn ingest<Remote: Repo, S: Session>(
260        &self,
261        authority: Authority,
262        actor_id: Self::ActorId,
263        activity: &Object,
264        remote_repo: Remote,
265        session: S,
266    ) -> Result<(), Self::Error>
267    where
268        Self::Error: From<Remote::Error>,
269    {
270        T::ingest(self, authority, actor_id, activity, remote_repo, session).await
271    }
272}
273
274#[async_trait::async_trait(?Send)]
275impl<Object, T> Ingest<Object> for Arc<T>
276where
277    T: Ingest<Object>,
278    Object: 'static,
279{
280    type Local = T::Local;
281    type Error = T::Error;
282    type ActorId = T::ActorId;
283
284    fn local_repo(&self) -> &Self::Local {
285        T::local_repo(self)
286    }
287
288    fn is_local(&self, url: &Url) -> bool {
289        T::is_local(self, url)
290    }
291
292    async fn ingest<Remote: Repo, S: Session>(
293        &self,
294        authority: Authority,
295        actor_id: Self::ActorId,
296        activity: &Object,
297        remote_repo: Remote,
298        session: S,
299    ) -> Result<(), Self::Error>
300    where
301        Self::Error: From<Remote::Error>,
302    {
303        T::ingest(self, authority, actor_id, activity, remote_repo, session).await
304    }
305}