1use crate::DuckError;
2use anyhow::Result;
3use chrono::{DateTime, Utc};
4use std::path::Path;
5use tokio::sync::{mpsc, oneshot};
6use tracing::debug;
7use uuid::Uuid;
8
9use super::actor::DuckDbActor;
10use super::messages::{AppStateRecord, DbMessage, DownloadTaskRecord, UserActionRecord};
11use super::models::{BackupRecord, ScheduledTask};
12
13#[derive(Debug, Clone)]
15pub struct DuckDbManager {
16 sender: mpsc::Sender<DbMessage>,
17}
18
19impl DuckDbManager {
20 pub async fn new<P: AsRef<Path>>(db_path: P) -> Result<Self> {
22 let db_path = db_path.as_ref().to_path_buf();
23
24 if let Some(parent) = db_path.parent() {
26 tokio::fs::create_dir_all(parent).await?;
27 }
28
29 let (sender, receiver) = mpsc::channel(100);
30
31 let actor = DuckDbActor::new(db_path)?;
33 tokio::spawn(actor.run(receiver));
34
35 let manager = Self { sender };
36
37 Ok(manager)
41 }
42
43 pub async fn new_memory() -> Result<Self> {
45 let (sender, receiver) = mpsc::channel(100);
46
47 let actor = DuckDbActor::new_memory()?;
49 tokio::spawn(actor.run(receiver));
50
51 let manager = Self { sender };
52
53 Ok(manager)
57 }
58
59 pub async fn init_database(&self) -> Result<()> {
61 debug!("Explicitly initializing database tables...");
62 self.init_tables().await?;
63 debug!("Database initialization completed");
64 Ok(())
65 }
66
67 pub async fn is_database_initialized(&self) -> Result<bool> {
69 match self.get_config("db_initialized").await {
71 Ok(Some(value)) => Ok(value == "true"),
72 Ok(None) => Ok(false),
73 Err(_) => Ok(false), }
75 }
76
77 pub async fn mark_database_initialized(&self) -> Result<()> {
79 self.set_config("db_initialized", "true").await
80 }
81
82 async fn init_tables(&self) -> Result<()> {
84 let (respond_to, receiver) = oneshot::channel();
85
86 self.sender
87 .send(DbMessage::InitTables { respond_to })
88 .await
89 .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
90
91 receiver
92 .await
93 .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
94 }
95
96 pub async fn get_config(&self, key: &str) -> Result<Option<String>> {
98 let (respond_to, receiver) = oneshot::channel();
99
100 self.sender
101 .send(DbMessage::GetConfig {
102 key: key.to_string(),
103 respond_to,
104 })
105 .await
106 .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
107
108 receiver
109 .await
110 .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
111 }
112
113 pub async fn set_config(&self, key: &str, value: &str) -> Result<()> {
115 let (respond_to, receiver) = oneshot::channel();
116
117 self.sender
118 .send(DbMessage::SetConfig {
119 key: key.to_string(),
120 value: value.to_string(),
121 respond_to,
122 })
123 .await
124 .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
125
126 receiver
127 .await
128 .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
129 }
130
131 pub async fn get_or_create_client_uuid(&self) -> Result<Uuid> {
133 const CLIENT_UUID_KEY: &str = "client_uuid";
134
135 if let Some(uuid_str) = self.get_config(CLIENT_UUID_KEY).await? {
137 if let Ok(uuid) = Uuid::parse_str(&uuid_str) {
138 return Ok(uuid);
139 }
140 }
141
142 let new_uuid = Uuid::new_v4();
144 self.set_config(CLIENT_UUID_KEY, &new_uuid.to_string())
145 .await?;
146
147 Ok(new_uuid)
148 }
149
150 pub async fn create_download_task(
154 &self,
155 task_name: String,
156 download_url: String,
157 total_size: i64,
158 target_path: String,
159 file_hash: Option<String>,
160 ) -> Result<i64> {
161 let (respond_to, receiver) = oneshot::channel();
162
163 self.sender
164 .send(DbMessage::CreateDownloadTask {
165 task_name,
166 download_url,
167 total_size,
168 target_path,
169 file_hash,
170 respond_to,
171 })
172 .await
173 .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
174
175 receiver
176 .await
177 .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
178 }
179
180 pub async fn update_download_task_status(
182 &self,
183 task_id: i64,
184 status: &str,
185 downloaded_size: Option<i64>,
186 error_message: Option<String>,
187 ) -> Result<()> {
188 let (respond_to, receiver) = oneshot::channel();
189
190 self.sender
191 .send(DbMessage::UpdateDownloadTaskStatus {
192 task_id,
193 status: status.to_string(),
194 downloaded_size,
195 error_message,
196 respond_to,
197 })
198 .await
199 .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
200
201 receiver
202 .await
203 .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
204 }
205
206 pub async fn complete_download_task(
208 &self,
209 task_id: i64,
210 average_speed: Option<i64>,
211 total_duration: Option<i32>,
212 ) -> Result<()> {
213 let (respond_to, receiver) = oneshot::channel();
214
215 self.sender
216 .send(DbMessage::CompleteDownloadTask {
217 task_id,
218 average_speed,
219 total_duration,
220 respond_to,
221 })
222 .await
223 .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
224
225 receiver
226 .await
227 .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
228 }
229
230 pub async fn get_download_task(&self, task_id: i64) -> Result<Option<DownloadTaskRecord>> {
232 let (respond_to, receiver) = oneshot::channel();
233
234 self.sender
235 .send(DbMessage::GetDownloadTask {
236 task_id,
237 respond_to,
238 })
239 .await
240 .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
241
242 receiver
243 .await
244 .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
245 }
246
247 pub async fn get_active_download_tasks(&self) -> Result<Vec<DownloadTaskRecord>> {
249 let (respond_to, receiver) = oneshot::channel();
250
251 self.sender
252 .send(DbMessage::GetActiveDownloadTasks { respond_to })
253 .await
254 .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
255
256 receiver
257 .await
258 .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
259 }
260
261 pub async fn update_app_state(
265 &self,
266 state: &str,
267 state_data: Option<String>,
268 error_message: Option<String>,
269 ) -> Result<()> {
270 let (respond_to, receiver) = oneshot::channel();
271
272 self.sender
273 .send(DbMessage::UpdateAppState {
274 state: state.to_string(),
275 state_data,
276 error_message,
277 respond_to,
278 })
279 .await
280 .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
281
282 receiver
283 .await
284 .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
285 }
286
287 pub async fn get_app_state(&self) -> Result<Option<AppStateRecord>> {
289 let (respond_to, receiver) = oneshot::channel();
290
291 self.sender
292 .send(DbMessage::GetAppState { respond_to })
293 .await
294 .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
295
296 receiver
297 .await
298 .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
299 }
300
301 pub async fn record_user_action(
305 &self,
306 action_type: &str,
307 action_description: &str,
308 action_params: Option<String>,
309 ) -> Result<i64> {
310 let (respond_to, receiver) = oneshot::channel();
311
312 self.sender
313 .send(DbMessage::RecordUserAction {
314 action_type: action_type.to_string(),
315 action_description: action_description.to_string(),
316 action_params,
317 respond_to,
318 })
319 .await
320 .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
321
322 receiver
323 .await
324 .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
325 }
326
327 pub async fn complete_user_action(
329 &self,
330 action_id: i64,
331 status: &str,
332 result_message: Option<String>,
333 duration_seconds: Option<i32>,
334 ) -> Result<()> {
335 let (respond_to, receiver) = oneshot::channel();
336
337 self.sender
338 .send(DbMessage::CompleteUserAction {
339 action_id,
340 status: status.to_string(),
341 result_message,
342 duration_seconds,
343 respond_to,
344 })
345 .await
346 .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
347
348 receiver
349 .await
350 .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
351 }
352
353 pub async fn get_user_actions(&self, limit: Option<i32>) -> Result<Vec<UserActionRecord>> {
355 let (respond_to, receiver) = oneshot::channel();
356
357 self.sender
358 .send(DbMessage::GetUserActions { limit, respond_to })
359 .await
360 .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
361
362 receiver
363 .await
364 .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
365 }
366
367 pub async fn create_backup_record(
371 &self,
372 file_path: String,
373 service_version: String,
374 backup_type: &str,
375 status: &str,
376 ) -> Result<i64> {
377 let (respond_to, receiver) = oneshot::channel();
378
379 self.sender
380 .send(DbMessage::CreateBackupRecord {
381 file_path,
382 service_version,
383 backup_type: backup_type.to_string(),
384 status: status.to_string(),
385 respond_to,
386 })
387 .await
388 .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
389
390 receiver
391 .await
392 .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
393 }
394
395 pub async fn get_all_backups(&self) -> Result<Vec<BackupRecord>> {
397 let (respond_to, receiver) = oneshot::channel();
398
399 self.sender
400 .send(DbMessage::GetAllBackups { respond_to })
401 .await
402 .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
403
404 receiver
405 .await
406 .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
407 }
408
409 pub async fn get_backup_by_id(&self, id: i64) -> Result<Option<BackupRecord>> {
411 let (respond_to, receiver) = oneshot::channel();
412
413 self.sender
414 .send(DbMessage::GetBackupById { id, respond_to })
415 .await
416 .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
417
418 receiver
419 .await
420 .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
421 }
422
423 pub async fn delete_backup_record(&self, backup_id: i64) -> Result<()> {
425 let (respond_to, receiver) = oneshot::channel();
426
427 self.sender
428 .send(DbMessage::DeleteBackupRecord {
429 backup_id,
430 respond_to,
431 })
432 .await
433 .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
434
435 receiver
436 .await
437 .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
438 }
439
440 pub async fn update_backup_file_path(&self, backup_id: i64, new_path: String) -> Result<()> {
442 let (respond_to, receiver) = oneshot::channel();
443
444 self.sender
445 .send(DbMessage::UpdateBackupFilePath {
446 backup_id,
447 new_path,
448 respond_to,
449 })
450 .await
451 .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
452
453 receiver
454 .await
455 .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
456 }
457
458 pub async fn create_scheduled_task(
460 &self,
461 task_type: &str,
462 target_version: String,
463 scheduled_at: DateTime<Utc>,
464 ) -> Result<i64> {
465 self.cancel_pending_tasks(task_type).await?;
467
468 let (respond_to, receiver) = oneshot::channel();
469
470 self.sender
471 .send(DbMessage::CreateScheduledTask {
472 task_type: task_type.to_string(),
473 target_version,
474 scheduled_at,
475 respond_to,
476 })
477 .await
478 .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
479
480 receiver
481 .await
482 .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
483 }
484
485 pub async fn get_pending_tasks(&self) -> Result<Vec<ScheduledTask>> {
487 let (respond_to, receiver) = oneshot::channel();
488
489 self.sender
490 .send(DbMessage::GetPendingTasks { respond_to })
491 .await
492 .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
493
494 receiver
495 .await
496 .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
497 }
498
499 pub async fn update_task_status(
501 &self,
502 task_id: i64,
503 status: &str,
504 details: Option<String>,
505 ) -> Result<()> {
506 let (respond_to, receiver) = oneshot::channel();
507
508 self.sender
509 .send(DbMessage::UpdateTaskStatus {
510 task_id,
511 status: status.to_string(),
512 details,
513 respond_to,
514 })
515 .await
516 .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
517
518 receiver
519 .await
520 .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
521 }
522
523 async fn cancel_pending_tasks(&self, task_type: &str) -> Result<()> {
525 let (respond_to, receiver) = oneshot::channel();
526
527 self.sender
528 .send(DbMessage::CancelPendingTasks {
529 task_type: task_type.to_string(),
530 respond_to,
531 })
532 .await
533 .map_err(|_| DuckError::Custom("Database actor has been closed".to_string()))?;
534
535 receiver
536 .await
537 .map_err(|_| DuckError::Custom("Timed out waiting for database response".to_string()))?
538 }
539}