#![allow(non_snake_case)]
#![allow(clippy::too_many_arguments)]
use crate::{
MyError, config,
db::{
filter::{Filter, register_new_filter},
statement::{
find_more_statements, find_statement_by_uuid, find_statement_to_void,
find_statements_by_filter, insert_statement, statement_exists, void_statement,
},
},
emit_response, eval_preconditions,
lrs::{
DB, Signature, User, compute_etag,
headers::{CONSISTENT_THRU_HDR, CONTENT_TRANSFER_ENCODING_HDR, HASH_HDR, Headers},
resources::{WithETag, WithResource},
server::{get_consistent_thru, qp},
},
};
use base64::{Engine, prelude::BASE64_URL_SAFE_NO_PAD};
use chrono::{DateTime, SecondsFormat, Utc};
use mime::{APPLICATION_JSON, Mime};
use openssl::sha::Sha256;
use rocket::{
Request, Responder, State,
futures::{Stream, TryFutureExt},
get,
http::{ContentType, Header, Status, hyper::header},
post, put,
request::{FromRequest, Outcome},
response::stream::stream,
routes,
serde::json::Json,
tokio::{
fs::{DirBuilder, File},
io::{AsyncReadExt, AsyncWriteExt},
},
};
use rocket_multipart::{MultipartReadSection, MultipartReader, MultipartSection, MultipartStream};
use serde::{Deserialize, de::DeserializeOwned};
use serde_json::{Map, Value};
use serde_with::serde_as;
use sqlx::PgPool;
use std::{collections::HashMap, path::PathBuf, str::FromStr};
use tracing::{debug, error, info, warn};
use uuid::Uuid;
use xapi_data::{Actor, Attachment, DataError, Format, Statement, StatementIDs, StatementType};
/// Responder returned by the PUT handlers in this file; it wraps a [`WithETag`].
#[derive(Responder)]
struct PutResponse {
inner: WithETag,
}
/// Responder returned by the POST handlers in this file; it wraps a
/// [`WithResource`] carrying the [`StatementIDs`] of the persisted Statements.
#[derive(Responder)]
struct PostResponse {
inner: WithResource<StatementIDs>,
}
/// Responder used for the JSON (non-multipart) flavour of the GET handlers; it
/// wraps a [`WithResource`] carrying a [`StatementType`].
#[derive(Responder)]
struct GetResponse {
inner: WithResource<StatementType>,
}
/// Responder for the GET handlers, which answer with one of two shapes.
// `JsonX` carries a boxed `GetResponse`; `Mixed` carries a `MultipartStream`
// (built by `send_multipart` when attachments are requested).
#[derive(Responder)]
enum EitherOr<T> {
JsonX(Box<GetResponse>),
Mixed(MultipartStream<T>),
}
/// Map an Attachment's hexadecimal `sha2` value to the path of its file under the
/// configured static directory.
///
/// The hex string is decoded to raw bytes, those bytes are hashed with SHA-256 and
/// the URL-safe, unpadded Base64 form of that digest, prefixed with `_`, is used as
/// the file name.
///
/// # Panics
///
/// Panics if `sha2` is not a valid hexadecimal string.
fn sha2_path(sha2: &str) -> PathBuf {
    let raw = hex::decode(sha2).expect("Failed decoding signature");
    let mut digest = Sha256::new();
    digest.update(&raw);
    let file_name = format!("_{}", BASE64_URL_SAFE_NO_PAD.encode(digest.finish()));
    config().static_dir.join(file_name)
}
/// What we need to know about an Attachment declared in an incoming Statement in
/// order to match it against a multipart Part (see `ingest_multipart`).
#[derive(Debug, PartialEq)]
struct InPartInfo {
/// Where the Attachment's bytes are stored; computed by `sha2_path`.
path: PathBuf,
/// The Attachment's declared content type.
mime: Mime,
/// The Attachment's declared length.
len: i64,
/// The Attachment's declared `sha2` value.
sha2: String,
/// `true` when the Attachment has no file URL.
unpopulated: bool,
/// `true` when the Attachment reports itself as a signature.
signature: bool,
}
impl InPartInfo {
    /// Capture the properties of `att` that `ingest_multipart` later compares against
    /// the headers and payload of a multipart Part.
    fn from(att: &Attachment) -> Self {
        let sha2 = att.sha2();
        Self {
            path: sha2_path(sha2),
            mime: att.content_type().clone(),
            len: att.length(),
            sha2: sha2.to_string(),
            unpopulated: att.file_url().is_none(),
            signature: att.is_signature(),
        }
    }
}
/// Raw JSON objects of the Statement(s) found in a request body.
///
/// Deserialized through `serde_with::OneOrMany`, so the body may be either a single
/// JSON object or an array of them. Each object is turned into a [`Statement`]
/// later with `Statement::from_json_obj`.
#[serde_as]
#[derive(Debug, Default, Deserialize)]
struct Statements(#[serde_as(as = "serde_with::OneOrMany<_>")] Vec<Map<String, Value>>);
/// Query-string parameters recognised by the GET handler (`get_some`).
///
/// Every field is optional; it is populated by the `FromRequest` implementation
/// from the query parameter named in the field's comment.
#[derive(Debug, Default)]
struct QueryParams<'a> {
/// `statementId`
statement_id: Option<&'a str>,
/// `voidedStatementId`
voided_statement_id: Option<&'a str>,
/// `agent`
agent: Option<&'a str>,
/// `verb`
verb: Option<&'a str>,
/// `activity`
activity: Option<&'a str>,
/// `registration`
registration: Option<&'a str>,
/// `since`
since: Option<&'a str>,
/// `until`
until: Option<&'a str>,
/// `limit`
limit: Option<u32>,
/// `related_activities`
related_activities: Option<bool>,
/// `related_agents`
related_agents: Option<bool>,
/// `attachments`
attachments: Option<bool>,
/// `ascending`
ascending: Option<bool>,
/// `format`
format: Option<&'a str>,
}
#[rocket::async_trait]
impl<'r> FromRequest<'r> for QueryParams<'r> {
    type Error = ();

    /// Build a [`QueryParams`] by looking up each known parameter with `qp`.
    ///
    /// This guard always yields `Outcome::Success`; a parameter for which `qp`
    /// returns `None` is simply left unset.
    async fn from_request(req: &'r Request<'_>) -> Outcome<Self, Self::Error> {
        // Struct fields are evaluated in the order written, which matches the
        // order the parameters are looked up.
        let params = QueryParams {
            statement_id: qp::<&str>(req, "statementId"),
            voided_statement_id: qp::<&str>(req, "voidedStatementId"),
            agent: qp::<&str>(req, "agent"),
            verb: qp::<&str>(req, "verb"),
            activity: qp::<&str>(req, "activity"),
            registration: qp::<&str>(req, "registration"),
            since: qp::<&str>(req, "since"),
            until: qp::<&str>(req, "until"),
            limit: qp::<u32>(req, "limit"),
            related_activities: qp::<bool>(req, "related_activities"),
            related_agents: qp::<bool>(req, "related_agents"),
            attachments: qp::<bool>(req, "attachments"),
            ascending: qp::<bool>(req, "ascending"),
            format: qp::<&str>(req, "format"),
        };
        Outcome::Success(params)
    }
}
/// What we need to know about a stored Attachment in order to emit it as a Part of
/// a multipart response (see `send_multipart`).
#[derive(Debug)]
struct OutPartInfo {
/// Location of the Attachment's file; computed by `sha2_path`.
pub(crate) path: PathBuf,
/// Value of the Part's content-type header.
pub(crate) content_type: ContentType,
/// The Attachment's declared length; sent as the Part's content-length.
pub(crate) len: i64,
/// The Attachment's `sha2` value; sent in the Part's hash header.
pub(crate) sha2: Option<String>,
}
impl OutPartInfo {
    /// Describe `att` as an outgoing Part, or return `None` when no file exists at
    /// the path computed for it by `sha2_path`.
    ///
    /// # Panics
    ///
    /// Panics if the Attachment's content type cannot be parsed as a Rocket
    /// [`ContentType`].
    fn from(att: &Attachment) -> Option<Self> {
        let path = sha2_path(att.sha2());
        if !path.exists() {
            return None;
        }
        let content_type =
            ContentType::from_str(att.content_type().as_ref()).expect("Failed finding MIME");
        Some(Self {
            path,
            content_type,
            len: att.length(),
            sha2: Some(att.sha2().to_owned()),
        })
    }
}
/// Return the Rocket routes for every request handler defined in this file.
#[doc(hidden)]
pub fn routes() -> Vec<rocket::Route> {
routes![
put_mixed, put_json, post_mixed, post_json, __post, post_form, get_some, get_more
]
}
/// Handle a PUT request whose body is `multipart/mixed`.
///
/// The `statementId` query parameter must parse as a UUID. The multipart body is
/// ingested with `ingest_multipart` and only its first Statement is used. If that
/// Statement has no ID it is given the one from the URL; if it has a different one
/// the request is rejected. The Statement is then handed to `persist_one`.
///
/// # Errors
///
/// * whatever `user.can_use_xapi()` reports,
/// * `400 Bad Request` if `statementId` is not a UUID, if the body contains no
///   Statement, or if the URL and body IDs differ,
/// * any error raised by `ingest_multipart` or `persist_one`.
#[put("/?<statementId>", data = "<data>", format = "multipart/mixed")]
async fn put_mixed(
    c: Headers,
    statementId: &str,
    data: MultipartReader<'_>,
    db: &State<DB>,
    user: User,
) -> Result<PutResponse, MyError> {
    debug!("----- put_mixed ----- {}", user);
    user.can_use_xapi()?;
    let uuid = Uuid::parse_str(statementId)
        .map_err(|x| MyError::Data(DataError::UUID(x)).with_status(Status::BadRequest))?;
    debug!("Statement UUID = {}", uuid);
    let mut statements = ingest_multipart(data, false).await?;
    // The body is client-controlled and may legitimately parse to zero Statements
    // (e.g. an empty JSON array); answer with a 400 instead of panicking.
    let Some(statement) = statements.first_mut() else {
        return Err(MyError::HTTP {
            status: Status::BadRequest,
            info: "Missing Statement in request body".into(),
        });
    };
    match statement.id().copied() {
        None => statement.set_id(uuid),
        Some(id) if id != uuid => {
            return Err(MyError::HTTP {
                status: Status::BadRequest,
                info: "Statement ID in URL does not match one in body".into(),
            });
        }
        Some(_) => (),
    }
    persist_one(db.pool(), c, statement, &user).await
}
/// Handle a PUT request whose body is `application/json`.
///
/// The `statementId` query parameter must parse as a UUID and the body must parse
/// as a single [`Statement`]. Because a JSON-only request cannot carry attachment
/// payloads, every Attachment must have a file URL. If the Statement has no ID it is
/// given the one from the URL; if it has a different one the request is rejected.
/// The Statement is then handed to `persist_one`.
///
/// # Errors
///
/// * whatever `user.can_use_xapi()` reports,
/// * `400 Bad Request` for an invalid `statementId`, an unparsable body, any
///   Attachment without a file URL, or mismatching IDs,
/// * any error raised by `persist_one`.
#[put("/?<statementId>", data = "<json>", format = "application/json")]
async fn put_json(
    c: Headers,
    statementId: &str,
    json: &str,
    db: &State<DB>,
    user: User,
) -> Result<PutResponse, MyError> {
    debug!("----- put_json ----- {}", user);
    user.can_use_xapi()?;
    let uuid = Uuid::parse_str(statementId)
        .map_err(|x| MyError::Data(DataError::UUID(x)).with_status(Status::BadRequest))?;
    debug!("statement UUID = {}", uuid);
    let mut statement =
        Statement::from_str(json).map_err(|x| MyError::Data(x).with_status(Status::BadRequest))?;

    // Number of Attachments that would need a payload we cannot receive here.
    let count = statement
        .attachments()
        .into_iter()
        .filter(|att| att.file_url().is_none())
        .count();
    if count > 0 {
        error!("Found {} Attachment(s) w/ unpopulated 'fileUrl'", count);
        return Err(MyError::HTTP {
            status: Status::BadRequest,
            info: format!("Found {count} Attachment(s) w/ unpopulated 'fileUrl'").into(),
        });
    }

    match statement.id().copied() {
        None => statement.set_id(uuid),
        Some(id) if id != uuid => {
            return Err(MyError::HTTP {
                status: Status::BadRequest,
                info: "Statement ID in URL does not match one in body".into(),
            });
        }
        Some(_) => (),
    }
    persist_one(db.pool(), c, &mut statement, &user).await
}
/// Handle a POST request whose body is `multipart/mixed`.
///
/// The body is ingested with `ingest_multipart` (asking it to assign IDs to
/// Statements that lack one) and the resulting Statements are handed to
/// `persist_many`.
///
/// # Errors
///
/// Whatever `user.can_use_xapi()`, `ingest_multipart` or `persist_many` report.
#[post("/", data = "<data>", format = "multipart/mixed")]
async fn post_mixed(
    c: Headers,
    data: MultipartReader<'_>,
    db: &State<DB>,
    user: User,
) -> Result<PostResponse, MyError> {
    debug!("----- post_mixed ----- {}", user);
    user.can_use_xapi()?;
    debug!("c = {:?}", c);
    let ingested = ingest_multipart(data, true).await?;
    let pool = db.pool();
    persist_many(pool, c, ingested, &user).await
}
/// Handle a POST request whose body is `application/json`.
///
/// The body may be one JSON object or an array of them (see [`Statements`]). Each
/// object is converted to a [`Statement`]. Because a JSON-only request cannot carry
/// attachment payloads, every Attachment of every Statement must have a file URL.
/// The Statements are then handed to `persist_many`.
///
/// # Errors
///
/// * whatever `user.can_use_xapi()` reports,
/// * `400 Bad Request` if an object is not a valid Statement or if any Attachment
///   lacks a file URL,
/// * any error raised by `persist_many`.
#[post("/", data = "<json>", format = "application/json")]
async fn post_json(
    c: Headers,
    json: Json<Statements>,
    db: &State<DB>,
    user: User,
) -> Result<PostResponse, MyError> {
    debug!("----- post_json ----- {}", user);
    user.can_use_xapi()?;
    debug!("c = {:?}", c);

    // Stops at, and returns, the first conversion failure.
    let statements = json
        .0
        .0
        .into_iter()
        .map(|map| {
            Statement::from_json_obj(map)
                .map_err(|x| MyError::Data(x).with_status(Status::BadRequest))
        })
        .collect::<Result<Vec<_>, _>>()?;

    let count = statements
        .iter()
        .flat_map(|s| s.attachments())
        .filter(|att| att.file_url().is_none())
        .count();
    if count > 0 {
        return Err(MyError::HTTP {
            status: Status::BadRequest,
            info: format!("Statement w/ {count} unresolved Attachment(s)").into(),
        });
    }
    persist_many(db.pool(), c, statements, &user).await
}
/// Rank-1 POST route that unconditionally answers `400 Bad Request`.
///
/// Per its own error text this is a Rocket-specific stopgap that turns what would
/// otherwise be a 404 into a 400. The request body is accepted but not used.
#[post("/", data = "<ignored>", rank = 1)]
async fn __post(ignored: &str) -> Result<PostResponse, MyError> {
    debug!("----- __post -----");
    let _ = ignored;
    let rejection = MyError::HTTP {
        status: Status::BadRequest,
        info: "Rocket-specific stopgap. Redirect 404 to 400".into(),
    };
    Err(rejection)
}
/// POST route for `multipart/form-data` requests; always answers
/// `400 Bad Request` because that content type is not supported.
#[post("/", format = "multipart/form-data")]
async fn post_form() -> Result<PostResponse, MyError> {
    debug!("----- post_form -----");
    let rejection = MyError::HTTP {
        status: Status::BadRequest,
        info: "Abort. xAPI V2 does not support multipart/form-data".into(),
    };
    Err(rejection)
}
/// Names of the query-string parameters the GET handler accepts. `get_some`
/// rejects a request carrying any parameter whose name is not in this list.
const VALID_GET_PARAMS: [&str; 14] = [
"statementId",
"voidedStatementId",
"agent",
"verb",
"activity",
"registration",
"related_activities",
"related_agents",
"since",
"until",
"limit",
"format",
"attachments",
"ascending",
];
/// Handle a GET request for one Statement or for a filtered collection of them.
///
/// * Unknown query parameters (anything not in `VALID_GET_PARAMS`) are rejected.
/// * `statementId` and `voidedStatementId` are mutually exclusive. When either is
///   present a single Statement is fetched with `get_one`, and only `attachments`
///   and `format` may accompany it.
/// * Otherwise a `Filter` is built from the remaining parameters and the matching
///   Statements are fetched with `get_many`.
///
/// `format` defaults to `"exact"` and `attachments` to `false`. With attachments
/// the answer is produced by `send_multipart`, otherwise it is a JSON response.
///
/// # Errors
///
/// * whatever `user.can_use_xapi()` reports,
/// * `400 Bad Request` for extraneous or conflicting parameters, an invalid
///   `format`, an invalid UUID, or a `Filter` that cannot be built,
/// * any error raised by `get_one`, `get_many`, `emit_response!` or
///   `send_multipart`.
#[get("/?<extras..>")]
async fn get_some<'r>(
    c: Headers,
    q: QueryParams<'_>,
    mut extras: HashMap<&'r str, &'r str>,
    db: &State<DB>,
    user: User,
) -> Result<EitherOr<impl Stream<Item = MultipartSection<'static>> + use<>>, MyError> {
    debug!("----- get_some ----- {}", user);
    user.can_use_xapi()?;
    debug!("q = {:?}", q);

    // Whatever survives the pruning is a parameter we do not know about.
    extras.retain(|k, _| !VALID_GET_PARAMS.contains(k));
    debug!("extras = {:?}", extras);
    if !extras.is_empty() {
        return Err(MyError::HTTP {
            status: Status::BadRequest,
            info: format!("Received extraneous query string parameters: {extras:?}").into(),
        });
    }
    if q.statement_id.is_some() && q.voided_statement_id.is_some() {
        return Err(MyError::HTTP {
            status: Status::BadRequest,
            info: "Either 'statementId' or 'voidedStatementId' should be present. Not both".into(),
        });
    }

    let with_attachments = q.attachments.unwrap_or(false);
    let format = Format::new(q.format.unwrap_or("exact"), c.languages().to_vec())
        .map_err(|x| MyError::Data(x).with_status(Status::BadRequest))?;

    // `Some((voided?, raw id))` when a single Statement is requested.
    let target = match (q.statement_id, q.voided_statement_id) {
        (Some(id), _) => Some((false, id)),
        (None, Some(id)) => Some((true, id)),
        (None, None) => None,
    };

    let resource = if let Some((voided, raw_id)) = target {
        let has_filter_params = q.agent.is_some()
            || q.verb.is_some()
            || q.activity.is_some()
            || q.registration.is_some()
            || q.related_activities.is_some()
            || q.related_agents.is_some()
            || q.since.is_some()
            || q.until.is_some()
            || q.limit.is_some()
            || q.ascending.is_some();
        if has_filter_params {
            return Err(MyError::HTTP {
                status: Status::BadRequest,
                info:
                    "Only 'attachments' and 'format' can be present when 1 Statement is requested"
                        .into(),
            });
        }
        let uuid = Uuid::from_str(raw_id)
            .map_err(|x| MyError::Data(DataError::UUID(x)).with_status(Status::BadRequest))?;
        get_one(db.pool(), uuid, voided, &format).await?
    } else {
        let filter = Filter::from(
            db.pool(),
            q.agent,
            q.verb,
            q.activity,
            q.registration,
            q.related_activities,
            q.related_agents,
            q.since,
            q.until,
            q.limit,
            q.ascending,
        )
        .await
        .map_err(|x| x.with_status(Status::BadRequest))?;
        get_many(db.pool(), filter, &format, with_attachments).await?
    };
    debug!("resource = {:?}", resource);

    if with_attachments {
        return send_multipart(&resource).await;
    }
    let stored = resource.stored();
    let x = emit_response!(c, resource => StatementType, stored)?;
    Ok(EitherOr::JsonX(Box::new(GetResponse { inner: x })))
}
async fn send_multipart(
resource: &StatementType,
) -> Result<EitherOr<impl Stream<Item = MultipartSection<'static>> + use<>>, MyError> {
let mut server_last_modified = get_consistent_thru().await;
let stored = resource.stored();
if stored > server_last_modified {
server_last_modified = stored
}
let first_part = save_statements(resource).await?;
let mut parts = vec![];
for att in resource.attachments() {
if let Some(y) = OutPartInfo::from(&att) {
parts.push(y);
}
}
Ok(EitherOr::Mixed(MultipartStream::new_random(stream! {
let ar = File::open(&first_part).await.expect("Failed re-opening");
yield MultipartSection::new(ar)
.add_header(ContentType::JSON)
.add_header(last_modified(stored))
.add_header(consistent_through(server_last_modified));
for p in parts {
let ar = File::open(p.path).await.expect("Failed re-opening");
yield MultipartSection::new(ar)
.add_header(p.content_type)
.add_header(Header::new(header::CONTENT_LENGTH.as_str(), p.len.to_string()))
.add_header(Header::new(HASH_HDR, p.sha2.unwrap()))
}
})))
}
/// Handle a GET request for the next page of a previously started query.
///
/// The page is fetched with `find_more_statements`. When that call also reports
/// pagination data, a new `more` URL is computed and stored in the result; a
/// failure to store it is only logged. With `attachments` set the answer is
/// produced by `send_multipart`, otherwise it is a JSON response that uses
/// `get_consistent_thru()` as its timestamp.
///
/// # Errors
///
/// * whatever `user.can_use_xapi()` reports,
/// * `400 Bad Request` if `format` is invalid,
/// * any error raised by `find_more_statements`, `emit_response!` or
///   `send_multipart`.
#[get("/more?<sid>&<count>&<offset>&<limit>&<format>&<attachments>")]
async fn get_more(
    c: Headers,
    sid: u64,
    count: i32,
    offset: i32,
    limit: i32,
    format: &str,
    attachments: bool,
    db: &State<DB>,
    user: User,
) -> Result<EitherOr<impl Stream<Item = MultipartSection<'static>> + use<>>, MyError> {
    debug!("----- get_more ----- {}", user);
    user.can_use_xapi()?;
    debug!("c = {:?}", c);
    debug!("sid = {}", sid);
    debug!("count = {}", count);
    debug!("offset = {}", offset);
    debug!("limit = {}", limit);
    debug!("format = {}", format);
    debug!("attachments? {}", attachments);

    let format = Format::new(format, c.languages().to_vec())
        .map_err(|x| MyError::Data(x).with_status(Status::BadRequest))?;
    let (mut resource, page) =
        find_more_statements(db.pool(), sid, count, offset, limit, &format).await?;

    if let Some(pi) = page {
        let more = format!(
            "statements/more/?sid={}&count={}&offset={}&limit={}&format={}&attachments={}",
            sid,
            pi.count,
            pi.offset,
            pi.limit,
            format.as_param(),
            attachments
        );
        let url = config().to_external_url(&more);
        debug!("more URL = '{}'", url);
        if let Err(z) = resource.set_more(&url) {
            warn!(
                "Failed updating `more` URL of StatementResult. Ignore + continue but StatementResult will be inaccurate: {}",
                z
            );
        }
    }

    if !attachments {
        let last_modified = get_consistent_thru().await;
        let x = emit_response!(c, resource => StatementType, last_modified)?;
        return Ok(EitherOr::JsonX(Box::new(GetResponse { inner: x })));
    }
    send_multipart(&resource).await
}
async fn as_json<T: DeserializeOwned>(
part: &mut MultipartReadSection<'_, '_>,
) -> Result<T, MyError> {
if let Some(ct) = part.headers().get_one("content-type") {
debug!("content-type: '{}'", ct);
let mime = ct
.parse::<Mime>()
.unwrap_or_else(|x| panic!("Failed parsing CT: {x}"));
if mime != APPLICATION_JSON {
let msg = format!("Expected 'application/json' CT; got '{ct}'");
error!("{}", msg);
return Err(MyError::Runtime(msg.into()));
}
}
let mut buf = vec![];
part.read_to_end(&mut buf)
.await
.unwrap_or_else(|x| panic!("Failed consuming Part: {x}"));
serde_json::from_slice::<T>(&buf).map_err(|x| {
let msg = format!("Failed deserializing part: {x}");
error!("{}", msg);
MyError::Runtime(msg.into())
})
}
async fn ingest_multipart(
mut data: MultipartReader<'_>,
force_ids: bool,
) -> Result<Vec<Statement>, MyError> {
debug!("content-type: {}", data.content_type().0);
debug!("force_ids? {}", force_ids);
let mut statements = vec![];
let mut total = 0;
let mut unpopulated = 0;
let mut matched = 0;
let mut matched_unpopulated = 0;
let mut included = vec![];
let mut ndx = 0;
while let Some(mut part) = data
.next()
.await
.unwrap_or_else(|x| panic!("Failed reading Part #{ndx}: {x}"))
{
if ndx == 0 {
let x = as_json::<Statements>(&mut part)
.map_err(|x| x.with_status(Status::BadRequest))
.await?;
for map in x.0 {
let y = Statement::from_json_obj(map)
.map_err(|x| MyError::Data(x).with_status(Status::BadRequest))?;
statements.push(y)
}
for s in &mut statements {
if s.id().is_none() && force_ids {
s.set_id(Uuid::now_v7())
}
for att in s.attachments() {
total += 1;
if att.file_url().is_none() {
unpopulated += 1
}
included.push(InPartInfo::from(att))
}
}
} else if total == 0 {
return Err(MyError::HTTP {
status: Status::BadRequest,
info: "This is the 2nd Part but we have no Attachments to match".into(),
});
} else {
let hash = part.headers().get_one(HASH_HDR);
if hash.is_none() {
return Err(MyError::HTTP {
status: Status::BadRequest,
info: "Missing Hash header".into(),
});
}
let hash = hash.unwrap().to_owned();
debug!("-- x-experience-api-hash: '{}'", hash);
let cte = part.headers().get_one(CONTENT_TRANSFER_ENCODING_HDR);
if cte.is_none() {
return Err(MyError::HTTP {
status: Status::BadRequest,
info: "Missing CTE header".into(),
});
}
let enc = cte.unwrap().trim();
debug!("-- content-transfer-encoding: {}", enc);
if enc != "binary" {
return Err(MyError::HTTP {
status: Status::BadRequest,
info: format!("Expected 'binary' CTE but found '{enc}'").into(),
});
}
let mut buf = vec![];
let size = part
.read_to_end(&mut buf)
.await
.unwrap_or_else(|x| panic!("Failed consuming Part #{ndx}: {x}"));
debug!("size (actual) = {} (bytes)", size);
let size = i64::try_from(size).map_err(|x| {
MyError::Runtime(format!("Failed converting {size} to i64: {x}").into())
})?;
if let Some(ac) = included.iter_mut().find(|x| x.sha2 == hash) {
if ac.len != size {
warn!(
"Part #{} actual size ({}) doesn't match declared ({}) value",
ndx, size, ac.len
);
}
match part.headers().get_one(header::CONTENT_LENGTH.as_str()) {
Some(x) => {
match x.parse::<i64>() {
Ok(cl) => {
debug!("-- content-length: {}", cl);
if ac.len != cl {
return Err(MyError::HTTP {
status: Status::BadRequest,
info: format!(
"Part #{ndx} CL ({cl}) doesn't match declared ({}) value", ac.len)
.into(),
});
}
}
Err(x) => {
return Err(MyError::HTTP {
status: Status::BadRequest,
info: format!("Failed parsing Part #{ndx} CL: {x}").into(),
});
}
}
}
None => info!("Part #{} has no CL", ndx),
}
match part.headers().get_one(header::CONTENT_TYPE.as_str()) {
Some(x) => {
match x.parse::<Mime>() {
Ok(ct) => {
debug!("-- content-type: {}", ct);
if ac.mime != ct {
return Err(MyError::HTTP {
status: Status::BadRequest,
info: format!(
"Part #{ndx} CT ({ct}) doesn't match declared MIME ({})", ac.mime)
.into(),
});
}
}
Err(x) => {
error!("Failed parsing Part #{} CT: {}", ndx, x);
return Err(MyError::Data(DataError::MIME(x))
.with_status(Status::BadRequest));
}
}
}
None => info!("Part #{} has no CT", ndx),
}
if ac.signature {
debug!("Found a JWS Signature!");
let sig = Signature::from(buf).map_err(|x| {
error!("Failed processing JWS signature part: {}", x);
x.with_status(Status::BadRequest)
})?;
if statements.iter().any(|s| sig.verify(s)) {
info!("Matched JWS Signature to its Statement");
matched += 1;
matched_unpopulated += 1;
} else {
return Err(MyError::HTTP {
status: Status::BadRequest,
info: "Failed matching any Statement to a JWS Signature".into(),
});
}
} else {
debug!("Found an Attachment candidate!");
save_attachment(buf, ac)
.await
.expect("Failed saving buffer");
matched += 1;
if ac.unpopulated {
matched_unpopulated += 1
}
}
} else {
return Err(MyError::HTTP {
status: Status::BadRequest,
info: format!("Part #{ndx} is not an attachment").into(),
});
}
}
ndx += 1;
}
ndx -= 1;
debug!("Total parts (minus Statement(s)) = {}", ndx);
debug!("Total Attachments = {}", total);
debug!("Total Attachments w/o 'fileUrl' = {}", unpopulated);
debug!("Total matched Attachments = {}", matched);
debug!(
"Total matched unpopulated Attachments = {}",
matched_unpopulated
);
let unmatched = ndx - matched;
debug!("Total unmatched parts = {}", unmatched);
let problem = (unpopulated > 0) && (unpopulated != matched_unpopulated);
debug!("problem? {}", problem);
if problem {
return Err(MyError::HTTP {
status: Status::BadRequest,
info: "Houston, we have a problem".into(),
});
}
Ok(statements)
}
/// Persist a single Statement on behalf of a PUT request.
///
/// If a Statement with the same ID is already known:
/// * without conditional headers the request fails with `409 Conflict`;
/// * otherwise the pre-conditions are evaluated against the ETag of the incoming
///   Statement and the outcome is returned without writing anything.
///
/// Otherwise an authority is ensured (see `ensure_authority`). If the Statement is
/// a voiding one, its target is looked up: a missing target is tolerated, an
/// invalid one is rejected. The Statement is then inserted, the target (if any)
/// voided, and the pre-conditions evaluated against the Statement's ETag.
///
/// On success the response is `204 No Content` with an `ETag` header.
///
/// # Panics
///
/// Panics if `statement` has no ID.
async fn persist_one(
    conn: &PgPool,
    c: Headers,
    statement: &mut Statement,
    user: &User,
) -> Result<PutResponse, MyError> {
    debug!("statement = {}", statement);
    let uuid = statement.id().unwrap();
    if statement_exists(conn, uuid).await?.is_some() {
        if c.has_no_conditionals() {
            return Err(MyError::HTTP {
                status: Status::Conflict,
                info: "Missing pre-condition(s)".into(),
            });
        }
        let etag = compute_etag::<Statement>(statement)?;
        return match eval_preconditions!(&etag, c) {
            s if s != Status::Ok => Err(MyError::HTTP {
                status: s,
                info: "Failed pre-condition(s)".into(),
            }),
            _ => Ok(PutResponse {
                inner: WithETag {
                    inner: Status::NoContent,
                    etag: Header::new(header::ETAG.as_str(), etag.to_string()),
                },
            }),
        };
    }

    ensure_authority(statement, user)?;

    // Row ID of the Statement to void once the new one has been stored.
    let to_void_id = if statement.is_verb_voided() {
        let Some(target_uuid) = statement.voided_target() else {
            return Err(MyError::HTTP {
                status: Status::BadRequest,
                info: format!("Invalid voiding statement {statement}").into(),
            });
        };
        let (found, valid, id) = find_statement_to_void(conn, &target_uuid).await?;
        if found && !valid {
            return Err(MyError::HTTP {
                status: Status::BadRequest,
                info: format!("Target of voiding statement ({target_uuid}) is invalid").into(),
            });
        }
        found.then_some(id)
    } else {
        None
    };

    insert_statement(conn, statement).await?;
    if let Some(id) = to_void_id {
        debug!("About to void Statement #{}", id);
        void_statement(conn, id).await?;
        info!("Voided Statement #{}", id)
    }

    let etag = compute_etag::<Statement>(statement)?;
    match eval_preconditions!(&etag, c) {
        s if s != Status::Ok => Err(MyError::HTTP {
            status: s,
            info: "Failed pre-condition(s)".into(),
        }),
        _ => Ok(PutResponse {
            inner: WithETag {
                inner: Status::NoContent,
                etag: Header::new(header::ETAG.as_str(), etag.to_string()),
            },
        }),
    }
}
/// Persist a batch of Statements on behalf of a POST request.
///
/// Steps, in order:
/// 1. Give every Statement without an ID a fresh UUID and reject the batch if two
///    Statements share an ID.
/// 2. Drop Statements that are already stored with the same fingerprint; fail
///    with `409 Conflict` if one is stored with a different fingerprint.
/// 3. If nothing is left, return an error carrying status `204 No Content`.
/// 4. For every voiding Statement look up its target: a missing target is
///    tolerated, an invalid one is rejected.
/// 5. Ensure an authority on, and insert, each remaining Statement.
/// 6. Void the targets collected in step 4.
///
/// On success the response carries the IDs of the Statements inserted in step 5.
///
/// # Errors
///
/// `400 Bad Request`, `409 Conflict` or `204 No Content` as described above, plus
/// any error raised by the database helpers, `ensure_authority` or
/// `emit_response!`.
async fn persist_many(
    conn: &PgPool,
    c: Headers,
    mut statements: Vec<Statement>,
    user: &User,
) -> Result<PostResponse, MyError> {
    debug!("statements = {:?}", statements);
    let mut uuids = vec![];
    for s in &mut statements {
        let uuid = match s.id() {
            Some(x) => *x,
            None => {
                let id = Uuid::now_v7();
                s.set_id(id);
                id
            }
        };
        if uuids.contains(&uuid) {
            return Err(MyError::HTTP {
                status: Status::BadRequest,
                info: format!("Found Statements w/ same ID: {uuid}").into(),
            });
        }
        uuids.push(uuid)
    }
    debug!("uuids (before) = {:?}", uuids);

    // Manual index loop: elements are removed while we walk the vector.
    let mut i = 0;
    while i < statements.len() {
        let s = &statements[i];
        let uuid = s.id().unwrap();
        let tmp = statement_exists(conn, uuid).await?;
        match tmp {
            None => i += 1,
            Some(x) => {
                let s_uid = s.uid();
                if s_uid != x {
                    return Err(MyError::HTTP {
                        status: Status::Conflict,
                        info: format!(
                            "Already have a Statement w/ same UUID ({uuid}) but different FP. Conflict")
                        .into(),
                    });
                }
                let dup = statements.remove(i);
                info!("Drop duplicate {}", dup);
            }
        }
    }
    if statements.is_empty() {
        return Err(MyError::HTTP {
            status: Status::NoContent,
            info: "No new Statements left".into(),
        });
    }

    let mut ids_to_void = vec![];
    for s in &statements {
        if s.is_verb_voided() {
            if let Some(target_uuid) = s.voided_target() {
                let (found, valid, id) = find_statement_to_void(conn, &target_uuid).await?;
                if found {
                    if valid {
                        ids_to_void.push(id)
                    } else {
                        return Err(MyError::HTTP {
                            status: Status::BadRequest,
                            info: format!("Target of voiding statement ({target_uuid}) is invalid")
                                .into(),
                        });
                    }
                }
            } else {
                return Err(MyError::HTTP {
                    status: Status::BadRequest,
                    info: format!("Invalid voiding statement {s}").into(),
                });
            }
        }
    }
    info!("Found {} Statement(s) to void", ids_to_void.len());

    // Reuse the vector to collect the IDs of what actually gets inserted.
    uuids.clear();
    let n = statements.len();
    for (k, mut s) in statements.into_iter().enumerate() {
        let uuid = *s.id().unwrap();
        ensure_authority(&mut s, user)?;
        // Report the real 1-based position instead of a constant "1".
        debug!("Persisting Statement #{} ({} of {})...", uuid, k + 1, n);
        insert_statement(conn, &s).await?;
        uuids.push(uuid);
    }
    for id in ids_to_void {
        debug!("About to void Statement #{}", id);
        void_statement(conn, id).await?;
        info!("Voided Statement #{}", id)
    }
    let resource = StatementIDs(uuids);
    let inner = emit_response!(c, resource => StatementIDs)?;
    Ok(PostResponse { inner })
}
/// Fetch the Statement identified by `uuid`, in the requested `format`.
///
/// `voided` is passed through to `find_statement_by_uuid`.
///
/// # Errors
///
/// `404 Not Found` when the lookup yields nothing, plus any error raised by
/// `find_statement_by_uuid`.
async fn get_one(
    conn: &PgPool,
    uuid: Uuid,
    voided: bool,
    format: &Format,
) -> Result<StatementType, MyError> {
    debug!("uuid = {}", uuid);
    debug!("voided? {}", voided);
    debug!("format = {}", format);
    find_statement_by_uuid(conn, uuid, voided, format)
        .await?
        .ok_or_else(|| MyError::HTTP {
            status: Status::NotFound,
            info: "Statement not found".into(),
        })
}
/// Fetch the Statements matching `filter`, in the requested `format`.
///
/// A new filter ID is registered first and passed to `find_statements_by_filter`.
/// When that call also reports pagination data, a `more` URL pointing at the
/// `statements/more` route is computed and stored in the result; a failure to
/// store it is only logged. `with_attachments` is only used to build that URL.
///
/// # Errors
///
/// Any error raised by `register_new_filter` or `find_statements_by_filter`.
async fn get_many(
    conn: &PgPool,
    filter: Filter,
    format: &Format,
    with_attachments: bool,
) -> Result<StatementType, MyError> {
    debug!("filter = {}", filter);
    debug!("format = {}", format);
    let sid = register_new_filter(conn).await?;
    debug!("sid = {}", sid);
    let (mut result, page) = find_statements_by_filter(conn, filter, format, sid).await?;
    let Some(pi) = page else {
        return Ok(result);
    };
    let more = format!(
        "statements/more/?sid={}&count={}&offset={}&limit={}&format={}&attachments={}",
        sid,
        pi.count,
        pi.offset,
        pi.limit,
        format.as_param(),
        with_attachments
    );
    let url = config().to_external_url(&more);
    debug!("more URL = '{}'", url);
    if let Err(z) = result.set_more(&url) {
        warn!(
            "Failed updating `more` URL of StatementResult. Ignore + continue but StatementResult will be inaccurate: {}",
            z
        );
    }
    Ok(result)
}
/// Serialize `res` to JSON, write it to a new file under the `s` sub-directory of
/// the configured static directory, and return that file's path.
///
/// The file name is `_` followed by the URL-safe, unpadded Base64 form of a fresh
/// v7 UUID. The directory is created if needed.
///
/// # Errors
///
/// `MyError::IO` if the directory or file cannot be created or written.
///
/// # Panics
///
/// Panics if `res` cannot be serialized to JSON.
async fn save_statements(res: &StatementType) -> Result<PathBuf, MyError> {
    let name = format!("_{}", BASE64_URL_SAFE_NO_PAD.encode(Uuid::now_v7()));
    let dir = config().static_dir.join("s");
    let path = dir.join(&name);
    DirBuilder::new()
        .recursive(true)
        .create(&dir)
        .await
        .map_err(MyError::IO)?;
    let mut file = File::create(&path).await.map_err(MyError::IO)?;
    let json = match res {
        StatementType::S(x) => serde_json::to_string(x).expect("Failed serializing S to temp file"),
        StatementType::SId(x) => {
            serde_json::to_string(x).expect("Failed serializing SId to temp file")
        }
        StatementType::SR(x) => {
            serde_json::to_string(x).expect("Failed serializing SR to temp file")
        }
        StatementType::SRId(x) => {
            serde_json::to_string(x).expect("Failed serializing SRId to temp file")
        }
    };
    file.write_all(json.as_bytes())
        .await
        .map_err(MyError::IO)?;
    file.flush().await.map_err(MyError::IO)?;
    Ok(path)
}
/// Write `bytes` to the file designated by `part.path`, creating the parent
/// directory if needed.
///
/// Nothing is written when a file already exists at that path.
///
/// # Errors
///
/// `MyError::IO` if the directory or file cannot be created or written.
///
/// # Panics
///
/// Panics if `part.path` has no parent.
async fn save_attachment(bytes: Vec<u8>, part: &InPartInfo) -> Result<(), MyError> {
    let target = &part.path;
    if target.exists() {
        info!("Attachment {} already exists", target.to_string_lossy());
        return Ok(());
    }
    let dir = target.parent().unwrap();
    DirBuilder::new()
        .recursive(true)
        .create(dir)
        .await
        .map_err(MyError::IO)?;
    let mut file = File::create(target).await.map_err(MyError::IO)?;
    file.write_all(&bytes).await.map_err(MyError::IO)?;
    file.flush().await.map_err(MyError::IO)?;
    Ok(())
}
/// Build the `CONSISTENT_THRU_HDR` header from `timestamp`, formatted as RFC 3339
/// with millisecond precision and a `Z` suffix.
fn consistent_through(timestamp: DateTime<Utc>) -> Header<'static> {
    let value = timestamp.to_rfc3339_opts(SecondsFormat::Millis, true);
    Header::new(CONSISTENT_THRU_HDR, value)
}
/// Build a `Last-Modified` header from `timestamp`, formatted as RFC 3339 with
/// millisecond precision and a `Z` suffix.
fn last_modified(timestamp: DateTime<Utc>) -> Header<'static> {
    let value = timestamp.to_rfc3339_opts(SecondsFormat::Millis, true);
    Header::new(header::LAST_MODIFIED.as_str(), value)
}
/// Make sure `s` has an authority.
///
/// A Statement that already has one is left untouched. Otherwise `user` must be
/// allowed to authorize Statements, and the user's authority is set on `s` as an
/// Agent.
///
/// # Errors
///
/// Whatever `user.can_authorize_statement()` reports.
fn ensure_authority(s: &mut Statement, user: &User) -> Result<(), MyError> {
    if s.authority().is_some() {
        return Ok(());
    }
    user.can_authorize_statement()?;
    let agent = Actor::Agent(user.authority());
    s.set_authority_unchecked(agent);
    Ok(())
}