Skip to main content

sqltool/core/
kv_adapter.rs

//! Key-value storage adapter: handles non-relational databases such as Redis and MongoDB.
2
3use crate::models::FieldMapping;
4use anyhow::Result;
5use serde_json::{Map, Value};
6
/// Data types a key-value store entry can take.
///
/// The enum is a plain, field-less tag, so it derives `Copy`, `PartialEq` and `Eq`
/// just like the other configuration enums in this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum KvDataType {
    /// String
    String,
    /// Hash
    Hash,
    /// List
    List,
    /// Set
    Set,
    /// Sorted set
    SortedSet,
    /// JSON
    Json,
}
23
24/// Key-Value 存储转换配置
25#[derive(Debug, Clone)]
26pub struct KvConversionConfig {
27    /// 主键字段(将作为 Redis Key 的基础)
28    pub primary_key_field: String,
29    /// Key 前缀
30    pub key_prefix: String,
31    /// Key 格式模板
32    pub key_template: String,
33    /// 过期时间(秒),0 表示不过期
34    pub ttl_seconds: u64,
35    /// JSON 序列化模式
36    pub json_mode: JsonMode,
37    /// 字段合并策略
38    pub field_merge_strategy: FieldMergeStrategy,
39}
40
41impl Default for KvConversionConfig {
42    fn default() -> Self {
43        Self {
44            primary_key_field: "id".to_string(),
45            key_prefix: "db:".to_string(),
46            key_template: "{prefix}{pk_value}".to_string(),
47            ttl_seconds: 0,
48            json_mode: JsonMode::Flatten,
49            field_merge_strategy: FieldMergeStrategy::Json,
50        }
51    }
52}
53
/// How a row's fields are laid out when serialized to JSON.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JsonMode {
    /// Flatten: every field is laid out at the top level.
    Flatten,
    /// Nested: the original structure is preserved.
    Nested,
    /// Mixed: only designated fields are nested.
    Mixed,
}
63
/// How the fields of one row are grouped into stored values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FieldMergeStrategy {
    /// JSON: all fields are merged into a single JSON value.
    Json,
    /// Hash: fields are stored in a Redis hash.
    Hash,
    /// Separate: every field is stored under its own key.
    Separate,
}
73
74/// Redis 转换器
75pub struct RedisConverter {
76    config: KvConversionConfig,
77}
78
79impl RedisConverter {
80    pub fn new(config: KvConversionConfig) -> Self {
81        Self { config }
82    }
83
84    pub fn with_default_config() -> Self {
85        Self::new(KvConversionConfig::default())
86    }
87
88    /// 将关系型数据转换为 Redis 格式
89    pub fn convert_to_redis(
90        &self,
91        table_name: &str,
92        data: &Map<String, Value>,
93        mappings: &[FieldMapping],
94    ) -> Result<Vec<RedisCommand>> {
95        let mut commands = Vec::new();
96        
97        // 获取主键值
98        let pk_value = self.get_field_value(data, &self.config.primary_key_field)?;
99        
100        // 生成 Key
101        let key = self.generate_key(table_name, &pk_value)?;
102        
103        // 根据字段合并策略生成命令
104        match self.config.field_merge_strategy {
105            FieldMergeStrategy::Json => {
106                let json_value = self.convert_to_json(data, mappings)?;
107                commands.push(RedisCommand::Set {
108                    key: key.clone(),
109                    value: json_value,
110                    ttl: self.config.ttl_seconds,
111                });
112            }
113            FieldMergeStrategy::Hash => {
114                let hash_fields = self.convert_to_hash(data, mappings)?;
115                commands.push(RedisCommand::HSet {
116                    key: key.clone(),
117                    fields: hash_fields,
118                });
119                if self.config.ttl_seconds > 0 {
120                    commands.push(RedisCommand::Expire {
121                        key: key,
122                        seconds: self.config.ttl_seconds,
123                    });
124                }
125            }
126            FieldMergeStrategy::Separate => {
127                for mapping in mappings {
128                    if let Some(value) = data.get(&mapping.source_field) {
129                        let field_key = format!("{}:{}", key, mapping.target_field);
130                        commands.push(RedisCommand::Set {
131                            key: field_key,
132                            value: value.to_string(),
133                            ttl: self.config.ttl_seconds,
134                        });
135                    }
136                }
137            }
138        }
139        
140        Ok(commands)
141    }
142
143    /// 转换为 JSON 字符串
144    fn convert_to_json(&self, data: &Map<String, Value>, mappings: &[FieldMapping]) -> Result<String> {
145        let mut target_data = Map::new();
146        
147        for mapping in mappings {
148            if let Some(value) = data.get(&mapping.source_field) {
149                target_data.insert(mapping.target_field.clone(), value.clone());
150            }
151        }
152        
153        Ok(serde_json::to_string(&target_data)?)
154    }
155
156    /// 转换为 Redis Hash
157    fn convert_to_hash(&self, data: &Map<String, Value>, mappings: &[FieldMapping]) -> Result<Vec<(String, String)>> {
158        let mut fields = Vec::new();
159        
160        for mapping in mappings {
161            if let Some(value) = data.get(&mapping.source_field) {
162                fields.push((mapping.target_field.clone(), value.to_string()));
163            }
164        }
165        
166        Ok(fields)
167    }
168
169    /// 获取字段值
170    fn get_field_value(&self, data: &Map<String, Value>, field_name: &str) -> Result<String> {
171        data.get(field_name)
172            .map(|v| v.to_string())
173            .ok_or_else(|| anyhow::anyhow!("Field '{}' not found", field_name))
174    }
175
176    /// 生成 Key
177    fn generate_key(&self, table_name: &str, pk_value: &str) -> Result<String> {
178        let key = self.config.key_template
179            .replace("{prefix}", &self.config.key_prefix)
180            .replace("{table}", table_name)
181            .replace("{pk}", &self.config.primary_key_field)
182            .replace("{pk_value}", pk_value);
183        
184        Ok(key)
185    }
186
187    /// 生成 Redis 命令列表
188    pub fn generate_commands(&self, table_name: &str, data_list: &[Map<String, Value>], mappings: &[FieldMapping]) -> Result<Vec<RedisCommand>> {
189        let mut all_commands = Vec::new();
190        
191        for data in data_list {
192            let commands = self.convert_to_redis(table_name, data, mappings)?;
193            all_commands.extend(commands);
194        }
195        
196        Ok(all_commands)
197    }
198}
199
/// A Redis command produced by the converters.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RedisCommand {
    /// `SET key value`; a `ttl` greater than zero is an expiry in seconds.
    Set { key: String, value: String, ttl: u64 },
    /// `HSET key field value [field value ...]`.
    HSet { key: String, fields: Vec<(String, String)> },
    /// `EXPIRE key seconds`.
    Expire { key: String, seconds: u64 },
    /// `DEL key`.
    Del { key: String },
}

