xapi_rs/lrs/resources/
statement.rs

1// SPDX-License-Identifier: GPL-3.0-or-later
2
3#![allow(non_snake_case)]
4#![allow(clippy::too_many_arguments)]
5
6//! Statement Resource (/statements)
7//!
8//! Statements are the key data structure of xAPI. This resource facilitates
9//! their storage and retrieval.
10//!
11//! Any deviation from section [4.1.6.1 Statement Resource (/statements)][1] of
12//! the xAPI specification is a bug.
13//!
//! [1]: https://opensource.ieee.org/xapi/xapi-base-standard-documentation/-/blob/main/9274.1.1%20xAPI%20Base%20Standard%20for%20LRSs.md#4161-statement-resource-statements
15
16use crate::{
17    DataError, MyError, config,
18    data::{Actor, Attachment, Format, Statement, StatementIDs, statement_type::StatementType},
19    db::{
20        filter::{Filter, register_new_filter},
21        statement::{
22            find_more_statements, find_statement_by_uuid, find_statement_to_void,
23            find_statements_by_filter, insert_statement, statement_exists, void_statement,
24        },
25    },
26    emit_response, eval_preconditions,
27    lrs::{
28        DB, Signature, User, compute_etag,
29        headers::{CONSISTENT_THRU_HDR, CONTENT_TRANSFER_ENCODING_HDR, HASH_HDR, Headers},
30        resources::{WithETag, WithResource},
31        server::{get_consistent_thru, qp},
32    },
33};
34use base64::{Engine, prelude::BASE64_URL_SAFE_NO_PAD};
35use chrono::{DateTime, SecondsFormat, Utc};
36use mime::{APPLICATION_JSON, Mime};
37use openssl::sha::Sha256;
38use rocket::{
39    Request, Responder, State,
40    futures::{Stream, TryFutureExt},
41    get,
42    http::{ContentType, Header, Status, hyper::header},
43    post, put,
44    request::{FromRequest, Outcome},
45    response::stream::stream,
46    routes,
47    serde::json::Json,
48    tokio::{
49        fs::{DirBuilder, File},
50        io::{AsyncReadExt, AsyncWriteExt},
51    },
52};
53use rocket_multipart::{MultipartReadSection, MultipartReader, MultipartSection, MultipartStream};
54use serde::{Deserialize, de::DeserializeOwned};
55use serde_json::{Map, Value};
56use serde_with::serde_as;
57use sqlx::PgPool;
58use std::{collections::HashMap, path::PathBuf, str::FromStr};
59use tracing::{debug, error, info, warn};
60use uuid::Uuid;
61
/// A derived Rocket Responder structure returned by the `PUT` handlers. All
/// of its behavior is delegated to the wrapped [WithETag] value, which (as
/// its name suggests) is expected to carry an `Etag` header.
///
/// NOTE(review): this was previously described as responding w/ an OK Status
/// and a Statement body, while section 4.1.6.1 mandates `204 No Content` for
/// a successful `PUT`. Confirm which of the two [WithETag] actually emits.
#[derive(Responder)]
struct PutResponse {
    inner: WithETag,
}
68
/// A derived Rocket Responder structure returned by the `POST` handlers. It
/// wraps a [WithResource] whose payload is the [StatementIDs] of the stored
/// Statement(s), i.e. the array of Statement identifiers the specification
/// expects in the body of a `200 OK` Response.
#[derive(Responder)]
struct PostResponse {
    inner: WithResource<StatementIDs>,
}
75
/// A derived Rocket Responder structure w/ an OK Status, a body consisting
/// of the JSON serialization of the wrapped resource, and `Etag` and
/// `Last-Modified` headers (set by [WithResource]).
///
/// The wrapped type is [StatementType]: it is what both the single-Statement
/// and the multi-Statement `GET` paths produce.
#[derive(Responder)]
struct GetResponse {
    inner: WithResource<StatementType>,
}
83
/// General purpose Rocket Responder used by the `GET` handlers so they can
/// reply either w/ `application/json` contents (`JsonX`) or w/ a
/// `multipart/mixed` stream of sections (`Mixed`), depending on the input
/// query parameters.
///
/// `T` is the concrete stream type yielding the multipart sections. The JSON
/// variant is boxed, presumably to keep the enum's size small.
#[derive(Responder)]
enum EitherOr<T> {
    JsonX(Box<GetResponse>),
    Mixed(MultipartStream<T>),
}
92
93/// Construct a file-name from an Attachment hash signature. A file w/ that
94/// name will be created and stored under the `static` folder.
95fn sha2_path(sha2: &str) -> PathBuf {
96    let bytes = hex::decode(sha2).expect("Failed decoding signature");
97    let mut hasher = Sha256::new();
98    hasher.update(&bytes);
99    let signature = hasher.finish();
100    let name = BASE64_URL_SAFE_NO_PAD.encode(signature);
101    config().static_dir.join(format!("_{name}"))
102}
103
104/// Captures information about a potential Attachment w/in a multipart/mixed
105/// Request.
106#[derive(Debug, PartialEq)]
107struct InPartInfo {
108    path: PathBuf,
109    mime: Mime,
110    len: i64,
111    sha2: String,
112    unpopulated: bool,
113    signature: bool,
114}
115
116impl InPartInfo {
117    fn from(att: &Attachment) -> Self {
118        InPartInfo {
119            path: sha2_path(att.sha2()),
120            mime: att.content_type().clone(),
121            len: att.length(),
122            sha2: att.sha2().to_string(),
123            unpopulated: att.file_url().is_none(),
124            signature: att.is_signature(),
125        }
126    }
127}
128
/// One or more Statements kept as raw JSON Objects.
///
/// Thanks to `serde_with::OneOrMany` this deserializes from either a single
/// JSON Object or a JSON Array of Objects; both end up as a vector.
#[serde_as]
#[derive(Debug, Default, Deserialize)]
struct Statements(#[serde_as(as = "serde_with::OneOrMany<_>")] Vec<Map<String, Value>>);
133
134/// Query parameters of the GET end-point as a struct.
135#[derive(Debug, Default)]
136struct QueryParams<'a> {
137    statement_id: Option<&'a str>,
138    voided_statement_id: Option<&'a str>,
139    agent: Option<&'a str>,
140    verb: Option<&'a str>,
141    activity: Option<&'a str>,
142    registration: Option<&'a str>,
143    since: Option<&'a str>,
144    until: Option<&'a str>,
145    limit: Option<u32>,
146    related_activities: Option<bool>,
147    related_agents: Option<bool>,
148    attachments: Option<bool>,
149    ascending: Option<bool>,
150    format: Option<&'a str>,
151}
152
153#[rocket::async_trait]
154impl<'r> FromRequest<'r> for QueryParams<'r> {
155    type Error = ();
156
157    async fn from_request(req: &'r Request<'_>) -> Outcome<Self, Self::Error> {
158        let statement_id = qp::<&str>(req, "statementId");
159        let voided_statement_id = qp::<&str>(req, "voidedStatementId");
160        let agent = qp::<&str>(req, "agent");
161        let verb = qp::<&str>(req, "verb");
162        let activity = qp::<&str>(req, "activity");
163        let registration = qp::<&str>(req, "registration");
164        let since = qp::<&str>(req, "since");
165        let until = qp::<&str>(req, "until");
166
167        let limit = qp::<u32>(req, "limit");
168
169        let related_activities = qp::<bool>(req, "related_activities");
170        let related_agents = qp::<bool>(req, "related_agents");
171        let attachments = qp::<bool>(req, "attachments");
172        let ascending = qp::<bool>(req, "ascending");
173
174        let format = qp::<&str>(req, "format");
175
176        Outcome::Success(QueryParams {
177            statement_id,
178            voided_statement_id,
179            agent,
180            verb,
181            activity,
182            registration,
183            since,
184            until,
185            limit,
186            related_activities,
187            related_agents,
188            attachments,
189            ascending,
190            format,
191        })
192    }
193}
194
195/// Captures information about a potential Attachment to stream w/in a multipart/
196/// mixed Response.
197#[derive(Debug)]
198struct OutPartInfo {
199    /// Local file system path to object housing the Part's contents.
200    pub(crate) path: PathBuf,
201    /// The Part's `Content-Type` MIME.
202    pub(crate) content_type: ContentType,
203    /// The Part's `Content-Length` in bytes.
204    ///
205    /// IMPORTANT (rsn) 20240917 - This value may not reflect the actual size
206    /// (bytes count) of the contents. This is b/c when ingesting attachment's
207    /// data while parsing incoming requests a conformant LRS must match a Part
208    /// to an [Attachment] whose `length` property is different than the actual
209    /// size provided (a) the `sha2` hash match, and (b) there's no `Content-Length`
210    /// header, or (c) a `Content-Length` header is present w/ a value that
211    /// matches the one declared in [Attachment] whether it's equal or not to
212    /// the actual size. In other words, this value *is* the same that was
213    /// declared to be the value of the [Attachment] `length` field when the
214    /// owning [Statement] was previously persisted.
215    pub(crate) len: i64,
216    /// And finally the Part's SHA-2 hash string digest.
217    pub(crate) sha2: Option<String>,
218}
219
220impl OutPartInfo {
221    fn from(att: &Attachment) -> Option<Self> {
222        let path = sha2_path(att.sha2());
223        if !path.exists() {
224            None
225        } else {
226            Some(OutPartInfo {
227                path,
228                content_type: ContentType::from_str(att.content_type().as_ref())
229                    .expect("Failed finding MIME"),
230                len: att.length(),
231                sha2: Some(att.sha2().to_owned()),
232            })
233        }
234    }
235}
236
237#[doc(hidden)]
238pub fn routes() -> Vec<rocket::Route> {
239    routes![
240        put_mixed, put_json, post_mixed, post_json, __post, post_form, get_some, get_more
241    ]
242}
243
244/// From section 4.1.6.1 Statement Resource (/statements) [PUT Request][1]:
245///
246/// Summary: Stores a single Statement with the given id.
247/// Body: The Statement object to be stored.
248/// Returns: 204 No Content
249///
250/// * The LRS may respond before Statements that have been stored are available
251///   for retrieval.
252/// * An LRS shall not make any modifications to its state based on receiving a
253///   Statement with a statementId that it already has a Statement for. Whether
254///   it responds with 409 Conflict or 204 No Content, it shall not modify the
255///   Statement or any other Object.
256/// * If the LRS receives a Statement with an id it already has a Statement for,
257///   it should verify the received Statement matches the existing one and should
258///   return 409 Conflict if they do not match.
259///
260/// [1]: <https://opensource.ieee.org/xapi/xapi-base-standard-documentation/-/blob/main/9274.1.1%20xAPI%20Base%20Standard%20for%20LRSs.md#put-request>
261///
262#[put("/?<statementId>", data = "<data>", format = "multipart/mixed")]
263async fn put_mixed(
264    c: Headers,
265    statementId: &str,
266    data: MultipartReader<'_>,
267    db: &State<DB>,
268    user: User,
269) -> Result<PutResponse, MyError> {
270    debug!("----- put_mixed ----- {}", user);
271    user.can_use_xapi()?;
272
273    let uuid = Uuid::parse_str(statementId)
274        .map_err(|x| MyError::Data(DataError::UUID(x)).with_status(Status::BadRequest))?;
275    debug!("Statement UUID = {}", uuid);
276
277    // we use this here for a single Statement as w/ POST for multiple ones
278    // to locally store included attachments' data if any.
279    let mut statements = ingest_multipart(data, false).await?;
280
281    let statement = statements.iter_mut().next().unwrap();
282    if statement.id().is_none() {
283        statement.set_id(uuid)
284    } else if *statement.id().unwrap() != uuid {
285        return Err(MyError::HTTP {
286            status: Status::BadRequest,
287            info: "Statement ID in URL does not match one in body".into(),
288        });
289    }
290
291    return persist_one(db.pool(), c, statement, &user).await;
292}
293
294#[put("/?<statementId>", data = "<json>", format = "application/json")]
295async fn put_json(
296    c: Headers,
297    statementId: &str,
298    json: &str,
299    db: &State<DB>,
300    user: User,
301) -> Result<PutResponse, MyError> {
302    debug!("----- put_json ----- {}", user);
303    user.can_use_xapi()?;
304
305    let uuid = Uuid::parse_str(statementId)
306        .map_err(|x| MyError::Data(DataError::UUID(x)).with_status(Status::BadRequest))?;
307    debug!("statement UUID = {}", uuid);
308
309    let mut statement =
310        Statement::from_str(json).map_err(|x| MyError::Data(x).with_status(Status::BadRequest))?;
311
312    // NOTE (rsn) 202410004 /4.1.3 Content Types/ - When receiving a PUT or
313    // POST request with application/json content-type, an LRS shall respond
314    // w/ HTTP 400 Bad Request if, when present, Attachment objects in the
315    // Statement(s) do not have populated fileUrl property.
316    let mut count = 0;
317    for att in statement.attachments() {
318        if att.file_url().is_none() {
319            count += 1;
320        }
321    }
322    if count > 0 {
323        error!("Found {} Attachment(s) w/ unpopulated 'fileUrl'", count);
324        return Err(MyError::HTTP {
325            status: Status::BadRequest,
326            info: format!("Found {count} Attachment(s) w/ unpopulated 'fileUrl'").into(),
327        });
328    }
329
330    if statement.id().is_none() {
331        statement.set_id(uuid)
332    } else if *statement.id().unwrap() != uuid {
333        return Err(MyError::HTTP {
334            status: Status::BadRequest,
335            info: "Statement ID in URL does not match one in body".into(),
336        });
337    }
338
339    return persist_one(db.pool(), c, &mut statement, &user).await;
340}
341
342/// From section 4.1.6.1 Statement Resource (/statements) [POST Request][1]:
343///
344/// Summary: Stores a Statement, or a set of Statements.
345/// Body: An array of Statements or a single Statement to be stored.
346/// Returns: 200 OK, Array of Statement id(s) (UUID) in the same order as the
347/// corresponding stored Statements.
348///
349/// * The LRS may respond before Statements that have been stored are available
350///   for retrieval.
351/// * An LRS shall not make any modifications to its state based on receiving a
352///   Statement with an id that it already has a Statement for. Whether it
353///   responds with 409 Conflict or 204 No Content, it shall not modify the
354///   Statement or any other Object.
355/// * If the LRS receives a Statement with an id it already has a Statement for,
356///   it should verify the received Statement matches the existing one and should
357///   return 409 Conflict if they do not match.
358/// * If the LRS receives a batch of Statements containing two or more Statements
359///   with the same id, it shall reject the batch and return 400 Bad Request.
360///
361/// [1]: <https://opensource.ieee.org/xapi/xapi-base-standard-documentation/-/blob/main/9274.1.1%20xAPI%20Base%20Standard%20for%20LRSs.md#post-request>
362///
363#[post("/", data = "<data>", format = "multipart/mixed")]
364async fn post_mixed(
365    c: Headers,
366    data: MultipartReader<'_>,
367    db: &State<DB>,
368    user: User,
369) -> Result<PostResponse, MyError> {
370    debug!("----- post_mixed ----- {}", user);
371    user.can_use_xapi()?;
372
373    debug!("c = {:?}", c);
374    let statements = ingest_multipart(data, true).await?;
375
376    persist_many(db.pool(), c, statements, &user).await
377}
378
379#[post("/", data = "<json>", format = "application/json")]
380async fn post_json(
381    c: Headers,
382    json: Json<Statements>,
383    db: &State<DB>,
384    user: User,
385) -> Result<PostResponse, MyError> {
386    debug!("----- post_json ----- {}", user);
387    user.can_use_xapi()?;
388
389    debug!("c = {:?}", c);
390    let mut statements = vec![];
391    for map in json.0.0 {
392        let x = Statement::from_json_obj(map)
393            .map_err(|x| MyError::Data(x).with_status(Status::BadRequest))?;
394        statements.push(x)
395    }
396
397    // NOTE (rsn) 202410004 /4.1.3 Content Types/ - When receiving a PUT or
398    // POST request with application/json content-type, an LRS shall respond
399    // w/ HTTP 400 Bad Request if, when present, Attachment objects in the
400    // Statement(s) do not have populated fileUrl property.
401    let mut count = 0;
402    for s in &statements {
403        for att in s.attachments() {
404            if att.file_url().is_none() {
405                count += 1;
406            }
407        }
408    }
409    if count > 0 {
410        return Err(MyError::HTTP {
411            status: Status::BadRequest,
412            info: format!("Statement w/ {count} unresolved Attachment(s)").into(),
413        });
414    }
415
416    persist_many(db.pool(), c, statements, &user).await
417}
418
419// IMPORTANT (rsn) 20241111 - CTS runs show that requests w/ malformed CT headers
420// are sent to the LRS.  unfortunately however Rocket responds to those requests
421// w/ a 404 not 400 :(  this is a stop-gap to catch such requests...
422#[post("/", data = "<ignored>", rank = 1)]
423async fn __post(ignored: &str) -> Result<PostResponse, MyError> {
424    debug!("----- __post -----");
425    let _ = ignored;
426    Err(MyError::HTTP {
427        status: Status::BadRequest,
428        info: "Rocket-specific stopgap. Redirect 404 to 400".into(),
429    })
430}
431
432#[post("/", format = "multipart/form-data")]
433async fn post_form() -> Result<PostResponse, MyError> {
434    debug!("----- post_form -----");
435    Err(MyError::HTTP {
436        status: Status::BadRequest,
437        info: "Abort. xAPI V2 does not support multipart/form-data".into(),
438    })
439}
440
/// Names of every query string parameter the `GET` end-point accepts. The
/// handler rejects w/ 400 Bad Request any request whose query string holds a
/// parameter not listed here.
const VALID_GET_PARAMS: [&str; 14] = [
    "statementId",
    "voidedStatementId",
    "agent",
    "verb",
    "activity",
    "registration",
    "related_activities",
    "related_agents",
    "since",
    "until",
    "limit",
    "format",
    "attachments",
    "ascending",
];
457
458/// The Response implementation for this end-point is a bit complicated due to
459/// the possibility of returning either `application/json` or `multipart/mixed`
460/// content based on whether or not the `attachments` query parameter is set
461/// and if it is, if it's TRUE or FALSE. By default (i.e. when absent) it's
462/// set to FALSE and when that's the case the Response is `application/json`.
463/// When `attachments` is TRUE and there are no raw attachments to stream as
464/// part of the Response, the Response is also `application/json`. The Response
465/// is `multipart/mixed` iff `attachments` is TRUE **and** at least one raw
466/// Attachment is included in the Response.
467///
468#[get("/?<extras..>")]
469async fn get_some<'r>(
470    c: Headers,
471    q: QueryParams<'_>,
472    mut extras: HashMap<&'r str, &'r str>,
473    db: &State<DB>,
474    user: User,
475) -> Result<EitherOr<impl Stream<Item = MultipartSection<'static>> + use<>>, MyError> {
476    debug!("----- get_some ----- {}", user);
477    user.can_use_xapi()?;
478
479    debug!("q = {:?}", q);
480    // NOTE (rsn) 20241003 - `extras` will capture *all* query string parameters
481    // including those that are already captured as fields of `QueryParams`.
482    // we need to remove those to see if Clients sent us more than they should.
483    extras.retain(|k, _| !VALID_GET_PARAMS.contains(k));
484    debug!("extras = {:?}", extras);
485    if !extras.is_empty() {
486        return Err(MyError::HTTP {
487            status: Status::BadRequest,
488            info: format!("Received extraneous query string parameters: {extras:?}").into(),
489        });
490    }
491
492    // The LRS shall reject with a 400 Bad Request error any requests to this
493    // resource which contain both statementId and voidedStatementId parameters.
494    if let (Some(_), Some(_)) = (q.statement_id, q.voided_statement_id) {
495        return Err(MyError::HTTP {
496            status: Status::BadRequest,
497            info: "Either 'statementId' or 'voidedStatementId' should be present. Not both".into(),
498        });
499    }
500
501    let with_attachments = q.attachments.unwrap_or(false);
502    let format = Format::new(q.format.unwrap_or("exact"), c.languages().to_vec())
503        .map_err(|x| MyError::Data(x).with_status(Status::BadRequest))?;
504
505    let single = q.statement_id.is_some() || q.voided_statement_id.is_some();
506    let resource = if single {
507        // The LRS shall reject with a 400 Bad Request error any requests to
508        // this resource which contain statementId or voidedStatementId
509        // parameters, and also contain any other parameter besides
510        // "attachments" or "format".
511        if q.agent.is_some()
512            || q.verb.is_some()
513            || q.activity.is_some()
514            || q.registration.is_some()
515            || q.related_activities.is_some()
516            || q.related_agents.is_some()
517            || q.since.is_some()
518            || q.until.is_some()
519            || q.limit.is_some()
520            || q.ascending.is_some()
521        {
522            return Err(MyError::HTTP {
523                status: Status::BadRequest,
524                info:
525                    "Only 'attachments' and 'format' can be present when 1 Statement is requested"
526                        .into(),
527            });
528        }
529
530        let (voided, uuid) = if q.statement_id.is_some() {
531            (false, q.statement_id.unwrap())
532        } else {
533            (true, q.voided_statement_id.unwrap())
534        };
535
536        let uuid = Uuid::from_str(uuid)
537            .map_err(|x| MyError::Data(DataError::UUID(x)).with_status(Status::BadRequest))?;
538
539        get_one(db.pool(), uuid, voided, &format).await
540    } else {
541        let filter = Filter::from(
542            db.pool(),
543            q.agent,
544            q.verb,
545            q.activity,
546            q.registration,
547            q.related_activities,
548            q.related_agents,
549            q.since,
550            q.until,
551            q.limit,
552            q.ascending,
553        )
554        .await
555        .map_err(|x| x.with_status(Status::BadRequest))?;
556
557        get_many(db.pool(), filter, &format, with_attachments).await
558    };
559
560    let resource = resource?;
561    debug!("resource = {:?}", resource);
562    if !with_attachments {
563        let stored = resource.stored();
564        let x = emit_response!(c, resource => StatementType, stored)?;
565        Ok(EitherOr::JsonX(Box::new(GetResponse { inner: x })))
566    } else {
567        send_multipart(&resource).await
568    }
569}
570
571async fn send_multipart(
572    resource: &StatementType,
573) -> Result<EitherOr<impl Stream<Item = MultipartSection<'static>> + use<>>, MyError> {
574    let mut server_last_modified = get_consistent_thru().await;
575    let stored = resource.stored();
576    if stored > server_last_modified {
577        server_last_modified = stored
578    }
579
580    let first_part = save_statements(resource).await?;
581    let mut parts = vec![];
582    for att in resource.attachments() {
583        if let Some(y) = OutPartInfo::from(&att) {
584            parts.push(y);
585        }
586    }
587    Ok(EitherOr::Mixed(MultipartStream::new_random(stream! {
588        let ar = File::open(&first_part).await.expect("Failed re-opening");
589        yield MultipartSection::new(ar)
590            .add_header(ContentType::JSON)
591            .add_header(last_modified(stored))
592            .add_header(consistent_through(server_last_modified));
593        for p in parts {
594            let ar = File::open(p.path).await.expect("Failed re-opening");
595            yield MultipartSection::new(ar)
596                .add_header(p.content_type)
597                .add_header(Header::new(header::CONTENT_LENGTH.as_str(), p.len.to_string()))
598                .add_header(Header::new(HASH_HDR, p.sha2.unwrap()))
599        }
600    })))
601}
602
603#[get("/more?<sid>&<count>&<offset>&<limit>&<format>&<attachments>")]
604async fn get_more(
605    c: Headers,
606    sid: u64,
607    count: i32,
608    offset: i32,
609    limit: i32,
610    format: &str,
611    attachments: bool,
612    db: &State<DB>,
613    user: User,
614) -> Result<EitherOr<impl Stream<Item = MultipartSection<'static>> + use<>>, MyError> {
615    debug!("----- get_more ----- {}", user);
616    user.can_use_xapi()?;
617
618    debug!("c = {:?}", c);
619    debug!("sid = {}", sid);
620    debug!("count = {}", count);
621    debug!("offset = {}", offset);
622    debug!("limit = {}", limit);
623    debug!("format = {}", format);
624    debug!("attachments? {}", attachments);
625
626    let format = Format::new(format, c.languages().to_vec())
627        .map_err(|x| MyError::Data(x).with_status(Status::BadRequest))?;
628
629    let (mut resource, y) =
630        find_more_statements(db.pool(), sid, count, offset, limit, &format).await?;
631    if let Some(pi) = y {
632        let more = format!(
633            "statements/more/?sid={}&count={}&offset={}&limit={}&format={}&attachments={}",
634            sid,
635            pi.count,
636            pi.offset,
637            pi.limit,
638            format.as_param(),
639            attachments
640        );
641        let url = config().to_external_url(&more);
642        debug!("more URL = '{}'", url);
643        if let Err(z) = &resource.set_more(&url) {
644            warn!(
645                "Failed updating `more` URL of StatementResult. Ignore + continue but StatementResult will be inaccurate: {}",
646                z
647            );
648        }
649    }
650
651    if attachments {
652        send_multipart(&resource).await
653    } else {
654        let last_modified = get_consistent_thru().await;
655        let x = emit_response!(c, resource => StatementType, last_modified)?;
656        Ok(EitherOr::JsonX(Box::new(GetResponse { inner: x })))
657    }
658}
659
660/// In a multipart Request, check if the Part has `application/json` content-type,
661/// consume the part's contents into a byte array in memory, then try deserializing
662/// it from JSON into the given type `T`.
663async fn as_json<T: DeserializeOwned>(
664    part: &mut MultipartReadSection<'_, '_>,
665) -> Result<T, MyError> {
666    // check part has a Content-Type header w/ `application/json` value...
667    if let Some(ct) = part.headers().get_one("content-type") {
668        debug!("content-type: '{}'", ct);
669        let mime = ct
670            .parse::<Mime>()
671            .unwrap_or_else(|x| panic!("Failed parsing CT: {x}"));
672        if mime != APPLICATION_JSON {
673            let msg = format!("Expected 'application/json' CT; got '{ct}'");
674            error!("{}", msg);
675            return Err(MyError::Runtime(msg.into()));
676        }
677        // don't check the charset; assume it's UTF-8...
678    }
679
680    let mut buf = vec![];
681    part.read_to_end(&mut buf)
682        .await
683        .unwrap_or_else(|x| panic!("Failed consuming Part: {x}"));
684    serde_json::from_slice::<T>(&buf).map_err(|x| {
685        let msg = format!("Failed deserializing part: {x}");
686        error!("{}", msg);
687        MyError::Runtime(msg.into())
688    })
689}
690
691/// `data` - The MultipartReader stream,
/// `force_ids` - If TRUE then a Statement that already has an `id` keeps it
///     as is, while one w/o an `id` is assigned a new UUID value.  If this
///     parameter is FALSE then do not alter the Statement `id` whether it's set or not.
695async fn ingest_multipart(
696    mut data: MultipartReader<'_>,
697    force_ids: bool,
698) -> Result<Vec<Statement>, MyError> {
699    debug!("content-type: {}", data.content_type().0);
700    debug!("force_ids? {}", force_ids);
701
702    // Statement objects present in the 1st part
703    let mut statements = vec![];
704    // nbr. of Attachments in Statement(s)
705    let mut total = 0;
706    // nbr. of Attachments w/o fileUrl
707    let mut unpopulated = 0;
708    // nbr. of Attachments (both w/ and w/o fileUrl) matched to parts
709    let mut matched = 0;
710    // nbr. of Attachments (w/o fileUrl) matched to parts
711    let mut matched_unpopulated = 0;
712    // collection of 'InPartInfo' each representing a potential Attachment candidate
713    let mut included = vec![];
714    let mut ndx = 0;
715    while let Some(mut part) = data
716        .next()
717        .await
718        .unwrap_or_else(|x| panic!("Failed reading Part #{ndx}: {x}"))
719    {
720        if ndx == 0 {
721            // 1st part.  always one or more Statement...
722            let x = as_json::<Statements>(&mut part)
723                .map_err(|x| x.with_status(Status::BadRequest))
724                .await?;
725            for map in x.0 {
726                let y = Statement::from_json_obj(map)
727                    .map_err(|x| MyError::Data(x).with_status(Status::BadRequest))?;
728                statements.push(y)
729            }
730            // * When receiving a PUT or POST with a document type of
731            //   multipart/mixed, an LRS shall accept batches of
732            //   Statements which contain only Attachment Objects with
733            //   a populated fileUrl."
734            for s in &mut statements {
735                if s.id().is_none() && force_ids {
736                    s.set_id(Uuid::now_v7())
737                }
738                for att in s.attachments() {
739                    total += 1;
740                    if att.file_url().is_none() {
741                        unpopulated += 1
742                    }
743                    included.push(InPartInfo::from(att))
744                }
745            }
746        } else if total == 0 {
747            // * When receiving a PUT or POST with a document type of multipart/
748            //   mixed, an LRS shall reject batches of Statements having Attachments
749            //   that neither contain a fileUrl nor match a received Attachment
750            //   part based on their hash.
751            return Err(MyError::HTTP {
752                status: Status::BadRequest,
753                info: "This is the 2nd Part but we have no Attachments to match".into(),
754            });
755        } else {
756            // * shall include an X-Experience-API-Hash parameter in each part's
757            //   header after the first (Statements) part.
758            let hash = part.headers().get_one(HASH_HDR);
759            if hash.is_none() {
760                return Err(MyError::HTTP {
761                    status: Status::BadRequest,
762                    info: "Missing Hash header".into(),
763                });
764            }
765            let hash = hash.unwrap().to_owned();
766            debug!("-- x-experience-api-hash: '{}'", hash);
767
768            // * shall include a Content-Transfer-Encoding parameter with a value of
769            //   'binary' in each part's header after the first (Statements) part.
770            let cte = part.headers().get_one(CONTENT_TRANSFER_ENCODING_HDR);
771            if cte.is_none() {
772                return Err(MyError::HTTP {
773                    status: Status::BadRequest,
774                    info: "Missing CTE header".into(),
775                });
776            }
777            let enc = cte.unwrap().trim();
778            debug!("-- content-transfer-encoding: {}", enc);
779            if enc != "binary" {
780                return Err(MyError::HTTP {
781                    status: Status::BadRequest,
782                    info: format!("Expected 'binary' CTE but found '{enc}'").into(),
783                });
784            }
785
786            // size only enters into the equation if a Content-Length is present...
787            let mut buf = vec![];
788            let size = part
789                .read_to_end(&mut buf)
790                .await
791                .unwrap_or_else(|x| panic!("Failed consuming Part #{ndx}: {x}"));
792            debug!("size (actual) = {} (bytes)", size);
793            // convert it to i64 to make it easier when working w/ DB layer...
794            // TODO (rsn) 20240909 - this conversion must not fail.  to that end
795            // ensure that Rocket multipart limits accomodate usize::MAX and use
796            // an i128 data type for the Attachment.length property.
797            let size = i64::try_from(size).map_err(|x| {
798                MyError::Runtime(format!("Failed converting {size} to i64: {x}").into())
799            })?;
800
801            // does the part match any of our `included` items?
802            if let Some(ac) = included.iter_mut().find(|x| x.sha2 == hash) {
803                if ac.len != size {
804                    warn!(
805                        "Part #{} actual size ({}) doesn't match declared ({}) value",
806                        ndx, size, ac.len
807                    );
808                }
809
810                // if it has a content-length header, its value should also match
811                match part.headers().get_one(header::CONTENT_LENGTH.as_str()) {
812                    Some(x) => {
813                        match x.parse::<i64>() {
814                            Ok(cl) => {
815                                debug!("-- content-length: {}", cl);
816                                if ac.len != cl {
817                                    return Err(MyError::HTTP {
818                                    status: Status::BadRequest,
819                                    info: format!(
820                                        "Part #{ndx} CL ({cl}) doesn't match declared ({}) value", ac.len)
821                                    .into(),
822                                });
823                                }
824                            }
825                            Err(x) => {
826                                return Err(MyError::HTTP {
827                                    status: Status::BadRequest,
828                                    info: format!("Failed parsing Part #{ndx} CL: {x}").into(),
829                                });
830                            }
831                        }
832                    }
833                    None => info!("Part #{} has no CL", ndx),
834                }
835
836                // if it has a content-type header, its value should also match
837                match part.headers().get_one(header::CONTENT_TYPE.as_str()) {
838                    Some(x) => {
839                        match x.parse::<Mime>() {
840                            Ok(ct) => {
841                                debug!("-- content-type: {}", ct);
842                                if ac.mime != ct {
843                                    return Err(MyError::HTTP {
844                                    status: Status::BadRequest,
845                                    info: format!(
846                                        "Part #{ndx} CT ({ct}) doesn't match declared MIME ({})", ac.mime)
847                                    .into(),
848                                });
849                                }
850                            }
851                            Err(x) => {
852                                error!("Failed parsing Part #{} CT: {}", ndx, x);
853                                return Err(MyError::Data(DataError::MIME(x))
854                                    .with_status(Status::BadRequest));
855                            }
856                        }
857                    }
858                    None => info!("Part #{} has no CT", ndx),
859                }
860
861                // could be a real Attachment's binary or a JWS Signature...
862                if ac.signature {
863                    debug!("Found a JWS Signature!");
864                    let sig = Signature::from(buf).map_err(|x| {
865                        error!("Failed processing JWS signature part: {}", x);
866                        x.with_status(Status::BadRequest)
867                    })?;
868                    if statements.iter().any(|s| sig.verify(s)) {
869                        info!("Matched JWS Signature to its Statement");
870                        matched += 1;
871                        matched_unpopulated += 1;
872                    } else {
873                        return Err(MyError::HTTP {
874                            status: Status::BadRequest,
875                            info: "Failed matching any Statement to a JWS Signature".into(),
876                        });
877                    }
878                } else {
879                    debug!("Found an Attachment candidate!");
880                    save_attachment(buf, ac)
881                        .await
882                        .expect("Failed saving buffer");
883                    matched += 1;
884                    if ac.unpopulated {
885                        matched_unpopulated += 1
886                    }
887                }
888            } else {
889                return Err(MyError::HTTP {
890                    status: Status::BadRequest,
891                    info: format!("Part #{ndx} is not an attachment").into(),
892                });
893            }
894        }
895
896        ndx += 1;
897    }
898
899    ndx -= 1;
900    debug!("Total parts (minus Statement(s)) = {}", ndx);
901    debug!("Total Attachments = {}", total);
902    debug!("Total Attachments w/o 'fileUrl' = {}", unpopulated);
903    debug!("Total matched Attachments = {}", matched);
904    debug!(
905        "Total matched unpopulated Attachments = {}",
906        matched_unpopulated
907    );
908    let unmatched = ndx - matched;
909    debug!("Total unmatched parts = {}", unmatched);
910
911    // NOTE (rsn) 20241102 - [xAPI][1] under section 'Multipart/Mixed', sub-section
912    // 'LRS Requirements', states...
913    // * When receiving a PUT or POST with a document type of multipart/mixed,
914    // an LRS shall reject batches of Statements having Attachments that neither
915    // contain a `fileUrl` nor match a received _Attachment_ part based on their
916    // hash.
917    //
918    // [1]: https://opensource.ieee.org/xapi/xapi-base-standard-documentation/-/blob/main/9274.1.1%20xAPI%20Base%20Standard%20for%20LRSs.md#lrs-requirements
919    //
920    let problem = (unpopulated > 0) && (unpopulated != matched_unpopulated);
921    debug!("problem? {}", problem);
922    if problem {
923        return Err(MyError::HTTP {
924            status: Status::BadRequest,
925            info: "Houston, we have a problem".into(),
926        });
927    }
928
929    Ok(statements)
930}
931
932async fn persist_one(
933    conn: &PgPool,
934    c: Headers,
935    statement: &mut Statement,
936    user: &User,
937) -> Result<PutResponse, MyError> {
938    debug!("statement = {}", statement);
939
940    let uuid = statement.id().unwrap();
941    let x = statement_exists(conn, uuid).await?;
942    match x {
943        None => (),
944        Some(_fingerprint) => {
945            // we already have a statement w/ the same UUID; what we do next
946            // depends on the pre-conditions
947            if c.has_no_conditionals() {
948                return Err(MyError::HTTP {
949                    status: Status::Conflict,
950                    info: "Missing pre-condition(s)".into(),
951                });
952            } else {
953                // request contains pre-conditions, however we already found a
954                // statement w/ same UUID.
955                // IMPORTANT (rsn) 20240727 - there is a case where the existing
956                // Statement (with the same UUID) produces a different ETag than
957                // the one previously stored.
958                // for now, just note the fact but do nothing about it...
959                // return match compute_etag::<Statement>(statement) {
960                let etag = compute_etag::<Statement>(statement)?;
961                return match eval_preconditions!(&etag, c) {
962                    s if s != Status::Ok => Err(MyError::HTTP {
963                        status: s,
964                        info: "Failed pre-condition(s)".into(),
965                    }),
966                    _ => Ok(PutResponse {
967                        inner: WithETag {
968                            inner: Status::NoContent,
969                            etag: Header::new(header::ETAG.as_str(), etag.to_string()),
970                        },
971                    }),
972                };
973            }
974        }
975    }
976
977    // ensure `timestamp` is set... `stored` is set by the DB layer...
978    // NOTE (rsn) 20241104 - however, in "4.2.4.2 Specific Statement Data
979    // Requirements for an LRS", the spec also says "The LRS shall set the
980    // 'timestamp' property to the value of the 'stored' property if not
981    // provided."
982    // if statement.timestamp().is_none() {
983    //     statement.set_timestamp_unchecked(Utc::now());
984    // }
985
986    ensure_authority(statement, user)?;
987
988    // NOTE (rsn) 20240922 - need to check validity of target Statement (wrt.
989    // voiding) _before_ persisting it in the database...
990    let mut to_void_id = None;
991    if statement.is_verb_voided() {
992        if let Some(target_uuid) = statement.voided_target() {
993            // target Statement, if known, should not be a voiding one...
994            let (found, valid, id) = find_statement_to_void(conn, &target_uuid).await?;
995            if found {
996                if valid {
997                    to_void_id = Some(id)
998                } else {
999                    return Err(MyError::HTTP {
1000                        status: Status::BadRequest,
1001                        info: format!("Target of voiding statement ({target_uuid}) is invalid")
1002                            .into(),
1003                    });
1004                }
1005            }
1006        } else {
1007            return Err(MyError::HTTP {
1008                status: Status::BadRequest,
1009                info: format!("Invalid voiding statement {statement}").into(),
1010            });
1011        }
1012    }
1013
1014    insert_statement(conn, statement).await?;
1015
1016    // NOTE (rsn) 20240910 -if the Verb is 'voided' then void the target Statement...
1017    if let Some(id) = to_void_id {
1018        debug!("About to void Statement #{}", id);
1019        void_statement(conn, id).await?;
1020        info!("Voided Statement #{}", id)
1021    }
1022
1023    let etag = compute_etag::<Statement>(statement)?;
1024    match eval_preconditions!(&etag, c) {
1025        s if s != Status::Ok => Err(MyError::HTTP {
1026            status: s,
1027            info: "Failed pre-condition(s)".into(),
1028        }),
1029        _ => Ok(PutResponse {
1030            inner: WithETag {
1031                inner: Status::NoContent,
1032                etag: Header::new(header::ETAG.as_str(), etag.to_string()),
1033            },
1034        }),
1035    }
1036}
1037
1038/// xAPI requirements for POST Statements stipulate:
1039/// * An LRS shall not make any modifications to its state based on receiving
1040///   a Statement with an id that it already has a Statement for. Whether it
1041///   responds with 409 Conflict or 204 No Content, it shall not modify the
1042///   Statement or any other Object.
1043/// * If the LRS receives a Statement with an id it already has a Statement
1044///   for, it should verify the received Statement matches the existing one
1045///   and should return 409 Conflict if they do not match.
1046/// * If the LRS receives a batch of Statements containing two or more
1047///   Statements with the same id, it shall reject the batch and return 400
1048///   Bad Request.
1049///
1050async fn persist_many(
1051    conn: &PgPool,
1052    c: Headers,
1053    mut statements: Vec<Statement>,
1054    user: &User,
1055) -> Result<PostResponse, MyError> {
1056    debug!("statements = {:?}", statements);
1057
1058    // not every statement has a UUID; if it doesn't assign it one...
1059    // in the process, collect and verify that no 2 UUIDs are the same...
1060    let mut uuids = vec![];
1061    for s in &mut statements {
1062        let uuid = match s.id() {
1063            Some(x) => *x,
1064            None => {
1065                let id = Uuid::now_v7();
1066                s.set_id(id);
1067                id
1068            }
1069        };
1070        if uuids.contains(&uuid) {
1071            return Err(MyError::HTTP {
1072                status: Status::BadRequest,
1073                info: format!("Found Statements w/ same ID: {uuid}").into(),
1074            });
1075        }
1076
1077        uuids.push(uuid)
1078    }
1079    debug!("uuids (before) = {:?}", uuids);
1080    // at this point all Statements in `statements` have unique UUIDs; some
1081    // though may be _Equivalent_ to ones we already have in the DB. check +
1082    // remove the ones we already have Equivalents for
1083    let mut i = 0;
1084    while i < statements.len() {
1085        let s = &statements[i];
1086        let uuid = s.id().unwrap();
1087        let tmp = statement_exists(conn, uuid).await?;
1088        match tmp {
1089            None => i += 1,
1090            Some(x) => {
1091                // if fingerprints match, drop `s`; otherwise return Conflict
1092                let s_uid = s.uid();
1093                if s_uid != x {
1094                    return Err(MyError::HTTP {
1095                        status: Status::Conflict,
1096                        info: format!(
1097                            "Already have a Statement w/ same UUID ({uuid}) but different FP. Conflict")
1098                        .into(),
1099                    });
1100                }
1101                let dup = statements.remove(i);
1102                info!("Drop duplicate {}", dup);
1103            }
1104        }
1105    }
1106    // if we end-up w/ no Statements, return NoContent...
1107    if statements.is_empty() {
1108        return Err(MyError::HTTP {
1109            status: Status::NoContent,
1110            info: "No new Statements left".into(),
1111        });
1112    }
1113
1114    // at this point all statements have an UUID and a timestamp.  before
1115    // persisting them though we must validate them wrt. to voiding...
1116    let mut ids_to_void = vec![];
1117    for s in &statements {
1118        if s.is_verb_voided() {
1119            if let Some(target_uuid) = s.voided_target() {
1120                // target Statement, if known, should not be a voiding one...
1121                let (found, valid, id) = find_statement_to_void(conn, &target_uuid).await?;
1122                if found {
1123                    if valid {
1124                        ids_to_void.push(id)
1125                    } else {
1126                        return Err(MyError::HTTP {
1127                            status: Status::BadRequest,
1128                            info: format!("Target of voiding statement ({target_uuid}) is invalid")
1129                                .into(),
1130                        });
1131                    }
1132                }
1133            } else {
1134                return Err(MyError::HTTP {
1135                    status: Status::BadRequest,
1136                    info: format!("Invalid voiding statement {s}").into(),
1137                });
1138            }
1139        }
1140    }
1141    info!("Found {} Statement(s) to void", ids_to_void.len());
1142
1143    // otherwise, insert'em in the DB + collect their UUIDs...
1144    uuids.clear();
1145    let n = statements.len();
1146    for mut s in statements {
1147        let uuid = *s.id().unwrap();
1148
1149        // ensure `timestamp` is set... `stored` is set by the DB layer...
1150        // NOTE (rsn) 20241104 - however, in "4.2.4.2 Specific Statement Data
1151        // Requirements for an LRS", the spec also says "The LRS shall set the
1152        // 'timestamp' property to the value of the 'stored' property if not
1153        // provided."
1154        // if s.timestamp().is_none() {
1155        //     s.set_timestamp_unchecked(Utc::now());
1156        // }
1157
1158        ensure_authority(&mut s, user)?;
1159
1160        debug!("Persisting Statement #{} (1 of {})...", uuid, n);
1161        insert_statement(conn, &s).await?;
1162        uuids.push(uuid);
1163    }
1164
1165    // finally, void statements...
1166    for id in ids_to_void {
1167        debug!("About to void Statement #{}", id);
1168        void_statement(conn, id).await?;
1169        info!("Voided Statement #{}", id)
1170    }
1171
1172    // and return their UUIDs...
1173    let resource = StatementIDs(uuids);
1174    let inner = emit_response!(c, resource => StatementIDs)?;
1175    Ok(PostResponse { inner })
1176}
1177
1178/// Return a single Statement in the desired `Format` w/ or w/o the associated
1179/// Attachments.
1180///
1181/// If the result also contains Attachment(s) then the response will be of type
1182/// `multipart/mixed` otherwise it'll be `application/json`.
1183async fn get_one(
1184    conn: &PgPool,
1185    uuid: Uuid,
1186    voided: bool,
1187    format: &Format,
1188) -> Result<StatementType, MyError> {
1189    debug!("uuid = {}", uuid);
1190    debug!("voided? {}", voided);
1191    debug!("format = {}", format);
1192
1193    let x = find_statement_by_uuid(conn, uuid, voided, format).await?;
1194    match x {
1195        Some(x) => Ok(x),
1196        None => Err(MyError::HTTP {
1197            status: Status::NotFound,
1198            info: "Statement not found".into(),
1199        }),
1200    }
1201}
1202
1203async fn get_many(
1204    conn: &PgPool,
1205    filter: Filter,
1206    format: &Format,
1207    with_attachments: bool,
1208) -> Result<StatementType, MyError> {
1209    debug!("filter = {}", filter);
1210    debug!("format = {}", format);
1211
1212    let sid = register_new_filter(conn).await?;
1213    debug!("sid = {}", sid);
1214
1215    let (mut x, y) = find_statements_by_filter(conn, filter, format, sid).await?;
1216    if let Some(pi) = y {
1217        let more = format!(
1218            "statements/more/?sid={}&count={}&offset={}&limit={}&format={}&attachments={}",
1219            sid,
1220            pi.count,
1221            pi.offset,
1222            pi.limit,
1223            format.as_param(),
1224            with_attachments
1225        );
1226        let url = config().to_external_url(&more);
1227        debug!("more URL = '{}'", url);
1228        if let Err(z) = &x.set_more(&url) {
1229            warn!(
1230                "Failed updating `more` URL of StatementResult. Ignore + continue but StatementResult will be inaccurate: {}",
1231                z
1232            );
1233        }
1234    }
1235    Ok(x)
1236}
1237
1238/// Write the JSON serialized form of the given Statement array to a named local
1239/// file inside 'static/s' folder path rooted at this project's home dir.
1240/// Return the file's path if/when successful.
1241async fn save_statements(res: &StatementType) -> Result<PathBuf, MyError> {
1242    let name = &format!("_{}", BASE64_URL_SAFE_NO_PAD.encode(Uuid::now_v7()));
1243    // create the temp file in 'static' dir, under a folder named 's'...
1244    let path = config().static_dir.join("s").join(name);
1245    let parent = path.parent().unwrap();
1246    DirBuilder::new()
1247        .recursive(true)
1248        .create(parent)
1249        .map_err(MyError::IO)
1250        .await?;
1251
1252    let mut file = File::create(&path).map_err(MyError::IO).await?;
1253    let json = match res {
1254        StatementType::S(x) => serde_json::to_string(x).expect("Failed serializing S to temp file"),
1255        StatementType::SId(x) => {
1256            serde_json::to_string(x).expect("Failed serializing SId to temp file")
1257        }
1258        StatementType::SR(x) => {
1259            serde_json::to_string(x).expect("Failed serializing SR to temp file")
1260        }
1261        StatementType::SRId(x) => {
1262            serde_json::to_string(x).expect("Failed serializing SRId to temp file")
1263        }
1264    };
1265    file.write_all(json.as_bytes()).map_err(MyError::IO).await?;
1266    file.flush().map_err(MyError::IO).await?;
1267    Ok(path)
1268}
1269
1270/// Write the given byte array `buf`fer to a local file system at the given
1271/// `path`.
1272async fn save_attachment(bytes: Vec<u8>, part: &InPartInfo) -> Result<(), MyError> {
1273    let path = &part.path;
1274    let name = path.to_string_lossy();
1275
1276    // if the file already exists then return...
1277    if path.exists() {
1278        info!("Attachment {} already exists", name);
1279        return Ok(());
1280    }
1281
1282    let parent = path.parent().unwrap();
1283    DirBuilder::new()
1284        .recursive(true)
1285        .create(parent)
1286        .map_err(MyError::IO)
1287        .await?;
1288
1289    let mut file = File::create(path).map_err(MyError::IO).await?;
1290    file.write_all(&bytes).map_err(MyError::IO).await?;
1291    file.flush().map_err(MyError::IO).await?;
1292    Ok(())
1293}
1294
1295fn consistent_through(timestamp: DateTime<Utc>) -> Header<'static> {
1296    Header::new(
1297        CONSISTENT_THRU_HDR,
1298        timestamp.to_rfc3339_opts(SecondsFormat::Millis, true),
1299    )
1300}
1301
1302fn last_modified(timestamp: DateTime<Utc>) -> Header<'static> {
1303    Header::new(
1304        header::LAST_MODIFIED.as_str(),
1305        timestamp.to_rfc3339_opts(SecondsFormat::Millis, true),
1306    )
1307}
1308
1309fn ensure_authority(s: &mut Statement, user: &User) -> Result<(), MyError> {
1310    if s.authority().is_none() {
1311        user.can_authorize_statement()?;
1312
1313        s.set_authority_unchecked(Actor::Agent(user.authority()));
1314    }
1315
1316    Ok(())
1317}