1use crate::DatabaseManager;
2use crate::database::Database;
3use anyhow::Result;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use std::collections::HashMap;
7use std::sync::Arc;
8use tokio::sync::RwLock;
9use tracing::{debug, warn};
/// Declared data type of a configuration value.
///
/// Every variant has a canonical upper-case tag, produced by
/// [`ConfigType::as_str`] and parsed back by [`ConfigType::from_str`].
///
/// The enum carries no data, so it is `Copy` and can be used as a hash key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConfigType {
    /// Tagged `"STRING"`.
    String,
    /// Tagged `"NUMBER"`.
    Number,
    /// Tagged `"BOOLEAN"`.
    Boolean,
    /// Tagged `"OBJECT"`.
    Object,
    /// Tagged `"ARRAY"`.
    Array,
}

impl ConfigType {
    /// Returns the canonical upper-case tag for this type.
    pub fn as_str(&self) -> &'static str {
        match *self {
            ConfigType::String => "STRING",
            ConfigType::Number => "NUMBER",
            ConfigType::Boolean => "BOOLEAN",
            ConfigType::Object => "OBJECT",
            ConfigType::Array => "ARRAY",
        }
    }

    /// Parses a canonical tag back into a [`ConfigType`].
    ///
    /// Matching is exact and case-sensitive; any other input yields `None`.
    pub fn from_str(s: &str) -> Option<Self> {
        let parsed = match s {
            "STRING" => ConfigType::String,
            "NUMBER" => ConfigType::Number,
            "BOOLEAN" => ConfigType::Boolean,
            "OBJECT" => ConfigType::Object,
            "ARRAY" => ConfigType::Array,
            _ => return None,
        };
        Some(parsed)
    }
}
44
45#[derive(Debug, Clone)]
47pub struct ConfigItem {
48 pub key: String,
49 pub value: Value,
50 pub config_type: ConfigType,
51 pub category: String,
52 pub description: Option<String>,
53 pub is_system_config: bool,
54 pub is_user_editable: bool,
55 pub validation_rule: Option<String>,
56 pub default_value: Option<Value>,
57}
58
59#[derive(Debug, Clone)]
61pub struct ConfigUpdateRequest {
62 pub key: String,
63 pub value: Value,
64 pub validate: bool,
65}
66
67pub enum DatabaseConnection {
69 DatabaseManager(Arc<DatabaseManager>),
70 Database(Arc<Database>),
71}
72
73impl DatabaseConnection {
74 pub async fn read_with_retry<F, R>(&self, operation: F) -> Result<R>
76 where
77 F: Fn(&duckdb::Connection) -> duckdb::Result<R> + Send + Sync,
78 R: Send,
79 {
80 match self {
81 DatabaseConnection::DatabaseManager(db) => db.read_with_retry(operation).await,
82 DatabaseConnection::Database(_db) => {
83 Err(anyhow::anyhow!(
85 "Configuration management is not supported for legacy database connections yet"
86 ))
87 }
88 }
89 }
90
91 pub async fn write_with_retry<F, R>(&self, operation: F) -> Result<R>
93 where
94 F: Fn(&duckdb::Connection) -> duckdb::Result<R> + Send + Sync,
95 R: Send,
96 {
97 match self {
98 DatabaseConnection::DatabaseManager(db) => db.write_with_retry(operation).await,
99 DatabaseConnection::Database(_db) => {
100 Err(anyhow::anyhow!(
102 "Configuration management is not supported for legacy database connections yet"
103 ))
104 }
105 }
106 }
107
108 pub async fn batch_write_with_retry<F, R>(&self, operations: F) -> Result<R>
110 where
111 F: Fn(&duckdb::Connection) -> duckdb::Result<R> + Send + Sync,
112 R: Send,
113 {
114 match self {
115 DatabaseConnection::DatabaseManager(db) => db.batch_write_with_retry(operations).await,
116 DatabaseConnection::Database(_db) => {
117 Err(anyhow::anyhow!(
119 "Configuration management is not supported for legacy database connections yet"
120 ))
121 }
122 }
123 }
124}
125
126pub struct ConfigManager {
135 db: DatabaseConnection,
136 cache: Arc<RwLock<HashMap<String, ConfigItem>>>,
138 cache_initialized: Arc<RwLock<bool>>,
140}
141
142impl ConfigManager {
143 pub fn new(db: Arc<DatabaseManager>) -> Self {
145 Self {
146 db: DatabaseConnection::DatabaseManager(db),
147 cache: Arc::new(RwLock::new(HashMap::new())),
148 cache_initialized: Arc::new(RwLock::new(false)),
149 }
150 }
151
152 pub fn new_with_database(db: Arc<Database>) -> Self {
154 Self {
155 db: DatabaseConnection::Database(db),
156 cache: Arc::new(RwLock::new(HashMap::new())),
157 cache_initialized: Arc::new(RwLock::new(false)),
158 }
159 }
160
161 pub async fn initialize_cache(&self) -> Result<()> {
163 debug!("Initializing configuration cache...");
164
165 let configs = match &self.db {
166 DatabaseConnection::DatabaseManager(db) => {
167 db.read_with_retry(|conn| {
168 let mut stmt = conn.prepare(
169 "SELECT config_key, config_value, config_type, category, description,
170 is_system_config, is_user_editable, validation_rule, default_value
171 FROM app_config",
172 )?;
173
174 let config_iter = stmt.query_map([], |row| {
175 let key: String = row.get(0)?;
176 let value_str: String = row.get(1)?;
177 let type_str: String = row.get(2)?;
178 let category: String = row.get(3)?;
179 let description: Option<String> = row.get(4)?;
180 let is_system: bool = row.get(5)?;
181 let is_editable: bool = row.get(6)?;
182 let validation: Option<String> = row.get(7)?;
183 let default_str: Option<String> = row.get(8)?;
184
185 let value: Value = serde_json::from_str(&value_str).map_err(|e| {
187 duckdb::Error::InvalidParameterName(format!(
188 "Failed to parse JSON: {e}"
189 ))
190 })?;
191
192 let default_value = if let Some(default_str) = default_str {
193 Some(serde_json::from_str(&default_str).map_err(|e| {
194 duckdb::Error::InvalidParameterName(format!(
195 "Failed to parse default value JSON: {e}"
196 ))
197 })?)
198 } else {
199 None
200 };
201
202 let config_type = ConfigType::from_str(&type_str).ok_or_else(|| {
203 duckdb::Error::InvalidParameterName(format!(
204 "Invalid config type: {type_str}"
205 ))
206 })?;
207
208 Ok(ConfigItem {
209 key: key.clone(),
210 value,
211 config_type,
212 category,
213 description,
214 is_system_config: is_system,
215 is_user_editable: is_editable,
216 validation_rule: validation,
217 default_value,
218 })
219 })?;
220
221 let mut configs = Vec::new();
222 for config in config_iter {
223 configs.push(config?);
224 }
225 Ok(configs)
226 })
227 .await?
228 }
229 DatabaseConnection::Database(_db) => {
230 warn!("Traditional database connection does not support configuration management");
233 Vec::new()
234 }
235 };
236
237 let mut cache = self.cache.write().await;
239 cache.clear();
240 for config in configs {
241 cache.insert(config.key.clone(), config);
242 }
243
244 *self.cache_initialized.write().await = true;
246
247 debug!("Configuration cache initialized, loaded {} config items", cache.len());
248 Ok(())
249 }
250
251 async fn ensure_cache_initialized(&self) -> Result<()> {
253 let initialized = *self.cache_initialized.read().await;
254 if !initialized {
255 self.initialize_cache().await?;
256 }
257 Ok(())
258 }
259
260 pub async fn get_string(&self, key: &str) -> Result<Option<String>> {
262 self.ensure_cache_initialized().await?;
263
264 let cache = self.cache.read().await;
265 if let Some(config) = cache.get(key) {
266 match &config.value {
267 Value::String(s) => Ok(Some(s.clone())),
268 _ => {
269 warn!("Config item {} is not a string type: {:?}", key, config.value);
270 Ok(None)
271 }
272 }
273 } else {
274 debug!("Config item {} does not exist", key);
275 Ok(None)
276 }
277 }
278
279 pub async fn get_number(&self, key: &str) -> Result<Option<f64>> {
281 self.ensure_cache_initialized().await?;
282
283 let cache = self.cache.read().await;
284 if let Some(config) = cache.get(key) {
285 match &config.value {
286 Value::Number(n) => Ok(n.as_f64()),
287 _ => {
288 warn!("Config item {} is not a numeric type: {:?}", key, config.value);
289 Ok(None)
290 }
291 }
292 } else {
293 debug!("Config item {} does not exist", key);
294 Ok(None)
295 }
296 }
297
298 pub async fn get_integer(&self, key: &str) -> Result<Option<i64>> {
300 self.ensure_cache_initialized().await?;
301
302 let cache = self.cache.read().await;
303 if let Some(config) = cache.get(key) {
304 match &config.value {
305 Value::Number(n) => Ok(n.as_i64()),
306 _ => {
307 warn!("Config item {} is not a numeric type: {:?}", key, config.value);
308 Ok(None)
309 }
310 }
311 } else {
312 debug!("Config item {} does not exist", key);
313 Ok(None)
314 }
315 }
316
317 pub async fn get_bool(&self, key: &str) -> Result<Option<bool>> {
319 self.ensure_cache_initialized().await?;
320
321 let cache = self.cache.read().await;
322 if let Some(config) = cache.get(key) {
323 match &config.value {
324 Value::Bool(b) => Ok(Some(*b)),
325 _ => {
326 warn!("Config item {} is not a boolean type: {:?}", key, config.value);
327 Ok(None)
328 }
329 }
330 } else {
331 debug!("Config item {} does not exist", key);
332 Ok(None)
333 }
334 }
335
336 pub async fn get_object(&self, key: &str) -> Result<Option<Value>> {
338 self.ensure_cache_initialized().await?;
339
340 let cache = self.cache.read().await;
341 if let Some(config) = cache.get(key) {
342 match &config.value {
343 Value::Object(_) => Ok(Some(config.value.clone())),
344 _ => {
345 warn!("Config item {} is not an object type: {:?}", key, config.value);
346 Ok(None)
347 }
348 }
349 } else {
350 debug!("Config item {} does not exist", key);
351 Ok(None)
352 }
353 }
354
355 pub async fn get_array(&self, key: &str) -> Result<Option<Vec<Value>>> {
357 self.ensure_cache_initialized().await?;
358
359 let cache = self.cache.read().await;
360 if let Some(config) = cache.get(key) {
361 match &config.value {
362 Value::Array(arr) => Ok(Some(arr.clone())),
363 _ => {
364 warn!("Config item {} is not an array type: {:?}", key, config.value);
365 Ok(None)
366 }
367 }
368 } else {
369 debug!("Config item {} does not exist", key);
370 Ok(None)
371 }
372 }
373
374 pub async fn get_config(&self, key: &str) -> Result<Option<ConfigItem>> {
376 self.ensure_cache_initialized().await?;
377
378 let cache = self.cache.read().await;
379 Ok(cache.get(key).cloned())
380 }
381
382 pub async fn get_configs_by_category(&self, category: &str) -> Result<Vec<ConfigItem>> {
384 self.ensure_cache_initialized().await?;
385
386 let cache = self.cache.read().await;
387 Ok(cache
388 .values()
389 .filter(|config| config.category == category)
390 .cloned()
391 .collect())
392 }
393
394 pub async fn get_user_editable_configs(&self) -> Result<Vec<ConfigItem>> {
396 self.ensure_cache_initialized().await?;
397
398 let cache = self.cache.read().await;
399 Ok(cache
400 .values()
401 .filter(|config| config.is_user_editable)
402 .cloned()
403 .collect())
404 }
405
406 pub async fn update_config(&self, key: &str, value: Value) -> Result<()> {
408 self.ensure_cache_initialized().await?;
409
410 let is_editable = {
412 let cache = self.cache.read().await;
413 if let Some(config) = cache.get(key) {
414 if !config.is_user_editable {
415 return Err(anyhow::anyhow!("Config item {key} is not editable"));
416 }
417 config.is_user_editable
418 } else {
419 return Err(anyhow::anyhow!("Config item {key} does not exist"));
420 }
421 };
422
423 if !is_editable {
424 return Err(anyhow::anyhow!("Config item {key} is not editable"));
425 }
426
427 let expected_type = {
429 let cache = self.cache.read().await;
430 cache.get(key).map(|config| config.config_type.clone())
431 };
432
433 if let Some(expected_type) = expected_type {
434 if !self.validate_value_type(&value, &expected_type) {
435 return Err(anyhow::anyhow!(
436 "Config item {key} has mismatched value type: expected {expected_type:?}, actual {value:?}"
437 ));
438 }
439 }
440
441 let value_json = serde_json::to_string(&value)?;
443 self.db.write_with_retry(|conn| {
444 conn.execute(
445 "UPDATE app_config SET config_value = ?, updated_at = CURRENT_TIMESTAMP WHERE config_key = ?",
446 [&value_json, key]
447 )?;
448 Ok(())
449 }).await?;
450
451 let mut cache = self.cache.write().await;
453 if let Some(config) = cache.get_mut(key) {
454 config.value = value;
455 }
456
457 debug!("Config item {} updated successfully", key);
458 Ok(())
459 }
460
461 pub async fn update_configs(&self, updates: Vec<ConfigUpdateRequest>) -> Result<()> {
463 self.ensure_cache_initialized().await?;
464
465 for update in &updates {
467 let cache = self.cache.read().await;
469 if let Some(config) = cache.get(&update.key) {
470 if !config.is_user_editable {
471 return Err(anyhow::anyhow!("Config item {} is not editable", update.key));
472 }
473
474 if update.validate && !self.validate_value_type(&update.value, &config.config_type)
476 {
477 return Err(anyhow::anyhow!(
478 "Config item {} has mismatched value type",
479 update.key
480 ));
481 }
482 } else {
483 return Err(anyhow::anyhow!("Config item {} does not exist", update.key));
484 }
485 }
486
487 self.db.batch_write_with_retry(|conn| {
489 for update in &updates {
490 let value_json = serde_json::to_string(&update.value)
491 .map_err(|e| {
492 duckdb::Error::InvalidParameterName(format!(
493 "JSON serialization failed: {e}"
494 ))
495 })?;
496
497 conn.execute(
498 "UPDATE app_config SET config_value = ?, updated_at = CURRENT_TIMESTAMP WHERE config_key = ?",
499 [&value_json, &update.key]
500 )?;
501 }
502 Ok(())
503 }).await?;
504
505 let mut cache = self.cache.write().await;
507 for update in updates {
508 if let Some(config) = cache.get_mut(&update.key) {
509 config.value = update.value;
510 }
511 }
512
513 debug!("Batch configuration update successful");
514 Ok(())
515 }
516
517 pub async fn reset_config_to_default(&self, key: &str) -> Result<()> {
519 self.ensure_cache_initialized().await?;
520
521 let default_value = {
522 let cache = self.cache.read().await;
523 if let Some(config) = cache.get(key) {
524 if !config.is_user_editable {
525 return Err(anyhow::anyhow!("Config item {key} is not editable"));
526 }
527 config.default_value.clone()
528 } else {
529 return Err(anyhow::anyhow!("Config item {key} does not exist"));
530 }
531 };
532
533 if let Some(default_value) = default_value {
534 self.update_config(key, default_value).await
535 } else {
536 Err(anyhow::anyhow!("Config item {key} does not have a default value"))
537 }
538 }
539
540 pub async fn refresh_cache(&self) -> Result<()> {
542 *self.cache_initialized.write().await = false;
543 self.initialize_cache().await
544 }
545
546 pub async fn get_config_stats(&self) -> Result<ConfigStats> {
548 self.ensure_cache_initialized().await?;
549
550 let cache = self.cache.read().await;
551 let total_count = cache.len();
552 let editable_count = cache.values().filter(|c| c.is_user_editable).count();
553 let system_count = cache.values().filter(|c| c.is_system_config).count();
554
555 let mut category_stats = HashMap::new();
557 for config in cache.values() {
558 *category_stats.entry(config.category.clone()).or_insert(0) += 1;
559 }
560
561 Ok(ConfigStats {
562 total_count,
563 editable_count,
564 system_count,
565 category_stats,
566 })
567 }
568
569 pub async fn update_last_backup_time(
573 &self,
574 backup_time: chrono::DateTime<chrono::Utc>,
575 success: bool,
576 ) -> Result<()> {
577 let time_value = Value::String(backup_time.to_rfc3339());
578 self.update_config("auto_backup_last_time", time_value)
579 .await?;
580
581 if success {
582 let status_value = Value::String("success".to_string());
583 self.update_config("auto_backup_last_status", status_value)
584 .await?;
585 } else {
586 let status_value = Value::String("failed".to_string());
587 self.update_config("auto_backup_last_status", status_value)
588 .await?;
589 }
590
591 Ok(())
592 }
593
594 pub async fn set_auto_backup_cron(&self, cron_expr: &str) -> Result<()> {
596 let value = Value::String(cron_expr.to_string());
597 self.update_config("auto_backup_schedule", value).await
598 }
599
600 pub async fn set_auto_backup_enabled(&self, enabled: bool) -> Result<()> {
602 let value = Value::Bool(enabled);
603 self.update_config("auto_backup_enabled", value).await
604 }
605
606 pub async fn get_auto_backup_config(&self) -> Result<AutoBackupConfig> {
608 let enabled = self.get_bool("auto_backup_enabled").await?.unwrap_or(false);
609 let cron_expr = self
610 .get_string("auto_backup_schedule")
611 .await?
612 .unwrap_or("0 2 * * *".to_string());
613 let retention_days = self
614 .get_integer("auto_backup_retention_days")
615 .await?
616 .unwrap_or(7) as i32;
617 let backup_dir = self
618 .get_string("auto_backup_directory")
619 .await?
620 .unwrap_or("./backups".to_string());
621
622 let last_backup_time =
623 if let Some(time_str) = self.get_string("auto_backup_last_time").await? {
624 chrono::DateTime::parse_from_rfc3339(&time_str)
625 .map(|dt| dt.with_timezone(&chrono::Utc))
626 .ok()
627 } else {
628 None
629 };
630
631 Ok(AutoBackupConfig {
632 enabled,
633 cron_expression: cron_expr,
634 last_backup_time,
635 backup_retention_days: retention_days,
636 backup_directory: backup_dir,
637 })
638 }
639
640 pub async fn create_auto_upgrade_task(&self, task: &AutoUpgradeTask) -> Result<()> {
642 let _task_json = serde_json::to_value(task)?;
643
644 self.db.write_with_retry(|conn| {
646 conn.execute(
647 r#"INSERT OR REPLACE INTO auto_upgrade_tasks
648 (task_id, task_name, schedule_time, upgrade_type, target_version, status, progress, error_message, created_at, updated_at)
649 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)"#,
650 [
651 &task.task_id,
652 &task.task_name,
653 &task.schedule_time.to_rfc3339(),
654 &task.upgrade_type,
655task.target_version.as_deref().unwrap_or(""),
656 &task.status,
657 &task.progress.map(|p| p.to_string()).unwrap_or_default(),
658 task.error_message.as_deref().unwrap_or(""),
659 &task.created_at.to_rfc3339(),
660 &task.updated_at.to_rfc3339(),
661 ]
662 )?;
663 Ok(())
664 }).await?;
665
666 debug!("Auto upgrade task {} created successfully", task.task_id);
667 Ok(())
668 }
669
670 pub async fn update_upgrade_task_status(
672 &self,
673 task_id: &str,
674 status: &str,
675 progress: Option<i32>,
676 error_message: Option<&str>,
677 ) -> Result<()> {
678 self.db
679 .write_with_retry(|conn| {
680 conn.execute(
681 r#"UPDATE auto_upgrade_tasks
682 SET status = ?1, progress = ?2, error_message = ?3, updated_at = ?4
683 WHERE task_id = ?5"#,
684 [
685 status,
686 &progress.map(|p| p.to_string()).unwrap_or_default(),
687 error_message.unwrap_or(""),
688 &chrono::Utc::now().to_rfc3339(),
689 task_id,
690 ],
691 )?;
692 Ok(())
693 })
694 .await?;
695
696 debug!("Upgrade task {} status updated to: {}", task_id, status);
697 Ok(())
698 }
699
700 pub async fn get_pending_upgrade_tasks(&self) -> Result<Vec<AutoUpgradeTask>> {
702 self.db
703 .read_with_retry(|conn| {
704 let mut stmt = conn.prepare(
705 r#"SELECT task_id, task_name, schedule_time, upgrade_type, target_version,
706 status, progress, error_message, created_at, updated_at
707 FROM auto_upgrade_tasks
708 WHERE status IN ('pending', 'in_progress')
709 ORDER BY schedule_time ASC"#,
710 )?;
711
712 let tasks = stmt.query_map([], |row| {
713 let schedule_time_str: String = row.get("schedule_time")?;
714 let created_at_str: String = row.get("created_at")?;
715 let updated_at_str: String = row.get("updated_at")?;
716 let progress_str: String = row.get("progress")?;
717 let target_version: String = row.get("target_version")?;
718 let error_msg: String = row.get("error_message")?;
719
720 Ok(AutoUpgradeTask {
721 task_id: row.get("task_id")?,
722 task_name: row.get("task_name")?,
723 schedule_time: chrono::DateTime::parse_from_rfc3339(&schedule_time_str)
724 .map_err(|_| {
725 duckdb::Error::InvalidColumnType(
726 0,
727 "schedule_time".to_string(),
728 duckdb::types::Type::Text,
729 )
730 })?
731 .with_timezone(&chrono::Utc),
732 upgrade_type: row.get("upgrade_type")?,
733 target_version: if target_version.is_empty() {
734 None
735 } else {
736 Some(target_version)
737 },
738 status: row.get("status")?,
739 progress: if progress_str.is_empty() {
740 None
741 } else {
742 progress_str.parse().ok()
743 },
744 error_message: if error_msg.is_empty() {
745 None
746 } else {
747 Some(error_msg)
748 },
749 created_at: chrono::DateTime::parse_from_rfc3339(&created_at_str)
750 .map_err(|_| {
751 duckdb::Error::InvalidColumnType(
752 0,
753 "created_at".to_string(),
754 duckdb::types::Type::Text,
755 )
756 })?
757 .with_timezone(&chrono::Utc),
758 updated_at: chrono::DateTime::parse_from_rfc3339(&updated_at_str)
759 .map_err(|_| {
760 duckdb::Error::InvalidColumnType(
761 0,
762 "updated_at".to_string(),
763 duckdb::types::Type::Text,
764 )
765 })?
766 .with_timezone(&chrono::Utc),
767 })
768 })?;
769
770 let mut result = Vec::new();
771 for task in tasks {
772 result.push(task?);
773 }
774 Ok(result)
775 })
776 .await
777 }
778
779 fn validate_value_type(&self, value: &Value, expected_type: &ConfigType) -> bool {
781 match (value, expected_type) {
782 (Value::String(_), ConfigType::String) => true,
783 (Value::Number(_), ConfigType::Number) => true,
784 (Value::Bool(_), ConfigType::Boolean) => true,
785 (Value::Object(_), ConfigType::Object) => true,
786 (Value::Array(_), ConfigType::Array) => true,
787 _ => false,
788 }
789 }
790}
791
/// Summary counts over the cached configuration entries, as produced by
/// `ConfigManager::get_config_stats`.
///
/// `Default` yields all-zero counts and an empty category map.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ConfigStats {
    /// Total number of cached entries.
    pub total_count: usize,
    /// Number of entries flagged as user-editable.
    pub editable_count: usize,
    /// Number of entries flagged as system configuration.
    pub system_count: usize,
    /// Number of entries per category name.
    pub category_stats: HashMap<String, usize>,
}
800
801#[derive(Debug, Clone, Serialize, Deserialize)]
804pub struct AutoUpgradeTask {
805 pub task_id: String,
806 pub task_name: String,
807 pub schedule_time: chrono::DateTime<chrono::Utc>,
808 pub upgrade_type: String,
809 pub target_version: Option<String>,
810 pub status: String,
811 pub progress: Option<i32>,
812 pub error_message: Option<String>,
813 pub created_at: chrono::DateTime<chrono::Utc>,
814 pub updated_at: chrono::DateTime<chrono::Utc>,
815}
816
817#[derive(Debug, Clone, Serialize, Deserialize)]
818pub struct AutoBackupConfig {
819 pub enabled: bool,
820 pub cron_expression: String,
821 pub last_backup_time: Option<chrono::DateTime<chrono::Utc>>,
822 pub backup_retention_days: i32,
823 pub backup_directory: String,
824}