Skip to main content

xapi_rs/lrs/
server.rs

1// SPDX-License-Identifier: GPL-3.0-or-later
2
3use crate::{
4    MyError, STATS_EXT_BASE, USERS_EXT_BASE, User, V200, VERBS_EXT_BASE, config,
5    lrs::{CONSISTENT_THRU_HDR, DB, VERSION_HDR, resources, stop_watch::StopWatch},
6};
7use chrono::{DateTime, SecondsFormat, Utc};
8use rocket::{
9    Build, Request, Responder, Rocket, catch, catchers,
10    fairing::AdHoc,
11    form::FromForm,
12    fs::{FileServer, relative},
13    futures::lock::Mutex,
14    http::{Header, Method},
15    response::status,
16    time::{OffsetDateTime, format_description::well_known::Rfc2822},
17};
18use std::{
19    fs,
20    io::ErrorKind,
21    mem,
22    sync::LazyLock,
23    time::{Duration, SystemTime},
24};
25use tracing::{debug, error, info, warn};
26
/// Error message text we emit when returning 401; it is the default body
/// (`inner`) of an [`UnAuthorized`] response.
const MISSING_CREDENTIALS: &str = "Credentials required";
/// Name of the authentication (challenge) header we send along a 401
/// response.
const WWW_AUTHENTICATE: &str = "WWW-Authenticate";
31
/// Our Response when detecting failing Basic Authentication requests.
///
/// The `Default` implementation populates the body w/ a fixed "credentials
/// required" message and the `WWW-Authenticate` header w/ our realm.
///
/// NOTE(review): the response is declared w/ a JSON content type while the
/// default body is a bare text message --confirm no client tries to parse it.
#[derive(Responder)]
#[response(status = 401, content_type = "json")]
struct UnAuthorized {
    // the response body.
    inner: String,
    // the authentication challenge header sent along the body.
    realm: Header<'static>,
}
42
43impl Default for UnAuthorized {
44    fn default() -> Self {
45        Self {
46            inner: MISSING_CREDENTIALS.to_owned(),
47            realm: Header::new(WWW_AUTHENTICATE, "Basic realm=\"LaRS\""),
48        }
49    }
50}
51
/// Server Singleton holding the timestamp of when this LaRS persistent
/// storage was likely last altered --i.e. when it last received a request
/// that may have modified it (documented intent: PUT, POST or DELETE).
///
/// Lazily initialized to the UNIX epoch, and guarded by an async `Mutex`;
/// access it through [`get_consistent_thru`] and [`set_consistent_thru`].
static CONSISTENT_THRU: LazyLock<Mutex<DateTime<Utc>>> =
    LazyLock::new(|| Mutex::new(DateTime::UNIX_EPOCH));
