Skip to main content

client_core/db/
manager.rs

1use crate::DuckError;
2use anyhow::Result;
3use chrono::{DateTime, Utc};
4use std::path::Path;
5use tokio::sync::{mpsc, oneshot};
6use tracing::debug;
7use uuid::Uuid;
8
9use super::actor::DuckDbActor;
10use super::messages::{AppStateRecord, DbMessage, DownloadTaskRecord, UserActionRecord};
11use super::models::{BackupRecord, ScheduledTask};
12
13/// DuckDB数据库管理器
14#[derive(Debug, Clone)]
15pub struct DuckDbManager {
16    sender: mpsc::Sender<DbMessage>,
17}
18
19impl DuckDbManager {
20    /// 创建新的DuckDB管理器(不自动初始化表)
21    pub async fn new<P: AsRef<Path>>(db_path: P) -> Result<Self> {
22        let db_path = db_path.as_ref().to_path_buf();
23
24        // 确保数据库文件的父目录存在
25        if let Some(parent) = db_path.parent() {
26            tokio::fs::create_dir_all(parent).await?;
27        }
28
29        let (sender, receiver) = mpsc::channel(100);
30
31        // 启动DuckDB Actor
32        let actor = DuckDbActor::new(db_path)?;
33        tokio::spawn(actor.run(receiver));
34
35        let manager = Self { sender };
36
37        // 移除自动初始化 - 只有在明确调用 init_database 时才初始化
38        // manager.init_tables().await?;
39
40        Ok(manager)
41    }
42
43    /// 创建内存数据库管理器(不自动初始化表)
44    pub async fn new_memory() -> Result<Self> {
45        let (sender, receiver) = mpsc::channel(100);
46
47        // 启动DuckDB Actor(内存模式)
48        let actor = DuckDbActor::new_memory()?;
49        tokio::spawn(actor.run(receiver));
50
51        let manager = Self { sender };
52
53        // 移除自动初始化 - 只有在明确调用 init_database 时才初始化
54        // manager.init_tables().await?;
55
56        Ok(manager)
57    }
58
59    /// 显式初始化数据库(只应在 nuwax-cli init 时调用)
60    pub async fn init_database(&self) -> Result<()> {
61        debug!("Explicitly initializing database tables...");
62        self.init_tables().await?;
63        debug!("Database initialization completed");
64        Ok(())
65    }
66
67    /// 检查数据库是否已经初始化
68    pub async fn is_database_initialized(&self) -> Result<bool> {
69        // 检查关键配置表是否存在且有数据
70        match self.get_config("db_initialized").await {
71            Ok(Some(value)) => Ok(value == "true"),
72            Ok(None) => Ok(false),
73            Err(_) => Ok(false), // 如果出错,认为未初始化
74        }
75    }
76
77    /// 标记数据库为已初始化
78    pub async fn mark_database_initialized(&self) -> Result<()> {
79        self.set_config("db_initialized", "true").await
80    }
81
82    /// 初始化数据库表(私有方法)
83    async fn init_tables(&self) -> Result<()> {
84        let (respond_to, receiver) = oneshot::channel();
85
86        self.sender
87            .send(DbMessage::InitTables { respond_to })
88            .await
89            .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
90
91        receiver
92            .await
93            .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
94    }
95
96    /// 获取配置值
97    pub async fn get_config(&self, key: &str) -> Result<Option<String>> {
98        let (respond_to, receiver) = oneshot::channel();
99
100        self.sender
101            .send(DbMessage::GetConfig {
102                key: key.to_string(),
103                respond_to,
104            })
105            .await
106            .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
107
108        receiver
109            .await
110            .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
111    }
112
113    /// 设置配置值
114    pub async fn set_config(&self, key: &str, value: &str) -> Result<()> {
115        let (respond_to, receiver) = oneshot::channel();
116
117        self.sender
118            .send(DbMessage::SetConfig {
119                key: key.to_string(),
120                value: value.to_string(),
121                respond_to,
122            })
123            .await
124            .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
125
126        receiver
127            .await
128            .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
129    }
130
131    /// 获取或创建客户端 UUID
132    pub async fn get_or_create_client_uuid(&self) -> Result<Uuid> {
133        const CLIENT_UUID_KEY: &str = "client_uuid";
134
135        // 尝试从数据库获取现有的UUID
136        if let Some(uuid_str) = self.get_config(CLIENT_UUID_KEY).await? {
137            if let Ok(uuid) = Uuid::parse_str(&uuid_str) {
138                return Ok(uuid);
139            }
140        }
141
142        // 生成新的UUID并保存
143        let new_uuid = Uuid::new_v4();
144        self.set_config(CLIENT_UUID_KEY, &new_uuid.to_string())
145            .await?;
146
147        Ok(new_uuid)
148    }
149
150    // ========== 下载任务管理 ==========
151
152    /// 创建下载任务
153    pub async fn create_download_task(
154        &self,
155        task_name: String,
156        download_url: String,
157        total_size: i64,
158        target_path: String,
159        file_hash: Option<String>,
160    ) -> Result<i64> {
161        let (respond_to, receiver) = oneshot::channel();
162
163        self.sender
164            .send(DbMessage::CreateDownloadTask {
165                task_name,
166                download_url,
167                total_size,
168                target_path,
169                file_hash,
170                respond_to,
171            })
172            .await
173            .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
174
175        receiver
176            .await
177            .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
178    }
179
180    /// 更新下载任务状态
181    pub async fn update_download_task_status(
182        &self,
183        task_id: i64,
184        status: &str,
185        downloaded_size: Option<i64>,
186        error_message: Option<String>,
187    ) -> Result<()> {
188        let (respond_to, receiver) = oneshot::channel();
189
190        self.sender
191            .send(DbMessage::UpdateDownloadTaskStatus {
192                task_id,
193                status: status.to_string(),
194                downloaded_size,
195                error_message,
196                respond_to,
197            })
198            .await
199            .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
200
201        receiver
202            .await
203            .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
204    }
205
206    /// 完成下载任务
207    pub async fn complete_download_task(
208        &self,
209        task_id: i64,
210        average_speed: Option<i64>,
211        total_duration: Option<i32>,
212    ) -> Result<()> {
213        let (respond_to, receiver) = oneshot::channel();
214
215        self.sender
216            .send(DbMessage::CompleteDownloadTask {
217                task_id,
218                average_speed,
219                total_duration,
220                respond_to,
221            })
222            .await
223            .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
224
225        receiver
226            .await
227            .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
228    }
229
230    /// 获取下载任务
231    pub async fn get_download_task(&self, task_id: i64) -> Result<Option<DownloadTaskRecord>> {
232        let (respond_to, receiver) = oneshot::channel();
233
234        self.sender
235            .send(DbMessage::GetDownloadTask {
236                task_id,
237                respond_to,
238            })
239            .await
240            .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
241
242        receiver
243            .await
244            .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
245    }
246
247    /// 获取活跃的下载任务
248    pub async fn get_active_download_tasks(&self) -> Result<Vec<DownloadTaskRecord>> {
249        let (respond_to, receiver) = oneshot::channel();
250
251        self.sender
252            .send(DbMessage::GetActiveDownloadTasks { respond_to })
253            .await
254            .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
255
256        receiver
257            .await
258            .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
259    }
260
261    // ========== 应用状态管理 ==========
262
263    /// 更新应用状态
264    pub async fn update_app_state(
265        &self,
266        state: &str,
267        state_data: Option<String>,
268        error_message: Option<String>,
269    ) -> Result<()> {
270        let (respond_to, receiver) = oneshot::channel();
271
272        self.sender
273            .send(DbMessage::UpdateAppState {
274                state: state.to_string(),
275                state_data,
276                error_message,
277                respond_to,
278            })
279            .await
280            .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
281
282        receiver
283            .await
284            .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
285    }
286
287    /// 获取当前应用状态
288    pub async fn get_app_state(&self) -> Result<Option<AppStateRecord>> {
289        let (respond_to, receiver) = oneshot::channel();
290
291        self.sender
292            .send(DbMessage::GetAppState { respond_to })
293            .await
294            .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
295
296        receiver
297            .await
298            .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
299    }
300
301    // ========== 用户操作历史 ==========
302
303    /// 记录用户操作
304    pub async fn record_user_action(
305        &self,
306        action_type: &str,
307        action_description: &str,
308        action_params: Option<String>,
309    ) -> Result<i64> {
310        let (respond_to, receiver) = oneshot::channel();
311
312        self.sender
313            .send(DbMessage::RecordUserAction {
314                action_type: action_type.to_string(),
315                action_description: action_description.to_string(),
316                action_params,
317                respond_to,
318            })
319            .await
320            .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
321
322        receiver
323            .await
324            .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
325    }
326
327    /// 完成用户操作
328    pub async fn complete_user_action(
329        &self,
330        action_id: i64,
331        status: &str,
332        result_message: Option<String>,
333        duration_seconds: Option<i32>,
334    ) -> Result<()> {
335        let (respond_to, receiver) = oneshot::channel();
336
337        self.sender
338            .send(DbMessage::CompleteUserAction {
339                action_id,
340                status: status.to_string(),
341                result_message,
342                duration_seconds,
343                respond_to,
344            })
345            .await
346            .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
347
348        receiver
349            .await
350            .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
351    }
352
353    /// 获取用户操作历史
354    pub async fn get_user_actions(&self, limit: Option<i32>) -> Result<Vec<UserActionRecord>> {
355        let (respond_to, receiver) = oneshot::channel();
356
357        self.sender
358            .send(DbMessage::GetUserActions { limit, respond_to })
359            .await
360            .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
361
362        receiver
363            .await
364            .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
365    }
366
367    // ========== 现有的备份和任务管理 ==========
368
369    /// 创建备份记录
370    pub async fn create_backup_record(
371        &self,
372        file_path: String,
373        service_version: String,
374        backup_type: &str,
375        status: &str,
376    ) -> Result<i64> {
377        let (respond_to, receiver) = oneshot::channel();
378
379        self.sender
380            .send(DbMessage::CreateBackupRecord {
381                file_path,
382                service_version,
383                backup_type: backup_type.to_string(),
384                status: status.to_string(),
385                respond_to,
386            })
387            .await
388            .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
389
390        receiver
391            .await
392            .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
393    }
394
395    /// 获取所有备份记录
396    pub async fn get_all_backups(&self) -> Result<Vec<BackupRecord>> {
397        let (respond_to, receiver) = oneshot::channel();
398
399        self.sender
400            .send(DbMessage::GetAllBackups { respond_to })
401            .await
402            .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
403
404        receiver
405            .await
406            .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
407    }
408
409    /// 根据ID获取备份记录
410    pub async fn get_backup_by_id(&self, id: i64) -> Result<Option<BackupRecord>> {
411        let (respond_to, receiver) = oneshot::channel();
412
413        self.sender
414            .send(DbMessage::GetBackupById { id, respond_to })
415            .await
416            .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
417
418        receiver
419            .await
420            .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
421    }
422
423    /// 删除备份记录
424    pub async fn delete_backup_record(&self, backup_id: i64) -> Result<()> {
425        let (respond_to, receiver) = oneshot::channel();
426
427        self.sender
428            .send(DbMessage::DeleteBackupRecord {
429                backup_id,
430                respond_to,
431            })
432            .await
433            .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
434
435        receiver
436            .await
437            .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
438    }
439
440    /// 更新备份文件路径
441    pub async fn update_backup_file_path(&self, backup_id: i64, new_path: String) -> Result<()> {
442        let (respond_to, receiver) = oneshot::channel();
443
444        self.sender
445            .send(DbMessage::UpdateBackupFilePath {
446                backup_id,
447                new_path,
448                respond_to,
449            })
450            .await
451            .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
452
453        receiver
454            .await
455            .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
456    }
457
458    /// 创建计划任务
459    pub async fn create_scheduled_task(
460        &self,
461        task_type: &str,
462        target_version: String,
463        scheduled_at: DateTime<Utc>,
464    ) -> Result<i64> {
465        // 取消同类型的待执行任务
466        self.cancel_pending_tasks(task_type).await?;
467
468        let (respond_to, receiver) = oneshot::channel();
469
470        self.sender
471            .send(DbMessage::CreateScheduledTask {
472                task_type: task_type.to_string(),
473                target_version,
474                scheduled_at,
475                respond_to,
476            })
477            .await
478            .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
479
480        receiver
481            .await
482            .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
483    }
484
485    /// 获取待执行任务
486    pub async fn get_pending_tasks(&self) -> Result<Vec<ScheduledTask>> {
487        let (respond_to, receiver) = oneshot::channel();
488
489        self.sender
490            .send(DbMessage::GetPendingTasks { respond_to })
491            .await
492            .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
493
494        receiver
495            .await
496            .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
497    }
498
499    /// 更新任务状态
500    pub async fn update_task_status(
501        &self,
502        task_id: i64,
503        status: &str,
504        details: Option<String>,
505    ) -> Result<()> {
506        let (respond_to, receiver) = oneshot::channel();
507
508        self.sender
509            .send(DbMessage::UpdateTaskStatus {
510                task_id,
511                status: status.to_string(),
512                details,
513                respond_to,
514            })
515            .await
516            .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
517
518        receiver
519            .await
520            .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
521    }
522
523    /// 取消待执行任务
524    async fn cancel_pending_tasks(&self, task_type: &str) -> Result<()> {
525        let (respond_to, receiver) = oneshot::channel();
526
527        self.sender
528            .send(DbMessage::CancelPendingTasks {
529                task_type: task_type.to_string(),
530                respond_to,
531            })
532            .await
533            .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
534
535        receiver
536            .await
537            .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
538    }
539}