cooklang_sync_server/metadata/
mod.rs

1use rocket::fairing::AdHoc;
2use rocket::form::Form;
3use rocket::response::Debug;
4use rocket::serde::json::Json;
5use rocket::{Shutdown, State};
6
7use std::sync::Mutex;
8use std::time::Duration;
9
10use crate::auth::user::User;
11
12mod db;
13mod middleware;
14mod models;
15mod notification;
16mod request;
17mod response;
18mod schema;
19
20use db::{insert_new_record, list as db_list, Db};
21use models::{FileRecord, NewFileRecord};
22
23use notification::{ActiveClients, Client};
24
25type Result<T, E = Debug<diesel::result::Error>> = std::result::Result<T, E>;
26
27// check if all hashes are present
28// if any not present return back need more and list of hashes
29// if present all insert into db path and chunk hashes and return back a new jid
30#[post("/commit?<uuid>", data = "<commit_payload>")]
31async fn commit(
32    user: User,
33    clients: &State<Mutex<ActiveClients>>,
34    db: Db,
35    uuid: String,
36    commit_payload: Form<request::CommitPayload<'_>>,
37) -> Result<Json<response::CommitResultStatus>> {
38    let to_be_uploaded = commit_payload.non_local_chunks();
39
40    match to_be_uploaded.is_empty() {
41        true => {
42            let r = NewFileRecord::from_payload_and_user_id(commit_payload, user.id);
43            let id: i32 = db.run(move |conn| insert_new_record(conn, r)).await?;
44
45            clients.lock().unwrap().notify(uuid);
46
47            Ok(Json(response::CommitResultStatus::Success(id)))
48        }
49        false => {
50            let to_be_uploaded_strings: Vec<String> = to_be_uploaded
51                .iter()
52                .map(|chunk_id| chunk_id.0.to_string())
53                .collect();
54
55            Ok(Json(response::CommitResultStatus::NeedChunks(
56                to_be_uploaded_strings.join(","),
57            )))
58        }
59    }
60}
61
62// return back array of jid, path, hashes for all jid since requested
63#[get("/list?<jid>")]
64async fn list(db: Db, user: User, jid: i32) -> Result<Json<Vec<FileRecord>>> {
65    let records = db.run(move |conn| db_list(conn, user.id, jid)).await?;
66
67    Ok(Json(records))
68}
69
70#[get("/poll?<seconds>&<uuid>")]
71async fn poll(
72    _user: User,
73    clients: &State<Mutex<ActiveClients>>,
74    uuid: String,
75    seconds: u64,
76    shutdown: Shutdown,
77) -> Result<()> {
78    let notification = {
79        let mut data = clients.lock().unwrap();
80
81        let client = Client::new(uuid);
82
83        match data.clients.get(&client) {
84            Some(c) => c.notification.clone(),
85            None => {
86                let notification = client.notification.clone();
87                data.clients.insert(client);
88                notification
89            }
90        }
91    };
92
93    let timeout = tokio::time::timeout(Duration::from_secs(seconds), notification.notified());
94
95    tokio::select! {
96        _ = shutdown => Ok(()),
97        _ = timeout => Ok(()),
98    }
99}
100
101pub fn stage() -> AdHoc {
102    AdHoc::on_ignite("Diesel DB Stage", |rocket| async {
103        let clients = notification::init();
104
105        rocket
106            .attach(Db::fairing())
107            .attach(AdHoc::on_ignite(
108                "Diesel Migrations",
109                middleware::run_migrations,
110            ))
111            .mount("/metadata", routes![commit, list, poll])
112            .manage(clients)
113    })
114}