56
57pub(crate) async fn get_consistent_thru() -> DateTime<Utc> {
58    CONSISTENT_THRU.lock().await.to_utc()
59}
60
61pub(crate) async fn set_consistent_thru(now: DateTime<Utc>) {
62    let mut m = CONSISTENT_THRU.lock().await;
63    let was = mem::replace(&mut *m, now);
64    info!("CONSISTENT_THRU changed from {} to {}", was, now);
65}
66
67async fn update_consistent_thru() {
68    set_consistent_thru(Utc::now()).await;
69}
70
71/// Entry point for constructing a Local Rocket and use it for either testing
72/// or not. When `testing` is TRUE a mock DB is injected otherwise it's the
73/// real McKoy.
74pub fn build(testing: bool) -> Rocket<Build> {
75    let figment = rocket::Config::figment();
76    fs::create_dir_all(relative!("static")).expect("Failed creating 'static' dir :(");
77    rocket::custom(figment)
78        .mount("/about", resources::about::routes())
79        .mount("/activities", resources::activities::routes())
80        .mount("/activities/profile", resources::activity_profile::routes())
81        .mount("/activities/state", resources::state::routes())
82        .mount("/agents", resources::agents::routes())
83        .mount("/agents/profile", resources::agent_profile::routes())
84        .mount("/statements", resources::statement::routes())
85        // extensions...
86        .mount(prepend_slash(VERBS_EXT_BASE), resources::verbs::routes())
87        .mount(prepend_slash(STATS_EXT_BASE), resources::stats::routes())
88        .mount(prepend_slash(USERS_EXT_BASE), resources::users::routes())
89        // assets...
90        .mount("/static", FileServer::from(relative!("static")))
91        .attach(DB::fairing(testing))
92        // startup hook
93        .attach(AdHoc::on_liftoff("Liftoff Hook", move |_| {
94            Box::pin(async move {
95                let now: OffsetDateTime = SystemTime::now().into();
96                info!(
97                    "LaRS {} starting up on {:?}",
98                    env!("CARGO_PKG_VERSION"),
99                    now.format(&Rfc2822).unwrap()
100                );
101
102                User::clear_cache().await;
103                info!("Cleared User LRU cache...");
104
105                info!("Starting multipart temp file cleaner...");
106                tokio::spawn(async move {
107                    loop {
108                        tokio::time::sleep(Duration::from_secs(config().mfc_interval)).await;
109                        let tmp = clean_multipart_files();
110                        if let Err(x) = tmp {
111                            warn!("Failed: {}", x);
112                        }
113                    }
114                });
115            })
116        }))
117        // hook to update last-altered singleton...
118        .attach(AdHoc::on_request(
119            "Update consistent-thru timestamp",
120            |req, _| {
121                Box::pin(async move {
122                    if (req.uri().path().starts_with("/statements")
123                        || req.uri().path().starts_with("/activities")
124                        || req.uri().path().starts_with("/agents")
125                        || req.uri().path().starts_with("/extensions"))
126                        && (req.method() == Method::Put || req.method() == Method::Post)
127                    {
128                        update_consistent_thru().await;
129                    }
130                })
131            },
132        ))
133        // hook to add xAPI headers to responses as needed...
134        .attach(AdHoc::on_response("xAPI response headers", |req, resp| {
135            Box::pin(async move {
136                // add xAPI Version header to every response...
137                resp.set_header(Header::new(VERSION_HDR, V200.to_string()));
138
139                // add X-Experience-API-Consistent-Through header if missing in
140                // `/statements` responses...
141                if req.uri().path().ends_with("statements")
142                    && !resp.headers().contains(CONSISTENT_THRU_HDR)
143                {
144                    let val = get_consistent_thru()
145                        .await
146                        .to_rfc3339_opts(SecondsFormat::Millis, true);
147                    debug!("Added XCT header as {}", val);
148                    resp.set_header(Header::new(CONSISTENT_THRU_HDR, val));
149                }
150            })
151        }))
152        .attach(AdHoc::on_shutdown("Shutdown Hook", |_| {
153            Box::pin(async move {
154                info!("Removing multipart temp file folder...");
155                let s_dir = config().static_dir.join("s");
156                let _ = fs::remove_dir_all(s_dir);
157
158                let now: OffsetDateTime = SystemTime::now().into();
159                info!(
160                    "LaRS {} shutting down on {:?}",
161                    env!("CARGO_PKG_VERSION"),
162                    now.format(&Rfc2822).unwrap()
163                );
164            })
165        }))
166        .attach(resources::stats::StatsFairing)
167        .attach(StopWatch)
168        // wire the catchers...
169        .register(
170            "/",
171            catchers![bad_request, unauthorized, not_found, unknown_route],
172        )
173}
174
/// Return a new string consisting of a `/` followed by `p` as-is; no check
/// is made for a slash already leading `p`.
fn prepend_slash(p: &str) -> String {
    ["/", p].concat()
}
181
182/// Capture a Query Parameter named `name` of type `T` as an Option\<T\>.
183/// Return `None` if the parameter is absent or an error was raised while
184/// processing it; e.g. data limit exceeded, etc... Note that in case of
185/// errors, a message is also logged to output.
186pub(crate) fn qp<'r, T: FromForm<'r>>(req: &'r Request<'_>, name: &str) -> Option<T> {
187    match req.query_value::<T>(name) {
188        Some(Ok(x)) => Some(x),
189        Some(Err(x)) => {
190            error!("Failed processing query parameter '{}': {}", name, x);
191            None
192        }
193        None => None,
194    }
195}
196
197#[catch(400)]
198fn bad_request(req: &Request) -> &'static str {
199    error!("----- 400 -----");
200    debug!("req = {:?}", req);
201    "400 - Bad request :("
202}
203
204#[catch(401)]
205async fn unauthorized() -> UnAuthorized {
206    debug!("----- 401 -----");
207    UnAuthorized::default()
208}
209
210#[catch(404)]
211fn not_found(req: &Request) -> &'static str {
212    error!("----- 404 -----");
213    debug!("req = {:?}", req);
214    "404 - Resource not found :("
215}
216
217#[catch(422)]
218fn unknown_route(req: &Request) -> status::BadRequest<String> {
219    error!("----- 422 -----");
220    debug!("req = {:?}", req);
221    status::BadRequest(req.uri().to_string())
222}
223
224fn clean_multipart_files() -> Result<(), MyError> {
225    let s_dir = config().static_dir.join("s");
226    match fs::read_dir(s_dir) {
227        Ok(objects) => {
228            for obj in objects {
229                let obj = obj?;
230                let md = obj.metadata()?;
231                if md.is_file() {
232                    if let Ok(created) = md.created() {
233                        match created.elapsed() {
234                            Ok(elapsed) => {
235                                if elapsed > Duration::new(config().mfc_interval, 0) {
236                                    debug!("About to delete {:?}", obj.path());
237                                    fs::remove_file(obj.path())?;
238                                }
239                            }
240                            Err(x) => warn!(
241                                "Failed computing elapsed time since object's creation: {}",
242                                x
243                            ),
244                        }
245                    } else {
246                        warn!("Unable to access file system object's creattion timestamp :(")
247                    }
248                }
249            }
250        }
251        Err(x) => {
252            if x.kind() != ErrorKind::NotFound {
253                return Err(MyError::IO(x));
254            }
255        }
256    }
257    Ok(())
258}