// upub_worker/dispatcher.rs

1use reqwest::StatusCode;
2use sea_orm::{ColumnTrait, EntityTrait, Order, QueryFilter, QueryOrder};
3
4use upub::{model, traits::{fetch::RequestError, process::ProcessorError}, Context};
5
6#[derive(Debug, thiserror::Error)]
7pub enum JobError {
8	#[error("database error: {0:?}")]
9	Database(#[from] sea_orm::DbErr),
10
11	#[error("invalid payload json: {0:?}")]
12	Json(#[from] serde_json::Error),
13
14	#[error("malformed payload: {0}")]
15	Malformed(#[from] apb::FieldErr),
16
17	#[error("malformed job: missing payload")]
18	MissingPayload,
19
20	#[error("error processing activity: {0:?}")]
21	ProcessorError(#[from] upub::traits::process::ProcessorError),
22
23	#[error("error delivering activity: {0}")]
24	DeliveryError(#[from] upub::traits::fetch::RequestError),
25
26	#[error("creator is not authorized to carry out this job")]
27	Forbidden,
28}
29
30pub type JobResult<T> = Result<T, JobError>;
31
32#[allow(async_fn_in_trait)]
33pub trait JobDispatcher : Sized {
34	async fn poll(&self, filter: Option<model::job::JobType>) -> JobResult<Option<model::job::Model>>;
35	async fn lock(&self, job_internal: i64) -> JobResult<bool>;
36	async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option<model::job::JobType>, stop: impl crate::StopToken, wake: impl crate::WakeToken);
37}
38
39impl JobDispatcher for Context {
40	async fn poll(&self, filter: Option<model::job::JobType>) -> JobResult<Option<model::job::Model>> {
41		let mut s = model::job::Entity::find()
42			.filter(model::job::Column::NotBefore.lte(chrono::Utc::now()));
43
44		if let Some(t) = filter {
45			s = s.filter(model::job::Column::JobType.eq(t));
46		}
47		
48		Ok(
49			s
50				.order_by(model::job::Column::NotBefore, Order::Asc)
51				.one(self.db())
52				.await?
53		)
54	}
55
56	async fn lock(&self, job_internal: i64) -> JobResult<bool> {
57		let res = model::job::Entity::delete(
58			model::job::ActiveModel {
59				internal: sea_orm::ActiveValue::Set(job_internal),
60				..Default::default()
61			}
62		)
63			.exec(self.db())
64			.await?;
65
66		if res.rows_affected < 1 {
67			return Ok(false);
68		}
69
70		Ok(true)
71	}
72
73	async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option<model::job::JobType>, stop: impl crate::StopToken, mut wake: impl crate::WakeToken) {
74		macro_rules! restart {
75			(now) => { continue };
76			() => {
77				{
78					tokio::select! {
79						_ = tokio::time::sleep(std::time::Duration::from_secs(poll_interval)) => {},
80						_ = wake.wait() => {},
81					}
82					continue;
83				}
84			}
85		}
86
87		let mut pool = tokio::task::JoinSet::new();
88	
89		loop {
90			if stop.stop() { break }
91
92			let job = match self.poll(job_filter).await {
93				Ok(Some(j)) => j,
94				Ok(None) => restart!(),
95				Err(e) => {
96					tracing::error!("error polling for jobs: {e}");
97					restart!()
98				},
99			};
100	
101			match self.lock(job.internal).await {
102				Ok(true) => {},
103				Ok(false) => restart!(now),
104				Err(e) => {
105					tracing::error!("error locking job: {e}");
106					restart!()
107				},
108			}
109	
110			if chrono::Utc::now() > job.published + chrono::Duration::days(self.cfg().security.job_expiration_days as i64) {
111				tracing::info!("dropping expired job {job:?}");
112				restart!(now);
113			}
114
115			if job.job_type != model::job::JobType::Delivery {
116				// delivery jobs are all pre-processed activities
117				// inbound/outbound jobs carry side effects which should only happen once
118				if let Ok(Some(_)) = model::activity::Entity::find_by_ap_id(&job.activity)
119					.one(self.db())
120					.await
121				{
122					tracing::info!("dropping already processed job '{}'", job.activity);
123					restart!(now);
124				}
125			}
126
127			let _ctx = self.clone();
128			pool.spawn(async move {
129				let res = match job.job_type {
130					model::job::JobType::Inbound => crate::inbound::process(_ctx.clone(), &job).await,
131					model::job::JobType::Outbound => crate::outbound::process(_ctx.clone(), &job).await,
132					model::job::JobType::Delivery => crate::delivery::process(_ctx.clone(), &job).await,
133				};
134
135				match res {
136					Ok(()) => tracing::debug!("job {} completed", job.activity),
137					Err(JobError::Json(x)) =>
138						tracing::error!("dropping job with invalid json payload: {x}"),
139					Err(JobError::MissingPayload) =>
140						tracing::warn!("dropping job without payload"),
141					Err(JobError::Malformed(f)) =>
142						tracing::error!("dropping job with malformed activity (missing field {f})"),
143					Err(JobError::ProcessorError(ProcessorError::AlreadyProcessed)) =>
144						tracing::info!("dropping job already processed: {}", job.activity),
145					Err(JobError::ProcessorError(ProcessorError::PullError(RequestError::Fetch(StatusCode::FORBIDDEN, e)))) => 
146						tracing::warn!("dropping job because requested resource is not accessible: {e}"),
147					Err(JobError::ProcessorError(ProcessorError::PullError(RequestError::Fetch(StatusCode::NOT_FOUND, e)))) => 
148						tracing::warn!("dropping job because requested resource is not available: {e}"),
149					Err(JobError::ProcessorError(ProcessorError::PullError(RequestError::Fetch(StatusCode::GONE, e)))) => 
150						tracing::warn!("dropping job because requested resource is no longer available: {e}"),
151					Err(JobError::ProcessorError(ProcessorError::PullError(RequestError::Malformed(f)))) => 
152						tracing::warn!("dropping job because requested resource could not be verified (fetch is invalid AP object: {f})"),
153					Err(e) => {
154						if let JobError::ProcessorError(ProcessorError::PullError(RequestError::Fetch(status, ref e))) = e {
155							// TODO maybe convert this in generic .is_client_error() check, but excluding 401s
156							//      and 400s because we want to retry those. also maybe 406s? idk theres a lot i
157							//      just want to drop lemmy.cafe jobs
158							if status.as_u16() == 447 {
159								tracing::warn!("dropping job with non-standard error {status} because requested resource is not available: {e}");
160								return;
161							}
162						}
163						tracing::error!("failed processing job '{}': {e}", job.activity);
164						let active = job.clone().repeat(Some(e.to_string()));
165						let mut count = 0;
166						loop {
167							match model::job::Entity::insert(active.clone()).exec(_ctx.db()).await {
168								Err(e) => tracing::error!("could not insert back job '{}': {e}", job.activity),
169								Ok(_) => break,
170							}
171							count += 1;
172							if count > _ctx.cfg().security.reinsertion_attempt_limit {
173								tracing::error!("reached job reinsertion limit, dropping {job:#?}");
174								break;
175							}
176							tokio::time::sleep(std::time::Duration::from_secs(poll_interval)).await;
177						}
178					}
179				}
180			});
181
182			while pool.len() >= concurrency {
183				if let Some(Err(e)) =  pool.join_next().await {
184					tracing::error!("failed joining processing task: {e}");
185				}
186			}
187		}
188
189		while let Some(joined) = pool.join_next().await {
190			if let Err(e) = joined {
191				tracing::error!("failed joining process task: {e}");
192			}
193		}
194
195	}
196}