1use crate::models::FieldMapping;
4use anyhow::Result;
5use serde_json::{Map, Value};
6
7#[derive(Debug, Clone)]
9pub enum KvDataType {
10 String,
12 Hash,
14 List,
16 Set,
18 SortedSet,
20 Json,
22}
23
24#[derive(Debug, Clone)]
26pub struct KvConversionConfig {
27 pub primary_key_field: String,
29 pub key_prefix: String,
31 pub key_template: String,
33 pub ttl_seconds: u64,
35 pub json_mode: JsonMode,
37 pub field_merge_strategy: FieldMergeStrategy,
39}
40
41impl Default for KvConversionConfig {
42 fn default() -> Self {
43 Self {
44 primary_key_field: "id".to_string(),
45 key_prefix: "db:".to_string(),
46 key_template: "{prefix}{pk_value}".to_string(),
47 ttl_seconds: 0,
48 json_mode: JsonMode::Flatten,
49 field_merge_strategy: FieldMergeStrategy::Json,
50 }
51 }
52}
53
54#[derive(Debug, Clone, Copy, PartialEq, Eq)]
55pub enum JsonMode {
56 Flatten,
58 Nested,
60 Mixed,
62}
63
64#[derive(Debug, Clone, Copy, PartialEq, Eq)]
65pub enum FieldMergeStrategy {
66 Json,
68 Hash,
70 Separate,
72}
73
74pub struct RedisConverter {
76 config: KvConversionConfig,
77}
78
79impl RedisConverter {
80 pub fn new(config: KvConversionConfig) -> Self {
81 Self { config }
82 }
83
84 pub fn with_default_config() -> Self {
85 Self::new(KvConversionConfig::default())
86 }
87
88 pub fn convert_to_redis(
90 &self,
91 table_name: &str,
92 data: &Map<String, Value>,
93 mappings: &[FieldMapping],
94 ) -> Result<Vec<RedisCommand>> {
95 let mut commands = Vec::new();
96
97 let pk_value = self.get_field_value(data, &self.config.primary_key_field)?;
99
100 let key = self.generate_key(table_name, &pk_value)?;
102
103 match self.config.field_merge_strategy {
105 FieldMergeStrategy::Json => {
106 let json_value = self.convert_to_json(data, mappings)?;
107 commands.push(RedisCommand::Set {
108 key: key.clone(),
109 value: json_value,
110 ttl: self.config.ttl_seconds,
111 });
112 }
113 FieldMergeStrategy::Hash => {
114 let hash_fields = self.convert_to_hash(data, mappings)?;
115 commands.push(RedisCommand::HSet {
116 key: key.clone(),
117 fields: hash_fields,
118 });
119 if self.config.ttl_seconds > 0 {
120 commands.push(RedisCommand::Expire {
121 key: key,
122 seconds: self.config.ttl_seconds,
123 });
124 }
125 }
126 FieldMergeStrategy::Separate => {
127 for mapping in mappings {
128 if let Some(value) = data.get(&mapping.source_field) {
129 let field_key = format!("{}:{}", key, mapping.target_field);
130 commands.push(RedisCommand::Set {
131 key: field_key,
132 value: value.to_string(),
133 ttl: self.config.ttl_seconds,
134 });
135 }
136 }
137 }
138 }
139
140 Ok(commands)
141 }
142
143 fn convert_to_json(&self, data: &Map<String, Value>, mappings: &[FieldMapping]) -> Result<String> {
145 let mut target_data = Map::new();
146
147 for mapping in mappings {
148 if let Some(value) = data.get(&mapping.source_field) {
149 target_data.insert(mapping.target_field.clone(), value.clone());
150 }
151 }
152
153 Ok(serde_json::to_string(&target_data)?)
154 }
155
156 fn convert_to_hash(&self, data: &Map<String, Value>, mappings: &[FieldMapping]) -> Result<Vec<(String, String)>> {
158 let mut fields = Vec::new();
159
160 for mapping in mappings {
161 if let Some(value) = data.get(&mapping.source_field) {
162 fields.push((mapping.target_field.clone(), value.to_string()));
163 }
164 }
165
166 Ok(fields)
167 }
168
169 fn get_field_value(&self, data: &Map<String, Value>, field_name: &str) -> Result<String> {
171 data.get(field_name)
172 .map(|v| v.to_string())
173 .ok_or_else(|| anyhow::anyhow!("Field '{}' not found", field_name))
174 }
175
176 fn generate_key(&self, table_name: &str, pk_value: &str) -> Result<String> {
178 let key = self.config.key_template
179 .replace("{prefix}", &self.config.key_prefix)
180 .replace("{table}", table_name)
181 .replace("{pk}", &self.config.primary_key_field)
182 .replace("{pk_value}", pk_value);
183
184 Ok(key)
185 }
186
187 pub fn generate_commands(&self, table_name: &str, data_list: &[Map<String, Value>], mappings: &[FieldMapping]) -> Result<Vec<RedisCommand>> {
189 let mut all_commands = Vec::new();
190
191 for data in data_list {
192 let commands = self.convert_to_redis(table_name, data, mappings)?;
193 all_commands.extend(commands);
194 }
195
196 Ok(all_commands)
197 }
198}
199
200#[derive(Debug, Clone)]
202pub enum RedisCommand {
203 Set { key: String, value: String, ttl: u64 },
204 HSet { key: String, fields: Vec<(String, String)> },
205 Expire { key: String, seconds: u64 },
206 Del { key: String },
207}
208
209impl RedisCommand {
210 pub fn to_redis_proto(&self) -> String {
212 match self {
213 RedisCommand::Set { key, value, .. } => {
214 format!("*3\r\n$3\r\nSET\r\n${}\r\n{}\r\n${}\r\n{}",
215 key.len(), key, value.len(), value)
216 }
217 RedisCommand::HSet { key, fields } => {
218 let mut cmd = format!("*3\r\n$4\r\nHSET\r\n${}\r\n{}\r\n", key.len(), key);
219 for (field, value) in fields {
220 cmd.push_str(&format!("${}\r\n{}\r\n${}\r\n{}\r\n",
221 field.len(), field, value.len(), value));
222 }
223 cmd
224 }
225 RedisCommand::Expire { key, seconds } => {
226 format!("*3\r\n$6\r\nEXPIRE\r\n${}\r\n{}\r\n${}\r\n{}\r\n",
227 key.len(), key, seconds.to_string().len(), seconds)
228 }
229 RedisCommand::Del { key } => {
230 format!("*2\r\n$3\r\nDEL\r\n${}\r\n{}\r\n",
231 key.len(), key)
232 }
233 }
234 }
235}
236
237pub struct MongoDbConverter {
239 config: KvConversionConfig,
240}
241
242impl MongoDbConverter {
243 pub fn new(config: KvConversionConfig) -> Self {
244 Self { config }
245 }
246
247 pub fn with_default_config() -> Self {
248 Self::new(KvConversionConfig::default())
249 }
250
251 pub fn convert_to_mongo(
253 &self,
254 collection_name: &str,
255 data: &Map<String, Value>,
256 mappings: &[FieldMapping],
257 ) -> Result<MongoOperation> {
258 let mut document = Map::new();
259
260 for mapping in mappings {
261 if let Some(value) = data.get(&mapping.source_field) {
262 document.insert(mapping.target_field.clone(), value.clone());
263 }
264 }
265
266 Ok(MongoOperation::InsertOne {
267 collection: collection_name.to_string(),
268 document,
269 })
270 }
271
272 pub fn convert_batch(
274 &self,
275 collection_name: &str,
276 data_list: &[Map<String, Value>],
277 mappings: &[FieldMapping],
278 ) -> Result<Vec<MongoOperation>> {
279 let mut operations = Vec::new();
280
281 for data in data_list {
282 let op = self.convert_to_mongo(collection_name, data, mappings)?;
283 operations.push(op);
284 }
285
286 Ok(operations)
287 }
288}
289
290#[derive(Debug, Clone)]
292pub enum MongoOperation {
293 InsertOne { collection: String, document: Map<String, Value> },
294 InsertMany { collection: String, documents: Vec<Map<String, Value>> },
295 UpdateOne { collection: String, filter: Map<String, Value>, update: Map<String, Value> },
296 UpdateMany { collection: String, filter: Map<String, Value>, update: Map<String, Value> },
297 DeleteOne { collection: String, filter: Map<String, Value> },
298 DeleteMany { collection: String, filter: Map<String, Value> },
299}
300
301pub trait KvStoreConverter {
303 fn convert(&self, table_name: &str, data: &Map<String, Value>, mappings: &[FieldMapping]) -> Result<KvConversionResult>;
304}
305
306#[derive(Debug, Clone)]
308pub struct KvConversionResult {
309 pub key: String,
310 pub value: Value,
311 pub ttl_seconds: Option<u64>,
312 pub metadata: Option<Map<String, Value>>,
313}
314
315impl KvConversionResult {
316 pub fn new(key: String, value: Value) -> Self {
317 Self {
318 key,
319 value,
320 ttl_seconds: None,
321 metadata: None,
322 }
323 }
324
325 pub fn with_ttl(mut self, ttl: u64) -> Self {
326 self.ttl_seconds = Some(ttl);
327 self
328 }
329
330 pub fn with_metadata(mut self, metadata: Map<String, Value>) -> Self {
331 self.metadata = Some(metadata);
332 self
333 }
334
335 pub fn to_redis_command(&self) -> RedisCommand {
337 RedisCommand::Set {
338 key: self.key.clone(),
339 value: serde_json::to_string(&self.value).unwrap_or_default(),
340 ttl: self.ttl_seconds.unwrap_or(0),
341 }
342 }
343
344 pub fn to_mongo_document(&self) -> Map<String, Value> {
346 let mut doc = Map::new();
347 doc.insert("_id".to_string(), Value::String(self.key.clone()));
348 doc.insert("data".to_string(), self.value.clone());
349
350 if let Some(ref metadata) = self.metadata {
351 for (k, v) in metadata {
352 doc.insert(k.clone(), v.clone());
353 }
354 }
355
356 doc
357 }
358}
359
360pub struct BatchKvConverter {
362 converters: Vec<Box<dyn KvStoreConverter>>,
363}
364
365impl BatchKvConverter {
366 pub fn new() -> Self {
367 Self {
368 converters: Vec::new(),
369 }
370 }
371
372 pub fn add_converter<C: KvStoreConverter + 'static>(&mut self, converter: C) {
373 self.converters.push(Box::new(converter));
374 }
375
376 pub fn convert_batch(
378 &self,
379 table_name: &str,
380 data_list: &[Map<String, Value>],
381 mappings: &[FieldMapping],
382 ) -> Result<Vec<KvConversionResult>> {
383 let mut results = Vec::new();
384
385 for converter in &self.converters {
386 for data in data_list {
387 let result = converter.convert(table_name, data, mappings)?;
388 results.push(result);
389 }
390 }
391
392 Ok(results)
393 }
394}
395
396impl Default for BatchKvConverter {
397 fn default() -> Self {
398 Self::new()
399 }
400}
401
402#[cfg(test)]
403mod tests {
404 use super::*;
405
406 #[test]
407 fn test_redis_converter_basic() {
408 let converter = RedisConverter::with_default_config();
409
410 let mut data = Map::new();
411 data.insert("id".to_string(), Value::Number(1.into()));
412 data.insert("name".to_string(), Value::String("Alice".to_string()));
413 data.insert("email".to_string(), Value::String("alice@example.com".to_string()));
414
415 let mappings = vec![
416 FieldMapping {
417 source_table: "users".to_string(),
418 source_field: "id".to_string(),
419 target_table: "users".to_string(),
420 target_field: "id".to_string(),
421 },
422 FieldMapping {
423 source_table: "users".to_string(),
424 source_field: "name".to_string(),
425 target_table: "users".to_string(),
426 target_field: "name".to_string(),
427 },
428 ];
429
430 let commands = converter.convert_to_redis("users", &data, &mappings).unwrap();
431
432 assert!(!commands.is_empty());
433 println!("生成的 Redis 命令: {:?}", commands);
434 }
435
436 #[test]
437 fn test_key_generation() {
438 let config = KvConversionConfig {
439 primary_key_field: "id".to_string(),
440 key_prefix: "user:".to_string(),
441 key_template: "{prefix}{pk_value}".to_string(),
442 ttl_seconds: 3600,
443 json_mode: JsonMode::Flatten,
444 field_merge_strategy: FieldMergeStrategy::Json,
445 };
446
447 let converter = RedisConverter::new(config);
448
449 let mut data = Map::new();
450 data.insert("id".to_string(), Value::Number(123.into()));
451
452 let key = converter.generate_key("users", "123").unwrap();
453 assert_eq!(key, "user:123");
454
455 println!("生成的 Key: {}", key);
456 }
457
458 #[test]
459 fn test_mongodb_converter() {
460 let converter = MongoDbConverter::with_default_config();
461
462 let mut data = Map::new();
463 data.insert("id".to_string(), Value::Number(1.into()));
464 data.insert("name".to_string(), Value::String("Bob".to_string()));
465
466 let mappings = vec![
467 FieldMapping {
468 source_table: "users".to_string(),
469 source_field: "id".to_string(),
470 target_table: "users".to_string(),
471 target_field: "_id".to_string(),
472 },
473 ];
474
475 let op = converter.convert_to_mongo("users", &data, &mappings).unwrap();
476
477 match &op {
478 MongoOperation::InsertOne { collection, document } => {
479 assert_eq!(collection, "users");
480 assert!(document.contains_key("_id"));
481 }
482 _ => panic!("Expected InsertOne"),
483 }
484
485 println!("生成的 MongoDB 操作: {:?}", op);
486 }
487}