buckets_core/database/
uploads.rs1use crate::{
2 DataManager, auto_method,
3 model::{MediaUpload, UploadMetadata, result::*},
4};
5use oiseau::{PostgresRow, cache::Cache, execute, get, params, query_row, query_rows};
6
7impl DataManager {
8 pub(crate) fn get_upload_from_row(x: &PostgresRow) -> MediaUpload {
10 MediaUpload {
11 id: get!(x->0(i64)) as usize,
12 created: get!(x->1(i64)) as usize,
13 owner: get!(x->2(i64)) as usize,
14 bucket: get!(x->3(String)),
15 metadata: serde_json::from_str(&get!(x->4(String))).unwrap(),
16 }
17 }
18
19 auto_method!(get_upload_by_id(usize as i64)@get_upload_from_row -> "SELECT * FROM uploads WHERE id = $1" --name="upload" --returns=MediaUpload --cache-key-tmpl="atto.upload:{}");
20
21 pub async fn get_upload_by_id_bucket(&self, id: usize, bucket: &str) -> Result<MediaUpload> {
23 if bucket.is_empty() {
24 return self.get_upload_by_id(id).await;
25 }
26
27 let conn = match self.0.connect().await {
28 Ok(c) => c,
29 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
30 };
31
32 let res = query_row!(
33 &conn,
34 "SELECT * FROM uploads WHERE id = $1 AND bucket = $2",
35 &[&(id as i64), &bucket],
36 |x| { Ok(Self::get_upload_from_row(x)) }
37 );
38
39 if res.is_err() {
40 return Err(Error::GeneralNotFound("upload".to_string()));
41 }
42
43 Ok(res.unwrap())
44 }
45
46 pub async fn get_uploads(&self, batch: usize, page: usize) -> Result<Vec<MediaUpload>> {
52 let conn = match self.0.connect().await {
53 Ok(c) => c,
54 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
55 };
56
57 let res = query_rows!(
58 &conn,
59 "SELECT * FROM uploads ORDER BY created DESC LIMIT $1 OFFSET $2",
60 &[&(batch as i64), &((page * batch) as i64)],
61 |x| { Self::get_upload_from_row(x) }
62 );
63
64 if res.is_err() {
65 return Err(Error::GeneralNotFound("upload".to_string()));
66 }
67
68 Ok(res.unwrap())
69 }
70
71 pub async fn get_uploads_by_owner(
78 &self,
79 owner: usize,
80 batch: usize,
81 page: usize,
82 ) -> Result<Vec<MediaUpload>> {
83 let conn = match self.0.connect().await {
84 Ok(c) => c,
85 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
86 };
87
88 let res = query_rows!(
89 &conn,
90 "SELECT * FROM uploads WHERE owner = $1 ORDER BY created DESC LIMIT $2 OFFSET $3",
91 &[&(owner as i64), &(batch as i64), &((page * batch) as i64)],
92 |x| { Self::get_upload_from_row(x) }
93 );
94
95 if res.is_err() {
96 return Err(Error::GeneralNotFound("upload".to_string()));
97 }
98
99 Ok(res.unwrap())
100 }
101
102 pub async fn get_uploads_by_owner_all(&self, owner: usize) -> Result<Vec<MediaUpload>> {
107 let conn = match self.0.connect().await {
108 Ok(c) => c,
109 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
110 };
111
112 let res = query_rows!(
113 &conn,
114 "SELECT * FROM uploads WHERE owner = $1 ORDER BY created DESC",
115 &[&(owner as i64)],
116 |x| { Self::get_upload_from_row(x) }
117 );
118
119 if res.is_err() {
120 return Err(Error::GeneralNotFound("upload".to_string()));
121 }
122
123 Ok(res.unwrap())
124 }
125
126 pub async fn create_upload(&self, data: MediaUpload) -> Result<MediaUpload> {
134 let conn = match self.0.connect().await {
135 Ok(c) => c,
136 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
137 };
138
139 data.metadata.validate_kv()?;
140
141 let res = execute!(
142 &conn,
143 "INSERT INTO uploads VALUES ($1, $2, $3, $4, $5)",
144 params![
145 &(data.id as i64),
146 &(data.created as i64),
147 &(data.owner as i64),
148 &data.bucket,
149 &serde_json::to_string(&data.metadata).unwrap().as_str(),
150 ]
151 );
152
153 if let Err(e) = res {
154 return Err(Error::DatabaseError(e.to_string()));
155 }
156
157 Ok(data)
159 }
160
161 pub async fn delete_upload(&self, id: usize) -> Result<()> {
167 let upload = self.get_upload_by_id(id).await?;
177 upload.remove(&self.0.0.directory)?;
178
179 let conn = match self.0.connect().await {
181 Ok(c) => c,
182 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
183 };
184
185 let res = execute!(&conn, "DELETE FROM uploads WHERE id = $1", &[&(id as i64)]);
186
187 if let Err(e) = res {
188 return Err(Error::DatabaseError(e.to_string()));
189 }
190
191 self.0.1.remove(format!("atto.upload:{}", id)).await;
192
193 Ok(())
195 }
196
197 pub async fn delete_upload_with_bucket(&self, id: usize, bucket: &str) -> Result<()> {
199 if bucket.is_empty() {
203 return self.delete_upload(id).await;
204 }
205
206 let upload = self.get_upload_by_id_bucket(id, bucket).await?;
212 upload.remove(&self.0.0.directory)?;
213
214 let conn = match self.0.connect().await {
216 Ok(c) => c,
217 Err(e) => return Err(Error::DatabaseConnection(e.to_string())),
218 };
219
220 let res = execute!(
221 &conn,
222 "DELETE FROM uploads WHERE id = $1 AND bucket = $2",
223 &[&(id as i64), &bucket]
224 );
225
226 if let Err(e) = res {
227 return Err(Error::DatabaseError(e.to_string()));
228 }
229
230 self.0.1.remove(format!("atto.upload:{}", id)).await;
231
232 Ok(())
234 }
235
236 auto_method!(update_upload_metadata(UploadMetadata) -> "UPDATE uploads SET metadata = $1 WHERE id = $2" --serde --cache-key-tmpl="atto.upload:{}");
237}