tag2upload_service_manager/
webhook.rs

//! webhook handling (forge-agnostic)
//!
//! Processing happens as follows:
//!
//!  * HTTP POST data
//!    1. processed by Rocket route handler
//!    2. `FromData for RoutePayloadParameter<F>`
//!       * Checks the vhost.
//!       * Extracts the client IP address and checks it
//!         against the union of the forge ACLs.
//!       * Deserialises the JSON into serde_json::Value.
//!    3. Per-forge webhook implementation route calls
//!       `RoutePayloadParameter<F>::webhook_impl`.
//!    4. Global implementation `webhook_impl`, here:
//!       * Goes via `webhook_impl_logged` and
//!         `RawPayload.decode_analyse_validate_record`
//!         to structure the logging and error handling.
//!       * Calls the forge-specific prefilter.
//!       * Further deserialises the payload into
//!         the forge-specific data `WebhookForge::Payload`.
//!       * Calls `ForgeKind::analyse_payload` to get a
//!         `webhook::AnalysedPayloadData`.
//!       * Checks the host name and IP address against the specific ACL.
//!       * Parses the tag metadata.
//!       * Makes a `JobRow` and inserts it into the database.
26use crate::prelude::*;
27
28use rocket::data::{Data, FromData, Outcome};
29use rocket::request::Request;
30use rocket::serde::json::Json;
31
/// Used in `#[rocket::post]`
///
/// Rocket data guard for the webhook routes.  Its `FromData` impl
/// always succeeds: the result of the pre-checks and JSON parsing
/// is stored in `payload` and dealt with later, by `webhook_impl`.
pub struct RoutePayloadParameter<F> {
    /// Result of `RawPayload::from_data` for this request
    payload: Result<RawPayload, WebError>,
    /// Ties this value to the forge type `F` without storing one
    forge: PhantomData<F>,
}
37
/// Payload, JSON deserialised but not yet decoded by forge impl
///
/// Built by `RawPayload::from_data`, only after the vhost check and
/// the unified ACL check have both passed.
struct RawPayload {
    /// The client, as made from `Request::client_ip`
    client: ActualClient,
    /// Result of checking `client` against the unified webhook ACL
    // Never read; presumably kept as a token that the check was done -- TODO confirm
    #[allow(dead_code)]
    unified_acl_checked: IsAllowedClient,
    /// Result of the vhost check
    // Never read; presumably kept as a token that the check was done -- TODO confirm
    #[allow(dead_code)]
    vhost_checked: ui_vhost::IsWebhook,
    /// The POST body, parsed as JSON but not otherwise interpreted
    raw: serde_json::Value,
}
47
/// What a forge's `analyse_payload` gets out of a webhook payload
///
/// Consumed by `RawPayload::decode_analyse_validate_record`,
/// which validates these values and turns them into a job.
#[derive(Deftly, Debug)]
pub struct AnalysedPayload<F: ForgeReceiver> {
    /// Repository URL; `check_permission` extracts the forge host from it
    pub repo_git_url: String,
    /// Tag name; validated by `check_tag_name`
    pub tag_name: String,
    /// Object id of the tag; logged and stored in the job data
    pub tag_objectid: GitObjectId,
    /// Tag message, from which the tag metadata is parsed
    pub tag_message: String,
    /// Forge-specific data; stored in the job data via its `to_string()`
    pub forge_data: F::DbData,
}
56
/// Everything `check_permission` and `check_tag_name` need for one request
struct RequestBeingProcessed<'a, F: ForgeReceiver> {
    /// Global state; the configuration and the current time are read from it
    globals: &'a Globals,
    /// The forge implementation handling this request
    forge: &'a F,
    /// Result of `forge.analyse_payload`
    payload: AnalysedPayload<F>,
    /// The undecoded request; the client to check against the ACL comes from here
    raw: &'a RawPayload,
}
63
/// An error E that has been logged, iff it needed to be
///
/// `webhook_impl_logged` returns its errors wrapped in this;
/// `webhook_impl` unwraps them again before returning.
struct Logged<E>(E);
66
67impl Display for Logged<WebError> {
68    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
69        write!(f, "{}, {}", self.0.http_status(), self.0)
70    }
71}
72
73impl<'r> RawPayload {
74    /// First non-Rocket call site for a webhook POST request
75    ///
76    /// Here we try to handle the POST data, but only if we decide
77    /// that's appropriate.  We might just return `Err`.
78    async fn from_data(
79        req: &'r Request<'_>,
80        data: Data<'r>,
81    ) -> Result<Self, WE> {
82        use Outcome as O;
83        let gl = globals();
84
85        let vhost_checked = ui_vhost::IsWebhook::from_request(req).await
86            .map_err(|e| WE::PageNotFoundHere(e.into()))?;
87
88        let client = req.client_ip()
89            .ok_or_else(|| internal!("missing client IP addr"))?;
90        let client = ActualClient::new(client);
91
92        let unified_acl_checked: IsAllowedClient = client.allowed_by(
93            &gl.computed_config.unified_webhook_acl
94        ).await?;
95
96        // Now we know the caller is authorised, somehow,
97        // and we can reasonably safely process the POST data.
98
99        let raw = match Json::from_data(req, data).await {
100            O::Success(Json(raw)) => Ok(raw),
101            O::Error((s, e)) => Err(WebError::MisconfiguredWebhook(
102                anyhow!("body parsing failed ({s}): {e}")
103            )),
104            x @ O::Forward(_) => Err(
105                internal!("forwarded?! {x:?}").into()
106            ),
107        }?;
108
109        Ok(RawPayload { vhost_checked, client, unified_acl_checked, raw })
110    }
111}
112
113#[async_trait]
114impl<'r, F: ForgeKind> FromData<'r> for RoutePayloadParameter<F> {
115    /// Errors go into `RawWebhookPayload.payload`
116    ///
117    /// This is because Rocket throws the error away when generating
118    /// a response!
119    type Error = Void;
120    
121    async fn from_data(
122        req: &'r Request<'_>,
123        data: Data<'r>,
124    ) -> Outcome<'r, Self, Self::Error> {
125        use Outcome as O;
126
127        let payload = RawPayload::from_data(req, data).await;
128
129        match &payload {
130            Ok(y) => trace!(client=?y.client, raw=?y.raw, "webhook"),
131            Err(err) => trace!(client=?req.client_ip(), %err, "webhook"),
132        };
133
134        let forge = PhantomData;
135
136        O::Success(RoutePayloadParameter { payload, forge })
137    }
138}
139
140impl<F: ForgeReceiver> RoutePayloadParameter<F> {
141    /// Implementation of the webhook, main entrypoint
142    ///
143    /// All POST requests to the right path are handled here,
144    /// so this function is responsible for most error handling.
145    ///
146    /// Errors that are checked for *before* processing POST data
147    /// (including client IP addresses that aren't on any of our ACLs)
148    /// show up as `self.payload` being `Err`.
149    pub async fn webhook_impl(self) -> Result<String, WebError> {
150        self.webhook_impl_logged().await
151            .map_err(|Logged(e)| e)
152    }
153
154    /// Implementation of `webhook_impl`, with assurance of logging
155    async fn webhook_impl_logged(self) -> Result<String, Logged<WebError>> {
156        let raw = self.payload
157            .map_err(|e| {
158                let e = Logged(e);
159                debug!("rejected early: {e}");
160                e
161            })?;
162
163        let mut log_info = format!("from {}", &raw.client);
164        let forge = F::default();
165
166        forge.prefilter_payload(&raw.raw)
167            .map_err(|e| {
168                let e = Logged(e);
169                debug!("filtered: {log_info}: {e}");
170                e
171            })?;
172
173        raw.decode_analyse_validate_record(&forge, &mut log_info).await
174            .map_err(|e| {
175                let e = Logged(e);
176                write_string!(log_info, ": {e}");
177                match &e.0 {
178                    WE::Throttled { .. } |
179                    WE::DisallowedClient { .. } |
180                    WE::PageNotFoundHere { .. } |
181                    // reported as Error when we generated it
182                    WE::InternalError { .. } => 
183                        info!("rejected: {log_info}"),
184                    WE::MisconfiguredWebhook { .. } |
185                    WE::NotForUs { .. } => {
186 // info_debug_with_raw! from decode_analyse_validate_record is not suitable
187 // here, because here we have no parameters so the message text must be
188 // different.  Anyway, here the repetition is much less.
189                        info!("ignored: {log_info}");
190                        debug!("ignored: {log_info}; raw={:?}", &raw.raw);
191                    }
192                }
193                e
194            })
195    }
196}
197
198impl RawPayload {
199    /// Core of webhook processing, after pre-checks have been done
200    ///
201    /// The caller will log errors thrown from here.
202    async fn decode_analyse_validate_record<F: ForgeReceiver>(
203        &self,
204        forge: &F,
205        log_info: &mut String,
206    ) -> Result<String, WebError> {
207        let payload = Deserialize::deserialize(&self.raw)
208            .context("JSON payload does not conform to expected schema")
209            .map_err(WE::MisconfiguredWebhook)?;
210
211        let payload = forge.analyse_payload(payload)?;
212
213        write_string!(log_info,
214                      ", url={} tag={} objectid={}",
215                      payload.repo_git_url,
216                      payload.tag_name,
217                      payload.tag_objectid);
218
219        let req = RequestBeingProcessed {
220            globals: &globals(),
221            payload,
222            forge,
223            raw: self,
224        };
225
226        let forge_host = req.check_permission().await?;
227
228        // precheck, so we don't do a bunch of work if we are throttled
229        db_transaction(TN::Readonly, |dbt| check_not_throttled(dbt))??;
230
231        let forge_namever = forge.namever().to_owned().into();
232
233        req.check_tag_name()?;
234
235        let AnalysedPayload {
236            repo_git_url,
237            tag_name,
238            tag_objectid,
239            forge_data,
240            tag_message,
241        } = req.payload;
242
243        let tag_meta = t2umeta::Parsed::from_tag_message(&tag_message)?;
244
245        *log_info = format!("source={}, {log_info}", tag_meta.source);
246
247        let validated_data = JobData {
248            repo_git_url,
249            tag_objectid,
250            tag_name,
251            forge_host,
252            forge_namever,
253            forge_data: ForgeData::from_raw_string(forge_data.to_string()),
254            tag_meta,
255        };
256
257        let now = req.globals.now();
258
259        let initial_state = JobState {
260            last_update: now,
261            status: JobStatus::Noticed,
262            info: format!("job received, tag not yet fetched"),
263            processing: None.into(),
264            duplicate_of: None,
265            retry_salient_count: 0,
266        };
267
268        let job_row = JobRow {
269            jid: JobId::none(),
270            data: validated_data,
271            received: now,
272            retry_earliest: TreatZeroAsNone::none(),
273            tag_data: None.into(),
274            s: initial_state,
275        };
276
277        let jid = db_transaction(TN::Update { 
278            this_jid: None,
279            tag_objectid: &job_row.data.tag_objectid,
280        }, |dbt| {
281            match check_not_throttled(dbt)? {
282                Ok(()) => {}
283                Err(throttled) => return Ok(Err(throttled)),
284            }
285
286            let jid = dbt.bsql_insert(&bsql!(
287                "INSERT INTO jobs " .insert_row(&job_row) ""
288            ))?;
289
290            Ok(Ok(jid))
291        })??;
292
293        let msg = format!("job received, jid={jid}");
294
295        macro_rules! info_debug_with_raw { {
296            [ $($sval:tt)* ], $raw:expr, $fmt:literal $($rest:tt)*
297        } => {
298            info !($($sval)*,            $fmt $($rest)*);
299            debug!($($sval)*, raw=?$raw, $fmt $($rest)*);
300        } }
301
302        info_debug_with_raw!(
303            [ jid=%jid, now=?job_row.s.status, info=?job_row.s.info,
304              event=%format_args!(" {log_info}") ],
305                self.raw,
306            "[{}] received", job_row.data.forge_host
307        );
308
309        Ok(msg)
310    }
311}
312
313impl<'a, F: ForgeReceiver> RequestBeingProcessed<'a, F> {
314    async fn check_permission(&self) -> Result<Hostname, WE> {
315        let forge_host = (|| {
316            let rhs = if let Some(fake) = &self.globals.config
317                .testing.fake_https_dir
318            {
319                let strip = format!("file://{fake}/");
320                self.payload.repo_git_url
321                    .strip_prefix(&strip)
322                    .ok_or_else(|| anyhow!(
323                        "failed to strip expected faked {strip:?} from {:?}",
324                        self.payload.repo_git_url
325                    ))?
326            } else {
327                self.payload.repo_git_url
328                    .strip_prefix("https://")
329                    .ok_or_else(|| anyhow!("scheme not https"))?
330            };
331            let (host, rhs) = rhs.split_once('/')
332                .ok_or_else(|| anyhow!("missing / after host"))?;
333
334            rhs.chars().all(|c| c.is_ascii_graphic()).then_some(())
335                .ok_or_else(|| anyhow!("nonprintable characters in url"))?;
336
337            let host: Hostname = host.parse()?;
338
339            Ok::<_, AE>(host)
340        })()
341            .context("bad project repository URL")
342            .map_err(WE::MisconfiguredWebhook)?;
343
344        let correct_host_forges = self.globals.config.t2u.forges.iter()
345            .filter(|cf| cf.host == forge_host);
346
347        let check_kind = |cf: &config::Forge| {
348            (cf.kind == self.forge.kind_name()).then(|| ())
349                .ok_or_else(|| anyhow!(
350                    "wrong webhook path used, expected /hook/{}",
351                    cf.kind,
352                ))
353        };
354
355        let forge: &config::Forge =
356            correct_host_forges.clone()
357            .find(|cf| check_kind(cf).is_ok())
358            .ok_or_else(|| {
359                let mut emsg = format!("no matching forge in config");
360                for cf in correct_host_forges.clone() {
361                    let wrong = check_kind(cf).expect_err("suddenly good?");
362                    write!(emsg, "; forge host {:?}: {wrong}", cf.host)
363                        .expect("write");
364                }
365                if correct_host_forges.clone().next().is_none() {
366                    write!(emsg, "; no matching forge hosts")
367                        .expect("write");
368                }
369                anyhow!("{}", emsg)
370            })
371            // TODO this is perhaps anthe actual permission denied variant,
372            // log them at lower severity ?
373            .map_err(WE::MisconfiguredWebhook)?;
374
375        let _: IsAllowedClient = self.raw.client.allowed_by(&forge.allow)
376            // TODO these are the actual permission denied variants,
377            // log them at lower severity ?
378            .await?;
379
380        Ok(forge.host.clone())
381    }
382
383    fn check_tag_name(&self) -> Result<(), NotForUsReason> {
384        let app_config = &self.globals.config.t2u;
385
386        let (distro, version) = self.payload.tag_name.split('/')
387            .collect_tuple()
388            .ok_or_else(|| NFR::TagNameUnexpectedSyntax)?;
389        (distro == app_config.distro).then(||())
390            .ok_or_else(|| NFR::TagNameNotOurDistro)?;
391        if !version.chars().all(
392            |c| c.is_ascii_alphanumeric() || ".+-%_#".chars().contains(&c)
393        ) {
394            return Err(NFR::TagNameUnexpectedSyntax)
395        }
396        if version == "." || version == ".." {
397            return Err(NFR::TagNameUnexpectedSyntax)
398        }
399        Ok(())
400    }
401}