1use std::path::{Path};
2use std::str::FromStr;
3use cyfs_base::*;
4use crate::{DecInfo, sql_query, SqlConnection, SqlPool, TaskCategory, TaskId, TaskStatus, TaskType, SqlRow, RawSqlPool};
5
6#[async_trait::async_trait]
7pub trait TaskStore: Send + Sync {
8 async fn save_task(&self, task_id: &TaskId, task_status: TaskStatus, task_data: Vec<u8>) -> BuckyResult<()>;
9 async fn save_task_status(&self, task_id: &TaskId, task_status: TaskStatus) -> BuckyResult<()>;
10 async fn save_task_data(&self, task_id: &TaskId, task_data: Vec<u8>) -> BuckyResult<()>;
11}
12
13pub struct QueryTaskParams {
14 source: DeviceId,
15 dec_id: Option<ObjectId>,
16}
17#[async_trait::async_trait]
18pub trait TaskManagerStore: Send + Sync {
19 async fn add_task(&self, task_id: &TaskId, category: TaskCategory, task_type: TaskType, task_status: TaskStatus, dec_list: Vec<DecInfo>, task_params: Vec<u8>) -> BuckyResult<()>;
20 async fn get_task(&self, task_id: &TaskId) -> BuckyResult<(TaskCategory, TaskType, TaskStatus, Vec<u8>, Vec<u8>)>;
21 async fn get_tasks_by_status(&self, status: TaskStatus) -> BuckyResult<Vec<(TaskId, TaskType, Vec<u8>, Vec<u8>)>>;
22 async fn get_tasks_by_category(&self, category: TaskCategory) -> BuckyResult<Vec<(TaskId, TaskType, TaskStatus, Vec<u8>, Vec<u8>)>>;
23 async fn get_tasks_by_task_id(&self, task_id_list: &[TaskId]) -> BuckyResult<Vec<(TaskId, TaskType, TaskStatus, Vec<u8>, Vec<u8>)>>;
24 async fn get_tasks(&self, source: &DeviceId, dec_id: &ObjectId, category: TaskCategory, task_status: TaskStatus, range: Option<(u64, u32)>) -> BuckyResult<Vec<(TaskId, TaskType, TaskStatus, Vec<u8>, Vec<u8>)>>;
25 async fn get_dec_list(&self, task_id: &TaskId) -> BuckyResult<Vec<DecInfo>>;
26 async fn add_dec_info(&self, task_id: &TaskId, category: TaskCategory, task_status: TaskStatus, dec_info: &DecInfo) -> BuckyResult<()>;
27 async fn delete_dec_info(&self, task_id: &TaskId, dec_id: &ObjectId, source: &DeviceId) -> BuckyResult<()>;
28 async fn delete_task(&self, task_id: &TaskId) -> BuckyResult<()>;
29}
30
31pub struct SQLiteTaskStore {
32 pool: SqlPool
33}
34
35impl From<RawSqlPool> for SQLiteTaskStore {
36 fn from(pool: RawSqlPool) -> Self {
37 Self {
38 pool: SqlPool::from_raw_pool(pool)
39 }
40 }
41}
42
43impl SQLiteTaskStore {
44 pub async fn new<P: AsRef<Path>>(db_path: P) -> BuckyResult<Self> {
45 let pool = SqlPool::open(format!("sqlite://{}", db_path.as_ref().to_string_lossy().to_string()).as_str(), 10).await?;
46 Ok(Self {
47 pool
48 })
49 }
50
51 pub async fn create_connection(&self) -> BuckyResult<SqlConnection> {
52 self.pool.get_conn().await
53 }
54
55 pub async fn init(&self) -> BuckyResult<()> {
56 let mut conn = self.pool.get_conn().await?;
57 let sql = r#"create table if not exists "tasks" (
58 "task_id" char(45) primary key not null,
59 "task_category" INTEGER,
60 "task_type" INTEGER,
61 "task_status" INTEGER,
62 "task_param" BLOB,
63 "task_data" BLOB,
64 "created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
65 "updated_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP
66 )"#;
67 conn.execute_sql(sql_query(sql)).await?;
68
69 let sql = r#"create index if not exists category_index on tasks (task_category, created_at)"#;
70 conn.execute_sql(sql_query(sql)).await?;
71
72 let sql = r#"create index if not exists status_index on tasks (task_status, updated_at)"#;
73 conn.execute_sql(sql_query(sql)).await?;
74
75 let sql = r#"create table if not exists "dec_tasks" (
76 "source" char(45) not null,
77 "dec_id" char(45) not null,
78 "task_id" char(45) not null,
79 "task_status" INTEGER,
80 "task_category" INTEGER,
81 "task_type" INTEGER,
82 "dec_info" BLOB not null,
83 "created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
84 "updated_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP
85 )"#;
86 conn.execute_sql(sql_query(sql)).await?;
87
88 let sql = r#"create index if not exists dec_index on dec_tasks (source, dec_id, task_category, task_status, created_at, task_id)"#;
89 conn.execute_sql(sql_query(sql)).await?;
90
91 let sql = r#"create index if not exists task_index on dec_tasks (task_id)"#;
92 conn.execute_sql(sql_query(sql)).await?;
93
94 Ok(())
95 }
96}
97
98#[async_trait::async_trait]
99impl TaskStore for SQLiteTaskStore {
100 async fn save_task(&self, task_id: &TaskId, task_status: TaskStatus, task_data: Vec<u8>) -> BuckyResult<()> {
101 let mut conn = self.pool.get_conn().await?;
102 conn.begin_transaction().await?;
103
104 let sql = r#"update tasks set task_status = ?1, task_data = ?2, updated_at = CURRENT_TIMESTAMP where task_id = ?3"#;
105 conn.execute_sql(sql_query(sql).bind(task_status.into()).bind(task_data).bind(task_id.to_string())).await?;
106
107 let sql = r#"update dec_tasks set task_status = ?1, updated_at = CURRENT_TIMESTAMP where task_id = ?2"#;
108 conn.execute_sql(sql_query(sql).bind(task_status.into()).bind(task_id.to_string())).await?;
109
110 conn.commit_transaction().await?;
111 Ok(())
112 }
113
114 async fn save_task_status(&self, task_id: &TaskId, task_status: TaskStatus) -> BuckyResult<()> {
115 let mut conn = self.pool.get_conn().await?;
116 conn.begin_transaction().await?;
117 let sql = r#"update tasks set task_status = ?1, updated_at = CURRENT_TIMESTAMP where task_id = ?2"#;
118 conn.execute_sql(sql_query(sql).bind(task_status.into()).bind(task_id.to_string())).await?;
119
120 let sql = r#"update dec_tasks set task_status = ?1, updated_at = CURRENT_TIMESTAMP where task_id = ?2"#;
121 conn.execute_sql(sql_query(sql).bind(task_status.into()).bind(task_id.to_string())).await?;
122
123 conn.commit_transaction().await?;
124 Ok(())
125 }
126
127 async fn save_task_data(&self, task_id: &TaskId, task_data: Vec<u8>) -> BuckyResult<()> {
128 let mut conn = self.pool.get_conn().await?;
129 let sql = r#"update tasks set task_data = ?1 where task_id = ?2"#;
130 conn.execute_sql(sql_query(sql).bind(task_data).bind(task_id.to_string())).await?;
131 Ok(())
132 }
133}
134
135#[async_trait::async_trait]
136impl TaskManagerStore for SQLiteTaskStore
137{
138 async fn add_task(&self, task_id: &TaskId, category: TaskCategory, task_type: TaskType, task_status: TaskStatus, dec_list: Vec<DecInfo>, task_params: Vec<u8>) -> BuckyResult<()> {
139 info!("will add task: id={}, category={}, type={}, status={:?}, dec={:?}", task_id, category, task_type, task_status, dec_list);
140
141 let mut conn = self.pool.get_conn().await?;
142 conn.begin_transaction().await?;
143
144 let sql = r#"insert into tasks (task_id, task_category, task_type, task_status, task_param, task_data) values (?1, ?2, ?3, ?4, ?5, ?6)"#;
145 conn.execute_sql(sql_query(sql)
146 .bind(task_id.to_string())
147 .bind(category.into())
148 .bind(task_type.into())
149 .bind(task_status.into())
150 .bind(task_params)
151 .bind(Vec::new())).await?;
152
153 for dec_info in dec_list.iter() {
154 let sql = r#"insert into dec_tasks (source, dec_id, task_category, task_status, task_id, dec_info) values (?1, ?2, ?3, ?4, ?5, ?6)"#;
155 conn.execute_sql(sql_query(sql)
156 .bind(dec_info.source().to_string())
157 .bind(dec_info.dec_id().to_string())
158 .bind(category.into())
159 .bind(task_status.into())
160 .bind(task_id.to_string())
161 .bind(dec_info.to_vec()?)).await?;
162 }
163
164 conn.commit_transaction().await?;
165
166 Ok(())
167 }
168
169 async fn get_task(&self, task_id: &TaskId) -> BuckyResult<(TaskCategory, TaskType, TaskStatus, Vec<u8>, Vec<u8>)> {
170 let mut conn = self.pool.get_conn().await?;
171 let sql = r#"select * from tasks where task_id = ?1"#;
172 let row = conn.query_one(sql_query(sql).bind(task_id.to_string())).await?;
173 Ok((TaskCategory::try_from(row.get("task_category"))?,
174 TaskType::try_from(row.get("task_type"))?,
175 TaskStatus::try_from(row.get("task_status"))?,
176 row.get("task_param"), row.get("task_data")))
177 }
178
179 async fn get_tasks_by_status(&self, status: TaskStatus) -> BuckyResult<Vec<(TaskId, TaskType, Vec<u8>, Vec<u8>)>> {
180 let mut conn = self.pool.get_conn().await?;
181 let sql = r#"select * from tasks where task_status = ?1"#;
182 let rows = conn.query_all(sql_query(sql).bind(status.into())).await?;
183 let mut list = Vec::new();
184 for row in rows.iter() {
185 list.push((TaskId::from_str(row.get("task_id"))?,
186 TaskType::try_from(row.get("task_type"))?,
187 row.get("task_param"), row.get("task_data")))
188 }
189 Ok(list)
190 }
191
192 async fn get_tasks_by_category(&self, category: TaskCategory) -> BuckyResult<Vec<(TaskId, TaskType, TaskStatus, Vec<u8>, Vec<u8>)>> {
193 let mut conn = self.pool.get_conn().await?;
194 let sql = r#"select * from tasks where task_category = ?1"#;
195 let rows = conn.query_all(sql_query(sql).bind(category.into())).await?;
196 let mut list = Vec::new();
197 for row in rows.iter() {
198 list.push((TaskId::from_str(row.get("task_id"))?,
199 TaskType::try_from(row.get("task_type"))?,
200 TaskStatus::try_from(row.get("task_status"))?,
201 row.get("task_param"), row.get("task_data")))
202 }
203 Ok(list)
204 }
205
206 async fn get_tasks_by_task_id(&self, task_id_list: &[TaskId]) -> BuckyResult<Vec<(TaskId, TaskType, TaskStatus, Vec<u8>, Vec<u8>)>> {
207 let mut conn = self.pool.get_conn().await?;
208
209 let mut remainder = task_id_list;
210 let mut list = Vec::new();
211 while remainder.len() > 0 {
212 let (left, right) = if remainder.len() > 100 {
213 remainder.split_at(100)
214 } else {
215 (remainder, &remainder[remainder.len()..])
216 };
217 remainder = right;
218 let id_list: Vec<String> = left.iter().map(|task_id| {
219 format!("'{}'", task_id.to_string())
220 }).collect();
221 let in_sql = id_list.join(",");
222
223 let sql = format!(r#"select * from tasks where task_id in ({})"#, in_sql);
224 let rows = conn.query_all(sql_query(sql.as_str())).await?;
225 for row in rows.iter() {
226 list.push((TaskId::from_str(row.get("task_id"))?,
227 TaskType::try_from(row.get("task_type"))?,
228 TaskStatus::try_from(row.get("task_status"))?,
229 row.get("task_param"), row.get("task_data")))
230 }
231 }
232 Ok(list)
233 }
234
235 async fn get_tasks(&self, source: &DeviceId, dec_id: &ObjectId, category: TaskCategory, task_status: TaskStatus, range: Option<(u64, u32)>) -> BuckyResult<Vec<(TaskId, TaskType, TaskStatus, Vec<u8>, Vec<u8>)>> {
236 let mut conn = self.pool.get_conn().await?;
237
238 let rows = if range.is_none() {
239 let sql = r#"select task_id from dec_tasks where source = ?1 and dec_id = ?2 and category = ?3 and task_status = ?4 order by created_at"#;
240 conn.query_all(sql_query(sql)
241 .bind(source.to_string())
242 .bind(dec_id.to_string())
243 .bind(category.into())
244 .bind(task_status.into())).await?
245 } else {
246 let sql = r#"select task_id from dec_tasks where source = ?1 and dec_id = ?2 and category = ?3 and task_status = ?4 order by created_at limit ?5, ?6"#;
247 conn.query_all(sql_query(sql)
248 .bind(source.to_string())
249 .bind(dec_id.to_string())
250 .bind(category.into())
251 .bind(task_status.into())
252 .bind(range.as_ref().unwrap().0 as i64)
253 .bind(range.as_ref().unwrap().1 as i32)).await?
254 };
255
256 let mut task_id_list = Vec::new();
257 for row in rows {
258 task_id_list.push(TaskId::from_str(row.get("task_id"))?);
259 }
260
261 let mut remainder = task_id_list.as_slice();
262 let mut list = Vec::new();
263 while remainder.len() > 0 {
264 let (left, right) = if remainder.len() > 100 {
265 remainder.split_at(100)
266 } else {
267 (remainder, &remainder[remainder.len()..])
268 };
269 remainder = right;
270 let id_list: Vec<String> = left.iter().map(|task_id| {
271 format!("'{}'", task_id.to_string())
272 }).collect();
273 let in_sql = id_list.join(",");
274
275 let sql = format!(r#"select * from tasks where task_id in ({})"#, in_sql);
276 let rows = conn.query_all(sql_query(sql.as_str())).await?;
277 for row in rows.iter() {
278 list.push((TaskId::from_str(row.get("task_id"))?,
279 TaskType::try_from(row.get("task_type"))?,
280 TaskStatus::try_from(row.get("task_status"))?,
281 row.get("task_param"), row.get("task_data")))
282 }
283 }
284 Ok(list)
285 }
286
287 async fn get_dec_list(&self, task_id: &TaskId) -> BuckyResult<Vec<DecInfo>> {
288 let mut conn = self.pool.get_conn().await?;
289 let sql = r#"select dec_info from dec_tasks where task_id = ?1"#;
290 let rows = conn.query_all(sql_query(sql).bind(task_id.to_string())).await?;
291 let mut list = Vec::new();
292 for row in rows {
293 list.push(DecInfo::clone_from_slice(row.get("dec_info"))?);
294 }
295 Ok(list)
296 }
297
298 async fn add_dec_info(&self, task_id: &TaskId, category: TaskCategory, task_status: TaskStatus, dec_info: &DecInfo) -> BuckyResult<()> {
299 info!("will add dec info! task={}, category={}, status={:?}, dec_info={:?}", task_id, category, task_status, dec_info);
300
301 let sql = r#"insert into dec_tasks (source, dec_id, task_category, task_status, task_id, dec_info) values (?1, ?2, ?3, ?4, ?5, ?6)"#;
302 let mut conn = self.pool.get_conn().await?;
303 conn.execute_sql(sql_query(sql)
304 .bind(dec_info.source().to_string())
305 .bind(dec_info.dec_id().to_string())
306 .bind(category.into())
307 .bind(task_status.into())
308 .bind(task_id.to_string())
309 .bind(dec_info.to_vec()?)).await?;
310 Ok(())
311 }
312
313 async fn delete_dec_info(&self, task_id: &TaskId, dec_id: &ObjectId, source: &DeviceId) -> BuckyResult<()> {
314 info!("will delete dec info! task={}, dec={}, source={}", task_id, dec_id, source);
315
316 let sql = r#"delete from dec_tasks where task_id = ?1 and dec_id = ?2 and source = ?3"#;
317 let mut conn = self.pool.get_conn().await?;
318 conn.execute_sql(sql_query(sql)
319 .bind(task_id.to_string())
320 .bind(dec_id.to_string())
321 .bind(source.to_string())).await?;
322 Ok(())
323 }
324
325 async fn delete_task(&self, task_id: &TaskId) -> BuckyResult<()> {
326 info!("will delete task! id={}", task_id);
327
328 let mut conn = self.pool.get_conn().await?;
329 conn.begin_transaction().await?;
330
331 let sql = r#"delete from tasks where task_id = ?1"#;
332 conn.execute_sql(sql_query(sql).bind(task_id.to_string())).await?;
333
334 let sql = r#"delete from dec_tasks where task_id = ?1"#;
335 conn.execute_sql(sql_query(sql).bind(task_id.to_string())).await?;
336
337 conn.commit_transaction().await?;
338
339 Ok(())
340 }
341}