impl RedisCommand {
    /// Encodes the command in the Redis serialization protocol (RESP) as an
    /// array of bulk strings.
    ///
    /// * `Set` carries its TTL as `EX <seconds>` when `ttl > 0`, so the expiry
    ///   is not lost on the wire.
    /// * `HSet` declares the real argument count (`2 + 2 * fields.len()`).
    /// * Every frame, including `Set`, ends with `\r\n`.
    ///
    /// Bulk-string lengths are byte lengths, as RESP requires.
    pub fn to_redis_proto(&self) -> String {
        match self {
            RedisCommand::Set { key, value, ttl } => {
                if *ttl > 0 {
                    let seconds = ttl.to_string();
                    Self::encode_resp(&["SET", key.as_str(), value.as_str(), "EX", seconds.as_str()])
                } else {
                    Self::encode_resp(&["SET", key.as_str(), value.as_str()])
                }
            }
            RedisCommand::HSet { key, fields } => {
                let mut args: Vec<&str> = Vec::with_capacity(2 + fields.len() * 2);
                args.push("HSET");
                args.push(key.as_str());
                for (field, value) in fields {
                    args.push(field.as_str());
                    args.push(value.as_str());
                }
                Self::encode_resp(&args)
            }
            RedisCommand::Expire { key, seconds } => {
                let seconds = seconds.to_string();
                Self::encode_resp(&["EXPIRE", key.as_str(), seconds.as_str()])
            }
            RedisCommand::Del { key } => Self::encode_resp(&["DEL", key.as_str()]),
        }
    }

