1#![allow(non_snake_case)]
4#![allow(clippy::too_many_arguments)]
5
6use crate::{
17 DataError, MyError, config,
18 data::{Actor, Attachment, Format, Statement, StatementIDs, statement_type::StatementType},
19 db::{
20 filter::{Filter, register_new_filter},
21 statement::{
22 find_more_statements, find_statement_by_uuid, find_statement_to_void,
23 find_statements_by_filter, insert_statement, statement_exists, void_statement,
24 },
25 },
26 emit_response, eval_preconditions,
27 lrs::{
28 DB, Signature, User, compute_etag,
29 headers::{CONSISTENT_THRU_HDR, CONTENT_TRANSFER_ENCODING_HDR, HASH_HDR, Headers},
30 resources::{WithETag, WithResource},
31 server::{get_consistent_thru, qp},
32 },
33};
34use base64::{Engine, prelude::BASE64_URL_SAFE_NO_PAD};
35use chrono::{DateTime, SecondsFormat, Utc};
36use mime::{APPLICATION_JSON, Mime};
37use openssl::sha::Sha256;
38use rocket::{
39 Request, Responder, State,
40 futures::{Stream, TryFutureExt},
41 get,
42 http::{ContentType, Header, Status, hyper::header},
43 post, put,
44 request::{FromRequest, Outcome},
45 response::stream::stream,
46 routes,
47 serde::json::Json,
48 tokio::{
49 fs::{DirBuilder, File},
50 io::{AsyncReadExt, AsyncWriteExt},
51 },
52};
53use rocket_multipart::{MultipartReadSection, MultipartReader, MultipartSection, MultipartStream};
54use serde::{Deserialize, de::DeserializeOwned};
55use serde_json::{Map, Value};
56use serde_with::serde_as;
57use sqlx::PgPool;
58use std::{collections::HashMap, path::PathBuf, str::FromStr};
59use tracing::{debug, error, info, warn};
60use uuid::Uuid;
61
62#[derive(Responder)]
65struct PutResponse {
66 inner: WithETag,
67}
68
69#[derive(Responder)]
72struct PostResponse {
73 inner: WithResource<StatementIDs>,
74}
75
76#[derive(Responder)]
80struct GetResponse {
81 inner: WithResource<StatementType>,
82}
83
84#[derive(Responder)]
88enum EitherOr<T> {
89 JsonX(Box<GetResponse>),
90 Mixed(MultipartStream<T>),
91}
92
93fn sha2_path(sha2: &str) -> PathBuf {
96 let bytes = hex::decode(sha2).expect("Failed decoding signature");
97 let mut hasher = Sha256::new();
98 hasher.update(&bytes);
99 let signature = hasher.finish();
100 let name = BASE64_URL_SAFE_NO_PAD.encode(signature);
101 config().static_dir.join(format!("_{name}"))
102}
103
104#[derive(Debug, PartialEq)]
107struct InPartInfo {
108 path: PathBuf,
109 mime: Mime,
110 len: i64,
111 sha2: String,
112 unpopulated: bool,
113 signature: bool,
114}
115
116impl InPartInfo {
117 fn from(att: &Attachment) -> Self {
118 InPartInfo {
119 path: sha2_path(att.sha2()),
120 mime: att.content_type().clone(),
121 len: att.length(),
122 sha2: att.sha2().to_string(),
123 unpopulated: att.file_url().is_none(),
124 signature: att.is_signature(),
125 }
126 }
127}
128
129#[serde_as]
131#[derive(Debug, Default, Deserialize)]
132struct Statements(#[serde_as(as = "serde_with::OneOrMany<_>")] Vec<Map<String, Value>>);
133
134#[derive(Debug, Default)]
136struct QueryParams<'a> {
137 statement_id: Option<&'a str>,
138 voided_statement_id: Option<&'a str>,
139 agent: Option<&'a str>,
140 verb: Option<&'a str>,
141 activity: Option<&'a str>,
142 registration: Option<&'a str>,
143 since: Option<&'a str>,
144 until: Option<&'a str>,
145 limit: Option<u32>,
146 related_activities: Option<bool>,
147 related_agents: Option<bool>,
148 attachments: Option<bool>,
149 ascending: Option<bool>,
150 format: Option<&'a str>,
151}
152
153#[rocket::async_trait]
154impl<'r> FromRequest<'r> for QueryParams<'r> {
155 type Error = ();
156
157 async fn from_request(req: &'r Request<'_>) -> Outcome<Self, Self::Error> {
158 let statement_id = qp::<&str>(req, "statementId");
159 let voided_statement_id = qp::<&str>(req, "voidedStatementId");
160 let agent = qp::<&str>(req, "agent");
161 let verb = qp::<&str>(req, "verb");
162 let activity = qp::<&str>(req, "activity");
163 let registration = qp::<&str>(req, "registration");
164 let since = qp::<&str>(req, "since");
165 let until = qp::<&str>(req, "until");
166
167 let limit = qp::<u32>(req, "limit");
168
169 let related_activities = qp::<bool>(req, "related_activities");
170 let related_agents = qp::<bool>(req, "related_agents");
171 let attachments = qp::<bool>(req, "attachments");
172 let ascending = qp::<bool>(req, "ascending");
173
174 let format = qp::<&str>(req, "format");
175
176 Outcome::Success(QueryParams {
177 statement_id,
178 voided_statement_id,
179 agent,
180 verb,
181 activity,
182 registration,
183 since,
184 until,
185 limit,
186 related_activities,
187 related_agents,
188 attachments,
189 ascending,
190 format,
191 })
192 }
193}
194
195#[derive(Debug)]
198struct OutPartInfo {
199 pub(crate) path: PathBuf,
201 pub(crate) content_type: ContentType,
203 pub(crate) len: i64,
216 pub(crate) sha2: Option<String>,
218}
219
220impl OutPartInfo {
221 fn from(att: &Attachment) -> Option<Self> {
222 let path = sha2_path(att.sha2());
223 if !path.exists() {
224 None
225 } else {
226 Some(OutPartInfo {
227 path,
228 content_type: ContentType::from_str(att.content_type().as_ref())
229 .expect("Failed finding MIME"),
230 len: att.length(),
231 sha2: Some(att.sha2().to_owned()),
232 })
233 }
234 }
235}
236
237#[doc(hidden)]
238pub fn routes() -> Vec<rocket::Route> {
239 routes![
240 put_mixed, put_json, post_mixed, post_json, __post, post_form, get_some, get_more
241 ]
242}
243
244#[put("/?<statementId>", data = "<data>", format = "multipart/mixed")]
263async fn put_mixed(
264 c: Headers,
265 statementId: &str,
266 data: MultipartReader<'_>,
267 db: &State<DB>,
268 user: User,
269) -> Result<PutResponse, MyError> {
270 debug!("----- put_mixed ----- {}", user);
271 user.can_use_xapi()?;
272
273 let uuid = Uuid::parse_str(statementId)
274 .map_err(|x| MyError::Data(DataError::UUID(x)).with_status(Status::BadRequest))?;
275 debug!("Statement UUID = {}", uuid);
276
277 let mut statements = ingest_multipart(data, false).await?;
280
281 let statement = statements.iter_mut().next().unwrap();
282 if statement.id().is_none() {
283 statement.set_id(uuid)
284 } else if *statement.id().unwrap() != uuid {
285 return Err(MyError::HTTP {
286 status: Status::BadRequest,
287 info: "Statement ID in URL does not match one in body".into(),
288 });
289 }
290
291 return persist_one(db.pool(), c, statement, &user).await;
292}
293
294#[put("/?<statementId>", data = "<json>", format = "application/json")]
295async fn put_json(
296 c: Headers,
297 statementId: &str,
298 json: &str,
299 db: &State<DB>,
300 user: User,
301) -> Result<PutResponse, MyError> {
302 debug!("----- put_json ----- {}", user);
303 user.can_use_xapi()?;
304
305 let uuid = Uuid::parse_str(statementId)
306 .map_err(|x| MyError::Data(DataError::UUID(x)).with_status(Status::BadRequest))?;
307 debug!("statement UUID = {}", uuid);
308
309 let mut statement =
310 Statement::from_str(json).map_err(|x| MyError::Data(x).with_status(Status::BadRequest))?;
311
312 let mut count = 0;
317 for att in statement.attachments() {
318 if att.file_url().is_none() {
319 count += 1;
320 }
321 }
322 if count > 0 {
323 error!("Found {} Attachment(s) w/ unpopulated 'fileUrl'", count);
324 return Err(MyError::HTTP {
325 status: Status::BadRequest,
326 info: format!("Found {count} Attachment(s) w/ unpopulated 'fileUrl'").into(),
327 });
328 }
329
330 if statement.id().is_none() {
331 statement.set_id(uuid)
332 } else if *statement.id().unwrap() != uuid {
333 return Err(MyError::HTTP {
334 status: Status::BadRequest,
335 info: "Statement ID in URL does not match one in body".into(),
336 });
337 }
338
339 return persist_one(db.pool(), c, &mut statement, &user).await;
340}
341
342#[post("/", data = "<data>", format = "multipart/mixed")]
364async fn post_mixed(
365 c: Headers,
366 data: MultipartReader<'_>,
367 db: &State<DB>,
368 user: User,
369) -> Result<PostResponse, MyError> {
370 debug!("----- post_mixed ----- {}", user);
371 user.can_use_xapi()?;
372
373 debug!("c = {:?}", c);
374 let statements = ingest_multipart(data, true).await?;
375
376 persist_many(db.pool(), c, statements, &user).await
377}
378
379#[post("/", data = "<json>", format = "application/json")]
380async fn post_json(
381 c: Headers,
382 json: Json<Statements>,
383 db: &State<DB>,
384 user: User,
385) -> Result<PostResponse, MyError> {
386 debug!("----- post_json ----- {}", user);
387 user.can_use_xapi()?;
388
389 debug!("c = {:?}", c);
390 let mut statements = vec![];
391 for map in json.0.0 {
392 let x = Statement::from_json_obj(map)
393 .map_err(|x| MyError::Data(x).with_status(Status::BadRequest))?;
394 statements.push(x)
395 }
396
397 let mut count = 0;
402 for s in &statements {
403 for att in s.attachments() {
404 if att.file_url().is_none() {
405 count += 1;
406 }
407 }
408 }
409 if count > 0 {
410 return Err(MyError::HTTP {
411 status: Status::BadRequest,
412 info: format!("Statement w/ {count} unresolved Attachment(s)").into(),
413 });
414 }
415
416 persist_many(db.pool(), c, statements, &user).await
417}
418
419#[post("/", data = "<ignored>", rank = 1)]
423async fn __post(ignored: &str) -> Result<PostResponse, MyError> {
424 debug!("----- __post -----");
425 let _ = ignored;
426 Err(MyError::HTTP {
427 status: Status::BadRequest,
428 info: "Rocket-specific stopgap. Redirect 404 to 400".into(),
429 })
430}
431
432#[post("/", format = "multipart/form-data")]
433async fn post_form() -> Result<PostResponse, MyError> {
434 debug!("----- post_form -----");
435 Err(MyError::HTTP {
436 status: Status::BadRequest,
437 info: "Abort. xAPI V2 does not support multipart/form-data".into(),
438 })
439}
440
441const VALID_GET_PARAMS: [&str; 14] = [
442 "statementId",
443 "voidedStatementId",
444 "agent",
445 "verb",
446 "activity",
447 "registration",
448 "related_activities",
449 "related_agents",
450 "since",
451 "until",
452 "limit",
453 "format",
454 "attachments",
455 "ascending",
456];
457
458#[get("/?<extras..>")]
469async fn get_some<'r>(
470 c: Headers,
471 q: QueryParams<'_>,
472 mut extras: HashMap<&'r str, &'r str>,
473 db: &State<DB>,
474 user: User,
475) -> Result<EitherOr<impl Stream<Item = MultipartSection<'static>> + use<>>, MyError> {
476 debug!("----- get_some ----- {}", user);
477 user.can_use_xapi()?;
478
479 debug!("q = {:?}", q);
480 extras.retain(|k, _| !VALID_GET_PARAMS.contains(k));
484 debug!("extras = {:?}", extras);
485 if !extras.is_empty() {
486 return Err(MyError::HTTP {
487 status: Status::BadRequest,
488 info: format!("Received extraneous query string parameters: {extras:?}").into(),
489 });
490 }
491
492 if let (Some(_), Some(_)) = (q.statement_id, q.voided_statement_id) {
495 return Err(MyError::HTTP {
496 status: Status::BadRequest,
497 info: "Either 'statementId' or 'voidedStatementId' should be present. Not both".into(),
498 });
499 }
500
501 let with_attachments = q.attachments.unwrap_or(false);
502 let format = Format::new(q.format.unwrap_or("exact"), c.languages().to_vec())
503 .map_err(|x| MyError::Data(x).with_status(Status::BadRequest))?;
504
505 let single = q.statement_id.is_some() || q.voided_statement_id.is_some();
506 let resource = if single {
507 if q.agent.is_some()
512 || q.verb.is_some()
513 || q.activity.is_some()
514 || q.registration.is_some()
515 || q.related_activities.is_some()
516 || q.related_agents.is_some()
517 || q.since.is_some()
518 || q.until.is_some()
519 || q.limit.is_some()
520 || q.ascending.is_some()
521 {
522 return Err(MyError::HTTP {
523 status: Status::BadRequest,
524 info:
525 "Only 'attachments' and 'format' can be present when 1 Statement is requested"
526 .into(),
527 });
528 }
529
530 let (voided, uuid) = if q.statement_id.is_some() {
531 (false, q.statement_id.unwrap())
532 } else {
533 (true, q.voided_statement_id.unwrap())
534 };
535
536 let uuid = Uuid::from_str(uuid)
537 .map_err(|x| MyError::Data(DataError::UUID(x)).with_status(Status::BadRequest))?;
538
539 get_one(db.pool(), uuid, voided, &format).await
540 } else {
541 let filter = Filter::from(
542 db.pool(),
543 q.agent,
544 q.verb,
545 q.activity,
546 q.registration,
547 q.related_activities,
548 q.related_agents,
549 q.since,
550 q.until,
551 q.limit,
552 q.ascending,
553 )
554 .await
555 .map_err(|x| x.with_status(Status::BadRequest))?;
556
557 get_many(db.pool(), filter, &format, with_attachments).await
558 };
559
560 let resource = resource?;
561 debug!("resource = {:?}", resource);
562 if !with_attachments {
563 let stored = resource.stored();
564 let x = emit_response!(c, resource => StatementType, stored)?;
565 Ok(EitherOr::JsonX(Box::new(GetResponse { inner: x })))
566 } else {
567 send_multipart(&resource).await
568 }
569}
570
571async fn send_multipart(
572 resource: &StatementType,
573) -> Result<EitherOr<impl Stream<Item = MultipartSection<'static>> + use<>>, MyError> {
574 let mut server_last_modified = get_consistent_thru().await;
575 let stored = resource.stored();
576 if stored > server_last_modified {
577 server_last_modified = stored
578 }
579
580 let first_part = save_statements(resource).await?;
581 let mut parts = vec![];
582 for att in resource.attachments() {
583 if let Some(y) = OutPartInfo::from(&att) {
584 parts.push(y);
585 }
586 }
587 Ok(EitherOr::Mixed(MultipartStream::new_random(stream! {
588 let ar = File::open(&first_part).await.expect("Failed re-opening");
589 yield MultipartSection::new(ar)
590 .add_header(ContentType::JSON)
591 .add_header(last_modified(stored))
592 .add_header(consistent_through(server_last_modified));
593 for p in parts {
594 let ar = File::open(p.path).await.expect("Failed re-opening");
595 yield MultipartSection::new(ar)
596 .add_header(p.content_type)
597 .add_header(Header::new(header::CONTENT_LENGTH.as_str(), p.len.to_string()))
598 .add_header(Header::new(HASH_HDR, p.sha2.unwrap()))
599 }
600 })))
601}
602
603#[get("/more?<sid>&<count>&<offset>&<limit>&<format>&<attachments>")]
604async fn get_more(
605 c: Headers,
606 sid: u64,
607 count: i32,
608 offset: i32,
609 limit: i32,
610 format: &str,
611 attachments: bool,
612 db: &State<DB>,
613 user: User,
614) -> Result<EitherOr<impl Stream<Item = MultipartSection<'static>> + use<>>, MyError> {
615 debug!("----- get_more ----- {}", user);
616 user.can_use_xapi()?;
617
618 debug!("c = {:?}", c);
619 debug!("sid = {}", sid);
620 debug!("count = {}", count);
621 debug!("offset = {}", offset);
622 debug!("limit = {}", limit);
623 debug!("format = {}", format);
624 debug!("attachments? {}", attachments);
625
626 let format = Format::new(format, c.languages().to_vec())
627 .map_err(|x| MyError::Data(x).with_status(Status::BadRequest))?;
628
629 let (mut resource, y) =
630 find_more_statements(db.pool(), sid, count, offset, limit, &format).await?;
631 if let Some(pi) = y {
632 let more = format!(
633 "statements/more/?sid={}&count={}&offset={}&limit={}&format={}&attachments={}",
634 sid,
635 pi.count,
636 pi.offset,
637 pi.limit,
638 format.as_param(),
639 attachments
640 );
641 let url = config().to_external_url(&more);
642 debug!("more URL = '{}'", url);
643 if let Err(z) = &resource.set_more(&url) {
644 warn!(
645 "Failed updating `more` URL of StatementResult. Ignore + continue but StatementResult will be inaccurate: {}",
646 z
647 );
648 }
649 }
650
651 if attachments {
652 send_multipart(&resource).await
653 } else {
654 let last_modified = get_consistent_thru().await;
655 let x = emit_response!(c, resource => StatementType, last_modified)?;
656 Ok(EitherOr::JsonX(Box::new(GetResponse { inner: x })))
657 }
658}
659
660async fn as_json<T: DeserializeOwned>(
664 part: &mut MultipartReadSection<'_, '_>,
665) -> Result<T, MyError> {
666 if let Some(ct) = part.headers().get_one("content-type") {
668 debug!("content-type: '{}'", ct);
669 let mime = ct
670 .parse::<Mime>()
671 .unwrap_or_else(|x| panic!("Failed parsing CT: {x}"));
672 if mime != APPLICATION_JSON {
673 let msg = format!("Expected 'application/json' CT; got '{ct}'");
674 error!("{}", msg);
675 return Err(MyError::Runtime(msg.into()));
676 }
677 }
679
680 let mut buf = vec![];
681 part.read_to_end(&mut buf)
682 .await
683 .unwrap_or_else(|x| panic!("Failed consuming Part: {x}"));
684 serde_json::from_slice::<T>(&buf).map_err(|x| {
685 let msg = format!("Failed deserializing part: {x}");
686 error!("{}", msg);
687 MyError::Runtime(msg.into())
688 })
689}
690
691async fn ingest_multipart(
696 mut data: MultipartReader<'_>,
697 force_ids: bool,
698) -> Result<Vec<Statement>, MyError> {
699 debug!("content-type: {}", data.content_type().0);
700 debug!("force_ids? {}", force_ids);
701
702 let mut statements = vec![];
704 let mut total = 0;
706 let mut unpopulated = 0;
708 let mut matched = 0;
710 let mut matched_unpopulated = 0;
712 let mut included = vec![];
714 let mut ndx = 0;
715 while let Some(mut part) = data
716 .next()
717 .await
718 .unwrap_or_else(|x| panic!("Failed reading Part #{ndx}: {x}"))
719 {
720 if ndx == 0 {
721 let x = as_json::<Statements>(&mut part)
723 .map_err(|x| x.with_status(Status::BadRequest))
724 .await?;
725 for map in x.0 {
726 let y = Statement::from_json_obj(map)
727 .map_err(|x| MyError::Data(x).with_status(Status::BadRequest))?;
728 statements.push(y)
729 }
730 for s in &mut statements {
735 if s.id().is_none() && force_ids {
736 s.set_id(Uuid::now_v7())
737 }
738 for att in s.attachments() {
739 total += 1;
740 if att.file_url().is_none() {
741 unpopulated += 1
742 }
743 included.push(InPartInfo::from(att))
744 }
745 }
746 } else if total == 0 {
747 return Err(MyError::HTTP {
752 status: Status::BadRequest,
753 info: "This is the 2nd Part but we have no Attachments to match".into(),
754 });
755 } else {
756 let hash = part.headers().get_one(HASH_HDR);
759 if hash.is_none() {
760 return Err(MyError::HTTP {
761 status: Status::BadRequest,
762 info: "Missing Hash header".into(),
763 });
764 }
765 let hash = hash.unwrap().to_owned();
766 debug!("-- x-experience-api-hash: '{}'", hash);
767
768 let cte = part.headers().get_one(CONTENT_TRANSFER_ENCODING_HDR);
771 if cte.is_none() {
772 return Err(MyError::HTTP {
773 status: Status::BadRequest,
774 info: "Missing CTE header".into(),
775 });
776 }
777 let enc = cte.unwrap().trim();
778 debug!("-- content-transfer-encoding: {}", enc);
779 if enc != "binary" {
780 return Err(MyError::HTTP {
781 status: Status::BadRequest,
782 info: format!("Expected 'binary' CTE but found '{enc}'").into(),
783 });
784 }
785
786 let mut buf = vec![];
788 let size = part
789 .read_to_end(&mut buf)
790 .await
791 .unwrap_or_else(|x| panic!("Failed consuming Part #{ndx}: {x}"));
792 debug!("size (actual) = {} (bytes)", size);
793 let size = i64::try_from(size).map_err(|x| {
798 MyError::Runtime(format!("Failed converting {size} to i64: {x}").into())
799 })?;
800
801 if let Some(ac) = included.iter_mut().find(|x| x.sha2 == hash) {
803 if ac.len != size {
804 warn!(
805 "Part #{} actual size ({}) doesn't match declared ({}) value",
806 ndx, size, ac.len
807 );
808 }
809
810 match part.headers().get_one(header::CONTENT_LENGTH.as_str()) {
812 Some(x) => {
813 match x.parse::<i64>() {
814 Ok(cl) => {
815 debug!("-- content-length: {}", cl);
816 if ac.len != cl {
817 return Err(MyError::HTTP {
818 status: Status::BadRequest,
819 info: format!(
820 "Part #{ndx} CL ({cl}) doesn't match declared ({}) value", ac.len)
821 .into(),
822 });
823 }
824 }
825 Err(x) => {
826 return Err(MyError::HTTP {
827 status: Status::BadRequest,
828 info: format!("Failed parsing Part #{ndx} CL: {x}").into(),
829 });
830 }
831 }
832 }
833 None => info!("Part #{} has no CL", ndx),
834 }
835
836 match part.headers().get_one(header::CONTENT_TYPE.as_str()) {
838 Some(x) => {
839 match x.parse::<Mime>() {
840 Ok(ct) => {
841 debug!("-- content-type: {}", ct);
842 if ac.mime != ct {
843 return Err(MyError::HTTP {
844 status: Status::BadRequest,
845 info: format!(
846 "Part #{ndx} CT ({ct}) doesn't match declared MIME ({})", ac.mime)
847 .into(),
848 });
849 }
850 }
851 Err(x) => {
852 error!("Failed parsing Part #{} CT: {}", ndx, x);
853 return Err(MyError::Data(DataError::MIME(x))
854 .with_status(Status::BadRequest));
855 }
856 }
857 }
858 None => info!("Part #{} has no CT", ndx),
859 }
860
861 if ac.signature {
863 debug!("Found a JWS Signature!");
864 let sig = Signature::from(buf).map_err(|x| {
865 error!("Failed processing JWS signature part: {}", x);
866 x.with_status(Status::BadRequest)
867 })?;
868 if statements.iter().any(|s| sig.verify(s)) {
869 info!("Matched JWS Signature to its Statement");
870 matched += 1;
871 matched_unpopulated += 1;
872 } else {
873 return Err(MyError::HTTP {
874 status: Status::BadRequest,
875 info: "Failed matching any Statement to a JWS Signature".into(),
876 });
877 }
878 } else {
879 debug!("Found an Attachment candidate!");
880 save_attachment(buf, ac)
881 .await
882 .expect("Failed saving buffer");
883 matched += 1;
884 if ac.unpopulated {
885 matched_unpopulated += 1
886 }
887 }
888 } else {
889 return Err(MyError::HTTP {
890 status: Status::BadRequest,
891 info: format!("Part #{ndx} is not an attachment").into(),
892 });
893 }
894 }
895
896 ndx += 1;
897 }
898
899 ndx -= 1;
900 debug!("Total parts (minus Statement(s)) = {}", ndx);
901 debug!("Total Attachments = {}", total);
902 debug!("Total Attachments w/o 'fileUrl' = {}", unpopulated);
903 debug!("Total matched Attachments = {}", matched);
904 debug!(
905 "Total matched unpopulated Attachments = {}",
906 matched_unpopulated
907 );
908 let unmatched = ndx - matched;
909 debug!("Total unmatched parts = {}", unmatched);
910
911 let problem = (unpopulated > 0) && (unpopulated != matched_unpopulated);
921 debug!("problem? {}", problem);
922 if problem {
923 return Err(MyError::HTTP {
924 status: Status::BadRequest,
925 info: "Houston, we have a problem".into(),
926 });
927 }
928
929 Ok(statements)
930}
931
932async fn persist_one(
933 conn: &PgPool,
934 c: Headers,
935 statement: &mut Statement,
936 user: &User,
937) -> Result<PutResponse, MyError> {
938 debug!("statement = {}", statement);
939
940 let uuid = statement.id().unwrap();
941 let x = statement_exists(conn, uuid).await?;
942 match x {
943 None => (),
944 Some(_fingerprint) => {
945 if c.has_no_conditionals() {
948 return Err(MyError::HTTP {
949 status: Status::Conflict,
950 info: "Missing pre-condition(s)".into(),
951 });
952 } else {
953 let etag = compute_etag::<Statement>(statement)?;
961 return match eval_preconditions!(&etag, c) {
962 s if s != Status::Ok => Err(MyError::HTTP {
963 status: s,
964 info: "Failed pre-condition(s)".into(),
965 }),
966 _ => Ok(PutResponse {
967 inner: WithETag {
968 inner: Status::NoContent,
969 etag: Header::new(header::ETAG.as_str(), etag.to_string()),
970 },
971 }),
972 };
973 }
974 }
975 }
976
977 ensure_authority(statement, user)?;
987
988 let mut to_void_id = None;
991 if statement.is_verb_voided() {
992 if let Some(target_uuid) = statement.voided_target() {
993 let (found, valid, id) = find_statement_to_void(conn, &target_uuid).await?;
995 if found {
996 if valid {
997 to_void_id = Some(id)
998 } else {
999 return Err(MyError::HTTP {
1000 status: Status::BadRequest,
1001 info: format!("Target of voiding statement ({target_uuid}) is invalid")
1002 .into(),
1003 });
1004 }
1005 }
1006 } else {
1007 return Err(MyError::HTTP {
1008 status: Status::BadRequest,
1009 info: format!("Invalid voiding statement {statement}").into(),
1010 });
1011 }
1012 }
1013
1014 insert_statement(conn, statement).await?;
1015
1016 if let Some(id) = to_void_id {
1018 debug!("About to void Statement #{}", id);
1019 void_statement(conn, id).await?;
1020 info!("Voided Statement #{}", id)
1021 }
1022
1023 let etag = compute_etag::<Statement>(statement)?;
1024 match eval_preconditions!(&etag, c) {
1025 s if s != Status::Ok => Err(MyError::HTTP {
1026 status: s,
1027 info: "Failed pre-condition(s)".into(),
1028 }),
1029 _ => Ok(PutResponse {
1030 inner: WithETag {
1031 inner: Status::NoContent,
1032 etag: Header::new(header::ETAG.as_str(), etag.to_string()),
1033 },
1034 }),
1035 }
1036}
1037
1038async fn persist_many(
1051 conn: &PgPool,
1052 c: Headers,
1053 mut statements: Vec<Statement>,
1054 user: &User,
1055) -> Result<PostResponse, MyError> {
1056 debug!("statements = {:?}", statements);
1057
1058 let mut uuids = vec![];
1061 for s in &mut statements {
1062 let uuid = match s.id() {
1063 Some(x) => *x,
1064 None => {
1065 let id = Uuid::now_v7();
1066 s.set_id(id);
1067 id
1068 }
1069 };
1070 if uuids.contains(&uuid) {
1071 return Err(MyError::HTTP {
1072 status: Status::BadRequest,
1073 info: format!("Found Statements w/ same ID: {uuid}").into(),
1074 });
1075 }
1076
1077 uuids.push(uuid)
1078 }
1079 debug!("uuids (before) = {:?}", uuids);
1080 let mut i = 0;
1084 while i < statements.len() {
1085 let s = &statements[i];
1086 let uuid = s.id().unwrap();
1087 let tmp = statement_exists(conn, uuid).await?;
1088 match tmp {
1089 None => i += 1,
1090 Some(x) => {
1091 let s_uid = s.uid();
1093 if s_uid != x {
1094 return Err(MyError::HTTP {
1095 status: Status::Conflict,
1096 info: format!(
1097 "Already have a Statement w/ same UUID ({uuid}) but different FP. Conflict")
1098 .into(),
1099 });
1100 }
1101 let dup = statements.remove(i);
1102 info!("Drop duplicate {}", dup);
1103 }
1104 }
1105 }
1106 if statements.is_empty() {
1108 return Err(MyError::HTTP {
1109 status: Status::NoContent,
1110 info: "No new Statements left".into(),
1111 });
1112 }
1113
1114 let mut ids_to_void = vec![];
1117 for s in &statements {
1118 if s.is_verb_voided() {
1119 if let Some(target_uuid) = s.voided_target() {
1120 let (found, valid, id) = find_statement_to_void(conn, &target_uuid).await?;
1122 if found {
1123 if valid {
1124 ids_to_void.push(id)
1125 } else {
1126 return Err(MyError::HTTP {
1127 status: Status::BadRequest,
1128 info: format!("Target of voiding statement ({target_uuid}) is invalid")
1129 .into(),
1130 });
1131 }
1132 }
1133 } else {
1134 return Err(MyError::HTTP {
1135 status: Status::BadRequest,
1136 info: format!("Invalid voiding statement {s}").into(),
1137 });
1138 }
1139 }
1140 }
1141 info!("Found {} Statement(s) to void", ids_to_void.len());
1142
1143 uuids.clear();
1145 let n = statements.len();
1146 for mut s in statements {
1147 let uuid = *s.id().unwrap();
1148
1149 ensure_authority(&mut s, user)?;
1159
1160 debug!("Persisting Statement #{} (1 of {})...", uuid, n);
1161 insert_statement(conn, &s).await?;
1162 uuids.push(uuid);
1163 }
1164
1165 for id in ids_to_void {
1167 debug!("About to void Statement #{}", id);
1168 void_statement(conn, id).await?;
1169 info!("Voided Statement #{}", id)
1170 }
1171
1172 let resource = StatementIDs(uuids);
1174 let inner = emit_response!(c, resource => StatementIDs)?;
1175 Ok(PostResponse { inner })
1176}
1177
1178async fn get_one(
1184 conn: &PgPool,
1185 uuid: Uuid,
1186 voided: bool,
1187 format: &Format,
1188) -> Result<StatementType, MyError> {
1189 debug!("uuid = {}", uuid);
1190 debug!("voided? {}", voided);
1191 debug!("format = {}", format);
1192
1193 let x = find_statement_by_uuid(conn, uuid, voided, format).await?;
1194 match x {
1195 Some(x) => Ok(x),
1196 None => Err(MyError::HTTP {
1197 status: Status::NotFound,
1198 info: "Statement not found".into(),
1199 }),
1200 }
1201}
1202
1203async fn get_many(
1204 conn: &PgPool,
1205 filter: Filter,
1206 format: &Format,
1207 with_attachments: bool,
1208) -> Result<StatementType, MyError> {
1209 debug!("filter = {}", filter);
1210 debug!("format = {}", format);
1211
1212 let sid = register_new_filter(conn).await?;
1213 debug!("sid = {}", sid);
1214
1215 let (mut x, y) = find_statements_by_filter(conn, filter, format, sid).await?;
1216 if let Some(pi) = y {
1217 let more = format!(
1218 "statements/more/?sid={}&count={}&offset={}&limit={}&format={}&attachments={}",
1219 sid,
1220 pi.count,
1221 pi.offset,
1222 pi.limit,
1223 format.as_param(),
1224 with_attachments
1225 );
1226 let url = config().to_external_url(&more);
1227 debug!("more URL = '{}'", url);
1228 if let Err(z) = &x.set_more(&url) {
1229 warn!(
1230 "Failed updating `more` URL of StatementResult. Ignore + continue but StatementResult will be inaccurate: {}",
1231 z
1232 );
1233 }
1234 }
1235 Ok(x)
1236}
1237
1238async fn save_statements(res: &StatementType) -> Result<PathBuf, MyError> {
1242 let name = &format!("_{}", BASE64_URL_SAFE_NO_PAD.encode(Uuid::now_v7()));
1243 let path = config().static_dir.join("s").join(name);
1245 let parent = path.parent().unwrap();
1246 DirBuilder::new()
1247 .recursive(true)
1248 .create(parent)
1249 .map_err(MyError::IO)
1250 .await?;
1251
1252 let mut file = File::create(&path).map_err(MyError::IO).await?;
1253 let json = match res {
1254 StatementType::S(x) => serde_json::to_string(x).expect("Failed serializing S to temp file"),
1255 StatementType::SId(x) => {
1256 serde_json::to_string(x).expect("Failed serializing SId to temp file")
1257 }
1258 StatementType::SR(x) => {
1259 serde_json::to_string(x).expect("Failed serializing SR to temp file")
1260 }
1261 StatementType::SRId(x) => {
1262 serde_json::to_string(x).expect("Failed serializing SRId to temp file")
1263 }
1264 };
1265 file.write_all(json.as_bytes()).map_err(MyError::IO).await?;
1266 file.flush().map_err(MyError::IO).await?;
1267 Ok(path)
1268}
1269
1270async fn save_attachment(bytes: Vec<u8>, part: &InPartInfo) -> Result<(), MyError> {
1273 let path = &part.path;
1274 let name = path.to_string_lossy();
1275
1276 if path.exists() {
1278 info!("Attachment {} already exists", name);
1279 return Ok(());
1280 }
1281
1282 let parent = path.parent().unwrap();
1283 DirBuilder::new()
1284 .recursive(true)
1285 .create(parent)
1286 .map_err(MyError::IO)
1287 .await?;
1288
1289 let mut file = File::create(path).map_err(MyError::IO).await?;
1290 file.write_all(&bytes).map_err(MyError::IO).await?;
1291 file.flush().map_err(MyError::IO).await?;
1292 Ok(())
1293}
1294
1295fn consistent_through(timestamp: DateTime<Utc>) -> Header<'static> {
1296 Header::new(
1297 CONSISTENT_THRU_HDR,
1298 timestamp.to_rfc3339_opts(SecondsFormat::Millis, true),
1299 )
1300}
1301
1302fn last_modified(timestamp: DateTime<Utc>) -> Header<'static> {
1303 Header::new(
1304 header::LAST_MODIFIED.as_str(),
1305 timestamp.to_rfc3339_opts(SecondsFormat::Millis, true),
1306 )
1307}
1308
1309fn ensure_authority(s: &mut Statement, user: &User) -> Result<(), MyError> {
1310 if s.authority().is_none() {
1311 user.can_authorize_statement()?;
1312
1313 s.set_authority_unchecked(Actor::Agent(user.authority()));
1314 }
1315
1316 Ok(())
1317}