cooklang_sync_server/metadata/
mod.rs1use rocket::fairing::AdHoc;
2use rocket::form::Form;
3use rocket::response::Debug;
4use rocket::serde::json::Json;
5use rocket::{Shutdown, State};
6
7use std::sync::Mutex;
8use std::time::Duration;
9
10use crate::auth::user::User;
11
12mod db;
13mod middleware;
14mod models;
15mod notification;
16mod request;
17mod response;
18mod schema;
19
20use db::{insert_new_record, list as db_list, Db};
21use models::{FileRecord, NewFileRecord};
22
23use notification::{ActiveClients, Client};
24
25type Result<T, E = Debug<diesel::result::Error>> = std::result::Result<T, E>;
26
27#[post("/commit?<uuid>", data = "<commit_payload>")]
31async fn commit(
32 user: User,
33 clients: &State<Mutex<ActiveClients>>,
34 db: Db,
35 uuid: String,
36 commit_payload: Form<request::CommitPayload<'_>>,
37) -> Result<Json<response::CommitResultStatus>> {
38 let to_be_uploaded = commit_payload.non_local_chunks();
39
40 match to_be_uploaded.is_empty() {
41 true => {
42 let r = NewFileRecord::from_payload_and_user_id(commit_payload, user.id);
43 let id: i32 = db.run(move |conn| insert_new_record(conn, r)).await?;
44
45 clients.lock().unwrap().notify(uuid);
46
47 Ok(Json(response::CommitResultStatus::Success(id)))
48 }
49 false => {
50 let to_be_uploaded_strings: Vec<String> = to_be_uploaded
51 .iter()
52 .map(|chunk_id| chunk_id.0.to_string())
53 .collect();
54
55 Ok(Json(response::CommitResultStatus::NeedChunks(
56 to_be_uploaded_strings.join(","),
57 )))
58 }
59 }
60}
61
62#[get("/list?<jid>")]
64async fn list(db: Db, user: User, jid: i32) -> Result<Json<Vec<FileRecord>>> {
65 let records = db.run(move |conn| db_list(conn, user.id, jid)).await?;
66
67 Ok(Json(records))
68}
69
70#[get("/poll?<seconds>&<uuid>")]
71async fn poll(
72 _user: User,
73 clients: &State<Mutex<ActiveClients>>,
74 uuid: String,
75 seconds: u64,
76 shutdown: Shutdown,
77) -> Result<()> {
78 let notification = {
79 let mut data = clients.lock().unwrap();
80
81 let client = Client::new(uuid);
82
83 match data.clients.get(&client) {
84 Some(c) => c.notification.clone(),
85 None => {
86 let notification = client.notification.clone();
87 data.clients.insert(client);
88 notification
89 }
90 }
91 };
92
93 let timeout = tokio::time::timeout(Duration::from_secs(seconds), notification.notified());
94
95 tokio::select! {
96 _ = shutdown => Ok(()),
97 _ = timeout => Ok(()),
98 }
99}
100
101pub fn stage() -> AdHoc {
102 AdHoc::on_ignite("Diesel DB Stage", |rocket| async {
103 let clients = notification::init();
104
105 rocket
106 .attach(Db::fairing())
107 .attach(AdHoc::on_ignite(
108 "Diesel Migrations",
109 middleware::run_migrations,
110 ))
111 .mount("/metadata", routes![commit, list, poll])
112 .manage(clients)
113 })
114}