    /// Encodes `args` as one RESP array of bulk strings.
    fn encode_resp(args: &[&str]) -> String {
        let mut frame = format!("*{}\r\n", args.len());
        for arg in args {
            frame.push_str(&format!("${}\r\n{}\r\n", arg.len(), arg));
        }
        frame
    }
}
236
237/// MongoDB 转换器
238pub struct MongoDbConverter {
239    config: KvConversionConfig,
240}
241
242impl MongoDbConverter {
243    pub fn new(config: KvConversionConfig) -> Self {
244        Self { config }
245    }
246
247    pub fn with_default_config() -> Self {
248        Self::new(KvConversionConfig::default())
249    }
250
251    /// 将关系型数据转换为 MongoDB 文档
252    pub fn convert_to_mongo(
253        &self,
254        collection_name: &str,
255        data: &Map<String, Value>,
256        mappings: &[FieldMapping],
257    ) -> Result<MongoOperation> {
258        let mut document = Map::new();
259        
260        for mapping in mappings {
261            if let Some(value) = data.get(&mapping.source_field) {
262                document.insert(mapping.target_field.clone(), value.clone());
263            }
264        }
265        
266        Ok(MongoOperation::InsertOne {
267            collection: collection_name.to_string(),
268            document,
269        })
270    }
271
272    /// 生成批量插入操作
273    pub fn convert_batch(
274        &self,
275        collection_name: &str,
276        data_list: &[Map<String, Value>],
277        mappings: &[FieldMapping],
278    ) -> Result<Vec<MongoOperation>> {
279        let mut operations = Vec::new();
280        
281        for data in data_list {
282            let op = self.convert_to_mongo(collection_name, data, mappings)?;
283            operations.push(op);
284        }
285        
286        Ok(operations)
287    }
288}
289
290/// MongoDB 操作
291#[derive(Debug, Clone)]
292pub enum MongoOperation {
293    InsertOne { collection: String, document: Map<String, Value> },
294    InsertMany { collection: String, documents: Vec<Map<String, Value>> },
295    UpdateOne { collection: String, filter: Map<String, Value>, update: Map<String, Value> },
296    UpdateMany { collection: String, filter: Map<String, Value>, update: Map<String, Value> },
297    DeleteOne { collection: String, filter: Map<String, Value> },
298    DeleteMany { collection: String, filter: Map<String, Value> },
299}
300
301/// Key-Value 存储统一接口
302pub trait KvStoreConverter {
303    fn convert(&self, table_name: &str, data: &Map<String, Value>, mappings: &[FieldMapping]) -> Result<KvConversionResult>;
304}
305
306/// 转换结果
307#[derive(Debug, Clone)]
308pub struct KvConversionResult {
309    pub key: String,
310    pub value: Value,
311    pub ttl_seconds: Option<u64>,
312    pub metadata: Option<Map<String, Value>>,
313}
314
315impl KvConversionResult {
316    pub fn new(key: String, value: Value) -> Self {
317        Self {
318            key,
319            value,
320            ttl_seconds: None,
321            metadata: None,
322        }
323    }
324
325    pub fn with_ttl(mut self, ttl: u64) -> Self {
326        self.ttl_seconds = Some(ttl);
327        self
328    }
329
330    pub fn with_metadata(mut self, metadata: Map<String, Value>) -> Self {
331        self.metadata = Some(metadata);
332        self
333    }
334
335    /// 转换为 Redis 命令
336    pub fn to_redis_command(&self) -> RedisCommand {
337        RedisCommand::Set {
338            key: self.key.clone(),
339            value: serde_json::to_string(&self.value).unwrap_or_default(),
340            ttl: self.ttl_seconds.unwrap_or(0),
341        }
342    }
343
344    /// 转换为 MongoDB 文档
345    pub fn to_mongo_document(&self) -> Map<String, Value> {
346        let mut doc = Map::new();
347        doc.insert("_id".to_string(), Value::String(self.key.clone()));
348        doc.insert("data".to_string(), self.value.clone());
349        
350        if let Some(ref metadata) = self.metadata {
351            for (k, v) in metadata {
352                doc.insert(k.clone(), v.clone());
353            }
354        }
355        
356        doc
357    }
358}
359
360/// 批量转换器
361pub struct BatchKvConverter {
362    converters: Vec<Box<dyn KvStoreConverter>>,
363}
364
365impl BatchKvConverter {
366    pub fn new() -> Self {
367        Self {
368            converters: Vec::new(),
369        }
370    }
371
372    pub fn add_converter<C: KvStoreConverter + 'static>(&mut self, converter: C) {
373        self.converters.push(Box::new(converter));
374    }
375
376    /// 批量转换数据
377    pub fn convert_batch(
378        &self,
379        table_name: &str,
380        data_list: &[Map<String, Value>],
381        mappings: &[FieldMapping],
382    ) -> Result<Vec<KvConversionResult>> {
383        let mut results = Vec::new();
384        
385        for converter in &self.converters {
386            for data in data_list {
387                let result = converter.convert(table_name, data, mappings)?;
388                results.push(result);
389            }
390        }
391        
392        Ok(results)
393    }
394}
395
396impl Default for BatchKvConverter {
397    fn default() -> Self {
398        Self::new()
399    }
400}
401
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `users` -> `users` mapping from `source` to `target`.
    fn user_mapping(source: &str, target: &str) -> FieldMapping {
        FieldMapping {
            source_table: "users".to_string(),
            source_field: source.to_string(),
            target_table: "users".to_string(),
            target_field: target.to_string(),
        }
    }

    /// With the default config a row becomes one `Set` holding only the mapped fields.
    #[test]
    fn test_redis_converter_basic() {
        let converter = RedisConverter::with_default_config();

        let mut data = Map::new();
        data.insert("id".to_string(), Value::Number(1.into()));
        data.insert("name".to_string(), Value::String("Alice".to_string()));
        data.insert("email".to_string(), Value::String("alice@example.com".to_string()));

        let mappings = vec![user_mapping("id", "id"), user_mapping("name", "name")];

        let commands = converter.convert_to_redis("users", &data, &mappings).unwrap();

        assert_eq!(commands.len(), 1);
        match &commands[0] {
            RedisCommand::Set { key, value, ttl } => {
                assert_eq!(key, "db:1");
                assert_eq!(*ttl, 0);

                // Parse the payload so the check does not depend on key order.
                let stored: Map<String, Value> = serde_json::from_str(value).unwrap();
                assert_eq!(stored.get("id"), Some(&Value::Number(1.into())));
                assert_eq!(stored.get("name"), Some(&Value::String("Alice".to_string())));
                // `email` has no mapping, so it must not be stored.
                assert!(!stored.contains_key("email"));
            }
            other => panic!("Expected Set, got {:?}", other),
        }
        println!("生成的 Redis 命令: {:?}", commands);
    }

    /// The key template expands `{prefix}` and `{pk_value}`.
    #[test]
    fn test_key_generation() {
        let config = KvConversionConfig {
            primary_key_field: "id".to_string(),
            key_prefix: "user:".to_string(),
            key_template: "{prefix}{pk_value}".to_string(),
            ttl_seconds: 3600,
            json_mode: JsonMode::Flatten,
            field_merge_strategy: FieldMergeStrategy::Json,
        };

        let converter = RedisConverter::new(config);

        let key = converter.generate_key("users", "123").unwrap();
        assert_eq!(key, "user:123");

        println!("生成的 Key: {}", key);
    }

    /// A row becomes an `InsertOne` whose document holds only the mapped fields.
    #[test]
    fn test_mongodb_converter() {
        let converter = MongoDbConverter::with_default_config();

        let mut data = Map::new();
        data.insert("id".to_string(), Value::Number(1.into()));
        data.insert("name".to_string(), Value::String("Bob".to_string()));

        let mappings = vec![user_mapping("id", "_id")];

        let op = converter.convert_to_mongo("users", &data, &mappings).unwrap();

        match &op {
            MongoOperation::InsertOne { collection, document } => {
                assert_eq!(collection, "users");
                assert_eq!(document.get("_id"), Some(&Value::Number(1.into())));
                // `name` has no mapping, so `_id` is the only field.
                assert_eq!(document.len(), 1);
            }
            _ => panic!("Expected InsertOne"),
        }

        println!("生成的 MongoDB 操作: {:?}", op);
    }
}