// upub_worker/dispatcher.rs
use reqwest::StatusCode;
use sea_orm::{ColumnTrait, EntityTrait, Order, QueryFilter, QueryOrder};

use upub::{model, traits::{fetch::RequestError, process::ProcessorError}, Context};
5
6#[derive(Debug, thiserror::Error)]
7pub enum JobError {
8 #[error("database error: {0:?}")]
9 Database(#[from] sea_orm::DbErr),
10
11 #[error("invalid payload json: {0:?}")]
12 Json(#[from] serde_json::Error),
13
14 #[error("malformed payload: {0}")]
15 Malformed(#[from] apb::FieldErr),
16
17 #[error("malformed job: missing payload")]
18 MissingPayload,
19
20 #[error("error processing activity: {0:?}")]
21 ProcessorError(#[from] upub::traits::process::ProcessorError),
22
23 #[error("error delivering activity: {0}")]
24 DeliveryError(#[from] upub::traits::fetch::RequestError),
25
26 #[error("creator is not authorized to carry out this job")]
27 Forbidden,
28}
29
30pub type JobResult<T> = Result<T, JobError>;
31
32#[allow(async_fn_in_trait)]
33pub trait JobDispatcher : Sized {
34 async fn poll(&self, filter: Option<model::job::JobType>) -> JobResult<Option<model::job::Model>>;
35 async fn lock(&self, job_internal: i64) -> JobResult<bool>;
36 async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option<model::job::JobType>, stop: impl crate::StopToken, wake: impl crate::WakeToken);
37}
38
39impl JobDispatcher for Context {
40 async fn poll(&self, filter: Option<model::job::JobType>) -> JobResult<Option<model::job::Model>> {
41 let mut s = model::job::Entity::find()
42 .filter(model::job::Column::NotBefore.lte(chrono::Utc::now()));
43
44 if let Some(t) = filter {
45 s = s.filter(model::job::Column::JobType.eq(t));
46 }
47
48 Ok(
49 s
50 .order_by(model::job::Column::NotBefore, Order::Asc)
51 .one(self.db())
52 .await?
53 )
54 }
55
56 async fn lock(&self, job_internal: i64) -> JobResult<bool> {
57 let res = model::job::Entity::delete(
58 model::job::ActiveModel {
59 internal: sea_orm::ActiveValue::Set(job_internal),
60 ..Default::default()
61 }
62 )
63 .exec(self.db())
64 .await?;
65
66 if res.rows_affected < 1 {
67 return Ok(false);
68 }
69
70 Ok(true)
71 }
72
73 async fn run(self, concurrency: usize, poll_interval: u64, job_filter: Option<model::job::JobType>, stop: impl crate::StopToken, mut wake: impl crate::WakeToken) {
74 macro_rules! restart {
75 (now) => { continue };
76 () => {
77 {
78 tokio::select! {
79 _ = tokio::time::sleep(std::time::Duration::from_secs(poll_interval)) => {},
80 _ = wake.wait() => {},
81 }
82 continue;
83 }
84 }
85 }
86
87 let mut pool = tokio::task::JoinSet::new();
88
89 loop {
90 if stop.stop() { break }
91
92 let job = match self.poll(job_filter).await {
93 Ok(Some(j)) => j,
94 Ok(None) => restart!(),
95 Err(e) => {
96 tracing::error!("error polling for jobs: {e}");
97 restart!()
98 },
99 };
100
101 match self.lock(job.internal).await {
102 Ok(true) => {},
103 Ok(false) => restart!(now),
104 Err(e) => {
105 tracing::error!("error locking job: {e}");
106 restart!()
107 },
108 }
109
110 if chrono::Utc::now() > job.published + chrono::Duration::days(self.cfg().security.job_expiration_days as i64) {
111 tracing::info!("dropping expired job {job:?}");
112 restart!(now);
113 }
114
115 if job.job_type != model::job::JobType::Delivery {
116 if let Ok(Some(_)) = model::activity::Entity::find_by_ap_id(&job.activity)
119 .one(self.db())
120 .await
121 {
122 tracing::info!("dropping already processed job '{}'", job.activity);
123 restart!(now);
124 }
125 }
126
127 let _ctx = self.clone();
128 pool.spawn(async move {
129 let res = match job.job_type {
130 model::job::JobType::Inbound => crate::inbound::process(_ctx.clone(), &job).await,
131 model::job::JobType::Outbound => crate::outbound::process(_ctx.clone(), &job).await,
132 model::job::JobType::Delivery => crate::delivery::process(_ctx.clone(), &job).await,
133 };
134
135 match res {
136 Ok(()) => tracing::debug!("job {} completed", job.activity),
137 Err(JobError::Json(x)) =>
138 tracing::error!("dropping job with invalid json payload: {x}"),
139 Err(JobError::MissingPayload) =>
140 tracing::warn!("dropping job without payload"),
141 Err(JobError::Malformed(f)) =>
142 tracing::error!("dropping job with malformed activity (missing field {f})"),
143 Err(JobError::ProcessorError(ProcessorError::AlreadyProcessed)) =>
144 tracing::info!("dropping job already processed: {}", job.activity),
145 Err(JobError::ProcessorError(ProcessorError::PullError(RequestError::Fetch(StatusCode::FORBIDDEN, e)))) =>
146 tracing::warn!("dropping job because requested resource is not accessible: {e}"),
147 Err(JobError::ProcessorError(ProcessorError::PullError(RequestError::Fetch(StatusCode::NOT_FOUND, e)))) =>
148 tracing::warn!("dropping job because requested resource is not available: {e}"),
149 Err(JobError::ProcessorError(ProcessorError::PullError(RequestError::Fetch(StatusCode::GONE, e)))) =>
150 tracing::warn!("dropping job because requested resource is no longer available: {e}"),
151 Err(JobError::ProcessorError(ProcessorError::PullError(RequestError::Malformed(f)))) =>
152 tracing::warn!("dropping job because requested resource could not be verified (fetch is invalid AP object: {f})"),
153 Err(e) => {
154 if let JobError::ProcessorError(ProcessorError::PullError(RequestError::Fetch(status, ref e))) = e {
155 if status.as_u16() == 447 {
159 tracing::warn!("dropping job with non-standard error {status} because requested resource is not available: {e}");
160 return;
161 }
162 }
163 tracing::error!("failed processing job '{}': {e}", job.activity);
164 let active = job.clone().repeat(Some(e.to_string()));
165 let mut count = 0;
166 loop {
167 match model::job::Entity::insert(active.clone()).exec(_ctx.db()).await {
168 Err(e) => tracing::error!("could not insert back job '{}': {e}", job.activity),
169 Ok(_) => break,
170 }
171 count += 1;
172 if count > _ctx.cfg().security.reinsertion_attempt_limit {
173 tracing::error!("reached job reinsertion limit, dropping {job:#?}");
174 break;
175 }
176 tokio::time::sleep(std::time::Duration::from_secs(poll_interval)).await;
177 }
178 }
179 }
180 });
181
182 while pool.len() >= concurrency {
183 if let Some(Err(e)) = pool.join_next().await {
184 tracing::error!("failed joining processing task: {e}");
185 }
186 }
187 }
188
189 while let Some(joined) = pool.join_next().await {
190 if let Err(e) = joined {
191 tracing::error!("failed joining process task: {e}");
192 }
193 }
194
195 }
196}