cyfs_task_manager/
task_store.rs

1use std::path::{Path};
2use std::str::FromStr;
3use cyfs_base::*;
4use crate::{DecInfo, sql_query, SqlConnection, SqlPool, TaskCategory, TaskId, TaskStatus, TaskType, SqlRow, RawSqlPool};
5
6#[async_trait::async_trait]
7pub trait TaskStore: Send + Sync {
8    async fn save_task(&self, task_id: &TaskId, task_status: TaskStatus, task_data: Vec<u8>) -> BuckyResult<()>;
9    async fn save_task_status(&self, task_id: &TaskId, task_status: TaskStatus) -> BuckyResult<()>;
10    async fn save_task_data(&self, task_id: &TaskId, task_data: Vec<u8>) -> BuckyResult<()>;
11}
12
13pub struct QueryTaskParams {
14    source: DeviceId,
15    dec_id: Option<ObjectId>,
16}
17#[async_trait::async_trait]
18pub trait TaskManagerStore: Send + Sync {
19    async fn add_task(&self, task_id: &TaskId, category: TaskCategory, task_type: TaskType, task_status: TaskStatus, dec_list: Vec<DecInfo>, task_params: Vec<u8>) -> BuckyResult<()>;
20    async fn get_task(&self, task_id: &TaskId) -> BuckyResult<(TaskCategory, TaskType, TaskStatus, Vec<u8>, Vec<u8>)>;
21    async fn get_tasks_by_status(&self, status: TaskStatus) -> BuckyResult<Vec<(TaskId, TaskType, Vec<u8>, Vec<u8>)>>;
22    async fn get_tasks_by_category(&self, category: TaskCategory) -> BuckyResult<Vec<(TaskId, TaskType, TaskStatus, Vec<u8>, Vec<u8>)>>;
23    async fn get_tasks_by_task_id(&self, task_id_list: &[TaskId]) -> BuckyResult<Vec<(TaskId, TaskType, TaskStatus, Vec<u8>, Vec<u8>)>>;
24    async fn get_tasks(&self, source: &DeviceId, dec_id: &ObjectId, category: TaskCategory, task_status: TaskStatus, range: Option<(u64, u32)>) -> BuckyResult<Vec<(TaskId, TaskType, TaskStatus, Vec<u8>, Vec<u8>)>>;
25    async fn get_dec_list(&self, task_id: &TaskId) -> BuckyResult<Vec<DecInfo>>;
26    async fn add_dec_info(&self, task_id: &TaskId, category: TaskCategory, task_status: TaskStatus, dec_info: &DecInfo) -> BuckyResult<()>;
27    async fn delete_dec_info(&self, task_id: &TaskId, dec_id: &ObjectId, source: &DeviceId) -> BuckyResult<()>;
28    async fn delete_task(&self, task_id: &TaskId) -> BuckyResult<()>;
29}
30
31pub struct SQLiteTaskStore {
32    pool: SqlPool
33}
34
35impl From<RawSqlPool> for SQLiteTaskStore {
36    fn from(pool: RawSqlPool) -> Self {
37        Self {
38            pool: SqlPool::from_raw_pool(pool)
39        }
40    }
41}
42
43impl SQLiteTaskStore {
44    pub async fn new<P: AsRef<Path>>(db_path: P) -> BuckyResult<Self> {
45        let pool = SqlPool::open(format!("sqlite://{}", db_path.as_ref().to_string_lossy().to_string()).as_str(), 10).await?;
46        Ok(Self {
47            pool
48        })
49    }
50
51    pub async fn create_connection(&self) -> BuckyResult<SqlConnection> {
52        self.pool.get_conn().await
53    }
54
55    pub async fn init(&self) -> BuckyResult<()> {
56        let mut conn = self.pool.get_conn().await?;
57        let sql = r#"create table if not exists "tasks" (
58            "task_id" char(45) primary key not null,
59            "task_category" INTEGER,
60            "task_type" INTEGER,
61            "task_status" INTEGER,
62            "task_param" BLOB,
63            "task_data" BLOB,
64            "created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
65            "updated_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP
66        )"#;
67        conn.execute_sql(sql_query(sql)).await?;
68
69        let sql = r#"create index if not exists category_index on tasks (task_category, created_at)"#;
70        conn.execute_sql(sql_query(sql)).await?;
71
72        let sql = r#"create index if not exists status_index on tasks (task_status, updated_at)"#;
73        conn.execute_sql(sql_query(sql)).await?;
74
75        let sql = r#"create table if not exists "dec_tasks" (
76            "source" char(45) not null,
77            "dec_id" char(45) not null,
78            "task_id" char(45) not null,
79            "task_status" INTEGER,
80            "task_category" INTEGER,
81            "task_type" INTEGER,
82            "dec_info" BLOB not null,
83            "created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
84            "updated_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP
85            )"#;
86        conn.execute_sql(sql_query(sql)).await?;
87
88        let sql = r#"create index if not exists dec_index on dec_tasks (source, dec_id, task_category, task_status, created_at, task_id)"#;
89        conn.execute_sql(sql_query(sql)).await?;
90
91        let sql = r#"create index if not exists task_index on dec_tasks (task_id)"#;
92        conn.execute_sql(sql_query(sql)).await?;
93
94        Ok(())
95    }
96}
97
98#[async_trait::async_trait]
99impl TaskStore for SQLiteTaskStore {
100    async fn save_task(&self, task_id: &TaskId, task_status: TaskStatus, task_data: Vec<u8>) -> BuckyResult<()> {
101        let mut conn = self.pool.get_conn().await?;
102        conn.begin_transaction().await?;
103
104        let sql = r#"update tasks set task_status = ?1, task_data = ?2, updated_at = CURRENT_TIMESTAMP where task_id = ?3"#;
105        conn.execute_sql(sql_query(sql).bind(task_status.into()).bind(task_data).bind(task_id.to_string())).await?;
106
107        let sql = r#"update dec_tasks set task_status = ?1, updated_at = CURRENT_TIMESTAMP where task_id = ?2"#;
108        conn.execute_sql(sql_query(sql).bind(task_status.into()).bind(task_id.to_string())).await?;
109
110        conn.commit_transaction().await?;
111        Ok(())
112    }
113
114    async fn save_task_status(&self, task_id: &TaskId, task_status: TaskStatus) -> BuckyResult<()> {
115        let mut conn = self.pool.get_conn().await?;
116        conn.begin_transaction().await?;
117        let sql = r#"update tasks set task_status = ?1, updated_at = CURRENT_TIMESTAMP where task_id = ?2"#;
118        conn.execute_sql(sql_query(sql).bind(task_status.into()).bind(task_id.to_string())).await?;
119
120        let sql = r#"update dec_tasks set task_status = ?1, updated_at = CURRENT_TIMESTAMP where task_id = ?2"#;
121        conn.execute_sql(sql_query(sql).bind(task_status.into()).bind(task_id.to_string())).await?;
122
123        conn.commit_transaction().await?;
124        Ok(())
125    }
126
127    async fn save_task_data(&self, task_id: &TaskId, task_data: Vec<u8>) -> BuckyResult<()> {
128        let mut conn = self.pool.get_conn().await?;
129        let sql = r#"update tasks set task_data = ?1 where task_id = ?2"#;
130        conn.execute_sql(sql_query(sql).bind(task_data).bind(task_id.to_string())).await?;
131        Ok(())
132    }
133}
134
135#[async_trait::async_trait]
136impl TaskManagerStore for SQLiteTaskStore
137{
138    async fn add_task(&self, task_id: &TaskId, category: TaskCategory, task_type: TaskType, task_status: TaskStatus, dec_list: Vec<DecInfo>, task_params: Vec<u8>) -> BuckyResult<()> {
139        info!("will add task: id={}, category={}, type={}, status={:?}, dec={:?}", task_id, category, task_type, task_status, dec_list);
140
141        let mut conn = self.pool.get_conn().await?;
142        conn.begin_transaction().await?;
143
144        let sql = r#"insert into tasks (task_id, task_category, task_type, task_status, task_param, task_data) values (?1, ?2, ?3, ?4, ?5, ?6)"#;
145        conn.execute_sql(sql_query(sql)
146            .bind(task_id.to_string())
147            .bind(category.into())
148            .bind(task_type.into())
149            .bind(task_status.into())
150            .bind(task_params)
151            .bind(Vec::new())).await?;
152
153        for dec_info in dec_list.iter() {
154            let sql = r#"insert into dec_tasks (source, dec_id, task_category, task_status, task_id, dec_info) values (?1, ?2, ?3, ?4, ?5, ?6)"#;
155            conn.execute_sql(sql_query(sql)
156                .bind(dec_info.source().to_string())
157                .bind(dec_info.dec_id().to_string())
158                .bind(category.into())
159                .bind(task_status.into())
160                .bind(task_id.to_string())
161                .bind(dec_info.to_vec()?)).await?;
162        }
163
164        conn.commit_transaction().await?;
165
166        Ok(())
167    }
168
169    async fn get_task(&self, task_id: &TaskId) -> BuckyResult<(TaskCategory, TaskType, TaskStatus, Vec<u8>, Vec<u8>)> {
170        let mut conn = self.pool.get_conn().await?;
171        let sql = r#"select * from tasks where task_id = ?1"#;
172        let row = conn.query_one(sql_query(sql).bind(task_id.to_string())).await?;
173        Ok((TaskCategory::try_from(row.get("task_category"))?,
174            TaskType::try_from(row.get("task_type"))?,
175            TaskStatus::try_from(row.get("task_status"))?,
176            row.get("task_param"), row.get("task_data")))
177    }
178
179    async fn get_tasks_by_status(&self, status: TaskStatus) -> BuckyResult<Vec<(TaskId, TaskType, Vec<u8>, Vec<u8>)>> {
180        let mut conn = self.pool.get_conn().await?;
181        let sql = r#"select * from tasks where task_status = ?1"#;
182        let rows = conn.query_all(sql_query(sql).bind(status.into())).await?;
183        let mut list = Vec::new();
184        for row in rows.iter() {
185            list.push((TaskId::from_str(row.get("task_id"))?,
186                       TaskType::try_from(row.get("task_type"))?,
187                       row.get("task_param"), row.get("task_data")))
188        }
189        Ok(list)
190    }
191
192    async fn get_tasks_by_category(&self, category: TaskCategory) -> BuckyResult<Vec<(TaskId, TaskType, TaskStatus, Vec<u8>, Vec<u8>)>> {
193        let mut conn = self.pool.get_conn().await?;
194        let sql = r#"select * from tasks where task_category = ?1"#;
195        let rows = conn.query_all(sql_query(sql).bind(category.into())).await?;
196        let mut list = Vec::new();
197        for row in rows.iter() {
198            list.push((TaskId::from_str(row.get("task_id"))?,
199                       TaskType::try_from(row.get("task_type"))?,
200                       TaskStatus::try_from(row.get("task_status"))?,
201                       row.get("task_param"), row.get("task_data")))
202        }
203        Ok(list)
204    }
205
206    async fn get_tasks_by_task_id(&self, task_id_list: &[TaskId]) -> BuckyResult<Vec<(TaskId, TaskType, TaskStatus, Vec<u8>, Vec<u8>)>> {
207        let mut conn = self.pool.get_conn().await?;
208
209        let mut remainder = task_id_list;
210        let mut list = Vec::new();
211        while remainder.len() > 0 {
212            let (left, right) = if remainder.len() > 100 {
213                remainder.split_at(100)
214            } else {
215                (remainder, &remainder[remainder.len()..])
216            };
217            remainder = right;
218            let id_list: Vec<String> = left.iter().map(|task_id| {
219                format!("'{}'", task_id.to_string())
220            }).collect();
221            let in_sql = id_list.join(",");
222
223            let sql = format!(r#"select * from tasks where task_id in ({})"#, in_sql);
224            let rows = conn.query_all(sql_query(sql.as_str())).await?;
225            for row in rows.iter() {
226                list.push((TaskId::from_str(row.get("task_id"))?,
227                           TaskType::try_from(row.get("task_type"))?,
228                           TaskStatus::try_from(row.get("task_status"))?,
229                           row.get("task_param"), row.get("task_data")))
230            }
231        }
232        Ok(list)
233    }
234
235    async fn get_tasks(&self, source: &DeviceId, dec_id: &ObjectId, category: TaskCategory, task_status: TaskStatus, range: Option<(u64, u32)>) -> BuckyResult<Vec<(TaskId, TaskType, TaskStatus, Vec<u8>, Vec<u8>)>> {
236        let mut conn = self.pool.get_conn().await?;
237
238        let rows = if range.is_none() {
239            let sql = r#"select task_id from dec_tasks where source = ?1 and dec_id = ?2 and category = ?3 and task_status = ?4 order by created_at"#;
240            conn.query_all(sql_query(sql)
241                .bind(source.to_string())
242                .bind(dec_id.to_string())
243                .bind(category.into())
244                .bind(task_status.into())).await?
245        } else {
246            let sql = r#"select task_id from dec_tasks where source = ?1 and dec_id = ?2 and category = ?3 and task_status = ?4 order by created_at limit ?5, ?6"#;
247            conn.query_all(sql_query(sql)
248                .bind(source.to_string())
249                .bind(dec_id.to_string())
250                .bind(category.into())
251                .bind(task_status.into())
252                .bind(range.as_ref().unwrap().0 as i64)
253                .bind(range.as_ref().unwrap().1 as i32)).await?
254        };
255
256        let mut task_id_list = Vec::new();
257        for row in rows {
258            task_id_list.push(TaskId::from_str(row.get("task_id"))?);
259        }
260
261        let mut remainder = task_id_list.as_slice();
262        let mut list = Vec::new();
263        while remainder.len() > 0 {
264            let (left, right) = if remainder.len() > 100 {
265                remainder.split_at(100)
266            } else {
267                (remainder, &remainder[remainder.len()..])
268            };
269            remainder = right;
270            let id_list: Vec<String> = left.iter().map(|task_id| {
271                format!("'{}'", task_id.to_string())
272            }).collect();
273            let in_sql = id_list.join(",");
274
275            let sql = format!(r#"select * from tasks where task_id in ({})"#, in_sql);
276            let rows = conn.query_all(sql_query(sql.as_str())).await?;
277            for row in rows.iter() {
278                list.push((TaskId::from_str(row.get("task_id"))?,
279                           TaskType::try_from(row.get("task_type"))?,
280                           TaskStatus::try_from(row.get("task_status"))?,
281                           row.get("task_param"), row.get("task_data")))
282            }
283        }
284        Ok(list)
285    }
286
287    async fn get_dec_list(&self, task_id: &TaskId) -> BuckyResult<Vec<DecInfo>> {
288        let mut conn = self.pool.get_conn().await?;
289        let sql = r#"select dec_info from dec_tasks where task_id = ?1"#;
290        let rows = conn.query_all(sql_query(sql).bind(task_id.to_string())).await?;
291        let mut list = Vec::new();
292        for row in rows {
293            list.push(DecInfo::clone_from_slice(row.get("dec_info"))?);
294        }
295        Ok(list)
296    }
297
298    async fn add_dec_info(&self, task_id: &TaskId, category: TaskCategory, task_status: TaskStatus, dec_info: &DecInfo) -> BuckyResult<()> {
299        info!("will add dec info! task={}, category={}, status={:?}, dec_info={:?}", task_id, category, task_status, dec_info);
300
301        let sql = r#"insert into dec_tasks (source, dec_id, task_category, task_status, task_id, dec_info) values (?1, ?2, ?3, ?4, ?5, ?6)"#;
302        let mut conn = self.pool.get_conn().await?;
303        conn.execute_sql(sql_query(sql)
304            .bind(dec_info.source().to_string())
305            .bind(dec_info.dec_id().to_string())
306            .bind(category.into())
307            .bind(task_status.into())
308            .bind(task_id.to_string())
309            .bind(dec_info.to_vec()?)).await?;
310        Ok(())
311    }
312
313    async fn delete_dec_info(&self, task_id: &TaskId, dec_id: &ObjectId, source: &DeviceId) -> BuckyResult<()> {
314        info!("will delete dec info! task={}, dec={}, source={}", task_id, dec_id, source);
315
316        let sql = r#"delete from dec_tasks where task_id = ?1 and dec_id = ?2 and source = ?3"#;
317        let mut conn = self.pool.get_conn().await?;
318        conn.execute_sql(sql_query(sql)
319            .bind(task_id.to_string())
320            .bind(dec_id.to_string())
321            .bind(source.to_string())).await?;
322        Ok(())
323    }
324
325    async fn delete_task(&self, task_id: &TaskId) -> BuckyResult<()> {
326        info!("will delete task! id={}", task_id);
327
328        let mut conn = self.pool.get_conn().await?;
329        conn.begin_transaction().await?;
330
331        let sql = r#"delete from tasks where task_id = ?1"#;
332        conn.execute_sql(sql_query(sql).bind(task_id.to_string())).await?;
333
334        let sql = r#"delete from dec_tasks where task_id = ?1"#;
335        conn.execute_sql(sql_query(sql).bind(task_id.to_string())).await?;
336
337        conn.commit_transaction().await?;
338
339        Ok(())
340    }
341}