1#![allow(non_snake_case)]
4#![allow(clippy::too_many_arguments)]
5
6use crate::{
29 DataError, MyError,
30 db::state::{
31 SingleResourceParams, as_many, as_single, find, find_ids, remove, remove_many, upsert,
32 },
33 eval_preconditions,
34 lrs::{
35 DB, User, emit_doc_response, etag_from_str,
36 headers::Headers,
37 no_content,
38 resources::{WithDocumentOrIDs, WithETag},
39 },
40};
41use rocket::{State, delete, futures::TryFutureExt, get, http::Status, post, put, routes};
42use serde_json::{Map, Value};
43use sqlx::{
44 PgPool,
45 types::chrono::{DateTime, Utc},
46};
47use std::mem;
48use tracing::{debug, info};
49
50#[doc(hidden)]
51pub fn routes() -> Vec<rocket::Route> {
52 routes![put, post, get, delete]
53}
54
55#[put("/?<activityId>&<agent>&<registration>&<stateId>", data = "<doc>")]
58async fn put(
59 c: Headers,
60 activityId: &str,
61 agent: &str,
62 registration: Option<&str>,
63 stateId: &str,
64 doc: &str,
65 db: &State<DB>,
66 user: User,
67) -> Result<WithETag, MyError> {
68 debug!("----- put ----- {}", user);
69 user.can_use_xapi()?;
70
71 if doc.is_empty() {
72 return Err(MyError::HTTP {
73 status: Status::BadRequest,
74 info: "Document must NOT be an empty string".into(),
75 });
76 }
77
78 if c.is_json_content() {
80 serde_json::from_str::<Map<String, Value>>(doc)
81 .map_err(|x| MyError::Data(DataError::JSON(x)).with_status(Status::BadRequest))?;
82 }
83
84 let conn = db.pool();
85 let s = as_single(conn, activityId, agent, registration, stateId)
86 .map_err(|x| x.with_status(Status::BadRequest))
87 .await?;
88 debug!("s = {:?}", s);
89 let (x, _) = find(conn, &s).await?;
92 match x {
93 None => {
94 upsert(conn, &s, doc).await?;
96 let etag = etag_from_str(doc);
97 Ok(no_content(&etag))
98 }
99 Some(old_doc) => {
100 if c.has_no_conditionals() {
101 Err(MyError::HTTP {
102 status: Status::Conflict,
103 info: "PUT a known resource, w/ no pre-conditions, is NOT allowed".into(),
104 })
105 } else {
106 let etag = etag_from_str(&old_doc);
108 debug!("etag (old) = {}", etag);
109 match eval_preconditions!(&etag, c) {
110 s if s != Status::Ok => Err(MyError::HTTP {
111 status: s,
112 info: "Failed pre-condition(s)".into(),
113 }),
114 _ => {
115 if old_doc == doc {
117 info!("Old + new State documents are identidal");
118 Ok(no_content(&etag))
119 } else {
120 upsert(conn, &s, doc).await?;
121 let etag = etag_from_str(doc);
122 Ok(no_content(&etag))
123 }
124 }
125 }
126 }
127 }
128 }
129}
130
131#[post("/?<activityId>&<agent>&<registration>&<stateId>", data = "<doc>")]
134async fn post(
135 c: Headers,
136 activityId: &str,
137 agent: &str,
138 registration: Option<&str>,
139 stateId: &str,
140 doc: &str,
141 db: &State<DB>,
142 user: User,
143) -> Result<WithETag, MyError> {
144 debug!("----- post ----- {}", user);
145 user.can_use_xapi()?;
146
147 if doc.is_empty() {
148 return Err(MyError::HTTP {
149 status: Status::BadRequest,
150 info: "Document must NOT be an empty string".into(),
151 });
152 }
153
154 if c.is_json_content() {
156 serde_json::from_str::<Map<String, Value>>(doc)
157 .map_err(|x| MyError::Data(DataError::JSON(x)).with_status(Status::BadRequest))?;
158 }
159
160 let conn = db.pool();
161 let s = as_single(conn, activityId, agent, registration, stateId)
162 .map_err(|x| x.with_status(Status::BadRequest))
163 .await?;
164 debug!("s = {:?}", s);
165 let (x, _) = find(conn, &s).await?;
166 match x {
167 None => {
168 upsert(conn, &s, doc).await?;
170 let etag = etag_from_str(doc);
171 Ok(no_content(&etag))
172 }
173 Some(old_doc) => {
174 let etag = etag_from_str(&old_doc);
175 debug!("etag (old) = {}", etag);
176 if c.has_conditionals() {
177 match eval_preconditions!(&etag, c) {
178 s if s != Status::Ok => {
179 return Err(MyError::HTTP {
180 status: s,
181 info: "Failed pre-condition(s)".into(),
182 });
183 }
184 _ => (),
185 }
186 }
187
188 let mut old: Map<String, Value> = serde_json::from_str(&old_doc)
190 .map_err(|x| MyError::Data(DataError::JSON(x)).with_status(Status::BadRequest))?;
191
192 let mut new: Map<String, Value> = serde_json::from_str(doc)
193 .map_err(|x| MyError::Data(DataError::JSON(x)).with_status(Status::BadRequest))?;
194
195 if old == new {
197 info!("Old + new State documents are identical");
198 return Ok(no_content(&etag));
199 }
200
201 debug!("document (before) = '{}'", old_doc);
203 for (k, v) in new.iter_mut() {
204 let new_v = mem::take(v);
205 old.insert(k.to_owned(), new_v);
206 }
207 let merged = serde_json::to_string(&old).expect("Failed serialize merged document");
209 debug!("document ( after) = '{}'", merged);
210
211 upsert(conn, &s, &merged).await?;
212 let etag = etag_from_str(&merged);
213 Ok(no_content(&etag))
214 }
215 }
216}
217
218#[get("/?<activityId>&<agent>&<registration>&<stateId>&<since>")]
219async fn get(
220 activityId: &str,
221 agent: &str,
222 registration: Option<&str>,
223 stateId: Option<&str>,
224 since: Option<&str>,
225 db: &State<DB>,
226 user: User,
227) -> Result<WithDocumentOrIDs, MyError> {
228 debug!("----- get ----- {}", user);
229 user.can_use_xapi()?;
230
231 let conn = db.pool();
232 let resource = if stateId.is_some() {
233 if since.is_some() {
234 return Err(MyError::HTTP {
235 status: Status::BadRequest,
236 info: "Either `stateId` or `since` should be specified; not both".into(),
237 });
238 }
239
240 let s = as_single(conn, activityId, agent, registration, stateId.unwrap())
241 .map_err(|x| x.with_status(Status::BadRequest))
242 .await?;
243 debug!("s = {:?}", s);
244 let res = get_state(conn, &s).await?;
245 (res.0, Some(res.1))
246 } else {
247 let s = as_many(conn, activityId, agent, registration, since)
248 .map_err(|x| x.with_status(Status::BadRequest))
249 .await?;
250 debug!("s = {:?}", s);
251 let x = find_ids(conn, &s).await?;
252 (serde_json::to_string(&x).unwrap(), None)
253 };
254
255 emit_doc_response(resource.0, resource.1).await
256}
257
258#[delete("/?<activityId>&<agent>&<registration>&<stateId>")]
259async fn delete(
260 c: Headers,
261 activityId: &str,
262 agent: &str,
263 registration: Option<&str>,
264 stateId: Option<&str>,
265 db: &State<DB>,
266 user: User,
267) -> Result<Status, MyError> {
268 debug!("----- delete ----- {}", user);
269 user.can_use_xapi()?;
270
271 let conn = db.pool();
272 if let Some(sid) = stateId {
273 delete_one(conn, c, activityId, agent, registration, sid).await
274 } else {
275 delete_many(conn, activityId, agent, registration).await
276 }
277}
278
279async fn get_state(
280 conn: &PgPool,
281 s: &SingleResourceParams<'_>,
282) -> Result<(String, DateTime<Utc>), MyError> {
283 let (x, updated) = find(conn, s).await?;
284 match x {
285 None => Err(MyError::HTTP {
286 status: Status::NotFound,
287 info: format!("State ({s}) not found").into(),
288 }),
289 Some(y) => Ok((y, updated)),
290 }
291}
292
293async fn delete_one(
294 conn: &PgPool,
295 c: Headers,
296 activity_iri: &str,
297 agent: &str,
298 registration: Option<&str>,
299 state_id: &str,
300) -> Result<Status, MyError> {
301 let s = as_single(conn, activity_iri, agent, registration, state_id)
302 .map_err(|x| x.with_status(Status::BadRequest))
303 .await?;
304 debug!("s = {:?}", s);
305 let (doc, _) = get_state(conn, &s).await?;
306 let etag = etag_from_str(&doc);
307 match eval_preconditions!(&etag, c) {
308 s if s != Status::Ok => Err(MyError::HTTP {
309 status: s,
310 info: "Failed pre-condition(s)".into(),
311 }),
312 _ => {
313 remove(conn, &s).await?;
314 Ok(Status::NoContent)
315 }
316 }
317}
318
319async fn delete_many(
320 conn: &PgPool,
321 activity_iri: &str,
322 agent: &str,
323 registration: Option<&str>,
324) -> Result<Status, MyError> {
325 let s = as_single(conn, activity_iri, agent, registration, "")
326 .map_err(|x| x.with_status(Status::BadRequest))
327 .await?;
328 debug!("s = {:?}", s);
329 remove_many(conn, &s).await?;
330 Ok(Status::NoContent)
331}