1#![allow(non_snake_case)]
4#![allow(clippy::too_many_arguments)]
5
6use crate::{
29 MyError,
30 db::state::{
31 SingleResourceParams, as_many, as_single, find, find_ids, remove, remove_many, upsert,
32 },
33 eval_preconditions,
34 lrs::{
35 DB, User, emit_doc_response, etag_from_str,
36 headers::Headers,
37 no_content,
38 resources::{WithDocumentOrIDs, WithETag},
39 },
40};
41use rocket::{State, delete, futures::TryFutureExt, get, http::Status, post, put, routes};
42use serde_json::{Map, Value};
43use sqlx::{
44 PgPool,
45 types::chrono::{DateTime, Utc},
46};
47use std::mem;
48use tracing::{debug, info};
49use xapi_data::DataError;
50
51#[doc(hidden)]
52pub fn routes() -> Vec<rocket::Route> {
53 routes![put, post, get, delete]
54}
55
56#[put("/?<activityId>&<agent>&<registration>&<stateId>", data = "<doc>")]
59async fn put(
60 c: Headers,
61 activityId: &str,
62 agent: &str,
63 registration: Option<&str>,
64 stateId: &str,
65 doc: &str,
66 db: &State<DB>,
67 user: User,
68) -> Result<WithETag, MyError> {
69 debug!("----- put ----- {}", user);
70 user.can_use_xapi()?;
71
72 if doc.is_empty() {
73 return Err(MyError::HTTP {
74 status: Status::BadRequest,
75 info: "Document must NOT be an empty string".into(),
76 });
77 }
78
79 if c.is_json_content() {
81 serde_json::from_str::<Map<String, Value>>(doc)
82 .map_err(|x| MyError::Data(DataError::JSON(x)).with_status(Status::BadRequest))?;
83 }
84
85 let conn = db.pool();
86 let s = as_single(conn, activityId, agent, registration, stateId)
87 .map_err(|x| x.with_status(Status::BadRequest))
88 .await?;
89 debug!("s = {:?}", s);
90 let (x, _) = find(conn, &s).await?;
93 match x {
94 None => {
95 upsert(conn, &s, doc).await?;
97 let etag = etag_from_str(doc);
98 Ok(no_content(&etag))
99 }
100 Some(old_doc) => {
101 if c.has_no_conditionals() {
102 Err(MyError::HTTP {
103 status: Status::Conflict,
104 info: "PUT a known resource, w/ no pre-conditions, is NOT allowed".into(),
105 })
106 } else {
107 let etag = etag_from_str(&old_doc);
109 debug!("etag (old) = {}", etag);
110 match eval_preconditions!(&etag, c) {
111 s if s != Status::Ok => Err(MyError::HTTP {
112 status: s,
113 info: "Failed pre-condition(s)".into(),
114 }),
115 _ => {
116 if old_doc == doc {
118 info!("Old + new State documents are identidal");
119 Ok(no_content(&etag))
120 } else {
121 upsert(conn, &s, doc).await?;
122 let etag = etag_from_str(doc);
123 Ok(no_content(&etag))
124 }
125 }
126 }
127 }
128 }
129 }
130}
131
132#[post("/?<activityId>&<agent>&<registration>&<stateId>", data = "<doc>")]
135async fn post(
136 c: Headers,
137 activityId: &str,
138 agent: &str,
139 registration: Option<&str>,
140 stateId: &str,
141 doc: &str,
142 db: &State<DB>,
143 user: User,
144) -> Result<WithETag, MyError> {
145 debug!("----- post ----- {}", user);
146 user.can_use_xapi()?;
147
148 if doc.is_empty() {
149 return Err(MyError::HTTP {
150 status: Status::BadRequest,
151 info: "Document must NOT be an empty string".into(),
152 });
153 }
154
155 if c.is_json_content() {
157 serde_json::from_str::<Map<String, Value>>(doc)
158 .map_err(|x| MyError::Data(DataError::JSON(x)).with_status(Status::BadRequest))?;
159 }
160
161 let conn = db.pool();
162 let s = as_single(conn, activityId, agent, registration, stateId)
163 .map_err(|x| x.with_status(Status::BadRequest))
164 .await?;
165 debug!("s = {:?}", s);
166 let (x, _) = find(conn, &s).await?;
167 match x {
168 None => {
169 upsert(conn, &s, doc).await?;
171 let etag = etag_from_str(doc);
172 Ok(no_content(&etag))
173 }
174 Some(old_doc) => {
175 let etag = etag_from_str(&old_doc);
176 debug!("etag (old) = {}", etag);
177 if c.has_conditionals() {
178 match eval_preconditions!(&etag, c) {
179 s if s != Status::Ok => {
180 return Err(MyError::HTTP {
181 status: s,
182 info: "Failed pre-condition(s)".into(),
183 });
184 }
185 _ => (),
186 }
187 }
188
189 let mut old: Map<String, Value> = serde_json::from_str(&old_doc)
191 .map_err(|x| MyError::Data(DataError::JSON(x)).with_status(Status::BadRequest))?;
192
193 let mut new: Map<String, Value> = serde_json::from_str(doc)
194 .map_err(|x| MyError::Data(DataError::JSON(x)).with_status(Status::BadRequest))?;
195
196 if old == new {
198 info!("Old + new State documents are identical");
199 return Ok(no_content(&etag));
200 }
201
202 debug!("document (before) = '{}'", old_doc);
204 for (k, v) in new.iter_mut() {
205 let new_v = mem::take(v);
206 old.insert(k.to_owned(), new_v);
207 }
208 let merged = serde_json::to_string(&old).expect("Failed serialize merged document");
210 debug!("document ( after) = '{}'", merged);
211
212 upsert(conn, &s, &merged).await?;
213 let etag = etag_from_str(&merged);
214 Ok(no_content(&etag))
215 }
216 }
217}
218
219#[get("/?<activityId>&<agent>&<registration>&<stateId>&<since>")]
220async fn get(
221 activityId: &str,
222 agent: &str,
223 registration: Option<&str>,
224 stateId: Option<&str>,
225 since: Option<&str>,
226 db: &State<DB>,
227 user: User,
228) -> Result<WithDocumentOrIDs, MyError> {
229 debug!("----- get ----- {}", user);
230 user.can_use_xapi()?;
231
232 let conn = db.pool();
233 let resource = if let Some(z_state_id) = stateId {
234 if since.is_some() {
235 return Err(MyError::HTTP {
236 status: Status::BadRequest,
237 info: "Either `stateId` or `since` should be specified; not both".into(),
238 });
239 }
240
241 let s = as_single(conn, activityId, agent, registration, z_state_id)
242 .map_err(|x| x.with_status(Status::BadRequest))
243 .await?;
244 debug!("s = {:?}", s);
245 let res = get_state(conn, &s).await?;
246 (res.0, Some(res.1))
247 } else {
248 let s = as_many(conn, activityId, agent, registration, since)
249 .map_err(|x| x.with_status(Status::BadRequest))
250 .await?;
251 debug!("s = {:?}", s);
252 let x = find_ids(conn, &s).await?;
253 (serde_json::to_string(&x).unwrap(), None)
254 };
255
256 emit_doc_response(resource.0, resource.1).await
257}
258
259#[delete("/?<activityId>&<agent>&<registration>&<stateId>")]
260async fn delete(
261 c: Headers,
262 activityId: &str,
263 agent: &str,
264 registration: Option<&str>,
265 stateId: Option<&str>,
266 db: &State<DB>,
267 user: User,
268) -> Result<Status, MyError> {
269 debug!("----- delete ----- {}", user);
270 user.can_use_xapi()?;
271
272 let conn = db.pool();
273 if let Some(sid) = stateId {
274 delete_one(conn, c, activityId, agent, registration, sid).await
275 } else {
276 delete_many(conn, activityId, agent, registration).await
277 }
278}
279
280async fn get_state(
281 conn: &PgPool,
282 s: &SingleResourceParams<'_>,
283) -> Result<(String, DateTime<Utc>), MyError> {
284 let (x, updated) = find(conn, s).await?;
285 match x {
286 None => Err(MyError::HTTP {
287 status: Status::NotFound,
288 info: format!("State ({s}) not found").into(),
289 }),
290 Some(y) => Ok((y, updated)),
291 }
292}
293
294async fn delete_one(
295 conn: &PgPool,
296 c: Headers,
297 activity_iri: &str,
298 agent: &str,
299 registration: Option<&str>,
300 state_id: &str,
301) -> Result<Status, MyError> {
302 let s = as_single(conn, activity_iri, agent, registration, state_id)
303 .map_err(|x| x.with_status(Status::BadRequest))
304 .await?;
305 debug!("s = {:?}", s);
306 let (doc, _) = get_state(conn, &s).await?;
307 let etag = etag_from_str(&doc);
308 match eval_preconditions!(&etag, c) {
309 s if s != Status::Ok => Err(MyError::HTTP {
310 status: s,
311 info: "Failed pre-condition(s)".into(),
312 }),
313 _ => {
314 remove(conn, &s).await?;
315 Ok(Status::NoContent)
316 }
317 }
318}
319
320async fn delete_many(
321 conn: &PgPool,
322 activity_iri: &str,
323 agent: &str,
324 registration: Option<&str>,
325) -> Result<Status, MyError> {
326 let s = as_single(conn, activity_iri, agent, registration, "")
327 .map_err(|x| x.with_status(Status::BadRequest))
328 .await?;
329 debug!("s = {:?}", s);
330 remove_many(conn, &s).await?;
331 Ok(Status::NoContent)
332}