1use crate::DatabaseManager;
2use crate::database::Database;
3use anyhow::Result;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use std::collections::HashMap;
7use std::sync::Arc;
8use tokio::sync::RwLock;
9use tracing::{debug, warn};
10#[derive(Debug, Clone, PartialEq)]
14pub enum ConfigType {
15 String,
16 Number,
17 Boolean,
18 Object,
19 Array,
20}
21
22impl ConfigType {
23 pub fn as_str(&self) -> &'static str {
24 match self {
25 ConfigType::String => "STRING",
26 ConfigType::Number => "NUMBER",
27 ConfigType::Boolean => "BOOLEAN",
28 ConfigType::Object => "OBJECT",
29 ConfigType::Array => "ARRAY",
30 }
31 }
32
33 pub fn parse(s: &str) -> Option<Self> {
34 match s {
35 "STRING" => Some(ConfigType::String),
36 "NUMBER" => Some(ConfigType::Number),
37 "BOOLEAN" => Some(ConfigType::Boolean),
38 "OBJECT" => Some(ConfigType::Object),
39 "ARRAY" => Some(ConfigType::Array),
40 _ => None,
41 }
42 }
43}
44
45#[derive(Debug, Clone)]
47pub struct ConfigItem {
48 pub key: String,
49 pub value: Value,
50 pub config_type: ConfigType,
51 pub category: String,
52 pub description: Option<String>,
53 pub is_system_config: bool,
54 pub is_user_editable: bool,
55 pub validation_rule: Option<String>,
56 pub default_value: Option<Value>,
57}
58
59#[derive(Debug, Clone)]
61pub struct ConfigUpdateRequest {
62 pub key: String,
63 pub value: Value,
64 pub validate: bool,
65}
66
67pub enum DatabaseConnection {
69 DatabaseManager(Arc<DatabaseManager>),
70 Database(Arc<Database>),
71}
72
73impl DatabaseConnection {
74 pub async fn read_with_retry<F, R>(&self, operation: F) -> Result<R>
76 where
77 F: Fn(&duckdb::Connection) -> duckdb::Result<R> + Send + Sync,
78 R: Send,
79 {
80 match self {
81 DatabaseConnection::DatabaseManager(db) => db.read_with_retry(operation).await,
82 DatabaseConnection::Database(_db) => {
83 Err(anyhow::anyhow!(
85 "Configuration management is not supported for legacy database connections yet"
86 ))
87 }
88 }
89 }
90
91 pub async fn write_with_retry<F, R>(&self, operation: F) -> Result<R>
93 where
94 F: Fn(&duckdb::Connection) -> duckdb::Result<R> + Send + Sync,
95 R: Send,
96 {
97 match self {
98 DatabaseConnection::DatabaseManager(db) => db.write_with_retry(operation).await,
99 DatabaseConnection::Database(_db) => {
100 Err(anyhow::anyhow!(
102 "Configuration management is not supported for legacy database connections yet"
103 ))
104 }
105 }
106 }
107
108 pub async fn batch_write_with_retry<F, R>(&self, operations: F) -> Result<R>
110 where
111 F: Fn(&duckdb::Connection) -> duckdb::Result<R> + Send + Sync,
112 R: Send,
113 {
114 match self {
115 DatabaseConnection::DatabaseManager(db) => db.batch_write_with_retry(operations).await,
116 DatabaseConnection::Database(_db) => {
117 Err(anyhow::anyhow!(
119 "Configuration management is not supported for legacy database connections yet"
120 ))
121 }
122 }
123 }
124}
125
126pub struct ConfigManager {
135 db: DatabaseConnection,
136 cache: Arc<RwLock<HashMap<String, ConfigItem>>>,
138 cache_initialized: Arc<RwLock<bool>>,
140}
141
142impl ConfigManager {
143 pub fn new(db: Arc<DatabaseManager>) -> Self {
145 Self {
146 db: DatabaseConnection::DatabaseManager(db),
147 cache: Arc::new(RwLock::new(HashMap::new())),
148 cache_initialized: Arc::new(RwLock::new(false)),
149 }
150 }
151
152 pub fn new_with_database(db: Arc<Database>) -> Self {
154 Self {
155 db: DatabaseConnection::Database(db),
156 cache: Arc::new(RwLock::new(HashMap::new())),
157 cache_initialized: Arc::new(RwLock::new(false)),
158 }
159 }
160
161 pub async fn initialize_cache(&self) -> Result<()> {
163 debug!("Initializing configuration cache...");
164
165 let configs = match &self.db {
166 DatabaseConnection::DatabaseManager(db) => {
167 db.read_with_retry(|conn| {
168 let mut stmt = conn.prepare(
169 "SELECT config_key, config_value, config_type, category, description,
170 is_system_config, is_user_editable, validation_rule, default_value
171 FROM app_config",
172 )?;
173
174 let config_iter = stmt.query_map([], |row| {
175 let key: String = row.get(0)?;
176 let value_str: String = row.get(1)?;
177 let type_str: String = row.get(2)?;
178 let category: String = row.get(3)?;
179 let description: Option<String> = row.get(4)?;
180 let is_system: bool = row.get(5)?;
181 let is_editable: bool = row.get(6)?;
182 let validation: Option<String> = row.get(7)?;
183 let default_str: Option<String> = row.get(8)?;
184
185 let value: Value = serde_json::from_str(&value_str).map_err(|e| {
187 duckdb::Error::InvalidParameterName(format!(
188 "Failed to parse JSON: {e}"
189 ))
190 })?;
191
192 let default_value = if let Some(default_str) = default_str {
193 Some(serde_json::from_str(&default_str).map_err(|e| {
194 duckdb::Error::InvalidParameterName(format!(
195 "Failed to parse default value JSON: {e}"
196 ))
197 })?)
198 } else {
199 None
200 };
201
202 let config_type = ConfigType::parse(&type_str).ok_or_else(|| {
203 duckdb::Error::InvalidParameterName(format!(
204 "Invalid config type: {type_str}"
205 ))
206 })?;
207
208 Ok(ConfigItem {
209 key: key.clone(),
210 value,
211 config_type,
212 category,
213 description,
214 is_system_config: is_system,
215 is_user_editable: is_editable,
216 validation_rule: validation,
217 default_value,
218 })
219 })?;
220
221 let mut configs = Vec::new();
222 for config in config_iter {
223 configs.push(config?);
224 }
225 Ok(configs)
226 })
227 .await?
228 }
229 DatabaseConnection::Database(_db) => {
230 warn!("Traditional database connection does not support configuration management");
233 Vec::new()
234 }
235 };
236
237 let mut cache = self.cache.write().await;
239 cache.clear();
240 for config in configs {
241 cache.insert(config.key.clone(), config);
242 }
243
244 *self.cache_initialized.write().await = true;
246
247 debug!(
248 "Configuration cache initialized, loaded {} config items",
249 cache.len()
250 );
251 Ok(())
252 }
253
254 async fn ensure_cache_initialized(&self) -> Result<()> {
256 let initialized = *self.cache_initialized.read().await;
257 if !initialized {
258 self.initialize_cache().await?;
259 }
260 Ok(())
261 }
262
263 pub async fn get_string(&self, key: &str) -> Result<Option<String>> {
265 self.ensure_cache_initialized().await?;
266
267 let cache = self.cache.read().await;
268 if let Some(config) = cache.get(key) {
269 match &config.value {
270 Value::String(s) => Ok(Some(s.clone())),
271 _ => {
272 warn!(
273 "Config item {} is not a string type: {:?}",
274 key, config.value
275 );
276 Ok(None)
277 }
278 }
279 } else {
280 debug!("Config item {} does not exist", key);
281 Ok(None)
282 }
283 }
284
285 pub async fn get_number(&self, key: &str) -> Result<Option<f64>> {
287 self.ensure_cache_initialized().await?;
288
289 let cache = self.cache.read().await;
290 if let Some(config) = cache.get(key) {
291 match &config.value {
292 Value::Number(n) => Ok(n.as_f64()),
293 _ => {
294 warn!(
295 "Config item {} is not a numeric type: {:?}",
296 key, config.value
297 );
298 Ok(None)
299 }
300 }
301 } else {
302 debug!("Config item {} does not exist", key);
303 Ok(None)
304 }
305 }
306
307 pub async fn get_integer(&self, key: &str) -> Result<Option<i64>> {
309 self.ensure_cache_initialized().await?;
310
311 let cache = self.cache.read().await;
312 if let Some(config) = cache.get(key) {
313 match &config.value {
314 Value::Number(n) => Ok(n.as_i64()),
315 _ => {
316 warn!(
317 "Config item {} is not a numeric type: {:?}",
318 key, config.value
319 );
320 Ok(None)
321 }
322 }
323 } else {
324 debug!("Config item {} does not exist", key);
325 Ok(None)
326 }
327 }
328
329 pub async fn get_bool(&self, key: &str) -> Result<Option<bool>> {
331 self.ensure_cache_initialized().await?;
332
333 let cache = self.cache.read().await;
334 if let Some(config) = cache.get(key) {
335 match &config.value {
336 Value::Bool(b) => Ok(Some(*b)),
337 _ => {
338 warn!(
339 "Config item {} is not a boolean type: {:?}",
340 key, config.value
341 );
342 Ok(None)
343 }
344 }
345 } else {
346 debug!("Config item {} does not exist", key);
347 Ok(None)
348 }
349 }
350
351 pub async fn get_object(&self, key: &str) -> Result<Option<Value>> {
353 self.ensure_cache_initialized().await?;
354
355 let cache = self.cache.read().await;
356 if let Some(config) = cache.get(key) {
357 match &config.value {
358 Value::Object(_) => Ok(Some(config.value.clone())),
359 _ => {
360 warn!(
361 "Config item {} is not an object type: {:?}",
362 key, config.value
363 );
364 Ok(None)
365 }
366 }
367 } else {
368 debug!("Config item {} does not exist", key);
369 Ok(None)
370 }
371 }
372
373 pub async fn get_array(&self, key: &str) -> Result<Option<Vec<Value>>> {
375 self.ensure_cache_initialized().await?;
376
377 let cache = self.cache.read().await;
378 if let Some(config) = cache.get(key) {
379 match &config.value {
380 Value::Array(arr) => Ok(Some(arr.clone())),
381 _ => {
382 warn!(
383 "Config item {} is not an array type: {:?}",
384 key, config.value
385 );
386 Ok(None)
387 }
388 }
389 } else {
390 debug!("Config item {} does not exist", key);
391 Ok(None)
392 }
393 }
394
395 pub async fn get_config(&self, key: &str) -> Result<Option<ConfigItem>> {
397 self.ensure_cache_initialized().await?;
398
399 let cache = self.cache.read().await;
400 Ok(cache.get(key).cloned())
401 }
402
403 pub async fn get_configs_by_category(&self, category: &str) -> Result<Vec<ConfigItem>> {
405 self.ensure_cache_initialized().await?;
406
407 let cache = self.cache.read().await;
408 Ok(cache
409 .values()
410 .filter(|config| config.category == category)
411 .cloned()
412 .collect())
413 }
414
415 pub async fn get_user_editable_configs(&self) -> Result<Vec<ConfigItem>> {
417 self.ensure_cache_initialized().await?;
418
419 let cache = self.cache.read().await;
420 Ok(cache
421 .values()
422 .filter(|config| config.is_user_editable)
423 .cloned()
424 .collect())
425 }
426
427 pub async fn update_config(&self, key: &str, value: Value) -> Result<()> {
429 self.ensure_cache_initialized().await?;
430
431 let is_editable = {
433 let cache = self.cache.read().await;
434 if let Some(config) = cache.get(key) {
435 if !config.is_user_editable {
436 return Err(anyhow::anyhow!("Config item {key} is not editable"));
437 }
438 config.is_user_editable
439 } else {
440 return Err(anyhow::anyhow!("Config item {key} does not exist"));
441 }
442 };
443
444 if !is_editable {
445 return Err(anyhow::anyhow!("Config item {key} is not editable"));
446 }
447
448 let expected_type = {
450 let cache = self.cache.read().await;
451 cache.get(key).map(|config| config.config_type.clone())
452 };
453
454 if let Some(expected_type) = expected_type
455 && !self.validate_value_type(&value, &expected_type)
456 {
457 return Err(anyhow::anyhow!(
458 "Config item {key} has mismatched value type: expected {expected_type:?}, actual {value:?}"
459 ));
460 }
461
462 let value_json = serde_json::to_string(&value)?;
464 self.db.write_with_retry(|conn| {
465 conn.execute(
466 "UPDATE app_config SET config_value = ?, updated_at = CURRENT_TIMESTAMP WHERE config_key = ?",
467 [&value_json, key]
468 )?;
469 Ok(())
470 }).await?;
471
472 let mut cache = self.cache.write().await;
474 if let Some(config) = cache.get_mut(key) {
475 config.value = value;
476 }
477
478 debug!("Config item {} updated successfully", key);
479 Ok(())
480 }
481
482 pub async fn update_configs(&self, updates: Vec<ConfigUpdateRequest>) -> Result<()> {
484 self.ensure_cache_initialized().await?;
485
486 for update in &updates {
488 let cache = self.cache.read().await;
490 if let Some(config) = cache.get(&update.key) {
491 if !config.is_user_editable {
492 return Err(anyhow::anyhow!(
493 "Config item {} is not editable",
494 update.key
495 ));
496 }
497
498 if update.validate && !self.validate_value_type(&update.value, &config.config_type)
500 {
501 return Err(anyhow::anyhow!(
502 "Config item {} has mismatched value type",
503 update.key
504 ));
505 }
506 } else {
507 return Err(anyhow::anyhow!("Config item {} does not exist", update.key));
508 }
509 }
510
511 self.db.batch_write_with_retry(|conn| {
513 for update in &updates {
514 let value_json = serde_json::to_string(&update.value)
515 .map_err(|e| {
516 duckdb::Error::InvalidParameterName(format!(
517 "JSON serialization failed: {e}"
518 ))
519 })?;
520
521 conn.execute(
522 "UPDATE app_config SET config_value = ?, updated_at = CURRENT_TIMESTAMP WHERE config_key = ?",
523 [&value_json, &update.key]
524 )?;
525 }
526 Ok(())
527 }).await?;
528
529 let mut cache = self.cache.write().await;
531 for update in updates {
532 if let Some(config) = cache.get_mut(&update.key) {
533 config.value = update.value;
534 }
535 }
536
537 debug!("Batch configuration update successful");
538 Ok(())
539 }
540
541 pub async fn reset_config_to_default(&self, key: &str) -> Result<()> {
543 self.ensure_cache_initialized().await?;
544
545 let default_value = {
546 let cache = self.cache.read().await;
547 if let Some(config) = cache.get(key) {
548 if !config.is_user_editable {
549 return Err(anyhow::anyhow!("Config item {key} is not editable"));
550 }
551 config.default_value.clone()
552 } else {
553 return Err(anyhow::anyhow!("Config item {key} does not exist"));
554 }
555 };
556
557 if let Some(default_value) = default_value {
558 self.update_config(key, default_value).await
559 } else {
560 Err(anyhow::anyhow!(
561 "Config item {key} does not have a default value"
562 ))
563 }
564 }
565
566 pub async fn refresh_cache(&self) -> Result<()> {
568 *self.cache_initialized.write().await = false;
569 self.initialize_cache().await
570 }
571
572 pub async fn get_config_stats(&self) -> Result<ConfigStats> {
574 self.ensure_cache_initialized().await?;
575
576 let cache = self.cache.read().await;
577 let total_count = cache.len();
578 let editable_count = cache.values().filter(|c| c.is_user_editable).count();
579 let system_count = cache.values().filter(|c| c.is_system_config).count();
580
581 let mut category_stats = HashMap::new();
583 for config in cache.values() {
584 *category_stats.entry(config.category.clone()).or_insert(0) += 1;
585 }
586
587 Ok(ConfigStats {
588 total_count,
589 editable_count,
590 system_count,
591 category_stats,
592 })
593 }
594
595 pub async fn update_last_backup_time(
599 &self,
600 backup_time: chrono::DateTime<chrono::Utc>,
601 success: bool,
602 ) -> Result<()> {
603 let time_value = Value::String(backup_time.to_rfc3339());
604 self.update_config("auto_backup_last_time", time_value)
605 .await?;
606
607 if success {
608 let status_value = Value::String("success".to_string());
609 self.update_config("auto_backup_last_status", status_value)
610 .await?;
611 } else {
612 let status_value = Value::String("failed".to_string());
613 self.update_config("auto_backup_last_status", status_value)
614 .await?;
615 }
616
617 Ok(())
618 }
619
620 pub async fn set_auto_backup_cron(&self, cron_expr: &str) -> Result<()> {
622 let value = Value::String(cron_expr.to_string());
623 self.update_config("auto_backup_schedule", value).await
624 }
625
626 pub async fn set_auto_backup_enabled(&self, enabled: bool) -> Result<()> {
628 let value = Value::Bool(enabled);
629 self.update_config("auto_backup_enabled", value).await
630 }
631
632 pub async fn get_auto_backup_config(&self) -> Result<AutoBackupConfig> {
634 let enabled = self.get_bool("auto_backup_enabled").await?.unwrap_or(false);
635 let cron_expr = self
636 .get_string("auto_backup_schedule")
637 .await?
638 .unwrap_or("0 2 * * *".to_string());
639 let retention_days = self
640 .get_integer("auto_backup_retention_days")
641 .await?
642 .unwrap_or(7) as i32;
643 let backup_dir = self
644 .get_string("auto_backup_directory")
645 .await?
646 .unwrap_or("./backups".to_string());
647
648 let last_backup_time =
649 if let Some(time_str) = self.get_string("auto_backup_last_time").await? {
650 chrono::DateTime::parse_from_rfc3339(&time_str)
651 .map(|dt| dt.with_timezone(&chrono::Utc))
652 .ok()
653 } else {
654 None
655 };
656
657 Ok(AutoBackupConfig {
658 enabled,
659 cron_expression: cron_expr,
660 last_backup_time,
661 backup_retention_days: retention_days,
662 backup_directory: backup_dir,
663 })
664 }
665
666 pub async fn create_auto_upgrade_task(&self, task: &AutoUpgradeTask) -> Result<()> {
668 let _task_json = serde_json::to_value(task)?;
669
670 self.db.write_with_retry(|conn| {
672 conn.execute(
673 r#"INSERT OR REPLACE INTO auto_upgrade_tasks
674 (task_id, task_name, schedule_time, upgrade_type, target_version, status, progress, error_message, created_at, updated_at)
675 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)"#,
676 [
677 &task.task_id,
678 &task.task_name,
679 &task.schedule_time.to_rfc3339(),
680 &task.upgrade_type,
681task.target_version.as_deref().unwrap_or(""),
682 &task.status,
683 &task.progress.map(|p| p.to_string()).unwrap_or_default(),
684 task.error_message.as_deref().unwrap_or(""),
685 &task.created_at.to_rfc3339(),
686 &task.updated_at.to_rfc3339(),
687 ]
688 )?;
689 Ok(())
690 }).await?;
691
692 debug!("Auto upgrade task {} created successfully", task.task_id);
693 Ok(())
694 }
695
696 pub async fn update_upgrade_task_status(
698 &self,
699 task_id: &str,
700 status: &str,
701 progress: Option<i32>,
702 error_message: Option<&str>,
703 ) -> Result<()> {
704 self.db
705 .write_with_retry(|conn| {
706 conn.execute(
707 r#"UPDATE auto_upgrade_tasks
708 SET status = ?1, progress = ?2, error_message = ?3, updated_at = ?4
709 WHERE task_id = ?5"#,
710 [
711 status,
712 &progress.map(|p| p.to_string()).unwrap_or_default(),
713 error_message.unwrap_or(""),
714 &chrono::Utc::now().to_rfc3339(),
715 task_id,
716 ],
717 )?;
718 Ok(())
719 })
720 .await?;
721
722 debug!("Upgrade task {} status updated to: {}", task_id, status);
723 Ok(())
724 }
725
726 pub async fn get_pending_upgrade_tasks(&self) -> Result<Vec<AutoUpgradeTask>> {
728 self.db
729 .read_with_retry(|conn| {
730 let mut stmt = conn.prepare(
731 r#"SELECT task_id, task_name, schedule_time, upgrade_type, target_version,
732 status, progress, error_message, created_at, updated_at
733 FROM auto_upgrade_tasks
734 WHERE status IN ('pending', 'in_progress')
735 ORDER BY schedule_time ASC"#,
736 )?;
737
738 let tasks = stmt.query_map([], |row| {
739 let schedule_time_str: String = row.get("schedule_time")?;
740 let created_at_str: String = row.get("created_at")?;
741 let updated_at_str: String = row.get("updated_at")?;
742 let progress_str: String = row.get("progress")?;
743 let target_version: String = row.get("target_version")?;
744 let error_msg: String = row.get("error_message")?;
745
746 Ok(AutoUpgradeTask {
747 task_id: row.get("task_id")?,
748 task_name: row.get("task_name")?,
749 schedule_time: chrono::DateTime::parse_from_rfc3339(&schedule_time_str)
750 .map_err(|_| {
751 duckdb::Error::InvalidColumnType(
752 0,
753 "schedule_time".to_string(),
754 duckdb::types::Type::Text,
755 )
756 })?
757 .with_timezone(&chrono::Utc),
758 upgrade_type: row.get("upgrade_type")?,
759 target_version: if target_version.is_empty() {
760 None
761 } else {
762 Some(target_version)
763 },
764 status: row.get("status")?,
765 progress: if progress_str.is_empty() {
766 None
767 } else {
768 progress_str.parse().ok()
769 },
770 error_message: if error_msg.is_empty() {
771 None
772 } else {
773 Some(error_msg)
774 },
775 created_at: chrono::DateTime::parse_from_rfc3339(&created_at_str)
776 .map_err(|_| {
777 duckdb::Error::InvalidColumnType(
778 0,
779 "created_at".to_string(),
780 duckdb::types::Type::Text,
781 )
782 })?
783 .with_timezone(&chrono::Utc),
784 updated_at: chrono::DateTime::parse_from_rfc3339(&updated_at_str)
785 .map_err(|_| {
786 duckdb::Error::InvalidColumnType(
787 0,
788 "updated_at".to_string(),
789 duckdb::types::Type::Text,
790 )
791 })?
792 .with_timezone(&chrono::Utc),
793 })
794 })?;
795
796 let mut result = Vec::new();
797 for task in tasks {
798 result.push(task?);
799 }
800 Ok(result)
801 })
802 .await
803 }
804
805 fn validate_value_type(&self, value: &Value, expected_type: &ConfigType) -> bool {
807 matches!(
808 (value, expected_type),
809 (Value::String(_), ConfigType::String)
810 | (Value::Number(_), ConfigType::Number)
811 | (Value::Bool(_), ConfigType::Boolean)
812 | (Value::Object(_), ConfigType::Object)
813 | (Value::Array(_), ConfigType::Array)
814 )
815 }
816}
817
818#[derive(Debug, Clone)]
820pub struct ConfigStats {
821 pub total_count: usize,
822 pub editable_count: usize,
823 pub system_count: usize,
824 pub category_stats: HashMap<String, usize>,
825}
826
827#[derive(Debug, Clone, Serialize, Deserialize)]
830pub struct AutoUpgradeTask {
831 pub task_id: String,
832 pub task_name: String,
833 pub schedule_time: chrono::DateTime<chrono::Utc>,
834 pub upgrade_type: String,
835 pub target_version: Option<String>,
836 pub status: String,
837 pub progress: Option<i32>,
838 pub error_message: Option<String>,
839 pub created_at: chrono::DateTime<chrono::Utc>,
840 pub updated_at: chrono::DateTime<chrono::Utc>,
841}
842
843#[derive(Debug, Clone, Serialize, Deserialize)]
844pub struct AutoBackupConfig {
845 pub enabled: bool,
846 pub cron_expression: String,
847 pub last_backup_time: Option<chrono::DateTime<chrono::Utc>>,
848 pub backup_retention_days: i32,
849 pub backup_directory: String,
850}